找建設(shè)網(wǎng)站公司seo技術(shù)優(yōu)化服務(wù)
目錄
- 1. 搭建示例工程
- 1.1. 創(chuàng)建工程
- 1.2. 添加依賴
- 2. 編寫生產(chǎn)者
- 3. 編寫消費(fèi)者
- 4. 小結(jié)
需求
官網(wǎng): https://www.rabbitmq.com/
需求:使用簡單模式完成消息傳遞
步驟:
① 創(chuàng)建工程(生成者、消費(fèi)者)
② 分別添加依賴
③ 編寫生產(chǎn)者發(fā)送消息
④ 編寫消費(fèi)者接收消息
1. 搭建示例工程
1.1. 創(chuàng)建工程
創(chuàng)建項(xiàng)目:rabbitmq-producer
創(chuàng)建項(xiàng)目:rabbitmq-consumer
1.2. 添加依賴
往兩個rabbitmq的pom.xml文件中添加如下依賴:
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
2. 編寫生產(chǎn)者
編寫消息生產(chǎn)者 com.donglin.rabbitmq.simple.Producer
package com.donglin.rabbitmq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//主機(jī)地址connectionFactory.setHost("192.168.121.140");//連接端口;默認(rèn)為 5672connectionFactory.setPort(5672);//虛擬主機(jī)名稱;默認(rèn)為 /connectionFactory.setVirtualHost("/");//連接用戶名;默認(rèn)為guestconnectionFactory.setUsername("admin");//連接密碼;默認(rèn)為guestconnectionFactory.setPassword("admin");//創(chuàng)建連接Connection connection = connectionFactory.newConnection();//創(chuàng)建頻道Channel channel = connection.createChannel();// 聲明(創(chuàng)建)隊(duì)列/*** queue 參數(shù)1:隊(duì)列名稱* durable 參數(shù)2:是否定義持久化隊(duì)列,當(dāng)mq重啟之后,還在* exclusive 參數(shù)3:是否獨(dú)占本次連接* ① 是否獨(dú)占,只能有一個消費(fèi)者監(jiān)聽這個隊(duì)列* ② 當(dāng)connection關(guān)閉時,是否刪除隊(duì)列* autoDelete 參數(shù)4:是否在不使用的時候自動刪除隊(duì)列,當(dāng)沒有consumer時,自動刪除* arguments 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare("simple_queue", true, false, false, null);// 要發(fā)送的信息String message = "你好;小兔子!";/*** 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage* 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱* 參數(shù)3:配置信息* 參數(shù)4:消息內(nèi)容*/channel.basicPublish("", "simple_queue", null, message.getBytes());System.out.println("已發(fā)送消息:" + message);// 關(guān)閉資源channel.close();connection.close();}
}
運(yùn)行程序:http://192.168.6.100:15672
在執(zhí)行上述的消息發(fā)送之后;可以登錄rabbitMQ的管理控制臺,可以發(fā)現(xiàn)隊(duì)列和其消息:
3. 編寫消費(fèi)者
編寫消息的消費(fèi)者 com.donglin.rabbitmq.simple.Consumer
package com.donglin.rabbitmq.simple;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {//1.創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();//2. 設(shè)置參數(shù)factory.setHost("192.168.121.140");//ipfactory.setPort(5672); //端口 默認(rèn)值 5672factory.setVirtualHost("/");//虛擬機(jī) 默認(rèn)值/factory.setUsername("admin");//用戶名factory.setPassword("admin");//密碼//3. 創(chuàng)建連接 ConnectionConnection connection = factory.newConnection();//4. 創(chuàng)建ChannelChannel channel = connection.createChannel();//5. 創(chuàng)建隊(duì)列Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)參數(shù):1. queue:隊(duì)列名稱2. durable:是否持久化,當(dāng)mq重啟之后,還在3. exclusive:* 是否獨(dú)占。只能有一個消費(fèi)者監(jiān)聽這隊(duì)列* 當(dāng)Connection關(guān)閉時,是否刪除隊(duì)列4. autoDelete:是否自動刪除。當(dāng)沒有Consumer時,自動刪除掉5. arguments:參數(shù)。*///如果沒有一個名字叫simple_queue的隊(duì)列,則會創(chuàng)建該隊(duì)列,如果有則不會創(chuàng)建channel.queueDeclare("simple_queue",true,false,false,null);// 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){/*回調(diào)方法,當(dāng)收到消息后,會自動執(zhí)行該方法1. consumerTag:標(biāo)識2. envelope:獲取一些信息,交換機(jī),路由key...3. properties:配置信息4. body:數(shù)據(jù)*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);System.out.println("body:"+new String(body));}};/*basicConsume(String queue, boolean autoAck, Consumer callback)參數(shù):1. queue:隊(duì)列名稱2. autoAck:是否自動確認(rèn) ,類似咱們發(fā)短信,發(fā)送成功會收到一個確認(rèn)消息3. callback:回調(diào)對象*/// 消費(fèi)者類似一個監(jiān)聽程序,主要是用來監(jiān)聽消息channel.basicConsume("simple_queue",true,consumer);}
}
運(yùn)行程序
4. 小結(jié)
上述的入門案例中中其實(shí)使用的是如下的簡單模式:
在上圖的模型中,有以下概念:
- P:生產(chǎn)者,也就是要發(fā)送消息的程序
- C:消費(fèi)者:消息的接受者,會一直等待消息到來。
- queue:消息隊(duì)列,圖中紅色部分。類似一個郵箱,可以緩存消息;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。