中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

專業(yè)做網(wǎng)站app的公司有哪些有品質(zhì)的網(wǎng)站推廣公司

專業(yè)做網(wǎng)站app的公司有哪些,有品質(zhì)的網(wǎng)站推廣公司,網(wǎng)站建設(shè)找哪里,大連警方最新通告文章目錄 01 KafkaSink 版本&導(dǎo)言02 KafkaSink 基本概念03 KafkaSink 工作原理1.初始化連接2.定義序列化模式3.創(chuàng)建KafkaSink算子4.創(chuàng)建數(shù)據(jù)源5.將數(shù)據(jù)流添加到KafkaSink6.內(nèi)部工作機(jī)制 04 KafkaSink參數(shù)配置05 KafkaSink 應(yīng)用依賴06 KafkaSink 快速入門6.1 包結(jié)構(gòu)6.2 項目…

文章目錄

  • 01 KafkaSink 版本&導(dǎo)言
  • 02 KafkaSink 基本概念
  • 03 KafkaSink 工作原理
    • 1.初始化連接
    • 2.定義序列化模式
    • 3.創(chuàng)建KafkaSink算子
    • 4.創(chuàng)建數(shù)據(jù)源
    • 5.將數(shù)據(jù)流添加到KafkaSink
    • 6.內(nèi)部工作機(jī)制
  • 04 KafkaSink參數(shù)配置
  • 05 KafkaSink 應(yīng)用依賴
  • 06 KafkaSink 快速入門
    • 6.1 包結(jié)構(gòu)
    • 6.2 項目配置
    • 6.3 pom文件
    • 6.4 Flink集成KafkaSink作業(yè)
    • 6.5 驗證
  • 07 總結(jié)

01 KafkaSink 版本&導(dǎo)言

Flink版本:

本文主要是基于Flink1.14.4 版本

導(dǎo)言:

Apache Flink 作為流式處理領(lǐng)域的先鋒,為實時數(shù)據(jù)處理提供了強(qiáng)大而靈活的解決方案。其中,KafkaSink 是 Flink 生態(tài)系統(tǒng)中的關(guān)鍵組件之一,扮演著將 Flink 處理的數(shù)據(jù)可靠地發(fā)送到 Kafka 主題的角色。本文將深入探討 KafkaSink 的工作原理、配置和最佳實踐,幫助讀者全面掌握在 Flink 中使用 KafkaSink 的技巧和方法。

02 KafkaSink 基本概念

KafkaSink 是 Apache Flink 提供的用于將流式數(shù)據(jù)發(fā)送到 Kafka 的連接器。它允許 Flink 應(yīng)用程序?qū)⒔?jīng)過處理的數(shù)據(jù)以高效和可靠的方式傳輸?shù)?Kafka 主題,從而實現(xiàn)流處理與消息隊列的無縫集成。

特性和優(yōu)勢:

  1. Exactly-Once 語義: KafkaSink 提供 Exactly-Once 語義,確保數(shù)據(jù)不會丟失,也不會重復(fù)寫入 Kafka 主題。這是通過 Flink 提供的端到端一致性保障的一部分。
  2. 高性能: KafkaSink 被設(shè)計為高性能的組件,能夠處理大規(guī)模的數(shù)據(jù)流,并以低延遲將數(shù)據(jù)發(fā)送到 Kafka。其底層使用 Kafka 生產(chǎn)者 API,充分利用 Kafka 的并發(fā)性和批量處理能力。
  3. 配置靈活: 用戶可以通過配置參數(shù)定制 KafkaSink 的行為,包括 Kafka 服務(wù)器地址、主題名稱、生產(chǎn)者配置等。這種靈活性使得 KafkaSink 可以適應(yīng)不同場景和需求。
  4. Exactly-Once Sink Semantics: KafkaSink 通過 Kafka 生產(chǎn)者的事務(wù)支持,確保在發(fā)生故障時能夠保持?jǐn)?shù)據(jù)的一致性,即使在 Flink 任務(wù)重新啟動后也能繼續(xù)從上次中斷的地方進(jìn)行。

03 KafkaSink 工作原理

KafkaSink是Apache Flink中用于將流式數(shù)據(jù)寫入Apache Kafka的關(guān)鍵組件。其工作原理涉及幾個主要步驟,同時我將介紹一些源碼片段以解釋其內(nèi)部實現(xiàn)。

1.初始化連接

用戶需要配置Kafka連接屬性,包括Kafka服務(wù)器地址、序列化器等。在Flink中,這通常通過創(chuàng)建Properties對象來完成。

 // 創(chuàng)建KafksSink配置Properties properties = new Properties();properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "0");properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");

2.定義序列化模式

KafkaRecordSerializationSchema 是 Apache Flink 中用于將數(shù)據(jù)流轉(zhuǎn)換為 Kafka 記錄(record)的序列化模式(Serialization Schema)。它允許將 Flink 數(shù)據(jù)流中的元素轉(zhuǎn)換為 Kafka 生產(chǎn)者記錄,并定義了如何序列化元素的邏輯。

在 Flink 中,當(dāng)你想要將數(shù)據(jù)發(fā)送到 Kafka 主題,需要一個序列化模式來將 Flink 數(shù)據(jù)流中的元素序列化為 Kafka 記錄。而 KafkaRecordSerializationSchema 就是為此目的而設(shè)計的。

// 序列化模式
KafkaRecordSerializationSchema<String> recordSerializer = KafkaRecordSerializationSchema.builder()//設(shè)置對哪個主題進(jìn)行序列化.setTopic("topic_a")//設(shè)置數(shù)據(jù)值序列化方式.setValueSerializationSchema(new SimpleStringSchema())//設(shè)置數(shù)據(jù)key序列化方式.setKeySerializationSchema(new SimpleStringSchema()).build();

3.創(chuàng)建KafkaSink算子

使用Flink提供的KafkaSink類創(chuàng)建一個Kafka生產(chǎn)者實例。以下是簡化的源碼片段,展示了如何創(chuàng)建實例:

注意:如果傳遞保證選擇Exactly Once (精確一次),需要設(shè)置 客戶端的超時時間 ,否則會報錯

Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms),需要設(shè)置 transaction.timeout.ms 小于15分鐘,后續(xù)會專門出一篇關(guān)于這個傳遞保證的博客講述。

// 創(chuàng)建KafkaSink算子
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()//設(shè)置kafka各種參數(shù).setKafkaProducerConfig(properties)//設(shè)置序列化模式.setRecordSerializer(recordSerializer)//設(shè)置傳遞保證//At Most Once (至多一次): 系統(tǒng)保證消息要么被成功傳遞一次,要么根本不被傳遞。這種保證意味著消息可能會丟失,但不會被傳遞多//At Least Once (至少一次): 系統(tǒng)保證消息至少會被傳遞一次,但可能會導(dǎo)致消息的重復(fù)傳遞。這種保證確保了消息的不丟失,但應(yīng)用//Exactly Once (精確一次): 系統(tǒng)保證消息會被確切地傳遞一次,而沒有任何重復(fù)。這是最高級別的傳遞保證,確保消息不會丟失且不會.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//設(shè)置集群地址.setBootstrapServers("127.0.0.1:9092")//設(shè)置事務(wù)前綴.setTransactionalIdPrefix("flink_").build();

4.創(chuàng)建數(shù)據(jù)源

創(chuàng)建數(shù)據(jù)源,每隔1000ms下發(fā)一筆數(shù)據(jù)

// 生成一個數(shù)據(jù)流
SourceFunction<String> sourceFunction = new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> sourceContext) throws Exception {while (true) {String id = UUID.randomUUID().toString();sourceContext.collect( id);logger.info("正在下發(fā)數(shù)據(jù):{}",id);Thread.sleep(1000);}}@Overridepublic void cancel() {}// 創(chuàng)建數(shù)據(jù)源
DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction).setParallelism(1);

5.將數(shù)據(jù)流添加到KafkaSink

在Flink應(yīng)用程序中,通過addSink()方法將要寫入Kafka主題數(shù)據(jù)流添加到KafkaSink,以下是一個簡化的示例:

// 數(shù)據(jù)流數(shù)據(jù)通過KafkaSink算子寫入kafka
dataStreamSource.sinkTo(kafkaSink).setParallelism(1);// 執(zhí)行任務(wù)
env.execute("KafkaSinkStreamJobDemo");

6.內(nèi)部工作機(jī)制

KafkaSink會將接收到的數(shù)據(jù)流分區(qū)為若干個并行的數(shù)據(jù)流,每個并行數(shù)據(jù)流由一個Kafka生產(chǎn)者實例負(fù)責(zé)向Kafka主題寫入數(shù)據(jù)。這樣可以提高寫入的吞吐量和并行度。

以下是源碼中的一部分,展示了KafkaSink是如何將數(shù)據(jù)發(fā)送到Kafka的:

@Override
public void invoke(IN value, Context context) throws Exception {// 將數(shù)據(jù)發(fā)送到Kafka主題producer.send(new ProducerRecord<>(topic, value.toString()));
}

KafkaSink的源碼相對復(fù)雜,涉及到與Kafka的交互、并行處理、容錯等方面的實現(xiàn)。

總的來說,KafkaSink通過整合Flink和Kafka的功能,提供了一種高效、可靠的方式將流式數(shù)據(jù)寫入Kafka主題,適用于各種實時數(shù)據(jù)處理場景。

04 KafkaSink參數(shù)配置

