中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

網(wǎng)站主機是服務(wù)器嗎深圳廣告公司

網(wǎng)站主機是服務(wù)器嗎,深圳廣告公司,建設(shè)部門網(wǎng)站,什么是網(wǎng)絡(luò)廣告營銷文章目錄 概述一、MQ消息亂序問題分析1.1 相同topic內(nèi)的消息亂序1.2 不同topic的消息亂序 二、解決方案方案一: 順序消息Kafka1. Kafka 順序消息的實現(xiàn)1.1 生產(chǎn)者:確保同一業(yè)務(wù)主鍵的消息發(fā)送到同一個分區(qū)1.2 消費者:順序消費消息 2. Kafka 順…

文章目錄

  • 概述
  • 一、MQ消息亂序問題分析
    • 1.1 相同topic內(nèi)的消息亂序
    • 1.2 不同topic的消息亂序
  • 二、解決方案
    • 方案一: 順序消息
      • Kafka
        • 1. Kafka 順序消息的實現(xiàn)
          • 1.1 生產(chǎn)者:確保同一業(yè)務(wù)主鍵的消息發(fā)送到同一個分區(qū)
          • 1.2 消費者:順序消費消息
        • 2. Kafka 順序消息實現(xiàn)的局限性
        • 3. 小結(jié)
      • RocketMQ
        • 1. 使用 RocketMQ 實現(xiàn)順序消費
          • 1.1 生產(chǎn)者:發(fā)送順序消息
          • 1.2 消費者:順序消費消息
        • 2. RocketMQ 順序消息的局限性
        • 3. 小結(jié)
    • 方案二: 前置檢測(Pre-check)
      • 前置檢測的方案
      • 方案1: 使用輔助表進行前置檢測
        • 1.1 方案設(shè)計
        • 1.2 數(shù)據(jù)庫表設(shè)計
        • 1.3 消費者前置檢測代碼實現(xiàn)
      • 方案2: 使用序列號/時間戳進行順序檢查
        • 2.1 方案設(shè)計
        • 2.2 消費者前置檢測代碼實現(xiàn)
      • 3. 小結(jié)
    • 方案三: 狀態(tài)機
      • 1. 狀態(tài)機的設(shè)計思路
      • 2. 狀態(tài)機的實現(xiàn)步驟
      • 3. 設(shè)計與實現(xiàn)
      • 3.1 狀態(tài)機設(shè)計
        • 3.1.1 定義狀態(tài)
        • 3.1.2 定義事件
        • 3.1.3 狀態(tài)機邏輯
        • 3.1.4 使用狀態(tài)機處理消息
      • 4. 運行流程
      • 5. 小結(jié)
    • 監(jiān)控與報警
      • 偽實現(xiàn)
  • 總結(jié)

在這里插入圖片描述


概述

在分布式系統(tǒng)中,消息隊列(MQ)作為實現(xiàn)系統(tǒng)解耦和異步通信的重要工具,廣泛應(yīng)用于各種業(yè)務(wù)場景。然而,消息消費時出現(xiàn)的亂序問題,常常會對業(yè)務(wù)邏輯的正確執(zhí)行和系統(tǒng)穩(wěn)定性產(chǎn)生不良影響。

接下來我們將詳細探討MQ消息亂序問題的根源,并提供一系列在實際應(yīng)用中可行的解決方案,包括順序消息、前置檢測、狀態(tài)機等方式

在這里插入圖片描述

一、MQ消息亂序問題分析

1.1 相同topic內(nèi)的消息亂序

  • 并發(fā)消費:為了提高消息處理吞吐量,通常會配置多個消費者實例來并發(fā)消費同一個隊列中的消息。然而,由于消費者實例的性能差異,可能導(dǎo)致消息的消費順序與發(fā)送順序不一致。
  • 消息分區(qū):MQ系統(tǒng)通常采用分區(qū)化設(shè)計,當(dāng)同一業(yè)務(wù)邏輯的消息分發(fā)到不同的分區(qū)時,可能出現(xiàn)亂序。
  • 網(wǎng)絡(luò)延遲與抖動:消息在傳輸過程中可能會受到網(wǎng)絡(luò)延遲和抖動的影響,導(dǎo)致消息到達消費者端的順序與發(fā)送順序不一致。
  • 消息重試與故障恢復(fù):當(dāng)消費者處理消息失敗或出現(xiàn)故障時,重試機制或故障恢復(fù)操作不當(dāng),也可能導(dǎo)致消息亂序。

1.2 不同topic的消息亂序

