柚子快報(bào)激活碼778899分享:分布式 RabbitMQ實(shí)戰(zhàn)
文章目錄
1、簡介2、MQ優(yōu)點(diǎn)缺點(diǎn)MQ的應(yīng)用場景AMQP工作原理市面上常見的MQ
3、Linux安裝RabbitMQ3.1 版本對應(yīng)3.2 安裝socat3.3 下載 Erlang/OTP、安裝、驗(yàn)證 erlang方法一:1. 下載2. 將下載的Erlang服務(wù)上傳到服務(wù)器上面3. 解壓4. 編譯erlang的依賴環(huán)境5. 安裝Erlang6. 配置Erlang環(huán)境7. 測試Erlang是否安裝成功
方法二:1. 下載2. 安裝3. 驗(yàn)證 erlang 是否安裝成功。4. 卸載 erlang(遇到下載的erlang與rabbitmq-server 版本沖突)5. 重新安裝 erlang 和驗(yàn)證 erlang
3.4 安裝、驗(yàn)證rabbitmq-server(rabbitMQ服務(wù)器)1. 下載RabbitMQ2. 將RabbitMQ上傳到服務(wù)器3. 解壓RabbitMQ服務(wù)4. 配置環(huán)境變量5. 開啟web管理插件6. 啟動(dòng)RabbitMQ服務(wù)7. 訪問RabbitMQ管理界面8. 設(shè)置允許遠(yuǎn)程訪問方法一——新加用戶方法二——設(shè)置guest
4、RabbitMQ實(shí)戰(zhàn)4.1 什么是消息隊(duì)列4.2 RabbitMQ簡介4.3 消息隊(duì)列應(yīng)用場景1. 任務(wù)異步處理:2. 應(yīng)用程序解耦合:
4.4 RabbitMQ的工作原理1. 組成部分說明:2. 生產(chǎn)者發(fā)送消息流程:3. 消費(fèi)者接收消息流程:
4.5 六種工作模式4.5.1 基本消息模式(簡單消息模式)1.1 案例實(shí)戰(zhàn)1、 新建一個(gè)maven工程2、 添加依賴3、 再到j(luò)ava目錄下創(chuàng)建org.example.util包,在此包下創(chuàng)建連接工具類:4、 生產(chǎn)者發(fā)送消息4.1 在org.example.simple包下創(chuàng)建Send類,用于生產(chǎn)者發(fā)送消息。4.2 運(yùn)行上述main方法,在控制臺(tái)打印信息如下所示:4.3 打開瀏覽器訪問:http://IP:156724.4 如下圖點(diǎn)擊Queues,可以在隊(duì)列列表中可以看到名為simple_queue的隊(duì)列。4.5 點(diǎn)擊隊(duì)列名稱simple_queue,進(jìn)入詳情頁 --->Get messages,可以查看消息:
5、消費(fèi)者接收消息5.1 在org.example.simple包下創(chuàng)建Receiver類,用于消費(fèi)者接收消息5.2 運(yùn)行上述main方法,在控制臺(tái)打印信息如下:5.3 打開瀏覽器訪問:http://IP:156725.4 再看看隊(duì)列的消息,已經(jīng)被消費(fèi)了,Ready值為0,Total值也為0了。
1.2 消息確認(rèn)機(jī)制ACK1、 在org.example.simple包下創(chuàng)建ACKReceiver類,用于消費(fèi)者接收消息2、自動(dòng)ACK存在的問題2.1 修改消費(fèi)者Receiver類的代碼2.2 生產(chǎn)者Send類不做任何修改,**直接運(yùn)行Send類中的main方法**,2.3 運(yùn)行Receiver類消費(fèi)者中的main方法,程序拋出異常:2.4再查看rabbitmq的web管理界面:
3、演示手動(dòng)ACK3.1 重新運(yùn)行生產(chǎn)者Send中的main方法,實(shí)現(xiàn)發(fā)送消息,3.2 再修改ACKReceiver類中的handleDelivery方法3.3 再運(yùn)行ACKReceiver類中的main方法,程序拋出異常:3.4 查看web管理頁面:
4、最后消息確認(rèn)機(jī)制的正確做法4.1 我們要在監(jiān)聽隊(duì)列時(shí)設(shè)置第二個(gè)參數(shù)為false,代碼中手動(dòng)進(jìn)行ACK4.2 最后運(yùn)行ACKReceiver類中的main方法,查看web管理頁面
4.5.2 work工作隊(duì)列模式2.1 案例實(shí)戰(zhàn)1、生產(chǎn)者2、消費(fèi)者13、消費(fèi)者24、進(jìn)行消息消費(fèi)
2.2 能者多勞2.3 訂閱模型分類1. 說明2. Exchange類型有以下幾種:
4.5.3 Publish/subscribe | 發(fā)布/訂閱模式 (交換機(jī)類型:Fanout,也稱為廣播)3.1 案例實(shí)戰(zhàn)1、生產(chǎn)者2、消費(fèi)者1(注冊成功發(fā)給短信服務(wù))3、消費(fèi)者2(注冊成功發(fā)給郵件服務(wù))4、進(jìn)行消息消費(fèi)5、思考5.1 publish/subscribe與work queues有什么區(qū)別。5.2 實(shí)際工作用 publish/subscribe還是work queues?
4.5.4 Routing 路由模式(交換機(jī)類型:direct)4.1 案例實(shí)戰(zhàn)1、生產(chǎn)者2、消費(fèi)者1(使用routing key為sms來綁定隊(duì)列與交換機(jī))3、消費(fèi)者2(用routing key為email來綁定隊(duì)列與交換機(jī))
4.5.5 Topics通配符模式(交換機(jī)類型:topics)4.1 Topics模型示意圖:4.2 通配符規(guī)則4.3 舉例4.4 案例實(shí)戰(zhàn)1、生產(chǎn)者2、消費(fèi)者13、消費(fèi)者2
4.6 SpringBoot整合RabbitMQ4.6.1 創(chuàng)建SpringBoot項(xiàng)目4.6.2 添加依賴4.6.3 添加配置4.6.4 添加配置類4.6.5 在測試類中添加生產(chǎn)者,并發(fā)送消息4.6.6 生產(chǎn)者發(fā)送消息測試結(jié)果4.6.7 創(chuàng)建消息接收器(消費(fèi)者)4.6.8 消費(fèi)者消費(fèi)結(jié)果
5、RabbitMQ問題相關(guān)解決方案5.1 生產(chǎn)端可靠性投遞方案介紹5.1.1 關(guān)于怎么保證生產(chǎn)者的可靠性投遞5.1.2 消息發(fā)送方案一:消息落庫,對消息狀態(tài)進(jìn)行標(biāo)記方案二:消息延遲投遞,做二次確認(rèn),回調(diào)檢查
5.1.3 消息落庫,對消息狀態(tài)打標(biāo)的具體實(shí)現(xiàn)開啟消息回調(diào)機(jī)制1. 創(chuàng)建數(shù)據(jù)庫2. 創(chuàng)建Spring Boot項(xiàng)目3. 添加依賴4. 添加配置5. 創(chuàng)建實(shí)體類6. 創(chuàng)建mapper7. 創(chuàng)建Controller8. 創(chuàng)建配置類9. 進(jìn)行任務(wù)調(diào)度10.最終結(jié)果1、發(fā)送消息成功2、模擬生產(chǎn)者消息首次發(fā)送失敗3、模擬消息首次發(fā)送失敗,定時(shí)任務(wù)重試也失敗
11.在高并發(fā)的場景下是否合適?
5.2 [RabbitMQ](https://so.csdn.net/so/search?q=RabbitMQ&spm=1001.2101.3001.7020) 如何避免消息重復(fù)消費(fèi)?5.2.1 冪等性5.2.2 高并發(fā)的情況下如何避免消息重復(fù)消費(fèi)5.2.3 解決重復(fù)消費(fèi)的案例代碼1. 添加依賴2. 添加配置3. 生產(chǎn)者代碼4. 消費(fèi)者代碼5. 最終結(jié)果1、消息消費(fèi)成功2、模擬消息重復(fù)消費(fèi)場景
5.3 [RabbitMQ](https://so.csdn.net/so/search?q=RabbitMQ&spm=1001.2101.3001.7020) 如何避免消息積壓?5.3.1 解決方案
更多命令
1、簡介
素材:鏈接:https://pan.baidu.com/s/1YjVM9WBEIVCbYZmlzowyKw?pwd=lpkz
官網(wǎng):https://www.rabbitmq.com/
RabbitMQ 是一個(gè)開源的消息隊(duì)列中間件,采用 Erlang 語言編寫,支持多種消息協(xié)議,如 AMQP、MQTT、STOMP 等。它可以作為消息的中轉(zhuǎn)站,在分布式系統(tǒng)中協(xié)調(diào)不同組件之間的數(shù)據(jù)傳輸,實(shí)現(xiàn)松耦合的系統(tǒng)架構(gòu)。
2、MQ
優(yōu)點(diǎn)
靈活的路由方式,支持消息的廣播、點(diǎn)對點(diǎn)、主題訂閱等多種路由方式;異步處理消息,提高系統(tǒng)的并發(fā)性能;持久化機(jī)制,保證在服務(wù)器宕機(jī)、重啟等情況下消息的可靠性;高可用和負(fù)載均衡,支持集群和鏡像模式,提供高可用和負(fù)載均衡的目標(biāo);插件機(jī)制,支持豐富的插件,如認(rèn)證授權(quán)、可視化管理等。
缺點(diǎn)
系統(tǒng)可用性降低: 系統(tǒng)引入的外部依賴越多,系統(tǒng)穩(wěn)定性越差。一旦MQ宕機(jī),就會(huì)對業(yè)務(wù)造成影響。系統(tǒng)復(fù)雜度提高: MQ的加入大大增加了系統(tǒng)的復(fù)雜度,以前系統(tǒng)間是同步的遠(yuǎn)程調(diào)用,現(xiàn)在是通過MQ進(jìn)行異步調(diào)用。一致性問題 : A系統(tǒng)處理完業(yè)務(wù),通過MQ給B、C、D三個(gè)系統(tǒng)發(fā)消息數(shù)據(jù),如果B系統(tǒng)、C系統(tǒng)處理成功,D系統(tǒng)處理失敗,則會(huì)造成數(shù)據(jù)處理的不一致。
MQ的應(yīng)用場景
高峰流量:搶紅包、秒殺活動(dòng)、搶火車票等這些業(yè)務(wù)場景都是短時(shí)間內(nèi)需要處理大量請求,如果直接連接系統(tǒng)處理業(yè)務(wù),會(huì)耗費(fèi)大量資源,有可能造成系統(tǒng)癱瘓。 而使用MQ后,可以先讓用戶將請求發(fā)送到MQ中,MQ會(huì)先保存請求消息,不會(huì)占用系統(tǒng)資源,且MQ會(huì)進(jìn)行消息排序,先請求的秒殺成功,后請求的秒殺失敗。消息分發(fā):如電商網(wǎng)站要推送促銷信息,該業(yè)務(wù)耗費(fèi)時(shí)間較多,但對時(shí)效性要求不高,可以使用MQ做消息分發(fā)。數(shù)據(jù)同步:假如我們需要將數(shù)據(jù)保存到數(shù)據(jù)庫之外,還需要一段時(shí)間將數(shù)據(jù)同步到緩存(如Redis)、搜索引擎(如Elasticsearch)中。此時(shí)可以將數(shù)據(jù)庫的數(shù)據(jù)作為消息發(fā)送到MQ中,并同步到緩存、 搜索引擎中。異步處理:在電商系統(tǒng)中,訂單完成后,需要及時(shí)的通知子系統(tǒng)(進(jìn)銷存系統(tǒng)發(fā)貨,用戶服務(wù)積分,發(fā)送短信)進(jìn)行下一步操作。為了保證訂單系統(tǒng)的高性能,應(yīng)該直接返回訂單結(jié)果,之后讓MQ通知子系統(tǒng)做其他非實(shí)時(shí)的業(yè)務(wù)操作。這樣能保證核心業(yè)務(wù)的高效及時(shí)離線處理:在銀行系統(tǒng)中,如果要查詢近十年的歷史賬單,這是非常耗時(shí)的操作。如果發(fā)送同步請求,則會(huì)花費(fèi)大量時(shí)間等待響應(yīng)。此時(shí)使用MQ發(fā)送異步請求,等到查詢出結(jié)果后獲取結(jié)果即可。
AMQP
1、什么是 AMQP : 即Advanced Message Queuing Protocol(高級(jí)消息隊(duì)列協(xié)議),是一個(gè)網(wǎng)絡(luò)協(xié)議,專門為消息中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受不同中間件產(chǎn)品,不同開發(fā)語言等條件的限制。2006年AMQP規(guī)范發(fā)布,類比HTTP。
2、AMQP工作過程: 生產(chǎn)者(Publisher)將消息發(fā)布到交換機(jī)(Exchange),交換機(jī)根據(jù)規(guī)則將消息分發(fā)給交換機(jī)綁定的隊(duì)列(Queue),隊(duì)列再將消息投遞給訂閱了此隊(duì)列的消費(fèi)者
工作原理
Producer【消息的生產(chǎn)者】 一個(gè)向交換機(jī)發(fā)布消息的客戶端應(yīng)用程序。Connection 【連接】 生產(chǎn)者/消費(fèi)者和RabbitMQ服務(wù)器之間建立的TCP連接。Channel【信道】 是TCP里面的虛擬連接。例如:Connection相當(dāng)于電纜,Channel相當(dāng)于獨(dú)立光纖束,一條TCP連接中可以創(chuàng)建多條信道,增加連接效率。無論是發(fā)布消息、接收消息、訂閱隊(duì)列都是通過信道完成的。Broker 消息隊(duì)列服務(wù)器實(shí)體。即RabbitMQ服務(wù)器Virtual Host【虛擬主機(jī)】 出于多租戶和安全因素設(shè)計(jì)的,把AMQP的基本組件劃分到一個(gè)虛擬的分組中。每個(gè)vhost本質(zhì)上就是一個(gè)mini版的RabbitMQ服務(wù)器,擁有自己的隊(duì)列、交換機(jī)、綁定和權(quán)限機(jī)制。當(dāng)多個(gè)不同的用戶使用同一個(gè)RabbitMQ服務(wù)器時(shí),可以劃分出多個(gè)虛擬主機(jī)。RabbitMQ默認(rèn)的虛擬主機(jī)路徑是 /Exchange 【交換機(jī)】 用來接收生產(chǎn)者發(fā)送的消息,并根據(jù)分發(fā)規(guī)則,將這些消息分發(fā)給服務(wù)器中的隊(duì)列中。不同的交換機(jī)有不同的分發(fā)規(guī)則。Queue【消息隊(duì)列】 用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。消息一直在隊(duì)列里面,等待消費(fèi)者鏈接到這個(gè) 隊(duì)列將其取走。Binding 【綁定】 消息隊(duì)列和交換機(jī)之間的虛擬連接,綁定中包含路由規(guī)則,綁定信息保存到交換機(jī)的路由表中,作為消息的分發(fā)依據(jù)。Consumer【消息的消費(fèi)者】 表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
市面上常見的MQ
3、Linux安裝RabbitMQ
安裝rabbitmq分3個(gè)步: 1、先安裝socat, ——》2、安裝erlang, ——》3、安裝rabbitmq-server。
3.1 版本對應(yīng)
網(wǎng)址:https://www.rabbitmq.com/which-erlang.html
3.2 安裝socat
命令:yum -y install socat
3.3 下載 Erlang/OTP、安裝、驗(yàn)證 erlang
官網(wǎng):下載 - Erlang/OTP
方法一:
1. 下載
2. 將下載的Erlang服務(wù)上傳到服務(wù)器上面
cd /home
mkdir /home/rabbitMQ
cd /home/rabbitMQ
3. 解壓
tar -zvxf otp_src_24.0.tar.gz
4. 編譯erlang的依賴環(huán)境
跟大家講一下,erlang依賴的環(huán)境特別特別多,就拿gcc來說,如果以前安裝過這個(gè)環(huán)境還不止,所以我們重新安裝一下也無所謂所以我們執(zhí)行以下的命令:
解壓成功,安裝編譯所需要的依賴文件
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel
等待安裝完畢
創(chuàng)建Erlang文件夾
mkdir /home/rabbitMQ/erlang
cd /home/rabbitMQ/otp_src_24.0
然后執(zhí)行下面的命令
./configure --prefix=/home/rabbitMQ/erlang --without-javac
5. 安裝Erlang
make : 編譯 make install : 安裝 && : 前面的命令執(zhí)行成功后面的命令才會(huì)執(zhí)行
make && make install
6. 配置Erlang環(huán)境
vi /etc/profile
加入
#set erlang environment
export ERLANG_HOME=/home/rabbitMQ/erlang
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:${ERLANG_HOME}/bin:$PATH
按Esc鍵 輸入 :wq (退出并保存) :q! (退出不保存)
刷新配置文件
7. 測試Erlang是否安裝成功
輸入命令: erl
如圖所示說明已經(jīng)安裝成功了??!
方法二:
1. 下載
下載命令: sudo yum install erlang
2. 安裝
接著上一步, 繼續(xù)回復(fù)“y”, 提示:見到Complete!(成功),表示安裝erlang 成功了。
3. 驗(yàn)證 erlang 是否安裝成功。
命令: yum info erlang 提示: erlang 的版本信息、軟件網(wǎng)址、占用大小空間等,就表示安裝成功了。
4. 卸載 erlang(遇到下載的erlang與rabbitmq-server 版本沖突)
執(zhí)行3條命令
yum list | grep erlang
yum -y remove erlang-*
yum remove erlang.x86_64
5. 重新安裝 erlang 和驗(yàn)證 erlang
安裝已經(jīng)下載好的erlang包, 文件路徑 ./rabbitMQ/ 文件下 安裝命令: rpm -ivh erlang-23.3-2.el8.x86_64.rpm 驗(yàn)證erlang命令: yum info erlang
3.4 安裝、驗(yàn)證rabbitmq-server(rabbitMQ服務(wù)器)
注意:需要下載Linux版本的
官網(wǎng):https://www.rabbitmq.com/
在RabbitMQ官網(wǎng)可以看到RabbitMQ對應(yīng)的Erlang版本
1. 下載RabbitMQ
2. 將RabbitMQ上傳到服務(wù)器
cd /home/rabbitMQ
3. 解壓RabbitMQ服務(wù)
根據(jù)壓縮包后綴不同使用不同的命令進(jìn)行解壓
xz -d rabbitmq-server-generic-unix-latest-toolchain-3.9.5.tar.xz
tar -xvf rabbitmq-server-generic-unix-latest-toolchain-3.9.5.tar
4. 配置環(huán)境變量
vi /etc/profile
加入
#set rabbitmq environment
export RABBITMQ=/home/rabbitMQ/rabbitmq_server-3.9.5
export PATH=$PATH:${RABBITMQ}/sbin
按Esc鍵 輸入 :wq (退出并保存) :q! (退出不保存)
刷新配置文件
source /etc/profile
5. 開啟web管理插件
cd /home/rabbitMQ/rabbitmq_server-3.9.5/sbin
./rabbitmq-plugins enable rabbitmq_management # 啟動(dòng)指定的插件:
啟動(dòng)插件成功
6. 啟動(dòng)RabbitMQ服務(wù)
ls
./rabbitmq-server -detached # 以守護(hù)進(jìn)程啟動(dòng)
7. 訪問RabbitMQ管理界面
瀏覽器訪問:http://IP:15672
看到如下這個(gè)界面就是正常啟動(dòng)了
8. 設(shè)置允許遠(yuǎn)程訪問
從上面截圖可以看到使用guest登錄,提示“User can only log in via localhost”,無法登錄,原因是3.3.0版本后禁止用戶在除locahost外的訪問,只能通過本地主機(jī)登錄。
方法一——新加用戶
新加個(gè)用戶,設(shè)置權(quán)限,設(shè)置角色。
rabbitmqctl add_user admin admin:這個(gè)命令是用來添加一個(gè)新的RabbitMQ用戶這個(gè)命令將創(chuàng)建一個(gè)名為admin的用戶,并設(shè)置其密碼為admin請注意,這兩個(gè)參數(shù)(用戶名和密碼)在你的問題中是硬編碼的,這在實(shí)際生產(chǎn)環(huán)境中并不安全,建議使用更復(fù)雜和隨機(jī)化的用戶名和密碼rabbitmqctl set_permissions -p / admin ".*" ".*" ".*":這個(gè)命令是設(shè)置用戶admin在RabbitMQ的權(quán)限這里的-p /參數(shù)表示設(shè)置的是全局權(quán)限而".*" ".*" ".*"表示賦予admin用戶所有權(quán)限,包括配置權(quán)限、寫權(quán)限和讀權(quán)限r(nóng)abbitmqctl set_user_tags admin administrator:這個(gè)命令是為用戶admin添加了一個(gè)標(biāo)簽(或者權(quán)限等級(jí))在這個(gè)例子中,添加的是administrator標(biāo)簽這個(gè)命令可能不是必要的,因?yàn)镽abbitMQ通常不會(huì)直接使用這種用戶標(biāo)簽
rabbitmqctl add_user admin admin
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl set_user_tags admin administrator
登錄成功
方法二——設(shè)置guest
在/home/rabbitMQ/rabbitmq_server-3.9.5/plugins/rabbit-3.9.5/ebin目錄下找到rabbit.app文件 (find / -name rabbit.app),修改參數(shù)。
{loopback_users, [<<"guest">>]}, 修改成{loopback_users, []},
重啟服務(wù)
rabbitmqctl stop #停止RabbitMQ
cd /home/rabbitMQ/rabbitmq_server-3.9.5/sbin
./rabbitmq-server -detached # 以守護(hù)進(jìn)程啟動(dòng)RabbitMQ
使用guest賬號(hào)登錄成功
4、RabbitMQ實(shí)戰(zhàn)
4.1 什么是消息隊(duì)列
MQ全稱為Message Queue,即消息隊(duì)列?!毕㈥?duì)列”是在消息的傳輸過程中保存消息的容器。它是典型的:生產(chǎn)者、消費(fèi)者模型。生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息,消費(fèi)者不斷的從隊(duì)列中獲取消息。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒有業(yè)務(wù)邏輯的侵入,這樣就實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者的解耦。
下圖中Producer為生產(chǎn)者,Queue為消息隊(duì)列,Consumer為消費(fèi)者
4.2 RabbitMQ簡介
RabbitMQ是一個(gè)開源的消息中間件,它實(shí)現(xiàn)了高效、可靠的消息傳遞機(jī)制,主要用于應(yīng)用程序之間的異步通信。它基于AMQP(高級(jí)消息隊(duì)列協(xié)議)規(guī)范設(shè)計(jì),支持多種編程語言,并提供了豐富的特性和靈活的架構(gòu)。
RabbitMQ的工作原理是利用隊(duì)列來存儲(chǔ)消息,并通過發(fā)布-訂閱模式實(shí)現(xiàn)消息的發(fā)送和接收。在這個(gè)模式下,消息的發(fā)送者將消息發(fā)布到一個(gè)交換器,交換器根據(jù)預(yù)定義的規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列,然后消息的接收者從隊(duì)列中訂閱并消費(fèi)這些消息。
4.3 消息隊(duì)列應(yīng)用場景
1. 任務(wù)異步處理:
高并發(fā)環(huán)境下,由于來不及同步處理,請求往往會(huì)發(fā)生堵塞,比如說,大量的insert,update之類的請求同時(shí)到達(dá)MySQL,直接導(dǎo)致無數(shù)的行鎖表鎖,甚至最后請求會(huì)堆積過多,從而觸發(fā)too many connections錯(cuò)誤。通過使用消息隊(duì)列,我們可以異步處理請求,從而緩解系統(tǒng)的壓力。將不需要同步處理的并且耗時(shí)長的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理。減少了應(yīng)用程序的響應(yīng)時(shí)間。
2. 應(yīng)用程序解耦合:
MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合
4.4 RabbitMQ的工作原理
1. 組成部分說明:
· Broker:消息隊(duì)列服務(wù)進(jìn)程,此進(jìn)程包括兩個(gè)部分:Exchange和Queue
· Exchange:消息隊(duì)列交換機(jī),按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個(gè)隊(duì)列,對消息進(jìn)行過濾。
· Queue:消息隊(duì)列,存儲(chǔ)消息的隊(duì)列,消息到達(dá)隊(duì)列并轉(zhuǎn)發(fā)給指定的接受者
· Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送給消息隊(duì)列
· Consumer:消息消費(fèi)者,即消費(fèi)方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。
2. 生產(chǎn)者發(fā)送消息流程:
1、生產(chǎn)者和Broker建立TCP連接。
2、生產(chǎn)者和Broker建立Channel通道(信道)。
3、生產(chǎn)者通過Channel通道(信道)把消息發(fā)送給Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā)。
4、Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊(duì)列)
3. 消費(fèi)者接收消息流程:
1、消費(fèi)者和Broker建立TCP連接
2、消費(fèi)者和Broker建立Channel通道(信道)
3、消費(fèi)者監(jiān)聽指定的Queue(隊(duì)列) (每個(gè)隊(duì)列都有一個(gè)名字)
4、當(dāng)有消息到達(dá)Queue時(shí)Broker默認(rèn)將消息推送給消費(fèi)者。
5、消費(fèi)者接收到消息。
6、ack(消息確認(rèn)機(jī)制)回復(fù)
4.5 六種工作模式
RabbitMQ有六種工作模式:基本消息模式、work消息模式、Publish/subscribe (交換機(jī)類型:Fanout,也稱為廣播模式)、Routing 路由模型(交換機(jī)類型:direct)、Topics 通配符模式(交換機(jī)類型:topics)、RPC
? 我們這里給大家重點(diǎn)介紹基本消息模式, Routing路由模式(重點(diǎn))、Topic通配符模式(重點(diǎn))。
4.5.1 基本消息模式(簡單消息模式)
在上圖的模型中,有以下概念:
P:生產(chǎn)者,也就是要發(fā)送消息的程序
C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來。
queue:消息隊(duì)列,圖中紅色部分??梢跃彺嫦?;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。
1.1 案例實(shí)戰(zhàn)
1、 新建一個(gè)maven工程
根據(jù)下面的步驟建立maven項(xiàng)目
2、 添加依賴
3、 再到j(luò)ava目錄下創(chuàng)建org.example.util包,在此包下創(chuàng)建連接工具類:
public class ConnectionUtil {
/**
* 建立與RabbitMQ的連接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置服務(wù)地址 (因?yàn)閞abbitmq安裝到linux上面,這里填寫linux的IP地址)
factory.setHost("192.168.181.128");
//端口
factory.setPort(5672);
//設(shè)置賬號(hào)信息,用戶名、密碼(rabbitmq的用戶名和密碼)
factory.setUsername("guest");
factory.setPassword("guest");
// 通過工廠獲取連接
Connection connection = factory.newConnection();
return connection;
}
}
4、 生產(chǎn)者發(fā)送消息
4.1 在org.example.simple包下創(chuàng)建Send類,用于生產(chǎn)者發(fā)送消息。
public class Send {
private final static String QUEUE_NAME = "simple_queue"; // 隊(duì)列名
public static void main(String[] argv) throws Exception {
// 1、獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 2、從連接中創(chuàng)建通道,使用通道才能完成消息相關(guān)的操作
Channel channel = connection.createChannel();
// 3、聲明(創(chuàng)建)隊(duì)列
//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
/**
* 參數(shù)明細(xì)
* 1、queue 隊(duì)列名稱
* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在
* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問,如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建
* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)
* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4、消息內(nèi)容
String message = "Hello World!";
// 向指定的隊(duì)列中發(fā)送消息
//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數(shù)明細(xì):
* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")
* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱
* 3、props,消息的屬性
* 4、body,消息內(nèi)容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//關(guān)閉通道和連接(資源關(guān)閉最好用try-catch-finally語句處理)
channel.close();
connection.close();
}
}
4.2 運(yùn)行上述main方法,在控制臺(tái)打印信息如下所示:
4.3 打開瀏覽器訪問:http://IP:15672
web管理頁面:服務(wù)器地址/端口號(hào) 默認(rèn)用戶及密碼:guest,如果沒有配置可根據(jù) 設(shè)置允許遠(yuǎn)程訪問中進(jìn)行用戶名密碼配置
4.4 如下圖點(diǎn)擊Queues,可以在隊(duì)列列表中可以看到名為simple_queue的隊(duì)列。
4.5 點(diǎn)擊隊(duì)列名稱simple_queue,進(jìn)入詳情頁 —>Get messages,可以查看消息:
5、消費(fèi)者接收消息
5.1 在org.example.simple包下創(chuàng)建Receiver類,用于消費(fèi)者接收消息
public class Receiver{
private final static String QUEUE_NAME = "simple_queue"; //隊(duì)列名
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 聲明隊(duì)列
//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
/**
* 參數(shù)明細(xì)
* 1、queue 隊(duì)列名稱
* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在
* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問,如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建
* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)
* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//實(shí)現(xiàn)消費(fèi)方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
/**
* 當(dāng)接收到消息后此方法將被調(diào)用
* @param consumerTag 消費(fèi)者標(biāo)簽,用來標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽隊(duì)列時(shí)設(shè)置channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 消息屬性
* @param body 消息內(nèi)容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息體
String msg = new String(body,"utf-8");
System.out.println(" [x] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,第二個(gè)參數(shù):是否自動(dòng)進(jìn)行消息確認(rèn)(用于監(jiān)聽queue隊(duì)列中是否收到了消息,如果收到消息自動(dòng)調(diào)用上面DefaultConsumer進(jìn)行默認(rèn)消費(fèi))。
//參數(shù):String queue, boolean autoAck, Consumer callback
/**
* 參數(shù)明細(xì):
* 1、queue 隊(duì)列名稱
* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為true表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過編程實(shí)現(xiàn)回復(fù)
* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
5.2 運(yùn)行上述main方法,在控制臺(tái)打印信息如下:
5.3 打開瀏覽器訪問:http://IP:15672
web管理頁面:服務(wù)器地址/端口號(hào) 默認(rèn)用戶及密碼:guest,如果沒有配置可根據(jù) 設(shè)置允許遠(yuǎn)程訪問中進(jìn)行用戶名密碼配置
5.4 再看看隊(duì)列的消息,已經(jīng)被消費(fèi)了,Ready值為0,Total值也為0了。
我們發(fā)現(xiàn),消費(fèi)者已經(jīng)獲取了消息,但是程序沒有停止,一直在監(jiān)聽隊(duì)列中是否有新的消息。一旦有新的消息進(jìn)入隊(duì)列,就會(huì)立即打印
1.2 消息確認(rèn)機(jī)制ACK
通過剛才的案例可以看出,消息一旦被消費(fèi)者接收,隊(duì)列中的消息就會(huì)被刪除。
那么問題來了:RabbitMQ怎么知道消息被接收了呢?
如果消費(fèi)者領(lǐng)取消息后,還沒執(zhí)行操作就掛掉了呢?或者拋出了異常?消息消費(fèi)失敗,但是RabbitMQ無從得知,這樣消息就丟失了!
因此,RabbitMQ有一個(gè)ACK機(jī)制。當(dāng)消費(fèi)者獲取消息后,會(huì)向RabbitMQ發(fā)送回執(zhí)ACK,告知消息已經(jīng)被接收。不過這種回執(zhí)ACK分兩種情況:
? 自動(dòng)ACK:消息一旦被接收,消費(fèi)者自動(dòng)發(fā)送ACK
? 手動(dòng)ACK:消息接收后,不會(huì)發(fā)送ACK,需要手動(dòng)調(diào)用
大家覺得哪種更好呢?
這需要看消息的重要性:
? 如果消息不太重要,丟失也沒有影響,那么自動(dòng)ACK會(huì)比較方便
? 如果消息非常重要,不容丟失。那么最好在消費(fèi)完成后手動(dòng)ACK,否則接收消息后就自動(dòng)ACK,RabbitMQ就會(huì)把消息從隊(duì)列中刪除。如果此時(shí)消費(fèi)者宕機(jī),那么消息就丟失了。
之前的測試都是自動(dòng)ACK的,如果要手動(dòng)ACK,需要改動(dòng)我們的代碼。
1、 在org.example.simple包下創(chuàng)建ACKReceiver類,用于消費(fèi)者接收消息
public class ACKReceiver {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建通道
final Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
// 手動(dòng)進(jìn)行ACK
/*
* void basicAck(long deliveryTag, boolean multiple) throws IOException;
* deliveryTag:用來標(biāo)識(shí)消息的id
* multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 監(jiān)聽隊(duì)列,第二個(gè)參數(shù)false,手動(dòng)進(jìn)行ACK
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
ACKReceiver類與Receiver類最大的區(qū)別就是在消息消費(fèi)的時(shí)候添加了channel.basicAck(envelope.getDeliveryTag(), false);channel.basicConsume(QUEUE_NAME, false, consumer);
2、自動(dòng)ACK存在的問題
2.1 修改消費(fèi)者Receiver類的代碼
因?yàn)镽eceiver類是采用自動(dòng)ACK,在handleDelivery方法中添加異常,如下:
2.2 生產(chǎn)者Send類不做任何修改,直接運(yùn)行Send類中的main方法,
消息發(fā)送成功,再訪問到RabbitMQ的web界面(注:之前啟動(dòng)的Receiver消費(fèi)者要停掉服務(wù)),
2.3 運(yùn)行Receiver類消費(fèi)者中的main方法,程序拋出異常:
2.4再查看rabbitmq的web管理界面:
消費(fèi)者拋出異常,但是消息依然被消費(fèi),實(shí)際上我們還沒獲取到消息。
3、演示手動(dòng)ACK
注意:先把Receiver消費(fèi)者服務(wù)停止掉
3.1 重新運(yùn)行生產(chǎn)者Send中的main方法,實(shí)現(xiàn)發(fā)送消息,
消息發(fā)送成功后,再次查看web管理界面,效果如下所示,隊(duì)列中收到消息一條。
3.2 再修改ACKReceiver類中的handleDelivery方法
增加如下圖紅框里的異常代碼(模擬手動(dòng)進(jìn)行ack前拋出異常)。
3.3 再運(yùn)行ACKReceiver類中的main方法,程序拋出異常:
3.4 查看web管理頁面:
消息沒有被消費(fèi)掉!
這是因?yàn)殡m然我們設(shè)置了手動(dòng)ACK,但是代碼中并沒有進(jìn)行消息確認(rèn)!所以消息并未被真正消費(fèi)掉。
4、最后消息確認(rèn)機(jī)制的正確做法
4.1 我們要在監(jiān)聽隊(duì)列時(shí)設(shè)置第二個(gè)參數(shù)為false,代碼中手動(dòng)進(jìn)行ACK
代碼如下圖紅框所示(之前異常的代碼需要注釋掉)
4.2 最后運(yùn)行ACKReceiver類中的main方法,查看web管理頁面
消費(fèi)者消費(fèi)成功!
生產(chǎn)者避免數(shù)據(jù)丟失:https://www.cnblogs.com/vipstone/p/9350075.html
4.5.2 work工作隊(duì)列模式
工作隊(duì)列或者競爭消費(fèi)者模式
work queues與入門程序(基本消息模式)相比,多了一個(gè)消費(fèi)端,兩個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息,但是一個(gè)消息只能被一個(gè)消費(fèi)者獲取。
這個(gè)消息模型在Web應(yīng)用程序中特別有用,可以處理短的HTTP請求窗口中無法處理復(fù)雜的任務(wù)。
接下來我們來模擬這個(gè)流程:
P:生產(chǎn)者:任務(wù)的發(fā)布者
C1:消費(fèi)者1:領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較慢(模擬耗時(shí))
C2:消費(fèi)者2:領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較快
2.1 案例實(shí)戰(zhàn)
1、生產(chǎn)者
在org.example.work包中創(chuàng)建Send類,生產(chǎn)者循環(huán)發(fā)送50條消息
public class Send {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循環(huán)發(fā)布任務(wù)
for (int i = 0; i < 50; i++) {
// 消息內(nèi)容
String message = "task .. " + i;
// 向指定的隊(duì)列中發(fā)送消息
//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數(shù)明細(xì):
* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")
* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱
* 3、props,消息的屬性
* 4、body,消息內(nèi)容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 模擬網(wǎng)絡(luò)延時(shí)
Thread.sleep(i * 2);
}
// 關(guān)閉通道和連接
channel.close();
connection.close();
}
}
2、消費(fèi)者1
在org.example.work包中創(chuàng)建Receiver1類
public class Receiver1 {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//實(shí)現(xiàn)消費(fèi)方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息體
String msg = new String(body,"utf-8");
System.out.println(" [消費(fèi)者1] received : " + msg + "!");
//模擬任務(wù)耗時(shí)1s
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
}
};
// 監(jiān)聽隊(duì)列,第二個(gè)參數(shù):是否自動(dòng)進(jìn)行消息確認(rèn)。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3、消費(fèi)者2
在org.example.work包中創(chuàng)建Receiver2類
public class Receiver2 {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//實(shí)現(xiàn)消費(fèi)方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息體
String msg = new String(body,"utf-8");
System.out.println(" [消費(fèi)者1] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,第二個(gè)參數(shù):是否自動(dòng)進(jìn)行消息確認(rèn)。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
4、進(jìn)行消息消費(fèi)
接下來,兩個(gè)消費(fèi)者一同啟動(dòng),然后發(fā)送50條消息(先將兩個(gè)消費(fèi)者一起啟動(dòng),再啟動(dòng)生產(chǎn)者):
2.2 能者多勞
剛才的實(shí)現(xiàn)有問題嗎?
? 消費(fèi)者1比消費(fèi)者2的效率要低,一次任務(wù)的耗時(shí)較長
? 然而兩人最終消費(fèi)的消息數(shù)量是一樣的
? 消費(fèi)者2大量時(shí)間處于空閑狀態(tài),消費(fèi)者1一直忙碌
現(xiàn)在的狀態(tài)屬于是把任務(wù)平均分配,正確的做法應(yīng)該是消費(fèi)越快的人,消費(fèi)的越多。
怎么實(shí)現(xiàn)呢?
通過BasicQos方法設(shè)置prefetchCount = 1。這樣RabbitMQ就會(huì)使得每個(gè)Consumer在同一個(gè)時(shí)間點(diǎn)最多處理1個(gè)Message。換句話說,在接收到該Consumer的ack前,它不會(huì)將新的Message分發(fā)給它。相反,它會(huì)將其分派給不是仍然忙碌的下一個(gè)Consumer。
值得注意的是:prefetchCount在手動(dòng)ack的情況下才生效,自動(dòng)ack不生效。
注意: 需要在Receiver1和Receiver2中添加紅框中的代碼進(jìn)行設(shè)置
2.3 訂閱模型分類
1. 說明
1、一個(gè)生產(chǎn)者多個(gè)消費(fèi)者
2、每個(gè)消費(fèi)者都有一個(gè)自己的隊(duì)列
3、生產(chǎn)者沒有將消息直接發(fā)送給隊(duì)列,而是發(fā)送給exchange(交換機(jī)、轉(zhuǎn)發(fā)器)
4、每個(gè)隊(duì)列都需要綁定到交換機(jī)上
5、生產(chǎn)者發(fā)送的消息,經(jīng)過交換機(jī)到達(dá)隊(duì)列,實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者消費(fèi)
例子:注冊->發(fā)郵件、發(fā)短信
X(Exchanges):交換機(jī)一方面:接收生產(chǎn)者發(fā)送的消息。另一方面:知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
2. Exchange類型有以下幾種:
? Fanout: 廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列 (它是沒有routing key路由鍵)
? Direct:定向,把消息交給符合指定routing key 的隊(duì)列 (重點(diǎn)) (路由鍵是寫死的字符串)
? Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列(重點(diǎn)) (路由鍵是采用通配符#、進(jìn)行動(dòng)態(tài)匹配)
? Header: header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配隊(duì)列。
Header模式不展開了,感興趣可以參考這篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!
4.5.3 Publish/subscribe | 發(fā)布/訂閱模式 (交換機(jī)類型:Fanout,也稱為廣播)
(廣播模式中沒有routing key,是從路由模式開始才有)
Publish/subscribe模型示意圖 :
3.1 案例實(shí)戰(zhàn)
1、生產(chǎn)者
和前面兩種模式不同:
1) 聲明Exchange,不再聲明Queue
2) 發(fā)送消息到Exchange,不再發(fā)送到Queue
在org.example.publishsubscribe包中創(chuàng)建Send類
public class Send {
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明exchange,指定類型為fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息內(nèi)容
String message = "注冊成功!!";
// 發(fā)布消息到Exchange (廣播模式下是沒有routingKey,所以參數(shù)二為””)
// 向指定的隊(duì)列中發(fā)送消息
//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數(shù)明細(xì):
* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")
* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱
* 3、props,消息的屬性
* 4、body,消息內(nèi)容
*/
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生產(chǎn)者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
2、消費(fèi)者1(注冊成功發(fā)給短信服務(wù))
在org.example.publishsubscribe包中創(chuàng)建Receiver1類
public class Receiver1 {
private final static String QUEUE_NAME = "fanout_exchange_queue_sms";//短信隊(duì)列
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [短信服務(wù)] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3、消費(fèi)者2(注冊成功發(fā)給郵件服務(wù))
在org.example.publishsubscribe包中創(chuàng)建Receiver2類
public class Receiver2 {
//郵件隊(duì)列
private final static String QUEUE_NAME = "fanout_exchange_queue_email";
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [郵件服務(wù)] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
4、進(jìn)行消息消費(fèi)
我們運(yùn)行兩個(gè)消費(fèi)者,然后發(fā)送1條消息:
注意: 啟動(dòng)有可能會(huì)報(bào)錯(cuò):channel error; protocol method: #method
報(bào)這個(gè)錯(cuò)誤,證明我們沒有聲明交換機(jī),卻拿來使用了,所以我們需要先啟動(dòng)生產(chǎn)者進(jìn)行交換機(jī)聲明,然后在按照上面的流程走就沒有問題了
5、思考
5.1 publish/subscribe與work queues有什么區(qū)別。
區(qū)別:
1)work queues不用定義交換機(jī),而publish/subscribe需要定義交換機(jī)。
2)publish/subscribe的生產(chǎn)方是面向交換機(jī)發(fā)送消息,work queues的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。
3)publish/subscribe需要設(shè)置隊(duì)列和交換機(jī)的綁定,work queues不需要設(shè)置,實(shí)際上work queues會(huì)將隊(duì)列綁定到默認(rèn)的交換機(jī) 。
相同點(diǎn):
所以兩者實(shí)現(xiàn)的發(fā)布/訂閱的效果是一樣的,多個(gè)消費(fèi)端監(jiān)聽同一個(gè)隊(duì)列不會(huì)重復(fù)消費(fèi)消息。
5.2 實(shí)際工作用 publish/subscribe還是work queues?
建議使用 publish/subscribe,發(fā)布訂閱模式比工作隊(duì)列模式更強(qiáng)大(也可以做到同一隊(duì)列競爭),并且發(fā)布訂閱模式(廣播模式)可以指定自己專用的交換機(jī)。
4.5.4 Routing 路由模式(交換機(jī)類型:direct)
(路由模式中的routing key路由鍵格式為寫死的字符串,而Topic通配符模式中的routing key是使用通配符#和*來匹配多個(gè)或一個(gè)routing key,而通過routing key來實(shí)現(xiàn)將隊(duì)列與交換機(jī)進(jìn)行綁定)
Routing模型示意圖:
P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給與routing key完全匹配的隊(duì)列
C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息
4.1 案例實(shí)戰(zhàn)
1、生產(chǎn)者
在org.example.routingkey包中創(chuàng)建Send類
public class Send {
private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明exchange,指定類型為direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 消息內(nèi)容,
String message = "注冊成功!請短信回復(fù)[T]退訂";
// 發(fā)送消息,并且指定routing key 為:sms,只有短信服務(wù)能接收到消息
channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
注:上述生產(chǎn)者在發(fā)送消息時(shí),是指定routing key為sms,而根據(jù)上面提到的Routing模型示意圖,生產(chǎn)者將消息發(fā)送給exchange交換機(jī),交換機(jī)再通過routing key與queue隊(duì)列進(jìn)行綁定,我們把sms作為短信的路由鍵。
? 在下面的消費(fèi)者中使用routing key為sms將隊(duì)列與交換機(jī)進(jìn)行綁定后,就可以接收到生產(chǎn)者routing key為sms的消息了,換句話其他消費(fèi)者如果沒有使用routing key為sms綁定隊(duì)列與交換機(jī),就獲取不到生產(chǎn)者發(fā)送的消息了(消費(fèi)者2就沒有使用routing key為sms來綁定隊(duì)列與交換機(jī))。
2、消費(fèi)者1(使用routing key為sms來綁定隊(duì)列與交換機(jī))
在org.example.routingkey包中創(chuàng)建Receiver1類
public class Receiver1 {
private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信隊(duì)列
private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key??梢灾付ǘ鄠€(gè)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收發(fā)送方指定routing key為sms的消息
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [短信服務(wù)] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3、消費(fèi)者2(用routing key為email來綁定隊(duì)列與交換機(jī))
在org.example.routingkey包中創(chuàng)建Receiver2類
public class Receiver2 {
//郵件隊(duì)列
private final static String QUEUE_NAME = "direct_exchange_queue_email"; private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key??梢灾付ǘ鄠€(gè)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收發(fā)送方指定routing key為email的消息
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [郵件服務(wù)] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
我們發(fā)送sms的RoutingKey,發(fā)現(xiàn)結(jié)果:只有指定短信的消費(fèi)者1收到消息了(因?yàn)橄M(fèi)者1在綁定交換機(jī)的時(shí)候使用了sms這個(gè)routingkey)
4.5.5 Topics通配符模式(交換機(jī)類型:topics)
路由模式中的routing key路由鍵格式為寫死的字符串,而Topic通配符模式中的routing key是使用通配符#和*來匹配多個(gè)或一個(gè)routing key,而通過routing key來實(shí)現(xiàn)將隊(duì)列與交換機(jī)進(jìn)行綁定
4.1 Topics模型示意圖:
每個(gè)消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置帶通配符的routingkey,生產(chǎn)者將消息發(fā)給broker,由交換機(jī)根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊(duì)列。
Routingkey一般都是有一個(gè)或者多個(gè)單詞組成,多個(gè)單詞之間以“.”分割,例如:inform.sms
4.2 通配符規(guī)則
星號(hào)(*):匹配不多不少恰好1個(gè)詞。井號(hào)(#):匹配一個(gè)或多個(gè)詞
4.3 舉例
如示意圖所示
*.orange.* : 只能匹配 test.orange.test (只能匹配一個(gè)詞)
*.*.rabbit : 只能匹配 test.test.rabbit (只能匹配兩個(gè)詞)
lazy.# : 可以匹配 lazy.test 和 lazy.test.test (可以匹配一個(gè)或多個(gè)詞)
4.4 案例實(shí)戰(zhàn)
1、生產(chǎn)者
在org.example.topics包中創(chuàng)建Send類
public class Send {
private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明exchange,指定類型為topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 消息內(nèi)容
String message = "這是一只行動(dòng)迅速的橙色的兔子";
// 發(fā)送消息,并且指定routing key為:quick.orange.rabbit
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());
System.out.println(" [動(dòng)物描述:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
2、消費(fèi)者1
在org.example.topics包中創(chuàng)建Receiver1類
public class Receiver1 {
private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。訂閱所有的橙色動(dòng)物
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者1] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
? 注:上框代碼中消費(fèi)者在將隊(duì)列與交換機(jī)進(jìn)行綁定時(shí),routing key是使用通配符模式來匹配生產(chǎn)者在發(fā)送消息時(shí)所指定的routing key,而非之前Routing路由模式是指定具體的某個(gè)routing key(之前路由模式是:生產(chǎn)者發(fā)送消息并指定routing key為test,這時(shí)消費(fèi)者在將隊(duì)列與交換機(jī)進(jìn)行綁定時(shí),如果指定了routing key為test,則可以接收到生產(chǎn)者發(fā)送的消息,反之不行,之前路由模式中的生產(chǎn)者與消費(fèi)者的routing key是直接寫死,而通配模式中消費(fèi)者綁定隊(duì)列與交換機(jī)時(shí)的routing key為使用通配符的形式進(jìn)行匹配)
? 上框代碼中*.orange.*是可以匹配到生產(chǎn)者中的quick.orange.rabbit,因此消費(fèi)者1是可以接收到生產(chǎn)者發(fā)送的消息。
3、消費(fèi)者2
在org.example.topics包中創(chuàng)建Receiver2類
public class Receiver2 {
private final static String QUEUE_NAME = "topic_exchange_queue_Q2";
private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。訂閱關(guān)于兔子以及懶惰動(dòng)物的消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者2] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
上框代碼中*.*.rabbit是可以匹配到生產(chǎn)者中的quick.orange.rabbit,它是可以接收到生產(chǎn)者發(fā)送的消息,但是lazy.#是不可以匹配quick.orange.rabbit,故懶兔子是接收不到生產(chǎn)者發(fā)送的消息。結(jié)果消費(fèi)者1和消費(fèi)者2都接收到消息了
4.6 SpringBoot整合RabbitMQ
4.6.1 創(chuàng)建SpringBoot項(xiàng)目
根據(jù)下圖進(jìn)行SpringBoot項(xiàng)目創(chuàng)建 注意: 選擇JDK1.8的建議用3.0以下的SpringBoot版本,3.0或以上的SpringBoot版本建議使用JDK17或更高版本
4.6.2 添加依賴
在pom文件里面添加如下依賴
4.6.3 添加配置
yml配置文件添加
server:
port: 8080
spring:
rabbitmq:
host: 服務(wù)器的主機(jī)名或IP地址
port: 5672
username: rabbitMQ賬號(hào)
password: rabbitMQ密碼
# 虛擬主機(jī)名,默認(rèn)為"/"
virtual-host: /
# 發(fā)布者確認(rèn)模式
publisher-confirm-type: correlated
# 是否啟用發(fā)布者的返回功能
publisher-returns: true
# 模版配置
template:
retry:
# 發(fā)布重試,默認(rèn)false
enabled: true
# 重試時(shí)間 默認(rèn)1000ms
initial-interval: 10000ms
# 重試最大間隔時(shí)間
max-interval: 300000ms
# 重試的時(shí)間隔乘數(shù),比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s
multiplier: 2
# 交換機(jī)類型
exchange: topic.exchange
listener:
# 默認(rèn)配置是simple
type: simple
simple:
# 手動(dòng)ack Acknowledge mode of container. auto none
acknowledge-mode: manual
# 消費(fèi)者調(diào)用程序線程的最小數(shù)量
concurrency: 10
# 消費(fèi)者最大數(shù)量
max-concurrency: 10
# 限制消費(fèi)者每次只處理一條信息,處理完在繼續(xù)下一條
prefetch: 1
# 啟動(dòng)時(shí)是否默認(rèn)啟動(dòng)容器
auto-startup: true
# 被拒絕時(shí)重新進(jìn)入隊(duì)列
default-requeue-rejected: true
4.6.4 添加配置類
在com.example.rabbitmq.config包中創(chuàng)建RabbitmqConfig類
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_EMAIL = "queue_email"; // email隊(duì)列
public static final String QUEUE_SMS = "queue_sms"; // sms隊(duì)列
public static final String EXCHANGE_NAME="topic.exchange"; // topics類型交換機(jī)
// routingkey的值通常是使用了通配符,#代表匹配一個(gè)或多個(gè),*代表匹配一個(gè)
public static final String ROUTINGKEY_EMAIL="topic.#.email.#"; // routingkey路由鍵
public static final String ROUTINGKEY_SMS="topic.#.sms.#";
// 聲明交換機(jī)(構(gòu)建topic類型的交換機(jī))
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
// durable(true) 持久化,rabbitmq重啟之后交換機(jī)還在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
// 聲明email隊(duì)列
/*
* new Queue(QUEUE_EMAIL,true,false,false)
* durable="true" 持久化 rabbitmq重啟的時(shí)候不需要?jiǎng)?chuàng)建新的隊(duì)列
* auto-delete 表示消息隊(duì)列沒有在使用時(shí)將被自動(dòng)刪除 默認(rèn)是false
* exclusive 表示該消息隊(duì)列是否只在當(dāng)前connection生效,默認(rèn)是false
*/
@Bean(QUEUE_EMAIL)
public Queue emailQueue(){
return new Queue(QUEUE_EMAIL);
}
// 聲明sms隊(duì)列
@Bean(QUEUE_SMS)
public Queue smsQueue(){
return new Queue(QUEUE_SMS);
}
// ROUTINGKEY_EMAIL隊(duì)列綁定交換機(jī),指定routingKey
@Bean
public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
// 使用routingkey實(shí)現(xiàn)queue與exchange兩者間的綁定
// norags()表示無參
//ROUTINGKEY_SMS隊(duì)列綁定交換機(jī),指定routingKey
@Bean
public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
4.6.5 在測試類中添加生產(chǎn)者,并發(fā)送消息
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
/**
* 參數(shù):
* 1、交換機(jī)名稱
* 2、routingKey 是用來讓交換機(jī)通過routingKey將消息發(fā)送給所對應(yīng)的隊(duì)列
* 3、消息內(nèi)容
*/
for (int i = 0; i < 5; i++) {
String message = "恭喜您,注冊成功!userid=" + i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "topic.sms.email", message);
// 交換機(jī)綁定隊(duì)列是通過routingkey,而此處代表的routingkey為topic.sms.email,它與可以匹配到RabbitmqConfig類中的ROUTINGKEY_EMAIL和ROUTINGKEY_SMS,
// 即交換機(jī)通過ROUTINGKEY_EMAIL和ROUTINGKEY_SMS將message變量的值作為消息發(fā)送到queue_email和queue_sms兩個(gè)隊(duì)列,因?yàn)榇颂幋a是使用for循環(huán),即向這兩個(gè)隊(duì)列分別發(fā)送了5次消息。
System.out.println(" [x] Sent '" + message + "'");
}
}
啟動(dòng)測試類
4.6.6 生產(chǎn)者發(fā)送消息測試結(jié)果
web管理界面: 可以看到已經(jīng)創(chuàng)建了交換機(jī)以及queue_email、queue_sms 2個(gè)隊(duì)列,并且向這兩個(gè)隊(duì)列分別發(fā)送了5條消息:
4.6.7 創(chuàng)建消息接收器(消費(fèi)者)
在com.example.rabbitmq.receiver包中創(chuàng)建ReceiveHandler類
@Component
public class ReceiveHandler {
// 監(jiān)聽郵件隊(duì)列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_email", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.email.#","email.*"}))
public void receive_email(Message msg, Channel channel){
// 消息的唯一標(biāo)識(shí)
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
System.out.println(" [郵件服務(wù)] received : " + new String(msg.getBody()) + "!");
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
// 監(jiān)聽短信隊(duì)列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_sms", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.sms.#"}))
public void receive_sms(Message msg, Channel channel){
// 消息的唯一標(biāo)識(shí)
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
System.out.println(" [短信服務(wù)] received : " + new String(msg.getBody()) + "!");
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
我們在配置文件中設(shè)置了手動(dòng)ACK機(jī)制,所以我們代碼也要進(jìn)行手動(dòng)ACK,不然會(huì)報(bào)錯(cuò)
屬性說明:
@Componet: 添加到類上的注解,將注解注解所標(biāo)識(shí)的類注冊到Spring容器
@RabbitListener: 方法上的注解,聲明這個(gè)方法是一個(gè)消費(fèi)者方法,需要指定下面的屬性:
? bindings: 指定綁定關(guān)系,可以有多個(gè)。值是@QueueBinding的數(shù)組。
? @QueueBinding包含下面屬性:
? ? value: 這個(gè)消費(fèi)者關(guān)聯(lián)的隊(duì)列。值是@Queue,代表一個(gè)隊(duì)列
? ? exchange: 隊(duì)列所綁定的交換機(jī),值是@Exchange類型
? ? key: 隊(duì)列和交換機(jī)綁定的RoutingKey,可指定多個(gè)
Message msg: 在這個(gè)上下文中,Message 是一個(gè)RabbitMQ消息的封裝。它包含了消息的內(nèi)容(body)以及一些其他的屬性(例如,消息的唯一標(biāo)識(shí),即deliveryTag,以及消息的優(yōu)先級(jí)等)。你可以將Message對象看作是一個(gè)封裝了消息和其相關(guān)屬性的對象。Channel channel: Channel 是RabbitMQ的一個(gè)關(guān)鍵組件,它提供了一個(gè)高效和可靠的方式來發(fā)送和接收消息。在生產(chǎn)者-消費(fèi)者模型中,生產(chǎn)者通過Channel將消息發(fā)送到RabbitMQ,消費(fèi)者則從Channel接收消息。Channel對象可以創(chuàng)建多個(gè),但是每個(gè)Channel都應(yīng)該有唯一的標(biāo)識(shí)符。
啟動(dòng)SPringBoot項(xiàng)目
4.6.8 消費(fèi)者消費(fèi)結(jié)果
效果如下所示
5、RabbitMQ問題相關(guān)解決方案
5.1 生產(chǎn)端可靠性投遞方案介紹
既然我們項(xiàng)目中用到的RabbitMQ,它有它的優(yōu)點(diǎn)比如:解耦、異常、流量削峰,但是我們還需要考慮額外的東西,比如消息的可靠性。
什么是消息的可靠性,我們從兩個(gè)方面來講解,第一個(gè)方面是怎么保證生產(chǎn)者的可靠性投遞,即確保生產(chǎn)端發(fā)出的消息能真真正正地投遞到了隊(duì)列,最后給消費(fèi)者消費(fèi)。第二個(gè)消費(fèi)者怎么去做冪等性的保證,也就是說我們使用RabbitMQ發(fā)送消息時(shí),會(huì)出現(xiàn)同一條消息會(huì)重復(fù)多發(fā)的情況。(即怎么保存消費(fèi)者只消費(fèi)一條消息,另外的重復(fù)發(fā)多的消息做丟棄)
5.1.1 關(guān)于怎么保證生產(chǎn)者的可靠性投遞
可以從以下三點(diǎn)去做:
怎么去保證消息的成功發(fā)出。去保證RabbitMQ成功接收到消息,即隊(duì)列要成功接收到消息。保證生產(chǎn)者能夠接收到RabbitMQ的確認(rèn)應(yīng)答,即隊(duì)列收到了消息需要應(yīng)答給生產(chǎn)者,生產(chǎn)者就能夠知道RabbitMQ是收到了消息,這條消息是發(fā)送成功。因?yàn)榇蠹叶贾繰abbitMQ本質(zhì)上就是生產(chǎn)者—>隊(duì)列—>消費(fèi)者,其中生產(chǎn)者只負(fù)責(zé)發(fā)送消息,然后隊(duì)列只負(fù)責(zé)消息的中轉(zhuǎn),而消費(fèi)者只負(fù)責(zé)消息的消費(fèi)。需要完善消息的補(bǔ)償機(jī)制。
關(guān)于具體實(shí)現(xiàn)保證生產(chǎn)者的可靠性投遞,市面上有兩種主流的方案。
5.1.2 消息發(fā)送
方案一:消息落庫,對消息狀態(tài)進(jìn)行標(biāo)記
? 解析:
? 如上圖所示,MSG_DB為消息數(shù)據(jù)庫,BIZ_DB為業(yè)務(wù)數(shù)據(jù)庫,
第一步就是將業(yè)務(wù)數(shù)據(jù)庫入庫,還有需要發(fā)送的消息入庫到消息數(shù)據(jù)庫中(入庫后的消息其狀態(tài)為投遞中)。第二步是producer生產(chǎn)者再將消息發(fā)送給RabbitMQ第三步RabbitMQ會(huì)開啟確認(rèn)回調(diào),producer生產(chǎn)者會(huì)監(jiān)聽來自RabbitMQ的確認(rèn)回調(diào)。第四步producer生產(chǎn)者監(jiān)聽到了回調(diào)表示消息發(fā)送成功了,生產(chǎn)者就會(huì)更新MSG_DB消息數(shù)據(jù)庫中剛剛發(fā)送的消息的狀態(tài)為投遞成功,上圖status 1表示成功,0表示投遞中。使用分布式定時(shí)任務(wù)get status:0表示獲取狀態(tài)為投遞中的消息分布式定義任務(wù)將獲取狀態(tài)為投遞中的消息進(jìn)行Retry Send重發(fā)(再執(zhí)行第2、3、4步)。Retry count > 3表示重發(fā)次數(shù)超過3次,就更新當(dāng)前消息的狀態(tài)為2,同時(shí)停止重發(fā)。
方案二:消息延遲投遞,做二次確認(rèn),回調(diào)檢查
? 解析:
? 上圖中BIZ DB為業(yè)務(wù)數(shù)據(jù)庫,Upstream service上層業(yè)務(wù)(看成生產(chǎn)者發(fā)送消息),Downstream service下層業(yè)務(wù)(看成消費(fèi)者接收消息)
Upstream service(生產(chǎn)端)將業(yè)務(wù)數(shù)據(jù)入庫到BIZ DB,再執(zhí)行first Send即第一次發(fā)送消息到RabbitMQ中Step2: Second Send Delay Check第二次延遲發(fā)送消息(算上第一次,即生產(chǎn)端會(huì)向RabbitMQ發(fā)送兩次消息)Step3: Listener Consume 消費(fèi)者監(jiān)聽隊(duì)列推送過來的消息然后進(jìn)行消費(fèi)。消費(fèi)者消費(fèi)完后會(huì)生成確認(rèn)消息發(fā)送給RabbitMQ中,即Downstream service不僅僅做消費(fèi)者,它也可以做生產(chǎn)者去發(fā)送消息,將確認(rèn)消息發(fā)送到隊(duì)列中。Callback Service 回調(diào)服務(wù),Listener Confirm監(jiān)聽隊(duì)列推送過來的確認(rèn)信息。監(jiān)聽到確認(rèn)信息后,回調(diào)函數(shù)會(huì)將消息入庫到消息數(shù)據(jù)庫中。Check Detail 如果監(jiān)聽到的是延遲投遞的第二次消息,回調(diào)函數(shù)就會(huì)到MSG_DB消息數(shù)據(jù)庫里檢查這個(gè)消息數(shù)據(jù)在數(shù)據(jù)庫中是否存在(因?yàn)樵诘谖宀降臅r(shí)候把消息入庫了消息數(shù)據(jù)庫),如果數(shù)據(jù)庫中不存在這個(gè)消息就說明消費(fèi)者沒有把確認(rèn)消息發(fā)送給隊(duì)列(即消費(fèi)者消費(fèi)失敗了),這時(shí)Callback servcie回調(diào)服務(wù)就會(huì)執(zhí)行RPC ReSend Command(RPC 重發(fā)命令)再重新從第一步開始執(zhí)行。
這個(gè)方案相對于第一種方案的優(yōu)點(diǎn)是:數(shù)據(jù)庫操作減少了。
其流程為:
發(fā)送消息時(shí),將當(dāng)前消息數(shù)據(jù)庫存入數(shù)據(jù)庫,投遞狀態(tài)為消息投遞中。開啟消息確認(rèn)回調(diào)機(jī)制。確認(rèn)成功,更新投遞狀態(tài)為消息投遞成功。
開啟定時(shí)任務(wù),重新投遞失敗的消息。重試超過3次,更新投遞狀態(tài)為投遞失敗。
5.1.3 消息落庫,對消息狀態(tài)打標(biāo)的具體實(shí)現(xiàn)
開啟消息回調(diào)機(jī)制
1. 創(chuàng)建數(shù)據(jù)庫
DROP DATABASE IF EXISTS `rabbit_msg_rk`;
CREATE DATABASE `rabbit_msg_rk`;
USE `rabbit_msg_rk`;
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for message_log
-- ----------------------------
DROP TABLE IF EXISTS `message_log`;
CREATE TABLE `message_log` (
`id` int(0) NOT NULL AUTO_INCREMENT,
`message_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`order_id` int(0) NULL DEFAULT NULL,
`try_count` int(0) NULL DEFAULT 0,
`status` int(0) NULL DEFAULT 0,
`create_time` datetime(3) NULL DEFAULT NULL,
`update_time` datetime(3) NULL DEFAULT NULL,
`try_time` datetime(3) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `order_id`(`order_id`) USING BTREE,
CONSTRAINT `message_log_ibfk_1` FOREIGN KEY (`order_id`) REFERENCES `orders` (`id`) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE = InnoDB AUTO_INCREMENT = 18 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Table structure for orders
-- ----------------------------
DROP TABLE IF EXISTS `orders`;
CREATE TABLE `orders` (
`id` int(0) NOT NULL AUTO_INCREMENT,
`stock_id` int(0) NOT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
2. 創(chuàng)建Spring Boot項(xiàng)目
如下圖所示創(chuàng)建Spring Boot項(xiàng)目 注意: 選擇JDK1.8的建議用3.0以下的SpringBoot版本,3.0或以上的SpringBoot版本建議使用JDK17或更高版本
3. 添加依賴
在pom.xml文件中添加如下依賴
4. 添加配置
在yml文件中添加如下配置
server:
port: 8080
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://服務(wù)器的主機(jī)名或IP地址:端口/數(shù)據(jù)庫名?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: mysql賬號(hào)
password: mysql密碼
rabbitmq:
host: 服務(wù)器的主機(jī)名或IP地址
port: 5672
username: rabbitMQ賬號(hào)
password: rabbitMQ密碼
# 虛擬主機(jī)名,默認(rèn)為"/"
virtual-host: /
# 消息確認(rèn)回調(diào)
# none:表示禁用發(fā)送方確認(rèn)機(jī)制
# correlated:表示開啟發(fā)送方確認(rèn)機(jī)制
# simple:表示開啟發(fā)送方確認(rèn)機(jī)制,并支持 waitForConfirms() 和 waitForConfirmsOrDie() 的調(diào)用。
publisher-confirm-type: correlated
# 消息失敗回調(diào)
publisher-returns: true
# 模版配置
template:
retry:
# 發(fā)布重試,默認(rèn)false
enabled: true
# 重試時(shí)間 默認(rèn)1000ms
initial-interval: 10000ms
# 重試最大間隔時(shí)間
max-interval: 300000ms
# 重試的時(shí)間隔乘數(shù),比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s
multiplier: 2
exchange: topic.exchange
listener:
# 默認(rèn)配置是simple
type: simple
simple:
# 手動(dòng)ack Acknowledge mode of container. auto none
acknowledge-mode: manual
# 消費(fèi)者調(diào)用程序線程的最小數(shù)量
concurrency: 10
# 消費(fèi)者最大數(shù)量
max-concurrency: 10
# 限制消費(fèi)者每次只處理一條信息,處理完在繼續(xù)下一條
prefetch: 1
# 啟動(dòng)時(shí)是否默認(rèn)啟動(dòng)容器
auto-startup: true
# 被拒絕時(shí)重新進(jìn)入隊(duì)列
default-requeue-rejected: true
5. 創(chuàng)建實(shí)體類
在com.example.rabbitmqmsgrk.model包里面創(chuàng)建下面兩個(gè)類
訂單類:
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("orders")
public class Order implements Serializable {
/**
* 訂單id
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* 庫存id
*/
@TableField("stock_id")
private Integer stockId;
}
消息類:
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("message_log")
public class MassageLog implements Serializable {
/**
* 消息uid
*/
@TableField("message_id")
private String messageId;
/**
* 訂單id
*/
@TableField("order_id")
private Integer orderId;
/**
* 重試時(shí)間
*/
@TableField("try_time")
private LocalDateTime tryTime;
/**
* 重試次數(shù),閾值:3
*/
@TableField("try_count")
private Integer tryCount;
/**
* 消息狀態(tài),0:未發(fā)送成功、1:發(fā)送成功、2:失敗消息
*/
private Integer status;
@TableField("create_time")
private LocalDateTime createTime;
@TableField("update_time")
private LocalDateTime updateTime;
}
6. 創(chuàng)建mapper
在com.example.rabbitmqmsgrk.mapper包下面分別創(chuàng)建MessageLogMapper、OrderMapper兩個(gè)接口
// MessageLog接口
@Mapper
public interface MessageLogMapper extends BaseMapper
}
// Order 接口
@Mapper
public interface OrderMapper extends BaseMapper
}
在resources下面創(chuàng)建mapper包,然后在包下面創(chuàng)建MessageLogMapper、OrderMapper這兩個(gè)接口的接口映射文件
MessageLogMapper.xml
message_id,order_id,try_count,try_time,status,create_time,update_time
OrderMapper.xml
id,stock_id
7. 創(chuàng)建Controller
在com.example.rabbitmqmsgrk.web包下面創(chuàng)建OrderController類
@RestController
public class OrderController {
@Resource
private MessageLogMapper messageLogMapper;
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/msgTest")
public String msgTest(Integer orderId,Integer stockId){
Order order = new Order();
order.setId(orderId);
order.setStockId(stockId);
//數(shù)據(jù)庫記錄發(fā)送的消息
String msgId= UUID.randomUUID().toString();
MessageLog messageLog = new MessageLog();
messageLog.setMessageId(msgId);
messageLog.setStatus(0);
messageLog.setOrderId(orderId);
messageLog.setTryCount(0);
messageLog.setTryTime(LocalDateTime.now().plusMinutes(1));
messageLog.setCreateTime(LocalDateTime.now());
messageLogMapper.insert(messageLog);
/**
* 發(fā)送消息
* @param exchange 為交換機(jī)名字
* @param routingKey 為路由鍵
* @param object 為需要發(fā)送消息的內(nèi)容
* @param correlationData 為本次消息的ID
*/
rabbitTemplate.convertAndSend("msg.exchange","msg.routing.key", order, new CorrelationData(msgId));
return "成功";
}
}
8. 創(chuàng)建配置類
在com.example.rabbitmqmsgrk.config包中創(chuàng)建RabbitMQConfig類
@Configuration
public class RabbitMQConfig {
private static final Logger LOGGER= LoggerFactory.getLogger(RabbitMQConfig.class);
// 注入緩存連接工廠依賴對象
@Resource
private CachingConnectionFactory cachingConnectionFactory;
@Resource
private MessageLogMapper messageLogMapper;
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate=new RabbitTemplate(cachingConnectionFactory);
/**
* 消息確認(rèn)回調(diào),確認(rèn)消息是否會(huì)到達(dá)broker
* data:消息的唯一標(biāo)識(shí)
* ack:確認(rèn)結(jié)果
* cause:失敗原因
*/
rabbitTemplate.setConfirmCallback((data,ack,cause)->{
String msgId = data.getId();
if(ack){
LOGGER.info("{}==============>消息發(fā)送成功",msgId);
//在生產(chǎn)者發(fā)送消息的時(shí)候會(huì)把消息入庫到MSG_DB消息數(shù)據(jù)庫中,此消息的狀態(tài)status值為0,表示消息投遞中。當(dāng)消費(fèi)者監(jiān)聽到消息后,這里的setConfirmCallback()方法中實(shí)現(xiàn)消息確認(rèn)回調(diào),更新status的值為1表示投遞成功。
messageLogMapper.update(new MessageLog(),new UpdateWrapper
}else {
LOGGER.error("{}=============>消息發(fā)送失敗",msgId);
}
});
/**
* 消息失敗回調(diào)
* msg:消息主題
* repCode:響應(yīng)碼
* repText:響應(yīng)內(nèi)容
* exchange:交換機(jī)
* routingKey:路由鍵
*/
rabbitTemplate.setReturnCallback((msg,reCode,repText,exchange,routingKey)->{
LOGGER.error("{}==============>消息發(fā)送失敗 ",msg.getBody());
});
return rabbitTemplate;
}
// 創(chuàng)建名為msg.queue的隊(duì)列
@Bean
public Queue queue(){
return new Queue("msg.queue");
}
//聲明交換機(jī)(構(gòu)建direct類型的交換機(jī), 交換機(jī)名為msg.exchange)
@Bean
public DirectExchange directExchange(){
return new DirectExchange("msg.exchange");
}
// 通過msg.routing.key將隊(duì)列綁定到交換機(jī)
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange ()).with("msg.routing.key");
}
}
注意: 需要再yml中spring.rabbitmq進(jìn)行如下設(shè)置
# 消息確認(rèn)回調(diào)
publisher-confirm-type: correlated
# 消息失敗回調(diào)
publisher-returns: true
9. 進(jìn)行任務(wù)調(diào)度
在com.example.rabbitmqmsgrk.schedule包中創(chuàng)建MsgSchedule類,進(jìn)行定時(shí)任務(wù)
@Component
public class MsgSchedule {
@Resource
private MessageLogMapper messageLogMapper;
@Resource
private OrderMapper orderMapper;
@Resource
private RabbitTemplate rabbitTemplate;
// 定時(shí)任務(wù),使用cron表達(dá)式實(shí)現(xiàn)每隔10秒執(zhí)行一次下面的msgTask()方法
@Scheduled(cron = "0/10 * * * * ?")
public void msgTask(){
// 查詢消息狀態(tài)為0即正在投遞中的,并且tryTime重試時(shí)間小于當(dāng)前時(shí)間。
List
list.forEach(messageLog -> {
//判斷是否嘗試次數(shù)到3,代表發(fā)送失敗,修改當(dāng)前消息的status為2
if(messageLog.getTryCount()>=3){
messageLogMapper.update(new MessageLog(),new UpdateWrapper
}
//沒到3,繼續(xù)發(fā)送,并且修改狀態(tài)
messageLogMapper.update(new MessageLog(),new UpdateWrapper
.set("update_time",LocalDateTime.now()).set("try_time", LocalDateTime.now().plusMinutes(1))
.eq("message_id",messageLog.getMessageId()));
Order order = orderMapper.selectById(messageLog.getOrderId());
//重新發(fā)送消息
rabbitTemplate.convertAndSend("msg.exchange","msg.routing.key", order, new CorrelationData(messageLog.getMessageId()));
});
}
}
注意:在啟動(dòng)類上面添加@EnableScheduling注解
需要再orders表中添加以下數(shù)據(jù)
訪問:http://localhost:8080/msgTest?orderId=1&stockId=1 請求接口成功
Cron表達(dá)式學(xué)習(xí):https://blog.csdn.net/ITKidKid/article/details/126386738
10.最終結(jié)果
1、發(fā)送消息成功
2、模擬生產(chǎn)者消息首次發(fā)送失敗
我們將controller中msgTest方法里面的converAndSend中交換機(jī)的名字改掉,改成 “msg.exchange.test”,如此生產(chǎn)者發(fā)送消息肯定失敗,這個(gè)消息的狀態(tài)值為0,后面記得改回來
訪問: http://localhost:8080/msgTest?orderId=1&stockId=1
消息發(fā)送失敗
定時(shí)任務(wù)去查詢消息狀態(tài)為投遞中的消息進(jìn)行重發(fā)
消息重發(fā)成功
再查看數(shù)據(jù)庫message_log表,可以看到這個(gè)消息的try_count值為1,表示它進(jìn)行過一次重試,最后status值由0被修改成了1
3、模擬消息首次發(fā)送失敗,定時(shí)任務(wù)重試也失敗
將定時(shí)任務(wù)中的converAndSend中交換機(jī)的名字也改掉,改成 “msg.exchange.test”,如此來模擬生產(chǎn)者首次發(fā)送的消息失敗,定時(shí)任務(wù)重試也失敗。后面記得改回來
訪問: http://localhost:8080/msgTest?orderId=1&stockId=1
訪問成功立刻查看數(shù)據(jù)庫message_log表,可以看到這個(gè)消息的try_count值為0,status值也是0,重試次數(shù)為0,即狀態(tài)是投遞中
定時(shí)任務(wù)MsgSchedule類中的定時(shí)任務(wù)每隔10秒就會(huì)去重試下,當(dāng)重試次數(shù)超過3次,就直接將當(dāng)前消息的狀態(tài)值修改為2,再次查看message_log表的數(shù)據(jù)。
注:等待時(shí)間大約3分鐘,即3分鐘后當(dāng)前消息的status狀態(tài)才會(huì)變成2
最后不要忘記更正controller中以及MsgSchedule類中交換機(jī)的名字。
11.在高并發(fā)的場景下是否合適?
第一種方案對數(shù)據(jù)有兩次入庫,一次業(yè)務(wù)數(shù)據(jù)入庫,一次消息入庫。這樣對數(shù)據(jù)的入庫是一個(gè)瓶頸。
其實(shí)我們只需要對業(yè)務(wù)進(jìn)行入庫。
5.2 RabbitMQ 如何避免消息重復(fù)消費(fèi)?
5.2.1 冪等性
消息的冪等性是指一次消息傳遞可能會(huì)發(fā)生多次,但最終業(yè)務(wù)狀態(tài)只會(huì)改變一次。換句話說,即使多次收到了同一消息,也不會(huì)導(dǎo)致重復(fù)的業(yè)務(wù)處理。冪等操作的特點(diǎn)是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
保證消息的冪等性在開發(fā)中是很重要的,例如在客戶點(diǎn)擊付款的情況下,如果點(diǎn)擊了多次,系統(tǒng)也只能扣一次費(fèi)。此外,實(shí)現(xiàn)冪等性操作可以免去因重試等造成系統(tǒng)產(chǎn)生的未知問題。
然而,消息隊(duì)列如RabbitMQ、RocketMQ、kafka等,都可能出現(xiàn)消息的重復(fù)發(fā)送,這個(gè)是消息隊(duì)列無法保障的。在這種情況下,我們需要開發(fā)人員去保證消息的冪等性。實(shí)際上,消息隊(duì)列沒法幫你做到消費(fèi)端的冪等性,消費(fèi)端的冪等性得基于業(yè)務(wù)場景進(jìn)行實(shí)現(xiàn)。但是,至少得保證消息不能丟,且至少被消費(fèi)一次。
5.2.2 高并發(fā)的情況下如何避免消息重復(fù)消費(fèi)
唯一id+加指紋碼,利用數(shù)據(jù)庫主鍵去重。優(yōu)點(diǎn):實(shí)現(xiàn)簡單缺點(diǎn):高并發(fā)下有數(shù)據(jù)寫入瓶頸。利用Redis的原子性來實(shí)習(xí)。使用Redis進(jìn)行冪等是需要考慮的問題是否進(jìn)行數(shù)據(jù)庫落庫,落庫后數(shù)據(jù)和緩存如何做到保證冪等(Redis和數(shù)據(jù)庫如何同時(shí)成功同時(shí)失?。??如果不進(jìn)行落庫,都放在Redis中如何這是Redis和數(shù)據(jù)庫的同步策略?還有放在緩存中就能百分之百的成功嗎?
5.2.3 解決重復(fù)消費(fèi)的案例代碼
1. 添加依賴
在pom.xml中添加如下依賴
2. 添加配置
在application.yml中添加如下配置,記住,一定要設(shè)置手動(dòng)ACK,不然會(huì)報(bào)錯(cuò) acknowledge-mode: manual
spring:
# redis配置
redis:
# 超時(shí)時(shí)間
timeout: 10000ms
# 服務(wù)器地址
host: 服務(wù)器地址
# 服務(wù)器端口
port: 6379
database: 0
lettuce:
pool:
# 連接池最大連接數(shù) 默認(rèn)8 ,負(fù)數(shù)表示沒有限制
max-active: 1024
# 最大連接阻塞等待時(shí)間,默認(rèn)-1
max-wait: 10000ms
# 最大空閑連接
max-idle: 200
# 最小空閑連接
min-idle: 5
password: redis密碼
# rabbitmq配置
rabbitmq:
simple:
# 手動(dòng)ack Acknowledge mode of container. auto none
acknowledge-mode: manual
3. 生產(chǎn)者代碼
在com.example.rabbitmqmsgrk.web包中的OrderController類中創(chuàng)建repetition方法用于發(fā)送消息
@GetMapping("/repetition")
public String repetition(){
// 給消息封裝一個(gè)唯一id對象
String msgId= UUID.randomUUID().toString();
/**
* 發(fā)送消息
* @param exchange 為交換機(jī)名字
* @param routingKey 為路由鍵
* @param object 為需要發(fā)送消息的內(nèi)容
* @param correlationData 為本次消息的ID
*/
rabbitTemplate.convertAndSend("msg.exchange","msg.routing.key", "消息重復(fù)消費(fèi)問題處理", new CorrelationData(msgId));
return "成功";
}
4. 消費(fèi)者代碼
在com.example.rabbitmqmsgrk.config 中創(chuàng)建ReceiveHandler消費(fèi)者接收類,用于進(jìn)行消息消費(fèi)
@Component
public class ReceiveHandler {
@Resource
private StringRedisTemplate stringRedisTemplate;
// 監(jiān)聽隊(duì)列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "msg.queue", durable = "true"),
exchange = @Exchange(
value = "msg.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"msg.routing.*", "msg.#"}))
public void repetition(String msg, Channel channel, Message message) throws IOException {
// 1. 消息的唯一標(biāo)識(shí)
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 2. 獲取MessageId, 消息唯一id
String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
// 3. 設(shè)置key到Redis
if (stringRedisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
// 4. 消費(fèi)消息
System.out.println("接收到消息:" + msg);
// 5. 設(shè)置key的value為1
stringRedisTemplate.opsForValue().set(messageId, "1", 10, TimeUnit.SECONDS);
// 6. 手動(dòng)ack
channel.basicAck(deliveryTag, false);
} else {
// 4. 獲取Redis中的value即可 如果是1,手動(dòng)ack
if ("1".equalsIgnoreCase(stringRedisTemplate.opsForValue().get(messageId))) {
System.out.println("消息:" + messageId + "已消費(fèi)");
// 5. 手動(dòng)ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
}
5. 最終結(jié)果
1、消息消費(fèi)成功
訪問:http://localhost:8080/repetition
查看控制臺(tái)和redis,可以看到redis中的值變成1了,表示消息成功消費(fèi)了
2、模擬消息重復(fù)消費(fèi)場景
在OrderController類中repetition`方法添加for循環(huán)模擬消息重復(fù)消費(fèi)場景,連續(xù)發(fā)送3次重復(fù)消息
for (int i = 0; i < 3; i++) {
rabbitTemplate.convertAndSend("msg.exchange", "msg.routing.key", "消息重復(fù)消費(fèi)問題處理", new CorrelationData(msgId));
}
訪問:http://localhost:8080/repetition
查看控制臺(tái),可以看到消息重復(fù)進(jìn)行了消費(fèi)
5.3 RabbitMQ 如何避免消息積壓?
5.3.1 解決方案
優(yōu)化業(yè)務(wù)流程:檢查并優(yōu)化業(yè)務(wù)邏輯,確保消費(fèi)者能夠及時(shí)處理消息。這可能需要對業(yè)務(wù)邏輯進(jìn)行重新設(shè)計(jì),或者增加更多的消費(fèi)者來提高處理速度。增加消費(fèi)者數(shù)量:通過增加消費(fèi)者的數(shù)量,可以并行處理更多的消息,從而減輕單個(gè)消費(fèi)者的負(fù)擔(dān)。調(diào)整RabbitMQ的參數(shù):根據(jù)實(shí)際情況,調(diào)整RabbitMQ的參數(shù),如消息的生存時(shí)間(TTL)、最大隊(duì)列長度等。例如,可以設(shè)置消息的生存時(shí)間為較短的時(shí)間,以便消息能夠在隊(duì)列中保留的時(shí)間更短,從而減少隊(duì)列的壓力。使用死信隊(duì)列:在RabbitMQ中,可以使用死信隊(duì)列來處理無法正常處理的消息。當(dāng)消息在隊(duì)列中過期或者被拒絕時(shí),可以將其發(fā)送到死信隊(duì)列中,以便進(jìn)行后續(xù)處理。監(jiān)控和告警:建立監(jiān)控系統(tǒng),實(shí)時(shí)監(jiān)控RabbitMQ的運(yùn)行狀態(tài)和隊(duì)列情況。當(dāng)出現(xiàn)消息積壓時(shí),及時(shí)發(fā)出告警通知,以便能夠及時(shí)采取措施解決問題。臨時(shí)擴(kuò)容:如果以上措施無法滿足需求,可以考慮臨時(shí)擴(kuò)容,增加更多的消費(fèi)者和資源來處理積壓的消息。但這只是應(yīng)急措施,需要在后續(xù)對系統(tǒng)進(jìn)行進(jìn)一步的優(yōu)化和改進(jìn)。
更多命令
一、卸載rabbitmq相關(guān)的
1、卸載前先停掉rabbitmq服務(wù),執(zhí)行命令
service rabbitmq-server stop
2、查看rabbitmq安裝的相關(guān)列表
yum list | grep rabbitmq
3、卸載rabbitmq已安裝的相關(guān)內(nèi)容
yum -y remove rabbitmq-server.noarch
二、卸載erlang
1、查看erlang安裝的相關(guān)列表
yum list | grep erlang
2、卸載erlang已安裝的相關(guān)內(nèi)容
yum -y remove erlang-*
yum remove erlang.x86_64
啟動(dòng)服務(wù):rabbitmq-server -detached # 以后臺(tái)守護(hù)進(jìn)程方式啟動(dòng)
查看狀態(tài):rabbitmqctl status
關(guān)閉服務(wù):rabbitmqctl stop
列出角色:rabbitmqctl list_users
rabbitmqctl list_permissions # 查看(指定vhostpath)所有用戶的權(quán)限
rabbitmqctl list_permissions -p / # 查看virtual host為/的所有用戶權(quán)限
rabbitmqctl list_user_permissions developer # 查看指定用戶的權(quán)限
rabbitmqctl clear_permissions developer # 清除用戶權(quán)限
rabbitmqctl delete_user guest # 刪除用戶
rabbitmqctl change_password developer dev123456 # 修改密碼
感謝博主:
https://blog.csdn.net/Bejpse/article/details/126424250
https://blog.csdn.net/hengheng__/article/details/123390048
柚子快報(bào)激活碼778899分享:分布式 RabbitMQ實(shí)戰(zhàn)
相關(guān)閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。