欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq

柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq

http://yzkb.51969.com/

目錄

一、為什么要用到RabbitMq?

二、RabbitMq有什么作用?

1.解耦

2.異步

三、RabbitMq的模型

1.helloword模型

2.Work模型

3.發(fā)布訂閱模型

4.路由鍵模型

5.主題模型

四、RabbitMq跟SpringBoot的整合

1.導(dǎo)入依賴

2.yml配置

3.創(chuàng)建隊(duì)列、創(chuàng)建交換機(jī)、將隊(duì)列與交換機(jī)綁定并設(shè)置路由鍵

4.生產(chǎn)者發(fā)送消息

5.消費(fèi)者消費(fèi)消息

五、ACK機(jī)制

1.什么是消息確認(rèn)機(jī)制?

2.手動(dòng)開啟ACK

2.消費(fèi)者應(yīng)答

六、消息的可靠性

1.消息可靠性講的不能丟失,MQ是如何保證消息可靠性的?

1.1消息從生產(chǎn)者到交換機(jī)有可能會(huì)丟失。這里可以通過confirm機(jī)制來解決

1.2交換機(jī)到隊(duì)列也有可能會(huì)丟失。這里可以通過return機(jī)制來解決

1.33、從隊(duì)列到消費(fèi)者也有可能會(huì)丟失。這里可以通過手動(dòng)ACK解決。

2.confirm和return機(jī)制的實(shí)現(xiàn)

2.1yml配置

3.MQ是如何實(shí)現(xiàn)消息確認(rèn)機(jī)制的?

4.消息補(bǔ)償機(jī)制

5.服務(wù)端實(shí)現(xiàn)遠(yuǎn)程調(diào)用(RPC)

5.1通過java網(wǎng)絡(luò)編程包

5.2RestTemplate

5.3ApacheHttpClient:通過工具類即可

七、消息的重復(fù)消費(fèi)

1.消息被消費(fèi)多次的后果

2.怎么解決消息被重復(fù)消費(fèi)

4.冪等性解決方案

八、死信隊(duì)列

1.什么是死信?

2.什么是非正常的消息

3.什么是死信隊(duì)列

九、延遲隊(duì)列

一、為什么要用到RabbitMq?

因?yàn)橄裎覀冎暗捻?xiàng)目,代碼之間的執(zhí)行都是同步的,一個(gè)業(yè)務(wù)的處理必須等待上一個(gè)業(yè)務(wù)的完成,這樣就比較耗費(fèi)時(shí)間,比如我們的用戶查詢數(shù)據(jù)的時(shí)候,對(duì)于用戶而言他只需要查尋數(shù)據(jù)這一個(gè)操作,對(duì)于我們服務(wù)端而言可能還需要做一些處理,像存入緩存、刪除緩存,只有做完這些操作我們服務(wù)端才會(huì)把數(shù)據(jù)傳給用戶,但是這些是我們業(yè)務(wù)的處理,不應(yīng)該讓用戶來承擔(dān)這樣的一個(gè)時(shí)間成本,并且用戶等待數(shù)據(jù)的時(shí)間過長(zhǎng),給用戶也會(huì)帶來了很不好的體驗(yàn)感,同時(shí)模塊之前的耦合性很高,一個(gè)模塊宕機(jī)后,全部模塊都不能用了。所以要中間件RabbitMQ

二、RabbitMq有什么作用?

1.解耦

就是rabbitMq有一個(gè)生產(chǎn)者模塊負(fù)責(zé)發(fā)送消息到隊(duì)列中,一個(gè)消費(fèi)者模塊負(fù)責(zé)從隊(duì)列中拿到數(shù)據(jù)進(jìn)行消費(fèi),模塊與模塊間分離,通過RabbitMq進(jìn)行數(shù)據(jù)通信

2.異步

它可以不需要等待我們代碼的全部執(zhí)行,就可以用戶所需要的數(shù)據(jù)將消息發(fā)送到隊(duì)列中,然后由隊(duì)列推給用戶

三、RabbitMq的模型

1.helloword模型

一個(gè)生產(chǎn)者,一個(gè)隊(duì)列,一個(gè)消費(fèi)者

2.Work模型

?一個(gè)生產(chǎn)者,一個(gè)隊(duì)列,多個(gè)消費(fèi)者

采用多個(gè)消費(fèi)者是為了加快消息的消費(fèi),多個(gè)消費(fèi)者之間采用輪訓(xùn)的方式。

3.發(fā)布訂閱模型

一個(gè)生產(chǎn)者,一個(gè)交換機(jī) ,多個(gè)隊(duì)列,一個(gè)隊(duì)列上對(duì)應(yīng)一個(gè)消費(fèi)者,消費(fèi)者只有訂閱才能收到消息,交換機(jī)通過廣播給訂閱的隊(duì)列