需要根據(jù)具體的安全需求和環(huán)境配置 Kafka 的安全性參數(shù)。建議查閱最新版本的 Kafka 文檔以獲取詳細(xì)的安全配置指南:https://kafka.apache.org/documentation/#producerconfigs

在 Apache Flink 中,ProducerConfig 是用于配置 Kafka 生產(chǎn)者的類,它是 Kafka 客戶端庫中的一部分。下面是一些常見的配置選項及其解釋:

bootstrap.servers

集群的地址列表,用于初始化連接。生產(chǎn)者會從這些服務(wù)器中選擇一個 broker 進(jìn)行連接。

public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";

metadata.max.age.ms

元數(shù)據(jù)的最大緩存時間。在此時間內(nèi),生產(chǎn)者將重復(fù)使用已經(jīng)獲取的元數(shù)據(jù),而不會向服務(wù)器發(fā)送新的元數(shù)據(jù)請求

public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";

batch.size

控制批量發(fā)送到 Kafka 的消息大小。當(dāng)消息積累到一定大小時,生產(chǎn)者會將它們一起發(fā)送到 Kafka 以提高效率

public static final String BATCH_SIZE_CONFIG = "batch.size";

acks

消息確認(rèn)機(jī)制,控制生產(chǎn)者收到確認(rèn)的方式??梢允恰癮ll”(所有副本都確認(rèn)),“1”(至少一個副本確認(rèn))或“0”(不需要確認(rèn))

public static final String ACKS_CONFIG = "acks";

linger.ms

生產(chǎn)者在發(fā)送批量消息前等待的時間,以使更多的消息聚合成一個批次。默認(rèn)是0,表示立即發(fā)送

public static final String LINGER_MS_CONFIG = "linger.ms";

request.timeout.ms

發(fā)送請求到 Kafka 服務(wù)器的超時時間

public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";

delivery.timeout.ms

這個參數(shù)在 Kafka 生產(chǎn)者的配置中是存在的,它表示生產(chǎn)者在發(fā)送消息后等待生產(chǎn)者確認(rèn)的最大時間。如果在這段時間內(nèi)沒有收到確認(rèn),生產(chǎn)者將重試發(fā)送消息或者拋出異常,具體取決于 retries 參數(shù)的配置

public static final String DELIVERY_TIMEOUT_MS_CONFIG = "delivery.timeout.ms";

client.id

用于區(qū)分不同生產(chǎn)者實例的客戶端 ID

public static final String CLIENT_ID_CONFIG = "client.id";

send.buffer.bytes

Kafka 消費者用于網(wǎng)絡(luò) socket 發(fā)送數(shù)據(jù)的緩沖區(qū)大小

public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";

receive.buffer.bytes

Kafka 消費者用于網(wǎng)絡(luò) socket 接收數(shù)據(jù)的緩沖區(qū)大小

public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";

max.request.size

單個請求發(fā)送的最大字節(jié)數(shù)

public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";

reconnect.backoff.ms

用于控制在與 Kafka 服務(wù)器連接斷開后重新連接的時間間隔。具體來說,它定義了在發(fā)起重新連接嘗試之間等待的時間量,以毫秒為單位。如果連接失敗,生產(chǎn)者將在此時間間隔之后嘗試重新連接到 Kafka 服務(wù)器

public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";

reconnect.backoff.max.ms

用于控制重新連接的最大退避時間。具體來說,它定義了在發(fā)起重新連接嘗試之間等待的最長時間量,以毫秒為單位。如果連接失敗,生產(chǎn)者將在此時間間隔之后嘗試重新連接到 Kafka 服務(wù)器

public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";

max.block.ms

當(dāng) Kafka 隊列已滿時,生產(chǎn)者將阻塞的最長時間(毫秒),超時后會拋出異常

public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";

buffer.memory

生產(chǎn)者用于緩沖等待發(fā)送到服務(wù)器的消息的內(nèi)存大小。默認(rèn)是33554432字節(jié)(32MB)

public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";

retries

生產(chǎn)者發(fā)送失敗后的重試次數(shù)。默認(rèn)是0,表示不重試

public static final String RETRIES_CONFIG = "retries";

key.serializer

用于序列化消息鍵的序列化器類。通常是指實現(xiàn)了Serializer接口的類的全限定名

public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";

value.serializer

用于序列化消息值的序列化器類

public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";

connections.max.idle.ms

客戶端與服務(wù)器保持空閑連接的最長時間(毫秒)。默認(rèn)值為 540000(即 9 分鐘)。例如:"900000" 表示客戶端與服務(wù)器保持空閑連接的最長時間為 15 分鐘

public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";

partitioner.class

用于指定消息將被發(fā)送到哪個分區(qū)的算法,即分區(qū)器的實現(xiàn)類。Kafka 中的主題(topic)通常被劃分為多個分區(qū),每個分區(qū)都包含有序的消息序列。分區(qū)器決定了生產(chǎn)者發(fā)送的消息應(yīng)該被分配到哪個分區(qū)中。

