中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

網(wǎng)站開(kāi)發(fā)外包報(bào)價(jià)建設(shè)網(wǎng)站

網(wǎng)站開(kāi)發(fā)外包報(bào)價(jià),建設(shè)網(wǎng)站,淘客推廣個(gè)人網(wǎng)站怎么做,裝修自己設(shè)計(jì)3d效果手機(jī)軟件文章目錄 一、初識(shí) MQ1. 同步通訊2. 異步通訊3. MQ 常見(jiàn)框架 二、RabbitMQ 入門(mén)1. 概述和安裝2. 常見(jiàn)消息模型3. 基礎(chǔ)模型練習(xí) 三、SpringAMQP1. 簡(jiǎn)單隊(duì)列模型2. 工作隊(duì)列模型3. 發(fā)布訂閱模型3.1 Fanout Exchange3.2 Direct Exchange3.3 Topic Exchange 一、初識(shí) MQ 1. 同步通…

文章目錄

    • 一、初識(shí) MQ
      • 1. 同步通訊
      • 2. 異步通訊
      • 3. MQ 常見(jiàn)框架
    • 二、RabbitMQ 入門(mén)
      • 1. 概述和安裝
      • 2. 常見(jiàn)消息模型
      • 3. 基礎(chǔ)模型練習(xí)
    • 三、SpringAMQP
      • 1. 簡(jiǎn)單隊(duì)列模型
      • 2. 工作隊(duì)列模型
      • 3. 發(fā)布訂閱模型
        • 3.1 Fanout Exchange
        • 3.2 Direct Exchange
        • 3.3 Topic Exchange

一、初識(shí) MQ

1. 同步通訊

同步和異步的區(qū)別:
① 同步通訊類似于打電話,是一對(duì)一的同時(shí)發(fā)生的通訊,因此它的時(shí)效性更好(一步走完才能走下一步);
② 異步通訊類似于發(fā)短信,給一個(gè)人發(fā)的同時(shí)還可以給別人發(fā),支持多線操作,但是由于通訊不同步,所以它的時(shí)效性差(多步可以同時(shí)走)。

微服務(wù)間基于 Feign 的調(diào)用就屬于同步調(diào)用,比如支付服務(wù)內(nèi)部一般都會(huì)調(diào)用訂單服務(wù),這個(gè)時(shí)候它就必須要等待對(duì)方的回應(yīng)才能進(jìn)行后面的操作,顯然這個(gè)過(guò)程是實(shí)時(shí)性的,但也存在一定的問(wèn)題。

同步通訊劣勢(shì):
① 耦合度高。每次加入新的需求,都要修改原來(lái)的代碼;
② 性能下降。調(diào)用者需要等待服務(wù)提供者的響應(yīng),響應(yīng)時(shí)間等于每次調(diào)用的時(shí)間之和,如果調(diào)用鏈過(guò)長(zhǎng)會(huì)直接導(dǎo)致性能下降;
③ 資源浪費(fèi)。調(diào)用鏈中的每個(gè)服務(wù)在等待響應(yīng)的過(guò)程中,不能釋放所占用的資源。
A 服務(wù)調(diào)用 B 服務(wù),在 B 未執(zhí)行完之前,A 一直干等著,它占用的資源也不會(huì)釋放;
④ 級(jí)聯(lián)失敗。如果服務(wù)提供者出現(xiàn)問(wèn)題,所有的調(diào)用方也會(huì)跟著出現(xiàn)問(wèn)題。 A 服務(wù)里面同時(shí)調(diào)用了 B 服務(wù)和 C 服務(wù),B 服務(wù)出故障導(dǎo)致 A 所占用的資源一直得不到釋放,一次、兩次、無(wú)數(shù)次這樣的請(qǐng)求最終導(dǎo)致 A 的資源被耗盡,而 A 服務(wù)里面還有 C,C 是正常的,但由于 A 的資源已被耗盡了,導(dǎo)致 A 調(diào)用 C 的業(yè)務(wù)時(shí)也會(huì)出現(xiàn)問(wèn)題。

2. 異步通訊

異步調(diào)用常見(jiàn)的實(shí)現(xiàn)就是事件驅(qū)動(dòng)模式。