4.路由鍵模型

路由鍵模型,跟發(fā)布訂閱一樣,只不過多了個(gè)路由鍵,進(jìn)行一個(gè)條件的判斷

5.主題模型

主體模型跟路由鍵類似,只不過路由鍵多了兩個(gè)符號(hào) *代表可以接單個(gè)字符,# 代表可以接多個(gè)字符

四、RabbitMq跟SpringBoot的整合

1.導(dǎo)入依賴

org.springframework.boot

spring-boot-starter-amqp

2.yml配置

spring:

rabbitmq:

host: 192.168.107.123 #虛擬主機(jī)的ip地址

port: 5672 #RabbitMq的端口號(hào)

username: guest #匿名用戶

password: guest

virtual-host: / # 虛擬機(jī)主機(jī),隊(duì)列就是保存在虛擬主機(jī)中

3.創(chuàng)建隊(duì)列、創(chuàng)建交換機(jī)、將隊(duì)列與交換機(jī)綁定并設(shè)置路由鍵

// 交換機(jī)的類型是路由鍵

@Bean

public DirectExchange directExchange() {

return new DirectExchange("direct-exchange");

}

@Bean

public Queue directQueue1() {

return new Queue("direct-queue1");

}

@Bean

public Queue directQueue2() {

return new Queue("direct-queue2");

}

// 綁定

@Bean

public Binding directQueue1Bind() {

// 給direct-queue1綁定了兩個(gè)隊(duì)列

BindingBuilder.DirectExchangeRoutingKeyConfigurer to = BindingBuilder.bind(directQueue1()).to(directExchange());

// 綁定了兩個(gè)路由鍵

to.with("error");

Binding warn = to.with("info");

return warn;

}

@Bean

public Binding directQueue2Bind() {

return BindingBuilder.bind(directQueue1()).to(directExchange()).with("info");

}

4.生產(chǎn)者發(fā)送消息

@Test

void contextLoads() {

rabbitTemplate.convertAndSend("direct-exchange", "error", "toString");

System.out.println("生產(chǎn)者消息發(fā)送完成");

}

5.消費(fèi)者消費(fèi)消息

@Component

public class HelloQueueListener {

@RabbitListener(queues = "direct-queue1")

public void cosnuermMsg(String msg) {

System.out.println("消費(fèi)者拿到的數(shù)是:" + msg);

}

}

五、ACK機(jī)制

ACK機(jī)制是消費(fèi)端的一個(gè)消息確認(rèn)機(jī)制

1.什么是消息確認(rèn)機(jī)制?

MQServer把消息推送給消費(fèi)者后,消費(fèi)者開始消費(fèi),消費(fèi)完成后需要把結(jié)果給MQServer應(yīng)答一下,消費(fèi)結(jié)果有兩種情況:失敗、成功

消費(fèi)成功:應(yīng)答ACK,MQServer手動(dòng)ACK后就明白這個(gè)消息已經(jīng)被成功的消費(fèi)了,可以從隊(duì)列中刪除了。

消費(fèi)失?。簯?yīng)答NACK。MQServer收到NACK后知道了消費(fèi)者無法消費(fèi)這個(gè)消息,發(fā)送給其他的消費(fèi)者進(jìn)行消費(fèi)。如果其他的消費(fèi)者也是無法消費(fèi),此時(shí)需要這類消息全部的收集起來入庫(kù),通知相關(guān)人員來檢查。

消費(fèi)者默認(rèn)自動(dòng)應(yīng)答,不出異常自動(dòng)應(yīng)答,出了異常應(yīng)答NACK,并且把這個(gè)消息壓入到隊(duì)列

ready:待分配(消費(fèi)者)的消息的數(shù)量。

unackded:待應(yīng)答的消息數(shù)量。

total:總消息的數(shù)量。

當(dāng)出現(xiàn)異常沒有處理時(shí)候,那么被認(rèn)為應(yīng)答nACK,消息回到隊(duì)列的待應(yīng)答狀態(tài),關(guān)掉消費(fèi)者,則進(jìn)入待分配狀態(tài)客戶端先創(chuàng)建連接對(duì)象,有了連接對(duì)象才能創(chuàng)建信道進(jìn)行數(shù)據(jù)的傳輸

channel斷開后把待應(yīng)答的消息,全部變?yōu)榇峙洹?/p>

mqServer是支持持久化的,重啟后數(shù)據(jù)還有,down刪除就沒有了

2.手動(dòng)開啟ACK

1.ylm配置文件

spring:

rabbitmq:

host: 192.168.127.102

port: 5672

username: guest

password: guest

virtual-host: /

listener:

simple:

