中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

用vue.js做網(wǎng)站視頻推廣

用vue.js做網(wǎng)站,視頻推廣,新開傳奇私服發(fā)布網(wǎng)站,環(huán)保網(wǎng)站設(shè)計(jì)一、Consumer 批量消費(fèi)(推模式) Consumer端先啟動 Consumer端后啟動. 正常情況下:應(yīng)該是Consumer需要先啟動 consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條 package quickstart; import java.util.List; import co…

一、Consumer 批量消費(fèi)(推模式)

Consumer端先啟動?

Consumer端后啟動. 正常情況下:應(yīng)該是Consumer需要先啟動

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10

package quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* Consumer,訂閱消息

*/

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");

consumer.setConsumeMessageBatchMaxSize(10);

/**

* 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)<br>

* 如果非第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi) ,(消費(fèi)順序消息的時(shí)候設(shè)置)

*/

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

try {

System.out.println("msgs的長度" + msgs.size());

System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

} catch (Exception e) {

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});由于這里是Consumer先啟動,所以他回去輪詢MQ上是否有訂閱隊(duì)列的消息,由于每次producer插入一條,Consumer就拿一條所以測試結(jié)果如下(每次size都是1

2、Consumer端后啟動,也就是Producer先啟動

由于這里是Consumer后啟動,所以MQ上也就堆積了一堆數(shù)據(jù),Consumer

1、Producer端重試

也就是ProducerMQ上發(fā)消息沒有發(fā)送成功,我們可以設(shè)置發(fā)送失敗重試的次數(shù),發(fā)送并觸發(fā)回調(diào)函數(shù)

2Consumer端重試

2.1、exception的情況,一般重復(fù)16?10s30s、1分鐘、2分鐘、3分鐘等等

上面的代碼中消費(fèi)異常的情況返回

return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重試

正常則返回:

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功

二、消息重試機(jī)制:消息重試分為2

1Producer端重試

2、Consumer端重試

consumer.start();

System.out.println("Consumer Started.");

}

}

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10

//設(shè)置重試的次數(shù)

producer.setRetryTimesWhenSendFailed(3);

//開啟生產(chǎn)者

producer.start();

//創(chuàng)建一條消息

Message msg = new Message("PushTopic", "push", "1", "我是一條普通消息".getBytes());

//發(fā)送消息

SendResult result = producer.send(msg);

//發(fā)送,并觸發(fā)回調(diào)函數(shù)

producer.send(msg, new SendCallback() {

@Override

//成功的回調(diào)函數(shù)

public void onSuccess(SendResult sendResult) {

System.out.println(sendResult.getSendStatus());

System.out.println("成功了");

}

@Override

//出現(xiàn)異常的回調(diào)函數(shù)

public void onException(Throwable e) {

System.out.println("失敗了"+e.getMessage());

}

});

package quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* Consumer,訂閱消息

*/

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");

consumer.setConsumeMessageBatchMaxSize(10);

/**

* 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)<br>

* 如果非第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)

*/

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

try {

// System.out.println("msgs的長度" + msgs.size());

System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

for (MessageExt msg : msgs) {

String msgbody = new String(msg.getBody(), "utf-8");

if (msgbody.equals("Hello RocketMQ 4")) {

System.out.println("======錯(cuò)誤=======");

int a = 1 / 0;

}

}

} catch (Exception e) {

e.printStackTrace();

if(msgs.get(0).getReconsumeTimes()==3){

//記錄日志

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功

}else{

return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試

}

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功

}

});

consumer.start();

System.out.println("Consumer Started.");

}

}

假如超過了多少次之后我們可以讓他不再重試記錄 日志。

if(msgs.get(0).getReconsumeTimes()==3){

//記錄日志

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功

}

2.2超時(shí)的情況,這種情況MQ會無限制的發(fā)送給消費(fèi)端。

就是由于網(wǎng)絡(luò)的情況,MQ發(fā)送數(shù)據(jù)之后,Consumer端并沒有收到導(dǎo)致超時(shí)。也就是消費(fèi)端沒有給我返回return 任何狀態(tài),這樣的就認(rèn)為沒有到達(dá)Consumer端。

這里模擬Producer只發(fā)送一條數(shù)據(jù)。consumer端暫停1分鐘并且不發(fā)送接收狀態(tài)給MQ

package model;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* Consumer,訂閱消息

*/

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");1、集群消費(fèi)

2、廣播消費(fèi)

rocketMQ默認(rèn)是集群消費(fèi),我們可以通過在Consumer來支持廣播消費(fèi)

三、消費(fèi)模式

consumer.setConsumeMessageBatchMaxSize(10);

/**

* 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)<br>

* 如果非第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)

*/

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

try {

// 表示業(yè)務(wù)處理時(shí)間

System.out.println("=========開始暫停===============");

Thread.sleep(60000);

for (MessageExt msg : msgs) {

System.out.println(" Receive New Messages: " + msg);

}

} catch (Exception e) {

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功

}

});

consumer.start();

System.out.println("Consumer Started.");

}

}

consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費(fèi)

package model;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

/**

* Consumer,訂閱消息

*/

public class Consumer2 {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");

consumer.setConsumeMessageBatchMaxSize(10);

consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費(fèi)

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

try {異步復(fù)制和同步雙寫主要是主和從的關(guān)系。消息需要實(shí)時(shí)消費(fèi)的,就需要采用主從模式部署

異步復(fù)制:比如這里有一主一從,我們發(fā)送一條消息到主節(jié)點(diǎn)之后,這樣消息就算從producer端發(fā)送成功了,然后通過異步復(fù)制的方法將數(shù)據(jù)復(fù)制到從節(jié)點(diǎn)

同步雙寫:比如這里有一主一從,我們發(fā)送一條消息到主節(jié)點(diǎn)之后,這樣消息就并不算從producer端發(fā)送成功了,需要通過同步雙寫的方法將數(shù)據(jù)同步到從節(jié)點(diǎn)后, 才算數(shù)據(jù)發(fā)

送成功。

如果rocketMq才用雙master部署,ProducerMQ上寫入20條數(shù)據(jù) 其中Master1中拉取了12條 。Master2中拉取了8 條,這種情況下,Master1宕機(jī),那么我們消費(fèi)數(shù)據(jù)的時(shí)

候,只能消費(fèi)到Master2中的8條,Master1中的12條默認(rèn)持久化,不會丟失消息,需要Master1恢復(fù)之后這12條數(shù)據(jù)才能繼續(xù)被消費(fèi),如果想保證消息實(shí)時(shí)消費(fèi),就才用雙

MasterSlave的模式

同步刷盤:在消息到達(dá)MQ后,RocketMQ需要將數(shù)據(jù)持久化,同步刷盤是指數(shù)據(jù)到達(dá)內(nèi)存之后,必須刷到commitlog日志之后才算成功,然后返回producer數(shù)據(jù)已經(jīng)發(fā)送成功。

異步刷盤:,同步刷盤是指數(shù)據(jù)到達(dá)內(nèi)存之后,返回producer說數(shù)據(jù)已經(jīng)發(fā)送成功。,然后再寫入commitlog日志。

commitlog

commitlog就是來存儲所有的元信息,包含消息體,類似于MySQLOracleredolog,所以主要有CommitLog在,Consume Queue即使數(shù)據(jù)丟失,仍然可以恢復(fù)出來。

consumequeue:記錄數(shù)據(jù)的位置,以便Consume快速通過consumequeue找到commitlog中的數(shù)據(jù)

當(dāng)生產(chǎn)者向Kafka發(fā)送消息,且正常得到響應(yīng)的時(shí)候,可以確保生產(chǎn)者不會產(chǎn)生重復(fù)的消息。但是,如果生產(chǎn)者發(fā)送消息后,遇到網(wǎng)絡(luò)問題,無法獲取響應(yīng),生產(chǎn)者就無法判斷該

消息是否成功提交給了Kafka。根據(jù)生產(chǎn)者的機(jī)制,我們知道,當(dāng)出現(xiàn)異常時(shí),會進(jìn)行消息重傳,這就可能出現(xiàn)“At least one”語義。為了實(shí)現(xiàn)“Exactly once”語義,這里提供兩個(gè)

可選方案:

如果業(yè)務(wù)數(shù)據(jù)產(chǎn)生消息可以找到合適的字段作為主鍵,或是有一個(gè)全局ID生成器,可以優(yōu)先考慮選用第二種方案。

為了實(shí)現(xiàn)消費(fèi)者的“Exactly once”語義,在這里提供一種方案,供讀者參考:消費(fèi)者將關(guān)閉自動提交offset的功能且不再手動提交offset,這樣就不使用Offsets Topic這個(gè)內(nèi)部

Topic記錄其offset,而是由消費(fèi)者自己保存offset。這里利用事務(wù)的原子性來實(shí)現(xiàn)“Exactly once”語義,我們將offset和消息處理結(jié)果放在一個(gè)事務(wù)中,事務(wù)執(zhí)行成功則認(rèn)為此消

息被消費(fèi),否則事務(wù)回滾需要重新消費(fèi)。當(dāng)出現(xiàn)消費(fèi)者宕機(jī)重啟或Rebalance操作時(shí),消費(fèi)者可以從關(guān)系型數(shù)據(jù)庫中找到對應(yīng)的offset,然后調(diào)用KafkaConsumer.seek()方法手

動設(shè)置消費(fèi)位置,從此offset處開始繼續(xù)消費(fèi)。

ISRIn-SyncReplica)集合表示的是目前可用alive)且消息量與Leader相差不多的副本集合,這是整個(gè)副本集合的一個(gè)子集。可用相差不多都是很模糊的描述,其實(shí)際

含義是ISR集合中的副本必須滿足下面兩個(gè)條件:

四、conf下的配置文件說明

五、刷盤方式

傳遞保證語義:

At most once:消息可能會丟,但絕不會重復(fù)傳遞。

At least once:消息絕不會丟,但可能會重復(fù)傳遞。

Exactly once: 每條消息只會被傳遞一次。

生產(chǎn)者的“Exactly once”語義方案

每個(gè)分區(qū)只有一個(gè)生產(chǎn)者寫入消息,當(dāng)出現(xiàn)異?;虺瑫r(shí)的情況時(shí),生產(chǎn)者就要查詢此分區(qū)的最后一個(gè)消息,用來決定后續(xù)操作是消息重傳還是繼續(xù)發(fā)送。

為每個(gè)消息添加一個(gè)全局唯一主鍵,生產(chǎn)者不做其他特殊處理,按照之前分析方式進(jìn)行重傳,由消費(fèi)者對消息進(jìn)行去重,實(shí)現(xiàn)“Exactly once”語義。

http://www.risenshineclean.com/news/2134.html

相關(guān)文章:

  • 全球最好的云服務(wù)器搜索引擎優(yōu)化的目的是對用戶友好
  • 廣州做網(wǎng)站多少錢西安百度seo推廣電話
  • 做網(wǎng)站網(wǎng)頁的工作怎么樣自制網(wǎng)站
  • 論壇網(wǎng)站地圖怎么做免費(fèi)seo工具
  • 免費(fèi)建立網(wǎng)站的軟件任務(wù)推廣引流平臺
  • 四川建設(shè)網(wǎng)站電子招標(biāo)網(wǎng)站建設(shè)策劃方案
  • dedecms做地方網(wǎng)站百度推廣電話是多少
  • 深圳建工建設(shè)集團(tuán)有限公司網(wǎng)站seo站群軟件
  • 視頻手機(jī)網(wǎng)站開發(fā)廣告營銷是做什么的
  • 如何做自己的網(wǎng)站后臺網(wǎng)絡(luò)營銷的方式與手段
  • 潘家園網(wǎng)站建設(shè)公司亞馬遜查關(guān)鍵詞排名工具
  • 桂林駿程網(wǎng)站建設(shè)seo優(yōu)化流程
  • 哪個(gè)網(wǎng)站財(cái)經(jīng)做的最好寧波seo資源
  • 移動網(wǎng)站開發(fā)百度百科常州seo收費(fèi)
  • 2017做網(wǎng)站賺錢短視頻平臺推廣方案
  • 重慶seo網(wǎng)站建設(shè)百度站長鏈接提交
  • 手機(jī)有軟件做ppt下載網(wǎng)站王通seo教程
  • 國內(nèi)做日化官方網(wǎng)站怎么推廣軟件
  • 網(wǎng)站開發(fā)自學(xué)還是培訓(xùn)市場營銷計(jì)劃
  • 網(wǎng)站開發(fā)會什么做網(wǎng)絡(luò)營銷推廣
  • sfda的網(wǎng)站的建設(shè)特點(diǎn)免費(fèi)推廣的網(wǎng)站平臺
  • 個(gè)人網(wǎng)站整站源碼下載阜新網(wǎng)絡(luò)推廣
  • 網(wǎng)站制作與維護(hù)公司seo營銷名詞解釋
  • 哪個(gè)網(wǎng)站做演唱會門票網(wǎng)絡(luò)推廣是做什么工作
  • 如何將項(xiàng)目發(fā)布到網(wǎng)上優(yōu)化關(guān)鍵詞首頁排行榜
  • 動態(tài)網(wǎng)站沒有數(shù)據(jù)庫怎么做百度推廣代運(yùn)營公司
  • 做日本暖暖小視頻網(wǎng)站seo服務(wù)內(nèi)容
  • 《網(wǎng)頁設(shè)計(jì)與網(wǎng)站建設(shè)》大作業(yè)要求關(guān)鍵詞愛站網(wǎng)關(guān)鍵詞挖掘工具
  • 想建立什么網(wǎng)站嗎關(guān)鍵詞調(diào)詞平臺哪個(gè)好
  • 網(wǎng)站做二級站全網(wǎng)網(wǎng)絡(luò)營銷