中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

網(wǎng)站做優(yōu)化按點(diǎn)擊收費(fèi)抖音推廣引流

網(wǎng)站做優(yōu)化按點(diǎn)擊收費(fèi),抖音推廣引流,中小企業(yè)網(wǎng)站開發(fā),免費(fèi)網(wǎng)站注冊com文章目錄 搭建Kafka監(jiān)控平臺合理規(guī)劃Kafka部署環(huán)境合理優(yōu)化Kafka集群配置優(yōu)化Kafka客戶端使用方式合理保證消息安全消費(fèi)者防止消息重復(fù)消費(fèi) 生產(chǎn)環(huán)境常見問題分析消息零丟失方案消息積壓如何處理如何保證消息順序 搭建Kafka監(jiān)控平臺 官網(wǎng)地址 下載efak-web-3.0.2-bin.tar.gz安…

文章目錄

    • 搭建Kafka監(jiān)控平臺
    • 合理規(guī)劃Kafka部署環(huán)境
    • 合理優(yōu)化Kafka集群配置
    • 優(yōu)化Kafka客戶端使用方式
      • 合理保證消息安全
      • 消費(fèi)者防止消息重復(fù)消費(fèi)
    • 生產(chǎn)環(huán)境常見問題分析
      • 消息零丟失方案
      • 消息積壓如何處理
      • 如何保證消息順序

搭建Kafka監(jiān)控平臺

官網(wǎng)地址

在這里插入圖片描述



下載efak-web-3.0.2-bin.tar.gz安裝包之后,efak需要依賴JDK和數(shù)據(jù)庫。數(shù)據(jù)庫支持本地化的SQLLite以及集中式的MySQL。生產(chǎn)環(huán)境建議使用MySQL。

數(shù)據(jù)庫不需要建表初始化,EFAK在執(zhí)行過程中會自己完成初始化。



將efak壓縮包解壓

[root@worker1 ~]# tar -zxvf efak-web-3.0.2-bin.tar.gz -C /app/kafka/eagle



修改efak解壓目錄下的conf/system-config.properties。 這個(gè)文件中提供了完整的配置,下面只列出需要修改的部分。

######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
# 指向Zookeeper地址
efak.zk.cluster.alias=cluster1
cluster1.zk.list=worker1:2181,worker2:2181,worker3:2181######################################
# zookeeper enable acl
######################################
# Zookeeper權(quán)限控制
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
#cluster1.zk.acl.username=test
#cluster1.zk.acl.password=test123######################################
# kafka offset storage
######################################
# offset選擇存在kafka中。
cluster1.efak.offset.storage=kafka
#cluster2.efak.offset.storage=zk######################################
# kafka mysql jdbc driver address
######################################
#指向自己的MySQL服務(wù)。庫需要提前創(chuàng)建
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://worker1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root



配置EFAK的環(huán)境變量

[root@worker1 ~]# vi ~/.bash_profile
export KE_HOME=/app/kafka/eagle/efak-web-3.0.2
PATH=$PATH:#KE_HOME/bin:$HOME/.local/bin:$HOME/bin[root@worker1 ~]# source ~/.bash_profile



啟動EFAK

配置完成后,先啟動Zookeeper和Kafka服務(wù),然后調(diào)用EFAK的bin目錄下的ke.sh腳本啟動服務(wù)

[root@worker1 bin]$ ./ke.sh start
-- 日志很長,看到以下內(nèi)容表示服務(wù)啟動成功
[2023-06-28 16:09:43] INFO: [Job done!]
Welcome to______    ______    ___     __ __/ ____/   / ____/   /   |   / //_// __/     / /_      / /| |  / ,<   / /___    / __/     / ___ | / /| |  
/_____/   /_/       /_/  |_|/_/ |_|  
( Eagle For Apache Kafka? )Version v3.0.2 -- Copyright 2016-2022
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.232.128:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************



EFAK管理頁面

接下來就可以訪問EFAK的管理頁面。http://192.168.232.128:8048。 默認(rèn)的用戶名是admin ,密碼是123456

關(guān)于EFAK更多的使用方式,比如EFAK服務(wù)如何集群部署等,可以參考官方文檔。



合理規(guī)劃Kafka部署環(huán)境

機(jī)械硬盤kafka使用機(jī)械硬盤即可,可以不用給Kafka固態(tài)硬盤



內(nèi)存,修改 bin/kafka-start-server.sh啟動腳本,修改JVM啟動參數(shù)中的內(nèi)存,默認(rèn)只申請了1G內(nèi)存。

[oper@worker1 bin]$ cat kafka-server-start.sh 
......
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
......

對于主流的16核32G服務(wù)器,可以適當(dāng)擴(kuò)大Kafka的內(nèi)存。例如:

export KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn10G ‐XX:MetaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50 ‐XX:G1HeapRegionSize=16M"



高性能網(wǎng)卡,Kafka本身的服務(wù)性能非常高,單機(jī)就可以支持百萬級的TPS,在高流量沖擊下,網(wǎng)絡(luò)非常有可能優(yōu)先成為性能瓶頸。對于Kafka服務(wù)器,建議配置高性能的網(wǎng)卡。成本允許的話,盡量選擇千兆以上的網(wǎng)卡。



合理優(yōu)化Kafka集群配置

合理配置partition分區(qū)數(shù)量

Kafka的單個(gè)Partition讀寫效率是非常高的,但是Kafka的Partition設(shè)計(jì)是非常碎片化。如果Partition文件過多,很容易嚴(yán)重影響Kafka的整體性能。

  • 盡量不要使用過多的Topic,通常不建議超過3個(gè)Topic。
  • 一個(gè)分區(qū)下的副本集數(shù)量不要設(shè)置過多 將副本數(shù)設(shè)置為2就可以了。

至于Partition的數(shù)量,最好根據(jù)業(yè)務(wù)情況靈活調(diào)整。partition數(shù)量設(shè)置多一些,可以一定程度增加Topic的吞吐量。但是過多的partition數(shù)量還是同樣會帶來partition索引的壓力。

Kafka提供了一個(gè)生產(chǎn)者的性能壓測腳本,可以用來衡量集群的整體性能。

# num-record表示要發(fā)送100000條壓測消息,record-size表示每條消息大小1KB,throughput表示限流控制,設(shè)置為小于0表示不限流。
# properducer-props用來設(shè)置生產(chǎn)者的參數(shù)。
[oper@worker1 bin]$ ./kafka-producer-perf-test.sh --topic test --num-record 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=worker1:9092 acks=1
94846 records sent, 18969.2 records/sec (18.52 MB/sec), 1157.4 ms avg latency, 1581.0 ms max latency.
133740 records sent, 26748.0 records/sec (26.12 MB/sec), 1150.6 ms avg latency, 1312.0 ms max latency.
146760 records sent, 29346.1 records/sec (28.66 MB/sec), 1051.5 ms avg latency, 1164.0 ms max latency.
137400 records sent, 27480.0 records/sec (26.84 MB/sec), 1123.7 ms avg latency, 1182.0 ms max latency.
158700 records sent, 31740.0 records/sec (31.00 MB/sec), 972.1 ms avg latency, 1022.0 ms max latency.
158775 records sent, 31755.0 records/sec (31.01 MB/sec), 963.5 ms avg latency, 1055.0 ms max latency.
1000000 records sent, 28667.259123 records/sec (28.00 MB/sec), 1030.44 ms avg latency, 1581.00 ms max latency, 1002 ms 50th, 1231 ms 95th, 1440 ms 99th, 1563 ms 99.9th.



合理對數(shù)據(jù)進(jìn)行壓縮

在生產(chǎn)者的ProducerConfig中,有一個(gè)配置 COMPRESSION_TYPE_CONFIG,是用來對消息進(jìn)行壓縮的。

public static final String COMPRESSION_TYPE_CONFIG = "compression.type";

生產(chǎn)者配置了壓縮策略后,會對生產(chǎn)的每個(gè)消息進(jìn)行壓縮,從而降低Producer到Broker的網(wǎng)絡(luò)傳輸,也降低了Broker的數(shù)據(jù)存儲壓力。

所支持的幾種壓縮算法中,zstd算法具有最高的數(shù)據(jù)壓縮比,但是吞吐量不高,lz4在吞吐量方面的優(yōu)勢比較明顯。壓縮消息必然增加CPU的消耗,如果CPU資源緊張,就不要壓縮了。

關(guān)于數(shù)據(jù)壓縮機(jī)制,可以在Broker的conf/server.properties文件中進(jìn)行配置。正常情況下,Broker從Producer端接收到消息后不會對其進(jìn)行任何修改,但是如果Broker端和Producer端指定了不同的壓縮算法,就會產(chǎn)生很多異常的表現(xiàn)。