通過配置 partitioner.class,用戶可以自定義分區(qū)算法,以滿足特定的業(yè)務(wù)需求。Kafka 提供了默認(rèn)的分區(qū)器,也允許用戶根據(jù)自己的邏輯實現(xiàn)自定義的分區(qū)器。

例如,以下是配置 partitioner.class 的示例:

partitioner.class=com.example.CustomPartitioner

在這個示例中,com.example.CustomPartitioner 是用戶自定義的分區(qū)器類的全限定名。該類必須實現(xiàn) Kafka 提供的 org.apache.kafka.clients.producer.Partitioner 接口,該接口定義了確定消息應(yīng)該被發(fā)送到哪個分區(qū)的方法。

自定義分區(qū)器可以根據(jù)消息的內(nèi)容、鍵(如果有)、以及其他上下文信息,靈活地決定消息應(yīng)該被發(fā)送到哪個分區(qū)。這樣的自定義分區(qū)策略可以幫助實現(xiàn)一些特定的業(yè)務(wù)邏輯,例如確保相關(guān)的消息被發(fā)送到相同的分區(qū),以提高消費的局部性。

在沒有顯式配置 partitioner.class 的情況下,Kafka 使用默認(rèn)的分區(qū)器,該分區(qū)器根據(jù)消息的鍵(如果有)或者采用輪詢的方式將消息平均分配到所有分區(qū)。

public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";

interceptor.classes

用于指定一組攔截器類。攔截器類是實現(xiàn) Kafka 接口 org.apache.kafka.clients.producer.ProducerInterceptor 或者 org.apache.kafka.clients.consumer.ConsumerInterceptor 的類,用于在生產(chǎn)者或消費者發(fā)送或接收消息之前或之后對消息進(jìn)行處理。

攔截器允許用戶對消息進(jìn)行自定義的預(yù)處理或后處理。這些操作可以包括但不限于:

  1. 對消息進(jìn)行加工、轉(zhuǎn)換、過濾。
  2. 在消息發(fā)送或接收之前或之后記錄日志。
  3. 對消息的時間戳或鍵進(jìn)行修改。

通過配置 interceptor.classes 參數(shù),可以指定一組攔截器類,并且它們將按順序應(yīng)用于每個消息。這樣的攔截器鏈?zhǔn)沟迷谙⑻幚磉^程中可以執(zhí)行多個不同的操作。

例如,以下是配置 interceptor.classes 的示例:

interceptor.classes=com.example.MyProducerInterceptor, com.example.MyConsumerInterceptor

在這個示例中,com.example.MyProducerInterceptorcom.example.MyConsumerInterceptor 是用戶定義的攔截器類的全限定名。這兩個類必須分別實現(xiàn) Kafka 提供的 org.apache.kafka.clients.producer.ProducerInterceptororg.apache.kafka.clients.consumer.ConsumerInterceptor 接口。

需要注意的是,攔截器類的順序很重要。攔截器將按照它們在 interceptor.classes 參數(shù)中聲明的順序依次應(yīng)用于每個消息。如果需要確保攔截器按照特定的順序應(yīng)用,可以通過配置參數(shù)來指定順序。

攔截器提供了一種靈活的方式來實現(xiàn)特定的消息處理邏輯,同時也允許用戶對消息進(jìn)行監(jiān)控和記錄。

public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";

enable.idempotence

public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";

transaction.timeout.ms

public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";

transactional.id

用于啟用生產(chǎn)者的冪等性。冪等性是指對于同一個生產(chǎn)者實例,無論消息發(fā)送多少次,最終只會產(chǎn)生一條副本(實際上是一個冪等序列)的性質(zhì)。這可以防止由于網(wǎng)絡(luò)錯誤、重試或者生產(chǎn)者重新啟動等情況導(dǎo)致的重復(fù)消息。

啟用生產(chǎn)者的冪等性可以通過設(shè)置 enable.idempotence 參數(shù)為 true 來實現(xiàn)。例如:

enable.idempotence=true

啟用冪等性會自動設(shè)置一些與冪等性相關(guān)的配置,例如:

  • acks 配置將被設(shè)置為 “all”,確保所有的 ISR(In-Sync Replicas)都已經(jīng)接收到消息。
  • max.in.flight.requests.per.connection 將被設(shè)置為 1,以確保在一個連接上只有一個未確認(rèn)的請求。

冪等性對于確保消息傳遞的精確一次語義非常重要。在啟用冪等性的情況下,生產(chǎn)者會為每條消息分配一個唯一的序列號,以便在重試發(fā)生時 Broker 能夠正確地識別并去重重復(fù)的消息。

