欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:RabbitMq學習

柚子快報邀請碼778899分享:RabbitMq學習

http://yzkb.51969.com/

Springboot整合RabbitMq

使用步驟

1、引入spring-boot-starter-amqp的依賴,并配置host主機地址、port端口、virtualHost虛擬主機、用戶名、密碼等

2、聲明交換機、隊列、交換機與隊列的綁定關系

3、使用RabbitTemplate的convertAndSend方法將消息發(fā)送給交換機,交換機收到消息后,路由給所綁定的隊列

4、消費者使用@RabbitListener注解方法監(jiān)聽隊列,當收到消息時,回調(diào)此注解方法

補充(詳細)

1、虛擬主機有什么用?

用于數(shù)據(jù)隔離,不同項目可以創(chuàng)建不同的虛擬主機。但是配置時,需要有對應虛擬主機權限的用戶才可以使用指定的虛擬主機

用于數(shù)據(jù)隔離,不同項目可以創(chuàng)建不同的虛擬主機。但是配置時,需要有對應虛擬主機權限的用戶才可以使用指定的虛擬主機

2、當在rabbitmq中已經(jīng)在web管理后臺,通過手動的方式創(chuàng)建了隊列,交換機時,啟動項目時又去聲明隊列,交換機時,然后項目停了,然后又去啟動項目又去聲明隊列交換機時?;蛘吆竺鎲禹椖繒r,聲明的交換機類型改變時?原來已經(jīng)存在的交換機或隊列是否會被刪除?原來里面已經(jīng)存在的消息是否會被清空掉?

如果消息代理中已經(jīng)存在對應名稱的交換機,那么如果修改了代碼中此交換機類型的定義,那么啟動的時候就會報錯。

3、rabbitTemplate的convertAndSend發(fā)送消息時,可以不指定交換機,直接發(fā)給隊列(會使用默認的交換機,默認的交換機就是根據(jù)消息發(fā)送時所指定的routekey找到與此routeKey名稱相同的消息隊列)

4、rabbitTemplate的convertAndSend發(fā)送消息時,可以指定交換機,交換機會根據(jù)當前交換機類型和此交換機的綁定關系將消息路由給對應綁定的隊列

5、rabbitTemplate的convertAndSend發(fā)送不同的java類型消息,可以指定消息轉換器。在使用@RabbitListener注解方法監(jiān)聽消息時,聲明發(fā)送的java類型即可

6、在同一項目中可以使用@RabbitmListener注解的1個方法來監(jiān)聽指定的多個消息隊列。

7、在同一項目中的多個方法都使用了@RabbitListener注解,并且這幾個方法監(jiān)聽的消息隊列中有相同的,那么當這些相同消息隊列中收到消息時,會負載均衡的交給這幾個方法處理(也就是1個消息只會給到其中1個方法處理),這就是work queues工作隊列模式。

8、在同一項目中的多個方法都使用了@RabbitListener注解,并且這幾個方法監(jiān)聽的消息隊列中有相同的,即使這幾個方法中處理的效率有高有低(故意在其中某個方法中睡它10s,這個方法的處理效率就低了),但是他們收到的消息數(shù)量仍然是按負載均衡分發(fā)的。那么肯定要解決這個問題,因此可以加上配置:spring.rabbit.listener.simple.prefetch=1,意思就是消費者每次拉取1條消息,這條消息處理完成之后,消息代理才會將下1條消息發(fā)過來,這樣就不是按照負載均衡的方式發(fā)給這多個方法了,而是能者多勞。

9、在不同的項目中都監(jiān)聽了同1個消息隊列,本來這個隊列中有消息時,是按負載均衡1個消費者1個來輪流發(fā),但是如果其中某個消費者掛了,剩余的消息是否還按輪流來?當它再次上線,是否繼續(xù)讓它輪流來?

10、在同一隊列上有多個消費者在監(jiān)聽,當消息發(fā)給某個消費者時,這個消費者在處理時發(fā)生了異常,這個消息會被忽略掉?還是交給其它消費者?默認會被忽略掉

11、@RabbitListener可以標注在方法上,指定需要監(jiān)聽的消息隊列(可指定多個消息隊列),然后在方法參數(shù)位置上聲明所要處理的消息類型(消息的發(fā)送者所發(fā)送的類型)。

12、@RabbitListener可以標注在類上,然后在這個類中的某個方法上使用@RabbitHandler注解,并在這個 方法的方法參數(shù)位置聲明byte[] data來接收消息數(shù)據(jù)。

13、1個消息隊列有多個消費者在監(jiān)聽,每個消息都會只發(fā)給其中某1個消費者處理,這樣的模型叫工作隊列模式

14、交換機類型有默認交換機類型、Fanout類型、Direct類型、Topic類型。

