機(jī)械網(wǎng)站推廣怎么做公眾號(hào)軟文素材
市面常見(jiàn)消息隊(duì)列中間件對(duì)比
技術(shù)名稱 | 吞吐量 /IO/并發(fā) | 時(shí)效性(類似延遲)消息到達(dá)時(shí)間 | 可用性 | 可靠性 | 優(yōu)勢(shì) | 應(yīng)用場(chǎng)景 |
---|---|---|---|---|---|---|
activemq | 萬(wàn)級(jí) | 高 | 高 | 高 | 簡(jiǎn)單易學(xué) | 中小型企業(yè)、項(xiàng)目 |
rabbitmq | 萬(wàn)級(jí) | 極高(微秒) | 高 | 極高 | 生態(tài)好(基本什么語(yǔ)言都支持)、時(shí)效性高、易學(xué) | 適合絕大數(shù)的分布式應(yīng)用 |
kafka | 10萬(wàn) QBS | 高(毫秒) | 極高 | 極高 | 吞吐量大、可靠性、可用性、強(qiáng)大的數(shù)據(jù)流處理能力 | 適合大規(guī)模處理數(shù)據(jù)的場(chǎng)景、比如構(gòu)建日志手機(jī)系統(tǒng)、實(shí)時(shí)數(shù)據(jù)傳輸、事件流收集傳輸 |
rocketmq | 10萬(wàn) QBS | 高ms | 極高 | 極高 | 吞吐量大、可靠性、可用性、可擴(kuò)展性 | 適用于金融等可靠性要求較高的場(chǎng)景、適合大規(guī)模的消息處理。金融、電商、大規(guī)模社交 |
pulsar | 10萬(wàn) QBS | 高ms | 高 | 極高 | 可靠性、可用性很高、新興(技術(shù)架構(gòu)先進(jìn)) | 適合大規(guī)模、高并發(fā)的分布式系統(tǒng)(云原生)適合實(shí)時(shí)分析、事件流處理、物聯(lián)網(wǎng)數(shù)據(jù)處理。 |
RabbitMQ?
RabbitMQ 是基于 AMQP 高級(jí)消息隊(duì)列協(xié)議的。
?實(shí)際使用可根據(jù)官方文檔的 demo 。
官方文檔:RabbitMQ Tutorials | RabbitMQ
模型
生產(chǎn)者:通俗就是發(fā)消息的人,比如在外賣軟件上點(diǎn)餐的人
消費(fèi)者:通俗就是處理消息的任務(wù),比如外賣軟件上的商家,需要根據(jù)顧客的要求制作餐
交換機(jī):負(fù)責(zé)把消息轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列中,比如有外賣的時(shí)候,系統(tǒng)給附近的外面小哥派單
隊(duì)列:存放消息的地方,等待消費(fèi)者消費(fèi),比如商家肯定不是只做一份餐,做好的餐放在一個(gè)指定的位置等待外賣小哥來(lái)取餐
路由:轉(zhuǎn)發(fā),就是怎么把消息從一個(gè)地方轉(zhuǎn)到另一個(gè)地方,通常加在交換機(jī)和隊(duì)列之間,比如系統(tǒng)指定某個(gè)范圍的外賣小哥接這單
安裝
1. 首先安裝 RabbitMQ,直接官網(wǎng)下載即可,如果下載速度慢,可以換個(gè)網(wǎng)絡(luò),或者找找有沒(méi)有國(guó)內(nèi)鏡像。(當(dāng)初我下載的時(shí)候找了半天的鏡像都是版本比較老的,結(jié)果想著掛一晚上下載,結(jié)果官網(wǎng)突然就快了,白折騰了。)
官方網(wǎng)站:Installing on Windows | RabbitMQ
國(guó)內(nèi)鏡像:Index of rabbitmq-server-local/v3.12.7
一路 next ,傻瓜式安裝即可
安裝之后檢查服務(wù)中是否已經(jīng)運(yùn)行了。
2. 安裝監(jiān)控面板
在 RabbitMQ 安裝目錄下的 sbin 目錄下的CMD 輸入下面的命令
rabbitmq-plugins.bat enable rabbitmq_management
?安裝成功:
默認(rèn)端口號(hào)是 5672,webUI 是 15672
在瀏覽器輸入地址打開(kāi)管理界面:http://localhost:15672
默認(rèn)賬號(hào)密碼是 guest
注意:1. 安裝目錄不能是中文,不能有空格等非法字符,否則頁(yè)面打不開(kāi)
????????? ?2. 如果想要在遠(yuǎn)程服務(wù)器訪問(wèn) RabbitMQ 管理面板,需要?jiǎng)?chuàng)建管理員賬號(hào),比如在寶塔面板使用時(shí)寶塔面板提供的 admin賬號(hào),地址就是寶塔面板的 IP?
創(chuàng)建賬號(hào):access-control | RabbitMQ
入門
依賴引入
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>
單消費(fèi)者和生產(chǎn)者
一對(duì)一的關(guān)系
1. 生產(chǎn)者代碼
public class SingleProducer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//創(chuàng)建連接ConnectionFactory factory = new ConnectionFactory();//設(shè)置了本地連接,如果修改了用戶名和密碼,需要設(shè)置/*factory.setPassword();factory.setUsername();*/factory.setHost("localhost");//建立連接、創(chuàng)建頻道//頻道,類似客戶端,用于調(diào)用serverConnection connection = factory.newConnection();Channel channel = connection.createChannel();//創(chuàng)建隊(duì)列,隊(duì)列持久化,第二份參數(shù)設(shè)置為 truechannel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";//發(fā)送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}
channel 頻道:理解為操作消息隊(duì)列的 Client,通過(guò) channel 收發(fā)消息,提供了和消息對(duì)了 server 建立通信的傳輸方法
channel.queueDeclare 方法參數(shù):
queue:這是一個(gè)字符串參數(shù),代表要聲明的隊(duì)列的名稱。如果隊(duì)列不存在,則會(huì)自動(dòng)創(chuàng)建一個(gè)新的隊(duì)列。
durable:這是一個(gè)布爾值參數(shù),表示隊(duì)列是否持久化。如果設(shè)置為true,則隊(duì)列會(huì)在服務(wù)器重啟后仍然存在;如果設(shè)置為false,則隊(duì)列在服務(wù)器重啟后會(huì)被刪除。默認(rèn)值為false。
exclusive:這也是一個(gè)布爾值參數(shù),表示隊(duì)列是否為獨(dú)占模式。如果設(shè)置為true,則只有當(dāng)前連接可以訪問(wèn)該隊(duì)列;如果設(shè)置為false,則其他連接也可以訪問(wèn)該隊(duì)列。默認(rèn)值為false。
autoDelete:這是另一個(gè)布爾值參數(shù),表示隊(duì)列是否自動(dòng)刪除。如果設(shè)置為true,則當(dāng)最后一個(gè)消費(fèi)者取消訂閱時(shí),隊(duì)列將被刪除;如果設(shè)置為false,則隊(duì)列將一直存在,直到手動(dòng)刪除或服務(wù)器重啟。默認(rèn)值為false。
arguments:這是一個(gè)可選參數(shù),用于設(shè)置隊(duì)列的其他屬性,比如消息的最大長(zhǎng)度、最大優(yōu)先級(jí)等。
channel.basicPublish 參數(shù):
exchange:這是一個(gè)字符串參數(shù),代表交換機(jī)的名稱。如果不需要使用特定的交換機(jī),可以傳遞一個(gè)空字符串("")。交換機(jī)是RabbitMQ中用于接收生產(chǎn)者發(fā)送的消息并根據(jù)綁定規(guī)則路由到隊(duì)列的組件。
routingKey:這也是一個(gè)字符串參數(shù),它指定了發(fā)布消息的隊(duì)列。無(wú)論通道綁定到哪個(gè)隊(duì)列,最終發(fā)布的消息都會(huì)包含這個(gè)指定的路由鍵。路由鍵是用來(lái)確定消息應(yīng)該發(fā)送到哪個(gè)隊(duì)列的重要信息。
message:這是要發(fā)布的消息本身,通常是字節(jié)數(shù)組的形式。
properties:這是一個(gè)可選參數(shù),用于設(shè)置消息的屬性,比如消息的優(yōu)先級(jí)、過(guò)期時(shí)間等。
在使用channel.basicPublish
時(shí),需要注意以下幾點(diǎn):
exchange和routingKey不能為空:在AMQImpl類中的實(shí)現(xiàn)要求這兩個(gè)參數(shù)都不能為null,否則會(huì)拋出異常。
交換機(jī)類型:根據(jù)不同的需求,可以選擇不同類型的交換機(jī),如fanout、direct或topic。每種類型的交換機(jī)都有其特定的路由規(guī)則。
非命名隊(duì)列:在某些情況下,比如日志系統(tǒng),可以使用非命名隊(duì)列,這樣消費(fèi)者可以接收到所有相關(guān)的日志消息,而不是特定的部分。
2. 消費(fèi)者代碼
public class SingleConsumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明隊(duì)列,同一個(gè)消息隊(duì)列參數(shù)必須一致channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定義了如何處理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};//接收、消費(fèi)消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}
?channel.basicConsume 參數(shù):
- queue:這是一個(gè)字符串參數(shù),代表要消費(fèi)的隊(duì)列的名稱。如果隊(duì)列不存在,則會(huì)拋出異常。
- onMessage:這是一個(gè)回調(diào)函數(shù),當(dāng)有新的消息到達(dá)時(shí)會(huì)被調(diào)用。該函數(shù)需要接收兩個(gè)參數(shù):一個(gè)表示消息內(nèi)容的Delivery對(duì)象和一個(gè)表示通道的Channel對(duì)象。
- consumerTag:這是一個(gè)可選參數(shù),用于標(biāo)識(shí)消費(fèi)者。如果沒(méi)有指定,則會(huì)自動(dòng)生成一個(gè)唯一的標(biāo)識(shí)符。
- autoAck:這是一個(gè)布爾值參數(shù),表示是否自動(dòng)確認(rèn)消息。如果設(shè)置為true,則在消息被處理后會(huì)自動(dòng)發(fā)送確認(rèn)信息;如果設(shè)置為false,則需要手動(dòng)發(fā)送確認(rèn)信息。默認(rèn)值為false。
- arguments:這是一個(gè)可選參數(shù),用于設(shè)置消費(fèi)者的其他屬性,比如消息的最大長(zhǎng)度、最大優(yōu)先級(jí)等。
在使用channel.basicConsume
時(shí),需要注意以下幾點(diǎn):
- 隊(duì)列名稱:隊(duì)列名稱應(yīng)該是唯一的,否則會(huì)拋出異常。
- 消息處理:在
onMessage
回調(diào)函數(shù)中,需要對(duì)消息進(jìn)行處理,并根據(jù)需要發(fā)送確認(rèn)信息。 - 消費(fèi)者標(biāo)識(shí)符:可以通過(guò)設(shè)置
consumerTag
來(lái)標(biāo)識(shí)消費(fèi)者,以便在后續(xù)操作中進(jìn)行識(shí)別和管理。 - 消費(fèi)者屬性:可以通過(guò)設(shè)置消費(fèi)者的其他屬性來(lái)控制消費(fèi)者的行為,比如設(shè)置消息的最大長(zhǎng)度、最大優(yōu)先級(jí)等。
多消費(fèi)者
多個(gè)消費(fèi)者,比如一個(gè)工廠生產(chǎn)商品,一個(gè)商店賣不完,分給多個(gè)商店一起賣
生產(chǎn)者代碼和上面一樣
public class MultiProducer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//創(chuàng)建連接ConnectionFactory factory = new ConnectionFactory();//設(shè)置本地連接factory.setHost("localhost");//創(chuàng)建隊(duì)列,創(chuàng)建頻道,類似客戶端try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//隊(duì)列持久化channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);//設(shè)置消息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){//輸入消息String message = scanner.nextLine();//發(fā)送消息channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}
控制處理任務(wù)的積壓數(shù),最多同時(shí)處理任務(wù)數(shù)
channel.basicQos(1); //最多處理1個(gè)
消息確認(rèn)機(jī)制
ack 確認(rèn)、nack 消息失敗、reject 拒絕
當(dāng)消息拿走之后會(huì)有一個(gè)確認(rèn)機(jī)制,保證消息成功被消費(fèi)。當(dāng)消費(fèi)者接收消息會(huì)給一個(gè)反饋,確認(rèn)消息的狀態(tài),成功消息才會(huì)被移除。
支持配置 autoack ,建議修改為 false,根據(jù)實(shí)際情況手動(dòng)確認(rèn)。
//手動(dòng)確認(rèn)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
//手動(dòng)拒絕
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
?消費(fèi)者代碼
public class MultiConsumer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//創(chuàng)建連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");for (int i = 0; i < 2; i++) {final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();//隊(duì)列持久化,參數(shù)要一致channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//控制處理任務(wù)的積壓數(shù),最多同時(shí)處理任務(wù)數(shù)channel.basicQos(1);//定義了如何處理消息int finalI = i;DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {//處理工作的邏輯System.out.println(" [x] Received '" +"消費(fèi)者:" + finalI + " 消息:"+ message + "'");//睡一定時(shí)間,模擬機(jī)器處理能力有限Thread.sleep(20000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//接收消息,消費(fèi)消息,開(kāi)啟消息監(jiān)聽(tīng)channel.basicConsume(TASK_QUEUE_NAME, false , deliverCallback, consumerTag -> {});}}
}