compression.typeType:	string
Default:	producer
Valid Values:	[uncompressed, zstd, lz4, snappy, gzip, producer]



如果開啟了消息壓縮,那么在消費(fèi)者端自然是要進(jìn)行解壓縮的。

在Kafka中,消息從Producer到Broker再到Consumer會一直攜帶消息的壓縮方式,這樣當(dāng)Consumer讀取到消息集合時(shí),自然就知道了這些消息使用的是哪種壓縮算法,也就可以自己進(jìn)行解壓了。但是這時(shí)要注意的是應(yīng)用中使用的Kafka客戶端版本和Kafka服務(wù)端版本是否匹配。



優(yōu)化Kafka客戶端使用方式

合理保證消息安全

消息生產(chǎn)者設(shè)置好發(fā)送者應(yīng)答參數(shù)

  • 消息生產(chǎn)者的acks參數(shù),可以設(shè)置為0、1、all或-1
  • Broker的min.insync.replicas參數(shù)。如果生產(chǎn)者的acks設(shè)置為-1或all,服務(wù)端并不是強(qiáng)行要求所有Paritition都完成寫入再返回,而是可以配置多少個(gè)Partition完成消息寫入后,再往Producer返回消息。



生產(chǎn)者端的冪等性配置

// 值為 true 或 false
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";

如果要開啟冪等性,那么

  • 生產(chǎn)者消息緩存機(jī)制中的 max.in.flight.requests.per.connection <=5
  • 重試次數(shù)retries>0
  • 應(yīng)答機(jī)制中的acks必須為all

其他補(bǔ)充點(diǎn):

  • 如果沒有設(shè)置沖突配置,則默認(rèn)啟用冪等性。
  • 如果設(shè)置了沖突的配置,并且冪等性沒有顯式啟用,則冪等性被禁用。
  • 即顯示開啟的冪等性,又有沖突配置,就拋異常



使用生產(chǎn)者事務(wù)機(jī)制發(fā)送消息

// 1 初始化事務(wù)
void initTransactions();
// 2 開啟事務(wù)
void beginTransaction() throws ProducerFencedException;
// 3 提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 4 放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;

SpringBoot集成Kafka時(shí)使用的KafkaTemplate就提供了事務(wù)消息機(jī)制

在這里插入圖片描述



消費(fèi)者端合理使用提交方式

如果消費(fèi)者要使用異步方式進(jìn)行業(yè)務(wù)處理,那么如果業(yè)務(wù)處理失敗,此時(shí)消費(fèi)者已經(jīng)提交了Offset,這個(gè)消息就無法重試了,這就會造成消息丟失。

因此在消費(fèi)者端,盡量不要使用異步處理方式,在絕大部分場景下,就能夠通過Kafka的消費(fèi)者重試機(jī)制,保證消息安全處理。此時(shí),在消費(fèi)者端,需要更多考慮的問題,就變成了消費(fèi)重試機(jī)制造成的消息重復(fù)消費(fèi)的問題。



消費(fèi)者防止消息重復(fù)消費(fèi)

問題的產(chǎn)生:消費(fèi)者業(yè)務(wù)處理時(shí)間較長,此時(shí)消費(fèi)者正常處理消息的過程中,Broker端就已經(jīng)等不下去了,認(rèn)為這個(gè)消費(fèi)者處理失敗了。這時(shí)就會往同組的其他消費(fèi)者實(shí)例投遞消息,這就造成了消息重復(fù)處理。這就給消費(fèi)者端帶來不必要的冪等性問題。

解決方式:

  • 根據(jù)業(yè)務(wù)ID校驗(yàn)。比如對于訂單消息,消費(fèi)者根據(jù)訂單ID去確認(rèn)一下這個(gè)訂單消息有沒有處理過。

  • 統(tǒng)一的方式處理冪等性問題

    將Offset放到Redis中自行進(jìn)行管理。通過Redis中的offset來判斷消息之前是否處理過。

    具體實(shí)現(xiàn)詳情請參考上文《客戶端 — 客戶端屬性分析 — 消息重復(fù)消費(fèi)問題》



生產(chǎn)環(huán)境常見問題分析

消息零丟失方案

在這里插入圖片描述



1、 生產(chǎn)者發(fā)送消息到Broker不丟失

生產(chǎn)者丟失消息的原因就和ack機(jī)制有關(guān),如果將acks設(shè)置為1、all或-1 就表示接收到Broker成功將消息寫入文件后的ack應(yīng)答



