在線旅游攻略網(wǎng)站建設(shè)方案關(guān)鍵詞優(yōu)化好
1、官網(wǎng)下載最新版本的kafka,里面已經(jīng)集成zookeeper。直接解壓到D盤
2、配置文件修改,config目錄下面的zookeeper.properties.? ?設(shè)置zookeeper數(shù)據(jù)目錄
dataDir=D:/kafka_2.12-3.6.0/tmp/zookeeper
3、修改kafka的配置文件server.properties.? ?主要修改內(nèi)容如下:
zookeeper.connect=localhost:2181
log.dirs=D:\\kafka_2.12-3.6.0\\logs
listeners=PLAINTEXT://localhost:9092
其他默認(rèn)即可。
4、修改完成后進(jìn)入bin目錄:啟動zookeeper和kafka,命令如下
zookeeper-server-start.bat ../../config/zookeeper.properties
kafka-server-start.bat ../../config/server.properties
5、命令行創(chuàng)建topic,命令如下:
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic hello
6、創(chuàng)建生產(chǎn)者和消費者,測試。生產(chǎn)者輸入消息,消費者就會收到相應(yīng)的消息了?
kafka-console-producer.bat --broker-list localhost:9092 --topic hello
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic hello--from-beginning
7、創(chuàng)建springboot工程,測試
引入依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
8、yml文件配置kafka
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
acks: 1
retries: 3
batch-size: 16384
properties:
linger:
ms: 0
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: helloGroup
enable-auto-commit: false
auto-commit-interval: 1000
auto-offset-reset: latest
properties:
request:
timeout:
ms: 18000
session:
timeout:
ms: 12000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
9、使用springboot? ??KafkaTemplate發(fā)送消息
@RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
public String sendMessage(String message) {
kafkaTemplate.send("hello", message);
return "發(fā)送成功~";
}
10、消息消費,
@KafkaListener(topics = "hello")
public void receiveMessage(ConsumerRecord<String, String> record) {
String topic = record.topic();
long offset = record.offset();
int partition = record.partition();
String message = record.value();
System.out.println("topic = " + topic);
System.out.println("offset = " + offset);
System.out.println("partition = " + partition);
System.out.println("message = " + message);
}