異步調(diào)用引入一個(gè) Broker 事件代理者,當(dāng)服務(wù)消費(fèi)者接收到前端請(qǐng)求后,就會(huì)發(fā)布一個(gè)事件給 Broker,然后由 Broker 廣播,將事件通知給所有的提供者,讓這些提供者各自開(kāi)始行動(dòng)。

同步通訊中消費(fèi)者會(huì)在代碼中依次調(diào)用提供者,而異步通訊中的消費(fèi)者不再負(fù)責(zé)調(diào)用提供者,它只管發(fā)布事件到 Broker,由提供者自行訂閱事件就可以了。

異步通訊優(yōu)勢(shì):
① 解除了服務(wù)與服務(wù)之間的耦合。消費(fèi)者不再負(fù)責(zé)調(diào)用提供者,只管發(fā)布事件到 Broker 就可以了, 所以一旦有新的業(yè)務(wù)出現(xiàn),直接去訂閱事件就可以了;
② 性能提升,吞吐量提高。消費(fèi)者發(fā)布完事件后立馬就會(huì)響應(yīng)給客戶,至于提供者什么時(shí)候運(yùn)行完我們并不在意, 只要最終能做完就行了,所以業(yè)務(wù)的總耗時(shí)就為消費(fèi)者的請(qǐng)求時(shí)間和事件的發(fā)布時(shí)間之和;
③ 不存在資源浪費(fèi)。 什么是資源浪費(fèi)?就是占著資源卻不用。以前消費(fèi)者會(huì)占著資源等待提供者運(yùn)行完畢,現(xiàn)在服務(wù)之間已經(jīng)徹底解耦了,又怎么會(huì)浪費(fèi);
④ 解決級(jí)聯(lián)失敗問(wèn)題。服務(wù)之間沒(méi)有強(qiáng)依賴,不必?fù)?dān)心級(jí)聯(lián)失敗問(wèn)題, 即使某一個(gè)提供者掛掉了,也不會(huì)影響消費(fèi)者;
⑤ 流量削峰。 當(dāng)大量的用戶請(qǐng)求過(guò)來(lái)的時(shí)候,消費(fèi)者會(huì)發(fā)布大量的事件給 Broker,因?yàn)樘峁┱咛幚順I(yè)務(wù)的能力有限,所以 Broker 在這里會(huì)對(duì)事件做一個(gè)緩沖,具體的提供者根據(jù)自己的能力處理業(yè)務(wù),壓力全給到 Broker 就可以了,最終對(duì)微服務(wù)起到一個(gè)保護(hù)的作用。

異步通訊劣勢(shì):
① 依賴于 Broker 的可靠性、安全性和吞吐能力;
② 架構(gòu)復(fù)雜,業(yè)務(wù)沒(méi)有明顯的流程線,不好追蹤管理。

異步通訊只是通知提供者去干,至于什么時(shí)候干完誰(shuí)都不知道。實(shí)際上大多數(shù)情況下都會(huì)使用同步通訊,因?yàn)樘峁┱叻祷氐慕Y(jié)果消費(fèi)者立馬要用,我們更追求的是時(shí)效性,而對(duì)并發(fā)并沒(méi)有很高的要求!

3. MQ 常見(jiàn)框架

MQ 即消息隊(duì)列,也就是事件驅(qū)動(dòng)架構(gòu)中的 Broker。

在這里插入圖片描述

比較:
① RabbitMQ 支持多種協(xié)議,這意味著它支持的功能更多,其中 AMQP 可以實(shí)現(xiàn)跨語(yǔ)言通訊,而 RocketMQ 和 Kafka是自定義的協(xié)議;
② RabbitMQ、RocketMQ 和 Kafka 都支持主從集群,可用性較高;
③ Kafka的吞吐量非常高,但是高吞吐量帶來(lái)的犧牲就是消息的延遲性和不可靠性,容易出現(xiàn)消息丟失的情況。

Kafka 適用于海量數(shù)據(jù)的傳輸,但是對(duì)數(shù)據(jù)的安全要求不高的情況(比如日志信息),RabbitMQ 和 RocketMQ 的穩(wěn)定性更強(qiáng),更適合對(duì)穩(wěn)定性要求較高的場(chǎng)景(比如業(yè)務(wù)通信),如果要做自定義協(xié)議,我們可以選擇 RocketMQ,因?yàn)?RockerMQ 是基于 Java 語(yǔ)言開(kāi)發(fā)的,且支持自定義協(xié)議。