例如,系統(tǒng)A在01:00時向TopicA發(fā)送了消息msgA-01:00,而系統(tǒng)B在01:01時向TopicB發(fā)送了消息msgB-01:01。消費者無法預(yù)設(shè)msgA-01:00必然先于msgB-01:01被接收。消息系統(tǒng)中的分區(qū)策略、消費者的處理能力、網(wǎng)絡(luò)等因素共同導(dǎo)致無法確保消息遵循嚴格的先進先出(FIFO)原則。


二、解決方案

為了應(yīng)對消息亂序問題,有幾種常見的解決方案,包括順序消息、前置檢測、狀態(tài)機等。

方案一: 順序消息

順序消息是通過確保同一業(yè)務(wù)主鍵的消息發(fā)送到同一分區(qū),從而保證消息的順序性。

Kafka

Kafka 為例,雖然它不保證全局消息順序,但可以通過合理的分區(qū)策略和消息鍵來確保消息的局部順序性。

下面是使用 Kafka 作為消息隊列(MQ)時,如何實現(xiàn)順序消息的解決方案。通過使用 Kafka 的分區(qū)策略和消息鍵(key),可以確保同一業(yè)務(wù)主鍵的消息發(fā)送到同一個分區(qū),從而保證消息的順序性。

1. Kafka 順序消息的實現(xiàn)
1.1 生產(chǎn)者:確保同一業(yè)務(wù)主鍵的消息發(fā)送到同一個分區(qū)

通過指定消息的 key,Kafka 會確保具有相同 key 的消息發(fā)送到同一個分區(qū)。這樣,即使多個消費者并行消費,也能保證消息在同一個分區(qū)內(nèi)的順序。

