wordpress設(shè)置圖片大小seo數(shù)據(jù)統(tǒng)計分析工具有哪些
目錄
前言
1. 創(chuàng)建BrokerServer類
1.1 啟動服務(wù)器
1.2 停止服務(wù)器
1.3 處理一個客戶端的連接
1.3.1?解析請求得到Request對象
1.3.2 根據(jù)請求計算響應(yīng)
1.3.3 將響應(yīng)寫回給客戶端
1.3.4?遍歷Session的哈希表,把斷開的Socket對象的鍵值對進(jìn)行刪除
2. 處理訂閱消息請求詳解(補(bǔ)充)
3. 序列化/反序列化實現(xiàn)(補(bǔ)充)
結(jié)語
前言
? ? ? ? 上一章節(jié),我們定義了本項目的應(yīng)用層傳輸協(xié)議.并且創(chuàng)建了各種參數(shù)類.本章節(jié)的目標(biāo)是對BrokerServer(實現(xiàn)一個TCP服務(wù)器)進(jìn)行實現(xiàn),對連接進(jìn)行處理,根據(jù)請求計算響應(yīng)返回給客戶端.
1. 創(chuàng)建BrokerServer類
public class BrokerServer {// 當(dāng)前考慮一個一個服務(wù)器中只有一個虛擬主機(jī)private VirtualHost virtualHost = new VirtualHost("default");// 使用哈希表表示當(dāng)前會話,也就是有哪些客戶端在和服務(wù)器進(jìn)行通信// key: channelId value:對應(yīng)對的Socket對象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();private ServerSocket serverSocket = null;// 引入線程池來處理多個客戶端private ExecutorService executorService = null;// 控制服務(wù)器是否繼續(xù)運(yùn)行private volatile boolean runnable = true;
}
1.1 啟動服務(wù)器
1. 首先將線程池進(jìn)行創(chuàng)建,用來處理多個連接.
2. 設(shè)置循環(huán)用來監(jiān)聽連接
3. 將處理連接交給線程池.
/*** 1. 啟動服務(wù)器*/public void start() throws IOException {System.out.println("[BrokerServer] 啟動!");// newCachedThreadPool自動申請新的線程executorService = Executors.newCachedThreadPool();try {while (runnable){Socket clientSocket = serverSocket.accept();// 把處理連接的邏輯發(fā)送給線程池executorService.submit(()->{processConnection(clientSocket);});}}catch (SocketException e){System.out.println("[BrokerServer] 服務(wù)器停止運(yùn)行!");
// e.printStackTrace();}}
1.2 停止服務(wù)器
1. 將標(biāo)志位runnable設(shè)置為false
2. 停止線程池的服務(wù)
3. 關(guān)閉服務(wù)器套接字
/*** 2. 停止服務(wù)器*/public void stop() throws IOException {runnable = false;// 停止線程池executorService.shutdownNow();serverSocket.close();}
1.3 處理一個客戶端的連接
1. 我們是從請求中獲取的信息是二進(jìn)制文件,我們不能直接使用InputStream和OutputStream,我們借助DataInputStream和DataOutputStream進(jìn)行操作字節(jié)流.
2. 使用DataInputStream進(jìn)行讀取請求的時候,讀到末尾的時候會拋出一個異常,我們將這個異常視作為處理正確的業(yè)務(wù)邏輯.我么catch掉這個異常就可以.
3. 解析得到請求對象
4. 更具請求計算響應(yīng)
5. 當(dāng)處理完響應(yīng)之后,要進(jìn)行關(guān)閉連接,并且將一個連接中其他Channel進(jìn)行關(guān)閉.
/*** 3. 處理一個客戶端的連接* 在一個連接中會出現(xiàn)多個請求和多個響應(yīng).在一個連接中要循環(huán)的處理*/private void processConnection(Socket clientSocket) {try(InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()){try(DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){while (true){// 1. 讀取并解析請求Request request = readRequest(dataInputStream);// 2. 根據(jù)請求計算響應(yīng)Response response = process(request,clientSocket);// 3. 將響應(yīng)寫回客戶端writeResponse(dataOutputStream, response);}}}catch (EOFException | SocketException e){// 處理正確的業(yè)務(wù)邏輯// 上述進(jìn)行讀取數(shù)據(jù)的時候,如果數(shù)據(jù)讀到末尾(EOF) ,就會拋出一個異常// 借助這個異常結(jié)束上述循環(huán)System.out.println("[BrokerServer] 連接關(guān)閉! 客戶端地址:" +clientSocket.getInetAddress().toString()+ ",端口號: "+clientSocket.getPort());}catch (ClassNotFoundException | MqException e) {e.printStackTrace();} catch (IOException e) {// 處理真正的異常System.out.println("[BrokerServer] connection 出現(xiàn)異常");e.printStackTrace();}finally {try {// 當(dāng)連接處理完成之后,進(jìn)行關(guān)閉連接clientSocket.close();// 一個連接中可能會包含多個channel,需要把當(dāng)前這個Socket對應(yīng)的所有channel進(jìn)行關(guān)閉clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}
1.3.1?解析請求得到Request對象
1. 根據(jù)我們自定義的格式,先讀前4個字節(jié)是請求的類型,在讀4個字節(jié)是payload的長度,在讀就是payload.
2. 讀取payload的時候,我們先根據(jù)長度創(chuàng)建字符數(shù)組,然后按照字符數(shù)組進(jìn)行獲取payload,比較讀取完的長度是否與原來請求的長度一致,不一致說明有消息的丟失.進(jìn)行拋出異常.
3. 最后得到完整的請求對象,交給下面的方法進(jìn)行處理.
/*** 3.1 解析請求得到Request對象*/private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();// 1. 首先讀取四個字節(jié),為請求的typerequest.setType(dataInputStream.readInt());// 2. 在讀四個字節(jié)就是payload的長度request.setLength(dataInputStream.readInt());// 3. 創(chuàng)建字符數(shù)組,并進(jìn)行讀取到數(shù)組中byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()){throw new IOException("[BrokerServer] 請求格式出錯");}// 4. 將讀取的數(shù)組內(nèi)容寫入到實體的Request對象中request.setPayload(payload);return request;}
1.3.2 根據(jù)請求計算響應(yīng)
1. 我們根據(jù)請求對象的payload進(jìn)行解析,此處需要注意的是,我們讀取到的payload是字節(jié)數(shù)組,我們需要進(jìn)行反序列化成字符數(shù)組.
2. 根據(jù)請求對象的Type值進(jìn)行區(qū)分,到底客戶端要調(diào)用服務(wù)器那些功能.
3. 處理完請求之后就要進(jìn)行構(gòu)造響應(yīng)了.
4. 返回響應(yīng)對象
/*** 3.2 根據(jù)請求計算響應(yīng)* @param request* @param clientSocket* @return*/private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 根據(jù)request中的payload進(jìn)行解析// payload 是根據(jù) request 中 type 進(jìn)行變化的BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());// 打印請求的信息System.out.println("[Request] rid=" + basicArguments.getRid()+ ", channelId=" + basicArguments.getChannelId() +"type=" + request.getType() + ",length=" + request.getLength());// 2. 根據(jù)type的值,區(qū)分調(diào)用哪種功能boolean ok = true;if (request.getType() == 0X1){// 1. 創(chuàng)建一個channelsessions.put(basicArguments.getChannelId(),clientSocket);System.out.println("[BrokerServer] 創(chuàng)建channel完成 getChannelId="+ basicArguments.getChannelId());}else if (request.getType() == 0x2){// 2. 銷毀一個channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 銷毀channel完成 getChannelId="+ basicArguments.getChannelId());}else if (request.getType() == 0x3){// 3. 創(chuàng)建交換機(jī)ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;// 調(diào)用虛擬主機(jī)的功能方法ok = virtualHost.exchangeDeclare(arguments.getExchangeName(),arguments.getExchangeType(),arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments());System.out.println("[BrokerServer] 創(chuàng)建交換機(jī)完成 ExchangeName="+ arguments.getExchangeName());}else if (request.getType() == 0x4){// 4. 銷毀交換機(jī)ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());System.out.println("[BrokerServer] 刪除交換機(jī)完成 ExchangeName="+ arguments.getExchangeName());}else if (request.getType() == 0x5) {// 5. 創(chuàng)建隊列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());System.out.println("[BrokerServer] 創(chuàng)建隊列完成 QueueName="+ arguments.getQueueName());} else if (request.getType() == 0x6) {// 6. 刪除隊列QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete((arguments.getQueueName()));System.out.println("[BrokerServer] 刪除隊列完成 QueueName="+ arguments.getQueueName());} else if (request.getType() == 0x7) {// 7. 創(chuàng)建綁定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(),arguments.getExchangeName(), arguments.getBindingKey());System.out.println("[BrokerServer] 創(chuàng)建綁定完成 QueueName="+ arguments.getQueueName()+ ",ExchangeName=" + arguments.getExchangeName());} else if (request.getType() == 0x8) {// 8. 刪除綁定QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());System.out.println("[BrokerServer] 刪除綁定完成 QueueName="+ arguments.getQueueName()+ ",ExchangeName=" + arguments.getExchangeName());} else if (request.getType() == 0x9) {// 9. 發(fā)布消息BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());System.out.println("[BrokerServer] 發(fā)布消息完成 ExchangeName=" + arguments.getExchangeName());}else if (request.getType() == 0xa) {// 10. 訂閱消息BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumeTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws IOException, MqException {// 將服務(wù)器收到的消息進(jìn)行推送給客戶端// 先知道當(dāng)前這個收到的消息, 要發(fā)給哪個客戶端.// 此處 consumerTag 其實是 channelId. 根據(jù) channelId 去 sessions 中查詢, 就可以得到對應(yīng)的// socket 對象了, 從而可以往里面發(fā)送數(shù)據(jù)了// 1. 根據(jù) channelId 找到 socket 對象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 訂閱消息的客戶端已經(jīng)關(guān)閉!");}// 2. 構(gòu)造響應(yīng)數(shù)據(jù)SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于這里只有響應(yīng), 沒有請求, 不需要去對應(yīng). rid 暫時不需要.subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();// 0xc 表示服務(wù)器給消費(fèi)者客戶端推送的消息數(shù)據(jù).response.setType(0xc);// response 的 payload 就是一個 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把數(shù)據(jù)寫回給客戶端.// 注意! 此處的 dataOutputStream 這個對象不能 close !!!// 如果 把 dataOutputStream 關(guān)閉, 就會直接把 clientSocket 里的 outputStream 也關(guān)了.// 此時就無法繼續(xù)往 socket 中寫入后續(xù)數(shù)據(jù)了.DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});}else if (request.getType() == 0xb) {// 10. 調(diào)用 basicAck 確認(rèn)消息.BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());System.out.println("[BrokerServer] 消費(fèi)者確認(rèn)消息完成 QueueName=" + arguments.getQueueName()+ ", MessageId=" + arguments.getMessageId());} else {// 當(dāng)前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 構(gòu)造響應(yīng)BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}
1.3.3 將響應(yīng)寫回給客戶端
1. 注意寫入類型和長度是寫入固定的4個字節(jié),那么我們就使用dataOutputStream.writeInt()
2. 寫完響應(yīng)之后,記得要刷新緩沖區(qū)?dataOutputStream.flush();
/*** 3.3 將響應(yīng)寫回給客戶端* @param dataOutputStream* @param response*/private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {// 將響應(yīng)的屬性從計算好的響應(yīng)中進(jìn)行設(shè)置dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 刷新緩沖區(qū)dataOutputStream.flush();}
1.3.4?遍歷Session的哈希表,把斷開的Socket對象的鍵值對進(jìn)行刪除
由于Socket都已經(jīng)斷開連接了,那么存儲在內(nèi)存中的Session也就沒有存在的必要了.這個集合中存放的是一個連接中的change對應(yīng)的Session,當(dāng)連接斷開之后,Channel也就不會再進(jìn)行工作了,新的連接會創(chuàng)建新的Channel.
注意:我們在使用Map.entrySet進(jìn)行遍歷Map的時候,不要一遍遍歷一遍進(jìn)行刪除,這樣是不穩(wěn)定的,我們遍歷Map將需要進(jìn)行移除的Session進(jìn)行添加到待刪除的鏈表中,最后遍歷待刪除的數(shù)據(jù)結(jié)構(gòu)進(jìn)行刪除.
上述就是整個封裝好的BrokerServer服務(wù)器.
下面呢,我對有關(guān)根據(jù)請求計算響應(yīng)中訂閱消息這一功能,再進(jìn)行詳細(xì)的闡述,這塊比較難以理解,因為涉及到回調(diào)函數(shù),大家可能不知道這個回調(diào)函數(shù)掉用的時機(jī)是哪里.
2. 處理訂閱消息請求詳解(補(bǔ)充)
?第二個紅框部分是回調(diào)函數(shù).
????????只有消費(fèi)者訂閱的隊列中有消息了,并且輪詢的方式選中了這個消費(fèi)者,才會獲得消息的本體,此時線程池才會執(zhí)行到這個回調(diào)方法,此時才拿到消息的本體,可以將消息的屬性和本體寫入到SubscribeReturn中,進(jìn)而推送給消費(fèi)者進(jìn)行消費(fèi)消息.如果沒有消息給這個消費(fèi)者,那么也不會進(jìn)行斷開連接,只要服務(wù)器不斷開連接客戶端一直在等待分配的消息進(jìn)行消費(fèi).這一點希望,讀者能夠進(jìn)一步的理解.等總結(jié)完客戶端,那么我就會帶著大家,再來理一遍這個訂閱消息的這個思路.
3. 序列化/反序列化實現(xiàn)(補(bǔ)充)
要想能進(jìn)行序列化和反序列化就必須對目標(biāo)對象進(jìn)行實現(xiàn)serializable接口.
1. 我們使用ByteArrayOutputStream和ObjectOutputStream進(jìn)行將一個對象序列化為字節(jié)數(shù)組(輸出的是字節(jié)用output)
2. 我們ByteArrayInputStream和ObjectInputStream將一個字節(jié)數(shù)組反序列化成一個對象(輸入的是字節(jié)用Input)
結(jié)語
? ? ? ? 至此,我們就徹底的完成了mqserver 的搭建,只剩下mqclient的搭建,我們在下一系列完成客戶端的搭建,請持續(xù)關(guān)注,謝謝!!!
????????完整的項目代碼已上傳Gitee,歡迎大家訪問.👇👇👇
模擬實現(xiàn)消息隊列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq