網(wǎng)站設(shè)計廣州鄭州建網(wǎng)站的公司
kafka的log存儲解析——topic的分區(qū)partition分段segment以及索引等
引言Kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨立的。每個topic又可以分成幾個不同的partition(每個topic有幾個partition是在創(chuàng)建topic時指定
的),每個partition存儲一部分Message。借用官方的一張圖,可以直觀地看到topic和partition的關(guān)系。
partition是以文件的形式存儲在文件系統(tǒng)中,比如,創(chuàng)建了一個名為page_visits的topic,其有5個partition,那么在Kafka的數(shù)據(jù)目錄中(由配置文件中的log.dirs指定的)中就有這
樣5個目錄: page_visits-0,?page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名規(guī)則為<topic_name>-<partition_id>,里面存儲的分別就是這5個partition的
數(shù)據(jù)。
接下來,本文將分析partition目錄中的文件的存儲格式和相關(guān)的代碼所在的位置。
Partition中的每條Message由offset來表示它在這個partition中的偏移量,這個offset不是該Message在partition數(shù)據(jù)文件中的實際存儲位置,而是邏輯上一個值,它唯一確定了
partition中的一條Message。因此,可以認(rèn)為offset是partition中Message的id。partition中的每條Message包含了以下三個屬性:
其中offset為long型,MessageSize為int32,表示data有多大,data為message的具體內(nèi)容。它的格式和Kafka通訊協(xié)議中介紹的MessageSet格式是一致。
Partition的數(shù)據(jù)文件則包含了若干條上述格式的Message,按offset由小到大排列在一起。它的實現(xiàn)類為FileMessageSet,類圖如下:
它的主要方法如下:
我們來思考一下,如果一個partition只有一個數(shù)據(jù)文件會怎么樣?
那Kafka是如何解決查找效率的的問題呢?有兩大法寶:1) 分段?2) 索引。
Kafka解決查詢效率的手段之一是將數(shù)據(jù)文件分段,比如有100條Message,它們的offset是從0到99。假設(shè)將數(shù)據(jù)文件分成5段,第一段為0-19,第二段為20-39,以此類推,每
段放在一個單獨的數(shù)據(jù)文件里面,數(shù)據(jù)文件以該段中最小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就可以定位到該Message在哪個段中。
數(shù)據(jù)文件分段使得可以在一個較小的數(shù)據(jù)文件中查找對應(yīng)offset的Message了,但是這依然需要順序掃描才能找到對應(yīng)offset的Message。為了進(jìn)一步提高查找的效率,Kafka為
每個分段后的數(shù)據(jù)文件建立了索引文件,文件名與數(shù)據(jù)文件的名字是一樣的,只是文件擴(kuò)展名為.index。
索引文件中包含若干個索引條目,每個條目表示數(shù)據(jù)文件中一條Message的索引。索引包含兩個部分(均為4個字節(jié)的數(shù)字),分別為相對offset和position。
index文件中并沒有為數(shù)據(jù)文件中的每條Message建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。這樣避免了索引文件占用過多的空間,從而可以將
索引文件保留在內(nèi)存中。但缺點是沒有建立索引的Message也不能一次定位到其在數(shù)據(jù)文件的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了。
Partition的數(shù)據(jù)文件
offset
MessageSize
data
append: 把給定的ByteBufferMessageSet中的Message寫入到這個數(shù)據(jù)文件中。
searchFor: 從指定的startingPosition開始搜索找到第一個Message其offset是大于或者等于指定的offset,并返回其在文件中的位置Position。它的實現(xiàn)方式是從
startingPosition開始讀取12個字節(jié),分別是當(dāng)前MessageSet的offset和size。如果當(dāng)前offset小于指定的offset,那么將position向后移動LogOverHead+MessageSize(其
中LogOverHead為offset+messagesize,為12個字節(jié))。
read:準(zhǔn)確名字應(yīng)該是slice,它截取其中一部分返回一個新的FileMessageSet。它不保證截取的位置數(shù)據(jù)的完整性。
sizeInBytes: 表示這個FileMessageSet占有了多少字節(jié)的空間。
truncateTo: 把這個文件截斷,這個方法不保證截斷位置的Message的完整性。
readInto: 從指定的相對位置開始把文件的內(nèi)容讀取到對應(yīng)的ByteBuffer中。
1. 新數(shù)據(jù)是添加在文件末尾(調(diào)用FileMessageSet的append方法),不論文件數(shù)據(jù)文件有多大,這個操作永遠(yuǎn)都是O(1)的。
2. 查找某個offset的Message(調(diào)用FileMessageSet的searchFor方法)是順序查找的。因此,如果數(shù)據(jù)文件很大的話,查找的效率就低。
數(shù)據(jù)文件的分段
為數(shù)據(jù)文件建索引
相對offset:因為數(shù)據(jù)文件分段以后,每個數(shù)據(jù)文件的起始offset不為0,相對offset表示這條Message相對于其所屬數(shù)據(jù)文件中最小的offset的大小。舉例,分段后的一個數(shù)
據(jù)文件的offset是從20開始,那么offset為25的Message在index文件中的相對offset就是25-20 = 5。存儲相對offset可以減小索引文件占用的空間。
position,表示該條Message在數(shù)據(jù)文件中的絕對位置。只要打開文件并移動文件指針到這個position就可以讀取對應(yīng)的Message了。在Kafka中,索引文件的實現(xiàn)類為OffsetIndex,它的類圖如下:
主要的方法有:
我們以幾張圖來總結(jié)一下Message是如何在Kafka中存儲的,以及如何查找指定offset的Message的。
Message是按照topic來組織,每個topic可以分成多個的partition,比如:有5個partition的名為為page_visits的topic的目錄結(jié)構(gòu)為:
partition是分段的,每個段叫LogSegment,包括了一個數(shù)據(jù)文件和一個索引文件,下圖是某個partition目錄下的文件:
可以看到,這個partition有4個LogSegment。
借用博主@lizhitao博客上的一張圖來展示是如何查找Message的。
比如:要查找絕對offset為7的Message:
這套機(jī)制是建立在offset是有序的。索引文件被映射到內(nèi)存中,所以查找的速度還是很快的。
append方法,添加一對offset和position到index文件中,這里的offset將會被轉(zhuǎn)成相對的offset。
lookup, 用二分查找的方式去查找小于或等于給定offset的最大的那個offset
小結(jié)
1. 首先是用二分查找確定它是在哪個LogSegment中,自然是在第一個Segment中。
2. 打開這個Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引條目中最大的那個offset。自然offset為6的那個索引是我們要找的,通過索引文
件我們知道offset為6的Message在數(shù)據(jù)文件中的位置為9807。
3. 打開數(shù)據(jù)文件,從位置為9807的那個地方開始順序掃描直到找到offset為7的那條Message。一句話,Kafka的Message存儲采用了分區(qū)(partition),分段(LogSegment)和稀疏索引這幾個手段來達(dá)到了高效性。
Kafka 將消息以?topic 為單位進(jìn)行歸納
將向?Kafka topic 發(fā)布消息的程序成為?producers.
將預(yù)訂?topics 并消費消息的程序成為?consumer.
Kafka 以集群的方式運行,可以由一個或多個服務(wù)組成,每個服務(wù)叫做一個?broker.
producers 通過網(wǎng)絡(luò)將消息發(fā)送到?Kafka 集群,集群向消費者提供消息
數(shù)據(jù)傳輸?shù)氖聞?wù)定義通常有以下三種級別:
(
1)最多一次: 消息不會被重復(fù)發(fā)送,最多被傳輸一次,但也有可能一次不傳輸
(
2)最少一次: 消息不會被漏發(fā)送,最少被傳輸一次,但也有可能被重復(fù)傳輸.
(
3)精確的一次(Exactly once):不會漏傳輸也不會重復(fù)傳輸,每個消息都傳輸被一次而且僅僅被傳輸一次,這是大家所期望的
(
1)節(jié)點必須可以維護(hù)和?ZooKeeper 的連接,Zookeeper 通過心跳機(jī)制檢查每個節(jié)點的連接
(
2)如果節(jié)點是個?follower,他必須能及時的同步?leader 的寫操作,延時不能太久
producer 直接將數(shù)據(jù)發(fā)送到?broker 的?leader(主節(jié)點),不需要在多個節(jié)點進(jìn)行分發(fā),為了幫助?producer 做到這點,所有的?Kafka 節(jié)點都可以及時的告知:哪些節(jié)點是活動的,
目標(biāo)topic 目標(biāo)分區(qū)的?leader 在哪。這樣?producer 就可以直接將消息發(fā)送到目的地了
Kafaconsumer 消費消息時,向?broker 發(fā)出"fetch"請求去消費特定分區(qū)的消息,consumer 指定消息在日志中的偏移量(offset),就可以消費從這個位置開始的消息,
customer 擁有了?offset 的控制權(quán),可以向后回滾去重新消費之前的消息,這是很有意義的
Kafka 最初考慮的問題是,customer 應(yīng)該從?brokes 拉取消息還是?brokers 將消息推送到
consumer,也就是?pull 還?push。在這方面,Kafka 遵循了一種大部分消息系統(tǒng)共同的傳統(tǒng)的設(shè)計:producer 將消息推送到?broker,consumer 從?broker 拉取消息
一些消息系統(tǒng)比如?Scribe 和?ApacheFlume 采用了?push 模式,將消息推送到下游的?consumer。這樣做有好處也有壞處:由?broker 決定消息推送的速率,對于不同消費速率的
consumer 就不太好處理了。消息系統(tǒng)都致力于讓?consumer 以最大的速率最快速的消費消息,但不幸的是,push 模式下,當(dāng)?broker 推送的速率遠(yuǎn)大于?consumer 消費的速率
時,?consumer 恐怕就要崩潰了。最終?Kafka 還是選取了傳統(tǒng)的?pull 模式
Pull 模式的另外一個好處是?consumer 可以自主決定是否批量的從?broker 拉取數(shù)據(jù)。Push 模式必須在不知道下游?consumer 消費能力和消費策略的情況下決定是立即推送每條
消息還是緩存之后批量推送。如果為了避免?consumer 崩潰而采用較低的推送速率,將可能導(dǎo)致一次只推送較少的消息而造成浪費。Pull 模式下,consumer 就可以根據(jù)自己的
消費能力去決定這些策略
Pull 有個缺點是,如果?broker 沒有可供消費的消息,將導(dǎo)致?consumer 不斷在循環(huán)中輪詢,
直到新消息到?t 達(dá)。為了避免這點,Kafka 有個參數(shù)可以讓?consumer 阻塞知道新消息到達(dá)
(當(dāng)然也可以阻塞知道消息的數(shù)量達(dá)到某個特定的量這樣就可以批量發(fā)
消息由一個固定長度的頭部和可變長度的字節(jié)數(shù)組組成。頭部包含了一個版本號和?CRC32
校驗碼。
·消息長度: 4 bytes (value: 1+4+n)
·版本號: 1 byte
·CRC 校驗碼: 4 bytes
·具體的消息: n bytes
(1).Kafka 把?topic 中一個?parition 大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經(jīng)消費完文件,減少磁盤占用。
(2).通過索引信息可以快速定位?message 和確定?response 的最大大小。
(3).通過?index 元數(shù)據(jù)全部映射到?memory,可以避免?segment file 的?IO 磁盤操作。(4).通過索引文件稀疏存儲,可以大幅降低?index 文件元數(shù)據(jù)占用空間大小。
(1).Kafka 持久化日志,這些日志可以被重復(fù)讀取和無限期保留
(2).Kafka 是一個分布式系統(tǒng):它以集群的方式運行,可以靈活伸縮,在內(nèi)部通過復(fù)制數(shù)據(jù)提升容錯能力和高可用性
(3).Kafka 支持實時的流式處理