欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

目錄

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ

http://yzkb.51969.com/

讓你快速了解RabbitMQ

什么是RabbitM

RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開(kāi)源消息代理軟件(亦稱(chēng)面向消息的中間件)

消息隊(duì)列的概念

哪什么是消息隊(duì)列呢?

消息隊(duì)列(Message Queue)”是在消息的傳輸過(guò)程中保存消息的容器。在消息隊(duì)列中,通常有生產(chǎn)者和消費(fèi)者兩個(gè)角色。生產(chǎn)者只負(fù)責(zé)發(fā)送數(shù)據(jù)到消息隊(duì)列,誰(shuí)從消息隊(duì)列中取出數(shù)據(jù)處理,他不管。消費(fèi)者只負(fù)責(zé)從消息隊(duì)列中取出數(shù)據(jù)處理,他不管這是誰(shuí)發(fā)送的數(shù)據(jù)。

哪消息隊(duì)列有什么作用呢

主要作用有三點(diǎn):

1.解耦

實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者的解耦,生產(chǎn)者和消費(fèi)者不直接調(diào)用,也不用關(guān)心對(duì)方如何處理,代碼的維護(hù)性提高

比如使用openfeign實(shí)現(xiàn)服務(wù)調(diào)用,被調(diào)用服務(wù)的接口發(fā)生修改,服務(wù)調(diào)用方也需要進(jìn)行修改,服務(wù)之間的耦合性就會(huì)較高,那么不利于開(kāi)發(fā)和維護(hù)

2.異步

同步調(diào)用,服務(wù)A調(diào)用服務(wù)B,必須等待服務(wù)B執(zhí)行完業(yè)務(wù),服務(wù)A才能執(zhí)行其它業(yè)務(wù)

異步調(diào)用,服務(wù)A發(fā)送消息給消息隊(duì)列,馬上返回完成其它業(yè)務(wù),不用等待服務(wù)B執(zhí)行完

3.削峰

這其實(shí)是MQ一個(gè)很重要的應(yīng)用,可以通過(guò)控制消息隊(duì)列的長(zhǎng)度來(lái)限制請(qǐng)求流量,從而達(dá)到限流保護(hù)服務(wù)器的作用

假如說(shuō)系統(tǒng)A在某個(gè)時(shí)間段請(qǐng)求大量增加,有上萬(wàn)條數(shù)據(jù)發(fā)送過(guò)來(lái),系統(tǒng)A就會(huì)把發(fā)送過(guò)來(lái)的數(shù)據(jù)直接發(fā)到SQL中,mysl數(shù)據(jù)庫(kù)里執(zhí)行,如果處理不過(guò)來(lái)就會(huì)直接導(dǎo)致系統(tǒng)癱瘓,使用MQ,系統(tǒng)A不再是直接發(fā)送SQL到數(shù)據(jù)庫(kù),而是把數(shù)據(jù)發(fā)送到MQ,MQ短時(shí)間積壓數(shù)據(jù)是可以接受的,然后由消費(fèi)者每次拉取2000條進(jìn)行處理,防止在請(qǐng)求峰值時(shí)期大量的請(qǐng)求直接發(fā)送到MySQL導(dǎo)致系統(tǒng)崩潰。

同時(shí)消息隊(duì)列也有它的缺點(diǎn)

1.系統(tǒng)的復(fù)雜性提高了

2.系統(tǒng)的可用性降低了

消息隊(duì)列的概念

生產(chǎn)者 向消息隊(duì)列發(fā)送消息的服務(wù) 消費(fèi)者 從消息隊(duì)列取消息的服務(wù) 隊(duì)列 queue 存放消息的容器,采用FIFO數(shù)據(jù)結(jié)構(gòu) 交換機(jī) exchange 實(shí)現(xiàn)消息路由,將消息分發(fā)到對(duì)應(yīng)的隊(duì)列中 消息服務(wù)器 Broker 進(jìn)行消息通信的軟件平臺(tái)服務(wù)器 虛擬主機(jī) virtual host 類(lèi)似于namespace,將不同用戶(hù)的交換機(jī)和隊(duì)列區(qū)分開(kāi)來(lái) 連接 connection 網(wǎng)絡(luò)連接 通道 channel 數(shù)據(jù)通信的通道

