做網(wǎng)站推廣有前景嗎站內(nèi)推廣和站外推廣的區(qū)別
1.前言
在生產(chǎn)環(huán)境中由于一些不明原因,導(dǎo)致 RabbitMQ 重啟,在 RabbitMQ 重啟期間生產(chǎn)者消息投遞失敗, 導(dǎo)致消息丟失,需要手動(dòng)處理和恢復(fù)。于是,我們開始思考,如何才能進(jìn)行 RabbitMQ 的消息可靠投遞呢?
2.添加配置信息
在application.properties文件中添加如下配置,交換機(jī)開啟消息確認(rèn)模式
#NONE 值是禁用發(fā)布確認(rèn)模式,是默認(rèn)值
#CORRELATED 值是發(fā)布消息成功到交換器后會(huì)觸發(fā)回調(diào)方法
#SIMPLE 值經(jīng)測試有兩種效果,其一效果和 CORRELATED 值一樣會(huì)觸發(fā)回調(diào)方法,
# 其二在發(fā)布消息成功后使用 rabbitTemplate 調(diào)用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節(jié)點(diǎn)返回發(fā)送結(jié)果,
# 根據(jù)返回結(jié)果來判定下一步的邏輯,要注意的點(diǎn)是 waitForConfirmsOrDie 方法如果返回 false 則會(huì)關(guān)閉 channel,
# 則接下來無法發(fā)送消息到 broker;
spring.rabbitmq.publisher-confirm-type=correlated
- NONE 值是禁用發(fā)布確認(rèn)模式,是默認(rèn)值
- CORRELATED 值是發(fā)布消息成功到交換器后會(huì)觸發(fā)回調(diào)方法
- SIMPLE 值經(jīng)測試有兩種效果,其一效果和 CORRELATED 值一樣會(huì)觸發(fā)回調(diào)方法,其二在發(fā)布消息成功后使用rabbitTemplate 調(diào)用waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker節(jié)點(diǎn)返回發(fā)送結(jié)果,根據(jù)返回結(jié)果來判定下一步的邏輯,要注意的點(diǎn)是waitForConfirmsOrDie 方法如果返回 false則會(huì)關(guān)閉 channel,則接下來無法發(fā)送消息到 broker;
3. 配置類
package com.hong.springboot.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Description: 發(fā)布確認(rèn)高級(jí)版配置類* @Author: hong* @Date: 2024-03-05 20:52* @Version: 1.0**/
@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String CONFIRM_ROUTING_KEY = "key1";//聲明業(yè)務(wù) Exchange@Bean("confirmExchange")public DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 聲明確認(rèn)隊(duì)列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 聲明確認(rèn)隊(duì)列綁定關(guān)系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);}
}
4.生產(chǎn)者
package com.hong.springboot.rabbitmq.controller;import com.hong.springboot.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Description: 發(fā)布確認(rèn)高級(jí)版生產(chǎn)者* @Author: hong* @Date: 2024-03-05 20:58* @Version: 1.0**/
@Slf4j
@RequestMapping("/confirm/")
@RestController
public class ConfirmProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;//http://localhost:8080/confirm/sendMsg/Hi,JAVA小生不才@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("當(dāng)前時(shí)間:{},發(fā)送信息給隊(duì)列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message);}
}
5.消費(fèi)者
package com.hong.springboot.rabbitmq.consumer;import com.hong.springboot.rabbitmq.config.ConfirmConfig;
import com.hong.springboot.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Description: 發(fā)布確認(rèn)高級(jí)版消費(fèi)者* @Author: hong* @Date: 2024-03-05 21:05* @Version: 1.0**/
@Slf4j
@Component
public class ConfirmConsumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMessage(Message message){String msg = new String(message.getBody());log.info("當(dāng)前時(shí)間:{},收到信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg);}
}
正常情況下,發(fā)送http://localhost:8080/confirm/sendMsg/Hi,JAVA小生不才
6.回調(diào)接口
package com.hong.springboot.rabbitmq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @Description: 發(fā)布確認(rèn)高級(jí)版消息生產(chǎn)者的回調(diào)接口* @Author: hong* @Date: 2024-03-09 21:58* @Version: 1.0**/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}/*** 交換機(jī)不管是否收到消息的一個(gè)回調(diào)方法* 1.收到消息* correlationData 保存回調(diào)消息的id及相關(guān)信息* b true 交換機(jī)收到消息* s null* 2.未收到消息* correlationData 保存回調(diào)消息的id及相關(guān)信息* b false 交換機(jī)未收到消息* s 失敗的原因* @param correlationData 消息相關(guān)數(shù)據(jù)* @param b 交換機(jī)是否收到消息* @param s 沒收到消息的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id = correlationData != null ? correlationData.getId() : "";if (b) {log.info("交換機(jī)已經(jīng)收到id為:{}的消息", id);} else {log.info("交換機(jī)還未收到id為:{}消息,原因:{}", id, s);}}
}
修改ConfirmProducerController中sendMsg方法
交換機(jī)改個(gè)名字模擬交換機(jī)收不到消息
@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {CorrelationData correlationData = new CorrelationData("1");log.info("當(dāng)前時(shí)間:{},發(fā)送信息給隊(duì)列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"123", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);}
將routingKey改個(gè)名字模擬隊(duì)列收不到消息
@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {CorrelationData correlationData1 = new CorrelationData("1");log.info("當(dāng)前時(shí)間:{},發(fā)送信息給隊(duì)列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY,message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY,correlationData1);CorrelationData correlationData2 = new CorrelationData("2");log.info("當(dāng)前時(shí)間:{},發(fā)送信息給隊(duì)列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY+"abc");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"abc",message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY+"abc",correlationData2);}
7.回退消息
從以上模擬場景可以看出,在僅開啟生產(chǎn)者確認(rèn)機(jī)制,交換機(jī)接收到消息后,會(huì)直接給生產(chǎn)者發(fā)送確認(rèn)消息,但若發(fā)現(xiàn)該消息不可路由,那么消息會(huì)被直接丟棄,此時(shí)生產(chǎn)者是不知道消息被丟棄的。因此我們借用mandatory參數(shù)在當(dāng)消息傳遞過程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者。
7.1.開啟消息回退機(jī)制
配置文件中添加如下配置
#開啟消息回退機(jī)制
spring.rabbitmq.publisher-returns=true
7.2. 添加消息回退回調(diào)
@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 當(dāng)消息傳遞過程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者* 只有不可達(dá)目的地時(shí)才回調(diào)* @param returnedMessage*/@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error("消息:{},被交換機(jī) {} 退回,原因:{},路由key:{},code:{}",new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(),returnedMessage.getReplyText(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode());}