中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

安美東莞網(wǎng)站建設(shè)東莞seo網(wǎng)絡(luò)營(yíng)銷(xiāo)

安美東莞網(wǎng)站建設(shè),東莞seo網(wǎng)絡(luò)營(yíng)銷(xiāo),初中做歷史的網(wǎng)站,蘇州工業(yè)園區(qū)招聘官網(wǎng)kafka 一、簡(jiǎn)介1.1、場(chǎng)景選擇,與其他mq相比1.2、應(yīng)用場(chǎng)景1.2.1、流量消峰1.2.2、解耦1.2.3、異步通訊 1.3、消息隊(duì)列的兩種模式1.3.1、點(diǎn)對(duì)點(diǎn)模式1.3.2、發(fā)布/訂閱模式 1.4、Kafka 基礎(chǔ)架構(gòu) 二、安裝部署2.1、安裝包方式2.2、docker安裝方式2.3、docker安裝kafka-ma…

kafka

  • 一、簡(jiǎn)介
    • 1.1、場(chǎng)景選擇,與其他mq相比
    • 1.2、應(yīng)用場(chǎng)景
      • 1.2.1、流量消峰
      • 1.2.2、解耦
      • 1.2.3、異步通訊
    • 1.3、消息隊(duì)列的兩種模式
      • 1.3.1、點(diǎn)對(duì)點(diǎn)模式
      • 1.3.2、發(fā)布/訂閱模式
    • 1.4、Kafka 基礎(chǔ)架構(gòu)
  • 二、安裝部署
    • 2.1、安裝包方式
    • 2.2、docker安裝方式
    • 2.3、docker安裝kafka-map圖形化管理工具
  • 三、Kafka 命令行操作
    • 3.1、主題命令行操作
    • 3.2、生產(chǎn)者命令行操作
    • 3.3、消費(fèi)者命令行操作
  • 四、Springboot整合Kafka
    • 4.1、依賴(lài)配置
    • 4.2、代碼示例
      • 4.2.1、簡(jiǎn)單的生產(chǎn)、消費(fèi)
      • 4.2.2、指定分區(qū)生產(chǎn)、消費(fèi)
      • 4.2.3、消息確認(rèn)

一、簡(jiǎn)介

1.1、場(chǎng)景選擇,與其他mq相比

1、目前企業(yè)中比較常見(jiàn)的消息隊(duì)列產(chǎn)品主 要有 Kafka、ActiveMQ 、RabbitMQ 、RocketMQ 等。

大數(shù)據(jù)場(chǎng)景主要采用 Kafka 作為消息隊(duì)列。在 JavaEE 開(kāi)發(fā)中主要采用 ActiveMQ、RabbitMQ、RocketMQ。


2、Kafka與其他消息隊(duì)列MQ(如ActiveMQ、RabbitMQ等)相比,有以下幾個(gè)區(qū)別:

  1. 磁盤(pán)存儲(chǔ):Kafka將所有消息都保存在磁盤(pán)上,并使用內(nèi)存映射文件進(jìn)行讀寫(xiě)。這種存儲(chǔ)方式可以支持大量的消息數(shù)據(jù),而且數(shù)據(jù)還可以保留很長(zhǎng)時(shí)間,比如幾個(gè)月甚至幾年。而其他MQ的存儲(chǔ)方式多是基于內(nèi)存,不適合存儲(chǔ)大量的數(shù)據(jù)。

  2. 分布式設(shè)計(jì):Kafka是作為分布式系統(tǒng)設(shè)計(jì)的,可以在多個(gè)節(jié)點(diǎn)之間實(shí)現(xiàn)消息的高效傳輸和處理。其他MQ也支持分布式部署,但Kafka在這方面更加優(yōu)秀。

  3. 發(fā)布/訂閱模式:Kafka采用發(fā)布/訂閱模式,允許多個(gè)消費(fèi)者同時(shí)訂閱同一個(gè)主題,而且Kafka消費(fèi)者可以自定義從哪個(gè)位置開(kāi)始消費(fèi)消息。其他MQ中,消費(fèi)者一般需要通過(guò)消費(fèi)者組來(lái)進(jìn)行負(fù)載均衡,而且其他MQ消費(fèi)者只能從當(dāng)前位置開(kāi)始消費(fèi)

  4. 大數(shù)據(jù)處理:Kafka最初是為大數(shù)據(jù)處理而設(shè)計(jì)的,它可以非常高效地處理海量數(shù)據(jù),適合用于數(shù)據(jù)倉(cāng)庫(kù)、日志處理、統(tǒng)計(jì)分析等場(chǎng)景。其他MQ則更多用于異步通信、任務(wù)調(diào)度、實(shí)時(shí)通知等領(lǐng)域。

  5. 生態(tài)系統(tǒng):Kafka擁有非常豐富的生態(tài)系統(tǒng),包括Kafka Connect、Kafka Streams等工具和框架,可以方便地與大數(shù)據(jù)處理平臺(tái)(如Hadoop、Spark、Flink等)進(jìn)行集成。其他MQ的生態(tài)系統(tǒng)相對(duì)較小。

