欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報激活碼778899分享:RabbitMQ 學(xué)習(xí)筆記

柚子快報激活碼778899分享:RabbitMQ 學(xué)習(xí)筆記

http://yzkb.51969.com/

學(xué)習(xí)視頻:動力節(jié)點RabbitMQ教程|12小時學(xué)會rabbitmq消息中間件_嗶哩嗶哩_bilibili

一、RabbitMQ 運行環(huán)境搭建

RabbitMQ 是使用 Erlang 語言開發(fā)的,所以要先下載安裝 Erlang

下載時一定要注意版本兼容性:RabbitMQ Erlang 版本要求 — 兔子MQ

二、啟動及停止 RabbitMQ

1、啟動 RabbitMQ

進入到安裝目錄的 sbin 目錄下

# -detached 表示在后臺啟動運行 rabbitmq, 不加該參數(shù)表示前臺啟動

# rabbitmq 的運行日志存放在安裝目錄的 var 目錄下

# 啟動

./rabbitmq-server -detached

2、查看 RabbitMQ 狀態(tài)

進入到安裝目錄的 sbin 目錄下

# -n rabbit 是指定節(jié)點名稱為 rabbit,目前只有一個節(jié)點,節(jié)點名默認(rèn)為 rabbit

# 此處 -n rabbit 也可以省略

# 查看狀態(tài)

./rabbitmqctl -n rabbit status

3、停止 RabbitMQ

進入到安裝目錄的 sbin 目錄下

# 停止

./rabbitmqctl shutdown

4、配置 path 環(huán)境變量

打開配置文件

vim /etc/profile

進行配置

RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11

PATH=$PATH:$RABBIT_HOME/sbin

export RABBIT_HOME PATH

刷新環(huán)境變量

source /etc/profile

三、RabbitMQ 管理命令

./rabbitmqctl 是一個管理命令,可以管理 rabbitmq 的很多操作

./rabbitmqctl help 可以查看有哪些操作

查看具體子命令,可以使用 ./rabbitmqctl help 子命令名稱

1、用戶管理

用戶管理包括增加用戶,刪除用戶,查看用戶列表,修改用戶密碼。

這些操作都是通過 rabbitmqct 管理命令來實現(xiàn)完成

查看幫助:rabbitmqct add_user --help

查看當(dāng)前用戶列表

rabbitmqctl list_users

新增一個用戶

# 語法:rabbitmqctl add_user Username Password

rabbitmqctl add_user admin 123456

2、設(shè)置用戶角色

設(shè)置用戶角色

# 語法:rabbitmqctl set_user_tags User Tag

# 這里設(shè)置用戶的角色為管理員角色

rabbitmqctl set_user_tags admin administrator

3、設(shè)置用戶權(quán)限

設(shè)置用戶權(quán)限

# 說明:此操作設(shè)置了 admin 用戶擁有操作虛擬主機/下的所以權(quán)限

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

四、web 管理后臺

Rabbitmq 有一個 web 管理后臺,這個管理后臺是以插件的方式提供的,啟動后臺 web 管理功能,需要切換到安裝目錄的 sbin 目錄下進行操作

1、啟用管理后臺

# 查看 rabbitmq 的插件列表

./rabbitmq-plugins list

# 啟用

./rabbitmq-plugins enable rabbitmq_management

# 禁用

./rabbitmq-plugins disable rabbitmq_management

2、訪問管理后臺

訪問時需要檢查虛擬機的防火 墻

使用:http://你的虛擬機ip:15672?就可以訪問了

注意:如果使用默認(rèn)用戶 guest,密碼 guest 登錄,會提示 User can only log in via localhost,說明 guest 用戶只能從 localhost 本機登錄,所以不要使用該用戶

3、新建虛擬主機

新建主機

建完后如下

五、RabbitMQ 工作模型

broker 相當(dāng)于 mysql 服務(wù)器,virtual host 相當(dāng)于數(shù)據(jù)庫(可以有多個數(shù)據(jù)庫)

queue 相當(dāng)于表,消息相當(dāng)于記錄

消息隊列有三個核心要素:消息生產(chǎn)者、消息隊列、消息消費者

生產(chǎn)者(Producer):發(fā)送消息的應(yīng)用;消費者(Consumer):接收消息的應(yīng)用;

代理(Broker):就是消息服務(wù)器,RabbitMQ Server 就是 Message Broker

鏈接(Connection):鏈接 RabbitMQ 服務(wù)器的 TCP 長連接

信道(Channel):鏈接中的一個虛擬通道,消息隊列發(fā)送或者接收消息時,都是通過信道進行的

虛擬主機(Virtual host):一個虛擬分組,在代碼中就是一個字符串,當(dāng)多個不同的用戶使用同一個 RabbitMQ 服務(wù)時,可以劃分出多個 Virtual host,每個用戶在自己的 Virtual host 創(chuàng)建 exchange/queue 等(分類比較清晰、相互隔離)

交換機(Exchange):交換機負(fù)責(zé)從生產(chǎn)者接收消息,并根據(jù)交換機類型分發(fā)到對應(yīng)的消息隊列中,起到一個路由的作用

路由鍵(Routing Key):交換機根據(jù)路由鍵來決定消息分發(fā)到那個隊列,路由鍵是消息的目的地址

綁定(Binding):綁定是隊列與交換機的一個關(guān)聯(lián)鏈接(關(guān)聯(lián)關(guān)系)

