手機(jī)怎么創(chuàng)建自己的網(wǎng)頁seo優(yōu)化首頁
基于Rocket MQ擴(kuò)展的無限延遲消息隊列
背景:
- Rocket MQ支持的延遲隊列時間是固定間隔的, 默認(rèn)19個等級(包含0等級): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我們的需求是實現(xiàn)用戶下單后48小時或72小時給用戶發(fā)送逼單郵件. 使用默認(rèn)的延遲消息無法實現(xiàn)該功能, 所以對方案進(jìn)行了改造.
實現(xiàn)原理:
-
簡單而言, 就是在Rocket MQ延遲隊列固定時間間隔的基礎(chǔ)上, 通過多次發(fā)送延遲消息, 達(dá)到任意延時時間組合計算. 通過反射的方式, 實現(xiàn)延遲業(yè)務(wù)邏輯的調(diào)用.
-
源碼如下:
-
/** Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved.*/ package com.example.xxx.utils;import com.vevor.bmp.crm.common.constants.MQConstants; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;import javax.annotation.Resource; import java.io.Serializable; import java.util.Calendar; import java.util.Date; import java.util.concurrent.TimeUnit;/*** @version :1.8.0* @description :基于Rocket MQ的任意延遲時長工具* @program :user-growth* @date :Created in 2023/5/22 3:35 下午* @since :1.8.0*/ @Slf4j @Component @RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP,topic = MQConstants.CRM_DELAY_QUEUE_TOPIC,// 消息消費順序consumeMode = ConsumeMode.CONCURRENTLY,// 最大消息重復(fù)消費次數(shù)maxReconsumeTimes = 3) public class RocketMQDelayQueueUtils implements RocketMQListener<RocketMQDelayQueueUtils.DelayTable<Object>> {/*** Rocket MQ客戶端*/@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** MQ默認(rèn)延遲等級*/private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L,30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L,480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L};@SneakyThrows@Overridepublic void onMessage(DelayTable<Object> message) {Date endTime = message.getEndTime();int delayLevel = getDelayLevel(endTime);// 繼續(xù)延遲if (delayLevel != 0) {int currentDelayCount = message.getCurrentDelayCount();currentDelayCount++;message.setCurrentDelayCount(currentDelayCount);message.setCurrentDelayLevel(delayLevel);message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);this.sendDelayMessage(message);return;}// 執(zhí)行業(yè)務(wù)log.info("delay message end! start to process business...");Class<? extends DelayMessageHandler> messageHandler = message.getMessageHandler();if (messageHandler != null) {DelayMessageHandler delayMessageHandler = messageHandler.newInstance();delayMessageHandler.handle();}}/*** 延遲消息體** @param <E> 消息類型*/@Datapublic static class DelayTable<E> implements Serializable {private static final long serialVersionUID = 2405172041950251807L;/*** 延遲消息體*/private E content;/*** 消息延遲結(jié)束時間*/private Date endTime;/*** 總延遲毫秒數(shù)*/private long totalDelayTime;/*** 總延遲時間單位*/private TimeUnit totalDelayTimeUnit;/*** 當(dāng)前延遲次數(shù)*/private int currentDelayCount;/*** 當(dāng)前延遲等級*/private int currentDelayLevel;/*** 當(dāng)前延遲毫秒數(shù)*/private long currentDelayMillis;/*** 延遲處理邏輯*/private Class<? extends DelayMessageHandler> messageHandler;}/*** 發(fā)送延遲消息** @param message 消息體* @param delay 延遲時長* @param timeUnit 延遲時間單位* @param handler 延遲時間到了之后,需要處理的邏輯* @param <E> 延遲消息類型*/public <E> void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) {// 把延遲時間轉(zhuǎn)換成時間戳(毫秒)long totalDelayMills = timeUnit.toMillis(delay);// 根據(jù)延遲時間計算結(jié)束時間Calendar instance = Calendar.getInstance();instance.add(Calendar.MILLISECOND, (int)totalDelayMills);Date endTime = instance.getTime();// 根據(jù)延遲時間匹配延遲等級(delay level)int delayLevel = getDelayLevel(endTime);long delayMillis = TIME_DELAY_LEVEL[delayLevel];// 發(fā)送消息DelayTable<E> delayTable = new DelayTable<>();// 全局?jǐn)?shù)據(jù)delayTable.setContent(message);delayTable.setMessageHandler(handler);delayTable.setEndTime(endTime);delayTable.setTotalDelayTime(delay);delayTable.setTotalDelayTimeUnit(timeUnit);// 當(dāng)前延遲等級數(shù)據(jù)delayTable.setCurrentDelayCount(1);delayTable.setCurrentDelayLevel(delayLevel);delayTable.setCurrentDelayMillis(delayMillis);this.sendDelayMessage(delayTable);}/*** 計算延遲等級** @param targetTime 延遲截止時間* @return Rocket MQ延遲消息等級*/private static int getDelayLevel(Date targetTime) {long currentTime = System.currentTimeMillis();long delayMillis = targetTime.getTime() - currentTime;if (delayMillis <= 0) {// 不延遲,即延遲等級為 0return 0;}// 判斷處于哪個延遲等級// 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1hfor (int i = 1; i <= 18; i++) {long delayLevelTime = TIME_DELAY_LEVEL[i];if (delayMillis < delayLevelTime) {return i - 1;} else if (delayMillis == delayLevelTime) {return i;}}// 最大延遲等級為 18return 18;}/*** 發(fā)送延遲消息** @param delayTable 延遲對象,可以循環(huán)使用*/@SneakyThrowsprivate <E> void sendDelayMessage(DelayTable<E> delayTable) {// 消息序列化Message<DelayTable<E>> message = MessageBuilder.withPayload(delayTable).build();// 設(shè)置\發(fā)送延遲消息int delayLevel = delayTable.getCurrentDelayLevel();rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message, 3000, delayLevel);log.debug("delay count: {}, delay level: {}, time: {} milliseconds",delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]);}/*** 延遲回調(diào)接口** 回調(diào)邏輯必須實現(xiàn)該接口#hander()方法,在延遲結(jié)束后,會通過反射的方式調(diào)用該方法*/public interface DelayMessageHandler extends Serializable {long serialVersionUID = 2405172041950251807L;/*** 回調(diào)函數(shù)*/void handle();}}
測試代碼:
-
/** Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved.*/ package com.vevor.bmp.crm.io.controller;import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils; import com.vevor.common.pojo.vo.ResponseResult; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource; import java.util.concurrent.TimeUnit;/*** @version :1.8.0* @description :延遲隊列測試* @program :user-growth* @date :Created in 2023/5/22 4:54 下午* @since :1.8.0*/ @Slf4j @RestController public class DelayQueueController {@Resourceprivate RocketMQDelayQueueUtils rocketMQDelayQueueUtils;@GetMapping("/mq/delay")@SneakyThrowspublic ResponseResult<String> mqDelay(@RequestParam Integer delay, @RequestParam String task) {// 獲取延時隊列rocketMQDelayQueueUtils.delay(task, delay, TimeUnit.SECONDS, CallBack.class);return ResponseResult.success();}/*** @version :* @description :* @program :user-growth* @date :Created in 2023/5/23 2:11 下午* @since :*/@Datapublic static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler {/*** 回調(diào)函數(shù)*/@Overridepublic void handle() {log.info("i am business logical! {}", System.currentTimeMillis());}} }
優(yōu)缺點:
- 優(yōu)點: 與定時任務(wù)框架相比, 通過延遲消息的方式具實時性高、 支持分布式、輕量級、高并發(fā)等優(yōu)點.
- 缺點: 消息的準(zhǔn)確性不可靠, 正常情況下準(zhǔn)確性在秒級, 但是當(dāng)MQ服務(wù)出現(xiàn)消息堆積時, 消息的時間就會偏差較大, 所以準(zhǔn)確性依賴MQ服務(wù)的穩(wěn)定.