Fanout類型交換機:1個Fanout類型交換機可以綁定多個消息隊列,當Fanout類型收到1個消息時,會直接發(fā)給所綁定的每1個消息隊列 Direct類型,1個Direct類型交換機可以綁定多個消息隊列,每綁定1個消息隊列時,需要指定對應的路由key(routeKey),當Direct類型交換機收到1個消息時,會根據(jù)消息發(fā)送時所指定的路由key發(fā)送給routeKey完全匹配到的消息隊列, Topic類型,1個Topic類型交換機可以綁定多個消息隊列,每綁定1個消息隊列時,需要指定對應的通配符(#代指0個或多個單詞和*代指1個單詞),當Topic類型交換機收到1個消息時,會根據(jù)消息發(fā)送時所指定的路由key發(fā)送給routeKey通配匹配到的消息隊列,

15、Spring Amqp提供了聲明隊列、交換機、隊列和交換機綁定關系的類。在sprigboot中如何使用呢?只需要將它們以bean的形式定義出來,并且項目中必須至少使用了1個@RabbitListener注解,那么springboot會幫助我們在rabbitmq消息代理服務器中創(chuàng)建對應的隊列、交換機、隊列和交換機綁定關系(注意前提:要想以@bean的方式創(chuàng)建隊列和交換機,必須至少有一個監(jiān)聽者@RabbitListener,否則即使聲明了Queue、Exchange、Binding這些bean,也不會創(chuàng)建成功的隊列和交換機的)

可以使用QueueBuilder來聲明隊列

可考慮定義為持久當項目重啟過程中時,原來已存在的隊列仍然能正常收到消息,并且這些消息能正常消費,不會被刪除或清空 可以使用ExchangeBuilder來聲明交換機

不同的交換機類型有不同的實現(xiàn)類如果在項目啟動前就已經(jīng)存在了該名稱的交換機,并且類型相同,那么就不會創(chuàng)建。如果已經(jīng)存在了該名稱的交換機,但是現(xiàn)在項目代碼中又把這個類型改成了其它類型,那么創(chuàng)建此交換機不會成功,僅會打印錯誤創(chuàng)建的日志,不影響啟動 可以使用BindingBuilder來聲明隊列和交換機的綁定關系

聲明代碼示例

@Configuration

public class QueueConfig {

@Bean

public Queue queue3() {

return QueueBuilder.durable("direct.queue3").build();

}

@Bean

public Exchange exchange3() {

return ExchangeBuilder.directExchange("direct.exchange3").build();

}

@Bean

public Binding binding3() {

return BindingBuilder.bind(queue3()).to(exchange3()).with("A3").noargs();

}

}

16、Spring Amqp還可以使用@RabbitListener注解來聲明隊列和交換機(這個不需要前提,直接如下聲明就會創(chuàng)建),如下聲明會創(chuàng)建對應的消息隊列,交換機,交換機和隊列的綁定,并且當消息隊列中有消息時,被注解的方法將會被回調(diào)

@RabbitListener(bindings = {

@QueueBinding(

// 隊列是否持久: 當消息代理重啟時, 非持舊隊列將會干掉了。不設置時,默認是持久的。

value = @Queue(value = "direct.queue1",durable = "true"),

// 交換機是否持久: 當消息代理重啟時, 非持久隊列將會干掉了。

// 不設置時,默認是持久的。

// 默認就是DIRECT類型交換機

exchange = @Exchange(value = "direct.exchange1",type = ExchangeTypes.DIRECT),

// 當發(fā)送到direct.exchange1交換機的消息時,所指定的routeKey是red或者是blue時,由此方法處理

key = {"red","blue"}

)

})

public void listenQueue(AddUser addUser) {

// ...

}

17、使用rabbitmqTemplate#convert(exchange, routeKey, object)發(fā)送消息時,所發(fā)送消息的類型是Object類型。默認支持的類型是Message類型,它有2個屬性byte[] body和MessageProperties messageProperties。

如果發(fā)送的消息的類型不是Message類型,那么會使用1個消息轉換器(org.springframework.amqp.support.converter.MessageConverter),將object對象轉換為Message對象。

在spring amqp中默認使用的是SimpleMessageConverter對此消息作序列化處理,它會在object是byte[]時,直接就創(chuàng)建Message,是String時,直接獲取字符串轉為utf8編碼的字節(jié),實現(xiàn)了Serializable時,使用jkd的序列化機制轉為byte[],在這3種情況下,都會設置MessageProperties#setContentType為對應的內(nèi)容類型。在rabbitmq的web后臺管理頁面看到的是字節(jié)數(shù)組轉base64字符串的形式。在反序列化時,也會使用對應的逆向方式去作反序列化。

可以使用jackson序列化的方式,引入jackson的依賴com.fasterxml.jackson.core的jackson-databind,然后定義org.springframework.amqp.support.converter.Jackson2JsonMessageConverter的bean即可,它會自動生效的。也可以直接創(chuàng)建它,然后將它直接設置給RabbitTemplate就行了

18、docker安裝rabbitmq

docker run \

-e RABBITMQ_DEFAULT_USER=guest \

-e RABBITMQ_DEFAULT_PASS=guest \

-v mq-plugins:/plugins \

--name mq \

--hostname mq \

-p 15672:15672 \

-p 5672:5672 \

--network hmall \

-d \

rabbitmq:3.8-management

19、在集群模式下如何保證使用@RabbitListener標注的方法只存在1個消費者呢?只是在聲明隊列的時候的參數(shù)(arguments參數(shù)),將x-single-active-consumer設置為True即可。參考:RabbitMQ多消費者實例時,保證只有一個消費者進行消費(單活消費者模式)。

如果要保證消息順序,可以這樣考慮:可以通過添加多個消息隊列,每個消息隊列只允許1個消費者,并且開啟手動確認,并且設置prefetch為1,也就是每個隊列每次拉取1條消息,當手動確認完了這條消息,再來處下1條消息,來保證處理消息的順序。但這樣會大大限制并發(fā)能力,可以將同1批要保證順序的消息按順序的發(fā)往同1個消息隊列,這可以通過對這1批消息通過某種哈希運算得出相同消息隊列id,只要消息的發(fā)送是按順序的,那么消息的消費就是按順序的,并且增加多個隊列就相當于在增加并發(fā)度

20、消息可靠性,消息從發(fā)送者到mq,再從mq到消費者,其中每個環(huán)節(jié)都可能發(fā)生問題。

發(fā)送者的可靠性,這部分的可靠性由生產(chǎn)者重連機制和生產(chǎn)者確認機制來保證。

生產(chǎn)者重連:由于網(wǎng)絡波動的存在,可能會出現(xiàn)客戶端連接mq失敗的情況。spring amqp提供了開啟連接失敗后的重連機制(注意:這不是消息發(fā)送失敗的重試機制,是連接失敗的重試機制)。但是,這個重連是阻塞式的重試,在多次重試等待的過程中,當前線程是被阻塞的,因此如果對業(yè)務性能有要求,建議禁用重試機制。如果一定要使用,就要合理配置等待時長和重試次數(shù),當然,也可以考慮使用異步線程來執(zhí)行發(fā)送消息的代碼 (注意:這里說的重連是項目啟動之后,使用rabbitmqTemplate發(fā)送消息時,肯定需要連接mq,是這個時候的重連,不是指的,項目啟動時的重連。并且使用rabbitmqTemplate發(fā)送消息的下1行代碼在重連期間是不會執(zhí)行的,當在試完最大嘗試次數(shù)還沒有連接成功后,就會在當前線程拋出異常,下1行代碼不會執(zhí)行了) spring:

rabbitmq:

host: xxx.xx.xx.xx

port: 5672

virtual-host: /demo-vh

username: guest

password: xxxxxx

connection-timeout: 1s # 設置mq的連接超時時間

template:

retry:

enabled: true # 開啟超時重連機制

initial-interval: 1000ms # 失敗后的初始等待時間

multiplier: 1 # 失敗后下次的等待時長倍數(shù)

max-attempts: 3 # 最大重連次數(shù)

生產(chǎn)者確認:rabbitmq提供了Publisher Confiirm和Publisher Return這2種確認機制。開啟確認機制后,在mq成功收到消息后,會返回確認消息給生產(chǎn)者。返回的額結果有以下幾種情況:

消息投遞到了mq,但是路由失敗。此時會通過publisher Return返回路由異常原因,然后返回ack,告知投遞成功臨時消息投遞到了mq,并且入隊成功,返回ack,告知投遞成功持久消息投遞到了mq,并且入隊完成持久化,返回ack告知投遞成功其它情況都會返回nack,告知投遞失敗 配置如下: spring:

rabbitmq:

host: xxx.xx.xx.xx

port: 5672

virtual-host: /demo-vh

username: guest

password: xxxxxx

connection-timeout: 1s # 設置mq的連接超時時間

template:

retry:

enabled: true # 開啟超時重連機制

initial-interval: 1000ms # 失敗后的初始等待時間

multiplier: 1 # 失敗后下次的等待時長倍數(shù)

max-attempts: 3 # 最大重連次數(shù)

publisher-confirms: true # 開啟確認機制(注意開啟確認機制后,對效率有所影響的哦,

# 演示發(fā)送大量消息時,建議關掉)

publisher-returns: true # 開啟確認機制

設置confirmCallback和returnCallback @Slf4j

@Configuration

public class RabbitConfig {

@Autowired

private RabbitTemplate rabbitTemplate;

@Bean

public MessageConverter messageConverter() {

return new Jackson2JsonMessageConverter();

}

@PostConstruct

public void postProcessTemplate() {

// 設置returnCallback

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

/* 1. 當發(fā)送消息給mq的交換機, 并且指定1個在此交換機上不存在的綁定關系的routeKey時, 此方法會被回調(diào)

2. 當發(fā)送消息給mq的交換機, 并且交換機能夠根據(jù)綁定關系將消息路由到隊列時, 這個方法是不會回調(diào)的

*/

@Override

public void returnedMessage(Message message,

int replyCode,

String replyText,

String exchange,

String routingKey) {

log.info("統(tǒng)一收到returnedMessage, message: {}, replayCode:{}, replyText: {}," +

"exchange: {}, routeKey:{}",

message, replyCode, replyText, exchange, routingKey);

}

});

// 設置confirmCallback

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

/* 1. 當發(fā)送消息給指定的交換機, mq交換機收到消息時, 會回調(diào)此方法, 并且傳過來的ack為true

(無論此交換機后面是否能將此消息路由到隊列, 都會回調(diào)此方法)

2. 當發(fā)送消息給1個不存在交換機時, 會回調(diào)此方法, 傳過來的ack為false

3. 當發(fā)送1個消息時, 此時斷網(wǎng)的情況下, 經(jīng)過一小段時間后, 會回調(diào)此方法, 傳過來的ack為false

*/

@Override

public void confirm(CorrelationData correlationData, // 能夠從此對象中拿到發(fā)送消息時的信息

boolean ack,

String cause) {

log.info("統(tǒng)一收到回執(zhí), ack: {}, correlationData: {}, cause: {}",

ack, correlationData, cause);

}

});

log.info("已設置confirmCallback和returnCallback");

}

}

發(fā)送消息 @RequestMapping("rabbit")

@RestController

public class RabbitController {

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping("sendToMq2")

public Object sendToMq2(String content, String targetExchage, String routeKey) {

HashMap data = new HashMap<>();

data.put("content", content);

rabbitTemplate.convertAndSend(targetExchage, routeKey, data);

return "ok";

}

@GetMapping("sendToMq3")

public Object sendToMq3(String content, String targetExchage, String routeKey) {

CorrelationData correlationData = new CorrelationData();

correlationData.setId(UUID.randomUUID().toString());

correlationData

.getFuture()

.addCallback(new ListenableFutureCallback() {

@Override

public void onFailure(Throwable ex) {

log.info("失敗...");

}

/* 接收到回執(zhí)時, 該方法觸發(fā)。

*/

@Override

public void onSuccess(CorrelationData.Confirm result) {

log.info("接收到回執(zhí), 是否ack: {}, 原因: {}",

result.isAck(), result.getReason());

}

});

HashMap data = new HashMap<>();

data.put("content", content);

rabbitTemplate.convertAndSend(targetExchage, routeKey, data, correlationData);

return "ok";

}

}

