網(wǎng)站建設(shè)平臺(tái)網(wǎng)站設(shè)計(jì)seo刷關(guān)鍵詞排名優(yōu)化
消息中間件對(duì)比:
1、吞吐、可靠性、性能
Kafka安裝
Kafka對(duì)于zookeeper是強(qiáng)依賴,保存kafka相關(guān)的節(jié)點(diǎn)數(shù)據(jù),所以安裝Kafka之前必須先安裝zookeeper
- Docker安裝zookeeper
下載鏡像:
docker pull zookeeper:3.4.14
創(chuàng)建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
- Docker安裝kafka
下載鏡像:
docker pull wurstmeister/kafka:2.12-2.3.1
創(chuàng)建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
kafka入門
- 生產(chǎn)者發(fā)送消息,多個(gè)消費(fèi)者只能有一個(gè)消費(fèi)者接收到消息
- 生產(chǎn)者發(fā)送消息,多個(gè)消費(fèi)者都可以接收到消息
(1)創(chuàng)建kafka-demo項(xiàng)目,導(dǎo)入依賴
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
(2)生產(chǎn)者發(fā)送消息
package com.heima.kafka.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生產(chǎn)者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties = new Properties();//kafka的連接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//發(fā)送失敗,失敗的重試次數(shù)properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生產(chǎn)者對(duì)象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封裝發(fā)送的消息ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");//3.發(fā)送消息producer.send(record);//4.關(guān)閉消息通道,必須關(guān)閉,否則消息發(fā)送不成功producer.close();}}
(3)消費(fèi)者接收消息
package com.heima.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消費(fèi)者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties = new Properties();//kafka的連接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//消費(fèi)者組properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//2.消費(fèi)者對(duì)象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//3.訂閱主題consumer.subscribe(Collections.singletonList("itheima-topic"));//當(dāng)前線程一直處于監(jiān)聽狀態(tài)while (true) {//4.獲取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}
kafka高可用設(shè)計(jì)
1、設(shè)計(jì)集群模式:
Kafka的服務(wù)器端由被稱為 Broker 的服務(wù)進(jìn)程構(gòu)成,即一個(gè) Kafka 集群由多個(gè)Broker 組成。當(dāng)一個(gè)機(jī)器宕機(jī)了,另外一個(gè)機(jī)器就會(huì)替補(bǔ)山
2、備份機(jī)制:
Kafka定義了兩類副本
- 領(lǐng)導(dǎo)者副本(Leader Replica)
- 追隨者副本 (Follower Replica)
追隨者副本分為兩類:
1、一種是ISR副本,同步保存
2、普通的副本,異步保存
出現(xiàn)主節(jié)點(diǎn)宕機(jī),會(huì)先選ISR副本中的一個(gè)成為新的主節(jié)點(diǎn),保證數(shù)據(jù)一致性,沒(méi)有ISR節(jié)點(diǎn),再?gòu)钠胀ü?jié)點(diǎn)中挑選
針對(duì)全部節(jié)點(diǎn)宕機(jī)的情況,有兩種策略:
1、等待第一個(gè)ISR副本,保證了數(shù)據(jù)的盡可能一致
2、等待一個(gè)復(fù)活的追隨者,無(wú)論是ISR還是普通,提高系統(tǒng)的高可用性。
kafka生產(chǎn)者詳解
1發(fā)送類型
-
同步發(fā)送
使用send()方法發(fā)送,它會(huì)返回一個(gè)Future對(duì)象,調(diào)用get()方法進(jìn)行等待,就可以知道消息是否發(fā)送成功
RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());
-
異步發(fā)送
調(diào)用send()方法,并指定一個(gè)回調(diào)函數(shù),服務(wù)器在返回響應(yīng)時(shí)調(diào)用函數(shù)
//異步消息發(fā)送
producer.send(kvProducerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("記錄異常信息到日志表中");}System.out.println(recordMetadata.offset());}
});
2參數(shù)詳解
- ack
代碼的配置方式:
//ack配置 消息確認(rèn)機(jī)制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
參數(shù)的選擇說(shuō)明
確認(rèn)機(jī)制 | 說(shuō)明 |
---|---|
acks=0 | 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng),消息有丟失的風(fēng)險(xiǎn),但是速度最快 |
acks=1(默認(rèn)值) | 只要集群首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng) |
acks=all | 只有當(dāng)所有參與賦值的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng) |
- retries
生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性錯(cuò)誤,在這種情況下,retries參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達(dá)到這個(gè)次數(shù),生產(chǎn)者會(huì)放棄重試返回錯(cuò)誤,默認(rèn)情況下,生產(chǎn)者會(huì)在每次重試之間等待100ms
代碼中配置方式:
//重試次數(shù)
prop.put(ProducerConfig.RETRIES_CONFIG,10);
- 消息壓縮
默認(rèn)情況下, 消息發(fā)送時(shí)不會(huì)被壓縮。
代碼中配置方式:
//數(shù)據(jù)壓縮
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
壓縮算法 | 說(shuō)明 |
---|---|
snappy | 占用較少的 CPU, 卻能提供較好的性能和相當(dāng)可觀的壓縮比, 如果看重性能和網(wǎng)絡(luò)帶寬,建議采用 |
lz4 | 占用較少的 CPU, 壓縮和解壓縮速度較快,壓縮比也很客觀 |
gzip | 占用較多的 CPU,但會(huì)提供更高的壓縮比,網(wǎng)絡(luò)帶寬有限,可以使用這種算法 |
使用壓縮可以降低網(wǎng)絡(luò)傳輸開銷和存儲(chǔ)開銷,而這往往是向 Kafka 發(fā)送消息的瓶頸所在。
kafka消費(fèi)者
消息的有序性
方法:一個(gè)topic分區(qū)能保證自己的數(shù)據(jù)是按照先后消費(fèi)的,但是不能保證跨分區(qū)消息處理的先后順序。我么只能使用一個(gè)分區(qū),在單分區(qū)種,消息可以保證嚴(yán)格順序消費(fèi)
提交和偏移量
自動(dòng)提交:
當(dāng)enable.auto.commit被設(shè)置為true,提交方式就是讓消費(fèi)者自動(dòng)提交偏移量,每隔5秒消費(fèi)者會(huì)自動(dòng)把從poll0方法接收的最大偏移量提交上去,這樣只是記錄了規(guī)定時(shí)間內(nèi)的最大偏移量,其實(shí)與數(shù)據(jù)提交的偏移量存在偏差,因此可能會(huì)出現(xiàn)數(shù)據(jù)的重復(fù)提交或者丟失
手動(dòng)提交
當(dāng)enableauto.commit被設(shè)置為false可以有以下三種提交方式
- 提交當(dāng)前偏移量(同步提交)
- 異步提交
- 同步和異步組合提交
同步提交:commitSync()方法會(huì)一直嘗試直至提交成功,如果提交失敗也可以記錄到錯(cuò)誤日志里。
while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交當(dāng)前最新的偏移量}catch (CommitFailedException e){System.out.println("記錄提交失敗的異常:"+e);}}
}
異步提交:手動(dòng)提交有一個(gè)缺點(diǎn),那就是當(dāng)發(fā)起提交調(diào)用時(shí)應(yīng)用會(huì)阻塞。消息沒(méi)有重試機(jī)制
while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("記錄錯(cuò)誤的提交偏移量:"+ map+",異常信息"+e);}}});
}
同步和異步組合提交
異步提交也有個(gè)缺點(diǎn),那就是如果服務(wù)器返回提交失敗,異步提交不會(huì)進(jìn)行重試。相比較起來(lái),同步提交會(huì)進(jìn)行重試直到成功或者最后拋出異常給應(yīng)用。異步提交沒(méi)有實(shí)現(xiàn)重試是因?yàn)?#xff0c;如果同時(shí)存在多個(gè)異步提交,進(jìn)行重試可能會(huì)導(dǎo)致位移覆蓋。
舉個(gè)例子,假如我們發(fā)起了一個(gè)異步提交commitA,此時(shí)的提交位移為2000,隨后又發(fā)起了一個(gè)異步提交commitB且位移為3000;commitA提交失敗但commitB提交成功,此時(shí)commitA進(jìn)行重試并成功的話,會(huì)將實(shí)際上將已經(jīng)提交的位移從3000回滾到2000,導(dǎo)致消息重復(fù)消費(fèi)。
try {while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();}
}catch (Exception e){+e.printStackTrace();System.out.println("記錄錯(cuò)誤信息:"+e);
}finally {try {consumer.commitSync();}finally {consumer.close();}
}
springboot整合kafka
1、在父類中的pop文件中導(dǎo)入依賴包
```xml
<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
2、在需要用到kafka的微服務(wù)的naco中分別配置生產(chǎn)者和消費(fèi)者配置
spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
spring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
傳遞消息為對(duì)象
目前springboot整合后的kafka,因?yàn)樾蛄谢魇荢tringSerializer,這個(gè)時(shí)候如果需要傳遞對(duì)象可以有兩種方式
方式一:可以自定義序列化器,對(duì)象類型眾多,這種方式通用性不強(qiáng),本章節(jié)不介紹
方式二:可以把要傳遞的對(duì)象進(jìn)行轉(zhuǎn)json字符串,接收消息后再轉(zhuǎn)為對(duì)象即可,本項(xiàng)目采用這種方式
- 發(fā)送消息
@GetMapping("/hello")
public String hello(){User user = new User();user.setUsername("xiaowang");user.setAge(18);kafkaTemplate.send("user-topic", JSON.toJSONString(user));return "ok";
}
- 接收消息
package com.heima.kafka.listener;import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;@Component
public class HelloListener {@KafkaListener(topics = "user-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user = JSON.parseObject(message, User.class);System.out.println(user);}}
}