acknowledge-mode: manual # 手動(dòng)ACK

2.消費(fèi)者應(yīng)答

@RabbitListener(queues = "hello-queue")

public void cosnuermMsg(String msg, Channel channel, Message message) {

System.out.println("消費(fèi)者拿到的數(shù)是:" + msg);

// 每個(gè)消息的標(biāo)識(shí)

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

// 開始消費(fèi)消息

Boolean flag = customerData(msg);

if (flag) {

// 確定消息消費(fèi)成功,應(yīng)答ACK

// 第一個(gè)參數(shù):消息的唯一標(biāo)識(shí)

// 第二個(gè)參數(shù):是否批量應(yīng)答,一般都是false

channel.basicAck(deliveryTag, false);

System.out.println("消息成功,應(yīng)答ACK");

return;

}

System.out.println("消息過程中沒有出現(xiàn)異常,但是消息沒有消費(fèi)成功,應(yīng)答NACK");

} catch (Exception e) {

e.printStackTrace();

System.out.println("消費(fèi)過程中出現(xiàn)了異常,應(yīng)答NACK");

}

// 消息消費(fèi)失敗,應(yīng)答NACK

// 第三個(gè)參數(shù)是:是否壓入隊(duì)列,如果設(shè)置為false該消息就丟棄了

try {

channel.basicNack(deliveryTag, false, true);

} catch (IOException e) {

e.printStackTrace();

}

}

// 完成消息的消費(fèi)

private Boolean customerData(String msg) {

Integer i = Integer.parseInt(msg);

if (i == 0) { // 沒有插入成功,但是也沒有出現(xiàn)異常

return false;

}

return true;

}

六、消息的可靠性

1.消息可靠性講的不能丟失,MQ是如何保證消息可靠性的?

1.1消息從生產(chǎn)者到交換機(jī)有可能會(huì)丟失。這里可以通過confirm機(jī)制來解決

confirm機(jī)制是RabbitMQ自己提供的一個(gè)機(jī)制,用來確認(rèn)消息是否到了交換機(jī)了。

1.2交換機(jī)到隊(duì)列也有可能會(huì)丟失。這里可以通過return機(jī)制來解決

reutrn機(jī)制是RabbitMQ自己提供的一個(gè)機(jī)制,用來確認(rèn)消息是否到了隊(duì)列了。

1.33、從隊(duì)列到消費(fèi)者也有可能會(huì)丟失。這里可以通過手動(dòng)ACK解決。

手動(dòng)ACK后就明白這個(gè)消息已經(jīng)被成功的消費(fèi)了,可以從隊(duì)列中刪除了

2.confirm和return機(jī)制的實(shí)現(xiàn)

2.1yml配置

spring:

rabbitmq:

host: 192.168.127.102

port: 5672

username: guest

password: guest

virtual-host: /

publisher-returns: true #開啟return

publisher-confirm-type: simple # 開啟confirm

/**

* confirm機(jī)制和return機(jī)制

*/

@Component

public class MsgConfirm implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

@Autowired

private RabbitTemplate rabbitTemplate;

@PostConstruct

public void init() {

//這里要對(duì)mq的return和confirm進(jìn)行覆蓋

rabbitTemplate.setReturnCallback(this);

rabbitTemplate.setConfirmCallback(this);

}

// confirm機(jī)制確認(rèn)消息是否到了交換機(jī)

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if (ack) {

System.out.println("消息已經(jīng)到了交換機(jī)");

} else {

System.out.println("消息沒到了交換機(jī)");

}

}

// 確認(rèn)消息是否到了隊(duì)列

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

System.out.println("消息沒有到隊(duì)列," + replyText + "," + exchange + "," + routingKey);

}

3.MQ是如何實(shí)現(xiàn)消息確認(rèn)機(jī)制的?

生產(chǎn)者發(fā)送消息的時(shí)候除了交換機(jī),數(shù)據(jù),路由鍵把回調(diào)函數(shù)也發(fā)送過去了,因?yàn)橄⑹欠竦搅私粨Q機(jī)時(shí)MQServer確定的,如果到了就調(diào)用ACK

4.消息補(bǔ)償機(jī)制

但是這只是通知消息是否正常到達(dá),并沒有對(duì)消息沒有達(dá)到的情況進(jìn)行一個(gè)處理,所以我們要進(jìn)行消息補(bǔ)償,生產(chǎn)者對(duì)未發(fā)送成功的消息進(jìn)行一個(gè)消息補(bǔ)償,特別是對(duì)業(yè)務(wù)很重要的數(shù)據(jù)必須補(bǔ)償,無關(guān)緊要的倒可以不必因?yàn)橄⒀a(bǔ)償需要成本

