欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ實(shí)戰(zhàn)

Wish心愿購綜合2025-08-16310

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ實(shí)戰(zhàn)

http://yzkb.51969.com/

文章目錄

1、簡介2、MQ優(yōu)點(diǎn)缺點(diǎn)MQ的應(yīng)用場景AMQP工作原理市面上常見的MQ

3、Linux安裝RabbitMQ3.1 版本對應(yīng)3.2 安裝socat3.3 下載 Erlang/OTP、安裝、驗(yàn)證 erlang方法一:1. 下載2. 將下載的Erlang服務(wù)上傳到服務(wù)器上面3. 解壓4. 編譯erlang的依賴環(huán)境5. 安裝Erlang6. 配置Erlang環(huán)境7. 測試Erlang是否安裝成功

方法二:1. 下載2. 安裝3. 驗(yàn)證 erlang 是否安裝成功。4. 卸載 erlang(遇到下載的erlang與rabbitmq-server 版本沖突)5. 重新安裝 erlang 和驗(yàn)證 erlang

3.4 安裝、驗(yàn)證rabbitmq-server(rabbitMQ服務(wù)器)1. 下載RabbitMQ2. 將RabbitMQ上傳到服務(wù)器3. 解壓RabbitMQ服務(wù)4. 配置環(huán)境變量5. 開啟web管理插件6. 啟動(dòng)RabbitMQ服務(wù)7. 訪問RabbitMQ管理界面8. 設(shè)置允許遠(yuǎn)程訪問方法一——新加用戶方法二——設(shè)置guest

4、RabbitMQ實(shí)戰(zhàn)4.1 什么是消息隊(duì)列4.2 RabbitMQ簡介4.3 消息隊(duì)列應(yīng)用場景1. 任務(wù)異步處理:2. 應(yīng)用程序解耦合:

4.4 RabbitMQ的工作原理1. 組成部分說明:2. 生產(chǎn)者發(fā)送消息流程:3. 消費(fèi)者接收消息流程:

4.5 六種工作模式4.5.1 基本消息模式(簡單消息模式)1.1 案例實(shí)戰(zhàn)1、 新建一個(gè)maven工程2、 添加依賴3、 再到j(luò)ava目錄下創(chuàng)建org.example.util包,在此包下創(chuàng)建連接工具類:4、 生產(chǎn)者發(fā)送消息4.1 在org.example.simple包下創(chuàng)建Send類,用于生產(chǎn)者發(fā)送消息。4.2 運(yùn)行上述main方法,在控制臺(tái)打印信息如下所示:4.3 打開瀏覽器訪問:http://IP:156724.4 如下圖點(diǎn)擊Queues,可以在隊(duì)列列表中可以看到名為simple_queue的隊(duì)列。4.5 點(diǎn)擊隊(duì)列名稱simple_queue,進(jìn)入詳情頁 --->Get messages,可以查看消息:

5、消費(fèi)者接收消息5.1 在org.example.simple包下創(chuàng)建Receiver類,用于消費(fèi)者接收消息5.2 運(yùn)行上述main方法,在控制臺(tái)打印信息如下:5.3 打開瀏覽器訪問:http://IP:156725.4 再看看隊(duì)列的消息,已經(jīng)被消費(fèi)了,Ready值為0,Total值也為0了。

1.2 消息確認(rèn)機(jī)制ACK1、 在org.example.simple包下創(chuàng)建ACKReceiver類,用于消費(fèi)者接收消息2、自動(dòng)ACK存在的問題2.1 修改消費(fèi)者Receiver類的代碼2.2 生產(chǎn)者Send類不做任何修改,**直接運(yùn)行Send類中的main方法**,2.3 運(yùn)行Receiver類消費(fèi)者中的main方法,程序拋出異常:2.4再查看rabbitmq的web管理界面:

3、演示手動(dòng)ACK3.1 重新運(yùn)行生產(chǎn)者Send中的main方法,實(shí)現(xiàn)發(fā)送消息,3.2 再修改ACKReceiver類中的handleDelivery方法3.3 再運(yùn)行ACKReceiver類中的main方法,程序拋出異常:3.4 查看web管理頁面:

4、最后消息確認(rèn)機(jī)制的正確做法4.1 我們要在監(jiān)聽隊(duì)列時(shí)設(shè)置第二個(gè)參數(shù)為false,代碼中手動(dòng)進(jìn)行ACK4.2 最后運(yùn)行ACKReceiver類中的main方法,查看web管理頁面

4.5.2 work工作隊(duì)列模式2.1 案例實(shí)戰(zhàn)1、生產(chǎn)者2、消費(fèi)者13、消費(fèi)者24、進(jìn)行消息消費(fèi)

2.2 能者多勞2.3 訂閱模型分類1. 說明2. Exchange類型有以下幾種:

4.5.3 Publish/subscribe | 發(fā)布/訂閱模式 (交換機(jī)類型:Fanout,也稱為廣播)3.1 案例實(shí)戰(zhàn)1、生產(chǎn)者2、消費(fèi)者1(注冊成功發(fā)給短信服務(wù))3、消費(fèi)者2(注冊成功發(fā)給郵件服務(wù))4、進(jìn)行消息消費(fèi)5、思考5.1 publish/subscribe與work queues有什么區(qū)別。5.2 實(shí)際工作用 publish/subscribe還是work queues?

4.5.4 Routing 路由模式(交換機(jī)類型:direct)4.1 案例實(shí)戰(zhàn)1、生產(chǎn)者2、消費(fèi)者1(使用routing key為sms來綁定隊(duì)列與交換機(jī))3、消費(fèi)者2(用routing key為email來綁定隊(duì)列與交換機(jī))

4.5.5 Topics通配符模式(交換機(jī)類型:topics)4.1 Topics模型示意圖:4.2 通配符規(guī)則4.3 舉例4.4 案例實(shí)戰(zhàn)1、生產(chǎn)者2、消費(fèi)者13、消費(fèi)者2

4.6 SpringBoot整合RabbitMQ4.6.1 創(chuàng)建SpringBoot項(xiàng)目4.6.2 添加依賴4.6.3 添加配置4.6.4 添加配置類4.6.5 在測試類中添加生產(chǎn)者,并發(fā)送消息4.6.6 生產(chǎn)者發(fā)送消息測試結(jié)果4.6.7 創(chuàng)建消息接收器(消費(fèi)者)4.6.8 消費(fèi)者消費(fèi)結(jié)果

5、RabbitMQ問題相關(guān)解決方案5.1 生產(chǎn)端可靠性投遞方案介紹5.1.1 關(guān)于怎么保證生產(chǎn)者的可靠性投遞5.1.2 消息發(fā)送方案一:消息落庫,對消息狀態(tài)進(jìn)行標(biāo)記方案二:消息延遲投遞,做二次確認(rèn),回調(diào)檢查

5.1.3 消息落庫,對消息狀態(tài)打標(biāo)的具體實(shí)現(xiàn)開啟消息回調(diào)機(jī)制1. 創(chuàng)建數(shù)據(jù)庫2. 創(chuàng)建Spring Boot項(xiàng)目3. 添加依賴4. 添加配置5. 創(chuàng)建實(shí)體類6. 創(chuàng)建mapper7. 創(chuàng)建Controller8. 創(chuàng)建配置類9. 進(jìn)行任務(wù)調(diào)度10.最終結(jié)果1、發(fā)送消息成功2、模擬生產(chǎn)者消息首次發(fā)送失敗3、模擬消息首次發(fā)送失敗,定時(shí)任務(wù)重試也失敗

11.在高并發(fā)的場景下是否合適?

