中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

網(wǎng)站建設(shè)專用圖形庫seo網(wǎng)站內(nèi)容優(yōu)化有哪些

網(wǎng)站建設(shè)專用圖形庫,seo網(wǎng)站內(nèi)容優(yōu)化有哪些,兼職做網(wǎng)站 深圳,愛漫畫-只做精品的韓漫網(wǎng)站文章目錄 RocketMQ 介紹為什么要使用 MQ ?RocketMQ 與其他產(chǎn)品對比vs Kafkavs RabbitMQvs ActiveMQ RocketMQ 重要概念部署 Namesrver、Broker、Dashboard快速入門消息生產(chǎn)者消息消費者 消費模式簡單消息1)同步發(fā)送2)異步發(fā)送3)單…

文章目錄

    • RocketMQ 介紹
    • 為什么要使用 MQ ?
    • RocketMQ 與其他產(chǎn)品對比
      • vs Kafka
      • vs RabbitMQ
      • vs ActiveMQ
    • RocketMQ 重要概念
    • 部署 Namesrver、Broker、Dashboard
    • 快速入門
      • 消息生產(chǎn)者
      • 消息消費者
    • 消費模式
    • 簡單消息
      • 1)同步發(fā)送
      • 2)異步發(fā)送
      • 3)單向發(fā)送
    • 順序消息
      • 順序消息生產(chǎn)者
      • 順序消息消費者
    • 廣播消息
      • 廣播消息消費模式
    • 延遲消息
      • 預(yù)定日程生產(chǎn)者
      • 預(yù)定日程消費者
      • 指定時間生產(chǎn)者
      • 指定時間消費者
    • RocketMQ 如何保證消息可靠性?
    • RocketMQ 如何解決消息積壓問題?

RocketMQ 介紹

RocketMQ是一款由阿里巴巴開源的分布式消息中間件。它具有低延遲、高吞吐量、高可用性和高可靠性等特點,適用于構(gòu)建具有海量消息堆積和異步解耦功能的應(yīng)用系統(tǒng)。

為什么要使用 MQ ?

作用描述
異步系統(tǒng)耦合度降低,沒有強(qiáng)依賴關(guān)系
削峰不需要同步執(zhí)行的遠(yuǎn)程調(diào)用可以有效提高響應(yīng)時間
解耦請求達(dá)到峰值后,后端或者數(shù)據(jù)庫還可以保持固定消費速率消費,不會被壓垮

RocketMQ 與其他產(chǎn)品對比

vs Kafka

  • 數(shù)據(jù)可靠性:RocketMQ 提供多種可靠性保證,Kafka 在極端情況下可能丟失數(shù)據(jù)。
  • 實時性:RocketMQ 在消息實時性上表現(xiàn)更佳。
  • 隊列數(shù)與性能:RocketMQ 支持更多隊列,Kafka 在高分區(qū)下性能下降。
  • 消息順序性:RocketMQ 支持嚴(yán)格順序,Kafka 可能產(chǎn)生亂序。
  • 生態(tài):Kafka 生態(tài)更豐富,RocketMQ 與阿里技術(shù)棧集成好。

vs RabbitMQ

  • 性能:RocketMQ 在高并發(fā)和海量消息處理上表現(xiàn)更優(yōu)。
  • 消息模型:RabbitMQ 模型靈活,RocketMQ 注重順序和事務(wù)。
  • 適用場景:RabbitMQ 適用于可靠消息傳遞,RocketMQ 適用于高性能場景。

vs ActiveMQ

  • 跨平臺與持久化:ActiveMQ 支持多種協(xié)議和數(shù)據(jù)庫持久化,RocketMQ 持久化機(jī)制高效。
  • 靈活性:ActiveMQ 協(xié)議廣泛,RocketMQ 多語言SDK集成友好。
  • 社區(qū)與文檔:ActiveMQ 社區(qū)活躍度較低,RocketMQ 開發(fā)活躍但社區(qū)成熟度不及 RabbitMQ。

