安卓網站開發(fā)平臺東莞百度seo電話
目錄
當我們在項目中引入了新的中間件之后,數(shù)據的風險性就要多一層考慮。那么,RabbitMQ 的消息是怎么知道有沒有被消費者消費的呢?生產者又怎么確保自己發(fā)送成功了呢?這些問題將在文章中進行解答。
一、簡介
1.1 背景
在 MQ 中,消費者和生產者并不直接進行通信,生產者只負責把消息發(fā)送到隊列,消費者只負責從隊列獲取消息。
- 消費者從隊列 獲取到消息后,這條消息就不在隊列中了。如果此時消費者所在的信道 因為網絡中斷沒有消費到,那這條消息就 被永遠地丟失了。所以,我們希望等待消費者 成功消費掉這條消息之后再刪除消息。
- 生產者向交換機 發(fā)送消息后,也 不能保證消息準確發(fā)送過去了,消息就像 石沉大海 一樣,所以 發(fā)送消息也需要進行消息確認。
1.2 定義
為了保證消息從隊列可靠地到達消費者,RabbitMQ 提供了 消息確認機制(Message Acknowledgement)。
消費者在訂閱隊列時,可以指定 autoAck
參數(shù):
autoAck=false
:RabbitMQ 會 等待消費者顯式地回復確認信號 后才從內存(或磁盤)中移除消息(實際上時先打上刪除標記,之后再刪除)。autoAck=true
:RabbitMQ 會 自動把發(fā)送出去的消息置為確認,然后內存(或磁盤)中刪除,而 不管消費者是否真正地消費到了這些消息。
采用消息確認機制后,只要設置 autoAck
參數(shù)為 false
,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉后消息丟失的問題,因為 RabbitMQ 會一直等待持有消息知道消費者顯式調用 Basic.Ack
命令為止。
對于 RabbitMQ 服務器端而言,當 autoAck
參數(shù)為 false
時,隊列中的消息分成了兩部分:
- 一部分是 等待投遞給消費者的消息;
- 另一部分是 已經投遞給消費者,但是還沒有收到消費者確認信號的消息。
如果 RabbitMQ 服務器端 一直沒有收到消費者的確認信息,并且 消費此消息的消費者已經斷開連接,則服務器端會安排 該消息重新進入隊列,等待投遞給下一個消費者(也可能還是原來的那個消費者)。
RabbitMQ 不會為未確認的消息設置過期時間,它 判斷此消息是否需要重新投遞給消費者的唯一依據是該消息連接是否已經斷開,這個設計的原因是 RabbitMQ 允許消費者消費一條消息的時間可以很久很久。
1.3 如何查看確認/未確認的消息數(shù)?
RabbitMQ 的 Web 管理平臺上可以看到當前隊列中的 “Ready” 狀態(tài)和 “Unacknowledged” 狀態(tài)的消息數(shù):
- Read 狀態(tài): 等待投遞給消費者的消息數(shù)。
- Unacknowledged 狀態(tài): 已經投遞給消費者但是未收到確認信號的消息樹。
二、消息確認機制的分類
RabbitMQ 消息確認機制分為兩大類:
消息發(fā)送確認
,又分為:- 生產者到交換機的確認;
- 交換機到隊列的確認。
消息接收確認
。
2.1 消息發(fā)送確認
RabbitMQ 的消息發(fā)送確認有兩種實現(xiàn)方式:ConfirmCallback 方法、ReturnCallback 方法。
1)ConfirmCallback方法
ConfirmCallback
是一個回調接口,用于確認消息否是到達交換機中。
配置方式:
spring.rabbitmq.publisher-confirm-type=correlated
它有三個值:
none
:禁用發(fā)布確認模式,默認值。correlated
:發(fā)布消息成功到交換機后觸發(fā)回調方法。simple
:經測試有兩種效果:一是和 correlated 一樣會觸發(fā)回調方法;二是在發(fā)布消息成功后使用 rabbitTemplate 調用 waitForConfirm 或 waitForConfirmsOrDie方法等待 broker 節(jié)點返回發(fā)送結果,根據返回結果來判定下一步的邏輯。要注意的是 waitForConfirmsOrDie 方法如果返回 false 則會關閉 channel,則接下來無法發(fā)送消息到 broker。
2)ReturnCallback方法
ReturnCallback
也是一個回調接口,用于確認消息是否在交換機中路由到了隊列。
(該方法可以不使用,因為交換機和隊列是在代碼里面綁定的,如果消息成功投遞到 Broker 后幾乎不存在綁定隊列失敗,除非代碼寫錯了。)
配置方式:
spring.rabbitmq.publisher-returns=true
3)代碼實現(xiàn)方式一:統(tǒng)一配置
a.配置類
RabbitDirectConfig.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** <p> @Title RabbitDirectConfig* <p> @Description 直連交換機配置* Direct Exchange是RabbitMQ默認的交換機模式,也是最簡單的模式,根據key全文匹配去尋找隊列。** @author ACGkaka* @date 2023/1/12 15:09*/
@Slf4j
@Configuration
public class RabbitDirectConfig {public static final String DIRECT_EXCHANGE_NAME = "TEST_DIRECT_EXCHANGE";public static final String DIRECT_ROUTING_NAME = "TEST_DIRECT_ROUTING";public static final String DIRECT_QUEUE_NAME = "TEST_DIRECT_QUEUE";@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);// 設置開啟Mandatory,才能觸發(fā)回調函數(shù),無論消息推送結果怎么樣都強制調用回調函數(shù)rabbitTemplate.setMandatory(true);//設置message序列化方法rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());// 設置消息發(fā)送到交換機(Exchange)回調rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info(">>>>>>>>>>【INFO】消息發(fā)送到交換機(Exchange)成功, 相關數(shù)據: {}", correlationData);} else {log.error(">>>>>>>>>>【ERROR】消息發(fā)送到交換機(Exchange)失敗, 錯誤原因: {}, 相關數(shù)據: {}", cause, correlationData);}});// 設置消息發(fā)送到隊列(Queue)回調(經測試,只有失敗才會調用)rabbitTemplate.setReturnsCallback((returnedMessage) -> {log.error(">>>>>>>>>>【ERROR】消息發(fā)送到隊列(Queue)失敗:響應碼: {}, 響應信息: {}, 交換機: {}, 路由鍵: {}, 消息內容: {}",returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage());});return rabbitTemplate;}/*** 消息監(jiān)聽-反序列化*/@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}/*** 隊列,命名:testDirectQueue** @return 隊列*/@Beanpublic Queue testDirectQueue() {// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效// exclusive:默認false,只能被當前創(chuàng)建的連接使用,而且當連接關閉后隊列即被刪除。此參考優(yōu)先級高于durable。// autoDelete:是否自動刪除,當沒有生產者或消費者使用此隊列,該隊列會自動刪除。// 一般設置一下隊列的持久化就好,其余兩個默認falsereturn new Queue(DIRECT_QUEUE_NAME, true);}/*** Direct交換機,命名:testDirectExchange* @return Direct交換機*/@BeanDirectExchange testDirectExchange() {return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);}/*** 綁定 將隊列和交換機綁定,并設置用于匹配鍵:testDirectRouting* @return 綁定*/@BeanBinding bindingDirect() {return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(DIRECT_ROUTING_NAME);}
}
a.生產者
SendMessageController.java
import com.demo.config.RabbitDirectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** <p> @Title SendMessageController* <p> @Description 推送消息接口** @author ACGkaka* @date 2023/1/12 15:23*/
@Slf4j
@RestController
public class SendMessageController {/*** 使用 RabbitTemplate,這提供了接收/發(fā)送等方法。*/@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "Hello world.";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);// 將消息攜帶綁定鍵值:TEST_DIRECT_ROUTING,發(fā)送到交換機:TEST_DIRECT_EXCHANGErabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map);return "OK";}}
c.消費者
DirectReceiver.java
import com.demo.config.RabbitDirectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** <p> @Title DirectReceiver* <p> @Description 直連交換機監(jiān)聽類** @author ACGkaka* @date 2023/1/12 15:59*/
@Slf4j
@Component
public class DirectReceiver {@RabbitListener(queues = RabbitDirectConfig.DIRECT_QUEUE_NAME)public void process(Map<String, Object> testMessage) {System.out.println("DirectReceiver消費者收到消息:" + testMessage.toString());}}
d.測試結果
成功發(fā)送時,執(zhí)行結果:
交換機錯誤時,執(zhí)行結果:
路由鍵錯誤時,執(zhí)行結果:
4)代碼實現(xiàn)方式二:單獨配置
除了在配置類里面統(tǒng)一設置回調方法外,還可以在每次推送消息到隊列時,手動使用 CorrelationData
指定回調方法。
@GetMapping("/sendDirectMessage2")
public String sendDirectMessage2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "Hello world.";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);//生成唯一標識CorrelationData correlationData = new CorrelationData(messageId);//不管成功失敗都會調用confirm或者throwable,這是異步調用correlationData.getFuture().addCallback(confirm -> {// 設置消息發(fā)送到交換機(Exchange)回調if (confirm != null && confirm.isAck()) {log.info(">>>>>>>>>>【INFO】發(fā)送成功ACK,msgId: {}, message: {}", correlationData.getId(), map);} else {log.error(">>>>>>>>>>【ERROR】發(fā)送失敗NACK,msgId: {}, message: {}", correlationData.getId(), map);}},throwable -> {//發(fā)生錯誤,鏈接mq異常,mq未打開等...報錯回調System.out.println("發(fā)送失敗throwable = " + throwable + ", id:" + correlationData.getId());});// 將消息攜帶綁定鍵值:TEST_DIRECT_ROUTING,發(fā)送到交換機:TEST_DIRECT_EXCHANGErabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map, correlationData);return "OK";
}
2.2 消息接收確認
消費者確認發(fā)生在 監(jiān)聽隊列的消費者處理業(yè)務失敗,如:發(fā)生了異常、不符合要求的數(shù)據等。這些場景就 需要我們手動處理消息,比如:重新發(fā)送消息或者丟棄消息。
RabbitMQ 的 消息確認機制(ACK)
默認是自動確認的。自動確認會 在消息發(fā)送給消費者后立即確認,但 存在丟失消息的可能。如果消費端消費邏輯拋出了異常,假如我們使用了事務的回滾,也只是保證了數(shù)據的一致性,消息還是丟失了。也就是消費端沒有處理成功這條消息,那么就相當于丟失了消息。
消息的確認模式有三種:
AcknowledgeMode.NONE
:自動確認。(默認)AcknowledgeMode.AUTO
:根據情況確認。AcknowledgeMode.MANUAL
:手動確認。(推薦)
消費者收到消息后,手動調用 Channel 的 basicAck()
/basicReject()
/basicNack()
方法后,RabbitMQ 收到消息后,才認為本次投遞完成。
basicAck()
:用于確認當前消息。basicReject()
:用于拒絕當前消息,可以自定義是否重回隊列。basicNack()
:用于批量拒絕消息(這是 AMPQ 0-9-1 的 RabbitMQ 擴展)。
1)basicAck() 方法
basicAck()
方法 用于確認當前消息,Channel 類中的方法定義如下:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
參數(shù)說明:
- long deliveryTag: 當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel。RabbitMQ 會用
basic.deliver
方法向消費者推送消息,這個方法攜帶了一個deliveryTag
,它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識ID,是一個單調遞增的正整數(shù),deliveryTag
的范圍僅限于當前 Channel。 - boolean multiple: 是否批處理,一般為 false,當該參數(shù)為 true 時,則可以一次性確認
deliveryTag
小于等于傳入值的所有消息。
2)basicReject() 方法
basicReject()
方法 用于明確拒絕當前的消息。RabbitMQ 在 2.0.0 版本開始引入,Channel 類中的方法定義如下:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
參數(shù)說明:
- long deliveryTag: 當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel。RabbitMQ 會用
basic.deliver
方法向消費者推送消息,這個方法攜帶了一個deliveryTag
,它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識ID,是一個單調遞增的正整數(shù),deliveryTag
的范圍僅限于當前 Channel。 - boolean requeue: 是否重新放回隊列。
- 如果參數(shù)為 true,則 RabbitMQ 會重新將這條消息存入隊列,以便發(fā)送給下一個訂閱的消費者。
- 如果參數(shù)為 false,則 RabbitMQ 會立即把消息從隊列中移除,不會把它發(fā)送給新的消費者。
3)basicNack() 方法
basicNack()
方法 用于批量拒絕消息。由于 basicReject() 方法一次只能拒絕一條消息,如果想批量拒絕消息,則可以使用 basicNack() 方法。Channel 類中的方法定義如下:
參數(shù)說明:
- long deliveryTag: 當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel。RabbitMQ 會用
basic.deliver
方法向消費者推送消息,這個方法攜帶了一個deliveryTag
,它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識ID,是一個單調遞增的正整數(shù),deliveryTag
的范圍僅限于當前 Channel。 - boolean multiple: 是否批處理,一般為 false,當該參數(shù)為 true 時,則可以一次性確認
deliveryTag
小于等于傳入值的所有消息。 - boolean requeue: 是否重新放回隊列。
- 如果參數(shù)為 true,則 RabbitMQ 會重新將這條消息存入隊列,以便發(fā)送給下一個訂閱的消費者。
- 如果參數(shù)為 false,則 RabbitMQ 會立即把消息從隊列中移除,不會把它發(fā)送給新的消費者。
4)代碼實現(xiàn)
a.配置方式一:代碼配置
如果我們之前配置了 Jackson2JsonMessageConverter.java
的序列化方式,那么我們可以接著指定消費方的消息確認模式為 AcknowledgeMode.MANUL
。
/*** 消息監(jiān)聽配置*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 設置連接工廠factory.setConnectionFactory(connectionFactory);// 設置消息確認模式factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 設置反序列化factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;
}
b.配置方式二:配置文件
我們可以直接在 application.yml
中進行如下配置:
# 確認模式,默認auto,自動確認;manual:手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
注意: yaml中指定的是消費端容器的默認配置,如果我們在代碼中有自定義注入
RabbitListenerContainerFactory
示例之后,還需要使用默認配置,需要在代碼中進行設置,如下所示:
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer configurer;/*** 消息監(jiān)聽配置*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 設置連接工廠factory.setConnectionFactory(connectionFactory);// 采用yaml中的配置configurer.configure(factory, connectionFactory);// 設置反序列化factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;
}
c.生產者
SendMessageController.java
import com.demo.config.RabbitDirectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** <p> @Title SendMessageController* <p> @Description 推送消息接口** @author ACGkaka* @date 2023/1/12 15:23*/
@Slf4j
@RestController
public class SendMessageController {/*** 使用 RabbitTemplate,這提供了接收/發(fā)送等方法。*/@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "Hello world.";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);// 將消息攜帶綁定鍵值:TEST_DIRECT_ROUTING,發(fā)送到交換機:TEST_DIRECT_EXCHANGErabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map);return "OK";}}
d.消費者
DirectReceiver.java
import com.demo.config.RabbitDirectConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;/*** <p> @Title DirectReceiver* <p> @Description 直連交換機監(jiān)聽類** @author ACGkaka* @date 2023/1/12 15:59*/
@Slf4j
@Component
public class DirectReceiver {@RabbitListener(queues = RabbitDirectConfig.DIRECT_QUEUE_NAME)public void process(Map<String, Object> testMessage, Message message, Channel channel) throws IOException {try {log.info("DirectReceiver消費者收到消息: {}", testMessage.toString());// 手動答應消費完成,從隊列中刪除該消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {log.error("DirectReceiver消費者消費失敗,原因: {}", e.getMessage(), e);// 手動答應消費完成,從隊列中刪除該消息(不重回隊列)channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}}
e.測試結果
場景一:消費者進行手動確認,生產者推送2條消息:
可以看到,生產者推送2條消息后立馬被消費了。
場景二:消費者不進行手動確認,生產者推送2條消息:
雖然消費者消費完畢,但是由于沒有進行手動確認,所以2條消息會一直處于 Unacked
狀態(tài),直到消費者下線。
關閉 SpringBoot 程序,消費者下線后,消息由 Unacked
狀態(tài)轉為 Ready
狀態(tài),等待下一個消費者上線后重新進行消費。
整理完畢,完結撒花~ 🌻
參考地址:
1.RabbitMQ(4):消息確認機制詳解,https://juejin.cn/post/7029232312197840904
2.RabbitMQ消息確認機制(ACK),https://blog.csdn.net/pan_junbiao/article/details/112956537
3.RabbitMQ高級,https://blog.csdn.net/hnhroot/article/details/125921527
4.關于rabbitMQ在yml配置手動ack不生效,重復答應的問題,https://blog.csdn.net/love_Saber_Archer/article/details/109111088