二、RabbitMQ 入門(mén)

1. 概述和安裝

RabbitMQ 的整體結(jié)構(gòu)如下:
在這里插入圖片描述

消息的發(fā)布者將來(lái)會(huì)把消息發(fā)送到 exchange 交換機(jī),交換機(jī)負(fù)責(zé)將消息路由到具體的 queue 隊(duì)列,隊(duì)列負(fù)責(zé)暫存消息,消息的接收者就會(huì)從隊(duì)列中拿取并處理消息。每個(gè)用戶會(huì)有自己的虛擬主機(jī),各個(gè)虛擬主機(jī)之間是相互隔離的,這樣可以避免干擾!

① 將本地的壓縮文件上傳至 Linux,并加載為一個(gè)鏡像

在這里插入圖片描述

在這里插入圖片描述

② 創(chuàng)建并運(yùn)行 MQ 容器,15672 是 RibbitMQ 控制臺(tái)的端口,5672 是消息通訊的端口(發(fā)消息、收消息)

docker run \
-e RABBITMQ_DEFAULT_USER=zxe \
-e RABBITMQ_DEFAULT_PASS=856724bb \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

在這里插入圖片描述

③ 瀏覽器訪問(wèn)一下 RabbitMQ 的控制臺(tái),根據(jù)剛才設(shè)置的用戶名和密碼登錄

在這里插入圖片描述

在這里插入圖片描述

每個(gè)模塊的含義:
① Overview(總覽):主要可以看到 RabbitMQ 的一些節(jié)點(diǎn)信息;
② Connections(連接):將來(lái)消息的發(fā)布者和接收者都應(yīng)該與 RabbitMQ 建立連接;
③ Channels(通道):是操作 MQ 的工具,建立連接之后必須創(chuàng)建一個(gè) Channel 通道,提供者和消費(fèi)者才能基于 Channel 完成消息的發(fā)送或接收;
④ Exchanges(交換機(jī)):消息的路由器,負(fù)責(zé)將消息路由到對(duì)面;
⑤ Queues(隊(duì)列):用來(lái)做消息存儲(chǔ);
⑥ Admin(管理):管理當(dāng)前用戶信息;
⑦ virtual host(虛擬主機(jī)):是對(duì) queue、exchange 等資源的邏輯分組。

2. 常見(jiàn)消息模型

RabbitMQ 官方文檔:rabbitmq.com

查看官方文檔可以看到官方給出的很多 Demo 案例,但是只有前五個(gè)和消息的接收發(fā)送有關(guān)。

其中,前兩個(gè)是基于隊(duì)列發(fā)送的,也就是說(shuō)消息的發(fā)布者和訂閱者之間直接通過(guò)隊(duì)列連接,沒(méi)有交換機(jī)。而后三個(gè)屬于發(fā)布訂閱模式,有交換機(jī),根據(jù)交換機(jī)的類型分為廣播、路由和主題三種。

在這里插入圖片描述

在這里插入圖片描述

3. 基礎(chǔ)模型練習(xí)

官方的 HelloWorld 是基于最基礎(chǔ)的消息隊(duì)列模型來(lái)實(shí)現(xiàn)的,只包括三個(gè)角色:消息發(fā)布者、消息隊(duì)列和訂閱者。

① 發(fā)布消息

發(fā)布消息基本步驟:創(chuàng)建連接工廠 → 設(shè)置連接參數(shù) → 建立連接 → 創(chuàng)建通道 → 創(chuàng)建隊(duì)列 → 發(fā)送消息 → 關(guān)閉通道和連接。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();// 2.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號(hào)、vhost、用戶名、密碼factory.setHost("192.168.149.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("zxe");factory.setPassword("123321");// 3.建立連接Connection connection = factory.newConnection();// 4.創(chuàng)建通道ChannelChannel channel = connection.createChannel();// 5.創(chuàng)建隊(duì)列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 6.發(fā)送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("發(fā)送消息成功:【" + message + "】");// 7.關(guān)閉通道和連接channel.close();connection.close();}
}

在這里插入圖片描述

在這里插入圖片描述

② 訂閱消息

訂閱消息基本步驟:創(chuàng)建連接工廠 → 設(shè)置連接參數(shù) → 建立連接 → 創(chuàng)建通道 → 創(chuàng)建隊(duì)列 → 訂閱消息 → 處理消息。

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立連接ConnectionFactory factory = new ConnectionFactory();// 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號(hào)、vhost、用戶名、密碼factory.setHost("192.168.149.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("zxe");factory.setPassword("123321");// 1.2.建立連接Connection connection = factory.newConnection();// 2.創(chuàng)建通道ChannelChannel channel = connection.createChannel();// 3.創(chuàng)建隊(duì)列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.訂閱消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.處理消息,隊(duì)列一有消息函數(shù)就會(huì)執(zhí)行,body就是消息體String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

發(fā)布者已經(jīng)創(chuàng)建了隊(duì)列,訂閱者為什么還要?jiǎng)?chuàng)建?這是因?yàn)樗鼈兊南群髨?zhí)行順序是不確定的,以防萬(wàn)一訂閱者在發(fā)布者之前執(zhí)行,隊(duì)列不存在的問(wèn)題,其實(shí)最終也不會(huì)創(chuàng)建兩個(gè)隊(duì)列,而是有則不創(chuàng)建,沒(méi)有就創(chuàng)建!

從 MQ 上接收消息也需要時(shí)間,所以函數(shù)外的代碼先執(zhí)行,函數(shù)內(nèi)的代碼后執(zhí)行,這就是異步。

在這里插入圖片描述

消息一旦被接收,就會(huì)立馬從隊(duì)列中消失。

在這里插入圖片描述

三、SpringAMQP

1. 簡(jiǎn)單隊(duì)列模型

上面案例中發(fā)布消息和訂閱消息的步驟十分繁瑣吧?接下來(lái)的 SpringAMQP 就是用來(lái)簡(jiǎn)化這些步驟的,它是一種模板協(xié)議,與語(yǔ)言和平臺(tái)無(wú)關(guān),利用 SpringAMQP 提供好的模板工具直接可以發(fā)送和接收消息,其實(shí)底層封裝的還是 Rabbit 的客戶端。

① 父工程中引入 AMQP 依賴

<!--AMQP依賴,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

② 編寫(xiě) publisher 和 consumer 的 application.yml,添加連接信息

spring:rabbitmq:host: 192.168.150.101 #主機(jī)名port: 5672 #端口virtual-host: / #虛擬主機(jī)username: zxe #用戶名password: 123321 #密碼

③ SpringAMQP 方式并不會(huì)主動(dòng)創(chuàng)建隊(duì)列,需要我們提前去控制臺(tái)創(chuàng)建 simple.queue 隊(duì)列

在這里插入圖片描述

④ 在 publisher 中編寫(xiě)測(cè)試方法,向 simple.queue 隊(duì)列發(fā)送消息

主機(jī)名、端口等信息都寫(xiě)在配置文件里面了,代碼中只需要傳入隊(duì)列名和所需發(fā)送的消息即可。

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";//發(fā)送消息rabbitTemplate.convertAndSend(queueName, message);}
}

⑤ 發(fā)布一個(gè)消息,去 RabbitMQ 控制臺(tái)看一下

在這里插入圖片描述

在這里插入圖片描述

⑥ 在 consumer 服務(wù)中新建一個(gè)類,用于編寫(xiě)消費(fèi)邏輯

主機(jī)名、端口等信息都寫(xiě)在配置文件里面了,代碼中只需要指定監(jiān)聽(tīng)的隊(duì)列名和接下來(lái)的行為方法就可以了,Spring 會(huì)自動(dòng)將監(jiān)聽(tīng)到的消息以參數(shù)的形式傳遞給行為方法。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消息者已接收到simple.queue的消息:【" + msg + "】");}
}

⑦ 運(yùn)行 Consumer 的啟動(dòng)類,同時(shí)去看一個(gè) RabbitMQ 控制臺(tái),發(fā)現(xiàn)隊(duì)列中的消息條數(shù)變成了 0,說(shuō)明消息已經(jīng)被成功消費(fèi)了

在這里插入圖片描述

在這里插入圖片描述

2. 工作隊(duì)列模型