1.2、應(yīng)用場(chǎng)景

1.2.1、流量消峰

如雙十一秒殺期間,參與用戶(hù):10億人/s,但是我們的系統(tǒng)只能支持處理能力:1千萬(wàn)人/s,為了避免服務(wù)掛掉或者請(qǐng)求超時(shí)等等問(wèn)題,我們可以將10億的請(qǐng)求都寫(xiě)入到消息隊(duì)列中,我們系統(tǒng)再去取消息隊(duì)列上的消息消費(fèi),達(dá)到流量消峰的效果。

在這里插入圖片描述

1.2.2、解耦

我們的數(shù)據(jù)源來(lái)源可能非常多,不可能都全部去集成。例如場(chǎng)景:我們要去買(mǎi)東西,不用去知道他是在哪里進(jìn)貨的,我們只需要去超市買(mǎi)就可以,消息隊(duì)列也是如此,那么多的數(shù)據(jù)讓他們?nèi)繉?xiě)入到kafka消息隊(duì)列中即可,我們?cè)偃ハ㈥?duì)列中獲取我們的數(shù)據(jù)。

在這里插入圖片描述

1.2.3、異步通訊

通常我們寫(xiě)代碼,如注冊(cè)完,要發(fā)短信,如果同步處理,等到發(fā)短信成功后再返回結(jié)果給用戶(hù),這樣請(qǐng)求時(shí)間太久了。
在這里插入圖片描述
消息隊(duì)列的方式,可以再注冊(cè)的時(shí)候,發(fā)送給隊(duì)列,我們這時(shí)候就可以返回給用戶(hù)注冊(cè)成功了,然后消費(fèi)者再去消費(fèi)發(fā)送短信的隊(duì)列,達(dá)到異步的效果。

在這里插入圖片描述


很多人可能會(huì)問(wèn),多線程的方式不是也能實(shí)現(xiàn)?到底選擇多線程還是消息隊(duì)列呢?

  1. 當(dāng)需要進(jìn)行任務(wù)處理,并且任務(wù)處理之間沒(méi)有明顯的依賴(lài)關(guān)系時(shí),使用消息隊(duì)列更適合。將任務(wù)發(fā)送到消息隊(duì)列中,由消費(fèi)者進(jìn)行消費(fèi),這樣可以實(shí)現(xiàn)解耦、提高可靠性和系統(tǒng)的擴(kuò)展性。例如,一個(gè)網(wǎng)站需要生成大量的報(bào)告,將報(bào)告生成請(qǐng)求通過(guò)消息隊(duì)列進(jìn)行異步處理是一個(gè)不錯(cuò)的選擇。

  2. 當(dāng)需要對(duì)任務(wù)進(jìn)行精細(xì)控制,并且任務(wù)處理之間存在明顯的依賴(lài)關(guān)系時(shí),使用多線程更適合。多線程可以實(shí)現(xiàn)更加細(xì)粒度的任務(wù)處理,可以控制任務(wù)的執(zhí)行順序、進(jìn)行資源的共享等。例如,一個(gè)電商網(wǎng)站需要實(shí)時(shí)監(jiān)控庫(kù)存的變化,需要在某個(gè)商品的庫(kù)存下降到一定數(shù)量時(shí)進(jìn)行補(bǔ)貨,在這種情況下使用多線程處理更加合適。

