家具網(wǎng)站怎么做aso網(wǎng)站
文章目錄
- 什么是死信交換機(jī)?
- 死信交換機(jī)實(shí)現(xiàn)延遲消息的思路
- 實(shí)現(xiàn)過(guò)程
- 配置類
- 消費(fèi)者監(jiān)聽(tīng)死信隊(duì)列
- 發(fā)送延遲消息
- 注意事項(xiàng)
- 總結(jié)
在開(kāi)發(fā)過(guò)程中,我們常常會(huì)遇到需要延遲處理某些消息的場(chǎng)景,例如訂單的支付超時(shí)處理、短信的定時(shí)發(fā)送等。本文將介紹如何使用RabbitMQ的死信交換機(jī)(Dead Letter Exchange,DLX)來(lái)實(shí)現(xiàn)延遲消息的處理。
什么是死信交換機(jī)?
死信交換機(jī)是一種特殊的交換機(jī),用于處理不能被正常消費(fèi)的消息。當(dāng)消息在隊(duì)列中出現(xiàn)以下幾種情況時(shí),會(huì)被轉(zhuǎn)發(fā)到死信交換機(jī):
- 消息被拒絕(Basic.Reject或Basic.Nack)并且requeue參數(shù)設(shè)置為false。
- 消息在隊(duì)列中的存活時(shí)間超過(guò)了TTL(Time To Live)。
- 隊(duì)列的最大長(zhǎng)度已滿,導(dǎo)致消息被丟棄。
通過(guò)配置死信交換機(jī),我們可以將這些“死信”轉(zhuǎn)發(fā)到一個(gè)特殊的隊(duì)列,從而進(jìn)行后續(xù)處理。
死信交換機(jī)實(shí)現(xiàn)延遲消息的思路
利用消息的TTL屬性和死信交換機(jī),我們可以實(shí)現(xiàn)延遲消息的處理。具體步驟如下:
- 創(chuàng)建一個(gè)普通交換機(jī)和隊(duì)列,隊(duì)列綁定一個(gè)死信交換機(jī)。
- 發(fā)送消息到普通隊(duì)列,并設(shè)置消息的TTL。
- 消息過(guò)期后,會(huì)轉(zhuǎn)發(fā)到死信交換機(jī),死信交換機(jī)再將消息路由到實(shí)際處理的隊(duì)列中。
實(shí)現(xiàn)過(guò)程
配置類
首先,我們需要配置普通交換機(jī)、隊(duì)列和綁定關(guān)系:
package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class NormalConfiguration {@Beanpublic DirectExchange normalExchange() {return ExchangeBuilder.directExchange("normal.direct").build();}@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();}@Beanpublic Binding normalExchangeBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi");}
}
在上述配置中,我們創(chuàng)建了一個(gè)普通交換機(jī)normal.direct
,以及一個(gè)普通隊(duì)列normal.queue
,并將隊(duì)列綁定到了死信交換機(jī)dlx.direct
。
消費(fèi)者監(jiān)聽(tīng)死信隊(duì)列
接下來(lái),我們需要定義一個(gè)消費(fèi)者來(lái)監(jiān)聽(tīng)死信隊(duì)列,并處理延遲后的消息:
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.core.ExchangeTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;@Component
public class DlxMessageListener {private static final Logger log = LoggerFactory.getLogger(DlxMessageListener.class);@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlx.queue", durable = "true"),exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),key = {"hi"}))public void listenDlxQueue1(String message) {log.info("消費(fèi)者監(jiān)聽(tīng)到dlx.queue消息:{}", message);}
}
發(fā)送延遲消息
最后,我們編寫(xiě)一個(gè)測(cè)試方法來(lái)發(fā)送延遲消息:
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class DelayMessageTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendDelayMessage() {rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", message -> {message.getMessageProperties().setExpiration("10000");return message;});}
}
在上述測(cè)試方法中,我們向普通隊(duì)列發(fā)送了一條消息,并設(shè)置其TTL為10000毫秒(即10秒)。當(dāng)消息在普通隊(duì)列中存活超過(guò)10秒后,會(huì)被轉(zhuǎn)發(fā)到死信交換機(jī),然后由消費(fèi)者監(jiān)聽(tīng)并處理。
注意事項(xiàng)
需要注意的是,RabbitMQ的消息過(guò)期是基于追溯方式來(lái)實(shí)現(xiàn)的,也就是說(shuō)當(dāng)一個(gè)消息的TTL到期以后,不一定會(huì)立即被移除或投遞到死信交換機(jī),而是在消息恰好處于隊(duì)首時(shí)才會(huì)被處理。當(dāng)隊(duì)列中消息堆積很多的時(shí)候,過(guò)期消息可能不會(huì)被按時(shí)處理,因此你設(shè)置的TTL時(shí)間不一定準(zhǔn)確。
總結(jié)
通過(guò)以上配置和代碼,我們實(shí)現(xiàn)了使用RabbitMQ死信交換機(jī)來(lái)處理延遲消息。利用死信交換機(jī)的機(jī)制,我們可以方便地實(shí)現(xiàn)各種復(fù)雜的消息處理場(chǎng)景,提高系統(tǒng)的靈活性和可靠性。