生產(chǎn)者代碼實現(xiàn)

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class OrderProducer {private final KafkaProducer<String, String> producer;private final String topic;public OrderProducer(String topic) {this.topic = topic;Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", StringSerializer.class.getName());properties.put("value.serializer", StringSerializer.class.getName());this.producer = new KafkaProducer<>(properties);}public void sendOrderMessage(String orderId, String orderMessage) {// 使用訂單ID作為消息的 key,以確保同一訂單的消息發(fā)送到同一個分區(qū)ProducerRecord<String, String> record = new ProducerRecord<>(topic, orderId, orderMessage);producer.send(record, (metadata, exception) -> {if (exception != null) {exception.printStackTrace();} else {System.out.println("Message sent: " + metadata);}});}public void close() {producer.close();}public static void main(String[] args) {OrderProducer orderProducer = new OrderProducer("order-topic");// 發(fā)送順序消息,確保同一訂單的消息被發(fā)送到同一分區(qū)orderProducer.sendOrderMessage("order123", "Order Created");orderProducer.sendOrderMessage("order123", "Order Paid");orderProducer.sendOrderMessage("order123", "Order Shipped");// 發(fā)送另一個訂單的消息orderProducer.sendOrderMessage("order456", "Order Created");orderProducer.sendOrderMessage("order456", "Order Paid");orderProducer.close();}
}
  • 在生產(chǎn)者端,通過 ProducerRecord 發(fā)送消息時,設(shè)置了消息的 key 為訂單 ID(orderId)。Kafka 會使用該 key 來確定消息發(fā)送到哪個分區(qū),從而確保同一訂單的所有消息都會被發(fā)送到同一個分區(qū),保證順序。
  • producer.send() 方法的回調(diào)函數(shù)用來處理消息發(fā)送的異步結(jié)果。

1.2 消費者:順序消費消息

消費者使用 MessageListenerConsumer 來消費消息。Kafka 默認會根據(jù)分區(qū)消費順序保證同一分區(qū)內(nèi)消息的順序。我們只需要保證同一個業(yè)務(wù)的消息被路由到同一個分區(qū),消費者就能順序消費這些消息。

消費者代碼實現(xiàn)

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Collections;
import java.util.Properties;public class OrderConsumer {private final KafkaConsumer<String, String> consumer;private final String topic;public OrderConsumer(String topic) {this.topic = topic;Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "order-consumer-group");properties.put("key.deserializer", StringDeserializer.class.getName());properties.put("value.deserializer", StringDeserializer.class.getName());properties.put("auto.offset.reset", "earliest");this.consumer = new KafkaConsumer<>(properties);}public void consumeMessages() {consumer.subscribe(Collections.singletonList(topic));while (true) {consumer.poll(1000).forEach(record -> {// 處理順序消息System.out.println("Consumed message: " + record.key() + " - " + record.value());});}}public void close() {consumer.close();}public static void main(String[] args) {OrderConsumer orderConsumer = new OrderConsumer("order-topic");// 消費消息,確保同一個訂單的消息順序消費orderConsumer.consumeMessages();}
}
  • 消費者通過 KafkaConsumer 從指定的 topic 中拉取消息。在這種實現(xiàn)中,消息會按照 Kafka 內(nèi)部的消費機制被順序消費。

  • consumer.poll() 方法定期從 Kafka 中拉取消息,并根據(jù) key 分配到相應(yīng)的分區(qū)進行消費。

  • Kafka 的分區(qū)是順序消費的,即每個分區(qū)內(nèi)的消息按照生產(chǎn)者發(fā)送的順序消費。因此,通過確保同一訂單的消息使用相同的 key,就能保證同一分區(qū)內(nèi)消息的消費順序。

2. Kafka 順序消息實現(xiàn)的局限性
  1. 局部順序保證:Kafka 只能保證同一分區(qū)內(nèi)的消息順序,對于跨分區(qū)的消息并不保證順序。因此,確保同一業(yè)務(wù)的消息發(fā)送到同一分區(qū)非常關(guān)鍵。
  2. 性能與吞吐量:為了提高系統(tǒng)的吞吐量和并發(fā)能力,Kafka 會對 topic 進行分區(qū)。分區(qū)數(shù)過多可能影響順序性,但可以通過合理設(shè)計業(yè)務(wù)鍵來平衡性能和順序性要求。
3. 小結(jié)

通過使用 Kafka 的分區(qū)和消息鍵機制,我們可以確保同一業(yè)務(wù)主鍵的消息在同一分區(qū)內(nèi)順序消費。這種方法適用于需要保證順序性的場景,如訂單處理等。生產(chǎn)者確保消息按照業(yè)務(wù)主鍵路由到同一分區(qū),消費者則按分區(qū)順序消費消息,從而避免消息亂序的問題。


RocketMQ

在使用 RocketMQ 作為消息隊列時,確保消息的順序消費可以通過 順序消息Ordered Message)的特性來實現(xiàn)。RocketMQ 支持兩種類型的順序消費:局部順序(確保同一消息隊列內(nèi)的消息順序)和 全局順序(通過單一隊列保證全局順序,但在高并發(fā)情況下可能會影響性能)。

1. 使用 RocketMQ 實現(xiàn)順序消費
1.1 生產(chǎn)者:發(fā)送順序消息

生產(chǎn)者通過指定消息的 key 來確保具有相同 key 的消息被發(fā)送到同一個消息隊列,從而保證順序性。RocketMQ 支持發(fā)送順序消息的 API,通過 MessageQueueSelector 來指定消息發(fā)送到哪個隊列。

生產(chǎn)者代碼實現(xiàn)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.List;public class OrderProducer {private DefaultMQProducer producer;public OrderProducer(String groupName) throws Exception {// 創(chuàng)建生產(chǎn)者實例producer = new DefaultMQProducer(groupName);producer.setNamesrvAddr("localhost:9876"); // RocketMQ服務(wù)器地址producer.start();}public void sendOrderMessage(String orderId, String orderMessage) throws Exception {// 創(chuàng)建消息實例Message message = new Message("OrderTopic", "OrderTag", orderMessage.getBytes(RemotingHelper.DEFAULT_CHARSET));// 使用訂單ID作為消息的key,確保同一訂單的消息發(fā)送到同一隊列SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {String orderId = (String) arg;int queueIndex = orderId.hashCode() % mqs.size(); // 根據(jù)訂單ID選擇隊列return mqs.get(queueIndex);}}, orderId);System.out.println("Message sent: " + sendResult);}public void close() {producer.shutdown();}public static void main(String[] args) throws Exception {OrderProducer producer = new OrderProducer("order-group");// 發(fā)送順序消息,確保同一訂單的消息被發(fā)送到同一隊列producer.sendOrderMessage("order123", "Order Created");producer.sendOrderMessage("order123", "Order Paid");producer.sendOrderMessage("order123", "Order Shipped");producer.sendOrderMessage("order456", "Order Created");producer.sendOrderMessage("order456", "Order Paid");producer.close();}
}
  • 生產(chǎn)者通過 MessageQueueSelector 來確保相同 key 的消息被發(fā)送到相同的隊列。這里我們使用 orderId 作為消息的 key,通過計算 orderId.hashCode() 來決定消息發(fā)送到哪個隊列。確保同一個訂單的消息發(fā)送到同一個隊列,從而在消費時保持順序性。
  • SendResult 會返回發(fā)送結(jié)果,包括消息發(fā)送的狀態(tài)。

1.2 消費者:順序消費消息

在消費者端,RocketMQ 提供了 MessageListenerOrderly 接口來實現(xiàn)順序消費。該接口保證在同一隊列內(nèi),消息會按照發(fā)送的順序被消費。

消費者代碼實現(xiàn)

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeConcurrentlyContext;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;import java.util.List;public class OrderConsumer {private DefaultMQPushConsumer consumer;public OrderConsumer(String groupName) throws Exception {// 創(chuàng)建消費者實例consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服務(wù)器地址consumer.subscribe("OrderTopic", "*"); // 訂閱指定的 topic 和 tag}public void consumeMessages() throws Exception {// 設(shè)置順序消費監(jiān)聽器consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {// 消費順序消息System.out.println("Consumed message: " + new String(msg.getBody()));}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();}public void close() {consumer.shutdown();}public static void main(String[] args) throws Exception {OrderConsumer consumer = new OrderConsumer("order-consumer-group");// 開始消費順序消息consumer.consumeMessages();}
}
  • 消費者使用 MessageListenerOrderly 來實現(xiàn)順序消費。該接口保證了消費者在同一消息隊列內(nèi)按順序消費消息。

  • 消費者在接收到消息后,會依次消費并輸出消息內(nèi)容。

  • RocketMQ 是基于消息隊列的,每個隊列內(nèi)的消息是順序消費的,即使有多個消費者,也只會有一個消費者消費某個隊列的消息。通過將同一 key 的消息發(fā)送到同一個隊列,可以確保這些消息按照順序被消費。

  • 需要注意的是,RocketMQ 保證的是 局部順序,即同一隊列內(nèi)的消息按照發(fā)送順序消費。對于多個隊列和多個消費者,只有同一個隊列內(nèi)的消息順序是保證的。


2. RocketMQ 順序消息的局限性
  • 局部順序保證:RocketMQ 只能保證同一隊列內(nèi)的消息順序,對于多個隊列之間的消息沒有順序保證。
  • 性能影響:如果需要保證全局順序,可能需要將所有消息都發(fā)送到同一個隊列,這會影響性能,導(dǎo)致吞吐量下降。通常需要在性能和順序性之間進行權(quán)衡。
3. 小結(jié)

通過使用 RocketMQMessageQueueSelectorMessageListenerOrderly,我們可以保證同一業(yè)務(wù)的消息在同一隊列內(nèi)順序消費。這種方式適用于需要保證順序的場景,如訂單處理、支付等高可靠性的業(yè)務(wù)系統(tǒng)。生產(chǎn)者通過業(yè)務(wù)主鍵選擇隊列,消費者則順序消費消息,確保數(shù)據(jù)一致性和業(yè)務(wù)流程的正確執(zhí)行。


方案二: 前置檢測(Pre-check)

前置檢測(Pre-check)在消息隊列消費中,常用于確保消息消費的順序性,防止因為消息亂序?qū)е碌臄?shù)據(jù)不一致或業(yè)務(wù)錯誤。其核心思想是在消息消費之前進行驗證,確保前置條件滿足才繼續(xù)消費當(dāng)前消息。

在消費者處理消息之前,進行前置條件檢查,確保上一條消息已成功消費。這可以通過消息輔助表來實現(xiàn),或者在消息中附帶序列號、時間戳等信息進行驗證。

前置檢測的方案

前置檢測主要包括以下幾種常見方法:

  1. 使用輔助表進行狀態(tài)檢查:通過創(chuàng)建一個輔助表(如狀態(tài)表或消息表),記錄消息的狀態(tài),消費者可以通過查詢該表來驗證上一個消息是否已經(jīng)成功處理,確保消息按順序消費。

  2. 使用序列號/時間戳進行順序檢查:在消息中包含序列號或時間戳,消費者根據(jù)這些信息判斷當(dāng)前消息是否按預(yù)期順序到達。如果不符合順序,則將當(dāng)前消息暫時緩存,等待前一個消息處理完成。

  3. 利用死信隊列處理無序消息:當(dāng)消息的順序不符合預(yù)期時,可以將這些消息暫時放入死信隊列(DLQ)中,待前置消息消費成功后再重新消費。

方案1: 使用輔助表進行前置檢測

假設(shè)在處理訂單相關(guān)的消息時,我們希望確保訂單的狀態(tài)始終按照正確的順序處理,比如,Order Created 應(yīng)該在 Order Paid 前消費。

1.1 方案設(shè)計
  • 設(shè)計一個 order_status 表,記錄訂單的處理狀態(tài)。
  • 消費者在處理消息前,查詢這個表,確保訂單的前置狀態(tài)已經(jīng)處理完畢。
  • 消費失敗時,可以將消息放入死信隊列或重試。
1.2 數(shù)據(jù)庫表設(shè)計
CREATE TABLE order_status (order_id VARCHAR(255) PRIMARY KEY,status VARCHAR(255) NOT NULL,update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- 狀態(tài)示例
-- 訂單創(chuàng)建:CREATED
-- 訂單支付:PAID
-- 訂單完成:COMPLETED
1.3 消費者前置檢測代碼實現(xiàn)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;
import java.util.List;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;public class OrderConsumerWithPreCheck {private static final String DB_URL = "jdbc:mysql://localhost:3306/order_db";private static final String DB_USER = "root";private static final String DB_PASSWORD = "password";private DefaultMQPushConsumer consumer;public OrderConsumerWithPreCheck(String groupName) throws Exception {consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服務(wù)器地址consumer.subscribe("OrderTopic", "*"); // 訂閱指定的 topic 和 tag}// 檢查訂單狀態(tài)public boolean checkOrderStatus(String orderId, String expectedStatus) throws Exception {try (Connection connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {String query = "SELECT status FROM order_status WHERE order_id = ?";try (PreparedStatement statement = connection.prepareStatement(query)) {statement.setString(1, orderId);ResultSet rs = statement.executeQuery();if (rs.next()) {String currentStatus = rs.getString("status");return expectedStatus.equals(currentStatus); // 比對期望狀態(tài)}}}return false; // 訂單未找到,默認返回 false}public void consumeMessages() throws Exception {consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String orderId = msg.getKeys();  // 假設(shè)訂單ID存儲在消息的keys字段String currentStatus = new String(msg.getBody());// 檢查前置狀態(tài),確保當(dāng)前狀態(tài)是順序的try {if ("OrderCreated".equals(currentStatus)) {if (!checkOrderStatus(orderId, "CREATED")) {System.out.println("Order not created yet, skipping message: " + orderId);continue;  // 如果前置狀態(tài)不符合,跳過該消息}} else if ("OrderPaid".equals(currentStatus)) {if (!checkOrderStatus(orderId, "PAID")) {System.out.println("Order not paid yet, skipping message: " + orderId);continue;}}// 消費消息邏輯System.out.println("Processing order message: " + orderId + " - " + currentStatus);// 更新狀態(tài)或其他業(yè)務(wù)邏輯} catch (Exception e) {e.printStackTrace();}}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();}public void close() {consumer.shutdown();}public static void main(String[] args) throws Exception {OrderConsumerWithPreCheck consumer = new OrderConsumerWithPreCheck("order-consumer-group");consumer.consumeMessages();}
}

方案2: 使用序列號/時間戳進行順序檢查

在這種方法中,我們?yōu)槊總€消息分配一個 序列號時間戳,并通過對比當(dāng)前消息的序列號和前一條消息的序列號來確保消息按順序消費。如果序列號不符合預(yù)期,消費者會將該消息緩存,等待前置消息的消費完成。

2.1 方案設(shè)計
  • 消息中包含一個 sequenceIdtimestamp 字段。
  • 消費者檢查當(dāng)前消息的 sequenceId,如果當(dāng)前消息的 sequenceId 小于等于上一個已消費消息的 sequenceId,則跳過當(dāng)前消息。
2.2 消費者前置檢測代碼實現(xiàn)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;public class OrderConsumerWithSequenceCheck {private DefaultMQPushConsumer consumer;private AtomicInteger lastSequenceId = new AtomicInteger(0);  // 記錄最后處理的序列號public OrderConsumerWithSequenceCheck(String groupName) throws Exception {consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服務(wù)器地址consumer.subscribe("OrderTopic", "*"); // 訂閱指定的 topic 和 tag}// 檢查消息的序列號,確保順序性public boolean checkSequenceId(int currentSequenceId) {return currentSequenceId == lastSequenceId.incrementAndGet();}public void consumeMessages() throws Exception {consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {int sequenceId = Integer.parseInt(new String(msg.getBody())); // 消息中的序列號if (!checkSequenceId(sequenceId)) {System.out.println("Out of order message, skipping message with sequence: " + sequenceId);continue;  // 如果消息序列號不符合順序,則跳過}// 消費消息邏輯System.out.println("Processing message with sequence ID: " + sequenceId);// 進行相應(yīng)的業(yè)務(wù)處理}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();}public void close() {consumer.shutdown();}public static void main(String[] args) throws Exception {OrderConsumerWithSequenceCheck consumer = new OrderConsumerWithSequenceCheck("order-consumer-group");consumer.consumeMessages();}
}

3. 小結(jié)

前置檢測方案的核心是通過驗證當(dāng)前消息的處理條件(如訂單的狀態(tài)或消息的序列號),確保前置條件滿足后再繼續(xù)處理當(dāng)前消息。此方案能有效防止由于消息亂序?qū)е碌臄?shù)據(jù)不一致或業(yè)務(wù)錯誤,適用于需要嚴格保證處理順序的場景。

  • 數(shù)據(jù)庫檢查:通過查詢數(shù)據(jù)庫記錄來驗證消息的處理順序。
  • 序列號檢查:通過消息中的序列號或時間戳驗證消息是否按順序到達。

方案三: 狀態(tài)機

可以利用狀態(tài)機來管理消息的消費順序和狀態(tài)。狀態(tài)機的核心思想是定義系統(tǒng)的不同狀態(tài),以及觸發(fā)狀態(tài)變更的事件,從而確保消息在正確的狀態(tài)下被處理。

通過引入狀態(tài)機,我們能夠:

  • 通過狀態(tài)轉(zhuǎn)移機制保證消息按順序消費。
  • 在狀態(tài)轉(zhuǎn)移過程中,避免非法的狀態(tài)變更和消息丟失。

1. 狀態(tài)機的設(shè)計思路

在處理消息時,可以將消息消費的過程視為一系列狀態(tài)的變換。每個消息會根據(jù)其當(dāng)前狀態(tài)決定是否可以進行處理。

  1. 定義狀態(tài)

    • 定義消息消費的不同狀態(tài),例如 PENDING(待處理)、PROCESSING(處理中)、PROCESSED(已處理)。
    • 每個消息在處理過程中會從一個狀態(tài)轉(zhuǎn)移到另一個狀態(tài)。
  2. 定義事件

    • 每個消息可能觸發(fā)一個事件,事件可以是消息的到達或者某些外部條件的變化。
    • 通過事件來決定狀態(tài)的轉(zhuǎn)移。
  3. 處理順序

    • 確保某些消息必須在特定的順序下處理。比如,某個狀態(tài)的消息必須先處理完成,才能處理下一個狀態(tài)的消息。

2. 狀態(tài)機的實現(xiàn)步驟

  • 狀態(tài)定義:使用枚舉類(enum)定義消息的狀態(tài)。
  • 事件定義:根據(jù)消息到達的順序或其他外部條件觸發(fā)不同的事件。
  • 狀態(tài)機實現(xiàn):根據(jù)當(dāng)前狀態(tài)和事件的觸發(fā)來決定狀態(tài)轉(zhuǎn)移。

3. 設(shè)計與實現(xiàn)

假設(shè)我們有一個訂單處理系統(tǒng),訂單的狀態(tài)可能為以下幾種:

  • ORDER_CREATED:訂單已創(chuàng)建
  • ORDER_PAID:訂單已支付
  • ORDER_SHIPPED:訂單已發(fā)貨
  • ORDER_COMPLETED:訂單已完成

我們希望確保消息的消費順序是按順序進行的,即訂單創(chuàng)建 -> 支付 -> 發(fā)貨 -> 完成。

3.1 狀態(tài)機設(shè)計

3.1.1 定義狀態(tài)

首先定義訂單狀態(tài)的枚舉類型 OrderState

public enum OrderState {ORDER_CREATED,  // 訂單已創(chuàng)建ORDER_PAID,     // 訂單已支付ORDER_SHIPPED,  // 訂單已發(fā)貨ORDER_COMPLETED // 訂單已完成
}
3.1.2 定義事件

根據(jù)業(yè)務(wù)需求,定義事件觸發(fā)的條件。比如:

  • ORDER_CREATED_EVENT:訂單創(chuàng)建事件
  • ORDER_PAID_EVENT:訂單支付事件
  • ORDER_SHIPPED_EVENT:訂單發(fā)貨事件
  • ORDER_COMPLETED_EVENT:訂單完成事件
3.1.3 狀態(tài)機邏輯

使用一個狀態(tài)機類來管理狀態(tài)的轉(zhuǎn)換。狀態(tài)機會根據(jù)當(dāng)前狀態(tài)和觸發(fā)的事件來進行狀態(tài)轉(zhuǎn)換。

import java.util.HashMap;
import java.util.Map;public class OrderStateMachine {// 訂單狀態(tài)private OrderState currentState;// 狀態(tài)轉(zhuǎn)移規(guī)則,基于當(dāng)前狀態(tài)和事件決定下一個狀態(tài)private final Map<OrderState, Map<String, OrderState>> transitionTable;public OrderStateMachine() {// 初始化狀態(tài)為 ORDER_CREATEDthis.currentState = OrderState.ORDER_CREATED;// 初始化狀態(tài)轉(zhuǎn)移規(guī)則this.transitionTable = new HashMap<>();// 設(shè)置轉(zhuǎn)移規(guī)則// 從 ORDER_CREATED 可以轉(zhuǎn)到 ORDER_PAIDaddTransition(OrderState.ORDER_CREATED, "ORDER_CREATED_EVENT", OrderState.ORDER_PAID);// 從 ORDER_PAID 可以轉(zhuǎn)到 ORDER_SHIPPEDaddTransition(OrderState.ORDER_PAID, "ORDER_PAID_EVENT", OrderState.ORDER_SHIPPED);// 從 ORDER_SHIPPED 可以轉(zhuǎn)到 ORDER_COMPLETEDaddTransition(OrderState.ORDER_SHIPPED, "ORDER_SHIPPED_EVENT", OrderState.ORDER_COMPLETED);}// 添加狀態(tài)轉(zhuǎn)換規(guī)則private void addTransition(OrderState fromState, String event, OrderState toState) {transitionTable.putIfAbsent(fromState, new HashMap<>());transitionTable.get(fromState).put(event, toState);}// 處理事件并轉(zhuǎn)換狀態(tài)public boolean handleEvent(String event) {Map<String, OrderState> transitions = transitionTable.get(currentState);if (transitions != null && transitions.containsKey(event)) {OrderState nextState = transitions.get(event);System.out.println("State transition: " + currentState + " -> " + nextState);this.currentState = nextState; // 執(zhí)行狀態(tài)轉(zhuǎn)移return true;} else {System.out.println("Invalid event for the current state: " + currentState);return false;}}// 獲取當(dāng)前狀態(tài)public OrderState getCurrentState() {return currentState;}
}
3.1.4 使用狀態(tài)機處理消息

假設(shè)我們在消息隊列中有不同的訂單消息,需要按順序消費。我們將消費者與狀態(tài)機結(jié)合使用,確保消息按照正確的順序消費。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;import java.util.List;public class OrderConsumerWithStateMachine {private static final String TOPIC = "OrderTopic";private static final String GROUP = "OrderConsumerGroup";private DefaultMQPushConsumer consumer;private OrderStateMachine stateMachine;public OrderConsumerWithStateMachine() {consumer = new DefaultMQPushConsumer(GROUP);stateMachine = new OrderStateMachine();try {consumer.setNamesrvAddr("localhost:9876");consumer.subscribe(TOPIC, "*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String event = new String(msg.getBody());System.out.println("Received message: " + event);// 根據(jù)消息的內(nèi)容觸發(fā)狀態(tài)機事件if ("ORDER_CREATED_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_CREATED_EVENT");} else if ("ORDER_PAID_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_PAID_EVENT");} else if ("ORDER_SHIPPED_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_SHIPPED_EVENT");} else if ("ORDER_COMPLETED_EVENT".equals(event)) {stateMachine.handleEvent("ORDER_COMPLETED_EVENT");}System.out.println("Current state: " + stateMachine.getCurrentState());}return ConsumeOrderlyContext.SUCCESS;}});consumer.start();System.out.println("Order consumer started");} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {new OrderConsumerWithStateMachine();}
}

4. 運行流程

  1. 消費者會根據(jù)事件(如 ORDER_CREATED_EVENT, ORDER_PAID_EVENT 等)處理消息。
  2. 消費者會觸發(fā)狀態(tài)機,狀態(tài)機會根據(jù)當(dāng)前狀態(tài)和事件來進行狀態(tài)轉(zhuǎn)換。
  3. 如果消息的順序不正確(例如 ORDER_PAID_EVENTORDER_CREATED_EVENT 之前到達),狀態(tài)機會拒絕處理,并打印 Invalid event for the current state。

5. 小結(jié)

  • 狀態(tài)機可以幫助管理消息的消費順序,確保在處理消息時遵循正確的流程和業(yè)務(wù)邏輯。
  • 通過定義狀態(tài)和事件,狀態(tài)機提供了一個清晰的框架來管理復(fù)雜的消息處理過程。
  • 結(jié)合消息隊列,狀態(tài)機可以有效地控制消息的順序消費,避免亂序帶來的問題。

監(jiān)控與報警

建立系統(tǒng)的監(jiān)控和報警機制,及時發(fā)現(xiàn)并處理消息錯亂等異常情況。通過設(shè)定閾值或檢測規(guī)則,監(jiān)控系統(tǒng)的消息流轉(zhuǎn),確保及時響應(yīng)并糾正問題。

  • 定期監(jiān)控消息隊列的消費進度,若發(fā)現(xiàn)消費滯后或消息順序異常,自動報警。
  • 通過日志和統(tǒng)計信息,捕獲異常并自動觸發(fā)處理流程。

偽實現(xiàn)

public class MessageMonitor {private static final Logger logger = LoggerFactory.getLogger(MessageMonitor.class);public void monitorMessageQueue() {// 假設(shè)有一個隊列監(jiān)控機制boolean isOutOfOrder = checkMessageOrder();if (isOutOfOrder) {logger.error("Message order error detected, triggering alert!");// 觸發(fā)報警或采取恢復(fù)措施}}private boolean checkMessageOrder() {// 檢查消息順序是否正常return false; // 假設(shè)沒有亂序}
}

總結(jié)

MQ消息亂序是分布式系統(tǒng)中常見的挑戰(zhàn),直接影響到系統(tǒng)的穩(wěn)定性和業(yè)務(wù)一致性。我們可以通過順序消息、前置檢測、狀態(tài)機等解決方案, 保證消息的順序性,提高系統(tǒng)的可靠性和用戶體驗。

在這里插入圖片描述

http://www.risenshineclean.com/news/11312.html

相關(guān)文章:

  • 我的網(wǎng)站沒備案怎么做淘寶客推廣小程序運營推廣公司
  • 服裝網(wǎng)站建設(shè)公司地址國際新聞最新消息今天軍事新聞
  • 黑龍江住房和建設(shè)廳網(wǎng)站寧波seo網(wǎng)絡(luò)推廣推薦
  • 做網(wǎng)站模板的海報尺寸多少錢電商seo優(yōu)化是什么意思
  • 做網(wǎng)站必須要電腦嗎app開發(fā)工具
  • 公司網(wǎng)站建立教程aso優(yōu)化平臺有哪些
  • 重慶建站模板源碼推動防控措施持續(xù)優(yōu)化
  • 用淘寶評論做網(wǎng)站個人博客網(wǎng)頁制作
  • 北京國家建設(shè)部網(wǎng)站首頁上海百度推廣電話
  • 購買云服務(wù)器后怎么做網(wǎng)站seo優(yōu)化幾個關(guān)鍵詞
  • 建設(shè)大型網(wǎng)站世界足球排名前100名
  • wordpress做論壇網(wǎng)站免費推廣網(wǎng)站排行榜
  • 靜態(tài)網(wǎng)站可以做哪些內(nèi)容小程序開發(fā)框架
  • 免費下載ppt模板的網(wǎng)站有哪些站長工具seo源碼
  • 做交易網(wǎng)站百度seo排名如何提升
  • 溫州網(wǎng)站建設(shè)前十公司大同優(yōu)化推廣
  • 美圖秀秀可以做網(wǎng)站嗎河南省網(wǎng)站
  • 加強網(wǎng)站政務(wù)服務(wù)建設(shè)百度指數(shù)使用方法
  • 印刷網(wǎng)站建設(shè) 優(yōu)幫云品牌運營
  • 中英切換的網(wǎng)站咋做中國最大網(wǎng)站排名
  • 做跳轉(zhuǎn)鏈接到自己的網(wǎng)站谷歌瀏覽器官網(wǎng)手機版
  • 提供網(wǎng)站技術(shù)支持系統(tǒng)優(yōu)化軟件排行榜
  • 做網(wǎng)站公司鄭州鄭州的網(wǎng)站建設(shè)公司排名湖南百度推廣
  • 桂林旅游搜索seo神器
  • 新西蘭網(wǎng)站建設(shè)山東seo多少錢
  • 幫別人做網(wǎng)站自己為什么會被抓近10天的時政新聞
  • 做英文網(wǎng)站用什么字體好響應(yīng)式網(wǎng)站模板的應(yīng)用
  • 手機做圖片設(shè)計哪個軟件好湖南正規(guī)seo優(yōu)化
  • 中山建網(wǎng)站費用多少百度推廣關(guān)鍵詞排名規(guī)則
  • 做網(wǎng)站的品牌公司蘭州網(wǎng)絡(luò)推廣