RabbitMQ的基本使用

1.添加用戶(hù)

不同的系統(tǒng)可以使用各自的用戶(hù)登錄RabbitMQ,可以在Admin的User頁(yè)面添加新用戶(hù)

2.添加虛擬主機(jī)

虛擬主機(jī)相當(dāng)于一個(gè)獨(dú)立的MQ服務(wù),有自身的隊(duì)列、交換機(jī)、綁定策略等。 添加虛擬主機(jī)

3.添加隊(duì)列

不同的消息隊(duì)列保存不同類(lèi)型的消息,如支付消息、秒殺消息、數(shù)據(jù)同步消息等。 添加隊(duì)列,需要填寫(xiě)虛擬主機(jī)、類(lèi)型、名稱(chēng)、持久化、自動(dòng)刪除和參數(shù)等。

4.添加交換機(jī)

生產(chǎn)者將消息發(fā)送到交換機(jī)Exchange,再由交換機(jī)路由到一個(gè)或多個(gè)隊(duì)列中; 交換器的類(lèi)型有fanout、direct、topic、headers這四種,下篇文章將詳細(xì)介紹。 添加交換機(jī)

RabbitMQ的五種消息模型

RabbitMQ提供了多種消息模型,官網(wǎng)上第6種是RPC不屬于常規(guī)的消息隊(duì)列。 屬于消息模型的是前5種:

簡(jiǎn)單的一對(duì)一模型工作隊(duì)列模型 ,一個(gè)生產(chǎn)者將消息分發(fā)給多個(gè)消費(fèi)者發(fā)布/訂閱模型 ,生產(chǎn)者發(fā)布消息,多個(gè)消費(fèi)者同時(shí)收取路由模型 ,生產(chǎn)者通過(guò)關(guān)鍵字發(fā)送消息給特定消費(fèi)者主題模型 ,路由模式基礎(chǔ)上,在關(guān)鍵字里加入了通配符

1.一對(duì)一模型

最基本的隊(duì)列模型: 一個(gè)生產(chǎn)者發(fā)送消息到一個(gè)隊(duì)列,一個(gè)消費(fèi)者從隊(duì)列中取消息。

2.操作步驟

1)啟動(dòng)Rabbitmq,在管理頁(yè)面中創(chuàng)建用戶(hù)admin 2)使用admin登錄,然后創(chuàng)建虛擬主機(jī)myhost

3.案例代碼

導(dǎo)入依賴(lài)

com.rabbitmq

amqp-client

3.4.1

開(kāi)發(fā)工具類(lèi)

public class MQUtils {

public static final String QUEUE_NAME = "myqueue01";

public static final String QUEUE_NAME2 = "myqueue02";

public static final String EXCHANGE_NAME = "myexchange01";

public static final String EXCHANGE_NAME2 = "myexchange02";

public static final String EXCHANGE_NAME3 = "myexchange03";

/**

* 獲得MQ的連接

* @return

* @throws IOException

*/

public static Connection getConnection() throws IOException {

ConnectionFactory connectionFactory = new ConnectionFactory();

//配置服務(wù)器名、端口、虛擬主機(jī)名、登錄賬號(hào)和密碼

connectionFactory.setHost("localhost");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("myhost");

connectionFactory.setUsername("admin");

connectionFactory.setPassword("123456");

return connectionFactory.newConnection();

}

}

開(kāi)發(fā)生產(chǎn)者

/**

* 生產(chǎn)者,發(fā)送簡(jiǎn)單的消息到隊(duì)列中

*/

