招工 最新招聘信息怎么寫seo搜索引擎優(yōu)化教程
目錄
前言
1. 生產(chǎn)者
2. 消費(fèi)者
3. 啟動(dòng)消息隊(duì)列服務(wù)器
4. 運(yùn)行效果
?結(jié)語(yǔ)
前言
? ? ? ? 在上一章節(jié),我們完成了消息隊(duì)列的客戶端部分,至此我們整個(gè)消息隊(duì)列項(xiàng)目就構(gòu)建完成了,那我們做的這個(gè)消息隊(duì)列到底有什么效果,以及如何去使用我們自己的消息隊(duì)列呢?那么本文,就將我們的MQ進(jìn)行實(shí)戰(zhàn)操作,寫一個(gè)基于MQ的生產(chǎn)者消費(fèi)者模型.本項(xiàng)目全部代碼已上傳Gitee,鏈接放在文章末尾,歡迎大家訪問(wèn)!
1. 生產(chǎn)者
我們的生產(chǎn)者就是一個(gè)客戶端,需要將自己生產(chǎn)出來(lái)的消息發(fā)送到消息隊(duì)列中,供消費(fèi)者進(jìn)行使用.
我們創(chuàng)建一個(gè)生產(chǎn)者,在服務(wù)器端創(chuàng)建交換機(jī)(直接),隊(duì)列,然后往對(duì)應(yīng)的隊(duì)列進(jìn)行投遞消息.
1. 實(shí)例化創(chuàng)建連接的工廠類
2. 設(shè)置消息隊(duì)列服務(wù)器的IP地址以及端口號(hào)
3. 新建一個(gè)連接,創(chuàng)建Channel,交換機(jī),隊(duì)列
4. 新建一個(gè)消息轉(zhuǎn)換成字節(jié)文件進(jìn)行發(fā)送,此時(shí)給線程一個(gè)休眠的時(shí)間,確保已經(jīng)發(fā)送到消息隊(duì)列服務(wù)器
5. 關(guān)閉通道,關(guān)閉連接
package com.example.demo.demo;import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.ExchangeType;import java.io.IOException;/*** Created with IntelliJ IDEA.* Description:生產(chǎn)者 通常是一個(gè)單獨(dú)的服務(wù)器程序* User: YAO* Date: 2023-08-03* Time: 16:06*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("啟動(dòng)生產(chǎn)者");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 創(chuàng)建交換機(jī)和隊(duì)列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);// 創(chuàng)建一個(gè)消息并發(fā)送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);System.out.println("消息投遞完成! ok=" + ok);Thread.sleep(500);channel.close();connection.close();}
}
2. 消費(fèi)者
消費(fèi)者也是客戶端,所做的前期工作是一樣的,只不過(guò)是發(fā)送的請(qǐng)求不同.
1. 消費(fèi)者需要進(jìn)行訂閱消息,接收到消息之后,執(zhí)行回調(diào)進(jìn)行消費(fèi)消息.
2. 消費(fèi)者需要循環(huán)等待消息隊(duì)列的響應(yīng),等待消費(fèi).
package com.example.demo.demo;import com.example.demo.common.Consumer;
import com.example.demo.common.MqException;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;import java.io.IOException;
/*** Created with IntelliJ IDEA.* Description:消費(fèi)者 通常是一個(gè)單獨(dú)的服務(wù)器程序* User: YAO* Date: 2023-08-03* Time: 16:07*/
public class DemoConsumer {public static void main(String[] args) throws MqException, InterruptedException, IOException {System.out.println("啟動(dòng)消費(fèi)者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消費(fèi)數(shù)據(jù)] 開(kāi)始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消費(fèi)數(shù)據(jù)] 結(jié)束!");}});// 由于消費(fèi)者也不知道生產(chǎn)者要生產(chǎn)多少, 就在這里通過(guò)這個(gè)循環(huán)模擬一直等待消費(fèi).while (true) {Thread.sleep(500);}}
}
3. 啟動(dòng)消息隊(duì)列服務(wù)器
在Spring Boot 項(xiàng)目的啟動(dòng)類中,實(shí)例化Broker Server,傳入端口號(hào),進(jìn)行啟動(dòng)服務(wù)器.
package com.example.demo;import com.example.demo.mqserver.BrokerServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;import java.io.IOException;@SpringBootApplication
public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(DemoApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}
4. 運(yùn)行效果
1. 服務(wù)器啟動(dòng):
2. 此時(shí)如果再重啟服務(wù)器,會(huì)提示數(shù)據(jù)庫(kù)已經(jīng)存在,就會(huì)將數(shù)據(jù)恢復(fù)到內(nèi)存
3. 啟動(dòng)生產(chǎn)者進(jìn)行投遞消息
上述就是按照我們自定義的應(yīng)用層協(xié)議進(jìn)行發(fā)送請(qǐng)求.?
我們?cè)賮?lái)看服務(wù)器這邊的日志:
4. 啟動(dòng)消費(fèi)者進(jìn)行消費(fèi)消息?
?我們?cè)賮?lái)看服務(wù)器這邊日志
?結(jié)語(yǔ)
?????????以上就是一個(gè)簡(jiǎn)單的Demo,實(shí)現(xiàn)了基于MQ的生產(chǎn)者消費(fèi)者模型.其他的功能,大家可以在做完這個(gè)項(xiàng)目之后自行進(jìn)行測(cè)試.至此這個(gè)消息隊(duì)列的項(xiàng)目就全部完結(jié)了,內(nèi)容還是很多的,希望可以通過(guò)這個(gè)系列能夠幫助到大家去了解消息隊(duì)列的實(shí)現(xiàn)原理.也希望大家能夠有所收獲,那就到這里吧.接下來(lái)就要開(kāi)始新的項(xiàng)目了(實(shí)現(xiàn)論壇系統(tǒng)),又是一個(gè)挑戰(zhàn),我們一起加油!??
????????完整的項(xiàng)目代碼已上傳Gitee,歡迎大家訪問(wèn).👇👇👇
模擬實(shí)現(xiàn)消息隊(duì)列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq