對酒店網(wǎng)站建設(shè)的意見互聯(lián)網(wǎng)廣告行業(yè)
文章目錄
- 1、心跳機(jī)制簡介
- 2、心跳機(jī)制實(shí)現(xiàn)方式
- 3、客戶端
- 4 、服務(wù)端
- 5、代碼實(shí)現(xiàn)
- 5.1 KeepAlive.java
- 5.2 MyClient.java
- 5.3 MyServer
- 5.4 測試結(jié)果
1、心跳機(jī)制簡介
在分布式系統(tǒng)中,分布在不同主機(jī)上的節(jié)點(diǎn)需要檢測其他節(jié)點(diǎn)的狀態(tài),如服務(wù)器節(jié)點(diǎn)需要檢測從節(jié)點(diǎn)是否失效。為了檢測對方節(jié)點(diǎn)的有效性,每隔固定時(shí)間就發(fā)送一個(gè)固定信息給對方,對方回復(fù)一個(gè)固定信息,如果長時(shí)間沒有收到對方的回復(fù),則斷開與對方的連接。
發(fā)包方既可以是服務(wù)端,也可以是客戶端,這要看具體實(shí)現(xiàn)。因?yàn)槭敲扛艄潭〞r(shí)間發(fā)送一次,類似心跳,所以發(fā)送的固定信息稱為心跳包。心跳包一般為比較小的包,可根據(jù)具體實(shí)現(xiàn)。心跳包主要應(yīng)用于長連接的保持與短線鏈接。
一般而言,應(yīng)該客戶端主動(dòng)向服務(wù)器發(fā)送心跳包,因?yàn)榉?wù)器向客戶端發(fā)送心跳包會影響服務(wù)器的性能。
實(shí)現(xiàn)原理:長連接的維持,是要客戶端程序,定時(shí)向服務(wù)端程序,發(fā)送一個(gè)維持連接包的。如果,長時(shí)間未發(fā)送維持連接包,服務(wù)端程序?qū)嚅_連接。
2、心跳機(jī)制實(shí)現(xiàn)方式
心跳機(jī)制有兩種實(shí)現(xiàn)方式,一種基于TCP自帶的心跳包,TCP的SO_KEEPALIVE選項(xiàng)可以,系統(tǒng)默認(rèn)的默認(rèn)跳幀頻率為2小時(shí),超過2小時(shí)后,本地的TCP 實(shí)現(xiàn)會發(fā)送一個(gè)數(shù)據(jù)包給遠(yuǎn)程的 Socket. 如果遠(yuǎn)程Socket 沒有發(fā)回響應(yīng), TCP實(shí)現(xiàn)就會持續(xù)嘗試 11 分鐘, 直到接收到響應(yīng)為止。 否則就會自動(dòng)斷開Socket連接。但TCP自帶的心跳包無法檢測比較敏感地知道對方的狀態(tài),默認(rèn)2小時(shí)的空閑時(shí)間,對于大多數(shù)的應(yīng)用而言太長了??梢允止ら_啟KeepAlive功能并設(shè)置合理的KeepAlive參數(shù)。
另一種在應(yīng)用層自己進(jìn)行實(shí)現(xiàn),基本步驟如下:
- 1、Client使用定時(shí)器,不斷發(fā)送心跳;
- 2、Server收到心跳后,回復(fù)一個(gè)包;
- 3、Server為每個(gè)Client啟動(dòng)超時(shí)定時(shí)器,如果在指定時(shí)間內(nèi)沒有收到Client的心跳包,則Client失效。
3、客戶端
Client通過持有Socket的對象,可以隨時(shí)(使用sendObject方法)發(fā)送Massage Object(消息)給服務(wù)端。
如果keepAliveDelay毫秒(程序中是2秒)內(nèi)未發(fā)送任何數(shù)據(jù),則自動(dòng)發(fā)送一個(gè)KeepAlive Object(心跳)給服務(wù)端,用于維持連接。
由于,我們向服務(wù)端,可以發(fā)送很多不同的消息對象,服務(wù)端也可以返回不同的對象。所以,對于返回對象的處理,要編寫具體的ObjectAction實(shí)現(xiàn)類進(jìn)行處理。通過Client.addActionMap方法進(jìn)行添加。這樣,程序會回調(diào)處理
4 、服務(wù)端
由于客戶端會定時(shí)(keepAliveDelay毫秒)發(fā)送維持連接的信息過來,所以,服務(wù)端要有一個(gè)檢測機(jī)制。
即當(dāng)服務(wù)端receiveTimeDelay毫秒(程序中是3秒)內(nèi)未接收任何數(shù)據(jù),則自動(dòng)斷開與客戶端的連接。
ActionMapping的原理與客戶端相似(相同)。
通過添加相應(yīng)的ObjectAction實(shí)現(xiàn)類,可以實(shí)現(xiàn)不同對象的響應(yīng)、應(yīng)答過程。
5、代碼實(shí)現(xiàn)
5.1 KeepAlive.java
package com.zyz.mynative.demo02;/*** @author zyz* @version 1.0* @data 2023/2/15 11:05* @Description:*/
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;/**** 維持連接的消息對象(心跳對象)* @author zyz* @version 1.0* @data 2023/2/15 11:05* @Description:*/
public class KeepAlive implements Serializable{private static final long serialVersionUID = -2813120366138988480L;/* 覆蓋該方法,僅用于測試使用。* @see java.lang.Object#toString()*/@Overridepublic String toString() {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"\t維持連接包";}}
5.2 MyClient.java
package com.zyz.mynative.demo02;/*** @author zyz* @version 1.0* @data 2023/2/15 11:06* @Description:*/import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;/*** C/S架構(gòu)的客戶端對象,持有該對象,可以隨時(shí)向服務(wù)端發(fā)送消息。** @author zyz* @version 1.0* @data 2023/2/15 11:06* @Description:*/
public class MyClient {/*** 處理服務(wù)端發(fā)回的對象,可實(shí)現(xiàn)該接口。*/public static interface ObjectAction {void doAction(Object obj, MyClient client);}public static final class DefaultObjectAction implements ObjectAction {@Overridepublic void doAction(Object obj, MyClient client) {System.out.println("處理:\t" + obj.toString());}}private String serverIp;private int port;private Socket socket;//連接狀態(tài)private boolean running = false;/*** 最后一次發(fā)送數(shù)據(jù)的時(shí)間*/private long lastSendTime;/*** 用于保存接收消息對象類型及該類型消息處理的對象*/private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class, ObjectAction>();public MyClient(String serverIp, int port) {this.serverIp = serverIp;this.port = port;}public void start() throws UnknownHostException, IOException {if (running) {return;}socket = new Socket(serverIp, port);System.out.println("本地端口:" + socket.getLocalPort());lastSendTime = System.currentTimeMillis();running = true;//保持長連接的線程,每隔2秒項(xiàng)服務(wù)器發(fā)一個(gè)一個(gè)保持連接的心跳消息new Thread(new KeepAliveWatchDog()).start();//接受消息的線程,處理消息new Thread(new ReceiveWatchDog()).start();}public void stop() {if (running) {running = false;};}/*** 添加接收對象的處理對象。** @param cls 待處理的對象,其所屬的類。* @param action 處理過程對象。*/public void addActionMap(Class<Object> cls, ObjectAction action) {actionMapping.put(cls, action);}public void sendObject(Object obj) throws IOException {ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());oos.writeObject(obj);System.out.println("發(fā)送:\t" + obj);oos.flush();}class KeepAliveWatchDog implements Runnable {long checkDelay = 10;//兩秒鐘檢測一次long keepAliveDelay = 2000;@Overridepublic void run() {while (running) {if (System.currentTimeMillis() - lastSendTime > keepAliveDelay) {try {MyClient.this.sendObject(new KeepAlive());} catch (IOException e) {e.printStackTrace();MyClient.this.stop();}lastSendTime = System.currentTimeMillis();} else {try {Thread.sleep(checkDelay);} catch (InterruptedException e) {e.printStackTrace();MyClient.this.stop();}}}}}class ReceiveWatchDog implements Runnable {@Overridepublic void run() {while (running) {try {InputStream in = socket.getInputStream();if (in.available() > 0) {ObjectInputStream ois = new ObjectInputStream(in);Object obj = ois.readObject();System.out.println("接收:\t" + obj);ObjectAction oa = actionMapping.get(obj.getClass());oa = oa == null ? new DefaultObjectAction() : oa;oa.doAction(obj, MyClient.this);} else {Thread.sleep(10);}} catch (Exception e) {e.printStackTrace();MyClient.this.stop();}}}}public static void main(String[] args) throws UnknownHostException, IOException {String serverIp = "127.0.0.1";int port = 65432;MyClient client = new MyClient(serverIp, port);client.start();}
}
5.3 MyServer
package com.zyz.mynative.demo02;import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;/*** C/S架構(gòu)的服務(wù)端對象。** @author zyz* @version 1.0* @data 2023/2/15 11:08* @Description:*/
public class MyServer {/*** 要處理客戶端發(fā)來的對象,并返回一個(gè)對象,可實(shí)現(xiàn)該接口。*/public interface ObjectAction {Object doAction(Object rev, MyServer server);}public static final class DefaultObjectAction implements ObjectAction {@Overridepublic Object doAction(Object rev, MyServer server) {System.out.println("處理并返回:" + rev);return rev;}}private int port;private volatile boolean running = false;private long receiveTimeDelay = 3000;/*** ConcurrentHashMap是線程安全的,ConcurrentHashMap并非鎖住整個(gè)方法,* 而是通過原子操作和局部加鎖的方法保證了多線程的線程安全,且盡可能減少了性能損耗。*/private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class, ObjectAction>();private Thread connWatchDog;public MyServer(int port) {this.port = port;}/*** 通過繼承Runnable ,開啟線程*/public void start() {if (running) {return;};running = true;connWatchDog = new Thread(new ConnWatchDog());connWatchDog.start();}@SuppressWarnings("deprecation")public void stop() {if (running) {running = false;};if (connWatchDog != null) {connWatchDog.stop();};}public void addActionMap(Class<Object> cls, ObjectAction action) {actionMapping.put(cls, action);}/*** 繼承Runnable 重寫run方法、實(shí)現(xiàn)線程*/class ConnWatchDog implements Runnable {@Overridepublic void run() {try {ServerSocket ss = new ServerSocket(port, 5);while (running) {Socket s = ss.accept();new Thread(new SocketAction(s)).start();}} catch (IOException e) {e.printStackTrace();MyServer.this.stop();}}}/*** 繼承Runnable 重寫run方法、實(shí)現(xiàn)線程,通過匿名內(nèi)部類*/class SocketAction implements Runnable {Socket s;boolean run = true;long lastReceiveTime = System.currentTimeMillis();public SocketAction(Socket s) {this.s = s;}@Overridepublic void run() {while (running && run) {if (System.currentTimeMillis() - lastReceiveTime > receiveTimeDelay) {overThis();} else {try {InputStream in = s.getInputStream();if (in.available() > 0) {ObjectInputStream ois = new ObjectInputStream(in);Object obj = ois.readObject();lastReceiveTime = System.currentTimeMillis();System.out.println("接收:\t" + obj);ObjectAction oa = actionMapping.get(obj.getClass());oa = oa == null ? new DefaultObjectAction() : oa;Object out = oa.doAction(obj, MyServer.this);if (out != null) {ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());oos.writeObject(out);oos.flush();}} else {Thread.sleep(10);}} catch (Exception e) {e.printStackTrace();overThis();}}}}private void overThis() {if (run) {run = false;};if (s != null) {try {s.close();} catch (IOException e) {e.printStackTrace();}}System.out.println("關(guān)閉:" + s.getRemoteSocketAddress());}}public static void main(String[] args) {int port = 65432;MyServer server = new MyServer(port);server.start();}}
5.4 測試結(jié)果
先開啟服務(wù)端,在開啟客戶端
參考資料:
- 1、java實(shí)現(xiàn)心跳機(jī)制
- 2、java實(shí)現(xiàn)長連接