隊列(Queue):存儲消息的緩存

消息(Message):由生產(chǎn)者通過 RabbitMQ 發(fā)送給消費者的信息(消息可以是任何數(shù)據(jù),字符串、user 對象、json 串等)

六、RabbitMQ 交換機類型

Exchange(X)可翻譯為交換機/交換器/路由器,類型有以下幾種:

Fanout Exchange(扇形)Direct Exchange(直連)Topic Exchange(主題)Headers Exchange(頭部)

1、Fanout Exchange

1.1、介紹

Fanout 扇形,散開的;扇形交換機

投遞到所有綁定的隊列,不需要路由鍵,不需要進行路由鍵的匹配,相當(dāng)于廣播、群發(fā)

P 表示生產(chǎn)者X 表示交換機紅色部分表示隊列

1.2、示例

添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

/*

rabbitmq三部曲

1.定義交換機

2.定義隊列

3.綁定交換機和隊列

*/

@Configuration

public class RabbitConfig {

// 1.定義交換機

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange("exchange.fanout");

}

// 2.定義隊列

@Bean

public Queue queueA() {

return new Queue("queue.fanout.a");

}

@Bean

public Queue queueB() {

return new Queue("queue.fanout.b");

}

// 3.綁定交換機和隊列

@Bean

public Binding bindingA(FanoutExchange fanoutExchange, Queue queueA) {

// 將隊列A綁定到扇形交換機

return BindingBuilder.bind(queueA).to(fanoutExchange);

}

@Bean

public Binding bindingB(FanoutExchange fanoutExchange, Queue queueB) {

// 將隊列B綁定到扇形交換機

return BindingBuilder.bind(queueB).to(fanoutExchange);

}

}

發(fā)送消息

@Component

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

// 定義要發(fā)送的消息

String msg = "hello world";

// 轉(zhuǎn)換并且發(fā)送

Message message = new Message(msg.getBytes());

rabbitTemplate.convertAndSend("exchange.fanout", "", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.fanout.a", "queue.fanout.b"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收到的消息為: " + msg);

}

}

2、Direct Exchange

2.1、介紹

根據(jù)路由鍵精確匹配(一摸一樣)進行路由消息隊列

P 表示生產(chǎn)者X 表示交換機紅色部分表示隊列

2.2、示例

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.direct").build();

}

// 2.定義隊列

@Bean

public Queue queueA() {

return QueueBuilder.durable("queue.direct.a").build();

}

@Bean

public Queue queueB() {

return QueueBuilder.durable("queue.direct.b").build();

}

// 3.交換機和隊列進行綁定

@Bean

public Binding bindingA(DirectExchange directExchange, Queue queueA) {

return BindingBuilder.bind(queueA).to(directExchange).with("error");

}

@Bean

public Binding bindingB1(DirectExchange directExchange, Queue queueB) {

return BindingBuilder.bind(queueB).to(directExchange).with("info");

}

@Bean

public Binding bindingB2(DirectExchange directExchange, Queue queueB) {

return BindingBuilder.bind(queueB).to(directExchange).with("error");

}

@Bean

public Binding bindingB3(DirectExchange directExchange, Queue queueB) {

return BindingBuilder.bind(queueB).to(directExchange).with("warning");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.direct", "info", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.direct.a", "queue.direct.b"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收到的消息為: " + msg);

}

}

3、Topic Exchange

3.1、介紹

通配符匹配,相當(dāng)于模糊匹配

# 匹配多個單詞,用來表示任意數(shù)量(零個或多個)單詞* 匹配一個單詞(必須有一個,而且只有一個),用 . 隔開的為一個單詞舉例

beijing.# = beijing.queue.abc,beijing.queue.xyz.xxxbeijing.* = beijing.queue,beijing.xyz

發(fā)送時指定的路由鍵:lazy.orange.rabbit

P 表示生產(chǎn)者X 表示交換機紅色部分表示隊列

3.2、示例

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機

@Bean

public TopicExchange topicExchange() {

return ExchangeBuilder.topicExchange("exchange.topic").build();

}

// 2.定義隊列

@Bean

public Queue queueA() {

return QueueBuilder.durable("queue.topic.a").build();

}

@Bean

public Queue queueB() {

return QueueBuilder.durable("queue.topic.b").build();

}

// 3.交換機和隊列進行綁定

@Bean

public Binding bindingA(TopicExchange topicExchange, Queue queueA) {

return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");

}

@Bean

public Binding bindingB1(TopicExchange topicExchange, Queue queueB) {

return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");

}

@Bean

public Binding bindingB2(TopicExchange topicExchange, Queue queueB) {

return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate; // 用RabbitTemplate也可以

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.topic", "lazy.orange.rabbit", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.topic.a", "queue.topic.b"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收到的消息為: " + msg);

}

}

4、Headers Exchange

4.1、介紹

用的比較少

基于消息內(nèi)容中的 headers 屬性進行匹配

P 表示生產(chǎn)者X 表示交換機紅色部分表示隊列

4.2、示例

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機

@Bean

public HeadersExchange headersExchange() {

return ExchangeBuilder.headersExchange("exchange.headers").build();

}

// 2.定義隊列

@Bean

public Queue queueA() {

return QueueBuilder.durable("queue.headers.a").build();

}

@Bean

public Queue queueB() {

return QueueBuilder.durable("queue.headers.b").build();

}

// 3.交換機和隊列進行綁定

@Bean

public Binding bindingA(HeadersExchange headersExchange, Queue queueA) {

Map headerValues = new HashMap<>();

headerValues.put("type", "m");

headerValues.put("status", 1);

return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();

}

@Bean

public Binding bindingB(HeadersExchange headersExchange, Queue queueB) {

Map headerValues = new HashMap<>();

headerValues.put("type", "s");

headerValues.put("status", 0);

return BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

// 消息屬性

MessageProperties messageProperties = new MessageProperties();

Map headers = new HashMap<>();

headers.put("type", "s");

headers.put("status", 0);

// 設(shè)置消息頭

messageProperties.setHeaders(headers);

// 添加了消息屬性

Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();

// 對于頭部交換機,路由key無所謂(不需要)

rabbitTemplate.convertAndSend("exchange.headers", "", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.headers.a", "queue.headers.b"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收到的消息為: " + msg);

}

}

七、RabbitMQ 過期時間

過期時間也叫 TTL 消息,TTL:Time To Live

消息的過期時間有兩種設(shè)置方式:(過期消息)

1、設(shè)置單條消息的過期時間

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.direct").build();

}

// 2.定義隊列

@Bean

public Queue queue() {

return QueueBuilder.durable("queue.ttl").build();

}

// 3.交換機和隊列進行綁定

@Bean

public Binding binding(DirectExchange directExchange, Queue queue) {

return BindingBuilder.bind(queue).to(directExchange).with("info");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

MessageProperties messageProperties = new MessageProperties();

messageProperties.setExpiration("15000"); // 過期的毫秒數(shù)

Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();

rabbitTemplate.convertAndSend("exchange.direct", "info", message);

}

}

2、隊列屬性設(shè)置消息過期時間

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.direct").build();

}

// 2.定義隊列

@Bean

public Queue queue() {

// 設(shè)置消息過期時間

Map arguments = new HashMap<>();

arguments.put("x-message-ttl", 15000); // 15秒

// 方式1

return new Queue("queue.ttl", true, false, false, arguments);

// 方式2

// return QueueBuilder.durable("queue.ttl").withArguments(arguments).build();

}

// 3.交換機和隊列進行綁定

@Bean

public Binding binding(DirectExchange directExchange, Queue queue) {

return BindingBuilder.bind(queue).to(directExchange).with("info");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.direct", "info", message);

}

}

3、注意

如果消息和隊列都設(shè)置了過期時間,則消息的 TTL 以兩者之間較小的那個數(shù)值為準(zhǔn)。

八、死信隊列

也有叫死信交換機、死信郵箱等說法

DLX:Dead-Letter-Exchange 死信交換機,死信郵箱

注意:圖中的 3.理由key 改為 路由key

以下情況下一個消息會進入 DLX(Dead Letter Exchange)死信交換機。

1、消息過期

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 正常交換機

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.directExchange("exchange.normal.b").build();

}

// 正常隊列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.b")

.deadLetterExchange("exchange.dlx.b") // 設(shè)置死信交換機

.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機和死信隊列綁定的key一樣

.build();

}

// 綁定交換機和隊列(正常)

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");

}

// 分割線

// 死信交換機

@Bean

public DirectExchange dlxExchange() {

return ExchangeBuilder.directExchange("exchange.dlx.b").build();

}

// 死信隊列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.dlx.b").build();

}

// 綁定交換機和隊列(死信)

@Bean

public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

// 消息屬性

MessageProperties messageProperties = new MessageProperties();

// 設(shè)置單條消息過期時間,單位為毫秒

messageProperties.setExpiration("15000");

Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();

// 對于頭部交換機,路由key無所謂(不需要)

rabbitTemplate.convertAndSend("exchange.normal.b", "order", message);

}

}

2、隊列過期

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 正常交換機

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.directExchange("exchange.normal.a").build();

}

// 正常隊列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.a")

.ttl(15000) // 過期時間 15秒

.deadLetterExchange("exchange.dlx.a") // 設(shè)置死信交換機

.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機和死信隊列綁定的key一樣

.build();

}

// 綁定交換機和隊列(正常)

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");

}

// 分割線

// 死信交換機

@Bean

public DirectExchange dlxExchange() {

return ExchangeBuilder.directExchange("exchange.dlx.a").build();

}

// 死信隊列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.dlx.a").build();

}

// 綁定交換機和隊列(死信)

@Bean

public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.normal.a", "order", message);

}

}

3、隊列達到最大長度

??添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 正常交換機

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.directExchange("exchange.normal.c").build();

}

// 正常隊列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.c")

.deadLetterExchange("exchange.dlx.c") // 設(shè)置死信交換機

.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機和死信隊列綁定的key一樣

.maxLength(5) // 設(shè)置隊列最大長度

.build();

}

// 綁定交換機和隊列(正常)

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");

}

// 分割線

// 死信交換機

@Bean

public DirectExchange dlxExchange() {

return ExchangeBuilder.directExchange("exchange.dlx.c").build();

}

// 死信隊列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.dlx.c").build();

}

// 綁定交換機和隊列(死信)

@Bean

public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

for (int i = 1; i <= 10; i++) {

String str = "hello world" + i;

Message message = MessageBuilder.withBody(str.getBytes()).build();

// 對于頭部交換機,路由key無所謂(不需要)

rabbitTemplate.convertAndSend("exchange.normal.c", "order", message);

}

}

}

4、消費者拒絕消息不進行重新投遞

