網(wǎng)站建設(shè)與管理課程長沙排名優(yōu)化公司
Kafka 位移主題
- 位移格式
- 創(chuàng)建位移
- 提交位移
- 刪除位移
Kafka 的內(nèi)部主題 (Internal Topic) : __consumer_offsets
(位移主題,Offsets Topic)
老 Consumer 會將位移消息提交到 ZK 中保存
- 當 Consumer 重啟后,能自動從 ZK 中讀取位移數(shù)據(jù),繼續(xù)消費上次位置
- Broker 不用保存位移數(shù)據(jù),減少 Broker 開銷
- 但 ZK 不適合大量寫操作
新 Consumer 推出了位移管理機制 :
- 將 Consumer 的位移數(shù)據(jù)提交到
__consumer_offsets
中 __consumer_offsets
作用 : 保存 Kafka 消費者的位移信息
位移格式
__consumer_offsets
的消息格式是 Kafka 自定義
- 不要隨意向該主題寫消息,可能會造成 Broker 掛
- Consumer API 會自動向位移主題寫消息
位移主題的 3 種消息格式 :
- Key/Value 分別為消息鍵/消息體
- 保存 Consumer Group 信息的消息 : 用來注冊 Consumer Group
- 刪除 Group 過期位移 , 刪除 Group 的消息
Key/Value 結(jié)構(gòu) :
- Key 的 3 部分:
<Group ID, 主題名, 分區(qū)號>
- Value 有:時間戳 , 用戶自定義的數(shù)據(jù) , 位移值
刪除 Group 消息 :
- tombstone 消息 (墓碑消息 , delete mark) :特點 : 空消息體 , 消息體是 null
- 當某個 Group 下的所有 Consumer 都停止,且位移數(shù)據(jù)都已被刪除 (徹底刪除該 Group) :Kafka 會向位移主題的對應(yīng)分區(qū)寫入 tombstone 消息
創(chuàng)建位移
位移主題自動創(chuàng)建 :
- 當 Kafka 的第一個 Consumer 啟動時,Kafka 會自動創(chuàng)建位移主題
- 位移主題的分區(qū)數(shù) :
offsets.topic.num.partitions
,默認值 : 50 - 副本數(shù) :
offsets.topic.replication.factor
, 默認值 : 3
Kafka 日志路徑下會有很多 __consumer_offsets-xxx
的目錄
- Kafka 創(chuàng)建的位移主題
手動創(chuàng)建位移主題 :不建議 (bug 代碼有硬編碼 50 )
提交位移
Consumer 提交位移方式:
- 自動提交位移
- 手動提交位移
自動提交位移 :
- Consumer 在后臺定期提交位移
- 自動提交 :
enable.auto.commit=true
- 提交間隔 :
auto.commit.interval.ms
- 優(yōu)點 : 不用管位移提交,就能保證消息消費不會丟失
- 缺點 : 沒法把控 Consumer 端的位移管理 ; 只要 Consumer 啟動 , 就會不斷向位移主題寫入消息
與 Kafka 集成的框架都禁用手動提交位移
enable.auto.commit = false
- Consumer 用
consumer.commitSync
,向位移主題寫入相應(yīng)的消息
自動提交位移的問題例子 :
- Consumer 消費到某個主題的最新一條消息 (位移 : 100)
- 之后該主題沒有新消息產(chǎn)生,所以 Consumer 無消息可消費,則位移一直是 100
- 而自動提交位移,向位移主題中不斷寫位移 =100
刪除位移
Compaction : Kafka 刪除位移主題的過期消息
Kafka 用后臺線程 (Log Cleaner) 定期檢查 Compact 的主題,判斷是否有可刪除數(shù)據(jù)
- 當位移主題占用過多磁盤時,建議檢查 Log Cleaner 線程的狀態(tài)
Compact 過期策略 :
- 同個 Key 的兩條消息 M1 和 M2,當 M1 發(fā)送時間早于 M2,那 M1 為過期消息
Compact 過程 :
- 掃描日志的所有消息,剔除那些過期的消息,把剩下的消息整理在一起
- 位移為 0、2 和 3 的消息的 Key 都是 K1,Compact 后,只會保存位移為 3 的消息