柚子快報(bào)激活碼778899分享:RabbitMq筆記
柚子快報(bào)激活碼778899分享:RabbitMq筆記
目錄
1.1. MQ ?的相關(guān)概念
1.1.1. ?什么是 MQ
1.1.2. 為什么要用 MQ
1.1.3. MQ 的分類
1.1.4. MQ 的選擇
1.2.2.四大核心概念
1.2.3.RabbitMQ 核心部分
1.2.4.各個(gè)名詞介紹
1.2.5.安裝Centos7
1.2.5.安裝RabbitMQ
2. Hello World
2.1. 依賴
1.1. MQ ?的相關(guān)概念
1.1.1. ?什么是 MQ
????????MQ(message queue),從字面意思上看,本質(zhì)是個(gè)隊(duì)列,
FIFO 先入先出,只不過隊(duì)列中存放的內(nèi)容是 message
而已,還是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。在互聯(lián)網(wǎng)架構(gòu)中,
MQ 是一種非常常見的上下游“邏輯解耦
+
物理解耦”的消息通信服務(wù)。使用了
MQ
之后,消息發(fā)送上游只需要依賴
MQ,不用依賴其他服務(wù)。
1.1.2. 為什么要用 MQ
????????1.流量消峰
????????2.應(yīng)用解耦
????????3.異步處理
1.1.3. MQ 的分類
????????2007 年發(fā)布,是一個(gè)在
AMQP(
高級(jí)消息隊(duì)列協(xié)議
)
基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng),是
當(dāng)前最
主流的消息中間件之一
????????優(yōu)點(diǎn):
由于
erlang
語言的
高并發(fā)特性
,性能較好;
吞吐量到萬級(jí)
,
MQ
功能比較完備
,健壯、穩(wěn)定、易 用、跨平臺(tái)、
支持多種語言
如:
Python
、
Ruby
、
.NET
、
Java
、
JMS
、
C
、
PHP
、
ActionScript
、
XMPP
、STOMP 等,支持
AJAX
文檔齊全;開源提供的管理界面非常棒,用起來很好用
,
社區(qū)活躍度高;更新頻率相當(dāng)高 https://www.rabbitmq.com/news.html
1.1.4. MQ 的選擇
????????RabbitMQ 結(jié)合 erlang
語言本身的并發(fā)優(yōu)勢(shì),性能好
時(shí)效性微秒級(jí)
,
社區(qū)活躍度也比較高,管理界面用起來十分方便,如果你的
數(shù)據(jù)量沒有那么大
,中小型公司優(yōu)先選擇功能比較完備的
RabbitMQ
。
1.2.2.四大核心概念
生產(chǎn)者
????????產(chǎn)生數(shù)據(jù)發(fā)送消息的程序是生產(chǎn)者。
交換機(jī)
????????交換機(jī)是 RabbitMQ 非常重要的一個(gè)部件,一方面它接收來自生產(chǎn)者的消息,另一方面它將消息推送到隊(duì)列中。交換機(jī)必須確切知道如何處理它接收到的消息,是將這些消息推送到特定隊(duì)列還是推送到多個(gè)隊(duì)列,亦或者是把消息丟棄,這個(gè)得有交換機(jī)類型決定。
隊(duì)列
????????隊(duì)列是 RabbitMQ
內(nèi)部使用的一種數(shù)據(jù)結(jié)構(gòu),盡管消息流經(jīng)
RabbitMQ 和應(yīng)用程序,但它們只能存儲(chǔ)在隊(duì)列中。隊(duì)列僅受主機(jī)的內(nèi)存和磁盤限制的約束,本質(zhì)上是一個(gè)大的消息緩沖區(qū)。許多生產(chǎn)者可以將消息發(fā)送到一個(gè)隊(duì)列,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。這就是我們使用隊(duì)列的方式。
消費(fèi)者
????????消費(fèi)與接收具有相似的含義。消費(fèi)者大多時(shí)候是一個(gè)等待接收消息的程序。請(qǐng)注意生產(chǎn)者,消費(fèi)者和消息中間件很多時(shí)候并不在同一機(jī)器上。同一個(gè)應(yīng)用程序既可以是生產(chǎn)者又是可以是消費(fèi)者。
1.2.3.RabbitMQ 核心部分
1.2.4.各個(gè)名詞介紹
Broker
:接收和分發(fā)消息的應(yīng)用,
RabbitMQ Server
就是
Message Broker
Virtual host
:出于多租戶和安全因素設(shè)計(jì)的,把
AMQP 的基本組件劃分到一個(gè)虛擬的分組中,類似于網(wǎng)絡(luò)中的
namespace
概念。當(dāng)多個(gè)不同的用戶使用同一個(gè)
RabbitMQ server 提供的服務(wù)時(shí),可以劃分出多個(gè)
vhost
,每個(gè)用戶在自己的
vhost
創(chuàng)建
exchange
/
queue
等
Connection
:
publisher
/
consumer
和
broker
之間的
TCP
連接
Channel
:如果每一次訪問
RabbitMQ
都建立一個(gè)
Connection
,在消息量大的時(shí)候建立
TCP
Connection
的開銷將是巨大的,效率也較低。
Channel
是在
connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè)
thread
創(chuàng)建單獨(dú)的
channel
進(jìn)行通訊,
AMQP method
包含了
channel id 幫助客戶端和
message broker
識(shí)別
channel
,所以
channel
之間是完全隔離的。
Channel 作為輕量級(jí)的
Connection
極大減少了操作系統(tǒng)建立
TCP connection
的開銷
Exchange
:
message
到達(dá)
broker
的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的
routing key
,分發(fā)
消息到
queue
中去。常用的類型有:
direct (point-to-point), topic (publish-subscribe) and fanout
(multicast)
Queue
:
消息最終被送到這里等待
consumer
取走
Binding
:
exchange
和
queue
之間的虛擬連接,
binding
中可以包含
routing key
,
Binding 信息被保存到
exchange
中的查詢表中,用于
message
的分發(fā)依據(jù)
1.2.5.安裝Centos7
嘗試網(wǎng)絡(luò)連通性
配置靜態(tài)ip地址
vi /etc/sysconfig/network-scripts/ifcfg-ens33
BROWSER_ONLY=no
BOOTPROTO=static #修改類型
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=ens33
UUID=bc21e331-eea2-4f5b-b229-c9794f1c5048
DEVICE=ens33
ONBOOT=yes #修改啟動(dòng)
IPADDR=192.168.200.10 #ip
NETMASK=255.255.255.0 #子網(wǎng)掩碼
GATEWAY=192.168.200.2 #網(wǎng)關(guān)
DNS1=8.8.8.8
DNS2=114.114.114.114
systemctl restart network #重啟網(wǎng)絡(luò)
ping www.baidu.com
1.2.5.安裝RabbitMQ
1.
官網(wǎng)地址
https://www.rabbitmq.com/download.html
2.
文件上傳
上傳到
/usr/local/software
目錄下
(
如果沒有
software
需要自己創(chuàng)建
)
-rw-r--r--. 1 root root 43K Jun 15 ?2021 rabbitmq_delayed_message_exchange-3.8.0.ez
-rw-r--r--. 1 root root 15M Jun 15 ?2021 rabbitmq-server-3.8.8-1.el7.noarch.rpm
-rw-r--r--. 1 root root 18M Jun 15 ?2021 erlang-21.3-1.el7.x86_64.rpm
3.安裝文件(分別按照以下順序安裝)
1.逐個(gè)安裝軟件
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
2.查看服務(wù)狀態(tài)
/sbin/service rabbitmq-server status
3.啟動(dòng)服務(wù)
/sbin/service rabbitmq-server start
4.添加開機(jī)啟動(dòng) RabbitMQ 服務(wù)
chkconfig rabbitmq-server on
5.查看服務(wù)狀態(tài)
/sbin/service rabbitmq-server status
6.停止服務(wù)(選擇執(zhí)行)
/sbin/service rabbitmq-server stop
7.開啟 web 管理插件
rabbitmq-plugins enable rabbitmq_management
8.啟動(dòng)服務(wù)
/sbin/service rabbitmq-server start
9.用默認(rèn)賬號(hào)密碼(guest)訪問地址 http://192.168.200.10:15672/出現(xiàn)權(quán)限問題
10.查看防火墻狀態(tài)
?systemctl status firewalld
11.關(guān)閉防火墻
?systemctl stop firewalld
12.永久關(guān)閉防火墻
systemctl enable firewalld
13.控制臺(tái)展示訪問地址 http://192.168.200.10:15672/
4.
添加一個(gè)新的用戶
創(chuàng)建賬號(hào)
rabbitmqctl add_user admin 123
設(shè)置用戶角色
rabbitmqctl set_user_tags admin administrator
設(shè)置用戶權(quán)限
? ? 語法?set_permissions [-p
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用戶 user_admin 具有/vhost1 這個(gè) virtual host 中所有資源的配置、寫、讀權(quán)限
當(dāng)前用戶和角色
rabbitmqctl list_users
5.
再次利用
admin
用戶登錄
6.
重置命令
關(guān)閉應(yīng)用的命令為
rabbitmqctl stop_app
清除的命令為
rabbitmqctl reset
重新啟動(dòng)命令為
rabbitmqctl start_app
2. Hello World
“
P
”是我們的生產(chǎn)者,“
C
”是我們的消費(fèi)者。中間的框是一個(gè)隊(duì)列
-RabbitMQ
代
表使用者保留的消息緩沖區(qū)
2.1. 依賴
2.2. 消息生產(chǎn)者
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.234.71");
factory.setUsername("admin");
factory.setPassword("123");
//channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉
try(Connection connection = factory.newConnection();Channel channel =
connection.createChannel()) {
/**
* 生成一個(gè)隊(duì)列
* 1.隊(duì)列名稱
* 2.隊(duì)列里面的消息是否持久化 默認(rèn)消息存儲(chǔ)在內(nèi)存中
* 3.該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個(gè)消費(fèi)者消費(fèi)
* 4.是否自動(dòng)刪除 最后一個(gè)消費(fèi)者端開連接以后 該隊(duì)列是否自動(dòng)刪除 true 自動(dòng)刪除
* 5.其他參數(shù)
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello world";
/**
* 發(fā)送一個(gè)消息
* 1.發(fā)送到那個(gè)交換機(jī)
* 2.路由的 key 是哪個(gè)
* 3.其他的參數(shù)信息
* 4.發(fā)送消息的消息體
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息發(fā)送完畢");
}
}
}
2.3. 消息消費(fèi)者
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.234.71");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息....");
//推送的消息如何進(jìn)行消費(fèi)的接口回調(diào)
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
//取消消費(fèi)的一個(gè)回調(diào)接口 如在消費(fèi)的時(shí)候隊(duì)列被刪除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消費(fèi)被中斷");
};
/**
* 消費(fèi)者消費(fèi)消息
* 1.消費(fèi)哪個(gè)隊(duì)列
* 2.消費(fèi)成功之后是否要自動(dòng)應(yīng)答 true 代表自動(dòng)應(yīng)答 false 手動(dòng)應(yīng)答
* 3.消費(fèi)者未成功消費(fèi)的回調(diào)
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
柚子快報(bào)激活碼778899分享:RabbitMq筆記
文章鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。