總結(jié):RocketMQ 在數(shù)據(jù)可靠性、實時性、隊列數(shù)與性能上具有優(yōu)勢,適合高性能和順序消息場景。

RocketMQ 重要概念

Producer:消息的發(fā)送者,生產(chǎn)者 (發(fā)件人)

Consumer:消息接收者,消費者 (取件人)

Broker:暫時和傳輸消息的通道 (快遞)

NameServer:管理Broker的;負(fù)責(zé)消息的存儲和轉(zhuǎn)發(fā),接收生產(chǎn)者產(chǎn)生的消息并持久化消息;當(dāng)用戶發(fā)送的消息被發(fā)送到Broker時,Broker會將消息轉(zhuǎn)發(fā)到與之關(guān)聯(lián)的Topic中,以便讓更多的接收者進(jìn)行處理;各個快遞公司的管理機(jī)構(gòu),相當(dāng)于Broker的注冊中心,保留了broker的信息 (監(jiān)測快遞是否健康)

Queue:隊列,消息存放的位置,一個Broker中可以有多個隊列 (驛站)

Topic:主題,消息的分類,用于標(biāo)識同一類業(yè)務(wù)邏輯的消息 (取件碼)

ConsumerGroup:消費者組,RocketMQ 中承載多個消費行為一致的消費者負(fù)載均衡分組。和消費者不同,消費者組是一個邏輯概念。

部署 Namesrver、Broker、Dashboard

Docker部署RocketMQ5.x (單機(jī)部署+配置參數(shù)詳解+不使用docker-compose直接部署)_rocketmq不推薦用docker部署-CSDN博客

快速入門

1)創(chuàng)建一個基于 Maven 的 SpringBoot 項目,并添加以下依賴

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.1.0</version>
</dependency>

消息生產(chǎn)者

  1. 創(chuàng)建消息生產(chǎn)者producer,并指定生產(chǎn)者組名
  2. 指定Nameserver地址
  3. 啟動producer
  4. 創(chuàng)建消息對象,指定主題Topic、Tag和消息體
  5. 發(fā)送消息
  6. 關(guān)閉生產(chǎn)者
public class SyncProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");producer.setNamesrvAddr("10.226.8.14:9876");producer.start();for (int i = 0; i < 2; i++) {Message msg = new Message("Simple", //主題"TagA",  //設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。"Simple-Sync".getBytes(StandardCharsets.UTF_8) //消息體。);SendResult send = producer.send(msg);System.out.printf(i + ".發(fā)送消息成功:%s%n", send);}producer.shutdown();}
}

消息消費者

  1. 創(chuàng)建消費者comsumer、指定消費者組名
  2. 指定Nameserver地址
  3. 創(chuàng)建監(jiān)聽訂閱主題Topic和Tag等
  4. 處理消息
  5. 啟動消費者comsumer
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");pushConsumer.setNamesrvAddr("10.226.8.14:9876");pushConsumer.subscribe("Simple","*");pushConsumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n->{System.out.printf("收到消息: %s%n" , n);});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf("Consumer Started.%n");}
}

消費模式

MQ 的消費模式可以大致分為兩種,一種是推 Push,一種是拉 Pull。

Push 是服務(wù)端主動推送消息給客戶端,優(yōu)點是及時性較好,但如果客戶端沒有做好流控,旦服務(wù)端推送大量消息到客戶端時,就會導(dǎo)致客戶端消息堆積甚至崩潰。

Pull 是客戶端需要主動到服務(wù)端取數(shù)據(jù),優(yōu)點是客戶端可以依據(jù)自己的消費能力進(jìn)行消費但拉取的頻率也需要用戶自己控制,拉取頻繁容易造成服務(wù)端和客戶端的壓力,拉取間隔長又容易造成消費不及時。