以前我們發(fā)布者發(fā)送過(guò)來(lái)的消息都由一個(gè)消費(fèi)者去處理,但是當(dāng)發(fā)布者發(fā)送的消息很多時(shí),一個(gè)消費(fèi)者就可能處理不過(guò)來(lái)了,處理不過(guò)來(lái)的消息全部堆積在隊(duì)列中,長(zhǎng)期下去,隊(duì)列肯定會(huì)爆滿,最終造成數(shù)據(jù)丟失。

解決辦法就是我們可以安排多個(gè)消費(fèi)者,讓它們共同處理隊(duì)列中的消息。
消費(fèi)者之間是一種合作的關(guān)系,一條消息如果被某一消費(fèi)者接收了,它就會(huì)立馬從隊(duì)列中消失,所以不會(huì)出現(xiàn)重復(fù)消費(fèi)的問(wèn)題。

Work Queue(工作隊(duì)列),可以提高消息的處理速度,避免隊(duì)列消息堆積。

在這里插入圖片描述

模擬 WorkQueue,基本思路如下:
① 在 publisher 服務(wù)中定義測(cè)試方法,每秒產(chǎn)生 50 條消息,發(fā)送到 simple.queue;
② 在 consumer 服務(wù)中定義兩個(gè)消息監(jiān)聽(tīng)者,都監(jiān)聽(tīng) simple.queue 隊(duì)列;
③ 消費(fèi)者 1 每秒處理 50 條消息,消費(fèi)者 2 每秒處理 10 條消息。

① 編寫(xiě) publish 代碼,for 循環(huán)模擬發(fā)送 50 條消息,每發(fā)送一次我們讓線程休眠 20 毫秒,這樣發(fā)完 50 條消息就是 1 秒

@Test
public void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message_";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

② 編寫(xiě) consumer 代碼,兩個(gè)方法模擬兩個(gè)消費(fèi)者,同樣設(shè)置休眠時(shí)間模擬消費(fèi)者的處理速度

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消費(fèi)者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {//err打印出紅色字體,比較容易區(qū)分System.err.println("消費(fèi)者2接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

③ 運(yùn)行之后我們發(fā)現(xiàn),消費(fèi)者并沒(méi)有在一秒內(nèi)將消息處理完,且消費(fèi)者1 只處理偶數(shù)消息,消費(fèi)者2 只處理奇數(shù)消息,且消費(fèi)者1 提前就處理完了,消費(fèi)者2 晚了好久才把消息處理完

在這里插入圖片描述

這是因?yàn)槲覀?RabbitMQ 的預(yù)取機(jī)制所導(dǎo)致的。當(dāng)消息進(jìn)入隊(duì)列之后,我們的消費(fèi)者并不是立馬去消費(fèi)這些消息,而是先根據(jù)輪詢的方式把消息平均分配給所有的消費(fèi)者,這就是預(yù)取。
它不管每個(gè)消費(fèi)者的消費(fèi)能力如何,一律平均分配,這就出現(xiàn)了消費(fèi)者1 處理完畢,消費(fèi)者2 還在繼續(xù)的問(wèn)題。

④ 所以接下來(lái),可以去配置文件里設(shè)置消費(fèi)預(yù)取限制,設(shè)置 preFetch 值可以控制預(yù)期消息的上限

spring:rabbitmq:host: 192.168.149.100 #主機(jī)名port: 5672 #端口virtual-host: / #虛擬主機(jī)username: zxe #用戶名password: 856724bb #密碼listener:simple:prefetch: 1 #每次只能獲取一條消息,處理完成后才能獲取下一個(gè)消息

⑤ 配置完之后重啟服務(wù),果然在一秒內(nèi)完成消費(fèi)

在這里插入圖片描述

3. 發(fā)布訂閱模型

發(fā)布訂閱模式與之前案例的區(qū)別就是,它允許將同一消息發(fā)送給多個(gè)消費(fèi)者(這里的多個(gè)消費(fèi)者指的是多個(gè)不同的服務(wù)),實(shí)現(xiàn)方式是加入了 exchange 交換機(jī)。

發(fā)布者將消息發(fā)送給交換機(jī),交換機(jī)負(fù)責(zé)將消息路由到不同的隊(duì)列,即一個(gè)消息同時(shí)被多個(gè)消費(fèi)者消費(fèi)。

注意交換機(jī)只負(fù)責(zé)消息的路由,而不是存儲(chǔ),如果路由失敗則消息丟失!

3.1 Fanout Exchange

