平面設(shè)計(jì)海報(bào)圖片seo按天計(jì)費(fèi)系統(tǒng)
1. 前言
上一篇文章我們介紹了簡單消息的實(shí)現(xiàn),本文將主要來介紹順序消息的實(shí)現(xiàn),順序消息分為局部順序消息和全局順序消息。
順序消息指的是消費(fèi)者在消費(fèi)消息時(shí),按照生產(chǎn)者發(fā)送消息的順序進(jìn)行消費(fèi)。即先發(fā)送的先消費(fèi)【FIFO】。
順序消息分為 全局順序消息和局部順序消息。
全局順序消息就是全局使用一個(gè)queue。
局部順序消息就是 有順序依賴的消息放在同一個(gè)queue中,多個(gè)queue并行消費(fèi)。
2. 局部順序消息
默認(rèn)情況下RocketMQ會(huì)根據(jù)輪詢的方式將消息發(fā)送到某個(gè)broker中的某個(gè)隊(duì)列中,這樣的話就不能保證消息是有序的。
比如在購物網(wǎng)站下單場景下:有 1. 創(chuàng)建訂單---->2. 訂單支付---->3. 訂單發(fā)貨---->4. 訂單完成 四條消息。這四條消息邏輯上肯定是有序的。但是如果采用RocketMQ默認(rèn)的消息投遞方式,那么同一個(gè)訂單,有可能創(chuàng)建訂單被投遞到了 MessageQueue1,訂單支付的話被投遞到了MessageQueue2。 由于消息在不同的MessageQueue中,消費(fèi)者在消費(fèi)的時(shí)候就可能會(huì)出現(xiàn)訂單支付的消息先于創(chuàng)建訂單的消息。
局部順序消息就是要保證同一筆訂單4條消息都放在同一個(gè)queue中,這樣的話就不會(huì)出現(xiàn)訂單支付的消息先于創(chuàng)建訂單的消息被消費(fèi)。就像下圖所示:
局部順序消息消費(fèi)者在消費(fèi)某個(gè)topic的某個(gè)隊(duì)列中的消息的時(shí)候是順序的。消費(fèi)者使用MessageListenerOrderly類來進(jìn)行消息監(jiān)聽。
2.1. 定義生產(chǎn)者
-
這里定義了名為part_order_topic_test的topic。運(yùn)行程序之后該topic可以路由到broker-a 以及broker-b 兩個(gè)broker。
public class OrderProducer {// 局部順序消費(fèi),核心就是自己選擇Queue,保證需要順序保障的消息落到同一個(gè)隊(duì)列中public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer defaultMQProducer = new DefaultMQProducer("order_producer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();for (int i = 0; i < 10; i++) {int orderId = i;for (int j = 0; j < 5; j++) {// 構(gòu)建消息體,tags和key 只是做一個(gè)簡單區(qū)分Message partOrderMsg = new Message("part_order_topic_test", "order_" + orderId, "KEY_" + orderId, ("局部順序消息處理_" + orderId + ";step_" + j).getBytes());SendResult send = defaultMQProducer.send(partOrderMsg, new MessageQueueSelector() {@Override//這里的arg參數(shù)就是外面的orderIdpublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer orderId = (Integer) arg;int index = orderId % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", send);}}defaultMQProducer.shutdown();}
}
-
在發(fā)送消息的時(shí)候?qū)崿F(xiàn)MessageQueueSelector接口用于在發(fā)送消息的時(shí)候指定隊(duì)列。其中,
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
方法有三個(gè)參數(shù):其中,mqs表示當(dāng)前topic所路由的全部隊(duì)列數(shù),這里就是8個(gè)隊(duì)列,broker-a有4個(gè)隊(duì)列,broker-b有4個(gè)隊(duì)列。msg就是傳入的消息體,arg 就是傳入的orderId。 -
這里根據(jù)orderId與隊(duì)列數(shù)求模取余來獲取消息應(yīng)該發(fā)送到哪個(gè)隊(duì)列中,這樣就保證了相同的orderId的消息會(huì)落到同一個(gè)隊(duì)列中
Integer orderId = (Integer) arg; int index = orderId % mqs.size(); return mqs.get(index);
生產(chǎn)者運(yùn)行結(jié)果(部分截圖)
從運(yùn)行結(jié)果可以看出相同orderId的消息被投遞到了同一個(gè)MessageQueue中,而相同MessageQueue隊(duì)列天然是有順序的。
2.2.定義消費(fèi)者
說完了生產(chǎn)者,接著來說說消費(fèi)者。消費(fèi)者的邏輯主要是在消費(fèi)的時(shí)候需要實(shí)現(xiàn) MessageListenerOrderly 類來進(jìn)行消息監(jiān)聽。核心代碼是:
// 2.訂閱消費(fèi)消息defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println("消費(fèi)得到的消息是={}" + msg);System.out.println("消息體內(nèi)容是={}" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});
這里啟動(dòng)了三個(gè)消費(fèi)者,不管消費(fèi)者消費(fèi)的順序如何,相同的orderId下的5條消息都是被順序消費(fèi)的。
3. 碰到的問題
在首次調(diào)試的時(shí)候出現(xiàn)了一個(gè) broker is full 的錯(cuò)誤。這是由于磁盤空間不足導(dǎo)致的,可以通過 df -h
命令查看當(dāng)前磁盤空間的占用情況,當(dāng)磁盤空間使用率超過90%的話則會(huì)報(bào)此錯(cuò)。
4. 全局順序消息
全局順序消息是指消費(fèi)者消費(fèi)全部消息都是順序的,只能讓所有的消息都發(fā)送到同一個(gè)MessageQueue中來實(shí)現(xiàn),在高并發(fā)場景下會(huì)非常影響效率。
5. 廣播消息
廣播消息是向主題(topic)的所有訂閱者發(fā)送消息,訂閱同一個(gè)topic的多個(gè)消費(fèi)者,都能全量收到生產(chǎn)者發(fā)送的所有消息。
廣播消息的生產(chǎn)者與普通同步消息的生產(chǎn)者實(shí)現(xiàn)是一致的,不同的是消費(fèi)者的消息模式不同。這里給出消費(fèi)者實(shí)現(xiàn)的不同之處。
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("broadCastGroup");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");// 設(shè)置消費(fèi)者的模式是廣播模式defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);//從第一位開始消費(fèi)defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
6. 延遲消息
延遲消息與普通消息的不同之處在于,它們要在指定的時(shí)間之后才會(huì)被傳遞。生產(chǎn)者并不會(huì)延遲發(fā)送消息,而是發(fā)送到topic里面,消費(fèi)者延遲指定的時(shí)間進(jìn)行消費(fèi)。
6.1. 延遲消息生產(chǎn)者
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("scheduled_group");defaultMQProducer.setNamesrvAddr("172.31.186.180:9876");defaultMQProducer.start();for (int i = 0; i < 100; i++) {Message message = new Message("Schedule_topic", ("延遲消息測試" + i).getBytes());//設(shè)置延遲級別,默認(rèn)有18個(gè)延遲級別,這個(gè)消息將延遲10秒消費(fèi)message.setDelayTimeLevel(3);defaultMQProducer.send(message);}System.out.println("所有延遲消息發(fā)送完成");defaultMQProducer.shutdown();
延遲消息生產(chǎn)者與普通消息生產(chǎn)者主要的區(qū)別是延遲消息需要調(diào)用 setDelayTimeLevel
方法設(shè)置延遲級別,這里設(shè)置級別是3,則是延遲10秒。RocketMQ提供了18種延遲級別??梢栽?RocketMQ的儀表板中的集群中的broker配置中找到。
延遲消息的消費(fèi)者與普通消息的消費(fèi)者相同的。RocketMQ內(nèi)部通過名為SCHEDULE_TOPIC_XXXX 的topic來存放延遲消息。
7.批量消息
批量發(fā)送消息提高了傳遞消息的性能。官方建議批量消息的總大小不應(yīng)超過1M,實(shí)際不應(yīng)超過4M。如果超過4M的批量消息需要進(jìn)行分批處理。同時(shí)設(shè)置broker的配置參數(shù)為4M(在broker的配置文件中修改:maxMessageSize=4194304)。核心代碼如下:
//4.創(chuàng)建消息List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100*100; i++) {// 創(chuàng)建消息,指定topic,以及消息體messageList.add(new Message("batch_topic", ("飛哥測試批量消息" + i).getBytes()));}//批量消息消息小于4M的處理SendResult send = defaultMQProducer.send(messageList);System.out.println(send);
8.過濾消息
使用tag過濾
在大多數(shù)情況下,標(biāo)簽是一種簡單而有用的設(shè)計(jì),可以用來選擇你想要的消息。
首先是根據(jù)tag來過濾消息,生產(chǎn)者在發(fā)送消息的時(shí)候指定該消息的tag標(biāo)簽,消費(fèi)者則可以根據(jù)tag來過濾消息。
8.1. 過濾消息生產(chǎn)者
這里定義了三個(gè)tag,分別是tagA,tagB以及tagC,生產(chǎn)者在生產(chǎn)消息的時(shí)候給每個(gè)消息指定不同的tag。
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("TagFilterTest", tags[i % tags.length], ("飛哥tag消息過濾" + tags[i % tags.length]).getBytes());SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();
8.2. 過濾消息的消費(fèi)者
消費(fèi)者過濾出了標(biāo)簽帶有tagA以及tagC的消息進(jìn)行消費(fèi)。這里其實(shí)是broker將consumer需要的消息推給消費(fèi)者。
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("TagFilterTest", "tagA||tagC");defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息體=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消費(fèi)者已經(jīng)啟動(dòng)");
使用SQL過濾
SQL 功能可以通過發(fā)送消息時(shí)輸入的屬性進(jìn)行一些計(jì)算,在RocketMQ定義的語法下,可以實(shí)現(xiàn)一些有趣的邏輯。
語法
RocketMQ只定義了一些基本的語法類支持這個(gè)特性。
1. 數(shù)值比較:如 `>`,`>=`,`<=`,`BETWEEN`,`=`;
2. 字符比較:如 `=`,'<>',`IN`;
3. `IS NULL` 或 `IS NOT NULL` ;
4. 邏輯`AND`,`OR`,`NOT`;
常量類型有:
1. 數(shù)字,如 123,
2. 字符,如 'abc',必須用單引號;
3. `NULL`,特殊常數(shù);
4. 布爾值,`TRUE` 或 `FALSE`;
SQL過濾生產(chǎn)者
生產(chǎn)者主要設(shè)置屬性過濾 message.putUserProperty("a", String.valueOf(i));
表示第一條消息鍵值對是 a=0,第二條消息鍵值對是a=1。
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("SQLFilterTest", tags[i % tags.length], ("飛哥sql消息過濾" + tags[i % tags.length]).getBytes());message.putUserProperty("a", String.valueOf(i));SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();
SQL過濾消費(fèi)者:
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("SQLFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA','tagC'))"+" and (a is null and a between 0 and 3)"));defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息體=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消費(fèi)者已經(jīng)啟動(dòng)");
如果運(yùn)行報(bào) The broker does not support consumer to filter message by SQL92
則需要修改 broker.conf 文件,增加如下配置:
# 開啟對 propertyfilter的支持
enablePropertyFilter = true
filterSupportRetry = true
然后重啟broker。
總結(jié)
本文介紹了局部順序消息,全局順序消息,廣播消息,延遲消息,以及如何批量發(fā)送消息和過濾消息。