欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)邀請(qǐng)碼778899分享:面試 RabbitMQ高級(jí)

柚子快報(bào)邀請(qǐng)碼778899分享:面試 RabbitMQ高級(jí)

http://yzkb.51969.com/

MQ高級(jí)

消息隊(duì)列在使用過程中,面臨著很多實(shí)際問題需要思考:

消息可靠性

消息從發(fā)送到消費(fèi)者接收,會(huì)經(jīng)歷多個(gè)過程:

其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括:

發(fā)送時(shí)丟失:

生產(chǎn)者發(fā)送的消息未送達(dá)exchange

消息到達(dá)exchange后未到達(dá)queue

MQ宕機(jī),queue將消息丟失

consumer接收到消息后未消費(fèi)就宕機(jī)

針對(duì)這些問題,RabbitMQ分別給出了解決方案:

生產(chǎn)者確認(rèn)機(jī)制

mq持久化

消費(fèi)者確認(rèn)機(jī)制

失敗重試機(jī)制

生產(chǎn)者消息確認(rèn)

RabbitMQ提供了publisher confirm機(jī)制來避免消息發(fā)送到MQ過程中丟失。這種機(jī)制必須給每個(gè)消息指定一個(gè)唯一ID。消息發(fā)送到MQ以后,會(huì)返回一個(gè)結(jié)果給發(fā)送者,表示消息是否處理成功。

返回結(jié)果有兩種方式:

publisher-confirm,發(fā)送者確認(rèn)

消息成功投遞到交換機(jī),返回ack消息未投遞到交換機(jī),返回nack publisher-return,發(fā)送者回執(zhí)

消息投遞到交換機(jī)了,但是沒有路由到隊(duì)列。返回ACK,及路由失敗原因。

ts%5Cimage-20210718160907166.png&pos_id=img-ApPSKg7g-1712734315091)

注意

確認(rèn)機(jī)制發(fā)送消息時(shí),需要給每個(gè)消息設(shè)置一個(gè)全局唯一id,以區(qū)分不同消息,避免ack沖突

修改配置

首先,修改publisher服務(wù)中的application.yml文件,添加下面的內(nèi)容:

spring:

rabbitmq:

publisher-confirm-type: correlated

publisher-returns: true

template:

mandatory: true

說明:

publisher-confirm-type:開啟publisher-confirm,這里支持兩種類型:

? simple:同步等待confirm結(jié)果,直到超時(shí)

? correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback

publisher-returns:開啟publisher-return功能,同樣是基于callback機(jī)制,不過是定義ReturnCallback

template.mandatory:定義消息路由失敗時(shí)的策略。true,這調(diào)用ReturnCallback;false:則直接丟棄消息

定義ReturnConfirm回調(diào)

每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此需要在項(xiàng)目加載時(shí)配置:

修改publisher服務(wù),添加一個(gè):

package cn.itcast.mq.config;

@Slf4j

@Configuration

public class CommonConfig implements ApplicationContextAware {

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

// 獲取RabbitTemplate

RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

// 設(shè)置ReturnCallback

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

// 投遞失敗,記錄日志

log.info("消息發(fā)送失敗,應(yīng)答碼{},原因{},交換機(jī){},路由鍵{},消息{}",

replyCode, replyText, exchange, routingKey, message.toString());

// 如果有業(yè)務(wù)需要,可以重發(fā)消息

});

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

