網(wǎng)站域名所有人東莞網(wǎng)站建設(shè)平臺(tái)
目錄
1. 事務(wù)核心組件
1.1 冪等性生產(chǎn)者(Idempotent Producer)
1.2 事務(wù)協(xié)調(diào)器(TransactionCoordinator)
1.3 事務(wù)日志(Transaction Log)
2. 事務(wù)執(zhí)行流程
2.1 事務(wù)初始化
2.2 發(fā)送消息
2.3 事務(wù)提交(兩階段提交)
2.4 事務(wù)完成
3. 消費(fèi)者事務(wù)隔離
3.1 隔離級(jí)別
3.2 實(shí)現(xiàn)機(jī)制
4. 異常處理與容錯(cuò)
4.1 生產(chǎn)者宕機(jī)
4.2 協(xié)調(diào)器宕機(jī)
4.3 Broker宕機(jī)
5. 關(guān)鍵源碼解析
5.1 事務(wù)協(xié)調(diào)器核心邏輯
5.2 兩階段提交實(shí)現(xiàn)
5.3 消費(fèi)者過(guò)濾未提交消息
6. 事務(wù)配置與使用
6.1 生產(chǎn)者配置
6.2 消費(fèi)者配置
7. 事務(wù)性能與限制
總結(jié)
- 冪等生產(chǎn)者:通過(guò)
ProducerID
和SequenceNumber
去重,避免消息重復(fù)(源碼見(jiàn)ProducerStateManager
)。- 事務(wù)協(xié)調(diào)器(TransactionCoordinator):
- 每個(gè)事務(wù)綁定一個(gè)Coordinator,處理
BEGIN_TRANSACTION
、COMMIT
/ABORT
請(qǐng)求。- 事務(wù)狀態(tài)存儲(chǔ)在內(nèi)部Topic
__transaction_state
中(通過(guò)TransactionStateManager
管理)。
- 兩階段提交:
- 階段1:標(biāo)記事務(wù)為“預(yù)提交”,寫(xiě)入所有參與分區(qū)的數(shù)據(jù)。
- 階段2:寫(xiě)入
COMMIT
標(biāo)記到事務(wù)日志,消費(fèi)者僅可見(jiàn)已提交的事務(wù)消息。
Kafka事務(wù)機(jī)制通過(guò)冪等性生產(chǎn)者、事務(wù)協(xié)調(diào)器(TransactionCoordinator) 和 兩階段提交(2PC) 實(shí)現(xiàn)跨分區(qū)的原子性寫(xiě)入,確保消息要么全部提交,要么全部丟棄。以下是核心實(shí)現(xiàn)機(jī)制:
1. 事務(wù)核心組件
1.1 冪等性生產(chǎn)者(Idempotent Producer)
- 作用:確保單分區(qū)內(nèi)消息不重復(fù)。
- 實(shí)現(xiàn)機(jī)制:
-
- PID(Producer ID):每個(gè)生產(chǎn)者實(shí)例唯一,由Broker分配。
- Sequence Number:每個(gè)消息的單調(diào)遞增序列號(hào),Broker校驗(yàn)序列號(hào)連續(xù)性。
- 源碼類(lèi):
ProducerStateManager
(管理PID與序列號(hào))。
1.2 事務(wù)協(xié)調(diào)器(TransactionCoordinator)
- 作用:管理事務(wù)生命周期,協(xié)調(diào)事務(wù)提交或中止。
- 實(shí)現(xiàn)機(jī)制:
-
- 每個(gè)事務(wù)綁定一個(gè)協(xié)調(diào)器(通過(guò)事務(wù)ID哈希選擇Broker)。
- 維護(hù)事務(wù)狀態(tài)機(jī)(
TransactionState
),存儲(chǔ)在內(nèi)部Topic__transaction_state
。 - 源碼類(lèi):
TransactionCoordinator
、TransactionStateManager
。
1.3 事務(wù)日志(Transaction Log)
- 作用:持久化事務(wù)狀態(tài),防止協(xié)調(diào)器宕機(jī)后數(shù)據(jù)丟失。
- 存儲(chǔ)位置:內(nèi)部Topic
__transaction_state
,每個(gè)分區(qū)對(duì)應(yīng)一個(gè)協(xié)調(diào)器。 - 數(shù)據(jù)格式:事務(wù)ID、PID、狀態(tài)(
PrepareCommit
、Completed
等)、超時(shí)時(shí)間。
2. 事務(wù)執(zhí)行流程
2.1 事務(wù)初始化
- 生產(chǎn)者初始化事務(wù):
-
- 調(diào)用
initTransactions()
,向協(xié)調(diào)器注冊(cè)事務(wù)ID,獲取PID。 - 協(xié)調(diào)器在
__transaction_state
中記錄事務(wù)元數(shù)據(jù)。
- 調(diào)用
2.2 發(fā)送消息
- 發(fā)送事務(wù)消息:
-
- 生產(chǎn)者發(fā)送消息時(shí)攜帶PID、序列號(hào)、事務(wù)ID。
- Broker將消息寫(xiě)入日志,但標(biāo)記為未提交(對(duì)消費(fèi)者不可見(jiàn))。
2.3 事務(wù)提交(兩階段提交)
- 階段1:Prepare Commit
生產(chǎn)者向協(xié)調(diào)器發(fā)送EndTxnRequest
,協(xié)調(diào)器將事務(wù)狀態(tài)置為PrepareCommit
,并持久化到事務(wù)日志。 - 階段2:Commit Markers寫(xiě)入
協(xié)調(diào)器向所有涉及的分區(qū)Leader發(fā)送WriteTxnMarkers
請(qǐng)求,Leader在日志中寫(xiě)入事務(wù)提交標(biāo)記(Control Batch)。
2.4 事務(wù)完成
- Broker將事務(wù)消息標(biāo)記為已提交,消費(fèi)者可讀取(需配置
isolation.level=read_committed
)。
3. 消費(fèi)者事務(wù)隔離
3.1 隔離級(jí)別
read_committed
:僅消費(fèi)已提交的事務(wù)消息(跳過(guò)未提交的Control Batch)。read_uncommitted
:消費(fèi)所有消息(默認(rèn)模式,不保證事務(wù)原子性)。
3.2 實(shí)現(xiàn)機(jī)制
- 消費(fèi)者在拉取消息時(shí),Broker根據(jù)隔離級(jí)別過(guò)濾未提交的事務(wù)消息。
- 源碼邏輯:
KafkaConsumer
的fetcher
模塊解析Control Batch,決定是否跳過(guò)消息。
4. 異常處理與容錯(cuò)
4.1 生產(chǎn)者宕機(jī)
- 事務(wù)超時(shí)(
transaction.timeout.ms
):協(xié)調(diào)器自動(dòng)中止未完成的事務(wù)。 - 新生產(chǎn)者實(shí)例需重新初始化事務(wù),舊事務(wù)狀態(tài)由協(xié)調(diào)器清理。
4.2 協(xié)調(diào)器宕機(jī)
- 事務(wù)日志持久化在
__transaction_state
,新協(xié)調(diào)器加載日志恢復(fù)狀態(tài)。
4.3 Broker宕機(jī)
- 副本機(jī)制保證事務(wù)日志和消息日志的高可用,Leader切換后繼續(xù)處理事務(wù)。
5. 關(guān)鍵源碼解析
5.1 事務(wù)協(xié)調(diào)器核心邏輯
//事務(wù)狀態(tài)管理(TransactionStateManager)
public class TransactionStateManager {// 持久化事務(wù)狀態(tài)到__transaction_statedef appendTransactionToLog(transactionState: TransactionState): Unit = {val records = List(new SimpleRecord(transactionState.toBytes))transactionLog.append(records)}
}
5.2 兩階段提交實(shí)現(xiàn)
// 協(xié)調(diào)器發(fā)送提交標(biāo)記(TransactionCoordinator)
private def sendTxnMarkers(transactionState: TransactionState): Unit = {
// 向所有分區(qū)Leader發(fā)送WriteTxnMarkersRequest
transactionState.partitions.foreach { partition =>val request = new WriteTxnMarkersRequest.Builder(partition, Commit)sendRequestToLeader(request)}
}
5.3 消費(fèi)者過(guò)濾未提交消息
// 消費(fèi)者拉取消息過(guò)濾(ConsumerFetcherThread)
private def filterCommittedMessages(records: ConsumerRecords): ConsumerRecords = {
if (isolationLevel == IsolationLevel.READ_COMMITTED) {records.filter(_.controlBatchType != ControlBatchType.ABORT)
} else {records
}
}
6. 事務(wù)配置與使用
6.1 生產(chǎn)者配置
props.put("enable.idempotence", "true"); // 開(kāi)啟冪等性
props.put("transactional.id", "my-tx-id"); // 必須設(shè)置事務(wù)ID
props.put("transaction.timeout.ms", "60000"); // 事務(wù)超時(shí)時(shí)間
6.2 消費(fèi)者配置
props.put("isolation.level", "read_committed"); // 僅消費(fèi)已提交消息
7. 事務(wù)性能與限制
- 性能開(kāi)銷(xiāo):事務(wù)引入兩階段提交和日志持久化,吞吐量下降約20%-30%。
- 限制:
-
- 事務(wù)僅支持單會(huì)話(huà)(一個(gè)生產(chǎn)者實(shí)例)。
- 事務(wù)消息的消費(fèi)者必須使用Kafka Consumer API(不支持舊版基于ZooKeeper的消費(fèi)者)。
總結(jié)
Kafka事務(wù)通過(guò)以下機(jī)制實(shí)現(xiàn)跨分區(qū)的原子性:
- 冪等性生產(chǎn)者:避免單分區(qū)消息重復(fù)。
- 事務(wù)協(xié)調(diào)器與兩階段提交:確保所有分區(qū)要么全部提交,要么全部回滾。
- 事務(wù)日志持久化:保障協(xié)調(diào)器故障恢復(fù)后狀態(tài)一致。
- 消費(fèi)者隔離級(jí)別:控制事務(wù)消息的可見(jiàn)性。
正確配置后,Kafka事務(wù)可支持金融級(jí)場(chǎng)景的精確一次(Exactly-Once)語(yǔ)義