合肥模板建站多少錢網(wǎng)絡(luò)營銷方式有哪些
RabbitMQ 工作機(jī)制圖:
Connection: 代表客戶端(包括消息生產(chǎn)者和消費(fèi)者)與RabbitMQ之間的連接。
Channel: 連接內(nèi)部的Channel。channel:通道
Exchange: 充當(dāng)消息交換機(jī)的組件。
Queue: 消息隊(duì)列。
★ 消費(fèi)消息
使用 RabbitMQ Java Client 開發(fā) 消息消費(fèi)者 的大致步驟如下:
(1)創(chuàng)建ConnectionFactory,設(shè)置連接信息,再通過ConnectionFactory獲取Connection。
(2)通過Connection獲取Channel。
(3)根據(jù)需要,調(diào)用Channel的queueDeclare()方法聲明隊(duì)列,如果聲明的隊(duì)列已存在,該方法直接獲取已有的隊(duì)列;如果聲明的隊(duì)列還不存在,該方法將會(huì)創(chuàng)建新的隊(duì)列。
(4)調(diào)用Channel 的 basicConsume()方法開始處理消息,調(diào)用該方法時(shí)需要傳入一個(gè)Consumer參數(shù),
該參數(shù)相當(dāng)于JMS中的消息監(jiān)聽器。
這個(gè) basicConsume()方法 相當(dāng)于是異步消費(fèi)。
而同步消費(fèi)會(huì)出現(xiàn)阻塞情況,這就失去消息中間件存在的意義,所以先講異步消費(fèi)。
★ 發(fā)送消息
使用RabbitMQ Java Client依賴庫開發(fā)消息生產(chǎn)者的大致步驟如下:
(1)創(chuàng)建ConnectionFactory,設(shè)置連接信息,再通過ConnectionFactory獲取Connection。
(2)通過Connection獲取Channel。
(3)根據(jù)需要調(diào)用exchangeDeclare()、queueDeclare()方法聲明Exchange和隊(duì)列、并完成隊(duì)列與Exchange的綁定。
如果聲明的Exchange還不存在,則創(chuàng)建該Exchange;否則直接使用已有的Exchange。
Declare:聲明、宣布
(4)調(diào)用Channel的basicPublish()方法發(fā)送消息,調(diào)用該方法的第一個(gè)參數(shù)是exchange,
第二個(gè)參數(shù)為路由key,最后兩個(gè)參數(shù)依次是消息屬性和消息數(shù)據(jù)體。
【注意】:雖然消息生產(chǎn)者與隊(duì)列是完全隔離的, 但如果消息生產(chǎn)者不聲明消息隊(duì)列,那系統(tǒng)中就可能暫時(shí)還沒有任何消息隊(duì)列。
在這種情況下,消息生產(chǎn)者向Exchange發(fā)送的消息將不會(huì)分發(fā)給任何隊(duì)列,這些消息直接就被丟棄了。
【備注】:為了保證消息生產(chǎn)者能將消息發(fā)送到指定隊(duì)列,消息生產(chǎn)者需要聲明消息隊(duì)列,保證消息隊(duì)列的存在。
**問題:**消息生產(chǎn)者 和 消息隊(duì)列 是完全隔離的,但是生產(chǎn)者為什么還要聲明消息隊(duì)列?
**原因:**因?yàn)槌绦蛉绻冗\(yùn)行消息生產(chǎn)者,后運(yùn)行消費(fèi)者,而聲明消息隊(duì)列的方法又只存在消費(fèi)者那邊,那么在先運(yùn)行消息生產(chǎn)者時(shí),就會(huì)因?yàn)檫€沒有生成消息隊(duì)列,所以生產(chǎn)者發(fā)送到exchange的消息,會(huì)因?yàn)闆]有對(duì)應(yīng)的消息隊(duì)列而被丟棄。
代碼演示:
先創(chuàng)建一個(gè)普通的 maven 項(xiàng)目。
添加一些屬性 和 RabbitMQ的依賴
創(chuàng)建消息消費(fèi)者
把創(chuàng)建連接的代碼封裝到一個(gè)方法里去。
消費(fèi)者的代碼
注意:channel.basicConsume 的第二個(gè)參數(shù) autoAck:true,就是表示自動(dòng)確認(rèn)消息已經(jīng)被消費(fèi)完成了。就是當(dāng)消費(fèi)者接收到消息之后,就立馬返回一個(gè)已經(jīng)確認(rèn)消費(fèi)的消息回去給消息隊(duì)列。
這樣容易出現(xiàn)問題,就是消費(fèi)者這邊因?yàn)橐皇盏较⒕蜁?huì)自動(dòng)確認(rèn)消息被消費(fèi)了并返回已經(jīng)消費(fèi)消息的結(jié)果回去給消息隊(duì)列,但是可能消費(fèi)者其實(shí)還沒有把消息消費(fèi)掉,而消息隊(duì)列那邊又以為消費(fèi)者已經(jīng)把消息消費(fèi)了,所以就繼續(xù)發(fā)消息給那個(gè)消費(fèi)者。
而消費(fèi)者一收到消息又自動(dòng)確認(rèn)消費(fèi)并返回,就會(huì)導(dǎo)致這個(gè)消息隊(duì)列的消息越來越多,然后消費(fèi)者消費(fèi)不完。
這個(gè)演示已消費(fèi)未確認(rèn)的演示放最后
執(zhí)行消費(fèi)者
控制臺(tái)查看
原本沒有這個(gè)消息隊(duì)列,通過調(diào)用Channel的queueDeclare()方法聲明隊(duì)列,如果聲明的隊(duì)列已存在,該方法直接獲取已有的隊(duì)列;如果聲明的隊(duì)列還不存在,該方法將會(huì)創(chuàng)建新的隊(duì)列
一開始消費(fèi)者聲明的這個(gè)消息隊(duì)列,這個(gè)是否獨(dú)占的exclusive 參數(shù)我是寫true,
所以下圖的 myQueue01的 Features 就是 Excl
這個(gè)就是創(chuàng)建的消費(fèi)者。
用于消費(fèi)這個(gè) myQueue01 消息隊(duì)列的消費(fèi)者。
后面把 exclusive 改成了 false,是因?yàn)楹竺娴纳a(chǎn)者,需要也聲明這個(gè) myQueue01 消息隊(duì)列,而如果這個(gè)消息隊(duì)列是 獨(dú)占的,就沒法聲明了,所以改成 false
創(chuàng)建消息生產(chǎn)者
生產(chǎn)者發(fā)送完消息就會(huì)關(guān)閉資源
消費(fèi)者則是一直啟動(dòng)著
測(cè)試
先啟動(dòng)消費(fèi)者或者啟動(dòng)生產(chǎn)者都一樣,因?yàn)樯a(chǎn)者和消費(fèi)者都有調(diào)用queueDeclare() 方法聲明消息隊(duì)列,所以不存在發(fā)送消息后沒找到對(duì)應(yīng)的消息隊(duì)列而導(dǎo)致消息被丟棄的情況。
啟動(dòng)消費(fèi)者
然后啟動(dòng)生產(chǎn)者
生產(chǎn)者發(fā)送消息
再看消費(fèi)者,已經(jīng)消費(fèi)了一條消息了。
因?yàn)橄葐?dòng)消費(fèi)者,所以生產(chǎn)者發(fā)送的消息馬上被消費(fèi)了,在控制臺(tái)的隊(duì)列就看不到了。
再測(cè)試:
先啟動(dòng)生產(chǎn)者
關(guān)閉消費(fèi)者,然后啟動(dòng)生產(chǎn)者發(fā)送消息
可以看出消息已經(jīng)生產(chǎn)發(fā)送到消息隊(duì)列了
這一步的流程圖
啟動(dòng)消費(fèi)者消費(fèi)消息
流程圖:
已消費(fèi)未確認(rèn)
注意:channel.basicConsume 的第二個(gè)參數(shù) autoAck:true,就是表示自動(dòng)確認(rèn)消息已經(jīng)被消費(fèi)完成了。就是當(dāng)消費(fèi)者接收到消息之后,就立馬返回一個(gè)已經(jīng)確認(rèn)消費(fèi)的消息回去給消息隊(duì)列。
這樣容易出現(xiàn)問題,就是消費(fèi)者這邊因?yàn)橐皇盏较⒕蜁?huì)自動(dòng)確認(rèn)消息被消費(fèi)了并返回已經(jīng)消費(fèi)消息的結(jié)果回去給消息隊(duì)列,但是可能消費(fèi)者其實(shí)還沒有把消息消費(fèi)掉,而消息隊(duì)列那邊又以為消費(fèi)者已經(jīng)把消息消費(fèi)了,所以就繼續(xù)發(fā)消息給那個(gè)消費(fèi)者。
而消費(fèi)者一收到消息又自動(dòng)確認(rèn)消費(fèi)并返回,就會(huì)導(dǎo)致這個(gè)消息隊(duì)列的消息越來越多,然后消費(fèi)者消費(fèi)不完。
如圖:因?yàn)?autoAck 為false , 所以消費(fèi)者消費(fèi)消息后沒有進(jìn)行確認(rèn)。這里的 unacked 條數(shù)就為1.
如果改成 autoAck 為false ,那么消費(fèi)者消費(fèi)消息的代碼,要加上確認(rèn)消息的方法。
這個(gè)就是手動(dòng)確認(rèn)消息。
完整代碼:
ConnectionUtil 連接工具類
package cn.ljh.app.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//連接工具
public class ConnectionUtil
{//獲取連接的方法public static Connection getConnection() throws IOException, TimeoutException{//創(chuàng)建連接工廠----這個(gè)ConnectionFactory源碼可以看出有構(gòu)造器,所以直接new一個(gè)出來ConnectionFactory connectionFactory = new ConnectionFactory();//設(shè)置連接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //連接虛擬主機(jī)//從連接工廠獲取連接Connection connection = connectionFactory.newConnection();//返回連接return connection;}
}
P2PProducer 生產(chǎn)者
package cn.ljh.app.rabbitmq.producer;import cn.ljh.app.rabbitmq.consumer.P2PConsumer;
import cn.ljh.app.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//消息生產(chǎn)者--使用默認(rèn)的exchange
public class P2PProducer
{//(1)創(chuàng)建ConnectionFactory,設(shè)置連接信息,再通過ConnectionFactory獲取Connection。//(2)通過Connection獲取Channel。//(3)根據(jù)需要調(diào)用exchangeDeclare()、queueDeclare()方法聲明Exchange和隊(duì)列、并完成隊(duì)列與Exchange的綁定。// 如果聲明的Exchange還不存在,則創(chuàng)建該Exchange;否則直接使用已有的Exchange。//(4)調(diào)用Channel的basicPublish()方法發(fā)送消息,調(diào)用該方法的第一個(gè)參數(shù)是exchange,// 第二個(gè)參數(shù)為路由key,最后兩個(gè)參數(shù)依次是消息屬性和消息數(shù)據(jù)體。public static void main(String[] args) throws IOException, TimeoutException{//1、創(chuàng)建連接Connection conn = ConnectionUtil.getConnection();//2、通過Connection獲取Channel。Channel channel = conn.createChannel();//3、調(diào)用exchangeDeclare()方法聲明Exchange、調(diào)用queueDeclare()方法聲明隊(duì)列,并完成隊(duì)列與Exchange的綁定//此處打算直接使用默認(rèn)的Exchange來分發(fā)消息,因此無需聲明 Exchange,只需聲明隊(duì)列channel.queueDeclare(P2PConsumer.QUEUE_NAME, true, false, false, null);String message = "生產(chǎn)者發(fā)送的消息的內(nèi)容";//4、調(diào)用Channel的basicPublish()方法發(fā)送消息channel.basicPublish(""/*默認(rèn)的 Exchange 沒有名字,所以用空的字符串*/,P2PConsumer.QUEUE_NAME/*使用隊(duì)列名作為路由key,表明該消息將會(huì)被路由到該隊(duì)列*/,null /*指定額外的消息的屬性*/,message.getBytes(StandardCharsets.UTF_8)/*消息體必須是字節(jié)數(shù)組類型-->byte[]*/);//5、關(guān)閉資源//關(guān)閉通道channel.close();//關(guān)閉連接conn.close();}
}
P2PConsumer 消費(fèi)者
package cn.ljh.app.rabbitmq.consumer;import cn.ljh.app.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消費(fèi)者
public class P2PConsumer
{// 使用 RabbitMQ Java Client 開發(fā) 消息消費(fèi)者 的大致步驟如下://(1)創(chuàng)建ConnectionFactory連接工廠,設(shè)置連接信息,再通過ConnectionFactory獲取Connection連接。//(2)通過Connection獲取Channel。//(3)根據(jù)需要、調(diào)用Channel的queueDeclare()方法聲明隊(duì)列, Declare:聲明、宣布// 如果聲明的隊(duì)列已存在,該方法直接獲取已有的隊(duì)列;如果聲明的隊(duì)列還不存在,該方法將會(huì)創(chuàng)建新的隊(duì)列。//(4)調(diào)用Channel 的 basicConsume()方法開始處理消息,調(diào)用該方法時(shí)需要傳入一個(gè)Consumer參數(shù),該參數(shù)相當(dāng)于JMS中的消息監(jiān)聽器。//常量public final static String QUEUE_NAME = "myQueue01";public static void main(String[] args) throws IOException, TimeoutException{//1、創(chuàng)建連接工廠,設(shè)置連接信息,然后再通過連接工廠獲取連接Connection conn = ConnectionUtil.getConnection();//2、通過Connection獲取Channel 消息通道Channel channel = conn.createChannel();//3、調(diào)用 Channel 的 queueDeclare() 方法聲明隊(duì)列//如果聲明的隊(duì)列已存在,該方法直接獲取已有的隊(duì)列;如果聲明的隊(duì)列還不存在,該方法將會(huì)創(chuàng)建新的隊(duì)列//參數(shù)1:聲明的隊(duì)列名; 參數(shù)2:消息隊(duì)列是否持久化//參數(shù)3:是否只允許該消息消費(fèi)者消費(fèi)該隊(duì)列的消息,為true,則其他消費(fèi)者在這個(gè)myQueue01隊(duì)列消息積堆過多的情況下,也無法幫忙消費(fèi)。//參數(shù)4:是否自動(dòng)刪除(如果為true,在該隊(duì)列沒消息的情況下,會(huì)自動(dòng)刪除該隊(duì)列) 參數(shù)5:填寫額外的參數(shù)channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4、調(diào)用Channel 的 basicConsume()方法開始處理消費(fèi)消息channel.basicConsume(QUEUE_NAME/*消費(fèi)這個(gè)名字的消費(fèi)隊(duì)列里面的消息*/,true/*消息的確認(rèn)模式:是否自動(dòng)確認(rèn)*/,new DefaultConsumer(channel){//處理消息:當(dāng)這個(gè)消息隊(duì)列收到消息的時(shí)候,這個(gè)方法就會(huì)被觸發(fā)。重寫這個(gè)方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key這些*/,AMQP.BasicProperties properties /*消息的那些屬性*/,byte[] body /*body:消息的消息體*/) throws IOException{//把消息體中的消息拿出來String message = new String(body, "UTF-8");//printf:格式化輸出函數(shù) %s:輸出字符串 %n:換行System.err.printf("P2PConsumer收到來自Exchange為【%s】、路由key為【%s】的消息,消息內(nèi)容為%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>rabbitmqtest</artifactId><version>1.0.0</version><name>rabbitmqtest</name><!-- 屬性 --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!-- 依賴 --><dependencies><!-- RabbitMQ 的依賴庫 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency></dependencies></project>