上面我用的spring-boot-starter-amqp版本是2.1.8.RELEASE,在2.7.12版本中,配置有所不同,應如下配置: spring:

rabbitmq:

publisher-confirm_type: correlated # 開啟publisher confirm機制, 并設置confirm類型

# publisher-confirm-type有3中模式可選

# - none 關閉confirm機制

# - simple 同步阻塞等待mq的繪制消息

# - correlated 異步回調(diào)方式返回回執(zhí)消息

publisher-returns: true # 開啟publisher return機制

其中,當publisher-confirm_type: simple時,發(fā)布消息成功后使用rabbitTemplate調(diào)用waitForConfirms或waitForConfirmsOrDie方法等待broker節(jié)點返回發(fā)送結果,根據(jù)返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發(fā)送消息到broker; mq的可靠性:在默認情況下,rabbitmq會將收到的消息保存到內(nèi)存中以降低消息收發(fā)的延遲。這樣會導致2個問題。1個是:一旦mq宕機,內(nèi)存中的消息會丟失。第二個是:內(nèi)存空間有限,當消費者故障或處理過慢時,會導致消息積壓,引發(fā)mq阻塞。

數(shù)據(jù)持久化

交換機持久化(如果是非持久化,當mq服務器重啟時,會丟失;spring已設置默認持久化,通過durable屬性設置)隊列持久化 (如果是非持久化,當mq服務器重啟時,會丟失;spring已設置默認持久化,通過durable屬性設置)消息持久化

如果是非持久化,當mq服務器重啟時,會丟失;需要發(fā)送消息時設置deliveryMode,1為非持久化,2為持久化,可以參考MessageProperties中的deliveryMode屬性中使用的MessageDeliveryMode這個枚舉類,默認是持久化的??梢允褂胷abbitTemplate#convertAndSend發(fā)送時,指定MessagePostProcessor消息后置處理器,從Message對象中拿到MessageProperties,然后設置MessageProperties的deliveryMode屬性)也可以使用MessageBuilder這個構建者來構建消息 LazyQueue:從RabbitMO的3.6.0版本開始,就增加了Lazy Queue的概念,也就是惰性隊列。性能較之前有很大提升

惰性隊列的特征如下

接收到消息后直接存入磁盤而非內(nèi)存(內(nèi)存中只保留最近的消息,默認2048條)消費者要消費消息時才會從磁盤中讀取并加載到內(nèi)存支持數(shù)百萬條的消息存儲在3.12版本后,所有隊列都是Lazy Queue模式,無法更改。 如何創(chuàng)建惰性隊列

在rabbitmq后臺管理頁,在聲明隊列時,指定Aruguements參數(shù)中,添加x-queue-mode為lazy即可 代碼的方式 @Bean

public Queue queue4() {

return QueueBuilder

.durable("direct.queue4")

// .lazy() // 需要2.2版本以上才有直接設置lazy的方法,不過沒事,用下面的也是一樣的

.withArgument("x-queue-mode","lazy")

.build();

}

@RabbitListener(queuesToDeclare = @Queue(

name = "lazy.queue2",

durable = "true",

arguments = @Argument(name = "x-queue-mode", value = "lazy")

))

public void lazyQueue2(String msg) {

log.info("消費消息: {}", msg);

}

rabbitmq如何保證消息的可靠性?

首先通過配置可以讓交換機、隊列、以及發(fā)送的消息都持久化。這樣隊列中的消息會持久化到磁盤,MQ重啟消息依然存在RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后會稱為隊列的默認模式。LazyQueue會將所有消息都持久化,并且性能有很大提升(測試1000000條,19s就完成了,不會出現(xiàn)page out,并且不會阻塞mq接收新的消息)。開啟持久化和生產(chǎn)者確認時,RabbitMO只有在消息持久化完成后才會給生產(chǎn)者返回ACK回執(zhí) 消費者的可靠性

為了確認消費者是否成功處理消息,RabbitMQ提供了消費者確認機制(Consumer Acknowledgement)當消費者處理消息結束后,應該向RabbitMQ發(fā)送一個回執(zhí),告知RabbitMQ自己消息處理狀態(tài)?;貓?zhí)有三種可選值:

ack:成功處理消息,RabbitMO從隊列中刪除該消息nack:消息處理失敗,RabbitMo需要再次投遞消息reject:消息處理失敗并拒絕該消息,RabbitMO從隊列中刪除該消息 SpringAMQP已經(jīng)實現(xiàn)了消息確認功能。并允許我們通過配置文件選擇ACK處理方式(通過:spring.rabbitmq.listener.simple.acknowledge-mode屬性來配置),有如下三種方式

none: 不處理。即消息投遞給消費者后立刻ack,消息會立刻從Mq刪除,不管監(jiān)聽方法是否出現(xiàn)異常。 manual: 手動模式。需要自己在業(yè)務代碼中調(diào)用api,發(fā)送ack或reject,可以捕獲異常控制重試次數(shù),甚至可以控制失敗消息的處理方式,存在業(yè)務入侵,但更靈活

代碼示例 @Component

@RabbitListener(queues = "test.queue1")

public class MessageConsumer {

@RabbitHandler

public void recivedMessage(Message msg,

OrderReturnApplyEntity orderReturnApplyEntity,

Channel channel) throws IOException {

try {

System.out.println("接收到消息:" + msg);

int i = 1 / 0;

// 確認收到消息,false只確認當前consumer一個消息收到,true確認所有consumer獲得的消息

// 當前作為mq的消費端有1個consumeTag的消費者標識, mq每次發(fā)送消息時都會標識這條消息的deliveryTag, 這個投遞標識會遞增。

// 當消費者斷開連接后, 又連接上了mq, 此時devliveryTag會從1開始繼續(xù)遞增

// 所以如果要唯一標識消息的話, 就要在發(fā)送消息的時候, 指定correlationData的id

channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {

if (msg.getMessageProperties().getRedelivered()) {

System.out.println("消息重試后依然失敗,拒絕再次接收");

// 拒絕消息,不再重新入隊

// (如果綁定了死信隊列消息會進入死信隊列,沒有綁定死信隊列則消息被丟棄,

// 也可以把失敗消息記錄到redis或者mysql中),也可以設置為true再重試。

channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);

} else {

System.out.println("消息消費時出現(xiàn)異常,即將再次返回隊列處理");

// Nack消息,重新入隊(重試一次)參數(shù)二表示是否批量,參數(shù)三表示是否重新入隊列

channel.basicNack(msg.getMessageProperties().getDeliveryTag(),

false, true);

}

log.error("處理消息發(fā)生錯誤: {}", e);

}

}

}

配置如下: server:

port: 8081

spring:

rabbitmq:

host: 119.23.61.24

port: 5672

virtual-host: /demo-vh

username: guest

password: 17E821zj

connection-timeout: 1s # 設置mq的連接超時時間

template:

retry:

enabled: true # 開啟超時重連機制

initial-interval: 1000ms # 失敗后的初始等待時間

multiplier: 1 # 失敗后下次的等待時長倍數(shù)

max-attempts: 3 # 最大重連次數(shù)

publisher-confirms: true # 開啟消息發(fā)送確認機制

publisher-returns: true # 開啟消息return機制

listener:

simple:

prefetch: 1 # 每次拉取1個消息, 處理完成后, 再拉取下1個消息, 能者多勞

acknowledge-mode: munual # 消費者確認機制為手動

