單位門戶網(wǎng)站建設方案google海外版入口
一、MQ與RabbitMQ概述
1. MQ簡述
MQ(Message Queue)消息隊列,是基礎數(shù)據(jù)結構中 “先進先出” 的一種數(shù)據(jù)結構,也是在消息的傳輸過程中保存消息的容器(中間件),多用于分布式系統(tǒng)之間進行通信。
一般MQ用來解決系統(tǒng)耦合、異步消息、流量削峰等問題,實現(xiàn)高性能、高可用、可伸縮和最終一致性架構。(AP架構)
分布式系統(tǒng)有兩種通信方式:直接遠程調用 和 借助第三方(MQ)完成間接通信。(發(fā)送方稱為生產(chǎn)者,接收方稱為消費者)
2. MQ的優(yōu)勢與劣勢
2.1 MQ的優(yōu)勢
MQ的優(yōu)勢:(應用解耦、異步、削峰)
- 應用解耦:提高系統(tǒng)容錯性和可維護性;
- 異步提速:提升用戶體驗和系統(tǒng)吞吐量;
- 削峰填谷:提高系統(tǒng)穩(wěn)定性。
1、應用解耦
2、異步提速
3、削峰填谷
填谷:
使用了MQ之后,限制消費消息的速度為1000,這樣一來,高峰期產(chǎn)生的數(shù)據(jù)勢必會被積壓在MQ中,高峰就被“削”掉了,但是因為消息積壓,在高峰期過后的一段時間內,消費消息的速度還是會維持在1000,直到消費完積壓的消息,這就叫做“填谷”,從而提升系統(tǒng)的穩(wěn)定性。
2.2 MQ的劣勢
引入MQ會遇到下列問題:
- 消息可靠性問題(如何確保發(fā)送的消息至少被消費者消費一次,避免消息丟失問題)
- 延遲消息問題 (如何實現(xiàn)消息的延遲投遞,解決方案:使用延時隊列、TTL、延遲隊列插件實現(xiàn))
- 高可用問題(如何避免單點MQ故障而導致的不可用問題,解決方案:搭建MQ集群)
- 消息堆積問題(如何解決數(shù)百萬消息堆積,無法及時消費的問題)
3. 常見的MQ產(chǎn)品
市面上有很多MQ產(chǎn)品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用Rdis充當消息隊列的場景。在實際技術選型時,需要結合自身需求及MQ產(chǎn)品特征來綜合考慮。
幾種常見MQ的對比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社區(qū) | Rabbit | Apache | 阿里 | Apache |
開發(fā)語言 | Erlang(二郎神,高并發(fā)語言) | Java | Java | Scala&Java |
協(xié)議支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協(xié)議 | 自定義協(xié)議,社區(qū)封裝了http協(xié)議支持 |
可用性 | 高 | 一般 | 高 | 高 |
單機吞吐量 | 一般(萬級) | 差 | 高(十萬級) | 非常高(十萬級) |
消息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
功能特性 | 并發(fā)能力強,性能極其好,延遲低,社區(qū)活躍,管理界面豐富 | 老牌產(chǎn)品,成熟度高,文檔較多 | MQ功能比較完備,擴展性佳 | 只支持主要的MQ功能,畢竟是為大數(shù)據(jù)領域準備的。 |
-
追求可用性(高->低):Kafka、 RocketMQ 、RabbitMQ;
-
追求可靠性:RabbitMQ、RocketMQ;
-
追求吞吐能力:RocketMQ、Kafka;
-
追求消息低延遲:RabbitMQ、Kafka。
4. RabbitMQ簡述
RabbitMQ官網(wǎng)地址:http://www.rabbitmq.com/
RabbitMQ是基于AMQP協(xié)議使用Erlang語言開發(fā)的一款消息隊列產(chǎn)品。
AMQP (全稱Advanced Message Queuing Protocol,表示高級消息隊列協(xié)議),是一個網(wǎng)絡協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。2006年,AMQP規(guī)范發(fā)布。類比HTTP。(同類的協(xié)議還有MQTT用于物聯(lián)網(wǎng)場景下)
RabbitMQ中的一些角色:(AMQP協(xié)議消息中間件類似)
- publisher:生產(chǎn)者;
- consumer:消費者;
- exchange :交換機,負責消息路由;
- queue:隊列,存儲消息;
- virtualHost:虛擬主機,隔離不同租戶的exchange、queue、消息的隔離。
RabbitMQ工作模式:
RabbitMQ提供了6種工作模型,但是我們常用的只有5種:簡單隊列模型、工作隊列模型、發(fā)布訂閱模型(廣播、路由、主題)。(第六種RPC遠程調用不屬于mq)
- 官網(wǎng)對應模式介紹:https://www.rabbitmq.com/getstarted.html
JMS(JavaMessage Service)
- JMS,Java消息服務應用程序接口,即Java操作消息中間件的API;
- JMS是JavaEE規(guī)范的一種,類比JDBC;
- 很多消息中間件都實現(xiàn)了JMS規(guī)范,例如:ActiveMQ。RabbitMQ官方?jīng)]有提供JMS的實現(xiàn)包,但是開源社區(qū)有。
二、RabbitMQ安裝與配置
1. 基于docker快速安裝RabbitMQ
擴展:docker-compose安裝rabbitmq:https://gitee.com/aopmin/docker-compose/blob/master/Linux/RabbitMQ/docker-compose.yml
1、拉取鏡像
docker pull rabbitmq:3.8-management
2、運行容器
docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v /docker/rabbitmq/plugins:/plugins \--name rabbitmq \--hostname my-rabbit \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
- \ 代表換行
- -e 指定環(huán)境變量
- -e RABBITMQ_DEFAULT_USER=admin用戶名
- -e RABBITMQ_DEFAULT_PASS=123456密碼
- -v 掛載目錄或文件 (數(shù)據(jù)卷)
- -p 15672:15672 用于web管理頁面使用的端口 (管理員頁面)
- -p 5672:5672 用于生產(chǎn)和消費端使用的端口(通信端口,也就是在代碼里使用)
- -d 后臺運行
- –name rabbitmq 容器名字
- –hostname my-rabbit(RabbitMQ的一個重要注意事項是它根據(jù)所謂的 “節(jié)點名稱” 存儲數(shù)據(jù),默認為主機名);
3、啟動xxx插件
# 進入容器
docker exec -it rabbitmq /bin/bash# 啟動xxx插件
rabbitmq-plugins enable xxx
RabbitMQ管理端:
管理端訪問地址:http://ip:15672/
2. 創(chuàng)建用戶和虛擬機
1、添加一個新用戶:
添加成功后列表會顯示該用戶,但是這個用戶沒有操作權限,需要為他創(chuàng)建一個虛擬機:
2、創(chuàng)建虛擬機
為指定用戶授權:
最后該用戶就可以操作這個虛擬機了:
三、RabbitMQ快速入門
使用簡單模型中的基本模式完成消息傳遞:
官方的HelloWorld示例是基于簡單消息隊列模型來實現(xiàn)的,其中包括三個角色:
- publisher:消息發(fā)布者,將消息發(fā)送到隊列queue;
- queue:消息隊列,負責接受并緩存消息;
- consumer:訂閱隊列,處理隊列中的消息。
1. 基礎環(huán)境搭建
1、創(chuàng)建父工程mq-demo,并在pom文件中導入如下依賴:
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version><relativePath/>
</parent><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--SpringAMQP依賴,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--單元測試--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>
2、創(chuàng)建子模塊publisher、consumer,并編寫啟動類和yml配置文件:
# 日志輸出格式配置
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
2. publisher消息發(fā)布者實現(xiàn)
消息收發(fā)流程:Connection連接、Channel通道、queue隊列、exchange 交換機。
publisher消息發(fā)布者實現(xiàn)思路:
- 建立連接
- 創(chuàng)建Channel
- 聲明隊列
- 發(fā)送消息
- 關閉連接和channel
1、編寫publisher測試代碼:
package com.baidou.mq.test;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生產(chǎn)者* @author 白豆五* @version 2023/04/27* @since JDK8*/
public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();// 1.1.設置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼factory.setHost("192.168.200.128");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123456");// 1.2.建立連接Connection connection = factory.newConnection();// 2.創(chuàng)建通道ChannelChannel channel = connection.createChannel();// 3.創(chuàng)建隊列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.發(fā)送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("發(fā)送消息成功:【" + message + "】");// 5.關閉通道和連接channel.close();connection.close();}
}
2、在建立連接處打斷點,并以debug方式啟動(方便觀察每個組件的創(chuàng)建)
查看連接信息:
繼續(xù)按F8,查看通道信息:
繼續(xù)按F8,查看隊列信息:
最后直接放行程序,查看隊列中的消息:
3. consumer消費者實現(xiàn)
consumer消費者實現(xiàn)思路:
- 建立連接
- 創(chuàng)建Channel
- 聲明隊列
- 訂閱消息
1、編寫消費者代碼
package com.baidou.mq.test;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消費者* @author 白豆五* @version 2023/04/27* @since JDK8*/
public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();// 1.1.設置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼factory.setHost("192.168.200.128");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123456");// 1.2.建立連接Connection connection = factory.newConnection();// 2.創(chuàng)建通道ChannelChannel channel = connection.createChannel();// 3.創(chuàng)建隊列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.訂閱消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.處理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}
2、測試(啟動程序后會一直執(zhí)行,不用的時候將程序結束即可)
四、SpringAMQP與RabbitMQ工作模型
1. SpringAMQP概述
AMQP是消息中間件收發(fā)消息的協(xié)議(規(guī)范),具體實現(xiàn)由各個消息中間廠商實現(xiàn);(例如 RabbitMQ)
SpringAMQP是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實現(xiàn)了自動裝配,使用起來非常方便。
SpringAMQP的官方地址:https://spring.io/projects/spring-amqp
SpringAMQP提供了三個功能:
- 自動聲明隊列、交換機及其綁定關系;
- 基于注解的監(jiān)聽器模式,異步接收消息;
- 封裝了RabbitTemplate工具,用于發(fā)送消息 。
RabbitMQ工作模型:簡單隊列模型、工作隊列模型、發(fā)布訂閱模型(廣播、路由、主題)。
2. BasicQueue 簡單隊列模型
使用SpringAMQP實現(xiàn)基礎消息隊列功能:
1、在父工程中引入spring-amqp起步依賴:
<!--SpringAMQP依賴,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、消息發(fā)送
2.1、在publisher服務的application.yml中添加rabbitmq配置:
# 配置rabbitmq
spring:rabbitmq:host: 192.168.200.128 # 主機名port: 5672 # 端口virtual-host: / # 虛擬主機username: admin # 用戶名password: 123456 # 密碼# 配置日志格式
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
2.2、在publisher服務中編寫測試類SpringAmqpTest,并利用RabbitTemplate實現(xiàn)消息發(fā)送:
package com.baidou.mq.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** 使用SpringAMQP實現(xiàn)簡單隊列模型的消息發(fā)送** @author 白豆五* @version 2023/04/27* @since JDK8*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {// 操作RabbitMQ的模板類@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 測試簡單隊列模型*/@Testpublic void testSimpleQueue() {// 隊列名稱String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 發(fā)送消息rabbitTemplate.convertAndSend(queueName, message);}
}
3、消息接收
3.1、在consumer服務的application.yml中添加rabbitmq配置:
# 配置rabbitmq
spring:rabbitmq:host: 192.168.200.128 # 主機名port: 5672 # 端口virtual-host: / # 虛擬主機username: admin # 用戶名password: 123456 # 密碼# 配置日志格式
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
3.2、在consumer服務的com.baidou.mq.listener
包中創(chuàng)建SpringRabbitListener類:
package com.baidou.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息監(jiān)聽類** @author 白豆五* @version 2023/04/27* @since JDK8*/
@Component
public class SpringRabbitListener {/*** 訂閱消息** @param msg* @throws InterruptedException*/@RabbitListener(queues = "simple.queue") // 指定監(jiān)聽的隊列名稱為simple.queuepublic void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消費者接收到消息:【" + msg + "】");}
}
4、測試
先啟動consumer服務(啟動類),然后在publisher服務中運行測試代碼,發(fā)送MQ消息。