從正常的隊列接收消息,但是對消息不進行確認(rèn),并且不對消息進行重新投遞,此時消息就進入死信隊列

??添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

listener:

simple:

acknowledge-mode: manual # 啟動手動確認(rèn)

配置類

@Configuration

public class RabbitConfig {

// 正常交換機

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.directExchange("exchange.normal.d").build();

}

// 正常隊列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.d")

.deadLetterExchange("exchange.dlx.d") // 設(shè)置死信交換機

.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機和死信隊列綁定的key一樣

.build();

}

// 綁定交換機和隊列(正常)

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");

}

// 分割線

// 死信交換機

@Bean

public DirectExchange dlxExchange() {

return ExchangeBuilder.directExchange("exchange.dlx.d").build();

}

// 死信隊列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.dlx.d").build();

}

// 綁定交換機和隊列(死信)

@Bean

public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

String str = "hello world";

Message message = MessageBuilder.withBody(str.getBytes()).build();

rabbitTemplate.convertAndSend("exchange.normal.d", "order", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.normal.d"})

public void receiveMsg(Message message, Channel channel) {

// 獲取消息屬性

MessageProperties messageProperties = message.getMessageProperties();

// 獲取消息的唯一標(biāo)識,類似人的身份證號

long deliveryTag = messageProperties.getDeliveryTag();

try {

// 手動加一段錯誤代碼

int i = 1 / 0;

byte[] body = message.getBody();

String str = new String(body);

System.out.println("接收到的消息為: " + str);

// 消費者的手動確認(rèn)

// multiple為false,只確認(rèn)當(dāng)前消息,改為true是確認(rèn)當(dāng)前消息以前的消息

// 確認(rèn)后服務(wù)器就可以刪了

channel.basicAck(deliveryTag, false);

} catch (Exception e) {

try {

// 接收者出現(xiàn)問題

// multiple為false,只確認(rèn)當(dāng)前消息,改為true是確認(rèn)當(dāng)前消息以前的消息

// requeue為true,表示重新入隊,為false表示不重新入隊

// channel.basicNack(deliveryTag, false, true);

// requeue改為false,不重新入隊,這時就會進入死信隊列

channel.basicNack(deliveryTag, false, false);

} catch (IOException ex) {

throw new RuntimeException(ex);

}

throw new RuntimeException(e);

}

}

}

5、消費者拒絕消息

開啟手動確認(rèn)模式,并拒絕消息,不重新投遞,則進入死信隊列

??添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

listener:

simple:

acknowledge-mode: manual # 啟動手動確認(rèn)

配置類

@Configuration

public class RabbitConfig {

// 正常交換機

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.directExchange("exchange.normal.e").build();

}

// 正常隊列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.e")

.deadLetterExchange("exchange.dlx.e") // 設(shè)置死信交換機

.deadLetterRoutingKey("error") // 設(shè)置死信路由key,要和死信交換機和死信隊列綁定的key一樣

.build();

}

// 綁定交換機和隊列(正常)

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");

}

// 分割線

// 死信交換機

@Bean

public DirectExchange dlxExchange() {

return ExchangeBuilder.directExchange("exchange.dlx.e").build();

}

// 死信隊列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.dlx.e").build();

}

// 綁定交換機和隊列(死信)

@Bean

public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

String str = "hello world";

Message message = MessageBuilder.withBody(str.getBytes()).build();

rabbitTemplate.convertAndSend("exchange.normal.e", "order", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.normal.e"})

public void receiveMsg(Message message, Channel channel) throws IOException {

// 獲取消息屬性

MessageProperties messageProperties = message.getMessageProperties();

// 獲取消息的唯一標(biāo)識,類似人的身份證號

long deliveryTag = messageProperties.getDeliveryTag();

// 拒絕消息

// 第一個參數(shù)是消息的唯一標(biāo)識

// 第二個參數(shù)是是否重新入隊

channel.basicReject(deliveryTag, false);

}

}

九、延遲隊列

場景:有一個訂單,15 分鐘內(nèi)如果不支付,就把該訂單設(shè)置為交易關(guān)閉,那么就不能支付了,這類實現(xiàn)延遲任務(wù)的場景就可以采用延遲隊列來實現(xiàn),當(dāng)然除了延遲隊列來實現(xiàn),也可以有一些其他方法實現(xiàn);

1、采用消息中間件

RabbitMQ 本身不支持延遲隊列,可以使用 TTL 結(jié)合 DLX 的方式來實現(xiàn)消息的延遲投遞,即把 DLX 跟某個隊列綁定,到了指定時間,消息過期后,就會從 DLX 路由到這個隊列,消費者可以從這個隊列取走消息?

代碼:正常延遲

???添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(一機兩用,正常交換機和死信交換機)

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.delay.a").build();

}

// 2.定義隊列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.delay.normal.a")

.ttl(25000) // 過期時間25秒

.deadLetterExchange("exchange.delay.a") // 設(shè)置死信交換機

.deadLetterRoutingKey("error") // 死信路由key

.build();

}

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.delay.dlx.a").build();

}

// 3.交換機和隊列進行綁定

@Bean

public Binding bindingNormal(DirectExchange directExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(directExchange).with("order");

}

@Bean

public Binding bindingDlx(DirectExchange directExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(directExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.delay.a", "order", message);

}

}

問題:如果先發(fā)送的消息,消息延遲時間長,會影響后面的延遲時間段的消息的消費

解決:不同延遲時間的消息要發(fā)到不同的隊列上,同一個隊列的消息,它的延遲時間應(yīng)該一樣

?代碼:解決延遲問題

?添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機(一機兩用,正常交換機和死信交換機)

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.delay").build();

}

// 2.定義隊列

// 正常的訂單隊列

@Bean

public Queue normalOrderQueue() {

return QueueBuilder.durable("queue.delay.normal.order")

.deadLetterExchange("exchange.delay") // 設(shè)置死信交換機

.deadLetterRoutingKey("error") // 死信路由key

.build();

}

// 正常的支付隊列

@Bean

public Queue normalPayQueue() {

return QueueBuilder.durable("queue.delay.normal.pay")

.deadLetterExchange("exchange.delay") // 設(shè)置死信交換機

.deadLetterRoutingKey("error") // 死信路由key

.build();

}

// 死信隊列

@Bean

public Queue dlxQueue() {

return QueueBuilder.durable("queue.delay.dlx").build();

}

// 3.交換機和隊列進行綁定

// 綁定正常的訂單隊列

@Bean

public Binding bindingNormalOrderQueue(DirectExchange directExchange, Queue normalOrderQueue) {

return BindingBuilder.bind(normalOrderQueue).to(directExchange).with("order");

}

// 綁定正常的支付隊列

@Bean

public Binding bindingNormalPayQueue(DirectExchange directExchange, Queue normalPayQueue) {

return BindingBuilder.bind(normalPayQueue).to(directExchange).with("pay");

}

// 綁定死信隊列

@Bean

public Binding bindingDlx(DirectExchange directExchange, Queue dlxQueue) {

return BindingBuilder.bind(dlxQueue).to(directExchange).with("error");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

// 第一條消息

Message orderMsg = MessageBuilder.withBody("這是一條訂單消息 20秒過期 ".getBytes()).setExpiration("20000").build();

// 第二條消息

Message payMsg = MessageBuilder.withBody("這是一條支付消息 10秒過期 ".getBytes()).setExpiration("10000").build();

rabbitTemplate.convertAndSend("exchange.delay", "order", orderMsg);

System.out.println("訂單消息發(fā)送消息時間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

rabbitTemplate.convertAndSend("exchange.delay", "pay", payMsg);

System.out.println("支付消息發(fā)送消息時間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.delay.dlx"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收的消息是: " + msg + "接收的時間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

}

}

2、使用延遲插件

使用 rabbitmq-delayed-message-exchange 延遲插件

下載

選擇對應(yīng)的版本下載 rabbitmq-delayed-message-exchange 插件,下載地址:

Community Plugins — RabbitMQ

將插件拷貝到 RabbitMQ 服務(wù)器 plugins 目錄下解壓

// 如果 unzip 沒有安裝,先安裝一下

// yum install unzip -y

unzip rabbitmq_delayed_message_exchange-3.10.2.ez

啟用插件

// 開啟插件

./rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重啟 rabbitmq 使其生效(此處也可以不重啟)

消息發(fā)送后不會直接投遞到隊列,而是存儲到 Mnesia(嵌入式數(shù)據(jù)庫),檢查 x-delay 時間(消息頭部);

延遲插件在 RabbitMQ 3.5.7 及以上的版本才支持,依賴 Erlang/OPT 18.0 及以上運行環(huán)境;

Mnesia 是一個小型數(shù)據(jù)庫,不適合于大量延遲消息的實現(xiàn)解決了消息過期時間不一致出現(xiàn)的問題

代碼實現(xiàn)

添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機

@Bean

public CustomExchange customExchange() {

Map arguments = new HashMap<>();

arguments.put("x-delayed-type", "direct"); // 放一個參數(shù)

return new CustomExchange("exchange.elay.b", "x-delayed-message", true, false, arguments);

}

// 2.定義隊列

@Bean

public Queue queue() {

return QueueBuilder.durable("queue.delay.b").build();

}

// 3.交換機和隊列進行綁定

@Bean

public Binding bindingNormalOrderQueue(CustomExchange customExchange, Queue queue) {

// 綁定,

return BindingBuilder.bind(queue).to(customExchange).with("plugin").noargs();

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

// 第一條消息

MessageProperties messageProperties1 = new MessageProperties();

messageProperties1.setHeader("x-delay", 25000); // 設(shè)置延遲消息

Message message1 = MessageBuilder.withBody("hello world 1".getBytes()).andProperties(messageProperties1).build();

// 第二條消息

MessageProperties messageProperties2 = new MessageProperties();

messageProperties2.setHeader("x-delay", 15000); // 設(shè)置延遲消息

Message message2 = MessageBuilder.withBody("hello world 2".getBytes()).andProperties(messageProperties2).build();

// 發(fā)送消息

rabbitTemplate.convertAndSend("exchange.elay.b", "plugin", message1);

System.out.println("訂單消息發(fā)送消息時間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

rabbitTemplate.convertAndSend("exchange.elay.b", "plugin", message2);

System.out.println("支付消息發(fā)送消息時間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.delay.b"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收的消息是: " + msg + "接收的時間是: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));

}

}

十、消息 Confirm 模式

消息的 confirm 確認(rèn)機制,是指生產(chǎn)者投遞消息后,到達了消息服務(wù)器 Broker 里面的 exchange 交換機,則會給生產(chǎn)者一個應(yīng)答,生產(chǎn)者接收到應(yīng)答,用來確定這條消息是否正常的發(fā)送到 Broker 的 exchange 中,這也是消息可靠性投遞的重要保障;

