欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報激活碼778899分享:Golang學習筆記

柚子快報激活碼778899分享:Golang學習筆記

http://yzkb.51969.com/

RabbitMQ 簡介

實現了高級消息隊列協議(Advanced Message Queuing Protcol)AMQP消息隊列中間件的作用(Redis實現MQ里面有寫過,這里簡單帶過)

解耦削峰異步處理緩存消息通信提高擴展性

RabbitMQ 架構理解

#mermaid-svg-qMZy3zJ1gpoZK2gq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .error-icon{fill:#552222;}#mermaid-svg-qMZy3zJ1gpoZK2gq .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-qMZy3zJ1gpoZK2gq .marker{fill:#333333;stroke:#333333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .marker.cross{stroke:#333333;}#mermaid-svg-qMZy3zJ1gpoZK2gq svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster-label text{fill:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster-label span{color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .label text,#mermaid-svg-qMZy3zJ1gpoZK2gq span{fill:#333;color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .node rect,#mermaid-svg-qMZy3zJ1gpoZK2gq .node circle,#mermaid-svg-qMZy3zJ1gpoZK2gq .node ellipse,#mermaid-svg-qMZy3zJ1gpoZK2gq .node polygon,#mermaid-svg-qMZy3zJ1gpoZK2gq .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .node .label{text-align:center;}#mermaid-svg-qMZy3zJ1gpoZK2gq .node.clickable{cursor:pointer;}#mermaid-svg-qMZy3zJ1gpoZK2gq .arrowheadPath{fill:#333333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-qMZy3zJ1gpoZK2gq .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster text{fill:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq .cluster span{color:#333;}#mermaid-svg-qMZy3zJ1gpoZK2gq div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-qMZy3zJ1gpoZK2gq :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}

channel

binding

channel

channel

channel

Producer 生產者

Exchange交換機

Queue消息隊列

Consumer消費者

Consumer消費者

Consumer消費者

binding(綁定):交換機將消息路由給Queue所遵循的規(guī)則,可以定義一個路由鍵,用于交換機篩選特定的Queue

Routing_Key(路由鍵):Producer 和 Consumer 協商一致的 key 策略。主要在交換機的 direct(直連)和 topic(主題) 模式下使用,fanout(廣播)模式下不使用Routing_Key Exchange(交換機):主要功能是分發(fā)消息給特定的Queue,只負責轉發(fā),不具備存儲消息的功能。Exchange有以下四種模式:

direct(直連模式),根據攜帶的Routing_Key來篩選特定的Queue進行消息投遞。是RabbitMQ的默認類型,可以不指定Routing_Key,在創(chuàng)建時會默認生成與Queue重名。hander(頭模式),使用場景不多,消息路由涉及多個屬性的時候,交換機使用多屬性來代替Routing_key建立路由規(guī)則,還可以定義匹配單詞的個數,例如any為有一個單詞滿足條件就匹配成功。all為所有單詞都滿足條件才匹配成功。fanout(廣播模式),不看Routing_Key。只根據Exchange和Queue的binding情況來分發(fā)信息。所有與之binding的queue都將接收到同一條消息。topic(主題模式),相當于模糊查詢。topic的routing_key是使用 . 來進行隔斷的。有兩種匹配方法:

" * " 匹配一個單詞,例子如下" # " 匹配0個~多個單詞,例子如下

rabbitMQ.* == rabbitMQ.topic != rabbitMQ.topic.topic

rabbitMQ.# == rabbit.topic == rabbit.topic.topic

Queue(消息隊列的存儲數據結構):

存儲方式:

持久化,在Server本地硬盤存儲一份臨時隊列,重啟后丟失數據自動刪除,不存在用戶連接則刪除queue 隊列對ACK請求的不同情況

consumer 接收并 ack,queue 刪除數據并向 consumer 發(fā)送新消息consumer 接收但是未 ack 就斷開了連接,queue 會認為消息并未傳送成功,consumer 再次連接時會重新發(fā)送消息如果consumer 接收消息成功 ,但是忘記 ack 則 queue 不會重復發(fā)送消息如果 consumer 拒收消息,則 queue 會向另外滿足條件的 consumer 繼續(xù)發(fā)送這條消息

RabbitMQ 工作流程

Producer方向

Producer 與 RabbitMQ Broker 建立連接,開啟一個信道 channel聲明交換機并設置屬性(交換機類型、持久化等)聲明Queue并設置屬性(持久化,自動刪除等)通過Routing_key來binding交換機和Queue發(fā)送信息給交換,交換機根據Routing_key來確認投遞的queue查找成功后將消息存到queue查找失敗將消息丟棄或拋回給生產者關閉channel

Consumer方向

與 queue 建立連接,開啟channel向queue請求隊列中的msg等待queue回應,開始接收消息消息處理完成后 返回回調確認ackqueue 將確認的消息從隊列中刪除關閉channel

RabbitMQ的兩種部署方式

Meta Data : 元數據(描述數據的數據)

vhost meta data : 為Queue、Exchange、Binding提供命名空間級別的隔離exchange meta data:記錄路由的名稱類型和屬性binding mate data:映射 routing_key和queue之間的綁定關系queue mate data:表隊列名稱和屬性

普通模式

對于該模式的兩個節(jié)點,消息只會存在其中一個節(jié)點,另一個節(jié)點只保存mate data,當consumer 連接節(jié)點2訪問節(jié)點1的數據信息時,消息會在兩個節(jié)點中傳遞。 該模式下p和c應盡量連接每個節(jié)點,這樣起到線性拓展的作用。 但存在一個問題,如果節(jié)點上還有未消費的消息,但是節(jié)點掛了。如果節(jié)點設置了持久化,則需要在節(jié)點重啟的時候消息才會恢復。如果未設置持久化,則消息會丟失。

鏡像模式

消息存在多個節(jié)點中,消息會在節(jié)點與節(jié)點之間同步,可實現高可用(當一個節(jié)點掛了,另一個節(jié)點可以接替其位置,繼續(xù)工作)但會降低性能,因為大量消息進入和同步,會占用大量帶寬,但是為了保證高可靠性需要取舍。

面試題

Q:如何保證消息不被重復消費?

A:MQ通過確認機制ACK,進行確認。確認后消息從queue中刪除,保證消息不被重復消費的。如果因為網絡原因ack沒有成功發(fā)出,導致消息重新投遞??梢允褂萌治ㄒ幌d來避免。

消息發(fā)送者發(fā)送消息時攜帶一個全局唯一的消息id消費者監(jiān)聽到消息后,根據id在redis或者db中查詢是否存在消費記錄如果沒有消費就正常消費,消費完畢后,寫入redis或者db如果消息消費過則直接丟棄 Q:如何保證消息的消費順序?

A:RabbitMQ中存在一個設置,叫獨占隊列。即在同一時間只有一個消費者會消費消息。從而制止了異步操作,保證消費順序?;蛘咭粋€Producer對一個Consumer Q:如何保證數據一致性?

A:因為MQ的使用場景多為分布式系統(tǒng),所以一般不追求強一致性。而保證最終一致性就可以。而保證數據最終一致性,可以采用消息補償機制。即消息在消費者處理完之后調用生產者的API修改數據狀態(tài)。如未調用API則判斷為消息處理失敗或出錯。此時間隔一段時間后重新投遞消息進行再次操作。消費者收到消息,處理完畢后,發(fā)送一條響應消息給生產者也是消息補償機制,本意是確認消費者成功消費消息。ACK也是處理方法

RabbitMQ的使用(Golang使用amqp包)

代碼部分參考 upup小亮的博客

代碼只是簡單的操作,主要是熟悉流程。對于如何創(chuàng)建Queue和綁定Exchange之類的操作有個了解。

Simple(簡單收發(fā)模式,只有一個Queue)

