中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

平面設(shè)計(jì)海報(bào)圖片seo按天計(jì)費(fèi)系統(tǒng)

平面設(shè)計(jì)海報(bào)圖片,seo按天計(jì)費(fèi)系統(tǒng),濰坊網(wǎng)站做的好的公司,海滄區(qū)建設(shè)局網(wǎng)站1. 前言 上一篇文章我們介紹了簡單消息的實(shí)現(xiàn),本文將主要來介紹順序消息的實(shí)現(xiàn),順序消息分為局部順序消息和全局順序消息。 順序消息指的是消費(fèi)者在消費(fèi)消息時(shí),按照生產(chǎn)者發(fā)送消息的順序進(jìn)行消費(fèi)。即先發(fā)送的先消費(fèi)【FIFO】。 順序消息分為…

1. 前言

上一篇文章我們介紹了簡單消息的實(shí)現(xiàn),本文將主要來介紹順序消息的實(shí)現(xiàn),順序消息分為局部順序消息和全局順序消息。

順序消息指的是消費(fèi)者在消費(fèi)消息時(shí),按照生產(chǎn)者發(fā)送消息的順序進(jìn)行消費(fèi)。即先發(fā)送的先消費(fèi)【FIFO】。

順序消息分為 全局順序消息和局部順序消息。

全局順序消息就是全局使用一個(gè)queue。

局部順序消息就是 有順序依賴的消息放在同一個(gè)queue中,多個(gè)queue并行消費(fèi)。

2. 局部順序消息

默認(rèn)情況下RocketMQ會(huì)根據(jù)輪詢的方式將消息發(fā)送到某個(gè)broker中的某個(gè)隊(duì)列中,這樣的話就不能保證消息是有序的。

比如在購物網(wǎng)站下單場景下:有 1. 創(chuàng)建訂單---->2. 訂單支付---->3. 訂單發(fā)貨---->4. 訂單完成 四條消息。這四條消息邏輯上肯定是有序的。但是如果采用RocketMQ默認(rèn)的消息投遞方式,那么同一個(gè)訂單,有可能創(chuàng)建訂單被投遞到了 MessageQueue1,訂單支付的話被投遞到了MessageQueue2。 由于消息在不同的MessageQueue中,消費(fèi)者在消費(fèi)的時(shí)候就可能會(huì)出現(xiàn)訂單支付的消息先于創(chuàng)建訂單的消息。

局部順序消息就是要保證同一筆訂單4條消息都放在同一個(gè)queue中,這樣的話就不會(huì)出現(xiàn)訂單支付的消息先于創(chuàng)建訂單的消息被消費(fèi)。就像下圖所示:

局部順序消息

局部順序消息消費(fèi)者在消費(fèi)某個(gè)topic的某個(gè)隊(duì)列中的消息的時(shí)候是順序的。消費(fèi)者使用MessageListenerOrderly類來進(jìn)行消息監(jiān)聽。

2.1. 定義生產(chǎn)者

  1. 這里定義了名為part_order_topic_test的topic。運(yùn)行程序之后該topic可以路由到broker-a 以及broker-b 兩個(gè)broker。

    image-20231003154231683

public class OrderProducer {// 局部順序消費(fèi),核心就是自己選擇Queue,保證需要順序保障的消息落到同一個(gè)隊(duì)列中public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer defaultMQProducer = new DefaultMQProducer("order_producer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();for (int i = 0; i < 10; i++) {int orderId = i;for (int j = 0; j < 5; j++) {// 構(gòu)建消息體,tags和key 只是做一個(gè)簡單區(qū)分Message partOrderMsg = new Message("part_order_topic_test", "order_" + orderId, "KEY_" + orderId, ("局部順序消息處理_" + orderId + ";step_" + j).getBytes());SendResult send = defaultMQProducer.send(partOrderMsg, new MessageQueueSelector() {@Override//這里的arg參數(shù)就是外面的orderIdpublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer orderId = (Integer) arg;int index = orderId % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", send);}}defaultMQProducer.shutdown();}
}
  1. 在發(fā)送消息的時(shí)候?qū)崿F(xiàn)MessageQueueSelector接口用于在發(fā)送消息的時(shí)候指定隊(duì)列。其中, public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 方法有三個(gè)參數(shù):其中,mqs表示當(dāng)前topic所路由的全部隊(duì)列數(shù),這里就是8個(gè)隊(duì)列,broker-a有4個(gè)隊(duì)列,broker-b有4個(gè)隊(duì)列。msg就是傳入的消息體,arg 就是傳入的orderId。

  2. 這里根據(jù)orderId與隊(duì)列數(shù)求模取余來獲取消息應(yīng)該發(fā)送到哪個(gè)隊(duì)列中,這樣就保證了相同的orderId的消息會(huì)落到同一個(gè)隊(duì)列中

    Integer orderId = (Integer) arg;
    int index = orderId % mqs.size();
    return mqs.get(index);
    
生產(chǎn)者運(yùn)行結(jié)果(部分截圖)

image-20231003160039915

從運(yùn)行結(jié)果可以看出相同orderId的消息被投遞到了同一個(gè)MessageQueue中,而相同MessageQueue隊(duì)列天然是有順序的。

2.2.定義消費(fèi)者

說完了生產(chǎn)者,接著來說說消費(fèi)者。消費(fèi)者的邏輯主要是在消費(fèi)的時(shí)候需要實(shí)現(xiàn) MessageListenerOrderly 類來進(jìn)行消息監(jiān)聽。核心代碼是:

	// 2.訂閱消費(fèi)消息defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println("消費(fèi)得到的消息是={}" + msg);System.out.println("消息體內(nèi)容是={}" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});

這里啟動(dòng)了三個(gè)消費(fèi)者,不管消費(fèi)者消費(fèi)的順序如何,相同的orderId下的5條消息都是被順序消費(fèi)的。

image-20231010125014684

image-20231010125043838

image-20231010125110337

3. 碰到的問題

在首次調(diào)試的時(shí)候出現(xiàn)了一個(gè) broker is full 的錯(cuò)誤。這是由于磁盤空間不足導(dǎo)致的,可以通過 df -h 命令查看當(dāng)前磁盤空間的占用情況,當(dāng)磁盤空間使用率超過90%的話則會(huì)報(bào)此錯(cuò)。

image-20231003131237358

4. 全局順序消息

全局順序消息是指消費(fèi)者消費(fèi)全部消息都是順序的,只能讓所有的消息都發(fā)送到同一個(gè)MessageQueue中來實(shí)現(xiàn),在高并發(fā)場景下會(huì)非常影響效率。

5. 廣播消息

廣播消息是向主題(topic)的所有訂閱者發(fā)送消息,訂閱同一個(gè)topic的多個(gè)消費(fèi)者,都能全量收到生產(chǎn)者發(fā)送的所有消息。

廣播消息的生產(chǎn)者與普通同步消息的生產(chǎn)者實(shí)現(xiàn)是一致的,不同的是消費(fèi)者的消息模式不同。這里給出消費(fèi)者實(shí)現(xiàn)的不同之處。

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("broadCastGroup");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");// 設(shè)置消費(fèi)者的模式是廣播模式defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);//從第一位開始消費(fèi)defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

6. 延遲消息

延遲消息與普通消息的不同之處在于,它們要在指定的時(shí)間之后才會(huì)被傳遞。生產(chǎn)者并不會(huì)延遲發(fā)送消息,而是發(fā)送到topic里面,消費(fèi)者延遲指定的時(shí)間進(jìn)行消費(fèi)。

6.1. 延遲消息生產(chǎn)者

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("scheduled_group");defaultMQProducer.setNamesrvAddr("172.31.186.180:9876");defaultMQProducer.start();for (int i = 0; i < 100; i++) {Message message = new Message("Schedule_topic", ("延遲消息測試" + i).getBytes());//設(shè)置延遲級別,默認(rèn)有18個(gè)延遲級別,這個(gè)消息將延遲10秒消費(fèi)message.setDelayTimeLevel(3);defaultMQProducer.send(message);}System.out.println("所有延遲消息發(fā)送完成");defaultMQProducer.shutdown();

延遲消息生產(chǎn)者與普通消息生產(chǎn)者主要的區(qū)別是延遲消息需要調(diào)用 setDelayTimeLevel 方法設(shè)置延遲級別,這里設(shè)置級別是3,則是延遲10秒。RocketMQ提供了18種延遲級別??梢栽?RocketMQ的儀表板中的集群中的broker配置中找到。

image-20231003200021490

延遲消息的消費(fèi)者與普通消息的消費(fèi)者相同的。RocketMQ內(nèi)部通過名為SCHEDULE_TOPIC_XXXX 的topic來存放延遲消息。

image-20231003201410410

7.批量消息

批量發(fā)送消息提高了傳遞消息的性能。官方建議批量消息的總大小不應(yīng)超過1M,實(shí)際不應(yīng)超過4M。如果超過4M的批量消息需要進(jìn)行分批處理。同時(shí)設(shè)置broker的配置參數(shù)為4M(在broker的配置文件中修改:maxMessageSize=4194304)。核心代碼如下:

	//4.創(chuàng)建消息List<Message> messageList = new ArrayList<>();for (int i = 0; i < 100*100; i++) {// 創(chuàng)建消息,指定topic,以及消息體messageList.add(new Message("batch_topic", ("飛哥測試批量消息" + i).getBytes()));}//批量消息消息小于4M的處理SendResult send = defaultMQProducer.send(messageList);System.out.println(send);

8.過濾消息

使用tag過濾

在大多數(shù)情況下,標(biāo)簽是一種簡單而有用的設(shè)計(jì),可以用來選擇你想要的消息。

首先是根據(jù)tag來過濾消息,生產(chǎn)者在發(fā)送消息的時(shí)候指定該消息的tag標(biāo)簽,消費(fèi)者則可以根據(jù)tag來過濾消息。

8.1. 過濾消息生產(chǎn)者

這里定義了三個(gè)tag,分別是tagA,tagB以及tagC,生產(chǎn)者在生產(chǎn)消息的時(shí)候給每個(gè)消息指定不同的tag。

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("TagFilterTest", tags[i % tags.length], ("飛哥tag消息過濾" + tags[i % tags.length]).getBytes());SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();

8.2. 過濾消息的消費(fèi)者

消費(fèi)者過濾出了標(biāo)簽帶有tagA以及tagC的消息進(jìn)行消費(fèi)。這里其實(shí)是broker將consumer需要的消息推給消費(fèi)者。

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("TagFilterTest", "tagA||tagC");defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息體=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消費(fèi)者已經(jīng)啟動(dòng)");

image-20231003212919155

使用SQL過濾

SQL 功能可以通過發(fā)送消息時(shí)輸入的屬性進(jìn)行一些計(jì)算,在RocketMQ定義的語法下,可以實(shí)現(xiàn)一些有趣的邏輯。

語法

RocketMQ只定義了一些基本的語法類支持這個(gè)特性。

1. 數(shù)值比較:如 `>`,`>=`,`<=`,`BETWEEN`,`=`;
2. 字符比較:如 `=`,'<>',`IN`;
3. `IS NULL` 或 `IS NOT NULL` ;
4. 邏輯`AND`,`OR`,`NOT`;

常量類型有:

1. 數(shù)字,如 123,
2. 字符,如 'abc',必須用單引號;
3. `NULL`,特殊常數(shù);
4. 布爾值,`TRUE` 或 `FALSE`;

SQL過濾生產(chǎn)者

生產(chǎn)者主要設(shè)置屬性過濾 message.putUserProperty("a", String.valueOf(i)); 表示第一條消息鍵值對是 a=0,第二條消息鍵值對是a=1。

	DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");defaultMQProducer.start();String[] tags = new String[]{"tagA", "tagB", "tagC"};for (int i = 0; i < 15; i++) {Message message = new Message("SQLFilterTest", tags[i % tags.length], ("飛哥sql消息過濾" + tags[i % tags.length]).getBytes());message.putUserProperty("a", String.valueOf(i));SendResult send = defaultMQProducer.send(message);System.out.printf("%s%n", send);}defaultMQProducer.shutdown();

SQL過濾消費(fèi)者:

	DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");defaultMQPushConsumer.subscribe("SQLFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA','tagC'))"+" and (a is null and a between 0 and 3)"));defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("接收到的消息=" + msg);System.out.println("接收到的消息體=" + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});defaultMQPushConsumer.start();System.out.println("消費(fèi)者已經(jīng)啟動(dòng)");

如果運(yùn)行報(bào) The broker does not support consumer to filter message by SQL92

image-20231003221207618

則需要修改 broker.conf 文件,增加如下配置:

# 開啟對 propertyfilter的支持
enablePropertyFilter = true 
filterSupportRetry = true

然后重啟broker。

總結(jié)

本文介紹了局部順序消息,全局順序消息,廣播消息,延遲消息,以及如何批量發(fā)送消息和過濾消息。

http://www.risenshineclean.com/news/8809.html

相關(guān)文章:

  • 巨鹿網(wǎng)站建設(shè)網(wǎng)絡(luò)公司關(guān)鍵詞查詢工具
  • 安陽做網(wǎng)站的公司流量寶
  • 網(wǎng)站類型分類有哪些電商網(wǎng)站建設(shè) 網(wǎng)站定制開發(fā)
  • 長春網(wǎng)絡(luò)營銷外包泰州seo推廣
  • 網(wǎng)站節(jié)日制作快速網(wǎng)站輕松排名
  • 社交網(wǎng)站開發(fā)語言俄羅斯引擎搜索
  • 網(wǎng)站交換鏈接怎么做百度競價(jià)個(gè)人開戶
  • 網(wǎng)站代理什么意思百度seo排名優(yōu)化助手
  • 在合肥做網(wǎng)站多少錢千川推廣官網(wǎng)
  • 做網(wǎng)站需要云數(shù)據(jù)庫嗎2345網(wǎng)址導(dǎo)航手機(jī)版
  • 上海網(wǎng)站建設(shè)與設(shè)計(jì)公司好百度手機(jī)網(wǎng)頁版入口
  • 小說網(wǎng)站架構(gòu)win10優(yōu)化工具下載
  • 深圳福田做網(wǎng)站百度推廣開戶需要多少錢
  • 網(wǎng)站開發(fā)網(wǎng)站開發(fā)拼多多怎么查商品排名
  • 成品網(wǎng)站 免費(fèi)網(wǎng)絡(luò)推廣的基本方法有哪些
  • 通江網(wǎng)站建設(shè)國際最新新聞熱點(diǎn)事件
  • 教育行業(yè)網(wǎng)站建設(shè)價(jià)格培訓(xùn)方案
  • 打造公司的網(wǎng)站互聯(lián)網(wǎng)推廣是干什么的
  • 北京做網(wǎng)站好的關(guān)鍵詞熱度分析
  • 網(wǎng)絡(luò)公司給別人做網(wǎng)站的cms是買的授權(quán)么自媒體怎么入門
  • 做網(wǎng)站要用到數(shù)據(jù)庫嗎東莞網(wǎng)站定制開發(fā)
  • 制作表格的軟件app優(yōu)化大師怎么提交作業(yè)
  • 儀征網(wǎng)站建設(shè)河北網(wǎng)站建設(shè)公司排名
  • 虛擬主機(jī)如何建設(shè)多個(gè)網(wǎng)站企業(yè)網(wǎng)站建設(shè)流程
  • 重慶做網(wǎng)站建設(shè)深圳市seo上詞多少錢
  • js實(shí)現(xiàn)網(wǎng)站浮動(dòng)窗口產(chǎn)品關(guān)鍵詞
  • 烏魯木齊app制作seo引擎優(yōu)化軟件
  • 公司做網(wǎng)站怎么做網(wǎng)頁制作app
  • 網(wǎng)站做多個(gè)產(chǎn)品google官網(wǎng)瀏覽器
  • 推廣哪些app最掙錢天津seo選天津旗艦科技a