5.2 [RabbitMQ](https://so.csdn.net/so/search?q=RabbitMQ&spm=1001.2101.3001.7020) 如何避免消息重復(fù)消費(fèi)?5.2.1 冪等性5.2.2 高并發(fā)的情況下如何避免消息重復(fù)消費(fèi)5.2.3 解決重復(fù)消費(fèi)的案例代碼1. 添加依賴2. 添加配置3. 生產(chǎn)者代碼4. 消費(fèi)者代碼5. 最終結(jié)果1、消息消費(fèi)成功2、模擬消息重復(fù)消費(fèi)場景

5.3 [RabbitMQ](https://so.csdn.net/so/search?q=RabbitMQ&spm=1001.2101.3001.7020) 如何避免消息積壓?5.3.1 解決方案

更多命令

1、簡介

素材:鏈接:https://pan.baidu.com/s/1YjVM9WBEIVCbYZmlzowyKw?pwd=lpkz

官網(wǎng):https://www.rabbitmq.com/

RabbitMQ 是一個(gè)開源的消息隊(duì)列中間件,采用 Erlang 語言編寫,支持多種消息協(xié)議,如 AMQP、MQTT、STOMP 等。它可以作為消息的中轉(zhuǎn)站,在分布式系統(tǒng)中協(xié)調(diào)不同組件之間的數(shù)據(jù)傳輸,實(shí)現(xiàn)松耦合的系統(tǒng)架構(gòu)。

2、MQ

優(yōu)點(diǎn)

靈活的路由方式,支持消息的廣播、點(diǎn)對點(diǎn)、主題訂閱等多種路由方式;異步處理消息,提高系統(tǒng)的并發(fā)性能;持久化機(jī)制,保證在服務(wù)器宕機(jī)、重啟等情況下消息的可靠性;高可用和負(fù)載均衡,支持集群和鏡像模式,提供高可用和負(fù)載均衡的目標(biāo);插件機(jī)制,支持豐富的插件,如認(rèn)證授權(quán)、可視化管理等。

缺點(diǎn)

系統(tǒng)可用性降低: 系統(tǒng)引入的外部依賴越多,系統(tǒng)穩(wěn)定性越差。一旦MQ宕機(jī),就會(huì)對業(yè)務(wù)造成影響。系統(tǒng)復(fù)雜度提高: MQ的加入大大增加了系統(tǒng)的復(fù)雜度,以前系統(tǒng)間是同步的遠(yuǎn)程調(diào)用,現(xiàn)在是通過MQ進(jìn)行異步調(diào)用。一致性問題 : A系統(tǒng)處理完業(yè)務(wù),通過MQ給B、C、D三個(gè)系統(tǒng)發(fā)消息數(shù)據(jù),如果B系統(tǒng)、C系統(tǒng)處理成功,D系統(tǒng)處理失敗,則會(huì)造成數(shù)據(jù)處理的不一致。

MQ的應(yīng)用場景

高峰流量:搶紅包、秒殺活動(dòng)、搶火車票等這些業(yè)務(wù)場景都是短時(shí)間內(nèi)需要處理大量請求,如果直接連接系統(tǒng)處理業(yè)務(wù),會(huì)耗費(fèi)大量資源,有可能造成系統(tǒng)癱瘓。 而使用MQ后,可以先讓用戶將請求發(fā)送到MQ中,MQ會(huì)先保存請求消息,不會(huì)占用系統(tǒng)資源,且MQ會(huì)進(jìn)行消息排序,先請求的秒殺成功,后請求的秒殺失敗。消息分發(fā):如電商網(wǎng)站要推送促銷信息,該業(yè)務(wù)耗費(fèi)時(shí)間較多,但對時(shí)效性要求不高,可以使用MQ做消息分發(fā)。數(shù)據(jù)同步:假如我們需要將數(shù)據(jù)保存到數(shù)據(jù)庫之外,還需要一段時(shí)間將數(shù)據(jù)同步到緩存(如Redis)、搜索引擎(如Elasticsearch)中。此時(shí)可以將數(shù)據(jù)庫的數(shù)據(jù)作為消息發(fā)送到MQ中,并同步到緩存、 搜索引擎中。異步處理:在電商系統(tǒng)中,訂單完成后,需要及時(shí)的通知子系統(tǒng)(進(jìn)銷存系統(tǒng)發(fā)貨,用戶服務(wù)積分,發(fā)送短信)進(jìn)行下一步操作。為了保證訂單系統(tǒng)的高性能,應(yīng)該直接返回訂單結(jié)果,之后讓MQ通知子系統(tǒng)做其他非實(shí)時(shí)的業(yè)務(wù)操作。這樣能保證核心業(yè)務(wù)的高效及時(shí)離線處理:在銀行系統(tǒng)中,如果要查詢近十年的歷史賬單,這是非常耗時(shí)的操作。如果發(fā)送同步請求,則會(huì)花費(fèi)大量時(shí)間等待響應(yīng)。此時(shí)使用MQ發(fā)送異步請求,等到查詢出結(jié)果后獲取結(jié)果即可。

AMQP

1、什么是 AMQP : 即Advanced Message Queuing Protocol(高級(jí)消息隊(duì)列協(xié)議),是一個(gè)網(wǎng)絡(luò)協(xié)議,專門為消息中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受不同中間件產(chǎn)品,不同開發(fā)語言等條件的限制。2006年AMQP規(guī)范發(fā)布,類比HTTP。

2、AMQP工作過程: 生產(chǎn)者(Publisher)將消息發(fā)布到交換機(jī)(Exchange),交換機(jī)根據(jù)規(guī)則將消息分發(fā)給交換機(jī)綁定的隊(duì)列(Queue),隊(duì)列再將消息投遞給訂閱了此隊(duì)列的消費(fèi)者

工作原理

Producer【消息的生產(chǎn)者】 一個(gè)向交換機(jī)發(fā)布消息的客戶端應(yīng)用程序。Connection 【連接】 生產(chǎn)者/消費(fèi)者和RabbitMQ服務(wù)器之間建立的TCP連接。Channel【信道】 是TCP里面的虛擬連接。例如:Connection相當(dāng)于電纜,Channel相當(dāng)于獨(dú)立光纖束,一條TCP連接中可以創(chuàng)建多條信道,增加連接效率。無論是發(fā)布消息、接收消息、訂閱隊(duì)列都是通過信道完成的。Broker 消息隊(duì)列服務(wù)器實(shí)體。即RabbitMQ服務(wù)器Virtual Host【虛擬主機(jī)】 出于多租戶和安全因素設(shè)計(jì)的,把AMQP的基本組件劃分到一個(gè)虛擬的分組中。每個(gè)vhost本質(zhì)上就是一個(gè)mini版的RabbitMQ服務(wù)器,擁有自己的隊(duì)列、交換機(jī)、綁定和權(quán)限機(jī)制。當(dāng)多個(gè)不同的用戶使用同一個(gè)RabbitMQ服務(wù)器時(shí),可以劃分出多個(gè)虛擬主機(jī)。RabbitMQ默認(rèn)的虛擬主機(jī)路徑是 /Exchange 【交換機(jī)】 用來接收生產(chǎn)者發(fā)送的消息,并根據(jù)分發(fā)規(guī)則,將這些消息分發(fā)給服務(wù)器中的隊(duì)列中。不同的交換機(jī)有不同的分發(fā)規(guī)則。Queue【消息隊(duì)列】 用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。消息一直在隊(duì)列里面,等待消費(fèi)者鏈接到這個(gè) 隊(duì)列將其取走。Binding 【綁定】 消息隊(duì)列和交換機(jī)之間的虛擬連接,綁定中包含路由規(guī)則,綁定信息保存到交換機(jī)的路由表中,作為消息的分發(fā)依據(jù)。Consumer【消息的消費(fèi)者】 表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。

市面上常見的MQ

3、Linux安裝RabbitMQ

安裝rabbitmq分3個(gè)步: 1、先安裝socat, ——》2、安裝erlang, ——》3、安裝rabbitmq-server。

3.1 版本對應(yīng)

網(wǎng)址:https://www.rabbitmq.com/which-erlang.html

3.2 安裝socat

命令:yum -y install socat

3.3 下載 Erlang/OTP、安裝、驗(yàn)證 erlang

官網(wǎng):下載 - Erlang/OTP

方法一:

1. 下載

2. 將下載的Erlang服務(wù)上傳到服務(wù)器上面

cd /home

mkdir /home/rabbitMQ

cd /home/rabbitMQ

3. 解壓

tar -zvxf otp_src_24.0.tar.gz

4. 編譯erlang的依賴環(huán)境

跟大家講一下,erlang依賴的環(huán)境特別特別多,就拿gcc來說,如果以前安裝過這個(gè)環(huán)境還不止,所以我們重新安裝一下也無所謂所以我們執(zhí)行以下的命令:

解壓成功,安裝編譯所需要的依賴文件

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel

等待安裝完畢

創(chuàng)建Erlang文件夾

mkdir /home/rabbitMQ/erlang

cd /home/rabbitMQ/otp_src_24.0

然后執(zhí)行下面的命令

./configure --prefix=/home/rabbitMQ/erlang --without-javac

5. 安裝Erlang

make : 編譯 make install : 安裝 && : 前面的命令執(zhí)行成功后面的命令才會(huì)執(zhí)行

make && make install

6. 配置Erlang環(huán)境

vi /etc/profile

加入

#set erlang environment

export ERLANG_HOME=/home/rabbitMQ/erlang

export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:${ERLANG_HOME}/bin:$PATH

按Esc鍵 輸入 :wq (退出并保存) :q! (退出不保存)

刷新配置文件

7. 測試Erlang是否安裝成功

輸入命令: erl

如圖所示說明已經(jīng)安裝成功了??!

方法二:

1. 下載

下載命令: sudo yum install erlang

2. 安裝

接著上一步, 繼續(xù)回復(fù)“y”, 提示:見到Complete!(成功),表示安裝erlang 成功了。

3. 驗(yàn)證 erlang 是否安裝成功。

命令: yum info erlang 提示: erlang 的版本信息、軟件網(wǎng)址、占用大小空間等,就表示安裝成功了。

4. 卸載 erlang(遇到下載的erlang與rabbitmq-server 版本沖突)

執(zhí)行3條命令

yum list | grep erlang

yum -y remove erlang-*

yum remove erlang.x86_64

5. 重新安裝 erlang 和驗(yàn)證 erlang

安裝已經(jīng)下載好的erlang包, 文件路徑 ./rabbitMQ/ 文件下 安裝命令: rpm -ivh erlang-23.3-2.el8.x86_64.rpm 驗(yàn)證erlang命令: yum info erlang

3.4 安裝、驗(yàn)證rabbitmq-server(rabbitMQ服務(wù)器)

注意:需要下載Linux版本的

官網(wǎng):https://www.rabbitmq.com/

在RabbitMQ官網(wǎng)可以看到RabbitMQ對應(yīng)的Erlang版本

1. 下載RabbitMQ

2. 將RabbitMQ上傳到服務(wù)器

cd /home/rabbitMQ

3. 解壓RabbitMQ服務(wù)

根據(jù)壓縮包后綴不同使用不同的命令進(jìn)行解壓

xz -d rabbitmq-server-generic-unix-latest-toolchain-3.9.5.tar.xz

tar -xvf rabbitmq-server-generic-unix-latest-toolchain-3.9.5.tar

4. 配置環(huán)境變量

vi /etc/profile

加入

#set rabbitmq environment

export RABBITMQ=/home/rabbitMQ/rabbitmq_server-3.9.5

export PATH=$PATH:${RABBITMQ}/sbin

按Esc鍵 輸入 :wq (退出并保存) :q! (退出不保存)

刷新配置文件

source /etc/profile

5. 開啟web管理插件

cd /home/rabbitMQ/rabbitmq_server-3.9.5/sbin

./rabbitmq-plugins enable rabbitmq_management # 啟動(dòng)指定的插件:

啟動(dòng)插件成功

6. 啟動(dòng)RabbitMQ服務(wù)

ls

./rabbitmq-server -detached # 以守護(hù)進(jìn)程啟動(dòng)

7. 訪問RabbitMQ管理界面

瀏覽器訪問:http://IP:15672

看到如下這個(gè)界面就是正常啟動(dòng)了

8. 設(shè)置允許遠(yuǎn)程訪問

從上面截圖可以看到使用guest登錄,提示“User can only log in via localhost”,無法登錄,原因是3.3.0版本后禁止用戶在除locahost外的訪問,只能通過本地主機(jī)登錄。

方法一——新加用戶

新加個(gè)用戶,設(shè)置權(quán)限,設(shè)置角色。

rabbitmqctl add_user admin admin:這個(gè)命令是用來添加一個(gè)新的RabbitMQ用戶這個(gè)命令將創(chuàng)建一個(gè)名為admin的用戶,并設(shè)置其密碼為admin請注意,這兩個(gè)參數(shù)(用戶名和密碼)在你的問題中是硬編碼的,這在實(shí)際生產(chǎn)環(huán)境中并不安全,建議使用更復(fù)雜和隨機(jī)化的用戶名和密碼rabbitmqctl set_permissions -p / admin ".*" ".*" ".*":這個(gè)命令是設(shè)置用戶admin在RabbitMQ的權(quán)限這里的-p /參數(shù)表示設(shè)置的是全局權(quán)限而".*" ".*" ".*"表示賦予admin用戶所有權(quán)限,包括配置權(quán)限、寫權(quán)限和讀權(quán)限r(nóng)abbitmqctl set_user_tags admin administrator:這個(gè)命令是為用戶admin添加了一個(gè)標(biāo)簽(或者權(quán)限等級(jí))在這個(gè)例子中,添加的是administrator標(biāo)簽這個(gè)命令可能不是必要的,因?yàn)镽abbitMQ通常不會(huì)直接使用這種用戶標(biāo)簽

rabbitmqctl add_user admin admin

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

rabbitmqctl set_user_tags admin administrator

登錄成功

方法二——設(shè)置guest

在/home/rabbitMQ/rabbitmq_server-3.9.5/plugins/rabbit-3.9.5/ebin目錄下找到rabbit.app文件 (find / -name rabbit.app),修改參數(shù)。

{loopback_users, [<<"guest">>]}, 修改成{loopback_users, []},

重啟服務(wù)

rabbitmqctl stop #停止RabbitMQ

cd /home/rabbitMQ/rabbitmq_server-3.9.5/sbin

./rabbitmq-server -detached # 以守護(hù)進(jìn)程啟動(dòng)RabbitMQ

使用guest賬號(hào)登錄成功

4、RabbitMQ實(shí)戰(zhàn)

4.1 什么是消息隊(duì)列

MQ全稱為Message Queue,即消息隊(duì)列?!毕㈥?duì)列”是在消息的傳輸過程中保存消息的容器。它是典型的:生產(chǎn)者、消費(fèi)者模型。生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息,消費(fèi)者不斷的從隊(duì)列中獲取消息。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒有業(yè)務(wù)邏輯的侵入,這樣就實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者的解耦。

下圖中Producer為生產(chǎn)者,Queue為消息隊(duì)列,Consumer為消費(fèi)者

4.2 RabbitMQ簡介

RabbitMQ是一個(gè)開源的消息中間件,它實(shí)現(xiàn)了高效、可靠的消息傳遞機(jī)制,主要用于應(yīng)用程序之間的異步通信。它基于AMQP(高級(jí)消息隊(duì)列協(xié)議)規(guī)范設(shè)計(jì),支持多種編程語言,并提供了豐富的特性和靈活的架構(gòu)。

RabbitMQ的工作原理是利用隊(duì)列來存儲(chǔ)消息,并通過發(fā)布-訂閱模式實(shí)現(xiàn)消息的發(fā)送和接收。在這個(gè)模式下,消息的發(fā)送者將消息發(fā)布到一個(gè)交換器,交換器根據(jù)預(yù)定義的規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列,然后消息的接收者從隊(duì)列中訂閱并消費(fèi)這些消息。

4.3 消息隊(duì)列應(yīng)用場景

1. 任務(wù)異步處理:

高并發(fā)環(huán)境下,由于來不及同步處理,請求往往會(huì)發(fā)生堵塞,比如說,大量的insert,update之類的請求同時(shí)到達(dá)MySQL,直接導(dǎo)致無數(shù)的行鎖表鎖,甚至最后請求會(huì)堆積過多,從而觸發(fā)too many connections錯(cuò)誤。通過使用消息隊(duì)列,我們可以異步處理請求,從而緩解系統(tǒng)的壓力。將不需要同步處理的并且耗時(shí)長的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理。減少了應(yīng)用程序的響應(yīng)時(shí)間。

2. 應(yīng)用程序解耦合:

MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合

4.4 RabbitMQ的工作原理

1. 組成部分說明:

· Broker:消息隊(duì)列服務(wù)進(jìn)程,此進(jìn)程包括兩個(gè)部分:Exchange和Queue

· Exchange:消息隊(duì)列交換機(jī),按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個(gè)隊(duì)列,對消息進(jìn)行過濾。

· Queue:消息隊(duì)列,存儲(chǔ)消息的隊(duì)列,消息到達(dá)隊(duì)列并轉(zhuǎn)發(fā)給指定的接受者

· Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送給消息隊(duì)列

· Consumer:消息消費(fèi)者,即消費(fèi)方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。

2. 生產(chǎn)者發(fā)送消息流程:

1、生產(chǎn)者和Broker建立TCP連接。

2、生產(chǎn)者和Broker建立Channel通道(信道)。

3、生產(chǎn)者通過Channel通道(信道)把消息發(fā)送給Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā)。

4、Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊(duì)列)

3. 消費(fèi)者接收消息流程:

1、消費(fèi)者和Broker建立TCP連接

2、消費(fèi)者和Broker建立Channel通道(信道)

3、消費(fèi)者監(jiān)聽指定的Queue(隊(duì)列) (每個(gè)隊(duì)列都有一個(gè)名字)

4、當(dāng)有消息到達(dá)Queue時(shí)Broker默認(rèn)將消息推送給消費(fèi)者。

5、消費(fèi)者接收到消息。

6、ack(消息確認(rèn)機(jī)制)回復(fù)

4.5 六種工作模式

RabbitMQ有六種工作模式:基本消息模式、work消息模式、Publish/subscribe (交換機(jī)類型:Fanout,也稱為廣播模式)、Routing 路由模型(交換機(jī)類型:direct)、Topics 通配符模式(交換機(jī)類型:topics)、RPC

? 我們這里給大家重點(diǎn)介紹基本消息模式, Routing路由模式(重點(diǎn))、Topic通配符模式(重點(diǎn))。

4.5.1 基本消息模式(簡單消息模式)

在上圖的模型中,有以下概念:

P:生產(chǎn)者,也就是要發(fā)送消息的程序

C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來。

queue:消息隊(duì)列,圖中紅色部分??梢跃彺嫦?;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。

1.1 案例實(shí)戰(zhàn)

1、 新建一個(gè)maven工程

根據(jù)下面的步驟建立maven項(xiàng)目

2、 添加依賴

com.rabbitmq

amqp-client

5.7.1

org.slf4j

slf4j-simple

1.7.25

compile

3、 再到j(luò)ava目錄下創(chuàng)建org.example.util包,在此包下創(chuàng)建連接工具類:

public class ConnectionUtil {

/**

* 建立與RabbitMQ的連接

* @return

* @throws Exception

*/

public static Connection getConnection() throws Exception {

//定義連接工廠

ConnectionFactory factory = new ConnectionFactory();

//設(shè)置服務(wù)地址 (因?yàn)閞abbitmq安裝到linux上面,這里填寫linux的IP地址)

factory.setHost("192.168.181.128");

//端口

factory.setPort(5672);

//設(shè)置賬號(hào)信息,用戶名、密碼(rabbitmq的用戶名和密碼)

factory.setUsername("guest");

factory.setPassword("guest");

// 通過工廠獲取連接

Connection connection = factory.newConnection();

return connection;

}

}

4、 生產(chǎn)者發(fā)送消息

4.1 在org.example.simple包下創(chuàng)建Send類,用于生產(chǎn)者發(fā)送消息。

public class Send {

private final static String QUEUE_NAME = "simple_queue"; // 隊(duì)列名

public static void main(String[] argv) throws Exception {

// 1、獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 2、從連接中創(chuàng)建通道,使用通道才能完成消息相關(guān)的操作

Channel channel = connection.createChannel();

// 3、聲明(創(chuàng)建)隊(duì)列

//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments

/**

* 參數(shù)明細(xì)

* 1、queue 隊(duì)列名稱

* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在

* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問,如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建

* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)

* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間

*/

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 4、消息內(nèi)容

String message = "Hello World!";

// 向指定的隊(duì)列中發(fā)送消息

//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body

/**

* 參數(shù)明細(xì):

* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")

* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱

* 3、props,消息的屬性

* 4、body,消息內(nèi)容

*/

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

//關(guān)閉通道和連接(資源關(guān)閉最好用try-catch-finally語句處理)

channel.close();

connection.close();

}

}

4.2 運(yùn)行上述main方法,在控制臺(tái)打印信息如下所示:

4.3 打開瀏覽器訪問:http://IP:15672

web管理頁面:服務(wù)器地址/端口號(hào) 默認(rèn)用戶及密碼:guest,如果沒有配置可根據(jù) 設(shè)置允許遠(yuǎn)程訪問中進(jìn)行用戶名密碼配置

4.4 如下圖點(diǎn)擊Queues,可以在隊(duì)列列表中可以看到名為simple_queue的隊(duì)列。

4.5 點(diǎn)擊隊(duì)列名稱simple_queue,進(jìn)入詳情頁 —>Get messages,可以查看消息:

5、消費(fèi)者接收消息

5.1 在org.example.simple包下創(chuàng)建Receiver類,用于消費(fèi)者接收消息

public class Receiver{

private final static String QUEUE_NAME = "simple_queue"; //隊(duì)列名

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成

Channel channel = connection.createChannel();

// 聲明隊(duì)列

//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments

/**

* 參數(shù)明細(xì)

* 1、queue 隊(duì)列名稱

* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在

* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問,如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建

* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)

* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間

*/

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//實(shí)現(xiàn)消費(fèi)方法

DefaultConsumer consumer = new DefaultConsumer(channel){

// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用

/**

* 當(dāng)接收到消息后此方法將被調(diào)用

* @param consumerTag 消費(fèi)者標(biāo)簽,用來標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽隊(duì)列時(shí)設(shè)置channel.basicConsume

* @param envelope 信封,通過envelope

* @param properties 消息屬性

* @param body 消息內(nèi)容

* @throws IOException

*/

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// body 即消息體

String msg = new String(body,"utf-8");

System.out.println(" [x] received : " + msg + "!");

}

};

// 監(jiān)聽隊(duì)列,第二個(gè)參數(shù):是否自動(dòng)進(jìn)行消息確認(rèn)(用于監(jiān)聽queue隊(duì)列中是否收到了消息,如果收到消息自動(dòng)調(diào)用上面DefaultConsumer進(jìn)行默認(rèn)消費(fèi))。

//參數(shù):String queue, boolean autoAck, Consumer callback

/**

* 參數(shù)明細(xì):

* 1、queue 隊(duì)列名稱

* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為true表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過編程實(shí)現(xiàn)回復(fù)

* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法

*/

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

5.2 運(yùn)行上述main方法,在控制臺(tái)打印信息如下:

5.3 打開瀏覽器訪問:http://IP:15672

web管理頁面:服務(wù)器地址/端口號(hào) 默認(rèn)用戶及密碼:guest,如果沒有配置可根據(jù) 設(shè)置允許遠(yuǎn)程訪問中進(jìn)行用戶名密碼配置

5.4 再看看隊(duì)列的消息,已經(jīng)被消費(fèi)了,Ready值為0,Total值也為0了。

我們發(fā)現(xiàn),消費(fèi)者已經(jīng)獲取了消息,但是程序沒有停止,一直在監(jiān)聽隊(duì)列中是否有新的消息。一旦有新的消息進(jìn)入隊(duì)列,就會(huì)立即打印

1.2 消息確認(rèn)機(jī)制ACK

通過剛才的案例可以看出,消息一旦被消費(fèi)者接收,隊(duì)列中的消息就會(huì)被刪除。

那么問題來了:RabbitMQ怎么知道消息被接收了呢?

如果消費(fèi)者領(lǐng)取消息后,還沒執(zhí)行操作就掛掉了呢?或者拋出了異常?消息消費(fèi)失敗,但是RabbitMQ無從得知,這樣消息就丟失了!

因此,RabbitMQ有一個(gè)ACK機(jī)制。當(dāng)消費(fèi)者獲取消息后,會(huì)向RabbitMQ發(fā)送回執(zhí)ACK,告知消息已經(jīng)被接收。不過這種回執(zhí)ACK分兩種情況:

? 自動(dòng)ACK:消息一旦被接收,消費(fèi)者自動(dòng)發(fā)送ACK

? 手動(dòng)ACK:消息接收后,不會(huì)發(fā)送ACK,需要手動(dòng)調(diào)用

大家覺得哪種更好呢?

這需要看消息的重要性:

? 如果消息不太重要,丟失也沒有影響,那么自動(dòng)ACK會(huì)比較方便

? 如果消息非常重要,不容丟失。那么最好在消費(fèi)完成后手動(dòng)ACK,否則接收消息后就自動(dòng)ACK,RabbitMQ就會(huì)把消息從隊(duì)列中刪除。如果此時(shí)消費(fèi)者宕機(jī),那么消息就丟失了。

之前的測試都是自動(dòng)ACK的,如果要手動(dòng)ACK,需要改動(dòng)我們的代碼。

1、 在org.example.simple包下創(chuàng)建ACKReceiver類,用于消費(fèi)者接收消息

public class ACKReceiver {

private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 創(chuàng)建通道

final Channel channel = connection.createChannel();

// 聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 定義隊(duì)列的消費(fèi)者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// body 即消息體

String msg = new String(body);

System.out.println(" [x] received : " + msg + "!");

// 手動(dòng)進(jìn)行ACK

/*

* void basicAck(long deliveryTag, boolean multiple) throws IOException;

* deliveryTag:用來標(biāo)識(shí)消息的id

* multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。

*/

channel.basicAck(envelope.getDeliveryTag(), false);

}

};

// 監(jiān)聽隊(duì)列,第二個(gè)參數(shù)false,手動(dòng)進(jìn)行ACK

channel.basicConsume(QUEUE_NAME, false, consumer);

}

}

ACKReceiver類與Receiver類最大的區(qū)別就是在消息消費(fèi)的時(shí)候添加了channel.basicAck(envelope.getDeliveryTag(), false);channel.basicConsume(QUEUE_NAME, false, consumer);

2、自動(dòng)ACK存在的問題

2.1 修改消費(fèi)者Receiver類的代碼

因?yàn)镽eceiver類是采用自動(dòng)ACK,在handleDelivery方法中添加異常,如下:

2.2 生產(chǎn)者Send類不做任何修改,直接運(yùn)行Send類中的main方法,

消息發(fā)送成功,再訪問到RabbitMQ的web界面(注:之前啟動(dòng)的Receiver消費(fèi)者要停掉服務(wù)),

2.3 運(yùn)行Receiver類消費(fèi)者中的main方法,程序拋出異常:

2.4再查看rabbitmq的web管理界面:

消費(fèi)者拋出異常,但是消息依然被消費(fèi),實(shí)際上我們還沒獲取到消息。

3、演示手動(dòng)ACK

注意:先把Receiver消費(fèi)者服務(wù)停止掉

3.1 重新運(yùn)行生產(chǎn)者Send中的main方法,實(shí)現(xiàn)發(fā)送消息,

消息發(fā)送成功后,再次查看web管理界面,效果如下所示,隊(duì)列中收到消息一條。

3.2 再修改ACKReceiver類中的handleDelivery方法

增加如下圖紅框里的異常代碼(模擬手動(dòng)進(jìn)行ack前拋出異常)。

3.3 再運(yùn)行ACKReceiver類中的main方法,程序拋出異常:

3.4 查看web管理頁面:

消息沒有被消費(fèi)掉!

這是因?yàn)殡m然我們設(shè)置了手動(dòng)ACK,但是代碼中并沒有進(jìn)行消息確認(rèn)!所以消息并未被真正消費(fèi)掉。

4、最后消息確認(rèn)機(jī)制的正確做法

4.1 我們要在監(jiān)聽隊(duì)列時(shí)設(shè)置第二個(gè)參數(shù)為false,代碼中手動(dòng)進(jìn)行ACK

代碼如下圖紅框所示(之前異常的代碼需要注釋掉)

4.2 最后運(yùn)行ACKReceiver類中的main方法,查看web管理頁面

消費(fèi)者消費(fèi)成功!

生產(chǎn)者避免數(shù)據(jù)丟失:https://www.cnblogs.com/vipstone/p/9350075.html

4.5.2 work工作隊(duì)列模式

工作隊(duì)列或者競爭消費(fèi)者模式

work queues與入門程序(基本消息模式)相比,多了一個(gè)消費(fèi)端,兩個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息,但是一個(gè)消息只能被一個(gè)消費(fèi)者獲取。

這個(gè)消息模型在Web應(yīng)用程序中特別有用,可以處理短的HTTP請求窗口中無法處理復(fù)雜的任務(wù)。

接下來我們來模擬這個(gè)流程:

P:生產(chǎn)者:任務(wù)的發(fā)布者

C1:消費(fèi)者1:領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較慢(模擬耗時(shí))

C2:消費(fèi)者2:領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較快

2.1 案例實(shí)戰(zhàn)

1、生產(chǎn)者

在org.example.work包中創(chuàng)建Send類,生產(chǎn)者循環(huán)發(fā)送50條消息

public class Send {

private final static String QUEUE_NAME = "test_work_queue";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 獲取通道

Channel channel = connection.createChannel();

// 聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 循環(huán)發(fā)布任務(wù)

for (int i = 0; i < 50; i++) {

// 消息內(nèi)容

String message = "task .. " + i;

// 向指定的隊(duì)列中發(fā)送消息

//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body

/**

* 參數(shù)明細(xì):

* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")

* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱

* 3、props,消息的屬性

* 4、body,消息內(nèi)容

*/

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

// 模擬網(wǎng)絡(luò)延時(shí)

Thread.sleep(i * 2);

}

// 關(guān)閉通道和連接

channel.close();

connection.close();

}

}

2、消費(fèi)者1

在org.example.work包中創(chuàng)建Receiver1類

public class Receiver1 {

private final static String QUEUE_NAME = "test_work_queue";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成

Channel channel = connection.createChannel();

// 聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//實(shí)現(xiàn)消費(fèi)方法

DefaultConsumer consumer = new DefaultConsumer(channel){

// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// body 即消息體

String msg = new String(body,"utf-8");

System.out.println(" [消費(fèi)者1] received : " + msg + "!");

//模擬任務(wù)耗時(shí)1s

try {

TimeUnit.SECONDS.sleep(1);

} catch (Exception e) {

e.printStackTrace();

}

}

};

// 監(jiān)聽隊(duì)列,第二個(gè)參數(shù):是否自動(dòng)進(jìn)行消息確認(rèn)。

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

3、消費(fèi)者2

在org.example.work包中創(chuàng)建Receiver2類

public class Receiver2 {

private final static String QUEUE_NAME = "test_work_queue";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成

Channel channel = connection.createChannel();

// 聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//實(shí)現(xiàn)消費(fèi)方法

DefaultConsumer consumer = new DefaultConsumer(channel){

// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// body 即消息體

String msg = new String(body,"utf-8");

System.out.println(" [消費(fèi)者1] received : " + msg + "!");

}

};

// 監(jiān)聽隊(duì)列,第二個(gè)參數(shù):是否自動(dòng)進(jìn)行消息確認(rèn)。

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

4、進(jìn)行消息消費(fèi)

接下來,兩個(gè)消費(fèi)者一同啟動(dòng),然后發(fā)送50條消息(先將兩個(gè)消費(fèi)者一起啟動(dòng),再啟動(dòng)生產(chǎn)者):

2.2 能者多勞

剛才的實(shí)現(xiàn)有問題嗎?

? 消費(fèi)者1比消費(fèi)者2的效率要低,一次任務(wù)的耗時(shí)較長

? 然而兩人最終消費(fèi)的消息數(shù)量是一樣的

? 消費(fèi)者2大量時(shí)間處于空閑狀態(tài),消費(fèi)者1一直忙碌

現(xiàn)在的狀態(tài)屬于是把任務(wù)平均分配,正確的做法應(yīng)該是消費(fèi)越快的人,消費(fèi)的越多。

怎么實(shí)現(xiàn)呢?

通過BasicQos方法設(shè)置prefetchCount = 1。這樣RabbitMQ就會(huì)使得每個(gè)Consumer在同一個(gè)時(shí)間點(diǎn)最多處理1個(gè)Message。換句話說,在接收到該Consumer的ack前,它不會(huì)將新的Message分發(fā)給它。相反,它會(huì)將其分派給不是仍然忙碌的下一個(gè)Consumer。

值得注意的是:prefetchCount在手動(dòng)ack的情況下才生效,自動(dòng)ack不生效。

注意: 需要在Receiver1和Receiver2中添加紅框中的代碼進(jìn)行設(shè)置

2.3 訂閱模型分類

1. 說明

1、一個(gè)生產(chǎn)者多個(gè)消費(fèi)者

2、每個(gè)消費(fèi)者都有一個(gè)自己的隊(duì)列

3、生產(chǎn)者沒有將消息直接發(fā)送給隊(duì)列,而是發(fā)送給exchange(交換機(jī)、轉(zhuǎn)發(fā)器)

4、每個(gè)隊(duì)列都需要綁定到交換機(jī)上

5、生產(chǎn)者發(fā)送的消息,經(jīng)過交換機(jī)到達(dá)隊(duì)列,實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者消費(fèi)

例子:注冊->發(fā)郵件、發(fā)短信

X(Exchanges):交換機(jī)一方面:接收生產(chǎn)者發(fā)送的消息。另一方面:知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。

2. Exchange類型有以下幾種:

? Fanout: 廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列 (它是沒有routing key路由鍵)

? Direct:定向,把消息交給符合指定routing key 的隊(duì)列 (重點(diǎn)) (路由鍵是寫死的字符串)

? Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列(重點(diǎn)) (路由鍵是采用通配符#、進(jìn)行動(dòng)態(tài)匹配)

? Header: header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配隊(duì)列。

Header模式不展開了,感興趣可以參考這篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

4.5.3 Publish/subscribe | 發(fā)布/訂閱模式 (交換機(jī)類型:Fanout,也稱為廣播)

(廣播模式中沒有routing key,是從路由模式開始才有)

Publish/subscribe模型示意圖 :

3.1 案例實(shí)戰(zhàn)

1、生產(chǎn)者

和前面兩種模式不同:

1) 聲明Exchange,不再聲明Queue

2) 發(fā)送消息到Exchange,不再發(fā)送到Queue

在org.example.publishsubscribe包中創(chuàng)建Send類

public class Send {

private final static String EXCHANGE_NAME = "test_fanout_exchange";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 獲取通道

Channel channel = connection.createChannel();

// 聲明exchange,指定類型為fanout

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 消息內(nèi)容

String message = "注冊成功!!";

// 發(fā)布消息到Exchange (廣播模式下是沒有routingKey,所以參數(shù)二為””)

// 向指定的隊(duì)列中發(fā)送消息

//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body

/**

* 參數(shù)明細(xì):

* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")

* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱

* 3、props,消息的屬性

* 4、body,消息內(nèi)容

*/

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

System.out.println(" [生產(chǎn)者] Sent '" + message + "'");

channel.close();

connection.close();

}

}

2、消費(fèi)者1(注冊成功發(fā)給短信服務(wù))

在org.example.publishsubscribe包中創(chuàng)建Receiver1類

public class Receiver1 {

private final static String QUEUE_NAME = "fanout_exchange_queue_sms";//短信隊(duì)列

private final static String EXCHANGE_NAME = "test_fanout_exchange";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 獲取通道

Channel channel = connection.createChannel();

// 聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 綁定隊(duì)列到交換機(jī)

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 定義隊(duì)列的消費(fèi)者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// body 即消息體

String msg = new String(body);

System.out.println(" [短信服務(wù)] received : " + msg + "!");

}

};

// 監(jiān)聽隊(duì)列,自動(dòng)返回完成

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

3、消費(fèi)者2(注冊成功發(fā)給郵件服務(wù))

在org.example.publishsubscribe包中創(chuàng)建Receiver2類

public class Receiver2 {

//郵件隊(duì)列

private final static String QUEUE_NAME = "fanout_exchange_queue_email";

private final static String EXCHANGE_NAME = "test_fanout_exchange";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 獲取通道

Channel channel = connection.createChannel();

// 聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 綁定隊(duì)列到交換機(jī)

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 定義隊(duì)列的消費(fèi)者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// body 即消息體

String msg = new String(body);

System.out.println(" [郵件服務(wù)] received : " + msg + "!");

}

};

// 監(jiān)聽隊(duì)列,自動(dòng)返回完成

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

4、進(jìn)行消息消費(fèi)

我們運(yùn)行兩個(gè)消費(fèi)者,然后發(fā)送1條消息:

注意: 啟動(dòng)有可能會(huì)報(bào)錯(cuò):channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange ‘test_fanout_exchange’ in vhost ‘/’, class-id=50, method-id=20)

報(bào)這個(gè)錯(cuò)誤,證明我們沒有聲明交換機(jī),卻拿來使用了,所以我們需要先啟動(dòng)生產(chǎn)者進(jìn)行交換機(jī)聲明,然后在按照上面的流程走就沒有問題了

5、思考

5.1 publish/subscribe與work queues有什么區(qū)別。

區(qū)別:

1)work queues不用定義交換機(jī),而publish/subscribe需要定義交換機(jī)。

2)publish/subscribe的生產(chǎn)方是面向交換機(jī)發(fā)送消息,work queues的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。

3)publish/subscribe需要設(shè)置隊(duì)列和交換機(jī)的綁定,work queues不需要設(shè)置,實(shí)際上work queues會(huì)將隊(duì)列綁定到默認(rèn)的交換機(jī) 。

相同點(diǎn):

所以兩者實(shí)現(xiàn)的發(fā)布/訂閱的效果是一樣的,多個(gè)消費(fèi)端監(jiān)聽同一個(gè)隊(duì)列不會(huì)重復(fù)消費(fèi)消息。

5.2 實(shí)際工作用 publish/subscribe還是work queues?

建議使用 publish/subscribe,發(fā)布訂閱模式比工作隊(duì)列模式更強(qiáng)大(也可以做到同一隊(duì)列競爭),并且發(fā)布訂閱模式(廣播模式)可以指定自己專用的交換機(jī)。

4.5.4 Routing 路由模式(交換機(jī)類型:direct)

(路由模式中的routing key路由鍵格式為寫死的字符串,而Topic通配符模式中的routing key是使用通配符#和*來匹配多個(gè)或一個(gè)routing key,而通過routing key來實(shí)現(xiàn)將隊(duì)列與交換機(jī)進(jìn)行綁定)

Routing模型示意圖:

P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。

X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給與routing key完全匹配的隊(duì)列

C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息

C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息

4.1 案例實(shí)戰(zhàn)

1、生產(chǎn)者

在org.example.routingkey包中創(chuàng)建Send類

public class Send {

private final static String EXCHANGE_NAME = "test_direct_exchange";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 獲取通道

Channel channel = connection.createChannel();

// 聲明exchange,指定類型為direct

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 消息內(nèi)容,

String message = "注冊成功!請短信回復(fù)[T]退訂";

// 發(fā)送消息,并且指定routing key 為:sms,只有短信服務(wù)能接收到消息

channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

channel.close();

connection.close();

}

}

注:上述生產(chǎn)者在發(fā)送消息時(shí),是指定routing key為sms,而根據(jù)上面提到的Routing模型示意圖,生產(chǎn)者將消息發(fā)送給exchange交換機(jī),交換機(jī)再通過routing key與queue隊(duì)列進(jìn)行綁定,我們把sms作為短信的路由鍵。

? 在下面的消費(fèi)者中使用routing key為sms將隊(duì)列與交換機(jī)進(jìn)行綁定后,就可以接收到生產(chǎn)者routing key為sms的消息了,換句話其他消費(fèi)者如果沒有使用routing key為sms綁定隊(duì)列與交換機(jī),就獲取不到生產(chǎn)者發(fā)送的消息了(消費(fèi)者2就沒有使用routing key為sms來綁定隊(duì)列與交換機(jī))。

2、消費(fèi)者1(使用routing key為sms來綁定隊(duì)列與交換機(jī))

在org.example.routingkey包中創(chuàng)建Receiver1類

public class Receiver1 {

private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信隊(duì)列

private final static String EXCHANGE_NAME = "test_direct_exchange";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 獲取通道

Channel channel = connection.createChannel();

// 聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key??梢灾付ǘ鄠€(gè)

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收發(fā)送方指定routing key為sms的消息

//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");

// 定義隊(duì)列的消費(fèi)者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// body 即消息體

String msg = new String(body);

System.out.println(" [短信服務(wù)] received : " + msg + "!");

}

};

// 監(jiān)聽隊(duì)列,自動(dòng)ACK

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

3、消費(fèi)者2(用routing key為email來綁定隊(duì)列與交換機(jī))

在org.example.routingkey包中創(chuàng)建Receiver2類

public class Receiver2 {

//郵件隊(duì)列

private final static String QUEUE_NAME = "direct_exchange_queue_email"; private final static String EXCHANGE_NAME = "test_direct_exchange";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 獲取通道

Channel channel = connection.createChannel();

// 聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key??梢灾付ǘ鄠€(gè)

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收發(fā)送方指定routing key為email的消息

// 定義隊(duì)列的消費(fèi)者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// body 即消息體

String msg = new String(body);

System.out.println(" [郵件服務(wù)] received : " + msg + "!");

}

};

// 監(jiān)聽隊(duì)列,自動(dòng)ACK

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

我們發(fā)送sms的RoutingKey,發(fā)現(xiàn)結(jié)果:只有指定短信的消費(fèi)者1收到消息了(因?yàn)橄M(fèi)者1在綁定交換機(jī)的時(shí)候使用了sms這個(gè)routingkey)

4.5.5 Topics通配符模式(交換機(jī)類型:topics)

路由模式中的routing key路由鍵格式為寫死的字符串,而Topic通配符模式中的routing key是使用通配符#和*來匹配多個(gè)或一個(gè)routing key,而通過routing key來實(shí)現(xiàn)將隊(duì)列與交換機(jī)進(jìn)行綁定

4.1 Topics模型示意圖:

每個(gè)消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置帶通配符的routingkey,生產(chǎn)者將消息發(fā)給broker,由交換機(jī)根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊(duì)列。

Routingkey一般都是有一個(gè)或者多個(gè)單詞組成,多個(gè)單詞之間以“.”分割,例如:inform.sms

4.2 通配符規(guī)則

星號(hào)(*):匹配不多不少恰好1個(gè)詞。井號(hào)(#):匹配一個(gè)或多個(gè)詞

4.3 舉例

如示意圖所示

*.orange.* : 只能匹配 test.orange.test (只能匹配一個(gè)詞)

*.*.rabbit : 只能匹配 test.test.rabbit (只能匹配兩個(gè)詞)

lazy.# : 可以匹配 lazy.test 和 lazy.test.test (可以匹配一個(gè)或多個(gè)詞)

4.4 案例實(shí)戰(zhàn)

1、生產(chǎn)者

在org.example.topics包中創(chuàng)建Send類

public class Send {

private final static String EXCHANGE_NAME = "test_topic_exchange";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 獲取通道

Channel channel = connection.createChannel();

// 聲明exchange,指定類型為topic

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// 消息內(nèi)容

String message = "這是一只行動(dòng)迅速的橙色的兔子";

// 發(fā)送消息,并且指定routing key為:quick.orange.rabbit

channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());

System.out.println(" [動(dòng)物描述:] Sent '" + message + "'");

channel.close();

connection.close();

}

}

2、消費(fèi)者1

在org.example.topics包中創(chuàng)建Receiver1類

public class Receiver1 {

private final static String QUEUE_NAME = "topic_exchange_queue_Q1";

private final static String EXCHANGE_NAME = "test_topic_exchange";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 獲取通道

Channel channel = connection.createChannel();

// 聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。訂閱所有的橙色動(dòng)物

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");

// 定義隊(duì)列的消費(fèi)者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {

// body 即消息體

String msg = new String(body);

System.out.println(" [消費(fèi)者1] received : " + msg + "!");

}

};

// 監(jiān)聽隊(duì)列,自動(dòng)ACK

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

? 注:上框代碼中消費(fèi)者在將隊(duì)列與交換機(jī)進(jìn)行綁定時(shí),routing key是使用通配符模式來匹配生產(chǎn)者在發(fā)送消息時(shí)所指定的routing key,而非之前Routing路由模式是指定具體的某個(gè)routing key(之前路由模式是:生產(chǎn)者發(fā)送消息并指定routing key為test,這時(shí)消費(fèi)者在將隊(duì)列與交換機(jī)進(jìn)行綁定時(shí),如果指定了routing key為test,則可以接收到生產(chǎn)者發(fā)送的消息,反之不行,之前路由模式中的生產(chǎn)者與消費(fèi)者的routing key是直接寫死,而通配模式中消費(fèi)者綁定隊(duì)列與交換機(jī)時(shí)的routing key為使用通配符的形式進(jìn)行匹配)

? 上框代碼中*.orange.*是可以匹配到生產(chǎn)者中的quick.orange.rabbit,因此消費(fèi)者1是可以接收到生產(chǎn)者發(fā)送的消息。

3、消費(fèi)者2

在org.example.topics包中創(chuàng)建Receiver2類

public class Receiver2 {

private final static String QUEUE_NAME = "topic_exchange_queue_Q2";

private final static String EXCHANGE_NAME = "test_topic_exchange";

public static void main(String[] argv) throws Exception {

// 獲取到連接

Connection connection = ConnectionUtil.getConnection();

// 獲取通道

Channel channel = connection.createChannel();

// 聲明隊(duì)列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。訂閱關(guān)于兔子以及懶惰動(dòng)物的消息

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

// 定義隊(duì)列的消費(fèi)者

DefaultConsumer consumer = new DefaultConsumer(channel) {

// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {

// body 即消息體

String msg = new String(body);

System.out.println(" [消費(fèi)者2] received : " + msg + "!");

}

};

// 監(jiān)聽隊(duì)列,自動(dòng)ACK

channel.basicConsume(QUEUE_NAME, true, consumer);

}

}

上框代碼中*.*.rabbit是可以匹配到生產(chǎn)者中的quick.orange.rabbit,它是可以接收到生產(chǎn)者發(fā)送的消息,但是lazy.#是不可以匹配quick.orange.rabbit,故懶兔子是接收不到生產(chǎn)者發(fā)送的消息。結(jié)果消費(fèi)者1和消費(fèi)者2都接收到消息了

4.6 SpringBoot整合RabbitMQ

4.6.1 創(chuàng)建SpringBoot項(xiàng)目

根據(jù)下圖進(jìn)行SpringBoot項(xiàng)目創(chuàng)建 注意: 選擇JDK1.8的建議用3.0以下的SpringBoot版本,3.0或以上的SpringBoot版本建議使用JDK17或更高版本

4.6.2 添加依賴

在pom文件里面添加如下依賴

org.springframework.boot

spring-boot-starter-amqp

4.6.3 添加配置

yml配置文件添加

server:

port: 8080

spring:

rabbitmq:

host: 服務(wù)器的主機(jī)名或IP地址

port: 5672

username: rabbitMQ賬號(hào)

password: rabbitMQ密碼

# 虛擬主機(jī)名,默認(rèn)為"/"

virtual-host: /

# 發(fā)布者確認(rèn)模式

publisher-confirm-type: correlated

# 是否啟用發(fā)布者的返回功能

publisher-returns: true

# 模版配置

template:

retry:

# 發(fā)布重試,默認(rèn)false

enabled: true

# 重試時(shí)間 默認(rèn)1000ms

initial-interval: 10000ms

# 重試最大間隔時(shí)間

max-interval: 300000ms

# 重試的時(shí)間隔乘數(shù),比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s

multiplier: 2

# 交換機(jī)類型

exchange: topic.exchange

listener:

# 默認(rèn)配置是simple

type: simple

simple:

# 手動(dòng)ack Acknowledge mode of container. auto none

acknowledge-mode: manual

# 消費(fèi)者調(diào)用程序線程的最小數(shù)量

concurrency: 10

# 消費(fèi)者最大數(shù)量

max-concurrency: 10

# 限制消費(fèi)者每次只處理一條信息,處理完在繼續(xù)下一條

prefetch: 1

# 啟動(dòng)時(shí)是否默認(rèn)啟動(dòng)容器

auto-startup: true

# 被拒絕時(shí)重新進(jìn)入隊(duì)列

default-requeue-rejected: true

4.6.4 添加配置類

在com.example.rabbitmq.config包中創(chuàng)建RabbitmqConfig類

@Configuration

public class RabbitmqConfig {

public static final String QUEUE_EMAIL = "queue_email"; // email隊(duì)列

public static final String QUEUE_SMS = "queue_sms"; // sms隊(duì)列

public static final String EXCHANGE_NAME="topic.exchange"; // topics類型交換機(jī)

// routingkey的值通常是使用了通配符,#代表匹配一個(gè)或多個(gè),*代表匹配一個(gè)

public static final String ROUTINGKEY_EMAIL="topic.#.email.#"; // routingkey路由鍵

public static final String ROUTINGKEY_SMS="topic.#.sms.#";

// 聲明交換機(jī)(構(gòu)建topic類型的交換機(jī))

@Bean(EXCHANGE_NAME)

public Exchange exchange(){

// durable(true) 持久化,rabbitmq重啟之后交換機(jī)還在

return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();

}

// 聲明email隊(duì)列

/*

* new Queue(QUEUE_EMAIL,true,false,false)

* durable="true" 持久化 rabbitmq重啟的時(shí)候不需要?jiǎng)?chuàng)建新的隊(duì)列

* auto-delete 表示消息隊(duì)列沒有在使用時(shí)將被自動(dòng)刪除 默認(rèn)是false

* exclusive 表示該消息隊(duì)列是否只在當(dāng)前connection生效,默認(rèn)是false

*/

@Bean(QUEUE_EMAIL)

public Queue emailQueue(){

return new Queue(QUEUE_EMAIL);

}

// 聲明sms隊(duì)列

@Bean(QUEUE_SMS)

public Queue smsQueue(){

return new Queue(QUEUE_SMS);

}

// ROUTINGKEY_EMAIL隊(duì)列綁定交換機(jī),指定routingKey

@Bean

public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,

@Qualifier(EXCHANGE_NAME) Exchange exchange){

return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();

}

// 使用routingkey實(shí)現(xiàn)queue與exchange兩者間的綁定

// norags()表示無參

//ROUTINGKEY_SMS隊(duì)列綁定交換機(jī),指定routingKey

@Bean

public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,

@Qualifier(EXCHANGE_NAME) Exchange exchange){

return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();

}

}

4.6.5 在測試類中添加生產(chǎn)者,并發(fā)送消息

@Resource

private RabbitTemplate rabbitTemplate;

@Test

void contextLoads() {

/**

* 參數(shù):

* 1、交換機(jī)名稱

* 2、routingKey 是用來讓交換機(jī)通過routingKey將消息發(fā)送給所對應(yīng)的隊(duì)列

* 3、消息內(nèi)容

*/

for (int i = 0; i < 5; i++) {

String message = "恭喜您,注冊成功!userid=" + i;

rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "topic.sms.email", message);

// 交換機(jī)綁定隊(duì)列是通過routingkey,而此處代表的routingkey為topic.sms.email,它與可以匹配到RabbitmqConfig類中的ROUTINGKEY_EMAIL和ROUTINGKEY_SMS,

// 即交換機(jī)通過ROUTINGKEY_EMAIL和ROUTINGKEY_SMS將message變量的值作為消息發(fā)送到queue_email和queue_sms兩個(gè)隊(duì)列,因?yàn)榇颂幋a是使用for循環(huán),即向這兩個(gè)隊(duì)列分別發(fā)送了5次消息。

System.out.println(" [x] Sent '" + message + "'");

}

}

啟動(dòng)測試類

4.6.6 生產(chǎn)者發(fā)送消息測試結(jié)果

web管理界面: 可以看到已經(jīng)創(chuàng)建了交換機(jī)以及queue_email、queue_sms 2個(gè)隊(duì)列,并且向這兩個(gè)隊(duì)列分別發(fā)送了5條消息:

4.6.7 創(chuàng)建消息接收器(消費(fèi)者)

在com.example.rabbitmq.receiver包中創(chuàng)建ReceiveHandler類

@Component

public class ReceiveHandler {

// 監(jiān)聽郵件隊(duì)列

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = "queue_email", durable = "true"),

exchange = @Exchange(

value = "topic.exchange",

ignoreDeclarationExceptions = "true",

type = ExchangeTypes.TOPIC

),

key = {"topic.#.email.#","email.*"}))

public void receive_email(Message msg, Channel channel){

// 消息的唯一標(biāo)識(shí)

long deliveryTag = msg.getMessageProperties().getDeliveryTag();

System.out.println(" [郵件服務(wù)] received : " + new String(msg.getBody()) + "!");

try {

channel.basicAck(deliveryTag, false);

} catch (IOException e) {

throw new RuntimeException(e);

}

}

// 監(jiān)聽短信隊(duì)列

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = "queue_sms", durable = "true"),

exchange = @Exchange(

value = "topic.exchange",

ignoreDeclarationExceptions = "true",

type = ExchangeTypes.TOPIC

),

key = {"topic.#.sms.#"}))

public void receive_sms(Message msg, Channel channel){

// 消息的唯一標(biāo)識(shí)

long deliveryTag = msg.getMessageProperties().getDeliveryTag();

System.out.println(" [短信服務(wù)] received : " + new String(msg.getBody()) + "!");

try {

channel.basicAck(deliveryTag, false);

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

我們在配置文件中設(shè)置了手動(dòng)ACK機(jī)制,所以我們代碼也要進(jìn)行手動(dòng)ACK,不然會(huì)報(bào)錯(cuò)

屬性說明:

@Componet: 添加到類上的注解,將注解注解所標(biāo)識(shí)的類注冊到Spring容器

@RabbitListener: 方法上的注解,聲明這個(gè)方法是一個(gè)消費(fèi)者方法,需要指定下面的屬性:

? bindings: 指定綁定關(guān)系,可以有多個(gè)。值是@QueueBinding的數(shù)組。

? @QueueBinding包含下面屬性:

? ? value: 這個(gè)消費(fèi)者關(guān)聯(lián)的隊(duì)列。值是@Queue,代表一個(gè)隊(duì)列

? ? exchange: 隊(duì)列所綁定的交換機(jī),值是@Exchange類型

? ? key: 隊(duì)列和交換機(jī)綁定的RoutingKey,可指定多個(gè)

Message msg: 在這個(gè)上下文中,Message 是一個(gè)RabbitMQ消息的封裝。它包含了消息的內(nèi)容(body)以及一些其他的屬性(例如,消息的唯一標(biāo)識(shí),即deliveryTag,以及消息的優(yōu)先級(jí)等)。你可以將Message對象看作是一個(gè)封裝了消息和其相關(guān)屬性的對象。Channel channel: Channel 是RabbitMQ的一個(gè)關(guān)鍵組件,它提供了一個(gè)高效和可靠的方式來發(fā)送和接收消息。在生產(chǎn)者-消費(fèi)者模型中,生產(chǎn)者通過Channel將消息發(fā)送到RabbitMQ,消費(fèi)者則從Channel接收消息。Channel對象可以創(chuàng)建多個(gè),但是每個(gè)Channel都應(yīng)該有唯一的標(biāo)識(shí)符。

啟動(dòng)SPringBoot項(xiàng)目

4.6.8 消費(fèi)者消費(fèi)結(jié)果

效果如下所示

5、RabbitMQ問題相關(guān)解決方案

5.1 生產(chǎn)端可靠性投遞方案介紹

既然我們項(xiàng)目中用到的RabbitMQ,它有它的優(yōu)點(diǎn)比如:解耦、異常、流量削峰,但是我們還需要考慮額外的東西,比如消息的可靠性。

什么是消息的可靠性,我們從兩個(gè)方面來講解,第一個(gè)方面是怎么保證生產(chǎn)者的可靠性投遞,即確保生產(chǎn)端發(fā)出的消息能真真正正地投遞到了隊(duì)列,最后給消費(fèi)者消費(fèi)。第二個(gè)消費(fèi)者怎么去做冪等性的保證,也就是說我們使用RabbitMQ發(fā)送消息時(shí),會(huì)出現(xiàn)同一條消息會(huì)重復(fù)多發(fā)的情況。(即怎么保存消費(fèi)者只消費(fèi)一條消息,另外的重復(fù)發(fā)多的消息做丟棄)

5.1.1 關(guān)于怎么保證生產(chǎn)者的可靠性投遞

可以從以下三點(diǎn)去做:

怎么去保證消息的成功發(fā)出。去保證RabbitMQ成功接收到消息,即隊(duì)列要成功接收到消息。保證生產(chǎn)者能夠接收到RabbitMQ的確認(rèn)應(yīng)答,即隊(duì)列收到了消息需要應(yīng)答給生產(chǎn)者,生產(chǎn)者就能夠知道RabbitMQ是收到了消息,這條消息是發(fā)送成功。因?yàn)榇蠹叶贾繰abbitMQ本質(zhì)上就是生產(chǎn)者—>隊(duì)列—>消費(fèi)者,其中生產(chǎn)者只負(fù)責(zé)發(fā)送消息,然后隊(duì)列只負(fù)責(zé)消息的中轉(zhuǎn),而消費(fèi)者只負(fù)責(zé)消息的消費(fèi)。需要完善消息的補(bǔ)償機(jī)制。

關(guān)于具體實(shí)現(xiàn)保證生產(chǎn)者的可靠性投遞,市面上有兩種主流的方案。

5.1.2 消息發(fā)送

方案一:消息落庫,對消息狀態(tài)進(jìn)行標(biāo)記

? 解析:

? 如上圖所示,MSG_DB為消息數(shù)據(jù)庫,BIZ_DB為業(yè)務(wù)數(shù)據(jù)庫,

第一步就是將業(yè)務(wù)數(shù)據(jù)庫入庫,還有需要發(fā)送的消息入庫到消息數(shù)據(jù)庫中(入庫后的消息其狀態(tài)為投遞中)。第二步是producer生產(chǎn)者再將消息發(fā)送給RabbitMQ第三步RabbitMQ會(huì)開啟確認(rèn)回調(diào),producer生產(chǎn)者會(huì)監(jiān)聽來自RabbitMQ的確認(rèn)回調(diào)。第四步producer生產(chǎn)者監(jiān)聽到了回調(diào)表示消息發(fā)送成功了,生產(chǎn)者就會(huì)更新MSG_DB消息數(shù)據(jù)庫中剛剛發(fā)送的消息的狀態(tài)為投遞成功,上圖status 1表示成功,0表示投遞中。使用分布式定時(shí)任務(wù)get status:0表示獲取狀態(tài)為投遞中的消息分布式定義任務(wù)將獲取狀態(tài)為投遞中的消息進(jìn)行Retry Send重發(fā)(再執(zhí)行第2、3、4步)。Retry count > 3表示重發(fā)次數(shù)超過3次,就更新當(dāng)前消息的狀態(tài)為2,同時(shí)停止重發(fā)。

方案二:消息延遲投遞,做二次確認(rèn),回調(diào)檢查

? 解析:

? 上圖中BIZ DB為業(yè)務(wù)數(shù)據(jù)庫,Upstream service上層業(yè)務(wù)(看成生產(chǎn)者發(fā)送消息),Downstream service下層業(yè)務(wù)(看成消費(fèi)者接收消息)

Upstream service(生產(chǎn)端)將業(yè)務(wù)數(shù)據(jù)入庫到BIZ DB,再執(zhí)行first Send即第一次發(fā)送消息到RabbitMQ中Step2: Second Send Delay Check第二次延遲發(fā)送消息(算上第一次,即生產(chǎn)端會(huì)向RabbitMQ發(fā)送兩次消息)Step3: Listener Consume 消費(fèi)者監(jiān)聽隊(duì)列推送過來的消息然后進(jìn)行消費(fèi)。消費(fèi)者消費(fèi)完后會(huì)生成確認(rèn)消息發(fā)送給RabbitMQ中,即Downstream service不僅僅做消費(fèi)者,它也可以做生產(chǎn)者去發(fā)送消息,將確認(rèn)消息發(fā)送到隊(duì)列中。Callback Service 回調(diào)服務(wù),Listener Confirm監(jiān)聽隊(duì)列推送過來的確認(rèn)信息。監(jiān)聽到確認(rèn)信息后,回調(diào)函數(shù)會(huì)將消息入庫到消息數(shù)據(jù)庫中。Check Detail 如果監(jiān)聽到的是延遲投遞的第二次消息,回調(diào)函數(shù)就會(huì)到MSG_DB消息數(shù)據(jù)庫里檢查這個(gè)消息數(shù)據(jù)在數(shù)據(jù)庫中是否存在(因?yàn)樵诘谖宀降臅r(shí)候把消息入庫了消息數(shù)據(jù)庫),如果數(shù)據(jù)庫中不存在這個(gè)消息就說明消費(fèi)者沒有把確認(rèn)消息發(fā)送給隊(duì)列(即消費(fèi)者消費(fèi)失敗了),這時(shí)Callback servcie回調(diào)服務(wù)就會(huì)執(zhí)行RPC ReSend Command(RPC 重發(fā)命令)再重新從第一步開始執(zhí)行。

這個(gè)方案相對于第一種方案的優(yōu)點(diǎn)是:數(shù)據(jù)庫操作減少了。

其流程為:

發(fā)送消息時(shí),將當(dāng)前消息數(shù)據(jù)庫存入數(shù)據(jù)庫,投遞狀態(tài)為消息投遞中。開啟消息確認(rèn)回調(diào)機(jī)制。確認(rèn)成功,更新投遞狀態(tài)為消息投遞成功。

開啟定時(shí)任務(wù),重新投遞失敗的消息。重試超過3次,更新投遞狀態(tài)為投遞失敗。

5.1.3 消息落庫,對消息狀態(tài)打標(biāo)的具體實(shí)現(xiàn)

開啟消息回調(diào)機(jī)制

1. 創(chuàng)建數(shù)據(jù)庫

DROP DATABASE IF EXISTS `rabbit_msg_rk`;

CREATE DATABASE `rabbit_msg_rk`;

USE `rabbit_msg_rk`;

SET NAMES utf8mb4;

SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------

-- Table structure for message_log

-- ----------------------------

DROP TABLE IF EXISTS `message_log`;

