萊蕪在線沙總寧波seo在線優(yōu)化方案
本次在CentOS7.6上搭建Kafka集群
Apache Kafka 是一個高吞吐量的分布式消息系統(tǒng),被廣泛應用于大規(guī)模數(shù)據(jù)處理和實時數(shù)據(jù)管道中。本文將介紹在CentOS操作系統(tǒng)上搭建Kafka集群的過程,以便于構建可靠的消息處理平臺。
文件分享(KafkaUI、kafka3.3.2、jdk1.8)
鏈接:https://pan.baidu.com/s/1dn_mQKc1FnlQvuSgGjBc1w?pwd=t2v9 提取碼:t2v9
也可以從官網(wǎng)自己下
Kafka官網(wǎng)
步驟概覽
本次使用三臺機器 hostname別名分別為res01(10.41.7.41)、res02(10.41.7.42)、res03(10.41.7.43)
在這個教程中,我們將覆蓋以下主要步驟:
- 準備環(huán)境:安裝和配置Java、Zookeeper和Kafka所需的依賴。
- 配置Zookeeper集群:確保Kafka有可靠的分布式協(xié)調(diào)服務。(本次使用Kafka自帶的zookeeper)
- 配置Kafka集群:在每個節(jié)點上安裝和配置Kafka,設置Kafka集群以實現(xiàn)高性能和高可用性。
步驟詳解
步驟1:環(huán)境準備 (先對第一臺服務器操作 其他的使用scp傳輸之后進行小修改就好)
安裝Java
確保Java安裝正確,并設置JAVA_HOME環(huán)境變量。
如果對于jdk安裝有問題的可以看一下這篇Linux安裝MySQL、JDK(含環(huán)境變量配置)、Tomcat
步驟2:安裝Kafka
下載和解壓Kafka
[root@res01 module]# clear
[root@res01 module]# ll
總用量 104124
drwxr-xr-x. 4 root root 40 11月 10 10:07 data
-rw-r--r--. 1 root root 106619987 11月 10 10:33 kafka_2.13-3.3.2.tgz
[root@res01 module]# tar -zxvf kafka_2.13-3.3.2.tgz
解壓之后通過mv改名
步驟3:配置zookeeper
進入文件夾kafka3.3.2中找到config
接下來主要修改zookeeper.properties和server.properties這兩個文件
zookeeper.properties如下
# 需要去新建/opt/module/data/zookeeper下面這兩個文件夾
dataDir=/opt/module/data/zookeeper/data
dataLogDir=/opt/module/data/zookeeper/logs
clientPort=12181
maxClientCnxns=0
admin.enableServer=false
tickTime=2000
initLimit=10
syncLimit=5
# server.X=hostname:peerPort:leaderPort
# peerPort 是服務器之間通信的端口。
# leaderPort 是用于選舉 leader 的端口。
server.1=res01:12182:12183
server.2=res02:12182:12183
server.3=res03:12182:12183#res01、res02、res03是我本地設置過的主機名 如果沒設置使用ip地址即可
在每個節(jié)點上zookeeper的配置文件中dataDir目錄下創(chuàng)建一個名為myid
的文件,并分別填入相應節(jié)點的ID號:1
、2
、3
。
步驟4:配置Kafka
編輯Kafka配置文件config/server.properties
,設置broker.id
和zookeeper.connect
:
# 設置 broker.id 這個是 Kafka 集群區(qū)分每個節(jié)點的唯一標志符。 對應那個myid即可
broker.id=1
# 將監(jiān)聽端口設置為19091
listeners=PLAINTEXT://res01:19091# 將廣告給客戶端的地址也設置為19091
advertised.listeners=PLAINTEXT://res01:19091num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics ############################## 設置 Kafka 的數(shù)據(jù)存儲路徑 這個目錄下不能有其他非 Kafka 目錄,不然會導致 Kafka 集群無法啟動。
log.dirs=/opt/module/data/kafka-log
# 默認的 Partition 的個數(shù)。
num.partitions=3
# 設置默認的復制因子為3
default.replication.factor=3num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# Kafka 的數(shù)據(jù)保留的時間,默認是 7 天 168h。 這里使用24小時
log.retention.hours=24log.retention.check.interval.ms=300000
# Kafka 連接的 ZooKeeper 的地址和連接 Kafka 的超時時間。
zookeeper.connect=res01:12181,res02:12181,res03:12181
zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0
# 設置是否可以刪除 Topic,默認 Kafka 的 Topic 是不允許刪除的 這里打開了
delete.topic.enable=true# 這是用于啟用或禁用日志清理的選項,默認值為 true,以確保 Kafka 持續(xù)進行日志清理。需要根據(jù)實際需求進行設置。
log.cleaner.enable=true
# 這個參數(shù)控制日志清理線程的數(shù)量。對于你的硬件配置,你可以考慮設置為 4 或 8 來充分利用服務器的性能。
log.cleaner.threads=4
# 這個參數(shù)用于控制日志清理線程的 IO 緩沖區(qū)大小。對于你的硬件配置,可以設置為 8192 或 16384。
log.cleaner.io.buffer.size=8192
# 這個參數(shù)是用來設置主題日志保留的最大字節(jié)數(shù)。對于控制磁盤空間的使用非常重要。例如,如果你希望限制每個主題的數(shù)據(jù)量不超過 100GB,可以設置為 107374182400
log.retention.bytes=107374182400
# 這個參數(shù)用于控制每個日志段文件的最大大小。對于你的硬件配置,你可以設置為 1073741824(即 1GB)。
log.segment.bytes=1073741824
# 這個參數(shù)用于設置 Zookeeper 會話的超時時間。對于較大的集群和連接較慢的網(wǎng)絡,你可以考慮將其設置為 10000,即 10 秒。
zookeeper.session.timeout.ms=10000
重點是這個:
# 設置默認的復制因子為3
default.replication.factor=3
在Kafka集群的每個節(jié)點上,修改broker.id
為對應的節(jié)點ID。
配置kafka環(huán)境變量
#java環(huán)境
export JAVA_HOME=/usr/local/java/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
#kafka環(huán)境
export KAFKA_HOME=/opt/module/kafka3.3.2
export PATH=$PATH:$KAFKA_HOME/bin
步驟5:復制虛擬機
(已有其他服務器的直接連網(wǎng)線scp就好 環(huán)境變量 配置小改一下就好 還有hosts、ip等等別忘了配置,我這里直接復制虛擬機了 )
配置ip、hostname以及hosts后嘗試ping
res02修改kafka的config文件即可(zookeeper配置文件都一樣 用res01配置的就好)
res03的kafka配置文件同理
以及各臺機器的myid(對應上brokerId即可)
步驟6:配置Kafka集群
確保防火墻或安全組允許Kafka端口通過,通常是9092端口。(我這是修改過的為19091,我直接關防火墻了 方便。)
systemctl stop firewalld.service
#關閉運行的防火墻
systemctl disable firewalld.service
#永久關閉防火墻
本次使用的是絕對路徑 各位可以到kafka目錄下執(zhí)行命令 去掉前面的絕對路徑就好
zookeeper命令
在每個機器上,先啟動zookeeper:
/opt/module/kafka3.3.2/bin/zookeeper-server-stop.sh
#停止命令
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh /opt/module/kafka3.3.2/config/zookeeper.properties
#啟動命令
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh -daemon /opt/module/kafka3.3.2/config/zookeeper.properties
#后臺啟動命令 常用~
統(tǒng)一啟動后jps查看進程
Kafka命令
在每個Kafka節(jié)點上,啟動Kafka服務器:
/opt/module/kafka3.3.2/bin/kafka-server-start.sh /opt/module/kafka3.3.2/config/server.properties
#kafka啟動命令
/opt/module/kafka3.3.2/bin/kafka-server-start.sh -daemon /opt/module/kafka3.3.2/config/server.properties
#kafka后臺啟動命令 常用~
/opt/module/kafka3.3.2/bin/kafka-server-stop.sh
#停止命令
統(tǒng)一啟動后jps查看進程
創(chuàng)建主題
使用kafka-topics.sh
命令創(chuàng)建一個主題:這里設置的復制因子為3?
bin/kafka-topics.sh --create --topic 你的topic--bootstrap-server res01:19091--replication-factor 3 --partitions 3
驗證Kafka集群
使用生產(chǎn)者和消費者驗證Kafka集群的功能:
# 啟動生產(chǎn)者
bin/kafka-console-producer.sh --topic myTopic --bootstrap-server res01:19091# 啟動消費者
bin/kafka-console-consumer.sh --topic myTopic --bootstrap-server res01:19091--from-beginning
停止 Zookeeper:
/opt/module/kafka3.3.2/bin/zookeeper-server-stop.sh
啟動 Zookeeper:
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh /opt/module/kafka3.3.2/config/zookeeper.properties
后臺啟動 Zookeeper:
/opt/module/kafka3.3.2/bin/zookeeper-server-start.sh -daemon /opt/module/kafka3.3.2/config/zookeeper.properties
清空 Kafka 日志:
rm -rf //opt/module/data/kafka-logs/*
啟動 Kafka 服務:
/opt/module/kafka3.3.2/bin/kafka-server-start.sh /opt/module/kafka3.3.2/config/server.properties
后臺啟動 Kafka 服務:
/opt/module/kafka3.3.2/bin/kafka-server-start.sh -daemon /opt/module/kafka3.3.2/config/server.properties
停止 Kafka 服務:
/opt/module/kafka3.3.2/bin/kafka-server-stop.sh
創(chuàng)建 Topic:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --create --topic [TOPIC_NAME] --bootstrap-server [SERVER_IP]:[PORT] --partitions [PARTITIONS_SIZE] --replication-factor [REPLICATION_FACTOR]
刪除 Topic:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --delete --topic [TOPIC_NAME] --bootstrap-server [SERVER_IP]:[PORT]
查看 Topic 信息:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --describe --topic [TOPIC_NAME] --bootstrap-server [SERVER_IP]:[PORT]
列出所有的 Topic:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --list --bootstrap-server [SERVER_IP]:[PORT]
控制臺生產(chǎn)消息:
/opt/module/kafka3.3.2/bin/kafka-console-producer.sh --bootstrap-server [SERVER_IP]:[PORT] --topic [TOPIC_NAME]
控制臺消費信息:
/opt/module/kafka3.3.2/bin/kafka-console-consumer.sh --bootstrap-server [SERVER_IP]:[PORT] --topic [TOPIC_NAME] --from-beginning
查看副本:
/opt/module/kafka3.3.2/bin/kafka-topics.sh --describe --bootstrap-server [SERVER_IP]:[PORT] | grep consumer_offsets
請記住替換 [TOPIC_NAME]、[SERVER_IP]:[PORT]、[PARTITIONS_SIZE]、[REPLICATION_FACTOR] 等位中的值為實際的值。
自己的做的總結如上:
運行Java代碼生成topic 可以看到分區(qū)都是3符合集群要求
運行kafkaui查看詳細情況(百度網(wǎng)盤鏈接里有,自己輸入命令太累了 直接用別人封裝好現(xiàn)成的看就好~)
結論
通過這個步驟,我們成功地搭建了一個基本的Kafka集群。在實際生產(chǎn)環(huán)境中,您可能需要進一步調(diào)整和優(yōu)化配置,以滿足特定需求和性能要求。
希望這個教程可以幫助您成功搭建Kafka集群,為您的數(shù)據(jù)處理和消息傳遞架構提供強大的基礎設施。
最后溫馨提示:如果你遠程服務器起了別名,而自己電腦的hosts別名對應其他的服務器 也會發(fā)生報錯 記得別名對應好ip即可