Fanout Exchange 會(huì)將接收到的消息路由到每一個(gè)跟其綁定的 queue,因此稱為廣播模式。

一次發(fā)布,多個(gè)消費(fèi)者都能接受。

在這里插入圖片描述

① 聲明隊(duì)列、交換機(jī),并將隊(duì)列綁定到交換機(jī)上

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {//聲明交換機(jī)@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("zxe.fanout");}//聲明隊(duì)列@Beanpublic Queue fanoutQueue1() {return new Queue("fanout.queue1");}@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}//將隊(duì)列綁定到交換機(jī),參數(shù)為上面聲明的方法名@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

② 運(yùn)行項(xiàng)目,測(cè)試以下

在這里插入圖片描述

在這里插入圖片描述

② 編寫(xiě)消費(fèi)者監(jiān)聽(tīng)代碼,分別監(jiān)聽(tīng)兩個(gè)隊(duì)列的消息

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消費(fèi)者已接收到fanout.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue(String msg) {System.out.println("消費(fèi)者已接收到fanout.queue2的消息:【" + msg + "】");
}

③ 編寫(xiě)發(fā)布者代碼,將消息直接發(fā)送到交換機(jī)

@Test
public void testSendFanoutExchange() {//交換機(jī)名稱String exchangeName = "zxe.fanout";//消息String message = "hello, every one!";//發(fā)送消息rabbitTemplate.convertAndSend(exchangeName, "", message);
}

④ 啟動(dòng)項(xiàng)目,發(fā)布消息,并消費(fèi)

在這里插入圖片描述

步驟總結(jié):
① 首先聲明交換機(jī)和隊(duì)列,并將隊(duì)列綁定到交換機(jī);
② 對(duì)于發(fā)布者,需要指定交換機(jī)名稱和所要發(fā)布的消息;
③ 對(duì)于消費(fèi)者,直接指定想要監(jiān)聽(tīng)的隊(duì)列名和行為方法即可。

3.2 Direct Exchange

Direct Exchange 會(huì)將接收到的消息根據(jù)規(guī)則路由到指定的 queue,因此稱為路由模式。

原理:
① 每一個(gè)隊(duì)列都會(huì)設(shè)置一個(gè) BindingKey;
② 發(fā)布者發(fā)送消息時(shí),會(huì)指定消息的 RoutingKey;
③ 交換機(jī)負(fù)責(zé)將消息路由到 BindingKey 和 RoutingKey 一致的隊(duì)列。

BindingKey 和 RoutingKey 相當(dāng)于對(duì)暗號(hào),暗號(hào)相同則路由。

一個(gè)隊(duì)列可以指定多個(gè) BindingKey,如果多個(gè)隊(duì)列的 BindingKey 都可以與 RoutingKey 匹配,那交換機(jī)就會(huì)將消息發(fā)送給多個(gè)隊(duì)列。

① 用 RabbitListener 注解聲明交換機(jī)、隊(duì)列和 BindingKey

//用RabbitListener注解聲明交換機(jī)、隊(duì)列和 BindingKey
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange("zxe.direct"),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) {System.out.println("消費(fèi)者已接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange("zxe.direct"),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) {System.out.println("消費(fèi)者已接收到direct.queue2的消息:【" + msg + "】");
}

在這里插入圖片描述

② 編寫(xiě)發(fā)布者代碼,指定交換機(jī)名,要發(fā)布的消息,以及 RoutingKey

在這里插入圖片描述

③ 啟動(dòng)項(xiàng)目,可以看到隊(duì)列1 接收到消息,因?yàn)樗锩姘?blue

在這里插入圖片描述

3.3 Topic Exchange

Topic Exchange 與 Direct Exchange 類似,區(qū)別在于 Topic Exchange 的 routingKey 必須是由多個(gè)單詞組成的列表,并以 . 分隔,比如 china.news,china.weather,所以它又稱為話題模式。

Queue 與 Exchange 指定 BindingKey 時(shí)可以使用通配符。
#:代指 0 個(gè)或多個(gè)單詞
*:代指一個(gè)單詞

比如中國(guó)相關(guān)的一切,可以這樣表示:china.*
所有的新聞,可以這樣表示:#.news

以前 bindingKey 需要綁定 news、weather、food 等多個(gè) key,現(xiàn)在只需要一個(gè) china.* 就搞定了,只要是 china 的我都要!