總之,消息隊(duì)列和多線程都有自己的優(yōu)勢(shì)和劣勢(shì),要根據(jù)具體的場(chǎng)景選擇合適的方式,才能更好地提高系統(tǒng)效率和可靠性。

1.3、消息隊(duì)列的兩種模式

1.3.1、點(diǎn)對(duì)點(diǎn)模式

消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后清除消息

在這里插入圖片描述

1.3.2、發(fā)布/訂閱模式

  • 可以有多個(gè)topic主題(瀏覽、點(diǎn)贊、收藏、評(píng)論等)
  • 消費(fèi)者消費(fèi)數(shù)據(jù)之后,不刪除數(shù)據(jù)(可以控制什么時(shí)候刪除)
  • 每個(gè)消費(fèi)者相互獨(dú)立,都可以消費(fèi)到數(shù)據(jù)

這個(gè)方式用的多,它可以處理更多復(fù)雜的場(chǎng)景。

在這里插入圖片描述

1.4、Kafka 基礎(chǔ)架構(gòu)

1、海量數(shù)據(jù)分而置之,為方便擴(kuò)展,并提高吞吐量,一個(gè)topic分為多個(gè)partition分區(qū)。

如100T的數(shù)據(jù),我可以分成3個(gè)區(qū),每個(gè)區(qū)三十幾G,可以提高吞吐量。
在這里插入圖片描述

2、配合分區(qū)的設(shè)計(jì),提出消費(fèi)者組的概念,組內(nèi)每個(gè)消費(fèi)者并行消費(fèi)

一個(gè)分區(qū)的數(shù)據(jù),只能由一個(gè)消費(fèi)者處理,如有兩個(gè)消費(fèi)者消費(fèi)不知道由第一個(gè)還是第二個(gè)來(lái)消費(fèi)。
在這里插入圖片描述3、為提高可用性,為每個(gè)partition增加若干副本,防止一個(gè)分區(qū)掛了,類(lèi)似NameNode HA

副本分為leader和fallower之分,follower不做被消費(fèi),只是為了防止leader副本掛了后,follower有條件成為leader,提高可用性。
在這里插入圖片描述

  1. zookeeper中記錄誰(shuí)是leader和整個(gè)集群中哪些服務(wù)器正在工作,Kafka2.8.0以后也可以配置不采用ZK,安裝包已內(nèi)置ZK

在這里插入圖片描述

  1. Producer:消息生產(chǎn)者,就是向 Kafka broker 發(fā)消息的客戶(hù)端。
  2. Consumer:消息消費(fèi)者,向 Kafka broker 取消息的客戶(hù)端。
  3. Consumer Group(CG):消費(fèi)者組,由多個(gè) consumer 組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。
  4. Broker:一臺(tái) Kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè)broker 可以容納多個(gè) topic。
  5. Topic:可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topic。
  6. Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。
  7. Replica:副本。一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè) Leader 和若干個(gè)Follower。
  8. Leader:每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是 Leader。
  9. Follower:每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從 Leader 中同步數(shù)據(jù),保持和Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時(shí),某個(gè) Follower 會(huì)成為新的 Leader。

二、安裝部署

2.1、安裝包方式

官網(wǎng)下載鏈接: https://kafka.apache.org/downloads

1、選擇你要的版本進(jìn)行下載

在這里插入圖片描述

2、將安裝包上傳服務(wù)器,并解壓

tar -xzf kafka_2.12-3.4.0.tgz

進(jìn)入目錄
在這里插入圖片描述
3、修改config目錄下的server.properties配置文件

