品牌設(shè)計(jì)網(wǎng)站怎樣推廣自己的廣告
一、概述
消息隊(duì)列
定義
- 消息隊(duì)列模型:一種分布式系統(tǒng)中的消息傳遞方案,由消息隊(duì)列、生產(chǎn)者和消費(fèi)者組成
- 消息隊(duì)列:負(fù)責(zé)存儲(chǔ)和管理消息的中間件,也稱(chēng)為消息代理(Message Broker)
- 生產(chǎn)者:負(fù)責(zé) 產(chǎn)生并發(fā)送 消息到隊(duì)列的應(yīng)用程序
- 消費(fèi)者:負(fù)責(zé)從隊(duì)列 獲取并處理 消息的應(yīng)用程序
功能
:實(shí)現(xiàn)消息發(fā)送和處理的解耦,支持異步通信,提高系統(tǒng)的可擴(kuò)展性和可靠性- 主流消息隊(duì)列解決方案
- RabbitMQ:輕量級(jí),支持多種協(xié)議,適合中小規(guī)模應(yīng)用
- RocketMQ:阿里開(kāi)源,高性能,適合大規(guī)模分布式應(yīng)用
Stream
定義
:Stream:Redis 5.0 引入的一種數(shù)據(jù)類(lèi)型,用于處理高吞吐量的消息流、事件流等場(chǎng)景功能
:按時(shí)間順序 ”添加、讀取、消費(fèi)“ 消息,支持消費(fèi)者組、消息確認(rèn)等功能
二、Stream 工作流程
- 寫(xiě)入消息:
- 生產(chǎn)者通過(guò)?
XADD
?向 Stream 中添加消息。每條消息自動(dòng)獲得唯一的 ID,按時(shí)間順序存入 Stream。
- 生產(chǎn)者通過(guò)?
- 創(chuàng)建消費(fèi)者組
- 如果使用消費(fèi)者組,首先需要通過(guò)?
XGROUP CREATE
?創(chuàng)建消費(fèi)者組。 - 消費(fèi)者組會(huì)根據(jù)時(shí)間順序?qū)⑾⒎峙浣o組內(nèi)的消費(fèi)者。
- 如果使用消費(fèi)者組,首先需要通過(guò)?
- 讀取消息:
- 消費(fèi)者使用
XREADGROUP
命令讀取 Stream 中的消息。 - 消息按規(guī)則分配給不同消費(fèi)者處理,每個(gè)消費(fèi)者讀取到不同的消息。
- 消費(fèi)者使用
- 確認(rèn)消息:
- 消費(fèi)者在處理完消息后,使用?
XACK
?命令確認(rèn)消息,表示該消息已成功處理。 - 如果消息未確認(rèn)(例如消費(fèi)者崩潰或超時(shí)),它將保持在?Pending?狀態(tài),等待重新分配給其他消費(fèi)者。
- 消費(fèi)者在處理完消息后,使用?
- 重新分配未確認(rèn)消息:
- 如果消息在一定時(shí)間內(nèi)沒(méi)有被確認(rèn),其他消費(fèi)者可以讀取未確認(rèn)的消息并進(jìn)行處理。
- 可通過(guò)
XPENDING
命令查看未確認(rèn)消息,或在消費(fèi)者組中設(shè)置時(shí)間閾值自動(dòng)重新分配。
- 刪除消費(fèi)者組:
- 不再需要消費(fèi)者組時(shí),使用
XGROUP DESTROY
命令刪除消費(fèi)者組
- 不再需要消費(fèi)者組時(shí),使用
三、Stream 實(shí)現(xiàn)
消費(fèi)者組模式
定義
:Redis Streams 的一部分,用于處理消息的分布式消費(fèi)優(yōu)點(diǎn)
- 消息分流:多消費(fèi)者爭(zhēng)搶消息,加快消費(fèi)速度,避免消息堆積
- 消息標(biāo)示:避免消息漏讀,消費(fèi)者讀取消息后不馬上銷(xiāo)毀,加入 consumerGroup 維護(hù)的 pending list 隊(duì)列等待 ACK
- 消息確認(rèn):通過(guò)消息 ACK 機(jī)制,保證消息至少被消費(fèi)一次
- 可以阻塞讀取,避免盲等
實(shí)現(xiàn)方法
:通過(guò) Stream 數(shù)據(jù)類(lèi)型實(shí)現(xiàn)消息隊(duì)列,命令以 “X” 開(kāi)頭
常用命令
XGROUP CREATE key groupName ID [MKSTREAM]
- 功能:創(chuàng)建消費(fèi)者組
- 參數(shù)
- key:隊(duì)列名稱(chēng)
- groupName:組名稱(chēng)
- ID:起始 ID 標(biāo)識(shí),$ 表示隊(duì)列中最后一個(gè)消息,0 表示隊(duì)列中第一個(gè)消息
- MKSTREAM:隊(duì)列不存在則創(chuàng)建隊(duì)列
XGROUP DESTORY key groupName
- 功能:刪除指定消費(fèi)者組
XGROUP CREATECONSUMER key groupName consumerName
- 功能:添加組中消費(fèi)者
XGROUP DELCONSUMER key groupName consumerName
- 功能:刪除組中消費(fèi)者
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]
- 功能:讀取組中的消息
- gourp:消費(fèi)者組名稱(chēng)
- consumer:消費(fèi)者名稱(chēng)(不存在則自動(dòng)創(chuàng)建)
- count:本次查詢(xún)的最大數(shù)量
- BLOCK milliseconds:當(dāng)沒(méi)有消息時(shí)最長(zhǎng)等待時(shí)間
- NOACK:無(wú)需手動(dòng) ACK,獲取到消息后自動(dòng)確認(rèn)
- STREAMS KEY:指定隊(duì)列名稱(chēng)
- ID:獲取消息的起始 ID,
>
表示從下一個(gè)未消費(fèi)消息開(kāi)始 (常用)
XPENDING key group [ [ IDLE min-idle-time ] start end count [consumer] ]
- 功能:獲取 pending-list 中的消息
- IDLE:獲取消息后、確認(rèn)消息前的這段時(shí)間,空閑時(shí)間超過(guò) min-idle-time 則取出
- start:獲取的最小目標(biāo) ID
- end:獲取的最大目標(biāo) ID
- count:獲取的數(shù)量
- consumer:獲取 consumer 的 pending-list
XACK key group ID [ ID … ]
- 功能:確認(rèn)從組中讀取的消息已被處理
- key:隊(duì)列名稱(chēng)
- group:組名稱(chēng)
- ID:消息的 ID
表格版命令
-
命令
命令 功能 XGROUP CREATE key groupName ID [MKSTREAM] 創(chuàng)建消費(fèi)者組 XGROUP DESTORY key groupName 刪除指定消費(fèi)者組 XGROUP CREATECONSUMER key groupName consumerName 添加組中消費(fèi)者 XGROUP DELCONSUMER key groupName consumerName 刪除組中消費(fèi)者 XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …] 讀取組中的消息,ID 填寫(xiě) “ >
” 則讀取第一條未讀消息XACK key group ID [ ID … ] 確認(rèn)從組中讀取的消息已被處理 -
屬性
屬性名 定義 key 隊(duì)列名稱(chēng) groupName 消費(fèi)者組名稱(chēng) ID 起始 ID 標(biāo)示,$ 代表隊(duì)列中最后一個(gè)消息,0 代表第一個(gè)消息 MKSTREAM 隊(duì)列不存在時(shí)自動(dòng)創(chuàng)建隊(duì)列 BLOCK milliseconds 沒(méi)有消息時(shí)的最大等待時(shí)長(zhǎng) NOACK 無(wú)需手動(dòng) ACK,獲取到消息后自動(dòng)確認(rèn) STREAMS key 指定隊(duì)列名稱(chēng)
運(yùn)行邏輯
while(true) {// 嘗試監(jiān)聽(tīng)隊(duì)列,使用阻塞模式,最長(zhǎng)等待 2000 msObject msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 2000 STREAMS s1 >");if(msg == null) {continue;}try {// 處理消息,完成后一定要 ACKhandleMessage(msg);} catch (Exception e) {while(true) {// 重新讀取阻塞隊(duì)列消息Object msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 STREAM S1 0");if(msg == null) // 如果阻塞隊(duì)中的消息已經(jīng)全部處理則退出pending-listbreak;try {handleMessage(msg); // 重新處理 pending-list 中的消息} catch (Exception e){continue; // 如果還出錯(cuò), 則繼續(xù)重新讀取}}}
}
四、示例
-
目標(biāo):消息隊(duì)列實(shí)現(xiàn)數(shù)據(jù)庫(kù)異步修改數(shù)據(jù)庫(kù),將下單 message 緩存在 redis 中,減小下單操作對(duì)數(shù)據(jù)庫(kù)的沖擊
-
項(xiàng)目結(jié)構(gòu)
- RedisConfig?配置類(lèi):創(chuàng)建消費(fèi)者組是一次性的操作,適合放在配置類(lèi)中
- VoucherOrderHandler?內(nèi)部類(lèi):消費(fèi)者的邏輯和訂單業(yè)務(wù)相關(guān),因此適合放在?VoucherOrderServiceImpl?中
- 多線程啟動(dòng)邏輯:消費(fèi)者線程的啟動(dòng)與訂單業(yè)務(wù)密切相關(guān),直接放在?VoucherOrderServiceImpl?類(lèi)中更符合職責(zé)分離原則
src/main/java ├── com/example │ ├── config │ │ └── RedisConfig.java // Redis 配置類(lèi),包含消費(fèi)者組初始化 │ ├── service │ │ ├── VoucherOrderService.java │ │ └── impl │ │ └── VoucherOrderServiceImpl.java // 包含 VoucherOrderHandler 內(nèi)部類(lèi) │ ├── entity │ │ └── VoucherOrder.java // 優(yōu)惠券訂單實(shí)體 │ ├── utils │ │ └── BeanUtil.java // 用于 Map 轉(zhuǎn) Bean 的工具類(lèi) │ └── controller │ └── VoucherOrderController.java // 如果有 Controller
-
創(chuàng)建消費(fèi)者組(config.RedisConfig)
@Bean public void initStreamGroup() {// 檢查是否存在消費(fèi)者組 g1try {stringRedisTemplate.opsForStream().createGroup("stream.orders", "g1");} catch (RedisSystemException e) {// 如果 group 已存在,拋出異常,可忽略log.warn("消費(fèi)者組 g1 已存在");} }
-
創(chuàng)建消費(fèi)者線程
- 位置:作為 VoucherOrderServiceImpl 內(nèi)的預(yù)構(gòu)造部分
@PostConstruct public void startConsumers() {for (int i = 0; i < 5; i++) { // 5 個(gè)線程,模擬多個(gè)消費(fèi)者new Thread(new VoucherOrderHandler()).start();} }
-
添加消息到消息隊(duì)列 (src/main/resources/lua/SECKILL_SCRIPT.lua)
--1. 參數(shù)列表 --1.1. 優(yōu)惠券id local voucherId = ARGV[1] --1.2. 用戶(hù)id local userId = ARGV[2] --1.3. 訂單id local orderId = ARGV[3]--2. 數(shù)據(jù)key local stockKey = 'seckill:stock:' .. voucherId --2.1. 庫(kù)存key local orderKey = 'seckill:order' .. voucherId --2.2. 訂單key--3. 腳本業(yè)務(wù) --3.1. 判斷庫(kù)存是否充足 get stockKey if( tonumber( redis.call('GET', stockKey) ) <= 0 ) thenreturn 1 end --3.2. 判斷用戶(hù)是否重復(fù)下單 SISMEMBER orderKey userId if( redis.call( 'SISMEMBER', orderKey, userId ) == 1 ) thenreturn 2 end --3.4 扣庫(kù)存 incrby stockKey -1 redis.call( 'INCRBY', stockKey, -1 ) --3.5 下單(保存用戶(hù)) sadd orderKey userId redis.call( 'SADD', orderKey, userId ) -- 3.6. 發(fā)送消息到隊(duì)列中 redis.call( 'XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )
-
創(chuàng)建消費(fèi)者類(lèi)(ServiceImpl)
- 位置:作為 VoucherOrderServiceImpl 內(nèi)的私有類(lèi)
// 在ServiceImpl中創(chuàng)建一個(gè)VoucherOrderHandler消費(fèi)者類(lèi),專(zhuān)門(mén)用于處理消息隊(duì)列中的消息 private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1. 獲取消息隊(duì)列中的訂單信息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create( "stream.order", ReadOffset.lastConsumed()));// 2. 沒(méi)有消息則重新監(jiān)聽(tīng)if (list == null || list.isEmpty() ) continue;// 3. 獲取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 創(chuàng)建訂單createVoucherOrder(voucherOrder);// 5. 確認(rèn)當(dāng)前消息已消費(fèi) XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch ( Exception e) {log.error("處理訂單異常", e);// 6. 處理訂單失敗則消息會(huì)加入pending-list,繼續(xù)處理pending-listhandlePendingList();}}}// 處理pending-list中的消息private void handlePendingList() {while(true) {try {// 1. 消費(fèi)pending-list中的消息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"), // 消費(fèi)者此消息的消費(fèi)者StreamReadOptions.empty().count(1), // StreamOffset.create("stream.order", ReadOffset.from("0")) // 從pending-list的第一條消息開(kāi)始讀);// 2. 退出條件, list 為空 -> pending-list 已全部處理if(list == null || list.isEmpty()) break;// 3. 獲取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 創(chuàng)建訂單createVoucherOrder(voucherOrder);// 5. 確認(rèn)消息已消費(fèi)(XACK)stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("處理pendding訂單異常", e);try{Thread.sleep(20); // 如果發(fā)生異常則休眠一會(huì)再重新消費(fèi)pending-list中的消息} catch (Exception e2) {e.printStackTrace(); }}}} }
-
創(chuàng)建消息方法
- 目標(biāo):用戶(hù)通過(guò)這個(gè)方法發(fā)送一條創(chuàng)建訂單的 Message 給 Redis Stream
// 創(chuàng)建Lua腳本對(duì)象 private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// Lua腳本初始化 (通過(guò)靜態(tài)代碼塊) static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/SECKILL_SCRIPT.lua"));SECKILL_SCRIPT.setResultType(Long.class); }@Override public void createVoucherOrder(Long voucherId, Long userId) {// 生成訂單 ID(模擬)long orderId = System.currentTimeMillis();// 執(zhí)行 Lua 腳本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(), // 使用空的 key 列表voucherId.toString(), userId.toString(), String.valueOf(orderId));// 根據(jù) Lua 腳本返回結(jié)果處理if (result == 1) {throw new RuntimeException("庫(kù)存不足!");} else if (result == 2) {throw new RuntimeException("不能重復(fù)下單!");}// 如果腳本執(zhí)行成功,則訂單消息會(huì)進(jìn)入 Redis Stream,消費(fèi)者組會(huì)自動(dòng)處理System.out.println("訂單創(chuàng)建成功!"); }
(缺陷) 單消費(fèi)者模式
常用命令
- XADD key [NOMKSTREAM] [MAXLEN | MINID [=|~] threshold [LIMIT count] * | ID field value [field value …]
- XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ ID … ]
缺陷
:有消息漏讀風(fēng)險(xiǎn)
五、其他消息隊(duì)列方案
(缺陷) List 實(shí)現(xiàn)
優(yōu)點(diǎn)
- 不受 JVM 內(nèi)存上限限制:因?yàn)槔?Redis 存儲(chǔ)
- 數(shù)據(jù)安全 :因?yàn)榛?List 結(jié)構(gòu)本身是數(shù)據(jù)存儲(chǔ),基于 Redis 持久化機(jī)制
- 消息有序性:通過(guò) List 結(jié)構(gòu)的 LPUSH & BRPOP 命令實(shí)現(xiàn)順序
缺點(diǎn)
- 消息丟失:BRPOP 的時(shí)候如果宕機(jī)則消息會(huì)丟失
- 只支持單消費(fèi)者
(缺陷) PubSub 實(shí)現(xiàn)
- 定義
- Publish & Subscribe 模型,一種消息隊(duì)列模型
- 生產(chǎn)者向指定的 channel 來(lái) public 消息
- 消費(fèi)者從 subscribe 的 channel 中接收消息
- 功能:支持多消費(fèi)者模式,多個(gè)消費(fèi)者可以同時(shí) subscribe 一個(gè) channel
- 優(yōu)點(diǎn):采用發(fā)布訂閱模型,支持多生產(chǎn)者、消費(fèi)者
- 缺點(diǎn)
- 不支持?jǐn)?shù)據(jù)持久化
- 無(wú)法避免消息丟失
- 消息堆積有上限,超出時(shí)數(shù)據(jù)丟失