柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq
柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq
目錄
一、為什么要用到RabbitMq?
二、RabbitMq有什么作用?
1.解耦
2.異步
三、RabbitMq的模型
1.helloword模型
2.Work模型
3.發(fā)布訂閱模型
4.路由鍵模型
5.主題模型
四、RabbitMq跟SpringBoot的整合
1.導(dǎo)入依賴
2.yml配置
3.創(chuàng)建隊(duì)列、創(chuàng)建交換機(jī)、將隊(duì)列與交換機(jī)綁定并設(shè)置路由鍵
4.生產(chǎn)者發(fā)送消息
5.消費(fèi)者消費(fèi)消息
五、ACK機(jī)制
1.什么是消息確認(rèn)機(jī)制?
2.手動(dòng)開啟ACK
2.消費(fèi)者應(yīng)答
六、消息的可靠性
1.消息可靠性講的不能丟失,MQ是如何保證消息可靠性的?
1.1消息從生產(chǎn)者到交換機(jī)有可能會(huì)丟失。這里可以通過confirm機(jī)制來解決
1.2交換機(jī)到隊(duì)列也有可能會(huì)丟失。這里可以通過return機(jī)制來解決
1.33、從隊(duì)列到消費(fèi)者也有可能會(huì)丟失。這里可以通過手動(dòng)ACK解決。
2.confirm和return機(jī)制的實(shí)現(xiàn)
2.1yml配置
3.MQ是如何實(shí)現(xiàn)消息確認(rèn)機(jī)制的?
4.消息補(bǔ)償機(jī)制
5.服務(wù)端實(shí)現(xiàn)遠(yuǎn)程調(diào)用(RPC)
5.1通過java網(wǎng)絡(luò)編程包
5.2RestTemplate
5.3ApacheHttpClient:通過工具類即可
七、消息的重復(fù)消費(fèi)
1.消息被消費(fèi)多次的后果
2.怎么解決消息被重復(fù)消費(fèi)
4.冪等性解決方案
八、死信隊(duì)列
1.什么是死信?
2.什么是非正常的消息
3.什么是死信隊(duì)列
九、延遲隊(duì)列
一、為什么要用到RabbitMq?
因?yàn)橄裎覀冎暗捻?xiàng)目,代碼之間的執(zhí)行都是同步的,一個(gè)業(yè)務(wù)的處理必須等待上一個(gè)業(yè)務(wù)的完成,這樣就比較耗費(fèi)時(shí)間,比如我們的用戶查詢數(shù)據(jù)的時(shí)候,對(duì)于用戶而言他只需要查尋數(shù)據(jù)這一個(gè)操作,對(duì)于我們服務(wù)端而言可能還需要做一些處理,像存入緩存、刪除緩存,只有做完這些操作我們服務(wù)端才會(huì)把數(shù)據(jù)傳給用戶,但是這些是我們業(yè)務(wù)的處理,不應(yīng)該讓用戶來承擔(dān)這樣的一個(gè)時(shí)間成本,并且用戶等待數(shù)據(jù)的時(shí)間過長(zhǎng),給用戶也會(huì)帶來了很不好的體驗(yàn)感,同時(shí)模塊之前的耦合性很高,一個(gè)模塊宕機(jī)后,全部模塊都不能用了。所以要中間件RabbitMQ
二、RabbitMq有什么作用?
1.解耦
就是rabbitMq有一個(gè)生產(chǎn)者模塊負(fù)責(zé)發(fā)送消息到隊(duì)列中,一個(gè)消費(fèi)者模塊負(fù)責(zé)從隊(duì)列中拿到數(shù)據(jù)進(jìn)行消費(fèi),模塊與模塊間分離,通過RabbitMq進(jìn)行數(shù)據(jù)通信
2.異步
它可以不需要等待我們代碼的全部執(zhí)行,就可以用戶所需要的數(shù)據(jù)將消息發(fā)送到隊(duì)列中,然后由隊(duì)列推給用戶
三、RabbitMq的模型
1.helloword模型
一個(gè)生產(chǎn)者,一個(gè)隊(duì)列,一個(gè)消費(fèi)者
2.Work模型
?一個(gè)生產(chǎn)者,一個(gè)隊(duì)列,多個(gè)消費(fèi)者
采用多個(gè)消費(fèi)者是為了加快消息的消費(fèi),多個(gè)消費(fèi)者之間采用輪訓(xùn)的方式。
3.發(fā)布訂閱模型
一個(gè)生產(chǎn)者,一個(gè)交換機(jī) ,多個(gè)隊(duì)列,一個(gè)隊(duì)列上對(duì)應(yīng)一個(gè)消費(fèi)者,消費(fèi)者只有訂閱才能收到消息,交換機(jī)通過廣播給訂閱的隊(duì)列
4.路由鍵模型
路由鍵模型,跟發(fā)布訂閱一樣,只不過多了個(gè)路由鍵,進(jìn)行一個(gè)條件的判斷
5.主題模型
主體模型跟路由鍵類似,只不過路由鍵多了兩個(gè)符號(hào) *代表可以接單個(gè)字符,# 代表可以接多個(gè)字符
四、RabbitMq跟SpringBoot的整合
1.導(dǎo)入依賴
2.yml配置
spring:
rabbitmq:
host: 192.168.107.123 #虛擬主機(jī)的ip地址
port: 5672 #RabbitMq的端口號(hào)
username: guest #匿名用戶
password: guest
virtual-host: / # 虛擬機(jī)主機(jī),隊(duì)列就是保存在虛擬主機(jī)中
3.創(chuàng)建隊(duì)列、創(chuàng)建交換機(jī)、將隊(duì)列與交換機(jī)綁定并設(shè)置路由鍵
// 交換機(jī)的類型是路由鍵
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-exchange");
}
@Bean
public Queue directQueue1() {
return new Queue("direct-queue1");
}
@Bean
public Queue directQueue2() {
return new Queue("direct-queue2");
}
// 綁定
@Bean
public Binding directQueue1Bind() {
// 給direct-queue1綁定了兩個(gè)隊(duì)列
BindingBuilder.DirectExchangeRoutingKeyConfigurer to = BindingBuilder.bind(directQueue1()).to(directExchange());
// 綁定了兩個(gè)路由鍵
to.with("error");
Binding warn = to.with("info");
return warn;
}
@Bean
public Binding directQueue2Bind() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("info");
}
4.生產(chǎn)者發(fā)送消息
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("direct-exchange", "error", "toString");
System.out.println("生產(chǎn)者消息發(fā)送完成");
}
5.消費(fèi)者消費(fèi)消息
@Component
public class HelloQueueListener {
@RabbitListener(queues = "direct-queue1")
public void cosnuermMsg(String msg) {
System.out.println("消費(fèi)者拿到的數(shù)是:" + msg);
}
}
五、ACK機(jī)制
ACK機(jī)制是消費(fèi)端的一個(gè)消息確認(rèn)機(jī)制
1.什么是消息確認(rèn)機(jī)制?
MQServer把消息推送給消費(fèi)者后,消費(fèi)者開始消費(fèi),消費(fèi)完成后需要把結(jié)果給MQServer應(yīng)答一下,消費(fèi)結(jié)果有兩種情況:失敗、成功
消費(fèi)成功:應(yīng)答ACK,MQServer手動(dòng)ACK后就明白這個(gè)消息已經(jīng)被成功的消費(fèi)了,可以從隊(duì)列中刪除了。
消費(fèi)失?。簯?yīng)答NACK。MQServer收到NACK后知道了消費(fèi)者無法消費(fèi)這個(gè)消息,發(fā)送給其他的消費(fèi)者進(jìn)行消費(fèi)。如果其他的消費(fèi)者也是無法消費(fèi),此時(shí)需要這類消息全部的收集起來入庫(kù),通知相關(guān)人員來檢查。
消費(fèi)者默認(rèn)自動(dòng)應(yīng)答,不出異常自動(dòng)應(yīng)答,出了異常應(yīng)答NACK,并且把這個(gè)消息壓入到隊(duì)列
ready:待分配(消費(fèi)者)的消息的數(shù)量。
unackded:待應(yīng)答的消息數(shù)量。
total:總消息的數(shù)量。
當(dāng)出現(xiàn)異常沒有處理時(shí)候,那么被認(rèn)為應(yīng)答nACK,消息回到隊(duì)列的待應(yīng)答狀態(tài),關(guān)掉消費(fèi)者,則進(jìn)入待分配狀態(tài)客戶端先創(chuàng)建連接對(duì)象,有了連接對(duì)象才能創(chuàng)建信道進(jìn)行數(shù)據(jù)的傳輸
channel斷開后把待應(yīng)答的消息,全部變?yōu)榇峙洹?/p>
mqServer是支持持久化的,重啟后數(shù)據(jù)還有,down刪除就沒有了
2.手動(dòng)開啟ACK
1.ylm配置文件
spring:
rabbitmq:
host: 192.168.127.102
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 手動(dòng)ACK
2.消費(fèi)者應(yīng)答
@RabbitListener(queues = "hello-queue")
public void cosnuermMsg(String msg, Channel channel, Message message) {
System.out.println("消費(fèi)者拿到的數(shù)是:" + msg);
// 每個(gè)消息的標(biāo)識(shí)
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 開始消費(fèi)消息
Boolean flag = customerData(msg);
if (flag) {
// 確定消息消費(fèi)成功,應(yīng)答ACK
// 第一個(gè)參數(shù):消息的唯一標(biāo)識(shí)
// 第二個(gè)參數(shù):是否批量應(yīng)答,一般都是false
channel.basicAck(deliveryTag, false);
System.out.println("消息成功,應(yīng)答ACK");
return;
}
System.out.println("消息過程中沒有出現(xiàn)異常,但是消息沒有消費(fèi)成功,應(yīng)答NACK");
} catch (Exception e) {
e.printStackTrace();
System.out.println("消費(fèi)過程中出現(xiàn)了異常,應(yīng)答NACK");
}
// 消息消費(fèi)失敗,應(yīng)答NACK
// 第三個(gè)參數(shù)是:是否壓入隊(duì)列,如果設(shè)置為false該消息就丟棄了
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException e) {
e.printStackTrace();
}
}
// 完成消息的消費(fèi)
private Boolean customerData(String msg) {
Integer i = Integer.parseInt(msg);
if (i == 0) { // 沒有插入成功,但是也沒有出現(xiàn)異常
return false;
}
return true;
}
六、消息的可靠性
1.消息可靠性講的不能丟失,MQ是如何保證消息可靠性的?
1.1消息從生產(chǎn)者到交換機(jī)有可能會(huì)丟失。這里可以通過confirm機(jī)制來解決
confirm機(jī)制是RabbitMQ自己提供的一個(gè)機(jī)制,用來確認(rèn)消息是否到了交換機(jī)了。
1.2交換機(jī)到隊(duì)列也有可能會(huì)丟失。這里可以通過return機(jī)制來解決
reutrn機(jī)制是RabbitMQ自己提供的一個(gè)機(jī)制,用來確認(rèn)消息是否到了隊(duì)列了。
1.33、從隊(duì)列到消費(fèi)者也有可能會(huì)丟失。這里可以通過手動(dòng)ACK解決。
手動(dòng)ACK后就明白這個(gè)消息已經(jīng)被成功的消費(fèi)了,可以從隊(duì)列中刪除了
2.confirm和return機(jī)制的實(shí)現(xiàn)
2.1yml配置
spring:
rabbitmq:
host: 192.168.127.102
port: 5672
username: guest
password: guest
virtual-host: /
publisher-returns: true #開啟return
publisher-confirm-type: simple # 開啟confirm
/**
* confirm機(jī)制和return機(jī)制
*/
@Component
public class MsgConfirm implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//這里要對(duì)mq的return和confirm進(jìn)行覆蓋
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
// confirm機(jī)制確認(rèn)消息是否到了交換機(jī)
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息已經(jīng)到了交換機(jī)");
} else {
System.out.println("消息沒到了交換機(jī)");
}
}
// 確認(rèn)消息是否到了隊(duì)列
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息沒有到隊(duì)列," + replyText + "," + exchange + "," + routingKey);
}
3.MQ是如何實(shí)現(xiàn)消息確認(rèn)機(jī)制的?
生產(chǎn)者發(fā)送消息的時(shí)候除了交換機(jī),數(shù)據(jù),路由鍵把回調(diào)函數(shù)也發(fā)送過去了,因?yàn)橄⑹欠竦搅私粨Q機(jī)時(shí)MQServer確定的,如果到了就調(diào)用ACK
4.消息補(bǔ)償機(jī)制
但是這只是通知消息是否正常到達(dá),并沒有對(duì)消息沒有達(dá)到的情況進(jìn)行一個(gè)處理,所以我們要進(jìn)行消息補(bǔ)償,生產(chǎn)者對(duì)未發(fā)送成功的消息進(jìn)行一個(gè)消息補(bǔ)償,特別是對(duì)業(yè)務(wù)很重要的數(shù)據(jù)必須補(bǔ)償,無關(guān)緊要的倒可以不必因?yàn)橄⒀a(bǔ)償需要成本
也就是mq發(fā)送數(shù)據(jù)出現(xiàn)故障的時(shí)候,就可以考慮別的方式發(fā)送數(shù)據(jù)了,這就是消息補(bǔ)償,如:RPC遠(yuǎn)程調(diào)用直接由生產(chǎn)者發(fā)送給消費(fèi)者,不通過mq
5.服務(wù)端實(shí)現(xiàn)遠(yuǎn)程調(diào)用(RPC)
5.1通過java網(wǎng)絡(luò)編程包
5.2RestTemplate
@Test
void contextLoads2() throws Exception {
// 這個(gè)類可以發(fā)送一個(gè)請(qǐng)求過去
RestTemplate restTemplate = new RestTemplate();
String info = restTemplate.getForObject("http://localhost:8080/send?msg=HTTP", String.class);
System.out.println("遠(yuǎn)程調(diào)用的返回的結(jié)果:"+info);
}
5.3ApacheHttpClient:通過工具類即可
七、消息的重復(fù)消費(fèi)
消息的重復(fù)消費(fèi)指的是一個(gè)消息被同一個(gè)消費(fèi)者消費(fèi)了多次。
1.消息被消費(fèi)多次的后果
消費(fèi)者拿到消息后干的事情是扣款或者是發(fā)送短信等待。
正常來說消費(fèi)一次就夠了,重復(fù)消費(fèi)后就發(fā)現(xiàn)扣款了多次。
2.怎么解決消息被重復(fù)消費(fèi)
控制冪等性? 冪等性:多次相同的操作不會(huì)對(duì)數(shù)據(jù)產(chǎn)生影響
接口的冪等性 就是多次相同的操作不會(huì)對(duì)數(shù)據(jù)再次改變 消息的冪等性 就是這個(gè)消息的重復(fù)消費(fèi)不會(huì)對(duì)數(shù)據(jù)產(chǎn)生影響
3.為什么消息會(huì)被重復(fù)消費(fèi)
消費(fèi)者拿到數(shù)據(jù)開始消費(fèi),并且也消費(fèi)成功了,在做ACK應(yīng)答之前網(wǎng)絡(luò)出現(xiàn)了閃斷,消費(fèi)者和MQServer斷開了連接。MQServer中的待應(yīng)答就會(huì)變成待分配,此時(shí)消息已經(jīng)成功消費(fèi)了,因?yàn)槭情W斷,所以又再次的連接成功,MQServer在再次的把消息推送給了消費(fèi)者,消費(fèi)者再次拿到數(shù)據(jù)再次進(jìn)行消費(fèi),這里就出現(xiàn)了重復(fù)的消費(fèi)。
這個(gè)地方只需要把消費(fèi)者中調(diào)用消費(fèi)數(shù)據(jù)的方法控制冪等性就可以了
4.冪等性解決方案
Token機(jī)制:關(guān)于解決表單的重復(fù)提交就是服務(wù)端生成一個(gè)token帶給表單,表單中隱藏這個(gè)token,多次提交攜帶token過去,第一次token有效進(jìn)行操作,然后把token設(shè)為失效了,然后后面的提交攜帶的token就是失效的了,就不會(huì)操作了
CAS保證接口冪等性
樂觀鎖實(shí)現(xiàn)冪等性
防重表
緩存隊(duì)列
select+insert
八、死信隊(duì)列
1.什么是死信?
非正常的消息就是死信
2.什么是非正常的消息
被Nack的,被拒絕的,超過隊(duì)列長(zhǎng)度被擠出去的信息
3.什么是死信隊(duì)列
保存死信的消息的隊(duì)列就是死信隊(duì)列
被nack的消息可以壓入隊(duì)列,然后在隊(duì)列中配置了死信交換機(jī)和路由鍵
?的信息,轉(zhuǎn)到死信交換機(jī),由死信交換機(jī)發(fā)給死信隊(duì)列,然后交給另一個(gè)消費(fèi)者處理
一般無法處理的消息都是死信,會(huì)把這些死信消息全部的轉(zhuǎn)到同一個(gè)隊(duì)列來做特殊的處理,這個(gè)隊(duì)列就是死信隊(duì)列。
九、延遲隊(duì)列
有些場(chǎng)景不希望馬上消費(fèi),是需要延時(shí)一段時(shí)間后再去消費(fèi)。比如:10分鐘后,半個(gè)小時(shí)后干點(diǎn)事情。
比如:訂單超過關(guān)閉,驗(yàn)證碼超時(shí)失效,這些都是延時(shí)任務(wù),就可以通過延遲隊(duì)列完成
柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq
相關(guān)鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。