2、Broker端保證消息不丟失

Broker端丟失消息的原因主要有兩種:Leader partition重新選舉、服務(wù)器非正常關(guān)機(jī) pageCache中的消息還未刷盤

《服務(wù)端的Zookeeper元數(shù)據(jù)梳理 — Partition故障恢復(fù)機(jī)制》 《服務(wù)端日志 — 合理配置刷盤頻率》

消息生產(chǎn)者將acks設(shè)置為all可以有效避免因?yàn)長eader partition故障導(dǎo)致的消息丟失;

而pageCache刷盤可以通過下面的參數(shù)來設(shè)定合理的配置

# 多長時(shí)間進(jìn)行一次強(qiáng)制刷盤。默認(rèn)是Long.MAX。
flush.ms# 表示當(dāng)同一個(gè)Partiton的消息條數(shù)積累到這個(gè)數(shù)量時(shí),就會申請一次刷盤操作。默認(rèn)是Long.MAX。
log.flush.interval.messages#當(dāng)一個(gè)消息在內(nèi)存中保留的時(shí)間,達(dá)到這個(gè)數(shù)量時(shí),就會申請一次刷盤操作。他的默認(rèn)值是空。如果這個(gè)參數(shù)配置為空,則生效的是下一個(gè)參數(shù)。
log.flush.interval.ms# 日志刷新程序檢查是否需要將日志刷新到磁盤的頻率(以毫秒為單位),默認(rèn)也是Long.MAX。
log.flush.scheduler.interval.ms



3、消費(fèi)者保證消息不丟失

同步處理業(yè)務(wù)邏輯,同步手動提交offset



消息積壓如何處理

在這里插入圖片描述

  • 如果業(yè)務(wù)運(yùn)行正常,只是因?yàn)橄M(fèi)者處理消息過慢,造成消息加壓。

    增加消費(fèi)者個(gè)數(shù),最多讓一個(gè)消費(fèi)者組下的消費(fèi)者個(gè)數(shù)=Partition分區(qū)數(shù),讓一個(gè)Consumer負(fù)責(zé)一個(gè)分區(qū),將消費(fèi)進(jìn)度提升到最大。

    如果還是不夠,那就新創(chuàng)建一個(gè)topic,給這個(gè)topic更多的partition,然后啟動一批消費(fèi)者,將消息從舊topic中搬運(yùn)到新topic中,這些消費(fèi)者不處理業(yè)務(wù)消息,僅僅是搬運(yùn)。這樣就可以創(chuàng)建更多的消費(fèi)者消費(fèi)從新topic消費(fèi)了

  • 如果是消費(fèi)者業(yè)務(wù)處理異常導(dǎo)致的消息積壓。

    使用一種降級方案。先啟動一個(gè)Consumer將Topic下的消息先轉(zhuǎn)發(fā)到其他隊(duì)列中,然后再慢慢分析新隊(duì)列里的消息處理問題。類似于死信隊(duì)列的處理方式。



如何保證消息順序

Kafka很難保證消息的順序,因?yàn)镵afka設(shè)計(jì)的最優(yōu)先重點(diǎn)是海量吞吐。

在這里插入圖片描述



這個(gè)問題要分兩個(gè)方面考慮:

  1. Topic下各個(gè)partition是并發(fā)處理消息的 ,producer要保證一組有序的消息發(fā)送到一個(gè)partition中
  2. consumer要從partition中順序處理消息



問題一:producer要保證一組有序的消息發(fā)送到一個(gè)partition中

第一種簡單粗暴的方式就只為Topic創(chuàng)建一個(gè)partition,這種方式放棄了多partition帶來的高吞吐量。

第二種方式,自己定義一個(gè)分區(qū)路由機(jī)制,實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口,將消息分配到同一個(gè)Partition上。

