中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

合肥做網(wǎng)站優(yōu)化哪家好建立網(wǎng)站需要什么條件

合肥做網(wǎng)站優(yōu)化哪家好,建立網(wǎng)站需要什么條件,中鐵建設門戶網(wǎng)登錄咋進不去了,樂山 網(wǎng)站建設一、部署單點RocketMQ Docker 部署 RocketMQ (圖文并茂超詳細)_docker 部署rocketmq-CSDN博客 這個博主講的很好,可食用,替大家實踐了一遍 二、原理篇 為什么使用RocketMQ: 為什么選擇RocketMQ | RocketMQ 關于一些原理,感覺…

一、部署單點RocketMQ

Docker 部署 RocketMQ (圖文并茂超詳細)_docker 部署rocketmq-CSDN博客

這個博主講的很好,可食用,替大家實踐了一遍

二、原理篇

為什么使用RocketMQ:

為什么選擇RocketMQ | RocketMQ

關于一些原理,感覺官網(wǎng)講的也非常透徹

領域模型概述 | RocketMQ

還有一些功能特性:

普通消息 | RocketMQ

本文的實操篇只是講了發(fā)送普通消息

關于中間件對比,下面我之前有看過一些很好的文章:

Kafka、RabbitMQ、RocketMQ等消息中間件的對比_rabbimq rocket 差異-CSDN博客

rpc和zmq性能對比 rpc mq區(qū)別_mob6454cc70642f的技術博客_51CTO博客

RabbitMQ,RocketMQ,Kafka--區(qū)別/對比/選型_51CTO博客_rocketmq rabbitmq kafka選型

三、實操篇

先講講原理:

如果你需要不同業(yè)務,就需要不同消費者組,不要想著同一個消費者組可以通過訂閱不同主題達到不同業(yè)務,因為同一個消費者組內的功能必須是一致的,可以換個角度想,既然你是一個業(yè)務,一個業(yè)務就是一個主題嘛,你用不同的業(yè)務實現(xiàn),就多添加幾個消費者組,分別訂閱那個主題(業(yè)務),然后通過不同的Tag區(qū)分就行了,而且而且,不要想著說:一個消費者組一個主題通過不同Tag來區(qū)分,雖然我在剛剛學習的時候也這樣子想,結果踩了一天的坑,看了好多博客好文來理解,在文末也有關于為什么不能這樣子做。

1、引入依賴

RocketMQ的依賴:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>

demo案例的全部依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.bluefoxyu</groupId><artifactId>RocketMQ-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>3.2.4</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.27</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.16</version></dependency></dependencies></project>

2、啟動自動裝配

2.2.3 版本的RocketMQ 沒有適配 SpringBoot3,只適配SpringBoot2,所以需要自己去配置好自動裝配。可以參考我下面這篇文章:

Springboot3+自動裝配_springboot3自動裝配-CSDN博客

在項目中的 resources 目錄下創(chuàng)建 META-INF/spring 文件夾,并創(chuàng)建下面這個文件。

org.springframework.boot.autoconfigure.AutoConfiguration.imports

# RocketMQ 2.2.3 version does not adapt to SpringBoot3
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

3、配置application.yml

server:port: 8080spring:profiles:active: devrocketmq:name-server: xxx:9876 # NameServer 地址producer:group: rocketmq-v3-demo-sent-message-group_bluefoxyu # 全局發(fā)送者組定義send-message-timeout: 2000# 發(fā)送消息失敗時的重試次數(shù)。設置為 1 表示如果發(fā)送失敗,會再重試一次(總共嘗試兩次)。適用于同步發(fā)送消息失敗時的重試次數(shù)。retry-times-when-send-failed: 1# 異步發(fā)送失敗時的重試次數(shù)。設置為 1 表示在異步發(fā)送失敗時會再嘗試一次。適用于異步發(fā)送消息失敗時的重試次數(shù)。retry-times-when-send-async-failed: 1logging:level:com:bluefoxyu:producer: infoconsumer: infocontroller: info

