dede怎么設(shè)置wap網(wǎng)站云客網(wǎng)平臺
1. mq簡介
消息隊列是分布式系統中的異步通信中間件,采用"生產者-消費者"模型實現服務間解耦通信。
核心作用
- 服務解耦
- 異步處理
- 流量削峰
- 數據同步
- 最終一致性
消息隊列模式
- 發布/訂閱模式:一對多廣播
- 工作隊列模式:競爭消費
- 死信隊列:處理失敗消息
- 延遲隊列:定時任務處理
- 消息回溯:Kafka按offset重新消費
2. mq入門
使用SpringAMQP實現HelloWorld中的基礎消息隊列功能:一個生產者,一個隊列,一個消費者。
2.1 啟動mq
打開mq下載目錄,輸入命令(rabbitmq-server start)啟動。
通過網址localhost:15672訪問,賬號密碼均為guest。
2.2 導入依賴
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Parent (aggregator) POM for the clouddemo tutorial project.
  Declares the publisher and consumer modules and the dependencies they share.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gaohe</groupId>
    <artifactId>clouddemo</artifactId>
    <packaging>pom</packaging>
    <version>0.0.1-SNAPSHOT</version>

    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>

    <name>clouddemo</name>
    <description>clouddemo</description>

    <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>3.3.3</spring-boot.version>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>

    <!-- Each dependency is declared exactly once; versions come from the imported Spring Boot BOM below. -->
    <dependencies>
        <!-- AMQP starter (includes the RabbitMQ client) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- Jackson, used by the JSON message converter -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!-- Spring Boot BOM: supplies the versions for the dependencies above -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.gaohe.clouddemo.ClouddemoApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2.3 在yml配置文件中配置連接信息
# RabbitMQ connection settings read by Spring Boot's AMQP auto-configuration.
spring:
  rabbitmq:
    host: localhost    # broker host name
    port: 5672         # AMQP port
    virtual-host: /    # virtual host
    username: guest    # user name
    password: guest    # password
2.4 在publisher中利用RabbitTemplate發送信息到hello.queue隊列
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Publisher-side test that sends a plain text message to {@code hello.queue}.
 *
 * <p>Runs inside the Spring context of {@code PublisherApplication}, so the
 * {@link RabbitTemplate} is built from the application's RabbitMQ settings.
 */
@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {

    @Autowired
    public RabbitTemplate rabbitTemplate;

    /**
     * Sends one message using the two-argument {@code convertAndSend}, whose
     * first argument is the routing key; here it is the queue name.
     */
    @Test
    public void test1() {
        // 1. Queue to send to
        String queueName1 = "hello.queue";
        // 2. Message payload
        String msg = "你好我喲一個帽衫";
        // 3. Send
        rabbitTemplate.convertAndSend(queueName1, msg);
    }
}
2.5 在consumer服務中編寫消費邏輯,綁定hello.queue這個隊列
package com.gaohe.consumer.lisenner;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer-side listeners for {@code hello.queue}.
 *
 * <p>Both methods are registered on the same queue; each prints the received
 * text prefixed with its own method name.
 */
@Component
public class HelloLisenner {

    /**
     * First listener on {@code hello.queue}.
     *
     * @param msg message body as text
     */
    @RabbitListener(queues = "hello.queue")
    public void helloQueueLisenner(String msg) {
        final String line = "helloQueueLisenner" + msg;
        System.out.println(line);
    }

    /**
     * Second listener on {@code hello.queue}.
     *
     * @param msg message body as text
     */
    @RabbitListener(queues = "hello.queue")
    public void helloQueueLisenner2(String msg) {
        final String line = "helloQueueLisenner2" + msg;
        System.out.println(line);
    }
}
3.交換機(jī)
Exchange是消息隊列系統中的消息路由中樞,負責接收生產者發送的消息并根據特定規則將消息路由到一個或多個隊列中。
常見exchange類型包括:
- Fanout:廣播
- Direct:路由
- Topic:話題
3.1 廣播交換機(FanoutExchange)
- 在consumer服務創建一個類,添加注解,聲明交換機,隊列以及綁定關系對象
package com.gaohe.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Declares the {@code itgaohe.fanout} exchange, the two queues
 * {@code fanout.queue1} and {@code fanout.queue2}, and the bindings that attach
 * each queue to the exchange.
 */
@Component
public class FanoutConfig {

    /** Fanout exchange named {@code itgaohe.fanout}. */
    @Bean
    public FanoutExchange fanout1() {
        return new FanoutExchange("itgaohe.fanout");
    }

    /** First queue, {@code fanout.queue1}. */
    @Bean
    public Queue queue1() {
        return new Queue("fanout.queue1");
    }

    /**
     * Binds {@code fanout.queue1} to the fanout exchange.
     *
     * @param fanout1 the exchange bean declared above
     */
    @Bean
    public Binding binding1(FanoutExchange fanout1) {
        return BindingBuilder.bind(queue1()).to(fanout1);
    }

    /** Second queue, {@code fanout.queue2}. */
    @Bean
    public Queue queue2() {
        return new Queue("fanout.queue2");
    }

    /**
     * Binds {@code fanout.queue2} to the fanout exchange.
     *
     * @param fanout1 the exchange bean declared above
     */
    @Bean
    public Binding binding2(FanoutExchange fanout1) {
        return BindingBuilder.bind(queue2()).to(fanout1);
    }
}
?
- 在consumer服務中的監聽類中添加方法進行監聽
package com.gaohe.consumer.lisenner;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listeners for the two queues {@code fanout.queue1} and {@code fanout.queue2}.
 * Each prints the received text with a prefix naming the handling method.
 */
@Component
public class FanoutLisenner {

    /**
     * Handles messages arriving on {@code fanout.queue1}.
     *
     * @param msg message body as text
     */
    @RabbitListener(queues = "fanout.queue1")
    public void fanoutQueueLisenner(String msg) {
        final String line = "fanoutQueueLisenner:" + msg;
        System.out.println(line);
    }

    /**
     * Handles messages arriving on {@code fanout.queue2}.
     *
     * @param msg message body as text
     */
    @RabbitListener(queues = "fanout.queue2")
    public void fanoutQueueLisenner2(String msg) {
        final String line = "fanoutQueueLisenner2:" + msg;
        System.out.println(line);
    }
}
?
- 在publisher服務創建測試類進行測試
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test3(){
// 1.發(fā)送的隊(duì)列String exName ="itgaohe.fanout";
// 2.發(fā)送的消息String msg = "你好";
// 3.發(fā)送rabbitTemplate.convertAndSend(exName,"",msg);}}
3.2 路由交換機(DirectExchange)
交換機,隊列不僅可以單獨配置,也可以在監聽類使用注解進行配置
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listeners whose queue, exchange and binding are declared through annotations.
 * Both queues are bound to the direct exchange {@code itgaohe.direct}:
 * {@code direct.queue1} with keys {@code blue}/{@code red} and
 * {@code direct.queue2} with keys {@code yellow}/{@code red}.
 */
@Component
public class DirectLisenner {

    /**
     * Handles messages from {@code direct.queue1}.
     *
     * @param msg message body as text
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue("direct.queue1"),
                    exchange = @Exchange(value = "itgaohe.direct", type = ExchangeTypes.DIRECT),
                    key = {"blue", "red"}))
    public void directQueueLisenner(String msg) {
        final String line = "directQueueLisenner" + msg;
        System.out.println(line);
    }

    /**
     * Handles messages from {@code direct.queue2}.
     *
     * @param msg message body as text
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue("direct.queue2"),
                    exchange = @Exchange(value = "itgaohe.direct", type = ExchangeTypes.DIRECT),
                    key = {"yellow", "red"}))
    public void directQueueLisenner2(String msg) {
        final String line = "directQueueLisenner2" + msg;
        System.out.println(line);
    }
}
在publisher測試類中進行測試
package com.gaohe.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test3(){
// 1.發(fā)送的隊(duì)列String exName ="itgaohe.direct";
// 2.發(fā)送的消息String msg = "I LOVE YOU ";
// 3.發(fā)送rabbitTemplate.convertAndSend(exName,"yellow",msg);}}
3.3 話題交換機(TopicExchange)
- 監聽類
package com.gaohe.consumer.lisenner;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Listeners bound to the topic exchange {@code itgaohe.topic} through
 * annotations: {@code topic.queue1} with patterns {@code china.#} and
 * {@code #.weather}, {@code topic.queue2} with patterns {@code us.#} and
 * {@code #.weather}.
 */
@Component
public class TopicLisenner {

    /**
     * Handles messages from {@code topic.queue1}.
     *
     * @param msg message body as text
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue("topic.queue1"),
                    exchange = @Exchange(value = "itgaohe.topic", type = ExchangeTypes.TOPIC),
                    key = {"china.#", "#.weather"}))
    public void directQueueLisenner(String msg) {
        final String line = "directQueueLisenner" + msg;
        System.out.println(line);
    }

    /**
     * Handles messages from {@code topic.queue2}.
     *
     * @param msg message body as text
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue("topic.queue2"),
                    exchange = @Exchange(value = "itgaohe.topic", type = ExchangeTypes.TOPIC),
                    key = {"us.#", "#.weather"}))
    public void directQueueLisenner2(String msg) {
        final String line = "directQueueLisenner2" + msg;
        System.out.println(line);
    }
}
- 測試類
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test4(){
// 1.發(fā)送的String exName ="itgaohe.topic";
// 2.發(fā)送的消息String msg = "hello world6666";
// 3.發(fā)送rabbitTemplate.convertAndSend(exName,"aa.weather",msg);}
}
用的最多的是路由交換機和廣播交換機
4. mq消息轉換器
消息轉換器是消息中間件中的數據格式轉換層,負責在消息生產/消費過程中實現:
- Java對象 ↔ 消息體序列化/反序列化
- 消息屬性(headers/properties)的自動處理
- 不同數據格式間的相互轉換
配置消息轉換器
- 父工程導入依賴
<!-- Jackson data-binding, needed by the JSON message converter -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
- 給提供者和消費者配置消息轉換器Bean對象
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Registers a JSON {@link MessageConverter} bean for the application.
 */
@Component
public class mqConfig {

    /**
     * Creates the message converter bean.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
- 定義消費者,監聽隊列并消費消息
- 測試
?