Simple運行機制與WorkQueue相似,只是一個Consumer與多個Consumer的區(qū)別。多個Consumer之間存在競爭關系,所以工作隊列是創(chuàng)建多個Consumer,多個競爭只有一個可以獲取消息消費。消費成功后ack消息刪除。 演示代碼放到一起了:

WorkQueue 工作隊列

生產者

// simple and work queue

func main2() {

// 連接到 rabbitMQ

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil {

log.Fatalf("無法創(chuàng)建連接:%s", err)

return

}

// 默認關閉

defer conn.Close()

// 創(chuàng)建通道Channel

ch, err := conn.Channel()

if err != nil {

log.Fatalf("無法創(chuàng)建channel:%s", err)

return

}

// 通道關閉

defer ch.Close()

// 創(chuàng)建存儲隊列

queue, err := ch.QueueDeclare(

"hello", // 隊列名稱

false, // 持久化設置,可以為true根據需求選擇

false, // 自動刪除,沒有用戶連接刪除queue一般不選用

false, //獨占

false, //等待服務器確認

nil) //參數

if err != nil {

fmt.Println(err)

log.Fatalf("無法聲明隊列:%s", err)

return

}

var body string

// 發(fā)送信息

for i := 0; i < 10; i++ {

fmt.Println(i)

body = "Hello RabbitMQ" + string(i)

err = ch.Publish(

"",

queue.Name,

false, // 必須發(fā)送到消息隊列

false, // 不等待服務器確認

amqp.Publishing{

ContentType: "text/plain",

Body: []byte(body),

})

if err != nil {

log.Fatalf("消息生產失敗:%s", err)

continue

}

}

}

消費者

// create conn

// 如果同時運行兩個這樣的consumer代碼,就是工作隊列。只有一個consumer就是simple

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil {

log.Fatalf("無法創(chuàng)建連接:%s", err)

return

}

defer conn.Close()

// create channel

ch, err := conn.Channel()

if err != nil {

log.Fatalf("無法創(chuàng)建channel:%s", err)

return

}

defer ch.Close()

// create queue

queue, err := ch.QueueDeclare(

"hello",

false,

false,

false,

false,

nil)

if err != nil {

log.Fatalf("無法創(chuàng)建queue:%s", err)

return

}

// 消費信息

msgs, err := ch.Consume(

queue.Name,

"",

true,

false,

false,

false,

nil)

if err != nil {

log.Fatalf("無法消費信息:%s", err)

return

}

for msg := range msgs {

log.Println(string(msg.Body))

}

return

pub/sub 發(fā)布訂閱模式

發(fā)布訂閱模式可以創(chuàng)建兩個Queue,綁定到同一個Exchange中 生產者這邊只需要跟交換機對接,而交換機類型為fanout:

func main() {

// 連接到 rabbitMQ

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil {

log.Fatalf("無法創(chuàng)建連接:%s", err)

}

// 默認關閉

defer conn.Close()

// 創(chuàng)建通道Channel

ch, err := conn.Channel()

if err != nil {

log.Fatalf("無法創(chuàng)建channel:%s", err)

}

defer ch.Close()

// create exchange

ex := ch.ExchangeDeclare(

"exchange1", // 交換機名稱

"fanout", // 交換機類型

true, // 是否持久化

false, // 是否自動刪除

false, // 是否內部使用

false, // 是否等待服務器響應

nil, // 其他屬性

)

fmt.Println(ex)

body := "Hello RabbitMQ for Pub/Sub"

err = ch.Publish(

"exchange1",

"", // routing key 可以為空,因為fanout不看routing key

false,

false,

amqp.Publishing{

ContentType: "text/plain",

Body: []byte(body),

})

if err != nil {

log.Fatalf("err %s:", err)

}

log.Println(body)

}

消費者:創(chuàng)建交換機,類型為fanout,創(chuàng)建隊列,綁定交換機(創(chuàng)建多個consumer綁定同一個queue和同一個交換機。這樣發(fā)送一個消息,所有的consumer都能收到。== 發(fā)布訂閱模型)

// Pub/Sub

// Create conn

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil{

log.Fatalf(err)

}