4、啟動類

相比這個不必多說了。

RocketMQApplication:
package com.bluefoxyu;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@Slf4j
@SpringBootApplication
public class RocketMQApplication {public static void main(String[] args) {SpringApplication.run(RocketMQApplication.class, args);}
}

5、編寫一個統(tǒng)一格式的消息對象

package com.bluefoxyu.message;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serial;
import java.io.Serializable;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GeneralMessageEvent implements Serializable {@Serialprivate static final long serialVersionUID = 1L;private String body;private String keys;}

上述實體類實現(xiàn)了Serializable接口,能夠正常被序列化或者反序列化。

6、生產(chǎn)者

編寫一個生產(chǎn)者,統(tǒng)一做好發(fā)送消息的一個模板,方便簡化接口實現(xiàn)發(fā)送消息的代碼編寫,顯得更加優(yōu)雅一點,說到發(fā)送消息,就需要知道發(fā)送到哪個主題,然后哪些消費者組去消費,然后還有每條消息的唯一標識key,唯一標識可以用uuid生成,也可以用redis生成一個增長的不重復的id,這里使用uuid簡化。

注意:如果你的項目里面只有一個消費者組,只有一個消費業(yè)務,這樣子是不需要傳Tag(過濾標簽)的,但是正常情況都會有多個消息隊列任務,下面提供兩種重載的方法。

code:

package com.bluefoxyu.producer;import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 封裝全體的消息生產(chǎn)者*/
@Slf4j
@Component
@RequiredArgsConstructor
public class GeneralMessageProducer {private final RocketMQTemplate rocketMQTemplate;/*** 發(fā)送普通消息** @param topic            消息發(fā)送主題,用于標識同一類業(yè)務邏輯的消息* @param keys             消息索引鍵,可根據(jù)關鍵字精確查找某條消息* @param messageSendEvent 普通消息發(fā)送事件,自定義對象,最終都會序列化為字符串* @return 消息發(fā)送 RocketMQ 返回結果*/public SendResult sendMessage(String topic, String keys, GeneralMessageEvent messageSendEvent) {SendResult sendResult;try {Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS, keys).build();// 2000L 表示發(fā)送消息的超時時間為 2000 毫秒,即 2 秒sendResult = rocketMQTemplate.syncSend(topic,message,2000L);log.info("[普通消息] 消息發(fā)送結果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);} catch (Throwable ex) {log.error("[普通消息] 消息發(fā)送失敗,消息體:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}/*** 發(fā)送普通消息** @param topic            消息發(fā)送主題,用于標識同一類業(yè)務邏輯的消息* @param tag              消息的過濾標簽,消費者可通過Tag對消息進行過濾,僅接收指定標簽的消息。* @param keys             消息索引鍵,可根據(jù)關鍵字精確查找某條消息* @param messageSendEvent 普通消息發(fā)送事件,自定義對象,最終都會序列化為字符串* @return 消息發(fā)送 RocketMQ 返回結果*/public SendResult sendMessage(String topic, String tag, String keys, GeneralMessageEvent messageSendEvent) {SendResult sendResult;try {// 構建消息的 destination (主題和標簽)StringBuilder destinationBuilder = StrUtil.builder().append(topic);if (StrUtil.isNotBlank(tag)) {destinationBuilder.append(":").append(tag);  // 設置tag}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag) // 設置消息的標簽.build();// 2000L 表示發(fā)送消息的超時時間為 2000 毫秒,即 2 秒sendResult = rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L);log.info("[普通消息] 消息發(fā)送結果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);} catch (Throwable ex) {log.error("[普通消息] 消息發(fā)送失敗,消息體:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}}

7、定義一個constant

package com.bluefoxyu.constant;/*** RocketMQ 常量類* @author bluefoxyu*/
public class RocketMQConstant {/*** Group 消費者組定義*/public static final String GENERAL_MESSAGE_CONSUMER_GROUP = "general_message_consumer_group";public static final String MESSAGE_CONSUMER_GROUP_A = "message_consumer_group_A";public static final String MESSAGE_CONSUMER_GROUP_B = "message_consumer_group_B";public static final String MESSAGE_CONSUMER_GROUP_C = "message_consumer_group_C";/*** Topic 主題定義*/public static final String MESSAGE_TOPIC_1 = "message_topic_1";public static final String MESSAGE_TOPIC_2 = "message_topic_2";/*** Tag 標簽*/public static final String MESSAGE_TAG_A = "message_tag_A";public static final String MESSAGE_TAG_B = "message_tag_B";public static final String MESSAGE_TAG_C = "message_tag_C";}

8、多/單個消費者訂閱一個主題

1.實現(xiàn)消費者

這里需要實現(xiàn)監(jiān)聽的消息的實體類類型是什么,GeneralMessageEvent 是我們之前封裝的統(tǒng)一消息對象

implements RocketMQListener<GeneralMessageEvent>

在onMessage方法中,通過

JSON.toJSONString(message)

就可以拿到解析好的消息內容,也就是我們真正需要發(fā)送的消息,下面我編寫三個消費者來進行消費,不過綁定的都是同一個主題,類似負載均衡的功能,這里只用一個消費者也是一樣的,因為后續(xù)還需要測其他功能,所以這里我寫了三個消費者。

GeneralMessageConsumer1:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer1 implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者GeneralMessageConsumer1] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
GeneralMessageConsumer2:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer2 implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者GeneralMessageConsumer2] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
GeneralMessageConsumer3:
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_1,consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer3 implements RocketMQListener<GeneralMessageEvent> {@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者GeneralMessageConsumer3] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}

2.編寫接口發(fā)送消息

發(fā)送消息需要發(fā)送這四要素:

  1. ?topic 主題?
  2. ?key 唯一標識
  3. message 需要發(fā)送的消息
package com.bluefoxyu.controller;import com.bluefoxyu.producer.GeneralMessageProducer;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;@RestController
@RequiredArgsConstructor
public class controller {private final GeneralMessageProducer generalMessageDemoProduce;@PostMapping("/send/topic1/general-messageA")public String sendTopic1GeneralMessageA() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for A").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);// 返回發(fā)送成功的狀態(tài)名return sendResult.getSendStatus().name();}@PostMapping("/send/topic1/general-messageB")public String sendTopic1GeneralMessageB() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for B").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);return sendResult.getSendStatus().name();}@PostMapping("/send/topic1/general-messageC")public String sendTopic1GeneralMessageC() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for C").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_1,keys,generalMessageEvent);return sendResult.getSendStatus().name();}}

3.接口測試

準備這三個測試接口:

開始分別測試三個接口這里就不一一展示了。

看控制臺:

如結果消費了三次

9、測試多個消費者分別訂閱不同主題

如果相同消費組的三個消費者組分別訂閱不同主題,會怎么樣呢。修改的代碼如下,

當然,哈哈哈哈哈哈,就是消費不到消息(對于小白的我也被困擾了好久),由于是有問題的,代碼就不粘貼了【狗頭】。如下:

這里參考了一篇大佬的文章:

rocketmq問題匯總-一個consumerGroup只對應一個topic_org.apache.rocketmq.client.exception.mqbrokerexcep-CSDN博客

看完后悟了很多,大概意思就是一個消費者組中的職責應該是一致的,應該都去訂閱相同主題的,如果一個消費者訂閱了兩個主題,那么其他同組的消費者也應該訂閱那兩個主題,參考評論區(qū)這幾大佬的評論:

這個大佬就說的很透徹了:

10、一個消費者訂閱多個主題

在上面說了既然一個消費者可以訂閱多個主題,但是前提條件是同一個消費組中必須訂閱相同主題,那應該怎么實現(xiàn)呢。

直接給代碼:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 對應主題* consumerGroup 指定消費的分組* RocketMQPushConsumerLifecycleListener 基礎改監(jiān)聽器可以監(jiān)聽多個主題*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class GeneralMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以訂閱多個主題,在下面prepareStart就已經(jīng)消費了,onMessage就不會執(zhí)行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消費者GeneralMessageConsumer接收到消息,消息體:{}", JSON.toJSONString(message));System.out.println("General message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消費者GeneralMessageConsumer接收到消息,消費完成:消費主題為:{} , 消費的消息為:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消費失敗,異常消息為:{}",e.getMessage());}}}
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 對應主題* consumerGroup 指定消費的分組* RocketMQPushConsumerLifecycleListener 基礎改監(jiān)聽器可以監(jiān)聽多個主題*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class TagAMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以訂閱多個主題,在下面prepareStart就已經(jīng)消費了,onMessage就不會執(zhí)行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消費者TagAMessageConsumer接收到消息,消息體:{}", JSON.toJSONString(message));System.out.println("tagA message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消費者TagAMessageConsumer接收到消息,消費完成:消費主題為:{} , 消費的消息為:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消費失敗,異常消息為:{}",e.getMessage());}}
}
package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import java.util.List;/*** topic 對應主題* consumerGroup 指定消費的分組* RocketMQPushConsumerLifecycleListener 基礎改監(jiān)聽器可以監(jiān)聽多個主題*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "",consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class TagBMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {// 由于修改成可以訂閱多個主題,在下面prepareStart就已經(jīng)消費了,onMessage就不會執(zhí)行了@Overridepublic void onMessage(GeneralMessageEvent message) {/*log.info("消費者TagBMessageConsumer接收到消息,消息體:{}", JSON.toJSONString(message));System.out.println("tagB message = " + message);*/}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {try {consumer.subscribe("rocketmq-demo-message_topic_general", "*");consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {if (CollectionUtils.isNotEmpty(messages)) {for (MessageExt message : messages) {log.info("消費者TagBMessageConsumer接收到消息,消費完成:消費主題為:{} , 消費的消息為:{}" ,message.getTopic() ,new String(message.getBody()));}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});} catch (MQClientException e) {log.error("消費失敗,異常消息為:{}",e.getMessage());}}
}

分別測試三個接口:

參考這位大佬的博客:

rocketmq (消費者消費同一個消費組,不同的topic)_rocketmq一個消費組消費多個topic-CSDN博客

11、多個消費者組訂閱相同主題

這個業(yè)務經(jīng)常是有的,希望訂閱同一種業(yè)務,但是有不同的實現(xiàn),這時候就需要使用Tag過濾標簽來區(qū)分了。

1、實現(xiàn)消費者
MessageConsumerA:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_A,selectorExpression = MESSAGE_TAG_A
)
public class MessageConsumerA implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者MessageConsumerA] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
MessageConsumerB:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_B,selectorExpression = MESSAGE_TAG_B
)
public class MessageConsumerB implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者MessageConsumerB] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}
MessageConsumerC:

package com.bluefoxyu.consumer;import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import static com.bluefoxyu.constant.RocketMQConstant.*;/*** topic 對應主題* consumerGroup 指定消費的分組*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MESSAGE_TOPIC_2,consumerGroup = MESSAGE_CONSUMER_GROUP_C,selectorExpression = MESSAGE_TAG_C
)
public class MessageConsumerC implements RocketMQListener<GeneralMessageEvent>{@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("[消費者MessageConsumerC] 接收到消息,消息體:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));}}

2、編寫接口發(fā)送消息

再controller添加那三個接口

    @PostMapping("/send/topic2/messageA")public String sendTopic2MessageA() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for A").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_A,keys,generalMessageEvent);// 返回發(fā)送成功的狀態(tài)名return sendResult.getSendStatus().name();}@PostMapping("/send/topic2/messageB")public String sendTopic3MessageB() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for B").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_B,keys,generalMessageEvent);return sendResult.getSendStatus().name();}@PostMapping("/send/topic2/messageC")public String sendTopic2MessageC() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("Message for C").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage(MESSAGE_TOPIC_2,MESSAGE_TAG_C,keys,generalMessageEvent);return sendResult.getSendStatus().name();}
3、接口測試

消費成功!

也可以參考一個佬的博客:RocketMQ,同一個topic下是否可以通過不同的tag來進行訂閱嗎?_rocketmq一個topic多個tag-CSDN博客

當然,如果訂閱不同主題也是沒問題的,這里就不作演示了。

四、文末大佬好文

最后加上兩個大佬好文,感覺講的都很好:

面試官:RocketMQ同一個消費組內的消費者訂閱了不同tag,會有問題嗎?_rocketmq 訂閱多個tag-CSDN博客

面試官:RocketMQ一個消費組內訂閱同一個主題不同的TAG為什么會丟消息_為什么rocketmq相同消費組不同tag會有問題-CSDN博客

對于一個消費者組訂閱同一個主題不同tag會丟消息,在前幾天從0到1學習的時候,以為是可以的,但是踩了大坑。

http://www.risenshineclean.com/news/44937.html

相關文章:

  • pc網(wǎng)站建設方案有哪些seo綜合排名優(yōu)化
  • 許昌做網(wǎng)站漢獅網(wǎng)絡網(wǎng)站片區(qū)
  • 公司網(wǎng)站開發(fā)建設什么會計科目今日財經(jīng)最新消息
  • 手機網(wǎng)站建設價格低正規(guī)百度推廣
  • 深圳做網(wǎng)站推廣品牌推廣計劃書怎么寫
  • 綿陽市建設局官方網(wǎng)站軍事新聞俄烏最新消息
  • 網(wǎng)站建設費科目外貿(mào)推廣具體是做什么
  • 手機微信網(wǎng)站怎么做的好淘寶直通車推廣怎么收費
  • 佛山英文網(wǎng)建站國外引流推廣軟件
  • 網(wǎng)站首頁改版模板網(wǎng)站好還是自助建站好
  • 鄭州網(wǎng)站建設品牌好網(wǎng)站seo推廣招聘
  • 家庭網(wǎng)絡做網(wǎng)站網(wǎng)絡優(yōu)化工程師主要負責什么工作
  • 河北省網(wǎng)絡科技網(wǎng)站seo網(wǎng)站頁面優(yōu)化包含
  • 英國有哪些做折扣的網(wǎng)站網(wǎng)絡推廣整合平臺
  • 阿克蘇網(wǎng)站建設服務網(wǎng)站建設網(wǎng)絡公司
  • 網(wǎng)校網(wǎng)站建設方案優(yōu)化設計方法
  • 微網(wǎng)站自己怎么做seo排名優(yōu)化收費
  • 慶陽網(wǎng)站建設百度推廣在哪里
  • 廣州市網(wǎng)站建設公司在哪里企業(yè)郵箱域名
  • 四川政府網(wǎng)站建設管理辦法什么是網(wǎng)絡營銷的核心
  • 團購網(wǎng)站平臺建設什么公司適合做seo優(yōu)化
  • ??谧鼍W(wǎng)站的公司如何做好營銷
  • 新網(wǎng)站前期如何做seo怎么在百度免費推廣
  • 諸暨網(wǎng)站制作哪些公司制作西安網(wǎng)站制作推廣
  • 荊州網(wǎng)站建設seo門戶 site
  • 泰和網(wǎng)站制作網(wǎng)站空間
  • 什么網(wǎng)站建設最簡單騰訊3大外包公司
  • wordpress購買服務器百度seo搜索營銷新視角
  • 專門做app的網(wǎng)站內容營銷策略
  • 銅仁市網(wǎng)站建設情況上海百度推廣電話