也就是mq發(fā)送數(shù)據(jù)出現(xiàn)故障的時(shí)候,就可以考慮別的方式發(fā)送數(shù)據(jù)了,這就是消息補(bǔ)償,如:RPC遠(yuǎn)程調(diào)用直接由生產(chǎn)者發(fā)送給消費(fèi)者,不通過mq

5.服務(wù)端實(shí)現(xiàn)遠(yuǎn)程調(diào)用(RPC)

5.1通過java網(wǎng)絡(luò)編程包

5.2RestTemplate

@Test

void contextLoads2() throws Exception {

// 這個(gè)類可以發(fā)送一個(gè)請(qǐng)求過去

RestTemplate restTemplate = new RestTemplate();

String info = restTemplate.getForObject("http://localhost:8080/send?msg=HTTP", String.class);

System.out.println("遠(yuǎn)程調(diào)用的返回的結(jié)果:"+info);

}

5.3ApacheHttpClient:通過工具類即可

七、消息的重復(fù)消費(fèi)

消息的重復(fù)消費(fèi)指的是一個(gè)消息被同一個(gè)消費(fèi)者消費(fèi)了多次。

1.消息被消費(fèi)多次的后果

消費(fèi)者拿到消息后干的事情是扣款或者是發(fā)送短信等待。

正常來說消費(fèi)一次就夠了,重復(fù)消費(fèi)后就發(fā)現(xiàn)扣款了多次。

2.怎么解決消息被重復(fù)消費(fèi)

控制冪等性? 冪等性:多次相同的操作不會(huì)對(duì)數(shù)據(jù)產(chǎn)生影響

接口的冪等性 就是多次相同的操作不會(huì)對(duì)數(shù)據(jù)再次改變 消息的冪等性 就是這個(gè)消息的重復(fù)消費(fèi)不會(huì)對(duì)數(shù)據(jù)產(chǎn)生影響

3.為什么消息會(huì)被重復(fù)消費(fèi)

消費(fèi)者拿到數(shù)據(jù)開始消費(fèi),并且也消費(fèi)成功了,在做ACK應(yīng)答之前網(wǎng)絡(luò)出現(xiàn)了閃斷,消費(fèi)者和MQServer斷開了連接。MQServer中的待應(yīng)答就會(huì)變成待分配,此時(shí)消息已經(jīng)成功消費(fèi)了,因?yàn)槭情W斷,所以又再次的連接成功,MQServer在再次的把消息推送給了消費(fèi)者,消費(fèi)者再次拿到數(shù)據(jù)再次進(jìn)行消費(fèi),這里就出現(xiàn)了重復(fù)的消費(fèi)。

這個(gè)地方只需要把消費(fèi)者中調(diào)用消費(fèi)數(shù)據(jù)的方法控制冪等性就可以了

4.冪等性解決方案

Token機(jī)制:關(guān)于解決表單的重復(fù)提交就是服務(wù)端生成一個(gè)token帶給表單,表單中隱藏這個(gè)token,多次提交攜帶token過去,第一次token有效進(jìn)行操作,然后把token設(shè)為失效了,然后后面的提交攜帶的token就是失效的了,就不會(huì)操作了

CAS保證接口冪等性

樂觀鎖實(shí)現(xiàn)冪等性

防重表

緩存隊(duì)列

select+insert

八、死信隊(duì)列

1.什么是死信?

非正常的消息就是死信

2.什么是非正常的消息

被Nack的,被拒絕的,超過隊(duì)列長(zhǎng)度被擠出去的信息

3.什么是死信隊(duì)列

保存死信的消息的隊(duì)列就是死信隊(duì)列

被nack的消息可以壓入隊(duì)列,然后在隊(duì)列中配置了死信交換機(jī)和路由鍵

?的信息,轉(zhuǎn)到死信交換機(jī),由死信交換機(jī)發(fā)給死信隊(duì)列,然后交給另一個(gè)消費(fèi)者處理

一般無法處理的消息都是死信,會(huì)把這些死信消息全部的轉(zhuǎn)到同一個(gè)隊(duì)列來做特殊的處理,這個(gè)隊(duì)列就是死信隊(duì)列。

九、延遲隊(duì)列

有些場(chǎng)景不希望馬上消費(fèi),是需要延時(shí)一段時(shí)間后再去消費(fèi)。比如:10分鐘后,半個(gè)小時(shí)后干點(diǎn)事情。

比如:訂單超過關(guān)閉,驗(yàn)證碼超時(shí)失效,這些都是延時(shí)任務(wù),就可以通過延遲隊(duì)列完成

柚子快報(bào)邀請(qǐng)碼778899分享:分布式 RabbitMq

http://yzkb.51969.com/

相關(guān)鏈接

評(píng)論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/18931634.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