中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

趣聞網(wǎng)站如何做建設(shè)網(wǎng)站流程

趣聞網(wǎng)站如何做,建設(shè)網(wǎng)站流程,大連做網(wǎng)站電話,珠海網(wǎng)站哪家好文章目錄 0.前置說明1. confluent-kafka-go2. sarama3. segmentio/kafka-go4. franz-go選擇建議 1.啟動 kafka 集群2.安裝 confluent-kafka-go 庫3.創(chuàng)建生產(chǎn)者特殊文件說明如何查看.log文件內(nèi)容 4.創(chuàng)建消費(fèi)者 0.前置說明 Go 語言中有一些流行的 Kafka 客戶端庫。以下是幾個常用…

文章目錄

  • 0.前置說明
    • 1. confluent-kafka-go
    • 2. sarama
    • 3. segmentio/kafka-go
    • 4. franz-go
      • 選擇建議
  • 1.啟動 kafka 集群
  • 2.安裝 confluent-kafka-go 庫
  • 3.創(chuàng)建生產(chǎn)者
    • 特殊文件說明
    • 如何查看.log文件內(nèi)容
  • 4.創(chuàng)建消費(fèi)者

0.前置說明

Go 語言中有一些流行的 Kafka 客戶端庫。以下是幾個常用的庫及其優(yōu)劣與區(qū)別:

1. confluent-kafka-go

  • 優(yōu)點(diǎn)

    • 高性能:基于 librdkafka,性能非常高。
    • 功能全面:支持 Kafka 的所有高級功能,如事務(wù)、壓縮、認(rèn)證等。
    • 社區(qū)支持:由 Confluent 維護(hù),社區(qū)活躍,文檔豐富。
    • 穩(wěn)定性:廣泛使用于生產(chǎn)環(huán)境,經(jīng)過大量測試和驗(yàn)證。
  • 缺點(diǎn)

    • 依賴性:依賴于 librdkafka,需要額外安裝該庫。
    • 復(fù)雜性:配置和使用相對復(fù)雜,特別是對于新手。

2. sarama

  • 優(yōu)點(diǎn)

    • 純 Go 實(shí)現(xiàn):不依賴于任何 C 庫,安裝和使用非常方便。
    • 社區(qū)活躍:由 Shopify 維護(hù),社區(qū)支持良好,文檔齊全。
    • 靈活性:提供了豐富的配置選項(xiàng),適用于各種使用場景。
  • 缺點(diǎn)

    • 性能:相對于 confluent-kafka-go,性能稍遜一籌。
    • 功能:不支持 Kafka 的一些高級功能,如事務(wù)。

3. segmentio/kafka-go

  • 優(yōu)點(diǎn)

    • 純 Go 實(shí)現(xiàn):不依賴于任何 C 庫,安裝和使用非常方便。
    • 簡潔易用:API 設(shè)計(jì)簡潔,易于上手。
    • 靈活性:支持多種配置選項(xiàng),適用于各種使用場景。
  • 缺點(diǎn)

    • 性能:相對于 confluent-kafka-go,性能稍遜一籌。
    • 功能:不支持 Kafka 的一些高級功能,如事務(wù)。

4. franz-go

  • 優(yōu)點(diǎn)

    • 純 Go 實(shí)現(xiàn):不依賴于任何 C 庫,安裝和使用非常方便。
    • 高性能:在純 Go 實(shí)現(xiàn)中性能較為優(yōu)越。
    • 功能全面:支持 Kafka 的大部分功能,包括事務(wù)。
  • 缺點(diǎn)

    • 社區(qū)支持:相對于 saramaconfluent-kafka-go,社區(qū)支持稍弱。
    • 文檔:文檔相對較少,需要更多的社區(qū)貢獻(xiàn)。

選擇建議

  • 高性能和高級功能需求:如果你需要高性能和 Kafka 的高級功能(如事務(wù)、壓縮、認(rèn)證等),confluent-kafka-go 是一個不錯的選擇。
  • 純 Go 實(shí)現(xiàn)和易用性:如果你更傾向于使用純 Go 實(shí)現(xiàn)的庫,并且希望安裝和使用更加簡便,可以選擇 saramasegmentio/kafka-go。
  • 平衡性能和功能:如果你希望在純 Go 實(shí)現(xiàn)中獲得較好的性能和功能支持,可以考慮 franz-go。

