網(wǎng)站建設(shè)專用圖形庫seo網(wǎng)站內(nèi)容優(yōu)化有哪些
文章目錄
- RocketMQ 介紹
- 為什么要使用 MQ ?
- RocketMQ 與其他產(chǎn)品對比
- vs Kafka
- vs RabbitMQ
- vs ActiveMQ
- RocketMQ 重要概念
- 部署 Namesrver、Broker、Dashboard
- 快速入門
- 消息生產(chǎn)者
- 消息消費者
- 消費模式
- 簡單消息
- 1)同步發(fā)送
- 2)異步發(fā)送
- 3)單向發(fā)送
- 順序消息
- 順序消息生產(chǎn)者
- 順序消息消費者
- 廣播消息
- 廣播消息消費模式
- 延遲消息
- 預(yù)定日程生產(chǎn)者
- 預(yù)定日程消費者
- 指定時間生產(chǎn)者
- 指定時間消費者
- RocketMQ 如何保證消息可靠性?
- RocketMQ 如何解決消息積壓問題?
RocketMQ 介紹
RocketMQ是一款由阿里巴巴開源的分布式消息中間件。它具有低延遲、高吞吐量、高可用性和高可靠性等特點,適用于構(gòu)建具有海量消息堆積和異步解耦功能的應(yīng)用系統(tǒng)。
為什么要使用 MQ ?
作用 | 描述 |
---|---|
異步 | 系統(tǒng)耦合度降低,沒有強(qiáng)依賴關(guān)系 |
削峰 | 不需要同步執(zhí)行的遠(yuǎn)程調(diào)用可以有效提高響應(yīng)時間 |
解耦 | 請求達(dá)到峰值后,后端或者數(shù)據(jù)庫還可以保持固定消費速率消費,不會被壓垮 |
RocketMQ 與其他產(chǎn)品對比
vs Kafka
- 數(shù)據(jù)可靠性:RocketMQ 提供多種可靠性保證,Kafka 在極端情況下可能丟失數(shù)據(jù)。
- 實時性:RocketMQ 在消息實時性上表現(xiàn)更佳。
- 隊列數(shù)與性能:RocketMQ 支持更多隊列,Kafka 在高分區(qū)下性能下降。
- 消息順序性:RocketMQ 支持嚴(yán)格順序,Kafka 可能產(chǎn)生亂序。
- 生態(tài):Kafka 生態(tài)更豐富,RocketMQ 與阿里技術(shù)棧集成好。
vs RabbitMQ
- 性能:RocketMQ 在高并發(fā)和海量消息處理上表現(xiàn)更優(yōu)。
- 消息模型:RabbitMQ 模型靈活,RocketMQ 注重順序和事務(wù)。
- 適用場景:RabbitMQ 適用于可靠消息傳遞,RocketMQ 適用于高性能場景。
vs ActiveMQ
- 跨平臺與持久化:ActiveMQ 支持多種協(xié)議和數(shù)據(jù)庫持久化,RocketMQ 持久化機(jī)制高效。
- 靈活性:ActiveMQ 協(xié)議廣泛,RocketMQ 多語言SDK集成友好。
- 社區(qū)與文檔:ActiveMQ 社區(qū)活躍度較低,RocketMQ 開發(fā)活躍但社區(qū)成熟度不及 RabbitMQ。
總結(jié):RocketMQ 在數(shù)據(jù)可靠性、實時性、隊列數(shù)與性能上具有優(yōu)勢,適合高性能和順序消息場景。
RocketMQ 重要概念
Producer:消息的發(fā)送者,生產(chǎn)者 (發(fā)件人)
Consumer:消息接收者,消費者 (取件人)
Broker:暫時和傳輸消息的通道 (快遞)
NameServer:管理Broker的;負(fù)責(zé)消息的存儲和轉(zhuǎn)發(fā),接收生產(chǎn)者產(chǎn)生的消息并持久化消息;當(dāng)用戶發(fā)送的消息被發(fā)送到Broker時,Broker會將消息轉(zhuǎn)發(fā)到與之關(guān)聯(lián)的Topic中,以便讓更多的接收者進(jìn)行處理;各個快遞公司的管理機(jī)構(gòu),相當(dāng)于Broker的注冊中心,保留了broker的信息 (監(jiān)測快遞是否健康)
Queue:隊列,消息存放的位置,一個Broker中可以有多個隊列 (驛站)
Topic:主題,消息的分類,用于標(biāo)識同一類業(yè)務(wù)邏輯的消息 (取件碼)
ConsumerGroup:消費者組,RocketMQ 中承載多個消費行為一致的消費者負(fù)載均衡分組。和消費者不同,消費者組是一個邏輯概念。
部署 Namesrver、Broker、Dashboard
Docker部署RocketMQ5.x (單機(jī)部署+配置參數(shù)詳解+不使用docker-compose直接部署)_rocketmq不推薦用docker部署-CSDN博客
快速入門
1)創(chuàng)建一個基于 Maven 的 SpringBoot 項目,并添加以下依賴
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.1.0</version>
</dependency>
消息生產(chǎn)者
- 創(chuàng)建消息生產(chǎn)者producer,并指定生產(chǎn)者組名
- 指定Nameserver地址
- 啟動producer
- 創(chuàng)建消息對象,指定主題Topic、Tag和消息體
- 發(fā)送消息
- 關(guān)閉生產(chǎn)者
public class SyncProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");producer.setNamesrvAddr("10.226.8.14:9876");producer.start();for (int i = 0; i < 2; i++) {Message msg = new Message("Simple", //主題"TagA", //設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。"Simple-Sync".getBytes(StandardCharsets.UTF_8) //消息體。);SendResult send = producer.send(msg);System.out.printf(i + ".發(fā)送消息成功:%s%n", send);}producer.shutdown();}
}
消息消費者
- 創(chuàng)建消費者comsumer、指定消費者組名
- 指定Nameserver地址
- 創(chuàng)建監(jiān)聽訂閱主題Topic和Tag等
- 處理消息
- 啟動消費者comsumer
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");pushConsumer.setNamesrvAddr("10.226.8.14:9876");pushConsumer.subscribe("Simple","*");pushConsumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n->{System.out.printf("收到消息: %s%n" , n);});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf("Consumer Started.%n");}
}
消費模式
MQ 的消費模式可以大致分為兩種,一種是推 Push,一種是拉 Pull。
Push 是服務(wù)端主動推送消息給客戶端,優(yōu)點是及時性較好,但如果客戶端沒有做好流控,旦服務(wù)端推送大量消息到客戶端時,就會導(dǎo)致客戶端消息堆積甚至崩潰。
Pull 是客戶端需要主動到服務(wù)端取數(shù)據(jù),優(yōu)點是客戶端可以依據(jù)自己的消費能力進(jìn)行消費但拉取的頻率也需要用戶自己控制,拉取頻繁容易造成服務(wù)端和客戶端的壓力,拉取間隔長又容易造成消費不及時。
Push 模式也是基于 pull 模式的,只能客戶端內(nèi)部封裝了 api,一般場景下,上游消息生產(chǎn)量小或者均速的時候,選擇 push 模式。在特殊場景下,例如電商大促,搶優(yōu)惠券等場景可以選擇 pull 模式
簡單消息
1)同步發(fā)送
可靠性要求高、數(shù)據(jù)量級少、實時響應(yīng),具體實現(xiàn)參考上面的入門代碼
2)異步發(fā)送
不等待消息返回直接進(jìn)入后續(xù)流程。
public class AsyncProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");producer.setNamesrvAddr("10.226.8.14:9876");producer.start();CountDownLatch countDownLatch = new CountDownLatch(100);//計數(shù)for (int i = 0; i < 100; i++) {Message message = new Message("Simple", "TagA", "Simple-Async".getBytes(StandardCharsets.UTF_8));final int index = i;producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%d 消息發(fā)送成功%s%n", index, sendResult);}@Overridepublic void onException(Throwable throwable) {countDownLatch.countDown();System.out.printf("%d 消息失敗%s%n", index, throwable);throwable.printStackTrace();}});}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}
}
3)單向發(fā)送
只負(fù)責(zé)發(fā)送,不管消息是否發(fā)送成功。
public class OnewayProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");producer.setNamesrvAddr("10.226.8.14:9876");producer.start();for (int i = 0; i < 10; i++) {Message message = new Message("Simple","TagA", "Simple-Oneway".getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);System.out.printf("%d 消息發(fā)送完成 %n" , i);}Thread.sleep(5000);producer.shutdown();}
}
順序消息
順序消息指生產(chǎn)者局部有序發(fā)送到一個queue,但多個queue之間是全局無序的。
- 順序消息生產(chǎn)者樣例:通過MessageQueueSelector將消息有序發(fā)送到同一個queue中。
- 順序消息消費者樣例:通過MessageListenerOrderly消費者每次讀取消息都只從一個queue中獲取(通過加鎖的方式實現(xiàn))。
順序消息生產(chǎn)者
public class OrderProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");producer.setNamesrvAddr("10.226.8.14:9876");producer.start();for (int j = 0; j < 5; j++) {for (int i = 0; i < 10; i++) {Message message = new Message("OrderTopic","TagA",("order_" + j + "_step_" + i).getBytes(StandardCharsets.UTF_8));SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {Integer id = (Integer) o;int index = id % list.size();return list.get(index);}}, j);System.out.printf("%s%n", sendResult);}}producer.shutdown();}
}
順序消息消費者
public class OrderConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");consumer.setNamesrvAddr("10.226.8.14:9876");consumer.subscribe("OrderTopic","*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {list.forEach(n->{System.out.println("QueueId:"+n.getQueueId() + "收到消息內(nèi)容 "+new String(n.getBody()));});return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}
廣播消息
廣播消息并沒有特定的消息消費者樣例,這是因為這涉及到消費者的集群消費模式。
- MessageModel.BROADCASTING:廣播消息。一條消息會發(fā)給所有訂閱了對應(yīng)主題的消費者,不管消費者是不是同一個消費者組。
- MessageModel.CLUSTERING:集群消息。每一條消息只會被同一個消費者組中的一個實例消費。
廣播消息消費模式
public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BroadCastConsumer");consumer.setNamesrvAddr("10.226.8.14:9876");consumer.subscribe("simple","*");consumer.setMessageModel(MessageModel.BROADCASTING); //廣播模式consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach(n->{System.out.println("QueueId:"+n.getQueueId() + "收到消息內(nèi)容 "+new String(n.getBody()));});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");}
}
延遲消息
延遲消息實現(xiàn)的效果就是在調(diào)用producer.send方法后,消息并不會立即發(fā)送出去,而是會等一段時間再發(fā)送出去。這是RocketMQ特有的一個功能。
- message.setDelayTimeLevel(3):預(yù)定日常定時發(fā)送。1到18分別對應(yīng)messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;可以在dashboard中broker配置查看。
- msg.setDelayTimeMs(10L):指定時間定時發(fā)送。默認(rèn)支持最大延遲時間為3天,可以根據(jù)broker配置:timerMaxDelaySec修改。
預(yù)定日程生產(chǎn)者
public class ScheduleProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ScheduleProducer");producer.setNamesrvAddr("10.226.8.14:9876");producer.start();for (int i = 0; i < 2; i++) {Message msg = new Message("Schedule", //主題"TagA", //設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。"ScheduleProducer".getBytes(StandardCharsets.UTF_8) //消息體。);//1到18分別對應(yīng)messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(3);producer.send(msg);System.out.printf(i + ".發(fā)送消息成功:%s%n", LocalTime.now());}producer.shutdown();}
}
預(yù)定日程消費者
public class ScheduleConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");pushConsumer.setNamesrvAddr("10.226.8.14:9876");pushConsumer.subscribe("Schedule","*");pushConsumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n->{System.out.printf("接收時間:%s %n", LocalTime.now());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf("Simple Consumer Started.%n");}
}
指定時間生產(chǎn)者
public class TimeProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("TimeProducer");producer.setNamesrvAddr("192.168.43.137:9876");producer.start();for (int i = 0; i < 2; i++) {Message msg = new Message("Schedule", //主題"TagA", //設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。"TimeProducer".getBytes(StandardCharsets.UTF_8) //消息體。);// 相對時間:延時消息。此消息將在 10 秒后傳遞給消費者。msg.setDelayTimeMs(10000L);// 絕對時間:定時消息。設(shè)置一個具體的時間,然后在這個時間之后多久在進(jìn)行發(fā)送消息// msg.setDeliverTimeMs(System.currentTimeMillis() + 10000L);producer.send(msg);System.out.printf(i + ".發(fā)送消息成功:%s%n", LocalTime.now());}producer.shutdown();}
}
指定時間消費者
public class TimeConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("TimeConsumer");pushConsumer.setNamesrvAddr("10.226.8.14:9876");pushConsumer.subscribe("Schedule","*");pushConsumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n->{System.out.printf("接收時間:%s %n", LocalTime.now());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf("Simple Consumer Started.%n");}
}
RocketMQ 如何保證消息可靠性?
我們將消息流程分為三大部分,每一部分都有可能會丟失數(shù)據(jù)。
- 生產(chǎn)階段:Producer 通過網(wǎng)絡(luò)將消息發(fā)送給 Broker,這個發(fā)送可能會發(fā)生丟失。比如網(wǎng)絡(luò)延遲不可達(dá)等。
- 存儲階段:Broker 肯定是先把消息放到內(nèi)存的,然后根據(jù)刷盤策略持久化到硬盤中。剛收到 Producer 的消息,放入內(nèi)存,但是異常宕機(jī)了,導(dǎo)致消息丟失。
- 消費階段:消費失敗。比如先提交ack再消費,處理過程中出現(xiàn)異常,該消息就出現(xiàn)了丟失。
解決方案:
- 生產(chǎn)階段:使用同步發(fā)送失敗重試機(jī)制;異步發(fā)送重寫回調(diào)方法檢查發(fā)送結(jié)果;ACK 確認(rèn)機(jī)制。
- 存儲階段:同步刷盤機(jī)制(默認(rèn)情況下是異步刷盤);集群模式采用同步復(fù)制。
- 消費階段:正常消費處理完成才提交ACK(手動ACK);如果處理異常返回重試標(biāo)識。
RocketMQ 如何解決消息積壓問題?
-
增加消費者數(shù)量:
增加消費者實例的數(shù)量,以提高消息的消費速度。
確保消費者實例數(shù)量與消息隊列數(shù)量匹配,以便每個隊列都有專門的消費者處理。 -
優(yōu)化消費者邏輯:
優(yōu)化消費者的處理邏輯,提高單個消費者的處理效率。
使用批量消費的方式來減少每次消費的開銷。 -
擴(kuò)展消息隊列容量:
增加消息隊列的數(shù)量,以分散消息負(fù)載。
動態(tài)調(diào)整隊列數(shù)量,增加處理能力,實現(xiàn)更高的并行處理。 -
設(shè)置消息消費失敗處理機(jī)制:
實施重試機(jī)制,確保消費失敗的消息能夠被重新處理。
設(shè)置死信隊列(DLQ)來處理多次消費失敗的消息。 -
快速失敗丟棄消息:
如果某些消息可以丟棄,考慮在高峰期快速丟棄這些消息以減輕負(fù)擔(dān)。 -
提升系統(tǒng)性能:
優(yōu)化服務(wù)器性能,增加硬件資源,如CPU、內(nèi)存和網(wǎng)絡(luò)帶寬。
確保網(wǎng)絡(luò)連接的穩(wěn)定性和速度,以減少延遲。