修改網(wǎng)站圖標seo有名氣的優(yōu)化公司
系列文章目錄
上手第一關(guān),手把手教你安裝kafka與可視化工具kafka-eagle
Kafka是什么,以及如何使用SpringBoot對接Kafka
架構(gòu)必備能力——kafka的選型對比及應用場景
Kafka存取原理與實現(xiàn)分析,打破面試難關(guān)
- 系列文章目錄
- 一、主題與分區(qū)
- 1. 模型
- 2. 消息與分發(fā)
- 二、分區(qū)內(nèi)數(shù)據(jù)的存儲
- 1. 消息的存儲
- ① 偏移量與日志文件
- ② 索引的構(gòu)成
- 2. 消息的讀取
- ① 消費偏移量的存儲
- ②Compaction策略
- ③查找并讀取消息
- 3. 快速存取實現(xiàn)
- 總結(jié)
在前面的幾篇內(nèi)容中,我們依次講了Kafka的安裝、與Spring Boot的結(jié)合,還有選型與應用場景。但是筆者也知道,對于很多小伙伴來說,原理及實現(xiàn)才算重頭戲,而且也是面試熱點,那么本次我們先來進行存取原理的分析,當然抱著疑問去學習才是最快的,因此在開始之前,我也先拋出一些Kafka的重點與熱點問題,希望大家在學習過程中能總結(jié)印證
- Kafka為什么吞吐量這么高?
- Kafka的數(shù)據(jù)存與取有什么特點?
📕作者簡介:戰(zhàn)斧,從事金融IT行業(yè),有著多年一線開發(fā)、架構(gòu)經(jīng)驗;愛好廣泛,樂于分享,致力于創(chuàng)作更多高質(zhì)量內(nèi)容
📗本文收錄于 kafka 專欄,有需要者,可直接訂閱專欄實時獲取更新
📘高質(zhì)量專欄 云原生、RabbitMQ、Spring全家桶 等仍在更新,歡迎指導
📙Zookeeper Redis dubbo docker netty等諸多框架,以及架構(gòu)與分布式專題即將上線,敬請期待
一、主題與分區(qū)
1. 模型
我們其實在《架構(gòu)必備能力——kafka的選型對比及應用場景》 一文中其實講到了Kafka的模型,我們這里再把老圖拿出來用一遍
不難看出,邏輯上的源頭就是主題
,也即Topic,而主題又劃分為多個分區(qū)
。我們先來談談主題與分區(qū)的實現(xiàn),在Kafka中,可以使用以下命令來聲明一個主題并指定分區(qū):
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic my-topic
其中:
–create: 聲明一個新的主題。
–zookeeper: 指定 ZooKeeper 的地址和端口號。
–replication-factor: 指定副本因子,即每個分區(qū)在集群中的副本數(shù)量。這里指定為1,表示每個分區(qū)只有一個副本。
–partitions: 指定分區(qū)數(shù)。這里指定為4,表示該主題有4個分區(qū)。
–topic: 指定主題名稱,這里為 my-topic。
注意:如果要指定分區(qū)數(shù)量,必須在創(chuàng)建主題的時候指定,之后無法更改。因此,在創(chuàng)建主題時應該仔細考慮分區(qū)數(shù)量,以滿足業(yè)務需求。
當然,如果有同學還記得前面的內(nèi)容,應該知道我們在對接Spring Boot時,并沒有提前建立主題而是直接使用了。其中的原因是,我們在Spring Boot中使用Kafka,如果在發(fā)送消息時指定的主題不存在,Kafka會自動創(chuàng)建該主題。在創(chuàng)建時,Kafka將使用默認的分區(qū)數(shù)量(通常為1),以及默認的副本因子(通常為1)來創(chuàng)建分區(qū)。
2. 消息與分發(fā)
然后當我們發(fā)布者向某個主題發(fā)送消息時,其就會被“分發(fā)”到某一個分區(qū)里
那么有小伙伴肯定會問:
Kafka的主題消息會進哪個分區(qū)是我們可以決定的嗎?默認是進入哪個分區(qū)?
答案是Kafka的主題消息可以由生產(chǎn)者自己決定要發(fā)送到哪個分區(qū),也可以使用Kafka提供的默認分區(qū)分配算法來自動決定消息要進入哪個分區(qū)。
-
指定分區(qū):如果生產(chǎn)者自己決定要發(fā)送到哪個分區(qū),可以在發(fā)送消息時指定消息要發(fā)送到的分區(qū)編號。此時,如果指定的分區(qū)編號存在,則消息會被發(fā)送到該分區(qū);如果指定的分區(qū)編號不存在,則會拋出異常。
-
自動分區(qū):如果使用默認的分區(qū)分配算法,Kafka提供了多種分配算法,例如
輪詢
(Round-Robin)、隨機
(Random)、哈希
(Hash)等。默認情況下,Kafka使用哈希算法將消息均勻地分配到所有可用的分區(qū)中
當然在此之前,我們可以看下KafkaTemplate前面提供的API
不難知道Kafka消息除了指明主題以外,還由以下要素組成:
- 消息的key:是一個可選項,用于標識消息的唯一性和分區(qū)。如果不指定key,則會隨機分配一個key,并將消息發(fā)送到隨機的分區(qū)。
- 消息的value:是消息的實際內(nèi)容,也是必填項。
- 消息的時間戳:是可選項,用于標識消息的時間戳。Kafka可以根據(jù)時間戳來處理消息的順序、分配和延遲。
- 消息的分區(qū):指定消息應該發(fā)送到哪個分區(qū)。如果不指定分區(qū),則使用默認的分區(qū)器來決定分區(qū)。
二、分區(qū)內(nèi)數(shù)據(jù)的存儲
從邏輯上來說,kafka的分區(qū)是一個消息隊列,當我們發(fā)送的消息經(jīng)由分區(qū)器進行分發(fā)后,就會進入某個分區(qū)并被順序的保存下來。在實現(xiàn)上,Kafka的分區(qū)更像一個日志記錄系統(tǒng),把消息當作日志,順序的寫入磁盤
1. 消息的存儲
我們需要知道,Kafka中,每個分區(qū)被組織為一組日志段
(Log Segment),其中每個日志段都包含了一個連續(xù)的消息序列。當一個日志段被寫滿后,它將被關(guān)閉并分配一個更高的編號,新的消息將被追加到一個新的日志段中。而日志段
的核心又由兩個部分組成:索引文件
(index file)和數(shù)據(jù)文件
(data file)
-
數(shù)據(jù)文件: 也叫日志文件,數(shù)據(jù)文件是消息分區(qū)的核心部分,它是以追加方式寫入的文件。當有新的消息寫入分區(qū)時,Kafka會根據(jù)協(xié)議、消息頭、消息體等信息將消息封裝成字節(jié)流,然后追加寫入數(shù)據(jù)文件。
-
索引文件: 索引文件是一個不可變的有序映射,它將消息偏移量映射到數(shù)據(jù)文件中的位置。當一個消費者讀取一個分區(qū)的消息時,它會使用
偏移量
讀取索引文件中的位置,并從該位置讀取數(shù)據(jù)文件中的消息。
如下圖,就是我們上期發(fā)送了一條消息,而建立的目錄test_topic-0,代表該目錄是test_topic主題下的 0 號分區(qū),可以看到里面的 index文件 和 log 文件
① 偏移量與日志文件
要想更深入的了解,我們必須先解釋一下kafka中消息偏移量(offset)
的概念:當一條記錄需要寫入分區(qū)的時候,它會被追加到 log 文件的末尾,同時會被分配一個唯一的序號,稱為 Offset(偏移量)。Offset 是一個遞增的、不可變的數(shù)字,由 Kafka 自動維護。需要注意的是,在后續(xù)內(nèi)容中,我們還會提到各種不同的偏移量,請注意區(qū)分,不要混淆了
由于Offset 初始值為 0,所以當?shù)谝粭l消息達到分區(qū)后,就會建立起 00000000000000000000.log 這樣的文件來進行消息的存儲,后續(xù)消息將會在這個文件內(nèi)追加寫入,直到文件大小超出限制(其默認值為1GB)
舉個例子,當?shù)?70411個消息(Offset = 170410)來到時,發(fā)現(xiàn) 00000000000000000000.log 已經(jīng)超過了 1 G,此時其就會新創(chuàng)建一個日志段,同時以本offset為名,新建一個日志文件,命名為 0000000000000170410.log,此時本分區(qū)就形成了兩個日志段,情況如下:
② 索引的構(gòu)成
我們上面講了 .log 文件,也即數(shù)據(jù)文件的創(chuàng)建機制。但是還沒講段的另一個組成部分,也即索引文件。索引其實就像字典的目錄,是幫助大家快速找到某條消息的工具,索引文件存儲的內(nèi)容主要就是 消息偏移量(offset) 與 消息存儲地址(position) 的映射關(guān)系。
Kafka的索引文件由多個索引條目
(index entry)組成,每個索引條目包含兩個核心字段:
- offset:消息的偏移量(這里是相對偏移量,每個索引文件都以0起始,其對應的真實偏移量為段初始偏移 + 本offset);
- position:消息在日志文件中的磁盤位置(相對偏移量,偏移量僅適用于對應的日志文件)
需要注意的是,不是每一條消息都會有索引。這里有參數(shù) index.interval.bytes 的控制,其默認值為 4 KB,即表示當前分區(qū) log 文件寫入了 4 KB 數(shù)據(jù)后才會在索引文件中增加一個索引條目
2. 消息的讀取
現(xiàn)在我們已經(jīng)存儲了一些數(shù)據(jù),下面就要開始讀取了,我們目前掌握了這些文件,那么怎么才能找到并讀取消息呢?
① 消費偏移量的存儲
我們不難理解,每個消費者負責需要消費分配給它的分區(qū)上的消息,并記錄自己在每個分區(qū)上消費的最新偏移量。對于消費者而言,怎么知道自己應該要消費哪個offset的消息?消費者可以通過以下兩種方式記錄消費的偏移量:
-
手動提交偏移量:消費者在消費消息時,可以手動調(diào)用 consumer.commitSync() 或 consumer.commitAsync() 方法將消費的偏移量提交到 Kafka 中。該方法接收的參數(shù)表示要提交的偏移量的值,提交后,Kafka 會將該偏移量記錄到內(nèi)部的偏移量管理器中。
-
自動同步提交偏移量:消費者可以將 enable.auto.commit 參數(shù)設(shè)置為 true,開啟自動提交偏移量的功能。啟用該功能后,Kafka 會自動記錄消費者消費過的最新偏移量,并定期將其定期提交到 Kafka 中。
但不管怎么樣,這個消費的偏移量最終都是由kafka來進行保存的,那么其具體的存儲是怎么實現(xiàn)的呢?Kafka其實提供了將給定消費者組的所有偏移存儲在一個叫做組協(xié)調(diào)器(group coordinator)
的組件。
通過官方文檔不難看出,當組協(xié)調(diào)器
收到偏移量變動的請求時,會將對應數(shù)據(jù)存儲在內(nèi)置的主題 __consumer_offsets
中(在舊版本中偏移量是存在ZK中的),我們可以在ZK中看到這個主題的情況:
在我們的本地目錄中也能看到這個 __consumer_offsets 主題一共建了50個分區(qū)(默認):
當然它分區(qū)的個數(shù),可以在Kafka服務器配置文件中通過參數(shù)offsets.topic.num.partitions
進行配置。
當我們以某個消費者組消費掉某條消息并提交偏移量后,偏移量會被提交到 __consumer_offsets Topic的一個特定分區(qū),該分區(qū)由所消費的主題和消費者組的哈希值決定。在我的例子里,是被提交到了 __consumer_offsets-45,如下:
②Compaction策略
相信你會對這種存儲消費位置的方式有所困惑,因為按照我們前面的說法,Kafka的內(nèi)容都是以日志形式存儲的,在使用的過程中,日志豈不是會越來越大?到最后找一次偏移量都很麻煩?這就不得不提到Kafka中的Compaction策略
compaction是一種保留最后N個版本的消息的消息清理策略,它保留特定鍵的最新值,同時刪除無用的鍵值,從而減少存儲空間。具體來說,Compaction會保留每個消息主題中最新的一組鍵值對,并刪除所有鍵相同但值較舊的消息。
使用Compaction策略
需要滿足以下條件:
- 消息的鍵必須是唯一的
- 消息的鍵必須是可序列化的
- 消息必須按照鍵進行劃分
- 消息的存儲時間必須足夠長,以便新消息可以替換舊消息
而這些消費偏移量的數(shù)據(jù),存儲的內(nèi)容如下
key = group.id+topic+分區(qū)號
value= offset 的值
這樣就導致某個消費組在某個分區(qū)的消費數(shù)據(jù)只會有一條,所以找起來并沒有那么復雜
③查找并讀取消息
上面我們講了消費偏移量的存儲,其實查找偏移量的過程也是一樣的,同一個消費組會先從特定的 __consumer_offsets 拿取偏移量,拿到偏移量以后,比如偏移量是 170417,我們?nèi)砸陨厦娴奈募闆r為例,那么它找到消息的邏輯如下:
-
首先用二分查找確定它是在哪個Segment文件中,其中0000000000000000000.index為最開始的文件,第二個文件為0000000000000170410.index(起始偏移為170410+1 = 170411),而第三個文件為0000000000000239430.index(起始偏移為239430+1 = 239431)。所以這個offset = 170417就落在第二個文件中。其他后續(xù)文件可以依此類推,以起始偏移量命名并排列這些文件,然后根據(jù)二分查找法就可以快速定位到具體文件位置。
-
用該offset減去索引文件的編號,即170417 - 170410 = 7,也用二分查找法找到索引文件中等于或者小于7的最大的那個編號??梢钥闯鑫覀兡軌蛘业絒4,476]這組數(shù)據(jù),476即offset=170410 + 4 = 170414的消息在log文件中的偏移量。
-
打開數(shù)據(jù)文件(0000000000000170410.log),從位置為476的那個地方開始順序掃描直到找到offset為170417的那條Message。
總結(jié)來說:就是通過二分法先找到index文件,然后再在index文件中通過二分法找到某一條索引條目,然后根據(jù)該索引條目給出的地址去log文件中快速定位,最后從這個定位開始,順序掃描下去直到找到我們指定的偏移量數(shù)據(jù)
3. 快速存取實現(xiàn)
我們上面講了Kafka的一大堆的奇特設(shè)計,不知道小伙伴們是否產(chǎn)生過疑問,比如為什么一個主題要分成多個分區(qū) ?一個分區(qū)為什么要劃成多個段?以及為什么把數(shù)據(jù)存儲成日志格式 ? 其實這些都是在優(yōu)化性能,我們從快速存取的角度講一下Kafka都做了哪些努力【面試重點】:
-
多分區(qū)負載均衡:Kafka支持將一個主題的數(shù)據(jù)分散至多個分區(qū),不同分區(qū)位于多個broker節(jié)點上,實現(xiàn)了集群負載均衡,從而提高了寫入和讀取的性能。
-
分段存儲:Kafka會將數(shù)據(jù)分段存儲,每個段的大小和時間可以根據(jù)需求進行配置,這樣可以提高讀取性能并減少刪除操作對IO的影響。
-
批量寫入:Kafka允許客戶端一次性寫入多條消息到broker,減少了網(wǎng)絡(luò)傳輸?shù)臅r間。
-
零拷貝:Kafka使用
mmap
映射磁盤上的文件到虛擬內(nèi)存空間,然后通過直接內(nèi)存訪問(Direct Memory Access)的方式將數(shù)據(jù)從磁盤讀取到內(nèi)存中,還使用sendfile
系統(tǒng)調(diào)用來實現(xiàn)網(wǎng)絡(luò)發(fā)送時的零拷貝,這樣網(wǎng)絡(luò)數(shù)據(jù)也可以直接從內(nèi)核空間中發(fā)送,避免了數(shù)據(jù)拷貝到用戶空間的過程。 -
異步刷盤:Kafka支持異步刷盤,即將消息寫入日志后,不會立即將數(shù)據(jù)從內(nèi)存刷入磁盤,而是會緩存一段時間再批量寫入磁盤,減少了磁盤I/O的次數(shù),提高了寫入性能。
-
稀疏索引:Kafka會為每個段維護一個索引,以便在讀取數(shù)據(jù)時快速定位到所需數(shù)據(jù)的位置。這樣可以避免全盤掃描,提高數(shù)據(jù)讀取性能。但如果每個消息都寫進索引,會導致索引文件臃腫,且降低存儲速度,所以采用了稀疏索引的方式
如果你按照《Kafka是什么,以及如何使用SpringBoot對接Kafka》中的動手操作過,我們可以繼續(xù)來做個實現(xiàn),我們先看一下log文件,如下
然后我們把發(fā)送的代碼改成如下,這樣一次發(fā)送1000條消息,注意,我們在這里還加上了 kafkaTemplate.flush(),因為當使用Kafka Template發(fā)送消息時,消息并不會立即發(fā)送到Kafka Broker,而是會被緩存在Kafka Template中,以減少通信次數(shù),如果我們需要立即發(fā)送,這時候就可以使用kafkaTemplate.flush()方法來實現(xiàn)立即發(fā)送。
@Service
public class KafkaService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {for (int i = 0; i < 1000; i++) {kafkaTemplate.send("another_topic2", 0,"key",message + i);}kafkaTemplate.flush();System.out.println("we have send message");}
}
但當我們發(fā)送消息,成功輸出 we have send message ,并又成功接收到消息后,如圖
我們卻會看到 log 文件的大小沒有發(fā)生變化,即便是不停的刷新目錄也無濟于事
然而如果我們單擊并右鍵選中該文件,就會看到該文件被更新,且大小發(fā)生變動
這就說明了其寫入硬盤的過程是異步且有延遲的,使用了操作系統(tǒng)的延遲寫入(delayed write)機制。但其傳輸數(shù)據(jù)卻可以脫離硬盤,使用內(nèi)存緩存作為收發(fā)介質(zhì),直接實現(xiàn)傳達
總結(jié)
今天我們詳細講解了消息在kafak中的存與取,也介紹了不少細節(jié)點,知道了Kafka采用批量傳輸設(shè)計減少網(wǎng)絡(luò)訪問次數(shù),然后用分區(qū)、分段、追加日志等方案來提高吞吐量,并且利用了操作系統(tǒng)的零拷貝、異步刷盤等方式來減少磁盤寫入的瓶頸,最終成為了一款性能優(yōu)異、吞吐量極大的中間件。希望通過今天的學習,能對大家有所幫助,我們將在后面繼續(xù)講解kafka的其他實現(xiàn)細節(jié)。如果你對此有興趣,可以直接訂閱本
kafka 專欄