電子商務網(wǎng)站的建設目標是什么關鍵詞優(yōu)化軟件
目錄
一丶Zookkeeper概述
?二、Zookeeper 特點
2.1Zookeeper 應用場景
2.2Zookeeper 選舉機制
2.2.1第一次啟動選舉機制
2.2.2非第一次啟動選舉機制
三、部署 Zookeeper 集群
3.1//安裝 JDK
?3.2安裝 Zookeeper
3.2.1修改配置文件
3.2.2拷貝配置好的 Zookeeper 配置文件到其他機器上
?3.2.3在每個節(jié)點上創(chuàng)建數(shù)據(jù)目錄和日志目錄
3.2.4在每個節(jié)點的dataDir指定的目錄下創(chuàng)建一個 myid 的文件
四、Kafka概述
4.1Kafka 定義
4.2Kafka 簡介
4.3為什么需要消息列隊
4.4使用消息隊列的好處
4.4.1解耦
4.4.2可恢復性
4.4.3緩沖
4.4.4靈活性
4.4.5異步通信
4.5消息隊列的兩種模式
4.5.1點對點模式
4.5.2發(fā)布/訂閱模式
4.6Kafka 的特性
4.7artation 數(shù)據(jù)路由規(guī)則
五、?部署 kafka 集群
5.1安裝 Kafka
5.1.1修改配置文件
5.1.2修改環(huán)境變量
六、Kafka 架構深入
6.1Kafka 工作流程及文件存儲機制
七、 部署 Zookeeper+Kafka 集群
7.1部署 Filebeat
7.2部署 ELK,在 Logstash 組件所在節(jié)點上新建一個 Logstash 配置文件
?7.3瀏覽器訪問 http://192.168.10.13:5601 登錄 Kibana
一丶Zookkeeper概述
Zookeeper是一個開源的分布式的,為分布式框架提供協(xié)調(diào)服務的Apache項目。
//Zookeeper 工作機制 Zookeeper從設計模式角度來理解:是一個基于觀察者模式設計的分布式服務管理框架,它負責存儲和管理大家都關心的數(shù)據(jù),然后接受觀察者的注冊,一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper就將負責通知已經(jīng)在Zookeeper上注冊的那些觀察者做出相應的反應。也就是說 Zookeeper = 文件系統(tǒng) + 通知機制
?二、Zookeeper 特點
(1)Zookeeper:一個領導者(Leader),多個跟隨者(Follower)組成的集群。
(2)Zookeepe集群中只要有半數(shù)以上節(jié)點存活,Zookeeper集群就能正常服務。所以Zookeeper適合安裝奇數(shù)臺服務器。
(3)全局數(shù)據(jù)一致:每個Server保存一份相同的數(shù)據(jù)副本,Client無論連接到哪個Server,數(shù)據(jù)都是一致的。
(4)更新請求順序執(zhí)行,來自同一個Client的更新請求按其發(fā)送順序依次執(zhí)行,即先進先出。
(5)數(shù)據(jù)更新原子性,一次數(shù)據(jù)更新要么成功,要么失敗。
(6)實時性,在一定時間范圍內(nèi),Client能讀到最新數(shù)據(jù)。 ?
2.1Zookeeper 應用場景
提供的服務包括:統(tǒng)一命名服務、統(tǒng)一配置管理、統(tǒng)一集群管理、服務器節(jié)點動態(tài)上下線、軟負載均衡等。
●統(tǒng)一命名服務 在分布式環(huán)境下,經(jīng)常需要對應用/服務進行統(tǒng)一命名,便于識別。例如:IP不容易記住,而域名容易記住。
●統(tǒng)一配置管理
(1)分布式環(huán)境下,配置文件同步非常常見。一般要求一個集群中,所有節(jié)點的配置信息是一致的,比如Kafka集群。對配置文件修改后,希望能夠快速同步到各個節(jié)點上。
(2)配置管理可交由ZooKeeper實現(xiàn)??蓪⑴渲眯畔懭隯ooKeeper上的一個Znode。各個客戶端服務器監(jiān)聽這個Znode。一旦 Znode中的數(shù)據(jù)被修改,ZooKeeper將通知各個客戶端服務器。
●統(tǒng)一集群管理
(1)分布式環(huán)境中,實時掌握每個節(jié)點的狀態(tài)是必要的。可根據(jù)節(jié)點實時狀態(tài)做出一些調(diào)整。
(2)ZooKeeper可以實現(xiàn)實時監(jiān)控節(jié)點狀態(tài)變化。可將節(jié)點信息寫入ZooKeeper上的一個ZNode。監(jiān)聽這個ZNode可獲取它的實時狀態(tài)變化。
●服務器動態(tài)上下線 客戶端能實時洞察到服務器上下線的變化。
●軟負載均衡 在Zookeeper中記錄每臺服務器的訪問數(shù),讓訪問數(shù)最少的服務器去處理最新的客戶端請求。
2.2Zookeeper 選舉機制
2.2.1第一次啟動選舉機制
(1)服務器1啟動,發(fā)起一次選舉。服務器1投自己一票。此時服務器1票數(shù)一票,不夠半數(shù)以上(3票),選舉無法完成,服務器1狀態(tài)保持為LOOKING;
(2)服務器2啟動,再發(fā)起一次選舉。服務器1和2分別投自己一票并交換選票信息:此時服務器1發(fā)現(xiàn)服務器2的myid比自己目前投票推舉的(服務器1)大,更改選票為推舉服務器2。此時服務器1票數(shù)0票,服務器2票數(shù)2票,沒有半數(shù)以上結果,選舉無法完成,服務器1,2狀態(tài)保持LOOKING
(3)服務器3啟動,發(fā)起一次選舉。此時服務器1和2都會更改選票為服務器3。此次投票結果:服務器1為0票,服務器2為0票,服務器3為3票。此時服務器3的票數(shù)已經(jīng)超過半數(shù),服務器3當選Leader。服務器1,2更改狀態(tài)為FOLLOWING,服務器3更改狀態(tài)為LEADING; (4)服務器4啟動,發(fā)起一次選舉。此時服務器1,2,3已經(jīng)不是LOOKING狀態(tài),不會更改選票信息。交換選票信息結果:服務器3為3票,服務器4為1票。此時服務器4服從多數(shù),更改選票信息為服務器3,并更改狀態(tài)為FOLLOWING;
(5)服務器5啟動,同4一樣當小弟。
2.2.2非第一次啟動選舉機制
(1)當ZooKeeper 集群中的一臺服務器出現(xiàn)以下兩種情況之一時,就會開始進入Leader選舉:? ? ? ? ? ?1)服務器初始化啟動。
? ? ? ?2)服務器運行期間無法和Leader保持連接。
(2)而當一臺機器進入Leader選舉流程時,當前集群也可能會處于以下兩種狀態(tài):
? ? ? ?1)集群中本來就已經(jīng)存在一個Leader。 對于已經(jīng)存在Leader的情況,機器試圖去選舉Leader時,會被告知當前服務器的Leader信息,對于該機器來說,僅僅需要和 Leader機器建立連接,并進行狀態(tài)同步即可。
? ? ? ?2)集群中確實不存在Leader。 假設ZooKeeper由5臺服務器組成,SID分別為1、2、3、4、5,ZXID分別為8、8、8、7、7,并且此時SID為3的服務器是Leader。某一時刻,3和5服務器出現(xiàn)故障,因此開始進行Leader選舉。
選舉Leader規(guī)則:
? ? ? ? ?1.EPOCH大的直接勝出
? ? ? ? ?2.EPOCH相同,事務id大的勝出
? ? ? ? ?3.事務id相同,服務器id大的勝出
SID:服務器ID。用來唯一標識一臺ZooKeeper集群中的機器,每臺機器不能重復,和myid一致。
ZXID:事務ID。ZXID是一個事務ID,用來標識一次服務器狀態(tài)的變更。在某一時刻,集群中的每臺機器的ZXID值不一定完全一致,這和ZooKeeper服務器對于客戶端“更新請求”的處理邏輯速度有關。
Epoch:每個Leader任期的代號。沒有Leader時同一輪投票過程中的邏輯時鐘值是相同的。每投完一次票這個數(shù)據(jù)就會增加
三、部署 Zookeeper 集群
準備 3 臺服務器做 Zookeeper 集群
192.168.237.21
192.168.237.22
192.168.237.23
3.1//安裝 JDK
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version
?3.2安裝 Zookeeper
下載安裝包 官方下載地址:Index of /dist/zookeeper
cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
3.2.1修改配置文件
cd /usr/local/zookeeper-3.5.7/conf/
cp zoo_sample.cfg zoo.cfgvim zoo.cfgtickTime=2000 #通信心跳時間,Zookeeper服務器與客戶端心跳時間,單位毫秒
initLimit=10 #Leader和Follower初始連接時能容忍的最多心跳數(shù)(tickTime的數(shù)量),這里表示為10*2s
syncLimit=5 #Leader和Follower之間同步通信的超時時間,這里表示如果超過5*2s,Leader認為Follwer死掉,并從服務器列表中刪除Follwer
dataDir=/usr/local/zookeeper-3.5.7/data #修改,指定保存Zookeeper中的數(shù)據(jù)的目錄,目錄需要單獨創(chuàng)建
dataLogDir=/usr/local/zookeeper-3.5.7/logs #添加,指定存放日志的目錄,目錄需要單獨創(chuàng)建
clientPort=2181 #客戶端連接端口
#添加集群信息
server.1=192.168.237.21:3188:3288
server.2=192.168.237.22:3188:3288
server.3=192.168.237.23:3188:3288server.A=B:C:D
server.A=B:C:D
●A是一個數(shù)字,表示這個是第幾號服務器。集群模式下需要在zoo.cfg中dataDir指定的目錄下創(chuàng)建一個文件myid,這個文件里面有一個數(shù)據(jù)就是A的值,Zookeeper啟動時讀取此文件,拿到里面的數(shù)據(jù)與zoo.cfg里面的配置信息比較從而判斷到底是哪個server。
●B是這個服務器的地址。
●C是這個服務器Follower與集群中的Leader服務器交換信息的端口。●D是萬一集群中的Leader服務器掛了,需要一個端口來重新進行選舉,選出一個新的Leader,而這個端口就是用來執(zhí)行選舉時服務器相互通信的端口。
3.2.2拷貝配置好的 Zookeeper 配置文件到其他機器上
scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.23722:/usr/local/zookeeper-3.5.7/conf/
scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.237.23:/usr/local/zookeeper-3.5.7/conf/
?3.2.3在每個節(jié)點上創(chuàng)建數(shù)據(jù)目錄和日志目錄
mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs
3.2.4在每個節(jié)點的dataDir指定的目錄下創(chuàng)建一個 myid 的文件
echo 1 > /usr/local/zookeeper-3.5.7/data/myid
echo 2 > /usr/local/zookeeper-3.5.7/data/myid
echo 3 > /usr/local/zookeeper-3.5.7/data/myid
3.2.5配置 Zookeeper 啟動腳本
vim /etc/init.d/zookeeper#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)echo "---------- zookeeper 啟動 ------------"$ZK_HOME/bin/zkServer.sh start
;;
stop)echo "---------- zookeeper 停止 ------------"$ZK_HOME/bin/zkServer.sh stop
;;
restart)echo "---------- zookeeper 重啟 ------------"$ZK_HOME/bin/zkServer.sh restart
;;
status)echo "---------- zookeeper 狀態(tài) ------------"$ZK_HOME/bin/zkServer.sh status
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac# 設置開機自啟
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper#分別啟動 Zookeeper
service zookeeper start#查看當前狀態(tài)
service zookeeper status
四、Kafka概述
4.1Kafka 定義
Kafka 是一個分布式的基于發(fā)布/訂閱模式的消息隊列(MQ,Message Queue),主要應用于大數(shù)據(jù)實時處理領域。
4.2Kafka 簡介
Kafka 是最初由 Linkedin 公司開發(fā),是一個分布式、支持分區(qū)的(partition)、多副本的(replica),基于 Zookeeper 協(xié)調(diào)的分布式消息中間件系統(tǒng),它的最大的特性就是可以實時的處理大量數(shù)據(jù)以滿足各種需求場景,比如基于 hadoop 的批處理系統(tǒng)、低延遲的實時系統(tǒng)、Spark/Flink 流式處理引擎,nginx 訪問日志,消息服務等等,用 scala 語言編寫, Linkedin 于 2010 年貢獻給了 Apache 基金會并成為頂級開源項目。
4.3為什么需要消息列隊
主要原因是由于在高并發(fā)環(huán)境下,同步請求來不及處理,請求往往會發(fā)生阻塞。比如大量的請求并發(fā)訪問數(shù)據(jù)庫,導致行鎖表鎖,最后請求線程會堆積過多,從而觸發(fā) too many connection 錯誤,引發(fā)雪崩效應。 我們使用消息隊列,通過異步處理請求,從而緩解系統(tǒng)的壓力。消息隊列常應用于異步處理,流量削峰,應用解耦,消息通訊等場景。
當前比較常見的 MQ 中間件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。
4.4使用消息隊列的好處
4.4.1解耦
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
4.4.2可恢復性
系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復后被處理。
4.4.3緩沖
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費消息的處理速度不一致的情況。
4.4.4靈活性
& 峰值處理能力 在訪問量劇增的情況下,應用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負荷的請求而完全崩潰。
4.4.5異步通信
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
4.5消息隊列的兩種模式
4.5.1點對點模式
(一對一,消費者主動拉取數(shù)據(jù),消息收到后消息清除)
消息生產(chǎn)者生產(chǎn)消息發(fā)送到消息隊列中,然后消息消費者從消息隊列中取出并且消費消息。消息被消費以后,消息隊列中不再有存儲,所以消息消費者不可能消費到已經(jīng)被消費的消息。消息隊列支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費。
4.5.2發(fā)布/訂閱模式
(一對多,又叫觀察者模式,消費者消費數(shù)據(jù)之后不會清除消息)
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到 topic 中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發(fā)布到 topic 的消息會被所有訂閱者消費。 發(fā)布/訂閱模式是定義對象間一種一對多的依賴關系,使得每當一個對象(目標對象)的狀態(tài)發(fā)生改變,則所有依賴于它的對象(觀察者對象)都會得到通知并自動更新。
4.6Kafka 的特性
(1)Broker 一臺 kafka 服務器就是一個 broker。一個集群由多個 broker 組成。一個 broker 可以容納多個 topic。
(2)Topic 可以理解為一個隊列,生產(chǎn)者和消費者面向的都是一個 topic。 類似于數(shù)據(jù)庫的表名或者 ES 的 index 物理上不同 topic 的消息分開存儲
(3)Partition 為了實現(xiàn)擴展性,一個非常大的 topic 可以分布到多個 broker(即服務器)上,一個 topic 可以分割為一個或多個 partition,每個 partition 是一個有序的隊列。Kafka 只保證 partition 內(nèi)的記錄是有序的,而不保證 topic 中不同 partition 的順序
4.7artation 數(shù)據(jù)路由規(guī)則
1.指定了 patition,則直接使用;
2.未指定 patition 但指定 key(相當于消息中某個屬性),通過對 key 的 value 進行 hash 取模,選出一個 patition;
3.patition 和 key 都未指定,使用輪詢選出一個 patition。
每條消息都會有一個自增的編號,用于標識消息的偏移量,標識順序從 0 開始。
每個 partition 中的數(shù)據(jù)使用多個 segment 文件存儲。
如果 topic 有多個 partition,消費數(shù)據(jù)時就不能保證數(shù)據(jù)的順序。嚴格保證消息的消費順序的場景下(例如商品秒殺、 搶紅包),需
要將 partition 數(shù)目設為 1。
五、?部署 kafka 集群
5.1安裝 Kafka
官方下載地址:Apache Kafka
cd /opt wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka
5.1.1修改配置文件
cd /usr/local/kafka/config/
cp server.properties{,.bak}vim server.properties
broker.id=0 ●21行,broker的全局唯一編號,每個broker不能重復,因此要在其他機器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.10.17:9092 ●31行,指定監(jiān)聽的IP和端口,如果修改每個broker的IP需區(qū)分開來,也可保持默認配置不用修改
num.network.threads=3 #42行,broker 處理網(wǎng)絡請求的線程數(shù)量,一般情況下不需要去修改
num.io.threads=8 #45行,用來處理磁盤IO的線程數(shù)量,數(shù)值應該大于硬盤數(shù)
socket.send.buffer.bytes=102400 #48行,發(fā)送套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400 #51行,接收套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600 #54行,請求套接字的緩沖區(qū)大小
log.dirs=/usr/local/kafka/logs #60行,kafka運行日志存放的路徑,也是數(shù)據(jù)存放的路徑
num.partitions=1 #65行,topic在當前broker上的默認分區(qū)個數(shù),會被topic創(chuàng)建時的指定參數(shù)覆蓋
num.recovery.threads.per.data.dir=1 #69行,用來恢復和清理data下數(shù)據(jù)的線程數(shù)量
log.retention.hours=168 #103行,segment文件(數(shù)據(jù)文件)保留的最長時間,單位為小時,默認為7天,超時將被刪除
log.segment.bytes=1073741824 #110行,一個segment文件最大的大小,默認為 1G,超出將新建一個新的segment文件
zookeeper.connect=192.168.237.21:2181,192.168.237.22:2181,192.168.237.23:2181 ●123行,配置連接Zookeeper集群地址
5.1.2修改環(huán)境變量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/binsource /etc/profile//配置 Zookeeper 啟動腳本
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 啟動 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 狀態(tài) ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac//設置開機自啟
chmod +x /etc/init.d/kafka
chkconfig --add kafka//分別啟動 Kafka
service kafka start3.Kafka 命令行操作
//創(chuàng)建topic
kafka-topics.sh --create --zookeeper 192.168.10.17:2181,192.168.10.21:2181,192.168.10.22:2181 --replication-factor 2 --partitions 3 --topic testkafka-topics.sh --create --zookeeper 192.168.10.17:2181,192.168.10.20:2181,192.168.10.21:2181 --replication-factor 2 --partitions 3 --topic test--zookeeper:定義 zookeeper 集群服務器地址,如果有多個 IP 地址使用逗號分割,一般使用一個 IP 即可
--replication-factor:定義分區(qū)副本數(shù),1 代表單副本,建議為 2
--partitions:定義分區(qū)數(shù) --topic:定義 topic 名稱//查看當前服務器中的所有 topic
kafka-topics.sh --list --zookeeper 192.168.10.17:2181,192.168.10.21:2181,192.168.10.22:2181//查看某個 topic 的詳情
kafka-topics.sh --describe --zookeeper 192.168.10.17:2181,192.168.10.21:2181,192.168.10.22:2181//發(fā)布消息
kafka-console-producer.sh --broker-list 192.168.10.17:9092,192.168.10.21:9092,192.168.10.22:9092 --topic test//消費消息
kafka-console-consumer.sh --bootstrap-server 192.168.10.17:9092,192.168.10.21:9092,192.168.10.22:9092 --topic test --from-beginning---------------------------------------------------------------------------------------from-beginning:會把主題中以往所有的數(shù)據(jù)都讀取出來//修改分區(qū)數(shù)
kafka-topics.sh --zookeeper 192.168.10.17:2181,192.168.10.21:2181,192.168.10.22:2181 --alter --topic test --partitions 6//刪除 topic
kafka-topics.sh --delete --zookeeper 192.168.10.17:2181,192.168.10.21:2181,192.168.10.22:2181 --topic test
六、Kafka 架構深入
6.1Kafka 工作流程及文件存儲機制
Kafka 中消息是以 topic 進行分類的,生產(chǎn)者生產(chǎn)消息,消費者消費消息,都是面向 topic 的。
topic 是邏輯上的概念,而 partition 是物理上的概念,每個 partition 對應于一個 log 文件,該 log 文件中存儲的就是 producer 生產(chǎn)的數(shù)據(jù)。Producer 生產(chǎn)的數(shù)據(jù)會被不斷追加到該 log 文件末端,且每條數(shù)據(jù)都有自己的 offset。 消費者組中的每個消費者,都會實時記錄自己消費到了哪個 offset,以便出錯恢復時,從上次的位置繼續(xù)消費。
由于生產(chǎn)者生產(chǎn)的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數(shù)據(jù)定位效率低下,Kafka 采取了分片和索引機制,將每個 partition 分為多個 segment。每個 segment 對應兩個文件:“.index” 文件和 “.log” 文件。這些文件位于一個文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號。例如,test 這個 topic 有三個分區(qū), 則其對應的文件夾為 test-0、test-1、test-2。
index 和 log 文件以當前 segment 的第一條消息的 offset 命名。
“.index” 文件存儲大量的索引信息,“.log” 文件存儲大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向對應數(shù)據(jù)文件中 message 的物理偏移地址。
數(shù)據(jù)可靠性保證
為保證 producer 發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的 topic,topic 的每個 partition 收到 producer 發(fā)送的數(shù)據(jù)后, 都需要向 producer 發(fā)送 ack(acknowledgement 確認收到),如果 producer 收到 ack,就會進行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。
數(shù)據(jù)一致性問題
LEO:指的是每個副本最大的 offset;
HW:指的是消費者能見到的最大的 offset,所有副本中最小的 LEO。
(1)follower 故障 follower 發(fā)生故障后會被臨時踢出 ISR(Leader 維護的一個和 Leader 保持同步的 Follower 集合),待該 follower 恢復后,follower 會讀取本地磁盤記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開始向 leader 進行同步。等該 follower 的 LEO 大于等于該 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
(2)leader 故障 leader 發(fā)生故障之后,會從 ISR 中選出一個新的 leader, 之后,為保證多個副本之間的數(shù)據(jù)一致性,其余的 follower 會先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader 同步數(shù)據(jù)。
注:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復。
//ack 應答機制 對于某些不太重要的數(shù)據(jù),對數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等 ISR 中的 follower 全部接收成功。所以 Kafka 為用戶提供了三種可靠性級別,用戶根據(jù)對可靠性和延遲的要求進行權衡選擇。
當 producer 向 leader 發(fā)送數(shù)據(jù)時,可以通過 request.required.acks 參數(shù)來設置數(shù)據(jù)可靠性的級別: ●0:這意味著producer無需等待來自broker的確認而繼續(xù)發(fā)送下一批消息。這種情況下數(shù)據(jù)傳輸效率最高,但是數(shù)據(jù)可靠性確是最低的。當broker故障時有可能丟失數(shù)據(jù)。
●1(默認配置):這意味著producer在ISR中的leader已成功收到的數(shù)據(jù)并得到確認后發(fā)送下一條message。如果在follower同步成功之前l(fā)eader故障,那么將會丟失數(shù)據(jù)。
●-1(或者是all):producer需要等待ISR中的所有follower都確認接收到數(shù)據(jù)后才算一次發(fā)送完成,可靠性最高。但是如果在 follower 同步完成后,broker 發(fā)送ack 之前,leader 發(fā)生故障,那么會造成數(shù)據(jù)重復。
三種機制性能依次遞減,數(shù)據(jù)可靠性依次遞增
?注:在 0.11 版本以前的Kafka,對此是無能為力的,只能保證數(shù)據(jù)不丟失,再在下游消費者對數(shù)據(jù)做全局去重。在 0.11 及以后版本的 Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指 Producer 不論向 Server 發(fā)送多少次重復數(shù)據(jù), Server 端都只會持久化一條。
七、 部署 Zookeeper+Kafka 集群
7.1部署 Filebeat
cd /usr/local/filebeatvim filebeat.yml
filebeat.prospectors:- type: logenabled: truepaths:- /var/log/httpd/access_logtags: ["access"]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: ["error"]......#添加輸出到 Kafka 的配置
output.kafka:enabled: truehosts: ["192.168.10.17:9092","192.168.10.21:9092","192.168.10.22:9092"] #指定 Kafka 集群配置topic: "httpd" #指定 Kafka 的 topic#啟動 filebeat
./filebeat -e -c filebeat.yml
7.2部署 ELK,在 Logstash 組件所在節(jié)點上新建一個 Logstash 配置文件
cd /etc/logstash/conf.d/vim kafka.conf
input {kafka {bootstrap_servers => "192.168.10.17:9092,192.168.10.21:9092,192.168.10.22:9092" #kafka集群地址topics => "httpd" #拉取的kafka的指定topictype => "httpd_kafka" #指定 type 字段codec => "json" #解析json格式的日志數(shù)據(jù)auto_offset_reset => "latest" #拉取最近數(shù)據(jù),earliest為從頭開始拉取decorate_events => true #傳遞給elasticsearch的數(shù)據(jù)額外增加kafka的屬性數(shù)據(jù)}
}output {if "access" in [tags] {elasticsearch {hosts => ["192.168.10.15:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.10.15:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug }
}#啟動 logstash
logstash -f kafka.conf
注:生產(chǎn)黑屏操作es時查看所有的索引:curl -X GET "localhost:9200/_cat/indices?v"
?7.3瀏覽器訪問 http://192.168.10.13:5601 登錄 Kibana
單擊“Create Index Pattern”按鈕添加索引“filebeat_test-*”,單擊 “create” 按鈕創(chuàng)建,單擊 “Discover” 按鈕可查看圖表信息及日志信息。