怎么做外貿(mào)推廣網(wǎng)站搜索關(guān)鍵詞優(yōu)化
目錄
一、客戶端代碼實(shí)現(xiàn)
1.1、需求分析
1.2、具體實(shí)現(xiàn)
1)實(shí)現(xiàn) ConnectionFactory
2)實(shí)現(xiàn) Connection
3)實(shí)現(xiàn) Channel
二、編寫 Demo?
2.1、實(shí)例?
2.1、實(shí)例演示
一、客戶端代碼實(shí)現(xiàn)
1.1、需求分析
RabbitMQ 的客戶端設(shè)定:一個(gè)客戶端可以有多個(gè)模塊,每個(gè)模塊都可以和 broker server 之間建立 “邏輯上的連接” (channel),這幾個(gè)模塊的channel 彼此之間是互相不影響的,同時(shí)這幾個(gè) channel 又復(fù)用的同一個(gè) TCP 連接,省去了頻繁 建立/銷毀?TCP 連接的開銷(三次握手、四次揮手......).
這里,我們也按照這樣的邏輯實(shí)現(xiàn) 消息隊(duì)列 的客戶端,主要涉及到以下三個(gè)核心類:
- ConnectionFactory:連接工廠,這個(gè)類持有服務(wù)器的地址,主要功能就是創(chuàng)建 Connection 對(duì)象.
- Connection:表示一個(gè) TCP連接,持有 Socket 對(duì)象,用來 寫入請(qǐng)求/讀取響應(yīng),管理多個(gè)Channel 對(duì)象.
- Channel:表示一個(gè)邏輯上的連接,需要提供一系列的方法,去和服務(wù)器提供的核心 API 對(duì)應(yīng)(客戶端提供的這些方法的內(nèi)部,就是寫入了一個(gè)特定的請(qǐng)求,然后等待服務(wù)器響應(yīng)).
1.2、具體實(shí)現(xiàn)
1)實(shí)現(xiàn) ConnectionFactory
主要用來創(chuàng)建 Connection 對(duì)象.
public class ConnectionFactory {//broker server 的 ip 地址private String host;//broker server 的端口號(hào)private int port;// //訪問 broker server 的哪個(gè)虛擬主機(jī)
// //這里暫時(shí)先不涉及
// private String virtualHostName;
// private String username;
// private String password;public Connection newConnection() throws IOException {Connection connection = new Connection(host, port);return connection;}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}
}
2)實(shí)現(xiàn) Connection
屬性如下
private Socket socket;//一個(gè) socket 連接需要管理多個(gè) channelprivate ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;// DataXXX 主要用來 讀取/寫入 特定格式數(shù)據(jù)(例如 readInt())private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;//用來處理 0xc 的回調(diào),這里開銷可能會(huì)很大,不希望把 Connection 阻塞住,因此使用 線程池 來處理private ExecutorService callbackPool;
構(gòu)造如下
這里不光需要初始化屬性,還需要?jiǎng)?chuàng)建一個(gè)掃描線程,由這個(gè)線程負(fù)責(zé)不停的從 socket 中讀取響應(yīng)數(shù)據(jù),把這個(gè)響應(yīng)數(shù)據(jù)再交給對(duì)應(yīng)的 channel 負(fù)責(zé)處理
public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);//創(chuàng)建一個(gè)掃描線程,由這個(gè)線程負(fù)責(zé)不停的從 socket 中讀取響應(yīng)數(shù)據(jù),把這個(gè)響應(yīng)數(shù)據(jù)再交給對(duì)應(yīng)的 channel 負(fù)責(zé)處理Thread t = new Thread(() -> {try {while(!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {//連接正常斷開的,此時(shí)這個(gè)異??梢院雎許ystem.out.println("[Connection] 連接正常斷開!");} catch(IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 連接異常斷開!");e.printStackTrace();}});t.start();}
釋放 Connection 相關(guān)資源
public void close() {try {callbackPool.shutdown();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}
使用這個(gè)方法來區(qū)別,當(dāng)前的響應(yīng)是一個(gè)針對(duì)控制請(qǐng)求的響應(yīng),還是服務(wù)器推送過來的消息.
如果是服務(wù)器推送過來的消息,就響應(yīng)表明是 0xc,也就是一個(gè)回調(diào),通過線程池來進(jìn)行處理;
如果只是一個(gè)普通的響應(yīng),就把這個(gè)結(jié)果放到 channel 的 哈希表中(隨后 channel 會(huì)喚醒所有阻塞等待響應(yīng)的線程,去 map 中拿數(shù)據(jù)).
public void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if(response.getType() == 0xc) {//服務(wù)器推送過來的消息數(shù)據(jù)SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());//根據(jù) channelId 找到對(duì)應(yīng)的 channel 對(duì)象Channel channel = channelMap.get(subScribeReturns.getChannelId());if(channel == null) {throw new MqException("[Connection] 該消息對(duì)應(yīng)的 channel 再客戶端中不存在!channelId=" + channel.getChannelId());}//執(zhí)行該 channel 對(duì)象內(nèi)部的回調(diào)(這里的開銷未知,有可能很大,同時(shí)不希望把這里阻塞住,所以使用線程池來執(zhí)行)callbackPool.submit(() -> {try {channel.getConsumer().handlerDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch(MqException | IOException e) {e.printStackTrace();}});} else {//當(dāng)前響應(yīng)是針對(duì)剛才的控制請(qǐng)求的響應(yīng)BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());//把這個(gè)結(jié)果放到 channel 的 哈希表中Channel channel = channelMap.get(basicReturns.getChannelId());if(channel == null) {throw new MqException("[Connection] 該消息對(duì)應(yīng)的 channel 在客戶端中不存在!channelId=" + channel.getChannelId());}channel.putReturns(basicReturns);}}
發(fā)送請(qǐng)求和讀取響應(yīng)
/*** 發(fā)送請(qǐng)求* @param request* @throws IOException*/public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 發(fā)送請(qǐng)求!type=" + request.getType() + ", length=" + request.getLength());}/*** 讀取響應(yīng)*/public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if(n != response.getLength()) {throw new IOException("讀取的響應(yīng)格式不完整! n=" + n + ", responseLen=" + response.getLength());}response.setPayload(payload);System.out.println("[Connection] 收到響應(yīng)!type=" + response.getType() + ", length=" + response.getLength());return response;}
在 Connection 中提供創(chuàng)建 Channel 的方法
public Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId, this);//放到 Connection 管理的 channel 的 Map 集合中channelMap.put(channelId, channel);//同時(shí)也需要把 “創(chuàng)建channel” 這個(gè)消息告訴服務(wù)器boolean ok = channel.createChannel();if(!ok) {//如果創(chuàng)建失敗,就說明這次創(chuàng)建 channel 操作不順利//把剛才加入 hash 表的鍵值對(duì)再刪了channelMap.remove(channelId);return null;}return channel;}
Ps:代碼中使用了很多次 UUID ,這里我們和之前一樣,使用加前綴的方式來進(jìn)行區(qū)分.
3)實(shí)現(xiàn) Channel
屬性和構(gòu)造如下
private String channelId;// 當(dāng)前這個(gè) channel 是屬于哪一個(gè)連接private Connection connection;//用來存儲(chǔ)后續(xù)客戶端收到的服務(wù)器響應(yīng),已經(jīng)辨別是哪個(gè)響應(yīng)(要對(duì)的上號(hào)) key 是 ridprivate ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();//如果當(dāng)前 Channel 訂閱了某個(gè)隊(duì)列,就需要記錄對(duì)應(yīng)的回調(diào)是什么,當(dāng)該隊(duì)列消息返回回來的時(shí)候,調(diào)用回調(diào)//此處約定一個(gè) Channel 只能有一個(gè)回調(diào)private Consumer consumer;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId = channelId;}public Connection getConnection() {return connection;}public void setConnection(Connection connection) {this.connection = connection;}public ConcurrentHashMap<String, BasicReturns> getBasicReturnsMap() {return basicReturnsMap;}public void setBasicReturnsMap(ConcurrentHashMap<String, BasicReturns> basicReturnsMap) {this.basicReturnsMap = basicReturnsMap;}public Consumer getConsumer() {return consumer;}public void setConsumer(Consumer consumer) {this.consumer = consumer;
實(shí)現(xiàn) 0x1 創(chuàng)建 channel
主要就是構(gòu)造構(gòu)造出 request,然后發(fā)送請(qǐng)求到 BrokerServer 服務(wù)器,阻塞等待服務(wù)器響應(yīng).
/*** 0x1* 和服務(wù)器進(jìn)行交互,告訴服務(wù)器,此處客戶端已經(jīng)創(chuàng)建了新的 channel 了* @return*/public boolean createChannel() throws IOException {//構(gòu)造 payloadBasicArguments arguments = new BasicArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(arguments);//發(fā)送請(qǐng)求Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);//等待服務(wù)器響應(yīng)BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}/*** 生成 rid* @return*/public String generateRid() {return "R-" + UUID.randomUUID().toString();}/*** 阻塞等待服務(wù)器響應(yīng)* @param rid* @return*/private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while((basicReturns = basicReturnsMap.get(rid)) == null) {//查詢結(jié)果為空,就說明咱們?nèi)ゲ锁B驛站要取的包裹還沒到//此時(shí)就需要阻塞等待synchronized (this) {try {wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}}basicReturnsMap.remove(rid);return basicReturns;}/*** 由 Connection 中的方法調(diào)用,區(qū)分為普通響應(yīng)之后觸發(fā)* 將響應(yīng)放回到 channel 管理的 map 中,并喚醒所有線程* @param basicReturns*/public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {//當(dāng)前也不知道有多少線程再等待上述的這個(gè)響應(yīng)//因此就把所有等待的線程喚醒notifyAll();}}
Ps:其他的 請(qǐng)求操作也和 0x1 的方式幾乎一樣,這里不一一展示了,主要說一下 0xa
0xa 消費(fèi)者訂閱隊(duì)列消息,這里要先設(shè)置好回調(diào)到屬性中,方便 Connection 通過這個(gè)屬性來 處理回調(diào)
值得注意的一點(diǎn), 我們約定 channelId 就是 consumerTag
public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws IOException, MqException {//先設(shè)置回調(diào)if(this.consumer != null) {throw new MqException("該 channel 已經(jīng)設(shè)置過消費(fèi)消息回調(diào)了,不能重復(fù)!");}this.consumer = consumer;BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();basicConsumeArguments.setRid(generateRid());basicConsumeArguments.setChannelId(channelId);basicConsumeArguments.setConsumerTag(channelId); // 注意:此處的 consumerTag 使用 channelId 來表示basicConsumeArguments.setQueueName(queueName);basicConsumeArguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(basicConsumeArguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());return basicReturns.isOk();}
二、編寫 Demo?
2.1、實(shí)例?
到了這里基本就實(shí)現(xiàn)完成了一個(gè) 跨主機(jī)/服務(wù)器 之間的生產(chǎn)者消費(fèi)者模型了(功能上可以滿足日常開發(fā)對(duì)消息隊(duì)列的使用),但是還具有很強(qiáng)的擴(kuò)展性,可以繼續(xù)參考 RabbitMQ,如果有想法的,或者是遇到不會(huì)的問題,可以私信我~
以下我來我來編寫一個(gè) demo,模擬 跨主機(jī)/服務(wù)器 之間的生產(chǎn)者消費(fèi)者模型(這里為了方便,就在本機(jī)演示).
首先再 spring boot 項(xiàng)目的啟動(dòng)類中 創(chuàng)建 BrokerServer ,綁定端口號(hào),然后啟動(dòng)
@SpringBootApplication
public class RabbitmqProjectApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(RabbitmqProjectApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}
編寫消費(fèi)者
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {//建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//創(chuàng)建交換機(jī)和隊(duì)列(這里和生產(chǎn)者創(chuàng)建交換機(jī)和隊(duì)列不沖突,誰先啟動(dòng),就按照誰的創(chuàng)建,即使已經(jīng)存在交換機(jī)和隊(duì)列,再創(chuàng)建也不會(huì)有什么副作用)channel.exchangeDeclare("demoExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("demoQueue", true, false, false, null);//消費(fèi)者消費(fèi)消息channel.basicConsume("demoQueue", true, new Consumer() {@Overridepublic void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("開銷消費(fèi)");System.out.println("consumerTag=" + consumerTag);System.out.println("body=" + new String(body));System.out.println("消費(fèi)完畢");}});//由于消費(fèi)者不知道生產(chǎn)者要生產(chǎn)多少,就在這里通過循環(huán)模擬一直等待while(true) {Thread.sleep(500);}}}
編寫生產(chǎn)者
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {//建立連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//創(chuàng)建交換機(jī)和隊(duì)列(這里和消費(fèi)者創(chuàng)建交換機(jī)和隊(duì)列不沖突,誰先啟動(dòng),就按照誰的創(chuàng)建,即使已經(jīng)存在交換機(jī)和隊(duì)列,再創(chuàng)建也不會(huì)有什么副作用)channel.exchangeDeclare("demoExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("demoQueue", true, false, false, null);//生產(chǎn)消息byte[] body1 = "Im cyk1 !".getBytes();channel.basicPublish("demoExchange", "demoQueue", null, body1);Thread.sleep(500);//關(guān)閉連接channel.close();connection.close();}}
2.1、實(shí)例演示
啟動(dòng) spring boot 項(xiàng)目(啟動(dòng) BrokerServer)
運(yùn)行消費(fèi)者(消費(fèi)者和生產(chǎn)者誰先后運(yùn)行都可以)
?
運(yùn)行生產(chǎn)者
?
?