專業(yè)做網(wǎng)站app的公司有哪些有品質(zhì)的網(wǎng)站推廣公司
文章目錄
- 01 KafkaSink 版本&導(dǎo)言
- 02 KafkaSink 基本概念
- 03 KafkaSink 工作原理
- 1.初始化連接
- 2.定義序列化模式
- 3.創(chuàng)建KafkaSink算子
- 4.創(chuàng)建數(shù)據(jù)源
- 5.將數(shù)據(jù)流添加到KafkaSink
- 6.內(nèi)部工作機(jī)制
- 04 KafkaSink參數(shù)配置
- 05 KafkaSink 應(yīng)用依賴
- 06 KafkaSink 快速入門
- 6.1 包結(jié)構(gòu)
- 6.2 項目配置
- 6.3 pom文件
- 6.4 Flink集成KafkaSink作業(yè)
- 6.5 驗證
- 07 總結(jié)
01 KafkaSink 版本&導(dǎo)言
Flink版本:
本文主要是基于Flink1.14.4 版本
導(dǎo)言:
Apache Flink 作為流式處理領(lǐng)域的先鋒,為實時數(shù)據(jù)處理提供了強(qiáng)大而靈活的解決方案。其中,KafkaSink 是 Flink 生態(tài)系統(tǒng)中的關(guān)鍵組件之一,扮演著將 Flink 處理的數(shù)據(jù)可靠地發(fā)送到 Kafka 主題的角色。本文將深入探討 KafkaSink 的工作原理、配置和最佳實踐,幫助讀者全面掌握在 Flink 中使用 KafkaSink 的技巧和方法。
02 KafkaSink 基本概念
KafkaSink 是 Apache Flink 提供的用于將流式數(shù)據(jù)發(fā)送到 Kafka 的連接器。它允許 Flink 應(yīng)用程序?qū)⒔?jīng)過處理的數(shù)據(jù)以高效和可靠的方式傳輸?shù)?Kafka 主題,從而實現(xiàn)流處理與消息隊列的無縫集成。
特性和優(yōu)勢:
- Exactly-Once 語義: KafkaSink 提供 Exactly-Once 語義,確保數(shù)據(jù)不會丟失,也不會重復(fù)寫入 Kafka 主題。這是通過 Flink 提供的端到端一致性保障的一部分。
- 高性能: KafkaSink 被設(shè)計為高性能的組件,能夠處理大規(guī)模的數(shù)據(jù)流,并以低延遲將數(shù)據(jù)發(fā)送到 Kafka。其底層使用 Kafka 生產(chǎn)者 API,充分利用 Kafka 的并發(fā)性和批量處理能力。
- 配置靈活: 用戶可以通過配置參數(shù)定制 KafkaSink 的行為,包括 Kafka 服務(wù)器地址、主題名稱、生產(chǎn)者配置等。這種靈活性使得 KafkaSink 可以適應(yīng)不同場景和需求。
- Exactly-Once Sink Semantics: KafkaSink 通過 Kafka 生產(chǎn)者的事務(wù)支持,確保在發(fā)生故障時能夠保持?jǐn)?shù)據(jù)的一致性,即使在 Flink 任務(wù)重新啟動后也能繼續(xù)從上次中斷的地方進(jìn)行。
03 KafkaSink 工作原理
KafkaSink是Apache Flink中用于將流式數(shù)據(jù)寫入Apache Kafka的關(guān)鍵組件。其工作原理涉及幾個主要步驟,同時我將介紹一些源碼片段以解釋其內(nèi)部實現(xiàn)。
1.初始化連接
用戶需要配置Kafka連接屬性,包括Kafka服務(wù)器地址、序列化器等。在Flink中,這通常通過創(chuàng)建Properties對象來完成。
// 創(chuàng)建KafksSink配置Properties properties = new Properties();properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "0");properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
2.定義序列化模式
KafkaRecordSerializationSchema
是 Apache Flink 中用于將數(shù)據(jù)流轉(zhuǎn)換為 Kafka 記錄(record)的序列化模式(Serialization Schema)。它允許將 Flink 數(shù)據(jù)流中的元素轉(zhuǎn)換為 Kafka 生產(chǎn)者記錄,并定義了如何序列化元素的邏輯。
在 Flink 中,當(dāng)你想要將數(shù)據(jù)發(fā)送到 Kafka 主題,需要一個序列化模式來將 Flink 數(shù)據(jù)流中的元素序列化為 Kafka 記錄。而 KafkaRecordSerializationSchema
就是為此目的而設(shè)計的。
// 序列化模式
KafkaRecordSerializationSchema<String> recordSerializer = KafkaRecordSerializationSchema.builder()//設(shè)置對哪個主題進(jìn)行序列化.setTopic("topic_a")//設(shè)置數(shù)據(jù)值序列化方式.setValueSerializationSchema(new SimpleStringSchema())//設(shè)置數(shù)據(jù)key序列化方式.setKeySerializationSchema(new SimpleStringSchema()).build();
3.創(chuàng)建KafkaSink算子
使用Flink提供的KafkaSink
類創(chuàng)建一個Kafka生產(chǎn)者實例。以下是簡化的源碼片段,展示了如何創(chuàng)建實例:
注意:如果傳遞保證選擇Exactly Once (精確一次),需要設(shè)置 客戶端的超時時間 ,否則會報錯
Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms),需要設(shè)置 transaction.timeout.ms 小于15分鐘,后續(xù)會專門出一篇關(guān)于這個傳遞保證的博客講述。
// 創(chuàng)建KafkaSink算子
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()//設(shè)置kafka各種參數(shù).setKafkaProducerConfig(properties)//設(shè)置序列化模式.setRecordSerializer(recordSerializer)//設(shè)置傳遞保證//At Most Once (至多一次): 系統(tǒng)保證消息要么被成功傳遞一次,要么根本不被傳遞。這種保證意味著消息可能會丟失,但不會被傳遞多//At Least Once (至少一次): 系統(tǒng)保證消息至少會被傳遞一次,但可能會導(dǎo)致消息的重復(fù)傳遞。這種保證確保了消息的不丟失,但應(yīng)用//Exactly Once (精確一次): 系統(tǒng)保證消息會被確切地傳遞一次,而沒有任何重復(fù)。這是最高級別的傳遞保證,確保消息不會丟失且不會.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//設(shè)置集群地址.setBootstrapServers("127.0.0.1:9092")//設(shè)置事務(wù)前綴.setTransactionalIdPrefix("flink_").build();
4.創(chuàng)建數(shù)據(jù)源
創(chuàng)建數(shù)據(jù)源,每隔1000ms下發(fā)一筆數(shù)據(jù)
// 生成一個數(shù)據(jù)流
SourceFunction<String> sourceFunction = new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> sourceContext) throws Exception {while (true) {String id = UUID.randomUUID().toString();sourceContext.collect( id);logger.info("正在下發(fā)數(shù)據(jù):{}",id);Thread.sleep(1000);}}@Overridepublic void cancel() {}// 創(chuàng)建數(shù)據(jù)源
DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction).setParallelism(1);
5.將數(shù)據(jù)流添加到KafkaSink
在Flink應(yīng)用程序中,通過addSink()
方法將要寫入Kafka主題數(shù)據(jù)流添加到KafkaSink,以下是一個簡化的示例:
// 數(shù)據(jù)流數(shù)據(jù)通過KafkaSink算子寫入kafka
dataStreamSource.sinkTo(kafkaSink).setParallelism(1);// 執(zhí)行任務(wù)
env.execute("KafkaSinkStreamJobDemo");
6.內(nèi)部工作機(jī)制
KafkaSink會將接收到的數(shù)據(jù)流分區(qū)為若干個并行的數(shù)據(jù)流,每個并行數(shù)據(jù)流由一個Kafka生產(chǎn)者實例負(fù)責(zé)向Kafka主題寫入數(shù)據(jù)。這樣可以提高寫入的吞吐量和并行度。
以下是源碼中的一部分,展示了KafkaSink是如何將數(shù)據(jù)發(fā)送到Kafka的:
@Override
public void invoke(IN value, Context context) throws Exception {// 將數(shù)據(jù)發(fā)送到Kafka主題producer.send(new ProducerRecord<>(topic, value.toString()));
}
KafkaSink的源碼相對復(fù)雜,涉及到與Kafka的交互、并行處理、容錯等方面的實現(xiàn)。
總的來說,KafkaSink通過整合Flink和Kafka的功能,提供了一種高效、可靠的方式將流式數(shù)據(jù)寫入Kafka主題,適用于各種實時數(shù)據(jù)處理場景。
04 KafkaSink參數(shù)配置
需要根據(jù)具體的安全需求和環(huán)境配置 Kafka 的安全性參數(shù)。建議查閱最新版本的 Kafka 文檔以獲取詳細(xì)的安全配置指南:https://kafka.apache.org/documentation/#producerconfigs
在 Apache Flink 中,ProducerConfig
是用于配置 Kafka 生產(chǎn)者的類,它是 Kafka 客戶端庫中的一部分。下面是一些常見的配置選項及其解釋:
bootstrap.servers
集群的地址列表,用于初始化連接。生產(chǎn)者會從這些服務(wù)器中選擇一個 broker 進(jìn)行連接。
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
metadata.max.age.ms
元數(shù)據(jù)的最大緩存時間。在此時間內(nèi),生產(chǎn)者將重復(fù)使用已經(jīng)獲取的元數(shù)據(jù),而不會向服務(wù)器發(fā)送新的元數(shù)據(jù)請求
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
batch.size
控制批量發(fā)送到 Kafka 的消息大小。當(dāng)消息積累到一定大小時,生產(chǎn)者會將它們一起發(fā)送到 Kafka 以提高效率
public static final String BATCH_SIZE_CONFIG = "batch.size";
acks
消息確認(rèn)機(jī)制,控制生產(chǎn)者收到確認(rèn)的方式??梢允恰癮ll”(所有副本都確認(rèn)),“1”(至少一個副本確認(rèn))或“0”(不需要確認(rèn))
public static final String ACKS_CONFIG = "acks";
linger.ms
生產(chǎn)者在發(fā)送批量消息前等待的時間,以使更多的消息聚合成一個批次。默認(rèn)是0,表示立即發(fā)送
public static final String LINGER_MS_CONFIG = "linger.ms";
request.timeout.ms
發(fā)送請求到 Kafka 服務(wù)器的超時時間
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
delivery.timeout.ms
這個參數(shù)在 Kafka 生產(chǎn)者的配置中是存在的,它表示生產(chǎn)者在發(fā)送消息后等待生產(chǎn)者確認(rèn)的最大時間。如果在這段時間內(nèi)沒有收到確認(rèn),生產(chǎn)者將重試發(fā)送消息或者拋出異常,具體取決于 retries 參數(shù)的配置
public static final String DELIVERY_TIMEOUT_MS_CONFIG = "delivery.timeout.ms";
client.id
用于區(qū)分不同生產(chǎn)者實例的客戶端 ID
public static final String CLIENT_ID_CONFIG = "client.id";
send.buffer.bytes
Kafka 消費者用于網(wǎng)絡(luò) socket 發(fā)送數(shù)據(jù)的緩沖區(qū)大小
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
receive.buffer.bytes
Kafka 消費者用于網(wǎng)絡(luò) socket 接收數(shù)據(jù)的緩沖區(qū)大小
public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
max.request.size
單個請求發(fā)送的最大字節(jié)數(shù)
public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
reconnect.backoff.ms
用于控制在與 Kafka 服務(wù)器連接斷開后重新連接的時間間隔。具體來說,它定義了在發(fā)起重新連接嘗試之間等待的時間量,以毫秒為單位。如果連接失敗,生產(chǎn)者將在此時間間隔之后嘗試重新連接到 Kafka 服務(wù)器
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
reconnect.backoff.max.ms
用于控制重新連接的最大退避時間。具體來說,它定義了在發(fā)起重新連接嘗試之間等待的最長時間量,以毫秒為單位。如果連接失敗,生產(chǎn)者將在此時間間隔之后嘗試重新連接到 Kafka 服務(wù)器
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
max.block.ms
當(dāng) Kafka 隊列已滿時,生產(chǎn)者將阻塞的最長時間(毫秒),超時后會拋出異常
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
buffer.memory
生產(chǎn)者用于緩沖等待發(fā)送到服務(wù)器的消息的內(nèi)存大小。默認(rèn)是33554432字節(jié)(32MB)
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
retries
生產(chǎn)者發(fā)送失敗后的重試次數(shù)。默認(rèn)是0,表示不重試
public static final String RETRIES_CONFIG = "retries";
key.serializer
用于序列化消息鍵的序列化器類。通常是指實現(xiàn)了Serializer接口的類的全限定名
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
value.serializer
用于序列化消息值的序列化器類
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
connections.max.idle.ms
客戶端與服務(wù)器保持空閑連接的最長時間(毫秒)。默認(rèn)值為 540000(即 9 分鐘)。例如:
"900000"
表示客戶端與服務(wù)器保持空閑連接的最長時間為 15 分鐘
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
partitioner.class
用于指定消息將被發(fā)送到哪個分區(qū)的算法,即分區(qū)器的實現(xiàn)類。Kafka 中的主題(topic)通常被劃分為多個分區(qū),每個分區(qū)都包含有序的消息序列。分區(qū)器決定了生產(chǎn)者發(fā)送的消息應(yīng)該被分配到哪個分區(qū)中。
通過配置
partitioner.class
,用戶可以自定義分區(qū)算法,以滿足特定的業(yè)務(wù)需求。Kafka 提供了默認(rèn)的分區(qū)器,也允許用戶根據(jù)自己的邏輯實現(xiàn)自定義的分區(qū)器。例如,以下是配置
partitioner.class
的示例:partitioner.class=com.example.CustomPartitioner
在這個示例中,
com.example.CustomPartitioner
是用戶自定義的分區(qū)器類的全限定名。該類必須實現(xiàn) Kafka 提供的org.apache.kafka.clients.producer.Partitioner
接口,該接口定義了確定消息應(yīng)該被發(fā)送到哪個分區(qū)的方法。自定義分區(qū)器可以根據(jù)消息的內(nèi)容、鍵(如果有)、以及其他上下文信息,靈活地決定消息應(yīng)該被發(fā)送到哪個分區(qū)。這樣的自定義分區(qū)策略可以幫助實現(xiàn)一些特定的業(yè)務(wù)邏輯,例如確保相關(guān)的消息被發(fā)送到相同的分區(qū),以提高消費的局部性。
在沒有顯式配置
partitioner.class
的情況下,Kafka 使用默認(rèn)的分區(qū)器,該分區(qū)器根據(jù)消息的鍵(如果有)或者采用輪詢的方式將消息平均分配到所有分區(qū)。
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
interceptor.classes
用于指定一組攔截器類。攔截器類是實現(xiàn) Kafka 接口
org.apache.kafka.clients.producer.ProducerInterceptor
或者org.apache.kafka.clients.consumer.ConsumerInterceptor
的類,用于在生產(chǎn)者或消費者發(fā)送或接收消息之前或之后對消息進(jìn)行處理。攔截器允許用戶對消息進(jìn)行自定義的預(yù)處理或后處理。這些操作可以包括但不限于:
- 對消息進(jìn)行加工、轉(zhuǎn)換、過濾。
- 在消息發(fā)送或接收之前或之后記錄日志。
- 對消息的時間戳或鍵進(jìn)行修改。
通過配置
interceptor.classes
參數(shù),可以指定一組攔截器類,并且它們將按順序應(yīng)用于每個消息。這樣的攔截器鏈?zhǔn)沟迷谙⑻幚磉^程中可以執(zhí)行多個不同的操作。例如,以下是配置
interceptor.classes
的示例:interceptor.classes=com.example.MyProducerInterceptor, com.example.MyConsumerInterceptor
在這個示例中,
com.example.MyProducerInterceptor
和com.example.MyConsumerInterceptor
是用戶定義的攔截器類的全限定名。這兩個類必須分別實現(xiàn) Kafka 提供的org.apache.kafka.clients.producer.ProducerInterceptor
和org.apache.kafka.clients.consumer.ConsumerInterceptor
接口。需要注意的是,攔截器類的順序很重要。攔截器將按照它們在
interceptor.classes
參數(shù)中聲明的順序依次應(yīng)用于每個消息。如果需要確保攔截器按照特定的順序應(yīng)用,可以通過配置參數(shù)來指定順序。攔截器提供了一種靈活的方式來實現(xiàn)特定的消息處理邏輯,同時也允許用戶對消息進(jìn)行監(jiān)控和記錄。
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
enable.idempotence
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
transaction.timeout.ms
public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
transactional.id
用于啟用生產(chǎn)者的冪等性。冪等性是指對于同一個生產(chǎn)者實例,無論消息發(fā)送多少次,最終只會產(chǎn)生一條副本(實際上是一個冪等序列)的性質(zhì)。這可以防止由于網(wǎng)絡(luò)錯誤、重試或者生產(chǎn)者重新啟動等情況導(dǎo)致的重復(fù)消息。
啟用生產(chǎn)者的冪等性可以通過設(shè)置
enable.idempotence
參數(shù)為true
來實現(xiàn)。例如:enable.idempotence=true
啟用冪等性會自動設(shè)置一些與冪等性相關(guān)的配置,例如:
acks
配置將被設(shè)置為 “all”,確保所有的 ISR(In-Sync Replicas)都已經(jīng)接收到消息。max.in.flight.requests.per.connection
將被設(shè)置為 1,以確保在一個連接上只有一個未確認(rèn)的請求。冪等性對于確保消息傳遞的精確一次語義非常重要。在啟用冪等性的情況下,生產(chǎn)者會為每條消息分配一個唯一的序列號,以便在重試發(fā)生時 Broker 能夠正確地識別并去重重復(fù)的消息。
需要注意的是,啟用冪等性會對性能產(chǎn)生一些開銷,因為它引入了額外的序列號和一些額外的網(wǎng)絡(luò)開銷。在生產(chǎn)環(huán)境中,需要仔細(xì)評估冪等性對性能的影響,并根據(jù)實際需求權(quán)衡性能和可靠性。
public static final String TRANSACTIONAL_ID_CONFIG = "transactional.id";
security.providers
參數(shù)已經(jīng)被 Kafka 移除了。在較早的 Kafka 版本中,這個參數(shù)可能被用于指定安全性相關(guān)的提供者。然而,從 Kafka 2.0 開始,Kafka 已經(jīng)采用了基于 JAAS(Java Authentication and Authorization Service)的身份驗證和授權(quán)機(jī)制,這個參數(shù)不再被使用。
現(xiàn)在,Kafka 的安全性配置主要包括以下幾個方面:
- 身份驗證機(jī)制(Authentication Mechanisms):Kafka 支持多種身份驗證機(jī)制,如SSL/TLS、SASL(Simple Authentication and Security Layer)、OAuth等。通過配置
security.protocol
參數(shù)選擇所需的身份驗證機(jī)制。- 授權(quán)機(jī)制(Authorization Mechanisms):Kafka 使用 ACL(Access Control Lists)來控制對主題和分區(qū)的訪問權(quán)限??梢酝ㄟ^配置
authorizer.class.name
參數(shù)選擇 ACL 的實現(xiàn)類。- 加密通信(Encryption):可以通過配置 SSL/TLS 來對 Kafka 通信進(jìn)行加密,以保護(hù)數(shù)據(jù)在傳輸過程中的安全性。
- 客戶端配置(Client Configuration):客戶端需要根據(jù)服務(wù)端的安全配置進(jìn)行相應(yīng)的配置,如設(shè)置 SSL/TLS 的信任證書、SASL 的認(rèn)證信息等。
需要根據(jù)具體的安全需求和環(huán)境配置 Kafka 的安全性參數(shù)。建議查閱最新版本的 Kafka 文檔以獲取詳細(xì)的安全配置指南。
public static final String SECURITY_PROVIDERS_CONFIG = "security.providers";
retry.backoff.ms
用于定義在發(fā)生可重試的發(fā)送錯誤后,生產(chǎn)者在進(jìn)行重試之前等待的時間間隔,以毫秒為單位。
當(dāng)生產(chǎn)者發(fā)送消息到 Kafka 時,可能會遇到一些可重試的錯誤,例如網(wǎng)絡(luò)問題、Kafka 服務(wù)器繁忙等。retry.backoff.ms 允許在出現(xiàn)這些可重試錯誤后等待一段時間,然后再次嘗試發(fā)送消息,以避免頻繁的重試。這樣的設(shè)計有助于在短時間內(nèi)解決暫時性的問題,而不至于對 Kafka 服務(wù)器造成額外的負(fù)擔(dān)。
具體而言,如果發(fā)生了可重試的錯誤,生產(chǎn)者將等待 retry.backoff.ms 指定的時間間隔,然后進(jìn)行下一次重試。如果重試依然失敗,生產(chǎn)者可能會繼續(xù)進(jìn)行更多的重試,每次之間間隔逐漸增加,以避免過度壓力和頻繁的連接嘗試。
默認(rèn)情況下,retry.backoff.ms 的值通常是 100 毫秒,但可以根據(jù)實際需求和環(huán)境進(jìn)行調(diào)整
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
compression.type
控制發(fā)送到 Kafka 的消息是否壓縮。可以是“none”、“gzip”、“snappy”或“l(fā)z4”
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
metrics.sample.window.ms
用于配置 Kafka Broker 的參數(shù),用于定義度量指標(biāo)(metrics)的采樣窗口的時間跨度,以毫秒為單位。
具體來說,這個參數(shù)指定了度量指標(biāo)的采樣窗口的持續(xù)時間。在這個時間段內(nèi),Kafka Broker 會收集和計算各種指標(biāo),比如吞吐量、延遲、請求處理時間等。然后,這些度量指標(biāo)可以被監(jiān)控工具或者外部系統(tǒng)使用,以便實時地監(jiān)控 Kafka Broker 的運(yùn)行狀態(tài)和性能指標(biāo)。
通過調(diào)整
metrics.sample.window.ms
這個參數(shù),可以改變度量指標(biāo)采樣的時間窗口長度,以適應(yīng)不同的監(jiān)控和性能分析需求。較短的采樣窗口可以提供更加實時的性能指標(biāo),但也會增加系統(tǒng)資源的開銷;而較長的采樣窗口則可以減少資源開銷,但會犧牲一些實時性。默認(rèn)情況下,
metrics.sample.window.ms
的值通常是 30000 毫秒(30秒),但根據(jù)具體的 Kafka 集群配置和監(jiān)控需求,可以進(jìn)行調(diào)整。
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
metrics.num.samples
用于配置 Kafka Broker 的參數(shù),用于指定在每個度量指標(biāo)采樣窗口中收集的樣本數(shù)量。
具體來說,度量指標(biāo)(metrics)是用于監(jiān)視 Kafka Broker 運(yùn)行狀態(tài)和性能的關(guān)鍵數(shù)據(jù),比如吞吐量、延遲、請求處理時間等。而
metrics.num.samples
參數(shù)則控制了在每個采樣窗口內(nèi)收集多少個樣本。這些樣本可以用于計算度量指標(biāo)的平均值、最大值、最小值等統(tǒng)計信息。通過調(diào)整
metrics.num.samples
這個參數(shù),可以平衡度量指標(biāo)的準(zhǔn)確性和資源消耗之間的權(quán)衡。較大的樣本數(shù)量可以提供更加準(zhǔn)確的度量指標(biāo)統(tǒng)計信息,但會增加系統(tǒng)資源的開銷;而較小的樣本數(shù)量則可以減少資源消耗,但可能會犧牲一些準(zhǔn)確性。默認(rèn)情況下,
metrics.num.samples
的值通常是 2,但根據(jù)具體的 Kafka 集群配置和監(jiān)控需求,可以進(jìn)行調(diào)整。
public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
metrics.recording.level
用于配置度量指標(biāo)(metrics)的記錄級別。這個參數(shù)決定了哪些度量指標(biāo)會被記錄和匯報。
具體來說,
metrics.recording.level
可以設(shè)置為以下幾個級別之一:
INFO
:記錄常規(guī)的度量指標(biāo),如吞吐量、延遲等。DEBUG
:記錄更詳細(xì)的度量指標(biāo)信息,可能包括更多的細(xì)節(jié)和較低級別的度量指標(biāo)。TRACE
:記錄非常詳細(xì)的度量指標(biāo)信息,包括所有細(xì)節(jié)和最低級別的度量指標(biāo)。通過調(diào)整
metrics.recording.level
這個參數(shù),可以靈活地控制記錄的度量指標(biāo)的級別,以滿足不同場景下的監(jiān)控和分析需求。例如,在生產(chǎn)環(huán)境中,通常會將記錄級別設(shè)置為INFO
或者DEBUG
,以便實時監(jiān)控 Kafka 集群的運(yùn)行狀態(tài)和性能指標(biāo);而在調(diào)試或者故障排查時,可以將記錄級別設(shè)置為TRACE
,以獲取更詳細(xì)的信息。默認(rèn)情況下,
metrics.recording.level
的值通常是INFO
,但可以根據(jù)具體的需求和環(huán)境進(jìn)行調(diào)整。
public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
metric.reporters
用于指定要使用的度量指標(biāo)(metrics)報告器。度量指標(biāo)報告器負(fù)責(zé)將 Kafka Broker 收集到的度量指標(biāo)信息發(fā)送到指定的位置,以供監(jiān)控和分析使用。
具體來說,
metric.reporters
參數(shù)接受一個逗號分隔的報告器類名列表,這些報告器類名必須實現(xiàn) Kafka 的org.apache.kafka.common.metrics.MetricsReporter
接口。通過配置這個參數(shù),可以啟用不同的度量指標(biāo)報告器,并將度量指標(biāo)信息發(fā)送到不同的目的地,比如日志、JMX、Graphite、InfluxDB 等。例如,可以使用以下配置啟用 JMX 報告器和日志報告器:
metric.reporters=jmx, kafka.metrics.KafkaMetricsReporter
這樣配置后,Kafka Broker 將同時使用 JMX 報告器和日志報告器,將度量指標(biāo)信息發(fā)送到 JMX 和日志中。
默認(rèn)情況下,
metric.reporters
參數(shù)為空,表示不使用任何度量指標(biāo)報告器。在實際部署中,根據(jù)監(jiān)控和分析需求,可以配置不同的度量指標(biāo)報告器來收集和報告度量指標(biāo)信息。
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
max.in.flight.requests.per.connection
用于控制在任何給定時間內(nèi)允許向單個 Broker 發(fā)送的未確認(rèn)請求的最大數(shù)量。
在 Kafka 中,生產(chǎn)者發(fā)送消息到 Broker 時,可以選擇等待服務(wù)器確認(rèn)(acknowledgement)消息發(fā)送成功后再發(fā)送下一條消息,或者繼續(xù)發(fā)送下一條消息而不等待前一條消息的確認(rèn)。當(dāng)生產(chǎn)者選擇繼續(xù)發(fā)送下一條消息時,這些未確認(rèn)的消息就會處于 “in-flight” 狀態(tài)。
max.in.flight.requests.per.connection
參數(shù)就是用來限制在這種情況下的未確認(rèn)請求的數(shù)量。如果未確認(rèn)請求的數(shù)量達(dá)到了這個限制,生產(chǎn)者將會阻塞,直到有一些請求被確認(rèn),才會繼續(xù)發(fā)送新的請求。通過調(diào)整
max.in.flight.requests.per.connection
參數(shù),可以平衡生產(chǎn)者的吞吐量和消息傳遞的可靠性之間的權(quán)衡。較大的值可以提高生產(chǎn)者的吞吐量,因為它允許更多的消息在未確認(rèn)狀態(tài)下發(fā)送,而較小的值可以提高消息傳遞的可靠性,因為它限制了未確認(rèn)請求的數(shù)量,從而減少了消息丟失的風(fēng)險。默認(rèn)情況下,
max.in.flight.requests.per.connection
的值是 5。根據(jù)應(yīng)用程序的要求和實際情況,可以適當(dāng)?shù)卣{(diào)整這個參數(shù)的值。
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
05 KafkaSink 應(yīng)用依賴
<!-- Flink kafka 連接器依賴 start -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.4</version>
</dependency>
<!-- Flink kafka 連接器依賴 end -->
06 KafkaSink 快速入門
6.1 包結(jié)構(gòu)
6.2 項目配置
log4j2.properties
rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmproot
Logger.level=INFO
6.3 pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.aurora</groupId><artifactId>aurora_kafka_connector</artifactId><version>1.0-SNAPSHOT</version><!--屬性設(shè)置--><properties><!--java_JDK版本--><java.version>1.8</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--編譯編碼UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--輸出報告編碼UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json數(shù)據(jù)格式處理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.14.4</flink.version><!--scala版本--><scala.binary.version>2.12</scala.binary.version></properties><!--依賴管理--><dependencies><!-- fastJson工具類依賴 start --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- fastJson工具類依賴 end --><!-- log4j日志框架依賴 start --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!-- log4j日志框架依賴 end --><!-- Flink基礎(chǔ)依賴 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink基礎(chǔ)依賴 end --><!-- Flink kafka 連接器依賴 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink kafka 連接器依賴 end --></dependencies><!--編譯打包--><build><finalName>${project.name}</finalName><!--資源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.aurora.demo,ElasticsearchSinkStreamingJobDemo</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件統(tǒng)一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--編譯打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build></project>
6.4 Flink集成KafkaSink作業(yè)
package com.aurora;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Properties;
import java.util.UUID;/*** 描述:Flink集成kafkaSink,實現(xiàn)數(shù)據(jù)流寫入Kafka集群** @author 淺夏的貓* @version 1.0.0* @date 2024-02-18 20:52:25*/
public class KafkaSinkStreamJobDemo {private static final Logger logger = LoggerFactory.getLogger(KafkaSinkStreamJobDemo.class);public static void main(String[] args) {try {logger.info("開始啟動作業(yè)!!!");// 創(chuàng)建Flink運(yùn)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 創(chuàng)建KafksSink配置Properties properties = new Properties();properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "0");properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");// 序列化模式KafkaRecordSerializationSchema<String> recordSerializer = KafkaRecordSerializationSchema.builder()//設(shè)置對哪個主題進(jìn)行序列化.setTopic("topic_a")//設(shè)置數(shù)據(jù)值序列化方式.setValueSerializationSchema(new SimpleStringSchema())//設(shè)置數(shù)據(jù)key序列化方式.setKeySerializationSchema(new SimpleStringSchema()).build();// 創(chuàng)建KafkaSink算子KafkaSink<String> kafkaSink = KafkaSink.<String>builder()//設(shè)置kafka各種參數(shù).setKafkaProducerConfig(properties)//設(shè)置序列化模式.setRecordSerializer(recordSerializer)//設(shè)置傳遞保證//At Most Once (至多一次): 系統(tǒng)保證消息要么被成功傳遞一次,要么根本不被傳遞。這種保證意味著消息可能會丟失,但不會被傳遞多次。//At Least Once (至少一次): 系統(tǒng)保證消息至少會被傳遞一次,但可能會導(dǎo)致消息的重復(fù)傳遞。這種保證確保了消息的不丟失,但應(yīng)用程序需要能夠處理重復(fù)消息的情況。//Exactly Once (精確一次): 系統(tǒng)保證消息會被確切地傳遞一次,而沒有任何重復(fù)。這是最高級別的傳遞保證,確保消息不會丟失且不會被重復(fù).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//設(shè)置集群地址.setBootstrapServers("127.0.0.1:9092")//設(shè)置事務(wù)前綴.setTransactionalIdPrefix("flink_").build();// 生成一個數(shù)據(jù)流SourceFunction<String> sourceFunction = new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> sourceContext) throws Exception {while (true) {String id = UUID.randomUUID().toString();sourceContext.collect( id);logger.info("正在下發(fā)數(shù)據(jù):{}",id);Thread.sleep(1000);}}@Overridepublic void cancel() {}};// 創(chuàng)建數(shù)據(jù)源DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction).setParallelism(1);// 數(shù)據(jù)流數(shù)據(jù)通過KafkaSink算子寫入kafkadataStreamSource.sinkTo(kafkaSink).setParallelism(1);// 執(zhí)行任務(wù)env.execute("KafkaSinkStreamJobDemo");} catch (Exception e) {e.printStackTrace();}}
}
6.5 驗證
構(gòu)建并運(yùn)行 Flink 應(yīng)用,確保應(yīng)用能夠成功發(fā)送數(shù)據(jù)到 Kafka 主題。你可以通過 Kafka Consumer 來驗證是否成功接收到了消息。
這個簡單的示例展示了如何使用 Kafka Sink 集成到流處理系統(tǒng)中,并且它是可運(yùn)行的。在實際應(yīng)用中,你可以根據(jù)需要配置更多參數(shù),例如序列化器、acks 級別、以及其他相關(guān)的生產(chǎn)者和 Kafka 配置。
通過kafka命令啟動一個消費者,觀察是否實時消費到數(shù)據(jù)
#windows
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic_a#linux
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_a
07 總結(jié)
Kafka Sink 是實現(xiàn)流處理到 Kafka 集群的關(guān)鍵組件之一。通過上述示例,你可以開始使用 Kafka Sink 將你的流處理數(shù)據(jù)發(fā)送到 Kafka,從而實現(xiàn)可靠的消息傳遞。在實際應(yīng)用中,確保根據(jù)業(yè)務(wù)需求和性能要求調(diào)整配置參數(shù),以獲得最佳的性能和穩(wěn)定性。