公司網(wǎng)站備案 問我借身份證 怎么拒絕百度網(wǎng)址入口
對于未上傳完的文件片占用的磁盤空間,則是通過Redis+Kafka實現(xiàn)動態(tài)延時任務(wù)的存儲與下發(fā)執(zhí)行,保證與MySQL的最終一致性
從 “業(yè)務(wù)背景”、“技術(shù)方案”、“為何用Redis+Kafka”、“如何保證最終一致性” 四個角度來分析:
🧩 一、業(yè)務(wù)背景
在實現(xiàn)大文件 分片上傳、斷點續(xù)傳 功能時,有以下問題:
- 用戶上傳的文件被分成多個小片,每個片段會先暫存在服務(wù)器硬盤的臨時目錄中;
- 文件合并前,這些文件片會一直占據(jù)磁盤空間;
- 如果用戶上傳到一半就不繼續(xù)了(無論是誤操作還是惡意攻擊),這些碎片就永遠(yuǎn)不會被合并清理;
- 從而造成 磁盤空間長期占用、資源浪費、甚至導(dǎo)致服務(wù)崩潰(被攻擊壓垮)。
所以:必須有一種機制自動清理這些未完成上傳的文件碎片。
🔧 二、技術(shù)方案總覽
核心目標(biāo):每個文件片上傳完后,給它設(shè)置一個“延時任務(wù)”,比如:如果 5 秒內(nèi)沒有繼續(xù)上傳,那就清理這個文件的所有碎片。
技術(shù)選型:
Redis
:用來快速緩存上傳狀態(tài)、延時任務(wù)(高性能讀寫);Kafka
:作為可靠的異步消息中間件,負(fù)責(zé)延時任務(wù)的可靠下發(fā)和最終落地執(zhí)行(高可靠性);MySQL
:真正的數(shù)據(jù)持久化落地(低頻寫、穩(wěn)定性要求高)。
🚀 三、為什么用 Redis + Kafka?
? Redis 的作用:
- 用
ZSet
(有序集合)存儲所有文件上傳任務(wù)及其過期時間; - 每次上傳文件片,都會刷新延時時間(例如當(dāng)前時間 + 5秒);
- 定時調(diào)度器(比如每秒執(zhí)行)掃描 Redis 中到期的任務(wù);
- 如果任務(wù)到期未上傳完,就發(fā)消息到 Kafka。
👉 Redis 保證快速更新和調(diào)度精度,但本身不做復(fù)雜業(yè)務(wù)處理。
? Kafka 的作用:
- Kafka 消費端監(jiān)聽 Redis 發(fā)過來的超時任務(wù);
- 收到后執(zhí)行任務(wù):刪除對應(yīng)磁盤碎片,更新數(shù)據(jù)庫中“未完成上傳空間”等信息;
- Kafka 保證消息可靠投遞、可重試、可順序、具備持久化和冪等保障。
👉 Kafka 確保任務(wù)“最終會被執(zhí)行”,即使 Redis/調(diào)度器短暫異常,消息也不會丟。
🧩 四、如何保證與 MySQL 的最終一致性?
這是句子中的關(guān)鍵——“最終一致性”的意思是:即使中間發(fā)生延遲、系統(tǒng)抖動等,只要系統(tǒng)恢復(fù),MySQL 里的數(shù)據(jù)最終會被正確更新,和 Redis 的狀態(tài)保持一致。
?? 具體機制:
階段 | 數(shù)據(jù)存儲層 | 行為 |
---|---|---|
上傳過程中 | Redis | 每次上傳一個文件片,就更新 Redis 中該任務(wù)的延遲時間;同時更新 Redis 中該用戶“未完成上傳空間”字段 |
任務(wù)過期 | Kafka | Redis 將過期任務(wù)發(fā)到 Kafka 任務(wù)隊列 |
任務(wù)執(zhí)行 | 消費端處理 | 1. 刪除磁盤碎片; 2. 將 Redis 中統(tǒng)計的空間同步到 MySQL(更新“未完成上傳空間”字段) 3. 刪除 Redis 中該任務(wù)記錄 |
上傳完成 | 應(yīng)用服務(wù) | 合并文件,Redis 中清除上傳狀態(tài),同時從 Redis 和數(shù)據(jù)庫中扣除“未完成上傳空間”并加到“已完成上傳空間” |
只要 Kafka 不丟消息,消費端一定會最終完成這個任務(wù) → 保證 MySQL 和 Redis 中的數(shù)據(jù)最終一致。
? 總結(jié)
“對于未上傳完的文件片占用的磁盤空間,則是通過Redis+Kafka實現(xiàn)動態(tài)延時任務(wù)的存儲與下發(fā)執(zhí)行,保證與MySQL的最終一致性”
可以如下理解:
- 未上傳完的文件片會臨時占用磁盤,需要被及時清理;
- 每個上傳任務(wù)會被緩存到 Redis(使用 ZSet);
- 到期的任務(wù)由 Redis 調(diào)度器識別后,通過 Kafka 發(fā)送消息通知執(zhí)行清理;
- Kafka 消費端負(fù)責(zé)真正去清理碎片文件 + 同步更新數(shù)據(jù)庫;
- 即便系統(tǒng)某一刻異常,只要 Kafka 的消息還在,最終一定會將 MySQL 的狀態(tài)修正正確;
- 這樣就達(dá)到了:高性能緩存 + 高可靠調(diào)度 + 數(shù)據(jù)最終一致。
? 簡歷中表達(dá)方式:
為應(yīng)對大文件上傳中因中斷或惡意攻擊導(dǎo)致的磁盤碎片堆積問題,設(shè)計并實現(xiàn)基于
Redis ZSet + Kafka
的動態(tài)延時清理機制:
- 使用 Redis 高性能緩存上傳任務(wù)及延時狀態(tài);
- 超時任務(wù)通過 Kafka 異步下發(fā),保障任務(wù)可靠落地執(zhí)行;
- 任務(wù)執(zhí)行過程更新用戶未完成上傳空間并持久化至 MySQL,最終實現(xiàn)數(shù)據(jù)庫與緩存狀態(tài)一致。
當(dāng)然可以,下面我通過一個實際例子,完整演示如何用 Redis + Kafka
來管理未上傳完成的文件片所引發(fā)的磁盤空間占用問題,以及如何保證最終一致性。
🎯 背景場景
假設(shè)用戶 user123 在上傳一個視頻文件,文件大小為 100MB,分成了 10個片段(每片10MB)。系統(tǒng)配置:
- 用戶上傳空間上限:500MB
- 磁盤碎片的保存目錄按月存儲,例如
/tmp/upload/2025-06/user123/abcd1234/
- 延時清理時間為:5秒未上傳下一個片段就觸發(fā)清理任務(wù)
🧩 實際操作流程
🪛 第一次上傳(上傳第1個文件片):
-
用戶上傳第1片(10MB),服務(wù)器接收后將它緩存在
/tmp/upload/2025-06/user123/abcd1234/part1
。 -
系統(tǒng)計算
md5 = abcd1234
(該文件的唯一標(biāo)識)。 -
系統(tǒng)將任務(wù)寫入 Redis:
ZSet 名稱:zset_upload_tasks_1 key: abcd1234+user123 score: 當(dāng)前時間戳 + 5秒(延時清理時間)
-
在 Redis 的另一個
Hash
中記錄:user123 -> unfinishedSize = 10MB
-
任務(wù)調(diào)度器每秒掃描一次 ZSet,發(fā)現(xiàn)該任務(wù)未超時 → 不處理。
🧱 上傳第2片(第2次上傳):
- 用戶緊接著上傳第2片,系統(tǒng)發(fā)現(xiàn)已有這個任務(wù) → 刷新 Redis 中的延時任務(wù)時間(當(dāng)前時間 + 5秒)。
unfinishedSize
累加至 20MB。
🧨 用戶斷開連接,上傳中斷
假設(shè)用戶上傳完第3片(共30MB)后就關(guān)閉瀏覽器了,系統(tǒng)檢測不到任何新的上傳。
?? Redis 延時任務(wù)觸發(fā)
-
5秒過去,調(diào)度器再次掃描 ZSet:
- 發(fā)現(xiàn)
abcd1234+user123
的任務(wù)已超時。
- 發(fā)現(xiàn)
-
系統(tǒng)將該任務(wù)信息通過 Kafka 發(fā)送出去:
{"type": "upload_timeout","userId": "user123","fileMd5": "abcd1234","size": 30MB,"tmpPath": "/tmp/upload/2025-06/user123/abcd1234/"
}
🔁 Kafka 消費端處理任務(wù)
Kafka 消費者監(jiān)聽到這條消息,進(jìn)行以下操作:
-
刪除臨時目錄
/tmp/upload/2025-06/user123/abcd1234/
下所有文件片(節(jié)省磁盤空間)。 -
將 Redis 中
unfinishedSize = 30MB
減去該任務(wù)對應(yīng)的大小。 -
同步更新 MySQL 數(shù)據(jù)庫中:
use_space_unfinished = use_space_unfinished - 30MB
-
日志記錄該任務(wù)已完成,確保冪等。
📌 最終效果(總結(jié))
時間點 | 狀態(tài) | 磁盤使用 | Redis 狀態(tài) | 數(shù)據(jù)庫狀態(tài) |
---|---|---|---|---|
上傳中 | 正常寫入 | +30MB | unfinishedSize = 30MB | unchanged |
超時后 | Kafka觸發(fā) | -30MB | unfinishedSize = 0MB | use_space_unfinished - 30MB |
🎓 一句話總結(jié)
“用戶上傳文件中斷后,Redis 中的延時任務(wù)自動觸發(fā)清理邏輯,通過 Kafka 下發(fā)異步清理任務(wù),釋放磁盤空間并同步更新 MySQL 中的已用空間,確保緩存和持久化層的最終一致性?!?/p>
- 分布式緩存與消息隊列協(xié)作機制的理解;
- Redis 延時調(diào)度能力;
- Kafka 冪等消費處理;
- 數(shù)據(jù)一致性策略(最終一致);
如何基于 Redis + Kafka 管理未上傳完成的文件片,防止服務(wù)器硬盤被惡意占用并確保最終一致性
📘 技術(shù)分享稿:未上傳文件片的清理與一致性保障機制
在開發(fā) SmartDrive 云盤系統(tǒng) 時,我們遇到一個潛在的系統(tǒng)穩(wěn)定性問題:如果用戶上傳文件過程中中斷(例如惡意攻擊、頻繁取消上傳等),未完成的文件分片會持續(xù)占用服務(wù)器磁盤空間。由于這些文件尚未合并,數(shù)據(jù)庫中的用戶“已使用空間”不會更新,長此以往可能導(dǎo)致服務(wù)器磁盤資源被占滿,影響服務(wù)可用性。
為了解決這一問題,我們設(shè)計并實現(xiàn)了一個基于 Redis + Kafka 的動態(tài)延時任務(wù)系統(tǒng),實現(xiàn)文件片清理、用戶空間回收以及數(shù)據(jù)一致性保障。
🔧 實現(xiàn)方案
-
文件上傳分片記錄與限時清理機制
- 用戶上傳每一個文件片時,我們通過
文件MD5 + 用戶ID
構(gòu)造唯一標(biāo)識,并將該上傳任務(wù)存入 Redis 的分布式 ZSet(有序集合)中,設(shè)置延遲時間(如5秒)作為 score。 - 若用戶持續(xù)上傳,則不斷刷新該任務(wù)的過期時間;若中斷,延遲任務(wù)最終會被觸發(fā)。
- 用戶上傳每一個文件片時,我們通過
-
空間占用統(tǒng)計
- Redis 中為每個用戶維護(hù)兩個字段:
use_space_finished
(已上傳完畢)和use_space_unfinished
(上傳中)。 - 每次分片上傳成功后,
use_space_unfinished
累加對應(yīng)片段大小,但不立即寫入數(shù)據(jù)庫,以減少數(shù)據(jù)庫壓力。
- Redis 中為每個用戶維護(hù)兩個字段:
-
Kafka 異步任務(wù)分發(fā)
- 當(dāng)調(diào)度器檢測到某個上傳任務(wù)超時后,會將其作為“上傳中斷”事件寫入 Kafka 任務(wù)隊列。
- Kafka 消費者接收到該事件后,執(zhí)行清理邏輯:刪除臨時目錄下的文件片、扣減 Redis 和數(shù)據(jù)庫中的
use_space_unfinished
,同時記錄日志確保冪等性。
-
一致性保障
- Redis 負(fù)責(zé)高頻寫操作,Kafka 消費者在空閑時批量落庫,保證 Redis 與 MySQL 的最終一致性。
- 通過唯一任務(wù) ID 和冪等機制,確保即使任務(wù)重復(fù)下發(fā)也不會出現(xiàn)重復(fù)扣減或誤刪文件。
💡 舉例說明
假設(shè)用戶 user123
上傳一個 100MB 的視頻,被分成 10 片。上傳前3片后中斷,系統(tǒng)記錄:
- Redis 中
use_space_unfinished = 30MB
- 臨時目錄占用磁盤空間為 30MB
當(dāng)用戶5秒內(nèi)未繼續(xù)上傳,調(diào)度器觸發(fā) Kafka 任務(wù):
- 刪除
/tmp/upload/.../user123/md5xyz/
下所有片段 - Kafka 消費者更新 Redis 和 MySQL,使
use_space_unfinished -= 30MB
最終,系統(tǒng)磁盤被及時釋放,數(shù)據(jù)狀態(tài)一致,避免了無效數(shù)據(jù)積壓。
📌 效果與優(yōu)勢
- ? 系統(tǒng)可用性提升:及時清理無效數(shù)據(jù),防止磁盤被打爆
- ? 高性能:Redis 實現(xiàn)高頻寫入,Kafka 異步處理延時任務(wù),系統(tǒng)抗壓能力強
- 🔐 安全性好:即使遭遇惡意攻擊,也能快速識別并清理
- 📈 一致性保障:Redis 與 MySQL 數(shù)據(jù)最終一致,確保用戶體驗無誤
這個機制目前已經(jīng)穩(wěn)定運行在我們云盤系統(tǒng)的上傳鏈路中,極大地提升了系統(tǒng)的健壯性與可維護(hù)性。如果大家有類似文件上傳、延時處理或一致性問題,也可以參考我們這套 Redis + Kafka 架構(gòu)模式。
架構(gòu)優(yōu)點、對比其他方案、以及可選替代方案
? 為什么選擇 Redis + Kafka 架構(gòu)模式?
一、Redis 的優(yōu)勢:高速緩存 + 精準(zhǔn)調(diào)度
- Redis 支持毫秒級延時處理(通過
ZSet + score + timestamp
實現(xiàn)),適合存儲上傳任務(wù)及調(diào)度時間。 - 具備高并發(fā)處理能力,可支持上傳過程中對用戶空間的快速寫入與實時更新。
- 使用內(nèi)存存儲避免頻繁訪問數(shù)據(jù)庫,緩解數(shù)據(jù)庫壓力。
二、Kafka 的優(yōu)勢:高吞吐 + 異步解耦
- Kafka 具備高可用、高吞吐、可回溯等特性,適合作為任務(wù)通知的“緩沖帶”。
- Redis 負(fù)責(zé)狀態(tài)判斷和觸發(fā),Kafka 負(fù)責(zé)任務(wù)發(fā)出后的“慢處理”,避免阻塞請求線程。
- 異步解耦:上傳請求不需要等待清理邏輯完成,極大提升響應(yīng)速度與系統(tǒng)穩(wěn)定性。
三、二者組合的優(yōu)勢
- Redis 做“調(diào)度器”,Kafka 做“執(zhí)行者”,既確保調(diào)度精度,又實現(xiàn)任務(wù)緩沖與最終一致性。
- Redis 失效后 Kafka 仍可保留任務(wù);Kafka 消費異常可重試,具備很好的魯棒性和可恢復(fù)性。
🔁 與其他架構(gòu)方案對比
架構(gòu)模式 | 優(yōu)勢 | 缺點 |
---|---|---|
? Redis + Kafka | 高性能、高解耦、最終一致性強 | 系統(tǒng)設(shè)計復(fù)雜度略高 |
? 直接寫數(shù)據(jù)庫 | 簡單直觀 | 并發(fā)高時容易寫崩庫,寫放大嚴(yán)重,影響主業(yè)務(wù) |
? 僅用 Redis 實現(xiàn)延時清理 | 寫性能好 | 無法可靠落盤,易丟失任務(wù);需要自行實現(xiàn)冪等與持久化 |
? Quartz / ScheduledExecutorService | 精度較低,線程消耗高 | 不適合大規(guī)模任務(wù)調(diào)度,任務(wù)量大時調(diào)度不穩(wěn)定 |
? RabbitMQ / 延遲隊列 | 可替代 Kafka | 但吞吐與可靠性不如 Kafka,且不易追蹤任務(wù)執(zhí)行狀態(tài) |
🌟 其他可選實現(xiàn)方式(如果 Redis + Kafka 不可用)
1. 使用 Redis + 延遲隊列中間件(如 RabbitMQ 的延遲插件、RocketMQ 延遲消息)
- 替代 Kafka
- 適合中小型系統(tǒng),且部署運維成本較低
2. 使用 Redis + 定時任務(wù)輪詢(如 ScheduledExecutor + Redis)
- 定期掃描 Redis 中即將過期的任務(wù)并執(zhí)行
- 適用于任務(wù)量不大、調(diào)度精度要求不高的場景
3. 利用 Redisson 的 RDelayedQueue
- 支持分布式延遲隊列,適用于 Redis 原生不支持的延遲任務(wù)功能
- 配合業(yè)務(wù)邏輯處理較輕的場景,快速上線
🧠 總結(jié):為什么 Redis + Kafka 更好?
Redis 解決了高頻訪問場景下的快速讀寫 + 精準(zhǔn)延時調(diào)度,Kafka 解決了異步、解耦、冪等、高吞吐處理的問題,兩者結(jié)合:
- 性能強、可靠性高
- 調(diào)度精度好
- 任務(wù)追蹤容易
- 系統(tǒng)擴(kuò)展性強
這套架構(gòu)非常適合大型上傳系統(tǒng)中復(fù)雜的上傳狀態(tài)追蹤、用戶空間管控與數(shù)據(jù)一致性需求,能夠在面對高并發(fā)、突發(fā)流量、異常上傳行為時保持系統(tǒng)穩(wěn)定。
Redis ZSet + Kafka 在上傳文件片時的使用場景中的技術(shù)細(xì)節(jié)解析
🔧 背景場景簡介
當(dāng)用戶上傳大文件時,通常會進(jìn)行分片上傳。但如果惡意用戶僅上傳部分文件片(不合并完成),這些碎片可能長時間占用磁盤資源,導(dǎo)致服務(wù)器空間耗盡。
為此,系統(tǒng)需監(jiān)控這些“未合并文件片”的生命周期,并在長時間未完成上傳的情況下及時清理無效文件片。
這就引出了 Redis + Kafka 的組合使用:
? Redis ZSet 的作用(任務(wù)調(diào)度器)
技術(shù)細(xì)節(jié):
- 使用 Redis 的有序集合(ZSet) 存儲每個未完成上傳任務(wù)。
- Key:如
upload_timeout_task_bucket_{n}
(分桶方案) - Member:任務(wù)唯一 ID,例如
md5_userid
- Score:當(dāng)前時間戳 + 超時時間(例如 5 分鐘)
示例:
ZADD upload_timeout_task_bucket_1 1718178000 md5_1234
表示用戶1234上傳的某個文件片,在 1718178000
(約5分鐘后)仍未完成,則視為超時。
擴(kuò)展細(xì)節(jié):
- 使用內(nèi)存 Map 緩存
md5+userid → zset桶編號
映射,提高查找性能。 - 每次用戶上傳一個新分片時,就更新對應(yīng)任務(wù)的超時時間,延遲5秒或更長時間再次觸發(fā)判斷。
- 如果任務(wù)上傳完成,則從 ZSet 刪除,避免誤清理。
? Kafka 的作用(任務(wù)下發(fā)執(zhí)行者)
技術(shù)細(xì)節(jié):
- Redis 的調(diào)度器定期掃描 ZSet 中即將到期的任務(wù)(例如每秒掃描
score ≤ 當(dāng)前時間戳
的任務(wù))。 - 對這些任務(wù)使用 Kafka 發(fā)送清理事件,異步通知后端執(zhí)行清理動作。
Kafka 消息示例:
{"taskId": "md5_1234","userId": "1234","action": "clean_unfinished_chunks","timestamp": 1718178000
}
- 消息被下游服務(wù)異步消費,刪除文件碎片、釋放空間,并更新數(shù)據(jù)庫與緩存狀態(tài)。
- 具備 冪等處理能力:每條消息帶唯一 ID,消費者側(cè)使用去重機制防止重復(fù)清理。
🔐 并發(fā)與一致性保障機制
分桶設(shè)計:
- 將所有延時任務(wù)分散到多個 ZSet,例如 10 個桶:
upload_timeout_task_bucket_0
到_9
,按哈希值取模分配。 - 減少單個 ZSet 的 size,提升調(diào)度器掃描效率和延遲控制精度。
冪等性控制:
- Kafka 消息處理必須是冪等的,即同一條任務(wù)消息不管消費幾次,最終狀態(tài)一致。
- 可在數(shù)據(jù)庫或 Redis 中記錄任務(wù)處理狀態(tài)(如 taskId → processed)防止重復(fù)執(zhí)行。
與數(shù)據(jù)庫一致性:
- Redis 在內(nèi)存中快速處理臨時狀態(tài);
- Kafka 負(fù)責(zé)寫操作的異步最終一致性通知;
- 最終由數(shù)據(jù)庫記錄實際使用空間(如已上傳/未完成空間大小字段)。
📌 總結(jié)一句話
使用 Redis ZSet 精準(zhǔn)調(diào)度未完成上傳的文件片生命周期,Kafka 異步可靠下發(fā)清理任務(wù),兩者協(xié)作實現(xiàn)了高并發(fā)場景下的磁盤保護(hù)、狀態(tài)可追蹤、任務(wù)冪等、最終一致性處理,有效防止惡意上傳攻擊,保障系統(tǒng)穩(wěn)定性。
ZSet 分桶
“ZSet 分桶”是一個在高并發(fā)或大規(guī)模數(shù)據(jù)處理場景下的性能優(yōu)化策略。它的核心思想是:將原本存儲在一個 Redis 有序集合(ZSet)中的大量任務(wù),拆分成多個 ZSet 存儲,分散訪問壓力,提高查詢效率與調(diào)度精度。
🧠 為什么要“分桶”?
當(dāng)你把所有延遲任務(wù)都放在一個 Redis ZSet 里,比如叫 upload_timeout_tasks
,隨著時間推移,這個集合會變得非常大。ZSet 的查詢效率雖然不錯,但:
ZRANGEBYSCORE
查詢的是有序數(shù)據(jù),任務(wù)一多,掃描就慢;- 每秒調(diào)度器都要掃描一次“過期任務(wù)”,訪問壓力集中;
- 單個 ZSet 容量太大,也可能成為 Redis 內(nèi)存碎片化或阻塞操作的隱患。
為了解決這個問題,我們“分桶”。
🧰 ZSet 分桶的做法
舉個例子:
假設(shè)我們要存儲 100 萬個上傳文件的延時清理任務(wù),不再用一個 ZSet,而是:
upload_timeout_task_bucket_0
upload_timeout_task_bucket_1
...
upload_timeout_task_bucket_9
共 10 個“桶”(ZSet)。我們把任務(wù)“均勻分布”到這些桶中。
如何分布任務(wù)?
可以根據(jù)任務(wù)的哈希值取模分桶,例如:
int bucketIndex = (md5 + userId).hashCode() % 10;
將這個任務(wù)放入第 bucketIndex
個桶中。
這樣每個 ZSet 只維護(hù)一小部分任務(wù),大大減少了單個桶的查詢開銷。
🔄 調(diào)度器如何配合分桶?
原來調(diào)度器每秒只掃描一個 ZSet:
ZRANGEBYSCORE upload_timeout_tasks 0 currentTime
現(xiàn)在變成輪詢每個桶:
for i in 0..9:ZRANGEBYSCORE upload_timeout_task_bucket_i 0 currentTime
這樣每次每個桶掃描的數(shù)據(jù)量變小了,調(diào)度延遲更小、吞吐量更高,也方便并發(fā)處理。
? 總結(jié)
特性 | 說明 |
---|---|
🎯 目的 | 降低單個 Redis ZSet 的壓力,提高調(diào)度效率和查詢性能 |
?? 方式 | 將任務(wù)分散放入多個 ZSet(桶)中,按照哈希取模分配 |
🚀 優(yōu)勢 | 并發(fā)性能更強、查詢更快、調(diào)度更精準(zhǔn)、避免單點瓶頸 |
💡 適用場景 | 上傳分片延時清理、定時任務(wù)調(diào)度、過期資源管理等場景 |
Kafka
🧠 一、Kafka 是什么?
Kafka 是一個高吞吐、可持久化的分布式消息隊列系統(tǒng),主要特點:
特性 | 說明 |
---|---|
發(fā)布-訂閱模式 | 生產(chǎn)者發(fā)布消息,消費者訂閱處理消息 |
高吞吐 | 每秒處理百萬級消息 |
持久化 | 數(shù)據(jù)寫入磁盤,支持消息持久保存 |
可擴(kuò)展性 | 支持多 Broker 組成集群,水平擴(kuò)展 |
容錯性強 | 支持副本機制,節(jié)點宕機也不會丟消息 |
🧰 二、Kafka 在“文件片延遲清理”中的作用
在我們的系統(tǒng)中,用戶上傳文件是通過分片方式進(jìn)行的:
- 如果某些分片長時間未上傳完成,它們就一直占據(jù)服務(wù)器磁盤空間;
- 我們通過 Redis 的 ZSet 創(chuàng)建動態(tài)延時任務(wù),比如“10分鐘后檢查是否上傳完成”;
- 但延時任務(wù)并不直接清理磁盤,而是將清理請求發(fā)送到 Kafka 這個消息隊列中;
- Kafka 中的消費者再異步消費任務(wù),做清理、更新數(shù)據(jù)庫等工作。
🔄 三、流程圖示(簡化)
[用戶上傳文件片] ↓
[Redis ZSet 生成延時任務(wù)](上傳超時5分鐘)↓
[調(diào)度器掃描 ZSet,到期后將任務(wù)發(fā)送到 Kafka Topic]↓
[Kafka 消費者消費消息]↓
[清理臨時分片 + 更新 Redis/MySQL 空間使用數(shù)據(jù)]
📦 四、Kafka 的使用細(xì)節(jié)
1. Producer(生產(chǎn)者)發(fā)送消息
當(dāng)某個文件片超時未合并:
// 偽代碼:發(fā)送清理任務(wù)
kafkaTemplate.send("file-cleanup-topic", msg);
消息內(nèi)容一般包括:
- 用戶 ID
- 文件 MD5
- 分片路徑
- 文件大小
2. Consumer(消費者)消費清理任務(wù)
@KafkaListener(topics = "file-cleanup-topic")
public void cleanupHandler(String msgJson) {// 解析消息// 刪除磁盤中的分片// 更新 Redis 的未完成空間大小// 更新數(shù)據(jù)庫(最終一致性)
}
3. 冪等性保證
Kafka 提供:
-
消息持久化:不怕宕機
-
消息重復(fù)消費:你需要在處理邏輯中加入冪等性設(shè)計,比如:
- 先檢查文件是否已清理過;
- 或根據(jù)“唯一ID”判斷是否是同一任務(wù)。
? 五、使用 Kafka 的優(yōu)勢
優(yōu)勢 | 說明 |
---|---|
解耦調(diào)度與處理邏輯 | Redis 延時任務(wù)調(diào)度只負(fù)責(zé)“發(fā)通知”,真正清理由 Kafka 消費者異步處理 |
提升系統(tǒng)性能與可擴(kuò)展性 | 通過 Kafka 實現(xiàn)異步批量處理,避免同步阻塞 |
高可用保障 | Kafka 的持久化機制確保任務(wù)不會丟失 |
支持冪等處理 | 可以防止重復(fù)刪除、誤刪等操作 |
💬 一句話總結(jié):
在文件上傳場景中,為防止未完成分片長期占用磁盤空間,我們基于 Redis ZSet 構(gòu)建延遲任務(wù)調(diào)度機制,并結(jié)合 Kafka 進(jìn)行任務(wù)異步下發(fā)和消費,實現(xiàn)高吞吐、高可用的碎片清理架構(gòu),同時通過唯一任務(wù) ID 實現(xiàn)冪等處理與最終一致性保障。
系統(tǒng)性地解釋整個鏈路
🧭 場景背景
用戶上傳大文件時,會被切分為多個文件片。為防止用戶上傳未完成就退出(或惡意攻擊),我們使用Redis + Kafka架構(gòu),實現(xiàn):
- 文件片超時未合并則定時清理(延遲任務(wù))
- 上傳空間計算最終一致性(Redis緩存+MySQL同步)
🔄 整體流程圖
上傳分片 → Redis ZSet 記錄延時任務(wù) → 到期 → 發(fā)送 Kafka 消息 → Kafka 消費者執(zhí)行任務(wù)↓ ↓
更新 Redis 緩存使用量(未完成) 清理臨時文件 + 更新 Redis/MySQL 空間使用情況
🧪 一、如何將任務(wù)發(fā)送到 Kafka Topic?
我們使用 Spring Boot + Kafka 的集成方式(Spring for Apache Kafka)。
示例代碼(發(fā)送任務(wù)):
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;public void sendCleanupTask(String userId, String fileMd5, long unfinishedSize) {JSONObject task = new JSONObject();task.put("userId", userId);task.put("fileMd5", fileMd5);task.put("unfinishedSize", unfinishedSize);kafkaTemplate.send("file-cleanup-topic", task.toJSONString());
}
當(dāng) Redis 的 ZSet 檢測到任務(wù)到期(即當(dāng)前時間 > score),就調(diào)用該方法將任務(wù)發(fā)到 Kafka 的 file-cleanup-topic
主題中。
📥 二、Kafka 消費者如何消費消息?
你可以通過 @KafkaListener
注解監(jiān)聽某個 Topic:
@KafkaListener(topics = "file-cleanup-topic", groupId = "file-cleaner-group")
public void consumeCleanupTask(String messageJson) {JSONObject task = JSONObject.parseObject(messageJson);String userId = task.getString("userId");String fileMd5 = task.getString("fileMd5");long size = task.getLong("unfinishedSize");// 1. 刪除 Redis 中對應(yīng)的文件分片記錄redisTemplate.delete("chunk:" + userId + ":" + fileMd5);// 2. 刪除磁盤臨時分片fileService.deleteTempChunks(userId, fileMd5);// 3. 更新 Redis 中的 use_space_unfinished 字段redisTemplate.opsForHash().increment("user:" + userId, "use_space_unfinished", -size);// 4. 將最終結(jié)果異步持久化到數(shù)據(jù)庫userMapper.decreaseUnfinishedSize(userId, size);
}
🧠 三、Redis 與 MySQL 的數(shù)據(jù)更新方式
Redis 緩存的設(shè)計
我們使用 Hash 存儲用戶空間信息:
Key: user:{userId}
Field: use_space // 已上傳完畢的空間
Field: use_space_unfinished // 未上傳完畢的空間
更新邏輯:
- 每上傳一片:
use_space_unfinished += chunkSize
- 清理:
use_space_unfinished -= chunkSize
- 合并:
use_space_unfinished -= totalSize
,use_space += totalSize
MySQL 最終一致性(異步批量同步)
UPDATE user_space
SET use_space_unfinished = use_space_unfinished - #{size}
WHERE user_id = #{userId};
這一步是為了防止 Redis 異常丟失數(shù)據(jù)時,系統(tǒng)還能恢復(fù)一致性。
? 總結(jié)一下完整鏈路
步驟 | 描述 |
---|---|
1. 上傳分片 | 用戶上傳某個分片時,記錄上傳大小,更新 Redis 中的 use_space_unfinished |
2. 創(chuàng)建延遲任務(wù) | 使用 Redis ZSet 記錄(fileMd5+userId)+ 上傳時間戳 |
3. 定時掃描任務(wù) | 到期后調(diào)用 KafkaTemplate.send() 發(fā)送清理任務(wù)到 Kafka |
4. Kafka 消費者處理 | 監(jiān)聽 topic,執(zhí)行任務(wù):清文件 + 更新 Redis + 更新數(shù)據(jù)庫 |
5. 最終一致性 | Redis 快速緩存寫,MySQL 異步持久化,確保數(shù)據(jù)準(zhǔn)確 |
💬 一句話總結(jié):
為了防止文件片上傳未完成導(dǎo)致磁盤資源被長期占用,我們使用 Redis ZSet 實現(xiàn)延時任務(wù)調(diào)度,通過 Kafka 實現(xiàn)任務(wù)異步消費。Redis 記錄用戶未完成空間信息以減輕數(shù)據(jù)庫壓力,Kafka 消費者在任務(wù)觸發(fā)后清理磁盤分片并更新 Redis 與數(shù)據(jù)庫,最終實現(xiàn)空間使用信息的一致性同步和系統(tǒng)高性能處理。