中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

wordpress設(shè)置圖片大小seo數(shù)據(jù)統(tǒng)計分析工具有哪些

wordpress設(shè)置圖片大小,seo數(shù)據(jù)統(tǒng)計分析工具有哪些,菠菜網(wǎng)站的代理怎么做,做外貿(mào)網(wǎng)站案例目錄 前言 1. 創(chuàng)建BrokerServer類 1.1 啟動服務(wù)器 1.2 停止服務(wù)器 1.3 處理一個客戶端的連接 1.3.1 解析請求得到Request對象 1.3.2 根據(jù)請求計算響應(yīng) 1.3.3 將響應(yīng)寫回給客戶端 1.3.4 遍歷Session的哈希表,把斷開的Socket對象的鍵值對進(jìn)行刪除 2. 處理訂閱消息請求詳解(補(bǔ)充) …

目錄

前言

1. 創(chuàng)建BrokerServer類

1.1 啟動服務(wù)器

1.2 停止服務(wù)器

1.3 處理一個客戶端的連接

1.3.1?解析請求得到Request對象

1.3.2 根據(jù)請求計算響應(yīng)

1.3.3 將響應(yīng)寫回給客戶端

1.3.4?遍歷Session的哈希表,把斷開的Socket對象的鍵值對進(jìn)行刪除

2. 處理訂閱消息請求詳解(補(bǔ)充)

3. 序列化/反序列化實現(xiàn)(補(bǔ)充)

結(jié)語


前言

? ? ? ? 上一章節(jié),我們定義了本項目的應(yīng)用層傳輸協(xié)議.并且創(chuàng)建了各種參數(shù)類.本章節(jié)的目標(biāo)是對BrokerServer(實現(xiàn)一個TCP服務(wù)器)進(jìn)行實現(xiàn),對連接進(jìn)行處理,根據(jù)請求計算響應(yīng)返回給客戶端.


1. 創(chuàng)建BrokerServer類

public class BrokerServer {// 當(dāng)前考慮一個一個服務(wù)器中只有一個虛擬主機(jī)private VirtualHost virtualHost = new VirtualHost("default");// 使用哈希表表示當(dāng)前會話,也就是有哪些客戶端在和服務(wù)器進(jìn)行通信// key: channelId value:對應(yīng)對的Socket對象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();private ServerSocket serverSocket = null;// 引入線程池來處理多個客戶端private ExecutorService executorService = null;// 控制服務(wù)器是否繼續(xù)運(yùn)行private volatile boolean runnable = true;
}

1.1 啟動服務(wù)器

1. 首先將線程池進(jìn)行創(chuàng)建,用來處理多個連接.

2. 設(shè)置循環(huán)用來監(jiān)聽連接

3. 將處理連接交給線程池.

