app開發(fā)和網(wǎng)站建設(shè)區(qū)別怎么注冊一個自己的網(wǎng)站
- Kafka簡介
- 集群部署
- 配置Kafka
- 測試Kafka
1.Kafka簡介
數(shù)據(jù)緩沖隊列。同時提高了可擴展性。具有峰值處理能力,使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負荷的請求而完全崩潰。
Kafka是一個分布式、支持分區(qū)的(partition)、多副本的(replica),基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng),它的最大的特性就是可以實時的處理大量數(shù)據(jù)以滿足各種需求場景:比如基于hadoop的批處理系統(tǒng)、低延遲的實時系統(tǒng)、web/nginx日志、訪問日志,消息服務(wù)等等,用scala語言編寫,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。
特性:
高吞吐量:kafka每秒可以處理幾十萬條消息。
可擴展性:kafka集群支持熱擴展- 持久性、
可靠性:消息被持久化到本地磁盤,并且支持數(shù)據(jù)備份防止數(shù)據(jù)丟失
容錯性:允許集群中節(jié)點失敗(若副本數(shù)量為n,則允許n-1個節(jié)點失敗)
高并發(fā):支持數(shù)千個客戶端同時讀寫
它主要包括以下組件:
話題(Topic):是特定類型的消息流。(每條發(fā)布到 kafka 集群的消息屬于的類別,即 kafka 是面向 topic 的。)
生產(chǎn)者(Producer):是能夠發(fā)布消息到話題的任何對象(發(fā)布消息到 kafka 集群的終端或服務(wù)).
消費者(Consumer):可以訂閱一個或多個話題,從而消費這些已發(fā)布的消息。
服務(wù)代理(Broker):已發(fā)布的消息保存在一組服務(wù)器中,它們被稱為代理(Broker)或Kafka集群。partition(區(qū)):每個 topic 包含一個或多個 partition。
replication:partition 的副本,保障 partition 的高可用。
leader:replica 中的一個角色, producer 和 consumer 只跟 leader 交互。
follower:replica 中的一個角色,從 leader 中復制數(shù)據(jù)。
zookeeper:kafka 通過 zookeeper 來存儲集群的信息。
Zookeeper:
ZooKeeper是一個分布式協(xié)調(diào)服務(wù),它的主要作用是為分布式系統(tǒng)提供一致性服務(wù),提供的功能包括:配置維護、分布式同步等。Kafka的運行依賴ZooKeeper。 ?也是java微服務(wù)里面使用的一個注冊中心服務(wù)
ZooKeeper主要用來協(xié)調(diào)Kafka的各個broker,不僅可以實現(xiàn)broker的負載均衡,而且當增加了broker或者某個broker故障了,ZooKeeper將會通知生產(chǎn)者和消費者,這樣可以保證整個系統(tǒng)正常運轉(zhuǎn)。
在Kafka中,一個topic會被分成多個區(qū)并被分到多個broker上,分區(qū)的信息以及broker的分布情況與消費者當前消費的狀態(tài)信息都會保存在ZooKeeper中。
2.集群部署
? ? ? ? 2.1環(huán)境
系統(tǒng):Centos-Stream7
節(jié)點:
192.168.26.166? ?es01 ?
192.168.26.170? ?es02 ?
192.168.26.171???es03
軟件版本:kafka_2.12-3.0.2.tgz
? ? ? ? 2.2??安裝配置jdk8
#yum install -y java-1.8.0-openjdk
? ? ? ? 2.3??安裝配置zookeeper
在配置中要注意每個配置項后面不要有空格否則會導致zookeeper啟動不起來!!!!
Kafka運行依賴ZK,Kafka官網(wǎng)提供的tar包中,已經(jīng)包含了ZK,這里不再額外下載ZK程序。
配置相互解析---三臺機器(在es集群上安裝的kafka):
# vim /etc/hosts
192.168.26.166? ?es01 ?
192.168.26.170? ?es02 ?
192.168.26.171???es03
安裝Kafka:
# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.2/kafka_2.12-3.0.2.tgz
# tar xzvf kafka_2.12-2.8.0.tgz -C /usr/local/
# mv /usr/local/kafka_2.12-2.8.0/ /usr/local/kafka/
配置zookeeper:
在es01節(jié)點中:
# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
# vim /usr/local/kafka/config/zookeeper.properties ?#添加如下配置
dataDir=/opt/data/zookeeper/data ?# 需要創(chuàng)建,所有節(jié)點一致
dataLogDir=/opt/data/zookeeper/logs # 需要創(chuàng)建,所有節(jié)點一致
clientPort=2181?
tickTime=2000?
initLimit=20?
syncLimit=10?# 以下 IP 信息根據(jù)自己服務(wù)器的 IP 進行修改
server.1=192.168.19.20:2888:3888 ?//kafka集群IP:Port
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888
#創(chuàng)建data、log目錄# mkdir -p /opt/data/zookeeper/{data,logs}
#創(chuàng)建myid文件
# echo 1 > /opt/data/zookeeper/data/myid ? ? #myid號按順序排
在es02節(jié)點中:
# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data?
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181?
tickTime=2000?
initLimit=20?
syncLimit=10?
server.1=192.168.19.20:2888:3888
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888
#創(chuàng)建data、log目錄# mkdir -p /opt/data/zookeeper/{data,logs}
#創(chuàng)建myid文件
# echo 2 > /opt/data/zookeeper/data/myid
在es03節(jié)點中:
# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data?
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181?
tickTime=2000?
initLimit=20?
syncLimit=10?
server.1=192.168.19.20:2888:3888
server.2=192.168.19.21:2888:3888
server.3=192.168.19.22:2888:3888
#創(chuàng)建data、log目錄# mkdir -p /opt/data/zookeeper/{data,logs}
#創(chuàng)建myid文件
# echo 3 > /opt/data/zookeeper/data/myid
配置項含義:
dataDir ?? ?ZK數(shù)據(jù)存放目錄。
dataLogDir ?ZK日志存放目錄。
clientPort ?客戶端連接ZK服務(wù)的端口。
tickTime ? ?ZK服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時間間隔。
initLimit ? 允許follower連接并同步到Leader的初始化連接時間,當初始化連接時間超過該值,則表示連接失敗。
syncLimit ? Leader與Follower之間發(fā)送消息時如果follower在設(shè)置時間內(nèi)不能與leader通信,那么此follower將會被丟棄。
server.1=192.168.19.20:2888:3888 ? ?2888是follower與leader交換信息的端口,3888是當leader掛了時用來執(zhí)行選舉時服務(wù)器相互通信的端口。
3.配置Kafka
? ? ? ? 3.1??配置
# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/server.properties
# vim /usr/local/kafka/config/server.properties ?#在最后添加
broker.id=1 ?#改?
listeners=PLAINTEXT://192.168.19.20:9092 ? #改
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs ?
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.19.20:2181,192.168.19.21:2181,192.168.19.22:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@es01 ~]# mkdir -p /opt/data/kafka/logs
? ? ? ? 3.2??其他節(jié)點配置
只需把配置好的安裝包直接分發(fā)到其他節(jié)點,修改 Kafka的broker.id和 listeners就可以了。
? ? ? ? 3.3??配置項含義
broker.id?
?? ?每一個broker在集群中的唯一標識,要求是正數(shù)。在改變IP地址,不改變broker.id的時不會影響consumers
listeners=PLAINTEXT://192.168.19.22:9092 ? ? ??
?? ?監(jiān)聽地址
num.network.threads ?
?? ?broker 處理消息的最大線程數(shù),一般情況下不需要去修改
num.io.threads
?? ?broker處理磁盤IO 的線程數(shù) ,數(shù)值應該大于你的硬盤數(shù)
socket.send.buffer.bytes ??? ??? ?
?? ?socket的發(fā)送緩沖區(qū)
socket.receive.buffer.bytes?? ??? ?
?? ?socket的接收緩沖區(qū)
socket.request.max.bytes
?? ?socket請求的最大數(shù)值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,會被topic創(chuàng)建時的指定參數(shù)覆蓋
log.dirs ? ? ? ?日志文件目錄
num.partitions
num.recovery.threads.per.data.dir ? 每個數(shù)據(jù)目錄(數(shù)據(jù)目錄即指的是上述log.dirs配置的目錄路徑)用于日志恢復啟動和關(guān)閉時的線程數(shù)量。
offsets.topic.replication.factortransaction state log replication factor ?事務(wù)主題的復制因子(設(shè)置更高以確??捎眯?#xff09;。 內(nèi)部主題創(chuàng)建將失敗,直到群集大小滿足此復制因素要求
log.cleanup.policy = delete
?? ?日志清理策略 選擇有:delete和compact 主要針對過期數(shù)據(jù)的處理,或是日志文件達到限制的額度,會被 topic創(chuàng)建時的指定參數(shù)覆蓋
log.cleanup.interval.mins=1
?? ?指定日志每隔多久檢查看是否可以被刪除,默認1分鐘?? ?
log.retention.hours
?? ?數(shù)據(jù)存儲的最大時間 超過這個時間 會根據(jù)log.cleanup.policy設(shè)置的策略處理數(shù)據(jù),也就是消費端能夠多久去消費數(shù)據(jù)。log.retention.bytes和log.retention.minutes或者log.retention.hours任意一個達到要求,都會執(zhí)行刪除,會被topic創(chuàng)建時的指定參數(shù)覆蓋log.segment.bytes
?? ?topic的分區(qū)是以一堆segment文件存儲的,這個控制每個segment的大小,會被topic創(chuàng)建時的指定參數(shù)覆蓋
log.retention.check.interval.ms?
?? ?文件大小檢查的周期時間,是否觸發(fā) log.cleanup.policy中設(shè)置的策略
zookeeper.connect ??
?? ?ZK主機地址,如果zookeeper是集群則以逗號隔開。
zookeeper.connection.timeout.ms ? ??
?? ?連接到Zookeeper的超時時間。
4.測試Kafka
? ? ? ? 4.1? 啟動zookeeper集群
在三個節(jié)點依次執(zhí)行:
# cd /usr/local/kafka
# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
查看端口:
# netstat -lntp | grep 2181
? ? ? ? 4.2? 啟動Kafka
在三個節(jié)點依次執(zhí)行:
# cd /usr/local/kafka
# nohup bin/kafka-server-start.sh config/server.properties &
? ? ? ? 4.3? 測驗
驗證??在192.168.26.166上創(chuàng)建topic:
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic
參數(shù)解釋:
–zookeeper指定zookeeper的地址和端口,
–partitions指定partition的數(shù)量,
–replication-factor指定數(shù)據(jù)副本的數(shù)量在26.170上面查詢192.168.26.166上的topic:
[root@es03 kafka]# bin/kafka-topics.sh --zookeeper 192.168.26.166:2181 --list
testtopic
? ? ? ? (二)模擬消息生產(chǎn)和消費
發(fā)送消息到192.168.26.166:
[root@es01 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.19.20:9092 --topic testtopic
>hello
>你好呀
>
從192.168.26.171接受消息:
[root@es02 kafka]# bin/kafka-console-consumer.sh --bootstrap-server ?192.168.19.21:9092 --topic testtopic --from-beginning