問題1:假設已經(jīng)配置為manual手動確認模式,在recivedMessage方法中,忘記basicAck或者basicReject或者basicNack, 會怎么樣? 測試步驟:在rabbitmq后臺手動發(fā)1條數(shù)據(jù),到test.queue1隊列中,在recivedMessage處理消息的方法中,聲明該消息,但就是不去(basicAck或者basicReject或者basicNack)。發(fā)現(xiàn),消息處理方法收到了1次消息,因此方法只調(diào)用了1次,在rabbitmq的web后臺該消息一直處于unacked狀態(tài)。此時,關閉消費者服務,在rabbitmq的web后臺該消息處于ready狀態(tài),即待投遞。然后,再次啟動消費者,此消息又投遞了過來,消費者方法又調(diào)用了1次,此時再以同樣的配置和代碼啟動另外1個消費者,這個新啟動的消費者沒有收到這個消息(說明它不會將已投遞但未確認的消息投遞給這個新的消費者)。然后將原來的消費者停掉,此時發(fā)現(xiàn)新啟動的消費者立刻收到了這條消息,不過消息仍處于unacked狀態(tài)。這證明這個消息在發(fā)送給1個消費者之后,會等待消費者的回執(zhí),如果消費者遲遲不給回執(zhí),那就一直等,直到這個消費者掛了,消息才會變?yōu)閞eady待投遞狀態(tài),才會投遞給其它的消費者。 測試2:使用basicAck確認收到消息后,消息將從隊列中刪除。如下代碼測試,當收到消息時,使用basicAck(消息投遞標記,是否批量確認),批量確認指的是,將deliveryTag小于當前消息投遞標記的消息一并確認,這樣broker就會清理掉之前未確認的消息,這可以適用于某些情況:既然最后面的消息都確認了,之前的消息確不確認也就沒啥關系的情況。 @Slf4j

@Configuration

public class RabbitConfig {

@RabbitListener(bindings = {

@QueueBinding(

value = @Queue(value = "direct.queue2",durable = "true"),

exchange = @Exchange(value = "direct.exchange2",

type = ExchangeTypes.FANOUT),

key = {"red","blue"}

)

})

public void listenQueue(Message message, String msg, Channel channel) {

log.info("收到消息=====================");

log.info("channel:{}", channel);

log.info("msg:{}", msg);

log.info("message:{},", new String(message.getBody()));

// receivedDeliveryMode-是否持久化的消息,

// redelivered-是否重新投遞的消息,

// receivedRoutingKey-路由key,

// deliveryTag-投遞唯一標記(從1開始遞增, 每次消費者重啟后, 繼續(xù)從1開始)

// consumerTag-當前消費者唯一標記(每個消費者都有自己的唯一標記,每次消費者重連后,生成新的標記)

// consumerQueue-當前消費者收到消息的隊列

log.info("messageProperties:{}", message.getMessageProperties());

// deliveryTag-投遞標記(broker用于標記此消息),

// multiple-是否批量確認(批量確認會讓broker將小于當前消息的deliveryTag的消息給確認掉刪了)

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

log.info("處理結束=====================");

}

測試3:使用basicNack(deliveryTag, mulitiple, requeue) 拒絕簽收該消息,第3個參數(shù)決定是否讓消息重新回到隊列,如果消息回到隊列后,重新變?yōu)閞eady待投遞狀態(tài),會選擇消費者再次進行進行投遞。如果不回到隊列,那么broker將會刪除此消息,但是如果此隊列還綁定了死信交換機,那么此消息將會發(fā)給死信交換機。 @Slf4j

@Configuration

public class RabbitConfig {

@RabbitListener(bindings = {

@QueueBinding(

value = @Queue(value = "direct.queue2",durable = "true"),

exchange = @Exchange(value = "direct.exchange2",

type = ExchangeTypes.FANOUT),

key = {"red","blue"}

)

})

public void listenQueue(Message message, String msg, Channel channel) {

log.info("收到消息=====================");

log.info("channel:{}", channel);

log.info("msg:{}", msg);

log.info("message:{},", new String(message.getBody()));

// receivedDeliveryMode-是否持久化的消息,

// redelivered-是否重新投遞的消息,

// receivedRoutingKey-路由key,

// deliveryTag-投遞唯一標記(從1開始遞增, 每次消費者重啟后, 繼續(xù)從1開始)

// consumerTag-當前消費者唯一標記(每個消費者都有自己的唯一標記,每次消費者重連后,生成新的標記)

// consumerQueue-當前消費者收到消息的隊列

log.info("messageProperties:{}", message.getMessageProperties());

// deliveryTag-投遞標記(broker用于標記此消息),

// multiple-是否批量確認(批量確認會讓broker將小于當前消息的deliveryTag的消息給確認掉刪了)

// requeue-是否繼續(xù)入隊,

// =====================以下是2種情況的代碼及對應的解釋=====================

// 如果不繼續(xù)入隊, 那么broker將會刪除這個消息,

// 但是如果這個隊列綁定了死信交換機,那么會發(fā)到該私信交換機中

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

// 如果繼續(xù)入隊, 那么消息重新回到隊列處于待投遞狀態(tài), 然后又會投遞給當前消費者,

// 然后當前消費者又去讓這個消息去入隊待投遞, 然后又投遞給當前消費者,

// 然后就成了死循環(huán)了。

// 此時, 再開1個一樣的消費者,再監(jiān)聽此隊列,結果2個消費者都死循環(huán)了。

//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

log.info("處理結束=====================");

}

}

測試4:使用basicReject(deliveryTag, requeue) 拒絕該消息,與上面使用basicNack(deliveryTag, multiple, requeue)一樣的測試結果,只是basicNack方法中比basicReject多了個multiple的參數(shù)。 @Slf4j

@Configuration

public class RabbitConfig {

@RabbitListener(bindings = {

@QueueBinding(

value = @Queue(value = "direct.queue2",durable = "true"),

exchange = @Exchange(value = "direct.exchange2",

type = ExchangeTypes.FANOUT),

key = {"red","blue"}

)

})

public void listenQueue(Message message, String msg, Channel channel) {

log.info("收到消息=====================");

log.info("channel:{}", channel);

log.info("msg:{}", msg);

log.info("message:{},", new String(message.getBody()));

// receivedDeliveryMode-是否持久化的消息,

// redelivered-是否重新投遞的消息,

// receivedRoutingKey-路由key,

// deliveryTag-投遞唯一標記(從1開始遞增, 每次消費者重啟后, 繼續(xù)從1開始)

// consumerTag-當前消費者唯一標記(每個消費者都有自己的唯一標記,每次消費者重連后,生成新的標記)

// consumerQueue-當前消費者收到消息的隊列

log.info("messageProperties:{}", message.getMessageProperties());

// deliveryTag-投遞標記(broker用于標記此消息),

// requeue-是否繼續(xù)入隊,

// =====================以下是2種情況的代碼及對應的解釋=====================

// 如果不繼續(xù)入隊, 那么broker將會刪除這個消息,

// 但是如果這個隊列綁定了死信交換機,那么會發(fā)到該私信交換機中

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

// 如果繼續(xù)入隊, 那么消息重新回到隊列處于待投遞狀態(tài), 然后又會投遞給當前消費者,

// 然后當前消費者又去讓這個消息去入隊待投遞, 然后又投遞給當前消費者,

// 然后就成了死循環(huán)了。

// 此時, 再開1個一樣的消費者,再監(jiān)聽此隊列,結果2個消費者都死循環(huán)了。

// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

log.info("處理結束=====================");

}

}

auto:自動模式。SpringAMQP利用AOP對我們的消息處理邏輯做了環(huán)繞增強,

當業(yè)務正常執(zhí)行時則自動返回ack。當業(yè)務出現(xiàn)異常時,根據(jù)異常判斷返回不同結果:

如果是業(yè)務異常,會自動返回nack,并重新入隊(就會導致無限重試,導致程序死循環(huán))

測試:當消費者監(jiān)聽方法在處理消息的過程中發(fā)生NullPointer異常時,會自動對該消息進行nack(并重新入隊),然后mq服務器又會再次將此消息投遞給此消費者,并且此時mq管理后臺中對應隊列中該消息的狀態(tài)時nack(消費者一旦nack,這個消息就變成ready待投遞了,然后再次馬上又投遞給消費者了,就馬上變成了nack。測試時,我又開了1個一樣的消費者服務,結果2個消費者服務都一直不斷的拋出異常),當把消費者停了的時候,此消息又變成ready待投遞了 如果是消息處理或校驗異常,自動返回reject,消息會被刪除,不會重新入隊,不會導致死循環(huán)(在監(jiān)聽方法中,手動拋出MessageConversionException,那么也是跟reject并且不重新入隊,一樣的效果,消息會被刪除,不會導致死循環(huán)) 測試代碼:

配置 server:

port: 8081

spring:

rabbitmq:

host: 119.23.61.24

port: 5672

virtual-host: /demo-vh

username: guest

password: 17E821zj

connection-timeout: 1s # 設置mq的連接超時時間

template:

retry:

enabled: true # 開啟超時重連機制

initial-interval: 1000ms # 失敗后的初始等待時間

multiplier: 1 # 失敗后下次的等待時長倍數(shù)

max-attempts: 3 # 最大重連次數(shù)

publisher-confirms: true # 開啟消息發(fā)送確認機制

publisher-returns: true # 開啟消息return機制

listener:

simple:

prefetch: 1 # 每次拉取1個消息, 處理完成后, 再拉取下1個消息, 能者多勞

acknowledge-mode: auto # 消費者確認機制

代碼:下面代碼就是,故意在確認機制已經(jīng)是自動確認的配置下,依然確認或拒絕,檢查不同情況下,消息的流轉情況 @Slf4j

@Configuration

public class RabbitConfig {

@RabbitListener(bindings = {

@QueueBinding(

value = @Queue(value = "direct.queue2",durable = "true"),

exchange = @Exchange(value = "direct.exchange2",

type = ExchangeTypes.FANOUT),

key = {"red","blue"}

)

})

public void listenQueue(Message message, String msg, Channel channel) {

log.info("收到消息=====================");

log.info("channel:{}", channel);

log.info("msg:{}", msg);

log.info("message:{},", new String(message.getBody()));

// receivedDeliveryMode-是否持久化的消息,

// redelivered-是否重新投遞的消息,

// receivedRoutingKey-路由key,

// deliveryTag-投遞唯一標記(從1開始遞增, 每次消費者重啟后, 繼續(xù)從1開始)

// consumerTag-當前消費者唯一標記(每個消費者都有自己的唯一標記,每次消費者重連后,生成新的標記)

// consumerQueue-當前消費者收到消息的隊列

log.info("messageProperties:{}", message.getMessageProperties());

String body = new String(message.getBody());

long deliveryTag = message.getMessageProperties().getDeliveryTag();

// 以下的原意是指: 方法本身的作用, 并不是在確認模式是自動確認下調(diào)用這些方法的作用

if ("1".equals(body)) {

// 原意: 確認, 不批量

//(channel會shutdown, 然后會重新連接, 然后消息被刪除)

channel.basicAck(deliveryTag, false);

}

else if ("2".equals(body)) {

// 原意: 拒絕, 重新入隊

//(channel會shutdown, 然后會重新連接, 消息會重新入隊,

// 然后消費者再次收到此消息, 然后不斷循環(huán), 并且中間的過程會拋出異常)

channel.basicReject(deliveryTag, true);

}

else if ("3".equals(body)) {

// 原意: 拒絕, 不重新入隊(broker將會刪除此消息,

// 如果該隊列還綁定了死信交換機,那么會發(fā)往此交換機)

//(收到1次消息后, channel會shutdown, 然后會重新連接, 不會再次收到該消息,

// 因為消息已經(jīng)被刪除了)

channel.basicReject(deliveryTag, false);

}

else if ("4".equals(body)) {

// 原意: 拒絕, 不批量, 重新入隊(會再次投遞給消費者)

//(與2表現(xiàn)幾乎一致)

channel.basicNack(deliveryTag, false, true);

}

else if ("5".equals(body)) {

// 原意: 拒絕, 批量, 重新入隊(對于之前未確認的消息,批量拒絕并重新入隊)

//(與2表現(xiàn)幾乎一致)

channel.basicNack(deliveryTag, true, true);

}

else if ("6".equals(body)) {

// 拋出空指針異常,

//(收到1次消息, 然后這里拋出異常, 然后會自動nack并重新入隊, 然后又收到該消息, 不斷循環(huán))

throw new NullPointerException("666...");

}

else if ("7".equals(body)) {

// 拋出消息轉換異常,

//(拋出異常,消息會被刪除, 并且不會重新入隊, 不會死循環(huán)。

// 同basicReject拒絕消息并且設置不重新入隊)

throw new MessageConversionException("777...");

}

else {

// 模擬正常處理

System.out.println("自動確認模式正常處理情況...");

}

log.info("處理結束=====================");

}

}

失敗重試機制:

當消費者出現(xiàn)異常后,如果消費者設置的參數(shù)讓此消息再次回到隊列,那么消息會requeue(重新入隊)到隊列,等待投遞,然后就會再投遞給消費者,消費者收到該消息后再次異常,由于消費者設置的參數(shù)又讓此消息再次回到隊列,因此就會無限循環(huán),導致mq的消息處理飆升,帶來不必要的壓力。我們可以利用Spring的retry機制,在消費者出現(xiàn)異常時利用本地重試,而不是無限制的requeue到mq隊列,嘗試作如下配置來設置消費者。 server:

port: 8081

spring:

rabbitmq:

host: xx.xx.xx.xx

port: 5672

virtual-host: /demo-vh

username: guest

password: xx

connection-timeout: 1s # 設置mq的連接超時時間

template: #(消息生產(chǎn)者的配置)

retry:

enabled: true # 開啟超時重連機制

initial-interval: 1000ms # 失敗后的初始等待時間

multiplier: 1 # 失敗后下次的等待時長倍數(shù)

max-attempts: 3 # 最大重連次數(shù)

publisher-confirms: true # 開啟消息發(fā)送確認機制

publisher-returns: true # 開啟消息return機制

listener: # (消息消費者的配置)

simple:

prefetch: 1 # 每次拉取1個消息, 處理完成后, 再拉取下1個消息, 能者多勞

acknowledge-mode: auto # 消費者確認機制

## ===========添加失敗重試機制===========

retry:

enabled: true # 開啟消費者失敗重試

initial-interval: 1000ms # 初始的失敗等待時長為1s

multiplier: 1 # 下次失敗的等待時長倍數(shù),

# 下次等待時長 = multiplier * last-interval

max-attempts: 3 # 最大重試次數(shù)(不設置的話, 默認配置的就是3次)

stateless: true # true無狀態(tài), false有狀態(tài)。如果業(yè)務中包含事務, 這里改為false

當消費者添加如上失敗重試機制前,我們發(fā)現(xiàn)本來消費者的監(jiān)聽方法拋出空指針異常,然后一直在不斷的nack消息(并且設置重新入隊參數(shù)為true),然后mq又投遞過來然后又導致空指針異常,然后又nack消息并重新入隊,然后不斷的死循環(huán)的跑著(同上1個例子配置確認模式為auto,并且監(jiān)聽方法中拋出NullPointerException異常的例子)。加上失敗重試機制的配置后,同樣是在確認模式為auto,并且監(jiān)聽方法中拋出NullPointerException異常的情況下,發(fā)現(xiàn)消費者就拉取了1次消息,然后在本地重試了3次,在這期間mq也并沒有投遞消息過來,當重試3次都失敗后,此消息從消息隊列中刪除了(重試次數(shù)耗盡都失敗之后,直接拒絕了該消息,并且不重新入隊)。 失敗消息處理策略:在開啟重試模式后,重試次數(shù)耗盡,如果消息依然失敗,則需要有MessageRecoverer接口來處理,它包含三種不同的實現(xiàn)

實現(xiàn)方式分類

RejectAndDontRequeueRecoverer: 重試耗盡后,直接reject,丟棄消息。默認就是這種方式(當重試都失敗之后,直接丟棄該消息)ImmediateRequeueMessageRecoverer: 重試耗盡后,返回nack,消息重新入隊(當重試都失敗之后,重新入隊,接著繼續(xù)接收該消息,如果重試都失敗之后,又重新入隊)RepublishMessageRecoverer: 重試耗盡后,將失敗消息投遞到指定的交換機(當重試都失敗之后,發(fā)送到指定的交換機,將此消息路由到與此交換機所綁定的隊列) RepublishMessageRecoverer使用示例 示例描述:當向direct.queue2發(fā)送1個payload為7的消息時,消費者就會只拉取1次該消息,并且會在本地重試3次,如果重試3次都失?。ū纠兄灰⑹?就會拋出空指針異常)之后,就會發(fā)送到error.direct交換機,然后根據(jù)路由key路由到error.queue消息隊列,其中消息的內(nèi)容就是異常棧的字符串,這樣就可以讓人工介入處理。并且使用失敗消息處理策略后,不會出現(xiàn)無限制:失敗重試,然后重發(fā),繼續(xù)失敗重試。 server:

port: 8081

spring:

rabbitmq:

host: xx.xx.xx.xx

port: 5672

virtual-host: /demo-vh

username: guest

password: xxx

connection-timeout: 1s # 設置mq的連接超時時間

template:

retry:

enabled: true # 開啟超時重連機制

initial-interval: 1000ms # 失敗后的初始等待時間

multiplier: 1 # 失敗后下次的等待時長倍數(shù)

max-attempts: 3 # 最大重連次數(shù)

publisher-confirms: true # 開啟消息發(fā)送確認機制

publisher-returns: true # 開啟消息return機制

listener:

simple:

prefetch: 1 # 每次拉取1個消息, 處理完成后, 再拉取下1個消息, 能者多勞

acknowledge-mode: auto # 消費者確認機制

## ===========添加失敗重試機制===========

retry:

enabled: true # 開啟消費者失敗重試

initial-interval: 1000ms # 初始的失敗等待時長為1s

multiplier: 1 # 下次失敗的等待時長倍數(shù),

# 下次等待時長 = multiplier * last-interval

max-attempts: 3 # 最大重試次數(shù)(不設置的話, 默認配置的就是3次)

stateless: true # true無狀態(tài), false有狀態(tài)。如果業(yè)務中包含事務, 這里改為false

@Slf4j

@Configuration

public class RabbitConfig {

@RabbitListener(bindings = {

@QueueBinding(

value = @Queue(value = "direct.queue2",durable = "true"),

exchange = @Exchange(value = "direct.exchange2",

type = ExchangeTypes.FANOUT),

key = {"red","blue"}

)

})

public void listenQueue(Message message, String msg, Channel channel) {

log.info("收到消息=====================");

log.info("channel:{}", channel);

log.info("msg:{}", msg);

log.info("message:{},", new String(message.getBody()));

// receivedDeliveryMode-是否持久化的消息,

// redelivered-是否重新投遞的消息,

// receivedRoutingKey-路由key,

// deliveryTag-投遞唯一標記(從1開始遞增, 每次消費者重啟后, 繼續(xù)從1開始)

// consumerTag-當前消費者唯一標記(每個消費者都有自己的唯一標記,每次消費者重連后,生成新的標記)

// consumerQueue-當前消費者收到消息的隊列

log.info("messageProperties:{}", message.getMessageProperties());

String body = new String(message.getBody());

long deliveryTag = message.getMessageProperties().getDeliveryTag();

if ("7".equals(body)) {

// 拋出消息轉換異常,

//(拋出異常,消息會被刪除, 并且不會重新入隊, 不會死循環(huán)。

// 同basicReject拒絕消息并且設置不重新入隊)

throw new MessageConversionException("777...");

}

log.info("處理結束=====================");

}

}

@Slf4j

@Configuration

// 當開啟了消費者失敗重試時, 當前配置類才生效

@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled",

havingValue = "true")

public class ErrorConfiguration {

// 定義1個直連交換機

@Bean

public DirectExchange errorExchange(){

return new DirectExchange("error.direct");

}

// 定義1個消息隊列

@Bean

public Queue errorQueue(){

return new Queue("error.queue");

}

// 綁定

@Bean

public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){

return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");

}

// 消息消費時重試耗盡并且都失敗(例如: 確認模式為auto,并且監(jiān)聽方法中拋出NullPointerException異常)時

// 的后續(xù)處理策略,

// 因為這里返回的是RepublishMessageRecoverer,所以在重試耗盡時發(fā)送到指定交換機,并攜帶指定的路由key

@Bean

public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){

log.debug("加載RepublishMessageRecoverer");

return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");

}

}

業(yè)務冪等性:通過以上所有的手段,我們可以保證消息至少被消費者消費1次。但是由于網(wǎng)絡波動等原因?qū)е孪M者消費同一消息多次,這個時候,就需要保證消息的冪等性。所謂的冪等性指的是,消費同一消息多次產(chǎn)生的效果與消費該消息1次的效果是相同的,或者說對業(yè)務狀態(tài)的影響是一致的。

唯一消息id方案:

給每個消息都設置一個唯一id,利用id區(qū)分是否是重復消息:

每一條消息都生成一個唯一的id,與消息一起投遞給消費者消費者接收到消息后處理自己的業(yè)務,業(yè)務處理成功后將消息ID保存到數(shù)據(jù)庫如果下次又收到相同消息,去數(shù)據(jù)庫查詢判斷是否存在,存在則為重復消息放棄處理 使用步驟:消費者和生產(chǎn)者中都配置如下的消息轉換器,并且設置createMessageIds屬性為true @Bean

public MessageConverter jacksonMessageConvertor(){

Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();

// 1. 設置此屬性后, 會在使用rabbitTemplate發(fā)送消息時,

// 當未設置消息屬性MessageProperties#messageId時,對消息對象的MessageProperties的messageId設

// 置1個uuid值, 用來作為這條消息的標識。

// 2. 當然也可以在使用rabbitTemplate發(fā)送消息時, 指定1個MessagePostProcessor,

// 來設置MessageProperties#messageId的值

jjmc.setCreateMessageIds(true);

return jjmc;

}

業(yè)務判斷

結合業(yè)務邏輯,基于業(yè)務本身作判斷

以我們的業(yè)務為例:我們要在支付后修改訂單狀態(tài)為已支付,應該在修改訂單狀態(tài)前先查詢訂單狀態(tài),判斷狀態(tài)是否是未支付。只有未支付訂單才需要修改,其它狀態(tài)不做處理: @Component

@RequiredArgsConstructor

public class PayStatusListener {

private final IOrderService orderService;

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "mark.order.pay.queue", durable = "true"),

exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),

key = "pay.success"

))

public void listenOrderPay(Long orderId) {

/*

// 1.查詢訂單

Order order = orderService.getById(orderId);

// 2.判斷訂單狀態(tài)是否為未支付

if(order == null || order.getStatus() != 1){

// 訂單不存在,或者狀態(tài)異常

return;

}

// 3.如果未支付,標記訂單狀態(tài)為已支付

orderService.markOrderPaySuccess(orderId);

*/

// 其實可以使用下面的一步搞定(類似于樂觀鎖機制)

// update order set status = 2 where id = ? AND status = 1

orderService.lambdaUpdate()

.set(Order::getStatus, 2)

.set(Order::getPayTime, LocalDateTime.now())

.eq(Order::getId, orderId)

.eq(Order::getStatus, 1)

.update();

}

}

如何保證支付服務與交易服務之間的訂單狀態(tài)一致性?

首先,支付服務會正在用戶支付成功以后利用MQ消息通知交易服務完成訂單狀態(tài)同步。其次,為了保證MQ消息的可靠性,我們采用了生產(chǎn)者確認機制、消費者確認、消費者失敗重試等策略,確保消息投遞和處理的可靠性。同時也開啟了MQ的持久化,避免因服務宕機導致消息丟失最后,我們還在交易服務更新訂單狀態(tài)時做了業(yè)務冪等判斷,避免因消息重復消費導致訂單狀態(tài)異常。 如果交易服務消息處理失敗,有沒有什么兜底方案?

我們可以在交易服務設置定時任務,定期查詢訂單支付狀態(tài)。這樣即便MQ通知失敗,還可以利用定時任務作為兜底方案,確保訂單支付狀態(tài)的最終一致性。 延遲消息:勝場這發(fā)送消息時,指定1個時間,消費者不會立刻收到消息,而是在指定時間之后才收到消息

死信交換機方案

當一個隊列中的消息滿足下列情況之一時,就會成為死信 (dead letter)

消費者使用basic.reject或 basic.nack聲明消費失敗,并且消息的requeue參數(shù)設置為false消息是一個過期消息(達到了隊列或消息本身設置的過期時間),超時無人消費要投遞的隊列消息堆積滿了,最早的消息可能成為死信 如果隊列通過dead-letter-exchange屬性指定了一個交換機,那么該隊列中的死信就會投遞到這個交換機中。這個交換機稱為該隊列的死信交換機 (Dead Letter Exchange,簡稱DLX) 示例 配置如下: server:

