欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)激活碼778899分享:分布式 rabbitmq

柚子快報(bào)激活碼778899分享:分布式 rabbitmq

http://yzkb.51969.com/

一,什么是消息中間件

MQ全稱為Message Queue,本質(zhì)上是個(gè)隊(duì)列,F(xiàn)IFO先入先出。是在消息的傳輸過(guò)程中保存消息的容器??梢杂糜趹?yīng)用程序和應(yīng)用程序之間的通信方法。多用于分布式系統(tǒng)之間進(jìn)行通信,在項(xiàng)目中,可將一些無(wú)需即時(shí)返回且耗時(shí)的操作提取出來(lái),進(jìn)行異步處理,而這種異步處理的方式大大的節(jié)省了服務(wù)器的請(qǐng)求響應(yīng)時(shí)間,從而提高了系統(tǒng)的吞吐量

二,為什么要使用消息中間件

消息中間件優(yōu)點(diǎn):異步,解耦,限流

同步通信:耗時(shí)長(zhǎng),受網(wǎng)絡(luò)波動(dòng)影響,不能保證高成功率,耦合性高。

同步,異步

并發(fā):一段時(shí)間(1S)多個(gè)請(qǐng)求數(shù)

并行:時(shí)間節(jié)點(diǎn),多個(gè)指令同時(shí)被執(zhí)行

串行:順序執(zhí)行

1.異步處理:

將不需要同步處理的并且耗時(shí)長(zhǎng)的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理,提高了應(yīng)用程序的響應(yīng)時(shí)間。

消息隊(duì)列:Redis 發(fā)布訂閱(pub/sub)

異步方式:用戶點(diǎn)擊完下單按鈕后,只需等待25ms就能得到下單響應(yīng) (20 + 5 = 25ms)。也就是說(shuō),訂單消息提交到MQ,MQ回饋一個(gè)消息成功,然后再把訂單提交到數(shù)據(jù)庫(kù)20ms,就完成了。至于MQ通知庫(kù)存、支付、物流系統(tǒng)所花費(fèi)的時(shí)間和訂單系統(tǒng)成功沒(méi)有關(guān)系了。 這樣這個(gè)訂單系統(tǒng)提升用戶體驗(yàn)和系統(tǒng)吞吐量(單位時(shí)間內(nèi)處理請(qǐng)求的數(shù)目)

[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來(lái)直接上傳(img-fw7ZuYfR-1691115209630)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20230713152842364.png)]

同步處理(耗時(shí)長(zhǎng)):

同步方式的問(wèn)題:當(dāng)一個(gè)用戶提交訂單到成功需要300ms+300ms+300ms+20ms = 920ms,這是不能容忍的。也就是說(shuō)庫(kù)存、支付、物流、最后保存數(shù)據(jù)庫(kù)全部成功,訂單的提交才算完成。

2.系統(tǒng)的耦合性越高,容錯(cuò)性就越低,可維護(hù)性就越低

服務(wù)與之間耦合度,比如訂單服務(wù)與用戶積分服務(wù)(需求:下單成功,增加積分)

如果不用消息隊(duì)列,訂單服務(wù)和積分服務(wù)就要通信,下單后調(diào)用積分服務(wù)的接口通知積分服務(wù)進(jìn)行處理(或者定時(shí)掃描之類的),那么調(diào)用接口失敗,或者延時(shí)等等…一系列的問(wèn)題要考慮處理,非常繁瑣

用了消息隊(duì)列,用戶A下單成功后下單服務(wù)通過(guò)redis發(fā)布(mq的生產(chǎn)者)一消息,就不用管了.用戶積分服務(wù)redis訂閱了(mq的消費(fèi)者),就會(huì)受到這用戶A下單的消息,進(jìn)行處理.這就降低了多個(gè)服務(wù)之間的耦合,即使積分服務(wù)發(fā)生異常,也不會(huì)影響用戶正常下單.處理起來(lái)就非常的絲滑,各干各的互不影響.

解決方案:應(yīng)用程序解耦合

MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過(guò)MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合。

使用消息隊(duì)列的方式:使用 MQ 使得應(yīng)用間解耦,提升容錯(cuò)性和可維護(hù)性。庫(kù)存和支付和物流直接去MQ取到訂單的信息即可,即使庫(kù)存系統(tǒng)報(bào)錯(cuò),沒(méi)關(guān)系,等到庫(kù)存修復(fù)后再次從MQ中去取就可以了

3.高并發(fā)(分批處理請(qǐng)求)

訂單系統(tǒng),在下單的時(shí)候就會(huì)往數(shù)據(jù)庫(kù)寫(xiě)數(shù)據(jù)。但是數(shù)據(jù)庫(kù)只能支撐每秒1000左右的并發(fā)寫(xiě)入,并發(fā)量再高就容易宕機(jī)。低峰期的時(shí)候并發(fā)也就100多個(gè),但是在高峰期時(shí)候,并發(fā)量會(huì)突然激增到5000以上,這個(gè)時(shí)候數(shù)據(jù)庫(kù)肯定卡死了。但不一定宕機(jī),只會(huì)很慢,一旦宕機(jī)就會(huì)有消息丟失。

解決方案:削峰填谷

消息被MQ保存起來(lái)了,5000條數(shù)據(jù)對(duì)于MQ,簡(jiǎn)直是小意思,然后系統(tǒng)就可以按照自己的消費(fèi)能力來(lái)消費(fèi),比如每秒1000個(gè)數(shù)據(jù),這樣慢慢寫(xiě)入數(shù)據(jù)庫(kù),這樣就不會(huì)卡死數(shù)據(jù)庫(kù)了

4,MQ的劣勢(shì)

1、系統(tǒng)可用性降低:系統(tǒng)引入的外部依賴越多,系統(tǒng)穩(wěn)定性越差。一旦 MQ 宕機(jī),就會(huì)對(duì)業(yè)務(wù)造成影響。如何保證MQ的高可用?

2、系統(tǒng)復(fù)雜度提高:MQ 的加入大大增加了系統(tǒng)的復(fù)雜度,以前系統(tǒng)間是同步的遠(yuǎn)程調(diào)用,現(xiàn)在是通過(guò) MQ 進(jìn)行異步調(diào)用。如何保證消息沒(méi)有被重復(fù)消費(fèi)?怎么處理消息丟失情況?那么保證消息傳遞的順序性?

3、一致性問(wèn)題:A 系統(tǒng)處理完業(yè)務(wù),通過(guò) MQ 給B、C、D三個(gè)系統(tǒng)發(fā)消息數(shù)據(jù),如果 B 系統(tǒng)、C 系統(tǒng)處理成功,D 系統(tǒng)處理失敗。如何保證消息數(shù)據(jù)處理的一致性?

既然 MQ 有優(yōu)勢(shì)也有劣勢(shì),那么使用 MQ 需要滿足什么條件呢?

生產(chǎn)者不需要從消費(fèi)者處獲得反饋。引入消息隊(duì)列之前的直接調(diào)用,其接口的返回值應(yīng)該為空,這才讓明明下層的動(dòng)作還沒(méi)做,上層卻當(dāng)成動(dòng)作做完了繼續(xù)往后走,即所謂異步成為了可能。 容許短暫的不一致性。 確實(shí)是用了有效果。即解耦、提速、削峰這些方面的收益,超過(guò)加入MQ,管理MQ這些成本。

常見(jiàn)的 MQ 產(chǎn)品

目前業(yè)界有很多的 MQ 產(chǎn)品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充當(dāng)消息隊(duì)列的案例,而這些消息隊(duì)列產(chǎn)品,各有側(cè)重,在實(shí)際選型時(shí),需要結(jié)合自身需求及 MQ 產(chǎn)品特征,綜合考慮。

