秦皇島企業(yè)建網(wǎng)站廣東seo外包服務
目錄
一、死信交換機
1.1、什么是死信交換機
1.2、TTL
1.2.1、什么是 TTL?
1.2.2、通過 TTL 模擬觸發(fā)死信
二、延遲隊列
2.1、什么是延遲隊列
2.2、配置延遲隊列插件
2.2.1、延遲隊列配置
a)下載鏡像
b)運行容器
c)剛剛設定的RabbitMQ的數(shù)據(jù)卷名稱為`mq-plugins`,所以我們使用下面命令查看數(shù)據(jù)卷:
d)在此目錄下,進入 MQ 容器內(nèi)部.
e)開啟插件
2.3、SpringAMQP 使用延遲隊列插件
一、死信交換機
1.1、什么是死信交換機
想要知道什么是死信交換機,先來看看什么是死信(dead letter)~
當生產(chǎn)者發(fā)送了一個消息,經(jīng)過交換機到達隊列時,滿足下列情況之一時,就可以成為死信:
- 消費者使用 basic.reject 或 basic.nack 聲明消費失敗,并且消息的 requeue 參數(shù)設置為 false(消息不重新加入到隊列中).
- 消息設置了過期時間,到了時間沒有被消費掉.
- 要投遞的隊列消息堆積滿了(隊列設置了最大消息數(shù)目),最早的消息可能會成為死信(LRU 算法淘汰的消息).
那么如果這個時候,一個隊列配置了 dead-letter-exchange 屬性,指定了一個交換機,那么隊列中的死信就會投遞到這個交換機中,而這個交換機就稱為 死信交換機.
1.2、TTL
1.2.1、什么是 TTL?
TTL 就是過期時間.? 如果一個隊列中的消息到了過期時間還沒有被消費, 就會變成死信.
這里的消息到了過期時間實際上有兩種情況:
- 消息所在的隊列設置了消息過期時間(x_message_ttl).
- 消息本身設置了存活時間.
1.2.2、通過 TTL 模擬觸發(fā)死信
a)聲明一個直接交換機和一個配置了過期時間(x-message-ttl 屬性)以及配?deadLetterExchange、deadLetterRoutingKey 屬性的普通隊列,用來生成死信
@Configuration
public class TTLMessageConfig {@Beanpublic DirectExchange ttlDirectExchange() {return new DirectExchange("ttl.direct");}@Beanpublic Queue ttlQueue() {return QueueBuilder.durable("ttl.queue").ttl(5000) //延時 5 s.deadLetterExchange("dl.direct") //消息如果超時沒被消費就給這個死信交換機.deadLetterRoutingKey("dl").build();}@Beanpublic Binding ttlBinding() {return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");}}
b)這里我們基于注解的方式,聲明一組死信交換機和隊列
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.queue", durable = "true"),exchange = @Exchange(name = "dl.exchange"),key = "dl"))public void listenDlQueue(String msg) {log.info("消費者收到死信消息!msg=" + msg);}
c)生產(chǎn)者發(fā)送一個過期時間為 5s?的消息
@Testpublic void testTTLMessage() {//1.構(gòu)造一個消息Message message = MessageBuilder.withBody("hello ttl message".getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("5000").build();//2.發(fā)送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);//3.記錄日志log.info("消息已經(jīng)成功發(fā)送!");}
d)執(zhí)行結(jié)果如下
Ps:通過執(zhí)行結(jié)果,也可以看出,如果消息和隊列都設置了過期時間,那么以時間短的為主.
二、延遲隊列
2.1、什么是延遲隊列
剛剛我們利用 TTL 結(jié)合死信交換機,實現(xiàn)了當消息發(fā)出后,消費者延遲收到消息的效果。這種消息模式就成為 延遲隊列(Delay Queue) 模式。
延遲隊列經(jīng)常用于以下場景:
- 延遲發(fā)送短信.
- 用戶下單,如果再 5 分鐘內(nèi)沒有支付,就自動取消.
- 預約工作會議,10 分鐘后自動通知所有參會人員.
2.2、配置延遲隊列插件
由于 利用 TTL 結(jié)合死信交換機的方式實現(xiàn)起來比較麻煩,并且延遲隊列的需求又非常多,因此 RabbitMQ 官方推出了一個插件,可以通過更簡單的方式,達到延遲隊列的效果.
2.2.1、延遲隊列配置
我們在Centos7虛擬機中使用Docker來安裝。
a)下載鏡像
docker pull rabbitmq:3.8-management
b)運行容器
docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
Ps:此命令還額外配置了插件目錄對應的數(shù)據(jù)卷.
c)剛剛設定的RabbitMQ的數(shù)據(jù)卷名稱為`mq-plugins`,所以我們使用下面命令查看數(shù)據(jù)卷:
docker volume inspect mq-plugins
結(jié)果如下?
使用 cd 命令切換到 Mountpoint 指定的目錄下.
d)在此目錄下,進入 MQ 容器內(nèi)部.
我的容器名為`mq`,所以執(zhí)行下面命令:
docker exec -it mq bash
e)開啟插件
進入容器內(nèi)部后,執(zhí)行以下命令開啟插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2.3、SpringAMQP 使用延遲隊列插件
a)聲明一個延遲隊列.? 這里實際上和聲明普通交換機只多出了一個 delayed 屬性,設置為 true 就表示為延遲隊列.
以下是基于 注解的方式聲明交換機、隊列、綁定.
Ps:如果是通過 java 代碼的方式聲明交換機,只需要 ExchangeBuilder().directExhange.delay() 即可.
@Component
@Slf4j
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"))public void listenDelayExchange(String msg) {log.info("消費者接收到到了延遲消息!msg=" + msg);}}
b)生產(chǎn)者只需要在生產(chǎn)消息的時候添加一個 header:"x-delay",對應的值就是延遲時間,單位是毫秒:
@Testpublic void testDelayMessage() {//1.準備消息Message message = MessageBuilder.withBody("hello ttl message".getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setHeader("x-delay", 5000) // 消息延遲時間.build();//2.消息 ID 需要封裝到 CorrelationData 中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//3.發(fā)送消息rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);log.info("消息已經(jīng)成功發(fā)送!");}
c)結(jié)果如下?