柚子快報(bào)激活碼778899分享:分布式 rabbitmq
柚子快報(bào)激活碼778899分享:分布式 rabbitmq
一,什么是消息中間件
MQ全稱為Message Queue,本質(zhì)上是個(gè)隊(duì)列,F(xiàn)IFO先入先出。是在消息的傳輸過(guò)程中保存消息的容器??梢杂糜趹?yīng)用程序和應(yīng)用程序之間的通信方法。多用于分布式系統(tǒng)之間進(jìn)行通信,在項(xiàng)目中,可將一些無(wú)需即時(shí)返回且耗時(shí)的操作提取出來(lái),進(jìn)行異步處理,而這種異步處理的方式大大的節(jié)省了服務(wù)器的請(qǐng)求響應(yīng)時(shí)間,從而提高了系統(tǒng)的吞吐量
二,為什么要使用消息中間件
消息中間件優(yōu)點(diǎn):異步,解耦,限流
同步通信:耗時(shí)長(zhǎng),受網(wǎng)絡(luò)波動(dòng)影響,不能保證高成功率,耦合性高。
同步,異步
并發(fā):一段時(shí)間(1S)多個(gè)請(qǐng)求數(shù)
并行:時(shí)間節(jié)點(diǎn),多個(gè)指令同時(shí)被執(zhí)行
串行:順序執(zhí)行
1.異步處理:
將不需要同步處理的并且耗時(shí)長(zhǎng)的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理,提高了應(yīng)用程序的響應(yīng)時(shí)間。
消息隊(duì)列:Redis 發(fā)布訂閱(pub/sub)
異步方式:用戶點(diǎn)擊完下單按鈕后,只需等待25ms就能得到下單響應(yīng) (20 + 5 = 25ms)。也就是說(shuō),訂單消息提交到MQ,MQ回饋一個(gè)消息成功,然后再把訂單提交到數(shù)據(jù)庫(kù)20ms,就完成了。至于MQ通知庫(kù)存、支付、物流系統(tǒng)所花費(fèi)的時(shí)間和訂單系統(tǒng)成功沒(méi)有關(guān)系了。 這樣這個(gè)訂單系統(tǒng)提升用戶體驗(yàn)和系統(tǒng)吞吐量(單位時(shí)間內(nèi)處理請(qǐng)求的數(shù)目)
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來(lái)直接上傳(img-fw7ZuYfR-1691115209630)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20230713152842364.png)]
同步處理(耗時(shí)長(zhǎng)):
同步方式的問(wèn)題:當(dāng)一個(gè)用戶提交訂單到成功需要300ms+300ms+300ms+20ms = 920ms,這是不能容忍的。也就是說(shuō)庫(kù)存、支付、物流、最后保存數(shù)據(jù)庫(kù)全部成功,訂單的提交才算完成。
2.系統(tǒng)的耦合性越高,容錯(cuò)性就越低,可維護(hù)性就越低
服務(wù)與之間耦合度,比如訂單服務(wù)與用戶積分服務(wù)(需求:下單成功,增加積分)
如果不用消息隊(duì)列,訂單服務(wù)和積分服務(wù)就要通信,下單后調(diào)用積分服務(wù)的接口通知積分服務(wù)進(jìn)行處理(或者定時(shí)掃描之類的),那么調(diào)用接口失敗,或者延時(shí)等等…一系列的問(wèn)題要考慮處理,非常繁瑣
用了消息隊(duì)列,用戶A下單成功后下單服務(wù)通過(guò)redis發(fā)布(mq的生產(chǎn)者)一消息,就不用管了.用戶積分服務(wù)redis訂閱了(mq的消費(fèi)者),就會(huì)受到這用戶A下單的消息,進(jìn)行處理.這就降低了多個(gè)服務(wù)之間的耦合,即使積分服務(wù)發(fā)生異常,也不會(huì)影響用戶正常下單.處理起來(lái)就非常的絲滑,各干各的互不影響.
解決方案:應(yīng)用程序解耦合
MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過(guò)MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合。
使用消息隊(duì)列的方式:使用 MQ 使得應(yīng)用間解耦,提升容錯(cuò)性和可維護(hù)性。庫(kù)存和支付和物流直接去MQ取到訂單的信息即可,即使庫(kù)存系統(tǒng)報(bào)錯(cuò),沒(méi)關(guān)系,等到庫(kù)存修復(fù)后再次從MQ中去取就可以了
3.高并發(fā)(分批處理請(qǐng)求)
訂單系統(tǒng),在下單的時(shí)候就會(huì)往數(shù)據(jù)庫(kù)寫(xiě)數(shù)據(jù)。但是數(shù)據(jù)庫(kù)只能支撐每秒1000左右的并發(fā)寫(xiě)入,并發(fā)量再高就容易宕機(jī)。低峰期的時(shí)候并發(fā)也就100多個(gè),但是在高峰期時(shí)候,并發(fā)量會(huì)突然激增到5000以上,這個(gè)時(shí)候數(shù)據(jù)庫(kù)肯定卡死了。但不一定宕機(jī),只會(huì)很慢,一旦宕機(jī)就會(huì)有消息丟失。
解決方案:削峰填谷
消息被MQ保存起來(lái)了,5000條數(shù)據(jù)對(duì)于MQ,簡(jiǎn)直是小意思,然后系統(tǒng)就可以按照自己的消費(fèi)能力來(lái)消費(fèi),比如每秒1000個(gè)數(shù)據(jù),這樣慢慢寫(xiě)入數(shù)據(jù)庫(kù),這樣就不會(huì)卡死數(shù)據(jù)庫(kù)了
4,MQ的劣勢(shì)
1、系統(tǒng)可用性降低:系統(tǒng)引入的外部依賴越多,系統(tǒng)穩(wěn)定性越差。一旦 MQ 宕機(jī),就會(huì)對(duì)業(yè)務(wù)造成影響。如何保證MQ的高可用?
2、系統(tǒng)復(fù)雜度提高:MQ 的加入大大增加了系統(tǒng)的復(fù)雜度,以前系統(tǒng)間是同步的遠(yuǎn)程調(diào)用,現(xiàn)在是通過(guò) MQ 進(jìn)行異步調(diào)用。如何保證消息沒(méi)有被重復(fù)消費(fèi)?怎么處理消息丟失情況?那么保證消息傳遞的順序性?
3、一致性問(wèn)題:A 系統(tǒng)處理完業(yè)務(wù),通過(guò) MQ 給B、C、D三個(gè)系統(tǒng)發(fā)消息數(shù)據(jù),如果 B 系統(tǒng)、C 系統(tǒng)處理成功,D 系統(tǒng)處理失敗。如何保證消息數(shù)據(jù)處理的一致性?
既然 MQ 有優(yōu)勢(shì)也有劣勢(shì),那么使用 MQ 需要滿足什么條件呢?
生產(chǎn)者不需要從消費(fèi)者處獲得反饋。引入消息隊(duì)列之前的直接調(diào)用,其接口的返回值應(yīng)該為空,這才讓明明下層的動(dòng)作還沒(méi)做,上層卻當(dāng)成動(dòng)作做完了繼續(xù)往后走,即所謂異步成為了可能。 容許短暫的不一致性。 確實(shí)是用了有效果。即解耦、提速、削峰這些方面的收益,超過(guò)加入MQ,管理MQ這些成本。
常見(jiàn)的 MQ 產(chǎn)品
目前業(yè)界有很多的 MQ 產(chǎn)品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充當(dāng)消息隊(duì)列的案例,而這些消息隊(duì)列產(chǎn)品,各有側(cè)重,在實(shí)際選型時(shí),需要結(jié)合自身需求及 MQ 產(chǎn)品特征,綜合考慮。
** **RabbitMQActiveMQRocketMQKafka公司/社區(qū)RabbitApache阿里Apache開(kāi)發(fā)語(yǔ)言ErlangJavaJavaScala&Java協(xié)議支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定義自定義協(xié)議,社區(qū)封裝了http協(xié)議支持客戶端支持語(yǔ)言官方支持Erlang,Java,Ruby等,社區(qū)產(chǎn)出多種API,幾乎支持所有語(yǔ)言Java,C,C++,Python,PHP,Perl,.net等Java,C++(不成熟)官方支持Java,社區(qū)產(chǎn)出多種API,如PHP,Python等單機(jī)吞吐量萬(wàn)級(jí)(其次)萬(wàn)級(jí)(最差)十萬(wàn)級(jí)(最好)十萬(wàn)級(jí)(次之)消息延遲微秒級(jí)毫秒級(jí)毫秒級(jí)毫秒以內(nèi)功能特性并發(fā)能力強(qiáng),性能極其好,延時(shí)低,社區(qū)活躍,管理界面豐富老牌產(chǎn)品,成熟度高,文檔較多MQ功能比較完備,擴(kuò)展性佳只支持主要的MQ功能,畢竟是為大數(shù)據(jù)領(lǐng)域準(zhǔn)備的。
三,RabbitMQ簡(jiǎn)介
RabbitMQ是由erlang語(yǔ)言開(kāi)發(fā),基于AMQP(Advanced Message Queue 高級(jí)消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列,它是一種應(yīng)用程序之間的通信方法,消息隊(duì)列在分布式系統(tǒng)開(kāi)發(fā)中應(yīng)用非常廣泛。2007年,Rabbit 技術(shù)公司基于 AMQP 標(biāo)準(zhǔn)開(kāi)發(fā)的 RabbitMQ 1.0 發(fā)布。 (Erlang 語(yǔ)言由 Ericson 設(shè)計(jì),專門為開(kāi)發(fā)高并發(fā)和分布式系統(tǒng)的一種語(yǔ)言,在電信領(lǐng)域使用廣泛)
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ 基礎(chǔ)架構(gòu)如下圖:
上圖說(shuō)明:
1、Broker:接收和分發(fā)消息的應(yīng)用,就是一個(gè)中介,RabbitMQ Server就是 Message Broker 2、Virtual host:出于多租戶和安全因素設(shè)計(jì)的,把 AMQP 的基本組件劃分到一個(gè)虛擬的分組中,類似于網(wǎng)絡(luò)中的 namespace 概念。當(dāng)多個(gè)不同的用戶使用同一個(gè) RabbitMQ server 提供的服務(wù)時(shí),可以劃分出多個(gè)vhost,每個(gè)用戶在自己的 vhost 創(chuàng)建 exchange/queue 等 3、Connection:publisher/consumer 和 broker 之間的 TCP 連接 4、Channel:如果每一次訪問(wèn) RabbitMQ 都建立一個(gè) Connection,在消息量大的時(shí)候建立 TCP Connection的開(kāi)銷將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè)thread創(chuàng)建單獨(dú)的 channel 進(jìn)行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識(shí)別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級(jí)的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開(kāi)銷
5、Exchange:message 到達(dá) broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) 6、Queue:消息最終被送到這里等待 consumer 取走 7、Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發(fā)依據(jù)
RabbitMQ提供了6種模式:簡(jiǎn)單模式,work模式,Publish/Subscribe發(fā)布與訂閱模式,Routing路由模式,Topics主題模式,RPC遠(yuǎn)程調(diào)用模式(遠(yuǎn)程調(diào)用,不太算MQ;暫不作介紹);官網(wǎng)對(duì)應(yīng)模式介紹:https://www.rabbitmq.com/getstarted.html , 點(diǎn)擊手冊(cè)按鈕 RabbitMQ Tutorials
2. 安裝及配置RabbitMQ
RabbitMQ 官方地址:http://www.rabbitmq.com/
win10安裝
安裝erlang和rabbitmq后,進(jìn)入rabbitmq安裝目錄的sbin目錄執(zhí)行下面命令
rabbitmq-plugins.bat enable rabbitmq_management
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來(lái)直接上傳(img-tUbZI5GZ-1691115209633)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20230713154626768.png)]
登錄URL:
http://localhost:15672/
用戶名密碼:guest/guest
問(wèn)題:
? 登錄用戶是中文解決方案:
? 1、創(chuàng)建用戶為英文,再安裝相關(guān)環(huán)境
? 2、修改相應(yīng)的目錄
? 用管理員執(zhí)行CMD
rabbitmq-service.bat remove
set RABBITMQ_BASE=D:\rabbitmq_server\data
rabbitmq-service.bat install
rabbitmq-plugins enable rabbitmq_management
查看進(jìn)程
tasklist | find /i "erl"
關(guān)閉進(jìn)程
taskkill /pid 7300 -t -f //將7300改成對(duì)應(yīng)端口號(hào)
3. RabbitMQ快速入門
3.1 生產(chǎn)方工程搭建
1.添加相關(guān)依賴
修改pom.xml文件內(nèi)容為如下:
2.啟動(dòng)類
package com.woniu.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class);
}
}
3.配置RabbitMQ
創(chuàng)建application.yml,內(nèi)容如下:
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /woniu
username: woniu
password: woniu
創(chuàng)建隊(duì)列參數(shù)說(shuō)明:
參數(shù)說(shuō)明name字符串值,queue的名稱。durable布爾值,表示該 queue 是否持久化。 持久化意味著當(dāng) RabbitMQ 重啟后,該 queue 是否會(huì)恢復(fù)/仍存在。 另外,需要注意的是,queue 的持久化不等于其中的消息也會(huì)被持久化。exclusive布爾值,表示該 queue 是否排它式使用。排它式使用意味著僅聲明他的連接可見(jiàn)/可用,其它連接不可見(jiàn)/不可用。autoDelete布爾值,表示當(dāng)該 queue 沒(méi)“人”(connection)用時(shí),是否會(huì)被自動(dòng)刪除。
不指定 durable、exclusive 和 autoDelete 時(shí),默認(rèn)為 true 、 false 和 false 。表示持久化、非排它、不用自動(dòng)刪除。
創(chuàng)建交換機(jī)參數(shù)說(shuō)明
參數(shù)說(shuō)明name字符串值,exchange 的名稱。durable布爾值,表示該 exchage 是否持久化。 持久化意味著當(dāng) RabbitMQ 重啟后,該 exchange 是否會(huì)恢復(fù)/仍存在。autoDelete布爾值,表示當(dāng)該 exchange 沒(méi)“人”(queue)用時(shí),是否會(huì)被自動(dòng)刪除。
不指定 durable 和 autoDelete 時(shí),默認(rèn)為 true 和 false 。表示持久化、不用自動(dòng)刪除
4. AMQP
4.1. 相關(guān)概念介紹
AMQP 一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。AMQP是一個(gè)二進(jìn)制協(xié)議,擁有一些現(xiàn)代化特點(diǎn):多信道、協(xié)商式,異步,安全,擴(kuò)平臺(tái),中立,高效。
RabbitMQ是AMQP協(xié)議的Erlang的實(shí)現(xiàn)。
概念說(shuō)明連接Connection一個(gè)網(wǎng)絡(luò)連接,比如TCP/IP套接字連接。會(huì)話Session端點(diǎn)之間的命名對(duì)話。在一個(gè)會(huì)話上下文中,保證“恰好傳遞一次”。信道Channel多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。為會(huì)話提供物理傳輸介質(zhì)??蛻舳薈lientAMQP連接或者會(huì)話的發(fā)起者。AMQP是非對(duì)稱的,客戶端生產(chǎn)和消費(fèi)消息,服務(wù)器存儲(chǔ)和路由這些消息。服務(wù)節(jié)點(diǎn)Broker消息中間件的服務(wù)節(jié)點(diǎn);一般情況下可以將一個(gè)RabbitMQ Broker看作一臺(tái)RabbitMQ 服務(wù)器。端點(diǎn)AMQP對(duì)話的任意一方。一個(gè)AMQP連接包括兩個(gè)端點(diǎn)(一個(gè)是客戶端,一個(gè)是服務(wù)器)。消費(fèi)者Consumer一個(gè)從消息隊(duì)列里請(qǐng)求消息的客戶端程序。生產(chǎn)者Producer一個(gè)向交換機(jī)發(fā)布消息的客戶端應(yīng)用程序。
4.2. RabbitMQ運(yùn)轉(zhuǎn)流程
在入門案例中:
生產(chǎn)者發(fā)送消息
生產(chǎn)者創(chuàng)建連接(Connection),開(kāi)啟一個(gè)信道(Channel),連接到RabbitMQ Broker;聲明隊(duì)列并設(shè)置屬性;如是否排它,是否持久化,是否自動(dòng)刪除;將路由鍵(空字符串)與隊(duì)列綁定起來(lái);發(fā)送消息至RabbitMQ Broker;關(guān)閉信道;關(guān)閉連接; 消費(fèi)者接收消息
消費(fèi)者創(chuàng)建連接(Connection),開(kāi)啟一個(gè)信道(Channel),連接到RabbitMQ Broker向Broker 請(qǐng)求消費(fèi)相應(yīng)隊(duì)列中的消息,設(shè)置相應(yīng)的回調(diào)函數(shù);等待Broker回應(yīng)投遞隊(duì)列中的消息,消費(fèi)者接收消息;確認(rèn)(ack,自動(dòng)確認(rèn))接收到的消息;RabbitMQ從隊(duì)列中刪除相應(yīng)已經(jīng)被確認(rèn)的消息;關(guān)閉信道;關(guān)閉連接;
4.3. 生產(chǎn)者流轉(zhuǎn)過(guò)程說(shuō)明
客戶端與代理服務(wù)器Broker建立連接。會(huì)調(diào)用newConnection() 方法,這個(gè)方法會(huì)進(jìn)一步封裝Protocol Header 0-9-1 的報(bào)文頭發(fā)送給Broker ,以此通知Broker 本次交互采用的是AMQP0-9-1 協(xié)議,緊接著B(niǎo)roker 返回Connection.Start 來(lái)建立連接,在連接的過(guò)程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個(gè)命令的交互。 客戶端調(diào)用connection.createChannel方法。此方法開(kāi)啟信道,其包裝的channel.open命令發(fā)送給Broker; channel.basicPublish方法對(duì)應(yīng)的AMQP命令為Basic.Publish,這個(gè)命令包含了content Header 和content Body()。content Header 包含了消息體的屬性,例如:投遞模式,優(yōu)先級(jí)等,content Body 包含了消息體本身。 客戶端發(fā)送完消息需要關(guān)閉資源時(shí),涉及到Channel.Close和Channel.Close-Ok 與Connetion.Close和Connection.Close-Ok的命令交互。
4.4. 消費(fèi)者流轉(zhuǎn)過(guò)程說(shuō)明
消費(fèi)者客戶端與代理服務(wù)器Broker建立連接。會(huì)調(diào)用newConnection() 方法,這個(gè)方法會(huì)進(jìn)一步封裝Protocol Header 0-9-1 的報(bào)文頭發(fā)送給Broker ,以此通知Broker 本次交互采用的是AMQP0-9-1 協(xié)議,緊接著B(niǎo)roker 返回Connection.Start 來(lái)建立連接,在連接的過(guò)程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個(gè)命令的交互。消費(fèi)者客戶端調(diào)用connection.createChannel方法。和生產(chǎn)者客戶端一樣,協(xié)議涉及Channel . Open/Open-Ok命令。在真正消費(fèi)之前,消費(fèi)者客戶端需要向Broker 發(fā)送Basic.Consume 命令(即調(diào)用channel.basicConsume 方法〉將Channel 置為接收模式,之后Broker響應(yīng)Basic . Consume - Ok 以告訴消費(fèi)者客戶端準(zhǔn)備好消費(fèi)消息。Broker 向消費(fèi)者客戶端推送(Push) 消息,即Basic.Deliver 命令,這個(gè)命令和Basic.Publish 命令一樣會(huì)攜帶Content Header 和Content Body。消費(fèi)者接收到消息并正確消費(fèi)之后,向Broker 發(fā)送確認(rèn),即Basic.Ack 命令??蛻舳税l(fā)送完消息需要關(guān)閉資源時(shí),涉及到Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok的命令交互。
5. RabbitMQ工作模式
5.1. Work queues工作隊(duì)列模式
simple模式: 一個(gè)生產(chǎn)者一個(gè)消費(fèi)者 定義rabbitconfig: 創(chuàng)建消息隊(duì)列,交換機(jī)及其之間綁定 @Configuration
public class RabbitmqConfig {
/**
* simple 隊(duì)列
*/
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simpleQueue").build();
//return new Queue("simpleQueue");
}
}
定義生產(chǎn)者
/**
* 往消息隊(duì)列返送消息
*/
@Component
public class SimpleProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg){
rabbitTemplate.convertAndSend("simpleQueue",msg);
}
}
? 定義消費(fèi)者
@Component
public class SimpleConsumer {
/**
* 消費(fèi)消息
*/
@RabbitListener(queues = "simpleQueue")
private void recevie(String msg){
System.out.println("消費(fèi)者接收到:"+msg);
}
}
**work模式:**一個(gè)生產(chǎn)者多個(gè)消費(fèi)者,也稱之為競(jìng)爭(zhēng)模式 創(chuàng)建兩個(gè)消費(fèi)者監(jiān)聽(tīng)隊(duì)列
Work Queues與入門程序的簡(jiǎn)單模式相比,多了一個(gè)或一些消費(fèi)端,多個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息。它們處于競(jìng)爭(zhēng)者的關(guān)系,一條消息只會(huì)被一個(gè)消費(fèi)者接收,rabbit采用輪詢的方式將消息是平均發(fā)送給消費(fèi)者的;消費(fèi)者在處理完某條消息后,才會(huì)收到下一條消息。
應(yīng)用場(chǎng)景:對(duì)于任務(wù)過(guò)重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。如生產(chǎn)者生產(chǎn)一千條消息,那么c1和c2各消費(fèi)500條,隊(duì)列消費(fèi)消息是均衡分配
復(fù)制消費(fèi)方代碼,重新編寫(xiě)一個(gè)消費(fèi)端,然后啟動(dòng)兩個(gè)消費(fèi)端,進(jìn)行測(cè)試
5.2. 訂閱模式類型
訂閱模式示例圖:
在訂閱模型中,多了一個(gè)exchange角色,而且過(guò)程略有變化:
P:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X(交換機(jī))C:消費(fèi)者,消息的接受者,會(huì)一直等待消息到來(lái)。Queue:消息隊(duì)列,接收消息、緩存消息。Exchange:交換機(jī),圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見(jiàn)以下3種類型:
Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列Direct:定向,把消息交給符合指定routing key 的隊(duì)列Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒(méi)有任何隊(duì)列與Exchange綁定,或者沒(méi)有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!
5.2.1 廣播模式
1.創(chuàng)建RabbitMQ隊(duì)列與交換機(jī)綁定的配置類com…rabbitmq.config.RabbitMQConfig
/**
* 負(fù)責(zé):創(chuàng)建消息隊(duì)列,交換機(jī)及其之間綁定
*/
@Configuration
public class RabbitmqConfig {
/**
* fanout 模式
*/
@Bean
public Queue fanoutQueueA(){
return QueueBuilder.durable("fanoutQueueA").build();
}
@Bean
public Queue fanoutQueueB(){
return QueueBuilder.durable("fanoutQueueB").build();
}
/**
* 創(chuàng)建交換機(jī)
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
/**
* 把消息隊(duì)列,綁定到交換機(jī)上
* IOC 在調(diào)用配置中的方法時(shí),如果有參數(shù),默認(rèn)以形參的名字找到IOC中對(duì)應(yīng)的方法
*/
@Bean
public Binding fanoutQueueAToFanoutExchange( Queue fanoutQueueA,
FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
}
@Bean
public Binding fanoutQueueToFanoutExchange( Queue fanoutQueueB,
FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
}
}
1、實(shí)現(xiàn)生產(chǎn)者
/**
* 往消息隊(duì)列返送消息
*/
@Component
public class FanoutProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* arg1: 交換機(jī)名字
* arg2: 路由名字
* arg3: 參數(shù)名字
* @param msg
*/
public void send(String msg){
rabbitTemplate.convertAndSend("fanoutExchange","",msg);
}
}
創(chuàng)建交換機(jī)參數(shù)說(shuō)明:
參數(shù)說(shuō)明exchange字符串值,交換機(jī)名稱type交換機(jī)的類型,有三種類型:FANOUT、DIRECT、TOPICdurable交換機(jī)是否持久化,表示當(dāng)rabbitmq重啟時(shí)或者意外宕機(jī),這個(gè)交換機(jī)還在不在autoDelete是否自動(dòng)刪除,表示當(dāng)該交換機(jī)沒(méi)人發(fā)消息時(shí),是否會(huì)被自動(dòng)刪除。internal內(nèi)部使用,一般為falsearguments其它參數(shù)
發(fā)送消息參數(shù)說(shuō)明
參數(shù)說(shuō)明exchange字符串值,交換機(jī)名稱routingKey如果交換機(jī)類型是fanout,則routingKey為""props消息基本屬性配置body要發(fā)送的消息的內(nèi)容
2、消費(fèi)方實(shí)現(xiàn)
@Component
public class FanoutConsumer {
/**
* 消費(fèi)消息
*/
@RabbitListener(queues = "fanoutQueueA")
private void recevie(String msg){
System.out.println("消費(fèi)者A接收到:"+msg);
}
/**
* 消費(fèi)消息
*/
@RabbitListener(queues = "fanoutQueueB")
private void recevieB(String msg){
System.out.println("消費(fèi)者B接收到:"+msg);
}
}
發(fā)布訂閱模式與工作隊(duì)列模式的區(qū)別
1、工作隊(duì)列模式不用定義交換機(jī),而發(fā)布/訂閱模式需要定義交換機(jī)。
2、發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊(duì)列模式的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。
3、發(fā)布/訂閱模式需要設(shè)置隊(duì)列和交換機(jī)的綁定,工作隊(duì)列模式不需要設(shè)置,實(shí)際上工作隊(duì)列模式會(huì)將隊(duì)列綁 定到默認(rèn)的交換機(jī) 。
5.2.2 Routing路由模式
路由模式特點(diǎn):
隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會(huì)接收到消息
在編碼上與 Publish/Subscribe發(fā)布與訂閱模式 的區(qū)別是交換機(jī)的類型為:Direct,還有隊(duì)列綁定交換機(jī)的時(shí)候需要指定routing key。
1、創(chuàng)建RabbitMQ隊(duì)列與交換機(jī)綁定的配置類com…rabbitmq.config.RabbitMQConfig
/**
* 負(fù)責(zé):創(chuàng)建消息隊(duì)列,交換機(jī)及其之間綁定
*/
@Configuration
public class RabbitmqConfig {
//==================route===
/**
* 路由模式:通過(guò)路由key將消息發(fā)送給指定消息隊(duì)列
* 1個(gè)交換機(jī),2個(gè)消息隊(duì)列,1個(gè)生產(chǎn)者,2個(gè)消費(fèi)者
*/
@Bean
public Queue routeQueueA(){
return QueueBuilder.durable("routeQueueA").build();
}
@Bean
public Queue routeQueueB(){
return QueueBuilder.durable("routeQueueB").build();
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
//綁定
@Bean
public Binding routeQueueToDirectExchange(Queue routeQueueA,
DirectExchange directExchange){
return BindingBuilder.bind(routeQueueA).to(directExchange).with("red");
}
@Bean
public Binding routeQueueBToDirectExchange(Queue routeQueueB,
DirectExchange directExchange){
return BindingBuilder.bind(routeQueueB).to(directExchange).with("blue");
}
}
2、生產(chǎn)方實(shí)現(xiàn)
*/
@Component
public class DirectProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* arg1: 交換機(jī)名字
* arg2: 路由名字
* arg3: 參數(shù)名字
* @param msg
*/
public void send(String msg,String routekey){
rabbitTemplate.convertAndSend("directExchange",routekey,msg);
}
}
3.消費(fèi)方實(shí)現(xiàn)
創(chuàng)建2個(gè)消費(fèi)方并啟動(dòng),然后使用生產(chǎn)者發(fā)送消息;在消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送對(duì)應(yīng)routing key對(duì)應(yīng)隊(duì)列的消息;到達(dá)按照需要接收的效果
@Component
public class DirectConsumer {
/**
* 消費(fèi)消息
*/
@RabbitListener(queues = "routeQueueA")
private void recevie(String msg){
System.out.println("消費(fèi)者A接收到:"+msg);
}
/**
* 消費(fèi)消息
*/
@RabbitListener(queues = "routeQueueB")
private void recevieB(String msg){
System.out.println("消費(fèi)者B接收到:"+msg);
}
}
5.2.3. Topics通配符模式(模糊路由)
Topic類型與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過(guò)Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert
通配符規(guī)則:
#:匹配0個(gè)或多個(gè)詞
*:匹配不多不少恰好1個(gè)詞
舉例:
item.#:能夠匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert
創(chuàng)建RabbitMQ隊(duì)列與交換機(jī)綁定的配置類com…rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// Topics通配符模式
@Bean
public Queue tt01(){
return new Queue("tt01");
}
@Bean
public Queue tt02(){
return new Queue("tt02");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("tte");
}
@Bean
public Binding ttBinding01(){
return BindingBuilder.bind(tt01()).to(topicExchange()).with("#.error");
}
@Bean
public Binding ttBinding02(){
return BindingBuilder.bind(tt02()).to(topicExchange()).with("order.*");
}
}
1、生產(chǎn)方代碼實(shí)現(xiàn)
使用topic類型的Exchange
package com.woniu.rabbitmq.controller;
import com.woniu.rabbitmq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
@RequestMapping("/sendT1MT1/{msg}")
public String sendT1MT1(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.error", msg);
return "success";
}
@RequestMapping("/sendT1MT2/{msg}")
public String sendT1MT2(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.22.error", msg);
return "success";
}
@RequestMapping("/sendT1MF/{msg}")
public String sendT1MF(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "11.error.22", msg);
return "success";
}
@RequestMapping("/sendT2MF/{msg}")
public String sendT2MF(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "order.11.22", msg);
return "success";
}
@RequestMapping("/sendT2MT/{msg}")
public String sendT2MT(@PathVariable String msg){
rabbitTemplate.convertAndSend("tte", "order.1", msg);
return "success";
}
}
消費(fèi)者
@Component
public class DirectComsume {
@RabbitListener(queues = "tt01")
public void received(String msg){
System.out.println("接收到消息:"+msg);
}
@RabbitListener(queues = "tt02")
public void receivedB(String msg){
System.out.println("接收到消息:"+msg);
}
}
創(chuàng)建2個(gè)消費(fèi)方并啟動(dòng),然后使用生產(chǎn)者發(fā)送消息;在消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送對(duì)應(yīng)routing key對(duì)應(yīng)隊(duì)列的消息;到達(dá)按照需要接收的效果;并且這些routing key可以使用通配符。
四,RabbitMQ高級(jí)
1.消息的可靠投遞(生產(chǎn)者端)
在使用 RabbitMQ 的時(shí)候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場(chǎng)景。RabbitMQ 為我們提供了兩種方式用來(lái)控制消息的投遞可靠性模式。
confirm 確認(rèn)模式return 退回模式
rabbitmq 整個(gè)消息投遞的路徑為:producer—>rabbitmq broker—>exchange—>queue—>consumer
消息從 producer 到 exchange,不管exchange是否收到生產(chǎn)者消息,都會(huì)返回一個(gè) confirmCallback 。消息從 exchange–>queue 投遞失敗則會(huì)返回一個(gè) returnCallback 。
我們將利用這兩個(gè) callback 控制消息的可靠性投遞
1.1.confirmCallback確認(rèn)模式
1.在配置文件中 添加publisher-confirm-type: correlated配置
spring:
rabbitmq:
host: localhost
port: 5672
username: woniu
password: woniu
virtual-host: /woniu
publisher-confirm-type: correlated #發(fā)布消息成功到交換器后會(huì)觸發(fā)回調(diào)方法
publisher-returns: true #返回確認(rèn)信息
在springboot2.2.0.RELEASE版本之前(spring.rabbitmq.publisher-confirm發(fā)布確認(rèn)屬性配置)是amqp正式支持的屬性,用來(lái)配置消息發(fā)送到交換器之后是否觸發(fā)回調(diào)方法,在2.2.0及之后該屬性過(guò)期使用spring.rabbitmq.publisher-confirm-type屬性配置代替,用來(lái)配置更多的確認(rèn)類型;
NONE值是禁用發(fā)布確認(rèn)模式,是默認(rèn)值; CORRELATED值是發(fā)布消息成功到交換器后會(huì)觸發(fā)回調(diào)方法; SIMPLE值經(jīng)測(cè)試有兩種效果,其一效果和CORRELATED值一樣會(huì)觸發(fā)回調(diào)方法,其二在發(fā)布消息成功后使用rabbitTemplate調(diào)用waitForConfirms或waitForConfirmsOrDie方法等待broker節(jié)點(diǎn)返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來(lái)判定下一步的邏輯,要注意的點(diǎn)是waitForConfirmsOrDie方法如果返回false則會(huì)關(guān)閉channel,則接下來(lái)無(wú)法發(fā)送消息到broker。
2、編寫(xiě)Product類
package com.woniu.product;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 往消息隊(duì)列返送消息
*/
@Component
public class ConfirmProduct implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* arg1: 交換機(jī)名字
* arg2: 路由名字
* arg3: 參數(shù)名字
* @param msg
*/
public void send(String msg,String routekey){
rabbitTemplate.setConfirmCallback(this); //設(shè)置回調(diào)
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.convertAndSend("confimExchange",routekey,msg);
}
/**
*
* @param correlationData 消息的唯一標(biāo)識(shí),如果發(fā)送失敗,可以根據(jù)這個(gè)標(biāo)識(shí)補(bǔ)發(fā)信息
* @param status 交換機(jī)是否成功收到信息,true:成功
* @param reason 失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean status, String reason) {
System.out.println("進(jìn)入confirm方法");
System.out.println(status);
System.out.println(reason);
}
/**
* 只有消息路由失敗進(jìn)入,比如:找不到路由等
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
returnedMessage.getMessage(); //失敗的原因
returnedMessage.getReplyCode(); //失敗的狀態(tài)碼
returnedMessage.getReplyText(); //失敗的原因
returnedMessage.getExchange(); //交換機(jī)
returnedMessage.getRoutingKey(); //路由
}
}
失敗演示可以把convertAndSend的交換機(jī)名字寫(xiě)錯(cuò)
1.2.ReturnCallBack確認(rèn)模式
在上個(gè)例子的基礎(chǔ)上,再添加一個(gè)測(cè)試方法returnedMessage
由于路由鍵不正確 022,故交換機(jī)的消息無(wú)法發(fā)送到消息隊(duì)列,setReturnCallback()方法,也就是Exchange路由到Queue失敗時(shí)執(zhí)行,這個(gè)前提是必須設(shè)置 rabbitTemplate.setMandatory(true);如果不加這句話,意味著交換機(jī)處理消息模式采用默認(rèn)的模式,模式模式是直接丟掉該消息,不會(huì)執(zhí)行setReturnCallback()方法。 當(dāng)然如果交換機(jī)發(fā)送消息到隊(duì)列,如果成功了也不會(huì)執(zhí)行該方法,因?yàn)閟etReturnCallback是交換機(jī)發(fā)送消息到隊(duì)列失敗才執(zhí)行的。
失敗演示生產(chǎn)者可以把發(fā)送的routekey寫(xiě)錯(cuò)
消息的可靠投遞小結(jié)
設(shè)置ConnectionFactory的publisher-confirms=“true” 開(kāi)啟 確認(rèn)模式。 使用rabbitTemplate.setConfirmCallback設(shè)置回調(diào)函數(shù)。當(dāng)消息發(fā)送到exchange后回調(diào)confirm方法。在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理。
設(shè)置ConnectionFactory的publisher-returns=“true” 開(kāi)啟 退回模式。使用rabbitTemplate.setReturnCallback設(shè)置退回函數(shù),當(dāng)消息從exchange路由到queue失敗后,如果設(shè)置了rabbitTemplate.setMandatory(true)參數(shù),則會(huì)將消息退回給producer。并執(zhí)行回調(diào)函數(shù)returnedMessage。
在RabbitMQ中也提供了事務(wù)機(jī)制,但是性能較差,此處不做講解。 使用channel下列方法,完成事務(wù)控制: txSelect(), 用于將當(dāng)前channel設(shè)置成transaction模式 txCommit(),用于提交事務(wù) txRollback(),用于回滾事務(wù)
2.Consumer ACK(消費(fèi)者端)
ack指Acknowledge(翻譯為:應(yīng)答),表示消費(fèi)端收到消息后的確認(rèn)方式。有三種確認(rèn)方式:
自動(dòng)確認(rèn):acknowledge=“none”手動(dòng)確認(rèn):acknowledge=“manual”根據(jù)異常情況確認(rèn):acknowledge=“auto”,(這種方式使用麻煩,不作講解)
其中自動(dòng)確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。如果設(shè)置了手動(dòng)確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動(dòng)確認(rèn),如果出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓其自動(dòng)重新發(fā)送消息。
2.1 消費(fèi)方工程搭建
1.在配置文件中 添加手動(dòng)確認(rèn)的配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
prefetch: 1 # 默認(rèn)一批一批消費(fèi)的,提高消息,默認(rèn)250個(gè)
2.編寫(xiě)Ack監(jiān)聽(tīng)器
@Component
public class ConfirmConsumer {
/**
* 消費(fèi)消息
*/
@RabbitListener(queues = "confirmQueue")
private void recevie(String msg, Message message, Channel channel) throws IOException {
//arg1:消息的id
//arg2:是否批量確認(rèn)這些信息 false:否
//channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
/**
* arg1:消息的id
* arg2:是否批量確認(rèn)這些信息 false:否
* arg3:是否重新將消息放回隊(duì)列中,false:不放會(huì),一般都是false
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
System.out.println("消費(fèi)者A接收到:"+msg);
}
}
basicAck的批量應(yīng)答問(wèn)題說(shuō)明:
channel.basicAck(8,true) 如果前面還有4,6,7的deliveryTag未被確認(rèn),則會(huì)一起確認(rèn),減少網(wǎng)絡(luò)流量,當(dāng)然當(dāng)前deliveryTag=8這條消息也會(huì)確認(rèn),如果沒(méi)有前面沒(méi)有未被確認(rèn)的消息,則只會(huì)確認(rèn)當(dāng)前消息,也就是說(shuō)可以一次性確認(rèn)某個(gè)隊(duì)列小于等于delivery_tag值的所有消息
basicNack的參數(shù)說(shuō)明:
第一個(gè)參數(shù)為deliveryTag,也就是每個(gè)消息標(biāo)記index,消息標(biāo)記值從1開(kāi)始,依次遞增 第二個(gè)參數(shù)為multiple,表示是否批量,如果為true,那么小于或者等于該消息標(biāo)記的消息(如果還沒(méi)有簽收)都會(huì)拒絕簽收 第三個(gè)參數(shù)為requeue,表示被拒絕的消息是否重回隊(duì)列,如果設(shè)置為true,則消息重新回到queue,那么broker會(huì)重新推送該消息給消費(fèi)端,如果設(shè)置為false,則消息在隊(duì)列中被刪除,即消息會(huì)被直接丟失(當(dāng)然如果為false,還有一種情況就是放到死信隊(duì)列)
啟動(dòng)之前的生產(chǎn)者發(fā)送消息給test_queue_confirm隊(duì)列,如果拋出異常則該消息一直重發(fā)
2.2 消息可靠性總結(jié)
持久化
exchange要持久化queue要持久化message要持久化
生產(chǎn)方確認(rèn)Confirm 消費(fèi)方確認(rèn)Ack Broker高可用
3. 死信隊(duì)列
TTL(time to live)
TTL是rabbitmq 中一個(gè)消息或者隊(duì)列的屬性,表明一條信息或者該隊(duì)列中的所有消息的最大存活時(shí)間。單位是毫秒,換句話說(shuō),如果一條信息設(shè)置了TTL 屬性或者設(shè)置TTL屬性的隊(duì)列,那么這個(gè)條信息如果在TTL設(shè)置的時(shí)間沒(méi)有被消費(fèi),則會(huì)成為死信。如果同時(shí)配置了隊(duì)列的TTL和消息的TTL, 那么較小的那個(gè)值將會(huì)被使用。
死信隊(duì)列
英文縮寫(xiě):DLX 。Dead Letter Exchange(死信交換機(jī)),當(dāng)消息在隊(duì)列成為Dead message后,通過(guò)該隊(duì)列把這條死信消息發(fā)給另一個(gè)交換機(jī),這個(gè)交換機(jī)就是DLX。
消息成為死信的三種情況(面試常問(wèn)):
隊(duì)列消息長(zhǎng)度到達(dá)限制(淘汰最早的消息);消費(fèi)者拒接消費(fèi)消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊(duì)列,requeue=false;原隊(duì)列存在消息過(guò)期設(shè)置,消息到達(dá)超時(shí)時(shí)間未被消費(fèi);
隊(duì)列綁定死信交換機(jī): 給隊(duì)列設(shè)置參數(shù): x-dead-letter-exchange 和 x-dead-letter-routing-key
5.1 死信隊(duì)列實(shí)現(xiàn)過(guò)程
1、創(chuàng)建RabbitMQ隊(duì)列與交換機(jī)綁定的配置類com…rabbitmq.config.RabbitMQConfig
package com.woniu.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
// 1. 創(chuàng)建DLX交換機(jī)
@Bean
public DirectExchange dlxDirectExchange(){
return new DirectExchange("dlx_exchange");
}
// 2. ttl隊(duì)列
@Bean
public Queue ttlQueue(){
Map
// 1、正常隊(duì)列綁定死信交換機(jī)-->
// 1.1 x-dead-letter-exchange 死信交換機(jī)的名稱
args.put("x-dead-letter-exchange", "dlx_exchange");
// 1.2 x-dead-letter-routing-key 正常隊(duì)列發(fā)送消息到死信 交換機(jī)的routingKey
args.put("x-dead-letter-routing-key", "dlx01");
// 2 消息成為死信的三種情況
// 2.1 設(shè)置隊(duì)列的過(guò)期時(shí)間 ttl x-message-ttl
args.put("x-message-ttl", 1000 * 10);
// 2.2 設(shè)置隊(duì)列的長(zhǎng)度限制 x-max-length 10條消息,超過(guò)進(jìn)死信
args.put("x-max-length", 10);
// 2.3 消費(fèi)者拒接消費(fèi)消息,并且不重回隊(duì)列 這種情況后面在消費(fèi)工程測(cè)試
return QueueBuilder.durable("ttlQueue").withArguments(args).build();
}
// 3. 死信隊(duì)列
@Bean
public Queue dlxQ(){
return new Queue("dlxQ");
}
// 4.dlxQ綁定DXL交換機(jī)
@Bean
public Binding dlxBinding(){
return BindingBuilder.bind(dlxQ()).to(dlxDirectExchange()).with("dlx01");
}
}
2、生產(chǎn)者工程測(cè)試:
//死信隊(duì)列測(cè)試
@Test
public void testDlx(){
//1、測(cè)試過(guò)期時(shí)間,死信消息
//rabbitTemplate.convertAndSend("ttlQueue","我是一條消息,我會(huì)死嗎");
//2、測(cè)試隊(duì)列長(zhǎng)度限制,消息死信
for (int i = 0; i < 20 ; i++) {
rabbitTemplate.convertAndSend("ttlQueue","我是一條消息,我會(huì)死嗎");
}
//前兩步測(cè)試結(jié)果:死信隊(duì)列會(huì)有21條記錄 1(過(guò)期) + 10(限制)+10(正常隊(duì)列過(guò)期后的10條)
}
3、消息成為死信的第三種情況實(shí)現(xiàn)
1).在配置文件中 添加手動(dòng)確認(rèn)的配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
2).添加正常隊(duì)列的監(jiān)聽(tīng)器
package com.woniu.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/*
* @Date:2023/2/28
* @Description:
*/
@Component
@RabbitListener(queues = "ttlQueue")
public class TtlListener {
@SneakyThrows
@RabbitHandler
public void dlxQ(String msg, Message message, Channel channel){
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println(new String(message.getBody()));
System.out.println("處理業(yè)務(wù)邏輯");
int m = 1/0;
channel.basicAck(deliveryTag,true);
}catch(Exception ex){
/** basicNack(long deliveryTag, boolean multiple, boolean requeue)
* multiple是否批量. true:將一次性拒絕所有小于或者等于deliveryTag的消息。
* requeue:被拒絕的消息是否重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue,broker會(huì)重新發(fā)送該消息給消費(fèi)端,如果 * 為 requeue=false,不重回隊(duì)列,則消息發(fā)送最終到死信隊(duì)列
*/
channel.basicNack(deliveryTag,true,false);
}
}
}
3).在生產(chǎn)端的testDlx方法再次給正常交換機(jī)發(fā)送消息
//死信隊(duì)列測(cè)試
@Test
public void testDlx(){
rabbitTemplate.convertAndSend("ttlQueue","我是一條消息,我會(huì)死嗎?");
}
4. 延遲隊(duì)列
延遲隊(duì)列,即消息進(jìn)入隊(duì)列后不會(huì)立即被消費(fèi)者調(diào)用,只有到達(dá)指定時(shí)間后,才會(huì)被調(diào)用者調(diào)用消費(fèi)。
如下需求:
下單后,30分鐘未支付,取消訂單,回滾庫(kù)存。
當(dāng)用戶提交訂單后,數(shù)據(jù)庫(kù)保存訂單信息,同時(shí)庫(kù)存表相應(yīng)的庫(kù)存減少,然后消息隊(duì)列保存訂單的信息(如訂單Id),此時(shí)庫(kù)存系統(tǒng)監(jiān)聽(tīng)隊(duì)列,隊(duì)列不會(huì)把消息立刻發(fā)送給庫(kù)存,而是過(guò)30分鐘再把信息發(fā)送給庫(kù)存系統(tǒng),庫(kù)存系統(tǒng)去查詢訂單數(shù)據(jù)庫(kù),根據(jù)訂單id查詢,如果該訂單還沒(méi)有支付,則取消訂單,回滾庫(kù)存,如果支付過(guò)了,則庫(kù)存表什么都不用做。也就是給用戶30分鐘的機(jī)會(huì),一個(gè)訂單在30分鐘后還沒(méi)有支付,則該訂單的庫(kù)存信息直接回滾。
新用戶注冊(cè)成功7天后,發(fā)送短信問(wèn)候。
實(shí)現(xiàn)方式:
定時(shí)器:我們可以寫(xiě)一段代碼,在某個(gè)時(shí)間段查詢訂單表的支付情況。把提交訂單的時(shí)間查出來(lái)和當(dāng)前系統(tǒng)時(shí)間比較,30分鐘之類如果訂單狀態(tài)為支付,則取消該訂單,大家思考一下有什么問(wèn)題?延遲隊(duì)列:很可惜,在RabbitMQ中并未提供延遲隊(duì)列功能。但是可以使用:TTL+死信隊(duì)列 組合實(shí)現(xiàn)延遲隊(duì)列的效果。
6.1 延遲隊(duì)列實(shí)現(xiàn)過(guò)程
通過(guò)插件實(shí)現(xiàn)
a, 把插件放入rabbitmq安裝目錄的plugins目錄
b, 進(jìn)入rabbitmq 安裝目錄的sbin 目錄
執(zhí)行下面命令讓改插件生效
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
代碼實(shí)現(xiàn)
1:定義個(gè)一個(gè)延遲交換機(jī),一個(gè)隊(duì)列
// 延遲交換機(jī),消息隊(duì)列
@Bean
public Queue delayQueue(){
return QueueBuilder.durable("delay").build();
}
/**
* CustomExchange: 自定義交換機(jī) ,是fanout,direct,topic 交換機(jī)
* @return
*/
@Bean
public CustomExchange customExchange(){
Map map = new HashMap();
//指定交換機(jī)類型
map.put("x-delayed-type","direct");
/**
* arg1:交換機(jī)名字
* arg2: 交換機(jī)信息類型,延遲消息
* arg3: 是否持久化,是否將沒(méi)有被消費(fèi)的消息持久化
* arg4: 沒(méi)有隊(duì)列綁定到交換機(jī),交換機(jī)是否刪除。
* arg5: 初始化參數(shù)
*/
return new CustomExchange("customExchange","x-delayed-message",
true,false,map);
}
/**
* 交換機(jī)綁定隊(duì)列
* @param delayQueue
* @param customExchange
* @return
*/
@Bean
public Binding delayQueueTocustomExchange(Queue delayQueue,CustomExchange customExchange){
return BindingBuilder.bind(delayQueue).to(customExchange).with("delay").noargs();
}
2: 定義個(gè)生產(chǎn)者
/**
* 往消息隊(duì)列返送消息
*/
@Component
@Slf4j
public class DelayProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg,int delayTime){
log.info("發(fā)送消息");
rabbitTemplate.convertAndSend("customExchange", "delay", msg,
new MessagePostProcessor() {
/**
* 在消息發(fā)送到消息隊(duì)列之前對(duì)消息進(jìn)行處理
* @param message
* @return
* @throws AmqpException
*/
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 給消息設(shè)置過(guò)期時(shí)間,單位是毫秒
message.getMessageProperties().setDelay(delayTime);
return message;
}
});
}
}
3: 定義個(gè)消費(fèi)者
@Component
@Slf4j
public class DelayConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "delay")
public void recevied(String msg,Channel channel,Message message) throws IOException {
log.info("消費(fèi)消息:"+ msg);
/**
* 消費(fèi)信息的id.
* 是否批量確認(rèn)信息
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
5. 日志與監(jiān)控
7.1 RabbitMQ日志
RabbitMQ默認(rèn)日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log
日志包含了RabbitMQ的版本號(hào)、Erlang的版本號(hào)、RabbitMQ服務(wù)節(jié)點(diǎn)名稱、cookie的hash值、RabbitMQ配置文件地址、內(nèi)存限制、磁盤(pán)限制、默認(rèn)賬戶guest的創(chuàng)建以及權(quán)限配置等等。
7.2 rabbitmq常用命令
1、查看隊(duì)列
rabbitmqctl list_queues #查看所有虛擬主機(jī)里面的隊(duì)列
rabbitmqctl list_queues -p /vhost #查看某個(gè)虛擬主機(jī)里面的隊(duì)列
2、刪除所有隊(duì)列
rabbitmqctl stop_app #關(guān)閉應(yīng)用
rabbitmqctl reset #清除隊(duì)列中的消息
rabbitmqctl start_app # 再次啟動(dòng)此應(yīng)用
注意:此方式,會(huì)同時(shí)刪除一些配置信息,需要慎用
3、查看rabbitmq中的交換機(jī)
rabbitmqctl list_exchanges [-p vhost]
4、rabbitmq的用戶操作命令
rabbitmqctl list_users
rabbitmqctl add_user 用戶名 密碼
rabbitmqctl delete_user 用戶名
5、查看未被確認(rèn)的隊(duì)列
rabbitmqctl list_queues name messages_unacknowledged
6、查看隊(duì)列環(huán)境變量
rabbitmqctl environment
7、查看隊(duì)列消費(fèi)者信息
rabbitmqctl list_consumers
8、查看隊(duì)列連接
rabbitmqctl list_connections
9、查看準(zhǔn)備就緒的隊(duì)列
rabbitmqctl list_queues name messages_ready
10、查看單個(gè)隊(duì)列的內(nèi)存使用
rabbitmqctl list_queues name memory
11、列出所有虛擬主機(jī)
rabbitmqctl list_vhosts
rabbitmqctl status | grep rabbit ##查看rabbitmq的版本
6 消息追蹤
在使用任何消息中間件的過(guò)程中,難免會(huì)出現(xiàn)某條消息異常丟失的情況。
對(duì)于RabbitMQ而言,可能是因?yàn)樯a(chǎn)者或消費(fèi)者與RabbitMQ斷開(kāi)了連接,而它們與RabbitMQ又采用了不同的確認(rèn)機(jī)制;也有可能是因?yàn)榻粨Q器與隊(duì)列之間不同的轉(zhuǎn)發(fā)策略;甚至是交換器并沒(méi)有與任何隊(duì)列進(jìn)行綁定,生產(chǎn)者又不感知或者沒(méi)有采取相應(yīng)的措施;另外RabbitMQ本身的集群策略也可能導(dǎo)致消息的丟失。這個(gè)時(shí)候就需要有一個(gè)較好的機(jī)制跟蹤記錄消息的投遞過(guò)程,以此協(xié)助開(kāi)發(fā)和運(yùn)維人員進(jìn)行問(wèn)題的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來(lái)實(shí)現(xiàn)消息追蹤。
8.1 消息追蹤-Firehose(了解)
firehose的機(jī)制是將生產(chǎn)者投遞給隊(duì)列的消息,以及隊(duì)列投遞給消費(fèi)者的消息按照指定的格式發(fā)送到默認(rèn)的exchange上。這個(gè)默認(rèn)的exchange的名稱為amq.rabbitmq.trace,它是一個(gè)topic類型的exchange。發(fā)送到這個(gè)exchange上的消息的routing key為 publish.exchangename 和 deliver.queuename。其中exchangename和queuename為實(shí)際交換機(jī)和隊(duì)列的名稱,分別對(duì)應(yīng)生產(chǎn)者投遞到exchange的消息,和消費(fèi)者從queue上獲取的消息。
1、打開(kāi)trace 功能
rabbitmqctl trace_on [-p vhost] ##開(kāi)啟Firehose命令
打開(kāi) trace 會(huì)影響消息寫(xiě)入功能,適當(dāng)打開(kāi)后請(qǐng)關(guān)閉,關(guān)閉Firehose命令:rabbitmqctl trace_off [-p vhost],打開(kāi)后會(huì)多一個(gè)交換機(jī),如下圖
2、新建一個(gè)消息隊(duì)列,并給該交換機(jī)綁定一個(gè)消息隊(duì)列
3、打開(kāi)任何一個(gè)其他的隊(duì)列,并往隊(duì)列發(fā)送一條消息,則這個(gè)test_trace隊(duì)列也會(huì)有其他隊(duì)列的消息
8.2 消息追蹤-rabbitmq_tracing
rabbitmq_tracing和Firehose在實(shí)現(xiàn)上如出一轍,只不過(guò)rabbitmq_tracing的方式比Firehose多了一層GUI的包裝,更容易使用和管理。
1、啟用插件:
[root@localhost ~]# rabbitmq-plugins list ###查詢插件
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_tracing
1、新建一個(gè)trace,將來(lái)所有的消息都被trace保存起來(lái),文件的默認(rèn)路徑為/var/tmp/rabbitmq-tracing
不管在哪個(gè)隊(duì)列發(fā)送消息,都會(huì)保存到日志文件mytrace.log中
如果是用其它的用戶創(chuàng)建這個(gè)消息日志。則需要在/etc/rabbitmq/rabbit.config配置文件添加如下內(nèi)容:創(chuàng)建的用戶名和密碼
{rabbitmq_tracing,
[
{directory, "/var/log/rabbitmq/rabbitmq_tracing"},
{username, "woniu"},
{password, "woniu"}
]
}
重啟消息隊(duì)列服務(wù)器即可
7.RabbitMQ應(yīng)用問(wèn)題
消息可靠性保障、消息冪等性處理 、微服務(wù)中用消息隊(duì)列實(shí)現(xiàn)微服務(wù)的異步調(diào)用,而用openfeign采用的同步
9.1 消息可靠性保障-消息補(bǔ)償
消息補(bǔ)償機(jī)制
需求:100%確保消息發(fā)送成功
9.2 消息冪等性保障-樂(lè)觀鎖(了解)
冪等性指一次和多次請(qǐng)求某一個(gè)資源,對(duì)于資源本身應(yīng)該具有同樣的結(jié)果。也就是說(shuō),其任意多次執(zhí)行對(duì)資源本身所產(chǎn)生的影響均與一次執(zhí)行的影響相同。MQ中指,消費(fèi)多條相同的消息,得到與消費(fèi)該消息一次相同的結(jié)果。
樂(lè)觀鎖解決方案
第一次生產(chǎn)者發(fā)送一條消息,但是消費(fèi)方系統(tǒng)宕機(jī),即不能立即消費(fèi),于是回調(diào)檢查服務(wù)監(jiān)聽(tīng)不到Q2的響應(yīng)消息,也不會(huì)寫(xiě)入數(shù)據(jù)庫(kù)MDB,當(dāng)隔一段時(shí)間后,生產(chǎn)者又發(fā)送一條延遲消息到Q3隊(duì)列,回調(diào)檢查服務(wù)能監(jiān)聽(tīng)到Q3隊(duì)列消息,于是和MDB去比較是否有,由于消費(fèi)方的失敗,消息最終沒(méi)有入庫(kù)MDB,這個(gè)時(shí)候回調(diào)檢查服務(wù)和MDB數(shù)據(jù)庫(kù)比對(duì)失敗,于是通知生產(chǎn)者,重新發(fā)送一條消息給消費(fèi)者,那么這個(gè)時(shí)候Q1就有2條消息了,當(dāng)消費(fèi)方正常運(yùn)行的時(shí)候,由于監(jiān)聽(tīng)的Q1是兩條2消息,怎么辦呢?樂(lè)觀鎖
第一次執(zhí)行:version=1 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
第二次執(zhí)行:version=2 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
9.3 消息積壓?jiǎn)栴}
實(shí)際場(chǎng)景可能有這樣現(xiàn)象:大量消息在rabbitmq里積壓了幾個(gè)小時(shí)了還沒(méi)消息,怎么辦?
[root@localhost ~]# rabbitmq-plugins list ###查詢插件
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_tracing
1、新建一個(gè)trace,將來(lái)所有的消息都被trace保存起來(lái),文件的默認(rèn)路徑為/var/tmp/rabbitmq-tracing
不管在哪個(gè)隊(duì)列發(fā)送消息,都會(huì)保存到日志文件mytrace.log中
如果是用其它的用戶創(chuàng)建這個(gè)消息日志。則需要在/etc/rabbitmq/rabbit.config配置文件添加如下內(nèi)容:創(chuàng)建的用戶名和密碼
{rabbitmq_tracing,
[
{directory, "/var/log/rabbitmq/rabbitmq_tracing"},
{username, "woniu"},
{password, "woniu"}
]
}
重啟消息隊(duì)列服務(wù)器即可
7.RabbitMQ應(yīng)用問(wèn)題
消息可靠性保障、消息冪等性處理 、微服務(wù)中用消息隊(duì)列實(shí)現(xiàn)微服務(wù)的異步調(diào)用,而用openfeign采用的同步
9.1 消息可靠性保障-消息補(bǔ)償
消息補(bǔ)償機(jī)制
需求:100%確保消息發(fā)送成功
9.2 消息冪等性保障-樂(lè)觀鎖(了解)
冪等性指一次和多次請(qǐng)求某一個(gè)資源,對(duì)于資源本身應(yīng)該具有同樣的結(jié)果。也就是說(shuō),其任意多次執(zhí)行對(duì)資源本身所產(chǎn)生的影響均與一次執(zhí)行的影響相同。MQ中指,消費(fèi)多條相同的消息,得到與消費(fèi)該消息一次相同的結(jié)果。
樂(lè)觀鎖解決方案
第一次生產(chǎn)者發(fā)送一條消息,但是消費(fèi)方系統(tǒng)宕機(jī),即不能立即消費(fèi),于是回調(diào)檢查服務(wù)監(jiān)聽(tīng)不到Q2的響應(yīng)消息,也不會(huì)寫(xiě)入數(shù)據(jù)庫(kù)MDB,當(dāng)隔一段時(shí)間后,生產(chǎn)者又發(fā)送一條延遲消息到Q3隊(duì)列,回調(diào)檢查服務(wù)能監(jiān)聽(tīng)到Q3隊(duì)列消息,于是和MDB去比較是否有,由于消費(fèi)方的失敗,消息最終沒(méi)有入庫(kù)MDB,這個(gè)時(shí)候回調(diào)檢查服務(wù)和MDB數(shù)據(jù)庫(kù)比對(duì)失敗,于是通知生產(chǎn)者,重新發(fā)送一條消息給消費(fèi)者,那么這個(gè)時(shí)候Q1就有2條消息了,當(dāng)消費(fèi)方正常運(yùn)行的時(shí)候,由于監(jiān)聽(tīng)的Q1是兩條2消息,怎么辦呢?樂(lè)觀鎖
第一次執(zhí)行:version=1 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
第二次執(zhí)行:version=2 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1
9.3 消息積壓?jiǎn)栴}
實(shí)際場(chǎng)景可能有這樣現(xiàn)象:大量消息在rabbitmq里積壓了幾個(gè)小時(shí)了還沒(méi)消息,怎么辦?
這種時(shí)候只好采用 “丟棄+批量重導(dǎo)” 的方式來(lái)解決了,臨時(shí)寫(xiě)個(gè)程序,連接到mq里面消費(fèi)數(shù)據(jù),收到消息之后直接將其丟棄,快速消費(fèi)掉積壓的消息,降低MQ的壓力?;蛘叨鄦讉€(gè)消費(fèi)端。
柚子快報(bào)激活碼778899分享:分布式 rabbitmq
參考閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。