vim server.properties

修改log.dirs的路徑,這個(gè)是kafka存儲(chǔ)數(shù)據(jù)的地方,默認(rèn)放在了臨時(shí)文件夾里,容易被刪除,我們需改成我們服務(wù)器可以存放的目錄,如我放在home底下
在這里插入圖片描述
放開(kāi)注釋,修改地址成你的ip
在這里插入圖片描述
在這里插入圖片描述

4、修改kafka的環(huán)境變量

#編輯配置文件
vi /etc/profile# KAFKA_HOME
export KAFKA_HOME=kafka根目錄
export PATH=$PATH:$KAFKA_HOME/bin#例如筆者的文件路徑
export KAFKA_HOME=/www/wwwroot/kafka_2.12-3.4.0/
export PATH=$PATH:$KAFKA_HOME/bin# 添加配置后重新加載配置文件
source /etc/profile

5、配置config下的zookeeper配置,同樣也只是修改存儲(chǔ)路徑

vim zookeeper.properties將dataDir修改成你自己存儲(chǔ)路徑,如我的dataDir=/home/zookeeper
還有下面的advertised放開(kāi)注釋,更換ip,zookeeper連接也更換ip

在這里插入圖片描述

6、啟動(dòng),需先啟動(dòng)zookeeper
注意:要先有java環(huán)境

# 啟動(dòng)zookeeper,直接指定到你們自己的bin和config目錄
nohup /www/wwwroot/kafka_2.12-3.4.0/bin/zookeeper-server-start.sh  -daemon  /www/wwwroot/kafka_2.12-3.4.0/config/zookeeper.properties# 啟動(dòng)kafka
nohup /www/wwwroot/kafka_2.12-3.4.0/bin/kafka-server-start.sh -daemon /www/wwwroot/kafka_2.12-3.4.0/config/server.properties

7、驗(yàn)證是否啟動(dòng)成功,查看端口是有被用,或者看日志文件

在這里插入圖片描述
在這里插入圖片描述

2.2、docker安裝方式

1、docker 安裝zookeeper

# 安裝鏡像
docker pull wurstmeister/zookeeper#啟動(dòng)容器
docker run -d --name zookeeper_server  --restart always  -p 2181:2181  wurstmeister/zookeeper# 查看端口是否啟動(dòng)成功
netstat -anp |grep 2181

2、docker 安裝 kafka

# 安裝鏡像
docker pull wurstmeister/kafka#啟動(dòng)容器
docker run -d --name kafka_server  --restart always -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<這里換成你的zookeeper地址和端口> -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<這里換成你的kafka地址和端口> -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka-----------------------------------------------
# 發(fā)送消息與消費(fèi)測(cè)試
docker exec -it kafka_server /bin/bash# 進(jìn)入bin,注意你的版本號(hào)可能與我的不同
cd /opt/kafka_2.13-2.8.1/bin/# 發(fā)送消息
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test# 另起窗口,進(jìn)入容器
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

啟動(dòng)參數(shù)解釋:
KAFKA_BROKER_ID:該ID是集群的唯一標(biāo)識(shí)

KAFKA_ADVERTISED_LISTENERS:kafka發(fā)布到zookeeper供客戶(hù)端使用的服務(wù)地址。

KAFKA_ZOOKEEPER_CONNECT:zk的連接地址

KAFKA_LISTENERS:允許使用PLAINTEXT偵聽(tīng)器

如圖:發(fā)送和接收都成功
在這里插入圖片描述在這里插入圖片描述

2.3、docker安裝kafka-map圖形化管理工具

# 拉取鏡像
docker pull dushixiang/kafka-map# 啟動(dòng)容器
docker run -d --name kafka-map -p 9080:8080 --restart always  -v /home/kafka-map/data:/usr/local/kafka-map/data -e DEFAULT_USERNAME=admin  -e DEFAULT_PASSWORD=admin  --restart always dushixiang/kafka-map:latest