Push 模式也是基于 pull 模式的,只能客戶端內(nèi)部封裝了 api,一般場景下,上游消息生產(chǎn)量小或者均速的時候,選擇 push 模式。在特殊場景下,例如電商大促,搶優(yōu)惠券等場景可以選擇 pull 模式

簡單消息

1)同步發(fā)送

可靠性要求高、數(shù)據(jù)量級少、實時響應(yīng),具體實現(xiàn)參考上面的入門代碼

2)異步發(fā)送

不等待消息返回直接進(jìn)入后續(xù)流程。

public class AsyncProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");producer.setNamesrvAddr("10.226.8.14:9876");producer.start();CountDownLatch countDownLatch = new CountDownLatch(100);//計數(shù)for (int i = 0; i < 100; i++) {Message message = new Message("Simple", "TagA", "Simple-Async".getBytes(StandardCharsets.UTF_8));final int index = i;producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%d 消息發(fā)送成功%s%n", index, sendResult);}@Overridepublic void onException(Throwable throwable) {countDownLatch.countDown();System.out.printf("%d 消息失敗%s%n", index, throwable);throwable.printStackTrace();}});}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}
}

3)單向發(fā)送

只負(fù)責(zé)發(fā)送,不管消息是否發(fā)送成功。

public class OnewayProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");producer.setNamesrvAddr("10.226.8.14:9876");producer.start();for (int i = 0; i < 10; i++) {Message message = new Message("Simple","TagA", "Simple-Oneway".getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);System.out.printf("%d 消息發(fā)送完成 %n" , i);}Thread.sleep(5000);producer.shutdown();}
}

順序消息

順序消息指生產(chǎn)者局部有序發(fā)送到一個queue,但多個queue之間是全局無序的。

  • 順序消息生產(chǎn)者樣例:通過MessageQueueSelector將消息有序發(fā)送到同一個queue中。
  • 順序消息消費者樣例:通過MessageListenerOrderly消費者每次讀取消息都只從一個queue中獲取(通過加鎖的方式實現(xiàn))。

順序消息生產(chǎn)者

public class OrderProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");producer.setNamesrvAddr("10.226.8.14:9876");producer.start();for (int j = 0; j < 5; j++) {for (int i = 0; i < 10; i++) {Message message = new Message("OrderTopic","TagA",("order_" + j + "_step_" + i).getBytes(StandardCharsets.UTF_8));SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {Integer id = (Integer) o;int index = id % list.size();return list.get(index);}}, j);System.out.printf("%s%n", sendResult);}}producer.shutdown();}
}

順序消息消費者

public class OrderConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");consumer.setNamesrvAddr("10.226.8.14:9876");consumer.subscribe("OrderTopic","*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {list.forEach(n->{System.out.println("QueueId:"+n.getQueueId() + "收到消息內(nèi)容 "+new String(n.getBody()));});return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

廣播消息

廣播消息并沒有特定的消息消費者樣例,這是因為這涉及到消費者的集群消費模式。

  • MessageModel.BROADCASTING:廣播消息。一條消息會發(fā)給所有訂閱了對應(yīng)主題的消費者,不管消費者是不是同一個消費者組。
  • MessageModel.CLUSTERING:集群消息。每一條消息只會被同一個消費者組中的一個實例消費。

廣播消息消費模式

public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BroadCastConsumer");consumer.setNamesrvAddr("10.226.8.14:9876");consumer.subscribe("simple","*");consumer.setMessageModel(MessageModel.BROADCASTING); //廣播模式consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach(n->{System.out.println("QueueId:"+n.getQueueId() + "收到消息內(nèi)容 "+new String(n.getBody()));});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");}
}

延遲消息

延遲消息實現(xiàn)的效果就是在調(diào)用producer.send方法后,消息并不會立即發(fā)送出去,而是會等一段時間再發(fā)送出去。這是RocketMQ特有的一個功能。

  • message.setDelayTimeLevel(3):預(yù)定日常定時發(fā)送。1到18分別對應(yīng)messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;可以在dashboard中broker配置查看。
  • msg.setDelayTimeMs(10L):指定時間定時發(fā)送。默認(rèn)支持最大延遲時間為3天,可以根據(jù)broker配置:timerMaxDelaySec修改。