** **RabbitMQActiveMQRocketMQKafka公司/社區(qū)RabbitApache阿里Apache開(kāi)發(fā)語(yǔ)言ErlangJavaJavaScala&Java協(xié)議支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定義自定義協(xié)議,社區(qū)封裝了http協(xié)議支持客戶端支持語(yǔ)言官方支持Erlang,Java,Ruby等,社區(qū)產(chǎn)出多種API,幾乎支持所有語(yǔ)言Java,C,C++,Python,PHP,Perl,.net等Java,C++(不成熟)官方支持Java,社區(qū)產(chǎn)出多種API,如PHP,Python等單機(jī)吞吐量萬(wàn)級(jí)(其次)萬(wàn)級(jí)(最差)十萬(wàn)級(jí)(最好)十萬(wàn)級(jí)(次之)消息延遲微秒級(jí)毫秒級(jí)毫秒級(jí)毫秒以內(nèi)功能特性并發(fā)能力強(qiáng),性能極其好,延時(shí)低,社區(qū)活躍,管理界面豐富老牌產(chǎn)品,成熟度高,文檔較多MQ功能比較完備,擴(kuò)展性佳只支持主要的MQ功能,畢竟是為大數(shù)據(jù)領(lǐng)域準(zhǔn)備的。

三,RabbitMQ簡(jiǎn)介

RabbitMQ是由erlang語(yǔ)言開(kāi)發(fā),基于AMQP(Advanced Message Queue 高級(jí)消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列,它是一種應(yīng)用程序之間的通信方法,消息隊(duì)列在分布式系統(tǒng)開(kāi)發(fā)中應(yīng)用非常廣泛。2007年,Rabbit 技術(shù)公司基于 AMQP 標(biāo)準(zhǔn)開(kāi)發(fā)的 RabbitMQ 1.0 發(fā)布。 (Erlang 語(yǔ)言由 Ericson 設(shè)計(jì),專門為開(kāi)發(fā)高并發(fā)和分布式系統(tǒng)的一種語(yǔ)言,在電信領(lǐng)域使用廣泛)

RabbitMQ官方地址:http://www.rabbitmq.com/

RabbitMQ 基礎(chǔ)架構(gòu)如下圖:

上圖說(shuō)明:

1、Broker:接收和分發(fā)消息的應(yīng)用,就是一個(gè)中介,RabbitMQ Server就是 Message Broker 2、Virtual host:出于多租戶和安全因素設(shè)計(jì)的,把 AMQP 的基本組件劃分到一個(gè)虛擬的分組中,類似于網(wǎng)絡(luò)中的 namespace 概念。當(dāng)多個(gè)不同的用戶使用同一個(gè) RabbitMQ server 提供的服務(wù)時(shí),可以劃分出多個(gè)vhost,每個(gè)用戶在自己的 vhost 創(chuàng)建 exchange/queue 等 3、Connection:publisher/consumer 和 broker 之間的 TCP 連接 4、Channel:如果每一次訪問(wèn) RabbitMQ 都建立一個(gè) Connection,在消息量大的時(shí)候建立 TCP Connection的開(kāi)銷將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè)thread創(chuàng)建單獨(dú)的 channel 進(jìn)行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識(shí)別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級(jí)的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開(kāi)銷

5、Exchange:message 到達(dá) broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) 6、Queue:消息最終被送到這里等待 consumer 取走 7、Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發(fā)依據(jù)

RabbitMQ提供了6種模式:簡(jiǎn)單模式,work模式,Publish/Subscribe發(fā)布與訂閱模式,Routing路由模式,Topics主題模式,RPC遠(yuǎn)程調(diào)用模式(遠(yuǎn)程調(diào)用,不太算MQ;暫不作介紹);官網(wǎng)對(duì)應(yīng)模式介紹:https://www.rabbitmq.com/getstarted.html , 點(diǎn)擊手冊(cè)按鈕 RabbitMQ Tutorials

2. 安裝及配置RabbitMQ

RabbitMQ 官方地址:http://www.rabbitmq.com/

win10安裝

安裝erlang和rabbitmq后,進(jìn)入rabbitmq安裝目錄的sbin目錄執(zhí)行下面命令

rabbitmq-plugins.bat enable rabbitmq_management

[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來(lái)直接上傳(img-tUbZI5GZ-1691115209633)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20230713154626768.png)]

登錄URL:

http://localhost:15672/

用戶名密碼:guest/guest

問(wèn)題:

? 登錄用戶是中文解決方案:

? 1、創(chuàng)建用戶為英文,再安裝相關(guān)環(huán)境

? 2、修改相應(yīng)的目錄

? 用管理員執(zhí)行CMD

rabbitmq-service.bat remove

set RABBITMQ_BASE=D:\rabbitmq_server\data

rabbitmq-service.bat install

rabbitmq-plugins enable rabbitmq_management

查看進(jìn)程

tasklist | find /i "erl"

關(guān)閉進(jìn)程

taskkill /pid 7300 -t -f //將7300改成對(duì)應(yīng)端口號(hào)

3. RabbitMQ快速入門

3.1 生產(chǎn)方工程搭建

1.添加相關(guān)依賴

修改pom.xml文件內(nèi)容為如下:

org.springframework.boot

spring-boot-starter-amqp

2.啟動(dòng)類

package com.woniu.rabbitmq;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class RabbitApplication {

public static void main(String[] args) {

SpringApplication.run(RabbitApplication.class);

}

}

3.配置RabbitMQ

創(chuàng)建application.yml,內(nèi)容如下:

spring:

rabbitmq:

host: localhost

port: 5672

virtual-host: /woniu

username: woniu

password: woniu

創(chuàng)建隊(duì)列參數(shù)說(shuō)明:

參數(shù)說(shuō)明name字符串值,queue的名稱。durable布爾值,表示該 queue 是否持久化。 持久化意味著當(dāng) RabbitMQ 重啟后,該 queue 是否會(huì)恢復(fù)/仍存在。 另外,需要注意的是,queue 的持久化不等于其中的消息也會(huì)被持久化。exclusive布爾值,表示該 queue 是否排它式使用。排它式使用意味著僅聲明他的連接可見(jiàn)/可用,其它連接不可見(jiàn)/不可用。autoDelete布爾值,表示當(dāng)該 queue 沒(méi)“人”(connection)用時(shí),是否會(huì)被自動(dòng)刪除。

不指定 durable、exclusive 和 autoDelete 時(shí),默認(rèn)為 true 、 false 和 false 。表示持久化、非排它、不用自動(dòng)刪除。

創(chuàng)建交換機(jī)參數(shù)說(shuō)明

參數(shù)說(shuō)明name字符串值,exchange 的名稱。durable布爾值,表示該 exchage 是否持久化。 持久化意味著當(dāng) RabbitMQ 重啟后,該 exchange 是否會(huì)恢復(fù)/仍存在。autoDelete布爾值,表示當(dāng)該 exchange 沒(méi)“人”(queue)用時(shí),是否會(huì)被自動(dòng)刪除。

不指定 durable 和 autoDelete 時(shí),默認(rèn)為 true 和 false 。表示持久化、不用自動(dòng)刪除

4. AMQP

4.1. 相關(guān)概念介紹

AMQP 一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。AMQP是一個(gè)二進(jìn)制協(xié)議,擁有一些現(xiàn)代化特點(diǎn):多信道、協(xié)商式,異步,安全,擴(kuò)平臺(tái),中立,高效。

RabbitMQ是AMQP協(xié)議的Erlang的實(shí)現(xiàn)。

概念說(shuō)明連接Connection一個(gè)網(wǎng)絡(luò)連接,比如TCP/IP套接字連接。會(huì)話Session端點(diǎn)之間的命名對(duì)話。在一個(gè)會(huì)話上下文中,保證“恰好傳遞一次”。信道Channel多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。為會(huì)話提供物理傳輸介質(zhì)??蛻舳薈lientAMQP連接或者會(huì)話的發(fā)起者。AMQP是非對(duì)稱的,客戶端生產(chǎn)和消費(fèi)消息,服務(wù)器存儲(chǔ)和路由這些消息。服務(wù)節(jié)點(diǎn)Broker消息中間件的服務(wù)節(jié)點(diǎn);一般情況下可以將一個(gè)RabbitMQ Broker看作一臺(tái)RabbitMQ 服務(wù)器。端點(diǎn)AMQP對(duì)話的任意一方。一個(gè)AMQP連接包括兩個(gè)端點(diǎn)(一個(gè)是客戶端,一個(gè)是服務(wù)器)。消費(fèi)者Consumer一個(gè)從消息隊(duì)列里請(qǐng)求消息的客戶端程序。生產(chǎn)者Producer一個(gè)向交換機(jī)發(fā)布消息的客戶端應(yīng)用程序。