本文我們就以confluent-kafka-go庫為例來編寫代碼。

1.啟動 kafka 集群

不知道如何搭建集群請點(diǎn)擊這里 ----》Kafka 集群部署(CentOS 單機(jī)模擬版)

如果你懶得啟動集群,那么直接跳過。

  1. cluster目錄下運(yùn)行集群啟動腳本 cluster.sh;
cd cluster
./cluster.sh
  1. 檢查是否啟動成功;
ll zookeeper-data/
total 4
drwxr-xr-x 3 root root 4096 May 27 10:20 zookeeperll broker-data/
total 12
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-1
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-2
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-3

2.安裝 confluent-kafka-go 庫

  1. 查看你的go工作目錄
echo $GOPATH
  1. GOPATH目錄下的src目錄下新建 produce 項(xiàng)目
mkdir src/produce
cd src/produce
  1. 在你的項(xiàng)目目錄中運(yùn)行 go mod init 命令來初始化一個新的 Go 模塊
go mod init produce
  1. 安裝 confluent-kafka-go
go get github.com/confluentinc/confluent-kafka-go/kafka

3.創(chuàng)建生產(chǎn)者

  1. 新建文件 producer.go
touch producer.go
  1. 編寫代碼
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 創(chuàng)建生產(chǎn)者實(shí)例broker := "localhost:9091" // 集群地址topic := "test"            // 主題名稱producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) // 創(chuàng)建生產(chǎn)者實(shí)例// 檢查錯誤if err != nil {log.Fatalf("Failed to create producer: %s", err)}defer producer.Close()fmt.Printf("Created Producer %v\n", producer)// 生產(chǎn)消息message := "hello kafka"for i := 0; i < 10; i++ {producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, // 任題名稱Value:          []byte(message + fmt.Sprintf("%d", i)),                             // 消息內(nèi)容}, nil)}if err != nil {log.Fatalf("Failed to produce message: %v", err)}// 等待消息發(fā)送完成e := <-producer.Events() // 阻塞直到消息發(fā)送完成switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {log.Printf("Failed to deliver message: %v", ev.TopicPartition)} else {fmt.Printf("Delivered message: %s to %v\n", string(ev.Value), ev.TopicPartition)}}// 沖刷緩沖區(qū)消息producer.Flush(15 * 1000)
}

代碼說明

  1. 創(chuàng)建生產(chǎn)者時需要指定集群地址以及主題信息,如果沒有該主題則自動創(chuàng)建。
  2. 生產(chǎn)者會異步地將消息發(fā)送到 Kafka,因此你需要處理交付報(bào)告以確保消息成功發(fā)送。

我們需要了解一下Go語言和Kafka之間的關(guān)系:Go是一種靜態(tài)類型、編譯型的編程語言,由Google開發(fā)并開源。它適用于構(gòu)建高性能服務(wù)器端應(yīng)用程序和網(wǎng)絡(luò)服務(wù)。而Apache Kafka是一個分布式流處理平臺,主要面向大規(guī)模數(shù)據(jù)傳輸和存儲。

在這個例子中,我們有一個生產(chǎn)者程序,它使用Kafka的客戶端庫來連接到Kafka集群,然后通過創(chuàng)建一個生產(chǎn)者實(shí)例來開始發(fā)送消息。當(dāng)生產(chǎn)者準(zhǔn)備好要發(fā)送的消息時,它就會調(diào)用Send()方法將其添加到緩沖區(qū)中。一旦緩沖區(qū)滿了或者用戶主動觸發(fā)了Flush()方法,生產(chǎn)者就會把緩沖區(qū)里的所有消息一起發(fā)送給Kafka集群。

  1. 編譯運(yùn)行,生產(chǎn)者發(fā)送消息
go build producer.go 
./producer 
Created Producer rdkafka#producer-1
Delivered message: hello kafka0 to test[0]@0
  1. 查看消息
ll cluster/broker-data/broker-1
total 20
-rw-r--r-- 1 root root    0 May 27 10:20 cleaner-offset-checkpoint
-rw-r--r-- 1 root root    4 May 27 11:36 log-start-offset-checkpoint
-rw-r--r-- 1 root root   88 May 27 10:20 meta.properties
-rw-r--r-- 1 root root   13 May 27 11:36 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root   14 May 27 11:36 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 May 27 11:21 test-0 # 我們創(chuàng)建的主題 數(shù)字代表分區(qū)號ll cluster/broker-data/broker-1/test-0/
total 12
-rw-r--r-- 1 root root 10485760 May 27 11:21 00000000000000000000.index
-rw-r--r-- 1 root root      251 May 27 11:21 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 May 27 11:21 00000000000000000000.timeindex
-rw-r--r-- 1 root root        8 May 27 11:21 leader-epoch-checkpoint
-rw-r--r-- 1 root root       43 May 27 11:21 partition.metadata

特殊文件說明

Kafka 的數(shù)據(jù)文件存儲在每個分區(qū)的目錄中,這些文件包括 .index、.log、.timeindexleader-epoch-checkpointpartition.metadata 文件。每個文件都有其特定的用途,下面是對這些文件的詳細(xì)解釋:

  1. .log 文件

    • 用途:存儲實(shí)際的消息數(shù)據(jù)。
    • 描述:這是 Kafka 中最重要的文件,包含了生產(chǎn)者發(fā)送到 Kafka 的消息。每個 .log 文件代表一個日志段(log segment),文件名通常是該段的起始偏移量(offset)。
  2. .index 文件

    • 用途:存儲消息偏移量到物理文件位置的映射。
    • 描述:這個文件是一個稀疏索引,允許 Kafka 快速查找特定偏移量的消息。通過這個索引,Kafka 可以避免從頭開始掃描整個日志文件,從而提高查找效率。
  3. .timeindex 文件

    • 用途:存儲消息時間戳到物理文件位置的映射。
    • 描述:這個文件允許 Kafka 根據(jù)時間戳快速查找消息。它是一個稀疏索引,類似于 .index 文件,但索引的是時間戳而不是偏移量。
  4. leader-epoch-checkpoint 文件

    • 用途:記錄分區(qū)的領(lǐng)導(dǎo)者紀(jì)元(leader epoch)信息。
    • 描述:這個文件包含了每個紀(jì)元的起始偏移量。領(lǐng)導(dǎo)者紀(jì)元是 Kafka 用來跟蹤分區(qū)領(lǐng)導(dǎo)者變化的機(jī)制。每次分區(qū)領(lǐng)導(dǎo)者發(fā)生變化時,紀(jì)元號會增加。這個文件幫助 Kafka 在領(lǐng)導(dǎo)者變更時進(jìn)行數(shù)據(jù)恢復(fù)和一致性檢查。
  5. partition.metadata 文件

    • 用途:存儲分區(qū)的元數(shù)據(jù)信息。
    • 描述:這個文件包含了分區(qū)的一些基本信息,如分區(qū)的版本號等。它幫助 Kafka 管理和維護(hù)分區(qū)的元數(shù)據(jù)。

這些文件共同作用,確保 Kafka 能夠高效、可靠地存儲和檢索消息數(shù)據(jù)。

如何查看.log文件內(nèi)容

  • 執(zhí)行指令
 ~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
Dumping ./00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1716780091840 size: 251 magic: 2 compresscodec: none crc: 997822510 isvalid: true
| offset: 0 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka0
| offset: 1 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka1
| offset: 2 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka2
| offset: 3 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka3
| offset: 4 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka4
| offset: 5 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka5
| offset: 6 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka6
| offset: 7 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka7
| offset: 8 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka8
| offset: 9 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka9

如上我們可以看到消息已經(jīng)成功的發(fā)送。

4.創(chuàng)建消費(fèi)者

  1. 創(chuàng)建消費(fèi)者項(xiàng)目
mkdir src/consume
cd src/consume
  1. 在你的項(xiàng)目目錄中運(yùn)行 go mod init 命令來初始化一個新的 Go 模塊
go mod init consume
  1. 安裝 confluent-kafka-go
go get github.com/confluentinc/confluent-kafka-go/kafka
  1. 新建文件
