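Contents
- Introduction
- Point-to-point (simple)
- Work queues (one-to-many)
- Publish/subscribe (fanout) mode
- Example: login verification code
- pom dependencies
- application.yml file
- RabbitMQ configuration
- Producer generates the verification code and sends it to the exchange
- Consumers consume the verification code
- Topic mode
- Additions to the configuration class
- Producer sends the message
- Sending
- Checking the console
- RabbitMQ callback confirmation
- Configuration
- Verifying that the producer's send succeeded
- Delay queue (dead-letter) design
- Java code steps
- Creating the normal and dead-letter queues
- Configuration and constants
- Producer sends to the normal queue
- Consumer performs delayed consumption
- Installing the delay queue plugin
- Visit the official site
- Enter the rabbitmq Docker container
- Upload to the Linux server
- Copy the plugin into the container
- Enter the container and install the plugin
- Open the management page
- Summary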
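Introduction
1. A survey of the RabbitMQ queue modes: point-to-point and one-to-many;
2. Publish/subscribe mode, from exchange to consumers, using email and SMS verification codes as the example;
3. Topic mode, where binding rules decide which queue receives a message;
4. RabbitMQ callback confirmation with setConfirmCallback and setReturnsCallback;
5. Dead-letter queues and delay queues: how to create them, the normal -> dead-letter flow, and how to set the delay time.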
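Point-to-point (simple)
Messages travel point to point: one producer writes to a queue and one consumer reads from it.
Work queues (one-to-many)
One producer and several consumers share a single queue; each message is delivered to only one of the consumers.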
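With several consumers on one queue, RabbitMQ by default hands messages out in round-robin order. The following is a minimal plain-Java simulation of that dispatch, with no broker involved; the class name `WorkQueueDemo` and its `dispatch` method are made up for illustration and are not part of any RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no broker): one queue, several consumers, round-robin dispatch.
class WorkQueueDemo {

    /** Assigns each message to exactly one consumer, cycling through the consumers in order. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // message i goes to consumer (i mod consumerCount)
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<String>> result = dispatch(List.of("m1", "m2", "m3", "m4", "m5"), 2);
        System.out.println("consumer 0: " + result.get(0)); // consumer 0: [m1, m3, m5]
        System.out.println("consumer 1: " + result.get(1)); // consumer 1: [m2, m4]
    }
}
```

No message appears in both lists, which is the difference from the fanout mode.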
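Publish/subscribe (fanout) mode
The producer broadcasts messages through a fanout exchange; every queue bound to the exchange gets its own copy, so every consumer receives the same message.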
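The broadcast behaviour can be sketched in plain Java without a broker. `FanoutDemo` below is a hypothetical stand-in for a fanout exchange, not Spring AMQP code; the queue names are the ones used later in this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no broker) of a fanout exchange.
class FanoutDemo {
    // queue name -> messages currently held by that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to this exchange. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message into every bound queue; the routing key is ignored. */
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutDemo exchange = new FanoutDemo();
        exchange.bind("mq_email_queue");
        exchange.bind("mq_phone_queue");
        exchange.publish("routingkey.fanout", "123456");
        System.out.println(exchange.messagesIn("mq_email_queue")); // [123456]
        System.out.println(exchange.messagesIn("mq_phone_queue")); // [123456]
    }
}
```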
Example: login verification code
pom dependencies
<!-- QQ mail -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!-- Aliyun SMS verification code SDK -->
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.5.3</version>
</dependency>
<!-- message queue (AMQP) starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml file
server:
  port: 9099
spring:
  # module name
  application:
    name: user-auth
  # mail configuration
  mail:
    host: smtp.qq.com
    port: 587
    username: xxxx
    password: xxxxx
  # RabbitMQ configuration
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123
logging:
  level:
    com.tianju.auth: debug
RabbitMQ configuration
Constants used by the configuration:
package com.tianju.auth.util;

/**
 * RabbitMQ constants
 */
public interface RabbitMqConstants {
    String MQ_MAIL_QUEUE = "mq_email_queue";
    String MQ_PHONE_QUEUE = "mq_phone_queue";
    String MQ_FANOUT_EXCHANGE = "mq_fanout_exchange";

    // Queue constructor parameters: String name, boolean durable, boolean exclusive, boolean autoDelete
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = false;
}
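RabbitMqConfig.java
The class declares the mail queue, the phone queue, and the exchange, then binds both queues to the exchange.
Queue constructor parameters:
Parameter | Description |
---|---|
name | String; the name of the queue. |
durable | Boolean; whether the queue is durable. A durable queue is restored (still exists) after RabbitMQ restarts. Note that a durable queue does not by itself make the messages in it persistent. |
exclusive | Boolean; whether the queue is exclusive. An exclusive queue is visible and usable only to the connection that declared it; other connections cannot see or use it. |
autoDelete | Boolean; whether the queue is deleted automatically once nobody is using it (its last consumer has unsubscribed). |
When durable, exclusive, and autoDelete are not specified, they default to true, false, and false: durable, non-exclusive, not auto-deleted.
Exchange constructor parameters:
Parameter | Description |
---|---|
name | String; the name of the exchange. |
durable | Boolean; whether the exchange is durable. A durable exchange is restored (still exists) after RabbitMQ restarts. |
autoDelete | Boolean; whether the exchange is deleted automatically once no queue is bound to it any more. |
When durable and autoDelete are not specified, they default to true and false: durable, not auto-deleted.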
package com.tianju.auth.config;

import com.tianju.auth.util.RabbitMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Bean // mail queue
    public Queue mailQueue() {
        return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // phone queue
    public Queue phoneQueue() {
        return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // fanout exchange
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }

    @Bean // bind the mail queue to the exchange
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(fanoutExchange());
    }

    @Bean // bind the phone queue to the exchange
    public Binding phoneBinding() {
        return BindingBuilder.bind(phoneQueue()).to(fanoutExchange());
    }
}
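Producer generates the verification code and sends it to the exchange
Interface
package com.tianju.auth.service;

public interface IUserService {
    /**
     * The producer creates a message and sends it to the exchange.
     * @param msg the message; here, the verification code
     */
    void sendCode(String msg);
}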
Implementation
package com.tianju.auth.service.impl;

import com.tianju.auth.service.IUserService;
import com.tianju.auth.util.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class UserServiceImpl implements IUserService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendCode(String msg) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                "routingkey.fanout", // a fanout exchange ignores the routing key
                msg);
        log.debug("[producer -> exchange] sent message: {}", msg);
    }
}
The test class generates a verification code and sends it to the exchange
package com.tianju.auth.service.impl;

import cn.hutool.core.lang.Snowflake;
import com.tianju.auth.service.IUserService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class UserServiceImplTest {

    @Autowired
    private IUserService userService;

    @Test
    public void sendCode() {
        // first six digits of a Snowflake ID, used as the verification code
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendCode(code);
    }
}
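One caveat about the test above: a Snowflake ID is built mostly from a timestamp, so the leading six digits of its decimal form change only slowly, and codes generated close together are likely to be identical. That is fine for a demo but not for a real login code. A minimal alternative sketch using the JDK's `SecureRandom` follows; the class and method names are hypothetical.

```java
import java.security.SecureRandom;

// Sketch of a verification-code generator; class and method names are hypothetical.
class VerificationCodeDemo {
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Returns a zero-padded six-digit code drawn uniformly from 000000 to 999999. */
    static String nextCode() {
        return String.format("%06d", RANDOM.nextInt(1_000_000));
    }

    public static void main(String[] args) {
        System.out.println(nextCode());
    }
}
```

In the test, `userService.sendCode(VerificationCodeDemo.nextCode())` would then replace the Snowflake-based line.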
Consumers consume the verification code
package com.tianju.auth.consumer;

import com.tianju.auth.service.IEmailService;
import com.tianju.auth.util.RabbitMqConstants;
import com.tianju.auth.util.SMSUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class UserConsumer {

    @Autowired
    private IEmailService emailService;

    // listens on the mail queue and sends the code by email
    @RabbitListener(queues = RabbitMqConstants.MQ_MAIL_QUEUE)
    public void emailConsumer(String msg) {
        log.debug("[email consumer] consumed {}", msg);
        emailService.sendEmail("xxxx@qq.com", "Login verification code", msg);
    }

    // listens on the phone queue and sends the code by SMS
    @RabbitListener(queues = RabbitMqConstants.MQ_PHONE_QUEUE)
    public void phoneConsumer(String msg) {
        log.debug("[phone consumer] consumed {}", msg);
        SMSUtil.send("xxxx", msg);
    }
}
Topic mode
For example: routing key my.orange.rabbit -> Q1, Q2
Additions to the configuration class
package com.tianju.auth.util;

/**
 * RabbitMQ constants
 */
public interface RabbitMqConstants {
    String MQ_MAIL_QUEUE = "mq_email_queue";
    String MQ_PHONE_QUEUE = "mq_phone_queue";
    String MQ_FANOUT_EXCHANGE = "mq_fanout_exchange";

    String MQ_TOPIC_EXCHANGE = "mq_topic_exchange";
    String MQ_TOPIC_QUEUE_A = "mq_topic_queue_a";
    String MQ_TOPIC_QUEUE_B = "mq_topic_queue_b";

    // Queue constructor parameters: String name, boolean durable, boolean exclusive, boolean autoDelete
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = false;
}

package com.tianju.auth.config;

import com.tianju.auth.util.RabbitMqConstants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Bean // mail queue
    public Queue mailQueue() {
        return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // phone queue
    public Queue phoneQueue() {
        return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // fanout exchange
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }

    @Bean
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding phoneBinding() {
        return BindingBuilder.bind(phoneQueue()).to(fanoutExchange());
    }

    // ---------------- topic mode configuration ----------------

    @Bean // queue A
    public Queue topicAQueue() {
        return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_A,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // queue B
    public Queue topicBQueue() {
        return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_B,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    @Bean // topic exchange
    public TopicExchange topicMyExchange() {
        return new TopicExchange(RabbitMqConstants.MQ_TOPIC_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }

    @Bean
    public Binding topicAQueueBinding() {
        return BindingBuilder.bind(topicAQueue())
                .to(topicMyExchange())
                .with("topic.xxx"); // pattern topic.xxx
    }

    @Bean
    public Binding topicBQueueBinding() {
        return BindingBuilder.bind(topicBQueue())
                .to(topicMyExchange())
                .with("topic.*"); // pattern topic.*
    }
}
Producer sends the message
/**
 * In topic mode the producer sends a message to the exchange, and the routing key
 * decides which queues receive it.
 * @param msg        the message to send
 * @param routingKey matched against each binding pattern (much like a wildcard expression)
 *                   to decide who receives the message
 *   .with("topic.xxx"); // pattern topic.xxx ---- queue A
 *   .with("topic.*");   // pattern topic.*   ---- queue B
 * With the bindings above, routing key topic.xxx reaches both A and B;
 * routing key topic.yyy reaches only queue B.
 */
void sendMsg(String msg, String routingKey);
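The matching rule can be checked without a broker. The sketch below is a plain-Java reimplementation of topic matching written for this article (`TopicMatchDemo` is a hypothetical name, not a Spring AMQP class): routing keys are split into dot-separated words, `*` matches exactly one word, and `#` matches zero or more words.

```java
// Plain-Java sketch of AMQP topic matching; not the broker's actual implementation.
class TopicMatchDemo {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern used up: the key must be used up too
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1); // '*' or a literal consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        // Bindings from the configuration class: queue A -> "topic.xxx", queue B -> "topic.*"
        for (String key : new String[] {"topic.xxx", "topic.yyy", "abc.xxx"}) {
            System.out.println(key
                    + " -> A: " + matches("topic.xxx", key)
                    + ", B: " + matches("topic.*", key));
        }
    }
}
```

For `topic.xxx` both bindings match, for `topic.yyy` only queue B's pattern matches, and `abc.xxx` matches neither, so that message would not be routed to any queue.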
Implementation
package com.tianju.auth.service.impl;

import com.tianju.auth.service.IUserService;
import com.tianju.auth.util.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class UserServiceImpl implements IUserService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendCode(String msg) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_FANOUT_EXCHANGE,
                "routingkey.fanout",
                msg);
        log.debug("[producer -> exchange] sent message: {}", msg);
    }

    @Override
    public void sendMsg(String msg, String routingKey) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_TOPIC_EXCHANGE,
                routingKey, // with "topic.yyy" only queue B receives the message
                msg);
        log.debug("[producer -> exchange] sent message: {}", msg);
    }
}
Sending
package com.tianju.auth.service.impl;

import cn.hutool.core.lang.Snowflake;
import com.tianju.auth.service.IUserService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class UserServiceImplTest {

    @Autowired
    private IUserService userService;

    @Test
    public void sendCode() {
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendCode(code);
    }

    @Test
    public void sendTopic() {
        String code = new Snowflake().nextIdStr().substring(0, 6);
        System.out.println(code);
        userService.sendMsg(code, "topic.yyy"); // only queue B matches
    }
}
Checking the console
RabbitMQ callback confirmation
Configuration
spring:
  # RabbitMQ configuration
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123
    # publisher confirms and returns
    publisher-confirm-type: correlated
    publisher-returns: true
Verifying that the producer's send succeeded
Use the callback methods of RabbitTemplate.
Set these first:
- setConfirmCallback
- setReturnsCallback
@Autowired
private RabbitTemplate rabbitTemplate;

@Override
public void sendCode(String msg) {
    rabbitTemplate.convertAndSend(
            RabbitMqConstants.MQ_FANOUT_EXCHANGE,
            "routingkey.fanout",
            msg);
    log.debug("[producer -> exchange] sent message: {}", msg);
}

@Override
public void sendMsg(String msg, String routingKey) {
    // Confirm callback: reports whether the message reached the exchange
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        log.debug("***** setConfirmCallback: ack --> {}", ack); // did it reach the exchange?
        log.debug("***** setConfirmCallback: correlationData --> {}", correlationData);
        // On failure the cause looks like:
        // channel error; protocol method: #method<channel.close>(reply-code=404,
        // reply-text=NOT_FOUND - no exchange 'aaaa' in vhost '/', class-id=60, method-id=40)
        log.debug("***** setConfirmCallback: cause --> {}", cause);
        if (ack) {
            log.debug("[producer] message reached exchange {}", RabbitMqConstants.MQ_TOPIC_EXCHANGE);
        } else {
            log.debug(cause);
        }
    });
    // Returns callback: invoked when the exchange cannot route the message to any queue
    rabbitTemplate.setReturnsCallback(r -> {
        log.debug("returned reply text: {}", r.getReplyText());
        log.debug("returned reply code: {}", r.getReplyCode());
        log.debug("returned exchange: {}", r.getExchange());
        log.debug("returned routing key: {}", r.getRoutingKey());
    });
    rabbitTemplate.convertAndSend(
            RabbitMqConstants.MQ_TOPIC_EXCHANGE,
            // "aaaa", // failure case: an exchange that does not exist
            routingKey, // with "topic.yyy" only queue B receives the message
            msg);
    log.debug("[producer -> exchange] sent message: {}", msg);
}
To exercise the returns callback, keep the same two callbacks registered and send with a routing key that matches no binding:
rabbitTemplate.convertAndSend(
        RabbitMqConstants.MQ_TOPIC_EXCHANGE,
        "abc.xxx", // matches neither topic.xxx nor topic.*
        msg);
The test:
@Test
public void sendTopic() {
    String code = new Snowflake().nextIdStr().substring(0, 6);
    System.out.println(code);
    userService.sendMsg(code, "topic.rrr");
}
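Which callback fires in which situation is easy to mix up. The toy model below is plain Java with no broker and no Spring; `ConfirmReturnDemo` is a hypothetical class, and binding keys are matched exactly rather than by topic pattern to keep it short. It only mirrors the behaviour described above: a missing exchange yields a negative confirm, while an existing exchange with no matching queue yields a return (reply code 312, NO_ROUTE) followed by a positive confirm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of publisher confirms and returns; not the Spring AMQP implementation.
class ConfirmReturnDemo {
    // exchange name -> routing keys that have a bound queue (exact match, for simplicity)
    private final Map<String, Set<String>> exchanges = new HashMap<>();

    void declare(String exchange, Set<String> boundKeys) {
        exchanges.put(exchange, boundKeys);
    }

    /** Returns the callbacks a publisher would observe for one send, in order. */
    List<String> publish(String exchange, String routingKey) {
        List<String> callbacks = new ArrayList<>();
        Set<String> boundKeys = exchanges.get(exchange);
        if (boundKeys == null) {
            // the exchange does not exist: the message never arrives, so the confirm is negative
            callbacks.add("confirm ack=false");
            return callbacks;
        }
        if (!boundKeys.contains(routingKey)) {
            // the exchange accepted the message but no queue matched: it is returned
            callbacks.add("returned replyCode=312 replyText=NO_ROUTE");
        }
        // the exchange received the message either way, so the confirm is positive
        callbacks.add("confirm ack=true");
        return callbacks;
    }

    public static void main(String[] args) {
        ConfirmReturnDemo broker = new ConfirmReturnDemo();
        broker.declare("mq_topic_exchange", Set.of("topic.xxx"));
        System.out.println(broker.publish("aaaa", "topic.xxx"));
        System.out.println(broker.publish("mq_topic_exchange", "abc.xxx"));
        System.out.println(broker.publish("mq_topic_exchange", "topic.xxx"));
    }
}
```

A positive confirm therefore only says the exchange received the message; whether a queue received it is what the returns callback reports.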
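Delay queue (dead-letter) design
The normal queue has no consumer. A message waits there until its expiration time passes, is then dead-lettered to the dead-letter exchange, and is consumed from the dead-letter queue; that wait is the delay.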
Documentation: Table of Contents — RabbitMQ
Java code steps
Creating the normal and dead-letter queues
package com.tianju.mq.config;

import com.tianju.mq.constants.RabbitMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfig {

    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(RabbitMqConstants.MQ_NORMAL_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }

    @Bean
    public Queue normalQueue() {
        // expired messages are forwarded to the dead-letter exchange with this routing key
        Map<String, Object> map = new HashMap<>(2);
        map.put("x-dead-letter-exchange", RabbitMqConstants.MQ_DELAY_EXCHANGE);
        map.put("x-dead-letter-routing-key", RabbitMqConstants.MQ_DELAY_ROUTING_KEY);
        return new Queue(RabbitMqConstants.MQ_NORMAL_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete,
                map);
    }

    @Bean
    public Binding normalBinding() {
        return BindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with(RabbitMqConstants.MQ_NORMAL_ROUTING_KEY);
    }

    // ------------------ dead-letter queue design ------------------

    /**
     * Dead-letter (delay) queue
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(RabbitMqConstants.MQ_DELAY_QUEUE,
                RabbitMqConstants.durable,
                RabbitMqConstants.exclusive,
                RabbitMqConstants.autoDelete);
    }

    /**
     * Dead-letter exchange
     */
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(RabbitMqConstants.MQ_DELAY_EXCHANGE,
                RabbitMqConstants.durable,
                RabbitMqConstants.autoDelete);
    }

    /**
     * Binding between the dead-letter exchange and the dead-letter queue
     */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with(RabbitMqConstants.MQ_DELAY_ROUTING_KEY);
    }
}
Configuration and constants
package com.tianju.mq.constants;

public interface RabbitMqConstants {
    String MQ_DELAY_QUEUE = "mq_delay_queue";             // delay queue (dead-letter queue)
    String MQ_DELAY_EXCHANGE = "mq_delay_exchange";       // dead-letter exchange
    String MQ_DELAY_ROUTING_KEY = "mq_delay_routing_key"; // dead-letter routing key

    // normal queue, exchange, and routing key
    String MQ_NORMAL_QUEUE = "mq_normal_queue";
    String MQ_NORMAL_EXCHANGE = "mq_normal_exchange";
    String MQ_NORMAL_ROUTING_KEY = "mq_normal_routing_key";

    // constructor parameters
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = false;
}

server:
  port: 9099
spring:
  # mail configuration
  mail:
    host: smtp.qq.com
    port: 587
    username: xxxx
    password: xxxxx
  # RabbitMQ configuration
  rabbitmq:
    host: 192.168.111.130
    port: 5672
    username: admin
    password: 123
    # publisher confirms and returns
    publisher-confirm-type: correlated
    publisher-returns: true
logging:
  level:
    com.tianju.mq: debug
Producer sends to the normal queue
package com.tianju.mq.service;

public interface IUserService {
    /**
     * Producer for the delay queue.
     * @param msg       the message to send
     * @param delayTime the delay in milliseconds
     */
    void sendDelay(String msg, int delayTime);
}

package com.tianju.mq.service.impl;

import com.tianju.mq.constants.RabbitMqConstants;
import com.tianju.mq.service.IUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class UserServiceImpl implements IUserService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendDelay(String msg, int delayTime) {
        rabbitTemplate.convertAndSend(
                RabbitMqConstants.MQ_NORMAL_EXCHANGE,
                RabbitMqConstants.MQ_NORMAL_ROUTING_KEY,
                msg,
                message -> {
                    // per-message TTL in milliseconds, passed as a string
                    message.getMessageProperties().setExpiration(String.valueOf(delayTime));
                    return message;
                });
        log.debug("[producer] sent message: {}, time: {}, delay: {} s", msg, new Date(), delayTime / 1000);
    }
}
Consumer performs delayed consumption
package com.tianju.mq.consumer;

import com.tianju.mq.constants.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class UserConsumer {

    // listens on the dead-letter queue, so messages arrive only after they expire
    @RabbitListener(queues = RabbitMqConstants.MQ_DELAY_QUEUE)
    public void delayConsume(String msg) {
        log.debug("[consumer] consumed message: {}, time: {}", msg, new Date());
    }
}
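One limitation of this design is worth knowing before moving on to the plugin: with per-message expiration, RabbitMQ dead-letters an expired message only once it reaches the head of the queue. A message with a short delay that sits behind one with a long delay therefore waits for the long one. The plain-Java sketch below (hypothetical class `TtlOrderDemo`, virtual time in milliseconds, no broker) works that out for a queue with no consumers.

```java
import java.util.Arrays;

// Plain-Java model of per-message TTL in a FIFO queue without consumers.
class TtlOrderDemo {

    /**
     * Returns the time at which each message is dead-lettered, assuming an expired
     * message leaves the queue only once it has reached the head.
     * Messages are given in publish order.
     */
    static long[] deadLetterTimes(long[] publishedAtMs, long[] ttlMs) {
        long[] leavesAt = new long[publishedAtMs.length];
        long headFreeAt = 0; // when the previous message left the queue
        for (int i = 0; i < publishedAtMs.length; i++) {
            long ownExpiry = publishedAtMs[i] + ttlMs[i];
            // a message cannot leave before its own expiry, nor before the one ahead of it
            leavesAt[i] = Math.max(ownExpiry, headFreeAt);
            headFreeAt = leavesAt[i];
        }
        return leavesAt;
    }

    public static void main(String[] args) {
        // 20 s message published at t=0, then a 5 s message published at t=1 s
        long[] times = deadLetterTimes(new long[] {0, 1_000}, new long[] {20_000, 5_000});
        System.out.println(Arrays.toString(times)); // [20000, 20000]
    }
}
```

The 5-second message is delivered at 20 s instead of 6 s. The design works as expected when every message uses the same delay; mixed delays are the case the delayed-message plugin addresses.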
Installing the delay queue plugin
Visit the official site
Community Plugins — RabbitMQ
Enter the rabbitmq Docker container
[root@localhost ~]# docker exec -it rabbitmq bash
Check whether the plugin list already contains the delay plugin
root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@6d2342d51b11
 |/
[ ] rabbitmq_amqp1_0 3.9.11
[ ] rabbitmq_auth_backend_cache 3.9.11
[ ] rabbitmq_auth_backend_http 3.9.11
[ ] rabbitmq_auth_backend_ldap 3.9.11
[ ] rabbitmq_auth_backend_oauth2 3.9.11
[ ] rabbitmq_auth_mechanism_ssl 3.9.11
[ ] rabbitmq_consistent_hash_exchange 3.9.11
[ ] rabbitmq_event_exchange 3.9.11
[ ] rabbitmq_federation 3.9.11
[ ] rabbitmq_federation_management 3.9.11
[ ] rabbitmq_jms_topic_exchange 3.9.11
[E*] rabbitmq_management 3.9.11
[e*] rabbitmq_management_agent 3.9.11
[ ] rabbitmq_mqtt 3.9.11
[ ] rabbitmq_peer_discovery_aws 3.9.11
[ ] rabbitmq_peer_discovery_common 3.9.11
[ ] rabbitmq_peer_discovery_consul 3.9.11
[ ] rabbitmq_peer_discovery_etcd 3.9.11
[ ] rabbitmq_peer_discovery_k8s 3.9.11
[E*] rabbitmq_prometheus 3.9.11
[ ] rabbitmq_random_exchange 3.9.11
[ ] rabbitmq_recent_history_exchange 3.9.11
[ ] rabbitmq_sharding 3.9.11
[ ] rabbitmq_shovel 3.9.11
[ ] rabbitmq_shovel_management 3.9.11
[ ] rabbitmq_stomp 3.9.11
[ ] rabbitmq_stream 3.9.11
[ ] rabbitmq_stream_management 3.9.11
[ ] rabbitmq_top 3.9.11
[ ] rabbitmq_tracing 3.9.11
[ ] rabbitmq_trust_store 3.9.11
[e*] rabbitmq_web_dispatch 3.9.11
[ ] rabbitmq_web_mqtt 3.9.11
[ ] rabbitmq_web_mqtt_examples 3.9.11
[ ] rabbitmq_web_stomp 3.9.11
[ ] rabbitmq_web_stomp_examples 3.9.11
Download the plugin build that supports 3.9.x
Exit the container:
root@6d2342d51b11:/plugins# exit
exit
Upload to the Linux server
Create the directory rabbitmq/plugins under /usr/local/software/
[root@localhost software]# mkdir -p rabbitmq/plugins
Copy the plugin into the container
[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
Enter the container and install the plugin
[root@localhost plugins]# docker exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
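Open the management page
Go to the Exchanges page and open the Type drop-down to check that the plugin was installed; x-delayed-message should appear among the types.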
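Summary
1. A survey of the RabbitMQ queue modes: point-to-point and one-to-many;
2. Publish/subscribe mode, from exchange to consumers, using email and SMS verification codes as the example;
3. Topic mode, where binding rules decide which queue receives a message;
4. RabbitMQ callback confirmation with setConfirmCallback and setReturnsCallback;
5. Dead-letter queues and delay queues: how to create them, the normal -> dead-letter flow, and how to set the delay time.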