中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

網(wǎng)站建設(shè)平臺(tái)網(wǎng)站設(shè)計(jì)seo刷關(guān)鍵詞排名優(yōu)化

網(wǎng)站建設(shè)平臺(tái)網(wǎng)站設(shè)計(jì),seo刷關(guān)鍵詞排名優(yōu)化,幫公司做網(wǎng)站賺錢嗎,什么免費(fèi)網(wǎng)站可以鏈接域名消息中間件對(duì)比: 1、吞吐、可靠性、性能 Kafka安裝 Kafka對(duì)于zookeeper是強(qiáng)依賴,保存kafka相關(guān)的節(jié)點(diǎn)數(shù)據(jù),所以安裝Kafka之前必須先安裝zookeeper Docker安裝zookeeper 下載鏡像: docker pull zookeeper:3.4.14創(chuàng)建容器 do…

消息中間件對(duì)比:
1、吞吐、可靠性、性能
在這里插入圖片描述

Kafka安裝

Kafka對(duì)于zookeeper是強(qiáng)依賴,保存kafka相關(guān)的節(jié)點(diǎn)數(shù)據(jù),所以安裝Kafka之前必須先安裝zookeeper

  • Docker安裝zookeeper

下載鏡像:

docker pull zookeeper:3.4.14

創(chuàng)建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
  • Docker安裝kafka

下載鏡像:

docker pull wurstmeister/kafka:2.12-2.3.1

創(chuàng)建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

kafka入門

  • 生產(chǎn)者發(fā)送消息,多個(gè)消費(fèi)者只能有一個(gè)消費(fèi)者接收到消息
  • 生產(chǎn)者發(fā)送消息,多個(gè)消費(fèi)者都可以接收到消息

(1)創(chuàng)建kafka-demo項(xiàng)目,導(dǎo)入依賴

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

(2)生產(chǎn)者發(fā)送消息

package com.heima.kafka.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生產(chǎn)者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties = new Properties();//kafka的連接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//發(fā)送失敗,失敗的重試次數(shù)properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生產(chǎn)者對(duì)象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封裝發(fā)送的消息ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");//3.發(fā)送消息producer.send(record);//4.關(guān)閉消息通道,必須關(guān)閉,否則消息發(fā)送不成功producer.close();}}

(3)消費(fèi)者接收消息

package com.heima.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消費(fèi)者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties = new Properties();//kafka的連接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//消費(fèi)者組properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//2.消費(fèi)者對(duì)象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//3.訂閱主題consumer.subscribe(Collections.singletonList("itheima-topic"));//當(dāng)前線程一直處于監(jiān)聽狀態(tài)while (true) {//4.獲取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}

kafka高可用設(shè)計(jì)

1、設(shè)計(jì)集群模式:

Kafka的服務(wù)器端由被稱為 Broker 的服務(wù)進(jìn)程構(gòu)成,即一個(gè) Kafka 集群由多個(gè)Broker 組成。當(dāng)一個(gè)機(jī)器宕機(jī)了,另外一個(gè)機(jī)器就會(huì)替補(bǔ)山
在這里插入圖片描述

2、備份機(jī)制:

Kafka定義了兩類副本

  1. 領(lǐng)導(dǎo)者副本(Leader Replica)
  2. 追隨者副本 (Follower Replica)
    追隨者副本分為兩類:
    1、一種是ISR副本,同步保存
    2、普通的副本,異步保存
    出現(xiàn)主節(jié)點(diǎn)宕機(jī),會(huì)先選ISR副本中的一個(gè)成為新的主節(jié)點(diǎn),保證數(shù)據(jù)一致性,沒(méi)有ISR節(jié)點(diǎn),再?gòu)钠胀ü?jié)點(diǎn)中挑選
    針對(duì)全部節(jié)點(diǎn)宕機(jī)的情況,有兩種策略:
    1、等待第一個(gè)ISR副本,保證了數(shù)據(jù)的盡可能一致
    2、等待一個(gè)復(fù)活的追隨者,無(wú)論是ISR還是普通,提高系統(tǒng)的高可用性。

kafka生產(chǎn)者詳解

1發(fā)送類型

  • 同步發(fā)送

    使用send()方法發(fā)送,它會(huì)返回一個(gè)Future對(duì)象,調(diào)用get()方法進(jìn)行等待,就可以知道消息是否發(fā)送成功

RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());
  • 異步發(fā)送

    調(diào)用send()方法,并指定一個(gè)回調(diào)函數(shù),服務(wù)器在返回響應(yīng)時(shí)調(diào)用函數(shù)