touch consumer.go
  1. 編寫代碼
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 創(chuàng)建消費(fèi)者實(shí)例broker := "localhost:9091" // 集群地址topic := "test"            // 主題名稱c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": broker,     // 集群地址"group.id":          "my-group", // 消費(fèi)者組"auto.offset.reset": "earliest", // 設(shè)置偏移量 從頭開始消費(fèi)})// 檢查錯誤if err != nil {log.Printf("Failed to create consumer: %s\n", err)}defer c.Close()// 描述訂閱主題c.SubscribeTopics([]string{topic}, nil)fmt.Printf("Consuming topic %s\n", topic)// 消費(fèi)消息for {msg, err := c.ReadMessage(-1) // 阻塞直到消息到達(dá)if err == nil {fmt.Printf("Consumed message: %s\n", msg.Value)} else {// 消費(fèi)者錯誤fmt.Printf("Consumer error: %v (%v)\n", err, msg)}}
}
  1. 編譯并運(yùn)行
go build consumer.go 
./consumer 
Consuming topic test
Consumed message: hello kafka0
Consumed message: hello kafka1
Consumed message: hello kafka2
Consumed message: hello kafka3
Consumed message: hello kafka4
Consumed message: hello kafka5
Consumed message: hello kafka6
Consumed message: hello kafka7
Consumed message: hello kafka8
Consumed message: hello kafka9

可以看到已經(jīng)成功的消費(fèi)剛才生產(chǎn)的消息。

http://www.risenshineclean.com/news/42850.html

相關(guān)文章:

  • 網(wǎng)站背景怎么設(shè)置seo教學(xué)視頻教程
  • 如何制作和設(shè)計(jì)公司網(wǎng)站seo關(guān)鍵詞排名教程
  • 晉江網(wǎng)站有什么職業(yè)做百度網(wǎng)站站長工具
  • 系統(tǒng)開發(fā)費(fèi)用seo模擬點(diǎn)擊算法
  • 淘寶做網(wǎng)站費(fèi)用5118營銷大數(shù)據(jù)
  • 網(wǎng)站開發(fā)與移動互聯(lián)seo和競價(jià)排名的區(qū)別
  • 有做喜糖的網(wǎng)站嗎網(wǎng)絡(luò)工程師
  • 購物網(wǎng)站難做嗎網(wǎng)站優(yōu)化關(guān)鍵詞價(jià)格
  • 微信公眾號排版appseo的收費(fèi)標(biāo)準(zhǔn)
  • 做網(wǎng)站基本費(fèi)用大概需要多少sem專員
  • 做app和網(wǎng)站哪個比較好用免費(fèi)推廣有哪些
  • 怎么做網(wǎng)站編程web網(wǎng)頁
  • 手機(jī)應(yīng)用軟件開發(fā)seo在線教程
  • 鞋店網(wǎng)站建設(shè)方案石家莊市人民政府官網(wǎng)
  • 淘寶客網(wǎng)站怎么備案新手小白怎么學(xué)做運(yùn)營
  • 企業(yè)網(wǎng)站如何做seo全國十大跨境電商公司排名
  • 做視頻點(diǎn)播網(wǎng)站要多少帶寬今日重慶重要消息
  • 58這種網(wǎng)站怎么做nba實(shí)力榜最新排名
  • 虎門專業(yè)網(wǎng)站建設(shè)seo群發(fā)軟件
  • 保定中小企業(yè)網(wǎng)站制作推廣普通話內(nèi)容50字
  • 網(wǎng)站的注冊和登錄怎么做友情鏈接聯(lián)盟
  • 開源快速網(wǎng)站搭建平臺磁力寶最佳搜索引擎入口
  • wordpress社團(tuán)網(wǎng)站今日頭條最新
  • 視頻網(wǎng)站外鏈怎么做搜什么關(guān)鍵詞比較刺激
  • 西部數(shù)碼網(wǎng)站管理助手4.0 破解版鏈接交易網(wǎng)
  • 簡單個人網(wǎng)頁制作成品手機(jī)優(yōu)化管家
  • 網(wǎng)站建設(shè)外包工作推廣游戲賺錢的平臺
  • 招聘企業(yè)網(wǎng)站建設(shè)模塊關(guān)鍵詞排名優(yōu)化是什么意思
  • 南通旅游網(wǎng)站建設(shè)一鍵免費(fèi)建站
  • 搜索網(wǎng)站開發(fā)背景廣告留電話號的網(wǎng)站