曹縣網(wǎng)站開發(fā)抓取關(guān)鍵詞的軟件
1. 項(xiàng)目背景
要啥項(xiàng)目背景,就是干!!!
- SpringBoot版本:2.7.12
2. Rabbit MQ安裝
這里講解使用docker安裝RabbitMQ,如果在windows下面安裝RabbitMQ,參考下文
【笑小楓的按步照搬系列】Windows下安裝RabbitMQ,圖文完整教程
本文演示使用的windows下的mq,下面額外提供下使用docker安裝RabbitMQ。
2.1 docker拉取RabbitMQ鏡像
[root@k8s-n1 /]# docker pull rabbitmq:3.7.7-management
如下圖:
2.2 創(chuàng)建掛載目錄
[root@k8s-n1 /]# mkdir /mnt/rabbitMQ/data
2.3 查看下載鏡像的鏡像id
[root@k8s-n1 /]# docker images
2.4 啟動(dòng)docker里的RabbitMQ鏡像
[root@k8s-n1 /]# docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v /mnt/rabbitMQ/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin 2888deb59dfc
參數(shù)說明:
-d 后臺(tái)運(yùn)行容器;
–name 指定容器名;
-p 指定服務(wù)運(yùn)行的端口(5672:應(yīng)用訪問端口;15672:控制臺(tái)Web端口號);
-v 映射目錄或文件;
–hostname 主機(jī)名(RabbitMQ的一個(gè)重要注意事項(xiàng)是它根據(jù)所謂的 “節(jié)點(diǎn)名稱” 存儲(chǔ)數(shù)據(jù),默認(rèn)為主機(jī)名);
-e 指定環(huán)境變量;(RABBITMQ_DEFAULT_VHOST:默認(rèn)虛擬機(jī)名;RABBITMQ_DEFAULT_USER:默認(rèn)的用戶名;RABBITMQ_DEFAULT_PASS:默認(rèn)用戶名的密碼)
2.5 啟動(dòng)成功
[root@k8s-n1 /]# docker ps
- 查看docker容器:
- 瀏覽器訪問
用瀏覽器訪問http://192.168.2.21:15672
訪問成功,表示RabbitMQ安裝成功。
3. Rabbit MQ基礎(chǔ)概念
3.1 虛擬機(jī)(Virtual Host)
虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對象。
虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。
vhost 是 AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的 vhost 是 / 。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。
3.2 發(fā)布者、消費(fèi)者
- 發(fā)布者(Publisher)
生產(chǎn)消息,并將其推送到broker。
- 消費(fèi)者(Consumer)
消費(fèi)者消費(fèi)消息。
3.3 交換機(jī)(exchange)
- 默認(rèn)交換機(jī)Default Exchange
默認(rèn)交換機(jī)是直接交換機(jī)的一種特殊形式,能夠使得它對簡單的應(yīng)用程序非常有用,即創(chuàng)建的每個(gè)隊(duì)列都會(huì)自動(dòng)綁定到默認(rèn)交換機(jī)上,并使用與隊(duì)列名稱相同的routing key。
常用場景
沒有配置交換機(jī)使用的都是默認(rèn)交換機(jī)
- 直接交換機(jī)Direct Exchange
直接交換機(jī)根據(jù)消息的routing key將消息傳遞到匹配的隊(duì)列。直接交換機(jī)是消息單播路由的理想選擇,但是也可以廣播。
常用場景
- 點(diǎn)對點(diǎn)聊天
- 新聞消息分類(體育、娛樂、社會(huì)等)分發(fā)
- 扇形交換機(jī)Fanout Exchange
扇形交換機(jī)將消息路由到綁定到它的所有隊(duì)列,并忽略routing key。如果N個(gè)隊(duì)列綁定到扇形交換機(jī),則當(dāng)向該交換機(jī)發(fā)布消息時(shí),該消息的副本將傳遞給這N個(gè)隊(duì)列。
常用場景
- 大型多人在線游戲可將扇形交換機(jī)用于排行榜(積分、名次等)更新
- 體育新聞網(wǎng)站可以使用扇形交換機(jī)向移動(dòng)客戶端近乎實(shí)時(shí)地分發(fā)分?jǐn)?shù)更新
- 分布式系統(tǒng)可以利用扇形交換機(jī),廣播各種狀態(tài)和配置的更新
- 群聊可以利用扇形交換機(jī)分發(fā)消息
- 主題交換機(jī)Topic Exchange
主題交換機(jī)會(huì)將消息路由到和其綁定的一個(gè)或者多個(gè)隊(duì)列。主題交換機(jī)通常用于發(fā)布訂閱模式,以及廣播。當(dāng)一個(gè)問題涉及到多個(gè)消費(fèi)者/應(yīng)用程序,它們有選擇地選擇要接收哪種類型的消息時(shí),應(yīng)考慮使用主題交換機(jī)。
主題交換機(jī)對路由鍵進(jìn)行模式匹配后進(jìn)行投遞,符號#表示一個(gè)或多個(gè)詞,*表示一個(gè)詞。因此“abc.#”能夠匹配到“abc.def.ghi”,但是“abc.*” 只會(huì)匹配到“abc.def”。
常用場景
- 多個(gè)工作線程處理后臺(tái)任務(wù),每個(gè)工作線程處理特定的任務(wù)
- 股票價(jià)格更新(以及其他類型財(cái)務(wù)數(shù)據(jù)的更新)
- 頭交換機(jī)Headers Exchange
頭交換機(jī),不處理路由鍵,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配。
3.4 隊(duì)列(Queue)
隊(duì)列存儲(chǔ)消息。發(fā)布者生成的消息都在隊(duì)列中,消費(fèi)者從隊(duì)列中獲取消息進(jìn)行消費(fèi)。
3.5 路由鍵(Routing Key)
路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
4. SpringBoot集成Rabbit MQ
4.1 環(huán)境準(zhǔn)備
SpringBoot創(chuàng)建過程不多介紹,這里就是一個(gè)最簡單的項(xiàng)目,文末提供項(xiàng)目源碼
- 軟件版本
SpringBoot 2.7.12
Erlang 25.2.1
RabbitMQ 3.11.0
- 添加依賴
pom.xml
<!-- 引入MQ依賴 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 添加配置文件
application.yml
server:port: 8080spring:rabbitmq:addresses: 127.0.0.1:5672username: guestpassword: guest# 環(huán)境隔離,默認(rèn)使用“/”(虛擬主機(jī))virtual-host: /connection-timeout: 6000
4.2 單生產(chǎn)者單消費(fèi)者,簡單的小栗子
配置隊(duì)列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Configuration
public class SimpleQueueConfig {/*** 使用默認(rèn)的交換機(jī),進(jìn)行消息發(fā)布消費(fèi)*/@Beanpublic Queue simpleQueue() {return new Queue("simpleQueue");}
}
生產(chǎn)者代碼
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Component
@AllArgsConstructor
@Slf4j
public class SimpleMsgSender {private final AmqpTemplate rabbitTemplate;/*** 直接沒有配置交換機(jī)(exchange),使用默認(rèn)的交換機(jī)*/public void send(String msg) {rabbitTemplate.convertAndSend("simpleQueue", msg);log.info("SimpleMsgSender 發(fā)送消息成功:" + msg);}
}
消費(fèi)者代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Component
@Slf4j
public class SimpleMsgReceiver {/*** 監(jiān)聽simpleQueue隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "simpleQueue")public void simpleMsgHandle(String msg) {log.info("SimpleMsgReceiver消費(fèi)消息: " + msg);}
}
模擬發(fā)送消息調(diào)用
/*** 模擬使用默認(rèn)的交換機(jī),調(diào)用消息發(fā)送*/@GetMapping("/simpleQueueSend")public String simpleQueueSend(String msg) {simpleMsgSender.send(msg);return "發(fā)送成功";}
GET http://localhost:8080/simpleQueueSend?msg=一條簡單MQ測試消息
可以看到RabbitMQ成功產(chǎn)生一條消息,并且被消費(fèi)成功喲。簡單的例子實(shí)現(xiàn)啦,可以愉快的使用MQ了🚀🚀🚀
4.3 多生產(chǎn)者多消費(fèi)者 的小栗子
配置隊(duì)列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Configuration
public class MoreToMoreQueueConfig {/*** 模擬多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者同時(shí)工作*/@Beanpublic Queue moreToMoreQueue() {return new Queue("moreToMoreQueue");}
}
生產(chǎn)者代碼
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Component
@AllArgsConstructor
@Slf4j
public class MoreToMoreSender {private final AmqpTemplate rabbitTemplate;/*** 模擬多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者同時(shí)工作* 生產(chǎn)者1*/public void sendOne(String msg) {rabbitTemplate.convertAndSend("moreToMoreQueue", "MoreToMoreSender.sendOne:" + msg);log.info("MoreToMoreSender sendOne 發(fā)送消息成功:" + msg);}public void sendTwo(String msg) {rabbitTemplate.convertAndSend("moreToMoreQueue", "MoreToMoreSender.sendTwo:" + msg);log.info("MoreToMoreSender sendTwo 發(fā)送消息成功:" + msg);}
}
消費(fèi)者代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Component
@Slf4j
public class MoreToMoreReceiver {/*** 監(jiān)聽moreToMoreQueue隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "moreToMoreQueue")public void moreToMoreHandleOne(String msg) {log.info("moreToMoreHandleOne消費(fèi)消息: " + msg);}/*** 監(jiān)聽moreToMoreQueue隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "moreToMoreQueue")public void moreToMoreHandleTwo(String msg) {log.info("moreToMoreHandleTwo消費(fèi)消息: " + msg);}/*** 監(jiān)聽moreToMoreQueue隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "moreToMoreQueue")public void moreToMoreHandleThree(String msg) {log.info("moreToMoreHandleThree消費(fèi)消息: " + msg);}
}
模擬發(fā)送消息調(diào)用
/*** 模擬多生產(chǎn)者多消費(fèi)者*/@GetMapping("/moreToMoreSend")public String moreToMoreSend(String msg) {moreToMoreSender.sendOne(msg);moreToMoreSender.sendTwo(msg);return "發(fā)送成功";}
失策了,應(yīng)該3個(gè)生產(chǎn)者,2個(gè)消費(fèi)者的,算了不想改了,發(fā)兩次消息吧
GET http://localhost:8080/moreToMoreSend?msg=模擬2個(gè)生產(chǎn)者,3個(gè)消費(fèi)者的MQ測試消息1
GET http://localhost:8080/moreToMoreSend?msg=模擬2個(gè)生產(chǎn)者,3個(gè)消費(fèi)者的MQ測試消息2
可以看到發(fā)送的消息被不同的消費(fèi)者消費(fèi)
4.4 直接交換機(jī) Direct Exchange 的小栗子
看一下演示配置
路由directRoutingKeyA
屬于單點(diǎn)發(fā)送(單播路由)
路由directRoutingKeyB
屬于廣播,綁定B、C兩個(gè)隊(duì)列,分別被B、C隊(duì)列的監(jiān)聽消費(fèi)了(廣播路由)
再看一下交換機(jī)對應(yīng)Routing Key綁定的隊(duì)列關(guān)系。(項(xiàng)目啟動(dòng)時(shí),默認(rèn)會(huì)自動(dòng)創(chuàng)建交換機(jī)和隊(duì)列喲,可以修改配置不自動(dòng)創(chuàng)建)
配置隊(duì)列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Configuration
public class DirectExchangeConfig {/*** 使用直接交換機(jī)發(fā)送消息* directRoutingKeyA 單播路由* directRoutingKeyB 廣播路由*/@Beanpublic Queue directQueueA() {return new Queue("directQueue.A");}@Beanpublic Queue directQueueB() {return new Queue("directQueue.B");}@Beanpublic Queue directQueueC() {return new Queue("directQueue.C");}@Beanpublic DirectExchange directExchange() {return new DirectExchange("directExchange");}@Beanpublic Binding bindingDirectExchangeA(Queue directQueueA, DirectExchange directExchange) {return BindingBuilder.bind(directQueueA).to(directExchange).with("directRoutingKeyA");}@Beanpublic Binding bindingDirectExchangeB(Queue directQueueB, DirectExchange directExchange) {return BindingBuilder.bind(directQueueB).to(directExchange).with("directRoutingKeyB");}@Beanpublic Binding bindingDirectExchangeC(Queue directQueueC, DirectExchange directExchange) {return BindingBuilder.bind(directQueueC).to(directExchange).with("directRoutingKeyB");}
}
生產(chǎn)者代碼
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Component
@AllArgsConstructor
@Slf4j
public class DirectExchangeSender {private final AmqpTemplate rabbitTemplate;public void sendA(String msg) {rabbitTemplate.convertAndSend("directExchange", "directRoutingKeyA", "sendA:" + msg);log.info("DirectExchangeSender sendA 發(fā)送消息成功:" + msg);}public void sendB(String msg) {rabbitTemplate.convertAndSend("directExchange", "directRoutingKeyB", "sendB:" + msg);log.info("DirectExchangeSender sendB 發(fā)送消息成功:" + msg);}
}
消費(fèi)者代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Component
@Slf4j
public class DirectExchangeReceiver {/*** 監(jiān)聽directQueue.A隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "directQueue.A")public void directHandleA(String msg) {log.info("directHandleA消費(fèi)消息: " + msg);}/*** 監(jiān)聽directQueue.B隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "directQueue.B")public void directHandleB(String msg) {log.info("directHandleB消費(fèi)消息: " + msg);}/*** 監(jiān)聽directQueue.C隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "directQueue.C")public void directHandleC(String msg) {log.info("directHandleC消費(fèi)消息: " + msg);}
}
模擬發(fā)送消息調(diào)用
/*** 模擬使用直接交換機(jī)發(fā)送消息*/@GetMapping("/directExchangeSend")public String directExchangeSend(String msg) {directExchangeSender.sendA(msg);directExchangeSender.sendB(msg);return "發(fā)送成功";}
GET http://localhost:8080/directExchangeSend?msg=模擬直接交換機(jī)發(fā)送MQ測試消息
4.5 扇形交換機(jī) Fanout Exchange 的小栗子
看一下演示配置
配置隊(duì)列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Configuration
public class FanoutExchangeConfig {/*** 模擬廣播發(fā)送消息*/@Beanpublic Queue fanoutQueueA() {return new Queue("fanoutQueue.A");}@Beanpublic Queue fanoutQueueB() {return new Queue("fanoutQueue.B");}@Beanpublic Queue fanoutQueueC() {return new Queue("fanoutQueue.C");}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@Beanpublic Binding bindingFanoutExchangeA(Queue fanoutQueueA, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);}@Beanpublic Binding bindingFanoutExchangeB(Queue fanoutQueueB, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);}@Beanpublic Binding bindingFanoutExchangeC(Queue fanoutQueueC, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueueC).to(fanoutExchange);}
}
生產(chǎn)者代碼
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Component
@AllArgsConstructor
@Slf4j
public class FanoutExchangeSender {private final AmqpTemplate rabbitTemplate;/*** 直接沒有配置交換機(jī)(exchange),使用默認(rèn)的交換機(jī)*/public void send(String msg) {rabbitTemplate.convertAndSend("fanoutExchange", "", msg);log.info("FanoutExchangeSender 發(fā)送消息成功:" + msg);}
}
消費(fèi)者代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Component
@Slf4j
public class FanoutExchangeReceiver {/*** 監(jiān)聽simpleQueue隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "fanoutQueue.A")public void fanoutHandleA(String msg) {log.info("fanoutHandleA消費(fèi)消息: " + msg);}/*** 監(jiān)聽simpleQueue隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "fanoutQueue.B")public void fanoutHandleB(String msg) {log.info("fanoutHandleB消費(fèi)消息: " + msg);}/*** 監(jiān)聽simpleQueue隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "fanoutQueue.C")public void fanoutHandleC(String msg) {log.info("fanoutHandleC消費(fèi)消息: " + msg);}
}
模擬發(fā)送消息調(diào)用
@GetMapping("/fanoutExchangeSend")public String fanoutExchangeSend(String msg) {fanoutExchangeSender.send(msg);return "發(fā)送成功";}
GET http://localhost:8080/fanoutExchangeSend?msg=模擬扇形交換機(jī)發(fā)送MQ測試消息
可以看到一個(gè)生產(chǎn)者發(fā)送消息后,3個(gè)消費(fèi)者都消費(fèi)了消息
4.6 主題交換機(jī) Topic Exchange 的小栗子
看一下演示配置
主題交換機(jī)對路由鍵進(jìn)行模式匹配后進(jìn)行投遞,符號#表示一個(gè)或多個(gè)詞,*表示一個(gè)詞。
通過圖,可以預(yù)測一下消息消費(fèi)情況,如下,接下來讓我們一起測試一下吧
topicHandleA 會(huì)消費(fèi)routingKey為:fanoutQueue.A
topicHandleB 會(huì)消費(fèi)routingKey為:fanoutQueue.A、fanoutQueue.B
topicHandleC 會(huì)消費(fèi)routingKey為:fanoutQueue.A、fanoutQueue.B、fanoutQueue.C.1
配置隊(duì)列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Configuration
public class TopicExchangeConfig {/*** 模擬主題交換機(jī)發(fā)送消息*/@Beanpublic Queue topicQueueA() {return new Queue("topicQueue.A");}@Beanpublic Queue topicQueueB() {return new Queue("topicQueue.B");}@Beanpublic Queue topicQueueC() {return new Queue("topicQueue.C.1");}@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topicExchange");}@Beanpublic Binding bindingTopicExchangeA(Queue topicQueueA, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueueA).to(topicExchange).with("topicQueue.A");}@Beanpublic Binding bindingTopicExchangeB(Queue topicQueueB, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueueB).to(topicExchange).with("topicQueue.*");}@Beanpublic Binding bindingTopicExchangeC(Queue topicQueueC, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueueC).to(topicExchange).with("topicQueue.#");}
}
生產(chǎn)者代碼
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Component
@AllArgsConstructor
@Slf4j
public class TopicExchangeSender {private final AmqpTemplate rabbitTemplate;public void sendA(String msg) {rabbitTemplate.convertAndSend("topicExchange", "topicQueue.A", "sendA:" + msg);log.info("TopicExchangeSender sendA 發(fā)送消息成功:" + msg);}public void sendB(String msg) {rabbitTemplate.convertAndSend("topicExchange", "topicQueue.B", "sendB:" + msg);log.info("TopicExchangeSender sendB 發(fā)送消息成功:" + msg);}public void sendC(String msg) {rabbitTemplate.convertAndSend("topicExchange", "topicQueue.C.1", "sendC:" + msg);log.info("TopicExchangeSender sendC 發(fā)送消息成功:" + msg);}
}
消費(fèi)者代碼
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/18*/
@Component
@Slf4j
public class TopicExchangeReceiver {/*** 監(jiān)聽topicQueue.A隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "topicQueue.A")public void topicHandleA(String msg) {log.info("topicHandleA消費(fèi)消息: " + msg);}/*** 監(jiān)聽topicQueue.B隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "topicQueue.B")public void topicHandleB(String msg) {log.info("topicHandleB消費(fèi)消息: " + msg);}/*** 監(jiān)聽topicQueue.C.1隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = "topicQueue.C.1")public void topicHandleC(String msg) {log.info("topicHandleC消費(fèi)消息: " + msg);}
}
模擬發(fā)送消息調(diào)用
@GetMapping("/topicExchangeSend")public String topicExchangeSend(String msg) {topicExchangeSender.sendA(msg);topicExchangeSender.sendB(msg);topicExchangeSender.sendC(msg);return "發(fā)送成功";}
GET http://localhost:8080/topicExchangeSend?msg=模擬主題交換機(jī)發(fā)送MQ測試消息
可以看出,得到的結(jié)果和我們預(yù)測的一致???
5. 接受確認(rèn)機(jī)制 ACK 的小栗子
為了保證消息在消費(fèi)過程中的可靠性,RabbitMQ
引入消息確認(rèn)機(jī)制(ACK(Acknowledge))
,消費(fèi)者在接收到消息并且處理該消息之后,告訴RabbitMQ
它已經(jīng)處理,RabbitMQ
再將該消息刪除。
5.1 消費(fèi)端收到消息后的三種確認(rèn)方式
- auto:根據(jù)偵聽器檢測是正常返回、還是拋出異常來確認(rèn)
- none:當(dāng)消息一旦被消費(fèi)者接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng)消息從 RabbitMQ的消息緩存中移除
- manual:將消息分發(fā)給了消費(fèi)者,并且只有當(dāng)消費(fèi)者處理完成了整個(gè)消息之后才會(huì)被認(rèn)為消息傳遞成功了,然后才會(huì)將內(nèi)存中的消息刪除。
5.2 手動(dòng)確認(rèn),簽收和拒絕的方法
如果消息成功處理,需要調(diào)用channel.basicAck()
方法進(jìn)行簽收:
void basicAck(long deliveryTag, boolean multiple) throws IOException {}
basicAck()
方法需要兩個(gè)參數(shù):
deliveryTag
(唯一標(biāo)識 ID):當(dāng)一個(gè)消費(fèi)者向RabbitMQ
注冊后,會(huì)建立起一個(gè)Channel
,向消費(fèi)者推送消息,這個(gè)方法攜帶了一個(gè)deliveryTag
, 它代表了RabbitMQ
向該Channel
投遞的這條消息的唯一標(biāo)識 ID,是一個(gè)單調(diào)遞增的正整數(shù),deliveryTag
的范圍僅限于當(dāng)前 Channel。multiple
:為了減少網(wǎng)絡(luò)流量,手動(dòng)確認(rèn)可以被批處理,當(dāng)該參數(shù)為true
時(shí),則可以一次性確認(rèn)deliveryTag
小于等于傳入值的所有消息
如果消息處理失敗,調(diào)用channel.basicNack()
方法拒絕簽收:
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {}
basicNack()
方法需要三個(gè)參數(shù):
deliveryTag
:同basicAck
multipl
e:同basicAck
requeue
:重回隊(duì)列。如果設(shè)置為true
,則消息重新回到queue
,服務(wù)端會(huì)重新發(fā)送該消息給消費(fèi)端
5.3 Demo演示
接下來我們演示一下manual
手動(dòng)確認(rèn)應(yīng)該怎么實(shí)現(xiàn)
可以通過application.yml
配置文件和代碼兩種方法進(jìn)行配置,這里主要以代碼的形式講解
spring:rabbitmq:username: guestpassword: guesthost: localhostport: 5672# 消息監(jiān)聽器配置listener:# 消息監(jiān)聽容器類型,默認(rèn) simpletype: simplesimple:# 消息確認(rèn)模式,none、manual和autoacknowledge-mode: manual
創(chuàng)建工廠
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/19*/
@Configuration
public class AckRabbitListenerContainerFactory {@Bean(value = "ackListenerContainerFactory", name = "ackListenerContainerFactory")public SimpleRabbitListenerContainerFactory ackListenerContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(ackConnectionFactory());factory.setConcurrentConsumers(3);factory.setMaxConcurrentConsumers(10);// 設(shè)置消息為手動(dòng)確認(rèn)factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}/*** 獲取rabbitmq連接.** @return 返回rabbitmq連接.*/@Bean(value = "ackConnectionFactory")@ConfigurationProperties(prefix = "spring.rabbitmq")public CachingConnectionFactory ackConnectionFactory() {return new CachingConnectionFactory();}
}
配置隊(duì)列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/19*/
@Configuration
public class AckExchangeConfig {@Beanpublic Queue ackQueue() {return new Queue("ackQueue");}@Beanpublic DirectExchange ackExchange() {return new DirectExchange("ackExchange");}@Beanpublic Binding bindingAckExchange(Queue ackQueue, DirectExchange ackExchange) {return BindingBuilder.bind(ackQueue).to(ackExchange).with("ackRoutingKey");}
}
生產(chǎn)者代碼
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/19*/
@Component
@AllArgsConstructor
@Slf4j
public class AckSender {private final AmqpTemplate rabbitTemplate;public void send(String msg) {rabbitTemplate.convertAndSend("ackExchange", "ackRoutingKey", msg);log.info("AckSender 發(fā)送消息成功:" + msg);}
}
消費(fèi)者代碼
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/19*/
@Component
@Slf4j
public class AckReceiver {/*** 監(jiān)聽ackQueue隊(duì)列的消息,進(jìn)行消費(fèi)*/@RabbitListener(queues = {"ackQueue"}, containerFactory = "ackListenerContainerFactory")public void ackHandle(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {log.info("AckReceiver消費(fèi)消息: " + msg);channel.basicAck(deliveryTag, false);}
}
模擬發(fā)送消息調(diào)用
@GetMapping("/ackSend")public String ackSend(String msg) {ackSender.send(msg);return "發(fā)送成功";}
6. 死信隊(duì)列 的小栗子
6.1 什么是死信隊(duì)列?
在消息隊(duì)列中,執(zhí)行異步任務(wù)時(shí),通常是將消息生產(chǎn)者發(fā)布的消息存儲(chǔ)在隊(duì)列中,由消費(fèi)者從隊(duì)列中獲取并處理這些消息。但是,在某些情況下,消息可能無法正常地被處理和消耗,例如:格式錯(cuò)誤、設(shè)備故障等,這些未成功處理的消息就被稱為“死信”。
為了避免這些未成功處理的消息導(dǎo)致程序異常或?qū)ο到y(tǒng)造成影響,我們需要使用死信隊(duì)列(Dead Letter Queue)。當(dāng)我們設(shè)置死信隊(duì)列后,所有無法成功處理的消息將被捕獲并重定向到指定的死信交換機(jī)中。消費(fèi)者可以從該交換機(jī)中讀取并處理這些“死信”。
6.2 死信隊(duì)列的優(yōu)點(diǎn)
-
提高系統(tǒng)可靠性:避免因未處理的死信而導(dǎo)致程序異常,提高系統(tǒng)的可靠性。
-
實(shí)現(xiàn)延遲消息:可以通過設(shè)置TTL時(shí)間,將超時(shí)未消費(fèi)的消息轉(zhuǎn)移到死信隊(duì)列中,實(shí)現(xiàn)延遲消息的功能。
-
防止濫用:當(dāng)某些生產(chǎn)者惡意發(fā)送低質(zhì)量的消息或進(jìn)行濫用時(shí),可以通過丟棄或重定向死信消息來防止濫用和惡意攻擊。
6.3 死信隊(duì)列的應(yīng)用場景
-
消息格式錯(cuò)誤:當(dāng)消息格式錯(cuò)誤時(shí),可能會(huì)導(dǎo)致消費(fèi)者無法正確地解析或處理該消息,這個(gè)問題通常與生產(chǎn)者的代碼有關(guān)。為了避免消息失效,并提高系統(tǒng)可靠性,我們可以使用死信隊(duì)列。
-
消費(fèi)者故障:另一個(gè)常見的場景是消息處理者無法正確地處理或響應(yīng)到推入到隊(duì)列中的消息,例如消費(fèi)者創(chuàng)建了一個(gè)協(xié)程并在邏輯執(zhí)行完成后未正確地關(guān)閉該協(xié)程。由于該協(xié)程始終處于打開狀態(tài),它將一直阻止該消費(fèi)者對其他消息進(jìn)行正確消費(fèi)。為了避免這種消息掛起并影響其他消息的正常處理,可以將其加入死信中心。
哪些情況的消息會(huì)進(jìn)入死信隊(duì)列
- 消息 TTL 過期
- 隊(duì)列達(dá)到最大長度(隊(duì)列滿了,無法再添加數(shù)據(jù)到 mq 中)
- 消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false
6.4 Demo演示
這里確認(rèn)機(jī)制為auto
的機(jī)制下,消息消費(fèi)異常被拒絕的demo。消息 TTL 過期見延時(shí)隊(duì)列。隊(duì)列達(dá)到最大長度這個(gè)就不演示了😭😭😭
配置隊(duì)列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/19*/
@Configuration
public class DeadLetterQueueConfig {@Beanpublic Queue deadLetterQueue() {return QueueBuilder.durable("deadLetterQueue").withArgument("x-dead-letter-exchange", "dlx-exchange").build();}@Beanpublic Queue normalQueue() {// 聲明該隊(duì)列的死信消息發(fā)送到的 交換機(jī) (隊(duì)列添加了這個(gè)參數(shù)之后會(huì)自動(dòng)與該交換機(jī)綁定,并設(shè)置路由鍵,不需要開發(fā)者手動(dòng)設(shè)置)return QueueBuilder.durable("normalQueue").withArgument("x-message-ttl", 5000).withArgument("x-dead-letter-exchange", "dlx-exchange").build();}@Beanpublic TopicExchange dlxExchange() {return new TopicExchange("dlx-exchange");}@Beanpublic Binding dlxBinding(Queue deadLetterQueue, TopicExchange dlxExchange) {return BindingBuilder.bind(deadLetterQueue).to(dlxExchange).with("#");}
}
生產(chǎn)者代碼
package com.maple.rabbit.sender;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/19*/
@Component
@AllArgsConstructor
@Slf4j
public class DeadLetterSender {private final AmqpTemplate rabbitTemplate;public void send(String msg) {rabbitTemplate.convertAndSend("normalQueue", msg);log.info("DeadLetterSender 發(fā)送消息成功:" + msg);}
}
消費(fèi)者代碼
package com.maple.rabbit.receiver;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/19*/
@Component
@Slf4j
public class DeadLetterReceiver {/*** 監(jiān)聽normalQueue隊(duì)列的消息,進(jìn)行消費(fèi),模擬消費(fèi)拋出異常,讓消息進(jìn)入死信隊(duì)列*/@RabbitListener(queues = "normalQueue")public void normalHandle(String msg) {log.info("DeadLetterReceiver normalHandle 消費(fèi)消息: " + msg);throw new RuntimeException("DeadLetterReceiver normalHandle 消費(fèi)消息異常,測試死信隊(duì)列");}/*** 處理進(jìn)入到死信隊(duì)列的消息*/@RabbitListener(queues = "deadLetterQueue")public void deadLetterHandle(String msg) {log.info("DeadLetterReceiver deadLetterHandle 消費(fèi)消息: " + msg);}
}
模擬發(fā)送消息調(diào)用
@GetMapping("/deadLetterSend")public String deadLetterSend(String msg) {deadLetterSender.send(msg);return "發(fā)送成功";}
GET http://localhost:8080/deadLetterSend?msg=模擬死信隊(duì)列,發(fā)送MQ測試消息
可以看到normalHandle()
先消費(fèi)了消息,但是拋出了異常,進(jìn)入死信隊(duì)列,后續(xù)deadLetterHandle()
死信隊(duì)列有監(jiān)聽消費(fèi)了消息。
7. 延時(shí)隊(duì)列 的小栗子
7.1 什么是延時(shí)隊(duì)列
延時(shí)隊(duì)列
,首先,它是一種隊(duì)列,隊(duì)列意味著內(nèi)部的元素是有序
的,元素出隊(duì)和入隊(duì)是有方向性的,元素從一端進(jìn)入,從另一端取出。
其次,延時(shí)隊(duì)列
,最重要的特性就體現(xiàn)在它的延時(shí)
屬性上,跟普通的隊(duì)列不一樣的是,普通隊(duì)列中的元素總是等著希望被早點(diǎn)取出處理,而延時(shí)隊(duì)列中的元素則是希望被在指定時(shí)間得到取出和處理
,所以延時(shí)隊(duì)列中的元素是都是帶時(shí)間屬性的,通常來說是需要被處理的消息或者任務(wù)。
簡單來說,延時(shí)隊(duì)列就是用來存放需要在指定時(shí)間被處理的元素的隊(duì)列。
7.2 延時(shí)隊(duì)列使用場景
- 訂單在十分鐘之內(nèi)未支付則自動(dòng)取消。
- 新創(chuàng)建的店鋪,如果在十天內(nèi)都沒有上傳過商品,則自動(dòng)發(fā)送消息提醒。
- 賬單在一周內(nèi)未支付,則自動(dòng)結(jié)算。
- 用戶注冊成功后,如果三天內(nèi)沒有登陸則進(jìn)行短信提醒。
- 用戶發(fā)起退款,如果三天內(nèi)沒有得到處理則通知相關(guān)運(yùn)營人員。
- 預(yù)定會(huì)議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議。
7.3 TTL介紹
在介紹延時(shí)隊(duì)列之前,還需要先介紹一下RabbitMQ中的一個(gè)高級特性——TTL(Time To Live)
。
TTL
是什么呢?TTL
是RabbitMQ中一個(gè)消息或者隊(duì)列的屬性,表明一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間
,單位是毫秒。換句話說,如果一條消息設(shè)置了TTL屬性或者進(jìn)入了設(shè)置TTL屬性的隊(duì)列,那么這條消息如果在TTL設(shè)置的時(shí)間內(nèi)沒有被消費(fèi),則會(huì)成為“死信”。如果同時(shí)配置了隊(duì)列的TTL和消息的TTL,那么較小的那個(gè)值將會(huì)被使用。
7.4 Demo延時(shí)
配置隊(duì)列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/19*/
@Configuration
public class DelayQueueConfig {@Beanpublic Queue delayDeadQueue() {return QueueBuilder.durable("delayDeadQueue").deadLetterExchange("delay-exchange").build();}@Beanpublic Queue delayQueue() {// 聲明該隊(duì)列的死信消息發(fā)送到的 交換機(jī) (隊(duì)列添加了這個(gè)參數(shù)之后會(huì)自動(dòng)與該交換機(jī)綁定,并設(shè)置路由鍵,不需要開發(fā)者手動(dòng)設(shè)置)// ttl = 5000,設(shè)置TTL,單位ms,5s 內(nèi)沒被消費(fèi),則進(jìn)入死信隊(duì)列return QueueBuilder.durable("delayQueue").ttl(5000).deadLetterExchange("delay-exchange").build();}@Beanpublic TopicExchange delayExchange() {return new TopicExchange("delay-exchange");}@Beanpublic Binding delayBinding(Queue delayDeadQueue, TopicExchange delayExchange) {return BindingBuilder.bind(delayDeadQueue).to(delayExchange).with("#");}
}
生產(chǎn)者代碼
package com.maple.rabbit.sender;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/19*/
@Component
@AllArgsConstructor
@Slf4j
public class DelaySender {private final AmqpTemplate rabbitTemplate;public void send(String msg) {rabbitTemplate.convertAndSend("delayQueue", msg);log.info("DelaySender 發(fā)送消息成功:" + msg);}
}
消費(fèi)者代碼
package com.maple.rabbit.receiver;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 笑小楓 <https://www.xiaoxiaofeng.com/>* @date 2023/12/19*/
@Component
@Slf4j
public class DelayReceiver {@RabbitListener(queues = "delayDeadQueue")public void delayHandle(String msg) {log.info("DelayReceiver delayHandle 消費(fèi)消息: " + msg);}
}
模擬發(fā)送消息調(diào)用
@GetMapping("/delaySend")public String delaySend(String msg) {delaySender.send(msg);return "發(fā)送成功";}
GET http://localhost:8080/delaySend?msg=模擬延時(shí)隊(duì)列,發(fā)送MQ測試消息
可以看到發(fā)送MQ消息后,5s多一點(diǎn)的時(shí)間(消息發(fā)送,接收,處理都需要耗時(shí),所以會(huì)多一點(diǎn)),死信隊(duì)列消費(fèi)了消息
8. 消息重試機(jī)制
這個(gè)比較簡單,以這個(gè)結(jié)尾吧,看下面配置,找個(gè)消費(fèi)地方拋出異常,既可以看到重試,這里不做演示,累了😭😭😭
server:port: 8080spring:rabbitmq:addresses: 127.0.0.1:5672username: guestpassword: guest# 環(huán)境隔離,默認(rèn)使用“/”(虛擬主機(jī))virtual-host: /connection-timeout: 6000listener:simple:retry:# 是否開啟重試機(jī)制enabled: true# 默認(rèn)是3,是一共三次,而不是重試三次,三次包含了第一執(zhí)行,所以只重試了兩次max-attempts: 3# 重試間隔時(shí)間。毫秒initial-interval: 2000ms#重試最大時(shí)間間隔(單位毫秒)max-interval: 1200000ms#間隔時(shí)間乘子,間隔時(shí)間*乘子=下一次的間隔時(shí)間,最大不能超過設(shè)置的最大間隔時(shí)間multiplier: 2
9. 項(xiàng)目源碼
本文到此就結(jié)束了,如果幫助到你了,幫忙點(diǎn)個(gè)贊👍
本文源碼:https://github.com/hack-feng/maple-product/tree/main/maple-mq-rabbit
🐾我是笑小楓,全網(wǎng)皆可搜的【笑小楓】