RabbitMQ Dead-Letter Queues & Delayed Exchanges
1. What Is a Dead Letter?
[Figure: dead letters and the dead-letter queue]
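Uses of a dead-letter queue:
- When a queue is already full, overflowing messages are routed to the dead-letter queue instead of being lost.
- Implementing delayed consumption. For example, an order comes with a 15-minute payment window after it is placed.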
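
For the 15-minute payment window, the delay has to be expressed in milliseconds, and the per-message expiration property is passed as a string. A minimal sketch of that conversion in plain Java follows; `PaymentWindowSketch` and `toExpiration` are hypothetical names introduced only for illustration.

```java
import java.time.Duration;

/** Hypothetical helper (not part of Spring AMQP): converts a time window to a TTL string. */
final class PaymentWindowSketch {

    /** Returns the window as a string of milliseconds, the form a message expiration takes. */
    static String toExpiration(Duration window) {
        if (window.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative");
        }
        return Long.toString(window.toMillis());
    }

    public static void main(String[] args) {
        // 15 minutes = 15 * 60 * 1000 ms
        System.out.println(toExpiration(Duration.ofMinutes(15))); // prints 900000
    }
}
```

The resulting string is the kind of value that `setExpiration` takes when a TTL is set on an individual message.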
2. Implementing a Dead-Letter Queue
2.1 Preparing the Exchange & Queue

    package com.llp.rabbitmq.config;

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * Dead-letter queue configuration
     */
    @Configuration
    public class DeadLetterConfig {

        public static final String NORMAL_EXCHANGE = "normal-exchange";
        public static final String NORMAL_QUEUE = "normal-queue";
        public static final String NORMAL_ROUTING_KEY = "normal.#";

        public static final String DEAD_EXCHANGE = "dead-exchange";
        public static final String DEAD_QUEUE = "dead-queue";
        public static final String DEAD_ROUTING_KEY = "dead.#";

        @Bean
        public Exchange normalExchange() {
            return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
        }

        @Bean
        public Queue normalQueue() {
            // Normal queue, wired to the dead-letter exchange
            return QueueBuilder.durable(NORMAL_QUEUE)
                    .deadLetterExchange(DEAD_EXCHANGE)
                    .deadLetterRoutingKey("dead.abc")
                    .build();
        }

        @Bean
        public Binding normalBinding(Queue normalQueue, Exchange normalExchange) {
            return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
        }

        @Bean
        public Exchange deadExchange() {
            return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
        }

        @Bean
        public Queue deadQueue() {
            return QueueBuilder.durable(DEAD_QUEUE).build();
        }

        @Bean
        public Binding deadBinding(Queue deadQueue, Exchange deadExchange) {
            return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
        }
    }

2.2 Triggering Dead Letters
- Dead-lettering through a consumer reject or nack

      package com.llp.rabbitmq.topic;

      import com.llp.rabbitmq.config.DeadLetterConfig;
      import com.rabbitmq.client.Channel;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;

      import java.io.IOException;

      @Component
      public class DeadListener {

          // Assumption: the listener container uses manual acknowledgements,
          // e.g. spring.rabbitmq.listener.simple.acknowledge-mode=manual
          @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
          public void consume(String msg, Channel channel, Message message) throws IOException {
              System.out.println("Received a message from the normal queue: " + msg);
              // Reject the message without requeueing it (requeue = false), so it is dead-lettered
              channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
              // Alternatively:
              // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
          }
      }

- Message time-to-live (TTL)
  - Setting a TTL on an individual message

        @Test
        public void publishExpire() {
            String msg = "dead letter expire";
            rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    // Per-message TTL in milliseconds, passed as a string
                    message.getMessageProperties().setExpiration("5000");
                    return message;
                }
            });
        }

  - Setting a message TTL on the queue

        @Bean
        public Queue normalQueue() {
            return QueueBuilder.durable(NORMAL_QUEUE)
                    .deadLetterExchange(DEAD_EXCHANGE)
                    .deadLetterRoutingKey("dead.abc")
                    .ttl(10000) // TTL in milliseconds for every message in this queue
                    .build();
        }

- Setting the maximum number of messages in the queue

      @Bean
      public Queue normalQueue() {
          return QueueBuilder.durable(NORMAL_QUEUE)
                  .deadLetterExchange(DEAD_EXCHANGE)
                  .deadLetterRoutingKey("dead.abc")
                  .maxLength(1) // the queue holds at most one message
                  .build();
      }

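Once the queue already holds one message, publishing another pushes it over the limit and one message is dead-lettered. With RabbitMQ's default overflow behavior (`drop-head`), the message that leaves is the oldest one at the head of the queue, not the newly published one.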
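
The length limit can be modelled in plain Java without a broker. The sketch below is an in-memory illustration only, assuming RabbitMQ's default `drop-head` overflow policy, under which the message at the head of the queue is the one evicted and dead-lettered. `BoundedQueueSketch` and its methods are hypothetical names, not RabbitMQ or Spring AMQP APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory model of a length-limited queue; not RabbitMQ code. */
final class BoundedQueueSketch {
    private final int maxLength;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    BoundedQueueSketch(int maxLength) {
        if (maxLength < 0) {
            throw new IllegalArgumentException("maxLength must not be negative");
        }
        this.maxLength = maxLength;
    }

    /** Enqueues a message, then evicts from the head until the limit holds again. */
    void publish(String message) {
        queue.addLast(message);
        while (queue.size() > maxLength) {
            // Assumed drop-head policy: the oldest message is dead-lettered.
            deadLetters.add(queue.pollFirst());
        }
    }

    List<String> queued() {
        return new ArrayList<>(queue);
    }

    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }

    public static void main(String[] args) {
        BoundedQueueSketch q = new BoundedQueueSketch(1);
        q.publish("first");
        q.publish("second");
        System.out.println("queued: " + q.queued());             // prints queued: [second]
        System.out.println("dead-lettered: " + q.deadLetters()); // prints dead-lettered: [first]
    }
}
```

In this model a limit of 1 means the second publish sends "first" to the dead-letter list while "second" stays in the queue.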
3. Delayed Exchange
Download: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
When dead-letter queues are used for delayed consumption and many different delay durations are needed, you end up creating a separate queue for each delay. A delayed exchange solves this problem.
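
One reason a single queue copes poorly with mixed delays is that, as RabbitMQ documents for per-message TTL, an expired message is only dead-lettered once it reaches the head of the queue. The sketch below models that effect in plain Java; it is an illustration under stated assumptions (all messages published at time 0, no consumer), and `HeadOfLineExpirySketch` and `deadLetterTimes` are hypothetical names.

```java
import java.util.Arrays;

/** Hypothetical model (not RabbitMQ code) of per-message TTL expiry in one FIFO queue. */
final class HeadOfLineExpirySketch {

    /**
     * ttlsMillis[i] is the TTL of the i-th message in publish order. All messages are
     * published at time 0 and nothing consumes them. Assumes a message can only be
     * dead-lettered once every message ahead of it has left the queue.
     */
    static long[] deadLetterTimes(long[] ttlsMillis) {
        long[] times = new long[ttlsMillis.length];
        long headFreeAt = 0; // time at which the previous message left the queue
        for (int i = 0; i < ttlsMillis.length; i++) {
            times[i] = Math.max(ttlsMillis[i], headFreeAt);
            headFreeAt = times[i];
        }
        return times;
    }

    public static void main(String[] args) {
        // A 30 s message published ahead of a 5 s message holds the short one back.
        System.out.println(Arrays.toString(deadLetterTimes(new long[] {30_000, 5_000})));
        // prints [30000, 30000]
    }
}
```

In this model the 5-second message is not released until the 30-second message ahead of it expires, which is why each distinct delay tends to need its own queue when dead-lettering is used for delays.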
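Upload the downloaded file to the Linux server, then copy it into the plugins directory of the RabbitMQ container with the following command:

    docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 9da57c5038ba:/opt/rabbitmq/plugins

Inside the RabbitMQ container, run the following from the /opt/rabbitmq/sbin directory:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Restart the container for the change to take effect:

    docker restart 9da57c5038ba

Once the plugin is enabled, a new delayed exchange type (`x-delayed-message`) is available when creating an exchange.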
- Building the delayed exchange

      package com.llp.rabbitmq.config;

      import org.springframework.amqp.core.*;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;

      import java.util.HashMap;
      import java.util.Map;

      /**
       * Delayed queue
       */
      @Configuration
      public class DelayedConfig {

          public static final String DELAYED_EXCHANGE = "delayed-exchange";
          public static final String DELAYED_QUEUE = "delayed-queue";
          public static final String DELAYED_ROUTING_KEY = "delayed.#";

          @Bean
          public Exchange delayedExchange() {
              Map<String, Object> arguments = new HashMap<>();
              // Routing behavior the delayed exchange uses once the delay has elapsed
              arguments.put("x-delayed-type", "topic");
              return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, arguments);
          }

          @Bean
          public Queue delayedQueue() {
              return QueueBuilder.durable(DELAYED_QUEUE).build();
          }

          @Bean
          public Binding delayedBinding(Queue delayedQueue, Exchange delayedExchange) {
              return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
          }
      }

- Sending a message

      package com.llp.rabbitmq;

      import com.llp.rabbitmq.config.DelayedConfig;
      import org.junit.jupiter.api.Test;
      import org.springframework.amqp.AmqpException;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.core.MessagePostProcessor;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.test.context.SpringBootTest;

      @SpringBootTest
      public class DelayedPublisherTest {

          @Autowired
          private RabbitTemplate rabbitTemplate;

          @Test
          public void publish() {
              rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
                  @Override
                  public Message postProcessMessage(Message message) throws AmqpException {
                      // How long to hold the message before it is routed for consumption, in milliseconds
                      message.getMessageProperties().setDelay(30000);
                      return message;
                  }
              });
          }
      }

**A problem with the delayed exchange:** if RabbitMQ restarts or the server goes down while messages are still waiting out their delay, those messages may be lost.