常寧市建設(shè)局網(wǎng)站seo培訓(xùn)
重試機(jī)制
在消息從Broker到消費(fèi)者的傳遞過程中,可能會遇到各種問題,如網(wǎng)絡(luò)故障、服務(wù)不可用、資源不足等,這些問題都可能導(dǎo)致消息處理失敗。為了解決這些問題,RabbitMQ提供了重試機(jī)制,允許消息在處理失敗之后重新發(fā)送。
但如果是程序邏輯引起的錯(cuò)誤,那么多次重試也是不起作用的,因此設(shè)置了重試次數(shù)。
消費(fèi)者確認(rèn)機(jī)制為AUTO時(shí)
當(dāng)消費(fèi)者確認(rèn)機(jī)制是AUTO時(shí),如果程序邏輯錯(cuò)誤,那么就會不斷重試,造成消息積壓。因此我們就需要設(shè)置重試次數(shù),當(dāng)多次重試還是失敗,消息就會被自動確認(rèn),自然消息就會丟失。
spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:retry:enabled: true # 開啟消費(fèi)者失敗重試initial-interval: 5000ms # 初始失敗等待時(shí)長max-attempts: 5 # 最大重試次數(shù)
@Configuration
public class RetryConfig {@Bean("retryQueue")public Queue retryQueue() {return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public Exchange retryExchange() {return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();}@Bean("retryQueueBind")public Binding retryQueueBind(@Qualifier("retryExchange") Exchange exchange,@Qualifier("retryQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();}}
@RestController
@RequestMapping("/retry")
public class RetryController {@Resourcepublic RabbitTemplate rabbitTemplate;@RequestMappingpublic void retryQueue() {this.rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "hello 重試機(jī)制");System.out.println("重試機(jī)制生產(chǎn)者發(fā)送消息成功");}}
@Configuration
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void retryListener(String msg) {System.out.println("獲取到消息:" + msg);int n = 3 / 0;}}
上述代碼和可靠性傳輸一文的消費(fèi)者確認(rèn)機(jī)制中策略為AUTO的代碼類似,只不過在此配置文件中加了一個(gè)重試機(jī)制。當(dāng)啟動程序之后,可以看到如下結(jié)果:
重試時(shí):
?
重試結(jié)束之后:?
?
?從測試結(jié)果可以看出,當(dāng)消費(fèi)者確認(rèn)機(jī)制的策略為AUTO時(shí),遇到異常就會進(jìn)行重試,當(dāng)重試結(jié)束之后依然沒有接收時(shí),就會自動確認(rèn)消息。
消費(fèi)者確認(rèn)機(jī)制為MANAUL時(shí)
當(dāng)消費(fèi)者確認(rèn)機(jī)制是MANUL時(shí),修改消費(fèi)者代碼,并啟動程序,查看結(jié)果:
spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息確認(rèn)機(jī)制,手動確認(rèn)retry:enabled: true # 開啟消費(fèi)者失敗重試initial-interval: 5000ms # 初始失敗等待時(shí)長max-attempts: 5 # 最大重試次數(shù)
@Configuration
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void retryListener(Message msg, Channel channel) throws IOException {try {System.out.println("接收到消息:" + msg);int num = 3 / 0; // 模擬處理失敗channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);}}}
從測試結(jié)果可以得出,消費(fèi)者確認(rèn)機(jī)制為手動確認(rèn)時(shí),并不會依據(jù)配置文件中的重試次數(shù)等內(nèi)容來做,而是依據(jù)消息者自身的代碼實(shí)現(xiàn)來做實(shí)現(xiàn)機(jī)制。原因是因?yàn)槭謩哟_認(rèn)模式下,消費(fèi)者需要顯示地對消息進(jìn)行確認(rèn),如果消費(fèi)者在消息處理過程中遇到異常,可以選擇確認(rèn)不確定消息,也可以選擇重新入隊(duì)。所以重試的控制權(quán)并不在應(yīng)用程序本身,而在于代碼邏輯本身。?
1. 消費(fèi)者確認(rèn)機(jī)制為AUTO時(shí),如果程序邏輯異常,多次重試還是失敗。那么消息就會自動確認(rèn),進(jìn)而消息就會丟失。
2. 消費(fèi)者確認(rèn)機(jī)制為MANAUL時(shí),如果程序邏輯異常,多次重試依然處理失敗,無法被確認(rèn),消息就會積壓。
3. 消費(fèi)者確認(rèn)機(jī)制為NONE時(shí),不管發(fā)生什么情況,當(dāng)消息從Broker內(nèi)部發(fā)出時(shí),就會自動確認(rèn),因此它不存在任何內(nèi)容。
TTL
TTL,過期時(shí)間。當(dāng)消息到達(dá)過期時(shí)間之后,還沒有被消息,消息就會被自動清除。
RabbitMQ可以對隊(duì)列和消息設(shè)置過期時(shí)間。如果兩種方法同時(shí)使用,那么就以兩者較小的值為準(zhǔn)。
設(shè)置消息的TTL
@Configuration
public class TllConfig {@Bean("ttlQueue")public Queue ttlQueue() {return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public Exchange ttlExchange() {return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlQueueBind")public Binding ttlQueueBind(@Qualifier("ttlExchange") Exchange exchange,@Qualifier("ttlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}}
@RestController
@RequestMapping("/ttl")
public class TtlController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void ttlQueue() {MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("50000");return message;}};this.rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "hello ttl", messagePostProcessor);}}
?在TTL的測試中,不需要消費(fèi)者的存在,否則看不到消息在隊(duì)列中的自動丟失。
設(shè)置隊(duì)列的TTL
注意,設(shè)置隊(duì)列的TTL,并不是過期之后刪除整個(gè)隊(duì)列,也是關(guān)于消息設(shè)置的。只不過投遞到該消息隊(duì)列的所有消息都有一個(gè)共同的過期時(shí)間而已。
@Configuration
public class TllConfig {@Bean("ttlQueue")public Queue ttlQueue() {return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(5000).build();}@Bean("ttlExchange")public Exchange ttlExchange() {return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlQueueBind")public Binding ttlQueueBind(@Qualifier("ttlExchange") Exchange exchange,@Qualifier("ttlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}}
設(shè)置隊(duì)列的TTL,只需要在聲明隊(duì)列時(shí)給出過期時(shí)間即可。在測試的過程中,如果是先測試了消息的過期時(shí)間,那么在測試隊(duì)列時(shí),需要先將持久化的隊(duì)列給刪除,再啟動程序。
當(dāng)啟動程序之后,可以看到隊(duì)列上加了一個(gè)TTL的標(biāo)識,表明隊(duì)列的過期時(shí)間設(shè)置成功:
?
區(qū)別
設(shè)置隊(duì)列的過期時(shí)間,一旦消息過期,就會從隊(duì)列中刪除。
設(shè)置消息的過期時(shí)間,即使消息過期,如果消息不在隊(duì)首,還得等到消息到達(dá)隊(duì)首之后才會進(jìn)行判定是否過期。如果過期,那就刪除,反之就投遞到相應(yīng)的消費(fèi)者中。
為什么這兩種方法處理的方式不一樣?
因?yàn)樵O(shè)置隊(duì)列的過期時(shí)間,那么隊(duì)列中過期的消息一定在隊(duì)首,RabbitMQ只需要定期從隊(duì)首掃描消息是否有過期的消息即可。
而設(shè)置消息的過期時(shí)間,每條消息的過期時(shí)間都不一致,如果要刪除隊(duì)列的所有過期消息那么就要掃描整個(gè)隊(duì)列,所以不如等到消息要進(jìn)行投遞時(shí)再判斷消息是否過期,這樣可以減少一定的資源消耗。
?