慈溪高端網(wǎng)站設(shè)計(jì)百度貼吧怎么做推廣
目錄
- 一、獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例
- 1.1、案例需求
- 1.2、案例代碼
- 1.3、測(cè)試
一、獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例
1.1、案例需求
- 創(chuàng)建一個(gè)獨(dú)立消費(fèi)者,消費(fèi)firstTopic主題 0 號(hào)分區(qū)的數(shù)據(jù),所下圖所示:
1.2、案例代碼
-
生產(chǎn)者往firstTopic主題 0 號(hào)分區(qū)發(fā)送數(shù)據(jù)代碼
package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、創(chuàng)建 kafka 生產(chǎn)者的配置對(duì)象Properties properties = new Properties();//2、給 kafka 配置對(duì)象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定對(duì)應(yīng)的key和value的序列化類(lèi)型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、創(chuàng)建 kafka 生產(chǎn)者對(duì)象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、調(diào)用 send 方法,發(fā)送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("firstTopic", 0,"","hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主題: "+metadata.topic() + " 分區(qū): "+ metadata.partition());}}});Thread.sleep(2);}// 3 關(guān)閉資源kafkaProducer.close();} }
-
消費(fèi)者消費(fèi)firstTopic主題 0 分區(qū)數(shù)據(jù)代碼
package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.ArrayList; import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {// 配置Properties properties = new Properties();// 連接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 組idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 創(chuàng)建一個(gè)消費(fèi)者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 訂閱主題對(duì)應(yīng)的分區(qū)ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("firstTopic",0));kafkaConsumer.assign(topicPartitions);// 3 消費(fèi)數(shù)據(jù)while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}} }
1.3、測(cè)試
- 在 IDEA 中執(zhí)行消費(fèi)者程序,如下圖:
- 在 IDEA 中執(zhí)行生產(chǎn)者程序 ,在控制臺(tái)觀察生成幾個(gè) 0號(hào)分區(qū)的數(shù)據(jù),如下圖:
- 在 IDEA 控制臺(tái),觀察接收到的數(shù)據(jù),只能消費(fèi)到 0 號(hào)分區(qū)數(shù)據(jù)表示正確。