此時(shí)發(fā)送方保證了發(fā)送時(shí)的消息順序性。但是存在網(wǎng)絡(luò)問題,導(dǎo)致發(fā)送的一批消息中某一個(gè)消息丟失,這樣消息到Broker時(shí)就還是沒辦法保證順序性。Kafka是通過冪等性中單調(diào)遞增的sequenceNumber來保證消息是順序,因?yàn)槭菃握{(diào)遞增的,所以還能判斷是否存在消息丟失一旦Kafka發(fā)現(xiàn)Producer傳過來的SequenceNumber出現(xiàn)了跨越,那么就意味著中間有可能消息出現(xiàn)了丟失,就會往Producer拋出一個(gè)OutOfOrderSequenceException異常。



問題二:Partition中的消息有序后,如何保證Consumer的消費(fèi)順序是有序的

// 一次拉取消息的數(shù)量
public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
Note that the consumer performs multiple fetches in parallel.public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;

從上方fetch.max.bytes參數(shù)的最后一個(gè)解釋可以知道Consumer其實(shí)是每次并行的拉取多個(gè)Batch批次的消息進(jìn)行處理的。所以在Kafka提供的客戶端Consumer中,是沒有辦法直接保證消費(fèi)的消息順序。

我們能做的就是在Consumer的處理邏輯中,將消息進(jìn)行排序。比如將消息按照業(yè)務(wù)獨(dú)立性收集到一個(gè)集合中,然后在集合中對消息進(jìn)行排序。

RocketMQ中提供了順序消息的實(shí)現(xiàn)。他的實(shí)現(xiàn)原理是先鎖定一個(gè)隊(duì)列,消費(fèi)完這一個(gè)隊(duì)列后,才開始鎖定下一個(gè)隊(duì)列,并消費(fèi)隊(duì)列中的消息。再結(jié)合MessageQueue中的消息有序性,就能保證整體消息的消費(fèi)順序是有序的。

http://www.risenshineclean.com/news/62350.html

相關(guān)文章:

  • 大同網(wǎng)站建設(shè)設(shè)計(jì)seo排名技術(shù)軟件
  • 接網(wǎng)站開發(fā)的公司合肥網(wǎng)站優(yōu)化搜索
  • 制作 網(wǎng)站導(dǎo)航 下拉菜單今日國際軍事新聞最新消息
  • 做網(wǎng)站 怎么提升瀏覽量seo推廣官網(wǎng)
  • 做網(wǎng)絡(luò)課堂的平臺有哪些網(wǎng)站鄭州seo推廣外包
  • 運(yùn)動網(wǎng)頁設(shè)計(jì)哪里有seo排名優(yōu)化
  • 做網(wǎng)站建設(shè)費(fèi)用nba最新排名東西部
  • 重慶建設(shè)摩托車質(zhì)量怎么樣seo入門版
  • 幫詐騙公司做網(wǎng)站企業(yè)網(wǎng)站推廣技巧
  • 阜寧做網(wǎng)站的價(jià)格怎么推廣自己的網(wǎng)站
  • php 網(wǎng)站 項(xiàng)目cilimao磁力貓搜索引擎
  • wordpress手機(jī)評論百度seo新站優(yōu)化
  • swiper做的網(wǎng)站百度網(wǎng)頁版瀏覽器入口
  • 青島做網(wǎng)站費(fèi)用廚師培訓(xùn)機(jī)構(gòu)
  • 平臺企業(yè)采用勞務(wù)派遣方式用工的seo的形式有哪些
  • 網(wǎng)站自做書本永久免費(fèi)的網(wǎng)站服務(wù)器有哪些軟件
  • 建設(shè)一個(gè)網(wǎng)站的步驟有哪些網(wǎng)絡(luò)推廣公司怎么找客戶
  • 百度搜索網(wǎng)站介紹杭州上城區(qū)抖音seo有多好
  • 做網(wǎng)站優(yōu)化哪家公司好關(guān)鍵詞自動優(yōu)化
  • 合肥知名網(wǎng)站制作上海關(guān)鍵詞優(yōu)化排名哪家好
  • 松原網(wǎng)站建設(shè)網(wǎng)站建設(shè)的好公司
  • 做網(wǎng)站網(wǎng)頁掙錢不免費(fèi)刷seo
  • 深圳做網(wǎng)站公司地點(diǎn)十大免費(fèi)網(wǎng)站推廣平臺
  • 百度搜索量seo要點(diǎn)
  • 圖床網(wǎng)站怎么做廣州seo團(tuán)隊(duì)
  • 石家莊高鐵站123網(wǎng)址之家
  • 秦皇島網(wǎng)站制作方案電商網(wǎng)站怎樣優(yōu)化
  • 網(wǎng)站建設(shè)公司的網(wǎng)銷好做嗎百度輸入法免費(fèi)下載
  • wordpress使用步驟杭州seo網(wǎng)站推廣排名
  • 云南房產(chǎn)網(wǎng)站建設(shè)seo的理解