開(kāi)發(fā)端口,訪問(wèn)界面http://ip:9080 賬號(hào)密碼都是設(shè)置的admin

圖形化工具教程:

1、新建一個(gè)連接,連接你的kafka
在這里插入圖片描述2、點(diǎn)擊topic

在這里插入圖片描述3、進(jìn)入topic,可以看到需要消費(fèi)的信息與實(shí)時(shí)發(fā)送的消息

在這里插入圖片描述4、下拉消息

在這里插入圖片描述

三、Kafka 命令行操作

3.1、主題命令行操作

進(jìn)入kafka目錄,如果是docker安裝的,進(jìn)入容器內(nèi)的opt/kafka_版本號(hào)目錄下

1)、查看操作主題命令參數(shù)

bin/kafka-topics.sh

在這里插入圖片描述topic的命名都是有規(guī)律的,按上面的提示,固定前綴 bin/kafka-topics.sh --bootstrap-server ip:9092 加上面的提示,如下

2)、查看當(dāng)前服務(wù)器中的所有 topic

bin/kafka-topics.sh --bootstrap-server  ip:9092 --list

3)、創(chuàng)建 topic 主題,topic名:first

bin/kafka-topics.sh --bootstrap-server  localhost:9092 --create --partitions 1 --replication-factor 1 --topic first

4)、查看 first 主題的詳情

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first

5)、修改分區(qū)數(shù)(注意:分區(qū)數(shù)只能增加,不能減少)

bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 3

6)、刪除 topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first

3.2、生產(chǎn)者命令行操作

1)、查看操作生產(chǎn)者命令參數(shù)

bin/kafka-console-producer.sh

在這里插入圖片描述
2)、發(fā)送消息(發(fā)送到topic為first)

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first

3.3、消費(fèi)者命令行操作

1)、查看操作消費(fèi)者命令參數(shù)

bin/kafka-console-consumer.sh

在這里插入圖片描述
2)、消費(fèi)消息(監(jiān)聽(tīng)topic為first的)

只消費(fèi)新的:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first

把主題中所有的數(shù)據(jù)都讀取出來(lái)(包括歷史數(shù)據(jù)):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092   --from-beginning --topic first

四、Springboot整合Kafka

4.1、依賴(lài)配置

pom依賴(lài)

 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

yml配置

spring:# ======================== ↓↓↓↓↓↓ kafka相關(guān)配置 ↓↓↓↓↓↓ ===============================kafka:bootstrap-servers: ip:9092 # 指定kafka server地址,集群(多個(gè)逗號(hào)分隔)producer:# 指定消息key和消息體的編解碼方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 寫(xiě)入失敗時(shí),重試次數(shù)。當(dāng)leader節(jié)點(diǎn)失效,一個(gè)repli節(jié)點(diǎn)會(huì)替代成為leader節(jié)點(diǎn),此時(shí)可能出現(xiàn)寫(xiě)入失敗,# 當(dāng)retris為0時(shí),produce不會(huì)重復(fù)。retirs重發(fā),此時(shí)repli節(jié)點(diǎn)完全成為leader節(jié)點(diǎn),不會(huì)產(chǎn)生消息丟失。retries: 0# 每次批量發(fā)送消息的數(shù)量,produce積累到一定數(shù)據(jù),一次發(fā)送batch-size: 16384# produce積累數(shù)據(jù)一次發(fā)送,緩存大小達(dá)到buffer.memory就發(fā)送數(shù)據(jù)buffer-memory: 33554432consumer:group-id: default_consumer_group # 指定默認(rèn)消費(fèi)者 群組IDenable-auto-commit: true  # true自動(dòng)提交auto-commit-interval: 1000# procedure要求leader在考慮完成請(qǐng)求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下:# acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會(huì)等待來(lái)自服務(wù)器的任何確認(rèn),該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無(wú)法保證服務(wù)器已收到記錄,并且重試配置將不會(huì)生效(因?yàn)榭蛻?hù)端通常不會(huì)知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。# acks = 1 這意味著leader會(huì)將記錄寫(xiě)入其本地日志,但無(wú)需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng),在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會(huì)丟失。# acks = all 這意味著leader將等待完整的同步副本集以確認(rèn)記錄,這保證了只要至少一個(gè)同步副本服務(wù)器仍然存活,記錄就不會(huì)丟失,這是最強(qiáng)有力的保證,這相當(dāng)于acks = -1的設(shè)置。# 可以設(shè)置的值為:all, -1, 0, 1acks: 1# 指定消息key和消息體的編解碼方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4.2、代碼示例

4.2.1、簡(jiǎn)單的生產(chǎn)、消費(fèi)

生產(chǎn)者

@RestController
public class Producer {@Resourceprivate KafkaTemplate kafkaTemplate;/*** 最簡(jiǎn)單的發(fā)送* @param msg*/@GetMapping("/test")public void test(String msg){kafkaTemplate.send("two", msg);}

消費(fèi)者

	/*** 消費(fèi)監(jiān)聽(tīng) 自動(dòng)提交* 監(jiān)聽(tīng)所有的分區(qū)* @param record*/@KafkaListener(topics = "two")public void listen(ConsumerRecord<?, ?> record) {log.info("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());System.out.println("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());}

4.2.2、指定分區(qū)生產(chǎn)、消費(fèi)

在 Kafka 中,生產(chǎn)者可以指定消息的分區(qū)和鍵(Key)屬性。指定分區(qū)和鍵屬性可以帶來(lái)以下好處:

  1. 控制消息的分發(fā)

通過(guò)指定分區(qū),生產(chǎn)者可以控制消息被發(fā)送到哪個(gè)分區(qū),從而控制消息的分發(fā)。例如,如果您想要按照時(shí)間戳對(duì)消息進(jìn)行排序,可以將消息發(fā)送到同一個(gè)分區(qū)中。

  1. 提高消息的局部性

Kafka 會(huì)將同一個(gè)分區(qū)中的消息存儲(chǔ)在同一個(gè) Broker 上,這可以提高消息的局部性,從而提高消息的處理效率。

  1. 提高消息的可靠性

通過(guò)指定鍵屬性,生產(chǎn)者可以確保具有相同鍵的消息被發(fā)送到同一個(gè)分區(qū)中。這可以確保消息按照順序被處理,從而提高消息的可靠性。

生產(chǎn)者

/*** 指定分區(qū)發(fā)送* @param msg*/
@GetMapping("/test3")
public void test3(String msg){/*** 參數(shù)1:topic* 參數(shù)2:分區(qū)* 參數(shù)3:key* 參數(shù)4:消息內(nèi)容*/// 有keykafkaTemplate.send("two",0,"111", msg+"0分區(qū)");kafkaTemplate.send("two",1,"222", msg+"1分區(qū)");// 沒(méi)有keykafkaTemplate.send("two",2,null, msg+"2分區(qū)");// 發(fā)送另一個(gè)topic消費(fèi)者測(cè)試用kafkaTemplate.send("first",0,"111", msg+"first");
}

消費(fèi)者

	/*** @Title 指定topic、partition、offset消費(fèi)* @Description 同時(shí)監(jiān)聽(tīng)two和first,監(jiān)聽(tīng)two的0號(hào)和1號(hào)分區(qū)、first的 "0號(hào)和1號(hào)" 分區(qū),指向1號(hào)分區(qū)的offset初始值為8**/@KafkaListener(topicPartitions = {@TopicPartition(topic = "two", partitions = { "0","1" }),@TopicPartition(topic = "first", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))})public void onMessage2(ConsumerRecord<?, ?> record) {System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());}// ↓↓↓↓↓↓↓說(shuō)明示例↓↓↓↓↓↓↓↓↓// 如果沒(méi)指定分區(qū),就算你生產(chǎn)者發(fā)送了多個(gè)分區(qū),我也全都能接收//@KafkaListener(topics = "two")//public void listen(ConsumerRecord<?, ?> record) {//log.info("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());//System.out.println("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());//}