port: 8081

spring:

rabbitmq:

host: xx.xx.xx.xx

port: 5672

virtual-host: /demo-vh

username: guest

password: xxx

connection-timeout: 1s # 設置mq的連接超時時間

template:

retry:

enabled: true # 開啟超時重連機制

initial-interval: 1000ms # 失敗后的初始等待時間

multiplier: 1 # 失敗后下次的等待時長倍數(shù)

max-attempts: 3 # 最大重連次數(shù)

publisher-confirms: true # 開啟消息發(fā)送確認機制

publisher-returns: true # 開啟消息return機制

listener:

simple:

prefetch: 1 # 每次拉取1個消息, 處理完成后, 再拉取下1個消息, 能者多勞

acknowledge-mode: auto # 消費者確認機制

## ===========添加失敗重試機制===========

retry:

enabled: true # 開啟消費者失敗重試

initial-interval: 1000ms # 初始的失敗等待時長為1s

multiplier: 1 # 下次失敗的等待時長倍數(shù),

# 下次等待時長 = multiplier * last-interval

max-attempts: 3 # 最大重試次數(shù)(不設置的話, 默認配置的就是3次)

stateless: true # true無狀態(tài), false有狀態(tài)。如果業(yè)務中包含事務, 這里改為false

代碼如下 @Slf4j

@Configuration

public class RabbitConfig {

/* 死信處理的交換機、隊列、綁定等定義 */

@Bean

public org.springframework.amqp.core.Exchange dlxExchange() {

Exchange exchange = ExchangeBuilder.directExchange("dlx.directExchange")

.durable(true)

.build();

return exchange;

}

@Bean

public org.springframework.amqp.core.Queue dlxQueue() {

Queue queue = QueueBuilder.durable("dlx.queue").build();

return queue;

}

@Bean

public Binding dlxExAndQueueBinding() {

// 建立 dlx.directExchange交換機 到 dlx.queue隊列 的綁定關系, 并指定路由key為red

Binding binding = BindingBuilder.bind(dlxQueue())

.to(dlxExchange())

.with("red")

.noargs();

return binding;

}

/* 讓消息成為死信的交換機、隊列、綁定等定義 */

// 當向direct.timedExchange交換機發(fā)送消息,并且攜帶red作為路由key,那么此消息會被路由到direct.queue隊列

// 并且, 當這個消息設置了過期時間(通過設置MessageProperties#expiration屬性), 同時direct.queue又沒有消費者,

// 那么, 當?shù)搅诉^期時間時, 這個消息會被發(fā)送到該隊列所綁定的死信交換機, 并攜帶原消息原來的路由key,

// 然后, 我們在下面的監(jiān)聽方法中監(jiān)聽死信隊列

@Bean

public org.springframework.amqp.core.Exchange directTimedExchange() {

Exchange exchange = ExchangeBuilder.directExchange("direct.timedExchange")

.durable(true)

.build();

return exchange;

}

@Bean

public org.springframework.amqp.core.Queue directQueue() {

org.springframework.amqp.core.Queue queue = QueueBuilder.durable("direct.queue")

// 通過設置參數(shù), 來指定該隊列的死信交換機

.withArgument("x-dead-letter-exchange", "dlx.directExchange")

.build();

return queue;

}

@Bean

public Binding exAndQueueBinding() {

// 建立 direct.timedExchange 交換機 到 direct.queue 隊列 的綁定關系, 并指定路由key為red

Binding binding = BindingBuilder.bind(directQueue())

.to(directTimedExchange())

.with("red")

.noargs();

return binding;

}

/* 監(jiān)聽死信隊列 */

@RabbitListener(queues = {"dlx.queue"})

public void handleDlxMsg(Message message) {

log.info("收到消息=====================");

// 可以在此處觀察日志的輸出時間, 和消息的數(shù)據(jù)(我設置的消息的數(shù)據(jù)就是消息的發(fā)送時間)

log.info("message:{},", new String(message.getBody()));

}

}

@Slf4j

@RequestMapping("rabbit")

@RestController

public class RabbitController {

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping("orderMsg")

public Object orderMsg(String expiration, String exchange, String routeKey) {

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

String content = sdf.format(new Date());

rabbitTemplate.convertAndSend(exchange, routeKey, content, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 設置消息過期時間

message.getMessageProperties().setExpiration(expiration);

return message;

}

});

return "ok";

}

}

測試步驟: 第一步:發(fā)送http://localhost:8081/rabbit/orderMsg?expiration=10000&exchange=direct.timedExchange&routeKey=red,發(fā)現(xiàn)確實是在10秒后收到消息 第二步:發(fā)送http://localhost:8081/rabbit/orderMsg?expiration=5000&exchange=direct.timedExchange&routeKey=red,發(fā)現(xiàn)確實是在5秒后收到消息 第三步:發(fā)送完http://localhost:8081/rabbit/orderMsg?expiration=5000&exchange=direct.timedExchange&routeKey=red,接著隔1-2秒發(fā)送http://localhost:8081/rabbit/orderMsg?expiration=10000&exchange=direct.timedExchange&routeKey=red,發(fā)現(xiàn)1個是在5s后收到消息,1個是在10s后收到消息 第四步:發(fā)送完http://localhost:8081/rabbit/orderMsg?expiration=10000&exchange=direct.timedExchange&routeKey=red,接著隔1-2秒發(fā)送http://localhost:8081/rabbit/orderMsg?expiration=5000&exchange=direct.timedExchange&routeKey=red,2個消息都是隔10s才收到的消息 這足以證明如果采取這種方案是有問題的,必須是處于消息隊列頂端的消息隊列到期時,才會立馬進入死信隊列。所以如果要用這種方案的話,最好是分超時隊列,不同的超時時間發(fā)送的不同的隊列,這樣就能保證,最先進入隊列的消息先超時,后面的消息也都能正常延遲消費。 延遲插件

介紹

RabbitMO的官方也推出了一個插件,原生支持延遲消息功能。該插件的原理是設計了一種支持延遲消息功能的交換機,當消息投遞到交換機后可以在交換機中暫存一定時間,到期后再投遞到隊列。下載rabbitmq延遲消息插件地址:rabbitmq-delayed-message-exchangerabbitmq官方文檔中對延遲消息插件的介紹及使用說明github上README.md關于延遲消息插件的安裝使用說明 安裝

官方介紹的步驟

第一步,先下載延遲消息插件第二步,如果需要找到rabbitmq的插件目錄,可以執(zhí)行:rabbitmq-plugins directories -s第三步,將下載的延遲消息插件復制到插件目錄第四步,執(zhí)行開啟插件命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange 自己的安裝步驟(使用docker安裝rabbitmq時,未掛載插件目錄),由于自己之前安裝rabbitmq之前沒有將插件目錄掛載出來,所以步驟不一樣

docker exec -it rabbitmq /bin/bash進入到rabbitmq容器中,然后在當前的 / 目錄下有個plugins目錄,進入可以看到很多.ez結尾的插件docker cp ./rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez 037a6fed1d41:/plugins/,將當前的延遲消息插件復制到rabbitmq容器中的/plugins目錄中docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange,在容器外部執(zhí)行此命令,讓rabbitmq容器啟用此插件 docker已掛載插件目錄的rabbitmq容器安裝步驟

安裝rabbitmq docker run \

-e RABBITMQ_DEFAULT_USER=guest \

-e RABBITMQ_DEFAULT_PASS=guest \

-v mq-plugins:/plugins \

--name mq \

--hostname mq \

-p 15672:15672 \

-p 5672:5672 \

--network hmall \

-d \

rabbitmq:3.8-management

docker inspect mq查看,可以看到在Mounts節(jié)點中,已將source掛載到了容器中的Destination(即/plugins目錄中),然后將.ez的延遲消息插件拷貝到source所代表的文件位置即可 docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange,在容器外部執(zhí)行此命令,讓rabbitmq容器啟用此插件 使用:使用的時候,只需要在聲明交換機時,設置此交換機的delayed屬性為true即可(spring-amqp包須>=1.6),而在發(fā)送消息的時候,需要設置MessageProperties#setDelay(Integer)傳入需要延遲的時間,單位:毫秒,其實就是設置x-delay頭。 代碼如下: server:

port: 8081

spring:

rabbitmq:

host: xx

port: 5672

virtual-host: /demo-vh

username: guest

password: xx

connection-timeout: 1s # 設置mq的連接超時時間

template:

retry:

enabled: true # 開啟超時重連機制

initial-interval: 1000ms # 失敗后的初始等待時間

multiplier: 1 # 失敗后下次的等待時長倍數(shù)

max-attempts: 3 # 最大重連次數(shù)

publisher-confirms: true # 開啟消息發(fā)送確認機制

publisher-returns: true # 開啟消息return機制

listener:

simple:

prefetch: 1 # 每次拉取1個消息, 處理完成后, 再拉取下1個消息, 能者多勞

acknowledge-mode: auto # 消費者確認機制

## ===========添加失敗重試機制===========

retry:

enabled: true # 開啟消費者失敗重試

initial-interval: 1000ms # 初始的失敗等待時長為1s

multiplier: 1 # 下次失敗的等待時長倍數(shù),

# 下次等待時長 = multiplier * last-interval

max-attempts: 3 # 最大重試次數(shù)(不設置的話, 默認配置的就是3次)

stateless: true # true無狀態(tài), false有狀態(tài)。如果業(yè)務中包含事務, 這里改為false

@Slf4j

@Configuration

public class RabbitConfig {

/* 第1個延遲隊列的相關交換機、隊列、綁定、監(jiān)聽方法定義 */

@RabbitListener(bindings = {

@QueueBinding(exchange = @Exchange(name = "dly.direct.ex",delayed = "true",durable = "true"),

key = "dly",

value = @Queue(name = "dly.queue", durable = "true")

)

})

public void listenDelayedMsg(Message message, String msg) {

log.info("收到消息=====================1111");

// 可以在此處觀察日志的輸出時間, 和消息的數(shù)據(jù)(我設置的消息的數(shù)據(jù)就是消息的發(fā)送時間)

log.info("message: {}, msg: {}", new String(message.getBody()), msg);

}

/* 第2個延遲隊列的相關交換機、隊列、綁定、監(jiān)聽方法定義 */

@Bean

public org.springframework.amqp.core.Exchange dly2DirectExchange() {

return ExchangeBuilder.directExchange("dly2.direct.ex").delayed().build();

}

@Bean

public org.springframework.amqp.core.Queue dly2Queue() {

return QueueBuilder.durable("dly2.queue").build();

}

@Bean

public Binding binding() {

return BindingBuilder.bind(dly2Queue()).to(dly2DirectExchange()).with("dly2").noargs();

}

@RabbitListener(queues = {"dly2.queue"})

public void listenDelayedMsg2(Message message, String msg) {

log.info("收到消息=====================2222");

// 可以在此處觀察日志的輸出時間, 和消息的數(shù)據(jù)(我設置的消息的數(shù)據(jù)就是消息的發(fā)送時間)

log.info("message: {}, msg: {}", new String(message.getBody()), msg);

}

}

@Slf4j

@RequestMapping("rabbit")

@RestController

public class RabbitController {

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping("delayMsg")

public Object delayMsg(Integer delayTimeMillis, String exchange, String routeKey) {

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

String content = sdf.format(new Date());

/* 發(fā)送消息至指定的交換機, 并指定路由key, 注意延遲消息須如下設置延遲時間 */

rabbitTemplate.convertAndSend(exchange, routeKey, content, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

// 設置延遲時間

message.getMessageProperties().setDelay(delayTimeMillis);

return message;

}

});

return "ok";

}

}

測試: 第1個延遲消息測試: http://localhost:8081/rabbit/delayMsg?delayTimeMillis=5000&exchange=dly.direct.ex&routeKey=dly, http://localhost:8081/rabbit/delayMsg?delayTimeMillis=10000&exchange=dly.direct.ex&routeKey=dly 第2個延遲消息測試: http://localhost:8081/rabbit/delayMsg?delayTimeMillis=5000&exchange=dly2.direct.ex&routeKey=dly2, http://localhost:8081/rabbit/delayMsg?delayTimeMillis=10000&exchange=dly2.direct.ex&routeKey=dly2 總結:這種延遲消息都是需要消耗性能的,每來1個延遲消息,它都需要在mq的內(nèi)部維護1個時鐘,時鐘的運行需要CPU不斷的運算。當延遲消息很多的時候,對CPU的占用就越高。而延遲消息指定的延遲時間設置的過長,就會給CPU造成額外的壓力。因此,延遲消息適用于指定延遲的時間較短的消息。 延遲消息優(yōu)化:上面,我們說到延遲消息適用于指定延遲的時間較短的消息。針對延遲時間較長的消息,我們可以對延遲消息做個優(yōu)化,將1個長時間的延遲消息拆分成若干個一小段一小段時間的延遲消息,然后針對業(yè)務做邏輯。 這個就是待發(fā)送的消息,data是數(shù)據(jù),delayMillis中維護了一堆的時間段序列,每次要發(fā)消息到mq時,先從這個時間段序列中獲取該時間段序列作為消息的延遲時間 @Data

public class MultiDelayMessage {

/**

* 消息體

*/

private T data;

/**

* 記錄延遲時間的集合

*/

private List delayMillis;

public MultiDelayMessage(T data, List delayMillis) {

this.data = data;

this.delayMillis = delayMillis;

}

public static MultiDelayMessage of(T data, Long ... delayMillis){

return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));

}

/**

* 獲取并移除下一個延遲時間

* @return 隊列中的第一個延遲時間

*/

public Long removeNextDelay(){

return delayMillis.remove(0);

}

/**

* 是否還有下一個延遲時間

*/

public boolean hasNextDelay(){

return !delayMillis.isEmpty();

}

}

下單成功后,我們發(fā)送第1個延遲消息到rabbitmq中, // 1.訂單數(shù)據(jù)

// 2.保存訂單詳情

// 3.扣減庫存

// 4.清理購物車商品

// 5.延遲檢測訂單狀態(tài)消息

try {

MultiDelayMessage msg = MultiDelayMessage.of(order.getId(),

10000L, 10000L, 10000L,

15000L, 15000L, 30000L, 30000L);

rabbitTemplate.convertAndSend(

MqConstants.DELAY_EXCHANGE,

MqConstants.DELAY_ORDER_ROUTING_KEY,

msg,

new DelayMessageProcessor(msg.removeNextDelay().intValue())

);

} catch (AmqpException e) {

log.error("延遲消息發(fā)送異常!", e);

}

監(jiān)聽延遲消息交換機所綁定的隊列,當接收到的消息體中,還存在時間段序列時,繼續(xù)發(fā),如果不存在了,那就說明所有的時間都消耗了 @Component

@RequiredArgsConstructor

public class OrderStatusCheckListener {

private final IOrderService orderService;

private final RabbitTemplate rabbitTemplate;

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),

exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE,

delayed = "true",

type = ExchangeTypes.TOPIC),

key = MqConstants.DELAY_ORDER_ROUTING_KEY

))

public void listenOrderDelayMessage(MultiDelayMessage msg) {

// 1.查詢訂單狀態(tài)

Order order = orderService.getById(msg.getData());

// 2.判斷是否已經(jīng)支付

if (order == null || order.getStatus() == 2) {

// 訂單不存在或者已經(jīng)被處理

return;

}

// TODO 3.去支付服務查詢真正的支付狀態(tài)

boolean isPay = false;

// 3.1.已支付,標記訂單狀態(tài)為已支付

if (isPay) {

orderService.markOrderPaySuccess(order.getId());

return;

}

// 4.判斷是否存在延遲時間

if (msg.hasNextDelay()) {

// 4.1.存在,重發(fā)延遲消息

Long nextDelay = msg.removeNextDelay();

rabbitTemplate.convertAndSend(

MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY,

msg, new DelayMessageProcessor(nextDelay.intValue()));

return;

}

// 5.不存在,取消訂單

orderService.cancelOrder(order.getId());

}

}

柚子快報邀請碼778899分享:RabbitMq學習

http://yzkb.51969.com/

參考鏈接

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉載請注明,如有侵權,聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/18461276.html

發(fā)布評論

您暫未設置收款碼

請在主題配置——文章設置里上傳

掃描二維碼手機訪問

文章目錄