網(wǎng)站安全檢測(cè)在線武漢seo網(wǎng)站優(yōu)化
目錄
成為死信的條件?
消息TTL過(guò)期?
?隊(duì)列達(dá)到最大長(zhǎng)度
?消息被拒
延遲隊(duì)列
?延遲隊(duì)列使用場(chǎng)景
?消息設(shè)置 TTL
隊(duì)列設(shè)置 TTL
?兩者區(qū)別
?
producer 將消息投遞到 broker 或者直接到 queue 里了, consumer 從 queue 取出消息進(jìn)行消費(fèi),但某些時(shí)候由于特定的 原因?qū)е? queue 中的某些消息無(wú)法被消費(fèi) ,這樣的消息如果沒(méi)有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊(duì)列。
成為死信的條件?
-
超過(guò)消息的存活時(shí)間(TTL):可以為消息設(shè)置一個(gè)存活時(shí)間,在該時(shí)間段之后,如果消息還未被消費(fèi)或者被重新投遞到其他隊(duì)列,該消息將成為死信。
-
消息被拒絕(Reject):當(dāng)消息被消費(fèi)者拒絕接收時(shí),可以選擇將該消息重新投遞到另一個(gè)隊(duì)列或?qū)⑵錁?biāo)記為死信。
-
消息達(dá)到最大重試次數(shù):可以通過(guò)在消費(fèi)者端設(shè)置重試次數(shù)限制,當(dāng)消息達(dá)到一定的重試次數(shù)而仍然無(wú)法被消費(fèi)時(shí),該消息將成為死信。
-
隊(duì)列滿溢(Queue Overflow):當(dāng)一個(gè)隊(duì)列的消息數(shù)量已經(jīng)超過(guò)隊(duì)列的最大容量限制時(shí),新消息無(wú)法進(jìn)入隊(duì)列,而被視為死信。
?
消息TTL過(guò)期?
生產(chǎn)者?
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//設(shè)置消息的 TTL 時(shí)間AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//該信息是用作演示隊(duì)列個(gè)數(shù)限制for (int i = 1; i <11 ; i++) {String message="info"+i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,message.getBytes());System.out.println("生產(chǎn)者發(fā)送消息:"+message);}}}
}
?消費(fèi)者1
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class Consumer01 {//普通交換機(jī)名稱private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交換機(jī)名稱private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//聲明死信和普通交換機(jī) 類型為 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//聲明死信隊(duì)列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信隊(duì)列綁定死信交換機(jī)與 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常隊(duì)列綁定死信隊(duì)列信息Map<String, Object> params = new HashMap<>();//正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值params.put("x-dead-letter-routing-key", "lisi");String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息"+message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}
}
消費(fèi)者2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信隊(duì)列消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收死信隊(duì)列的消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}
關(guān)閉消費(fèi)者1,模擬出故障
正常隊(duì)列
?消息超時(shí)進(jìn)入死信隊(duì)列
?死信隊(duì)列接收信息
?隊(duì)列達(dá)到最大長(zhǎng)度
生產(chǎn)者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//該信息是用作演示隊(duì)列個(gè)數(shù)限制for (int i = 1; i <11 ; i++) {String message="info"+i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());System.out.println("生產(chǎn)者發(fā)送消息:"+message);}}}
}
?消費(fèi)者1
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class Consumer01 {//普通交換機(jī)名稱private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交換機(jī)名稱private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//聲明死信和普通交換機(jī) 類型為 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//聲明死信隊(duì)列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信隊(duì)列綁定死信交換機(jī)與 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常隊(duì)列綁定死信隊(duì)列信息Map<String, Object> params = new HashMap<>();//正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值params.put("x-dead-letter-routing-key", "lisi");// 設(shè)置正常隊(duì)列長(zhǎng)度的限制params.put("x-max-length",6);String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer01 接收到消息"+message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});}
}
?消費(fèi)者2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信隊(duì)列消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收死信隊(duì)列的消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}
關(guān)閉消費(fèi)者1,模擬接收不到信息
?死信隊(duì)列消費(fèi)了四個(gè)消息
?分析:生產(chǎn)者產(chǎn)生10個(gè)消息,正常隊(duì)列只能接受6個(gè)消息,多的消息便被轉(zhuǎn)移到死信隊(duì)列去了
?消息被拒
生產(chǎn)者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//該信息是用作演示隊(duì)列個(gè)數(shù)限制for (int i = 1; i <11 ; i++) {String message="info"+i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());System.out.println("生產(chǎn)者發(fā)送消息:"+message);}}}
}
消費(fèi)者1
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class Consumer01 {//普通交換機(jī)名稱private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交換機(jī)名稱private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//聲明死信和普通交換機(jī) 類型為 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//聲明死信隊(duì)列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信隊(duì)列綁定死信交換機(jī)與 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常隊(duì)列綁定死信隊(duì)列信息Map<String, Object> params = new HashMap<>();//正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值params.put("x-dead-letter-routing-key", "lisi");String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");if(message.equals("info5")){System.out.println("Consumer01 接收到消息" + message + "并拒絕簽收該消息");//requeue 設(shè)置為 false 代表拒絕重新入隊(duì) 該隊(duì)列如果配置了死信交換機(jī)將發(fā)送到死信隊(duì)列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer01 接收到消息"+message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};boolean autoAck = false;channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {});}
}
?消費(fèi)者2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信隊(duì)列消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收死信隊(duì)列的消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}
}
?結(jié)果
消費(fèi)者1,拒絕接受消息info5,info5進(jìn)入死信隊(duì)列
?消費(fèi)者2,死信隊(duì)列接受到info5
延遲隊(duì)列
?延遲隊(duì)列使用場(chǎng)景
?消息設(shè)置 TTL
?
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//設(shè)置消息的 TTL 時(shí)間AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//該信息是用作演示隊(duì)列個(gè)數(shù)限制for (int i = 1; i <11 ; i++) {String message="info"+i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,message.getBytes());System.out.println("生產(chǎn)者發(fā)送消息:"+message);}}}
}
隊(duì)列設(shè)置 TTL
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class Consumer01 {//普通交換機(jī)名稱private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交換機(jī)名稱private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//聲明死信和普通交換機(jī) 類型為 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//聲明死信隊(duì)列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信隊(duì)列綁定死信交換機(jī)與 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常隊(duì)列綁定死信隊(duì)列信息Map<String, Object> params = new HashMap<>();//正常隊(duì)列設(shè)置死信交換機(jī) 參數(shù) key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常隊(duì)列設(shè)置死信 routing-key 參數(shù) key 是固定值params.put("x-dead-letter-routing-key", "lisi");// 設(shè)置 TTL 值為 5000 毫秒(5 秒)params.put("x-message-ttl", 5000);String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");if(message.equals("info5")){System.out.println("Consumer01 接收到消息" + message + "并拒絕簽收該消息");//requeue 設(shè)置為 false 代表拒絕重新入隊(duì) 該隊(duì)列如果配置了死信交換機(jī)將發(fā)送到死信隊(duì)列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer01 接收到消息"+message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};boolean autoAck = false;channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {});}
}
?兩者區(qū)別
?