仿新浪微博網(wǎng)站代碼推廣資源網(wǎng)
kafka的處理的一些問(wèn)題
- 消費(fèi)者客戶端不但沒(méi)有背壓而且內(nèi)存充足,但產(chǎn)生的消費(fèi)延遲越來(lái)越大
- 在Kafka的Leader副本宕機(jī)時(shí)
消費(fèi)者客戶端不但沒(méi)有背壓而且內(nèi)存充足,但產(chǎn)生的消費(fèi)延遲越來(lái)越大
比如我們這個(gè)kakfa集群一共有3個(gè)Broker節(jié)點(diǎn)
TOp1有5個(gè)分區(qū),P0、P1、P2、P3、P4,這些分區(qū)分布在3個(gè)不同Broker節(jié)點(diǎn)上,而我們創(chuàng)建了包含兩個(gè)消費(fèi)者的消費(fèi)者組。
消費(fèi)者1同時(shí)消費(fèi)P0、P1和P4分區(qū)的數(shù)據(jù)。
消費(fèi)者2消費(fèi)P2和P3分區(qū)的數(shù)據(jù)
看到消費(fèi)延遲,大家想去就是增加消費(fèi)者數(shù)量和分區(qū)數(shù)量,讓我消費(fèi)者數(shù)量增加到和Partition的數(shù)量一樣多,這樣每個(gè)消費(fèi)者就可以僅僅消費(fèi)一個(gè)分區(qū)的數(shù)據(jù),可以達(dá)到消費(fèi)能力1最大化 。
了解消費(fèi)者背后的執(zhí)行原理。該如何優(yōu)化消費(fèi)者消費(fèi)數(shù)據(jù)的吞吐量。
消費(fèi)者在調(diào)用poll()方法到遠(yuǎn)端的Broker節(jié)點(diǎn)拉去數(shù)據(jù)時(shí)。優(yōu)先從nextInLineFetch中獲取數(shù)據(jù),這個(gè)nextInLineFetch就是數(shù)據(jù)接收緩沖區(qū),
如果數(shù)據(jù)接收緩沖區(qū)中沒(méi)有待消費(fèi)的數(shù)據(jù),這個(gè)時(shí)候才會(huì)調(diào)用SendFetches方法,到Broker端拉去數(shù)據(jù),
kafka是向響應(yīng)的Broker節(jié)點(diǎn)發(fā)送拉取數(shù)據(jù)的網(wǎng)絡(luò)請(qǐng)求,我們都知道網(wǎng)路請(qǐng)求對(duì)于內(nèi)存請(qǐng)求是比較慢的,因此這些拉取數(shù)據(jù)的網(wǎng)絡(luò)請(qǐng)求是由Broker端異步執(zhí)行的,異步執(zhí)行拉取數(shù)據(jù)請(qǐng)求,就必須通過(guò)future監(jiān)聽(tīng)數(shù)據(jù)是否已經(jīng)準(zhǔn)備好,當(dāng)數(shù)據(jù)準(zhǔn)備好之后,會(huì)異步將數(shù)放到數(shù)據(jù)接收緩存completedFetches中,
這是因?yàn)镮O請(qǐng)求比較耗時(shí),所以盡量一次批量拉取更多的數(shù)據(jù)放到緩存中,這樣就可以降低發(fā)起網(wǎng)絡(luò)的IO次數(shù),進(jìn)而提升消費(fèi)能力,現(xiàn)在緩沖區(qū)completedFetches中已經(jīng)有數(shù)據(jù)了,就會(huì)把completedFetches中隊(duì)頭的數(shù)據(jù)解析到nextInLineFetch中
解析成消費(fèi)者可以消費(fèi)的數(shù)據(jù)格式,然后清除completedFetches中隊(duì)頭的元素。
隨后如果有消費(fèi)調(diào)用poll()方法拉取數(shù),就會(huì)優(yōu)先從nextInLineFetch中獲取數(shù)據(jù),注意,消費(fèi)者客戶端每次獲取的數(shù)據(jù)量是由參數(shù) max.poll.records控制的,默認(rèn)值是500。 相當(dāng)于每次從nextInLineFetch獲取500條數(shù)據(jù)并返回給消費(fèi)者。
當(dāng)消費(fèi)者消費(fèi)完500條數(shù)據(jù)之后,會(huì)再次調(diào)用poll()方法,
再拉取500條數(shù)據(jù) ,當(dāng)消費(fèi)者把nextlnLineFetch緩存的數(shù)據(jù)都消費(fèi)完之后,相當(dāng)于再調(diào)用poll()方式時(shí),nextInLineFetch已經(jīng)咩有待消費(fèi)的數(shù)據(jù)了,這個(gè)時(shí)候,就會(huì)把completedFetch的新的隊(duì)頭元素解析解析成nextInLineFetch。可以適當(dāng)?shù)膶⒃搮?shù)增加到16KB或者32KB
而參數(shù)fetch.max.bytes標(biāo)識(shí)每次poll操作,從Broker端最多拉取數(shù)據(jù)量,默認(rèn)值時(shí)50MB,如果我們內(nèi)存資源充足,建議增大fetch.max.bytes
增加到200MB以上.參數(shù)max.partition.fetch.bytes的默認(rèn)值是1MB。表示每次poll返回的,每個(gè)Broker節(jié)點(diǎn)上每個(gè)分區(qū)的最大字節(jié)數(shù)。因此我們?cè)倩仡^看這個(gè)例子。
那么每次從Broker-102上最多能拉取到的數(shù)據(jù)也就是1MB。數(shù)據(jù)量未免太小了,有的時(shí)候剛消費(fèi)完1MB,就得再次經(jīng)過(guò)一次網(wǎng)絡(luò)IO拉取下一批數(shù)據(jù),這可能是造成消費(fèi)延遲的主要原因。大家可以根據(jù)自己的Topic的實(shí)際分區(qū)數(shù),來(lái)合理設(shè)置每個(gè)分區(qū)每次拉取數(shù)據(jù)的大小,因此建議可以將每個(gè)分區(qū)每次拉取數(shù)據(jù)的大小設(shè)置成10MB以上。 max.partition.fetch.bytes增加到10MB以上
但有的時(shí)候只是提高每個(gè)分區(qū)每次最大拉取到的數(shù)量也是不夠的,因?yàn)槊總€(gè)Broker最多返回的最大字節(jié)數(shù)由參數(shù)fetch.max.bytes控制,這個(gè)參數(shù)的默認(rèn)值是50MB,有時(shí)候也可以適當(dāng)?shù)奶嵘@個(gè)參數(shù)的默認(rèn)值,比如增加到200MB。
這樣就能再本地盡量緩存更多的數(shù)據(jù),以提升消費(fèi)者消費(fèi)數(shù)據(jù)的能力,降低消費(fèi)延遲,主要適用于內(nèi)存充足,你消費(fèi)能力不足的場(chǎng)景,
消費(fèi)客戶端根本不能修改啦這個(gè)參數(shù),因?yàn)樵O(shè)置了靜態(tài)的