CREATE TABLE `message_log` (

`id` int(0) NOT NULL AUTO_INCREMENT,

`message_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,

`order_id` int(0) NULL DEFAULT NULL,

`try_count` int(0) NULL DEFAULT 0,

`status` int(0) NULL DEFAULT 0,

`create_time` datetime(3) NULL DEFAULT NULL,

`update_time` datetime(3) NULL DEFAULT NULL,

`try_time` datetime(3) NULL DEFAULT NULL,

PRIMARY KEY (`id`) USING BTREE,

INDEX `order_id`(`order_id`) USING BTREE,

CONSTRAINT `message_log_ibfk_1` FOREIGN KEY (`order_id`) REFERENCES `orders` (`id`) ON DELETE CASCADE ON UPDATE CASCADE

) ENGINE = InnoDB AUTO_INCREMENT = 18 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

-- ----------------------------

-- Table structure for orders

-- ----------------------------

DROP TABLE IF EXISTS `orders`;

CREATE TABLE `orders` (

`id` int(0) NOT NULL AUTO_INCREMENT,

`stock_id` int(0) NOT NULL,

PRIMARY KEY (`id`) USING BTREE

) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

2. 創(chuàng)建Spring Boot項(xiàng)目

如下圖所示創(chuàng)建Spring Boot項(xiàng)目 注意: 選擇JDK1.8的建議用3.0以下的SpringBoot版本,3.0或以上的SpringBoot版本建議使用JDK17或更高版本

3. 添加依賴

在pom.xml文件中添加如下依賴

com.baomidou

mybatis-plus-boot-starter

3.3.1

mysql

mysql-connector-java

8.0.29

org.projectlombok

lombok

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-web

4. 添加配置

在yml文件中添加如下配置

server:

port: 8080

spring:

datasource:

driver-class-name: com.mysql.cj.jdbc.Driver

url: jdbc:mysql://服務(wù)器的主機(jī)名或IP地址:端口/數(shù)據(jù)庫名?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai

username: mysql賬號(hào)

password: mysql密碼

rabbitmq:

host: 服務(wù)器的主機(jī)名或IP地址

port: 5672

username: rabbitMQ賬號(hào)

password: rabbitMQ密碼

# 虛擬主機(jī)名,默認(rèn)為"/"

virtual-host: /

# 消息確認(rèn)回調(diào)

# none:表示禁用發(fā)送方確認(rèn)機(jī)制

# correlated:表示開啟發(fā)送方確認(rèn)機(jī)制

# simple:表示開啟發(fā)送方確認(rèn)機(jī)制,并支持 waitForConfirms() 和 waitForConfirmsOrDie() 的調(diào)用。

publisher-confirm-type: correlated

# 消息失敗回調(diào)

publisher-returns: true

# 模版配置

template:

retry:

# 發(fā)布重試,默認(rèn)false

enabled: true

# 重試時(shí)間 默認(rèn)1000ms

initial-interval: 10000ms

# 重試最大間隔時(shí)間

max-interval: 300000ms

# 重試的時(shí)間隔乘數(shù),比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s

multiplier: 2

exchange: topic.exchange

listener:

# 默認(rèn)配置是simple

type: simple

simple:

# 手動(dòng)ack Acknowledge mode of container. auto none

acknowledge-mode: manual

# 消費(fèi)者調(diào)用程序線程的最小數(shù)量

concurrency: 10

# 消費(fèi)者最大數(shù)量

max-concurrency: 10

# 限制消費(fèi)者每次只處理一條信息,處理完在繼續(xù)下一條

prefetch: 1

# 啟動(dòng)時(shí)是否默認(rèn)啟動(dòng)容器

auto-startup: true

# 被拒絕時(shí)重新進(jìn)入隊(duì)列

default-requeue-rejected: true

5. 創(chuàng)建實(shí)體類

在com.example.rabbitmqmsgrk.model包里面創(chuàng)建下面兩個(gè)類

訂單類:

@Data

@EqualsAndHashCode(callSuper = false)

@Accessors(chain = true)

@TableName("orders")

public class Order implements Serializable {

/**

* 訂單id

*/

@TableId(value = "id", type = IdType.AUTO)

private Integer id;

/**

* 庫存id

*/

@TableField("stock_id")

private Integer stockId;

}

消息類:

@Data

@EqualsAndHashCode(callSuper = false)

@Accessors(chain = true)

@TableName("message_log")

public class MassageLog implements Serializable {

/**

* 消息uid

*/

@TableField("message_id")

private String messageId;

/**

* 訂單id

*/

@TableField("order_id")

private Integer orderId;

/**

* 重試時(shí)間

*/

@TableField("try_time")

private LocalDateTime tryTime;

/**

* 重試次數(shù),閾值:3

*/

@TableField("try_count")

private Integer tryCount;

/**

* 消息狀態(tài),0:未發(fā)送成功、1:發(fā)送成功、2:失敗消息

*/

private Integer status;

@TableField("create_time")

private LocalDateTime createTime;

@TableField("update_time")

private LocalDateTime updateTime;

}

6. 創(chuàng)建mapper

在com.example.rabbitmqmsgrk.mapper包下面分別創(chuàng)建MessageLogMapper、OrderMapper兩個(gè)接口

// MessageLog接口

@Mapper

public interface MessageLogMapper extends BaseMapper {

}

// Order 接口

@Mapper

public interface OrderMapper extends BaseMapper {

}

在resources下面創(chuàng)建mapper包,然后在包下面創(chuàng)建MessageLogMapper、OrderMapper這兩個(gè)接口的接口映射文件

MessageLogMapper.xml

message_id,order_id,try_count,try_time,status,create_time,update_time

OrderMapper.xml

id,stock_id

7. 創(chuàng)建Controller

在com.example.rabbitmqmsgrk.web包下面創(chuàng)建OrderController類

@RestController

public class OrderController {

@Resource

private MessageLogMapper messageLogMapper;

@Resource

private RabbitTemplate rabbitTemplate;

@GetMapping("/msgTest")

public String msgTest(Integer orderId,Integer stockId){

Order order = new Order();

order.setId(orderId);

order.setStockId(stockId);

//數(shù)據(jù)庫記錄發(fā)送的消息

String msgId= UUID.randomUUID().toString();

MessageLog messageLog = new MessageLog();

messageLog.setMessageId(msgId);

messageLog.setStatus(0);

messageLog.setOrderId(orderId);

messageLog.setTryCount(0);

messageLog.setTryTime(LocalDateTime.now().plusMinutes(1));

messageLog.setCreateTime(LocalDateTime.now());

messageLogMapper.insert(messageLog);

/**

* 發(fā)送消息

* @param exchange 為交換機(jī)名字

* @param routingKey 為路由鍵

* @param object 為需要發(fā)送消息的內(nèi)容

* @param correlationData 為本次消息的ID

*/

rabbitTemplate.convertAndSend("msg.exchange","msg.routing.key", order, new CorrelationData(msgId));

return "成功";

}

}

8. 創(chuàng)建配置類

在com.example.rabbitmqmsgrk.config包中創(chuàng)建RabbitMQConfig類

@Configuration

public class RabbitMQConfig {

private static final Logger LOGGER= LoggerFactory.getLogger(RabbitMQConfig.class);

// 注入緩存連接工廠依賴對象

@Resource

private CachingConnectionFactory cachingConnectionFactory;

@Resource

private MessageLogMapper messageLogMapper;

@Bean

public RabbitTemplate rabbitTemplate(){

RabbitTemplate rabbitTemplate=new RabbitTemplate(cachingConnectionFactory);

/**

* 消息確認(rèn)回調(diào),確認(rèn)消息是否會(huì)到達(dá)broker

* data:消息的唯一標(biāo)識(shí)

* ack:確認(rèn)結(jié)果

* cause:失敗原因

*/

rabbitTemplate.setConfirmCallback((data,ack,cause)->{

String msgId = data.getId();

if(ack){

LOGGER.info("{}==============>消息發(fā)送成功",msgId);

//在生產(chǎn)者發(fā)送消息的時(shí)候會(huì)把消息入庫到MSG_DB消息數(shù)據(jù)庫中,此消息的狀態(tài)status值為0,表示消息投遞中。當(dāng)消費(fèi)者監(jiān)聽到消息后,這里的setConfirmCallback()方法中實(shí)現(xiàn)消息確認(rèn)回調(diào),更新status的值為1表示投遞成功。

messageLogMapper.update(new MessageLog(),new UpdateWrapper().set("status",1).eq("message_id",msgId));

}else {

LOGGER.error("{}=============>消息發(fā)送失敗",msgId);

}

});

/**

* 消息失敗回調(diào)

* msg:消息主題

* repCode:響應(yīng)碼

* repText:響應(yīng)內(nèi)容

* exchange:交換機(jī)

* routingKey:路由鍵

*/

rabbitTemplate.setReturnCallback((msg,reCode,repText,exchange,routingKey)->{

LOGGER.error("{}==============>消息發(fā)送失敗 ",msg.getBody());

});

return rabbitTemplate;

}

// 創(chuàng)建名為msg.queue的隊(duì)列

@Bean

public Queue queue(){

return new Queue("msg.queue");

}

//聲明交換機(jī)(構(gòu)建direct類型的交換機(jī), 交換機(jī)名為msg.exchange)

@Bean

public DirectExchange directExchange(){

return new DirectExchange("msg.exchange");

}

// 通過msg.routing.key將隊(duì)列綁定到交換機(jī)

@Bean

public Binding binding(){

return BindingBuilder.bind(queue()).to(directExchange ()).with("msg.routing.key");

}

}

注意: 需要再yml中spring.rabbitmq進(jìn)行如下設(shè)置

# 消息確認(rèn)回調(diào)

publisher-confirm-type: correlated

# 消息失敗回調(diào)

publisher-returns: true

9. 進(jìn)行任務(wù)調(diào)度

在com.example.rabbitmqmsgrk.schedule包中創(chuàng)建MsgSchedule類,進(jìn)行定時(shí)任務(wù)

@Component

public class MsgSchedule {

@Resource

private MessageLogMapper messageLogMapper;

@Resource

private OrderMapper orderMapper;

@Resource

private RabbitTemplate rabbitTemplate;

// 定時(shí)任務(wù),使用cron表達(dá)式實(shí)現(xiàn)每隔10秒執(zhí)行一次下面的msgTask()方法

@Scheduled(cron = "0/10 * * * * ?")

public void msgTask(){

// 查詢消息狀態(tài)為0即正在投遞中的,并且tryTime重試時(shí)間小于當(dāng)前時(shí)間。

List list = messageLogMapper.selectList(new QueryWrapper().eq("status", 0).lt("try_time", LocalDateTime.now()));

list.forEach(messageLog -> {

//判斷是否嘗試次數(shù)到3,代表發(fā)送失敗,修改當(dāng)前消息的status為2

if(messageLog.getTryCount()>=3){

messageLogMapper.update(new MessageLog(),new UpdateWrapper().set("status",2).eq("message_id",messageLog.getMessageId()));

}

//沒到3,繼續(xù)發(fā)送,并且修改狀態(tài)

messageLogMapper.update(new MessageLog(),new UpdateWrapper().set("try_count",messageLog.getTryCount() + 1)

.set("update_time",LocalDateTime.now()).set("try_time", LocalDateTime.now().plusMinutes(1))

.eq("message_id",messageLog.getMessageId()));

Order order = orderMapper.selectById(messageLog.getOrderId());

//重新發(fā)送消息

rabbitTemplate.convertAndSend("msg.exchange","msg.routing.key", order, new CorrelationData(messageLog.getMessageId()));

});

}

}

注意:在啟動(dòng)類上面添加@EnableScheduling注解

需要再orders表中添加以下數(shù)據(jù)

訪問:http://localhost:8080/msgTest?orderId=1&stockId=1 請求接口成功

Cron表達(dá)式學(xué)習(xí):https://blog.csdn.net/ITKidKid/article/details/126386738

10.最終結(jié)果

1、發(fā)送消息成功

2、模擬生產(chǎn)者消息首次發(fā)送失敗

我們將controller中msgTest方法里面的converAndSend中交換機(jī)的名字改掉,改成 “msg.exchange.test”,如此生產(chǎn)者發(fā)送消息肯定失敗,這個(gè)消息的狀態(tài)值為0,后面記得改回來

訪問: http://localhost:8080/msgTest?orderId=1&stockId=1

消息發(fā)送失敗

定時(shí)任務(wù)去查詢消息狀態(tài)為投遞中的消息進(jìn)行重發(fā)

消息重發(fā)成功

再查看數(shù)據(jù)庫message_log表,可以看到這個(gè)消息的try_count值為1,表示它進(jìn)行過一次重試,最后status值由0被修改成了1

3、模擬消息首次發(fā)送失敗,定時(shí)任務(wù)重試也失敗

將定時(shí)任務(wù)中的converAndSend中交換機(jī)的名字也改掉,改成 “msg.exchange.test”,如此來模擬生產(chǎn)者首次發(fā)送的消息失敗,定時(shí)任務(wù)重試也失敗。后面記得改回來

訪問: http://localhost:8080/msgTest?orderId=1&stockId=1

訪問成功立刻查看數(shù)據(jù)庫message_log表,可以看到這個(gè)消息的try_count值為0,status值也是0,重試次數(shù)為0,即狀態(tài)是投遞中

定時(shí)任務(wù)MsgSchedule類中的定時(shí)任務(wù)每隔10秒就會(huì)去重試下,當(dāng)重試次數(shù)超過3次,就直接將當(dāng)前消息的狀態(tài)值修改為2,再次查看message_log表的數(shù)據(jù)。

注:等待時(shí)間大約3分鐘,即3分鐘后當(dāng)前消息的status狀態(tài)才會(huì)變成2

最后不要忘記更正controller中以及MsgSchedule類中交換機(jī)的名字。

11.在高并發(fā)的場景下是否合適?

第一種方案對數(shù)據(jù)有兩次入庫,一次業(yè)務(wù)數(shù)據(jù)入庫,一次消息入庫。這樣對數(shù)據(jù)的入庫是一個(gè)瓶頸。

其實(shí)我們只需要對業(yè)務(wù)進(jìn)行入庫。

5.2 RabbitMQ 如何避免消息重復(fù)消費(fèi)?

5.2.1 冪等性

消息的冪等性是指一次消息傳遞可能會(huì)發(fā)生多次,但最終業(yè)務(wù)狀態(tài)只會(huì)改變一次。換句話說,即使多次收到了同一消息,也不會(huì)導(dǎo)致重復(fù)的業(yè)務(wù)處理。冪等操作的特點(diǎn)是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。

保證消息的冪等性在開發(fā)中是很重要的,例如在客戶點(diǎn)擊付款的情況下,如果點(diǎn)擊了多次,系統(tǒng)也只能扣一次費(fèi)。此外,實(shí)現(xiàn)冪等性操作可以免去因重試等造成系統(tǒng)產(chǎn)生的未知問題。

然而,消息隊(duì)列如RabbitMQ、RocketMQ、kafka等,都可能出現(xiàn)消息的重復(fù)發(fā)送,這個(gè)是消息隊(duì)列無法保障的。在這種情況下,我們需要開發(fā)人員去保證消息的冪等性。實(shí)際上,消息隊(duì)列沒法幫你做到消費(fèi)端的冪等性,消費(fèi)端的冪等性得基于業(yè)務(wù)場景進(jìn)行實(shí)現(xiàn)。但是,至少得保證消息不能丟,且至少被消費(fèi)一次。

5.2.2 高并發(fā)的情況下如何避免消息重復(fù)消費(fèi)

唯一id+加指紋碼,利用數(shù)據(jù)庫主鍵去重。優(yōu)點(diǎn):實(shí)現(xiàn)簡單缺點(diǎn):高并發(fā)下有數(shù)據(jù)寫入瓶頸。利用Redis的原子性來實(shí)習(xí)。使用Redis進(jìn)行冪等是需要考慮的問題是否進(jìn)行數(shù)據(jù)庫落庫,落庫后數(shù)據(jù)和緩存如何做到保證冪等(Redis和數(shù)據(jù)庫如何同時(shí)成功同時(shí)失?。??如果不進(jìn)行落庫,都放在Redis中如何這是Redis和數(shù)據(jù)庫的同步策略?還有放在緩存中就能百分之百的成功嗎?

5.2.3 解決重復(fù)消費(fèi)的案例代碼

1. 添加依賴

在pom.xml中添加如下依賴

org.springframework.boot

spring-boot-starter-data-redis

2. 添加配置

在application.yml中添加如下配置,記住,一定要設(shè)置手動(dòng)ACK,不然會(huì)報(bào)錯(cuò) acknowledge-mode: manual

spring:

# redis配置

redis:

# 超時(shí)時(shí)間

timeout: 10000ms

# 服務(wù)器地址

host: 服務(wù)器地址

# 服務(wù)器端口

port: 6379

database: 0

lettuce:

pool:

# 連接池最大連接數(shù) 默認(rèn)8 ,負(fù)數(shù)表示沒有限制

max-active: 1024

# 最大連接阻塞等待時(shí)間,默認(rèn)-1

max-wait: 10000ms

# 最大空閑連接

max-idle: 200

# 最小空閑連接

min-idle: 5

password: redis密碼

# rabbitmq配置

rabbitmq:

simple:

# 手動(dòng)ack Acknowledge mode of container. auto none

acknowledge-mode: manual

3. 生產(chǎn)者代碼

在com.example.rabbitmqmsgrk.web包中的OrderController類中創(chuàng)建repetition方法用于發(fā)送消息

@GetMapping("/repetition")

public String repetition(){

// 給消息封裝一個(gè)唯一id對象

String msgId= UUID.randomUUID().toString();

/**

* 發(fā)送消息

* @param exchange 為交換機(jī)名字

* @param routingKey 為路由鍵

* @param object 為需要發(fā)送消息的內(nèi)容

* @param correlationData 為本次消息的ID

*/

rabbitTemplate.convertAndSend("msg.exchange","msg.routing.key", "消息重復(fù)消費(fèi)問題處理", new CorrelationData(msgId));

return "成功";

}

4. 消費(fèi)者代碼

在com.example.rabbitmqmsgrk.config 中創(chuàng)建ReceiveHandler消費(fèi)者接收類,用于進(jìn)行消息消費(fèi)

@Component

public class ReceiveHandler {

@Resource

private StringRedisTemplate stringRedisTemplate;

// 監(jiān)聽隊(duì)列

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = "msg.queue", durable = "true"),

exchange = @Exchange(

value = "msg.exchange",

ignoreDeclarationExceptions = "true",

type = ExchangeTypes.TOPIC

),

key = {"msg.routing.*", "msg.#"}))

public void repetition(String msg, Channel channel, Message message) throws IOException {

// 1. 消息的唯一標(biāo)識(shí)

long deliveryTag = message.getMessageProperties().getDeliveryTag();

// 2. 獲取MessageId, 消息唯一id

String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");

// 3. 設(shè)置key到Redis

if (stringRedisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {

// 4. 消費(fèi)消息

System.out.println("接收到消息:" + msg);

// 5. 設(shè)置key的value為1

stringRedisTemplate.opsForValue().set(messageId, "1", 10, TimeUnit.SECONDS);

// 6. 手動(dòng)ack

channel.basicAck(deliveryTag, false);

} else {

// 4. 獲取Redis中的value即可 如果是1,手動(dòng)ack

if ("1".equalsIgnoreCase(stringRedisTemplate.opsForValue().get(messageId))) {

System.out.println("消息:" + messageId + "已消費(fèi)");

// 5. 手動(dòng)ack

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}

}

}

}

5. 最終結(jié)果

1、消息消費(fèi)成功

訪問:http://localhost:8080/repetition

查看控制臺(tái)和redis,可以看到redis中的值變成1了,表示消息成功消費(fèi)了

2、模擬消息重復(fù)消費(fèi)場景

在OrderController類中repetition`方法添加for循環(huán)模擬消息重復(fù)消費(fèi)場景,連續(xù)發(fā)送3次重復(fù)消息

for (int i = 0; i < 3; i++) {

rabbitTemplate.convertAndSend("msg.exchange", "msg.routing.key", "消息重復(fù)消費(fèi)問題處理", new CorrelationData(msgId));

}

訪問:http://localhost:8080/repetition

查看控制臺(tái),可以看到消息重復(fù)進(jìn)行了消費(fèi)

5.3 RabbitMQ 如何避免消息積壓?

5.3.1 解決方案

優(yōu)化業(yè)務(wù)流程:檢查并優(yōu)化業(yè)務(wù)邏輯,確保消費(fèi)者能夠及時(shí)處理消息。這可能需要對業(yè)務(wù)邏輯進(jìn)行重新設(shè)計(jì),或者增加更多的消費(fèi)者來提高處理速度。增加消費(fèi)者數(shù)量:通過增加消費(fèi)者的數(shù)量,可以并行處理更多的消息,從而減輕單個(gè)消費(fèi)者的負(fù)擔(dān)。調(diào)整RabbitMQ的參數(shù):根據(jù)實(shí)際情況,調(diào)整RabbitMQ的參數(shù),如消息的生存時(shí)間(TTL)、最大隊(duì)列長度等。例如,可以設(shè)置消息的生存時(shí)間為較短的時(shí)間,以便消息能夠在隊(duì)列中保留的時(shí)間更短,從而減少隊(duì)列的壓力。使用死信隊(duì)列:在RabbitMQ中,可以使用死信隊(duì)列來處理無法正常處理的消息。當(dāng)消息在隊(duì)列中過期或者被拒絕時(shí),可以將其發(fā)送到死信隊(duì)列中,以便進(jìn)行后續(xù)處理。監(jiān)控和告警:建立監(jiān)控系統(tǒng),實(shí)時(shí)監(jiān)控RabbitMQ的運(yùn)行狀態(tài)和隊(duì)列情況。當(dāng)出現(xiàn)消息積壓時(shí),及時(shí)發(fā)出告警通知,以便能夠及時(shí)采取措施解決問題。臨時(shí)擴(kuò)容:如果以上措施無法滿足需求,可以考慮臨時(shí)擴(kuò)容,增加更多的消費(fèi)者和資源來處理積壓的消息。但這只是應(yīng)急措施,需要在后續(xù)對系統(tǒng)進(jìn)行進(jìn)一步的優(yōu)化和改進(jìn)。

更多命令

一、卸載rabbitmq相關(guān)的

1、卸載前先停掉rabbitmq服務(wù),執(zhí)行命令

service rabbitmq-server stop

2、查看rabbitmq安裝的相關(guān)列表

yum list | grep rabbitmq

3、卸載rabbitmq已安裝的相關(guān)內(nèi)容

yum -y remove rabbitmq-server.noarch

二、卸載erlang

1、查看erlang安裝的相關(guān)列表

yum list | grep erlang

2、卸載erlang已安裝的相關(guān)內(nèi)容

yum -y remove erlang-*

yum remove erlang.x86_64

啟動(dòng)服務(wù):rabbitmq-server -detached # 以后臺(tái)守護(hù)進(jìn)程方式啟動(dòng)

查看狀態(tài):rabbitmqctl status

關(guān)閉服務(wù):rabbitmqctl stop

列出角色:rabbitmqctl list_users

rabbitmqctl list_permissions # 查看(指定vhostpath)所有用戶的權(quán)限

rabbitmqctl list_permissions -p / # 查看virtual host為/的所有用戶權(quán)限

rabbitmqctl list_user_permissions developer # 查看指定用戶的權(quán)限

rabbitmqctl clear_permissions developer # 清除用戶權(quán)限

rabbitmqctl delete_user guest # 刪除用戶

rabbitmqctl change_password developer dev123456 # 修改密碼

感謝博主:

https://blog.csdn.net/Bejpse/article/details/126424250

https://blog.csdn.net/hengheng__/article/details/123390048

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ實(shí)戰(zhàn)

http://yzkb.51969.com/

相關(guān)閱讀

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/18844288.html

發(fā)布評論

您暫未設(shè)置收款碼

請?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