添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

publisher-confirm-type: correlated # 開啟生產(chǎn)者的確認(rèn)模式,設(shè)置關(guān)聯(lián)模式

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.confirm").build();

}

// 2.定義隊列

@Bean

public Queue queue() {

return QueueBuilder.durable("queue.confirm").build();

}

// 3.交換機和隊列進行綁定

@Bean

public Binding bindingA(DirectExchange directExchange, Queue queue) {

return BindingBuilder.bind(queue).to(directExchange).with("info");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

@PostConstruct // 構(gòu)造方法后執(zhí)行它,相當(dāng)于初始化的作用

public void init() {

// 第一個參數(shù): 關(guān)聯(lián)數(shù)據(jù)

// 第二個參數(shù): 是否到達交換機

// 第三個參數(shù): 原因

rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {

// 打印一下關(guān)聯(lián)數(shù)據(jù)

System.out.println("關(guān)聯(lián)數(shù)據(jù): " + correlationData);

if (ack) {

System.out.println("消息正確到達交換機");

}

if (!ack) {

System.out.println("消息沒有到達交換機,原因: " + cause);

}

});

}

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

CorrelationData correlationData = new CorrelationData(); // 關(guān)聯(lián)數(shù)據(jù)

correlationData.setId("order_123456");

rabbitTemplate.convertAndSend("exchange.confirm", "info", message, correlationData);

}

}

十一、消息 Return 模式

rabbitmq 整個消息投遞的路徑為:

producer —> exchange —> queue —> consumer

消息從 producer 到 exchange 則會返回一個 confirmCallback消息從 exchange -> queue 投遞失敗則會返回一個 returnCallback

我們可以利用這兩個 callback 控制消息的可靠性傳遞

添加依賴

org.springframework.boot

spring-boot-starter-amqp

2.6.13

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

publisher-returns: true # 開啟return模式

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.return").build();

}

// 2.定義隊列

@Bean

public Queue queue() {

return QueueBuilder.durable("queue.return").build();

}

// 3.交換機和隊列進行綁定

@Bean

public Binding bindingA(DirectExchange directExchange, Queue queue) {

return BindingBuilder.bind(queue).to(directExchange).with("info");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

@PostConstruct // 構(gòu)造方法后執(zhí)行它,相當(dāng)于初始化的作用

public void init() {

rabbitTemplate.setReturnsCallback(message -> {

System.out.println("消息從交換機沒有正確的路由到(投遞到)隊列,原因: " + message.getReplyText());

});

}

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

CorrelationData correlationData = new CorrelationData(); // 關(guān)聯(lián)數(shù)據(jù)

correlationData.setId("order_654321");

// 發(fā)送正確不會回調(diào),只有發(fā)送錯誤才會回調(diào)

rabbitTemplate.convertAndSend("exchange.return", "info", message, correlationData);

}

}

十二、交換機詳細(xì)屬性

Name:交換機名稱;就是一個字符串Type:交換機類型,direct、topic、fanout、headers 四種Durability:持久化,聲明交換機是否持久化,代表交換機在服務(wù)器重啟后是否還存在Auto delete:是否自動刪除,曾經(jīng)有隊列綁定到該交換機,后來解綁了,那就會自動刪除該交換機Internal:內(nèi)部使用的,如果是 yes,客戶端無法直接發(fā)消息到此交換機,他只能用于交換機與交換機的綁定(用的很少)Arguments:只有一個取值 alternate-exchange,表示備用交換機,當(dāng)正常交換機的消息發(fā)送不到正常隊列時,消息就會往備用交換機里面發(fā)

添加依賴

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

publisher-confirm-type: correlated # 開啟生產(chǎn)者的確認(rèn)模式,設(shè)置關(guān)聯(lián)模式

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

/*

return ExchangeBuilder

.directExchange("exchange.properties.1") // 交換機名字

.durable(false) // 是否持久化,一般都是持久化

.autoDelete() // 設(shè)置自動刪除(當(dāng)隊列跟他解綁后是否自動刪除),一般不是自動刪除

.alternate("") // 設(shè)置備用交換機名字

.build();

*/

@Configuration

public class RabbitConfig {

// 1.定義交換機

// 正常交換機

@Bean

public DirectExchange normalExchange() {

return ExchangeBuilder.

directExchange("exchange.normal.1")

.alternate("exchange.alternate.1") // 設(shè)置備用交換機

.build();

}

// 備用交換機

@Bean

public FanoutExchange alternateExchange() {

return ExchangeBuilder.fanoutExchange("exchange.alternate.1").build();

}

// 2.定義隊列

// 正常隊列

@Bean

public Queue normalQueue() {

return QueueBuilder.durable("queue.normal.1").build();

}

// 備用隊列

@Bean

public Queue alternateQueue() {

return QueueBuilder.durable("queue.alternate.1").build();

}

// 3.交換機和隊列進行綁定

// 正常交換機與正常隊列綁定

@Bean

public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {

return BindingBuilder.bind(normalQueue).to(normalExchange).with("info");

}

// 備用交換機與備用隊列綁定

@Bean

public Binding bindingAlternate(FanoutExchange alternateExchange, Queue alternateQueue) {

return BindingBuilder.bind(alternateQueue).to(alternateExchange);

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

// 發(fā)送正確不會回調(diào),只有發(fā)送錯誤才會回調(diào)

rabbitTemplate.convertAndSend("exchange.normal.1", "error", message);

}

}

