欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:RabbitMQ消息重復(fù)消費(fèi)

柚子快報邀請碼778899分享:RabbitMQ消息重復(fù)消費(fèi)

http://yzkb.51969.com/

RabbitMQ消息重復(fù)消費(fèi)問題

同一條消息被一個消費(fèi)者消費(fèi)多次或者被多個消費(fèi)者消費(fèi)??赡軐?dǎo)致系統(tǒng)相關(guān)業(yè)務(wù)重復(fù)執(zhí)行和數(shù)據(jù)不一致問題。

1.場景模擬

生產(chǎn)者

public String sendMessage() {

for (int i = 1; i <= 100; i++) {

//生成消息id

String messageId = UUID.randomUUID().toString();

//消息內(nèi)容

String message = "rabbitMQ測試消息!" + i;

//發(fā)送消息

rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, DIRECT_ROUTE_KEY,message,msg -> {

msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化

msg.getMessageProperties().setMessageId(messageId);//設(shè)置消息id

return msg;

});

System.out.println("已發(fā)送消息:id=" + messageId + " message="+ message);

}

return "消息發(fā)送成功!";

}

消費(fèi)者

@RabbitListener(queues = "directQueue")

public void spendMessage(String msg, Channel channel, Message message) throws IOException {

String messageId = message.getMessageProperties().getMessageId();

System.out.println("接收到消息:id=" + messageId+ " message=" + msg);

try {

//模擬消費(fèi)耗時

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

//channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}

測試步驟

生產(chǎn)者發(fā)送100條消息

啟動消費(fèi)者

由于最后一條消息沒有調(diào)用basicAck方法,消息并沒有消費(fèi)成功,當(dāng)我們重啟消費(fèi)者服務(wù)時,消息會被再次消費(fèi)。

重啟消費(fèi)者

2.解決方案

因?yàn)槊織l消息都有自己的id(唯一標(biāo)識),可根據(jù)這個id來判斷消息是否被消費(fèi)過。

消費(fèi)消息前先獲取消息id—>查詢緩存是否存在此id,判斷id對應(yīng)的值—>為1則表示該消息被消費(fèi)過,為0則表示消費(fèi)中

以下示例使用redis作為介質(zhì)

消費(fèi)者

@RabbitListener(queues = "directQueue")

public void spendMessage(String msg, Channel channel, Message message) throws IOException {

String messageId = message.getMessageProperties().getMessageId();

//messageId對應(yīng)的緩存值為0時表示消息消費(fèi)中,1表示消費(fèi)完成

if(Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(messageId, "0", 30L, TimeUnit.SECONDS))){//消息第一次被消費(fèi)

try {

//模擬消費(fèi)耗時

System.out.println("接收到消息:id=" + messageId+ " message=" + msg);

Thread.sleep(100);

//業(yè)務(wù)執(zhí)行完成后標(biāo)識消息消費(fèi)完成

redisTemplate.opsForValue().set(messageId,"1",30L,TimeUnit.SECONDS);

//消息消費(fèi)確認(rèn)

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

//消費(fèi)成功后刪除緩存

redisTemplate.delete(messageId);

} catch (Exception e) {

//丟棄消息(關(guān)聯(lián)了死信隊列的話可以放入死信隊列處理)

channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);

//刪除緩存

redisTemplate.delete(messageId);

}

}else{

String value = redisTemplate.opsForValue().get(messageId);

if("0".equals(value)){

return;

}

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}

}

再次按照以上步驟測試,消息不會被重復(fù)消費(fèi)。

最后一條被消費(fèi)的消息為49,再次啟動后未重復(fù)消費(fèi)。

本示例中緩存過期時間為30s,若啟動消費(fèi)者的時間間隔超過30s,則消息仍會被重復(fù)消費(fèi)

3.死信隊列

死信隊列就是一個普通隊列,可以使用任意種交換機(jī),業(yè)務(wù)隊列可通過綁定死信交換機(jī)和路由鍵自動將被nack和reject且不重新入隊的消息發(fā)送給對應(yīng)的死信隊列?;蛘咄ㄟ^設(shè)置業(yè)務(wù)隊列的消息過期時間實(shí)現(xiàn)延時消息。

測試配置類

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class RabbitMQConfig {

private static final String DIRECT_EXCHANGE_NAME = "directExchange";//交換機(jī)名稱

private static final String DIRECT_ROUTE_KEY = "directRoute";//路由鍵

private static final String DIRECT_QUEUE_NAME = "directQueue";//隊列名稱

/**

* 業(yè)務(wù)交換機(jī)

* @return direct交換機(jī)

* 名稱,是否持久化,無隊列自動刪除

*/

@Bean

public DirectExchange directExchange(){

return new DirectExchange(DIRECT_EXCHANGE_NAME,true,false);

}

/**

* 隊列

* @return 隊列

* 名稱,是否持久化,是否獨(dú)占,是否自動刪除

*/

@Bean

public Queue directQueue(){

Map args = new HashMap<>(3);

//隊列綁定的死信交換機(jī)

args.put("x-dead-letter-exchange","dead_exchange");

//隊列的死信路由key

args.put("x-dead-letter-routing-key", "dead_route");

//消息過期時間

//args.put("x-message-ttl",4000);

return QueueBuilder.durable(DIRECT_QUEUE_NAME).withArguments(args).build();

}

/**

* 綁定交換機(jī)和隊列

* @param directExchange 交換機(jī)

* @param queue 隊列

* @return

*/

@Bean

public Binding bindingExchangeWithQueue(DirectExchange directExchange, @Qualifier("directQueue") Queue queue){

return BindingBuilder.bind(queue).to(directExchange).with(DIRECT_ROUTE_KEY);

}

/**

* 死信交換機(jī)

* @return direct交換機(jī)

* 名稱,是否持久化,無隊列自動刪除

*/

@Bean

public DirectExchange deadExchange(){

return new DirectExchange("dead_exchange",true,false);

}

@Bean

public Queue deadQueue(){

return new Queue("dead_queue",true,false,false);

}

@Bean

public Binding bindingDeadExchangeWithQueue(DirectExchange deadExchange, Queue deadQueue){

return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead_route");

}

}

在第二步的代碼中添加模擬報錯代碼

if(msg.endsWith("2")){//模擬消息處理出錯

throw new RuntimeException();

}

消費(fèi)者消費(fèi)完所有消息后,控制臺可以看到死信隊列里有10條消息

新建死信隊列監(jiān)聽

@RabbitListener(queues = "dead_queue")

public void deadMessage(String msg, Channel channel, Message message) throws IOException {

String bodyMessage = new String(message.getBody());

System.out.println("deadMessage = " + bodyMessage);

System.out.println("處理死信消息:" + msg);

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}

啟動消費(fèi)者后死信隊列消息被處理

4.延時消息(建議使用延時消息插件)

在生產(chǎn)者消息發(fā)送方法內(nèi)添加如下參數(shù),設(shè)置當(dāng)前發(fā)送消息的過期時間,消息過期后根據(jù)隊列綁定的死信交換機(jī)和路由鍵將消息發(fā)送到死信隊列,死信隊列消費(fèi)者消費(fèi)消息即完成了消息延時消費(fèi)。

**注意:**這種延時消息局限性較大,因?yàn)槿绻劝l(fā)送一條消息設(shè)置過期時間為30s,隨后發(fā)送一條過期時間為10s的消息,仍會是第一條消息過期后第二條消息才能進(jìn)入死信隊列。

發(fā)送10條消息到隊列,依次設(shè)置過期時間為10s到1s

public String sendMessage() {

for (int i = 10; i >= 1; i--) {

//生成消息id

String messageId = UUID.randomUUID().toString();

//消息內(nèi)容

String message = "rabbitMQ測試消息!" + i;

int time = i;

//發(fā)送消息

rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, DIRECT_ROUTE_KEY,message,msg -> {

msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化

msg.getMessageProperties().setMessageId(messageId);//設(shè)置消息id

msg.getMessageProperties().setExpiration(time + "000");

return msg;

});

System.out.println("已發(fā)送消息:id=" + messageId + " message="+ message);

}

return "消息發(fā)送成功!";

}

消費(fèi)者注釋消費(fèi)普通隊列的代碼,啟動消費(fèi)者觀察死信隊列消息消費(fèi)順序。

前10s都是沒有處理消息的,因?yàn)榈谝粭l消息未過期,后續(xù)的消息也不會進(jìn)入死信隊列。

柚子快報邀請碼778899分享:RabbitMQ消息重復(fù)消費(fèi)

http://yzkb.51969.com/

參考文章

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19258989.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