盤龍區(qū)網(wǎng)絡(luò)推廣群排名優(yōu)化軟件官網(wǎng)
參考:?【kafka專欄】不用zookeeper怎么安裝kafka集群-最新kafka3.0版本
一、kafka集群實(shí)例角色規(guī)劃
在本專欄的之前的一篇文章《kafka3種zk的替代方案》已經(jīng)為大家介紹過在kafka3.0種已經(jīng)可以將zookeeper去掉。
上圖中黑色代表broker(消息代理服務(wù)),褐色/藍(lán)色代表Controller(集群控制器服務(wù))
- 左圖(kafka2.0):一個集群所有節(jié)點(diǎn)都是broker角色,kafka從三個broker中選舉出來一個Controller控制器,控制器將集群元數(shù)據(jù)信息(比如主題分類、消費(fèi)進(jìn)度等)保存到zookeeper,用于集群各節(jié)點(diǎn)之間分布式交互。
- 右圖(kafka3.0):假設(shè)一個集群有四個broker,指定三個作為Conreoller角色(藍(lán)色),從三個Controller中選舉出來一個Controller作為主控制器(褐色),其他的2個備用。zookeeper不再被需要!相關(guān)的元數(shù)據(jù)信息以kafka日志的形式存在(即:以消息隊(duì)列消息的形式存在)。
- controller通信端口:9093, 作用與zk的2181端口類似 。
在搭建kafka3.0集群之前, 我們需要先做好kafka實(shí)例角色規(guī)劃。(四個broker, 需要通過主動配置指定三個作為Controller, Controller需要奇數(shù)個, 這一點(diǎn)和zk是一樣的)
主機(jī)名稱 | ip | 角色 | node.id |
---|---|---|---|
kafka-vm1 | 192.168.1.111 | broker,controller | 1 |
kafka-vm2 | 192.168.1.112 | broker,controller | 2 |
kafka-vm3 | 192.168.1.113 | broker,controller | 3 |
kafka-vm4 | 192.168.1.114 | broker | 4 |
二、準(zhǔn)備工作
- kafka3.x不再支持JDK8,建議安裝JDK11或JDK17。
- 新建kafka持久化日志數(shù)據(jù)mkdir -p /data/kafka;并保證安裝kafka的用戶具有該目錄的讀寫權(quán)限。
各個機(jī)器節(jié)點(diǎn)執(zhí)行:
- # 安裝jdk(kafka3.x不再支持JDK8,建議安裝JDK11或JDK17, 這里安裝jdk11)
- # 下載安裝jdk11, 參考: https://blog.csdn.net/justlpf/article/details/127268046
-
- # 下載kafka
- adduser kafka
- cd /opt
- wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
- tar -xf kafka_2.12-3.3.1.tgz
-
- chown -R kafka:kafka kafka_2.12-3.3.1*
-
- mkdir -p /data/kafka
- chown -R kafka:kafka /data/kafka
192.168.1.111 data-vm1 192.168.1.112 data-vm2 192.168.1.113 data-vm3 192.168.1.114 data-vm4
三、修改Kraft協(xié)議配置文件
在kafka3.x版本中,使用Kraft協(xié)議代替zookeeper進(jìn)行集群的Controller選舉,所以要針對它進(jìn)行配置。
vi /opt/kafka_2.12-3.3.1/config/kraft/server.properties
具體配置參數(shù)如下:
- # data-vm1節(jié)點(diǎn)
- node.id=1
- process.roles=broker,controller
- listeners=PLAINTEXT://data-vm1:9092,CONTROLLER://data-vm1:9093
- advertised.listeners=PLAINTEXT://:9092
- controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
- log.dirs=/data/kafka/
-
- # data-vm2節(jié)點(diǎn)
- node.id=2
- process.roles=broker,controller
- listeners=PLAINTEXT://data-vm2:9092,CONTROLLER://data-vm2:9093
- advertised.listeners=PLAINTEXT://:9092
- controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
- log.dirs=/data/kafka/
-
- # data-vm3節(jié)點(diǎn)
- node.id=3
- process.roles=broker,controller
- listeners=PLAINTEXT://data-vm3:9092,CONTROLLER://data-vm3:9093
- advertised.listeners=PLAINTEXT://:9092
- controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
- log.dirs=/data/kafka/
node.id
:這將作為集群中的節(jié)點(diǎn) ID,唯一標(biāo)識,按照我們事先規(guī)劃好的(上文),在不同的服務(wù)器上這個值不同。其實(shí)就是kafka2.0中的broker.id,只是在3.0版本中kafka實(shí)例不再只擔(dān)任broker角色,也有可能是controller角色,所以改名叫做node節(jié)點(diǎn)。
process.roles
:一個節(jié)點(diǎn)可以充當(dāng)broker或controller或兩者兼而有之。按照我們事先規(guī)劃好的(上文),在不同的服務(wù)器上這個值不同。多個角色用逗號分開。
listeners
: broker將使用9092端口,而kraft controller控制器將使用9093端口。
advertised.listeners
: 這里指定kafka通過代理暴漏的地址,如果都是局域網(wǎng)使用,就配置PLAINTEXT://:9092即可。
controller.quorum.voters
:這個配置用于指定controller主控選舉的投票節(jié)點(diǎn),所有process.roles包含controller角色的規(guī)劃節(jié)點(diǎn)都要參與,即:zimug1、zimug2、zimug3。其配置格式為:node.id1@host1:9093,node.id2@host2:9093
log.dirs
:kafka 將存儲數(shù)據(jù)的日志目錄,在準(zhǔn)備工作中創(chuàng)建好的目錄。
所有kafka節(jié)點(diǎn)都要按照上文中的節(jié)點(diǎn)規(guī)劃進(jìn)行配置,完成config/kraft/server.properties配置文件的修改。
四、格式化存儲目錄
生成一個唯一的集群ID(在一臺kafka服務(wù)器上執(zhí)行一次即可),這一個步驟是在安裝kafka2.0版本的時候不存在的。
$ /opt/kafka_2.12-3.3.1/bin/kafka-storage.sh random-uuid SzIhECn-QbCLzIuNxk1A2A
使用生成的集群ID+配置文件格式化存儲目錄log.dirs,
所以這一步確認(rèn)配置及路徑確實(shí)存在,
并且kafka用戶有訪問權(quán)限(檢查準(zhǔn)備工作是否做對)。
每一臺主機(jī)服務(wù)器都要執(zhí)行命令:
/opt/kafka_2.12-3.3.1/bin/kafka-storage.sh format \ -t SzIhECn-QbCLzIuNxk1A2A \ -c /opt/kafka_2.12-3.3.1/config/kraft/server.properties
格式化操作完成之后,log.dirs?目錄下多出一個Meta.properties文件?,存儲了當(dāng)前的kafka節(jié)點(diǎn)的id(node.id),當(dāng)前節(jié)點(diǎn)屬于哪個集群(cluster.id)
- [root@data-vm2 ~]# ll /data/kafka/
- 總用量 8
- -rw-r--r--. 1 root root 249 10月 11 18:23 bootstrap.checkpoint
- -rw-r--r--. 1 root root 86 10月 11 18:23 meta.properties
-
- $ cat /data/kafka/meta.properties
- #
- #Tue Apr 12 07:39:07 CST 2022
- node.id=1
- version=1
- cluster.id=SzIhECn-QbCLzIuNxk1A2A
五、 啟動集群,完成基礎(chǔ)測試
zimug1 zimug2 zimug3是三臺應(yīng)用服務(wù)器的主機(jī)名稱(參考上文中的角色規(guī)劃),實(shí)現(xiàn)方式已經(jīng)在本專欄《linux主機(jī)與ip解析》中進(jìn)行了說明。將下面的命令集合保存為一個shell腳本,并賦予執(zhí)行權(quán)限。執(zhí)行該腳本即可啟動kafka集群所有的節(jié)點(diǎn),前提是:你已經(jīng)按照本專欄的《集群各節(jié)點(diǎn)之間的ssh免密登錄》安裝方式做了集群各節(jié)點(diǎn)之間的ssh免密登錄。
啟動命令:
- bin/kafka-server-start.sh \
- /opt/kafka_2.12-3.3.1/config/kraft/server.properties
-
- # 后臺運(yùn)行
- nohup bin/kafka-server-start.sh \
- /opt/kafka_2.12-3.3.1/config/kraft/server.properties 2>&1 &
腳本:?
- #!/bin/bash
- kafkaServers='data-vm1 data-vm2 data-vm3'
- #啟動所有的kafka
- for kafka in $kafkaServers
- do
- ssh -T $kafka <<EOF
- nohup /opt/kafka_2.12-3.3.1/bin/kafka-server-start.sh /opt/kafka_2.12-3.3.1/config/kraft/server.properties 1>/dev/null 2>&1 &
- EOF
- echo 從節(jié)點(diǎn) $kafka 啟動kafka3.0...[ done ]
- sleep 5
- done
六、一鍵停止集群腳本
一鍵停止kafka集群各節(jié)點(diǎn)的腳本,與啟動腳本的使用方式及原理是一樣的。
停止命令:
/opt/kafka_2.12-3.3.1/bin/kafka-server-stop.sh
執(zhí)行腳本:
- #!/bin/bash
- kafkaServers='data-vm1 data-vm2 data-vm3'
- #停止所有的kafka
- for kafka in $kafkaServers
- do
- ssh -T $kafka <<EOF
- cd /opt/kafka_2.12-3.3.1
- bin/kafka-server-stop.sh
- EOF
- echo 從節(jié)點(diǎn) $kafka 停止kafka...[ done ]
- sleep 5
- done
七、測試Kafka集群
7.1 創(chuàng)建topic
- [root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
- --create \
- --topic quickstart-events \
- --bootstrap-server data-vm4:9092
-
- Created topic quickstart-events.
- [root@data-vm1 kafka_2.12-3.3.1]#
-
- #
- [root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
- --create \
- --topic quickstart-events \
- --bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092
7.2 查看topic列表
- bin/kafka-topics.sh \
- --list \
- --bootstrap-server data-vm4:9092
-
- #
- bin/kafka-topics.sh \
- --list \
- --bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092,data-vm4:9092
7.3 查看消息詳情
- [root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
- --describe \
- --topic quickstart-events \
- --bootstrap-server data-vm3:9092
-
- Topic: quickstart-events TopicId: zSOJC6wNRRGQ4MudfHLGvQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
- Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1
-
- [root@data-vm1 kafka_2.12-3.3.1]#
7.4 生產(chǎn)消息
- [root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-console-producer.sh \
- --topic quickstart-events \
- --bootstrap-server data-vm1:9092
-
- # 參考: 創(chuàng)建并配置topic
- bin/kafka-topics.sh \
- --bootstrap-server localhost:9092 \
- --create \
- --topic my-topic \
- --partitions 1 \
- --replication-factor 1 \
- --config max.message.bytes=64000 \
- --config flush.messages=1
-
- # ------------------------- 參考 ------------------------ #
- # 1: 修改已創(chuàng)建topic配置
- # (Overrides can also be changed or set later using the alter configs command.)
- bin/kafka-configs.sh \
- --bootstrap-server localhost:9092 \
- --entity-type topics \
- --entity-name my-topic \
- --alter \
- --add-config max.message.bytes=128000
-
- # 2: 檢查已修改的topic配置是否生效
- # (To check overrides set on the topic you can do)
- bin/kafka-configs.sh \
- --bootstrap-server localhost:9092 \
- --entity-type topics \
- --entity-name my-topic \
- --describe
-
- # 3. 恢復(fù)到原來的配置
- # (To remove an override you can do)
- bin/kafka-configs.sh \
- --bootstrap-server localhost:9092 \
- --entity-type topics \
- --entity-name my-topic \
- --alter \
- --delete-config max.message.bytes
-
- # 4. 增加分區(qū)數(shù)
- # (To add partitions you can do)
- bin/kafka-topics.sh \
- --bootstrap-server broker_host:port \
- --alter \
- --topic my_topic_name \
- --partitions 40
-
- # 5. 添加配置
- # (To add configs:)
- bin/kafka-configs.sh \
- --bootstrap-server broker_host:port \
- --entity-type topics \
- --entity-name my_topic_name \
- --alter \
- --add-config x=y
-
- # 6. 移除配置
- # (To remove a config:)
- bin/kafka-configs.sh \
- --bootstrap-server broker_host:port \
- --entity-type topics \
- --entity-name my_topic_name \
- --alter \
- --delete-config x
-
- # 7. 刪除topic
- # (And finally deleting a topic:)
- bin/kafka-topics.sh \
- --bootstrap-server broker_host:port \
- --delete \
- --topic my_topic_name
7.5 消費(fèi)消息
bin/kafka-console-consumer.sh \ --topic quickstart-events \ --from-beginning \ --bootstrap-server data-vm4:9092
7.6 查看消費(fèi)者組
- # 檢查消費(fèi)者postition
- # Checking consumer position
- bin/kafka-consumer-groups.sh \
- --bootstrap-server localhost:9092 \
- --describe \
- --group my-group
-
- TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
- my-topic 0 2 4 2 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
- my-topic 1 2 3 1 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
- my-topic 2 2 3 1 consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2 /127.0.0.1 consumer-2
7.7 查看消費(fèi)者組列表
- # list all consumer groups across all topics
- bin/kafka-consumer-groups.sh \
- --bootstrap-server localhost:9092 \
- --list
-
- test-consumer-group
-
-
- # To view offsets, as mentioned earlier,
- # we "describe" the consumer group like this:
- bin/kafka-consumer-groups.sh \
- --bootstrap-server localhost:9092 \
- --describe \
- --group my-group
-
- TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
- topic3 0 241019 395308 154289 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
- topic2 1 520678 803288 282610 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
- topic3 1 241018 398817 157799 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
- topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
- topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
- topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4
-
- # 更多配置參考:
- # https://kafka.apache.org/32/documentation.html#uses