中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

網站如何兼容大多瀏覽器怎么注冊電商平臺

網站如何兼容大多瀏覽器,怎么注冊電商平臺,網站建設汽車后市場,網站右側出現(xiàn)百度名片叫什么Kafka入門4.0.0版本(基于Java、SpringBoot操作) 一、kafka概述 Kafka最初是由LinkedIn公司開發(fā)的,是一個高可靠、高吞吐量、低延遲的分布式發(fā)布訂閱消息系統(tǒng),它使用Scala語言編寫,并于2010年被貢獻給了Apache基金會&…

Kafka入門4.0.0版本(基于Java、SpringBoot操作)

一、kafka概述

Kafka最初是由LinkedIn公司開發(fā)的,是一個高可靠、高吞吐量、低延遲的分布式發(fā)布訂閱消息系統(tǒng),它使用Scala語言編寫,并于2010年被貢獻給了Apache基金會,隨后成為Apache的頂級開源項目。主要特點有:

  1. 為發(fā)布和訂閱提供高吞吐量
  2. 消息持久化
  3. 分布式
  4. 消費消息采用Pull模式
  5. 支持在線和離線場景

本次采用最新的kafka版本4.0.0,Kafka 4.0 最引人矚目的變化之一,當屬其默認運行在 KRaft(Kafka Raft)模式下,徹底擺脫了對 Apache ZooKeeper 的依賴。在 Kafka 的發(fā)展歷程中,ZooKeeper 曾是其核心組件,負責協(xié)調分布式系統(tǒng)中的元數據管理、Broker 注冊、主題分區(qū)分配等關鍵任務。然而,隨著 Kafka 功能的不斷豐富與用戶規(guī)模的持續(xù)擴大,ZooKeeper 逐漸成為系統(tǒng)部署和運維中的一個復雜性來源,增加了運營成本與管理難度。

KRaft 模式的引入,標志著 Kafka 在架構上的自我進化達到了一個新高度。通過采用基于 Raft 一致性算法的共識機制,Kafka 將元數據管理內嵌于自身體系,實現(xiàn)了對 ZooKeeper 的無縫替代。這一轉變帶來了多方面的顯著優(yōu)勢:

簡化部署與運維:運維人員無需再為維護 ZooKeeper 集群投入額外精力,降低了整體運營開銷。新架構減少了系統(tǒng)的復雜性,使得 Kafka 的安裝、配置和日常管理變得更加直觀和高效。

增強可擴展性:KRaft 模式下,Kafka 集群的擴展性得到了進一步提升。新增 Broker 節(jié)點的加入流程更加簡便,能夠更好地適應大規(guī)模數據處理場景下對系統(tǒng)資源動態(tài)調整的需求。

提升系統(tǒng)性能與穩(wěn)定性:去除 ZooKeeper 這一外部依賴后,Kafka 在元數據操作的響應速度和一致性方面表現(xiàn)出色。尤其是在高并發(fā)寫入和讀取場景下,系統(tǒng)的穩(wěn)定性和可靠性得到了增強,減少了因外部組件故障可能導致的單點問題。

  • 之前的架構

在這里插入圖片描述

  • 現(xiàn)在的架構

在這里插入圖片描述

kafka消費模型

不同消費者組可以消費全量的消息,相同消費者組內的消費者只能消費一部分。

在這里插入圖片描述

kafka基本概念

Producer(生產者)

消息的生產者,負責將消息發(fā)送到Kafka集群中。

Consumer(消費者)

消息的消費者,負責從Kafka集群中讀取并處理消息

Broker(服務代理節(jié)點)

Kafka集群中的一個或多個服務器,負責存儲和轉發(fā)消息。

Topic(主題)

Kafka中的消息以主題為單位進行歸類,生產者發(fā)送消息到特定主題,消費者訂閱并消費這些主題的消息。

Partition(分區(qū))

每個主題可以細分為多個分區(qū),分區(qū)是Kafka存儲消息的物理單位,每個分區(qū)可以看作是一個有序的、不可變的消息序列。

Replica(副本)