//異步消息發(fā)送
producer.send(kvProducerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("記錄異常信息到日志表中");}System.out.println(recordMetadata.offset());}
});

2參數(shù)詳解

  • ack

代碼的配置方式:

//ack配置  消息確認(rèn)機(jī)制
prop.put(ProducerConfig.ACKS_CONFIG,"all");

參數(shù)的選擇說(shuō)明

確認(rèn)機(jī)制說(shuō)明
acks=0生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng),消息有丟失的風(fēng)險(xiǎn),但是速度最快
acks=1(默認(rèn)值)只要集群首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)
acks=all只有當(dāng)所有參與賦值的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)
  • retries

生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性錯(cuò)誤,在這種情況下,retries參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達(dá)到這個(gè)次數(shù),生產(chǎn)者會(huì)放棄重試返回錯(cuò)誤,默認(rèn)情況下,生產(chǎn)者會(huì)在每次重試之間等待100ms

代碼中配置方式:

//重試次數(shù)
prop.put(ProducerConfig.RETRIES_CONFIG,10);
  • 消息壓縮

默認(rèn)情況下, 消息發(fā)送時(shí)不會(huì)被壓縮。

代碼中配置方式:

//數(shù)據(jù)壓縮
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
壓縮算法說(shuō)明
snappy占用較少的 CPU, 卻能提供較好的性能和相當(dāng)可觀的壓縮比, 如果看重性能和網(wǎng)絡(luò)帶寬,建議采用
lz4占用較少的 CPU, 壓縮和解壓縮速度較快,壓縮比也很客觀
gzip占用較多的 CPU,但會(huì)提供更高的壓縮比,網(wǎng)絡(luò)帶寬有限,可以使用這種算法

使用壓縮可以降低網(wǎng)絡(luò)傳輸開銷和存儲(chǔ)開銷,而這往往是向 Kafka 發(fā)送消息的瓶頸所在。

kafka消費(fèi)者

消息的有序性

方法:一個(gè)topic分區(qū)能保證自己的數(shù)據(jù)是按照先后消費(fèi)的,但是不能保證跨分區(qū)消息處理的先后順序。我么只能使用一個(gè)分區(qū),在單分區(qū)種,消息可以保證嚴(yán)格順序消費(fèi)

提交和偏移量

在這里插入圖片描述
自動(dòng)提交:
當(dāng)enable.auto.commit被設(shè)置為true,提交方式就是讓消費(fèi)者自動(dòng)提交偏移量,每隔5秒消費(fèi)者會(huì)自動(dòng)把從poll0方法接收的最大偏移量提交上去,這樣只是記錄了規(guī)定時(shí)間內(nèi)的最大偏移量,其實(shí)與數(shù)據(jù)提交的偏移量存在偏差,因此可能會(huì)出現(xiàn)數(shù)據(jù)的重復(fù)提交或者丟失
手動(dòng)提交
當(dāng)enableauto.commit被設(shè)置為false可以有以下三種提交方式

  • 提交當(dāng)前偏移量(同步提交)
  • 異步提交
  • 同步和異步組合提交

同步提交:commitSync()方法會(huì)一直嘗試直至提交成功,如果提交失敗也可以記錄到錯(cuò)誤日志里。

while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交當(dāng)前最新的偏移量}catch (CommitFailedException e){System.out.println("記錄提交失敗的異常:"+e);}}
}

異步提交:手動(dòng)提交有一個(gè)缺點(diǎn),那就是當(dāng)發(fā)起提交調(diào)用時(shí)應(yīng)用會(huì)阻塞。消息沒(méi)有重試機(jī)制

while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("記錄錯(cuò)誤的提交偏移量:"+ map+",異常信息"+e);}}});
}

同步和異步組合提交

異步提交也有個(gè)缺點(diǎn),那就是如果服務(wù)器返回提交失敗,異步提交不會(huì)進(jìn)行重試。相比較起來(lái),同步提交會(huì)進(jìn)行重試直到成功或者最后拋出異常給應(yīng)用。異步提交沒(méi)有實(shí)現(xiàn)重試是因?yàn)?#xff0c;如果同時(shí)存在多個(gè)異步提交,進(jìn)行重試可能會(huì)導(dǎo)致位移覆蓋。

舉個(gè)例子,假如我們發(fā)起了一個(gè)異步提交commitA,此時(shí)的提交位移為2000,隨后又發(fā)起了一個(gè)異步提交commitB且位移為3000;commitA提交失敗但commitB提交成功,此時(shí)commitA進(jìn)行重試并成功的話,會(huì)將實(shí)際上將已經(jīng)提交的位移從3000回滾到2000,導(dǎo)致消息重復(fù)消費(fèi)。