4.2. RabbitMQ運(yùn)轉(zhuǎn)流程

在入門案例中:

生產(chǎn)者發(fā)送消息

生產(chǎn)者創(chuàng)建連接(Connection),開(kāi)啟一個(gè)信道(Channel),連接到RabbitMQ Broker;聲明隊(duì)列并設(shè)置屬性;如是否排它,是否持久化,是否自動(dòng)刪除;將路由鍵(空字符串)與隊(duì)列綁定起來(lái);發(fā)送消息至RabbitMQ Broker;關(guān)閉信道;關(guān)閉連接; 消費(fèi)者接收消息

消費(fèi)者創(chuàng)建連接(Connection),開(kāi)啟一個(gè)信道(Channel),連接到RabbitMQ Broker向Broker 請(qǐng)求消費(fèi)相應(yīng)隊(duì)列中的消息,設(shè)置相應(yīng)的回調(diào)函數(shù);等待Broker回應(yīng)投遞隊(duì)列中的消息,消費(fèi)者接收消息;確認(rèn)(ack,自動(dòng)確認(rèn))接收到的消息;RabbitMQ從隊(duì)列中刪除相應(yīng)已經(jīng)被確認(rèn)的消息;關(guān)閉信道;關(guān)閉連接;

4.3. 生產(chǎn)者流轉(zhuǎn)過(guò)程說(shuō)明

客戶端與代理服務(wù)器Broker建立連接。會(huì)調(diào)用newConnection() 方法,這個(gè)方法會(huì)進(jìn)一步封裝Protocol Header 0-9-1 的報(bào)文頭發(fā)送給Broker ,以此通知Broker 本次交互采用的是AMQP0-9-1 協(xié)議,緊接著B(niǎo)roker 返回Connection.Start 來(lái)建立連接,在連接的過(guò)程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個(gè)命令的交互。 客戶端調(diào)用connection.createChannel方法。此方法開(kāi)啟信道,其包裝的channel.open命令發(fā)送給Broker; channel.basicPublish方法對(duì)應(yīng)的AMQP命令為Basic.Publish,這個(gè)命令包含了content Header 和content Body()。content Header 包含了消息體的屬性,例如:投遞模式,優(yōu)先級(jí)等,content Body 包含了消息體本身。 客戶端發(fā)送完消息需要關(guān)閉資源時(shí),涉及到Channel.Close和Channel.Close-Ok 與Connetion.Close和Connection.Close-Ok的命令交互。

4.4. 消費(fèi)者流轉(zhuǎn)過(guò)程說(shuō)明

消費(fèi)者客戶端與代理服務(wù)器Broker建立連接。會(huì)調(diào)用newConnection() 方法,這個(gè)方法會(huì)進(jìn)一步封裝Protocol Header 0-9-1 的報(bào)文頭發(fā)送給Broker ,以此通知Broker 本次交互采用的是AMQP0-9-1 協(xié)議,緊接著B(niǎo)roker 返回Connection.Start 來(lái)建立連接,在連接的過(guò)程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個(gè)命令的交互。消費(fèi)者客戶端調(diào)用connection.createChannel方法。和生產(chǎn)者客戶端一樣,協(xié)議涉及Channel . Open/Open-Ok命令。在真正消費(fèi)之前,消費(fèi)者客戶端需要向Broker 發(fā)送Basic.Consume 命令(即調(diào)用channel.basicConsume 方法〉將Channel 置為接收模式,之后Broker響應(yīng)Basic . Consume - Ok 以告訴消費(fèi)者客戶端準(zhǔn)備好消費(fèi)消息。Broker 向消費(fèi)者客戶端推送(Push) 消息,即Basic.Deliver 命令,這個(gè)命令和Basic.Publish 命令一樣會(huì)攜帶Content Header 和Content Body。消費(fèi)者接收到消息并正確消費(fèi)之后,向Broker 發(fā)送確認(rèn),即Basic.Ack 命令??蛻舳税l(fā)送完消息需要關(guān)閉資源時(shí),涉及到Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok的命令交互。

5. RabbitMQ工作模式

5.1. Work queues工作隊(duì)列模式

simple模式: 一個(gè)生產(chǎn)者一個(gè)消費(fèi)者 定義rabbitconfig: 創(chuàng)建消息隊(duì)列,交換機(jī)及其之間綁定 @Configuration

public class RabbitmqConfig {

/**

* simple 隊(duì)列

*/

@Bean

public Queue simpleQueue(){

return QueueBuilder.durable("simpleQueue").build();

//return new Queue("simpleQueue");

}

}

定義生產(chǎn)者

/**

* 往消息隊(duì)列返送消息

*/

@Component

public class SimpleProduct {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String msg){

rabbitTemplate.convertAndSend("simpleQueue",msg);

}

}

? 定義消費(fèi)者

@Component

public class SimpleConsumer {

/**

* 消費(fèi)消息

*/

@RabbitListener(queues = "simpleQueue")

private void recevie(String msg){

System.out.println("消費(fèi)者接收到:"+msg);

}

}

**work模式:**一個(gè)生產(chǎn)者多個(gè)消費(fèi)者,也稱之為競(jìng)爭(zhēng)模式 創(chuàng)建兩個(gè)消費(fèi)者監(jiān)聽(tīng)隊(duì)列

Work Queues與入門程序的簡(jiǎn)單模式相比,多了一個(gè)或一些消費(fèi)端,多個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息。它們處于競(jìng)爭(zhēng)者的關(guān)系,一條消息只會(huì)被一個(gè)消費(fèi)者接收,rabbit采用輪詢的方式將消息是平均發(fā)送給消費(fèi)者的;消費(fèi)者在處理完某條消息后,才會(huì)收到下一條消息。

應(yīng)用場(chǎng)景:對(duì)于任務(wù)過(guò)重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。如生產(chǎn)者生產(chǎn)一千條消息,那么c1和c2各消費(fèi)500條,隊(duì)列消費(fèi)消息是均衡分配

復(fù)制消費(fèi)方代碼,重新編寫(xiě)一個(gè)消費(fèi)端,然后啟動(dòng)兩個(gè)消費(fèi)端,進(jìn)行測(cè)試

5.2. 訂閱模式類型

訂閱模式示例圖:

在訂閱模型中,多了一個(gè)exchange角色,而且過(guò)程略有變化:

P:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X(交換機(jī))C:消費(fèi)者,消息的接受者,會(huì)一直等待消息到來(lái)。Queue:消息隊(duì)列,接收消息、緩存消息。Exchange:交換機(jī),圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見(jiàn)以下3種類型:

Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列Direct:定向,把消息交給符合指定routing key 的隊(duì)列Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒(méi)有任何隊(duì)列與Exchange綁定,或者沒(méi)有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

5.2.1 廣播模式

1.創(chuàng)建RabbitMQ隊(duì)列與交換機(jī)綁定的配置類com…rabbitmq.config.RabbitMQConfig

/**

* 負(fù)責(zé):創(chuàng)建消息隊(duì)列,交換機(jī)及其之間綁定

*/

@Configuration

public class RabbitmqConfig {

/**

* fanout 模式

*/

@Bean

public Queue fanoutQueueA(){

return QueueBuilder.durable("fanoutQueueA").build();

}

@Bean

public Queue fanoutQueueB(){

return QueueBuilder.durable("fanoutQueueB").build();

}

/**

* 創(chuàng)建交換機(jī)

* @return

*/

@Bean

public FanoutExchange fanoutExchange(){

return new FanoutExchange("fanoutExchange");

}

/**

* 把消息隊(duì)列,綁定到交換機(jī)上

* IOC 在調(diào)用配置中的方法時(shí),如果有參數(shù),默認(rèn)以形參的名字找到IOC中對(duì)應(yīng)的方法

*/

@Bean

public Binding fanoutQueueAToFanoutExchange( Queue fanoutQueueA,

FanoutExchange fanoutExchange){

return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);

}

