用vue.js做網(wǎng)站視頻推廣
一、Consumer 批量消費(fèi)(推模式)
Consumer端先啟動?
Consumer端后啟動. 正常情況下:應(yīng)該是Consumer需要先啟動
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條
package quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,訂閱消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)<br>
* 如果非第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi) ,(消費(fèi)順序消息的時(shí)候設(shè)置)
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
System.out.println("msgs的長度" + msgs.size());
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});由于這里是Consumer先啟動,所以他回去輪詢MQ上是否有訂閱隊(duì)列的消息,由于每次producer插入一條,Consumer就拿一條所以測試結(jié)果如下(每次size都是1)
2、Consumer端后啟動,也就是Producer先啟動
由于這里是Consumer后啟動,所以MQ上也就堆積了一堆數(shù)據(jù),Consumer的
1、Producer端重試
也就是Producer往MQ上發(fā)消息沒有發(fā)送成功,我們可以設(shè)置發(fā)送失敗重試的次數(shù),發(fā)送并觸發(fā)回調(diào)函數(shù)
2、Consumer端重試
2.1、exception的情況,一般重復(fù)16次?10s、30s、1分鐘、2分鐘、3分鐘等等
上面的代碼中消費(fèi)異常的情況返回
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重試
正常則返回:
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
二、消息重試機(jī)制:消息重試分為2種
1、Producer端重試
2、Consumer端重試
consumer.start();
System.out.println("Consumer Started.");
}
}
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條
//設(shè)置重試的次數(shù)
producer.setRetryTimesWhenSendFailed(3);
//開啟生產(chǎn)者
producer.start();
//創(chuàng)建一條消息
Message msg = new Message("PushTopic", "push", "1", "我是一條普通消息".getBytes());
//發(fā)送消息
SendResult result = producer.send(msg);
//發(fā)送,并觸發(fā)回調(diào)函數(shù)
producer.send(msg, new SendCallback() {
@Override
//成功的回調(diào)函數(shù)
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.getSendStatus());
System.out.println("成功了");
}
@Override
//出現(xiàn)異常的回調(diào)函數(shù)
public void onException(Throwable e) {
System.out.println("失敗了"+e.getMessage());
}
});
package quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,訂閱消息
*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)<br>
* 如果非第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// System.out.println("msgs的長度" + msgs.size());
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
for (MessageExt msg : msgs) {
String msgbody = new String(msg.getBody(), "utf-8");
if (msgbody.equals("Hello RocketMQ 4")) {
System.out.println("======錯(cuò)誤=======");
int a = 1 / 0;
}
}
} catch (Exception e) {
e.printStackTrace();
if(msgs.get(0).getReconsumeTimes()==3){
//記錄日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}else{
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
假如超過了多少次之后我們可以讓他不再重試記錄 日志。
if(msgs.get(0).getReconsumeTimes()==3){
//記錄日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
2.2超時(shí)的情況,這種情況MQ會無限制的發(fā)送給消費(fèi)端。
就是由于網(wǎng)絡(luò)的情況,MQ發(fā)送數(shù)據(jù)之后,Consumer端并沒有收到導(dǎo)致超時(shí)。也就是消費(fèi)端沒有給我返回return 任何狀態(tài),這樣的就認(rèn)為沒有到達(dá)Consumer端。
這里模擬Producer只發(fā)送一條數(shù)據(jù)。consumer端暫停1分鐘并且不發(fā)送接收狀態(tài)給MQ
package model;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,訂閱消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");1、集群消費(fèi)
2、廣播消費(fèi)
rocketMQ默認(rèn)是集群消費(fèi),我們可以通過在Consumer來支持廣播消費(fèi)
三、消費(fèi)模式
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 設(shè)置Consumer第一次啟動是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)<br>
* 如果非第一次啟動,那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 表示業(yè)務(wù)處理時(shí)間
System.out.println("=========開始暫停===============");
Thread.sleep(60000);
for (MessageExt msg : msgs) {
System.out.println(" Receive New Messages: " + msg);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費(fèi)
package model;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* Consumer,訂閱消息
*/
public class Consumer2 {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費(fèi)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {異步復(fù)制和同步雙寫主要是主和從的關(guān)系。消息需要實(shí)時(shí)消費(fèi)的,就需要采用主從模式部署
異步復(fù)制:比如這里有一主一從,我們發(fā)送一條消息到主節(jié)點(diǎn)之后,這樣消息就算從producer端發(fā)送成功了,然后通過異步復(fù)制的方法將數(shù)據(jù)復(fù)制到從節(jié)點(diǎn)
同步雙寫:比如這里有一主一從,我們發(fā)送一條消息到主節(jié)點(diǎn)之后,這樣消息就并不算從producer端發(fā)送成功了,需要通過同步雙寫的方法將數(shù)據(jù)同步到從節(jié)點(diǎn)后, 才算數(shù)據(jù)發(fā)
送成功。
如果rocketMq才用雙master部署,Producer往MQ上寫入20條數(shù)據(jù) 其中Master1中拉取了12條 。Master2中拉取了8 條,這種情況下,Master1宕機(jī),那么我們消費(fèi)數(shù)據(jù)的時(shí)
候,只能消費(fèi)到Master2中的8條,Master1中的12條默認(rèn)持久化,不會丟失消息,需要Master1恢復(fù)之后這12條數(shù)據(jù)才能繼續(xù)被消費(fèi),如果想保證消息實(shí)時(shí)消費(fèi),就才用雙
Master雙Slave的模式
同步刷盤:在消息到達(dá)MQ后,RocketMQ需要將數(shù)據(jù)持久化,同步刷盤是指數(shù)據(jù)到達(dá)內(nèi)存之后,必須刷到commitlog日志之后才算成功,然后返回producer數(shù)據(jù)已經(jīng)發(fā)送成功。
異步刷盤:,同步刷盤是指數(shù)據(jù)到達(dá)內(nèi)存之后,返回producer說數(shù)據(jù)已經(jīng)發(fā)送成功。,然后再寫入commitlog日志。
commitlog:
commitlog就是來存儲所有的元信息,包含消息體,類似于MySQL、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使數(shù)據(jù)丟失,仍然可以恢復(fù)出來。
consumequeue:記錄數(shù)據(jù)的位置,以便Consume快速通過consumequeue找到commitlog中的數(shù)據(jù)
當(dāng)生產(chǎn)者向Kafka發(fā)送消息,且正常得到響應(yīng)的時(shí)候,可以確保生產(chǎn)者不會產(chǎn)生重復(fù)的消息。但是,如果生產(chǎn)者發(fā)送消息后,遇到網(wǎng)絡(luò)問題,無法獲取響應(yīng),生產(chǎn)者就無法判斷該
消息是否成功提交給了Kafka。根據(jù)生產(chǎn)者的機(jī)制,我們知道,當(dāng)出現(xiàn)異常時(shí),會進(jìn)行消息重傳,這就可能出現(xiàn)“At least one”語義。為了實(shí)現(xiàn)“Exactly once”語義,這里提供兩個(gè)
可選方案:
如果業(yè)務(wù)數(shù)據(jù)產(chǎn)生消息可以找到合適的字段作為主鍵,或是有一個(gè)全局ID生成器,可以優(yōu)先考慮選用第二種方案。
為了實(shí)現(xiàn)消費(fèi)者的“Exactly once”語義,在這里提供一種方案,供讀者參考:消費(fèi)者將關(guān)閉自動提交offset的功能且不再手動提交offset,這樣就不使用Offsets Topic這個(gè)內(nèi)部
Topic記錄其offset,而是由消費(fèi)者自己保存offset。這里利用事務(wù)的原子性來實(shí)現(xiàn)“Exactly once”語義,我們將offset和消息處理結(jié)果放在一個(gè)事務(wù)中,事務(wù)執(zhí)行成功則認(rèn)為此消
息被消費(fèi),否則事務(wù)回滾需要重新消費(fèi)。當(dāng)出現(xiàn)消費(fèi)者宕機(jī)重啟或Rebalance操作時(shí),消費(fèi)者可以從關(guān)系型數(shù)據(jù)庫中找到對應(yīng)的offset,然后調(diào)用KafkaConsumer.seek()方法手
動設(shè)置消費(fèi)位置,從此offset處開始繼續(xù)消費(fèi)。
ISR(In-SyncReplica)集合表示的是目前“可用”(alive)且消息量與Leader相差不多的副本集合,這是整個(gè)副本集合的一個(gè)子集。“可用”和“相差不多”都是很模糊的描述,其實(shí)際
含義是ISR集合中的副本必須滿足下面兩個(gè)條件:
四、conf下的配置文件說明
五、刷盤方式
傳遞保證語義:
At most once:消息可能會丟,但絕不會重復(fù)傳遞。
At least once:消息絕不會丟,但可能會重復(fù)傳遞。
Exactly once: 每條消息只會被傳遞一次。
生產(chǎn)者的“Exactly once”語義方案
每個(gè)分區(qū)只有一個(gè)生產(chǎn)者寫入消息,當(dāng)出現(xiàn)異?;虺瑫r(shí)的情況時(shí),生產(chǎn)者就要查詢此分區(qū)的最后一個(gè)消息,用來決定后續(xù)操作是消息重傳還是繼續(xù)發(fā)送。
為每個(gè)消息添加一個(gè)全局唯一主鍵,生產(chǎn)者不做其他特殊處理,按照之前分析方式進(jìn)行重傳,由消費(fèi)者對消息進(jìn)行去重,實(shí)現(xiàn)“Exactly once”語義。