預(yù)定日程生產(chǎn)者

public class ScheduleProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ScheduleProducer");producer.setNamesrvAddr("10.226.8.14:9876");producer.start();for (int i = 0; i < 2; i++) {Message msg = new Message("Schedule", //主題"TagA",  //設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。"ScheduleProducer".getBytes(StandardCharsets.UTF_8) //消息體。);//1到18分別對應(yīng)messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(3);producer.send(msg);System.out.printf(i + ".發(fā)送消息成功:%s%n", LocalTime.now());}producer.shutdown();}
}

預(yù)定日程消費者

public class ScheduleConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");pushConsumer.setNamesrvAddr("10.226.8.14:9876");pushConsumer.subscribe("Schedule","*");pushConsumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n->{System.out.printf("接收時間:%s %n", LocalTime.now());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf("Simple Consumer Started.%n");}
}

指定時間生產(chǎn)者

public class TimeProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("TimeProducer");producer.setNamesrvAddr("192.168.43.137:9876");producer.start();for (int i = 0; i < 2; i++) {Message msg = new Message("Schedule", //主題"TagA",  //設(shè)置消息Tag,用于消費端根據(jù)指定Tag過濾消息。"TimeProducer".getBytes(StandardCharsets.UTF_8) //消息體。);// 相對時間:延時消息。此消息將在 10 秒后傳遞給消費者。msg.setDelayTimeMs(10000L);// 絕對時間:定時消息。設(shè)置一個具體的時間,然后在這個時間之后多久在進(jìn)行發(fā)送消息// msg.setDeliverTimeMs(System.currentTimeMillis() + 10000L);producer.send(msg);System.out.printf(i + ".發(fā)送消息成功:%s%n", LocalTime.now());}producer.shutdown();}
}

指定時間消費者

public class TimeConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("TimeConsumer");pushConsumer.setNamesrvAddr("10.226.8.14:9876");pushConsumer.subscribe("Schedule","*");pushConsumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( n->{System.out.printf("接收時間:%s %n", LocalTime.now());});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});pushConsumer.start();System.out.printf("Simple Consumer Started.%n");}
}

RocketMQ 如何保證消息可靠性?

我們將消息流程分為三大部分,每一部分都有可能會丟失數(shù)據(jù)。

  • 生產(chǎn)階段:Producer 通過網(wǎng)絡(luò)將消息發(fā)送給 Broker,這個發(fā)送可能會發(fā)生丟失。比如網(wǎng)絡(luò)延遲不可達(dá)等。
  • 存儲階段:Broker 肯定是先把消息放到內(nèi)存的,然后根據(jù)刷盤策略持久化到硬盤中。剛收到 Producer 的消息,放入內(nèi)存,但是異常宕機(jī)了,導(dǎo)致消息丟失。
  • 消費階段:消費失敗。比如先提交ack再消費,處理過程中出現(xiàn)異常,該消息就出現(xiàn)了丟失。

解決方案:

  • 生產(chǎn)階段:使用同步發(fā)送失敗重試機(jī)制;異步發(fā)送重寫回調(diào)方法檢查發(fā)送結(jié)果;ACK 確認(rèn)機(jī)制。
  • 存儲階段:同步刷盤機(jī)制(默認(rèn)情況下是異步刷盤);集群模式采用同步復(fù)制
  • 消費階段:正常消費處理完成才提交ACK(手動ACK);如果處理異常返回重試標(biāo)識。