Kafka為每個分區(qū)引入了多副本機制,以提高數據的安全性和可靠性。副本分為leader和follower,其中l(wèi)eader負責處理讀寫請求,follower負責從leader同步數據。

Consumer Group(消費者組)

由多個消費者組成,消費者組內的消費者共同消費同一個主題的消息,但每個消費者只負責消費該主題的一個或多個分區(qū),避免消息重復消費。

kraft

通過采用基于 Raft 一致性算法的共識機制,Kafka 將元數據管理內嵌于自身體系,實現(xiàn)了對 ZooKeeper 的無縫替代

kafka發(fā)送端采用push模式

kafka消費端采用pull模式訂閱并消費消息

在這里插入圖片描述

Kafka的工作原理

可以概括為以下幾個步驟:

  • 消息發(fā)布: 生產者將消息發(fā)送到Kafka集群的特定主題,并可以選擇發(fā)送到該主題的哪個分區(qū)。如果未指定分區(qū),Kafka會根據負載均衡策略自動選擇分區(qū)。

  • 消息存儲: Kafka將接收到的消息存儲在磁盤上的分區(qū)中,每個分區(qū)都是一個有序的消息序列。Kafka使用順序寫入和零拷貝技術來提高寫入性能,并通過多副本機制確保數據的安全性和可靠性。

在這里插入圖片描述

  • 消息消費: 消費者組中的消費者從Kafka集群中訂閱并消費消息。每個消費者負責消費一個或多個分區(qū)中的消息,并確保消息至少被消費一次。消費者可以使用拉(Pull)模式或推(Push)模式從Kafka中拉取消息。

在這里插入圖片描述

  • 負載均衡: Kafka通過ZooKeeper維護集群的元數據信息,包括分區(qū)和消費者的對應關系。當消費者數量或分區(qū)數量發(fā)生變化時,Kafka會重新分配分區(qū)給消費者,以實現(xiàn)負載均衡。

  • 容錯機制: Kafka通過多副本機制實現(xiàn)容錯。當leader副本出現(xiàn)故障時,Kafka會從ISR(In-Sync Replicas)集合中選擇一個新的leader副本繼續(xù)對外提供服務。同時,Kafka還提供了多種可靠性級別供用戶選擇,以滿足不同的業(yè)務需求。

kafka特點

一、Kafka的持久化機制

Kafka的持久化機制主要涉及消息的存儲和復制。Kafka以日志的形式存儲消息,每個主題(Topic)被劃分為多個分區(qū)(Partition),每個分區(qū)中的消息按照順序進行存儲。Kafka使用多個副本(Replica)保證消息的持久性和可靠性,每個分區(qū)的消息會被復制到多個副本中,以防止數據丟失。此外,Kafka還允許根據配置的保留策略來保留已消費的消息一段時間,以便在需要時進行檢索和恢復。

Kafka的副本機制是其實現(xiàn)高可用性和數據持久性的重要基石。每個主題的每個分區(qū)都配置有多個副本,這些副本分散保存在不同的Broker上,從而能夠對抗部分Broker宕機帶來的數據不可用問題。Kafka的副本機制包括領導者副本(Leader Replica)和追隨者副本(Follower Replica):

領導者副本:負責處理所有的讀寫請求,包括生產者的消息寫入和消費者的消息讀取。

追隨者副本:從領導者副本異步拉取消息,并寫入到自己的提交日志中,從而實現(xiàn)與領導者副本的同步。追隨者副本不對外提供服務,只作為數據的冗余備份。

Kafka還引入了ISR(In-Sync Replicas)機制,即與領導者副本保持同步的副本集合。只有處于ISR中的副本才能參與到消息的寫入和讀取過程中,以確保數據的一致性和可靠性。當某個副本與領導者副本的同步延遲超過一定的閾值時,它會被踢出ISR,直到同步恢復正常。

二、Kafka的數據一致性

Kafka通過多個機制來確保數據的一致性,包括副本同步、ISR機制、生產者事務和消費者事務等:

副本同步:確保主副本將數據同步到所有副本的過程,在副本同步完成之前,生產者才會認為消息已經被成功寫入。

ISR機制:通過動態(tài)調整ISR列表中的副本,確保只有可靠的副本參與到數據的讀寫操作,從而提高數據的一致性和可靠性。

生產者事務:Kafka的生產者事務機制可以確保消息的Exactly-Once語義,即消息不會被重復寫入或丟失。生產者事務將消息的發(fā)送和位移提交等操作放在同一個事務中,一旦事務提交成功,就意味著消息已經被成功寫入,并且對應的位移也已經提交。

消費者事務:雖然Kafka的消費者通常不直接支持事務但消費者可以通過提交位移(Offset)來確保消息的正確消費。消費者事務將消息的拉取和位移提交等操作放在同一個事務中,以確保消息不會被重復消費或丟失。

二、kafka應用

2.1 win11安裝kafka4.0.0

下載地址:https://kafka.apache.org/downloads 下載最后一個kafka-2.13-4.0.0.tgz

在這里插入圖片描述

下載好之后,把這個壓縮包解壓就行了,然后找到config下面的server.properties

找到log.dirs改成自己電腦上的目錄

log.dirs=E:\\runSoft\\kafka\\data

在這里插入圖片描述

  • 第一步 獲取uuid

先打開命令行,進入到bin下面的windows目錄下

命令

kafka-storage.bat random-uuid

在這里插入圖片描述

先獲取uuid,我的uuid為ANVnC_s-QYGJF1C7wu9Aww

  • 第二步 格式化日志

命令:

kafka-storage.bat format --standalone -t PPEZ2LW8T8yjZNWnfNHorQ -c ../../config/server.properties

在這里插入圖片描述

  • 第三步 啟動

打開命令行,進入到bin下面的windows目錄下 啟動命令

kafka-server-start.bat ../../config/server.properties
創(chuàng)建topic
kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092
啟動一個消費端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
啟動一個生產端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
問題
1、如果提示如下
命令行  輸入行太長。
命令語法不正確。

則需要把目錄變短,目錄太長,win11不讓輸入。

2,tgz需要解壓兩次
只解壓一次是不行的,tgz是打包之后壓縮的。

3、如果啟動失敗,需要重新配置

重新配置時。把log.dirs的路徑下面的東西清空
2.2 java開發(fā)kafka

第一步,引入依賴

 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version>
</dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.17</version>
</dependency>

第二步,建立生產者

public class Producer {public static void main(String[] args) {Map<String,Object> props = new HashMap<>();//  kafka 集群 節(jié)點props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");String topic = "test";KafkaProducer<String,  String> producer = new KafkaProducer(props);producer.send(new ProducerRecord<String, String>(topic, "key", "value-1"));producer.send(new ProducerRecord<String, String>(topic, "key", "value-2"));producer.send(new ProducerRecord<String, String>(topic, "key", "value-3"));producer.close();}}

ProducerRecord 是 Kafka 中的一個核心類,它代表了一組 Kafka 需要發(fā)送的 key/value 鍵值對,它由記錄要發(fā)送到的主題名稱(Topic Name),**可選的分區(qū)號(Partition Number)**以及可選的鍵值對構成。

在這里插入圖片描述

第三步、建立消費者類

public class Consumer {public static void main(String[] args){Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String , String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test"));while (true) {ConsumerRecords<String,String> records =  consumer.poll(Duration.ofDays(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("partition = %d ,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());}}}
}

運行效果

在這里插入圖片描述

2.3 spring boot整合kafka

第一步,引入依賴

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.1</version><relativePath/> <!-- lookup parent in repository --></parent><artifactId>spring_boot_kafka_demo</artifactId><packaging>jar</packaging><name>spring_boot_kafka_demo Maven Webapp</name><url>http://maven.apache.org</url><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

第二步,編寫配置文件

編寫resources下的application.yml

spring:kafka:bootstrap-servers: localhost:9092consumer:auto-offset-reset: earliest

第三步,編寫生產者

@Service
public class Producer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("topic1", message);}@PostConstructpublic void init() {sendMessage("Hello, Kafka!");}
}

第四步,編寫消費者

@Component
public class Consumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}
}

第五步,編寫啟動類

@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}}

運行效果

在這里插入圖片描述

2.4 記錄日志到kafka中

第一步,在2.3的基礎上,添加依賴

<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.12</version> <!-- Spring Boot 3.x 推薦版本 -->
</dependency>

第二步,添加kafka的日志appender類

public class KafkaLogbackAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {private String topic = "application-logs";private String bootstrapServers = "localhost:9092";private KafkaProducer<String, String> producer;@Overridepublic void start() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());this.producer = new KafkaProducer<>(props);super.start();}@Overrideprotected void append(ILoggingEvent eventObject) {String msg = eventObject.getFormattedMessage();producer.send(new ProducerRecord<>(topic, msg));}@Overridepublic void stop() {if (producer != null) {producer.close();}super.stop();}// Getter and Setter for XML configpublic void setTopic(String topic) {this.topic = topic;}public void setBootstrapServers(String bootstrapServers) {this.bootstrapServers = bootstrapServers;}
}

第三步,在resources下添加logback-spring.xml文件

<configuration debug="false" scan="true" scanPeriod="30 seconds"><!-- 定義日志格式 --><property name="PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"/><!-- 控制臺輸出 --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>${PATTERN}</pattern></encoder></appender><!-- Kafka Appender --><appender name="KAFKA" class="com.demo.KafkaLogbackAppender"><bootstrapServers>localhost:9092</bootstrapServers><topic>application-logs</topic></appender><!-- 根日志輸出 --><root level="info"><appender-ref ref="STDOUT"/><appender-ref ref="KAFKA"/></root></configuration>

第四步,修改Producer類

@Service
public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("topic1", message);}@PostConstructpublic void init() {sendMessage("Hello, Kafka!");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");}
}

第五步,修改Consumer類

@Component
public class Consumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}@KafkaListener(id = "myId2", topics = "application-logs")public void listen2(String in) {System.out.println("resinfo:"+in);}
}
http://www.risenshineclean.com/news/46679.html

相關文章:

  • 溧陽網站設計唐山seo排名
  • 做安全題目是哪個網站長沙百度網站快速排名
  • 蘇州網站建設設計公司免費網站建設制作
  • 做內貿只要有什么網絡推廣網站搜索引擎技術基礎
  • 網站功能測試方法怎么做好公司官網推廣
  • 手機怎么做網站免費的百度學術官網入口
  • 電腦維修 做網站小廣告圖片
  • 網站建設系統(tǒng) 招標西安百度seo推廣電話
  • 濟寧做網站有哪幾家職業(yè)培訓機構需要什么資質
  • 招商平臺公司山西seo排名
  • 做棋牌推廣網站違法不網絡推廣站
  • 天津網站建設定制公司免費seo網站推廣在線觀看
  • 什么視頻網站可以做鏈接深圳網站開發(fā)公司
  • 商河做網站多少錢my77728域名查詢
  • 做網站要學多久電商運營
  • 為男人做購物網站攜程: 2023年旅行搜索上漲超900%
  • 網站建設招標公告首頁排名關鍵詞優(yōu)化
  • 鄭州網站建設方案書網絡的推廣
  • 微網站制作公司佛山疫情最新消息
  • 濰坊做企業(yè)手機版網站濰坊在線制作網站
  • 網站設計美工多少網絡推廣電話
  • 中化建工北京建設投資有限公司網站最新的疫情信息
  • 企業(yè)網站類型市場營銷互聯(lián)網營銷
  • 展示網站報價搜索引擎環(huán)境優(yōu)化
  • 什么網站可以做引文分析開發(fā)一個網站需要哪些技術
  • 酒泉網站建設平臺百度網盤pc網頁版入口
  • 哈爾濱網站建設與管理今日國際新聞頭條15條
  • 濱海住房和城鄉(xiāng)建設局網站谷歌查詢關鍵詞的工具叫什么
  • 做網站的圖片需要多少錢下載百度極速版免費安裝
  • 云南省建設廳招標辦網站網絡營銷活動策劃