十三、隊列詳細(xì)屬性

Type:隊列類型,一般是 ClassicName:隊列名稱,就是一個字符串,隨便一個字符串就可以Durability:聲明隊列是否持久化,代表隊列在服務(wù)器重啟后是否還存在Auto delete:是否自動刪除,如果為 true,當(dāng)沒有消費者連接到這個隊列的時候,隊列會自動刪除Exclusive:exclusive 屬性的隊列只對首次聲明它的連接可見,并且在連接斷開時自動刪除;基本不設(shè)置它,設(shè)置成 falseArguments:隊列的其他屬性,例如指定 DLX(死信交換機等)

1. x-expires:Number

當(dāng) Queue(隊列)在指定的時間未被訪問,則隊列將被自動刪除

2.?x-message-ttl:Number

發(fā)布的消息在隊列中存在多長時間后被取消(單位毫秒)

3.?x-overflow:String

設(shè)置隊列溢出行為,當(dāng)達到隊列的最大長度時,消息會發(fā)生什么,有效值為?Drop Head?或?Reject Publish

4. x-max-length:Number

隊列所能容下消息的最大長度,當(dāng)超出長度后,新消息將會覆蓋最前面的消息,類似于Redis的LRU算法

5. x-single-active-consumer:默認(rèn)為false

激活單一的消費者,也就是該隊列只能有一個消息者消費消息

6.?x-max-length-bytes:Number

限定隊列的最大占用空間,當(dāng)超出后也使用類似于Redis的LRU算法

7.?x-dead-letter-exchange:String

指定隊列關(guān)聯(lián)的死信交換機,有時候我們希望當(dāng)隊列的消息達到上限后溢出的消息不會被刪除掉,而是走到另一個隊列中保存起來

8. x-dead-letter-routing-key:String

指定死信交換機的路由鍵,一般和6一起定義

9. x-max-priority:Number

如果將一個隊列加上優(yōu)先級參數(shù),那么該隊列為優(yōu)先級隊列;

(1)、給隊列加上優(yōu)先級參數(shù)使其成為優(yōu)先級隊列

x-max-priority=10【0-255取值范圍】

(2)、給消息加上優(yōu)先級屬性

通過優(yōu)先級特性,將一個隊列實現(xiàn)插隊消費

MessageProperties messageProperties=new MessageProperties();

messageProperties.setPriority(8);

10.?x-queue-mode:String(理解下即可)

隊列類型x-queue-mode=lazy懶隊列,在磁盤上盡可能多地保留消息以減少RAM使用,如果未設(shè)置,則隊列將保留內(nèi)存緩存以盡可能快地傳遞消息

11.?x-queue-master-locator:String(用的較少,不講)

在集群模式下設(shè)置隊列分配到的主節(jié)點位置信息;

每個queue都有一個master節(jié)點,所有對于queue的操作都是事先在master上完成,之后再slave上進行相同的操作;

每個不同的queue可以坐落在不同的集群節(jié)點上,這些queue如果配置了鏡像隊列,那么會有1個master和多個slave。

基本上所有的操作都落在master上,那么如果這些queues的master都落在個別的服務(wù)節(jié)點上,而其他的節(jié)點又很空閑,這樣就無法做到負(fù)載均衡,那么勢必會影響性能;

關(guān)于master queue host 的分配有幾種策略,可以在queue聲明的時候使用x-queue-master-locator參數(shù),或者在policy上設(shè)置queue-master-locator,或者直接在rabbitmq的配置文件中定義queue_master_locator,有三種可供選擇的策略:

(1)min-masters:選擇master queue數(shù)最少的那個服務(wù)節(jié)點host;

(2)client-local:選擇與client相連接的那個服務(wù)節(jié)點host;

(3)random:隨機分配;

添加依賴

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

publisher-confirm-type: correlated # 開啟生產(chǎn)者的確認(rèn)模式,設(shè)置關(guān)聯(lián)模式

application.yml 配置文件

spring:

rabbitmq:

host: 192.168.224.133 # ip

port: 5672 # 端口

username: admin # 用戶名

password: 123456 # 密碼

virtual-host: powernode # 虛擬主機

配置類

@Configuration

public class RabbitConfig {

// 1.定義交換機

@Bean

public DirectExchange directExchange() {

return ExchangeBuilder.directExchange("exchange.queue.properties").build();

}

// 2.定義隊列

@Bean

public Queue queue() {

// String name 隊列名稱

// boolean durable 是否持久化

// boolean exclusive 排他隊列

// boolean autoDelete 自動刪除

// @Nullable Map arguments

return new Queue("queue.properties.1", true, false, false);

}

// 3.交換機和隊列進行綁定

@Bean

public Binding bindingNormal(DirectExchange directExchange, Queue queue) {

return BindingBuilder.bind(queue).to(directExchange).with("info");

}

}

發(fā)送消息

@Service

public class MessageService {

@Resource

private RabbitTemplate rabbitTemplate;

public void sendMsg() {

Message message = MessageBuilder.withBody("hello world".getBytes()).build();

rabbitTemplate.convertAndSend("exchange.queue.properties", "info", message);

}

}

接收消息

@Component

public class ReceiveMessage {

@RabbitListener(queues = {"queue.properties.1"})

public void receiveMsg(Message message) {

byte[] body = message.getBody();

String msg = new String(body);

System.out.println("接收到的消息為: " + msg);

}

}