在這里插入圖片描述

4.2.3、消息確認(rèn)

修改手動(dòng)確認(rèn)配置
在這里插入圖片描述

spring:# ======================== ↓↓↓↓↓↓ kafka相關(guān)配置 ↓↓↓↓↓↓ ===============================kafka:bootstrap-servers: 你的ip:9092 # 指定kafka server地址,集群(多個(gè)逗號(hào)分隔)listener: #配置監(jiān)聽(tīng)者#ack-mode參數(shù)值如下#MANUAL:手動(dòng)確認(rèn)模式,消費(fèi)者需要手動(dòng)調(diào)用 Acknowledgment 對(duì)象的 acknowledge() 方法來(lái)確認(rèn)消息。#MANUAL_IMMEDIATE:立即手動(dòng)確認(rèn)模式,與 MANUAL 模式相同,但是消費(fèi)者在處理完消息后立即提交偏移量,而不是等待下一次拉取。#BATCH:批量確認(rèn)模式,消費(fèi)者將在處理完一批消息后自動(dòng)提交偏移量。#RECORD:記錄確認(rèn)模式,消費(fèi)者將在處理完每條消息后自動(dòng)提交偏移量。ack-mode: manual # 手動(dòng)提交方式#concurrency: 3 # 并發(fā)線程數(shù)producer:# 指定消息key和消息體的編解碼方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 寫(xiě)入失敗時(shí),重試次數(shù)。當(dāng)leader節(jié)點(diǎn)失效,一個(gè)repli節(jié)點(diǎn)會(huì)替代成為leader節(jié)點(diǎn),此時(shí)可能出現(xiàn)寫(xiě)入失敗,# 當(dāng)retris為0時(shí),produce不會(huì)重復(fù)。retirs重發(fā),此時(shí)repli節(jié)點(diǎn)完全成為leader節(jié)點(diǎn),不會(huì)產(chǎn)生消息丟失。retries: 0# 每次批量發(fā)送消息的數(shù)量,produce積累到一定數(shù)據(jù),一次發(fā)送batch-size: 16384# produce積累數(shù)據(jù)一次發(fā)送,緩存大小達(dá)到buffer.memory就發(fā)送數(shù)據(jù)buffer-memory: 33554432consumer:group-id: default_consumer_group # 指定默認(rèn)消費(fèi)者 群組IDenable-auto-commit: false  # true自動(dòng)提交auto-commit-interval: 1000# procedure要求leader在考慮完成請(qǐng)求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下:# acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會(huì)等待來(lái)自服務(wù)器的任何確認(rèn),該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無(wú)法保證服務(wù)器已收到記錄,并且重試配置將不會(huì)生效(因?yàn)榭蛻?hù)端通常不會(huì)知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。# acks = 1 這意味著leader會(huì)將記錄寫(xiě)入其本地日志,但無(wú)需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng),在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會(huì)丟失。# acks = all 這意味著leader將等待完整的同步副本集以確認(rèn)記錄,這保證了只要至少一個(gè)同步副本服務(wù)器仍然存活,記錄就不會(huì)丟失,這是最強(qiáng)有力的保證,這相當(dāng)于acks = -1的設(shè)置。# 可以設(shè)置的值為:all, -1, 0, 1acks: 1# 指定消息key和消息體的編解碼方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

生產(chǎn)者

