特產(chǎn)網(wǎng)站怎么做宣傳推廣方式
本文給大家介紹一下在 Spring Boot 項目中如何集成消息隊列 RabbitMQ,包含對 RibbitMQ 的架構介紹、應用場景、坑點解析以及代碼實戰(zhàn)。
我將使用 waynboot-mall 項目作為代碼講解,項目地址:https://github.com/wayn111/waynboot-mall。本文大綱如下,
RabbitMQ 架構介紹
RibbitMQ 是一個基于 AMQP 協(xié)議的開源消息隊列系統(tǒng),具有高性能、高可用、高擴展等特點。通常作為在系統(tǒng)間傳遞消息的中間件,它可以實現(xiàn)異步處理、應用解耦、流量削峰等功能。
RibbitMQ 的主要組件介紹如下,
-
producter:生產(chǎn)者,創(chuàng)建消息,然后將消息發(fā)布(發(fā)送)到 RabbitMQ。
-
channel: 信道,多路復用連接中的一條獨立的雙向數(shù)據(jù)流通道。信道是建立在真實的 TCP 連接內地虛擬鏈接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統(tǒng)來說,建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
-
broker: 標識消息隊列服務器實體 rabbitmq-server。
-
連接器:這是負責接收客戶端連接請求和建立連接的組件。RabbitMQ 支持多種連接器,如 AMQP 0-9-1, AMQP 1.0, MQTT, STOMP 等。
-
v-host:虛擬主機,這是 RabbitMQ 的邏輯隔離單元,每個虛擬主機相當于一個獨立的代理,擁有自己的交換器、隊列、綁定、權限等。不同的虛擬主機之間是相互隔離的,不能共享資源。一個 RabbitMQ 實例可以創(chuàng)建多個虛擬主機,以滿足不同的業(yè)務需求。
-
exchange:交換機,這是負責接收生產(chǎn)者發(fā)送的消息,并根據(jù)路由規(guī)則將消息分發(fā)到相應的隊列或者其他交換器的組件。RabbitMQ 支持多種類型的交換器,如 fanout, direct, topic, headers 等。
-
binding:綁定,這是負責將交換器和隊列之間建立關聯(lián)關系的組件。綁定可以指定一個路由鍵或者模式匹配規(guī)則,以決定哪些消息可以被路由到哪些隊列。
-
queue:隊列,這是負責存儲消費者需要消費的消息的組件。隊列可以有多種屬性和特性,如持久化、排他性、自動刪除、死信隊列、優(yōu)先級隊列等。隊列可以綁定到一個或多個交換器上,并指定一個或多個路由鍵或者模式匹配規(guī)則。
-
consuemer:消費者,連接到 RabbitMQ 服務器,并訂閱到隊列上,接收來自隊列的消息。
應用場景
RabbitMQ 是一個非常強大和靈活的消息中間件,它可以應用于多種場景和需求。以下是一些常見的 RabbitMQ 應用場景和實戰(zhàn)經(jīng)驗:
-
異步處理:當系統(tǒng)需要執(zhí)行一些耗時或者不重要的任務時,可以使用 RabbitMQ 將任務封裝成消息發(fā)送到隊列中,然后由專門的消費者來異步地執(zhí)行這些任務。這樣可以提高系統(tǒng)的響應速度和用戶體驗,同時也可以避免因為任務失敗或超時而影響主流程的執(zhí)行。例如在 waynboot-mall 項目中,用戶下單后需要發(fā)送郵件通知,這個任務就可以使用 RabbitMQ 異步處理。
-
流量削峰:當系統(tǒng)面臨突發(fā)的高并發(fā)請求時,如果直接讓所有請求打到后端服務器上,可能會導致服務器崩潰或者響應緩慢。這時可以使用 RabbitMQ 作為一個緩沖層,將請求先發(fā)送到隊列中,然后由后端服務器按照自己的處理能力從隊列中拉取請求進行處理。這樣可以平滑地分攤請求壓力,避免系統(tǒng)崩潰或者服務降級。例如,在 waynboot-mall 項目中,每天晚上八點有秒殺活動,這時可以使用 RabbitMQ 來削峰限流,保證系統(tǒng)的穩(wěn)定運行。
-
消息廣播:當系統(tǒng)需要將消息發(fā)送到多個接收方時,可以使用 RabbitMQ 的發(fā)布/訂閱模式,將消息發(fā)送到一個 fanout 類型的交換器上,然后由多個隊列綁定到這個交換器上,從而實現(xiàn)消息的廣播功能。這樣可以實現(xiàn)一對多的消息通信,同時也可以根據(jù)不同的業(yè)務需求,訂閱不同的消息內容。例如,在 waynboot-mall 項目中,當商品信息發(fā)生變化時,需要通知搜索系統(tǒng)、推薦系統(tǒng)、緩存系統(tǒng)等多個系統(tǒng),這時可以使用 RabbitMQ 的消息廣播功能。
-
消息路由:當系統(tǒng)需要根據(jù)不同的條件將消息發(fā)送到不同的接收方時,可以使用 RabbitMQ 的路由模式,將消息發(fā)送到一個 direct 或者 topic 類型的交換器上,然后由多個隊列綁定到這個交換器上,并指定不同的路由鍵或者模式匹配規(guī)則,從而實現(xiàn)消息的路由功能。這樣可以實現(xiàn)多對多的消息通信,同時也可以靈活地控制消息的分發(fā)和消費。例如,在 waynboot-mall 項目中,當訂單狀態(tài)發(fā)生變化時,需要通知不同的系統(tǒng)進行不同的處理,這時可以使用 RabbitMQ 的消息路由功能。
坑點分析
在使用 RabbitMQ 的過程中,有一些常見的問題需要注意:
-
消息確認:消息確認是 RabbitMQ 保證消息可靠傳遞的機制。消息確認分為生產(chǎn)者確認和消費者確認。生產(chǎn)者確認是指生產(chǎn)者發(fā)送消息后,等待 RabbitMQ 返回一個確認消息,表明消息已經(jīng)被正確接收和存儲。消費者確認是指消費者接收消息后,向 RabbitMQ 發(fā)送一個確認消息,表明消息已經(jīng)被正確處理和消費。在 waynboot-mall 項目中,消費者開啟了手動消息確認。
-
消息持久化:消息持久化是指將消息存儲到磁盤上,以防止 RabbitMQ 重啟或者崩潰時丟失消息。消息持久化需要滿足以下三個條件:交換器、隊列和消息都需要設置為持久化。持久化會影響 RabbitMQ 的性能,因為需要進行磁盤 IO 操作。建議根據(jù)業(yè)務需求選擇是否需要持久化消息,并合理地配置磁盤空間和清理策略。在 waynboot-mall 項目中,交換器、隊列設置了持久化,消息沒有設置持久化(消息設置持久化會對 RabbitMQ 的性能造成較大影響)。
-
死信隊列:死信隊列是指存儲那些因為某些原因無法被正常消費的消息的隊列。死信隊列可以用來處理一些異常或者失敗的情況,如消息過期、隊列達到最大長度、消費者拒絕等。建議使用死信隊列來監(jiān)控和處理這些情況,并根據(jù)業(yè)務需求選擇合適的重試或者補償策略。在 waynboot-mall 項目中,當訂單消費者處理消息失敗重試三次后,會將訂單消息發(fā)送到死信隊列。
-
集群和鏡像:集群和鏡像是 RabbitMQ 實現(xiàn)高可用和高擴展的兩種方式。集群是指將多個 RabbitMQ 實例組成一個邏輯單元,共享元數(shù)據(jù)和負載均衡。鏡像是指將同一個隊列在多個節(jié)點上創(chuàng)建副本,實現(xiàn)數(shù)據(jù)冗余和容錯。建議根據(jù)業(yè)務需求選擇合適的集群模式和鏡像類型,并注意集群中的網(wǎng)絡分區(qū)、腦裂等問題。
代碼實戰(zhàn)
在 waynboot-mall 項目中,消息層包含兩個模塊 waynboot-message-core 以及 waynboot-message-consumer,目錄結構如下,
|--?waynboot-message-core?????//?核心消息配置,供其他服務集成使用
|???|--?config
|???|--?constant
|???|--?dto
|--?waynboot-message-consumer?//?消息消費服務,訂閱隊列接收消息,調用其他服務執(zhí)行一些具體的業(yè)務邏輯
|???|--?api
|???|--?config
|???|--?consumer
waynboot-message-core 包目錄說明如下,
-
config:核心消息配置目錄,包含業(yè)務上使用的訂單消息、郵件消息、死信消息、延遲消息的交換機、隊列、路由綁定配置以及 RabbitTemplate 配置。
-
constants:核心消息配置的相關常量目錄,包含 MQ 的常量類,這里面會定義訂單、郵件、死信、延遲消息的交換機名稱、隊列名稱、路由鍵名稱等。
-
dto:核心消息配置的數(shù)據(jù)轉換實體目錄,包含 OrderDTO 等。
waynboot-message-consumer 包目錄說明如下,
-
api:消息消費服務調用其他服務定義的 api 包目錄,包含 MobileApi 類用來調用 moibile-api。
-
config:消息消費服務的核心配置目錄,包含 RestTemplate 配置類。
-
consumer:消息消費服務的消費者包目錄,包含下單、發(fā)送郵件、未支付訂單超時取消等消費者。
添加 POM 依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>${spring-boot.version}</version>
</dependency>
指定虛擬主機
在 waynboot-mall 項目中,通過 yml 文件的 spring.rabbitmq.virtual-host=“/” 屬性來指定虛擬主機名稱。
建議大家在使用 RabbitMQ 時都配置好自己項目的虛擬主機名稱,來達到各系統(tǒng)資源隔離的目的。當然如果 RabbitMQ 服務只有一個項目在用,那就用默認的 / 作為虛擬主機名稱也是可以的。
小知識:出于多租戶和安全因素設計的,vhost 把 AMQP 的基本組件劃分到一個虛擬的分組中。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換機、綁定和權限機制。當多個不同的用戶使用同一個 RabbitMQ 服務器時,可以劃分出多個虛擬主機。RabbitMQ 默認的虛擬主機路徑是 /。
生產(chǎn)者發(fā)送消息
在 waynboot-mall 項目中,用訂單消息來舉例,生產(chǎn)者發(fā)送消息需要經(jīng)過三個步驟
1. 創(chuàng)建訂單消息的交換機、隊列以及路由綁定
public?class?MQConstants?{public?static?final?String?ORDER_DIRECT_QUEUE?=?"order_direct_queue";public?static?final?String?ORDER_DIRECT_EXCHANGE?=?"order_direct_exchange";public?static?final?String?ORDER_DIRECT_ROUTING?=?"order_direct_routing";
}@Configuration
public?class?BusinessRabbitConfig?{@Beanpublic?Queue?orderDirectQueue()?{return?new?Queue(MQConstants.ORDER_DIRECT_QUEUE);}@BeanDirectExchange?orderDirectExchange()?{return?new?DirectExchange(MQConstants.ORDER_DIRECT_EXCHANGE);}@BeanBinding?bindingOrderDirect()?{return?BindingBuilder.bind(orderDirectQueue()).to(orderDirectExchange()).with(MQConstants.ORDER_DIRECT_ROUTING);}}
在 BusinessRabbitConfig 中,我們創(chuàng)建了訂單交換機、隊列以及路由綁定關系。在 Spring 項目中,項目啟動時,就會自動在 RabbitMQ 服務器上創(chuàng)建好這些東西。
交換機列表
隊列列表
2. 生產(chǎn)者配置
生產(chǎn)者的消息發(fā)送確認主要包含兩部分,
producter -> rabbitmq broker exchange -> queue
-
消息從 producte( 生產(chǎn)者)發(fā)送到 rabbitmq broker(RabbitMQ 服務器)的交換機中,發(fā)送后會觸發(fā) confirmCallBack 回調
-
消息從 exchange 發(fā)送到 queue,投遞失敗則會調用 returnCallBack 回調
waynboot-mall 項目的 yml 中關于 RabbitMQ 的相關配置如下,
spring:#?配置rabbitMq?服務器rabbitmq:host:?127.0.0.1port:?5672username:?guestpassword:?guest#?消息確認配置項#?確認消息已發(fā)送到交換機(Exchange)publisher-confirm-type:?correlated#?確認消息已發(fā)送到隊列(Queue)publisher-returns:?true#?虛擬主機名稱virtual-host:?/
publisher-confirm-type 屬性
可以看到,我們設置了 publisher-confirm-type 屬性為 correlated,表示開啟發(fā)布確認模式,用來確認消息已發(fā)送到交換機,publisher-confirm-type 有三個選項:
-
NONE:禁用發(fā)布確認模式,是默認值
-
CORRELATED:發(fā)布消息成功到交換器后會觸發(fā)回 confirmCallBack 回調方法
-
SIMPLE:經(jīng)測試有兩種效果,其一效果和 CORRELATED 值一樣會觸發(fā)回調方法,其二在發(fā)布消息成功后使用 rabbitTemplate 調用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節(jié)點返回發(fā)送結果,根據(jù)返回結果來判定下一步的邏輯,要注意的點是 waitForConfirmsOrDie 方法如果返回 false 則會關閉 channel,則接下來無法發(fā)送消息到 broker。
publisher-returns 屬性
在 RabbitMQ 中,消息發(fā)送到交換機中也不代表消費者一定能接收到消息,所以我們還需要設置 publisher-returns 為 true 來表示確認交換機中消息已經(jīng)發(fā)送到隊列里。true 表示開啟失敗回調,開啟后當消息無法路由到指定隊列時會觸發(fā) ReturnCallback 回調。
接著是 RabbitTemplateConfig 的代碼,這里面會定義前面提到的 confirmCallBack、returnCallBack 相關代碼,
@Slf4j
@Component
public?class?RabbitTemplateConfig?{@Beanpublic?RabbitTemplate?rabbitTemplate(CachingConnectionFactory?connectionFactory)?{RabbitTemplate?rabbitTemplate?=?new?RabbitTemplate(connectionFactory);//?設置開啟Mandatory,才能觸發(fā)回調函數(shù),無論消息推送結果怎么樣都強制調用回調函數(shù)rabbitTemplate.setMandatory(true);//?交換機收到消息回調rabbitTemplate.setConfirmCallback((correlationData,?ack,?cause)?->?log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})",?correlationData,?ack,?cause));//?隊列收到消息回調,如果失敗的話會進行?returnCallback?的回調處理,反之成功就不會回調。rabbitTemplate.setReturnsCallback(returned?->?{log.info("returnCallback:?????"?+?"消息:"?+?returned.getMessage());log.info("returnCallback:?????"?+?"回應碼:"?+?returned.getReplyCode());log.info("returnCallback:?????"?+?"回應信息:"?+?returned.getReplyText());log.info("returnCallback:?????"?+?"交換機:"?+?returned.getExchange());log.info("returnCallback:?????"?+?"路由鍵:"?+?returned.getRoutingKey());});return?rabbitTemplate;}
}
在 RabbitTemplateConfig 類代碼里,我們可以設置 confirmCallBack、returnCallBack 回調函數(shù)后,監(jiān)控生產(chǎn)者發(fā)送消息是否被交換機接收、以及交換機是否把消息發(fā)送到隊列中。
3. 使用 RabbitTemplate 發(fā)送消息
在 Spring Boot 項目中,集成了 spring-boot-starter-amqp 依賴后,就可以直接注入 RabbitTemplate 來發(fā)送消息。
這里用 waynboot-mall 項目中的異步下單流程舉例,代碼如下,
@Slf4j
@Service
@AllArgsConstructor
public?class?OrderServiceImpl?extends?ServiceImpl<OrderMapper,?Order>?implements?IOrderService?{private?RabbitTemplate?rabbitTemplate;@Overridepublic?R?asyncSubmit(OrderVO?orderVO)?{OrderDTO?orderDTO?=?new?OrderDTO();...//?開始異步下單String?uid?=?IdUtil.getUid();//?1.?創(chuàng)建消息ID,確認機制發(fā)送消息時,需要給每個消息設置一個全局唯一?id,以區(qū)分不同消息,避免?ack?沖突CorrelationData?correlationData?=?new?CorrelationData(uid);//?2.?創(chuàng)建消息載體?Message?,AMQP?規(guī)范中定義的消息承載類,用來在生產(chǎn)者和消費者之前傳遞消息Map<String,?Object>?map?=?new?HashMap<>();map.put("order",?orderDTO);map.put("notifyUrl",?WaynConfig.getMobileUrl()?+?"/callback/order/submit");try?{Message?message?=?MessageBuilder.withBody(JSON.toJSONString(map).getBytes(Constants.UTF_ENCODING)).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();//?3.?發(fā)送消息到?RabbitMQ?服務器,需要指定交換機、路由鍵、消息載體以及消息IDrabbitTemplate.convertAndSend(MQConstants.ORDER_DIRECT_EXCHANGE,?MQConstants.ORDER_DIRECT_ROUTING,?message,?correlationData);}?catch?(UnsupportedEncodingException?e)?{log.error(e.getMessage(),?e);}return?R.success().add("actualPrice",?actualPrice).add("orderSn",?orderSn);}
}
waynboot-mall 項目中在使用 rabbitTemplate 發(fā)送消息時,按照如下步驟,大家可以參考
-
創(chuàng)建消息 ID,確認機制發(fā)送消息時,需要給每個消息設置一個全局唯一 id,以區(qū)分不同消息,消費者消費時出現(xiàn) ack 沖突。
-
創(chuàng)建消息載體 Message ,AMQP 規(guī)范中定義的消息承載類,用來在生產(chǎn)者和消費者之前傳遞消息。
-
發(fā)送消息到 RabbitMQ 服務器,需要指定交換機、路由鍵、消息載體以及消息 ID。
以上就是生產(chǎn)者發(fā)送消息時所有相關代碼了,接著我們看下消費者處理消息的相關代碼。
消費者處理消息
在 waynboot-mall 項目中,還是用訂單消息來舉例,消費者 yml 配置如下,
1. 消費者配置
在 RabbitMQ 的消息消費環(huán)節(jié),需要注意的一點就是,如果需要確保消費者不出現(xiàn)漏消費,則需要開啟消費者的手動 ack 模式。
spring:rabbitmq:host:?127.0.0.1port:?5672username:?guestpassword:?guest...listener:simple:#?消息確認方式,其有三種配置方式,分別是none、manual(手動ack)?和auto(自動ack)?默認autoacknowledge-mode:?manual#?一個消費者最多可處理的nack(未確認)消息數(shù)量,默認是250prefetch:?250#?設置消費者數(shù)量concurrency:?1
acknowledge-mode 屬性
在 yml 文件的消費者配置中,acknowledge-mode 屬性用于指定消息確認模式,有三種模式:
-
手動確認 manual,在該模式下,消費者消費消息后需要根據(jù)消費情況給 Broker 返回一個回執(zhí),是確認 ack 使 Broker 刪除該條已消費的消息,還是失敗確認返回 nack,還是拒絕該消息。開啟手動確認后,如果消費者接收到消息后還沒有返回 ack 就宕機了,這種情況下消息也不會丟失,只有 RabbitMQ 接收到返回 ack 后,消息才會從隊列中被刪除。
-
自動確認 none,rabbitmq 默認消費者正確處理所有請求(不設置時的默認方式)。
- 根據(jù)請況確認 auto,主要分成以下幾種情況:
-
如果消費者在消費的過程中沒有拋出異常,則自動確認。
-
當消費者消費的過程中拋出 AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕,且該消息不會重回隊列。
-
當拋出 ImmediateAcknowledgeAmqpException 異常,消息會被確認。
-
如果拋出其他的異常,則消息會被拒絕,但是與前兩個不同的是,該消息會重回隊列,如果此時只有一個消費者監(jiān)聽該隊列,那么該消息重回隊列后又會推送給該消費者,會造成死循環(huán)的情況。
-
prefetch 屬性
消費者配置中,prefetch 屬性用于指定消費者每次從隊列獲取的消息數(shù)量。
每個 customer 會在 MQ 預取一些消息放入內存的 LinkedBlockingQueue 中進行消費,這個值越高,消息傳遞的越快,但非順序處理消息的風險更高。如果 ack 模式為 none,則忽略。
prefetch 默認值以前是 1,這可能會導致高效使用者的利用率不足。從 spring-amqp 2.0 版開始,默認的 prefetch 值是 250,這將使消費者在大多數(shù)常見場景中保持忙碌,從而提高吞吐量。
不過在有些情況下,尤其是處理速度比較慢的大消息,消息可能在內存中大量堆積,消耗大量內存;以及對于一些嚴格要求順序的消息,prefetch 的值應當設置為 1。
對于低容量消息和多個消費者的情況(也包括單 listener 容器的 concurrency 配置)希望在多個使用者之間實現(xiàn)更均勻的消息分布,建議在手動 ack 下并設置 prefetch=1。
如果要保證消息的可靠不丟失,當 prefetch 大于 1 時,可能會出現(xiàn)因為服務宕機引起的數(shù)據(jù)丟失,故建議將 prefetch=1。
concurrency 屬性
消費者配置中,concurrency 屬性設置的是對每個 listener 在初始化的時候設置的并發(fā)消費者的個數(shù)。在上面的 yml 配置中,concurrency=1,即每個 Listener 容器將開啟一個線程去處理消息。在 2.0 以后的版本中,可以在注解中配置該參數(shù),實例代碼如下,
@RabbitListener(queues?=?MQConstants.ORDER_DIRECT_QUEUE,?concurrency?=?"2")
public?void?process(Channel?channel,?Message?message)?throws?IOException?{String?body?=?new?String(message.getBody());log.info("OrderPayConsumer?消費者收到消息:?{}",?body);...
}
2. 使用 RabbitListener 注解消費消息
在 waynboot-mall 項目中,消費者監(jiān)聽隊列代碼如下,
@Slf4j
@Component
public?class?OrderPayConsumer?{@Resourceprivate?RedisCache?redisCache;@Resourceprivate?MobileApi?mobileApi;@RabbitListener(queues?=?MQConstants.ORDER_DIRECT_QUEUE)public?void?process(Channel?channel,?Message?message)?throws?IOException?{//?1.?轉換訂單消息String?body?=?new?String(message.getBody());log.info("OrderPayConsumer?消費者收到消息:?{}",?body);//?2.?獲取消息IDString?msgId?=?message.getMessageProperties().getHeader("spring_returned_message_correlation");//?3.?獲取發(fā)送taglong?deliveryTag?=?message.getMessageProperties().getDeliveryTag();//?4.?消費消息冪等性處理if?(redisCache.getCacheObject(ORDER_CONSUMER_MAP.getKey())?!=?null)?{//?redis中包含該?key,說明該消息已經(jīng)被消費過log.error("msgId:?{},消息已經(jīng)被消費",?msgId);channel.basicAck(deliveryTag,?false);//?確認消息已消費return;}try?{//?5.?下單處理mobileApi.submitOrder(body);//?6.?手動ack,消息成功確認channel.basicAck(deliveryTag,?false);//?7.?設置消息已被消費標識redisCache.setCacheObject(ORDER_CONSUMER_MAP.getKey(),?msgId,?ORDER_CONSUMER_MAP.getExpireSecond());}?catch?(Exception?e)?{channel.basicNack(deliveryTag,?false,?false);log.error(e.getMessage(),?e);}}
}
waynboot-mall 項目中在使用 RabbitListener 注解消費消息時,按照如下步驟,大家可以參考
-
將 message 參數(shù)轉換成訂單消息。
-
從 message 參數(shù)中獲取消息唯一 msgId。
-
從 message 參數(shù)中獲取消息發(fā)送 tag。
-
冪等性處理,根據(jù)第二步獲取的 msgId ,消費消息時需要先判斷 msgId 是否已經(jīng)被處理。
-
調用 mobile-api 服務,進行下單邏輯處理,在 mobileApi.submitOrder(body) 方法中使用 Spring-Retry 的 @Retryable 注解,進行自動重試。
-
手動 ack,basicAck(long deliveryTag, boolean multiple)。basicAck 方法表示成功確認,使用此方法后,消息就會被 rabbitmq 服務器刪除。
其中參數(shù) long deliveryTag 為消息的唯一序號也就是第三步獲取的發(fā)送 tag,第二個 boolean multiple 參數(shù)表示是否一次消費多條消息,false 表示只確認該序列號對應的消息,true 則表示確認該序列號對應的消息以及比該序列號小的所有消息,比如我先發(fā)送 2 條消息,他們的序列號分別為 2,3,并且他們都沒有被確認,還留在隊列中,那么如果當前消息序列號為 4,那么當 multiple 為 true,則序列號為 2、3 的消息也會被一同確認。
-
冪等性處理,消息已經(jīng)被成功消費后,根據(jù)第二步獲取的 msgId 設置冪等標識。
總結一下
這篇文章給大家講解了在 Spring Boot 項目中如何集成消息隊列 RabbitMQ 用于業(yè)務邏輯解耦,有架構介紹、應用場景、坑點解析、代碼實戰(zhàn) 4 個部分,能帶領大家比較全面的了解一波 RabbitMQ。大家在自己的項目中如果需要引入 RabbitMQ 時,都可以參考本文的代碼實戰(zhàn)配置,幫助大家快速集成、避免踩坑。