網(wǎng)站做優(yōu)化按點(diǎn)擊收費(fèi)抖音推廣引流
文章目錄
- 搭建Kafka監(jiān)控平臺
- 合理規(guī)劃Kafka部署環(huán)境
- 合理優(yōu)化Kafka集群配置
- 優(yōu)化Kafka客戶端使用方式
- 合理保證消息安全
- 消費(fèi)者防止消息重復(fù)消費(fèi)
- 生產(chǎn)環(huán)境常見問題分析
- 消息零丟失方案
- 消息積壓如何處理
- 如何保證消息順序
搭建Kafka監(jiān)控平臺
官網(wǎng)地址
下載efak-web-3.0.2-bin.tar.gz安裝包之后,efak需要依賴JDK和數(shù)據(jù)庫。數(shù)據(jù)庫支持本地化的SQLLite以及集中式的MySQL。生產(chǎn)環(huán)境建議使用MySQL。
數(shù)據(jù)庫不需要建表初始化,EFAK在執(zhí)行過程中會自己完成初始化。
將efak壓縮包解壓
[root@worker1 ~]# tar -zxvf efak-web-3.0.2-bin.tar.gz -C /app/kafka/eagle
修改efak解壓目錄下的conf/system-config.properties。 這個(gè)文件中提供了完整的配置,下面只列出需要修改的部分。
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
# 指向Zookeeper地址
efak.zk.cluster.alias=cluster1
cluster1.zk.list=worker1:2181,worker2:2181,worker3:2181######################################
# zookeeper enable acl
######################################
# Zookeeper權(quán)限控制
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
#cluster1.zk.acl.username=test
#cluster1.zk.acl.password=test123######################################
# kafka offset storage
######################################
# offset選擇存在kafka中。
cluster1.efak.offset.storage=kafka
#cluster2.efak.offset.storage=zk######################################
# kafka mysql jdbc driver address
######################################
#指向自己的MySQL服務(wù)。庫需要提前創(chuàng)建
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://worker1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root
配置EFAK的環(huán)境變量
[root@worker1 ~]# vi ~/.bash_profile
export KE_HOME=/app/kafka/eagle/efak-web-3.0.2
PATH=$PATH:#KE_HOME/bin:$HOME/.local/bin:$HOME/bin[root@worker1 ~]# source ~/.bash_profile
啟動EFAK
配置完成后,先啟動Zookeeper和Kafka服務(wù),然后調(diào)用EFAK的bin目錄下的ke.sh腳本啟動服務(wù)
[root@worker1 bin]$ ./ke.sh start
-- 日志很長,看到以下內(nèi)容表示服務(wù)啟動成功
[2023-06-28 16:09:43] INFO: [Job done!]
Welcome to______ ______ ___ __ __/ ____/ / ____/ / | / //_// __/ / /_ / /| | / ,< / /___ / __/ / ___ | / /| |
/_____/ /_/ /_/ |_|/_/ |_|
( Eagle For Apache Kafka? )Version v3.0.2 -- Copyright 2016-2022
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.232.128:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
EFAK管理頁面
接下來就可以訪問EFAK的管理頁面。http://192.168.232.128:8048。 默認(rèn)的用戶名是admin ,密碼是123456
關(guān)于EFAK更多的使用方式,比如EFAK服務(wù)如何集群部署等,可以參考官方文檔。
合理規(guī)劃Kafka部署環(huán)境
機(jī)械硬盤kafka使用機(jī)械硬盤即可,可以不用給Kafka固態(tài)硬盤
內(nèi)存,修改 bin/kafka-start-server.sh
啟動腳本,修改JVM啟動參數(shù)中的內(nèi)存,默認(rèn)只申請了1G內(nèi)存。
[oper@worker1 bin]$ cat kafka-server-start.sh
......
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
......
對于主流的16核32G服務(wù)器,可以適當(dāng)擴(kuò)大Kafka的內(nèi)存。例如:
export KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn10G ‐XX:MetaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50 ‐XX:G1HeapRegionSize=16M"
高性能網(wǎng)卡,Kafka本身的服務(wù)性能非常高,單機(jī)就可以支持百萬級的TPS,在高流量沖擊下,網(wǎng)絡(luò)非常有可能優(yōu)先成為性能瓶頸。對于Kafka服務(wù)器,建議配置高性能的網(wǎng)卡。成本允許的話,盡量選擇千兆以上的網(wǎng)卡。
合理優(yōu)化Kafka集群配置
合理配置partition分區(qū)數(shù)量
Kafka的單個(gè)Partition讀寫效率是非常高的,但是Kafka的Partition設(shè)計(jì)是非常碎片化。如果Partition文件過多,很容易嚴(yán)重影響Kafka的整體性能。
- 盡量不要使用過多的Topic,通常不建議超過3個(gè)Topic。
- 一個(gè)分區(qū)下的副本集數(shù)量不要設(shè)置過多 將副本數(shù)設(shè)置為2就可以了。
至于Partition的數(shù)量,最好根據(jù)業(yè)務(wù)情況靈活調(diào)整。partition數(shù)量設(shè)置多一些,可以一定程度增加Topic的吞吐量。但是過多的partition數(shù)量還是同樣會帶來partition索引的壓力。
Kafka提供了一個(gè)生產(chǎn)者的性能壓測腳本,可以用來衡量集群的整體性能。
# num-record表示要發(fā)送100000條壓測消息,record-size表示每條消息大小1KB,throughput表示限流控制,設(shè)置為小于0表示不限流。
# properducer-props用來設(shè)置生產(chǎn)者的參數(shù)。
[oper@worker1 bin]$ ./kafka-producer-perf-test.sh --topic test --num-record 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=worker1:9092 acks=1
94846 records sent, 18969.2 records/sec (18.52 MB/sec), 1157.4 ms avg latency, 1581.0 ms max latency.
133740 records sent, 26748.0 records/sec (26.12 MB/sec), 1150.6 ms avg latency, 1312.0 ms max latency.
146760 records sent, 29346.1 records/sec (28.66 MB/sec), 1051.5 ms avg latency, 1164.0 ms max latency.
137400 records sent, 27480.0 records/sec (26.84 MB/sec), 1123.7 ms avg latency, 1182.0 ms max latency.
158700 records sent, 31740.0 records/sec (31.00 MB/sec), 972.1 ms avg latency, 1022.0 ms max latency.
158775 records sent, 31755.0 records/sec (31.01 MB/sec), 963.5 ms avg latency, 1055.0 ms max latency.
1000000 records sent, 28667.259123 records/sec (28.00 MB/sec), 1030.44 ms avg latency, 1581.00 ms max latency, 1002 ms 50th, 1231 ms 95th, 1440 ms 99th, 1563 ms 99.9th.
合理對數(shù)據(jù)進(jìn)行壓縮
在生產(chǎn)者的ProducerConfig中,有一個(gè)配置 COMPRESSION_TYPE_CONFIG,是用來對消息進(jìn)行壓縮的。
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
生產(chǎn)者配置了壓縮策略后,會對生產(chǎn)的每個(gè)消息進(jìn)行壓縮,從而降低Producer到Broker的網(wǎng)絡(luò)傳輸,也降低了Broker的數(shù)據(jù)存儲壓力。
所支持的幾種壓縮算法中,zstd算法具有最高的數(shù)據(jù)壓縮比,但是吞吐量不高,lz4在吞吐量方面的優(yōu)勢比較明顯。壓縮消息必然增加CPU的消耗,如果CPU資源緊張,就不要壓縮了。
關(guān)于數(shù)據(jù)壓縮機(jī)制,可以在Broker的conf/server.properties文件中進(jìn)行配置。正常情況下,Broker從Producer端接收到消息后不會對其進(jìn)行任何修改,但是如果Broker端和Producer端指定了不同的壓縮算法,就會產(chǎn)生很多異常的表現(xiàn)。
compression.typeType: string
Default: producer
Valid Values: [uncompressed, zstd, lz4, snappy, gzip, producer]
如果開啟了消息壓縮,那么在消費(fèi)者端自然是要進(jìn)行解壓縮的。
在Kafka中,消息從Producer到Broker再到Consumer會一直攜帶消息的壓縮方式,這樣當(dāng)Consumer讀取到消息集合時(shí),自然就知道了這些消息使用的是哪種壓縮算法,也就可以自己進(jìn)行解壓了。但是這時(shí)要注意的是應(yīng)用中使用的Kafka客戶端版本和Kafka服務(wù)端版本是否匹配。
優(yōu)化Kafka客戶端使用方式
合理保證消息安全
消息生產(chǎn)者設(shè)置好發(fā)送者應(yīng)答參數(shù)
- 消息生產(chǎn)者的
acks
參數(shù),可以設(shè)置為0、1、all或-1 - Broker的min.insync.replicas參數(shù)。如果生產(chǎn)者的acks設(shè)置為-1或all,服務(wù)端并不是強(qiáng)行要求所有Paritition都完成寫入再返回,而是可以配置多少個(gè)Partition完成消息寫入后,再往Producer返回消息。
生產(chǎn)者端的冪等性配置
// 值為 true 或 false
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
如果要開啟冪等性,那么
- 生產(chǎn)者消息緩存機(jī)制中的
max.in.flight.requests.per.connection
<=5 - 重試次數(shù)
retries
>0 - 應(yīng)答機(jī)制中的
acks
必須為all
其他補(bǔ)充點(diǎn):
- 如果沒有設(shè)置沖突配置,則默認(rèn)啟用冪等性。
- 如果設(shè)置了沖突的配置,并且冪等性沒有顯式啟用,則冪等性被禁用。
- 即顯示開啟的冪等性,又有沖突配置,就拋異常
使用生產(chǎn)者事務(wù)機(jī)制發(fā)送消息
// 1 初始化事務(wù)
void initTransactions();
// 2 開啟事務(wù)
void beginTransaction() throws ProducerFencedException;
// 3 提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 4 放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;
SpringBoot集成Kafka時(shí)使用的KafkaTemplate就提供了事務(wù)消息機(jī)制
消費(fèi)者端合理使用提交方式
如果消費(fèi)者要使用異步方式進(jìn)行業(yè)務(wù)處理,那么如果業(yè)務(wù)處理失敗,此時(shí)消費(fèi)者已經(jīng)提交了Offset,這個(gè)消息就無法重試了,這就會造成消息丟失。
因此在消費(fèi)者端,盡量不要使用異步處理方式,在絕大部分場景下,就能夠通過Kafka的消費(fèi)者重試機(jī)制,保證消息安全處理。此時(shí),在消費(fèi)者端,需要更多考慮的問題,就變成了消費(fèi)重試機(jī)制造成的消息重復(fù)消費(fèi)的問題。
消費(fèi)者防止消息重復(fù)消費(fèi)
問題的產(chǎn)生:消費(fèi)者業(yè)務(wù)處理時(shí)間較長,此時(shí)消費(fèi)者正常處理消息的過程中,Broker端就已經(jīng)等不下去了,認(rèn)為這個(gè)消費(fèi)者處理失敗了。這時(shí)就會往同組的其他消費(fèi)者實(shí)例投遞消息,這就造成了消息重復(fù)處理。這就給消費(fèi)者端帶來不必要的冪等性問題。
解決方式:
-
根據(jù)業(yè)務(wù)ID校驗(yàn)。比如對于訂單消息,消費(fèi)者根據(jù)訂單ID去確認(rèn)一下這個(gè)訂單消息有沒有處理過。
-
統(tǒng)一的方式處理冪等性問題
將Offset放到Redis中自行進(jìn)行管理。通過Redis中的offset來判斷消息之前是否處理過。
具體實(shí)現(xiàn)詳情請參考上文《客戶端 — 客戶端屬性分析 — 消息重復(fù)消費(fèi)問題》
生產(chǎn)環(huán)境常見問題分析
消息零丟失方案
1、 生產(chǎn)者發(fā)送消息到Broker不丟失
生產(chǎn)者丟失消息的原因就和ack機(jī)制有關(guān),如果將acks
設(shè)置為1、all或-1 就表示接收到Broker成功將消息寫入文件后的ack應(yīng)答
2、Broker端保證消息不丟失
Broker端丟失消息的原因主要有兩種:Leader partition重新選舉、服務(wù)器非正常關(guān)機(jī) pageCache中的消息還未刷盤
《服務(wù)端的Zookeeper元數(shù)據(jù)梳理 — Partition故障恢復(fù)機(jī)制》 《服務(wù)端日志 — 合理配置刷盤頻率》
消息生產(chǎn)者將acks
設(shè)置為all可以有效避免因?yàn)長eader partition故障導(dǎo)致的消息丟失;
而pageCache刷盤可以通過下面的參數(shù)來設(shè)定合理的配置
# 多長時(shí)間進(jìn)行一次強(qiáng)制刷盤。默認(rèn)是Long.MAX。
flush.ms# 表示當(dāng)同一個(gè)Partiton的消息條數(shù)積累到這個(gè)數(shù)量時(shí),就會申請一次刷盤操作。默認(rèn)是Long.MAX。
log.flush.interval.messages#當(dāng)一個(gè)消息在內(nèi)存中保留的時(shí)間,達(dá)到這個(gè)數(shù)量時(shí),就會申請一次刷盤操作。他的默認(rèn)值是空。如果這個(gè)參數(shù)配置為空,則生效的是下一個(gè)參數(shù)。
log.flush.interval.ms# 日志刷新程序檢查是否需要將日志刷新到磁盤的頻率(以毫秒為單位),默認(rèn)也是Long.MAX。
log.flush.scheduler.interval.ms
3、消費(fèi)者保證消息不丟失
同步處理業(yè)務(wù)邏輯,同步手動提交offset
消息積壓如何處理
-
如果業(yè)務(wù)運(yùn)行正常,只是因?yàn)橄M(fèi)者處理消息過慢,造成消息加壓。
增加消費(fèi)者個(gè)數(shù),最多讓一個(gè)消費(fèi)者組下的消費(fèi)者個(gè)數(shù)=Partition分區(qū)數(shù),讓一個(gè)Consumer負(fù)責(zé)一個(gè)分區(qū),將消費(fèi)進(jìn)度提升到最大。
如果還是不夠,那就新創(chuàng)建一個(gè)topic,給這個(gè)topic更多的partition,然后啟動一批消費(fèi)者,將消息從舊topic中搬運(yùn)到新topic中,這些消費(fèi)者不處理業(yè)務(wù)消息,僅僅是搬運(yùn)。這樣就可以創(chuàng)建更多的消費(fèi)者消費(fèi)從新topic消費(fèi)了
-
如果是消費(fèi)者業(yè)務(wù)處理異常導(dǎo)致的消息積壓。
使用一種降級方案。先啟動一個(gè)Consumer將Topic下的消息先轉(zhuǎn)發(fā)到其他隊(duì)列中,然后再慢慢分析新隊(duì)列里的消息處理問題。類似于死信隊(duì)列的處理方式。
如何保證消息順序
Kafka很難保證消息的順序,因?yàn)镵afka設(shè)計(jì)的最優(yōu)先重點(diǎn)是海量吞吐。
這個(gè)問題要分兩個(gè)方面考慮:
- Topic下各個(gè)partition是并發(fā)處理消息的 ,producer要保證一組有序的消息發(fā)送到一個(gè)partition中
- consumer要從partition中順序處理消息
問題一:producer要保證一組有序的消息發(fā)送到一個(gè)partition中
第一種簡單粗暴的方式就只為Topic創(chuàng)建一個(gè)partition,這種方式放棄了多partition帶來的高吞吐量。
第二種方式,自己定義一個(gè)分區(qū)路由機(jī)制,實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner
接口,將消息分配到同一個(gè)Partition上。
此時(shí)發(fā)送方保證了發(fā)送時(shí)的消息順序性。但是存在網(wǎng)絡(luò)問題,導(dǎo)致發(fā)送的一批消息中某一個(gè)消息丟失,這樣消息到Broker時(shí)就還是沒辦法保證順序性。Kafka是通過冪等性中單調(diào)遞增的sequenceNumber來保證消息是順序,因?yàn)槭菃握{(diào)遞增的,所以還能判斷是否存在消息丟失一旦Kafka發(fā)現(xiàn)Producer傳過來的SequenceNumber出現(xiàn)了跨越,那么就意味著中間有可能消息出現(xiàn)了丟失,就會往Producer拋出一個(gè)OutOfOrderSequenceException異常。
問題二:Partition中的消息有序后,如何保證Consumer的消費(fèi)順序是有序的
// 一次拉取消息的數(shù)量
public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
Note that the consumer performs multiple fetches in parallel.public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
從上方fetch.max.bytes
參數(shù)的最后一個(gè)解釋可以知道Consumer其實(shí)是每次并行的拉取多個(gè)Batch批次的消息進(jìn)行處理的。所以在Kafka提供的客戶端Consumer中,是沒有辦法直接保證消費(fèi)的消息順序。
我們能做的就是在Consumer的處理邏輯中,將消息進(jìn)行排序。比如將消息按照業(yè)務(wù)獨(dú)立性收集到一個(gè)集合中,然后在集合中對消息進(jìn)行排序。
RocketMQ中提供了順序消息的實(shí)現(xiàn)。他的實(shí)現(xiàn)原理是先鎖定一個(gè)隊(duì)列,消費(fèi)完這一個(gè)隊(duì)列后,才開始鎖定下一個(gè)隊(duì)列,并消費(fèi)隊(duì)列中的消息。再結(jié)合MessageQueue中的消息有序性,就能保證整體消息的消費(fèi)順序是有序的。