① 用 @RabbitListener 聲明隊(duì)列、交換機(jī)和 BindingKey,這里使用通配符來(lái)指定 key

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "zxe.topic", type = ExchangeTypes.TOPIC),key = "china.*"))public void listenTopicQueue1(String msg) {System.out.println("消費(fèi)者已接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "zxe.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg) {System.out.println("消費(fèi)者已接收到topic.queue2的消息:【" + msg + "】");}

② 編寫(xiě)發(fā)布者代碼,指定 RoutingKey

@Testpublic void testSendTopicExchange() {String exchangeName = "zxe.topic";String message = "hello, china!";rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);}

③ 運(yùn)行一下

在這里插入圖片描述

http://www.risenshineclean.com/news/42170.html

相關(guān)文章:

  • 推薦常州網(wǎng)站建設(shè)seo技術(shù)員
  • 可以做宣傳海報(bào)的網(wǎng)站信息流優(yōu)化師簡(jiǎn)歷怎么寫(xiě)
  • 網(wǎng)站目錄怎么做推廣專員是做什么的
  • 最新國(guó)際新聞?lì)^條今日國(guó)際大事件seo計(jì)費(fèi)系統(tǒng)登錄
  • 做網(wǎng)站濱州市最近的時(shí)事新聞
  • 杭州手機(jī)申請(qǐng)網(wǎng)站登錄谷歌chrome
  • 哪個(gè)網(wǎng)站的圖片可以做素材永久觀看不收費(fèi)的直播
  • 魏縣做網(wǎng)站網(wǎng)站排名推廣工具
  • 普象工業(yè)設(shè)計(jì)網(wǎng)站上海最新事件
  • 上海制作網(wǎng)頁(yè)宣傳seo發(fā)展前景怎么樣啊
  • 如何知道網(wǎng)站開(kāi)發(fā)語(yǔ)言軟文營(yíng)銷的成功案例
  • wordpress計(jì)算器主題優(yōu)化站點(diǎn)
  • 怎么做淘寶客的跳轉(zhuǎn)網(wǎng)站免費(fèi)的網(wǎng)頁(yè)入口
  • 上海企業(yè)網(wǎng)站建設(shè)谷歌關(guān)鍵詞挖掘工具
  • 重慶網(wǎng)站推廣轉(zhuǎn)化率鄭州seo聯(lián)系搜點(diǎn)網(wǎng)絡(luò)效果好
  • 網(wǎng)站建設(shè)及第三方支付沈陽(yáng)網(wǎng)絡(luò)優(yōu)化培訓(xùn)
  • 西安高校網(wǎng)站建設(shè)整站優(yōu)化價(jià)格
  • 中國(guó)建設(shè)行業(yè)峰會(huì)官方網(wǎng)站torrentkitty磁力天堂
  • 專業(yè)網(wǎng)站制作公司招聘深圳網(wǎng)絡(luò)公司推廣平臺(tái)
  • 實(shí)驗(yàn)中心網(wǎng)站建設(shè)媒體軟文發(fā)稿
  • 電子書(shū)城網(wǎng)站開(kāi)發(fā)項(xiàng)目概況網(wǎng)絡(luò)推廣引流方式
  • wordpress用哪種緩存器seo排名軟件哪個(gè)好用
  • 做淘寶必備網(wǎng)站手機(jī)seo關(guān)鍵詞優(yōu)化
  • 做網(wǎng)站是干什么用的百度營(yíng)銷推廣登錄平臺(tái)
  • 網(wǎng)站建設(shè) 軟件有哪些內(nèi)容最近剛發(fā)生的新聞
  • 大連做網(wǎng)站首選領(lǐng)超科技域名注冊(cè)服務(wù)網(wǎng)站哪個(gè)好
  • 網(wǎng)站建站所用的技術(shù)自動(dòng)優(yōu)化句子的軟件
  • 長(zhǎng)春火車站24小時(shí)人工客服電話香飄飄奶茶
  • 介紹做網(wǎng)站的標(biāo)題廈門(mén)人才網(wǎng)個(gè)人登錄
  • 韶關(guān)城鄉(xiāng)建設(shè)部網(wǎng)站首頁(yè)螺螄粉營(yíng)銷策劃方案