wordpress 仿虎嗅主題seo百度推廣
1 Kafka消費(fèi)方式
(1)pull(拉)模式:消費(fèi)者從broker中主動(dòng)拉取數(shù)據(jù)。(Kafka中使用)
不足:如果Kafka中沒(méi)有數(shù)據(jù),消費(fèi)者可能會(huì)陷入循環(huán),一直返回空數(shù)據(jù)。
(2)push(推)模式:Kafka中不適用此種方式,因?yàn)閎roker決定消息發(fā)送速率,很難適應(yīng)所有消費(fèi)者的消費(fèi)速度。
2 Kafka消費(fèi)者工作流程
2.1 消費(fèi)者總體工作流程
(1)生產(chǎn)者向分區(qū)中的每個(gè)Leader發(fā)送一批批的數(shù)據(jù)。
(2)Follower主動(dòng)與Leader同步數(shù)據(jù),保證數(shù)據(jù)的可靠性。
(3)消費(fèi)者可以消費(fèi)某一個(gè)分區(qū)的數(shù)據(jù),一個(gè)消費(fèi)者也可以消費(fèi)多個(gè)分區(qū)的數(shù)據(jù),消費(fèi)者與消費(fèi)者之間是完全獨(dú)立的。
(4)每一個(gè)分區(qū)的數(shù)據(jù)只能由消費(fèi)者組中的一個(gè)消費(fèi)者進(jìn)行消費(fèi)。(把消費(fèi)者組當(dāng)成一個(gè)獨(dú)立的消費(fèi)者,同一個(gè)分區(qū)不能由同一個(gè)消費(fèi)者組里面兩個(gè)及以上的消費(fèi)者消費(fèi))
(5)消費(fèi)到哪里的具體位置為offset,offset保存在系統(tǒng)主題_consumer_offsets中。(Kafka的底層數(shù)據(jù)是持久化到磁盤(pán)上)
2.2 消費(fèi)者組原理
??Consumer Group(CG):消費(fèi)者組,由多個(gè)consumer組成。形成一個(gè)消費(fèi)者組的條件,是所有消費(fèi)者的groupid相同。
(1)消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi)。
(2)消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。
(3)如果向消費(fèi)組中的消費(fèi)者數(shù)超過(guò)主題分區(qū)數(shù)量,則有一部分消費(fèi)者就會(huì)閑置,不會(huì)接收任何消息。
??coordinator:輔助實(shí)現(xiàn)消費(fèi)者組的初始化和分區(qū)的分配。
??coordinator節(jié)點(diǎn)選擇 = groupid(寫(xiě)代碼時(shí)手動(dòng)給的)的hashcode值 % 50(consumer_offsets的分區(qū)數(shù)量)
例如: groupid的hashcode值 = 1,1% 50 = 1,那么 consumer_offsets 主題的1號(hào)分區(qū),在哪個(gè)broker上,就選擇這個(gè)節(jié)點(diǎn)的coordinator作為這個(gè)消費(fèi)者組的老大。消費(fèi)者組下的所有的消費(fèi)者提交offset的時(shí)候就往這個(gè)分區(qū)去提交offset。
消費(fèi)者組初始化流程:
(0)生產(chǎn)者把數(shù)據(jù)發(fā)送到Kafka集群,選擇節(jié)點(diǎn)的coordinator。
(1)每個(gè)消費(fèi)者都往選出的coordinator發(fā)送請(qǐng)求,表示要加入到組當(dāng)中。
(2)coordinator會(huì)從消費(fèi)者中選出一個(gè)消費(fèi)者作為L(zhǎng)eader。
(3)coordinator會(huì)把收集到的所有topic信息都發(fā)送給消費(fèi)者的Leader。
(4)Leader制定消費(fèi)方案。
(5)制定計(jì)劃后,Leader將消費(fèi)方案發(fā)給coordinator。
(6)coordinator把消費(fèi)方案下發(fā)給各個(gè)消費(fèi)者。
(7)每個(gè)消費(fèi)者會(huì)定期給coordinator發(fā)送心跳反應(yīng)(默認(rèn)3s),**一旦超時(shí)(session.timeout.ms=45s)則該消費(fèi)者會(huì)被移除并觸發(fā)再平衡,別的消費(fèi)者繼續(xù)完成接下來(lái)的任務(wù);或消費(fèi)者處理消息的時(shí)間過(guò)長(zhǎng)(max.poil.interval.ms=5分鐘)**也會(huì)觸發(fā)再平衡。
消費(fèi)者組詳細(xì)消費(fèi)流程:
(1)消費(fèi)者組創(chuàng)建消費(fèi)者網(wǎng)絡(luò)連接客戶(hù)端,主要用于與Kafka集群進(jìn)行交會(huì)。
(2)消費(fèi)者調(diào)用sendFetches方法用于抓取數(shù)據(jù)的初始化。
(3)消費(fèi)者網(wǎng)絡(luò)連接客戶(hù)端調(diào)用send方法發(fā)送請(qǐng)求。
(4)Leader通過(guò)回調(diào)方法onSuccess把數(shù)據(jù)拉取到消息隊(duì)列里。
(5)消費(fèi)者一次拉取一批次數(shù)據(jù),經(jīng)過(guò)反序列化、攔截器再進(jìn)行數(shù)據(jù)處理。