柚子快報邀請碼778899分享:RabbitMQ消息重復(fù)消費(fèi)
RabbitMQ消息重復(fù)消費(fèi)問題
同一條消息被一個消費(fèi)者消費(fèi)多次或者被多個消費(fèi)者消費(fèi)??赡軐?dǎo)致系統(tǒng)相關(guān)業(yè)務(wù)重復(fù)執(zhí)行和數(shù)據(jù)不一致問題。
1.場景模擬
生產(chǎn)者
public String sendMessage() {
for (int i = 1; i <= 100; i++) {
//生成消息id
String messageId = UUID.randomUUID().toString();
//消息內(nèi)容
String message = "rabbitMQ測試消息!" + i;
//發(fā)送消息
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, DIRECT_ROUTE_KEY,message,msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
msg.getMessageProperties().setMessageId(messageId);//設(shè)置消息id
return msg;
});
System.out.println("已發(fā)送消息:id=" + messageId + " message="+ message);
}
return "消息發(fā)送成功!";
}
消費(fèi)者
@RabbitListener(queues = "directQueue")
public void spendMessage(String msg, Channel channel, Message message) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("接收到消息:id=" + messageId+ " message=" + msg);
try {
//模擬消費(fèi)耗時
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
測試步驟
生產(chǎn)者發(fā)送100條消息
啟動消費(fèi)者
由于最后一條消息沒有調(diào)用basicAck方法,消息并沒有消費(fèi)成功,當(dāng)我們重啟消費(fèi)者服務(wù)時,消息會被再次消費(fèi)。
重啟消費(fèi)者
2.解決方案
因?yàn)槊織l消息都有自己的id(唯一標(biāo)識),可根據(jù)這個id來判斷消息是否被消費(fèi)過。
消費(fèi)消息前先獲取消息id—>查詢緩存是否存在此id,判斷id對應(yīng)的值—>為1則表示該消息被消費(fèi)過,為0則表示消費(fèi)中
以下示例使用redis作為介質(zhì)
消費(fèi)者
@RabbitListener(queues = "directQueue")
public void spendMessage(String msg, Channel channel, Message message) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
//messageId對應(yīng)的緩存值為0時表示消息消費(fèi)中,1表示消費(fèi)完成
if(Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(messageId, "0", 30L, TimeUnit.SECONDS))){//消息第一次被消費(fèi)
try {
//模擬消費(fèi)耗時
System.out.println("接收到消息:id=" + messageId+ " message=" + msg);
Thread.sleep(100);
//業(yè)務(wù)執(zhí)行完成后標(biāo)識消息消費(fèi)完成
redisTemplate.opsForValue().set(messageId,"1",30L,TimeUnit.SECONDS);
//消息消費(fèi)確認(rèn)
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//消費(fèi)成功后刪除緩存
redisTemplate.delete(messageId);
} catch (Exception e) {
//丟棄消息(關(guān)聯(lián)了死信隊列的話可以放入死信隊列處理)
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
//刪除緩存
redisTemplate.delete(messageId);
}
}else{
String value = redisTemplate.opsForValue().get(messageId);
if("0".equals(value)){
return;
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
再次按照以上步驟測試,消息不會被重復(fù)消費(fèi)。
最后一條被消費(fèi)的消息為49,再次啟動后未重復(fù)消費(fèi)。
本示例中緩存過期時間為30s,若啟動消費(fèi)者的時間間隔超過30s,則消息仍會被重復(fù)消費(fèi)
3.死信隊列
死信隊列就是一個普通隊列,可以使用任意種交換機(jī),業(yè)務(wù)隊列可通過綁定死信交換機(jī)和路由鍵自動將被nack和reject且不重新入隊的消息發(fā)送給對應(yīng)的死信隊列?;蛘咄ㄟ^設(shè)置業(yè)務(wù)隊列的消息過期時間實(shí)現(xiàn)延時消息。
測試配置類
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
private static final String DIRECT_EXCHANGE_NAME = "directExchange";//交換機(jī)名稱
private static final String DIRECT_ROUTE_KEY = "directRoute";//路由鍵
private static final String DIRECT_QUEUE_NAME = "directQueue";//隊列名稱
/**
* 業(yè)務(wù)交換機(jī)
* @return direct交換機(jī)
* 名稱,是否持久化,無隊列自動刪除
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE_NAME,true,false);
}
/**
* 隊列
* @return 隊列
* 名稱,是否持久化,是否獨(dú)占,是否自動刪除
*/
@Bean
public Queue directQueue(){
Map
//隊列綁定的死信交換機(jī)
args.put("x-dead-letter-exchange","dead_exchange");
//隊列的死信路由key
args.put("x-dead-letter-routing-key", "dead_route");
//消息過期時間
//args.put("x-message-ttl",4000);
return QueueBuilder.durable(DIRECT_QUEUE_NAME).withArguments(args).build();
}
/**
* 綁定交換機(jī)和隊列
* @param directExchange 交換機(jī)
* @param queue 隊列
* @return
*/
@Bean
public Binding bindingExchangeWithQueue(DirectExchange directExchange, @Qualifier("directQueue") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with(DIRECT_ROUTE_KEY);
}
/**
* 死信交換機(jī)
* @return direct交換機(jī)
* 名稱,是否持久化,無隊列自動刪除
*/
@Bean
public DirectExchange deadExchange(){
return new DirectExchange("dead_exchange",true,false);
}
@Bean
public Queue deadQueue(){
return new Queue("dead_queue",true,false,false);
}
@Bean
public Binding bindingDeadExchangeWithQueue(DirectExchange deadExchange, Queue deadQueue){
return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead_route");
}
}
在第二步的代碼中添加模擬報錯代碼
if(msg.endsWith("2")){//模擬消息處理出錯
throw new RuntimeException();
}
消費(fèi)者消費(fèi)完所有消息后,控制臺可以看到死信隊列里有10條消息
新建死信隊列監(jiān)聽
@RabbitListener(queues = "dead_queue")
public void deadMessage(String msg, Channel channel, Message message) throws IOException {
String bodyMessage = new String(message.getBody());
System.out.println("deadMessage = " + bodyMessage);
System.out.println("處理死信消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
啟動消費(fèi)者后死信隊列消息被處理
4.延時消息(建議使用延時消息插件)
在生產(chǎn)者消息發(fā)送方法內(nèi)添加如下參數(shù),設(shè)置當(dāng)前發(fā)送消息的過期時間,消息過期后根據(jù)隊列綁定的死信交換機(jī)和路由鍵將消息發(fā)送到死信隊列,死信隊列消費(fèi)者消費(fèi)消息即完成了消息延時消費(fèi)。
**注意:**這種延時消息局限性較大,因?yàn)槿绻劝l(fā)送一條消息設(shè)置過期時間為30s,隨后發(fā)送一條過期時間為10s的消息,仍會是第一條消息過期后第二條消息才能進(jìn)入死信隊列。
發(fā)送10條消息到隊列,依次設(shè)置過期時間為10s到1s
public String sendMessage() {
for (int i = 10; i >= 1; i--) {
//生成消息id
String messageId = UUID.randomUUID().toString();
//消息內(nèi)容
String message = "rabbitMQ測試消息!" + i;
int time = i;
//發(fā)送消息
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, DIRECT_ROUTE_KEY,message,msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
msg.getMessageProperties().setMessageId(messageId);//設(shè)置消息id
msg.getMessageProperties().setExpiration(time + "000");
return msg;
});
System.out.println("已發(fā)送消息:id=" + messageId + " message="+ message);
}
return "消息發(fā)送成功!";
}
消費(fèi)者注釋消費(fèi)普通隊列的代碼,啟動消費(fèi)者觀察死信隊列消息消費(fèi)順序。
前10s都是沒有處理消息的,因?yàn)榈谝粭l消息未過期,后續(xù)的消息也不會進(jìn)入死信隊列。
柚子快報邀請碼778899分享:RabbitMQ消息重復(fù)消費(fèi)
參考文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。