defer conn.Close()

// channel create

ch, err := conn.Channel()

if err != nil{

log.Fatalf(err)

}

defer ch.Close()

// exchange create

ex := ch.ExchangeDeclare(

"exchange1",

"fanout",

true,

false,

false,

false,

nil)

fmt.Println(ex)

// queue create

queue, err := ch.QueueDeclare(

"hello",

false,

false,

false,

false,

nil)

if err != nil{

log.Fatalf(err)

}

err = ch.QueueBind(

queue.Name,

"",

"exchange1",

false,

nil)

if err != nil{

log.Fatalf(err)

}

msgs, err := ch.Consume(

queue.Name,

"",

true,

false,

false,

false,

nil)

if err != nil{

log.Fatalf(err)

}

go func() {

for d := range msgs {

log.Printf("Received a message: %s", d.Body)

}

}()

log.Printf("Waiting for messages. To exit press CTRL+C")

<-make(chan struct{}) // 阻塞主goroutine

}

Routing 模式(對特定的隊列投遞消息)

生產者

func main() {

// 連接到 rabbitMQ

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil {

log.Fatalf("無法創(chuàng)建連接:%s", err)

}

// 默認關閉

defer conn.Close()

// 創(chuàng)建通道Channel

ch, err := conn.Channel()

if err != nil {

log.Fatalf("無法創(chuàng)建channel:%s", err)

}

defer ch.Close()

// create exchange

ex := ch.ExchangeDeclare(

"exchange1", // 交換機名稱

"direct", // 交換機類型

true, // 是否持久化

false, // 是否自動刪除

false, // 是否內部使用

false, // 是否等待服務器響應

nil, // 其他屬性

)

fmt.Println(ex)

body := "Hello RabbitMQ for direct routing"

// 發(fā)布消息到交換機,并指定路由鍵

err = ch.Publish(

"logs_direct", // 交換機名稱

"routing_key", // 路由鍵

false, // 是否等待服務器響應

false, // 是否立即將消息寫入磁盤

amqp.Publishing{

ContentType: "text/plain",

Body: []byte(body),

},

)

if err != nil{

log.Fatalf("無法創(chuàng)建send msg:%s", err)

}

log.Printf("Sent message: %s", message)

消費者

func main() {

// 連接到RabbitMQ服務器

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil{

log.Fatalf("無法創(chuàng)建send msg:%s", err)

}

defer conn.Close()

// 創(chuàng)建一個通道

ch, err := conn.Channel()

if err != nil{

log.Fatalf("無法創(chuàng)建send msg:%s", err)

}

defer ch.Close()

// 聲明一個交換機

err = ch.ExchangeDeclare(

"logs_direct", // 交換機名稱

"direct", // 交換機類型

true, // 是否持久化

false, // 是否自動刪除

false, // 是否內部使用

false, // 是否等待服務器響應

nil, // 其他屬性

)

if err != nil{

log.Fatalf("無法創(chuàng)建send msg:%s", err)

}

// 聲明一個臨時隊列

q, err := ch.QueueDeclare(

"", // 隊列名稱,留空表示由RabbitMQ自動生成,因為定義了key所以隊列名可以是隨意的,畢竟是依靠key來進行匹配的

false, // 是否持久化

false, // 是否自動刪除(當沒有任何消費者連接時)

true, // 是否排他隊列(僅限于當前連接)

false, // 是否等待服務器響應

nil, // 其他屬性

)

// 將隊列綁定到交換機上,并指定要接收的路由鍵

err = ch.QueueBind(

q.Name, // 隊列名稱

"routing_key", // 路由鍵

"logs_direct", // 交換機名稱

false, // 是否等待服務器響應

nil, // 其他屬性

)

if err != nil{

log.Fatalf("無法創(chuàng)建send msg:%s", err)

}

// 訂閱消息

msgs, err := ch.Consume(

q.Name, // 隊列名稱

"", // 消費者標識符,留空表示由RabbitMQ自動生成

true, // 是否自動應答

false, // 是否獨占模式(僅限于當前連接)

false, // 是否等待服務器響應

false, // 其他屬性

nil, // 其他屬性

)

failOnError(err, "Failed to register a consumer")

// 接收消息的goroutine

go func() {

for d := range msgs {

log.Printf("Received a message: %s", d.Body)

}

}()

log.Printf("Waiting for messages. To exit press CTRL+C")

<-make(chan struct{}) // 阻塞主goroutine

topic

func main() {

// 連接到RabbitMQ服務器

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil{

log.Fatalf(err)

}

defer conn.Close()

// 創(chuàng)建一個通道

ch, err := conn.Channel()

if err != nil{

log.Fatalf(err)

}

defer ch.Close()

// 聲明一個交換機

err = ch.ExchangeDeclare(

"logs_topic", // 交換機名稱

"topic", // 交換機類型

true, // 是否持久化

false, // 是否自動刪除

false, // 是否內部使用

false, // 是否等待服務器響應

nil, // 其他屬性

)

if err != nil{

log.Fatalf(err)

}

// 定義要發(fā)送的消息的路由鍵和內容

routingKey := "example.key.das"

message := "Hello, RabbitMQ!"

// 發(fā)布消息到交換機,并指定路由鍵

err = ch.Publish(

"logs_topic", // 交換機名稱

routingKey, // 路由鍵

false, // 是否等待服務器響應

false, // 是否立即發(fā)送

amqp.Publishing{

ContentType: "text/plain",

Body: []byte(message),

},

)

if err != nil{

log.Fatalf(err)

}

log.Printf("Sent message: %s", message)

}

消費者

func main() {

// 連接到RabbitMQ服務器

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

if err != nil{

log.Fatalf(err)

}

defer conn.Close()

// 創(chuàng)建一個通道

ch, err := conn.Channel()

if err != nil{

log.Fatalf(err)

}

defer ch.Close()

// 聲明一個交換機

err = ch.ExchangeDeclare(

"logs_topic", // 交換機名稱

"topic", // 交換機類型

true, // 是否持久化

false, // 是否自動刪除

false, // 是否內部使用

false, // 是否等待服務器響應

nil, // 其他屬性

)

if err != nil{

log.Fatalf(err)

}

// 聲明一個臨時隊列

q, err := ch.QueueDeclare(

"", // 隊列名稱,留空表示由RabbitMQ自動生成

false, // 是否持久化

false, // 是否自動刪除(當沒有任何消費者連接時)

true, // 是否排他隊列(僅限于當前連接)

false, // 是否等待服務器響應

nil, // 其他屬性

)

if err != nil{

log.Fatalf(err)

}

// 將隊列綁定到交換機上,并指定要接收的路由鍵

err = ch.QueueBind(

q.Name, // 隊列名稱

"example.#", // 路由鍵,可以使用通配符*匹配一個單詞

"logs_topic", // 交換機名稱

false, // 是否等待服務器響應

nil, // 其他屬性

)

if err != nil{

log.Fatalf(err)

}

// 創(chuàng)建一個消費者通道

msgs, err := ch.Consume(

q.Name, // 隊列名稱

"", // 消費者標識符,留空表示由RabbitMQ自動生成

true, // 是否自動應答

false, // 是否排他消費者

false, // 是否阻塞

false, // 是否等待服務器響應

nil, // 其他屬性

)

if err != nil{

log.Fatalf(err)

}

// 接收和處理消息

forever := make(chan bool)

go func() {

for d := range msgs {

log.Printf("Received a message: %s", d.Body)

}

}()

log.Printf("Waiting for messages...")

// 阻塞

<-forever

}

柚子快報激活碼778899分享:Golang學習筆記

http://yzkb.51969.com/

好文閱讀

評論可見,查看隱藏內容

本文內容根據網絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉載請注明,如有侵權,聯系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19497780.html

發(fā)布評論

您暫未設置收款碼

請在主題配置——文章設置里上傳

掃描二維碼手機訪問

文章目錄