品牌廣告設(shè)計制作公司網(wǎng)站源碼班級優(yōu)化大師的功能有哪些
在分布式系統(tǒng)中,消息隊列通常用于解耦服務(wù),RabbitMQ是一個廣泛使用的消息隊列服務(wù)。延遲消息(也稱為延時隊列或TTL消息)是一種常見的場景應(yīng)用,特別適合處理某些任務(wù)在一段時間后執(zhí)行的需求,如訂單超時處理、延時通知等。
本文將以具體代碼為例,展示如何使用RabbitMQ來實現(xiàn)延遲消息處理,涵蓋隊列和交換機的配置、消息的發(fā)送與接收以及死信隊列的處理。
什么是延遲消息?
延遲消息是指消息在發(fā)送到隊列后,經(jīng)過設(shè)定的時間延遲再被消費。RabbitMQ 本身沒有直接支持延遲隊列的功能,但可以通過 TTL(Time To Live)+ 死信隊列(Dead Letter Queue, DLQ) 的組合來實現(xiàn)。當消息超過TTL(消息存活時間)后,不會被立即消費,而是會被轉(zhuǎn)發(fā)到綁定的死信隊列,從而實現(xiàn)延遲處理。
RabbitMQ中的延遲消息原理
在RabbitMQ中,我們可以通過以下幾個概念來實現(xiàn)延遲消息:
- TTL(Time To Live):可以為隊列設(shè)置TTL,消息超過該時間后會被標記為“死信”。
- 死信隊列(Dead Letter Queue):當消息在正常隊列中過期或處理失敗時,RabbitMQ可以將它們路由到一個死信隊列,死信隊列可以用來處理這些過期或未處理的消息。
- x-dead-letter-exchange 和 x-dead-letter-routing-key:可以通過配置隊列的參數(shù),將過期消息發(fā)送到一個專門的死信交換器,并根據(jù)指定的路由鍵轉(zhuǎn)發(fā)到死信隊列。
?
?消息來到ttl.queue消息隊列,過期時間內(nèi)無人消費,消息來到死信交換機hmall.direct,在direct.queue消息隊列無需等待。
1. RabbitMQ的配置
首先,我們需要配置兩個隊列和兩個交換機:一個用于存放延時消息,另一個用于處理超時的死信消息。
package com.heima.stroke.configuration;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitConfig {// 延遲時間 單位:毫秒 (這里設(shè)為30秒)private static final long DELAY_TIME = 1000 * 30;// 行程超時隊列public static final String STROKE_OVER_QUEUE = "STROKE_OVER_QUEUE";// 行程死信隊列public static final String STROKE_DEAD_QUEUE = "STROKE_DEAD_QUEUE";// 行程超時隊列交換機public static final String STROKE_OVER_QUEUE_EXCHANGE = "STROKE_OVER_QUEUE_EXCHANGE";// 行程死信隊列交換機public static final String STROKE_DEAD_QUEUE_EXCHANGE = "STROKE_DEAD_QUEUE_EXCHANGE";// 行程超時交換機 Routing Keypublic static final String STROKE_OVER_KEY = "STROKE_OVER_KEY";// 行程死信交換機 Routing Keypublic static final String STROKE_DEAD_KEY = "STROKE_DEAD_KEY";/*** 聲明行程超時隊列,并設(shè)置其參數(shù)* x-dead-letter-exchange:綁定的死信交換機* x-dead-letter-routing-key:死信路由Key* x-message-ttl:消息的過期時間*/@Beanpublic Queue strokeOverQueue() {Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", STROKE_DEAD_QUEUE_EXCHANGE);args.put("x-dead-letter-routing-key", STROKE_DEAD_KEY);args.put("x-message-ttl", DELAY_TIME); // 設(shè)置TTL為30秒return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build();}@Beanpublic DirectExchange strokeOverQueueExchange() {return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE);}@Beanpublic Binding bindingStrokeOverDirect() {return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY);}
}
解釋:
TTL設(shè)置:我們通過x-message-ttl
設(shè)置消息的過期時間為30秒。
死信隊列綁定:通過x-dead-letter-exchange
和x-dead-letter-routing-key
設(shè)置,當消息過期時,它會被轉(zhuǎn)發(fā)到死信交換機,再路由到死信隊列。
2. 生產(chǎn)者發(fā)送延遲消息
接下來,我們通過生產(chǎn)者向超時隊列發(fā)送消息,這些消息將在TTL過期后轉(zhuǎn)發(fā)到死信隊列。
package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MQProducer {private final static Logger logger = LoggerFactory.getLogger(MQProducer.class);@AutowiredRabbitTemplate rabbitTemplate;/*** 發(fā)送延時消息到行程超時隊列** @param strokeVO 消息體*/public void sendOver(StrokeVO strokeVO) {String mqMessage = JSON.toJSONString(strokeVO);logger.info("send timeout msg:{}", mqMessage);rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage);}
}
解釋:
sendOver
方法將消息發(fā)送到超時隊列,消息將在超時后進入死信隊列。生產(chǎn)者不需要額外處理TTL或死信的配置,只需發(fā)送消息即可。
3. 消費者監(jiān)聽死信隊列
當消息超過TTL后,將會被轉(zhuǎn)發(fā)到死信隊列。消費者需要監(jiān)聽死信隊列并處理這些消息。
j
package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import com.heima.stroke.handler.StrokeHandler;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;@Component
public class MQConsumer {private final static Logger logger = LoggerFactory.getLogger(MQConsumer.class);@Autowiredprivate StrokeHandler strokeHandler;/*** 監(jiān)聽死信隊列** @param message 消息體* @param channel RabbitMQ的Channel* @param tag 消息的Delivery Tag*/@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = RabbitConfig.STROKE_DEAD_QUEUE, durable = "true"),exchange = @Exchange(value = RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE),key = RabbitConfig.STROKE_DEAD_KEY)})@RabbitHandlerpublic void processStroke(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {StrokeVO strokeVO = JSON.parseObject(message.getBody(), StrokeVO.class);logger.info("get dead msg:{}", message.getBody());if (strokeVO == null) {return;}try {// 處理超時的行程消息strokeHandler.timeoutHandel(strokeVO);// 手動確認消息channel.basicAck(tag, false);} catch (Exception e) {e.printStackTrace();}}
}
解釋:
@RabbitListener
注解綁定了死信隊列的監(jiān)聽器。當消息被轉(zhuǎn)發(fā)到死信隊列時,該消費者會接收到消息。
使用 channel.basicAck(tag, false)
手動確認消息處理成功,確保消息不會重復(fù)消費。
4. 處理超時業(yè)務(wù)邏輯
在我們的業(yè)務(wù)中,當消息超時未處理時,將其狀態(tài)設(shè)置為超時。
public void timeoutHandel(StrokeVO strokeVO) {// 獲取司機行程ID和乘客行程IDString inviterTripId = strokeVO.getInviterTripId();String inviteeTripId = strokeVO.getInviteeTripId();// 檢查邀請狀態(tài)是否為未確認String inviteeStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId);String inviterStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId);if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) &&String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) {// 更新為超時狀態(tài)redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode()));redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode()));}
}