需要注意的是,啟用冪等性會對性能產(chǎn)生一些開銷,因為它引入了額外的序列號和一些額外的網(wǎng)絡(luò)開銷。在生產(chǎn)環(huán)境中,需要仔細(xì)評估冪等性對性能的影響,并根據(jù)實際需求權(quán)衡性能和可靠性。

public static final String TRANSACTIONAL_ID_CONFIG = "transactional.id";

security.providers

參數(shù)已經(jīng)被 Kafka 移除了。在較早的 Kafka 版本中,這個參數(shù)可能被用于指定安全性相關(guān)的提供者。然而,從 Kafka 2.0 開始,Kafka 已經(jīng)采用了基于 JAAS(Java Authentication and Authorization Service)的身份驗證和授權(quán)機(jī)制,這個參數(shù)不再被使用。

現(xiàn)在,Kafka 的安全性配置主要包括以下幾個方面:

  1. 身份驗證機(jī)制(Authentication Mechanisms):Kafka 支持多種身份驗證機(jī)制,如SSL/TLS、SASL(Simple Authentication and Security Layer)、OAuth等。通過配置 security.protocol 參數(shù)選擇所需的身份驗證機(jī)制。
  2. 授權(quán)機(jī)制(Authorization Mechanisms):Kafka 使用 ACL(Access Control Lists)來控制對主題和分區(qū)的訪問權(quán)限??梢酝ㄟ^配置 authorizer.class.name 參數(shù)選擇 ACL 的實現(xiàn)類。
  3. 加密通信(Encryption):可以通過配置 SSL/TLS 來對 Kafka 通信進(jìn)行加密,以保護(hù)數(shù)據(jù)在傳輸過程中的安全性。
  4. 客戶端配置(Client Configuration):客戶端需要根據(jù)服務(wù)端的安全配置進(jìn)行相應(yīng)的配置,如設(shè)置 SSL/TLS 的信任證書、SASL 的認(rèn)證信息等。

需要根據(jù)具體的安全需求和環(huán)境配置 Kafka 的安全性參數(shù)。建議查閱最新版本的 Kafka 文檔以獲取詳細(xì)的安全配置指南。

public static final String SECURITY_PROVIDERS_CONFIG = "security.providers";

retry.backoff.ms

用于定義在發(fā)生可重試的發(fā)送錯誤后,生產(chǎn)者在進(jìn)行重試之前等待的時間間隔,以毫秒為單位。

當(dāng)生產(chǎn)者發(fā)送消息到 Kafka 時,可能會遇到一些可重試的錯誤,例如網(wǎng)絡(luò)問題、Kafka 服務(wù)器繁忙等。retry.backoff.ms 允許在出現(xiàn)這些可重試錯誤后等待一段時間,然后再次嘗試發(fā)送消息,以避免頻繁的重試。這樣的設(shè)計有助于在短時間內(nèi)解決暫時性的問題,而不至于對 Kafka 服務(wù)器造成額外的負(fù)擔(dān)。

具體而言,如果發(fā)生了可重試的錯誤,生產(chǎn)者將等待 retry.backoff.ms 指定的時間間隔,然后進(jìn)行下一次重試。如果重試依然失敗,生產(chǎn)者可能會繼續(xù)進(jìn)行更多的重試,每次之間間隔逐漸增加,以避免過度壓力和頻繁的連接嘗試。

默認(rèn)情況下,retry.backoff.ms 的值通常是 100 毫秒,但可以根據(jù)實際需求和環(huán)境進(jìn)行調(diào)整

public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";

compression.type

控制發(fā)送到 Kafka 的消息是否壓縮。可以是“none”、“gzip”、“snappy”或“l(fā)z4”

public static final String COMPRESSION_TYPE_CONFIG = "compression.type";

metrics.sample.window.ms

用于配置 Kafka Broker 的參數(shù),用于定義度量指標(biāo)(metrics)的采樣窗口的時間跨度,以毫秒為單位。

具體來說,這個參數(shù)指定了度量指標(biāo)的采樣窗口的持續(xù)時間。在這個時間段內(nèi),Kafka Broker 會收集和計算各種指標(biāo),比如吞吐量、延遲、請求處理時間等。然后,這些度量指標(biāo)可以被監(jiān)控工具或者外部系統(tǒng)使用,以便實時地監(jiān)控 Kafka Broker 的運(yùn)行狀態(tài)和性能指標(biāo)。

通過調(diào)整 metrics.sample.window.ms 這個參數(shù),可以改變度量指標(biāo)采樣的時間窗口長度,以適應(yīng)不同的監(jiān)控和性能分析需求。較短的采樣窗口可以提供更加實時的性能指標(biāo),但也會增加系統(tǒng)資源的開銷;而較長的采樣窗口則可以減少資源開銷,但會犧牲一些實時性。

默認(rèn)情況下,metrics.sample.window.ms 的值通常是 30000 毫秒(30秒),但根據(jù)具體的 Kafka 集群配置和監(jiān)控需求,可以進(jìn)行調(diào)整。

public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";

metrics.num.samples

用于配置 Kafka Broker 的參數(shù),用于指定在每個度量指標(biāo)采樣窗口中收集的樣本數(shù)量。

具體來說,度量指標(biāo)(metrics)是用于監(jiān)視 Kafka Broker 運(yùn)行狀態(tài)和性能的關(guān)鍵數(shù)據(jù),比如吞吐量、延遲、請求處理時間等。而 metrics.num.samples 參數(shù)則控制了在每個采樣窗口內(nèi)收集多少個樣本。這些樣本可以用于計算度量指標(biāo)的平均值、最大值、最小值等統(tǒng)計信息。

通過調(diào)整 metrics.num.samples 這個參數(shù),可以平衡度量指標(biāo)的準(zhǔn)確性和資源消耗之間的權(quán)衡。較大的樣本數(shù)量可以提供更加準(zhǔn)確的度量指標(biāo)統(tǒng)計信息,但會增加系統(tǒng)資源的開銷;而較小的樣本數(shù)量則可以減少資源消耗,但可能會犧牲一些準(zhǔn)確性。

默認(rèn)情況下,metrics.num.samples 的值通常是 2,但根據(jù)具體的 Kafka 集群配置和監(jiān)控需求,可以進(jìn)行調(diào)整。

public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";

metrics.recording.level

用于配置度量指標(biāo)(metrics)的記錄級別。這個參數(shù)決定了哪些度量指標(biāo)會被記錄和匯報。

具體來說,metrics.recording.level 可以設(shè)置為以下幾個級別之一:

  • INFO:記錄常規(guī)的度量指標(biāo),如吞吐量、延遲等。
  • DEBUG:記錄更詳細(xì)的度量指標(biāo)信息,可能包括更多的細(xì)節(jié)和較低級別的度量指標(biāo)。
  • TRACE:記錄非常詳細(xì)的度量指標(biāo)信息,包括所有細(xì)節(jié)和最低級別的度量指標(biāo)。

通過調(diào)整 metrics.recording.level 這個參數(shù),可以靈活地控制記錄的度量指標(biāo)的級別,以滿足不同場景下的監(jiān)控和分析需求。例如,在生產(chǎn)環(huán)境中,通常會將記錄級別設(shè)置為 INFO 或者 DEBUG,以便實時監(jiān)控 Kafka 集群的運(yùn)行狀態(tài)和性能指標(biāo);而在調(diào)試或者故障排查時,可以將記錄級別設(shè)置為 TRACE,以獲取更詳細(xì)的信息。

默認(rèn)情況下,metrics.recording.level 的值通常是 INFO,但可以根據(jù)具體的需求和環(huán)境進(jìn)行調(diào)整。

public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";

metric.reporters

用于指定要使用的度量指標(biāo)(metrics)報告器。度量指標(biāo)報告器負(fù)責(zé)將 Kafka Broker 收集到的度量指標(biāo)信息發(fā)送到指定的位置,以供監(jiān)控和分析使用。

具體來說,metric.reporters 參數(shù)接受一個逗號分隔的報告器類名列表,這些報告器類名必須實現(xiàn) Kafka 的 org.apache.kafka.common.metrics.MetricsReporter 接口。通過配置這個參數(shù),可以啟用不同的度量指標(biāo)報告器,并將度量指標(biāo)信息發(fā)送到不同的目的地,比如日志、JMX、Graphite、InfluxDB 等。

例如,可以使用以下配置啟用 JMX 報告器和日志報告器:

metric.reporters=jmx, kafka.metrics.KafkaMetricsReporter

這樣配置后,Kafka Broker 將同時使用 JMX 報告器和日志報告器,將度量指標(biāo)信息發(fā)送到 JMX 和日志中。

默認(rèn)情況下,metric.reporters 參數(shù)為空,表示不使用任何度量指標(biāo)報告器。在實際部署中,根據(jù)監(jiān)控和分析需求,可以配置不同的度量指標(biāo)報告器來收集和報告度量指標(biāo)信息。

public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";

max.in.flight.requests.per.connection

用于控制在任何給定時間內(nèi)允許向單個 Broker 發(fā)送的未確認(rèn)請求的最大數(shù)量。

在 Kafka 中,生產(chǎn)者發(fā)送消息到 Broker 時,可以選擇等待服務(wù)器確認(rèn)(acknowledgement)消息發(fā)送成功后再發(fā)送下一條消息,或者繼續(xù)發(fā)送下一條消息而不等待前一條消息的確認(rèn)。當(dāng)生產(chǎn)者選擇繼續(xù)發(fā)送下一條消息時,這些未確認(rèn)的消息就會處于 “in-flight” 狀態(tài)。