try {while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();}
}catch (Exception e){+e.printStackTrace();System.out.println("記錄錯(cuò)誤信息:"+e);
}finally {try {consumer.commitSync();}finally {consumer.close();}
}

springboot整合kafka

1、在父類中的pop文件中導(dǎo)入依賴包

```xml
<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

2、在需要用到kafka的微服務(wù)的naco中分別配置生產(chǎn)者和消費(fèi)者配置

spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
spring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

傳遞消息為對(duì)象

目前springboot整合后的kafka,因?yàn)樾蛄谢魇荢tringSerializer,這個(gè)時(shí)候如果需要傳遞對(duì)象可以有兩種方式

方式一:可以自定義序列化器,對(duì)象類型眾多,這種方式通用性不強(qiáng),本章節(jié)不介紹

方式二:可以把要傳遞的對(duì)象進(jìn)行轉(zhuǎn)json字符串,接收消息后再轉(zhuǎn)為對(duì)象即可,本項(xiàng)目采用這種方式

  • 發(fā)送消息
@GetMapping("/hello")
public String hello(){User user = new User();user.setUsername("xiaowang");user.setAge(18);kafkaTemplate.send("user-topic", JSON.toJSONString(user));return "ok";
}
  • 接收消息
package com.heima.kafka.listener;import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;@Component
public class HelloListener {@KafkaListener(topics = "user-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user = JSON.parseObject(message, User.class);System.out.println(user);}}
}
http://www.risenshineclean.com/news/11081.html

相關(guān)文章:

  • 慈溪高端網(wǎng)站設(shè)計(jì)百度貼吧怎么做推廣
  • 產(chǎn)品網(wǎng)站系統(tǒng)溫州最好的seo
  • 怎樣做網(wǎng)站的二維碼海豹直播nba
  • 完整的網(wǎng)站后臺(tái)權(quán)限編碼百度手機(jī)app
  • 怎么建手機(jī)網(wǎng)站鎮(zhèn)江關(guān)鍵字優(yōu)化公司
  • 用什么給網(wǎng)站做測(cè)試競(jìng)價(jià)惡意點(diǎn)擊犯法嗎
  • 搬瓦工512m內(nèi)存wordpress北京自動(dòng)seo
  • 沙田鎮(zhèn)做網(wǎng)站湖北seo診斷
  • 工信部 網(wǎng)站要獨(dú)立ip2023b站推廣大全
  • 速成網(wǎng)站-百度怎么推廣自己的視頻
  • 給公司申請(qǐng)網(wǎng)站用自己的賬號(hào)淮安百度推廣公司
  • .jsp網(wǎng)站開發(fā)技術(shù)疫情排行榜最新消息
  • 網(wǎng)站欄目變了怎么做跳轉(zhuǎn)競(jìng)價(jià)托管推廣公司
  • 網(wǎng)站推廣及seo方案網(wǎng)站怎么進(jìn)入
  • 網(wǎng)站建設(shè)課程設(shè)計(jì)sem托管公司
  • 做網(wǎng)站的畢業(yè)論文怎么寫網(wǎng)絡(luò)廣告營(yíng)銷典型案例
  • 攝影師的網(wǎng)站有哪些深圳百度seo代理
  • 互聯(lián)網(wǎng)站備案seo關(guān)鍵詞排名優(yōu)化案例
  • 有一個(gè)做場(chǎng)景動(dòng)畫的網(wǎng)站怎么提交百度收錄
  • 蘇州知名高端網(wǎng)站建設(shè)網(wǎng)絡(luò)公司上海seo優(yōu)化外包公司
  • 做網(wǎng)站語(yǔ)言最好梅州seo
  • 一個(gè)網(wǎng)站綁定多個(gè)域名 賣域名公司網(wǎng)站免費(fèi)自建
  • 網(wǎng)站建設(shè)與維護(hù)是做什么安卓系統(tǒng)優(yōu)化軟件
  • 完美代碼網(wǎng)站google play store
  • 做視頻解析網(wǎng)站代寫稿子的平臺(tái)
  • 懷柔高端網(wǎng)站建設(shè)win7系統(tǒng)優(yōu)化
  • 廈門it做網(wǎng)站最強(qiáng)本地推薦本地推薦
  • 保定網(wǎng)頁(yè)制作國(guó)外常用的seo站長(zhǎng)工具
  • 門業(yè)網(wǎng)站模板下載健康碼防疫核驗(yàn)一體機(jī)
  • 新手學(xué)做網(wǎng)站 cs6seo服務(wù)顧問(wèn)