/*** kafkaTemplate提供了一個(gè)回調(diào)方法addCallback,我們可以在回調(diào)方法中監(jiān)控消息是否發(fā)送成功 或 失敗時(shí)做補(bǔ)償處理* @param msg*/
@GetMapping("test1")
public void test1( String msg) {kafkaTemplate.send("three", msg).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("發(fā)送消息失敗:"+ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("發(fā)送消息成功:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}});
}

消費(fèi)者

/*** 手動(dòng)提交方式,需改配置為手動(dòng)模式,需修改配置* 監(jiān)聽(tīng)所有的分區(qū)* @param record* @param ack*/
@KafkaListener(topics = "three")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {try {log.info("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());System.out.println("topic: " + record.topic() + " <|============|> 消息內(nèi)容:" + record.value());exception(); // 假設(shè)出現(xiàn)異常,沒(méi)有走下面的提交//手動(dòng)提交offsetack.acknowledge();} catch (Exception e) {e.printStackTrace();}
}private void exception() {throw new RuntimeException("異常了");
}

消費(fèi)者異常了后,消息沒(méi)有被確認(rèn),我們重啟項(xiàng)目時(shí),會(huì)再次出現(xiàn)那條未確認(rèn)的消息

http://www.risenshineclean.com/news/54482.html

相關(guān)文章:

  • Spring做網(wǎng)站和什么內(nèi)容企業(yè)推廣
  • 有哪些網(wǎng)站是做采購(gòu)招標(biāo)的優(yōu)化網(wǎng)站推廣教程排名
  • 外國(guó)黃色網(wǎng)站今日預(yù)測(cè)足球比分預(yù)測(cè)
  • 做爰全過(guò)程網(wǎng)站seo實(shí)戰(zhàn)技術(shù)培訓(xùn)
  • 專(zhuān)做網(wǎng)站app拉新推廣賺傭金
  • 網(wǎng)頁(yè)游戲排行榜3d商丘網(wǎng)站優(yōu)化公司
  • 做動(dòng)圖素材網(wǎng)站百度知道首頁(yè)登錄入口
  • 新服務(wù)器做網(wǎng)站高端婚戀網(wǎng)站排名
  • wordpress小程序調(diào)用seo關(guān)鍵詞排名優(yōu)化銷(xiāo)售
  • 靜態(tài)網(wǎng)站制作模板代寫(xiě)文章
  • 用什么軟件做介紹視頻網(wǎng)站怎么樣推廣自己的網(wǎng)址
  • 自己怎么做獨(dú)立網(wǎng)站整站排名服務(wù)
  • 羅湖做網(wǎng)站多少錢(qián)域名搜索
  • 學(xué)做海報(bào)的網(wǎng)站朋友圈網(wǎng)絡(luò)營(yíng)銷(xiāo)
  • 學(xué)歷網(wǎng)站怎么做seo搜索引擎優(yōu)化營(yíng)銷(xiāo)案例
  • 同里做網(wǎng)站網(wǎng)絡(luò)推廣員好做嗎
  • 哪個(gè)網(wǎng)站可以做印章圖案鄭州網(wǎng)絡(luò)營(yíng)銷(xiāo)排名
  • 中國(guó)建設(shè)教育網(wǎng)站職業(yè)技術(shù)培訓(xùn)
  • 外貿(mào)網(wǎng)站建設(shè)公司如何網(wǎng)上營(yíng)銷(xiāo)
  • 網(wǎng)站后臺(tái)管理系統(tǒng)開(kāi)發(fā)快手作品推廣網(wǎng)站
  • 真題真做報(bào)名網(wǎng)站寧波seo推廣方式排名
  • 外貿(mào)手機(jī)網(wǎng)站模板全網(wǎng)關(guān)鍵詞云查詢(xún)
  • 東阿縣城市建設(shè)局網(wǎng)站seo優(yōu)化網(wǎng)
  • 德州做網(wǎng)站建設(shè)的公司哪家好網(wǎng)站是怎么做出來(lái)的
  • 基本網(wǎng)頁(yè)設(shè)計(jì)seo分析師招聘
  • 微網(wǎng)站微信數(shù)據(jù)庫(kù)設(shè)計(jì)創(chuàng)建網(wǎng)站
  • 天貓轉(zhuǎn)讓濟(jì)南seo培訓(xùn)
  • 企業(yè)網(wǎng)站建設(shè) 論文百度智能建站系統(tǒng)
  • 云南網(wǎng)站設(shè)計(jì)公司關(guān)鍵詞歌詞打印
  • 中國(guó)營(yíng)銷(xiāo)新聞網(wǎng)合肥百度快照優(yōu)化排名