max.in.flight.requests.per.connection 參數(shù)就是用來限制在這種情況下的未確認(rèn)請求的數(shù)量。如果未確認(rèn)請求的數(shù)量達(dá)到了這個限制,生產(chǎn)者將會阻塞,直到有一些請求被確認(rèn),才會繼續(xù)發(fā)送新的請求。

通過調(diào)整 max.in.flight.requests.per.connection 參數(shù),可以平衡生產(chǎn)者的吞吐量和消息傳遞的可靠性之間的權(quán)衡。較大的值可以提高生產(chǎn)者的吞吐量,因為它允許更多的消息在未確認(rèn)狀態(tài)下發(fā)送,而較小的值可以提高消息傳遞的可靠性,因為它限制了未確認(rèn)請求的數(shù)量,從而減少了消息丟失的風(fēng)險。

默認(rèn)情況下,max.in.flight.requests.per.connection 的值是 5。根據(jù)應(yīng)用程序的要求和實際情況,可以適當(dāng)?shù)卣{(diào)整這個參數(shù)的值。

public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";

05 KafkaSink 應(yīng)用依賴

<!-- Flink kafka 連接器依賴 start -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.4</version>
</dependency>
<!-- Flink kafka 連接器依賴 end -->

06 KafkaSink 快速入門

6.1 包結(jié)構(gòu)

在這里插入圖片描述

6.2 項目配置

log4j2.properties

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmproot
Logger.level=INFO

6.3 pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.aurora</groupId><artifactId>aurora_kafka_connector</artifactId><version>1.0-SNAPSHOT</version><!--屬性設(shè)置--><properties><!--java_JDK版本--><java.version>1.8</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--編譯編碼UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--輸出報告編碼UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json數(shù)據(jù)格式處理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.14.4</flink.version><!--scala版本--><scala.binary.version>2.12</scala.binary.version></properties><!--依賴管理--><dependencies><!-- fastJson工具類依賴 start --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- fastJson工具類依賴 end --><!-- log4j日志框架依賴 start --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!-- log4j日志框架依賴 end --><!-- Flink基礎(chǔ)依賴 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink基礎(chǔ)依賴 end --><!-- Flink kafka 連接器依賴 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink kafka 連接器依賴 end --></dependencies><!--編譯打包--><build><finalName>${project.name}</finalName><!--資源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.aurora.demo,ElasticsearchSinkStreamingJobDemo</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件統(tǒng)一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--編譯打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build></project>

6.4 Flink集成KafkaSink作業(yè)

package com.aurora;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Properties;
import java.util.UUID;/*** 描述:Flink集成kafkaSink,實現(xiàn)數(shù)據(jù)流寫入Kafka集群** @author 淺夏的貓* @version 1.0.0* @date 2024-02-18 20:52:25*/
public class KafkaSinkStreamJobDemo {private static final Logger logger = LoggerFactory.getLogger(KafkaSinkStreamJobDemo.class);public static void main(String[] args) {try {logger.info("開始啟動作業(yè)!!!");// 創(chuàng)建Flink運(yùn)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 創(chuàng)建KafksSink配置Properties properties = new Properties();properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "0");properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");// 序列化模式KafkaRecordSerializationSchema<String> recordSerializer = KafkaRecordSerializationSchema.builder()//設(shè)置對哪個主題進(jìn)行序列化.setTopic("topic_a")//設(shè)置數(shù)據(jù)值序列化方式.setValueSerializationSchema(new SimpleStringSchema())//設(shè)置數(shù)據(jù)key序列化方式.setKeySerializationSchema(new SimpleStringSchema()).build();// 創(chuàng)建KafkaSink算子KafkaSink<String> kafkaSink = KafkaSink.<String>builder()//設(shè)置kafka各種參數(shù).setKafkaProducerConfig(properties)//設(shè)置序列化模式.setRecordSerializer(recordSerializer)//設(shè)置傳遞保證//At Most Once (至多一次): 系統(tǒng)保證消息要么被成功傳遞一次,要么根本不被傳遞。這種保證意味著消息可能會丟失,但不會被傳遞多次。//At Least Once (至少一次): 系統(tǒng)保證消息至少會被傳遞一次,但可能會導(dǎo)致消息的重復(fù)傳遞。這種保證確保了消息的不丟失,但應(yīng)用程序需要能夠處理重復(fù)消息的情況。//Exactly Once (精確一次): 系統(tǒng)保證消息會被確切地傳遞一次,而沒有任何重復(fù)。這是最高級別的傳遞保證,確保消息不會丟失且不會被重復(fù).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//設(shè)置集群地址.setBootstrapServers("127.0.0.1:9092")//設(shè)置事務(wù)前綴.setTransactionalIdPrefix("flink_").build();// 生成一個數(shù)據(jù)流SourceFunction<String> sourceFunction = new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> sourceContext) throws Exception {while (true) {String id = UUID.randomUUID().toString();sourceContext.collect( id);logger.info("正在下發(fā)數(shù)據(jù):{}",id);Thread.sleep(1000);}}@Overridepublic void cancel() {}};// 創(chuàng)建數(shù)據(jù)源DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction).setParallelism(1);// 數(shù)據(jù)流數(shù)據(jù)通過KafkaSink算子寫入kafkadataStreamSource.sinkTo(kafkaSink).setParallelism(1);// 執(zhí)行任務(wù)env.execute("KafkaSinkStreamJobDemo");} catch (Exception e) {e.printStackTrace();}}
}

6.5 驗證

構(gòu)建并運(yùn)行 Flink 應(yīng)用,確保應(yīng)用能夠成功發(fā)送數(shù)據(jù)到 Kafka 主題。你可以通過 Kafka Consumer 來驗證是否成功接收到了消息。

這個簡單的示例展示了如何使用 Kafka Sink 集成到流處理系統(tǒng)中,并且它是可運(yùn)行的。在實際應(yīng)用中,你可以根據(jù)需要配置更多參數(shù),例如序列化器、acks 級別、以及其他相關(guān)的生產(chǎn)者和 Kafka 配置。

通過kafka命令啟動一個消費者,觀察是否實時消費到數(shù)據(jù)

#windows
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic_a#linux
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_a

在這里插入圖片描述

07 總結(jié)

Kafka Sink 是實現(xiàn)流處理到 Kafka 集群的關(guān)鍵組件之一。通過上述示例,你可以開始使用 Kafka Sink 將你的流處理數(shù)據(jù)發(fā)送到 Kafka,從而實現(xiàn)可靠的消息傳遞。在實際應(yīng)用中,確保根據(jù)業(yè)務(wù)需求和性能要求調(diào)整配置參數(shù),以獲得最佳的性能和穩(wěn)定性。

http://www.risenshineclean.com/news/48151.html

相關(guān)文章:

  • 如何網(wǎng)站建設(shè)seo優(yōu)化排名百度教程
  • 物流三方網(wǎng)站怎么做重慶森林電影
  • 網(wǎng)站數(shù)據(jù)鏈接怎么做商城推廣軟文范文
  • 泉州網(wǎng)站制作平臺谷歌google play下載
  • 如何做網(wǎng)絡(luò)營銷直播倉山區(qū)seo引擎優(yōu)化軟件
  • 北京最好的網(wǎng)站建設(shè)公司全國疫情高峰感染高峰進(jìn)度
  • 西寧商城網(wǎng)站建設(shè)公司網(wǎng)站關(guān)鍵詞優(yōu)化有用嗎
  • 滄州網(wǎng)站備案哈爾濱網(wǎng)絡(luò)推廣
  • 如何做電商網(wǎng)站成都seo培
  • 世界杯視頻直播網(wǎng)站房地產(chǎn)十大營銷手段
  • 太原網(wǎng)站制作案例公司網(wǎng)站模版
  • 柯橋做網(wǎng)站的公司百度網(wǎng)盤官網(wǎng)登錄首頁
  • 怎么做網(wǎng)站訪問統(tǒng)計手機(jī)創(chuàng)建網(wǎng)站免費注冊
  • 有pc網(wǎng)站 移動網(wǎng)站怎么做關(guān)鍵詞怎么找出來
  • 企業(yè)網(wǎng)站建設(shè)哪里好推廣鏈接點擊器
  • 鶴壁seo東莞做網(wǎng)站優(yōu)化
  • 網(wǎng)站是誰做的企業(yè)產(chǎn)品網(wǎng)絡(luò)推廣
  • java網(wǎng)站開發(fā)計劃表情感營銷案例
  • 廣州部隊網(wǎng)站建設(shè)費用拓客團(tuán)隊怎么聯(lián)系
  • 做網(wǎng)站需要用什么開發(fā)軟件福州百度分公司
  • 北京網(wǎng)站編程培訓(xùn)石家莊關(guān)鍵詞優(yōu)化平臺
  • 刪除的網(wǎng)站做404東莞網(wǎng)站seo優(yōu)化托管
  • 免費發(fā)布信息網(wǎng)站大全666代刷網(wǎng)站推廣快速
  • 智能建站軟件東莞網(wǎng)站seo公司哪家大
  • 物流公司網(wǎng)站建設(shè)方案長沙seo網(wǎng)站管理
  • 手機(jī)網(wǎng)站用什么語言開發(fā)互動營銷成功案例
  • 建網(wǎng)站怎么做報分系統(tǒng)長春seo排名收費
  • 哪些網(wǎng)站做的最有特色如何優(yōu)化網(wǎng)站快速排名
  • wordpress 阿里云優(yōu)化教程網(wǎng)
  • 上傳網(wǎng)站備案信息真實性核驗單如何用html制作網(wǎng)頁