萬(wàn)維網(wǎng)申請(qǐng)網(wǎng)站域名搜狗站長(zhǎng)工具平臺(tái)
Apache?Kafka是一個(gè)開源分布式事件流平臺(tái),被數(shù)千家公司用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用程序
Kafka 官網(wǎng):Apache Kafka
關(guān)于ZooKeeper的棄用
根據(jù) Kafka官網(wǎng)信息,隨著Apache Kafka 3.5版本的發(fā)布,Zookeeper現(xiàn)已被標(biāo)記為已棄用。未來(lái)計(jì)劃在Apache Kafka(4.0版)的下一個(gè)主要版本中刪除ZooKeeper,該版本最快將于2024年4月發(fā)布。在棄用階段,ZooKeeper仍然支持用于Kafka集群元數(shù)據(jù)的管理,但不建議用于新的部署。新的部署方式使用?KRaft 模式,KRaft 模式部署可以看筆者的文章《kafka 集群 KRaft 模式搭建》,考慮到一些公司仍然在使用老版本的 Kafka,故筆者寫這篇文章記錄 Kafka 集群Zookeeper 模式搭建
官網(wǎng)信息截圖
筆者使用3臺(tái)服務(wù)器,它們的 ip 分別是?192.168.3.232、192.168.2.90、192.168.2.11
目錄
1、官網(wǎng)下載 Kafka
2、配置 Kafka
3、啟動(dòng) Kafka 集群
4、關(guān)閉 Kafka 集群
5、使用Kafka 可視化工具查看
6、測(cè)試Kafka集群
1、官網(wǎng)下載 Kafka
這里筆者下載最新版3.6.0
3.6.0 版本需要至少 java8 及以上版本,筆者使用的是 java8 版本
關(guān)于 linux 安裝 java,沒(méi)安裝過(guò)的朋友可以參考《linux 系統(tǒng)安裝 jdk》
下載完成
將 kafka分別上傳到3臺(tái)linux
在3臺(tái)服務(wù)器上分別創(chuàng)建 kafka 安裝目錄
mkdir /usr/local/kafka
在3臺(tái)服務(wù)器上分別將 kafka 安裝包解壓到新創(chuàng)建的 kafka 目錄
tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka
2、配置 Kafka
進(jìn)入配置目錄
cd /usr/local/kafka/kafka_2.13-3.6.0/config
編輯配置文件?server.properties
vi server.properties
配置?broker.id,advertised.listeners,zookeeper.connect
broker.id 每個(gè)節(jié)點(diǎn)的id
advertised.listeners 本機(jī)的外網(wǎng)訪問(wèn)地址
zookeeper.connect zookeeper 地址
192.168.3.232 節(jié)點(diǎn)配置
advertised.listeners 筆者配置為本機(jī)地址
192.168.2.90 節(jié)點(diǎn)
192.168.2.11 節(jié)點(diǎn)
筆者zookeeper 地址是 192.168.2.130:2181
zookeeper 版本是3.8.3
關(guān)于zookeeper單機(jī)安裝和集群安裝可以參考:《Linux環(huán)境 安裝 zookeeper》《windows環(huán)境 安裝 zookeeper》《linux 使用 nginx 搭建 zookeeper 集群》
3、啟動(dòng) Kafka 集群
首先啟動(dòng)?zookeeper
然后在3臺(tái)機(jī)器上依次啟動(dòng) Kafka
進(jìn)入 kafka 目錄
cd /usr/local/kafka/kafka_2.13-3.6.0
下面2個(gè)命令皆可
bin/kafka-server-start.sh config/server.properties
或
bin/kafka-server-start.sh -daemon config/server.properties
4、關(guān)閉 Kafka 集群
關(guān)閉命令
bin/kafka-server-stop.sh
在 3 個(gè)節(jié)點(diǎn)上分別執(zhí)行關(guān)閉命令
5、使用Kafka 可視化工具查看
下載地址:https://www.kafkatool.com/download.html
運(yùn)行效果
6、測(cè)試Kafka集群
新建 maven 項(xiàng)目,添加 Kafka 依賴
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
筆者新建 maven項(xiàng)目?kafka-learn
kafka-learn 項(xiàng)目 pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wsjzzcbq</groupId><artifactId>kafka-learn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>
新建生產(chǎn)者?ProducerDemo
package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Demo** @author wsjz* @date 2023/11/24*/
public class ProducerDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//配置集群節(jié)點(diǎn)信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");//配置序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(properties);//topic 名稱是demo_topicProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月別枝驚鵲");RecordMetadata recordMetadata = producer.send(producerRecord).get();System.out.println(recordMetadata.topic());System.out.println(recordMetadata.partition());System.out.println(recordMetadata.offset());}
}
新建消費(fèi)者?ConsumerDemo
package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** ConsumerDemo** @author wsjz* @date 2023/11/24*/
public class ConsumerDemo {public static void main(String[] args) {Properties properties = new Properties();// 配置集群節(jié)點(diǎn)信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");// 消費(fèi)分組名properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");// 序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 消費(fèi)者訂閱主題consumer.subscribe(Arrays.asList("demo_topic"));while (true) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String,String> record:records) {System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),record.offset(),record.key(),record.value());}}}
}
運(yùn)行測(cè)試
效果圖
至此完