抄襲網(wǎng)站后臺會侵權(quán)嗎瀏覽器打開網(wǎng)站
文章目錄
- 整體架構(gòu)
- 元數(shù)據(jù)更新
整體架構(gòu)
消息在真正發(fā)往Kafka之前,有可能需要經(jīng)歷攔截器(Interceptor)、序列化器(Serializer)和分區(qū)器(Partitioner)等一系列的作用,那么在此之后又會發(fā)生什么呢?下面我們來看一下生產(chǎn)者客戶端的整體架構(gòu),如圖所示。
整個生產(chǎn)者客戶端由兩個線程協(xié)調(diào)運行,這兩個線程分別為主線程和Sender線程(發(fā)送線程)。在主線程中由KafkaProducer創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。Sender線程負責從RecordAccumulator中獲取消息并將其發(fā)送到Kafka中。
RecordAccumulator主要用來緩存消息以便Sender線程可以批量發(fā)送,進而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。RecordAccumulator緩存的大小可以通過生產(chǎn)者客戶端參數(shù)buffer.memory配置,默認值為33554432B,即32MB。如果生產(chǎn)者發(fā)送消息的速度超過發(fā)送到服務(wù)器的速度,則會導致生產(chǎn)者空間不足,這個時候KafkaProducer的send0方法調(diào)用要么被阻塞,要么拋出異常,這個取決于參數(shù)max.b1ock.ms的配置,此參數(shù)的默認值為60000,即60秒。
主線程中發(fā)送過來的消息都會被追加到RecordAccumulator的某個雙端隊列(Deque)中,在RecordAccumulator的內(nèi)部為每個分區(qū)都維護了一個雙端隊列,隊列中的內(nèi)容就是ProducerBatch,即Deque。消息寫入緩存時,追加到雙端隊列的尾部;Sender讀取消息時,從雙端隊列的頭部讀取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多個ProducerRecord。通俗地說,ProducerRecord是生產(chǎn)者中創(chuàng)建的消息,而ProducerBatch是指一個消息批次,ProducerRecord會被包含在ProducerBatch中,這樣可以使字節(jié)的使用更加緊漆。與此同時,將較小的ProducerRecord拼漆成一個較大的ProducerBatch,也可以減少網(wǎng)絡(luò)請求的次數(shù)以提升整體的吞吐量。ProducerBatch和消息的具體格式有關(guān)。如果生產(chǎn)者客戶端需要向很多分區(qū)發(fā)送消息,則可以將buffer.memory參數(shù)適當調(diào)大以增加整體的吞吐量。
消息在網(wǎng)絡(luò)上都是以字節(jié)(Byte)的形式傳輸?shù)?#xff0c;在發(fā)送之前需要創(chuàng)建一塊內(nèi)存區(qū)域來保存對應(yīng)的消息。在Kafka生產(chǎn)者客戶端中,通過java.io.ByteBuffer實現(xiàn)消息內(nèi)存的創(chuàng)建和釋放。不過頻繁的創(chuàng)建和釋放是比較耗費資源的,在RecordAccumulator的內(nèi)部還有一個BufferPool,它主要用來實現(xiàn)ByteBuffer的復用,以實現(xiàn)緩存的高效利用。不過BufferPool只針對特定大小的ByteBuffer進行管理,而其他大小的ByteBuffer不會緩存進BufferPool中,這個特定的大小由batch.size參數(shù)來指定,默認值為16384B,即16KB。我們可以適當?shù)卣{(diào)大batch.size參數(shù)以便多緩存一些消息。
ProducerBatch的大小和batch.size參數(shù)也有著密切的關(guān)系。當一條消息(ProducerRecord)流入RecordAccumulator時,會先尋找與消息分區(qū)所對應(yīng)的雙端隊列(如果沒有則新建),再從這個雙端隊列的尾部獲取一個ProducerBatch(如果沒有則新建),查看ProducerBatch中是否還可以寫入這個ProducerRecord,如果可以則寫入,如果不可以則需要創(chuàng)建一個新的ProducerBatch。在新建ProducerBatch時評估這條消息的大小是否超過batch.size參數(shù)的大小,如果不超過,那么就以batch.size參數(shù)的大小來創(chuàng)建ProducerBatch,這樣在使用完這段內(nèi)存區(qū)域之后,可以通過BufferPool的管理來進行復用;如果超過,那么就以評估的大小來創(chuàng)建ProducerBatch,這段內(nèi)存區(qū)域不會被復用。
Sender從RecordAccumulator中獲取緩存的消息之后,會進一步將原本<分區(qū),Deque>的保存形式轉(zhuǎn)變成<Node,ListProducerBatch>的形式,其中Node表示Kafka集群的broker節(jié)點。對于網(wǎng)絡(luò)連接來說,生產(chǎn)者客戶端是與具體的broker節(jié)點建立的連接,也就是向具體的broker節(jié)點發(fā)送消息,而并不關(guān)心消息屬于哪一個分區(qū);而對于KafkaProducer的應(yīng)用邏輯而言,我們只關(guān)注向哪個分區(qū)中發(fā)送哪些消息,所以在這單需要做一個應(yīng)用邏輯層面到網(wǎng)絡(luò)IO層面的轉(zhuǎn)換。
在轉(zhuǎn)換成<Node,ListProducerBatch>>的形式之后,Sender還會進一步封裝成<Node,Request>的形式,這樣就可以將Request請求發(fā)往各個Node了,這里的Request是指Kafka的各種協(xié)議請求,對于消息發(fā)送而言就是指具體的ProduceRequest。
請求在從Sender線程發(fā)往Kafka之前還會保存到InFlightRequests中,InFlightRequests保存對象的具體形式為Map<NodeId,Deque>,它的主要作用是緩存了已經(jīng)發(fā)出去但還沒有收到響應(yīng)的請求(NodeId是一個String類型,表示節(jié)點的id編號)。與此同時,InFlightRequests還提供了許多管理類的方法,并且通過配置參數(shù)還可以限制每個連接(也就是客戶端與Node之間的連接)最多緩存的請求數(shù)。這個配置參數(shù)為max.in.flight.requests.per.connection,默認值為5,即每個連接最多只能緩存5個未響應(yīng)的請求,超過該數(shù)值之后就不能再向這個連接發(fā)送更多的請求了,除非有緩存的請求收到了響應(yīng)(Response)。通過比較Deque的size與這個參數(shù)的大小來判斷對應(yīng)的Node中是否已經(jīng)堆積了很多未響應(yīng)的消息,如果真是如此,那么說明這個Node節(jié)點負載較大或網(wǎng)絡(luò)連接有問題,再繼續(xù)向其發(fā)送請求會增大請求超時的可能。
元數(shù)據(jù)更新
上面提及的InFlightRequests還可以獲得leastLoadedNode,即所有Node中負載最小的那一個。這里的負載最小是通過每個Node在InFlightRequests中還未確認的請求決定的,未確認的請求越多則認為負載越大。對于圖中的InFlightRequests來說,圖中展示了三個節(jié)點Node0、Node1和Node2,很明顯Node1的負載最小。也就是說,Node1為當前的leastLoadedNodec選擇leastLoadedNode發(fā)送請求可以使它能夠盡快發(fā)出,避免因網(wǎng)絡(luò)擁塞等異常而影響整體的進度。leastLoadedNode的概念可以用于多個應(yīng)用場合,比如元數(shù)據(jù)請求、消費者組播協(xié)議的交互。
我們只知道主題的名稱,對于其他一些必要的信息卻一無所知。KafkaProducer要將此消息追加到指定主題的某個分區(qū)所對應(yīng)的leader副本之前,首先需要知道主題的分區(qū)數(shù)量,然后經(jīng)過計算得出(或者直接指定)目標分區(qū),之后KafkaProducer需要知道目標分區(qū)的leader副本所在的broker節(jié)點的地址、端口等信息才能建立連接,最終才能將消息發(fā)送到Kafka,在這一過程中所需要的信息都屬于元數(shù)據(jù)信息。
在上面的講解中我們了解了bootstrap.servers參數(shù)只需要配置部分broker節(jié)點的地址即可,不需要配置所有broker節(jié)點的地址,因為客戶端可以自己發(fā)現(xiàn)其他broker節(jié)點的地址,這一過程也屬于元數(shù)據(jù)相關(guān)的更新操作。與此同時,分區(qū)數(shù)量及l(fā)eader副本的分布都會動態(tài)地變化,客戶端也需要動態(tài)地捕捉這些變化。
元數(shù)據(jù)是指Kafka集群的元數(shù)據(jù),這些元數(shù)據(jù)具體記錄了集群中有哪些主題,這些主題有哪些分區(qū),每個分區(qū)的leader副本分配在哪個節(jié)點上,follower副本分配在哪些節(jié)點上,哪些副本在AR、ISR等集合中,集群中有哪些節(jié)點,控制器節(jié)點又是哪一個等信息。
當客戶端中沒有需要使用的元數(shù)據(jù)信息時,比如沒有指定的主題信息,或者超過metadata.max.age.ms時間沒有更新元數(shù)據(jù)都會引起元數(shù)據(jù)的更新操作??蛻舳藚?shù)metadata.max.age.ms的默認值為300000,即5分鐘。元數(shù)據(jù)的更新操作是在客戶端內(nèi)部進行的,對客戶端的外部使用者不可見。當需要更新元數(shù)據(jù)時,會先挑選出leastLoadedNode,然后向這個Node發(fā)送MetadataRequest請求來獲取具體的元數(shù)據(jù)信息。這個更新操作是由Sender線程發(fā)起的,在創(chuàng)建完MetadataRequest之后同樣會存入InFlightRequests,之后的步驟就和發(fā)送消息時的類似。元數(shù)據(jù)雖然由Sender線程負責更新,但是主線程也需要讀取這些信息,這里的數(shù)據(jù)同步通過synchronized和final關(guān)鍵字來保障。