南京建設(shè)銀行網(wǎng)站首頁想要推廣頁
目錄
- 死信隊(duì)列是什么
- 怎樣實(shí)現(xiàn)一個(gè)死信隊(duì)列
- 說明
- 實(shí)現(xiàn)過程
- 導(dǎo)入依賴
- 添加配置
- 編寫mq配置類
- 添加業(yè)務(wù)隊(duì)列的消費(fèi)者
- 添加死信隊(duì)列的消費(fèi)者
- 添加消息發(fā)送者
- 添加消息測(cè)試類
- 測(cè)試
- 死信隊(duì)列的應(yīng)用場(chǎng)景
- 總結(jié)
死信隊(duì)列是什么
“死信”是RabbitMQ中的一種消息機(jī)制,當(dāng)你在消費(fèi)消息時(shí),如果隊(duì)列里的消息出現(xiàn)以下情況:
- 消息被否定確認(rèn),使用 channel.basicNack 或 channel.basicReject ,并且此時(shí)requeue 屬性被設(shè)置為false。
- 消息在隊(duì)列的存活時(shí)間超過設(shè)置的TTL時(shí)間。
- 消息隊(duì)列的消息數(shù)量已經(jīng)超過最大隊(duì)列長(zhǎng)度。
那么該消息將成為“死信”。
“死信”消息會(huì)被RabbitMQ進(jìn)行特殊處理,如果配置了死信隊(duì)列信息,那么該消息將會(huì)被丟進(jìn)死信隊(duì)列中,如果沒有配置,則該消息將會(huì)被丟棄。
怎樣實(shí)現(xiàn)一個(gè)死信隊(duì)列
說明
配置死信隊(duì)列大概可以分為三個(gè)步驟:
1.配置業(yè)務(wù)隊(duì)列,綁定到業(yè)務(wù)交換機(jī)上
2.為業(yè)務(wù)隊(duì)列配置死信交換機(jī)和路由key
3.為死信交換機(jī)配置死信隊(duì)列
注意,并不是直接聲明一個(gè)公共的死信隊(duì)列,然后所以死信消息就自己跑到死信隊(duì)列里去了。而是為每個(gè)需要使用死信的業(yè)務(wù)隊(duì)列配置一個(gè)死信交換機(jī),這里同一個(gè)項(xiàng)目的死信交換機(jī)可以共用一個(gè),然后為每個(gè)業(yè)務(wù)隊(duì)列分配一個(gè)單獨(dú)的路由key。
實(shí)現(xiàn)過程
導(dǎo)入依賴
<!--RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
添加配置
spring: #rabbitmqrabbitmq:host: 83.136.16.134password: guestusername: guestlistener:type: simplesimple:default-requeue-rejected: falseacknowledge-mode: manual
編寫mq配置類
代碼里面有詳細(xì)說明,這里不在贅述。
package com.miaosha.study.mq;import com.sun.org.apache.regexp.internal.RE;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @Author: laz* @CreateTime: 2023-02-27 09:16* @Version: 1.0*/
@Configuration
public class RabbitmqConfig {/*** 業(yè)務(wù)交換機(jī)*/public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";/*** 業(yè)務(wù)隊(duì)列a*/public static final String BUSINESS_QUEUEA_NAME = "business.queue.a";/*** 業(yè)務(wù)交換機(jī)b*/public static final String BUSINESS_QUEUEB_NAME = "business.queue.b";/*** 死信交換機(jī)*/public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";/*** 死信隊(duì)列a*/public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.queue.a";/*** 死信隊(duì)列b*/public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.queue.b";/*** 死信隊(duì)列路由鍵a*/public static final String DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME = "dead.letter.queue.a.rounting.key";/*** 死信隊(duì)列路由鍵b*/public static final String DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME = "dead.letter.queue.b.rounting.key";/*** 申明業(yè)務(wù)交換機(jī)* @return*/@Beanpublic FanoutExchange businessExchange(){return new FanoutExchange(BUSINESS_EXCHANGE_NAME);}/*** 申明死信交換機(jī)* @return*/@Beanpublic DirectExchange deadletterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);}/*** 申明業(yè)務(wù)隊(duì)列a* @return*/@Beanpublic Queue queuea(){Map<String,Object> map = new HashMap<>();//綁定死信交換機(jī)map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);//綁定的死信路由鍵map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();}/*** 申明業(yè)務(wù)隊(duì)列b* @return*/@Beanpublic Queue queueb(){Map<String,Object> map = new HashMap<>();//綁定死信交換機(jī)map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);//綁定的死信路由鍵map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();}/*** 申明死信隊(duì)列a* @return*/@Beanpublic Queue deadletterQueuea(){return new Queue(DEAD_LETTER_QUEUEA_NAME);}/*** 申明死信隊(duì)列b* @return*/@Beanpublic Queue deadletterQueueb(){return new Queue(DEAD_LETTER_QUEUEB_NAME);}/*** 隊(duì)列a綁定到業(yè)務(wù)交換機(jī)* @return*/@Beanpublic Binding businessBindinga(){return BindingBuilder.bind(queuea()).to(businessExchange());}/*** 隊(duì)列b綁定到業(yè)務(wù)交換機(jī)* @return*/@Beanpublic Binding businessBindingb(){return BindingBuilder.bind(queueb()).to(businessExchange());}/*** 死信隊(duì)列a綁定到死信交換機(jī)* @return*/@Beanpublic Binding deadletterBindinga(){return BindingBuilder.bind(deadletterQueuea()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);}/*** 死信隊(duì)列b綁定到死信交換機(jī)* @return*/@Beanpublic Binding deadletterBindingB(){return BindingBuilder.bind(deadletterQueueb()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);}
}
添加業(yè)務(wù)隊(duì)列的消費(fèi)者
package com.miaosha.study.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEA_NAME;
import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEB_NAME;/*** @Author: laz* @CreateTime: 2023-02-27 09:53* @Version: 1.0*/
@Slf4j
@Component
public class RabbitmqReceiver {/*** 監(jiān)聽業(yè)務(wù)隊(duì)列a* @param message*/@RabbitListener(queues = BUSINESS_QUEUEA_NAME)public void queuea(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("業(yè)務(wù)隊(duì)列A接受到消息【{}】",msg);boolean ack = true;Exception exception = null;try {//這里模擬業(yè)務(wù)邏輯出現(xiàn)異常的情況if (msg.contains("fail")){throw new RuntimeException("dead letter exception");}} catch (Exception e){ack = false;exception = e;}//當(dāng)ack為false時(shí)(業(yè)務(wù)邏輯出現(xiàn)異常),說明當(dāng)前消息消費(fèi)異常,這里直接放入死信隊(duì)列if (!ack){log.error("業(yè)務(wù)隊(duì)列A消費(fèi)發(fā)生異常,error msg:{}", exception.getMessage());/*** void basicNack(long deliveryTag, boolean multiple, boolean requeue)* 參數(shù)一:當(dāng)前消息的唯一id* 參數(shù)二:是否針對(duì)多條消息* 參數(shù)三:是否從新入隊(duì)列*/channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}/*** 監(jiān)聽業(yè)務(wù)隊(duì)列b* @param msg*/@RabbitListener(queues = BUSINESS_QUEUEB_NAME)public void queueb(Message msg,Channel channel) throws Exception{String str = new String(msg.getBody());log.info("業(yè)務(wù)隊(duì)列B接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);}
}
添加死信隊(duì)列的消費(fèi)者
package com.miaosha.study.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;import static com.miaosha.study.mq.RabbitmqConfig.*;/*** @Author: laz* @CreateTime: 2023-02-27 09:58* @Version: 1.0*/
@Slf4j
@Component
public class DeadLetterReceiver {/*** 監(jiān)聽業(yè)務(wù)隊(duì)列a* @param msg*/@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)public void queuea(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("死信隊(duì)列A接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);log.info("死信消息properties:{}", msg.getMessageProperties());}/*** 監(jiān)聽業(yè)務(wù)隊(duì)列b* @param msg*/@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)public void queueb(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("死信隊(duì)列B接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);log.info("死信消息properties:{}", msg.getMessageProperties());}
}
添加消息發(fā)送者
package com.miaosha.study.mq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_EXCHANGE_NAME;/*** @Author: laz* @CreateTime: 2023-02-27 09:49* @Version: 1.0*/
@Component
public class RabbitmqSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String msg){rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"",msg);}
}
添加消息測(cè)試類
package com.miaosha.study.controller;import com.miaosha.study.mq.RabbitmqSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author: laz* @CreateTime: 2023-02-27 09:59* @Version: 1.0*/
@RestController
@RequestMapping("mq")
public class TestController {@Autowiredprivate RabbitmqSender rabbitmqSender;@RequestMapping("testDeadLetterQueue/{msg}")public void testDeadLetterQueue(@PathVariable("msg")String msg){rabbitmqSender.sendMsg(msg);}
}
測(cè)試
運(yùn)行項(xiàng)目,訪問:http://localhost:8081/mq/testDeadLetterQueue/msg
可以看到,此時(shí)只有業(yè)務(wù)消費(fèi)者消費(fèi)了消息,死信隊(duì)列并沒有消費(fèi)到消息。
然后根據(jù)消費(fèi)者里面的邏輯,我們發(fā)送一條 ‘fail’的消息,再次測(cè)試
訪問:http://localhost:8081/mq/testDeadLetterQueue/fail
可以看到,死信隊(duì)列a已收到消息。到此實(shí)現(xiàn)死信隊(duì)列的流程就通了。
注意:我們的死信消息MessageProperties
中的內(nèi)容比較多,代表的含義分別是:
字段名 | 含義 |
---|---|
x-first-death-exchange | 第一次被拋入的死信交換機(jī)的名稱 |
x-first-death-reason | 第一次成為死信的原因,rejected:消息在重新進(jìn)入隊(duì)列時(shí)被隊(duì)列拒絕,由于default-requeue-rejected 參數(shù)被設(shè)置為false。expired :消息過期。maxlen : 隊(duì)列內(nèi)消息數(shù)量超過隊(duì)列最大容量 |
x-first-death-queue | 第一次成為死信前所在隊(duì)列名稱 |
x-death | 歷次被投入死信交換機(jī)的信息列表,同一個(gè)消息每次進(jìn)入一個(gè)死信交換機(jī),這個(gè)數(shù)組的信息就會(huì)被更新 |
死信隊(duì)列的應(yīng)用場(chǎng)景
一般用在較為重要的業(yè)務(wù)隊(duì)列中,確保未被正確消費(fèi)的消息不被丟棄,一般發(fā)生消費(fèi)異??赡茉蛑饕杏捎谙⑿畔⒈旧泶嬖阱e(cuò)誤導(dǎo)致處理異常,處理過程中參數(shù)校驗(yàn)異常,或者因網(wǎng)絡(luò)波動(dòng)導(dǎo)致的查詢異常等等,當(dāng)發(fā)生異常時(shí),當(dāng)然不能每次通過日志來獲取原消息,然后讓運(yùn)維幫忙重新投遞消息。通過配置死信隊(duì)列,可以讓未正確處理的消息暫存到另一個(gè)隊(duì)列中,待后續(xù)排查清楚問題后,編寫相應(yīng)的處理代碼來處理死信消息,這樣比手工恢復(fù)數(shù)據(jù)要好太多了。
總結(jié)
死信隊(duì)列其實(shí)并沒有什么神秘的地方,不過是綁定在死信交換機(jī)上的普通隊(duì)列,而死信交換機(jī)也只是一個(gè)普通的交換機(jī),不過是用來專門處理死信的交換機(jī)。
死信消息的生命周期:
- 業(yè)務(wù)消息被投入業(yè)務(wù)隊(duì)列
- 消費(fèi)者消費(fèi)業(yè)務(wù)隊(duì)列的消息,由于處理過程中發(fā)生異常,于是進(jìn)行了nck或者reject操作
- 被nck或reject的消息由RabbitMQ投遞到死信交換機(jī)中
- 死信交換機(jī)將消息投入相應(yīng)的死信隊(duì)列
- 死信隊(duì)列的消費(fèi)者消費(fèi)死信消息
本篇文章到此結(jié)束!希望對(duì)您有所幫助。