RocketMQ 如何解決消息積壓問題?

  1. 增加消費者數(shù)量:
    增加消費者實例的數(shù)量,以提高消息的消費速度。
    確保消費者實例數(shù)量與消息隊列數(shù)量匹配,以便每個隊列都有專門的消費者處理。

  2. 優(yōu)化消費者邏輯:
    優(yōu)化消費者的處理邏輯,提高單個消費者的處理效率。
    使用批量消費的方式來減少每次消費的開銷。

  3. 擴(kuò)展消息隊列容量:
    增加消息隊列的數(shù)量,以分散消息負(fù)載。
    動態(tài)調(diào)整隊列數(shù)量,增加處理能力,實現(xiàn)更高的并行處理。

  4. 設(shè)置消息消費失敗處理機(jī)制:
    實施重試機(jī)制,確保消費失敗的消息能夠被重新處理。
    設(shè)置死信隊列(DLQ)來處理多次消費失敗的消息。

  5. 快速失敗丟棄消息:
    如果某些消息可以丟棄,考慮在高峰期快速丟棄這些消息以減輕負(fù)擔(dān)。

  6. 提升系統(tǒng)性能:
    優(yōu)化服務(wù)器性能,增加硬件資源,如CPU、內(nèi)存和網(wǎng)絡(luò)帶寬。
    確保網(wǎng)絡(luò)連接的穩(wěn)定性和速度,以減少延遲。

http://www.risenshineclean.com/news/2853.html

相關(guān)文章:

  • 虛擬空間官網(wǎng)衡陽有實力seo優(yōu)化
  • 睢寧網(wǎng)站建設(shè)xzqjwl深圳推廣公司有哪些
  • 網(wǎng)站開發(fā)開題報告怎樣推廣一個產(chǎn)品
  • 費縣做網(wǎng)站收錄提交入口網(wǎng)址
  • wordpress 做公司網(wǎng)站網(wǎng)絡(luò)營銷的推廣
  • 網(wǎng)站下雪的效果怎么做的一鍵制作網(wǎng)站
  • 個人網(wǎng)站模板之家吳中seo網(wǎng)站優(yōu)化軟件
  • 網(wǎng)站做采集會有問題么寧波最好的推廣平臺
  • 哪家專門做特賣網(wǎng)站百度搜索網(wǎng)頁
  • wordpress 簡約windows優(yōu)化大師最新版本
  • 常熟網(wǎng)站建設(shè)icp備案長沙網(wǎng)址seo
  • 最專業(yè)的佛山網(wǎng)站建設(shè)牛排seo系統(tǒng)
  • 做的網(wǎng)站晚上偷偷看b站軟件推薦
  • 網(wǎng)站建設(shè)網(wǎng)絡(luò)推廣加盟蘇州seo公司
  • 網(wǎng)站做線旅游景區(qū)網(wǎng)絡(luò)營銷案例
  • 新疆建設(shè)職業(yè)培訓(xùn)中心網(wǎng)站線上營銷推廣方案有哪些
  • 赤峰浩誠網(wǎng)站建設(shè)有限公司百度店鋪
  • 長白山網(wǎng)站學(xué)做管理下載百度2023最新版
  • 網(wǎng)站成本廣告公司網(wǎng)站制作
  • wap手機(jī)百度seo排名工具
  • 網(wǎng)站聊天系統(tǒng)怎么做友情網(wǎng)站
  • 住房和城鄉(xiāng)建設(shè)部電工證廊坊優(yōu)化技巧
  • 網(wǎng)頁使用怎么做太原seo推廣
  • 袁隆平網(wǎng)站設(shè)計模板貴州seo和網(wǎng)絡(luò)推廣
  • wordpress主題轉(zhuǎn)zblog網(wǎng)站網(wǎng)頁的優(yōu)化方法
  • 做高仿批發(fā)的網(wǎng)站有哪些百度熱門排行榜
  • 如何用nat123做網(wǎng)站做網(wǎng)站用哪個軟件
  • 南通營銷型網(wǎng)站建設(shè)自媒體平臺注冊官網(wǎng)
  • 網(wǎng)站后臺上傳圖片 不可用提高網(wǎng)站排名
  • 個人網(wǎng)站怎么申請注冊同城推廣