企業(yè)網(wǎng)站建設(shè)模塊長沙百度開戶
前言:
上一篇我們分享了 RocketMQ 消息重試的一些基本原理,本篇我們基于 Spring Boot 整合 RocketMQ 來分享一下 RocketMQ 消息基于手動提交的案例,在分享手動進行消息 ACK 中也會分享消息重試的使用。
RocketMQ 系列文章傳送門
RocketMQ 的介紹及核心概念講解
Spring Boot 整合 RocketMQ 之普通消息
Spring Boot 整合 RocketMQ 之定時/延時消息
Spring Boot 整合 RocketMQ 之順序消息
Spring Boot 整合 RocketMQ 之事務(wù)消息
RocketMQ 之消息重試機制
同步消息手動提交 ACK 案例分享
同步消息 Producer 消息發(fā)送代碼案例
同步消息發(fā)送的代碼前面分享過,感興趣的朋友也可以通過上面的系列文章鏈接去查看,這里還是簡單的分享一下同步消息 Producer 消息發(fā)送代碼,具體如下:
package com.order.service.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;/*** @ClassName: OneWayMessageProducer* @Author: zhangyong* @Date: 2024/9/27 17:27* @Description: 同步消息發(fā)送者*/
@Slf4j
@Component
public class SyncMessageProducer {@Autowiredprivate RocketMQTemplate rocketMqTemplate;/*** @param message:* @date 2024/10/10 17:47* @description 同步消息發(fā)送*/public void sendSyncMessage(String message) {rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build());}}
同步消息手動 ACK Consumer 端代碼案例分享
RocketMQ 消息手動 ACK 就不能再使用 @RocketMQMessageListener 注解 + 實現(xiàn) RocketMQListener 接口的方式來實現(xiàn)了,RocketMQListener 的源碼如下:
package org.apache.rocketmq.spring.core;public interface RocketMQListener<T> {void onMessage(T var1);
}
我們可以看到 RocketMQListener 中只提供了一個 onMessage 方法,且返回值為 void,不接受返回值,因此沒辦法進行手動 ACK。
這里我們使用 DefaultMQPushConsumer 來實現(xiàn)消息的手動 ACK,DefaultMQPushConsumer 實現(xiàn)了 MQPushConsumer 接口,具體實現(xiàn)代碼如下:
package com.order.service.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** @ClassName: ManualCommitSyncCounsumer* @Author: Author* @Date: 2024/10/16 16:23* @Description:*/
@Slf4j
@Component
public class ManualCommitSyncCounsumer {/*** @date 2024/10/16 17:19* @description 同步消息消費成功 手動提交*/@PostConstructpublic void onSyncMessage() throws MQClientException {//消費者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sync-group");//設(shè)置最大消息重試次數(shù)//consumer.setMaxReconsumeTimes(2);//RocketMQ 地址可以 可以用配置文件consumer.setNamesrvAddr("xxx-xxx-rocketmq.xxx.com:19876");//訂閱一個或多個topic,并指定tag過濾條件,這里指定 * 表示接收所有 tag 的消息consumer.subscribe("sync-topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {//存儲消息id 和消費次數(shù)的關(guān)系final Map<String, Integer> map = new HashMap<>();@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());//消息處理MessageExt messageExt = list.get(0);String message = new String(messageExt.getBody());String msgId = messageExt.getMsgId();//獲取消息消費次數(shù)Integer count = map.get(msgId);if (count == null) {count = 0;}//次數(shù)+1count = count + 1;//覆蓋mapmap.put(msgId, count);if (count > 2) {log.info("當前時間:{},當前消息id:{},是第【{}】次消費,直接返回消費消費成功,消息內(nèi)容:{}", dateStr, msgId, count, message);//消息消費成功后移除map.remove(msgId);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}log.info("當前時間:{},當前消息id:{},是第【{}】次消費,消息內(nèi)容:{}", dateStr, msgId, count, message);//模擬除 0 異常//int a = 1 / 0;log.info("當前時間:{},當前消息id:{},是第【{}】次消費,消息消費成功,消息內(nèi)容:{}", dateStr, msgId, count, message);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//return null;}});consumer.start();}}
RocketMQ 同步消費端手動 ACK 結(jié)果驗證:
正常消費返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
2024-10-19 10:23:07.575 INFO 10820 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer : 當前時間:2024-10-19 10:23:07,當前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消費,消息內(nèi)容:小明同學(xué)你媽喊你回家吃飯了
2024-10-19 10:23:07.575 INFO 10820 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer : 當前時間:2024-10-19 10:23:07,當前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消費,消息消費成功,消息內(nèi)容:小明同學(xué)你媽喊你回家吃飯了
沒有模擬除 0 異常,正常消費返回,一次就消費成功,結(jié)果符合預(yù)期。
模擬除 0 異常
2024-10-19 10:19:59.026 INFO 34052 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer : 當前時間:2024-10-19 10:19:59,當前消息id:7F000001850418B4AAC25EEA14AF0003,是第【1】次消費,消息內(nèi)容:小明同學(xué)你媽喊你回家吃飯了
2024-10-19 10:20:09.029 INFO 34052 --- [MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer : 當前時間:2024-10-19 10:20:09,當前消息id:7F000001850418B4AAC25EEA14AF0003,是第【2】次消費,消息內(nèi)容:小明同學(xué)你媽喊你回家吃飯了
2024-10-19 10:20:39.032 INFO 34052 --- [MessageThread_4] c.o.s.r.c.ManualCommitSyncCounsumer : 當前時間:2024-10-19 10:20:39,當前消息id:7F000001850418B4AAC25EEA14AF0003,是第【3】次消費,直接返回消費消費成功,消息內(nèi)容:小明同學(xué)你媽喊你回家吃飯了
可以看到消息在不斷重試消息,分別是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 觸發(fā)了消費,時間間隔也是 10秒、30秒,結(jié)果符合預(yù)期。
正常消費但是返回 NULL
2024-10-19 10:25:47.338 INFO 27256 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer : 當前時間:2024-10-19 10:25:47,當前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消費,消息內(nèi)容:小明同學(xué)你媽喊你回家吃飯了
2024-10-19 10:25:47.338 INFO 27256 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer : 當前時間:2024-10-19 10:25:47,當前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消費,消息消費成功,消息內(nèi)容:小明同學(xué)你媽喊你回家吃飯了
2024-10-19 10:25:57.344 INFO 27256 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer : 當前時間:2024-10-19 10:25:57,當前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消費,消息內(nèi)容:小明同學(xué)你媽喊你回家吃飯了
2024-10-19 10:25:57.344 INFO 27256 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer : 當前時間:2024-10-19 10:25:57,當前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消費,消息消費成功,消息內(nèi)容:小明同學(xué)你媽喊你回家吃飯了
2024-10-19 10:26:27.347 INFO 27256 --- [MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer : 當前時間:2024-10-19 10:26:27,當前消息id:7F0000016A7818B4AAC25EEF65120000,是第【3】次消費,直接返回消費消費成功,消息內(nèi)容:小明同學(xué)你媽喊你回家吃飯了
可以看到返回 NULL 和模擬除 0 異常是一樣的效果,消息在不斷重試消息,分別是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 觸發(fā)了消費,時間間隔也是 10秒、30秒,結(jié)果符合預(yù)期。
上面的案例我們沒有控制消息消費重試次數(shù),我們可以設(shè)置一個消息消費重試次數(shù),代碼如下:
//設(shè)置最大消息重試次數(shù)
consumer.setMaxReconsumeTimes(2);
使用 @RocketMQMessageListener 注解 + 實現(xiàn) RocketMQListener 接口的方式消費消息的時候消費失敗也會自動進行重試,因此我們一定要控制好重試次數(shù),可以 @RocketMQMessageListener 注解的 maxReconsumeTimes 來控制重試次數(shù)(高版本的 starter 才有該屬性)。
不管使用哪種方式進行消費,對于達到重試次數(shù)還是消費失敗的消息一般有兩種處理方式,分別是:
- 直接返回消息消費成功,使用本地庫記錄消息進行重新推送或者人工介入處理。
- 不進行處理,讓消息進入死信隊列,然后去監(jiān)聽死信隊列進行處理。
順序消息 Producer 消息發(fā)送代碼案例
順序消息發(fā)送的代碼前面分享過,感興趣的朋友也可以通過上面的系列文章鏈接去查看,這里還是簡單的分享一下順序消息 Producer 消息發(fā)送代碼,具體如下:
package com.order.service.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** @ClassName: OneWayMessageProducer* @Author: Author* @Date: 2024/9/27 17:27* @Description: 順序消息發(fā)送者*/
@Slf4j
@Component
public class OrderlyMessageProducer {@Autowiredprivate RocketMQTemplate rocketMqTemplate;/*** @date 2024/10/11 15:45* @description 同步順序消息*/public void syncSendOrderly() {//hashKey 用來計算決定消息發(fā)送到哪個隊列 一般是訂單 ID 等信息 這里我們模擬訂單 ID 發(fā)送rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步順序消息,訂單編號:666666 創(chuàng)建").build(), "666666");rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步順序消息,訂單編號:666666 支付").build(), "666666");rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步順序消息,訂單編號:666666 確認收貨").build(), "666666");rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步順序消息,訂單編號:888888 創(chuàng)建").build(), "888888");rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步順序消息,訂單編號:888888 支付").build(), "888888");rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步順序消息,訂單編號:888888 確認收貨").build(), "888888");}}
順序消息 Consumer 消息消費代碼案例
同步消息手動 ACK 我們是注冊了一個 MessageListenerConcurrently 消息監(jiān)聽器,順序消息的手動 ACK 我們需要注冊一個 MessageListenerOrderly 的消息監(jiān)聽器,具體代碼如下:
package com.order.service.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** @ClassName: ManualCommitOrderlyMessageConsumer* @Author: Author* @Date: 2024/10/10 17:35* @Description: 順序消息消費*/
@Slf4j
@Component
public class ManualCommitOrderlyMessageConsumer {/*** @date 2024/10/16 17:19* @description 順序消息消費成功 手動提交*/@PostConstructpublic void onOrderlyMessage() throws MQClientException {//消費者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-group");//設(shè)置最大消息重試次數(shù)//consumer.setMaxReconsumeTimes(2);//RocketMQ 地址可以 可以用配置文件consumer.setNamesrvAddr("dev-ztn-rocketmq.eminxing.com:19876");//訂閱一個或多個topic,并指定tag過濾條件,這里指定 * 表示接收所有 tag 的消息consumer.subscribe("orderly-topic", "*");consumer.registerMessageListener(new MessageListenerOrderly() {//存儲消息id 和消費次數(shù)的關(guān)系final Map<String, Integer> map = new HashMap<>();@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());//消息處理MessageExt messageExt = list.get(0);String message = new String(messageExt.getBody());String msgId = messageExt.getMsgId();//獲取消息消費次數(shù)Integer count = map.get(msgId);if (count == null) {count = 0;}//次數(shù)+1count = count + 1;//覆蓋mapmap.put(msgId, count);if (count > 2) {log.info("當前時間:{},當前消息id:{},是第【{}】次消費,直接返回順序消費消費成功,消息內(nèi)容:{}", dateStr, msgId, count, message);//消息消費成功后移除map.remove(msgId);return ConsumeOrderlyStatus.SUCCESS;}log.info("當前時間:{},當前消息id:{},是第【{}】次消費,順序消息內(nèi)容:{}", dateStr, msgId, count, message);//模擬除 0 異常//int a = 1 / 0;log.info("當前時間:{},當前消息id:{},是第【{}】次消費,順序消息消費成功,消息內(nèi)容:{}", dateStr, msgId, count, message);return ConsumeOrderlyStatus.SUCCESS;//return null;}});consumer.start();}}
RocketMQ 順序消費端手動 ACK 結(jié)果驗證:
正常消費返回 ConsumeOrderlyStatus.SUCCESS
2024-10-19 13:46:44.293 INFO 29348 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 創(chuàng)建
2024-10-19 13:46:44.294 INFO 29348 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消費,順序消息消費成功,消息內(nèi)容:同步順序消息,訂單編號:666666 創(chuàng)建
2024-10-19 13:46:44.295 INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 支付
2024-10-19 13:46:44.295 INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消費,順序消息消費成功,消息內(nèi)容:同步順序消息,訂單編號:666666 支付
2024-10-19 13:46:44.295 INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 確認收貨
2024-10-19 13:46:44.295 INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消費,順序消息消費成功,消息內(nèi)容:同步順序消息,訂單編號:666666 確認收貨
2024-10-19 13:46:44.298 INFO 29348 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 創(chuàng)建
2024-10-19 13:46:44.298 INFO 29348 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消費,順序消息消費成功,消息內(nèi)容:同步順序消息,訂單編號:888888 創(chuàng)建
2024-10-19 13:46:44.300 INFO 29348 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 支付
2024-10-19 13:46:44.300 INFO 29348 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消費,順序消息消費成功,消息內(nèi)容:同步順序消息,訂單編號:888888 支付
2024-10-19 13:46:44.302 INFO 29348 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 確認收貨
2024-10-19 13:46:44.303 INFO 29348 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:46:44,當前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消費,順序消息消費成功,消息內(nèi)容:同步順序消息,訂單編號:888888 確認收貨
可以看到是按發(fā)送順序消費的,結(jié)果符合預(yù)期。
模擬除 0 異常
2024-10-19 13:50:45.587 INFO 27604 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:45,當前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 創(chuàng)建
2024-10-19 13:50:46.588 INFO 27604 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:46,當前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 創(chuàng)建
2024-10-19 13:50:47.592 INFO 27604 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:47,當前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:666666 創(chuàng)建
2024-10-19 13:50:47.592 INFO 27604 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:47,當前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 支付
2024-10-19 13:50:48.593 INFO 27604 --- [orderly-group_7] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:48,當前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 支付
2024-10-19 13:50:49.595 INFO 27604 --- [orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:49,當前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:666666 支付
2024-10-19 13:50:49.595 INFO 27604 --- [orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:49,當前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 確認收貨
2024-10-19 13:50:50.610 INFO 27604 --- [orderly-group_9] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:50,當前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 確認收貨
2024-10-19 13:50:51.611 INFO 27604 --- [rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:51,當前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:666666 確認收貨
2024-10-19 13:50:51.611 INFO 27604 --- [rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:51,當前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 創(chuàng)建
2024-10-19 13:50:52.617 INFO 27604 --- [rderly-group_11] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:52,當前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 創(chuàng)建
2024-10-19 13:50:53.618 INFO 27604 --- [rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:53,當前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:888888 創(chuàng)建
2024-10-19 13:50:53.618 INFO 27604 --- [rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:53,當前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 支付
2024-10-19 13:50:54.620 INFO 27604 --- [rderly-group_13] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:54,當前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 支付
2024-10-19 13:50:55.627 INFO 27604 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:55,當前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:888888 支付
2024-10-19 13:50:55.627 INFO 27604 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:55,當前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 確認收貨
2024-10-19 13:50:56.629 INFO 27604 --- [rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:56,當前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 確認收貨
2024-10-19 13:50:57.631 INFO 27604 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 13:50:57,當前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:888888 確認收貨
可以看到是按發(fā)送順序消費的,且每條消息都是經(jīng)過第三次消費后才返回成功的,每次消費具體上一次消費的時間間隔是 1 秒,結(jié)果符合預(yù)期。
這里需要注意一點的時候上一條消息沒有消費成功時,后面一條消息永遠不會被消費,這個從我們的日志中也能夠體現(xiàn)出來,因此我們在使用順序消息的時候一定要注意消息的重試次數(shù),消息重試次數(shù)我們可以通過自己的業(yè)務(wù)來判斷消費幾次,也可以使用 RocketMQ 來實現(xiàn),代碼如下:
//設(shè)置最大消息重試次數(shù)
consumer.setMaxReconsumeTimes(2);
模擬返回 NULL
2024-10-19 14:00:51.484 INFO 22728 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:51,當前消息id:7F00000158C818B4AAC25FB44C170000,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 創(chuàng)建
2024-10-19 14:00:52.485 INFO 22728 --- [rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:52,當前消息id:7F00000158C818B4AAC25FB44C170000,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 創(chuàng)建
2024-10-19 14:00:53.492 INFO 22728 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:53,當前消息id:7F00000158C818B4AAC25FB44C170000,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:666666 創(chuàng)建
2024-10-19 14:00:53.492 INFO 22728 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:53,當前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 支付
2024-10-19 14:00:54.492 INFO 22728 --- [rderly-group_17] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:54,當前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 支付
2024-10-19 14:00:55.494 INFO 22728 --- [rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:55,當前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:666666 支付
2024-10-19 14:00:55.494 INFO 22728 --- [rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:55,當前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 確認收貨
2024-10-19 14:00:56.495 INFO 22728 --- [rderly-group_19] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:56,當前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:666666 確認收貨
2024-10-19 14:00:57.497 INFO 22728 --- [rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:57,當前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:666666 確認收貨
2024-10-19 14:00:57.497 INFO 22728 --- [rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:57,當前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 創(chuàng)建
2024-10-19 14:00:58.510 INFO 22728 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:58,當前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 創(chuàng)建
2024-10-19 14:00:59.520 INFO 22728 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:59,當前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:888888 創(chuàng)建
2024-10-19 14:00:59.520 INFO 22728 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:00:59,當前消息id:7F00000158C818B4AAC25FB44C200004,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 支付
2024-10-19 14:01:00.521 INFO 22728 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:01:00,當前消息id:7F00000158C818B4AAC25FB44C200004,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 支付
2024-10-19 14:01:01.523 INFO 22728 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:01:01,當前消息id:7F00000158C818B4AAC25FB44C200004,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:888888 支付
2024-10-19 14:01:01.523 INFO 22728 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:01:01,當前消息id:7F00000158C818B4AAC25FB44C220005,是第【1】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 確認收貨
2024-10-19 14:01:02.525 INFO 22728 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:01:02,當前消息id:7F00000158C818B4AAC25FB44C220005,是第【2】次消費,順序消息內(nèi)容:同步順序消息,訂單編號:888888 確認收貨
2024-10-19 14:01:03.527 INFO 22728 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 當前時間:2024-10-19 14:01:03,當前消息id:7F00000158C818B4AAC25FB44C220005,是第【3】次消費,直接返回順序消費消費成功,消息內(nèi)容:同步順序消息,訂單編號:888888 確認收貨
可以看到是按發(fā)送順序消費的,且每條消息都是經(jīng)過第三次消費后才返回成功的,每次消費具體上一次消費的時間間隔是 1 秒,和模擬除 0異常 一樣的結(jié)果,結(jié)果符合預(yù)期。
總結(jié):RocketMQ 消息重試次數(shù)可以通過 @RocketMQMessageListener 注解的 maxReconsumeTimes 屬性來設(shè)置,也可以通過編碼來設(shè)置,不管采用何種方式來設(shè)置,我們都要在業(yè)務(wù)編碼中做好處理,過多重視造成的性能問題,以及沒有合理處理消費失敗的消息造成的消息丟失問題,對于順序消息我們更要慎重對待,順序消息回到導(dǎo)致消息阻塞。
如有不正確的地方歡迎各位指出糾正。