public class SimpleProducer {

public static void main(String[] args) throws IOException {

Connection connection = MQUtils.getConnection();

//創(chuàng)建通道

Channel channel = connection.createChannel();

//定義隊(duì)列

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

String msg = "Hello World!";

//發(fā)布消息到隊(duì)列

channel.basicPublish("",MQUtils.QUEUE_NAME,null,msg.getBytes());

channel.close();

connection.close();

}

}

運(yùn)行生產(chǎn)者代碼,管理頁(yè)面點(diǎn)進(jìn)myqueue01,在GetMessages中可以看到消息

開(kāi)發(fā)消費(fèi)者

/**

* 消費(fèi)者,從隊(duì)列中讀取簡(jiǎn)單的消息

*/

public class SimpleConsumer {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

//定義隊(duì)列

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

//創(chuàng)建消費(fèi)者

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

//消費(fèi)者消費(fèi)通道中的消息

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

//讀取消息

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println(new String(delivery.getBody()));

}

}

}

工作隊(duì)列模型

工作隊(duì)列,生產(chǎn)者將消息分發(fā)給多個(gè)消費(fèi)者,如果生產(chǎn)者生產(chǎn)了100條消息,消費(fèi)者1消費(fèi)50條,消費(fèi)者2消費(fèi)50條。

1 案例代碼

開(kāi)發(fā)生產(chǎn)者

/**

多對(duì)多模式的生產(chǎn)者,會(huì)發(fā)送多條消息到隊(duì)列中

*/

public class WorkProductor {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

for(int i = 0;i < 100;i++){

String msg = "Hello-->" + i;

channel.basicPublish("",MQUtils.QUEUE_NAME,null, msg.getBytes());

System.out.println("send:" + msg);

Thread.sleep(10);

}

channel.close();

connection.close();

}

}

開(kāi)發(fā)消費(fèi)者1

/**

* 多對(duì)多模式的消費(fèi)者1

*/

public class WorkConsumer01 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

//消費(fèi)者消費(fèi)通道中的消息

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));

Thread.sleep(10);

}

}

}

開(kāi)發(fā)消費(fèi)者2

/**

* 多對(duì)多模式的消費(fèi)者2

*/

public class WorkConsumer02 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

//消費(fèi)者消費(fèi)通道中的消息

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));

Thread.sleep(1000);

}

}

}

生產(chǎn)者發(fā)送100個(gè)消息,兩個(gè)消費(fèi)者分別讀取了50條。 看消息內(nèi)容,發(fā)現(xiàn)隊(duì)列發(fā)送消息采用的是輪詢(xún)方式,也就是先發(fā)給消費(fèi)者1,再發(fā)給消費(fèi)者2,依次往復(fù)。

2 能者多勞

上面案例中有一個(gè)問(wèn)題:消費(fèi)者處理消息的速度是不一樣的,消費(fèi)者1處理后睡眠10毫秒(Thread.sleep(10)),消費(fèi)者2是1000毫秒,速度相差100倍,但是最后處理的消息數(shù)還是一樣的。這樣就存在效率問(wèn)題:處理能力強(qiáng)的消費(fèi)者得不到更多的消息。

因?yàn)殛?duì)列默認(rèn)采用是自動(dòng)確認(rèn)機(jī)制,消息發(fā)過(guò)去后就自動(dòng)確認(rèn),隊(duì)列不清楚每個(gè)消息具體什么時(shí)間處理完,所以平均分配消息數(shù)量。

實(shí)現(xiàn)能者多勞:

channel.basicQos(1);限制隊(duì)列一次發(fā)一個(gè)消息給消費(fèi)者,等消費(fèi)者有了反饋,再發(fā)下一條channel.basicAck 消費(fèi)完消息后手動(dòng)反饋,處理快的消費(fèi)者就能處理更多消息basicConsume 中的參數(shù)改為false

/**

多對(duì)多模式的消費(fèi)者1

*/

public class WorkConsumer1 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

//同一時(shí)刻服務(wù)器只發(fā)送一條消息給消費(fèi)者

channel.basicQos(1);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

//true是自動(dòng)返回完成狀態(tài),false表示手動(dòng)

channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("WorkConsumer1 receive :" + new String(delivery.getBody()));

Thread.sleep(10);

//手動(dòng)確定返回狀態(tài),不寫(xiě)就是自動(dòng)確認(rèn)

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}

}

}

/**

* 多對(duì)多模式的消費(fèi)者2

*/

public class WorkConsumer2 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

//同一時(shí)刻服務(wù)器只發(fā)送一條消息給消費(fèi)者

channel.basicQos(1);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

//true是自動(dòng)返回完成狀態(tài),false表示手動(dòng)

channel.basicConsume(MQUtils.QUEUE_NAME,false,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("WorkConsumer2 receive :" + new String(delivery.getBody()));

Thread.sleep(1000);

//手動(dòng)確定返回狀態(tài),不寫(xiě)就是自動(dòng)確認(rèn)

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}

}

}

3 發(fā)布/訂閱模型

發(fā)布/訂閱模式和Work模式的區(qū)別是:Work模式只存在一個(gè)隊(duì)列,多個(gè)消費(fèi)者共同消費(fèi)一個(gè)隊(duì)列中的消息;而發(fā)布訂閱模式存在多個(gè)隊(duì)列,不同的消費(fèi)者可以從各自的隊(duì)列中處理完全相同的消息。

1. 操作步驟

實(shí)現(xiàn)步驟:

創(chuàng)建交換機(jī)(Exchange)類(lèi)型是fanout(扇出)交換機(jī)需要綁定不同的隊(duì)列不同的消費(fèi)者從不同的隊(duì)列中獲得消息生產(chǎn)者發(fā)送消息到交換機(jī)再由交換機(jī)將消息分發(fā)到多個(gè)隊(duì)列

新建隊(duì)列

新建交換機(jī)

點(diǎn)擊交換機(jī),在bindings里面綁定兩個(gè)隊(duì)列

2.案例代碼

生產(chǎn)者

/**

* 發(fā)布和訂閱模式的生產(chǎn)者,消息會(huì)通過(guò)交換機(jī)發(fā)到隊(duì)列

*/

public class PublishProductor {

public static void main(String[] args) throws IOException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

//聲明fanout exchange

channel.exchangeDeclare(MQUtils.EXCHANGE_NAME,"fanout");

String msg = "Hello Fanout";

//發(fā)布消息到交換機(jī)

channel.basicPublish(MQUtils.EXCHANGE_NAME,"",null,msg.getBytes());

System.out.println("send:" + msg);

channel.close();

connection.close();

}

}

消費(fèi)者1

/**

* 發(fā)布訂閱模式的消費(fèi)者1

* 兩個(gè)消費(fèi)者綁定的消息隊(duì)列不同

* 通過(guò)交換機(jī)一個(gè)消息能被不同隊(duì)列的兩個(gè)消費(fèi)者同時(shí)獲取

* 一個(gè)隊(duì)列可以有多個(gè)消費(fèi)者,隊(duì)列中的消息只能被一個(gè)消費(fèi)者獲取

*/

public class SubscribeConsumer1 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

//綁定隊(duì)列1到交換機(jī)

channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME,"");

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("Consumer1 receive :" + new String(delivery.getBody()));

}

}

}

消費(fèi)者2

public class SubscribeConsumer2 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);

//綁定隊(duì)列2到交換機(jī)

channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME,"");

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("Consumer2 receive :" + new String(delivery.getBody()));

}

}

}

路由模型

路由模式的消息隊(duì)列可以給隊(duì)列綁定不同的key,生產(chǎn)者發(fā)送消息時(shí),給消息設(shè)置不同的key,這樣交換機(jī)在分發(fā)消息時(shí),可以讓消息路由到key匹配的隊(duì)列中。 可以想象上圖是一個(gè)日志處理系統(tǒng),C1可以處理error日志消息,C2可以處理info\error\warining類(lèi)型的日志消息,使用路由模式就很容易實(shí)現(xiàn)了。

1 操作步驟

新建direct類(lèi)型的交換機(jī)

2.案例代碼

生產(chǎn)者,給myqueue01綁定了key:error,myqueue02綁定了key:debug,然后發(fā)送了key:error的消息

/**

路由模式的生產(chǎn)者,發(fā)布消息會(huì)有特定的Key,消息會(huì)被綁定特定Key的消費(fèi)者獲取

*/

public class RouteProductor {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

//聲明交換機(jī)類(lèi)型為direct

channel.exchangeDeclare(MQUtils.EXCHANGE_NAME2,"direct");

String msg = "Hello-->Route";

//綁定隊(duì)列1到交換機(jī),指定了Key為error

channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME2,"error");

//綁定隊(duì)列2到交換機(jī),指定了Key為debug

channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME2,"debug");

//error是一個(gè)指定的Key

channel.basicPublish(MQUtils.EXCHANGE_NAME2,"error",null,msg.getBytes());

System.out.println("send:" + msg);

channel.close();

connection.close();

}

}

消費(fèi)者1

/**

* 路由模式的消費(fèi)者1

* 可以指定Key,消費(fèi)特定的消息

*/

public class RouteConsumer1 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("RouteConsumer1 receive :" + new String(delivery.getBody()));

}

}

}

消費(fèi)者2

/**

* 路由模式的消費(fèi)者2

* 可以指定Key,消費(fèi)特定的消息

*/

public class RouteConsumer2 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("RouteConsumer2 receive :" + new String(delivery.getBody()));

}

}

}

主題模型

主題模式和路由模式差不多,在key中可以加入通配符:

* 匹配任意一個(gè)單詞 com.* ----> com.hopu com.blb com.baidu# 匹配.號(hào)隔開(kāi)的0個(gè)或多個(gè)單詞 com.# —> com.hopu.net com.hopu com.163.xxx.xxx.xxx

1 案例代碼

生產(chǎn)者代碼

/**

主題模式的生產(chǎn)者

*/

public class TopicProductor {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

//聲明交換機(jī)類(lèi)型為topic

channel.exchangeDeclare(MQUtils.EXCHANGE_NAME3,"topic");

//綁定隊(duì)列到交換機(jī),最后指定了Key

channel.queueBind(MQUtils.QUEUE_NAME,MQUtils.EXCHANGE_NAME3,"xray.#");

//綁定隊(duì)列到交換機(jī),最后指定了Key

channel.queueBind(MQUtils.QUEUE_NAME2,MQUtils.EXCHANGE_NAME3,"*.*.cn");

String msg = "Hello-->Topic";

channel.basicPublish(MQUtils.EXCHANGE_NAME3,"rabbit.com.cn",null,msg.getBytes());

System.out.println("send:" + msg);

channel.close();

connection.close();

}

}

消費(fèi)者1

/**

* 主題模式的消費(fèi)者1 ,類(lèi)似路由模式,可以使用通配符對(duì)Key進(jìn)行篩選

* #匹配1個(gè)或多個(gè)單詞,*匹配一個(gè)單詞

*/

public class TopicConsumer1 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("TopicConsumer1 receive :" + new String(delivery.getBody()));

}

}

}

消費(fèi)者2

/**

* 主題模式的消費(fèi)者2

*/

public class TopicConsumer2 {

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = MQUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(MQUtils.QUEUE_NAME2,false,false,false,null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(MQUtils.QUEUE_NAME2,true,queueingConsumer);

while(true){

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

System.out.println("TopicConsumer2 receive :" + new String(delivery.getBody()));

}

}

}

SpringBoot整合RabbitMQ

創(chuàng)建兩個(gè)SpringBoot項(xiàng)目,一個(gè)作為生產(chǎn)者,一個(gè)作為消費(fèi)者

生產(chǎn)者會(huì)發(fā)送兩種消息:保存課程(更新和添加),刪除課程

消費(fèi)者監(jiān)聽(tīng)兩個(gè)隊(duì)列:保存課程隊(duì)列和刪除課程隊(duì)列

2)給生產(chǎn)者和消費(fèi)者服務(wù)添加依賴(lài)

org.springframework.boot

spring-boot-starter-amqp

3) 給生產(chǎn)者和消費(fèi)者服務(wù)添加配置

spring:

rabbitmq:

host: localhost

port: 5672

username: admin

password: 123456

virtual-host: myhost

4)生產(chǎn)者的配置,用于生成消息隊(duì)列和交換機(jī)

/**

* RabbitMQ的配置

*/

@Configuration

public class RabbitMQConfig {

public static final String QUEUE_COURSE_SAVE = "queue.course.save";

public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";

public static final String KEY_COURSE_SAVE = "key.course.save";

public static final String KEY_COURSE_REMOVE = "key.course.remove";

public static final String COURSE_EXCHANGE = "edu.course.exchange";

@Bean

public Queue queueCourseSave() {

return new Queue(QUEUE_COURSE_SAVE);

}

@Bean

public Queue queueCourseRemove() {

return new Queue(QUEUE_COURSE_REMOVE);

}

@Bean

public TopicExchange topicExchange() {

return new TopicExchange(COURSE_EXCHANGE);

}

@Bean

public Binding bindCourseSave() {

return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);

}

@Bean

public Binding bindCourseRemove() {

return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);

}

}

5) 生產(chǎn)者發(fā)送消息的核心代碼

@Autowired

RabbitTemplate rabbitTemplate;

//發(fā)消息的代碼

rabbitTemplate.convertAndSend(交換機(jī)的名稱(chēng),消息的key,消息內(nèi)容);

6)消費(fèi)者添加監(jiān)聽(tīng)器

@Slf4j

@Component

public class CourseMQListener {

public static final String QUEUE_COURSE_SAVE = "queue.course.save";

public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";

public static final String KEY_COURSE_SAVE = "key.course.save";

public static final String KEY_COURSE_REMOVE = "key.course.remove";

public static final String COURSE_EXCHANGE = "course.exchange";

/**

* 監(jiān)聽(tīng)課程添加操作

*/

@RabbitListener(bindings = {

@QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),

exchange = @Exchange(value = COURSE_EXCHANGE,

type = ExchangeTypes.TOPIC,

ignoreDeclarationExceptions = "true")

, key = KEY_COURSE_SAVE)})

public void receiveCourseSaveMessage(String message) {

try {

log.info("課程添加:{}",message);

} catch (Exception ex) {

ex.printStackTrace();

}

}

/**

* 監(jiān)聽(tīng)課程刪除操作

*/

@RabbitListener(bindings = {

@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),

exchange = @Exchange(value = COURSE_EXCHANGE,

type = ExchangeTypes.TOPIC,

ignoreDeclarationExceptions = "true")

, key = KEY_COURSE_REMOVE)})

public void receiveCourseDeleteMessage(Long id) {

try {

log.info("課程刪除完成:{}",id);

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

CourseSaveMessage(String message) { try { log.info(“課程添加:{}”,message); } catch (Exception ex) { ex.printStackTrace(); } }

/**

* 監(jiān)聽(tīng)課程刪除操作

*/

@RabbitListener(bindings = {

@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),

exchange = @Exchange(value = COURSE_EXCHANGE,

type = ExchangeTypes.TOPIC,

ignoreDeclarationExceptions = "true")

, key = KEY_COURSE_REMOVE)})

public void receiveCourseDeleteMessage(Long id) {

try {

log.info("課程刪除完成:{}",id);

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

柚子快報(bào)激活碼778899分享:分布式 RabbitMQ

http://yzkb.51969.com/

文章來(lái)源

評(píng)論可見(jiàn),查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。

轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://m.gantiao.com.cn/post/19056276.html

發(fā)布評(píng)論

您暫未設(shè)置收款碼

請(qǐng)?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問(wèn)

文章目錄