怎么給網(wǎng)站做php后臺(tái)百度文庫賬號(hào)登錄入口
目錄
- 一、簡介
- 1.1、消費(fèi)模式
- 二、消費(fèi)者
- 2.1、maven依賴
- 2.2、application配置
- 2.3、消費(fèi)監(jiān)聽
- 三、生產(chǎn)者
- 3.1、發(fā)送消息
- 3.2、運(yùn)行結(jié)果
- 四、其他
一、簡介
??在之前的文章中,我們講過了,同步發(fā)送單條消息,異步發(fā)送單條消息,發(fā)送單向消息,發(fā)送順序消息,批量發(fā)送消息,事務(wù)消息,我們使用的模式都是 集群消費(fèi)模式(Cluster),本文就來講另外一種消息消費(fèi)模式,也就是廣播消費(fèi)模式(Broadcast)
1.1、消費(fèi)模式
??在 Apache RocketMQ 中,實(shí)現(xiàn)消息消費(fèi)的方式主要是兩種:
-
集群消費(fèi)模式(Cluster):
在集群消費(fèi)模式下,同一個(gè)消費(fèi)者組(Consumer Group)中的每個(gè)消費(fèi)者都會(huì)消費(fèi)消息的一個(gè)副本。消息會(huì)被分發(fā)到不同的消費(fèi)者實(shí)例上,但是同一個(gè)消息只會(huì)被同一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。 -
廣播消費(fèi)模式(Broadcast):
在廣播消費(fèi)模式下,同一個(gè)消費(fèi)者組中的每個(gè)消費(fèi)者都會(huì)收到消息的一個(gè)副本,即每個(gè)消費(fèi)者都會(huì)獨(dú)立地消費(fèi)消息。消息會(huì)被廣播到同一個(gè)消費(fèi)者組中的所有消費(fèi)者實(shí)例上。
??那么怎么使用廣播消費(fèi)模式呢?其實(shí)很簡單,通過在消費(fèi)者的 @RocketMQMessageListener 注解中設(shè)置 messageModel 參數(shù)為 MessageModel.BROADCASTING,即可將消費(fèi)者設(shè)置為廣播模式。在廣播模式下,同一個(gè)消費(fèi)者組中的每個(gè)消費(fèi)者都會(huì)收到消息的一個(gè)副本,每個(gè)消費(fèi)者都會(huì)獨(dú)立地消費(fèi)消息,從而實(shí)現(xiàn)了消息的廣播消費(fèi)。接下里看看具體操作吧。
二、消費(fèi)者
2.1、maven依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>11-broadcasting-message-one</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties></project>
2.2、application配置
application.properties
server.port=8011# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默認(rèn)的消費(fèi)者組
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 批量拉取消息的數(shù)量
rocketmq.consumer.pull-batch-size=10
# 廣播消費(fèi)模式
rocketmq.consumer.message-model=BROADCASTING
??實(shí)際上對(duì)于本文來說,下面兩個(gè)配置不用配置,也不會(huì)生效。
# 默認(rèn)的消費(fèi)者組
rocketmq.consumer.group=BROADCASTING_CONSUMER_GROUP
# 廣播消費(fèi)模式
rocketmq.consumer.message-model=BROADCASTING
??因?yàn)閮?yōu)先的是@RocketMQMessageListener 注解中設(shè)置 consumerGroup 和messageModel 參數(shù)。
2.3、消費(fèi)監(jiān)聽
??@RocketMQMessageListener是RocketMQ提供的注解,用于配置消費(fèi)者監(jiān)聽器的相關(guān)屬性。
package com.alian.broadcasting;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = "broadcasting_string_message_topic",consumerGroup = "BROADCASTING_CONSUMER_GROUP",messageModel = MessageModel.BROADCASTING)
public class StringMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("第一個(gè)消費(fèi)者接收到的字符串消息: {}", message);// 處理消息的業(yè)務(wù)邏輯}
}
??關(guān)于這里@RocketMQMessageListener的參數(shù)做個(gè)簡單解釋:
- topic:必填,指定該消費(fèi)者訂閱的Topic名稱
- consumerGroup:必填,指定該消費(fèi)者所屬的消費(fèi)者組名稱,同一個(gè)組內(nèi)的消費(fèi)者實(shí)例通常進(jìn)行負(fù)載均衡消費(fèi)
- messageModel:設(shè)置消費(fèi)模式,取值范圍CLUSTERING(集群消費(fèi))、BROADCASTING(廣播消費(fèi))
MessageModel.java
public enum MessageModel {BROADCASTING("BROADCASTING"),CLUSTERING("CLUSTERING");private final String modeCN;MessageModel(String modeCN) {this.modeCN = modeCN;}public String getModeCN() {return this.modeCN;}
}
三、生產(chǎn)者
??生產(chǎn)者我就復(fù)用前面批量消息發(fā)送的模塊了
3.1、發(fā)送消息
@Slf4j
@SpringBootTest
public class SendBatchedBroadcastingMessageTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void syncSendStringMessagesWithBuilder() {String topic = "broadcasting_string_message_topic";for (int i = 0; i < 10; i++) {String message = "廣播消息:" + i;Message<String> rocketMessage = MessageBuilder.withPayload(message).build();rocketMQTemplate.convertAndSend(topic, rocketMessage);}}@Testpublic void syncSendBatchStringMessagesWithBuilder() {String topic = "string_message_topic";String message = "批量廣播消息:";List<Message<String>> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message<String> rocketMessage = MessageBuilder.withPayload(message + i)// 設(shè)置消息類型.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain").build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSend發(fā)送批量消息SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);log.info("批量消息發(fā)送結(jié)果:{}",sendResult);}@AfterEachpublic void waiting() {try {Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}}}
我們先啟動(dòng)消費(fèi)者,然后生產(chǎn)者發(fā)送消息。
3.2、運(yùn)行結(jié)果
運(yùn)行結(jié)果:
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:1
[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:0
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:3
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:2
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:9
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:0
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:2
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:4
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:5
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:3
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:1
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:6
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:9
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第一個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:8[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:0
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:1
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:2
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:3
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 廣播消息:9
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:4
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:6
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:2
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:3
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:8
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:1
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:0
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:9
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第二個(gè)消費(fèi)者接收到的字符串消息: 批量廣播消息:5
四、其他
??RocketMQ 通過消費(fèi)者組(Consumer Group)來維護(hù)不同消費(fèi)者的消費(fèi)進(jìn)度。每個(gè)消費(fèi)者組都有一個(gè)消費(fèi)進(jìn)度(offset),用于標(biāo)記該組下的消費(fèi)者在某個(gè)主題(Topic)和隊(duì)列(Queue)上已經(jīng)消費(fèi)到的位置。所以:不同的消費(fèi)者組會(huì)被視為不同的消費(fèi)者;如果消費(fèi)者重啟或重新加入組,就能從對(duì)應(yīng)Queue的offset處繼續(xù)消費(fèi)。
??不過使用廣播消費(fèi)模式時(shí),Consumer Group 的概念基本上沒有作用,因?yàn)槊總€(gè)消費(fèi)者實(shí)例都會(huì)獨(dú)立地收到消息的一個(gè)副本。在廣播模式下,同一個(gè)消費(fèi)者組中的每個(gè)消費(fèi)者都會(huì)收到消息的一個(gè)副本,每個(gè)消費(fèi)者都會(huì)獨(dú)立地消費(fèi)消息,而不像集群消費(fèi)模式中那樣,一個(gè)消費(fèi)者組中的消費(fèi)者會(huì)共同消費(fèi)消息。
??廣播消費(fèi)模式在RocketMQ中最好的好處就是消費(fèi)者解耦:不同的消費(fèi)者可以獨(dú)立消費(fèi)消息,相互之間不受影響,提高了系統(tǒng)的擴(kuò)展性,它的適用場景有:
- 日志收集 - 需要將日志數(shù)據(jù)分發(fā)給多個(gè)日志收集系統(tǒng),每個(gè)系統(tǒng)都需要收到全量日志。
- 數(shù)據(jù)備份 - 實(shí)時(shí)備份數(shù)據(jù)到多個(gè)存儲(chǔ)系統(tǒng),確保數(shù)據(jù)有冗余副本。
- 信息推送 - 向多個(gè)推送通道投遞并發(fā)送消息通知,如站內(nèi)信、短信、Push等。
- 狀態(tài)同步 - 將數(shù)據(jù)變更實(shí)時(shí)同步到集群的所有節(jié)點(diǎn),保證集群節(jié)點(diǎn)狀態(tài)一致。
- 負(fù)載均衡 - 將任務(wù)或請(qǐng)求廣播給所有服務(wù)實(shí)例,由每個(gè)實(shí)例獨(dú)立處理,實(shí)現(xiàn)負(fù)載分擔(dān)。
- 監(jiān)控告警 - 將系統(tǒng)監(jiān)控?cái)?shù)據(jù)廣播給多個(gè)監(jiān)控系統(tǒng),多視角分析。