柚子快報激活碼778899分享:Golang學習筆記
RabbitMQ 簡介
實現了高級消息隊列協議(Advanced Message Queuing Protcol)AMQP消息隊列中間件的作用(Redis實現MQ里面有寫過,這里簡單帶過)
解耦削峰異步處理緩存消息通信提高擴展性
RabbitMQ 架構理解
#mermaid-svg-qMZy3zJ1gpoZK2gq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .error-icon{fill:#552222;}#mermaid-svg-qMZy3zJ1gpoZK2gq .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-qMZy3zJ1gpoZK2gq .marker{fill:#333333;stroke:#333333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .marker.cross{stroke:#333333;}#mermaid-svg-qMZy3zJ1gpoZK2gq svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster-label text{fill:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster-label span{color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .label text,#mermaid-svg-qMZy3zJ1gpoZK2gq span{fill:#333;color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .node rect,#mermaid-svg-qMZy3zJ1gpoZK2gq .node circle,#mermaid-svg-qMZy3zJ1gpoZK2gq .node ellipse,#mermaid-svg-qMZy3zJ1gpoZK2gq .node polygon,#mermaid-svg-qMZy3zJ1gpoZK2gq .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .node .label{text-align:center;}#mermaid-svg-qMZy3zJ1gpoZK2gq .node.clickable{cursor:pointer;}#mermaid-svg-qMZy3zJ1gpoZK2gq .arrowheadPath{fill:#333333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster text{fill:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster span{color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-qMZy3zJ1gpoZK2gq :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
channel
binding
channel
channel
channel
Producer 生產者
Exchange交換機
Queue消息隊列
Consumer消費者
Consumer消費者
Consumer消費者
binding(綁定):交換機將消息路由給Queue所遵循的規(guī)則,可以定義一個路由鍵,用于交換機篩選特定的Queue
Routing_Key(路由鍵):Producer 和 Consumer 協商一致的 key 策略。主要在交換機的 direct(直連)和 topic(主題) 模式下使用,fanout(廣播)模式下不使用Routing_Key Exchange(交換機):主要功能是分發(fā)消息給特定的Queue,只負責轉發(fā),不具備存儲消息的功能。Exchange有以下四種模式:
direct(直連模式),根據攜帶的Routing_Key來篩選特定的Queue進行消息投遞。是RabbitMQ的默認類型,可以不指定Routing_Key,在創(chuàng)建時會默認生成與Queue重名。hander(頭模式),使用場景不多,消息路由涉及多個屬性的時候,交換機使用多屬性來代替Routing_key建立路由規(guī)則,還可以定義匹配單詞的個數,例如any為有一個單詞滿足條件就匹配成功。all為所有單詞都滿足條件才匹配成功。fanout(廣播模式),不看Routing_Key。只根據Exchange和Queue的binding情況來分發(fā)信息。所有與之binding的queue都將接收到同一條消息。topic(主題模式),相當于模糊查詢。topic的routing_key是使用 . 來進行隔斷的。有兩種匹配方法:
" * " 匹配一個單詞,例子如下" # " 匹配0個~多個單詞,例子如下
rabbitMQ.* == rabbitMQ.topic != rabbitMQ.topic.topic
rabbitMQ.# == rabbit.topic == rabbit.topic.topic
Queue(消息隊列的存儲數據結構):
存儲方式:
持久化,在Server本地硬盤存儲一份臨時隊列,重啟后丟失數據自動刪除,不存在用戶連接則刪除queue 隊列對ACK請求的不同情況
consumer 接收并 ack,queue 刪除數據并向 consumer 發(fā)送新消息consumer 接收但是未 ack 就斷開了連接,queue 會認為消息并未傳送成功,consumer 再次連接時會重新發(fā)送消息如果consumer 接收消息成功 ,但是忘記 ack 則 queue 不會重復發(fā)送消息如果 consumer 拒收消息,則 queue 會向另外滿足條件的 consumer 繼續(xù)發(fā)送這條消息
RabbitMQ 工作流程
Producer方向
Producer 與 RabbitMQ Broker 建立連接,開啟一個信道 channel聲明交換機并設置屬性(交換機類型、持久化等)聲明Queue并設置屬性(持久化,自動刪除等)通過Routing_key來binding交換機和Queue發(fā)送信息給交換,交換機根據Routing_key來確認投遞的queue查找成功后將消息存到queue查找失敗將消息丟棄或拋回給生產者關閉channel
Consumer方向
與 queue 建立連接,開啟channel向queue請求隊列中的msg等待queue回應,開始接收消息消息處理完成后 返回回調確認ackqueue 將確認的消息從隊列中刪除關閉channel
RabbitMQ的兩種部署方式
Meta Data : 元數據(描述數據的數據)
vhost meta data : 為Queue、Exchange、Binding提供命名空間級別的隔離exchange meta data:記錄路由的名稱類型和屬性binding mate data:映射 routing_key和queue之間的綁定關系queue mate data:表隊列名稱和屬性
普通模式
對于該模式的兩個節(jié)點,消息只會存在其中一個節(jié)點,另一個節(jié)點只保存mate data,當consumer 連接節(jié)點2訪問節(jié)點1的數據信息時,消息會在兩個節(jié)點中傳遞。 該模式下p和c應盡量連接每個節(jié)點,這樣起到線性拓展的作用。 但存在一個問題,如果節(jié)點上還有未消費的消息,但是節(jié)點掛了。如果節(jié)點設置了持久化,則需要在節(jié)點重啟的時候消息才會恢復。如果未設置持久化,則消息會丟失。
鏡像模式
消息存在多個節(jié)點中,消息會在節(jié)點與節(jié)點之間同步,可實現高可用(當一個節(jié)點掛了,另一個節(jié)點可以接替其位置,繼續(xù)工作)但會降低性能,因為大量消息進入和同步,會占用大量帶寬,但是為了保證高可靠性需要取舍。
面試題
Q:如何保證消息不被重復消費?
A:MQ通過確認機制ACK,進行確認。確認后消息從queue中刪除,保證消息不被重復消費的。如果因為網絡原因ack沒有成功發(fā)出,導致消息重新投遞??梢允褂萌治ㄒ幌d來避免。
消息發(fā)送者發(fā)送消息時攜帶一個全局唯一的消息id消費者監(jiān)聽到消息后,根據id在redis或者db中查詢是否存在消費記錄如果沒有消費就正常消費,消費完畢后,寫入redis或者db如果消息消費過則直接丟棄 Q:如何保證消息的消費順序?
A:RabbitMQ中存在一個設置,叫獨占隊列。即在同一時間只有一個消費者會消費消息。從而制止了異步操作,保證消費順序?;蛘咭粋€Producer對一個Consumer Q:如何保證數據一致性?
A:因為MQ的使用場景多為分布式系統(tǒng),所以一般不追求強一致性。而保證最終一致性就可以。而保證數據最終一致性,可以采用消息補償機制。即消息在消費者處理完之后調用生產者的API修改數據狀態(tài)。如未調用API則判斷為消息處理失敗或出錯。此時間隔一段時間后重新投遞消息進行再次操作。消費者收到消息,處理完畢后,發(fā)送一條響應消息給生產者也是消息補償機制,本意是確認消費者成功消費消息。ACK也是處理方法
RabbitMQ的使用(Golang使用amqp包)
代碼部分參考 upup小亮的博客
代碼只是簡單的操作,主要是熟悉流程。對于如何創(chuàng)建Queue和綁定Exchange之類的操作有個了解。
Simple(簡單收發(fā)模式,只有一個Queue)
Simple運行機制與WorkQueue相似,只是一個Consumer與多個Consumer的區(qū)別。多個Consumer之間存在競爭關系,所以工作隊列是創(chuàng)建多個Consumer,多個競爭只有一個可以獲取消息消費。消費成功后ack消息刪除。 演示代碼放到一起了:
WorkQueue 工作隊列
生產者
// simple and work queue
func main2() {
// 連接到 rabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("無法創(chuàng)建連接:%s", err)
return
}
// 默認關閉
defer conn.Close()
// 創(chuàng)建通道Channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("無法創(chuàng)建channel:%s", err)
return
}
// 通道關閉
defer ch.Close()
// 創(chuàng)建存儲隊列
queue, err := ch.QueueDeclare(
"hello", // 隊列名稱
false, // 持久化設置,可以為true根據需求選擇
false, // 自動刪除,沒有用戶連接刪除queue一般不選用
false, //獨占
false, //等待服務器確認
nil) //參數
if err != nil {
fmt.Println(err)
log.Fatalf("無法聲明隊列:%s", err)
return
}
var body string
// 發(fā)送信息
for i := 0; i < 10; i++ {
fmt.Println(i)
body = "Hello RabbitMQ" + string(i)
err = ch.Publish(
"",
queue.Name,
false, // 必須發(fā)送到消息隊列
false, // 不等待服務器確認
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("消息生產失敗:%s", err)
continue
}
}
}
消費者
// create conn
// 如果同時運行兩個這樣的consumer代碼,就是工作隊列。只有一個consumer就是simple
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("無法創(chuàng)建連接:%s", err)
return
}
defer conn.Close()
// create channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("無法創(chuàng)建channel:%s", err)
return
}
defer ch.Close()
// create queue
queue, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil)
if err != nil {
log.Fatalf("無法創(chuàng)建queue:%s", err)
return
}
// 消費信息
msgs, err := ch.Consume(
queue.Name,
"",
true,
false,
false,
false,
nil)
if err != nil {
log.Fatalf("無法消費信息:%s", err)
return
}
for msg := range msgs {
log.Println(string(msg.Body))
}
return
pub/sub 發(fā)布訂閱模式
發(fā)布訂閱模式可以創(chuàng)建兩個Queue,綁定到同一個Exchange中 生產者這邊只需要跟交換機對接,而交換機類型為fanout:
func main() {
// 連接到 rabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("無法創(chuàng)建連接:%s", err)
}
// 默認關閉
defer conn.Close()
// 創(chuàng)建通道Channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("無法創(chuàng)建channel:%s", err)
}
defer ch.Close()
// create exchange
ex := ch.ExchangeDeclare(
"exchange1", // 交換機名稱
"fanout", // 交換機類型
true, // 是否持久化
false, // 是否自動刪除
false, // 是否內部使用
false, // 是否等待服務器響應
nil, // 其他屬性
)
fmt.Println(ex)
body := "Hello RabbitMQ for Pub/Sub"
err = ch.Publish(
"exchange1",
"", // routing key 可以為空,因為fanout不看routing key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("err %s:", err)
}
log.Println(body)
}
消費者:創(chuàng)建交換機,類型為fanout,創(chuàng)建隊列,綁定交換機(創(chuàng)建多個consumer綁定同一個queue和同一個交換機。這樣發(fā)送一個消息,所有的consumer都能收到。== 發(fā)布訂閱模型)
// Pub/Sub
// Create conn
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil{
log.Fatalf(err)
}
defer conn.Close()
// channel create
ch, err := conn.Channel()
if err != nil{
log.Fatalf(err)
}
defer ch.Close()
// exchange create
ex := ch.ExchangeDeclare(
"exchange1",
"fanout",
true,
false,
false,
false,
nil)
fmt.Println(ex)
// queue create
queue, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil)
if err != nil{
log.Fatalf(err)
}
err = ch.QueueBind(
queue.Name,
"",
"exchange1",
false,
nil)
if err != nil{
log.Fatalf(err)
}
msgs, err := ch.Consume(
queue.Name,
"",
true,
false,
false,
false,
nil)
if err != nil{
log.Fatalf(err)
}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf("Waiting for messages. To exit press CTRL+C")
<-make(chan struct{}) // 阻塞主goroutine
}
Routing 模式(對特定的隊列投遞消息)
生產者
func main() {
// 連接到 rabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("無法創(chuàng)建連接:%s", err)
}
// 默認關閉
defer conn.Close()
// 創(chuàng)建通道Channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("無法創(chuàng)建channel:%s", err)
}
defer ch.Close()
// create exchange
ex := ch.ExchangeDeclare(
"exchange1", // 交換機名稱
"direct", // 交換機類型
true, // 是否持久化
false, // 是否自動刪除
false, // 是否內部使用
false, // 是否等待服務器響應
nil, // 其他屬性
)
fmt.Println(ex)
body := "Hello RabbitMQ for direct routing"
// 發(fā)布消息到交換機,并指定路由鍵
err = ch.Publish(
"logs_direct", // 交換機名稱
"routing_key", // 路由鍵
false, // 是否等待服務器響應
false, // 是否立即將消息寫入磁盤
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
if err != nil{
log.Fatalf("無法創(chuàng)建send msg:%s", err)
}
log.Printf("Sent message: %s", message)
消費者
func main() {
// 連接到RabbitMQ服務器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil{
log.Fatalf("無法創(chuàng)建send msg:%s", err)
}
defer conn.Close()
// 創(chuàng)建一個通道
ch, err := conn.Channel()
if err != nil{
log.Fatalf("無法創(chuàng)建send msg:%s", err)
}
defer ch.Close()
// 聲明一個交換機
err = ch.ExchangeDeclare(
"logs_direct", // 交換機名稱
"direct", // 交換機類型
true, // 是否持久化
false, // 是否自動刪除
false, // 是否內部使用
false, // 是否等待服務器響應
nil, // 其他屬性
)
if err != nil{
log.Fatalf("無法創(chuàng)建send msg:%s", err)
}
// 聲明一個臨時隊列
q, err := ch.QueueDeclare(
"", // 隊列名稱,留空表示由RabbitMQ自動生成,因為定義了key所以隊列名可以是隨意的,畢竟是依靠key來進行匹配的
false, // 是否持久化
false, // 是否自動刪除(當沒有任何消費者連接時)
true, // 是否排他隊列(僅限于當前連接)
false, // 是否等待服務器響應
nil, // 其他屬性
)
// 將隊列綁定到交換機上,并指定要接收的路由鍵
err = ch.QueueBind(
q.Name, // 隊列名稱
"routing_key", // 路由鍵
"logs_direct", // 交換機名稱
false, // 是否等待服務器響應
nil, // 其他屬性
)
if err != nil{
log.Fatalf("無法創(chuàng)建send msg:%s", err)
}
// 訂閱消息
msgs, err := ch.Consume(
q.Name, // 隊列名稱
"", // 消費者標識符,留空表示由RabbitMQ自動生成
true, // 是否自動應答
false, // 是否獨占模式(僅限于當前連接)
false, // 是否等待服務器響應
false, // 其他屬性
nil, // 其他屬性
)
failOnError(err, "Failed to register a consumer")
// 接收消息的goroutine
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf("Waiting for messages. To exit press CTRL+C")
<-make(chan struct{}) // 阻塞主goroutine
topic
func main() {
// 連接到RabbitMQ服務器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil{
log.Fatalf(err)
}
defer conn.Close()
// 創(chuàng)建一個通道
ch, err := conn.Channel()
if err != nil{
log.Fatalf(err)
}
defer ch.Close()
// 聲明一個交換機
err = ch.ExchangeDeclare(
"logs_topic", // 交換機名稱
"topic", // 交換機類型
true, // 是否持久化
false, // 是否自動刪除
false, // 是否內部使用
false, // 是否等待服務器響應
nil, // 其他屬性
)
if err != nil{
log.Fatalf(err)
}
// 定義要發(fā)送的消息的路由鍵和內容
routingKey := "example.key.das"
message := "Hello, RabbitMQ!"
// 發(fā)布消息到交換機,并指定路由鍵
err = ch.Publish(
"logs_topic", // 交換機名稱
routingKey, // 路由鍵
false, // 是否等待服務器響應
false, // 是否立即發(fā)送
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
if err != nil{
log.Fatalf(err)
}
log.Printf("Sent message: %s", message)
}
消費者
func main() {
// 連接到RabbitMQ服務器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil{
log.Fatalf(err)
}
defer conn.Close()
// 創(chuàng)建一個通道
ch, err := conn.Channel()
if err != nil{
log.Fatalf(err)
}
defer ch.Close()
// 聲明一個交換機
err = ch.ExchangeDeclare(
"logs_topic", // 交換機名稱
"topic", // 交換機類型
true, // 是否持久化
false, // 是否自動刪除
false, // 是否內部使用
false, // 是否等待服務器響應
nil, // 其他屬性
)
if err != nil{
log.Fatalf(err)
}
// 聲明一個臨時隊列
q, err := ch.QueueDeclare(
"", // 隊列名稱,留空表示由RabbitMQ自動生成
false, // 是否持久化
false, // 是否自動刪除(當沒有任何消費者連接時)
true, // 是否排他隊列(僅限于當前連接)
false, // 是否等待服務器響應
nil, // 其他屬性
)
if err != nil{
log.Fatalf(err)
}
// 將隊列綁定到交換機上,并指定要接收的路由鍵
err = ch.QueueBind(
q.Name, // 隊列名稱
"example.#", // 路由鍵,可以使用通配符*匹配一個單詞
"logs_topic", // 交換機名稱
false, // 是否等待服務器響應
nil, // 其他屬性
)
if err != nil{
log.Fatalf(err)
}
// 創(chuàng)建一個消費者通道
msgs, err := ch.Consume(
q.Name, // 隊列名稱
"", // 消費者標識符,留空表示由RabbitMQ自動生成
true, // 是否自動應答
false, // 是否排他消費者
false, // 是否阻塞
false, // 是否等待服務器響應
nil, // 其他屬性
)
if err != nil{
log.Fatalf(err)
}
// 接收和處理消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf("Waiting for messages...")
// 阻塞
<-forever
}
柚子快報激活碼778899分享:Golang學習筆記
好文閱讀
本文內容根據網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯系刪除。