做測試題的網(wǎng)站關(guān)鍵詞優(yōu)化的最佳方法
1、 kafka 是什么,有什么作用
2、Kafka為什么這么快
3、Kafka架構(gòu)及名詞解釋
4、Kafka中的AR、ISR、OSR代表什么
5、HW、LEO代表什么
6、ISR收縮性
7、kafka follower如何與leader同步數(shù)據(jù)
8、Zookeeper 在 Kafka 中的作用(早期)
9、Kafka如何快速讀取指定offset的消息
10、生產(chǎn)者發(fā)送消息有哪些模式
11、發(fā)送消息的分區(qū)策略有哪些
12、Kafka可靠性保證(不丟消息)
13、Kafka 是怎么去實現(xiàn)負(fù)載均衡的
14、簡述Kafka的Rebalance機(jī)制
15、Kafka 負(fù)載均衡會導(dǎo)致什么問題
16、如何增強(qiáng)消費者的消費能力
17、消費者與Topic的分區(qū)策略
18、如何保證消息不被重復(fù)消費(消費者冪等性)
19、為什么Kafka不支持讀寫分離
20、Kafka選舉機(jī)制
21、腦裂問題
22、如何為Kafka集群選擇合適的Topics/Partitions數(shù)量
23、Kafka 分區(qū)數(shù)可以增加或減少嗎?為什么
24、談?wù)勀銓afka生產(chǎn)者冪等性的了解
25、談?wù)勀銓?Kafka事務(wù)的了解?
26、Kafka消息是采用Pull模式,還是Push模式?
27、Kafka缺點
28、Kafka什么時候會丟數(shù)據(jù)
29、Kafka分區(qū)數(shù)越多越好嗎?
30、Kafka如何保證消息的有序性
31、Kafka精確一次性(Exactly-once)如何保證
1、 kafka 是什么,有什么作用
Kafka是一個開源的高吞吐量的分布式消息中間件,對比于其他 1) 緩沖和削峰:上游數(shù)據(jù)時有突發(fā)流量,下游可能扛不住,或者下游沒有足夠多的機(jī)器來保證冗余,kafka在中間可以起到一個緩沖的作用,把消息暫存在kafka中,下游服務(wù)就可以按照自己的節(jié)奏進(jìn)行慢慢處理。
1) 解耦和擴(kuò)展性:項目開始的時候,并不能確定具體需求。消息隊列可以作為一個接口層,解耦重要的業(yè)務(wù)流程。只需要遵守約定,針對數(shù)據(jù)編程即可獲取擴(kuò)展能力。
1) 冗余:可以采用一對多的方式,一個生產(chǎn)者發(fā)布消息,可以被多個訂閱topic的服務(wù)消費到,供多個毫無關(guān)聯(lián)的業(yè)務(wù)使用。
1) 健壯性:消息隊列可以堆積請求,所以消費端業(yè)務(wù)即使短時間死掉,也不會影響主要業(yè)務(wù)的正常進(jìn)行。
1) 異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機(jī)制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
2、Kafka為什么這么快
- 利用 Partition 實現(xiàn)并行處理 不同 Partition 可位于不同機(jī)器,因此可以充分利用集群優(yōu)勢,實現(xiàn)機(jī)器間的并行處理。另一方面,由于 Partition 在物理上對應(yīng)一個文件夾,即使多個 Partition 位于同一個節(jié)點,也可通過配置讓同一節(jié)點上的不同 Partition 置于不同的磁盤上,從而實現(xiàn)磁盤間的并行處理,充分發(fā)揮多磁盤的優(yōu)勢。
- 利用了現(xiàn)代操作系統(tǒng)分頁存儲 Page Cache 來利用內(nèi)存提高 I/O 效率
- 順序?qū)?kafka的消息是不斷追加到文件中的,這個特性使kafka可以充分利用磁盤的順序讀寫性能 由于現(xiàn)代的操作系統(tǒng)提供了預(yù)讀和寫技術(shù),磁盤的順序?qū)懘蠖鄶?shù)情況下比隨機(jī)寫內(nèi)存還要快。順序讀寫不需要硬盤磁頭的尋道時間,只需很少的扇區(qū)旋轉(zhuǎn)時間,所以速度遠(yuǎn)快于隨機(jī)讀寫
- Zero-copy 零拷技術(shù)減少拷貝次數(shù)
- 數(shù)據(jù)批量處理。合并小的請求,然后以流的方式進(jìn)行交互,直頂網(wǎng)絡(luò)上限。在很多情況下,系統(tǒng)的瓶頸不是 CPU 或磁盤,而是網(wǎng)絡(luò)IO。因此,除了操作系統(tǒng)提供的低級批處理之外,Kafka 的客戶端和 broker 還會在通過網(wǎng)絡(luò)發(fā)送數(shù)據(jù)之前,在一個批處理中累積多條記錄 (包括讀和寫)。記錄的批處理分?jǐn)偭司W(wǎng)絡(luò)往返的開銷,使用了更大的數(shù)據(jù)包從而提高了帶寬利用率。
- Pull 拉模式 使用拉模式進(jìn)行消息的獲取消費,與消費端處理能力相符。
- 數(shù)據(jù)壓縮 Kafka還支持對消息集合進(jìn)行壓縮,Producer可以通過GZIP、Snappy、LZ4格式對消息集合進(jìn)行壓縮,數(shù)據(jù)壓縮一般都是和批處理配套使用來作為優(yōu)化手段的。壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量,減輕對網(wǎng)絡(luò)傳輸?shù)膲毫?Producer壓縮之后,在Consumer需進(jìn)行解壓,雖然增加了CPU的工作,但在對大數(shù)據(jù)處理上,瓶頸在網(wǎng)絡(luò)上而不是CPU,所以這個成本很值得
3、Kafka架構(gòu)及名詞解釋
簡易架構(gòu)圖如下:
詳細(xì)架構(gòu)圖如下
- Broker :一臺kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
- Producer:消息生產(chǎn)者,向kafka broker發(fā)送消息的客戶端。
- Consumer:消息消費者,向kafka broker取消息的客戶端。
- Topic:隊列,生產(chǎn)者和消費者通過此進(jìn)行對接。
- Consumer Group (CG):若干個Consumer組成的集合。這是kafka用來實現(xiàn)一個topic消息的廣播(發(fā)給所有的consumer)和單播(發(fā)給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會復(fù)制(不是真的復(fù)制,是概念上的)到所有的CG,但每個CG只會把消息發(fā)給該CG中的一個consumer。如果需要實現(xiàn)廣播,只要每個consumer有一個獨立的CG就可以了。要實現(xiàn)單播只要所有的consumer在同一個CG。用CG還可以將consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的topic。
- Partition:分區(qū),為了實現(xiàn)擴(kuò)展性,一個topic可以分布在多個broker上,一個topic可以分為多個partition,每個partition都是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證同一個partition中的消息順序,不保證一個topic的整體(多個partition之間)的順序。生產(chǎn)者和消費者使用時可以指定topic中的具體partition。
- 副本:在kafka中,每個主題可以有多個分區(qū),每個分區(qū)又可以有多個副本。這多個副本中,只有一個是leader,而其他的都是follower副本。僅有l(wèi)eader副本可以對外提供服務(wù)。多個follower副本通常存放在和leader副本不同的broker中。通過這樣的機(jī)制實現(xiàn)了高可用,當(dāng)某臺機(jī)器掛掉后,其他follower副本也能迅速”轉(zhuǎn)正“,開始對外提供服務(wù)。
- offset:消費偏移量,topic中的每個分區(qū)都是有序且順序不可變的記錄集,并且不斷地追加到結(jié)構(gòu)化的log文件。分區(qū)中的每一個記錄都會分配一個id號來表示順序,我們稱之為offset,offset用來唯一的標(biāo)識分區(qū)中每一條記錄。可以設(shè)置為“自動提交”與“手動提交”。
4、Kafka中的AR、ISR、OSR代表什么
- AR:Assigned Replicas 指當(dāng)前分區(qū)中的所有副本。
- ISR:In-Sync Replicas 副本同步隊列。ISR中包括Leader和Foller。如果Leader進(jìn)程掛掉,會在ISR隊列中選擇一個服務(wù)作為新的Leader。有replica.lag.max.message(延遲條數(shù))和replica.lag.time.max.ms(延遲時間)兩個參數(shù)決定一臺服務(wù)器是否可以加入ISR副本隊列,在0.10版本之后移除了replica.lag.max.message(延遲條數(shù))參數(shù),防治服務(wù)頻繁的進(jìn)出隊列。任意一個維度超過閾值都會把Follower踢出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也會先存放在OSR中。
- OSR:(Out-of-Sync Replicas)非同步副本隊列。與leader副本同步滯后過多的副本(不包括leader副本)組成OSR。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本會把它從OSR集合轉(zhuǎn)移至ISR集合。默認(rèn)情況下,當(dāng)leader副本發(fā)生故障時,只有在ISR集合中的副本才有資格被選舉為新的leader,而在OSR集合中的副本則沒有任何機(jī)會(不過這個原則也可以通過修改unclean.leader.election.enable參數(shù)配置來改變)。unclean.leader.election.enable 為true的話,意味著非ISR集合的broker 也可以參與選舉,這樣就有可能發(fā)生數(shù)據(jù)丟失和數(shù)據(jù)不一致的情況,Kafka的可靠性就會降低;而如果unclean.leader.election.enable參數(shù)設(shè)置為false,Kafka的可用性就會降低。
ISR的伸縮:1)Leader跟蹤維護(hù)ISR中follower滯后狀態(tài),落后太多或失效時,leade把他們從ISR剔除。2)OSR中follower“追上”Leader,在ISR中才有資格選舉leader。
5、HW、LEO代表什么
- LEO (Log End Offset),標(biāo)識當(dāng)前日志文件中下一條待寫入的消息的offset。上圖中offset為9的位置即為當(dāng)前日志文件的 LEO,LEO 的大小相當(dāng)于當(dāng)前日志分區(qū)中最后一條消息的offset值加1.分區(qū) ISR 集合中的每個副本都會維護(hù)自身的 LEO ,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW,對消費者而言只能消費 HW 之前的消息。
- HW:replica高水印值,副本中最新一條已提交消息的位移。leader 的HW值也就是實際已提交消息的范圍,每個replica都有HW值,但僅僅leader中的HW才能作為標(biāo)示信息。什么意思呢,就是說當(dāng)按照參數(shù)標(biāo)準(zhǔn)成功完成消息備份(成功同步給follower replica后)才會更新HW的值,代表消息理論上已經(jīng)不會丟失,可以認(rèn)為“已提交”。
6、ISR收縮性
啟動 Kafka時候自動開啟的兩個定時任務(wù),“isr-expiration"和”isr-change-propagation"。
- isr-expiration:isr-expiration任務(wù)會周期性的檢測每個分區(qū)是否需要縮減其ISR集合,相當(dāng)于一個紀(jì)檢委員,巡查尖子班時候發(fā)現(xiàn)有學(xué)生睡覺打牌看小說,就把它的座位移除尖子班,縮減ISR,寧缺毋濫。同樣道理,如果follower數(shù)據(jù)同步趕上leader,那么該follower就能進(jìn)入ISR尖子班,擴(kuò)充。上面關(guān)于ISR尖子班人員的所見,都會記錄到isrChangeSet中,想象成是一個名單列表,誰能進(jìn),誰要出,都記錄在案。
- isr-change-propagation:作用就是檢查isrChangeSet,按照名單上的信息移除和遷入,一般是2500ms檢查一次,但是為了防止頻繁收縮擴(kuò)充影響性能,不是每次都能做變動,必須滿足:1、上一次ISR集合發(fā)生變化距離現(xiàn)在已經(jīng)超過5秒,2、上一次寫入zookeeper的時候距離現(xiàn)在已經(jīng)超過60秒。這兩個條件都滿足,那么就開始換座位!這兩個條件可以由我們來配置。
- Kafka使用這種ISR收縮的方式有效的權(quán)衡了數(shù)據(jù)可靠性與性能之間的關(guān)系。
7、kafka follower如何與leader同步數(shù)據(jù)
Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。完全同步復(fù)制要求All Alive Follower都復(fù)制完,這條消息才會被認(rèn)為commit,這種復(fù)制方式極大的影響了吞吐率。而異步復(fù)制方式下,Follower異步的從Leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被Leader寫入log就被認(rèn)為已經(jīng)commit,這種情況下,如果leader掛掉,會丟失數(shù)據(jù),kafka使用ISR的方式很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。Follower可以批量的從Leader復(fù)制數(shù)據(jù),而且Leader充分利用磁盤順序讀以及send file(zero copy)機(jī)制,這樣極大的提高復(fù)制性能,內(nèi)部批量寫磁盤,大幅減少了Follower與Leader的消息量差。
8、Zookeeper 在 Kafka 中的作用(早期)
zookeeper 是一個分布式的協(xié)調(diào)組件,早期版本的kafka用zk做meta信息存儲,consumer的消費狀態(tài),group的管理以及 offset的值??紤]到zk本身的一些因素以及整個架構(gòu)較大概率存在單點問題,新版本中逐漸弱化了zookeeper的作用。新的consumer使用了kafka內(nèi)部的group coordination協(xié)議,也減少了對zookeeper的依賴,
但是broker依然依賴于ZK,zookeeper 在kafka中還用來選舉controller 和 檢測broker是否存活等等。
1. Broker注冊:Broker是分布式部署并且互相獨立,此時需要有一個注冊系統(tǒng)能夠?qū)⒄麄€集群中的Broker管理起來,此時就用到的Zookeeper。在Zookeeper上會有一個專門用來進(jìn)行Broker服務(wù)器列表記錄的節(jié)點:/brokes/ids
2.Topic注冊:在kafka中,同一個Topic的消息會被分成多個分區(qū)并將其分布在多個Broker上,這些分區(qū)信息以及與Broker的對應(yīng)關(guān)系也都是由Zookeeper維護(hù),由專門的節(jié)點記錄:/brokers/topics
3.消費者注冊:消費者服務(wù)器在初始化啟動時加入消費者分組的步驟如下:注冊到消費者分組。每個消費者服務(wù)器啟動時,都會到Zookeeper的指定節(jié)點下創(chuàng)建一個屬于自己的消費者節(jié)點,例如/consumer/[groupid]/ids/[consumerid],完成節(jié)點創(chuàng)建后,消費者就會將自己訂閱的Topic信息寫入該臨時節(jié)點。
- 對消費者分組中的消費者的變化注冊監(jiān)聽:每個消費者都需要關(guān)注所屬消費者分組中的其他消費者服務(wù)器的變化情況,即對/consumer/[group_id]/ids節(jié)點注冊子節(jié)點變化的Watcher監(jiān)聽,一旦發(fā)現(xiàn)消費者新增或減少,就觸發(fā)消費者的負(fù)載均衡。
- 對Broker服務(wù)器變化注冊監(jiān)聽:消費者需要對/broker/ids[0-N]中的節(jié)點進(jìn)行監(jiān)聽,如果發(fā)現(xiàn)Broker服務(wù)器列表發(fā)生變化,那么就根據(jù)具體情況來決定是否需要進(jìn)行消費者負(fù)載均衡。
- 進(jìn)行消費者負(fù)載均衡:為了讓同一個Topic下不同分區(qū)的消息盡量均衡地被多個消費者消費而進(jìn)行消費者與消息分區(qū)分配的過程,通常對于一個消費者分組,如果組內(nèi)的消費者服務(wù)器發(fā)生變更或Broker服務(wù)器發(fā)生變更,會進(jìn)行消費者負(fù)載均衡。
- Offset記錄 在消費者對指定消息分區(qū)進(jìn)行消費的過程中,需要定時地將分區(qū)消息的消費進(jìn)度Offset記錄到Zookeeper上,以便對該消費者進(jìn)行重啟或者其他消費者重新接管該消息分區(qū)的消息消費后,能夠從之前的進(jìn)度繼續(xù)進(jìn)行消息消費。Offset在Zookeeper中由一個專門節(jié)點進(jìn)行記錄,其節(jié)點路徑為:/consumers/[groupid]/offsets/[topic]/[brokerid-partition_id] 節(jié)點內(nèi)容就是Offset的值。
4.生產(chǎn)者負(fù)載均衡:由于同一個Topic消息會被分區(qū)并將其分布在多個Broker上,因此生產(chǎn)者需要將消息合理地發(fā)送到這些分布式的Broker上,那么如何實現(xiàn)生產(chǎn)者的負(fù)載均衡,Kafka支持傳統(tǒng)的四層負(fù)載均衡,也支持Zookeeper方式實現(xiàn)負(fù)載均衡。
- 四層負(fù)載均衡:根據(jù)生產(chǎn)者的IP地址和端口來為其圈定一個相關(guān)聯(lián)的Broker。通常,一個生產(chǎn)者只會對應(yīng)單個Broker,然后該生產(chǎn)者產(chǎn)生的消息都發(fā)送到該Broker。這種方式邏輯簡單,每個生產(chǎn)者不需要同其他系統(tǒng)建立額外的TCP鏈接,只需要和Broker維護(hù)單個TCP連接即可。但是無法做到真正的負(fù)載均衡,因為實際系統(tǒng)中的每個生產(chǎn)者產(chǎn)生的消息量及每個Broker的消息存儲量都是不一樣的,如果有些生產(chǎn)者產(chǎn)生的消息遠(yuǎn)多于其他生產(chǎn)者的話,那么會導(dǎo)致不同的Broker接收到的消息總數(shù)差異巨大,同時,生產(chǎn)者也無法實時感知到Broker的新增和刪除。
- 使用Zookeeper進(jìn)行負(fù)載均衡,由于每個Broker啟動時,都會完成Broker注冊過程,生產(chǎn)者會通過該節(jié)點的變化來動態(tài)地感知到Broker服務(wù)器列表的變更,這樣就可以實現(xiàn)動態(tài)的負(fù)載均衡機(jī)制。
5.消費者負(fù)載均衡:與生產(chǎn)者相似,Kafka中的消費者同樣需要進(jìn)行負(fù)載均衡來實現(xiàn)多個消費者合理地從對應(yīng)的Broker服務(wù)器上接收消息,每個消費者分組包含若干消費者,每條消息都只會發(fā)送給分組中的一個消費者,不同的消費者分組消費自己特定的Topic下面的消息,互不干擾。
6.分區(qū)與消費者的關(guān)系:消費組consumer group下有多個Consumer(消費者)。對于每個消費者組(consumer group),Kafka都會為其分配一個全局唯一的Group ID,Group內(nèi)部的所有消費者共享該ID。訂閱的topic下的每個分區(qū)只能分配給某個group下的一個consumer(當(dāng)然該分區(qū)還可以被分配給其他group) 同時,kafka為每個消費者分配一個Consumer ID,通常采用“Hostname:UUID”形式表示。在kafka中,規(guī)定了每個消息分區(qū)只能被同組的一個消費者進(jìn)行消費,因此,需要在zookeeper上記錄消息分區(qū)與Consumer之間的關(guān)系,每個消費者一旦確定了對一個消費分區(qū)的消費權(quán)利,需要將其Consumer ID寫入到平Zookeeper對應(yīng)消息分區(qū)的臨時節(jié)點上,例如:/consumers/[groupid]/owners/topic/[brokerid-partitionid] 其中,[brokerid-partition_id]就是一個消息分區(qū)的表示,節(jié)點內(nèi)容就是該消息分區(qū)上消費者的Consumer ID。
7.補(bǔ)充:早期版本的 kafka 用 zk 做 meta 信息存儲,consumer 的消費狀態(tài),group 的管理以及 offse t的值??紤]到zk本身的一些因素以及整個架構(gòu)較大概率存在單點問題,新版本中確實逐漸弱化了zookeeper的作用。新的consumer使用了kafka內(nèi)部的group coordination協(xié)議,也減少了對zookeeper的依賴
9、Kafka如何快速讀取指定offset的消息
Kafka本地日志存儲根據(jù)segement分段存儲,默認(rèn)1G,其中segement包括index稀疏索引文件和log數(shù)據(jù)文件。其中index文件索引通過offset與posttion來定位數(shù)據(jù)文件中指定message的消息。其中index和log的文件名都為當(dāng)前segement的起始o(jì)ffset。
讀取offset=170418的消息,首先通過offset根據(jù)二分法定位到index索引文件,然后根據(jù)索引文件中的[offset,position](position為物理偏移地址)去log中獲取指定offset的message數(shù)據(jù)。
10、生產(chǎn)者發(fā)送消息有哪些模式
異步發(fā)送
對于生產(chǎn)者的異步發(fā)送來說就是,我發(fā)送完當(dāng)前消息后,并不需要你將當(dāng)前消息的發(fā)送結(jié)果立馬告訴我,而是可以隨即進(jìn)行下一條消息的發(fā)送。但是我會允許添加一個回調(diào)函數(shù),接收你后續(xù)返回的發(fā)送結(jié)果。異步發(fā)送這塊我們直接調(diào)用kafkaProducer的send方法即可實現(xiàn)異步發(fā)送。
同步發(fā)送
如果生產(chǎn)者需要使用同步發(fā)送的方式,只需要拿到 send 方法返回的future對象后,調(diào)用其 get() 方法即可。此時如果消息還未發(fā)送到broker中,get方法會被阻塞,等到 broker 返回消息發(fā)送結(jié)果后會跳出當(dāng)前方法并將結(jié)果返回。
11、發(fā)送消息的分區(qū)策略有哪些
所謂分區(qū)寫入策略,即是生產(chǎn)者將數(shù)據(jù)寫入到kafka主題后,kafka如何將數(shù)據(jù)分配到不同分區(qū)中的策略。
常見的有三種策略,輪詢策略,隨機(jī)策略,和按鍵保存策略。其中輪詢策略是默認(rèn)的分區(qū)策略,而隨機(jī)策略則是較老版本的分區(qū)策略,不過由于其分配的均衡性不如輪詢策略,故而后來改成了輪詢策略為默認(rèn)策略。
輪詢策略
所謂輪詢策略,即按順序輪流將每條數(shù)據(jù)分配到每個分區(qū)中。
舉個例子,假設(shè)主題test有三個分區(qū),分別是分區(qū)A,分區(qū)B和分區(qū)C。那么主題對接收到的第一條消息寫入A分區(qū),第二條消息寫入B分區(qū),第三條消息寫入C分區(qū),第四條消息則又寫入A分區(qū),依此類推。
輪詢策略是默認(rèn)的策略,故而也是使用最頻繁的策略,它能最大限度保證所有消息都平均分配到每一個分區(qū)。除非有特殊的業(yè)務(wù)需求,否則使用這種方式即可。
隨機(jī)策略
隨機(jī)策略,也就是每次都隨機(jī)地將消息分配到每個分區(qū)。其實大概就是先得出分區(qū)的數(shù)量,然后每次獲取一個隨機(jī)數(shù),用該隨機(jī)數(shù)確定消息發(fā)送到哪個分區(qū)。
在比較早的版本,默認(rèn)的分區(qū)策略就是隨機(jī)策略,但其實使用隨機(jī)策略也是為了更好得將消息均衡寫入每個分區(qū)。但后來發(fā)現(xiàn)對這一需求而言,輪詢策略的表現(xiàn)更優(yōu),所以社區(qū)后來的默認(rèn)策略就是輪詢策略了。
hash(Key)
按鍵保存策略,就是當(dāng)生產(chǎn)者發(fā)送數(shù)據(jù)的時候,可以指定一個key,計算這個key的hashCode值,按照hashCode的值對不同消息進(jìn)行存儲。
至于要如何實現(xiàn),那也簡單,只要讓生產(chǎn)者發(fā)送的時候指定key就行。欸剛剛不是說默認(rèn)的是輪詢策略嗎?其實啊,kafka默認(rèn)是實現(xiàn)了兩個策略,沒指定key的時候就是輪詢策略,有的話那激素按鍵保存策略了。
上面有說到一個場景,那就是要順序發(fā)送消息到kafka。前面提到的方案是讓所有數(shù)據(jù)存儲到一個分區(qū)中,但其實更好的做法,就是使用這種按鍵保存策略。
讓需要順序存儲的數(shù)據(jù)都指定相同的鍵,而不需要順序存儲的數(shù)據(jù)指定不同的鍵,這樣一來,即實現(xiàn)了順序存儲的需求,又能夠享受到kafka多分區(qū)的優(yōu)勢,豈不美哉。
粘性分區(qū)
所以如果使用默認(rèn)的輪詢partition策略,可能會造成一個大的batch被輪詢成多個小的batch的情況。鑒于此,kafka2.4的時候推出一種新的分區(qū)策略,即StickyPartitioning Strategy,StickyPartitioning Strategy會隨機(jī)地選擇另一個分區(qū)并會盡可能地堅持使用該分區(qū)——即所謂的粘住這個分區(qū)。
鑒于小batch可能導(dǎo)致延時增加,之前對于無Key消息的分區(qū)策略效率很低。社區(qū)于2.4版本引入了黏性分區(qū)策略(StickyPartitioning Strategy)。該策略是一種全新的策略,能夠顯著地降低給消息指定分區(qū)過程中的延時。使用StickyPartitioner有助于改進(jìn)消息批處理,減少延遲,并減少broker的負(fù)載。
自定義分區(qū)器
實現(xiàn)partitioner接口
切記分區(qū)是實現(xiàn)負(fù)載均衡以及高吞吐量的關(guān)鍵,所以一定要在生產(chǎn)者這一端就要考慮好合適的分區(qū)策略,避免造成消息數(shù)據(jù)的“傾斜”,使得某些分區(qū)成為性能瓶頸,從而導(dǎo)致下游數(shù)據(jù)消費的性能下降的問題。
12、Kafka可靠性保證(不丟消息)
Kafka精確一次性(Exactly-once)保障之一
Kafka可靠性主要從三個方面來看,Broker、Producer、Consumer。1. Brokerbroker寫數(shù)據(jù)時首先寫到PageCache中,pageCache的數(shù)據(jù)通過linux的flusher程序異步批量存儲至磁盤中,此過程稱為刷盤。而pageCache位于內(nèi)存。這部分?jǐn)?shù)據(jù)會在斷電后丟失。刷盤觸發(fā)條件有三:
- 主動調(diào)用sync或fsync函數(shù)
- 可用內(nèi)存低于閥值
- dirty data時間達(dá)到閥值。dirty是pagecache的一個標(biāo)識位,當(dāng)有數(shù)據(jù)寫入到pageCache時,pagecache被標(biāo)注為dirty,數(shù)據(jù)刷盤以后,dirty標(biāo)志清除。
kafka沒有提供同步刷盤的方式,也就是說理論上要完全讓kafka保證單個broker不丟失消息是做不到的,只能通過調(diào)整刷盤機(jī)制的參數(shù)緩解該情況,比如:
減少刷盤間隔log.flush.interval.ms(在刷新到磁盤之前,任何topic中的消息保留在內(nèi)存中的最長時間) 減少刷盤數(shù)據(jù)量大小log.flush.interval.messages(在將消息刷新到磁盤之前,在日志分區(qū)上累積的消息數(shù)量)。
時間越短,數(shù)據(jù)量越小,性能越差,但是丟失的數(shù)據(jù)會變少,可靠性越好。這是一個選擇題。
同時,Kafka通過producer和broker協(xié)同處理消息丟失的情況,一旦producer發(fā)現(xiàn)broker消息丟失,即可自動進(jìn)行retry。retry次數(shù)可根據(jù)參數(shù)retries進(jìn)行配置,超過指定次數(shù)會,此條消息才會被判斷丟失。producer和broker之間,通過ack機(jī)制來判斷消息是否丟失。
- acks=0,producer不等待broker的響應(yīng),效率最高,但是消息很可能會丟。
- acks=1,leader broker收到消息后,不等待其他follower的響應(yīng),即返回ack。也可以理解為ack數(shù)為1。此時,如果follower還沒有收到leader同步的消息leader就掛了,那么消息會丟失。按照上圖中的例子,如果leader收到消息,成功寫入PageCache后,會返回ack,此時producer認(rèn)為消息發(fā)送成功。但此時,按照上圖,數(shù)據(jù)還沒有被同步到follower。如果此時leader斷電,數(shù)據(jù)會丟失。
- acks=-1,leader broker收到消息后,掛起,等待所有ISR列表中的follower返回結(jié)果后,再返回ack。-1等效與all。這種配置下,只有l(wèi)eader寫入數(shù)據(jù)到pagecache是不會返回ack的,還需要所有的ISR返回“成功”才會觸發(fā)ack。如果此時斷電,producer可以知道消息沒有被發(fā)送成功,將會重新發(fā)送。如果在follower收到數(shù)據(jù)以后,成功返回ack,leader斷電,數(shù)據(jù)將存在于原來的follower中。在重新選舉以后,新的leader會持有該部分?jǐn)?shù)據(jù)。數(shù)據(jù)從leader同步到follower,需要2步:
- 數(shù)據(jù)從pageCache被刷盤到disk。因為只有disk中的數(shù)據(jù)才能被同步到replica。
- 數(shù)據(jù)同步到replica,并且replica成功將數(shù)據(jù)寫入PageCache。在producer得到ack后,哪怕是所有機(jī)器都停電,數(shù)據(jù)也至少會存在于leader的磁盤內(nèi)。
- 上面第三點提到了ISR的列表的follower,需要配合另一個參數(shù)才能更好的保證ack的有效性。ISR是Broker維護(hù)的一個“可靠的follower列表”,in-sync Replica列表,broker的配置包含一個參數(shù):min.insync.replicas。該參數(shù)表示ISR中最少的副本數(shù)。如果不設(shè)置該值,ISR中的follower列表可能為空。此時相當(dāng)于acks=1。
Topic 分區(qū)副本
在 Kafka 0.8.0 之前,Kafka 是沒有副本的概念的,那時候人們只會用 Kafka 存儲一些不重要的數(shù)據(jù),因為沒有副本,數(shù)據(jù)很可能會丟失。但是隨著業(yè)務(wù)的發(fā)展,支持副本的功能越來越強(qiáng)烈,所以為了保證數(shù)據(jù)的可靠性,Kafka 從 0.8.0 版本開始引入了分區(qū)副本(詳情請參見 KAFKA-50)。也就是說每個分區(qū)可以人為的配置幾個副本(比如創(chuàng)建主題的時候指定 replication-factor,也可以在 Broker 級別進(jìn)行配置 default.replication.factor),一般會設(shè)置為3。
Kafka 可以保證單個分區(qū)里的事件是有序的,分區(qū)可以在線(可用),也可以離線(不可用)。在眾多的分區(qū)副本里面有一個副本是 Leader,其余的副本是 follower,所有的讀寫操作都是經(jīng)過 Leader 進(jìn)行的,同時 follower 會定期地去 leader 上的復(fù)制數(shù)據(jù)。當(dāng) Leader 掛了的時候,其中一個 follower 會重新成為新的 Leader。通過分區(qū)副本,引入了數(shù)據(jù)冗余,同時也提供了 Kafka 的數(shù)據(jù)可靠性。
Kafka 的分區(qū)多副本架構(gòu)是 Kafka 可靠性保證的核心,把消息寫入多個副本可以使 Kafka 在發(fā)生崩潰時仍能保證消息的持久性。
2. Producer
producer在發(fā)送數(shù)據(jù)時可以將多個請求進(jìn)行合并后異步發(fā)送,合并后的請求首先緩存在本地buffer中,正常情況下,producer客戶端的異步調(diào)用可以通過callback回調(diào)函數(shù)來處理消息發(fā)送失敗或者超時的情況,但是當(dāng)出現(xiàn)以下情況,將會出現(xiàn)數(shù)據(jù)丟失
- producer異常中斷,buffer中的數(shù)據(jù)將丟失。
- producer客戶端內(nèi)存不足,如果采取的策略是丟棄消息(另一種策略是block阻塞),消息也會丟失。
- 消息產(chǎn)生(異步)過快,導(dǎo)致掛起線程過多,內(nèi)存不足,導(dǎo)致程序崩潰,消息丟失。
針對以上情況,可以有以下解決思路。
- producer采用同步方式發(fā)送消息,或者生產(chǎn)數(shù)據(jù)時采用阻塞的線程池,并且線程數(shù)不宜過多。整體思路就是控制消息產(chǎn)生速度。
- 擴(kuò)大buffer的容量配置,配置項為:buffer.memory。這種方法可以緩解數(shù)據(jù)丟失的情況,但不能杜絕。
3.Consumer
Consumer消費消息有以下幾個步驟:
- 接收消息
- 處理消息
- 反饋處理結(jié)果
消費方式主要分為兩種
- 自動提交offset,Automatic Offset Committing (enable.auto.commit=true)
- 手動提交offset,Manual Offset Control(enable.auto.commit=false)
Consumer自動提交機(jī)制是根據(jù)一定的時間間隔,將收到的消息進(jìn)行commit,具體配置為:auto.commit.interval.ms。commit和消費的過程是異步的,也就是說可能存在消費過程未成功,commit消息就已經(jīng)提交,此時就會出現(xiàn)消息丟失。我們可將提交類型改為手動提交,在消費完成后再進(jìn)行提交,這樣可以保證消息“至少被消費一次”(at least once),但如果消費完成后在提交過程中出現(xiàn)故障,則會出現(xiàn)重復(fù)消費的情況,本章不討論,下章講解。
13、Kafka 是怎么去實現(xiàn)負(fù)載均衡的
生產(chǎn)者層面
分區(qū)器是生產(chǎn)者層面的負(fù)載均衡。Kafka 生產(chǎn)者生產(chǎn)消息時,根據(jù)分區(qū)器將消息投遞到指定的分區(qū)中,所以 Kafka 的負(fù)載均衡很大程度上依賴于分區(qū)器。Kafka 默認(rèn)的分區(qū)器是 Kafka 提供的 DefaultPartitioner。它的分區(qū)策略是根據(jù) Key 值進(jìn)行分區(qū)分配的:
如果 key 不為 null:對 Key 值進(jìn)行 Hash 計算,從所有分區(qū)中根據(jù) Key 的 Hash 值計算出一個分區(qū)號;擁有相同 Key 值的消息被寫入同一個分區(qū);如果 key 為 null:消息將以輪詢的方式,在所有可用分區(qū)中分別寫入消息。如果不想使用 Kafka 默認(rèn)的分區(qū)器,用戶可以實現(xiàn) Partitioner 接口,自行實現(xiàn)分區(qū)方法。
注:在筆者的理解中,分區(qū)器的負(fù)載均衡與順序性有著一定程度上的矛盾。
- 負(fù)載均衡的目的是將消息盡可能平均分配,對于 Kafka 而言,就是盡可能將消息平均分配給所有分區(qū);
- 如果使用 Kafka 保證順序性,則需要利用到 Kafka 的分區(qū)順序性的特性。
- 對于需要保證順序性的場景,通常會利用 Key 值實現(xiàn)分區(qū)順序性,那么所有 Key值相同的消息就會進(jìn)入同一個分區(qū)。這樣的情況下,對于大量擁有相同 Key值的消息,會涌入同一個分區(qū),導(dǎo)致一個分區(qū)消息過多,其他分區(qū)沒有消息的情況,即與負(fù)載均衡的思想相悖。
消費者層面
主要根據(jù)消費者的Rebalance機(jī)制實現(xiàn),內(nèi)容詳見下章
14、簡述Kafka的Rebalance機(jī)制
什么是 Rebalance
Rebalance 本質(zhì)上是一種協(xié)議,規(guī)定了一個 Consumer Group 下的所有 consumer 如何達(dá)成一致,來分配訂閱 Topic 的每個分區(qū)。 例如:某 Group 下有 20 個 consumer 實例,它訂閱了一個具有 100 個 partition 的 Topic。正常情況下,kafka 會為每個 Consumer 平均的分配 5 個分區(qū)。這個分配的過程就是 Rebalance。
觸發(fā) Rebalance 的時機(jī)
Rebalance 的觸發(fā)條件有3個。
- 組成員個數(shù)發(fā)生變化。例如有新的 consumer 實例加入該消費組或者離開組。
- 訂閱的 Topic 個數(shù)發(fā)生變化。
- 訂閱 Topic 的分區(qū)數(shù)發(fā)生變化。
Rebalance 發(fā)生時,Group 下所有 consumer 實例都會協(xié)調(diào)在一起共同參與,kafka 能夠保證盡量達(dá)到最公平的分配。但是 Rebalance 過程對 consumer group 會造成比較嚴(yán)重的影響。在 Rebalance 的過程中 consumer group 下的所有消費者實例都會停止工作,等待 Rebalance 過程完成。
Rebalance 過程
Rebalance 過程分為兩步:JoinGroup 請求和 SyncGroup 請求。JoinGroup :JoinGroup 請求的主要作用是將組成員訂閱信息發(fā)送給領(lǐng)導(dǎo)者消費者,待領(lǐng)導(dǎo)者制定好分配方案后,重平衡流程進(jìn)入到 SyncGroup 請求階段。SyncGroup:SyncGroup 請求的主要目的,就是讓協(xié)調(diào)者把領(lǐng)導(dǎo)者制定的分配方案下發(fā)給各個組內(nèi)成員。當(dāng)所有成員都成功接收到分配方案后,消費者組進(jìn)入到 Stable 狀態(tài),即開始正常的消費工作。
15、Kafka 負(fù)載均衡會導(dǎo)致什么問題
在消費者組Rebalance期間,一直等到rebalance結(jié)束前,消費者會出現(xiàn)無法讀取消息,造成整個消費者組一段時間內(nèi)不可用。
16、如何增強(qiáng)消費者的消費能力
1、如果是Kafka消費能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時提升消費組的消費者數(shù)量,消費者數(shù)==分區(qū)數(shù)。兩者缺一不可。
2、如果是下游的數(shù)據(jù)處理不及時:則提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過少(拉取數(shù)據(jù)/處理時間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會造成數(shù)據(jù)積壓。
3、優(yōu)化消費者的處理邏輯,提高處理效率
17、消費者與Topic的分區(qū)策略
Range
Range是對每個Topic而言的(即一個Topic一個Topic分),首先對同一個Topic里面的分區(qū)按照序號進(jìn)行排序,并對消費者按照字母順序進(jìn)行排序。然后用Partitions分區(qū)的個數(shù)除以消費者線程的總數(shù)來決定每個消費者線程消費幾個分區(qū)。如果除不盡,那么前面幾個消費者線程將會多消費一個分區(qū)。
RoundRobin
將消費組內(nèi)所有消費者以及消費者所訂閱的所有topic的partition按照字典序排序,然后通過輪詢方式逐個將分區(qū)以此分配給每個消費者。使用RoundRobin策略有兩個前提條件必須滿足:
- 同一個消費者組里面的所有消費者的num.streams(消費者消費線程數(shù))必須相等;
- 每個消費者訂閱的主題必須相同。
StickyAssignor
無論是RangeAssignor,還是RoundRobinAssignor,當(dāng)前的分區(qū)分配算法都沒有考慮上一次的分配結(jié)果。顯然,在執(zhí)行一次新的分配之前,如果能考慮到上一次分配的結(jié)果,盡量少的調(diào)整分區(qū)分配的變動,顯然是能節(jié)省很多開銷的。
Sticky是“粘性的”,可以理解為分配結(jié)果是帶“粘性的”——每一次分配變更相對上一次分配做最少的變動(上一次的結(jié)果是有粘性的),其目標(biāo)有兩點:
- 分區(qū)的分配盡量的均衡
- 每一次重分配的結(jié)果盡量與上一次分配結(jié)果保持一致
StickyAssignor的模式比其他兩種提供更加均衡的分配結(jié)果,在發(fā)生Consumer或者Partition變更的情況下,也能減少不必要的分區(qū)調(diào)整。
18、如何保證消息不被重復(fù)消費(消費者冪等性)
Kafka精確一次性(Exactly-once)保障之一
冪等性:就是用戶對于同一操作發(fā)起的一次請求或者多次請求的結(jié)果是一致的,不會因為多次點擊而產(chǎn)生了副作用。
出現(xiàn)原因:
- 原因1:Consumer在消費過程中,被強(qiáng)行kill掉消費者線程或異常中斷(消費系統(tǒng)宕機(jī)、重啟等),導(dǎo)致實際消費后的數(shù)據(jù),offset沒有提交。
- 原因2:設(shè)置offset為自動提交,關(guān)閉kafka時,如果在close之前,調(diào)用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會重復(fù)消費。
- 原因3:消費超時導(dǎo)致消費者與集群斷開連接,offset尚未提交,導(dǎo)致重平衡后重復(fù)消費。一般消費超時(session.time.out)有以下原因:并發(fā)過大,消費者突然宕機(jī),處理超時等。
解決思路:
- 提高消費能力,提高單條消息的處理速度,例如對消息處理中比 較耗時的步驟可通過異步的方式進(jìn)行處理、利用多線程處理等。
- 在縮短單條消息消費時常的同時,根據(jù)實際場景可將session.time.out(Consumer心跳超時時間)和max.poll.interval.ms(consumer兩次poll的最大時間間隔)值設(shè)置大一點,避免不必要的rebalance,此外可適當(dāng)減小max.poll.records的值( 表示每次消費的時候,獲取多少條消息),默認(rèn)值是500,可根據(jù)實際消息速率適當(dāng)調(diào)小。這種思路可解決因消費時間過長導(dǎo)致的重復(fù)消費問題, 對代碼改動較小,但無法絕對避免重復(fù)消費問題。
- 根據(jù)業(yè)務(wù)情況制定:引入單獨去重機(jī)制,例如生成消息時,在消息中加入唯一標(biāo)識符如主鍵id。寫入時根據(jù)逐漸主鍵判斷update還是insert。如果寫redis,則每次根據(jù)主鍵id進(jìn)行set即可,天然冪等性。或者使用redis作為緩沖,將id首先寫入redis進(jìn)行重復(fù)判斷,然后在進(jìn)行后續(xù)操作。
- 開啟生產(chǎn)者的精確一次性,也就是冪等性, 再引入producer事務(wù) ,即客戶端傳入一個全局唯一的Transaction ID,這樣即使本次會話掛掉也能根據(jù)這個id找到原來的事務(wù)狀態(tài)
19、為什么Kafka不支持讀寫分離
在 Kafka 中,生產(chǎn)者寫入消息、消費者讀取消息的操作都是與 leader 副本進(jìn)行交互的,從 而實現(xiàn)的是一種主寫主讀的生產(chǎn)消費模型。
Kafka 并不支持主寫從讀,因為主寫從讀有 2 個很明 顯的缺點:
- 數(shù)據(jù)一致性問題。數(shù)據(jù)從主節(jié)點轉(zhuǎn)到從節(jié)點必然會有一個延時的時間窗口,這個時間 窗口會導(dǎo)致主從節(jié)點之間的數(shù)據(jù)不一致。某一時刻,在主節(jié)點和從節(jié)點中 A 數(shù)據(jù)的值都為 X, 之后將主節(jié)點中 A 的值修改為 Y,那么在這個變更通知到從節(jié)點之前,應(yīng)用讀取從節(jié)點中的 A 數(shù)據(jù)的值并不為最新的 Y,由此便產(chǎn)生了數(shù)據(jù)不一致的問題。
- 延時問題。類似 Redis 這種組件,數(shù)據(jù)從寫入主節(jié)點到同步至從節(jié)點中的過程需要經(jīng) 歷網(wǎng)絡(luò)→主節(jié)點內(nèi)存→網(wǎng)絡(luò)→從節(jié)點內(nèi)存這幾個階段,整個過程會耗費一定的時間。而在 Kafka 中,主從同步會比 Redis 更加耗時,它需要經(jīng)歷網(wǎng)絡(luò)→主節(jié)點內(nèi)存→主節(jié)點磁盤→網(wǎng)絡(luò)→從節(jié) 點內(nèi)存→從節(jié)點磁盤這幾個階段。對延時敏感的應(yīng)用而言,主寫從讀的功能并不太適用。
20、Kafka選舉機(jī)制
Kafka選舉主要分為以下三種:
- 控制器(Broker)選舉機(jī)制
- 分區(qū)副本選舉機(jī)制
- 消費組選舉機(jī)制
控制器選舉
控制器是Kafka的核心組件,它的主要作用是在Zookeeper的幫助下管理和協(xié)調(diào)整個Kafka集群包括所有分區(qū)與副本的狀態(tài)。集群中任意一個Broker都能充當(dāng)控制器的角色,但在運行過程中,只能有一個Broker成為控制器。集群中第一個啟動的Broker會通過在Zookeeper中創(chuàng)建臨時節(jié)點/controller來讓自己成為控制器,其他Broker啟動時也會在zookeeper中創(chuàng)建臨時節(jié)點,但是發(fā)現(xiàn)節(jié)點已經(jīng)存在,所以它們會收到一個異常,意識到控制器已經(jīng)存在,那么就會在Zookeeper中創(chuàng)建watch對象,便于它們收到控制器變更的通知。如果控制器與Zookeeper斷開連接或異常退出,其他broker通過watch收到控制器變更的通知,就會嘗試創(chuàng)建臨時節(jié)點/controller,如果有一個Broker創(chuàng)建成功,那么其他broker就會收到創(chuàng)建異常通知,代表控制器已經(jīng)選舉成功,其他Broker只需創(chuàng)建watch對象即可。
控制器作用
- 主題管理:創(chuàng)建、刪除Topic,以及增加Topic分區(qū)等操作都是由控制器執(zhí)行。
- 分區(qū)重分配:執(zhí)行Kafka的reassign腳本對Topic分區(qū)重分配的操作,也是由控制器實現(xiàn)。如果集群中有一個Broker異常退出,控制器會檢查這個broker是否有分區(qū)的副本leader,如果有那么這個分區(qū)就需要一個新的leader,此時控制器就會去遍歷其他副本,決定哪一個成為新的leader,同時更新分區(qū)的ISR集合。如果有一個Broker加入集群中,那么控制器就會通過Broker ID去判斷新加入的Broker中是否含有現(xiàn)有分區(qū)的副本,如果有,就會從分區(qū)副本中去同步數(shù)據(jù)。
- Preferred leader選舉:因為在Kafka集群長時間運行中,broker的宕機(jī)或崩潰是不可避免的,leader就會發(fā)生轉(zhuǎn)移,即使broker重新回來,也不會是leader了。在眾多l(xiāng)eader的轉(zhuǎn)移過程中,就會產(chǎn)生leader不均衡現(xiàn)象,可能一小部分broker上有大量的leader,影響了整個集群的性能,所以就需要把leader調(diào)整回最初的broker上,這就需要Preferred leader選舉。
- 集群成員管理:控制器能夠監(jiān)控新broker的增加,broker的主動關(guān)閉與被動宕機(jī),進(jìn)而做其他工作。這也是利用Zookeeper的ZNode模型和Watcher機(jī)制,控制器會監(jiān)聽Zookeeper中/brokers/ids下臨時節(jié)點的變化。同時對broker中的leader節(jié)點進(jìn)行調(diào)整。
- 元數(shù)據(jù)服務(wù):控制器上保存了最全的集群元數(shù)據(jù)信息,其他所有broker會定期接收控制器發(fā)來的元數(shù)據(jù)更新請求,從而更新其內(nèi)存中的緩存數(shù)據(jù)。
分區(qū)副本選舉機(jī)制
發(fā)生副本選舉的情況:
- 創(chuàng)建主題
- 增加分區(qū)
- 分區(qū)下線(分區(qū)中原先的leader副本下線,此時分區(qū)需要選舉一個新的leader上線來對外提供服務(wù))
- 分區(qū)重分配
分區(qū)leader副本的選舉由Kafka控制器負(fù)責(zé)具體實施。主要過程如下:
- 從Zookeeper中讀取當(dāng)前分區(qū)的所有ISR(in-sync replicas)集合。
- 調(diào)用配置的分區(qū)選擇算法選擇分區(qū)的leader。
分區(qū)副本分為ISR(同步副本)和OSR(非同步副本),當(dāng)leader發(fā)生故障時,只有“同步副本”才可以被選舉為leader。選舉時按照集合中副本的順序查找第一個存活的副本,并且這個副本在ISR集合中。同時kafka支持OSR(非同步副本)也參加選舉,Kafka broker端提供了一個參數(shù)unclean.leader.election.enable,用于控制是否允許非同步副本參與leader選舉;如果開啟,則當(dāng) ISR為空時就會從這些副本中選舉新的leader,這個過程稱為 Unclean leader選舉。可以根據(jù)實際的業(yè)務(wù)場景選擇是否開啟Unclean leader選舉。開啟 Unclean 領(lǐng)導(dǎo)者選舉可能會造成數(shù)據(jù)丟失,但好處是,它使得分區(qū) Leader 副本一直存在,不至于停止對外提供服務(wù),因此提升了高可用性。一般建議是關(guān)閉Unclean leader選舉,因為通常數(shù)據(jù)的一致性要比可用性重要。
消費組(Consumer Group)選主
在Kafka的消費端,會有一個消費者協(xié)調(diào)器以及消費組,組協(xié)調(diào)器(Group Coordinator)需要為消費組內(nèi)的消費者選舉出一個消費組的leader。如果消費組內(nèi)還沒有l(wèi)eader,那么第一個加入消費組的消費者即為消費組的leader,如果某一個時刻leader消費者由于某些原因退出了消費組,那么就會重新選舉leader,選舉源碼如下:
private val members = new mutable.HashMap[String, MemberMetadata]
leaderId = members.keys.headOption
在組協(xié)調(diào)器中消費者的信息是以HashMap的形式存儲的,其中key為消費者的member_id,而value是消費者相關(guān)的元數(shù)據(jù)信息。而leader的取值為HashMap中的第一個鍵值對的key(這種選舉方式等同于隨機(jī))。
消費組的Leader和Coordinator沒有關(guān)聯(lián)。消費組的leader負(fù)責(zé)Rebalance過程中消費分配方案的制定。
21、腦裂問題
controller掛掉后,Kafka集群會重新選舉一個新的controller。這里面存在一個問題,很難確定之前的controller節(jié)點是掛掉還是只是短暫性的故障。如果之前掛掉的controller又正常了,他并不知道自己已經(jīng)被取代了,那么此時集群中會出現(xiàn)兩臺controller。
其實這種情況是很容易發(fā)生。比如,某個controller由于GC而被認(rèn)為已經(jīng)掛掉,并選擇了一個新的controller。在GC的情況下,在最初的controller眼中,并沒有改變?nèi)魏螙|西,該Broker甚至不知道它已經(jīng)暫停了。因此,它將繼續(xù)充當(dāng)當(dāng)前controller,這是分布式系統(tǒng)中的常見情況,稱為腦裂。
假如,處于活躍狀態(tài)的controller進(jìn)入了長時間的GC暫停。它的ZooKeeper會話過期了,之前注冊的/controller節(jié)點被刪除。集群中其他Broker會收到zookeeper的這一通知。
由于集群中必須存在一個controller Broker,所以現(xiàn)在每個Broker都試圖嘗試成為新的controller。假設(shè)Broker 2速度比較快,成為了最新的controller Broker。此時,每個Broker會收到Broker2成為新的controller的通知,由于Broker3正在進(jìn)行"stop the world"的GC,可能不會收到Broker2成為最新的controller的通知。
等到Broker3的GC完成之后,仍會認(rèn)為自己是集群的controller,在Broker3的眼中好像什么都沒有發(fā)生一樣。
現(xiàn)在,集群中出現(xiàn)了兩個controller,它們可能一起發(fā)出具有沖突的命令,就會出現(xiàn)腦裂的現(xiàn)象。如果對這種情況不加以處理,可能會導(dǎo)致嚴(yán)重的不一致。所以需要一種方法來區(qū)分誰是集群當(dāng)前最新的Controller。
Kafka是通過使用epoch number(紀(jì)元編號,也稱為隔離令牌)來完成的。epoch number只是單調(diào)遞增的數(shù)字,第一次選出Controller時,epoch number值為1,如果再次選出新的Controller,則epoch number將為2,依次單調(diào)遞增。
每個新選出的controller通過Zookeeper 的條件遞增操作獲得一個全新的、數(shù)值更大的epoch number 。其他Broker 在知道當(dāng)前epoch number 后,如果收到由controller發(fā)出的包含較舊(較小)epoch number的消息,就會忽略它們,即Broker根據(jù)最大的epoch number來區(qū)分當(dāng)前最新的controller。
上圖,Broker3向Broker1發(fā)出命令:讓Broker1上的某個分區(qū)副本成為leader,該消息的epoch number值為1。于此同時,Broker2也向Broker1發(fā)送了相同的命令,不同的是,該消息的epoch number值為2,此時Broker1只聽從Broker2的命令(由于其epoch number較大),會忽略Broker3的命令,從而避免腦裂的發(fā)生。
22、如何為Kafka集群選擇合適的
Topics/Partitions數(shù)量
1、根據(jù)當(dāng)前topic的消費者數(shù)量確認(rèn)
在kafka中,單個patition是kafka并行操作的最小單元。在producer和broker端,向每一個分區(qū)寫入數(shù)據(jù)是可以完全并行化的,此時,可以通過加大硬件資源的利用率來提升系統(tǒng)的吞吐量,例如對數(shù)據(jù)進(jìn)行壓縮。在consumer段,kafka只允許單個partition的數(shù)據(jù)被一個consumer線程消費。因此,在consumer端,每一個Consumer Group內(nèi)部的consumer并行度完全依賴于被消費的分區(qū)數(shù)量。綜上所述,通常情況下,在一個Kafka集群中,partition的數(shù)量越多,意味著可以到達(dá)的吞吐量越大。
2、根據(jù)consumer端的最大吞吐量確定
我們可以粗略地通過吞吐量來計算kafka集群的分區(qū)數(shù)量。假設(shè)對于單個partition,producer端的可達(dá)吞吐量為p,Consumer端的可達(dá)吞吐量為c,期望的目標(biāo)吞吐量為t,那么集群所需要的partition數(shù)量至少為max(t/p,t/c)。在producer端,單個分區(qū)的吞吐量大小會受到批量大小、數(shù)據(jù)壓縮方法、 確認(rèn)類型(同步/異步)、復(fù)制因子等配置參數(shù)的影響。經(jīng)過測試,在producer端,單個partition的吞吐量通常是在10MB/s左右。在consumer端,單個partition的吞吐量依賴于consumer端每個消息的應(yīng)用邏輯處理速度。因此,我們需要對consumer端的吞吐量進(jìn)行測量。
23、Kafka 分區(qū)數(shù)可以增加或減少嗎,為什么
kafka支持分區(qū)數(shù)增加
例如我們可以使用 bin/kafka-topics.sh -alter --topic --topic topic-name --partitions 3 命令將原本分區(qū)數(shù)為1得topic-name設(shè)置為3。當(dāng)主題中的消息包含有key時(即key不為null),根據(jù)key來計算分區(qū)的行為就會有所影響。當(dāng)topic-config的分區(qū)數(shù)為1時,不管消息的key為何值,消息都會發(fā)往這一個分區(qū)中;當(dāng)分區(qū)數(shù)增加到3時,那么就會根據(jù)消息的key來計算分區(qū)號,原本發(fā)往分區(qū)0的消息現(xiàn)在有可能會發(fā)往分區(qū)1或者分區(qū)2中。如此還會影響既定消息的順序,所以在增加分區(qū)數(shù)時一定要三思而后行。對于基于key計算的主題而言,建議在一開始就設(shè)置好分區(qū)數(shù)量,避免以后對其進(jìn)行調(diào)整。
Kafka 不支持減少分區(qū)數(shù)。
按照Kafka現(xiàn)有的代碼邏輯而言,此功能完全可以實現(xiàn),不過也會使得代碼的復(fù)雜度急劇增大。實現(xiàn)此功能需要考慮的因素很多,比如刪除掉的分區(qū)中的消息該作何處理?如果隨著分區(qū)一起消失則消息的可靠性得不到保障;如果需要保留則又需要考慮如何保留。直接存儲到現(xiàn)有分區(qū)的尾部,消息的時間戳就不會遞增,如此對于Spark、Flink這類需要消息時間戳(事件時間)的組件將會受到影響;如果分散插入到現(xiàn)有的分區(qū)中,那么在消息量很大的時候,內(nèi)部的數(shù)據(jù)復(fù)制會占用很大的資源,而且在復(fù)制期間,此主題的可用性又如何得到保障?與此同時,順序性問題、事務(wù)性問題、以及分區(qū)和副本的狀態(tài)機(jī)切換問題都是不得不面對的。反觀這個功能的收益點卻是很低,如果真的需要實現(xiàn)此類的功能,完全可以重新創(chuàng)建一個分區(qū)數(shù)較小的主題,然后將現(xiàn)有主題中的消息按照既定的邏輯復(fù)制過去即可。
24、談?wù)勀銓afka生產(chǎn)者冪等性的了解
Kafka精確一次性(Exactly-once)保障之一
生產(chǎn)者冪等性主要避免生產(chǎn)者數(shù)據(jù)重復(fù)提交至Kafka broker中并落盤。在正常情況下,Producer向Broker發(fā)送消息,Broker將消息追加寫到對應(yīng)的流(即某一Topic的某一Partition)中并落盤,并向Producer返回ACK信號,表示確認(rèn)收到。但是Producer和Broker之間的通信總有可能出現(xiàn)異常,如果消息已經(jīng)寫入,但ACK在半途丟失了,Producer就會進(jìn)行retry操作再次發(fā)送該消息,造成重復(fù)寫入。
為了實現(xiàn)Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。
- PID。每個新的Producer在初始化的時候會被分配一個唯一的PID,這個PID對用戶是不可見的。
- Sequence Numbler。對于每個PID,該P(yáng)roducer發(fā)送數(shù)據(jù)的每個都對應(yīng)一個從0開始單調(diào)遞增的Sequence Number
- Broker端在緩存中保存了這seq number,對于接收的每條消息,如果其序號比Broker緩存中序號大于1則接受它,否則將其丟棄,這樣就可以實現(xiàn)了消息重復(fù)提交了.但是只能保證單個Producer對于同一個的Exactly Once語義
Producer使用冪等性的示例非常簡單,與正常情況下Producer使用相比變化不大,只需要 把Producer的配置enable.idempotence設(shè)置為true即可,如下所示:
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
//當(dāng)enable.idempotence為true時acks默認(rèn)為 all
// props.put("acks", "all");
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");
Prodcuer 冪等性對外保留的接口非常簡單,其底層的實現(xiàn)對上層應(yīng)用做了很好的封裝,應(yīng)用層并不需要去關(guān)心具體的實現(xiàn)細(xì)節(jié),對用戶非常友好
Kafka的冪等性實現(xiàn)了對于單個Producer會話、單個TopicPartition級別的不重不漏,也就是最細(xì)粒度的保證。如果Producer重啟(PID發(fā)生變化),或者寫入是跨Topic、跨Partition的,單純的冪等性就會失效,需要更高級別的事務(wù)性來解決了。當(dāng)然事務(wù)性的原理更加復(fù)雜
25、談?wù)勀銓?Kafka事務(wù)的了解
冪等性可以保證單個Producer會話、單個TopicPartition、單個會話session的不重不漏,如果Producer重啟,或者是寫入跨Topic、跨Partition的消息,冪等性無法保證。此時需要用到Kafka事務(wù)。Kafka 的事務(wù)處理,主要是允許應(yīng)用可以把消費和生產(chǎn)的 batch 處理(涉及多個 Partition)在一個原子單元內(nèi)完成,操作要么全部完成、要么全部失敗。為了實現(xiàn)這種機(jī)制,我們需要應(yīng)用能提供一個唯一 id,即使故障恢復(fù)后也不會改變,這個 id 就是 TransactionnalId(也叫 txn.id),txn.id 可以跟內(nèi)部的 PID 1:1 分配,它們不同的是 txn.id 是用戶提供的,而 PID 是 Producer 內(nèi)部自動生成的(并且故障恢復(fù)后這個 PID 會變化),有了 txn.id 這個機(jī)制,就可以實現(xiàn)多 partition、跨會話的 EOS 語義。當(dāng)用戶使用 Kafka 的事務(wù)性時,Kafka 可以做到的保證:
- 跨會話的冪等性寫入:即使中間故障,恢復(fù)后依然可以保持冪等性;
- 跨會話的事務(wù)恢復(fù):如果一個應(yīng)用實例掛了,啟動的下一個實例依然可以保證上一個事務(wù)完成(commit 或者 abort);
- 跨多個 Topic-Partition 的冪等性寫入,Kafka 可以保證跨多個 Topic-Partition 的數(shù)據(jù)要么全部寫入成功,要么全部失敗,不會出現(xiàn)中間狀態(tài)。
事務(wù)性示例
Kafka 事務(wù)性的使用方法也非常簡單,用戶只需要在 Producer 的配置中配置 transactional.id,通過 initTransactions() 初始化事務(wù)狀態(tài)信息,再通過 beginTransaction() 標(biāo)識一個事務(wù)的開始,然后通過 commitTransaction() 或 abortTransaction() 對事務(wù)進(jìn)行 commit 或 abort,示例如下所示:生產(chǎn)者:
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
try {String msg = "matt test";producer.beginTransaction();producer.send(new ProducerRecord(topic, "0", msg.toString()));producer.send(new ProducerRecord(topic, "1", msg.toString()));producer.send(new ProducerRecord(topic, "2", msg.toString()));producer.commitTransaction();
} catch (ProducerFencedException e1) {e1.printStackTrace();producer.close();
} catch (KafkaException e2) {e2.printStackTrace();producer.abortTransaction();
}
producer.close();
消費者:消費者應(yīng)該設(shè)置提交事務(wù)的隔離級別
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
Kafka中只有兩種事務(wù)隔離級別:readcommitted、readuncommitted 設(shè)置為readcommitted時候是生產(chǎn)者事務(wù)已提交的數(shù)據(jù)才能讀取到。在執(zhí)行 commitTransaction() 或 abortTransaction() 方法前,設(shè)置為“readcommitted”的消費端應(yīng)用是消費不到這些消息的,不過在 KafkaConsumer 內(nèi)部會緩存這些消息,直到生產(chǎn)者執(zhí)行 commitTransaction() 方法之后它才能將這些消息推送給消費端應(yīng)用。同時KafkaConsumer會根據(jù)分區(qū)對數(shù)據(jù)進(jìn)行整合,推送時按照分區(qū)順序進(jìn)行推送。而不是按照數(shù)據(jù)發(fā)送順序。反之,如果生產(chǎn)者執(zhí)行了 abortTransaction() 方法,那么 KafkaConsumer 會將這些緩存的消息丟棄而不推送給消費端應(yīng)用。設(shè)置為read_uncommitted時候可以讀取到未提交的數(shù)據(jù)(報錯終止前的數(shù)據(jù))
26、Kafka消息是采用Pull模式,還是Push模式
push模式下,消費者速率主要由生產(chǎn)者決定,當(dāng)消息生產(chǎn)速率遠(yuǎn)大于消費速率,消費者容易崩潰,如果為了避免consumer崩潰而采用較低的推送速率,將可能導(dǎo)致一次只推送較少的消息而造成浪費。Pull模式可以根據(jù)自己的消費能力拉取數(shù)據(jù)。Push模式必須在不知道下游consumer消費能力和消費策略的情況下決定是立即推送每條消息還是緩存之后批量推送。Pull有個缺點是,如果broker沒有可供消費的消息,將導(dǎo)致consumer不斷輪詢。但是可以在消費者設(shè)置輪詢間隔。
27、Kafka缺點
- 由于是批量發(fā)送,數(shù)據(jù)并非真正的實時;
- 對于mqtt協(xié)議不支持;
- 不支持物聯(lián)網(wǎng)傳感數(shù)據(jù)直接接入;
- 僅支持統(tǒng)一分區(qū)內(nèi)消息有序,無法實現(xiàn)全局消息有序;
- 監(jiān)控不完善,需要安裝插件;
- 依賴zookeeper進(jìn)行元數(shù)據(jù)管理;3.0版本去除
28、Kafka什么時候會丟數(shù)據(jù)
broker端消費丟失
broker端的消息不丟失,其實就是用partition副本機(jī)制來保證。
- unclean.leader.election為true,且選舉出的首領(lǐng)分區(qū)為OSR時 可能就會發(fā)生消息丟失
- min.insync.replicas為N,則至少要存在N個同步副本才能向分區(qū)寫入數(shù)據(jù)。如果同步副本數(shù)量小于N時broker就會停止接收所有生產(chǎn)者的消息、生產(chǎn)者會出現(xiàn)異常,如果無法正確處理異常,則消息丟失。此時消費者仍然可以讀取已有數(shù)據(jù)、變成只讀狀態(tài)。如果Topic只有一個同步副本,那么在這個副本變?yōu)椴豢捎脮r,數(shù)據(jù)就可能會丟失。
- kafka的數(shù)據(jù)一開始是存儲在PageCache并定期flush到磁盤上的,如果出現(xiàn)斷電或者機(jī)器故障等,PageCache上的數(shù)據(jù)就丟失了。
生產(chǎn)者端
- ack有3種狀態(tài)保證消息被安全生產(chǎn) ack=0,消息傳輸?shù)紹roker端沒收到Broker的反饋即發(fā)送下一條,網(wǎng)絡(luò)故障導(dǎo)致小東西丟失。ack=1,如果剛好leader partition掛了,數(shù)據(jù)就會丟失。ack=all,min.insync.replicas如果小于N或者Topic只有一個同步副本。
- 消息重試機(jī)制未開啟。
- 當(dāng)前消息過大,超過max.request.size大小,默認(rèn)為1MB
- 生產(chǎn)者速率超過消費者,緩存池空間占滿后,生產(chǎn)線程阻塞超過最大時間,此時生產(chǎn)者會拋出異常,如果沒有處理好則會丟失數(shù)據(jù)。
消費者端
enable.auto.commit=true,消費在處理之前提交了offset,則處理異??赡軙斐上⒌膩G失。enable.auto.commit=false,Consumer手動批量提交位點,在批量位點中某個位點數(shù)據(jù)異常時,沒有正確處理異常,而是將批量位點的最后一個位點提交,導(dǎo)致異常數(shù)據(jù)丟失
29、Kafka分區(qū)數(shù)越多越好嗎
并非分區(qū)數(shù)量越多,效率越高
- Topic 每個 partition 在 Kafka 路徑下都有一個自己的目錄,該目錄下有兩個主要的文件:base_offset.log 和 base_offset.index。Kafka 服務(wù)端的 ReplicaManager 會為每個 Broker 節(jié)點保存每個分區(qū)的這兩個文件的文件句柄。所以如果分區(qū)過多,ReplicaManager 需要保持打開狀態(tài)的文件句柄數(shù)也就會很多。
- 每個 Producer, Consumer 進(jìn)程都會為分區(qū)緩存消息,如果分區(qū)過多,緩存的消息越多,占用的內(nèi)存就越大;
- n 個分區(qū)有 1 個 Leader,(n-1) 個 Follower,如果運行過程中 Leader 掛了,則會從剩余 (n-1) 個 Followers 中選舉新 Leader;如果有成千上萬個分區(qū),那么需要很長時間的選舉,消耗較大的性能。
30、Kafka如何保證消息的有序性
單分區(qū)
Kafka在特定條件下可以保障單分區(qū)消息的有序性
kafka在發(fā)送消息過程中,正常情況下是有序的,如果消息出現(xiàn)重試,則會造成消息亂序。導(dǎo)致亂序的原因是:max.in.flight.requests.per.connection默認(rèn)值為5。
該參數(shù)指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前,請求隊列中可以提交多少個請求,用于提高網(wǎng)絡(luò)吞吐量。
圖中,batch1-5在請求隊列中,batch1作為最新數(shù)據(jù)進(jìn)行提交,提交失敗后如果開啟重試機(jī)制,則batch1會重新添加到本地緩沖池的頭部,然后提交至請求隊列中重新發(fā)送。此時batch1的順序會排在batch5之后,發(fā)生了亂序。
解決方式是將max.in.flight.requests.per.connection設(shè)置為1,消息隊列中只允許有一個請求,這樣消息失敗后,可以第一時間發(fā)送,不會產(chǎn)生亂序,但是會降低網(wǎng)絡(luò)吞吐量。
或者開啟生產(chǎn)者冪等性設(shè)置,開啟后,該P(yáng)roducer發(fā)送的消息都對應(yīng)一個單調(diào)增的Sequence Number。同樣的Broker端也會為每個生產(chǎn)者的每條消息維護(hù)一個序號,并且每commit一條數(shù)據(jù)時就會將其序號遞增。對于接收到的數(shù)據(jù),如果其序號比Borker維護(hù)的序號大一(即表示是下一條數(shù)據(jù)),Broker會接收它,否則將其丟棄。如果消息序號比Broker維護(hù)的序號差值比一大,說明中間有數(shù)據(jù)尚未寫入,即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber 如果消息序號小于等于Broker維護(hù)的序號,說明該消息已被保存,即為重復(fù)消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber Sender發(fā)送失敗后會重試,這樣可以保證每個消息都被發(fā)送到broker
多分區(qū)
Kafka本身無法保障多分區(qū)的有序性,可以通過業(yè)務(wù)設(shè)計進(jìn)行保證,例如需要單表數(shù)據(jù)通過自定義partition的方式發(fā)送至同一個分區(qū)
31、Kafka精確一次性(Exactly-once)如何保證
宏觀上:可靠性 + at least once + 冪等性
具體實現(xiàn):Kafka不丟消息-生產(chǎn)者冪等性-消費者冪等性
詳見目錄:
12、Kafka可靠性保證(不丟消息)
18、如何保證消息不被重復(fù)消費(消費者冪等性)
24、談?wù)勀銓afka生產(chǎn)者冪等性的了解?