安美東莞網(wǎng)站建設(shè)東莞seo網(wǎng)絡(luò)營(yíng)銷(xiāo)
kafka
- 一、簡(jiǎn)介
- 1.1、場(chǎng)景選擇,與其他mq相比
- 1.2、應(yīng)用場(chǎng)景
- 1.2.1、流量消峰
- 1.2.2、解耦
- 1.2.3、異步通訊
- 1.3、消息隊(duì)列的兩種模式
- 1.3.1、點(diǎn)對(duì)點(diǎn)模式
- 1.3.2、發(fā)布/訂閱模式
- 1.4、Kafka 基礎(chǔ)架構(gòu)
- 二、安裝部署
- 2.1、安裝包方式
- 2.2、docker安裝方式
- 2.3、docker安裝kafka-map圖形化管理工具
- 三、Kafka 命令行操作
- 3.1、主題命令行操作
- 3.2、生產(chǎn)者命令行操作
- 3.3、消費(fèi)者命令行操作
- 四、Springboot整合Kafka
- 4.1、依賴(lài)配置
- 4.2、代碼示例
- 4.2.1、簡(jiǎn)單的生產(chǎn)、消費(fèi)
- 4.2.2、指定分區(qū)生產(chǎn)、消費(fèi)
- 4.2.3、消息確認(rèn)
一、簡(jiǎn)介
1.1、場(chǎng)景選擇,與其他mq相比
1、目前企業(yè)中比較常見(jiàn)的消息隊(duì)列產(chǎn)品主 要有 Kafka、ActiveMQ 、RabbitMQ 、RocketMQ 等。
在大數(shù)據(jù)場(chǎng)景
主要采用 Kafka 作為消息隊(duì)列。在 JavaEE 開(kāi)發(fā)中主要采用 ActiveMQ、RabbitMQ、RocketMQ。
2、Kafka與其他消息隊(duì)列MQ(如ActiveMQ、RabbitMQ等)相比,有以下幾個(gè)區(qū)別:
-
磁盤(pán)存儲(chǔ):
Kafka將所有消息都保存在磁盤(pán)上
,并使用內(nèi)存映射文件進(jìn)行讀寫(xiě)。這種存儲(chǔ)方式可以支持大量的消息數(shù)據(jù),而且數(shù)據(jù)還可以保留很長(zhǎng)時(shí)間,比如幾個(gè)月甚至幾年。而其他MQ的存儲(chǔ)方式多是基于內(nèi)存
,不適合存儲(chǔ)大量的數(shù)據(jù)。 -
分布式設(shè)計(jì):Kafka是作為分布式系統(tǒng)設(shè)計(jì)的,可以在多個(gè)節(jié)點(diǎn)之間實(shí)現(xiàn)消息的高效傳輸和處理。其他MQ也支持分布式部署,但Kafka在這方面更加優(yōu)秀。
-
發(fā)布/訂閱模式:Kafka采用發(fā)布/訂閱模式,允許多個(gè)消費(fèi)者同時(shí)訂閱同一個(gè)主題,而且
Kafka消費(fèi)者可以自定義從哪個(gè)位置開(kāi)始消費(fèi)消息
。其他MQ中,消費(fèi)者一般需要通過(guò)消費(fèi)者組來(lái)進(jìn)行負(fù)載均衡,而且其他MQ消費(fèi)者只能從當(dāng)前位置開(kāi)始消費(fèi)
。 -
大數(shù)據(jù)處理:
Kafka最初是為大數(shù)據(jù)處理而設(shè)計(jì)
的,它可以非常高效地處理海量數(shù)據(jù),適合用于數(shù)據(jù)倉(cāng)庫(kù)、日志處理、統(tǒng)計(jì)分析等場(chǎng)景。其他MQ則更多用于異步通信、任務(wù)調(diào)度、實(shí)時(shí)通知等領(lǐng)域
。 -
生態(tài)系統(tǒng):Kafka擁有非常豐富的生態(tài)系統(tǒng),包括Kafka Connect、Kafka Streams等工具和框架,可以方便地與大數(shù)據(jù)處理平臺(tái)(如Hadoop、Spark、Flink等)進(jìn)行集成。其他MQ的生態(tài)系統(tǒng)相對(duì)較小。
1.2、應(yīng)用場(chǎng)景
1.2.1、流量消峰
如雙十一秒殺期間,參與用戶(hù):10億人/s,但是我們的系統(tǒng)只能支持處理能力:1千萬(wàn)人/s,為了避免服務(wù)掛掉或者請(qǐng)求超時(shí)等等問(wèn)題,我們可以將10億的請(qǐng)求都寫(xiě)入到消息隊(duì)列中,我們系統(tǒng)再去取消息隊(duì)列上的消息消費(fèi),達(dá)到流量消峰的效果。
1.2.2、解耦
我們的數(shù)據(jù)源來(lái)源可能非常多,不可能都全部去集成。例如場(chǎng)景:我們要去買(mǎi)東西,不用去知道他是在哪里進(jìn)貨的,我們只需要去超市買(mǎi)就可以,消息隊(duì)列也是如此,那么多的數(shù)據(jù)讓他們?nèi)繉?xiě)入到kafka消息隊(duì)列中即可,我們?cè)偃ハ㈥?duì)列中獲取我們的數(shù)據(jù)。
1.2.3、異步通訊
通常我們寫(xiě)代碼,如注冊(cè)完,要發(fā)短信,如果同步處理,等到發(fā)短信成功后再返回結(jié)果給用戶(hù),這樣請(qǐng)求時(shí)間太久了。
消息隊(duì)列的方式,可以再注冊(cè)的時(shí)候,發(fā)送給隊(duì)列,我們這時(shí)候就可以返回給用戶(hù)注冊(cè)成功了,然后消費(fèi)者再去消費(fèi)發(fā)送短信的隊(duì)列,達(dá)到異步的效果。
很多人可能會(huì)問(wèn),多線程的方式不是也能實(shí)現(xiàn)?到底選擇多線程還是消息隊(duì)列呢?
-
當(dāng)需要進(jìn)行任務(wù)處理,并且任務(wù)處理之間
沒(méi)有明顯的依賴(lài)關(guān)系
時(shí),使用消息隊(duì)列
更適合。將任務(wù)發(fā)送到消息隊(duì)列中,由消費(fèi)者進(jìn)行消費(fèi),這樣可以實(shí)現(xiàn)解耦、提高可靠性和系統(tǒng)的擴(kuò)展性
。例如,一個(gè)網(wǎng)站需要生成大量的報(bào)告,將報(bào)告生成請(qǐng)求通過(guò)消息隊(duì)列進(jìn)行異步處理是一個(gè)不錯(cuò)的選擇。 -
當(dāng)需要對(duì)任務(wù)進(jìn)行精細(xì)控制,并且任務(wù)處理之間存在
明顯的依賴(lài)關(guān)系
時(shí),使用多線程
更適合。多線程可以實(shí)現(xiàn)更加細(xì)粒度的任務(wù)處理,可以控制任務(wù)的執(zhí)行順序、進(jìn)行資源的共享
等。例如,一個(gè)電商網(wǎng)站需要實(shí)時(shí)監(jiān)控庫(kù)存的變化,需要在某個(gè)商品的庫(kù)存下降到一定數(shù)量時(shí)進(jìn)行補(bǔ)貨,在這種情況下使用多線程處理更加合適。
總之,消息隊(duì)列和多線程都有自己的優(yōu)勢(shì)和劣勢(shì),要根據(jù)具體的場(chǎng)景選擇合適的方式,才能更好地提高系統(tǒng)效率和可靠性。
1.3、消息隊(duì)列的兩種模式
1.3.1、點(diǎn)對(duì)點(diǎn)模式
消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后清除消息
1.3.2、發(fā)布/訂閱模式
- 可以有多個(gè)topic主題(瀏覽、點(diǎn)贊、收藏、評(píng)論等)
- 消費(fèi)者消費(fèi)數(shù)據(jù)之后,不刪除數(shù)據(jù)(可以控制什么時(shí)候刪除)
- 每個(gè)消費(fèi)者相互獨(dú)立,都可以消費(fèi)到數(shù)據(jù)
這個(gè)方式用的多,它可以處理更多復(fù)雜的場(chǎng)景。
1.4、Kafka 基礎(chǔ)架構(gòu)
1、海量數(shù)據(jù)分而置之,為方便擴(kuò)展,并提高吞吐量,一個(gè)topic分為多個(gè)partition分區(qū)。
如100T的數(shù)據(jù),我可以分成3個(gè)區(qū),每個(gè)區(qū)三十幾G,可以提高吞吐量。
2、配合分區(qū)的設(shè)計(jì),提出消費(fèi)者組的概念,組內(nèi)每個(gè)消費(fèi)者并行消費(fèi)
一個(gè)分區(qū)的數(shù)據(jù),只能由一個(gè)消費(fèi)者處理,如有兩個(gè)消費(fèi)者消費(fèi)不知道由第一個(gè)還是第二個(gè)來(lái)消費(fèi)。
3、為提高可用性,為每個(gè)partition增加若干副本,防止一個(gè)分區(qū)掛了,類(lèi)似NameNode HA
副本分為leader和fallower之分,follower不做被消費(fèi),只是為了防止leader副本掛了后,follower有條件成為leader,提高可用性。
zookeeper中記錄誰(shuí)是leader和整個(gè)集群中哪些服務(wù)器正在工作
,Kafka2.8.0以后也可以配置不采用ZK,安裝包已內(nèi)置ZK
- Producer:消息生產(chǎn)者,就是向 Kafka broker 發(fā)消息的客戶(hù)端。
- Consumer:消息消費(fèi)者,向 Kafka broker 取消息的客戶(hù)端。
- Consumer Group(CG):消費(fèi)者組,由多個(gè) consumer 組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。
- Broker:一臺(tái) Kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè)broker 可以容納多個(gè) topic。
- Topic:可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topic。
- Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。
- Replica:副本。一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè) Leader 和若干個(gè)Follower。
- Leader:每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是 Leader。
- Follower:每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從 Leader 中同步數(shù)據(jù),保持和Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時(shí),某個(gè) Follower 會(huì)成為新的 Leader。
二、安裝部署
2.1、安裝包方式
官網(wǎng)下載鏈接: https://kafka.apache.org/downloads
1、選擇你要的版本進(jìn)行下載
2、將安裝包上傳服務(wù)器,并解壓
tar -xzf kafka_2.12-3.4.0.tgz
進(jìn)入目錄
3、修改config目錄下的server.properties配置文件
vim server.properties
修改log.dirs的路徑,這個(gè)是kafka存儲(chǔ)數(shù)據(jù)的地方,默認(rèn)放在了臨時(shí)文件夾里,容易被刪除,我們需改成我們服務(wù)器可以存放的目錄,如我放在home底下
放開(kāi)注釋,修改地址成你的ip
4、修改kafka的環(huán)境變量
#編輯配置文件
vi /etc/profile# KAFKA_HOME
export KAFKA_HOME=kafka根目錄
export PATH=$PATH:$KAFKA_HOME/bin#例如筆者的文件路徑
export KAFKA_HOME=/www/wwwroot/kafka_2.12-3.4.0/
export PATH=$PATH:$KAFKA_HOME/bin# 添加配置后重新加載配置文件
source /etc/profile
5、配置config下的zookeeper配置,同樣也只是修改存儲(chǔ)路徑
vim zookeeper.properties將dataDir修改成你自己存儲(chǔ)路徑,如我的dataDir=/home/zookeeper
還有下面的advertised放開(kāi)注釋,更換ip,zookeeper連接也更換ip
6、啟動(dòng),需先啟動(dòng)zookeeper
注意:要先有java環(huán)境
# 啟動(dòng)zookeeper,直接指定到你們自己的bin和config目錄
nohup /www/wwwroot/kafka_2.12-3.4.0/bin/zookeeper-server-start.sh -daemon /www/wwwroot/kafka_2.12-3.4.0/config/zookeeper.properties# 啟動(dòng)kafka
nohup /www/wwwroot/kafka_2.12-3.4.0/bin/kafka-server-start.sh -daemon /www/wwwroot/kafka_2.12-3.4.0/config/server.properties
7、驗(yàn)證是否啟動(dòng)成功,查看端口是有被用,或者看日志文件
2.2、docker安裝方式
1、docker 安裝zookeeper
# 安裝鏡像
docker pull wurstmeister/zookeeper#啟動(dòng)容器
docker run -d --name zookeeper_server --restart always -p 2181:2181 wurstmeister/zookeeper# 查看端口是否啟動(dòng)成功
netstat -anp |grep 2181
2、docker 安裝 kafka
# 安裝鏡像
docker pull wurstmeister/kafka#啟動(dòng)容器
docker run -d --name kafka_server --restart always -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<這里換成你的zookeeper地址和端口> -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<這里換成你的kafka地址和端口> -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka-----------------------------------------------
# 發(fā)送消息與消費(fèi)測(cè)試
docker exec -it kafka_server /bin/bash# 進(jìn)入bin,注意你的版本號(hào)可能與我的不同
cd /opt/kafka_2.13-2.8.1/bin/# 發(fā)送消息
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test# 另起窗口,進(jìn)入容器
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
啟動(dòng)參數(shù)解釋:
KAFKA_BROKER_ID:該ID是集群的唯一標(biāo)識(shí)
KAFKA_ADVERTISED_LISTENERS:kafka發(fā)布到zookeeper供客戶(hù)端使用的服務(wù)地址。
KAFKA_ZOOKEEPER_CONNECT:zk的連接地址
KAFKA_LISTENERS:允許使用PLAINTEXT偵聽(tīng)器
如圖:發(fā)送和接收都成功
2.3、docker安裝kafka-map圖形化管理工具
# 拉取鏡像
docker pull dushixiang/kafka-map# 啟動(dòng)容器
docker run -d --name kafka-map -p 9080:8080 --restart always -v /home/kafka-map/data:/usr/local/kafka-map/data -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin --restart always dushixiang/kafka-map:latest
開(kāi)發(fā)端口,訪問(wèn)界面http://ip:9080
賬號(hào)密碼都是設(shè)置的admin
圖形化工具教程:
1、新建一個(gè)連接,連接你的kafka
2、點(diǎn)擊topic
3、進(jìn)入topic,可以看到需要消費(fèi)的信息與實(shí)時(shí)發(fā)送的消息
4、下拉消息
三、Kafka 命令行操作
3.1、主題命令行操作
進(jìn)入kafka目錄,如果是docker安裝的,進(jìn)入容器內(nèi)的opt/kafka_版本號(hào)目錄下
1)、查看操作主題命令參數(shù)
bin/kafka-topics.sh
topic的命名都是有規(guī)律的,按上面的提示,固定前綴 bin/kafka-topics.sh --bootstrap-server ip:9092 加上面的提示,如下
2)、查看當(dāng)前服務(wù)器中的所有 topic
bin/kafka-topics.sh --bootstrap-server ip:9092 --list
3)、創(chuàng)建 topic 主題,topic名:first
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic first
4)、查看 first 主題的詳情
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first
5)、修改分區(qū)數(shù)(注意:分區(qū)數(shù)只能增加,不能減少)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 3
6)、刪除 topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first
3.2、生產(chǎn)者命令行操作
1)、查看操作生產(chǎn)者命令參數(shù)
bin/kafka-console-producer.sh
2)、發(fā)送消息(發(fā)送到topic為first)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
3.3、消費(fèi)者命令行操作
1)、查看操作消費(fèi)者命令參數(shù)
bin/kafka-console-consumer.sh
2)、消費(fèi)消息(監(jiān)聽(tīng)topic為first的)
只消費(fèi)新的:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
把主題中所有的數(shù)據(jù)都讀取出來(lái)(包括歷史數(shù)據(jù)):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first
四、Springboot整合Kafka
4.1、依賴(lài)配置
pom依賴(lài)
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
yml配置
spring:# ======================== ↓↓↓↓↓↓ kafka相關(guān)配置 ↓↓↓↓↓↓ ===============================kafka:bootstrap-servers: ip:9092 # 指定kafka server地址,集群(多個(gè)逗號(hào)分隔)producer:# 指定消息key和消息體的編解碼方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 寫(xiě)入失敗時(shí),重試次數(shù)。當(dāng)leader節(jié)點(diǎn)失效,一個(gè)repli節(jié)點(diǎn)會(huì)替代成為leader節(jié)點(diǎn),此時(shí)可能出現(xiàn)寫(xiě)入失敗,# 當(dāng)retris為0時(shí),produce不會(huì)重復(fù)。retirs重發(fā),此時(shí)repli節(jié)點(diǎn)完全成為leader節(jié)點(diǎn),不會(huì)產(chǎn)生消息丟失。retries: 0# 每次批量發(fā)送消息的數(shù)量,produce積累到一定數(shù)據(jù),一次發(fā)送batch-size: 16384# produce積累數(shù)據(jù)一次發(fā)送,緩存大小達(dá)到buffer.memory就發(fā)送數(shù)據(jù)buffer-memory: 33554432consumer:group-id: default_consumer_group # 指定默認(rèn)消費(fèi)者 群組IDenable-auto-commit: true # true自動(dòng)提交auto-commit-interval: 1000# procedure要求leader在考慮完成請(qǐng)求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下:# acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會(huì)等待來(lái)自服務(wù)器的任何確認(rèn),該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無(wú)法保證服務(wù)器已收到記錄,并且重試配置將不會(huì)生效(因?yàn)榭蛻?hù)端通常不會(huì)知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。# acks = 1 這意味著leader會(huì)將記錄寫(xiě)入其本地日志,但無(wú)需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng),在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會(huì)丟失。# acks = all 這意味著leader將等待完整的同步副本集以確認(rèn)記錄,這保證了只要至少一個(gè)同步副本服務(wù)器仍然存活,記錄就不會(huì)丟失,這是最強(qiáng)有力的保證,這相當(dāng)于acks = -1的設(shè)置。# 可以設(shè)置的值為:all, -1, 0, 1acks: 1# 指定消息key和消息體的編解碼方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
4.2、代碼示例
4.2.1、簡(jiǎn)單的生產(chǎn)、消費(fèi)
生產(chǎn)者
@RestController
public class Producer {@Resourceprivate KafkaTemplate kafkaTemplate;/*** 最簡(jiǎn)單的發(fā)送* @param msg*/@GetMapping("/test")public void test(String msg){kafkaTemplate.send("two", msg);}
消費(fèi)者
/*** 消費(fèi)監(jiān)聽(tīng) 自動(dòng)提交* 監(jiān)聽(tīng)所有的分區(qū)* @param record*/@KafkaListener(topics = "two")public void listen(ConsumerRecord<?, ?> record) {log.info("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());System.out.println("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());}
4.2.2、指定分區(qū)生產(chǎn)、消費(fèi)
在 Kafka 中,生產(chǎn)者可以指定消息的分區(qū)和鍵(Key)屬性。指定分區(qū)和鍵屬性可以帶來(lái)以下好處:
- 控制消息的分發(fā)
通過(guò)指定分區(qū),生產(chǎn)者可以控制消息被發(fā)送到哪個(gè)分區(qū),從而控制消息的分發(fā)。例如,如果您想要按照時(shí)間戳對(duì)消息進(jìn)行排序,可以將消息發(fā)送到同一個(gè)分區(qū)中。
- 提高消息的局部性
Kafka 會(huì)將同一個(gè)分區(qū)中的消息存儲(chǔ)在同一個(gè) Broker 上,這可以提高消息的局部性,從而提高消息的處理效率。
- 提高消息的可靠性
通過(guò)指定鍵屬性,生產(chǎn)者可以確保具有相同鍵的消息被發(fā)送到同一個(gè)分區(qū)中。這可以確保消息按照順序被處理,從而提高消息的可靠性。
生產(chǎn)者
/*** 指定分區(qū)發(fā)送* @param msg*/
@GetMapping("/test3")
public void test3(String msg){/*** 參數(shù)1:topic* 參數(shù)2:分區(qū)* 參數(shù)3:key* 參數(shù)4:消息內(nèi)容*/// 有keykafkaTemplate.send("two",0,"111", msg+"0分區(qū)");kafkaTemplate.send("two",1,"222", msg+"1分區(qū)");// 沒(méi)有keykafkaTemplate.send("two",2,null, msg+"2分區(qū)");// 發(fā)送另一個(gè)topic消費(fèi)者測(cè)試用kafkaTemplate.send("first",0,"111", msg+"first");
}
消費(fèi)者
/*** @Title 指定topic、partition、offset消費(fèi)* @Description 同時(shí)監(jiān)聽(tīng)two和first,監(jiān)聽(tīng)two的0號(hào)和1號(hào)分區(qū)、first的 "0號(hào)和1號(hào)" 分區(qū),指向1號(hào)分區(qū)的offset初始值為8**/@KafkaListener(topicPartitions = {@TopicPartition(topic = "two", partitions = { "0","1" }),@TopicPartition(topic = "first", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))})public void onMessage2(ConsumerRecord<?, ?> record) {System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());}// ↓↓↓↓↓↓↓說(shuō)明示例↓↓↓↓↓↓↓↓↓// 如果沒(méi)指定分區(qū),就算你生產(chǎn)者發(fā)送了多個(gè)分區(qū),我也全都能接收//@KafkaListener(topics = "two")//public void listen(ConsumerRecord<?, ?> record) {//log.info("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());//System.out.println("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());//}
4.2.3、消息確認(rèn)
修改手動(dòng)確認(rèn)配置
spring:# ======================== ↓↓↓↓↓↓ kafka相關(guān)配置 ↓↓↓↓↓↓ ===============================kafka:bootstrap-servers: 你的ip:9092 # 指定kafka server地址,集群(多個(gè)逗號(hào)分隔)listener: #配置監(jiān)聽(tīng)者#ack-mode參數(shù)值如下#MANUAL:手動(dòng)確認(rèn)模式,消費(fèi)者需要手動(dòng)調(diào)用 Acknowledgment 對(duì)象的 acknowledge() 方法來(lái)確認(rèn)消息。#MANUAL_IMMEDIATE:立即手動(dòng)確認(rèn)模式,與 MANUAL 模式相同,但是消費(fèi)者在處理完消息后立即提交偏移量,而不是等待下一次拉取。#BATCH:批量確認(rèn)模式,消費(fèi)者將在處理完一批消息后自動(dòng)提交偏移量。#RECORD:記錄確認(rèn)模式,消費(fèi)者將在處理完每條消息后自動(dòng)提交偏移量。ack-mode: manual # 手動(dòng)提交方式#concurrency: 3 # 并發(fā)線程數(shù)producer:# 指定消息key和消息體的編解碼方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 寫(xiě)入失敗時(shí),重試次數(shù)。當(dāng)leader節(jié)點(diǎn)失效,一個(gè)repli節(jié)點(diǎn)會(huì)替代成為leader節(jié)點(diǎn),此時(shí)可能出現(xiàn)寫(xiě)入失敗,# 當(dāng)retris為0時(shí),produce不會(huì)重復(fù)。retirs重發(fā),此時(shí)repli節(jié)點(diǎn)完全成為leader節(jié)點(diǎn),不會(huì)產(chǎn)生消息丟失。retries: 0# 每次批量發(fā)送消息的數(shù)量,produce積累到一定數(shù)據(jù),一次發(fā)送batch-size: 16384# produce積累數(shù)據(jù)一次發(fā)送,緩存大小達(dá)到buffer.memory就發(fā)送數(shù)據(jù)buffer-memory: 33554432consumer:group-id: default_consumer_group # 指定默認(rèn)消費(fèi)者 群組IDenable-auto-commit: false # true自動(dòng)提交auto-commit-interval: 1000# procedure要求leader在考慮完成請(qǐng)求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下:# acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會(huì)等待來(lái)自服務(wù)器的任何確認(rèn),該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無(wú)法保證服務(wù)器已收到記錄,并且重試配置將不會(huì)生效(因?yàn)榭蛻?hù)端通常不會(huì)知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。# acks = 1 這意味著leader會(huì)將記錄寫(xiě)入其本地日志,但無(wú)需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng),在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會(huì)丟失。# acks = all 這意味著leader將等待完整的同步副本集以確認(rèn)記錄,這保證了只要至少一個(gè)同步副本服務(wù)器仍然存活,記錄就不會(huì)丟失,這是最強(qiáng)有力的保證,這相當(dāng)于acks = -1的設(shè)置。# 可以設(shè)置的值為:all, -1, 0, 1acks: 1# 指定消息key和消息體的編解碼方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
生產(chǎn)者
/*** kafkaTemplate提供了一個(gè)回調(diào)方法addCallback,我們可以在回調(diào)方法中監(jiān)控消息是否發(fā)送成功 或 失敗時(shí)做補(bǔ)償處理* @param msg*/
@GetMapping("test1")
public void test1( String msg) {kafkaTemplate.send("three", msg).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("發(fā)送消息失敗:"+ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("發(fā)送消息成功:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}});
}
消費(fèi)者
/*** 手動(dòng)提交方式,需改配置為手動(dòng)模式,需修改配置* 監(jiān)聽(tīng)所有的分區(qū)* @param record* @param ack*/
@KafkaListener(topics = "three")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {try {log.info("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());System.out.println("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());exception(); // 假設(shè)出現(xiàn)異常,沒(méi)有走下面的提交//手動(dòng)提交offsetack.acknowledge();} catch (Exception e) {e.printStackTrace();}
}private void exception() {throw new RuntimeException("異常了");
}
消費(fèi)者異常了后,消息沒(méi)有被確認(rèn),我們重啟項(xiàng)目時(shí),會(huì)再次出現(xiàn)那條未確認(rèn)的消息