/*** 1. 啟動服務(wù)器*/public void start() throws IOException {System.out.println("[BrokerServer] 啟動!");// newCachedThreadPool自動申請新的線程executorService = Executors.newCachedThreadPool();try {while (runnable){Socket clientSocket = serverSocket.accept();// 把處理連接的邏輯發(fā)送給線程池executorService.submit(()->{processConnection(clientSocket);});}}catch (SocketException e){System.out.println("[BrokerServer] 服務(wù)器停止運(yùn)行!");
//            e.printStackTrace();}}

1.2 停止服務(wù)器

1. 將標(biāo)志位runnable設(shè)置為false

2. 停止線程池的服務(wù)

3. 關(guān)閉服務(wù)器套接字

/*** 2. 停止服務(wù)器*/public void stop() throws IOException {runnable = false;// 停止線程池executorService.shutdownNow();serverSocket.close();}

1.3 處理一個客戶端的連接

1. 我們是從請求中獲取的信息是二進(jìn)制文件,我們不能直接使用InputStream和OutputStream,我們借助DataInputStream和DataOutputStream進(jìn)行操作字節(jié)流.

2. 使用DataInputStream進(jìn)行讀取請求的時候,讀到末尾的時候會拋出一個異常,我們將這個異常視作為處理正確的業(yè)務(wù)邏輯.我么catch掉這個異常就可以.

3. 解析得到請求對象

4. 更具請求計算響應(yīng)

5. 當(dāng)處理完響應(yīng)之后,要進(jìn)行關(guān)閉連接,并且將一個連接中其他Channel進(jìn)行關(guān)閉.

/*** 3. 處理一個客戶端的連接*    在一個連接中會出現(xiàn)多個請求和多個響應(yīng).在一個連接中要循環(huán)的處理*/private void processConnection(Socket clientSocket) {try(InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()){try(DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){while (true){// 1. 讀取并解析請求Request request = readRequest(dataInputStream);// 2. 根據(jù)請求計算響應(yīng)Response response = process(request,clientSocket);// 3. 將響應(yīng)寫回客戶端writeResponse(dataOutputStream, response);}}}catch (EOFException | SocketException e){// 處理正確的業(yè)務(wù)邏輯// 上述進(jìn)行讀取數(shù)據(jù)的時候,如果數(shù)據(jù)讀到末尾(EOF) ,就會拋出一個異常// 借助這個異常結(jié)束上述循環(huán)System.out.println("[BrokerServer]  連接關(guān)閉! 客戶端地址:" +clientSocket.getInetAddress().toString()+ ",端口號: "+clientSocket.getPort());}catch (ClassNotFoundException | MqException e) {e.printStackTrace();} catch (IOException e) {// 處理真正的異常System.out.println("[BrokerServer] connection 出現(xiàn)異常");e.printStackTrace();}finally {try {// 當(dāng)連接處理完成之后,進(jìn)行關(guān)閉連接clientSocket.close();// 一個連接中可能會包含多個channel,需要把當(dāng)前這個Socket對應(yīng)的所有channel進(jìn)行關(guān)閉clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}

1.3.1?解析請求得到Request對象

1. 根據(jù)我們自定義的格式,先讀前4個字節(jié)是請求的類型,在讀4個字節(jié)是payload的長度,在讀就是payload.

2. 讀取payload的時候,我們先根據(jù)長度創(chuàng)建字符數(shù)組,然后按照字符數(shù)組進(jìn)行獲取payload,比較讀取完的長度是否與原來請求的長度一致,不一致說明有消息的丟失.進(jìn)行拋出異常.

3. 最后得到完整的請求對象,交給下面的方法進(jìn)行處理.

 /*** 3.1 解析請求得到Request對象*/private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();// 1. 首先讀取四個字節(jié),為請求的typerequest.setType(dataInputStream.readInt());// 2. 在讀四個字節(jié)就是payload的長度request.setLength(dataInputStream.readInt());// 3. 創(chuàng)建字符數(shù)組,并進(jìn)行讀取到數(shù)組中byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()){throw new IOException("[BrokerServer] 請求格式出錯");}// 4. 將讀取的數(shù)組內(nèi)容寫入到實體的Request對象中request.setPayload(payload);return request;}

1.3.2 根據(jù)請求計算響應(yīng)

1. 我們根據(jù)請求對象的payload進(jìn)行解析,此處需要注意的是,我們讀取到的payload是字節(jié)數(shù)組,我們需要進(jìn)行反序列化成字符數(shù)組.

2. 根據(jù)請求對象的Type值進(jìn)行區(qū)分,到底客戶端要調(diào)用服務(wù)器那些功能.

3. 處理完請求之后就要進(jìn)行構(gòu)造響應(yīng)了.

4. 返回響應(yīng)對象

/*** 3.2 根據(jù)請求計算響應(yīng)* @param request* @param clientSocket* @return*/private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 根據(jù)request中的payload進(jìn)行解析//  payload 是根據(jù) request 中 type 進(jìn)行變化的BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());// 打印請求的信息System.out.println("[Request] rid=" + basicArguments.getRid()+ ", channelId=" + basicArguments.getChannelId() +"type=" + request.getType() + ",length=" + request.getLength());// 2. 根據(jù)type的值,區(qū)分調(diào)用哪種功能boolean ok = true;if (request.getType() == 0X1){// 1. 創(chuàng)建一個channelsessions.put(basicArguments.getChannelId(),clientSocket);System.out.println("[BrokerServer] 創(chuàng)建channel完成 getChannelId="+ basicArguments.getChannelId());}else if (request.getType() == 0x2){// 2. 銷毀一個channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 銷毀channel完成 getChannelId="+ basicArguments.getChannelId());}else if (request.getType() == 0x3){// 3. 創(chuàng)建交換機(jī)ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;// 調(diào)用虛擬主機(jī)的功能方法ok = virtualHost.exchangeDeclare(arguments.getExchangeName(),arguments.getExchangeType(),arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments());System.out.println("[BrokerServer] 創(chuàng)建交換機(jī)完成 ExchangeName="+ arguments.getExchangeName());}else if (request.getType() == 0x4){// 4. 銷毀交換機(jī)ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());System.out.println("[BrokerServer] 刪除交換機(jī)完成 ExchangeName="+ arguments.getExchangeName());}else if (request.getType() == 0x5) {// 5. 創(chuàng)建隊列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());System.out.println("[BrokerServer] 創(chuàng)建隊列完成 QueueName="+ arguments.getQueueName());} else if (request.getType() == 0x6) {// 6. 刪除隊列QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete((arguments.getQueueName()));System.out.println("[BrokerServer] 刪除隊列完成 QueueName="+ arguments.getQueueName());} else if (request.getType() == 0x7) {// 7. 創(chuàng)建綁定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(),arguments.getExchangeName(), arguments.getBindingKey());System.out.println("[BrokerServer] 創(chuàng)建綁定完成 QueueName="+ arguments.getQueueName()+ ",ExchangeName=" + arguments.getExchangeName());} else if (request.getType() == 0x8) {// 8. 刪除綁定QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());System.out.println("[BrokerServer] 刪除綁定完成 QueueName="+ arguments.getQueueName()+ ",ExchangeName=" + arguments.getExchangeName());} else if (request.getType() == 0x9) {// 9. 發(fā)布消息BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());System.out.println("[BrokerServer] 發(fā)布消息完成 ExchangeName=" + arguments.getExchangeName());}else if (request.getType() == 0xa) {// 10. 訂閱消息BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumeTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws IOException, MqException {// 將服務(wù)器收到的消息進(jìn)行推送給客戶端// 先知道當(dāng)前這個收到的消息, 要發(fā)給哪個客戶端.// 此處 consumerTag 其實是 channelId. 根據(jù) channelId 去 sessions 中查詢, 就可以得到對應(yīng)的// socket 對象了, 從而可以往里面發(fā)送數(shù)據(jù)了// 1. 根據(jù) channelId 找到 socket 對象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 訂閱消息的客戶端已經(jīng)關(guān)閉!");}// 2. 構(gòu)造響應(yīng)數(shù)據(jù)SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(""); // 由于這里只有響應(yīng), 沒有請求, 不需要去對應(yīng). rid 暫時不需要.subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();// 0xc 表示服務(wù)器給消費(fèi)者客戶端推送的消息數(shù)據(jù).response.setType(0xc);// response 的 payload 就是一個 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把數(shù)據(jù)寫回給客戶端.//    注意! 此處的 dataOutputStream 這個對象不能 close !!!//    如果 把 dataOutputStream 關(guān)閉, 就會直接把 clientSocket 里的 outputStream 也關(guān)了.//    此時就無法繼續(xù)往 socket 中寫入后續(xù)數(shù)據(jù)了.DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});}else if (request.getType() == 0xb) {// 10. 調(diào)用 basicAck 確認(rèn)消息.BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());System.out.println("[BrokerServer] 消費(fèi)者確認(rèn)消息完成 QueueName=" + arguments.getQueueName()+ ", MessageId=" + arguments.getMessageId());} else {// 當(dāng)前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 構(gòu)造響應(yīng)BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}

1.3.3 將響應(yīng)寫回給客戶端

1. 注意寫入類型和長度是寫入固定的4個字節(jié),那么我們就使用dataOutputStream.writeInt()

2. 寫完響應(yīng)之后,記得要刷新緩沖區(qū)?dataOutputStream.flush();

/*** 3.3 將響應(yīng)寫回給客戶端* @param dataOutputStream* @param response*/private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {// 將響應(yīng)的屬性從計算好的響應(yīng)中進(jìn)行設(shè)置dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 刷新緩沖區(qū)dataOutputStream.flush();}

1.3.4?遍歷Session的哈希表,把斷開的Socket對象的鍵值對進(jìn)行刪除

由于Socket都已經(jīng)斷開連接了,那么存儲在內(nèi)存中的Session也就沒有存在的必要了.這個集合中存放的是一個連接中的change對應(yīng)的Session,當(dāng)連接斷開之后,Channel也就不會再進(jìn)行工作了,新的連接會創(chuàng)建新的Channel.

注意:我們在使用Map.entrySet進(jìn)行遍歷Map的時候,不要一遍遍歷一遍進(jìn)行刪除,這樣是不穩(wěn)定的,我們遍歷Map將需要進(jìn)行移除的Session進(jìn)行添加到待刪除的鏈表中,最后遍歷待刪除的數(shù)據(jù)結(jié)構(gòu)進(jìn)行刪除.


上述就是整個封裝好的BrokerServer服務(wù)器.


下面呢,我對有關(guān)根據(jù)請求計算響應(yīng)中訂閱消息這一功能,再進(jìn)行詳細(xì)的闡述,這塊比較難以理解,因為涉及到回調(diào)函數(shù),大家可能不知道這個回調(diào)函數(shù)掉用的時機(jī)是哪里.

2. 處理訂閱消息請求詳解(補(bǔ)充)

?第二個紅框部分是回調(diào)函數(shù).

????????只有消費(fèi)者訂閱的隊列中有消息了,并且輪詢的方式選中了這個消費(fèi)者,才會獲得消息的本體,此時線程池才會執(zhí)行到這個回調(diào)方法,此時才拿到消息的本體,可以將消息的屬性和本體寫入到SubscribeReturn中,進(jìn)而推送給消費(fèi)者進(jìn)行消費(fèi)消息.如果沒有消息給這個消費(fèi)者,那么也不會進(jìn)行斷開連接,只要服務(wù)器不斷開連接客戶端一直在等待分配的消息進(jìn)行消費(fèi).這一點希望,讀者能夠進(jìn)一步的理解.等總結(jié)完客戶端,那么我就會帶著大家,再來理一遍這個訂閱消息的這個思路.

3. 序列化/反序列化實現(xiàn)(補(bǔ)充)

要想能進(jìn)行序列化和反序列化就必須對目標(biāo)對象進(jìn)行實現(xiàn)serializable接口.

1. 我們使用ByteArrayOutputStream和ObjectOutputStream進(jìn)行將一個對象序列化為字節(jié)數(shù)組(輸出的是字節(jié)用output)

2. 我們ByteArrayInputStream和ObjectInputStream將一個字節(jié)數(shù)組反序列化成一個對象(輸入的是字節(jié)用Input)


結(jié)語

? ? ? ? 至此,我們就徹底的完成了mqserver 的搭建,只剩下mqclient的搭建,我們在下一系列完成客戶端的搭建,請持續(xù)關(guān)注,謝謝!!!

????????完整的項目代碼已上傳Gitee,歡迎大家訪問.👇👇👇

模擬實現(xiàn)消息隊列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

http://www.risenshineclean.com/news/62204.html

相關(guān)文章:

  • 網(wǎng)站流量下滑福州seo技巧培訓(xùn)
  • 個人做網(wǎng)站開發(fā)指標(biāo)大數(shù)據(jù)免費(fèi)查詢平臺
  • 建站如何注重內(nèi)容建設(shè)愛站工具下載
  • 蘭州出臺9條優(yōu)化措施西安seo優(yōu)化系統(tǒng)
  • 怎么知道一個網(wǎng)站是誰做的上海seo優(yōu)化公司kinglink
  • 企業(yè)做宣傳網(wǎng)站多少錢河北關(guān)鍵詞排名推廣
  • 小型網(wǎng)站建設(shè)價格低網(wǎng)站人多怎么優(yōu)化
  • 上海網(wǎng)站建設(shè)價格成人就業(yè)技術(shù)培訓(xùn)機(jī)構(gòu)
  • 臺州網(wǎng)站建設(shè)找哪家好點360優(yōu)化大師app
  • sh域名做的好的網(wǎng)站營銷渠道名詞解釋
  • 梅州市建設(shè)培訓(xùn)中心網(wǎng)站模板網(wǎng)站如何建站
  • 大連鼎信網(wǎng)站建設(shè)公司地址crm網(wǎng)站
  • 合肥免費(fèi)做網(wǎng)站seo優(yōu)化專員工作內(nèi)容
  • 什么網(wǎng)站詳情頁做的好百度競價排名多少錢
  • 創(chuàng)意設(shè)計作品賞析蘇州關(guān)鍵詞優(yōu)化怎樣
  • 學(xué)做旗袍衣服的網(wǎng)站seo優(yōu)化排名價格
  • 自己制作網(wǎng)站需要什么站長百度
  • 影視網(wǎng)站代理如何出售自己的域名
  • 網(wǎng)絡(luò)推廣平臺免費(fèi)一鍵優(yōu)化表格
  • 專用車網(wǎng)站建設(shè)哪家專業(yè)網(wǎng)頁設(shè)計的流程
  • asp網(wǎng)站欄目如何修改全國疫情今天最新消息
  • 濰坊網(wǎng)站建設(shè)哪家便宜制作網(wǎng)頁用什么軟件
  • 互聯(lián)網(wǎng)投訴中心官網(wǎng)入口seo點擊工具
  • 蕪湖做網(wǎng)站的客戶百度官網(wǎng)平臺
  • 數(shù)據(jù)庫 網(wǎng)站開發(fā)所需流程百度指數(shù)明星搜索排名
  • 西安門戶網(wǎng)站建設(shè)b站網(wǎng)站推廣mmm
  • wordpress 4.5 多站點百度推廣手機(jī)客戶端
  • 鄭州電力高等??茖W(xué)校學(xué)費(fèi)多少seo引擎優(yōu)化培訓(xùn)
  • 本地wordpress后臺進(jìn)不去東莞關(guān)鍵詞排名seo
  • 在小型網(wǎng)站建設(shè)小組廣州疫情升級