十四、消息可靠性投遞

消息的可靠性投遞就是要保證消息投遞過程中每一個環(huán)節(jié)都要成功,那么這肯定要犧牲一些性能,性能與可靠性是無法兼得的;

如果業(yè)務(wù)實時一致性要求不是特別高的場景,可以犧牲一些可靠性來換取性能。

1.代表消息從生產(chǎn)者發(fā)送到 Exchange2.代表消息從 Exchange 路由到 Queue3.代表消息在 Queue 中存儲4.代表消費者監(jiān)聽 Queue 并消費消息

1、確保消息發(fā)送到 RabbitMQ 服務(wù)器的交換機上

可能因為網(wǎng)絡(luò)或者 Broker 的問題導(dǎo)致 1 失敗,而此時應(yīng)該讓生產(chǎn)者知道消息是否正確發(fā)送到了 Broker 的 exchange 中

有兩種解決方案:

第一種是開啟Confirm(確認(rèn))模式;(異步)

第二種是開啟Transaction(事務(wù))模式;(性能低,實際項目中很少用)

2、確保消息路由到正確的隊列

可能因為路由關(guān)鍵字錯誤,或者隊列不存在,或者隊列名稱錯誤導(dǎo)致②失敗。

使用return模式,可以實現(xiàn)消息無法路由的時候返回給生產(chǎn)者;

當(dāng)然在實際生產(chǎn)環(huán)境下,我們不會出現(xiàn)這種問題,我們都會進行嚴(yán)格測試才會上線(很少有這種問題);

另一種方式就是使用備份交換機(alternate-exchange),無法路由的消息會發(fā)送到這個備用交換機上

3、確保消息在隊列正確地存儲

可能因為系統(tǒng)宕機、重啟、關(guān)閉等等情況導(dǎo)致存儲在隊列的消息丟失,即 3 出現(xiàn)問題;

解決方案:

隊列持久化

QueueBuilder.durable(QUEUE).build();

交換機持久化

ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

消息持久化

MessageProperties messageProperties = new MessageProperties();

messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 默認(rèn)就是持久化的

集群,鏡像隊列,高可用

確保消息從隊列正確地投遞到消費者

采用消息消費時的手動ack確認(rèn)機制來保證;

如果消費者收到消息后未來得及處理即發(fā)生異常,或者處理過程中發(fā)生異常,會導(dǎo)致④失敗。

為了保證消息從隊列可靠地達到消費者,RabbitMQ提供了消息確認(rèn)機制(message acknowledgement);

#開啟手動ack消息消費確認(rèn)

spring.rabbitmq.listener.simple.acknowledge-mode=manual

消費者在訂閱隊列時,通過上面的配置,不自動確認(rèn),采用手動確認(rèn),RabbitMQ會等待消費者顯式地回復(fù)確認(rèn)信號后才從隊列中刪除消息;

如果消息消費失敗,也可以調(diào)用basicReject()或者basicNack()來拒絕當(dāng)前消息而不是確認(rèn)。如果requeue參數(shù)設(shè)置為true,可以把這條消息重新存入隊列,以便發(fā)給下一個消費者(當(dāng)然,只有一個消費者的時候,這種方式可能會出現(xiàn)無限循環(huán)重復(fù)消費的情況,可以投遞到新的隊列中,或者只打印異常日志);

十五、消息的冪等性

消息消費時的冪等性(消息不被重復(fù)消費)

同一個消息,第一次接收,正常處理業(yè)務(wù),如果該消息第二次再接收,那就不能再處理業(yè)務(wù),否則就處理重復(fù)了

冪等性是:對于一個資源,不管你請求一次還是請求多次,對該資源本身造成的影響應(yīng)該是相同的,不能因為重復(fù)的請求而對該資源重復(fù)造成影響;

以接口冪等性舉例:

接口冪等性是指:一個接口用同樣的參數(shù)反復(fù)調(diào)用,不會造成業(yè)務(wù)錯誤,那么這個接口就是具有冪等性的,比如:注冊接口、發(fā)送短信驗證碼接口;

比如同一個訂單我支付兩次,但是只會扣款一次,第二次支付不會扣款,這說明這個支付接口是具有冪等性的

如何避免消息的重復(fù)消費問題?(消息消費時d額冪等性)

全局唯一 ID + Redis

生產(chǎn)者在發(fā)送消息時,為每條消息設(shè)置一個全局唯一的 messageId,消費者拿到消息后,使用setnx 命令,將 messageId 作為 key 放到 redis 中:setnx(messageId, 1),若返回1,說明之前沒有消費過,正常消費;若返回0,說明這條消息之前已消費過,拋棄;

參考代碼

//1、把消息的唯一ID寫入redis

boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getId(), String.valueOf(orders.getId())); //如果redis中該key不存在,那么就設(shè)置,存在就不設(shè)置

if (flag) { //key不存在返回true

//相當(dāng)于是第一次消費該消息

//TODO 處理業(yè)務(wù)

System.out.println("正常處理業(yè)務(wù)....." + orders.getId());

}

柚子快報激活碼778899分享:RabbitMQ 學(xué)習(xí)筆記

http://yzkb.51969.com/

好文鏈接

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/18380993.html

發(fā)布評論

您暫未設(shè)置收款碼

請在主題配置——文章設(shè)置里上傳

掃描二維碼手機訪問

文章目錄