泉州做網(wǎng)站建設淘寶客怎么做推廣
rabbitmq-spring-boot-start配置使用手冊
文章目錄
- 1.yaml配置如下
- 2.引入pom依賴如下
- 2.1 引入項目resources下libs中的jar包依賴如下
- 2.2引入maven私服依賴如下
- 3.啟動類配置如下
- 4.項目中測試發(fā)送消息如下
- 5.項目中消費消息代碼示例
- 6.mq管理后臺交換機隊列創(chuàng)建及路由綁定關系如下
1.yaml配置如下
?? rps中的每一個都可以按照Sping官方的RabbitAutoConfiguration自動裝配的RabbitProperties的樣式來配置,做到了實現(xiàn)配置多個rabbitMq服務器配置和一個rabbitMq服務器下可以配置多個不同類型的交換機和隊列進行綁定,還實現(xiàn)了普通隊列發(fā)送消息、
? ? 一:延遲插件實現(xiàn)延遲隊列
? ? ? ? 交換機類型必須CustomExchange
? ? 二:TTL + 死信隊列/延遲交換機實現(xiàn)延遲隊列
? ? 三: 延遲交換機 + 消息設置setHeader(“x-delay”, xxx)
? ? 以下配置了兩個不同的rabbitMq服務器,每一個rabbitMq服務器對應多個隊列,還配置了兩個相同的rabbitMq服務器,只不過兩個相同的rabbitMq服務器有不同的交換機,都是一個rabbitMq服務器可以對應相同的一套配置,代碼功能服用性強,也方便快捷
## 配置需要保證唯一不重復(eqps中的每一的index唯一,一般配置成遞增的,隊列交換機綁定關系的bean注入都是根據(jù)rps的List下標+eqps中index下標注入保證了唯一性)
zlf:rabbit:rps:## 如果virtual-host不同,在配置一個即可,addresses不同也是可以在配置,eqps的下標以之對應上即可- rabbitmq:virtual-host: /dyict-uataddresses: 192.168.40.61port: 5672username: "admin"password: "admin"- rabbitmq:virtual-host: /testaddresses: 192.168.40.60port: 5672username: "admin"password: "admin"- rabbitmq:virtual-host: /test2addresses: 192.168.40.60port: 5672username: "admin"password: "admin"eqps:## 下標遞增且唯一- index: 0eqs:- function-type: Delaydelay-type: 1exchange-type: customexchange-name: zlf.delay.test1queue-name: delay.test1routing-key: delay.test1.keyexchange-args:x-delayed-type: directqueue-args: {}- function-type: Normaldelay-type: 0exchange-type: directexchange-name: zlf.normal.test1queue-name: normal.test1routing-key: normal.test1.keyexchange-args: {}queue-args: {}- function-type: Delaydelay-type: 2exchange-type: directexchange-name: zlf.delay.test2queue-name: delay.test2## 不用監(jiān)聽正常的隊列,直接根據(jù)同一個路由鍵去路由,然后監(jiān)聽死信隊列routing-key: zlf.delay-test2-keydlx-exchange-name: zlf.dlx-test1dlx-exchange-type: directdlx-queue-name: dlx-test1dlx-key: zlf.dlx-test1-keyexchange-args: {}queue-args:x-dead-letter-exchange: zlf.dlx-test1x-dead-letter-routing-key: zlf.dlx-test1-key## 單位毫秒 30sx-message-ttl: 30000- function-type: Delaydelay-type: 3exchange-type: directexchange-name: zlf.delay.test3queue-name: delay.test3routing-key: zlf.delay-test3-keyexchange-args: {}queue-args: {}- index: 1eqs:- function-type: Delaydelay-type: 1exchange-type: customexchange-name: zlf.delay.test1queue-name: delay.test1routing-key: delay.test1.keyexchange-args:x-delayed-type: directqueue-args: {}- function-type: Normaldelay-type: 0exchange-type: directexchange-name: zlf.normal.test1queue-name: normal.test1routing-key: normal.test1.keyexchange-args: {}queue-args: {}- function-type: Delaydelay-type: 2exchange-type: directexchange-name: zlf.delay.test2queue-name: delay.test2## 不用監(jiān)聽正常的隊列,直接根據(jù)同一個路由鍵去路由,然后監(jiān)聽死信隊列routing-key: zlf.delay-test2-keydlx-exchange-name: zlf.dlx-test1dlx-exchange-type: directdlx-queue-name: dlx-test1dlx-key: zlf.dlx-test1-keyexchange-args: {}queue-args:x-dead-letter-exchange: zlf.dlx-test1x-dead-letter-routing-key: zlf.dlx-test1-key## 單位毫秒 30sx-message-ttl: 30000- function-type: Delaydelay-type: 3exchange-type: directexchange-name: zlf.delay.test3queue-name: delay.test3routing-key: zlf.delay-test3-keyexchange-args: {}queue-args: {}- index: 2eqs:- function-type: Delaydelay-type: 1exchange-type: customexchange-name: zlf.delay.test1queue-name: delay.test1routing-key: delay.test1.keyexchange-args:x-delayed-type: directqueue-args: {}- function-type: Normaldelay-type: 0exchange-type: directexchange-name: zlf.normal.test1queue-name: normal.test1routing-key: normal.test1.keyexchange-args: {}queue-args: {}- function-type: Delaydelay-type: 2exchange-type: directexchange-name: zlf.delay.test2queue-name: delay.test2## 不用監(jiān)聽正常的隊列,直接根據(jù)同一個路由鍵去路由,然后監(jiān)聽死信隊列routing-key: zlf.delay-test2-keydlx-exchange-name: zlf.dlx-test1dlx-exchange-type: directdlx-queue-name: dlx-test1dlx-key: zlf.dlx-test1-keyexchange-args: {}queue-args:x-dead-letter-exchange: zlf.dlx-test1x-dead-letter-routing-key: zlf.dlx-test1-key## 單位毫秒 30sx-message-ttl: 30000- function-type: Delaydelay-type: 3exchange-type: directexchange-name: zlf.delay.test3queue-name: delay.test3routing-key: zlf.delay-test3-keyexchange-args: {}queue-args: {}
2.引入pom依賴如下
2.1 引入項目resources下libs中的jar包依賴如下
? ? 右鍵點擊rabbitmq-spring-boot-start-1.0-SNAPSHOT.jar將該jar包手動導入(add as Library),復制該jar包到resources下libs,若果maven自動導入就不用右鍵手動導入
<dependency><groupId>org.zlf</groupId><artifactId>rabbitmq-spring-boot-start</artifactId><version>1.0-SNAPSHOT</version><scope>system</scope><systemPath>${pom.basedir}/src/main/resources/libs/rabbitmq-spring-boot-start-1.0-SNAPSHOT.jar</systemPath>
</dependency>
2.2引入maven私服依賴如下
<dependency><groupId>org.zlf</groupId><artifactId>rabbitmq-spring-boot-start</artifactId><version>1.0-SNAPSHOT</version>
</dependency>
3.啟動類配置如下
@EnableZlfRabbitMq
@Import(value = {RabbitService.class, ZlfMqSpringUtils.class})
@SpringBootApplication(exclude = {RabbitAutoConfiguration.class})
4.項目中測試發(fā)送消息如下
? ? Controller測試可以根據(jù)rps的下標 + eqps的index下標來復制多個Controlle類,只需要調用api設置這兩個下標對應解析即可發(fā)送
rabbitService.sendMsg6(0, 0, msg);
rabbitService.sendMsg6(1, 0, msg);
rabbitService.sendMsg6(2, 0, msg);
,,,,,,,,,,
上面配置了三個下標組合就有以下幾種:
0 0 / 01 /11
1 0 / 11 /12
2 0 / 21 / 22
可以復制Controller1、Controller2、Controller3,,,,,,,調用時候只需要指定下標組合即可發(fā)送消息
package xxxx.controller;import com.dy.corporate.member.utils.SpringUtils;
import com.zlf.constants.ZlfMqRegistrarBeanNamePrefix;
import com.zlf.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
@RequestMapping("rabbit")
public class RabbitMqTestController {@Autowiredprivate RabbitService rabbitService;@GetMapping("/sendDelayMsg")public String sendDelayMsg(@RequestParam(value = "msg") String msg) {log.info("sendDelayMsg.msg:{}", msg);RabbitTemplate rabbitTemplate = (RabbitTemplate) SpringUtils.getBean(ZlfMqRegistrarBeanNamePrefix.rabbitTemplatePrefix + 0);rabbitService.sendDelayed(rabbitTemplate, "zlf.delay.test1", "delay.test1.key", msg, 10);return "ok";}@GetMapping("/sendDelayMsg2")public String sendDelayMsg2(@RequestParam(value = "msg") String msg) {log.info("sendDelayMsg2.msg:{}", msg);rabbitService.sendMsg6(0, 0, msg);return "ok";}@GetMapping("/sendNormalMsg")public String sendNormalMsg(@RequestParam(value = "msg") String msg) {log.info("sendNormalMsg.msg:{}", msg);RabbitTemplate rabbitTemplate = (RabbitTemplate) SpringUtils.getBean(ZlfMqRegistrarBeanNamePrefix.rabbitTemplatePrefix + 0);rabbitService.sendMsg(rabbitTemplate, "zlf.delay.test1", "delay.test1.key", msg);return "ok";}@GetMapping("/sendNormalMsg2")public String sendNormalMsg2(@RequestParam(value = "msg") String msg) {log.info("sendNormalMsg2.msg:{}", msg);rabbitService.sendMsg6(0, 1, msg);return "ok";}@GetMapping("/sendDelayDlxMsg")public String sendDelayDlxMsg(@RequestParam(value = "msg") String msg) {log.info("sendDelayDlxMsg.msg:{}", msg);RabbitTemplate rabbitTemplate = (RabbitTemplate) SpringUtils.getBean(ZlfMqRegistrarBeanNamePrefix.rabbitTemplatePrefix + 0);//正常發(fā)延遲交換機和延遲隊列的路由鍵rabbitService.sendDelayed(rabbitTemplate, "zlf.delay.test2", "zlf.delay-test2-key", msg, 10);return "ok";}@GetMapping("/sendDelayDlxMsg2")public String sendDelayDlxMsg2(@RequestParam(value = "msg") String msg) {log.info("sendDelayDlxMsg2.msg:{}", msg);//正常發(fā)延遲交換機和延遲隊列的路由鍵rabbitService.sendDelayed6(0, 2, msg,10);return "ok";}@GetMapping("/sendDelayMsg3")public String sendDelayMsg3(@RequestParam(value = "msg") String msg) {log.info("sendDelayMsg3.msg:{}", msg);rabbitService.sendDelayed6(0, 3, msg, 10);return "ok";}}
項目utils下放入SpringUtils類:
package xxx.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** @author zlf* @description spring上下文工具類* @date 2024/03/11**/
@Component
public class SpringUtils implements ApplicationContextAware {private static final Logger logger = LoggerFactory.getLogger(SpringUtils.class);private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {logger.info("應用程序上下文 : [{}]", "開始初始化");SpringUtils.applicationContext = applicationContext;logger.info("應用程序上下文 : [{}]", "初始化完成");}/*** 獲取applicationContext*發(fā)給* @return*/public static ApplicationContext getApplicationContext() {return applicationContext;}/*** 通過name獲取 Bean.** @param name* @return*/public static Object getBean(String name) {return getApplicationContext().getBean(name);}/*** 通過class獲取Bean.** @param clazz* @param <T>* @return*/public static <T> T getBean(Class<T> clazz) {return getApplicationContext().getBean(clazz);}/*** 通過name,以及Clazz返回指定的Bean** @param name* @param clazz* @param <T>* @return*/public static <T> T getBean(String name, Class<T> clazz) {return getApplicationContext().getBean(name, clazz);}}
以上測試用例MqConsumer都是可以正常消費到對應隊列中的消息的
5.項目中消費消息代碼示例
? ? 消費者中只需要指定對應的消費監(jiān)聽工廠即可,監(jiān)聽工廠配置如下:
ZlfMqRegistrarBeanNamePrefix.simpleRabbitListenerContainerFactory + rps的下標
可以復制多個MqConsumer1,MqConsumer2,,,,,,,,,
? ? 然后指定對應的監(jiān)聽工廠配置下標即可,經(jīng)過測試上面三個配置發(fā)送4中發(fā)送消息,監(jiān)聽消費都是正常的
package xxx.listener;import com.rabbitmq.client.Channel;
import com.zlf.constants.ZlfMqRegistrarBeanNamePrefix;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** 手動ack業(yè)務demo* long deliveryTag = message.getMessageProperties().getDeliveryTag();* try {* int a = 1/0;* User user = JSONObject.parseObject(userStr,User.class);* log.info(user.toString());* //手動ack 第二個參數(shù)為false是表示僅僅確認當前消息 true表示確認之前所有的消息* channel.basicAck(deliveryTag,false);* } catch (Exception e) {* //手動nack 告訴rabbitmq該消息消費失敗 第三個參數(shù):如果被拒絕的消息應該被重新請求,而不是被丟棄或變成死信,則為true* try {* channel.basicNack(deliveryTag,false,false);* } catch (IOException ex) {* throw new RuntimeException("消息處理失敗");* }* }* //channel.basicNack(); 不ack* //channel.basicReject(); 拒絕*/
@Slf4j
@Component
public class MqConsumer {/*** 延遲插件實現(xiàn)延遲隊列監(jiān)聽隊列消息** @param message* @param channel* @throws IOException*/@RabbitHandler@RabbitListener(queues = "delay.test1", containerFactory = ZlfMqRegistrarBeanNamePrefix.simpleRabbitListenerContainerFactory + 0)public void mqConsumer1(Message message, Channel channel) throws IOException {String msg = new String(message.getBody(), "UTF-8");try {log.info("mqConsumer1=====>msg:{}", msg);} catch (Exception e) {log.error("mqConsumer1消費異常:{}", e.getMessage());} finally {channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}//channel.basicNack(); 不ack//channel.basicReject(); 拒絕}/*** 普通隊列監(jiān)聽隊列消息** @param message* @param channel* @throws IOException*/@RabbitHandler@RabbitListener(queues = "normal.test1", containerFactory = ZlfMqRegistrarBeanNamePrefix.simpleRabbitListenerContainerFactory + 0)public void mqConsumer2(Message message, Channel channel) throws IOException {String msg = new String(message.getBody(), "UTF-8");try {log.info("mqConsumer2=====>msg:{}", msg);} catch (Exception e) {log.error("mqConsumer2消費異常:{}", e.getMessage());} finally {channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}}/*** TTL + 死信隊列實現(xiàn)延遲隊列監(jiān)聽延遲隊列消息(此處省略)* 綁定的那個延遲隊列,消息如果正常消費,則不會將消息投遞到死信隊列上,* 只有消息變成死信才會被投遞到死信隊列上*//*** TTL + 死信隊列實現(xiàn)延遲隊列監(jiān)聽死信隊列消息* 成為死信的條件* * 1.隊列消息長度到達限制。* * 2.消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false。* * 3.原隊列存在消息過期設置,消息到達超時時間未被消費。** @param message* @param channel* @throws IOException*/@RabbitHandler@RabbitListener(//監(jiān)聽連接工程指定containerFactory = ZlfMqRegistrarBeanNamePrefix.simpleRabbitListenerContainerFactory + 0,bindings = @QueueBinding(//延遲交換機exchange = @Exchange(value = "zlf.delay.test2",//持久化參數(shù)設置durable = "true",//交換機類型指定type = ExchangeTypes.DIRECT),//延遲交換機路由延遲隊列的keykey = "zlf.delay-test2-key",//死信隊列value = @Queue(value = "dlx-test1",//持久化參數(shù)設置durable = "true"//, //autoDelete = "false",)//ignoreDeclarationExceptions = "true")//,//concurrency = "1", // 指定監(jiān)聽該隊列的消費者個數(shù)//ackMode = "MANUAL"// 手動ack)public void mqConsumer4(Message message, Channel channel) throws IOException {String msg = new String(message.getBody(), "UTF-8");try {log.info("mqConsumer4=====>msg:{}", msg);} catch (Exception e) {log.error("mqConsumer4:{}", e.getMessage());} finally {channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}}/*** 延遲交換機 + 消息設置setHeader("x-delay", xxx)** @param message* @param channel* @throws IOException*/@RabbitHandler@RabbitListener(queues = "delay.test3", containerFactory = ZlfMqRegistrarBeanNamePrefix.simpleRabbitListenerContainerFactory + 0)public void mqConsumer5(Message message, Channel channel) throws IOException {String msg = new String(message.getBody(), "UTF-8");try {log.info("mqConsumer5=====>msg:{}", msg);} catch (Exception e) {log.error("mqConsumer5消費異常:{}", e.getMessage());} finally {channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}//channel.basicNack(); 不ack//channel.basicReject(); 拒絕}}
6.mq管理后臺交換機隊列創(chuàng)建及路由綁定關系如下
? ? 以下交換機和隊列定義的時候都是持久化的,上面三個配置示都在rabbitMq的管理后臺生成了相同的隊列、交換機和綁定關系(唯一不一樣的是錯誤對了是根據(jù)rps的List的下標來的,保證唯一),以下截圖只展示一個即rps[0]的交換機、隊列和綁定關系的創(chuàng)建,其它兩個都是一樣的(唯一不同就是錯誤隊列交換機是后綴角標是遞增的)。
zlf.delay.test1交換機(延遲插件)
zlf.delay.test1交換和delay.test1隊列的綁定關系:
zlf.normal.test1交換機(普通交換機)
zlf.normal.test1交換機和normal.test1的綁定關系:
zlf.delay.test2交換機:(ttl + 死信隊列)
zlf.delay.test2交換機和delay.test2綁定關系:
zlf.dlx-test1交換機:(死信交換機)
zlf.dlx-test1死信交換機和dlx-test1綁定關系:
zlf.delay.test3交換機:
zlf.delay.test3交換機delay.test3的綁定關系:
error交換機
error.direct + 一個下標,類型、路由鍵是固定
error.direct + 一個下標 和 error.queue+ 一個下標 的綁定關系: