個(gè)人可以做商城網(wǎng)站嗎優(yōu)秀品牌策劃方案
文章目錄
- 消息隊(duì)列(Message Queue)
- 什么場(chǎng)景下,使用消息隊(duì)列?
- 消息隊(duì)列 概述
- RabbitMQ 消息隊(duì)列
- RabbitMQ 概念
- 名詞 概念
- RabbitMQ 流程
- RabbitMQ 安裝
- RabbitMQ 頁(yè)面介紹
- Exchange 交換機(jī)類(lèi)型
- Spring Boot 整合RabbitMQ
- AmqpAdmin 與 RabbitTemplate 的使用
- 整合的 序列化問(wèn)題
- Spring Boot 整合的 RabbitListener 監(jiān)聽(tīng)
- @RabbitListener 注解
- @RabbitHandler 注解
- @PostConstruct 注解 和 @Primary 注解 使用
- RabbitMQ 發(fā)送端(生產(chǎn)者) 消息確認(rèn)機(jī)制
- 事務(wù)消息(了解即可)
- RabbitMQ消息確認(rèn)的 三個(gè)階段
- 第一個(gè)階段:可靠抵達(dá) - confirmCallback
- 第二個(gè)階段:可靠抵達(dá) - ReturnCallback
- 可靠抵達(dá) 消費(fèi)端(消費(fèi)者) Ack消息確認(rèn)機(jī)制
- 第三個(gè)階段:ack 消息確認(rèn)機(jī)制
- 電商 訂單中心
- 訂單中心 的 重要經(jīng)驗(yàn)
- 訂單登錄 攔截
- Feign 遠(yuǎn)程調(diào)用丟失請(qǐng)求頭問(wèn)題(丟失請(qǐng)求頭 等同于 登錄失效了)
- Feign 異步情況丟失上下文問(wèn)題
- 接口冪等性 處理(防重復(fù)提交)
- 冪等性 概念
- 冪等性 考慮情況
- 冪等解決方案
- token機(jī)制
- 各種鎖機(jī)制
- 數(shù)據(jù)庫(kù)悲觀鎖
- 數(shù)據(jù)庫(kù)樂(lè)觀鎖
- 業(yè)務(wù)層分布式鎖
- 各種唯一約束
- 數(shù)據(jù)庫(kù)唯一約束
- redis set 防重
- 防重表
- 全局請(qǐng)求唯一id
- 分布式事務(wù)
- 本地事務(wù)
- 分布式事務(wù)的 問(wèn)題 以及 理論
- CAP定理
- BASE 理論
- 分布式事務(wù)的 多種方案
- 2PC模式
- 柔性事務(wù) - TCC 事務(wù)補(bǔ)償型 方案(常用)
- 柔性事務(wù) - 最大努力通知型方案(常用)
- 柔性事務(wù) - 可靠消息 + 最終一致性方案(常用)
- Seata 框架
- Seata 介紹
- Seata 環(huán)境搭建
- Seata 的幾個(gè)模式
- 訂單服務(wù)采用 最終一致性方案 解決分布式事務(wù)問(wèn)題
- RabbitMQ 延時(shí)隊(duì)列(實(shí)現(xiàn)定時(shí)任務(wù))
- 延遲隊(duì)列的 設(shè)計(jì)和實(shí)現(xiàn)
- 鎖和解鎖庫(kù)存 架構(gòu)實(shí)現(xiàn)
- 訂單服務(wù) 庫(kù)存解鎖 場(chǎng)景
- 訂單服務(wù) 定時(shí)關(guān)單 場(chǎng)景
- RabbitMQ 消息積壓、丟失、重復(fù)解決方案
- 如何保證消息可靠性 - 消息丟失
- 消息丟失場(chǎng)景 一:消息發(fā)送出去,由于網(wǎng)絡(luò)原因沒(méi)有抵達(dá)服務(wù)器。
- 消息丟失場(chǎng)景 二:消息抵達(dá)Broker,Broker要將消息寫(xiě)入磁盤(pán)(持久化)才算成功。此時(shí),Broker尚未持久化完成,宕機(jī)。
- 消息丟失場(chǎng)景 三:自動(dòng)ACK的狀態(tài)下。消費(fèi)者收到消息,但沒(méi)來(lái)得及處理消息,服務(wù)器宕機(jī)了。
- 如何保證消息可靠性 - 消息重復(fù)
- 如何保證消息可靠性 - 消息積壓
消息隊(duì)列(Message Queue)
什么場(chǎng)景下,使用消息隊(duì)列?
- 需要
異步處理
的內(nèi)容。
- 可能某個(gè)操作要調(diào)用多個(gè)服務(wù)流程去完成,這幾者之間也沒(méi)有強(qiáng)依賴(lài)性,無(wú)需等待返回。
應(yīng)用解耦
沒(méi)有使用消息隊(duì)列:訂單系統(tǒng) 調(diào)用 庫(kù)存系統(tǒng)接口,庫(kù)存系統(tǒng)維護(hù)升級(jí),那么訂單系統(tǒng)調(diào)用的接口也要維護(hù)升級(jí), 所以很麻煩!
使用消息隊(duì)列:訂單系統(tǒng) 無(wú)需依靠著庫(kù)存系統(tǒng)的接口,只需要給消息隊(duì)列發(fā)送消息,庫(kù)存系統(tǒng)去消息隊(duì)列消費(fèi)消息即可,實(shí)現(xiàn)應(yīng)用上的解耦。
流量控制
(也叫做 流量削峰)
業(yè)務(wù)中涉及到某一時(shí)間段內(nèi),有大量請(qǐng)求需要處理,可以通過(guò)消息隊(duì)列來(lái)做到流量控制。
例如:電商系統(tǒng)里面的 秒殺業(yè)務(wù)。
消息隊(duì)列 概述
消息服務(wù)中兩個(gè)重要概念:
消息代理(message broker)和 目的地(destination)
當(dāng)消息發(fā)送者發(fā)送消息以后,將由消息代理接管,消息代理保證消息傳遞到指定目的地。
消息隊(duì)列主要有兩種形式的目的地
:
- 隊(duì)列(queue):點(diǎn)對(duì)點(diǎn)消息通信(point - to - point)
- 過(guò)程:消息發(fā)送給消息代理,消息代理將其放入一個(gè)隊(duì)列,接收者從里面獲取消息,消毒讀取后移除隊(duì)列。
- 消息只有唯一的發(fā)送者 ,一個(gè)或多個(gè)接收者。
- 主題(topic):發(fā)布(publish)/訂閱(subscribe)消息通信。
- 過(guò)程:消息發(fā)送到主題,多個(gè)接收者(訂閱者)監(jiān)聽(tīng)(訂閱)這個(gè)主題,那么就會(huì)在消息到達(dá)同時(shí)收到消息。
消息隊(duì)列的 兩種協(xié)議:(遵循了協(xié)議就能夠調(diào)用 消息隊(duì)列中間件)
Spring 集成支持:
Spring Boot 繼承自動(dòng)配置:
市面上的MQ產(chǎn)品:ActiveMQ、RabbitMQ、RocketMQ、Kafka
本次項(xiàng)目采用AMQP的Rabbitmq來(lái)做消息隊(duì)列。
RabbitMQ 消息隊(duì)列
RabbitMQ 概念
名詞 概念
Message:消息,由消息頭和消息體組成。消息體是不透明的,消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等。
Publisher:消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶(hù)端應(yīng)用程序。
Exchange:交換器,用來(lái)接受生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。四種交換器類(lèi)型:direct(默認(rèn))、fanout、topic、headers,不同類(lèi)型轉(zhuǎn)發(fā)消息的策略也不同。
Queue:消息隊(duì)列。
Binding:綁定,用于消息隊(duì)列和交換機(jī)之間的關(guān)聯(lián)。
Connection:網(wǎng)絡(luò)連接,比如:TCP連接。
Channel:信道,多路復(fù)用連接中的獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP連接內(nèi)的虛擬連接,AMQP命令都是通過(guò)信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接受消息,這些動(dòng)作都是通過(guò)信道完成。因?yàn)閷?duì)于操作系統(tǒng)來(lái)說(shuō)建立和銷(xiāo)毀TCP都是非常昂貴的開(kāi)銷(xiāo),所以引入了信道的概念,以服用一條TCP連接。
Consumer:消費(fèi)者
Virtual Host:虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象。每個(gè)虛擬主機(jī)本質(zhì)上就是一個(gè)mini版的RabbitMQ服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。目的:起到隔離效果,比如:一個(gè)RabbitMQ中,有一臺(tái)針對(duì)Java調(diào)用的虛擬主機(jī),有一臺(tái)針對(duì)python調(diào)用的虛擬主機(jī),這樣Java虛擬主機(jī)如果出現(xiàn)問(wèn)題,也不會(huì)影響python這臺(tái)(也有按照開(kāi)發(fā)、測(cè)試、生產(chǎn)環(huán)境來(lái)的)。
Broker:表示消息隊(duì)列服務(wù)器實(shí)體。
RabbitMQ 流程
RabbitMQ流程圖:
RabbitMQ 安裝
- 執(zhí)行docker命令。
# 1. 啟動(dòng) rabbitmq:management 容器
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 \
-p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
# 2. 自動(dòng)重啟
docker update rabbitmq --restart=always
rabbitmq:management 集成了:web管理后臺(tái)的端口。
端口 解釋:
- 啟動(dòng)成功后,訪(fǎng)問(wèn) IP:15672 查看頁(yè)面即可。
- 賬號(hào)密碼默認(rèn)都是:guest
RabbitMQ 頁(yè)面介紹
Overview 概述:
Admin 管理:
Exchanges 交換機(jī):
Queues 隊(duì)列:
Exchange 交換機(jī)類(lèi)型
Direct Exchange:直接模式,直接 交換機(jī)。
- 將消息交給指定隊(duì)列,路由(routing key)按照綁定(Binding)關(guān)系,將消息發(fā)到對(duì)應(yīng)的隊(duì)列中,這個(gè)是完全匹配,完全按照路由綁定關(guān)系,去找對(duì)應(yīng)的消息隊(duì)列。
- 這種叫做完全匹配、單播模式。例如:routing key 為 dog 的消息,不會(huì)轉(zhuǎn)發(fā) dog.puppy … 其他的。
Fanout Exchange:廣播模式, 扇形 交換機(jī)
- 每個(gè)發(fā)到fanout類(lèi)型交換器的消息都會(huì)分到所有綁定的隊(duì)列上去。
Topic Exchange:主題模式 , 主題 交換機(jī)
- topic 交換機(jī)通過(guò)模式匹配分配消息的路由鍵屬性,將路由鍵和某個(gè)模式進(jìn)行匹配。
- 就相當(dāng)于會(huì)區(qū)分路由鍵,不同的路由會(huì)走向不同的隊(duì)列。會(huì)涉及到:通配符(#、*)
Spring Boot 整合RabbitMQ
- 引入amqp啟動(dòng)類(lèi),RabbitAutoConfiguration就會(huì)自動(dòng)生效。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- RabbitAutoConfiguration類(lèi),自動(dòng)配置了RabbitTemplate、AmqpAdmin 等實(shí)例對(duì)象。
- @EnableRabbit 標(biāo)識(shí)啟動(dòng)注解。
package com.atguigu.gulimall.order;import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// 啟動(dòng) 注解
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {public static void main(String[] args) {SpringApplication.run(GulimallOrderApplication.class, args);}}
- 配置rabbitmq屬性。
# rabbitmq 配置信息
spring.rabbitmq.host=www.gulimall.com
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
AmqpAdmin 與 RabbitTemplate 的使用
package com.atguigu.gulimall.order;import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.Date;@SpringBootTest
class GulimallOrderApplicationTests {@AutowiredAmqpAdmin amqpAdmin;@AutowiredRabbitTemplate rabbitTemplate;// 發(fā)送message@Testpublic void sendMessage(){OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();orderReturnReasonEntity.setId(1l);orderReturnReasonEntity.setCreateTime(new Date());orderReturnReasonEntity.setName("測(cè)試實(shí)體類(lèi)");// 1. 發(fā)送消息// 如果發(fā)送的消息是個(gè)對(duì)象,我們會(huì)使用序列化機(jī)制,將對(duì)象寫(xiě)出去。對(duì)象必須實(shí)現(xiàn)Serializable!!rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnReasonEntity);}// 創(chuàng)建交換機(jī)@Testvoid createExchange() {// 參數(shù):名字、是否持久化、是否自動(dòng)刪除DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);amqpAdmin.declareExchange(directExchange);}// 創(chuàng)建隊(duì)列@Testvoid createQueue() {Queue queue = new Queue("hello-java-queue",true,false,false);amqpAdmin.declareQueue(queue);}// 創(chuàng)建綁定@Testvoid createBinding() {// 將exchange指定的交換機(jī)和destination目的地進(jìn)行綁定,使用routingKey作為指定的路由鍵Binding binding = new Binding("hello-java-queue", // 目的地Binding.DestinationType.QUEUE, // 目的地類(lèi)型"hello-java-exchange", // 指定交換機(jī)"hello.java", // 路由keynull // 參數(shù));amqpAdmin.declareBinding(binding);}}
整合的 序列化問(wèn)題
對(duì)于實(shí)體類(lèi)序列化,默認(rèn)為jdk序列化:
可以通過(guò)配置消息轉(zhuǎn)換來(lái)實(shí)現(xiàn)Jackson2JSON序列化:
package com.atguigu.gulimall.order.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}}
Spring Boot 整合的 RabbitListener 監(jiān)聽(tīng)
@RabbitListener 注解
@RabbitListener注解,參數(shù):
- queues:聲明需要監(jiān)聽(tīng)的所有隊(duì)列。
@RabbitListener注解:標(biāo)注在 類(lèi) 或者 方法上。(看源碼注解能看出)
@RabbitHandler注解:標(biāo)注在 方法上。
被@RabbitListener注解 ,標(biāo)注方法上
:
- 第一個(gè)參數(shù) Message message:原生消息相信信息,消息頭 + 消息體
- 第二個(gè)參數(shù) T<發(fā)送的消息的類(lèi)型> T content :消息體里面對(duì)應(yīng)的實(shí)體信息(一般為實(shí)體類(lèi))
- 第三個(gè)參數(shù) Channel channel : 當(dāng)前傳輸數(shù)據(jù)的通道。
注意:要引入rabbitmq的Channel。
package com.atguigu.gulimall.order.service.impl;import com.alibaba.fastjson.JSON;
import com.atguigu.common.utils.PageUtils;
import com.atguigu.common.utils.Query;
import com.atguigu.gulimall.order.dao.OrderItemDao;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import com.atguigu.gulimall.order.service.OrderItemService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import com.rabbitmq.client.Channel;
import java.util.Map;@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@Overridepublic PageUtils queryPage(Map<String, Object> params) {IPage<OrderItemEntity> page = this.page(new Query<OrderItemEntity>().getPage(params),new QueryWrapper<OrderItemEntity>());return new PageUtils(page);}/* @RabbitListener參數(shù):* queues:聲明需要監(jiān)聽(tīng)的所有隊(duì)列。** 被標(biāo)注方法參數(shù):* 第一個(gè)參數(shù) Message message:原生消息相信信息,消息頭 + 消息體* 第二個(gè)參數(shù) T<發(fā)送的消息的類(lèi)型> T content :消息體里面對(duì)應(yīng)的實(shí)體信息(一般為實(shí)體類(lèi))* 第三個(gè)參數(shù) Channel channel : 當(dāng)前傳輸數(shù)據(jù)的通道。(有問(wèn)題,沒(méi)有Channel類(lèi))*/@RabbitListener(queues = {"hello-java-queue"})public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) {// body里面存儲(chǔ)的是:發(fā)送的消息內(nèi)容byte[] body = message.getBody();// fixme 這種方式太麻煩,可以用參數(shù)直接來(lái)接受消息體里面的內(nèi)容。// OrderReturnReasonEntity orderReturnReasonEntity = JSON.parseObject(body.toString(), OrderReturnReasonEntity.class);System.out.println("body:" + content);// properties存儲(chǔ)的是:發(fā)過(guò)來(lái)的消息頭屬性MessageProperties properties = message.getMessageProperties();System.out.println("接收到消息...內(nèi)容:" + message.toString() + ",類(lèi)型:" + message.getClass());}}
💡Tip:Queue:可以很多人都來(lái)監(jiān)聽(tīng)。只要收到消息,隊(duì)列刪除消息,而且只能有一個(gè)收到此消息場(chǎng)景:
- 訂單服務(wù)啟動(dòng)多個(gè);同一個(gè)消息,只能有一個(gè)客戶(hù)端收到。
- 單個(gè)服務(wù),一次只能處理一個(gè)消息,等消息完全處理完,方法運(yùn)行結(jié)束,才可以接受到下一個(gè)消息。
@RabbitHandler 注解
被@RabbitListener注解 ,標(biāo)注類(lèi)上
參數(shù):
- 重載方法,區(qū)分不同的消息的作用。
- 針對(duì) 接受的消息類(lèi)型 會(huì)有多種的情況,可以使用@RabbitHandler來(lái)標(biāo)識(shí),不同方法來(lái)處理不同情況。
碰壁了,此處有問(wèn)題,沒(méi)有Channel類(lèi)可能跟配置序列化有關(guān)系。
package com.atguigu.gulimall.order.service.impl;import com.atguigu.common.utils.PageUtils;
import com.atguigu.common.utils.Query;
import com.atguigu.gulimall.order.dao.OrderItemDao;
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import com.atguigu.gulimall.order.service.OrderItemService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import com.rabbitmq.client.Channel;
import java.util.Map;@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@Overridepublic PageUtils queryPage(Map<String, Object> params) {IPage<OrderItemEntity> page = this.page(new Query<OrderItemEntity>().getPage(params),new QueryWrapper<OrderItemEntity>());return new PageUtils(page);}/* @RabbitListener參數(shù):* queues:聲明需要監(jiān)聽(tīng)的所有隊(duì)列。** 被標(biāo)注方法參數(shù):* 第一個(gè)參數(shù) Message message:原生消息相信信息,消息頭 + 消息體* 第二個(gè)參數(shù) T<發(fā)送的消息的類(lèi)型> T content :消息體里面對(duì)應(yīng)的實(shí)體信息(一般為實(shí)體類(lèi))* 第三個(gè)參數(shù) Channel channel : 當(dāng)前傳輸數(shù)據(jù)的通道。(有問(wèn)題,沒(méi)有Channel類(lèi))*/@RabbitHandlerpublic void receiveMessageOrderReturnReasonEntity(Channel channel, Message message, OrderReturnReasonEntity content) {System.out.println("接收到消息..." + content);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息處理完成..." + content);}@RabbitHandlerpublic void receiveMessageForOrderEntity(Channel channel, Message message,OrderEntity content) {System.out.println("接收到消息..." + content);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息處理完成..." + content);}}
@PostConstruct 注解 和 @Primary 注解 使用
@PostConstruct注解:相當(dāng)于創(chuàng)建完當(dāng)前這個(gè)對(duì)象后,之后調(diào)用的方法??梢苑g為:構(gòu)造器之后。
例如:下面就是等 MyRabbitConfig 創(chuàng)建實(shí)例后,執(zhí)行initRabbitTemplate方法,給RestTemplate對(duì)象配置相關(guān)內(nèi)容。
package com.atguigu.gulimall.order.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {@Autowird // 此處報(bào)錯(cuò),閉環(huán)錯(cuò)誤。RabbitTemplate rabbitTemplate;// 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/* @PostConstruct注解:相當(dāng)于創(chuàng)建完MyRabbitConfig對(duì)象,之后調(diào)用的方法。翻譯為構(gòu)造器之后。*/@PostConstructpublic void initRabbitTemplate(){// 設(shè)置確認(rèn)回調(diào)rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm...");System.out.println("CorrelationData:" + correlationData);System.out.println("ack:" + ack);System.out.println("cause:" + cause);}});}}
@Primary注解在Spring框架中表示當(dāng)有多個(gè)相同類(lèi)型的bean時(shí),使用該注解賦予bean更高的優(yōu)先級(jí)。比如在Spring IOC容器中有多個(gè)相同類(lèi)型的bean時(shí),當(dāng)要注入該類(lèi)型的bean,就可以使用@Primary來(lái)標(biāo)注注入bean的優(yōu)先,優(yōu)先級(jí)高的bean先被注入。
RabbitMQ 發(fā)送端(生產(chǎn)者) 消息確認(rèn)機(jī)制
事務(wù)消息(了解即可)
事務(wù)消息:
- 將所有的過(guò)程都鎖定到一個(gè)事務(wù)中,一起成功,一起失敗。
為了保證消息不丟失,可靠抵達(dá),可以使用事務(wù)消息,但是性能卻下降250倍,所以事務(wù)消息是不推薦使用的,為此引入確認(rèn)機(jī)制
。
RabbitMQ消息確認(rèn)的 三個(gè)階段
先看官方文檔:Reliability Guide — RabbitMQ
一個(gè)完整效果圖:
- p:生產(chǎn)者,c:消費(fèi)者。
p(provider) -> b(Broker):需要 confirmCallback
e(Exchange) -> q(Queue):需要 returnCallback
q(Queue) -> c(Consumer):需要 ack
第一個(gè)階段:可靠抵達(dá) - confirmCallback
實(shí)現(xiàn) 可靠抵達(dá) 步驟:
- 可靠抵達(dá) 配置,注意:不同版本的SpringBoot可能會(huì)不同。
## 開(kāi)啟發(fā)送端消息抵達(dá)Broker確認(rèn)(使用的SpringBoot版本不支持棄用了)
# spring.rabbitmq.publisher-confirms=true
## 開(kāi)啟發(fā)送端消息抵達(dá)Queue確認(rèn)
spring.rabbitmq.publisher-returns=true
## 只要消息抵達(dá)Queue,就會(huì)異步發(fā)送優(yōu)先回調(diào)returnfirm
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-confirm-type=correlated
在Spring框架中,spring.rabbitmq.publisher-confirms 屬性是用于開(kāi)啟或關(guān)閉消息發(fā)布確認(rèn)的。從Spring AMQP 2.0開(kāi)始,這個(gè)屬性已經(jīng)被棄用,并推薦使用 spring.rabbitmq.confirm-interval 和 spring.rabbitmq.publisher-returns 這兩個(gè)新的屬性來(lái)代替。
- rabbitTemplate的容器對(duì)象,配置好對(duì)應(yīng)的setConfirmCallback方法,方便測(cè)試。
package com.atguigu.gulimall.order.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {RabbitTemplate rabbitTemplate;@Primary@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}// 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}public void initRabbitTemplate(){// 設(shè)置確認(rèn)回調(diào)rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/* 第一個(gè)參數(shù) correlationData:當(dāng)前消息的唯一關(guān)聯(lián)數(shù)據(jù)(這個(gè)是消息的唯一ID)* 第二個(gè)參數(shù) ack:消息是否成功收到* 第三個(gè)參數(shù) cause:失敗原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm...correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause);}});}}
- 總結(jié):只要消息抵達(dá)Broker就ack=true。
第二個(gè)階段:可靠抵達(dá) - ReturnCallback
- SpringBoot 配置
## 開(kāi)啟發(fā)送端消息抵達(dá)Broker確認(rèn)(使用的SpringBoot版本不支持棄用了)
# spring.rabbitmq.publisher-confirms=true
## 開(kāi)啟發(fā)送端消息抵達(dá)Queue確認(rèn)
spring.rabbitmq.publisher-returns=true
## 只要消息抵達(dá)Queue,就會(huì)異步發(fā)送優(yōu)先回調(diào)returnfirm
spring.rabbitmq.template.mandatory=true
## 手動(dòng)ack消息,不使用默認(rèn)的消費(fèi)端確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 添加消息的唯一ID方便測(cè)試。
- rabbitTemplate.convertAndSend方法,new CorrelationData(UUID.randomUUID().toString())的作用使用UUID來(lái)創(chuàng)建該消息的唯一ID。
@Autowired
RabbitTemplate rabbitTemplate;// new CorrelationData(UUID.randomUUID().toString()) 添加消息唯一ID,方便測(cè)試
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity,new CorrelationData(UUID.randomUUID().toString()));
- 配置 rabbitTemplate.setReturnsCallback方法。
package com.atguigu.gulimall.order.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {RabbitTemplate rabbitTemplate;@Primary@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}// 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}public void initRabbitTemplate(){// 設(shè)置確認(rèn)回調(diào)rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/* 1、 只要消息抵達(dá)Broker就ack=true* 第一個(gè)參數(shù) correlationData:當(dāng)前消息的唯一關(guān)聯(lián)數(shù)據(jù)(這個(gè)是消息的唯一ID)* 第二個(gè)參數(shù) ack:消息是否成功收到* 第三個(gè)參數(shù) cause:失敗原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm...correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause);}});// 設(shè)置消息抵達(dá)隊(duì)列的確認(rèn)回調(diào)rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {/* 該方法只有失敗才會(huì)調(diào)用,成功不會(huì)被調(diào)用。* ReturnedMessage對(duì)象里面的參數(shù)解釋:* private final Message message; 投遞失敗的消息詳情信息* private final int replyCode; 回復(fù)的狀態(tài)碼* private final String replyText; 回復(fù)的文本內(nèi)容* private final String exchange; 當(dāng)時(shí)這個(gè)消息發(fā)給哪個(gè)交換機(jī)* private final String routingKey; 當(dāng)時(shí)這個(gè)消息用哪個(gè)路由鍵*/@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("失敗 消息,Message:" + returnedMessage.getMessage()+ ",replyCode:" + returnedMessage.getReplyCode() + ",replyText:" + returnedMessage.getReplyText()+ ",exchange:" + returnedMessage.getExchange() + ",routingKey:" + returnedMessage.getRoutingKey());}});}}
失敗測(cè)試不成功,待解決。
可靠抵達(dá) 消費(fèi)端(消費(fèi)者) Ack消息確認(rèn)機(jī)制
第三個(gè)階段:ack 消息確認(rèn)機(jī)制
💡Tip:ack全稱(chēng):acknowledge 收到通知。
- 默認(rèn)自動(dòng)ack確認(rèn)的,只要消息接收到,客戶(hù)端會(huì)自動(dòng)確認(rèn),(隊(duì)列)服務(wù)端就會(huì)自動(dòng)移除這個(gè)消息。
- 存在問(wèn)題:當(dāng)收到很多消息,自動(dòng)回復(fù)給服務(wù)器ack,然而,服務(wù)器宕機(jī)了。這就導(dǎo)致了消息的丟失。
- 將ack設(shè)置為手動(dòng)確認(rèn),之后進(jìn)行測(cè)試。
- 手動(dòng)確認(rèn)模式,只要我們沒(méi)有明確的告訴mq,貨物被簽收。沒(méi)有Ack,消息就一直是Unacked狀態(tài)。即使出現(xiàn)了宕機(jī),消息也不會(huì)丟失,會(huì)重新變?yōu)閞eady。
## 將ack確認(rèn)設(shè)置為手動(dòng)模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 通過(guò)Channel來(lái)手動(dòng)確認(rèn)消息。
- channel.basicAck(deliveryTag, false) 肯定確認(rèn)
- channel.basicNack(deliveryTag,false,true); 否定確認(rèn)
- channel.basicReject(deliveryTag,true) 也是否定確認(rèn),但是不能批量操作。
package com.atguigu.gulimall.order.service.impl;import com.atguigu.common.utils.PageUtils;
import com.atguigu.common.utils.Query;
import com.atguigu.gulimall.order.dao.OrderItemDao;
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import com.atguigu.gulimall.order.service.OrderItemService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.Map;@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@RabbitHandlerpublic void receiveMessageOrderReturnReasonEntity(Channel channel, Message message, OrderReturnReasonEntity content) {System.out.println("接收到消息..." + content);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息處理完成..." + content);long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("deliveryTag: " + deliveryTag);/* channel.basicAck 等同于 簽收獲取,手動(dòng)確認(rèn)。* 第一個(gè)參數(shù):可以理解為一個(gè)貨物標(biāo)簽,就是消息的標(biāo)簽,是哪個(gè)消息被確認(rèn)了。* 第二個(gè)參數(shù):是否批量確認(rèn)。*/// 簽收貨物,非批量模式try {if (deliveryTag % 2 == 0) {// 肯定確認(rèn)channel.basicAck(deliveryTag, false);} else {// 否定確認(rèn)/* channel.basicNack 等同于 拒絕簽收,拒絕確認(rèn)。* 第一個(gè)參數(shù):可以理解為一個(gè)貨物標(biāo)簽,就是消息的標(biāo)簽,是哪個(gè)消息被確認(rèn)了。* 第二個(gè)參數(shù):是否批量確認(rèn)。* 第三個(gè)參數(shù):是否重新回歸隊(duì)列。*/channel.basicNack(deliveryTag,false,true);// channel.basicReject(deliveryTag,true);// 效果一樣,但是不能批量操作。 }} catch (IOException e) {// 網(wǎng)絡(luò)中斷e.printStackTrace();}}}
- 總結(jié):
電商 訂單中心
訂單中心 的 重要經(jīng)驗(yàn)
在電商系統(tǒng)中,訂單中心很重要,涉及到3流,分別是信息流、資金流、物流。訂單中心就相當(dāng)于是三者的中間整合商。
訂單的作用:把感興趣的商品整合一起,生成一個(gè)支付單,然后完成一個(gè)發(fā)貨的物流過(guò)程。
所以,訂單模塊是電商系統(tǒng)的樞紐,在訂單這個(gè)環(huán)節(jié)商需求獲取多個(gè)模塊的數(shù)據(jù)和信息。同時(shí)對(duì)這多個(gè)信息進(jìn)行加工處理,流向下個(gè)環(huán)節(jié)。
訂單所涉及到的信息如下:
訂單總流程:
- 名詞:實(shí)物訂單、虛擬訂單(話(huà)費(fèi))、庫(kù)存鎖定(下了單沒(méi)支付,需要鎖定庫(kù)存)、庫(kù)存解鎖(超時(shí)未支付,解鎖)
- 正常流程:訂單生成 -》 支付訂單 -》 賣(mài)家發(fā)貨 -》 確認(rèn)收貨 -》 交易成功。
訂單生成 流程包括:
- 創(chuàng)建訂單 -》 驗(yàn)令牌(冪等性) -》 驗(yàn)價(jià)格(優(yōu)惠、扣減等等) -》 鎖庫(kù)存(只要有異?;貪L訂單數(shù)據(jù))
鎖庫(kù)存方式:
- 通過(guò)SQL條件來(lái)控制即可。
update `wms_ware_sku` set stock_locked = stock_locked + #{num}
where sku_id = #{skuId}
and ware_id = #{wareId}
and stock - stock_locked >= #{num}
訂單登錄 攔截
- 創(chuàng)建攔截器。
- 也是用到了ThreadLocal存儲(chǔ)用戶(hù)信息。
package com.atguigu.gulimall.order.interceptor;import com.atguigu.common.constant.AuthServerConstant;
import com.atguigu.common.vo.MemberRespVo;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;/* @author: xuyanbo* @description: TODO* @date: 2023/10/31 11:37*/
@Component
public class LoginUserInterceptor implements HandlerInterceptor {public static ThreadLocal<MemberRespVo> loginUser = new ThreadLocal<>();@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {MemberRespVo userInfo = (MemberRespVo) request.getSession().getAttribute(AuthServerConstant.LOGIN_USER);if (userInfo != null) {loginUser.set(userInfo);return true;} else {// 沒(méi)登錄去登錄request.getSession().setAttribute("msg","請(qǐng)先進(jìn)行登錄");response.sendRedirect("http://auth.gulimall.com/login.html");return false;}}}
- 注冊(cè)攔截器,并且配置路徑。
package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.interceptor.LoginUserInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;/* @author: xuyanbo* @description: TODO* @date: 2023/10/31 11:39*/
@Configuration
public class OrderWebConfiguration implements WebMvcConfigurer {@AutowiredLoginUserInterceptor interceptor;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(interceptor).addPathPatterns("/");}}
Feign 遠(yuǎn)程調(diào)用丟失請(qǐng)求頭問(wèn)題(丟失請(qǐng)求頭 等同于 登錄失效了)
問(wèn)題原因:
解決辦法:
feign在發(fā)起遠(yuǎn)程調(diào)用之前,會(huì)經(jīng)過(guò)一大堆的攔截器,我們也可以添加一個(gè)攔截器,將相關(guān)信息維護(hù)上:
- 添加一個(gè)Feign遠(yuǎn)程調(diào)用攔截器。
- 在攔截器里面,使用RequestContextHolder獲取到請(qǐng)求以及請(qǐng)求頭相關(guān)信息。
- 案例代碼如下:
- ServletRequestAttributes attributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes(); 是重點(diǎn)。
RequestContextHolder的原理是ThreadLocal
package com.atguigu.gulimall.order.config;import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;/* @author: xuyanbo* @description: TODO* @date: 2023/10/31 14:56*/
@Configuration
public class GuliFeignConfig {// 添加一個(gè)攔截器,并且聲明名字@Bean("requestInterceptor")public RequestInterceptor requestInterceptor(){return new RequestInterceptor() {@Overridepublic void apply(RequestTemplate template) {// 1. 使用RequestContextHolder拿到剛進(jìn)來(lái)的這個(gè)請(qǐng)求ServletRequestAttributes attributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes();HttpServletRequest request = attributes.getRequest();// 2. 同步請(qǐng)求頭數(shù)據(jù),CookieString cookie = request.getHeader("Cookie");// 3. 給Feign的請(qǐng)求里面,同步當(dāng)前請(qǐng)求的cookie信息template.header("Cookie",cookie);}};}}
Feign 異步情況丟失上下文問(wèn)題
因?yàn)?#xff0c;RequestContextHolder 的原理是 ThreadLocal ,當(dāng)我們使用異步的方式進(jìn)行Feign的遠(yuǎn)程調(diào)用,相當(dāng)于創(chuàng)建了多個(gè)子線(xiàn)程,而不是主線(xiàn)程了,這時(shí)RequestInterceptor攔截器里面的RequestContextHolder就無(wú)法獲取到請(qǐng)求的相關(guān)信息了,因?yàn)檎?qǐng)求信息在主線(xiàn)程的RequestContextHolder中。
解決辦法:
- 將主線(xiàn)程的RequestContextHolder請(qǐng)求屬性提前拿出來(lái),賦值給多個(gè)子線(xiàn)程的RequestContextHolder請(qǐng)求屬性就可以了。
@Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {OrderConfirmVo confirmVo = new OrderConfirmVo();MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get();System.out.println("主線(xiàn)程..." + Thread.currentThread().getId());// 主線(xiàn)程RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {System.out.println("member子線(xiàn)程..." + Thread.currentThread().getId());// 在member子線(xiàn)程,設(shè)置請(qǐng)求信息RequestContextHolder.setRequestAttributes(requestAttributes);// 1. 遠(yuǎn)程查詢(xún)所有的收貨地址列表List<MemberAddressVo> address = memberFeignService.getAddress(memberRespVo.getId());confirmVo.setAddress(address);}, executor);CompletableFuture<Void> cartFuture = CompletableFuture.runAsync(() -> {System.out.println("cart子線(xiàn)程..." + Thread.currentThread().getId());// 在cart子線(xiàn)程,設(shè)置請(qǐng)求信息RequestContextHolder.setRequestAttributes(requestAttributes);// 2. 遠(yuǎn)程查詢(xún)購(gòu)物車(chē)所有選中的購(gòu)物項(xiàng)List<OrderItemVo> items = cartFeignService.getCurrentUserCartItems();confirmVo.setItems(items);}, executor);CompletableFuture.allOf(getAddressFuture,cartFuture).get();// 3. 查詢(xún)用戶(hù)積分Integer integration = memberRespVo.getIntegration();confirmVo.setIntegration(integration);// 4. 其他數(shù)據(jù)自動(dòng)計(jì)算// TODO 5. 防重令牌return confirmVo;
}
接口冪等性 處理(防重復(fù)提交)
冪等性 概念
冪等性概念:冪等性是指對(duì)同一個(gè)資源的多個(gè)請(qǐng)求,在業(yè)務(wù)邏輯上具有相同的結(jié)果。
冪等性 考慮情況
場(chǎng)景案例:
- 訂單業(yè)務(wù):用戶(hù)點(diǎn)了多次訂單,發(fā)起多次訂單請(qǐng)求,出現(xiàn)了多個(gè)訂單。
- 支付場(chǎng)景:用戶(hù)點(diǎn)了多次支付,發(fā)起了多次支付請(qǐng)求,結(jié)果扣款了多次。
哪些情況需要防止:
- 用戶(hù)多次點(diǎn)擊按鈕。
- 用戶(hù)頁(yè)面回退再次提交。
- 微服務(wù)互相調(diào)用,由于網(wǎng)絡(luò)問(wèn)題,導(dǎo)致請(qǐng)求失敗,feign觸發(fā)重試機(jī)制。
- 其他業(yè)務(wù)情況。
什么情況需要冪等?
- 有一些操作是天然冪等的。
例如:
select * from tableA from id = ?
update tab1 set col1 = 1 where col2 = 2
delete from user where userId = 1
insert into user(userId,name) values (1,0) userId作為唯一主鍵,只會(huì)插入一條用戶(hù)數(shù)據(jù)也是具備冪等性的。
無(wú)論執(zhí)行多少次,結(jié)果都一樣,不會(huì)改變狀態(tài),這些就是天然冪等的,具有冪等性的。
- 不具有冪等性的情況。
update tab1 set col1 = col1 + 1 where col2 = 2 ,每次操作執(zhí)行結(jié)果都會(huì)發(fā)生變化,這就不是冪等性的。
insert into user(userId,name) values (1,0) userId,name都不是唯一主鍵,可以重復(fù),這樣的也是不具備冪等性的。
冪等解決方案
token機(jī)制
- 服務(wù)端提供了發(fā)送 token 的接口。我們?cè)诜治鰳I(yè)務(wù)的時(shí)候,哪些業(yè)務(wù)是存在冪等問(wèn)題的, 就必須在執(zhí)行業(yè)務(wù)前,先去獲取 token,服務(wù)器會(huì)把 token 保存到 redis 中。
- 然后調(diào)用業(yè)務(wù)接口請(qǐng)求時(shí),把 token 攜帶過(guò)去,一般放在請(qǐng)求頭部。
- 服務(wù)器判斷 token 是否存在 redis 中,存在表示第一次請(qǐng)求,然后刪除 token,繼續(xù)執(zhí)行業(yè)務(wù)。
- 如果判斷 token 不存在 redis 中,就表示是重復(fù)操作,直接返回重復(fù)標(biāo)記給 client,這樣就保證了業(yè)務(wù)代碼,不被重復(fù)執(zhí)行。
例如:本項(xiàng)目,就是thymleaf渲染頁(yè)面前,就給頁(yè)面封裝了一個(gè)防重令牌(token),這樣就是前端一個(gè)令牌,后端redis存了一個(gè)令牌。
危險(xiǎn)性:
- 先刪除 token 還是后刪除 token;
- (1) 先刪除可能導(dǎo)致,業(yè)務(wù)確實(shí)沒(méi)有執(zhí)行,重試還帶上之前 token,由于防重設(shè)計(jì)導(dǎo)致, 請(qǐng)求還是不能執(zhí)行。
- (2) 后刪除可能導(dǎo)致,業(yè)務(wù)處理成功,但是服務(wù)閃斷,出現(xiàn)超時(shí),沒(méi)有刪除 token,別人繼續(xù)重試,導(dǎo)致業(yè)務(wù)被執(zhí)行兩邊
- (3) 我們最好設(shè)計(jì)為先刪除 token,如果業(yè)務(wù)調(diào)用失敗,就重新獲取 token 再次請(qǐng)求。
- Token 獲取、比較和刪除必須是原子性
- (1) redis.get(token) 、token.equals、redis.del(token)如果這兩個(gè)操作不是原子,可能導(dǎo) 致,高并發(fā)下,都 get 到同樣的數(shù)據(jù),判斷都成功,繼續(xù)業(yè)務(wù)并發(fā)執(zhí)行
- (2) 可以在 redis 使用 lua 腳本完成這個(gè)操作
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
// 1. 驗(yàn)證令牌[令牌的對(duì)比和刪除必須保證原子性]
// 0 令牌失敗,1刪除成功
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
String orderToken = vo.getOrderToken();
Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberRespVo.getId()), orderToken
);
// 原子驗(yàn)證令牌和刪除令牌
if (result == 0l) {// 令牌驗(yàn)證失敗return response;
} else {// 令牌驗(yàn)證成功// 去創(chuàng)建訂單,驗(yàn)令牌,驗(yàn)價(jià)格,鎖庫(kù)存...
}
各種鎖機(jī)制
數(shù)據(jù)庫(kù)悲觀鎖
悲觀鎖使用時(shí),一般伴隨著事務(wù)一起使用,數(shù)據(jù)鎖定時(shí)間可能會(huì)很長(zhǎng),需要根據(jù)實(shí)際情況選用。另外注意的時(shí),id字段一定是主鍵或者唯一索引,不然可能造成鎖表的結(jié)果,處理起來(lái)會(huì)非常麻煩。
數(shù)據(jù)庫(kù)樂(lè)觀鎖
該方法適合更新的場(chǎng)景中:(帶版本號(hào))
update t_goods set count = count + 1 , version = version + 1
where good_id = 2 and version = 1
根據(jù)version版本,也就是再操作庫(kù)存錢(qián)先獲取到當(dāng)前商品的version版本號(hào),然后操作的時(shí)候帶上此version號(hào)。
例如:我們第一次操作庫(kù)存時(shí),得到version為1,調(diào)用庫(kù)存服務(wù)version變成了2;但返回給訂單服務(wù)出現(xiàn)了問(wèn)題,訂單服務(wù)又一次發(fā)起調(diào)用庫(kù)存服務(wù),當(dāng)訂單服務(wù)傳入的version還是1,在執(zhí)行上面的sql語(yǔ)句時(shí),就不會(huì)執(zhí)行;因?yàn)関ersion已經(jīng)變?yōu)榱?了,where條件就不成立。這樣就保證了不管調(diào)用幾次,只會(huì)真正的處理一次。
樂(lè)觀鎖主要使用于處理讀多寫(xiě)少的問(wèn)題。
業(yè)務(wù)層分布式鎖
如果多個(gè)機(jī)器可能在同一時(shí)間同時(shí)處理相同的數(shù)據(jù),比如:多臺(tái)機(jī)器定時(shí)任務(wù)都拿到了相同數(shù)據(jù)處理,我們就可以加分布式鎖,鎖定此數(shù)據(jù),處理完成后釋放鎖。獲取到鎖的必須先判斷這個(gè)數(shù)據(jù)是否背處理過(guò)。
各種唯一約束
數(shù)據(jù)庫(kù)唯一約束
插入數(shù)據(jù),按照唯一索引來(lái)進(jìn)行插入,比如:訂單號(hào),這樣相同的訂單不可能有兩條記錄插入。
redis set 防重
很多數(shù)據(jù)需要處理,只能被處理一次,比如:我們可以計(jì)算數(shù)據(jù)的MD5將其放入redis的set,每次處理數(shù)據(jù),先看這個(gè)MD5是否已經(jīng)存在,存在就不處理。
防重表
使用訂單號(hào)orderNo作為去重表的唯一索引,把唯一索引插入去重表,在進(jìn)行業(yè)務(wù)操作,且他們?cè)谕皇聞?wù)中。這個(gè)保證了重復(fù)請(qǐng)求時(shí),因?yàn)槿ブ乇碛形ㄒ患s束,導(dǎo)致請(qǐng)求失敗,避免了冪等問(wèn)題。這里注意的是,去重表和業(yè)務(wù)表應(yīng)該在同一庫(kù)中,這樣就保證了在同一個(gè)事務(wù),即使業(yè)務(wù)操作失敗了,也會(huì)把去重表的數(shù)據(jù)回滾。這個(gè)很好的保證了數(shù)據(jù)一致性。
全局請(qǐng)求唯一id
調(diào)用接口時(shí),生成一個(gè)唯一id,redis將數(shù)據(jù)保存到集合中(去重),存在即處理過(guò),可以使用nginx設(shè)置每一個(gè)請(qǐng)求的唯一id:
proxy_set_header X-Request-Id $request_id;
特別是 Feign服務(wù),觸發(fā)重發(fā)請(qǐng)求,拿著以前的老請(qǐng)求再重新發(fā),這樣可以給這個(gè)請(qǐng)求設(shè)置一個(gè)全局唯一ID,就算重復(fù)發(fā)了,也能檢測(cè)出來(lái)是否處理過(guò)。
💡Tips:也適用于鏈路追蹤
分布式事務(wù)
本地事務(wù):在分布式系統(tǒng),只能控制自己的回滾,控制不了其他服務(wù)的回滾。
分布式事務(wù):最大的問(wèn)題就是:網(wǎng)絡(luò)問(wèn)題 + 分布式機(jī)器。
本地事務(wù)
數(shù)據(jù)庫(kù)事務(wù)的四個(gè)特性:ACID
- 原子性(atomicity)
- 一致性(Consistency)
- 隔離性(isolation)
- 持久性(Durability)
事務(wù)的隔離級(jí)別:
- read uncommitted 讀未提交:別的事務(wù)會(huì)讀到其他未提交事務(wù)的數(shù)據(jù),問(wèn)題:臟讀。
- read committed 讀已提交:一個(gè)事務(wù)可以讀取另一個(gè)已提交的事務(wù),但多次讀取會(huì)造成不一樣的結(jié)果,問(wèn)題:不可重復(fù)讀問(wèn)題。Oracle 和 SQL server 默認(rèn)隔離級(jí)別。
- repeatable read 可重復(fù)讀:存在幻讀問(wèn)題。MySQL默認(rèn)隔離級(jí)別。
- serializable 序列化:等同于 串行 效率低。
事務(wù)的傳播行為:
- 一般都是required行為。
同一對(duì)象內(nèi)事務(wù)方法互調(diào)默認(rèn)失效,原因 繞過(guò)了代理對(duì)象,事務(wù)使用代理對(duì)象來(lái)控制的:
解決:使用代理對(duì)象來(lái)調(diào)用事務(wù)方法。
- 引入aop-starter ; spring-boot-starter-aop; 引入了aspectj
- @EnableAspectJAutoProxy(exposeProxy = true);開(kāi)啟aspectj 動(dòng)態(tài)代理功能。以后所有動(dòng)態(tài)代理都是對(duì)外暴露代理對(duì)象。
- 本類(lèi)互調(diào)用調(diào)用對(duì)象
OrderServiceImpl orderService = (OrderServiceImpl)AopContext.currentProxy();
orderService.b(); // 調(diào)用orderService對(duì)象的b方法
orderService.c(); // 調(diào)用orderService對(duì)象的c方法。
分布式事務(wù)的 問(wèn)題 以及 理論
CAP定理
CAP原則又稱(chēng)為CAP定理:
- Consistency 一致性:在分布式系統(tǒng)中的所有數(shù)據(jù)備份,在同一時(shí)刻是否同樣的值。(等同于所有節(jié)點(diǎn)訪(fǎng)問(wèn)同一份最新的數(shù)據(jù)副本)
- Availability 可用性:在集群中一部分節(jié)點(diǎn)故障后,集群整體是否還能影響客戶(hù)端的讀寫(xiě)請(qǐng)求。(對(duì)數(shù)據(jù)更新具備高可用性)
- Partition tolerance 分區(qū)容錯(cuò)性:大多數(shù)分布式系統(tǒng)分布在多個(gè)自網(wǎng)絡(luò)。每個(gè)子網(wǎng)絡(luò)就叫做一個(gè)區(qū)(partition)分區(qū)容錯(cuò)意思是:區(qū)間通信可能失敗,比如,一臺(tái)服務(wù)器放在中國(guó),另一臺(tái)服務(wù)器放在美國(guó),這就是兩個(gè)區(qū),他們之間可能無(wú)法通信。
CAP原則指的是,這三個(gè)要素最多只能同時(shí)實(shí)現(xiàn)兩點(diǎn),不可能三者兼顧。
分布式系統(tǒng)里面,分區(qū)容錯(cuò)肯定是要滿(mǎn)足的,然而 一致性 和 可用性 這二者是相互沖突的。
分布式系統(tǒng)中實(shí)現(xiàn)一致性的raft算法:Raft (采用領(lǐng)導(dǎo)發(fā)布的效果原理)
還有paxos算法。
💡Tip:總結(jié):一般市場(chǎng)上,都是基于AP的,無(wú)法保證c(強(qiáng)一致性),但是可以保證最終一致性。
BASE 理論
是對(duì)CAP理論的延伸,思想是即使無(wú)法做到強(qiáng)一致性(CAP的一致性就是強(qiáng)一致性
),但可以采用適當(dāng)?shù)牟捎萌跻恢滦?#xff0c;即最終一致性
。
BASE是指:
- Basically Available 基本可用:是指分布式系統(tǒng)在出現(xiàn)故障的時(shí)候,允許損失部分可用性(例如:響應(yīng)時(shí)間、功能上的可用性),允許損失部分可用性。需要注意的是:基本可用絕不等價(jià)于系統(tǒng)不可用。
- Soft State 軟狀態(tài):軟狀態(tài)是指允許系統(tǒng)存在中間狀態(tài),而該中間狀態(tài)不會(huì)影響系統(tǒng)整體可用性。
- Eventual Consistency 最終一致性(弱一致性):最終一致性是指系統(tǒng)中的所有數(shù)據(jù)副本經(jīng)過(guò)一定時(shí)間后,最終能夠達(dá)到一致的狀態(tài)。
分布式事務(wù)的 多種方案
2PC模式
場(chǎng)景:不適用于高并發(fā),適用于一般分布式事務(wù),該模式已經(jīng)被取代延申。
數(shù)據(jù)庫(kù)支持的 2PC【2 phase commit 二階提交】,又叫做 XA Transactions。
MySQL 從 5.5 版本開(kāi)始支持,SQL Server 2005 開(kāi)始支持,Oracle 7 開(kāi)始支持。
其中,XA 是一個(gè)兩階段提交協(xié)議,該協(xié)議分為以下兩個(gè)階段:
第一階段:事務(wù)協(xié)調(diào)器要求每個(gè)涉及到事務(wù)的數(shù)據(jù)庫(kù)預(yù)提交(precommit)此操作,并反映是 否可以提交.
第二階段:事務(wù)協(xié)調(diào)器要求每個(gè)數(shù)據(jù)庫(kù)提交數(shù)據(jù)。 其中,如果有任何一個(gè)數(shù)據(jù)庫(kù)否決此次提交,那么所有數(shù)據(jù)庫(kù)都會(huì)被要求回滾它們?cè)诖耸聞?wù) 中的那部分信息。
后來(lái),出現(xiàn)了3PC模式,了解即可。
柔性事務(wù) - TCC 事務(wù)補(bǔ)償型 方案(常用)
場(chǎng)景:不推薦在高并發(fā)場(chǎng)景下,也是常用的分布式事務(wù)解決。
剛性事務(wù):遵循 ACID 原則,強(qiáng)一致性。
柔性事務(wù):遵循 BASE 理論,最終一致性;
與剛性事務(wù)不同,柔性事務(wù)允許一定時(shí)間內(nèi),不同節(jié)點(diǎn)的數(shù)據(jù)不一致,但要求最終一致。
分三個(gè)階段:
第一階段 prepare 行為:調(diào)用各個(gè)服務(wù)的 Try 邏輯。
第二階段 commit 行為:調(diào)用各個(gè)服務(wù)的 Confirm 邏輯。
第三階段 rollback 行為:有一個(gè)服務(wù)異常,則進(jìn)行回滾操作 調(diào)用各個(gè)服務(wù)的 Cancel 邏輯。
此處,所謂的 補(bǔ)償 ,舉個(gè)例子:try邏輯是 某個(gè)數(shù)據(jù)-2 了,那么Cancel 補(bǔ)償邏輯里面就是 +2 .
柔性事務(wù) - 最大努力通知型方案(常用)
場(chǎng)景:基于消息服務(wù)的,適用高并發(fā)場(chǎng)景。
按規(guī)律進(jìn)行通知,不保證數(shù)據(jù)一定能通知成功,但會(huì)提供可查詢(xún)操作接口進(jìn)行核對(duì)。這種方案主要用在與第三方系統(tǒng)通訊時(shí),比如:調(diào)用微信或支付寶支付后的支付結(jié)果通知。這種方案也是結(jié)合 MQ 進(jìn)行實(shí)現(xiàn),例如:通過(guò) MQ 發(fā)送 http 請(qǐng)求,設(shè)置最大通知次數(shù)。達(dá)到通知次數(shù)后即不再通知。
案例:銀行通知、商戶(hù)通知等(各大交易業(yè)務(wù)平臺(tái)間的商戶(hù)通知:多次通知、查詢(xún)校對(duì)、對(duì)賬文件),支付寶的支付成功異步回調(diào)
就是不斷的通知,告訴你結(jié)果。
柔性事務(wù) - 可靠消息 + 最終一致性方案(常用)
場(chǎng)景:基于消息服務(wù)的,適用高并發(fā)場(chǎng)景。
實(shí)現(xiàn):業(yè)務(wù)處理服務(wù)在業(yè)務(wù)事務(wù)提交之前,向?qū)崟r(shí)消息服務(wù)請(qǐng)求發(fā)送消息,實(shí)時(shí)消息服務(wù)只記錄消息數(shù)據(jù),而不是真正的發(fā)送。業(yè)務(wù)處理服務(wù)在業(yè)務(wù)事務(wù)提交之后,向?qū)崟r(shí)消息服務(wù)確認(rèn)發(fā)送。只有在得到確認(rèn)發(fā)送指令后,實(shí)時(shí)消息服務(wù)才會(huì)真正發(fā)送。
也是出現(xiàn)問(wèn)題,發(fā)送消息,服務(wù)接受到消息后就進(jìn)行回滾,與上面那個(gè)相比多了消息這一步。
Seata 框架
Seata 介紹
官方地址:https://seata.io/zh-cn/
Seata是一款開(kāi)源的分布式事務(wù)解決方案,致力于提高高性能和簡(jiǎn)單易用的分布式事務(wù)服務(wù)。提供了多種模式:AT、TCC、SAGA 和 XA事務(wù)模式。
AT:auto自動(dòng)模式
TCC:Try、Confirm、Cancel
Seata很好理解,首先,弄明白下面三個(gè)名詞:
TC:事務(wù)協(xié)調(diào)者,維護(hù)全局和分支。
TM:事務(wù)管理器,處理全局事務(wù)的 開(kāi)始、提交、回滾操作。(所謂的全局事務(wù)是 誰(shuí)是主業(yè)務(wù)發(fā)起的遠(yuǎn)程調(diào)用,那么TM就在誰(shuí)的上面。)
RM:資源管理器,維護(hù)分支事務(wù)。
Seata 環(huán)境搭建
- 創(chuàng)建 UNDO_LOG 表
SEATA AT 模式需要 UNDO_LOG 表:(文檔描述很清楚)
- 給每個(gè)微服務(wù)對(duì)應(yīng)的 數(shù)據(jù)庫(kù) 都創(chuàng)建一個(gè) UNDO_LOG 表。
- 目的:記錄日志的狀態(tài),確定是否回滾。
- 安裝事務(wù)協(xié)調(diào)器:seata-server。
- 根據(jù)指示來(lái)就行,此處我們下載的1.0.0 GA 版本的seata。
- 在common-server公共服務(wù)中,導(dǎo)入Seata相關(guān)依賴(lài):
<!-- 分布式事務(wù)seata -->
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
- 查找一個(gè)seata-all的依賴(lài),查看版本:確保與事務(wù)協(xié)調(diào)器版本一致。也可以去SpringCloud 官方去看對(duì)應(yīng)版本。
- 本項(xiàng)目是 1.5.2 ,那么事務(wù)協(xié)調(diào)器對(duì)應(yīng)的也是1.5.2版本!
- 從github上面,下載seata-server(事務(wù)協(xié)調(diào)器),先解壓。
- 相關(guān)配置文件解釋:
register.conf 注冊(cè)相關(guān)配置:
- registry 配置注冊(cè)中心,type配置什么類(lèi)型的注冊(cè)中心。
config 配置配置中心,type也是用的什么類(lèi)型的配置中心:
默認(rèn):file類(lèi)型,seata服務(wù)默認(rèn)有個(gè)file.conf文件。也可以改成nacos。
store 配置:配置seata的存儲(chǔ)方式。
- 本次采用file文件方式存儲(chǔ)。
不同版本可能不太一樣,但是效果都差不多的:
- 下面是1.5.2 seata 版本:
- 配置好config、registry、store后,進(jìn)入bin,啟動(dòng)seata項(xiàng)目。
- 并查看nacos是否注冊(cè)成功。
- 所有想要用到分布式事務(wù)的客戶(hù)端微服務(wù),都要適用seata DataSourceProxy代理自己的數(shù)據(jù)源。(seata github上面有介紹)
- 參考官方地址:https://github.com/seata/seata-samples/tree/master/springcloud-jpa-seata
package com.atguigu.gulimall.order.config;import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;import javax.sql.DataSource;/* @author: xuyanbo* @date: 2023/11/2 18:57*/
@Configuration
public class MySeataConfig {// 數(shù)據(jù)源自帶的配置屬性@AutowiredDataSourceProperties dataSourceProperties;@Beanpublic DataSource dataSource(DataSourceProperties dataSourceProperties){// 根據(jù)源碼創(chuàng)建數(shù)據(jù)源以及設(shè)置相關(guān)配置HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();if (StringUtils.hasText(dataSourceProperties.getName())){dataSource.setPoolName(dataSourceProperties.getName());}// 用seata的代理數(shù)據(jù)源,來(lái)配置即可。return new DataSourceProxy(dataSource);}}
- 每個(gè)客戶(hù)端微服務(wù),配置seata的相關(guān)信息。
- 不同版本可能不太一樣,舊版是引入file.conf 和 registry.conf文件。
- 1.5.2版本的seata,是通過(guò)配置application.yml實(shí)現(xiàn):
💡Tips:seata.tx-service-group=服務(wù)名 和 service.vgroup-mapping.服務(wù)名 的配置,用來(lái)映射seata-server的識(shí)別。
spring:datasource:username: rootpassword: rooturl: jdbc:mysql://www.gulimall.com:3306/gulimall_omsdriver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:mapper-locations: classpath:/mapper//*.xmlglobal-config:db-config:id-type: auto # 主鍵自增
server:port: 9000# Seata 配置
seata:tx-service-group: gulimall-order #這里每個(gè)服務(wù)都是對(duì)應(yīng)不同的映射名,在配置中心可以看到registry:type: nacosnacos:server-addr: localhost:8848group: DEFAULT_GROUPservice:vgroup-mapping:#這里也要注意 key為映射名,gulimall-order: default
- 給主事務(wù)服務(wù)(訂單服務(wù))添加@GlobalTransactional 和 @Transactional注解。
- 子事務(wù)服務(wù)(倉(cāng)儲(chǔ)服務(wù))添加 @Transactional注解 即可。
@GlobalTransactional
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {SubmitOrderResponseVo response = new SubmitOrderResponseVo();confirmVoThreadLocal.set(vo);
...
- 啟動(dòng)測(cè)試分布式事務(wù)。
Seata 的幾個(gè)模式
Seata 默認(rèn)是AT模式:是2PC的演變,屬于補(bǔ)償性質(zhì)。
官方案例都已經(jīng)給出,幾種模式的案例:
AT模式:相當(dāng)于自動(dòng)解鎖,不適用于高并發(fā)的分布式事務(wù),僅僅適用于一般的分布式事務(wù)。
要根據(jù)實(shí)際情況來(lái),應(yīng)用屬于自己的分布式事務(wù)。
訂單服務(wù)采用 最終一致性方案 解決分布式事務(wù)問(wèn)題
因?yàn)?#xff0c;訂單服務(wù)屬于高并發(fā)服務(wù),使用其他分布式方案可能會(huì)出現(xiàn)嚴(yán)重問(wèn)題。
因此,考慮使用 可靠消息 + 最終一致性方案
,進(jìn)而保證高并發(fā)。
RabbitMQ 延時(shí)隊(duì)列(實(shí)現(xiàn)定時(shí)任務(wù))
場(chǎng)景:比如未付款訂單,超過(guò)一定時(shí)間后,系統(tǒng)自動(dòng)取消訂單并釋放占有物品。
舊版本:Spring的schedule定時(shí)任務(wù)輪詢(xún)數(shù)據(jù)庫(kù)
缺點(diǎn):消耗系統(tǒng)內(nèi)存、增加了數(shù)據(jù)庫(kù)的壓力、存在較大的時(shí)間誤差。
新版本 RabbitMQ版本解決:rabbitmq的消息TTL和死信Exchange結(jié)合。
消息的TTL(Time To Live)就是消息的存活時(shí)間。
RabbitMQ可以對(duì)隊(duì)列
和消息
分別設(shè)置TTL。
- 對(duì)隊(duì)列設(shè)置就是隊(duì)列沒(méi)有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過(guò)了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱(chēng)之為死信。
- 如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息死亡的事件有可能不一樣(不同的隊(duì)列設(shè)置)。這里單個(gè)消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵??梢酝ㄟ^(guò)設(shè)置消息的expiration字段或者x-message-ttl屬性來(lái)設(shè)置時(shí)間,兩者是一樣的效果。
理解DLX(Dead Letter Exchanges):
什么是死信(消息)?
- 一個(gè)消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說(shuō)不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。(basic.reject/basic.nack)requeue=false
- 上面的消息的TTL到了,消息過(guò)期了。
- 隊(duì)列的長(zhǎng)度限制滿(mǎn)了。排在前面的消息會(huì)被丟棄或者扔到死信路由上。
效果圖:
- 涉及兩種:消息設(shè)置過(guò)期時(shí)間 和 隊(duì)列設(shè)置過(guò)期時(shí)間。
💡Tips:
推薦使用的是隊(duì)列設(shè)置過(guò)期時(shí)間
。因?yàn)?#xff0c;在消息設(shè)置過(guò)期時(shí)間中,rabbitmq采用惰性檢查機(jī)制
。例如:第一個(gè)消息5分鐘過(guò)期,第二個(gè)消息1分鐘過(guò)期,那么第二個(gè)消息就必須等第一個(gè)消息過(guò)期了才能被檢測(cè)到。
延遲隊(duì)列的 設(shè)計(jì)和實(shí)現(xiàn)
基本上是每一個(gè)微服務(wù)對(duì)應(yīng)一個(gè)交換機(jī)就夠了,交換機(jī)命名方式如:服務(wù)名-事件-exchange 對(duì)應(yīng)的一系列服務(wù)。
業(yè)務(wù)流程圖:
畫(huà)設(shè)計(jì)流程圖:(與業(yè)務(wù)流程圖一個(gè)效果)
💡Tips:注意,交換機(jī)是被復(fù)用了,不要嚴(yán)格定義什么死信交換機(jī)之類(lèi)的。其實(shí),都是普通隊(duì)列和交換機(jī),只不過(guò)復(fù)用出了不同效果而已。
SpringCloud集成RabbitMQ了相關(guān)內(nèi)容,直接通過(guò)@Bean進(jìn)行注入,創(chuàng)建即可:
package com.atguigu.gulimall.order.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/* @author: xuyanbo* @date: 2023/11/3 14:13*/
@Configuration
public class MyMQConfig {/* 通過(guò)@Bean方式,將Binding、Queue、Exchange 自動(dòng)創(chuàng)建RabbitMQ對(duì)應(yīng)的交換機(jī)、隊(duì)列、綁定等。*/// 創(chuàng)建死信隊(duì)列@Beanpublic Queue orderDelayQueue(){Map<String, Object> arguments = new HashMap<>();/* x-dead-letter-exchange 綁定死信交換機(jī)* x-dead-letter-routing-key 綁定死信路由* x-message-ttl 綁定過(guò)期時(shí)間*/arguments.put("x-dead-letter-exchange","order-event-exchange");arguments.put("x-dead-letter-routing-key","order.release.order");arguments.put("x-message-ttl",60000);Queue queue = new Queue("order.delay.queue",true,false,false,arguments);return queue;}// 普通隊(duì)列@Beanpublic Queue orderReleaseOrderQueue(){Queue queue = new Queue("order.release.order.queue",true,false,false);return queue;}// 聲明交換機(jī)(根據(jù)路由復(fù)用了,區(qū)分好路由和綁定關(guān)系即可)@Beanpublic Exchange orderEventExchange(){TopicExchange topicExchange = new TopicExchange("order-event-exchange",true,false);return topicExchange;}// 聲明綁定死信隊(duì)列關(guān)系@Beanpublic Binding orderCreateOrderBingding(){return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}// 聲明綁定正常消費(fèi)隊(duì)列關(guān)系@Beanpublic Binding orderReleaseOrderBingding(){return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}}
💡Tips:RabbitMQ中,已經(jīng)創(chuàng)建了Binding、Queue、Exchange ,在重新啟動(dòng)微服務(wù)執(zhí)行@Bean時(shí),并不會(huì)重新創(chuàng)建也不會(huì)修改屬性之類(lèi)的。
解決辦法:手動(dòng)刪除即可。
這樣就可以測(cè)試一下,延遲隊(duì)列的效果:
// 1. 隨便寫(xiě)個(gè)接口:
@Autowired
RabbitTemplate rabbitTemplate;@GetMapping("/test/orderCreate")
@ResponseBody
public String createOrderTest(){// 訂單下單成功OrderEntity entity = new OrderEntity();entity.setOrderSn(UUID.randomUUID().toString());entity.setModifyTime(new Date());rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);return "ok";
}
// 2. 寫(xiě)個(gè)監(jiān)聽(tīng)器測(cè)試是否成功
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity entity, Channel channel , Message message) throws IOException {System.out.println("收到的過(guò)期的訂單信息,準(zhǔn)備關(guān)閉訂單:" + entity.getOrderSn());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
鎖和解鎖庫(kù)存 架構(gòu)實(shí)現(xiàn)
業(yè)務(wù)流程圖,如下:
- 引入rabbitmq依賴(lài)。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- rabbitmq相關(guān)配置
spring.rabbitmq.host=www.gulimall.com
spring.rabbitmq.virtual-host=/
- 添加 @EnableRabbit 啟動(dòng)類(lèi)
- 添加rabbitmq的序列化機(jī)制轉(zhuǎn)換
package com.atguigu.gulimall.ware.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {// 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}}
- 給庫(kù)存服務(wù),添加一系列的交換機(jī)、隊(duì)列、綁定。
package com.atguigu.gulimall.ware.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {// 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}// fixme 這個(gè)監(jiān)聽(tīng)的作用就是為了觸發(fā)一下rabbitmq,觸發(fā)成功就會(huì)創(chuàng)建這些交換機(jī)或者綁定之類(lèi)的。@RabbitListener(queues = "stock.release.stock.queue")public void handler(){}// 庫(kù)存服務(wù)的默認(rèn)交換機(jī)@Beanpublic Exchange stockEventExchange(){return new TopicExchange("stock-event-exchange",true,false);}// 普通隊(duì)列@Beanpublic Queue stockReleaseStockQueue(){return new Queue("stock.release.stock.queue",true,false,false);}// 延遲隊(duì)列(死信隊(duì)列)@Beanpublic Queue stockDelayQueue(){Map<String, Object> arguments = new HashMap<>();/* x-dead-letter-exchange 綁定死信交換機(jī)* x-dead-letter-routing-key 綁定死信路由* x-message-ttl 綁定過(guò)期時(shí)間*/arguments.put("x-dead-letter-exchange","stock-event-exchange");arguments.put("x-dead-letter-routing-key","stock.release");// 2分鐘arguments.put("x-message-ttl",120000);return new Queue("stock.delay.queue",true,false,false,arguments);}// 正常隊(duì)列的綁定關(guān)系@Beanpublic Binding stockReleaseBinding(){return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}// 死信隊(duì)列的綁定關(guān)系@Beanpublic Binding stockLockedBinding(){return new Binding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}}
💡Tips:記得添加一個(gè)監(jiān)聽(tīng)方法,不然沒(méi)辦法觸發(fā)一下rabbitmq創(chuàng)建這些交換機(jī)或者綁定之類(lèi)的。
- 啟動(dòng)服務(wù)查看創(chuàng)建。
訂單服務(wù) 庫(kù)存解鎖 場(chǎng)景
庫(kù)存解鎖的場(chǎng)景:
- 下訂單成功,訂單過(guò)期沒(méi)有支付被系統(tǒng)自動(dòng)取消、被用戶(hù)手動(dòng)取消。都要解鎖庫(kù)存。
- 下訂單成功,庫(kù)存鎖定成功,接下來(lái)的業(yè)務(wù)調(diào)用失敗,導(dǎo)致訂單回滾。之前鎖定的庫(kù)存就要自動(dòng)解鎖。
數(shù)據(jù)結(jié)構(gòu)圖:
查詢(xún)數(shù)據(jù)庫(kù)關(guān)于這個(gè)訂單的鎖定庫(kù)存信息。兩種情況:
- 有:證明庫(kù)存鎖定成功了。 解鎖就要看訂單情況。
- 沒(méi)有這個(gè)訂單。必須解鎖。
- 有這個(gè)訂單。就要看訂單狀態(tài)。訂單已取消:解鎖庫(kù)存。 沒(méi)取消:不能解鎖。
- 沒(méi)有:庫(kù)存鎖定失敗了,庫(kù)存回滾了。這種情況無(wú)需解鎖。
還有重要一點(diǎn),Rabbitmq監(jiān)聽(tīng)必須設(shè)置為手動(dòng)ack模式:
只要解鎖庫(kù)存的消息失敗。一定告訴服務(wù)解鎖失敗,一定要啟動(dòng)手動(dòng)ack模式,
# 配置:spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 代碼:channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
案例代碼:
- 一定要使用這種try - catch 方式來(lái)手動(dòng)回復(fù),簡(jiǎn)潔又方便。
- 這樣拋出異常等同于消息沒(méi)有處理成功,進(jìn)行拒絕并重新放回隊(duì)列。
- 沒(méi)有拋出異常等同于消息處理成功,ack手動(dòng)返回true
package com.atguigu.gulimall.ware.listener;import com.atguigu.common.to.mq.StockLockedTo;
import com.atguigu.gulimall.ware.service.WareSkuService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;/* @author: xuyanbo* @description: TODO* @date: 2023/11/4 10:36*/
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {@AutowiredWareSkuService wareSkuService;/* 1. 庫(kù)存自動(dòng)解鎖:* 下訂單成功,庫(kù)存鎖定成功。接下來(lái)的業(yè)務(wù)調(diào)用失敗,導(dǎo)致訂單回滾。之前鎖定的庫(kù)存就要自動(dòng)解鎖。* 2. 訂單失敗。* 鎖庫(kù)存失敗** 只要解鎖庫(kù)存的消息失敗。一定告訴服務(wù)解鎖失敗,一定要啟動(dòng)手動(dòng)ack模式, channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);。* @param to* @param message*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message , Channel channel) throws IOException {// 這樣拋出異常等同于消息沒(méi)有處理成功,進(jìn)行拒絕并重新放回隊(duì)列。// 沒(méi)有拋出異常等同于消息處理成功,ack手動(dòng)返回truetry {System.out.println("收到解決庫(kù)存的消息");wareSkuService.unlockStock(to);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
/* 解鎖* 1. 查詢(xún)數(shù)據(jù)庫(kù)關(guān)于這個(gè)訂單的鎖定庫(kù)存信息。* 兩種情況:* 有:證明庫(kù)存鎖定成功了。* 解鎖:看訂單情況。* 1. 沒(méi)有這個(gè)訂單。必須解鎖。* 2. 有這個(gè)訂單。就要看訂單狀態(tài)。* 已取消:解鎖庫(kù)存。* 沒(méi)取消:不能解鎖。* 沒(méi)有:庫(kù)存鎖定失敗了,庫(kù)存回滾了。這種情況無(wú)需解鎖。*/
@Override
public void unlockStock(StockLockedTo to) {StockDetailTo detail = to.getDetail();Long detailId = detail.getId();WareOrderTaskDetailEntity byId = orderTaskDetailService.getById(detailId);if (byId != null) {// 解鎖Long id = to.getId();WareOrderTaskEntity taskEntity = orderTaskService.getById(id);String orderSn = taskEntity.getOrderSn(); // 根據(jù)訂單號(hào)查詢(xún)訂單狀態(tài)R r = orderFeignService.getOrderStatus(orderSn);if (r.getCode() == 0) {// 訂單數(shù)據(jù)返回成功OrderVo data = r.getData(new TypeReference<OrderVo>() {});// 訂單不存在 或者 4訂單已經(jīng)取消 都要解鎖庫(kù)存if (data == null || data.getStatus() == 4) {// 訂單已經(jīng)被取消了。才能解鎖庫(kù)存if (byId.getLockStatus() == 1) {unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);} }} else {// 消息拒接以后重新放到隊(duì)列里面,讓別人繼續(xù)消息解鎖throw new RuntimeException("遠(yuǎn)程服務(wù)失敗");}} else {// 無(wú)需解鎖}
}private void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {// 庫(kù)存解鎖wareSkuDao.unlockStock(skuId, wareId, num);// update `wms_ware_sku` set stock_locked = stock_locked - #{num}// where sku_id = #{skuId} and ware_id = #{wareId}// 更新庫(kù)存工作單的狀態(tài)WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();entity.setId(taskDetailId);entity.setLockStatus(2); // 變?yōu)橐呀怄iorderTaskDetailService.updateById(entity);
}
訂單服務(wù) 定時(shí)關(guān)單 場(chǎng)景
架構(gòu)圖:
其實(shí),就是設(shè)置一個(gè)延遲隊(duì)列即可。
注意:訂單關(guān)閉了,必須向庫(kù)存服務(wù)發(fā)送一個(gè)消息,解鎖庫(kù)存。
整體流程也是通過(guò)rabbitmq監(jiān)聽(tīng)器 等信息實(shí)現(xiàn):
- 注意:此處用到了兩個(gè)@RabbitHandler注解,來(lái)區(qū)分了哪個(gè)是訂單到時(shí)后的主動(dòng)解鎖,哪個(gè)是鎖庫(kù)存之后的解鎖。
RabbitMQ 消息積壓、丟失、重復(fù)解決方案
如何保證消息可靠性 - 消息丟失
消息丟失場(chǎng)景 一:消息發(fā)送出去,由于網(wǎng)絡(luò)原因沒(méi)有抵達(dá)服務(wù)器。
解決辦法:
- 做好容錯(cuò)方法(try - catch)發(fā)送消息可能會(huì)網(wǎng)絡(luò)失敗,失敗后要有重試機(jī)制,可記錄到數(shù)據(jù)庫(kù),采用定期掃描重發(fā)的方式。
// 發(fā)給MQ一個(gè)
try {// TODO 保證消息一定會(huì)發(fā)送出去,每一個(gè)消息都可以做好日志記錄。給數(shù)據(jù)庫(kù)保存每一個(gè)消息的詳細(xì)信息。// TODO 定期掃描數(shù)據(jù)庫(kù)將失敗的消息再發(fā)送一遍。// 執(zhí)行這個(gè)方法的時(shí)候,執(zhí)行完了,網(wǎng)絡(luò)延遲或者失敗了。就要拋出異常,走catch。rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderTo);
} catch (Exception e) {// TODO 將沒(méi)發(fā)送成功的消息進(jìn)行重試發(fā)送。...
}
- 做好日志記錄,每個(gè)消息狀態(tài)是否都被服務(wù)器收到都應(yīng)該記錄。
可以設(shè)置一個(gè)MQ消息表
,將每一個(gè)消息保存下來(lái),定期掃描數(shù)據(jù)庫(kù)將失敗的消息再發(fā)送一遍
CREATE TABLE `mq_message` (`message_id` char(32) NOT NULL,`content` text,`to_exchane` varchar(255) DEFAULT NULL,`routing_key` varchar(255) DEFAULT NULL,`class_type` varchar(255) DEFAULT NULL,`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已發(fā)送 2-錯(cuò)誤抵達(dá) 3-已抵達(dá)',`create_time` datetime DEFAULT NULL,`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8mb4 COMMENT='MQ消息表';
- 做好定期重發(fā),如果消息沒(méi)有發(fā)送成功,定期去數(shù)據(jù)庫(kù)掃描未成功的消息進(jìn)行重發(fā)。
消息丟失場(chǎng)景 二:消息抵達(dá)Broker,Broker要將消息寫(xiě)入磁盤(pán)(持久化)才算成功。此時(shí),Broker尚未持久化完成,宕機(jī)。
解決方式:
publisher也必須加入確認(rèn)回調(diào)機(jī)制,通過(guò)生成者和消費(fèi)者的確認(rèn)機(jī)制解決該問(wèn)題
,確認(rèn)成功的消息,修改數(shù)據(jù)庫(kù)消息狀態(tài)。
消息丟失場(chǎng)景 三:自動(dòng)ACK的狀態(tài)下。消費(fèi)者收到消息,但沒(méi)來(lái)得及處理消息,服務(wù)器宕機(jī)了。
解決方式:
一定開(kāi)啟手動(dòng)ACK,消費(fèi)成功才移除,失敗或者沒(méi)來(lái)的及處理就noAck并重新入隊(duì)。
如何保證消息可靠性 - 消息重復(fù)
消息重復(fù)場(chǎng)景:
- 消息消費(fèi)成功,事務(wù)已經(jīng)提交,ack時(shí),機(jī)器宕機(jī)。導(dǎo)致沒(méi)有ack成功,Broker的消息重新由unack變?yōu)閞eady,并發(fā)送給其他消費(fèi)者。
- 消息消費(fèi)事變, 由于重試機(jī)制,自動(dòng)又將消息發(fā)送出去。
- 成功消費(fèi),ack時(shí)宕機(jī),消息由unack變?yōu)閞eady,Broker又重新發(fā)送。
解決方法:
- 消費(fèi)者的業(yè)務(wù)消費(fèi)接口應(yīng)該設(shè)置為
冪等性
的。比如:扣庫(kù)存有工作單的狀態(tài)標(biāo)志。 - 使用
防重表(redis/mysql)
,發(fā)送消息每一個(gè)都有業(yè)務(wù)的唯一標(biāo)識(shí),處理過(guò)就不用處理過(guò)了。 - RabbitMQ的每一個(gè)消息都有
redelivered字段
,可以獲取是否是被重新投遞過(guò)來(lái)的,而不是第一次投遞過(guò)來(lái)的。
如何保證消息可靠性 - 消息積壓
消費(fèi)積壓場(chǎng)景:
- 消費(fèi)者宕機(jī)積壓。
- 消費(fèi)者消費(fèi)能力不足積壓。
- 發(fā)送者發(fā)送流量太大。
解決方式:
- 上線(xiàn)更多的消費(fèi)者,進(jìn)行正常消費(fèi)。
- 上線(xiàn)專(zhuān)門(mén)的隊(duì)列消費(fèi)服務(wù),將消息先批量取出來(lái),記錄數(shù)據(jù)庫(kù),離線(xiàn)慢慢處理。