/**

* @param correlationData 自定義的數(shù)據(jù)

* @param ack 是否確認(rèn)

* @param cause 原因

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if(ack){

// 3.1.ack,消息成功

log.debug("消息發(fā)送成功, ID:{}", correlationData.getId());

}else{

// 3.2.nack,消息失敗

log.error("消息發(fā)送失敗, ID:{}, 原因{}",correlationData.getId(), cause);

}

}

});

}

@Bean

public DirectExchange simpleExchange(){

// 三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時(shí)是否自動(dòng)刪除

return new DirectExchange("simple.direct", false, false);

}

@Bean

public Queue simpleQueue(){

return new Queue("simple.queue",false);

}

@Bean

public Binding binding(){

return

BindingBuilder.bind(simpleQueue()).to(simpleExchange()).with("simple");

}

}

發(fā)送消息測(cè)試

ConfirmCallback可以在發(fā)送消息時(shí)指定,因?yàn)槊總€(gè)業(yè)務(wù)處理confirm成功或失敗的邏輯不一定相同。

在publisher服務(wù)的cn.itcast.mq.spring.SpringAmqpTest類中,定義一個(gè)單元測(cè)試方法:

public void testSendMessage2SimpleQueue() throws InterruptedException {

// 1.消息體

String message = "hello, spring amqp!";

// 2.全局唯一的消息ID,需要封裝到CorrelationData中

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

// 4.發(fā)送消息

rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

// 休眠一會(huì)兒,等待ack回執(zhí)

Thread.sleep(2000);

}

設(shè)置不存在的交換機(jī)嘗試發(fā)送 交換機(jī): task.direct 路由: task 結(jié)果: 發(fā)送確認(rèn)回調(diào)返回false消息沒有正確發(fā)送到MQ中 ? return回調(diào)未觸發(fā) 設(shè)置存在的交換機(jī),不存在的路由嘗試發(fā)送 交換機(jī): simple.direct 路由: task 結(jié)果: 發(fā)送確認(rèn)回調(diào)返回true消息已經(jīng)發(fā)送到MQ中 ? return回調(diào)觸發(fā),返回了消息,并提示路由錯(cuò)誤 設(shè)置正確的交換機(jī),正確的路由 交換機(jī): simple.direct 路由: simple 結(jié)果: 發(fā)送確認(rèn)回調(diào)返回true消息已經(jīng)發(fā)送到MQ中 ? return回調(diào)未觸發(fā)

結(jié)論:

通過發(fā)送確認(rèn) 和 消息返還機(jī)制可以確保消息 一定能夠投遞到指定的隊(duì)列中,如果消息沒有投遞成功或返還了也可以通過 定時(shí)重新投遞的方式進(jìn)行補(bǔ)償

消息持久化

生產(chǎn)者確認(rèn)可以確保消息投遞到RabbitMQ的隊(duì)列中,但是消息發(fā)送到RabbitMQ以后,如果突然宕機(jī),也可能導(dǎo)致消息丟失。

要想確保消息在RabbitMQ中安全保存,必須開啟消息持久化機(jī)制。

交換機(jī)持久化

隊(duì)列持久化

消息持久化

交換機(jī)持久化

RabbitMQ中交換機(jī)默認(rèn)是非持久化的,mq重啟后就丟失。

SpringAMQP中可以通過代碼指定交換機(jī)持久化:

@Bean

public DirectExchange simpleExchange(){

// 三個(gè)參數(shù):交換機(jī)名稱、是否持久化、當(dāng)沒有queue與其綁定時(shí)是否自動(dòng)刪除

return new DirectExchange("simple.direct", true, false);

}

事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的。

在RabbitMQ控制臺(tái)上,持久化的交換機(jī)都會(huì)帶上D的標(biāo)識(shí):

隊(duì)列持久化

RabbitMQ中隊(duì)列如果設(shè)置成非持久化的,mq重啟后就丟失。

SpringAMQP中可以通過代碼指定交換機(jī)持久化:

@Bean

public Queue simpleQueue(){

return new Queue("simple.queue",true);

}

事實(shí)上,默認(rèn)情況下,由SpringAMQP聲明的隊(duì)列都是持久化的。

可以在RabbitMQ控制臺(tái)看到持久化的隊(duì)列都會(huì)帶上D的標(biāo)示:

消息持久化

利用SpringAMQP發(fā)送消息時(shí),可以設(shè)置消息的屬性(MessageProperties),指定delivery-mode:

? 1:非持久化

? 2:持久化

用java代碼指定:

默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定。

@Test

public void testSendMessage2SimpleQueue() throws InterruptedException {

String routingKey = "simple";

String message = "hello, spring amqp!";

// 自定義數(shù)據(jù)

CorrelationData data = new CorrelationData(UUID.randomUUID().toString());

// 發(fā)送消息

rabbitTemplate.convertAndSend("simple.direct", routingKey, message, new MessagePostProcessor() {

// 后置處理消息

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 設(shè)置消息的持久化方式

message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

return message;

}

},data);

}

消費(fèi)者消息確認(rèn)

RabbitMQ是閱后即焚機(jī)制,RabbitMQ確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除。

而RabbitMQ是通過消費(fèi)者回執(zhí)來確認(rèn)消費(fèi)者是否成功處理消息的:消費(fèi)者獲取消息后,應(yīng)該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理消息。

設(shè)想這樣的場(chǎng)景:

1:RabbitMQ投遞消息給消費(fèi)者

2:消費(fèi)者獲取消息后,返回ACK給RabbitMQ

3:RabbitMQ刪除消息

4:消費(fèi)者宕機(jī),消息尚未處理

這樣消息就丟失了,因此消費(fèi)者返回ACK的實(shí)際非常重要

而SpringAMQP則允許配置三種確認(rèn)模式:

?manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。

?auto:自動(dòng)ack,由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒有異常則返回ack;拋出異常則返回nack

?none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除

由此可知:

none模式下,消息投遞是不可靠的,可能丟失auto模式類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq;沒有異常,返回ackmanual:自己根據(jù)業(yè)務(wù)情況,判斷什么時(shí)候該ack

一般,我們都是使用默認(rèn)的auto即可。

演示none模式

修改consumer服務(wù)的application.yml文件,添加下面內(nèi)容:

spring:

rabbitmq:

listener:

simple:

acknowledge-mode: none # 關(guān)閉ack

修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個(gè)消息處理異常:

@RabbitListener(queues = "simple.queue")

public void listenSimpleQueue(String msg) {

log.info("消費(fèi)者接收到simple.queue的消息:【{}】", msg);

// 模擬異常

System.out.println(1 / 0);

log.debug("消息處理完成!");

}

測(cè)試可以發(fā)現(xiàn),當(dāng)消息處理拋異常時(shí),消息依然被RabbitMQ刪除了。

演示auto模式

再次把確認(rèn)機(jī)制修改為auto:

spring:

rabbitmq:

listener:

simple:

acknowledge-mode: auto # 關(guān)閉ack

在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unack(未確定狀態(tài)):

拋出異常后,因?yàn)镾pring會(huì)自動(dòng)返回nack,所以消息恢復(fù)至Ready狀態(tài),并且沒有被RabbitMQ刪除:

消費(fèi)失敗重試機(jī)制

當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者,然后再次異常,再次requeue,無限循環(huán),導(dǎo)致mq的消息處理飆升,帶來不必要的壓力:

解決辦法:

本地重試

我們可以利用Spring的retry機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無限制的requeue到mq隊(duì)列。

修改consumer服務(wù)的application.yml文件,添加內(nèi)容:

spring:

rabbitmq:

listener:

simple:

retry:

enabled: true # 開啟消費(fèi)者失敗重試

initial-interval: 1000ms # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒

multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval

max-attempts: 3 # 最大重試次數(shù)

stateless: true # true無狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false

重啟consumer服務(wù),重復(fù)之前的測(cè)試??梢园l(fā)現(xiàn):

在重試3次后,SpringAMQP會(huì)拋出異常AmqpRejectAndDontRequeueException,說明本地重試觸發(fā)了 查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說明最后SpringAMQP返回的是ack,mq刪除消息了

結(jié)論:

開啟本地重試時(shí),消息處理過程中拋出異常,不會(huì)被requeue到隊(duì)列,而是在消費(fèi)者本地重試

重試達(dá)到最大次數(shù)后,spring會(huì)返回ack,消息會(huì)被丟棄

失敗策略

在之前的測(cè)試中,達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄,這是由Spring內(nèi)部機(jī)制決定的。

在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecovery接口來處理,它包含三種不同的實(shí)現(xiàn):

RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式 ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì) RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)

比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個(gè)指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理。

1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列

@Bean

public DirectExchange errorMessageExchange(){

return new DirectExchange("error.direct");

}

@Bean

public Queue errorQueue(){

return new Queue("error.queue", true);

}

@Bean

public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){

return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");

}

2)定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)

@Bean

public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){

return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");

}

完整代碼:

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.retry.MessageRecoverer;

import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;

import org.springframework.context.annotation.Bean;

@Configuration

public class ErrorMessageConfig {

@Bean

public DirectExchange errorMessageExchange(){

return new DirectExchange("error.direct");

}

@Bean

public Queue errorQueue(){

return new Queue("error.queue", true);

}

@Bean

public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){

return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");

}

@Bean

public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){

return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");

}

}

總結(jié)

如何確保RabbitMQ消息的可靠性?

開啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列

開啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失

開啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack

開啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理

死信交換機(jī)

初識(shí)死信交換機(jī)

什么是死信交換機(jī)

什么是死信?

當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信(dead letter):

? 消費(fèi)者使用basic.reject或basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false

? 消息是一個(gè)過期消息,超時(shí)無人消費(fèi)

? 要投遞的隊(duì)列消息滿了,無法投遞

如果這個(gè)包含死信的隊(duì)列配置了dead-letter-exchange屬性,指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡(jiǎn)稱DLX)。

如圖,一個(gè)消息被消費(fèi)者拒絕了,變成了死信:

因?yàn)閟imple.queue綁定了死信交換機(jī) dl.direct,因此死信會(huì)投遞給這個(gè)交換機(jī):

如果這個(gè)死信交換機(jī)也綁定了一個(gè)隊(duì)列,則消息最終會(huì)進(jìn)入這個(gè)存放死信的隊(duì)列:

另外,隊(duì)列將死信投遞給死信交換機(jī)時(shí),必須知道兩個(gè)信息:

死信交換機(jī)名稱死信交換機(jī)與死信隊(duì)列綁定的RoutingKey

這樣才能確保投遞的消息能到達(dá)死信交換機(jī),并且正確的路由到死信隊(duì)列。

利用死信交換機(jī)接收死信

在失敗重試策略中,默認(rèn)的RejectAndDontRequeueRecoverer會(huì)在本地重試次數(shù)耗盡后,發(fā)送reject給RabbitMQ,消息變成死信,被丟棄。

在consumer中CommonConfig 修改消息策略

// 修改 失敗消息策略

@Bean

public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){

// return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");

return new RejectAndDontRequeueRecoverer();

}

我們可以給simple.queue添加一個(gè)死信交換機(jī),給死信交換機(jī)綁定一個(gè)隊(duì)列。這樣消息變成死信后也不會(huì)丟棄,而是最終投遞到死信交換機(jī),路由到與死信交換機(jī)綁定的隊(duì)列。

總結(jié)

什么樣的消息會(huì)成為死信?

消息被消費(fèi)者reject或者返回nack

消息超時(shí)未消費(fèi)

隊(duì)列滿了

死信交換機(jī)的使用場(chǎng)景是什么?

如果隊(duì)列綁定了死信交換機(jī),死信會(huì)投遞到死信交換機(jī)

可以利用死信交換機(jī)手機(jī)所有消費(fèi)者處理失敗的消息(死信),交由人工處理,進(jìn)一步提高消息隊(duì)列的可靠性

TTL

一個(gè)隊(duì)列中的消息如果超時(shí)未消費(fèi),則會(huì)變成死信,超時(shí)分為兩種情況:

? 消息所在的隊(duì)列設(shè)置了超時(shí)時(shí)間

? 消息本身設(shè)置了超時(shí)時(shí)間

接收超時(shí)死信的死信交換機(jī)

在consumer服務(wù)的SpringRabbitListener中,定義一個(gè)新的消費(fèi)者,并且聲明 死信交換機(jī)、死信隊(duì)列:

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "dl.ttl.queue", durable = "true"),

exchange = @Exchange(name = "dl.ttl.direct"),

key = "ttl"

))

public void listenDlQueue(String msg){

log.info("接收到 dl.ttl.queue的延遲消息:{}", msg);

聲明一個(gè)隊(duì)列,并且指定TTL

要給隊(duì)列設(shè)置超時(shí)時(shí)間,需要在聲明隊(duì)列時(shí)配置x-message-ttl屬性:

@Bean

public Queue ttlQueue(){

return QueueBuilder.durable("ttl.queue") // 指定隊(duì)列名稱,并持久化

.ttl(10000) // 設(shè)置隊(duì)列的超時(shí)時(shí)間,10秒

.deadLetterExchange("dl.ttl.direct") // 指定死信交換機(jī)

.build();

}

注意,這個(gè)隊(duì)列設(shè)定了死信交換機(jī)為dl.ttl.direct

聲明交換機(jī),將ttl與交換機(jī)綁定:

@Bean

public DirectExchange ttlExchange(){

return new DirectExchange("ttl.direct");

}

@Bean

public Binding ttlBinding(){

return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");

}

總結(jié)

消息超時(shí)的兩種方式是?

? 給隊(duì)列設(shè)置ttl屬性,進(jìn)入隊(duì)列后超過ttl時(shí)間的消息變成死信

? 給消息設(shè)置ttl屬性,隊(duì)列接收到消息超過ttl時(shí)間后變?yōu)樗佬?/p>

如何實(shí)現(xiàn)發(fā)送一個(gè)消息20秒后消費(fèi)者才收到消息?

? 給消息的目標(biāo)隊(duì)列指定死信交換機(jī)

? 將消費(fèi)者監(jiān)聽的隊(duì)列綁定到死信交換機(jī)

? 發(fā)送消息時(shí)給消息設(shè)置超時(shí)時(shí)間為20秒

延遲隊(duì)列

利用TTL結(jié)合死信交換機(jī),我們實(shí)現(xiàn)了消息發(fā)出后,消費(fèi)者延遲收到消息的效果。這種消息模式就稱為延遲隊(duì)列(Delay Queue)模式。

延遲隊(duì)列的使用場(chǎng)景包括:

延遲發(fā)送短信用戶下單,如果用戶在15 分鐘內(nèi)未支付,則自動(dòng)取消預(yù)約工作會(huì)議,20分鐘后自動(dòng)通知所有參會(huì)人員

因?yàn)檠舆t隊(duì)列的需求非常多,所以RabbitMQ的官方也推出了一個(gè)插件,原生支持延遲隊(duì)列效果。

這個(gè)插件就是DelayExchange插件。參考RabbitMQ的插件列表頁面:https://www.rabbitmq.com/community-plugins.html

使用方式可以參考官網(wǎng)地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

DelayExchange原理

DelayExchange需要將一個(gè)交換機(jī)聲明為delayed類型。當(dāng)我們發(fā)送消息到delayExchange時(shí),流程如下:

接受消息

判斷消息是否具備x-delay屬性

如果有,說明是延遲消息,持久化到硬盤,讀取x-delay值,作為延遲時(shí)間

返回routing not found結(jié)果給消息發(fā)送者

x-delay時(shí)間到期后,重新投遞消息到指定隊(duì)列

使用DelayExchange

插件的使用也非常簡(jiǎn)單:聲明一個(gè)交換機(jī),交換機(jī)的類型可以是任意類型,只需要設(shè)定delayed屬性為true即可,然后聲明隊(duì)列與其綁定即可。

1)聲明DelayExchange交換機(jī)

基于注解方式(推薦):

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "delay.queue", durable = "true"),

exchange = @Exchange(name = "delay.direct",delayed = "true"),

key = "delay"

))

public void listenDelayedQueue(String msg){

log.info("接收到 delay.queue的延遲消息:{}", msg);

}

也可以基于@Bean的方式:

2)發(fā)送消息

發(fā)送消息時(shí),一定要攜帶x-delay屬性,指定延遲的時(shí)間:

@Test

public void testDelayedMsg() {

// 創(chuàng)建消息

Message message = MessageBuilder

.withBody("hello, delay message".getBytes(StandardCharsets.UTF_8))

.setHeader("x-delay",10000)

.build();

// 消息ID,需要封裝到CorrelationData中

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

// 發(fā)送消息

rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

log.debug("發(fā)送消息成功");

}

總結(jié)

延遲隊(duì)列插件的使用步驟?

聲明一個(gè)交換機(jī),添加delayed屬性為true

發(fā)送消息時(shí),添加x-delay頭,值為超時(shí)時(shí)間

惰性隊(duì)列

消息堆積問題

當(dāng)生產(chǎn)者發(fā)送消息的速度超過了消費(fèi)者處理消息的速度,就會(huì)導(dǎo)致隊(duì)列中的消息堆積,直到隊(duì)列存儲(chǔ)消息達(dá)到上限。之后發(fā)送的消息就會(huì)成為死信,可能會(huì)被丟棄,這就是消息堆積問題。

解決消息堆積的思路:

增加更多的消費(fèi)者,提高消費(fèi)速度,也就是我們之前說的work queue模式

擴(kuò)大隊(duì)列容積,提高堆積上限

要提升隊(duì)列容積,把消息保存在內(nèi)存中顯然是不行的。

惰性隊(duì)列

從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:

接收到消息后直接存入磁盤而非內(nèi)存消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存支持?jǐn)?shù)百萬條的消息存儲(chǔ)

基于命令行設(shè)置lazy-queue

而要設(shè)置一個(gè)隊(duì)列為惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定x-queue-mode屬性為lazy即可。可以通過命令行將一個(gè)運(yùn)行中的隊(duì)列修改為惰性隊(duì)列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解讀:

rabbitmqctl :RabbitMQ的命令行工具set_policy :添加一個(gè)策略Lazy :策略名稱,可以自定義"^lazy-queue$" :用正則表達(dá)式匹配隊(duì)列的名字'{"queue-mode":"lazy"}' :設(shè)置隊(duì)列模式為lazy模式--apply-to queues :策略的作用對(duì)象,是所有的隊(duì)列

基于@Bean聲明lazy-queue

基于@RabbitListener聲明LazyQueue

總結(jié):

消息堆積問題的解決方案?

隊(duì)列上綁定多個(gè)消費(fèi)者,提高消費(fèi)速度

使用惰性隊(duì)列,可以在mq中保存更多消息

惰性隊(duì)列的優(yōu)點(diǎn)有哪些?

基于磁盤存儲(chǔ),消息上限高

沒有間歇性的page-out,性能比較穩(wěn)定

惰性隊(duì)列的缺點(diǎn)有哪些?

基于磁盤存儲(chǔ),消息時(shí)效性會(huì)降低

性能受限于磁盤的IO

集群

集群分類

RabbitMQ的是基于Erlang語言編寫,而Erlang又是一個(gè)面向并發(fā)的語言,天然支持集群模式。RabbitMQ的集群有兩種模式:

?普通集群:是一種分布式集群,將隊(duì)列分散到集群的各個(gè)節(jié)點(diǎn),從而提高整個(gè)集群的并發(fā)能力。

?鏡像集群:是一種主從集群,普通集群的基礎(chǔ)上,添加了主從備份功能,提高集群的數(shù)據(jù)可用性。

鏡像集群雖然支持主從,但主從同步并不是強(qiáng)一致的,某些情況下可能有數(shù)據(jù)丟失的風(fēng)險(xiǎn)。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁隊(duì)列來代替鏡像集群,底層采用Raft協(xié)議確保主從的數(shù)據(jù)一致性。

仲裁隊(duì)列

特征

與鏡像隊(duì)列一樣,都是主從模式,支持主從數(shù)據(jù)同步

使用非常簡(jiǎn)單,沒有復(fù)雜的配置

主從同步基于Raft協(xié)議,強(qiáng)一致

柚子快報(bào)邀請(qǐng)碼778899分享:面試 RabbitMQ高級(jí)

http://yzkb.51969.com/

好文鏈接

評(píng)論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19051999.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