@Bean

public Binding fanoutQueueToFanoutExchange( Queue fanoutQueueB,

FanoutExchange fanoutExchange){

return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);

}

}

1、實(shí)現(xiàn)生產(chǎn)者

/**

* 往消息隊(duì)列返送消息

*/

@Component

public class FanoutProduct {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* arg1: 交換機(jī)名字

* arg2: 路由名字

* arg3: 參數(shù)名字

* @param msg

*/

public void send(String msg){

rabbitTemplate.convertAndSend("fanoutExchange","",msg);

}

}

創(chuàng)建交換機(jī)參數(shù)說(shuō)明:

參數(shù)說(shuō)明exchange字符串值,交換機(jī)名稱type交換機(jī)的類型,有三種類型:FANOUT、DIRECT、TOPICdurable交換機(jī)是否持久化,表示當(dāng)rabbitmq重啟時(shí)或者意外宕機(jī),這個(gè)交換機(jī)還在不在autoDelete是否自動(dòng)刪除,表示當(dāng)該交換機(jī)沒(méi)人發(fā)消息時(shí),是否會(huì)被自動(dòng)刪除。internal內(nèi)部使用,一般為falsearguments其它參數(shù)

發(fā)送消息參數(shù)說(shuō)明

參數(shù)說(shuō)明exchange字符串值,交換機(jī)名稱routingKey如果交換機(jī)類型是fanout,則routingKey為""props消息基本屬性配置body要發(fā)送的消息的內(nèi)容

2、消費(fèi)方實(shí)現(xiàn)

@Component

public class FanoutConsumer {

/**

* 消費(fèi)消息

*/

@RabbitListener(queues = "fanoutQueueA")

private void recevie(String msg){

System.out.println("消費(fèi)者A接收到:"+msg);

}

/**

* 消費(fèi)消息

*/

@RabbitListener(queues = "fanoutQueueB")

private void recevieB(String msg){

System.out.println("消費(fèi)者B接收到:"+msg);

}

}

發(fā)布訂閱模式與工作隊(duì)列模式的區(qū)別

1、工作隊(duì)列模式不用定義交換機(jī),而發(fā)布/訂閱模式需要定義交換機(jī)。

2、發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊(duì)列模式的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。

3、發(fā)布/訂閱模式需要設(shè)置隊(duì)列和交換機(jī)的綁定,工作隊(duì)列模式不需要設(shè)置,實(shí)際上工作隊(duì)列模式會(huì)將隊(duì)列綁 定到默認(rèn)的交換機(jī) 。

5.2.2 Routing路由模式

路由模式特點(diǎn):

隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會(huì)接收到消息

在編碼上與 Publish/Subscribe發(fā)布與訂閱模式 的區(qū)別是交換機(jī)的類型為:Direct,還有隊(duì)列綁定交換機(jī)的時(shí)候需要指定routing key。

1、創(chuàng)建RabbitMQ隊(duì)列與交換機(jī)綁定的配置類com…rabbitmq.config.RabbitMQConfig

/**

* 負(fù)責(zé):創(chuàng)建消息隊(duì)列,交換機(jī)及其之間綁定

*/

@Configuration

public class RabbitmqConfig {

//==================route===

/**

* 路由模式:通過(guò)路由key將消息發(fā)送給指定消息隊(duì)列

* 1個(gè)交換機(jī),2個(gè)消息隊(duì)列,1個(gè)生產(chǎn)者,2個(gè)消費(fèi)者

*/

@Bean

public Queue routeQueueA(){

return QueueBuilder.durable("routeQueueA").build();

}

@Bean

public Queue routeQueueB(){

return QueueBuilder.durable("routeQueueB").build();

}

@Bean

public DirectExchange directExchange(){

return new DirectExchange("directExchange");

}

//綁定

@Bean

public Binding routeQueueToDirectExchange(Queue routeQueueA,

DirectExchange directExchange){

return BindingBuilder.bind(routeQueueA).to(directExchange).with("red");

}

@Bean

public Binding routeQueueBToDirectExchange(Queue routeQueueB,

DirectExchange directExchange){

return BindingBuilder.bind(routeQueueB).to(directExchange).with("blue");

}

}

2、生產(chǎn)方實(shí)現(xiàn)

*/

@Component

public class DirectProduct {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* arg1: 交換機(jī)名字

* arg2: 路由名字

* arg3: 參數(shù)名字

* @param msg

*/

public void send(String msg,String routekey){

rabbitTemplate.convertAndSend("directExchange",routekey,msg);

}

}

3.消費(fèi)方實(shí)現(xiàn)

創(chuàng)建2個(gè)消費(fèi)方并啟動(dòng),然后使用生產(chǎn)者發(fā)送消息;在消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送對(duì)應(yīng)routing key對(duì)應(yīng)隊(duì)列的消息;到達(dá)按照需要接收的效果

@Component

public class DirectConsumer {

/**

* 消費(fèi)消息

*/

@RabbitListener(queues = "routeQueueA")

private void recevie(String msg){

System.out.println("消費(fèi)者A接收到:"+msg);

}

/**

* 消費(fèi)消息

*/

@RabbitListener(queues = "routeQueueB")

private void recevieB(String msg){

System.out.println("消費(fèi)者B接收到:"+msg);

}

}

5.2.3. Topics通配符模式(模糊路由)

Topic類型與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過(guò)Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert

通配符規(guī)則:

#:匹配0個(gè)或多個(gè)詞

*:匹配不多不少恰好1個(gè)詞

舉例:

item.#:能夠匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert

創(chuàng)建RabbitMQ隊(duì)列與交換機(jī)綁定的配置類com…rabbitmq.config.RabbitMQConfig

package com.woniu.rabbitmq.config;

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class RabbitMQConfig {

// Topics通配符模式

@Bean

public Queue tt01(){

return new Queue("tt01");

}

@Bean

public Queue tt02(){

return new Queue("tt02");

}

@Bean

public TopicExchange topicExchange(){

return new TopicExchange("tte");

}

@Bean

public Binding ttBinding01(){

return BindingBuilder.bind(tt01()).to(topicExchange()).with("#.error");

}

@Bean

public Binding ttBinding02(){

return BindingBuilder.bind(tt02()).to(topicExchange()).with("order.*");

}

}

1、生產(chǎn)方代碼實(shí)現(xiàn)

使用topic類型的Exchange

package com.woniu.rabbitmq.controller;

import com.woniu.rabbitmq.config.RabbitMQConfig;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

@RestController

public class ProducerController {

@RequestMapping("/sendT1MT1/{msg}")

public String sendT1MT1(@PathVariable String msg){

rabbitTemplate.convertAndSend("tte", "11.error", msg);

return "success";

}

@RequestMapping("/sendT1MT2/{msg}")

public String sendT1MT2(@PathVariable String msg){

rabbitTemplate.convertAndSend("tte", "11.22.error", msg);

return "success";

}

@RequestMapping("/sendT1MF/{msg}")

public String sendT1MF(@PathVariable String msg){

rabbitTemplate.convertAndSend("tte", "11.error.22", msg);

return "success";

}

@RequestMapping("/sendT2MF/{msg}")

public String sendT2MF(@PathVariable String msg){

rabbitTemplate.convertAndSend("tte", "order.11.22", msg);

return "success";

}

@RequestMapping("/sendT2MT/{msg}")

public String sendT2MT(@PathVariable String msg){

rabbitTemplate.convertAndSend("tte", "order.1", msg);

return "success";

}

}

消費(fèi)者

@Component

public class DirectComsume {

@RabbitListener(queues = "tt01")

public void received(String msg){

System.out.println("接收到消息:"+msg);

}

@RabbitListener(queues = "tt02")

public void receivedB(String msg){

System.out.println("接收到消息:"+msg);

}

}

創(chuàng)建2個(gè)消費(fèi)方并啟動(dòng),然后使用生產(chǎn)者發(fā)送消息;在消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送對(duì)應(yīng)routing key對(duì)應(yīng)隊(duì)列的消息;到達(dá)按照需要接收的效果;并且這些routing key可以使用通配符。

四,RabbitMQ高級(jí)

1.消息的可靠投遞(生產(chǎn)者端)

在使用 RabbitMQ 的時(shí)候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場(chǎng)景。RabbitMQ 為我們提供了兩種方式用來(lái)控制消息的投遞可靠性模式。

confirm 確認(rèn)模式return 退回模式

rabbitmq 整個(gè)消息投遞的路徑為:producer—>rabbitmq broker—>exchange—>queue—>consumer

消息從 producer 到 exchange,不管exchange是否收到生產(chǎn)者消息,都會(huì)返回一個(gè) confirmCallback 。消息從 exchange–>queue 投遞失敗則會(huì)返回一個(gè) returnCallback 。

我們將利用這兩個(gè) callback 控制消息的可靠性投遞

1.1.confirmCallback確認(rèn)模式

1.在配置文件中 添加publisher-confirm-type: correlated配置

spring:

rabbitmq:

host: localhost

port: 5672

username: woniu

password: woniu

virtual-host: /woniu

publisher-confirm-type: correlated #發(fā)布消息成功到交換器后會(huì)觸發(fā)回調(diào)方法

publisher-returns: true #返回確認(rèn)信息

在springboot2.2.0.RELEASE版本之前(spring.rabbitmq.publisher-confirm發(fā)布確認(rèn)屬性配置)是amqp正式支持的屬性,用來(lái)配置消息發(fā)送到交換器之后是否觸發(fā)回調(diào)方法,在2.2.0及之后該屬性過(guò)期使用spring.rabbitmq.publisher-confirm-type屬性配置代替,用來(lái)配置更多的確認(rèn)類型;

NONE值是禁用發(fā)布確認(rèn)模式,是默認(rèn)值; CORRELATED值是發(fā)布消息成功到交換器后會(huì)觸發(fā)回調(diào)方法; SIMPLE值經(jīng)測(cè)試有兩種效果,其一效果和CORRELATED值一樣會(huì)觸發(fā)回調(diào)方法,其二在發(fā)布消息成功后使用rabbitTemplate調(diào)用waitForConfirms或waitForConfirmsOrDie方法等待broker節(jié)點(diǎn)返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來(lái)判定下一步的邏輯,要注意的點(diǎn)是waitForConfirmsOrDie方法如果返回false則會(huì)關(guān)閉channel,則接下來(lái)無(wú)法發(fā)送消息到broker。

2、編寫(xiě)Product類

package com.woniu.product;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.ReturnedMessage;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

/**

* 往消息隊(duì)列返送消息

*/

@Component

public class ConfirmProduct implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnsCallback{

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* arg1: 交換機(jī)名字

* arg2: 路由名字

* arg3: 參數(shù)名字

* @param msg

*/

public void send(String msg,String routekey){

rabbitTemplate.setConfirmCallback(this); //設(shè)置回調(diào)

rabbitTemplate.setReturnsCallback(this);

rabbitTemplate.convertAndSend("confimExchange",routekey,msg);

}

/**

*

* @param correlationData 消息的唯一標(biāo)識(shí),如果發(fā)送失敗,可以根據(jù)這個(gè)標(biāo)識(shí)補(bǔ)發(fā)信息

* @param status 交換機(jī)是否成功收到信息,true:成功

* @param reason 失敗的原因

*/

@Override

public void confirm(CorrelationData correlationData, boolean status, String reason) {

System.out.println("進(jìn)入confirm方法");

System.out.println(status);

System.out.println(reason);

}

/**

* 只有消息路由失敗進(jìn)入,比如:找不到路由等

* @param returnedMessage

*/

@Override

public void returnedMessage(ReturnedMessage returnedMessage) {

returnedMessage.getMessage(); //失敗的原因

returnedMessage.getReplyCode(); //失敗的狀態(tài)碼

returnedMessage.getReplyText(); //失敗的原因

returnedMessage.getExchange(); //交換機(jī)

returnedMessage.getRoutingKey(); //路由

}

}

失敗演示可以把convertAndSend的交換機(jī)名字寫(xiě)錯(cuò)

1.2.ReturnCallBack確認(rèn)模式

在上個(gè)例子的基礎(chǔ)上,再添加一個(gè)測(cè)試方法returnedMessage

由于路由鍵不正確 022,故交換機(jī)的消息無(wú)法發(fā)送到消息隊(duì)列,setReturnCallback()方法,也就是Exchange路由到Queue失敗時(shí)執(zhí)行,這個(gè)前提是必須設(shè)置 rabbitTemplate.setMandatory(true);如果不加這句話,意味著交換機(jī)處理消息模式采用默認(rèn)的模式,模式模式是直接丟掉該消息,不會(huì)執(zhí)行setReturnCallback()方法。 當(dāng)然如果交換機(jī)發(fā)送消息到隊(duì)列,如果成功了也不會(huì)執(zhí)行該方法,因?yàn)閟etReturnCallback是交換機(jī)發(fā)送消息到隊(duì)列失敗才執(zhí)行的。

失敗演示生產(chǎn)者可以把發(fā)送的routekey寫(xiě)錯(cuò)

消息的可靠投遞小結(jié)

設(shè)置ConnectionFactory的publisher-confirms=“true” 開(kāi)啟 確認(rèn)模式。 使用rabbitTemplate.setConfirmCallback設(shè)置回調(diào)函數(shù)。當(dāng)消息發(fā)送到exchange后回調(diào)confirm方法。在方法中判斷ack,如果為true,則發(fā)送成功,如果為false,則發(fā)送失敗,需要處理。

設(shè)置ConnectionFactory的publisher-returns=“true” 開(kāi)啟 退回模式。使用rabbitTemplate.setReturnCallback設(shè)置退回函數(shù),當(dāng)消息從exchange路由到queue失敗后,如果設(shè)置了rabbitTemplate.setMandatory(true)參數(shù),則會(huì)將消息退回給producer。并執(zhí)行回調(diào)函數(shù)returnedMessage。

在RabbitMQ中也提供了事務(wù)機(jī)制,但是性能較差,此處不做講解。 使用channel下列方法,完成事務(wù)控制: txSelect(), 用于將當(dāng)前channel設(shè)置成transaction模式 txCommit(),用于提交事務(wù) txRollback(),用于回滾事務(wù)

2.Consumer ACK(消費(fèi)者端)

ack指Acknowledge(翻譯為:應(yīng)答),表示消費(fèi)端收到消息后的確認(rèn)方式。有三種確認(rèn)方式:

自動(dòng)確認(rèn):acknowledge=“none”手動(dòng)確認(rèn):acknowledge=“manual”根據(jù)異常情況確認(rèn):acknowledge=“auto”,(這種方式使用麻煩,不作講解)

其中自動(dòng)確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。如果設(shè)置了手動(dòng)確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動(dòng)確認(rèn),如果出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓其自動(dòng)重新發(fā)送消息。

2.1 消費(fèi)方工程搭建

1.在配置文件中 添加手動(dòng)確認(rèn)的配置

spring:

rabbitmq:

listener:

simple:

acknowledge-mode: manual

prefetch: 1 # 默認(rèn)一批一批消費(fèi)的,提高消息,默認(rèn)250個(gè)

2.編寫(xiě)Ack監(jiān)聽(tīng)器

@Component

public class ConfirmConsumer {

/**

* 消費(fèi)消息

*/

@RabbitListener(queues = "confirmQueue")

private void recevie(String msg, Message message, Channel channel) throws IOException {

//arg1:消息的id

//arg2:是否批量確認(rèn)這些信息 false:否

//channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

/**

* arg1:消息的id

* arg2:是否批量確認(rèn)這些信息 false:否

* arg3:是否重新將消息放回隊(duì)列中,false:不放會(huì),一般都是false

*/

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

System.out.println("消費(fèi)者A接收到:"+msg);

}

}

basicAck的批量應(yīng)答問(wèn)題說(shuō)明:

channel.basicAck(8,true) 如果前面還有4,6,7的deliveryTag未被確認(rèn),則會(huì)一起確認(rèn),減少網(wǎng)絡(luò)流量,當(dāng)然當(dāng)前deliveryTag=8這條消息也會(huì)確認(rèn),如果沒(méi)有前面沒(méi)有未被確認(rèn)的消息,則只會(huì)確認(rèn)當(dāng)前消息,也就是說(shuō)可以一次性確認(rèn)某個(gè)隊(duì)列小于等于delivery_tag值的所有消息

basicNack的參數(shù)說(shuō)明:

第一個(gè)參數(shù)為deliveryTag,也就是每個(gè)消息標(biāo)記index,消息標(biāo)記值從1開(kāi)始,依次遞增 第二個(gè)參數(shù)為multiple,表示是否批量,如果為true,那么小于或者等于該消息標(biāo)記的消息(如果還沒(méi)有簽收)都會(huì)拒絕簽收 第三個(gè)參數(shù)為requeue,表示被拒絕的消息是否重回隊(duì)列,如果設(shè)置為true,則消息重新回到queue,那么broker會(huì)重新推送該消息給消費(fèi)端,如果設(shè)置為false,則消息在隊(duì)列中被刪除,即消息會(huì)被直接丟失(當(dāng)然如果為false,還有一種情況就是放到死信隊(duì)列)

啟動(dòng)之前的生產(chǎn)者發(fā)送消息給test_queue_confirm隊(duì)列,如果拋出異常則該消息一直重發(fā)

2.2 消息可靠性總結(jié)

持久化

exchange要持久化queue要持久化message要持久化

生產(chǎn)方確認(rèn)Confirm 消費(fèi)方確認(rèn)Ack Broker高可用

3. 死信隊(duì)列

TTL(time to live)

TTL是rabbitmq 中一個(gè)消息或者隊(duì)列的屬性,表明一條信息或者該隊(duì)列中的所有消息的最大存活時(shí)間。單位是毫秒,換句話說(shuō),如果一條信息設(shè)置了TTL 屬性或者設(shè)置TTL屬性的隊(duì)列,那么這個(gè)條信息如果在TTL設(shè)置的時(shí)間沒(méi)有被消費(fèi),則會(huì)成為死信。如果同時(shí)配置了隊(duì)列的TTL和消息的TTL, 那么較小的那個(gè)值將會(huì)被使用。

死信隊(duì)列

英文縮寫(xiě):DLX 。Dead Letter Exchange(死信交換機(jī)),當(dāng)消息在隊(duì)列成為Dead message后,通過(guò)該隊(duì)列把這條死信消息發(fā)給另一個(gè)交換機(jī),這個(gè)交換機(jī)就是DLX。

消息成為死信的三種情況(面試常問(wèn)):

隊(duì)列消息長(zhǎng)度到達(dá)限制(淘汰最早的消息);消費(fèi)者拒接消費(fèi)消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊(duì)列,requeue=false;原隊(duì)列存在消息過(guò)期設(shè)置,消息到達(dá)超時(shí)時(shí)間未被消費(fèi);

隊(duì)列綁定死信交換機(jī): 給隊(duì)列設(shè)置參數(shù): x-dead-letter-exchange 和 x-dead-letter-routing-key

5.1 死信隊(duì)列實(shí)現(xiàn)過(guò)程

1、創(chuàng)建RabbitMQ隊(duì)列與交換機(jī)綁定的配置類com…rabbitmq.config.RabbitMQConfig

package com.woniu.rabbitmq.config;

import org.springframework.amqp.core.*;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.support.converter.SerializerMessageConverter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Scope;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class RabbitMQConfig {

// 1. 創(chuàng)建DLX交換機(jī)

@Bean

public DirectExchange dlxDirectExchange(){

return new DirectExchange("dlx_exchange");

}

// 2. ttl隊(duì)列

@Bean

public Queue ttlQueue(){

Map args = new HashMap<>();

// 1、正常隊(duì)列綁定死信交換機(jī)-->

// 1.1 x-dead-letter-exchange 死信交換機(jī)的名稱

args.put("x-dead-letter-exchange", "dlx_exchange");

// 1.2 x-dead-letter-routing-key 正常隊(duì)列發(fā)送消息到死信 交換機(jī)的routingKey

args.put("x-dead-letter-routing-key", "dlx01");

// 2 消息成為死信的三種情況

// 2.1 設(shè)置隊(duì)列的過(guò)期時(shí)間 ttl x-message-ttl

args.put("x-message-ttl", 1000 * 10);

// 2.2 設(shè)置隊(duì)列的長(zhǎng)度限制 x-max-length 10條消息,超過(guò)進(jìn)死信

args.put("x-max-length", 10);

// 2.3 消費(fèi)者拒接消費(fèi)消息,并且不重回隊(duì)列 這種情況后面在消費(fèi)工程測(cè)試

return QueueBuilder.durable("ttlQueue").withArguments(args).build();

}

// 3. 死信隊(duì)列

@Bean

public Queue dlxQ(){

return new Queue("dlxQ");

}

// 4.dlxQ綁定DXL交換機(jī)

@Bean

public Binding dlxBinding(){

return BindingBuilder.bind(dlxQ()).to(dlxDirectExchange()).with("dlx01");

}

}

2、生產(chǎn)者工程測(cè)試:

//死信隊(duì)列測(cè)試

@Test

public void testDlx(){

//1、測(cè)試過(guò)期時(shí)間,死信消息

//rabbitTemplate.convertAndSend("ttlQueue","我是一條消息,我會(huì)死嗎");

//2、測(cè)試隊(duì)列長(zhǎng)度限制,消息死信

for (int i = 0; i < 20 ; i++) {

rabbitTemplate.convertAndSend("ttlQueue","我是一條消息,我會(huì)死嗎");

}

//前兩步測(cè)試結(jié)果:死信隊(duì)列會(huì)有21條記錄 1(過(guò)期) + 10(限制)+10(正常隊(duì)列過(guò)期后的10條)

}

3、消息成為死信的第三種情況實(shí)現(xiàn)

1).在配置文件中 添加手動(dòng)確認(rèn)的配置

spring:

rabbitmq:

listener:

simple:

acknowledge-mode: manual

2).添加正常隊(duì)列的監(jiān)聽(tīng)器

package com.woniu.rabbitmq.mq;

import com.rabbitmq.client.Channel;

import lombok.SneakyThrows;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

/*

* @Date:2023/2/28

* @Description:

*/

@Component

@RabbitListener(queues = "ttlQueue")

public class TtlListener {

@SneakyThrows

@RabbitHandler

public void dlxQ(String msg, Message message, Channel channel){

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

System.out.println(new String(message.getBody()));

System.out.println("處理業(yè)務(wù)邏輯");

int m = 1/0;

channel.basicAck(deliveryTag,true);

}catch(Exception ex){

/** basicNack(long deliveryTag, boolean multiple, boolean requeue)

* multiple是否批量. true:將一次性拒絕所有小于或者等于deliveryTag的消息。

* requeue:被拒絕的消息是否重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue,broker會(huì)重新發(fā)送該消息給消費(fèi)端,如果 * 為 requeue=false,不重回隊(duì)列,則消息發(fā)送最終到死信隊(duì)列

*/

channel.basicNack(deliveryTag,true,false);

}

}

}

3).在生產(chǎn)端的testDlx方法再次給正常交換機(jī)發(fā)送消息

//死信隊(duì)列測(cè)試

@Test

public void testDlx(){

rabbitTemplate.convertAndSend("ttlQueue","我是一條消息,我會(huì)死嗎?");

}

4. 延遲隊(duì)列

延遲隊(duì)列,即消息進(jìn)入隊(duì)列后不會(huì)立即被消費(fèi)者調(diào)用,只有到達(dá)指定時(shí)間后,才會(huì)被調(diào)用者調(diào)用消費(fèi)。

如下需求:

下單后,30分鐘未支付,取消訂單,回滾庫(kù)存。

當(dāng)用戶提交訂單后,數(shù)據(jù)庫(kù)保存訂單信息,同時(shí)庫(kù)存表相應(yīng)的庫(kù)存減少,然后消息隊(duì)列保存訂單的信息(如訂單Id),此時(shí)庫(kù)存系統(tǒng)監(jiān)聽(tīng)隊(duì)列,隊(duì)列不會(huì)把消息立刻發(fā)送給庫(kù)存,而是過(guò)30分鐘再把信息發(fā)送給庫(kù)存系統(tǒng),庫(kù)存系統(tǒng)去查詢訂單數(shù)據(jù)庫(kù),根據(jù)訂單id查詢,如果該訂單還沒(méi)有支付,則取消訂單,回滾庫(kù)存,如果支付過(guò)了,則庫(kù)存表什么都不用做。也就是給用戶30分鐘的機(jī)會(huì),一個(gè)訂單在30分鐘后還沒(méi)有支付,則該訂單的庫(kù)存信息直接回滾。

新用戶注冊(cè)成功7天后,發(fā)送短信問(wèn)候。

實(shí)現(xiàn)方式:

定時(shí)器:我們可以寫(xiě)一段代碼,在某個(gè)時(shí)間段查詢訂單表的支付情況。把提交訂單的時(shí)間查出來(lái)和當(dāng)前系統(tǒng)時(shí)間比較,30分鐘之類如果訂單狀態(tài)為支付,則取消該訂單,大家思考一下有什么問(wèn)題?延遲隊(duì)列:很可惜,在RabbitMQ中并未提供延遲隊(duì)列功能。但是可以使用:TTL+死信隊(duì)列 組合實(shí)現(xiàn)延遲隊(duì)列的效果。

6.1 延遲隊(duì)列實(shí)現(xiàn)過(guò)程

通過(guò)插件實(shí)現(xiàn)

a, 把插件放入rabbitmq安裝目錄的plugins目錄

b, 進(jìn)入rabbitmq 安裝目錄的sbin 目錄

執(zhí)行下面命令讓改插件生效

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

代碼實(shí)現(xiàn)

1:定義個(gè)一個(gè)延遲交換機(jī),一個(gè)隊(duì)列

// 延遲交換機(jī),消息隊(duì)列

@Bean

public Queue delayQueue(){

return QueueBuilder.durable("delay").build();

}

/**

* CustomExchange: 自定義交換機(jī) ,是fanout,direct,topic 交換機(jī)

* @return

*/

@Bean

public CustomExchange customExchange(){

Map map = new HashMap();

//指定交換機(jī)類型

map.put("x-delayed-type","direct");

/**

* arg1:交換機(jī)名字

* arg2: 交換機(jī)信息類型,延遲消息

* arg3: 是否持久化,是否將沒(méi)有被消費(fèi)的消息持久化

* arg4: 沒(méi)有隊(duì)列綁定到交換機(jī),交換機(jī)是否刪除。

* arg5: 初始化參數(shù)

*/

return new CustomExchange("customExchange","x-delayed-message",

true,false,map);

}

/**

* 交換機(jī)綁定隊(duì)列

* @param delayQueue

* @param customExchange

* @return

*/

@Bean

public Binding delayQueueTocustomExchange(Queue delayQueue,CustomExchange customExchange){

return BindingBuilder.bind(delayQueue).to(customExchange).with("delay").noargs();

}

2: 定義個(gè)生產(chǎn)者

/**

* 往消息隊(duì)列返送消息

*/

@Component

@Slf4j

public class DelayProduct {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String msg,int delayTime){

log.info("發(fā)送消息");

rabbitTemplate.convertAndSend("customExchange", "delay", msg,

new MessagePostProcessor() {

/**

* 在消息發(fā)送到消息隊(duì)列之前對(duì)消息進(jìn)行處理

* @param message

* @return

* @throws AmqpException

*/

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 給消息設(shè)置過(guò)期時(shí)間,單位是毫秒

message.getMessageProperties().setDelay(delayTime);

return message;

}

});

}

}

3: 定義個(gè)消費(fèi)者

@Component

@Slf4j

public class DelayConsumer {

@Autowired

private RabbitTemplate rabbitTemplate;

@RabbitListener(queues = "delay")

public void recevied(String msg,Channel channel,Message message) throws IOException {

log.info("消費(fèi)消息:"+ msg);

/**

* 消費(fèi)信息的id.

* 是否批量確認(rèn)信息

*/

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}

}

5. 日志與監(jiān)控

7.1 RabbitMQ日志

RabbitMQ默認(rèn)日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log

日志包含了RabbitMQ的版本號(hào)、Erlang的版本號(hào)、RabbitMQ服務(wù)節(jié)點(diǎn)名稱、cookie的hash值、RabbitMQ配置文件地址、內(nèi)存限制、磁盤(pán)限制、默認(rèn)賬戶guest的創(chuàng)建以及權(quán)限配置等等。

7.2 rabbitmq常用命令

1、查看隊(duì)列

rabbitmqctl list_queues #查看所有虛擬主機(jī)里面的隊(duì)列

rabbitmqctl list_queues -p /vhost #查看某個(gè)虛擬主機(jī)里面的隊(duì)列

2、刪除所有隊(duì)列

rabbitmqctl stop_app #關(guān)閉應(yīng)用

rabbitmqctl reset #清除隊(duì)列中的消息

rabbitmqctl start_app # 再次啟動(dòng)此應(yīng)用

注意:此方式,會(huì)同時(shí)刪除一些配置信息,需要慎用

3、查看rabbitmq中的交換機(jī)

rabbitmqctl list_exchanges [-p vhost]

4、rabbitmq的用戶操作命令

rabbitmqctl list_users

rabbitmqctl add_user 用戶名 密碼

rabbitmqctl delete_user 用戶名

5、查看未被確認(rèn)的隊(duì)列

rabbitmqctl list_queues name messages_unacknowledged

6、查看隊(duì)列環(huán)境變量

rabbitmqctl environment

7、查看隊(duì)列消費(fèi)者信息

rabbitmqctl list_consumers

8、查看隊(duì)列連接

rabbitmqctl list_connections

9、查看準(zhǔn)備就緒的隊(duì)列

rabbitmqctl list_queues name messages_ready

10、查看單個(gè)隊(duì)列的內(nèi)存使用

rabbitmqctl list_queues name memory

11、列出所有虛擬主機(jī)

rabbitmqctl list_vhosts

rabbitmqctl status | grep rabbit ##查看rabbitmq的版本

6 消息追蹤

在使用任何消息中間件的過(guò)程中,難免會(huì)出現(xiàn)某條消息異常丟失的情況。

對(duì)于RabbitMQ而言,可能是因?yàn)樯a(chǎn)者或消費(fèi)者與RabbitMQ斷開(kāi)了連接,而它們與RabbitMQ又采用了不同的確認(rèn)機(jī)制;也有可能是因?yàn)榻粨Q器與隊(duì)列之間不同的轉(zhuǎn)發(fā)策略;甚至是交換器并沒(méi)有與任何隊(duì)列進(jìn)行綁定,生產(chǎn)者又不感知或者沒(méi)有采取相應(yīng)的措施;另外RabbitMQ本身的集群策略也可能導(dǎo)致消息的丟失。這個(gè)時(shí)候就需要有一個(gè)較好的機(jī)制跟蹤記錄消息的投遞過(guò)程,以此協(xié)助開(kāi)發(fā)和運(yùn)維人員進(jìn)行問(wèn)題的定位。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來(lái)實(shí)現(xiàn)消息追蹤。

8.1 消息追蹤-Firehose(了解)

firehose的機(jī)制是將生產(chǎn)者投遞給隊(duì)列的消息,以及隊(duì)列投遞給消費(fèi)者的消息按照指定的格式發(fā)送到默認(rèn)的exchange上。這個(gè)默認(rèn)的exchange的名稱為amq.rabbitmq.trace,它是一個(gè)topic類型的exchange。發(fā)送到這個(gè)exchange上的消息的routing key為 publish.exchangename 和 deliver.queuename。其中exchangename和queuename為實(shí)際交換機(jī)和隊(duì)列的名稱,分別對(duì)應(yīng)生產(chǎn)者投遞到exchange的消息,和消費(fèi)者從queue上獲取的消息。

1、打開(kāi)trace 功能

rabbitmqctl trace_on [-p vhost] ##開(kāi)啟Firehose命令

打開(kāi) trace 會(huì)影響消息寫(xiě)入功能,適當(dāng)打開(kāi)后請(qǐng)關(guān)閉,關(guān)閉Firehose命令:rabbitmqctl trace_off [-p vhost],打開(kāi)后會(huì)多一個(gè)交換機(jī),如下圖

2、新建一個(gè)消息隊(duì)列,并給該交換機(jī)綁定一個(gè)消息隊(duì)列

3、打開(kāi)任何一個(gè)其他的隊(duì)列,并往隊(duì)列發(fā)送一條消息,則這個(gè)test_trace隊(duì)列也會(huì)有其他隊(duì)列的消息

8.2 消息追蹤-rabbitmq_tracing

rabbitmq_tracing和Firehose在實(shí)現(xiàn)上如出一轍,只不過(guò)rabbitmq_tracing的方式比Firehose多了一層GUI的包裝,更容易使用和管理。

1、啟用插件:

[root@localhost ~]# rabbitmq-plugins list ###查詢插件

[root@localhost ~]# rabbitmq-plugins enable rabbitmq_tracing

1、新建一個(gè)trace,將來(lái)所有的消息都被trace保存起來(lái),文件的默認(rèn)路徑為/var/tmp/rabbitmq-tracing

不管在哪個(gè)隊(duì)列發(fā)送消息,都會(huì)保存到日志文件mytrace.log中

如果是用其它的用戶創(chuàng)建這個(gè)消息日志。則需要在/etc/rabbitmq/rabbit.config配置文件添加如下內(nèi)容:創(chuàng)建的用戶名和密碼

{rabbitmq_tracing,

[

{directory, "/var/log/rabbitmq/rabbitmq_tracing"},

{username, "woniu"},

{password, "woniu"}

]

}

重啟消息隊(duì)列服務(wù)器即可

7.RabbitMQ應(yīng)用問(wèn)題

消息可靠性保障、消息冪等性處理 、微服務(wù)中用消息隊(duì)列實(shí)現(xiàn)微服務(wù)的異步調(diào)用,而用openfeign采用的同步

9.1 消息可靠性保障-消息補(bǔ)償

消息補(bǔ)償機(jī)制

需求:100%確保消息發(fā)送成功

9.2 消息冪等性保障-樂(lè)觀鎖(了解)

冪等性指一次和多次請(qǐng)求某一個(gè)資源,對(duì)于資源本身應(yīng)該具有同樣的結(jié)果。也就是說(shuō),其任意多次執(zhí)行對(duì)資源本身所產(chǎn)生的影響均與一次執(zhí)行的影響相同。MQ中指,消費(fèi)多條相同的消息,得到與消費(fèi)該消息一次相同的結(jié)果。

樂(lè)觀鎖解決方案

第一次生產(chǎn)者發(fā)送一條消息,但是消費(fèi)方系統(tǒng)宕機(jī),即不能立即消費(fèi),于是回調(diào)檢查服務(wù)監(jiān)聽(tīng)不到Q2的響應(yīng)消息,也不會(huì)寫(xiě)入數(shù)據(jù)庫(kù)MDB,當(dāng)隔一段時(shí)間后,生產(chǎn)者又發(fā)送一條延遲消息到Q3隊(duì)列,回調(diào)檢查服務(wù)能監(jiān)聽(tīng)到Q3隊(duì)列消息,于是和MDB去比較是否有,由于消費(fèi)方的失敗,消息最終沒(méi)有入庫(kù)MDB,這個(gè)時(shí)候回調(diào)檢查服務(wù)和MDB數(shù)據(jù)庫(kù)比對(duì)失敗,于是通知生產(chǎn)者,重新發(fā)送一條消息給消費(fèi)者,那么這個(gè)時(shí)候Q1就有2條消息了,當(dāng)消費(fèi)方正常運(yùn)行的時(shí)候,由于監(jiān)聽(tīng)的Q1是兩條2消息,怎么辦呢?樂(lè)觀鎖

第一次執(zhí)行:version=1 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1

第二次執(zhí)行:version=2 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1

9.3 消息積壓?jiǎn)栴}

實(shí)際場(chǎng)景可能有這樣現(xiàn)象:大量消息在rabbitmq里積壓了幾個(gè)小時(shí)了還沒(méi)消息,怎么辦?

[root@localhost ~]# rabbitmq-plugins list ###查詢插件

[root@localhost ~]# rabbitmq-plugins enable rabbitmq_tracing

1、新建一個(gè)trace,將來(lái)所有的消息都被trace保存起來(lái),文件的默認(rèn)路徑為/var/tmp/rabbitmq-tracing

不管在哪個(gè)隊(duì)列發(fā)送消息,都會(huì)保存到日志文件mytrace.log中

如果是用其它的用戶創(chuàng)建這個(gè)消息日志。則需要在/etc/rabbitmq/rabbit.config配置文件添加如下內(nèi)容:創(chuàng)建的用戶名和密碼

{rabbitmq_tracing,

[

{directory, "/var/log/rabbitmq/rabbitmq_tracing"},

{username, "woniu"},

{password, "woniu"}

]

}

重啟消息隊(duì)列服務(wù)器即可

7.RabbitMQ應(yīng)用問(wèn)題

消息可靠性保障、消息冪等性處理 、微服務(wù)中用消息隊(duì)列實(shí)現(xiàn)微服務(wù)的異步調(diào)用,而用openfeign采用的同步

9.1 消息可靠性保障-消息補(bǔ)償

消息補(bǔ)償機(jī)制

需求:100%確保消息發(fā)送成功

9.2 消息冪等性保障-樂(lè)觀鎖(了解)

冪等性指一次和多次請(qǐng)求某一個(gè)資源,對(duì)于資源本身應(yīng)該具有同樣的結(jié)果。也就是說(shuō),其任意多次執(zhí)行對(duì)資源本身所產(chǎn)生的影響均與一次執(zhí)行的影響相同。MQ中指,消費(fèi)多條相同的消息,得到與消費(fèi)該消息一次相同的結(jié)果。

樂(lè)觀鎖解決方案

第一次生產(chǎn)者發(fā)送一條消息,但是消費(fèi)方系統(tǒng)宕機(jī),即不能立即消費(fèi),于是回調(diào)檢查服務(wù)監(jiān)聽(tīng)不到Q2的響應(yīng)消息,也不會(huì)寫(xiě)入數(shù)據(jù)庫(kù)MDB,當(dāng)隔一段時(shí)間后,生產(chǎn)者又發(fā)送一條延遲消息到Q3隊(duì)列,回調(diào)檢查服務(wù)能監(jiān)聽(tīng)到Q3隊(duì)列消息,于是和MDB去比較是否有,由于消費(fèi)方的失敗,消息最終沒(méi)有入庫(kù)MDB,這個(gè)時(shí)候回調(diào)檢查服務(wù)和MDB數(shù)據(jù)庫(kù)比對(duì)失敗,于是通知生產(chǎn)者,重新發(fā)送一條消息給消費(fèi)者,那么這個(gè)時(shí)候Q1就有2條消息了,當(dāng)消費(fèi)方正常運(yùn)行的時(shí)候,由于監(jiān)聽(tīng)的Q1是兩條2消息,怎么辦呢?樂(lè)觀鎖

第一次執(zhí)行:version=1 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1

第二次執(zhí)行:version=2 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1

9.3 消息積壓?jiǎn)栴}

實(shí)際場(chǎng)景可能有這樣現(xiàn)象:大量消息在rabbitmq里積壓了幾個(gè)小時(shí)了還沒(méi)消息,怎么辦?

這種時(shí)候只好采用 “丟棄+批量重導(dǎo)” 的方式來(lái)解決了,臨時(shí)寫(xiě)個(gè)程序,連接到mq里面消費(fèi)數(shù)據(jù),收到消息之后直接將其丟棄,快速消費(fèi)掉積壓的消息,降低MQ的壓力?;蛘叨鄦讉€(gè)消費(fèi)端。

柚子快報(bào)激活碼778899分享:分布式 rabbitmq

http://yzkb.51969.com/

參考閱讀

評(píng)論可見(jiàn),查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19173120.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問(wèn)

文章目錄