黑龍江進(jìn)入疫情緊急狀態(tài)seo需要什么技術(shù)
文章目錄
- 1 WebSocket
- 1.1 簡(jiǎn)介
- 1.2 WebSocket作用和調(diào)用
- 1.2.1 作用
- 1.2.2 js端調(diào)用
- 1.3 Javax
- 1.3.1 服務(wù)端
- 1.3.1.1 服務(wù)端接收
- 1.3.1.2 服務(wù)端集成
- 1.3.1.3 ping和pong消息
- 1.3.2 客戶端
- 1.3.2.1 客戶端接收
- 1.3.2.2 客戶端發(fā)送
- 1.4 WebMVC
- 1.4.1 服務(wù)端
- 1.1.4.1 服務(wù)端接收
- 1.1.4.2 服務(wù)端集成
- 1.1.4.3 服務(wù)器握手?jǐn)r截
- 1.1.4.4 服務(wù)器地址問(wèn)題
- 1.4.2 客戶端
- 1.4.2.1 客戶端接收
- 1.4.2.2 客服端發(fā)送
- 1.5 WebFlux
- 1.5.1 服務(wù)端
- 1.5.1.1 服務(wù)端發(fā)送接收
- 1.5.1.2 服務(wù)端集成
- 1.5.2 客戶端
- 1.5.2.1 客戶端發(fā)送接收
- 1.5.2.2 客戶端發(fā)送
1 WebSocket
1.1 簡(jiǎn)介
WebSocket
協(xié)議是基于TCP
的一種新的網(wǎng)絡(luò)協(xié)議。它實(shí)現(xiàn)了瀏覽器與服務(wù)器全雙工(full-duplex
)通信——允許服務(wù)器主動(dòng)發(fā)送信息給客戶端,建立客戶端和服務(wù)器之間的通信渠道。瀏覽器和服務(wù)器僅需一次握手,兩者之間就直接可以創(chuàng)建持久性的連接,并進(jìn)行雙向數(shù)據(jù)傳輸。
1.2 WebSocket作用和調(diào)用
1.2.1 作用
HTTP
是基于請(qǐng)求響應(yīng)式的,即通信只能由客戶端發(fā)起,服務(wù)端做出響應(yīng),無(wú)狀態(tài),無(wú)連接:
無(wú)狀態(tài)
:每次連接只處理一個(gè)請(qǐng)求,請(qǐng)求結(jié)束后斷開連接。無(wú)連接
:對(duì)于事務(wù)處理沒(méi)有記憶能力,服務(wù)器不知道客戶端是什么狀態(tài)。
通過(guò)HTTP
實(shí)現(xiàn)即時(shí)通訊,只能是頁(yè)面輪詢向服務(wù)器發(fā)出請(qǐng)求,服務(wù)器返回查詢結(jié)果。輪詢的效率低,非常浪費(fèi)資源,因?yàn)楸仨毑煌_B接,或者 HTTP
連接始終打開。
WebSocket
的最大特點(diǎn)就是,服務(wù)器可以主動(dòng)向客戶端推送信息,客戶端也可以主動(dòng)向服務(wù)器發(fā)送信息
,是真正的雙向平等對(duì)話。
WebSocket
特點(diǎn):
- 建立在
TCP
協(xié)議之上,服務(wù)器端的實(shí)現(xiàn)比較容易。 - 與
HTTP
協(xié)議有著良好的兼容性。默認(rèn)端口也是80和443
,并且握手階段采用HTTP
協(xié)議,因此握手時(shí)不容易屏蔽,能通過(guò)各種HTTP
代理服務(wù)器。 - 數(shù)據(jù)格式比較輕量,性能開銷小,通信高效。
- 可以發(fā)送文本,也可以發(fā)送二進(jìn)制數(shù)據(jù)。
- 沒(méi)有同源限制,客戶端可以與任意服務(wù)器通信。
- 協(xié)議標(biāo)識(shí)符是
ws
(如果加密,則為wss
),服務(wù)器網(wǎng)址就是URL
1.2.2 js端調(diào)用
<script>var ws = new WebSocket('ws://localhost:8080/webSocket/10086');// 獲取連接狀態(tài)console.log('ws連接狀態(tài):' + ws.readyState);//監(jiān)聽(tīng)是否連接成功ws.onopen = function () {console.log('ws連接狀態(tài):' + ws.readyState);//連接成功則發(fā)送一個(gè)數(shù)據(jù)ws.send('test1');}// 接聽(tīng)服務(wù)器發(fā)回的信息并處理展示ws.onmessage = function (data) {console.log('接收到來(lái)自服務(wù)器的消息:');console.log(data);//完成通信后關(guān)閉WebSocket連接ws.close();}// 監(jiān)聽(tīng)連接關(guān)閉事件ws.onclose = function () {// 監(jiān)聽(tīng)整個(gè)過(guò)程中websocket的狀態(tài)console.log('ws連接狀態(tài):' + ws.readyState);}// 監(jiān)聽(tīng)并處理error事件ws.onerror = function (error) {console.log(error);}function sendMessage() {var content = $("#message").val();$.ajax({url: '/socket/publish?userId=10086&message=' + content,type: 'GET',data: { "id": "7777", "content": content },success: function (data) {console.log(data)}})}
</script>
下面主要介紹三種方式:Javax,WebMVC,WebFlux
,在Spring Boot
中的服務(wù)端和客戶端配置
1.3 Javax
在java
的擴(kuò)展包javax.websocket
中就定義了一套WebSocket
的接口規(guī)范
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
1.3.1 服務(wù)端
1.3.1.1 服務(wù)端接收
一般使用注解的方式來(lái)進(jìn)行配置
/*** html頁(yè)面與之關(guān)聯(lián)的接口* var reqUrl = "http://localhost:8081/websocket/" + cid;* socket = new WebSocket(reqUrl.replace("http", "ws"));*/
@Component
@ServerEndpoint("/websocket/{type}")
public class JavaxWebSocketServerEndpoint {@OnOpenpublic void onOpen(Session session, EndpointConfig config,@PathParam(value = "type") String type) {//連接建立}@OnClosepublic void onClose(Session session, CloseReason reason) {//連接關(guān)閉}@OnMessagepublic void onMessage(Session session, String message) {//接收文本信息}@OnMessagepublic void onMessage(Session session, PongMessage message) {//接收pong信息}@OnMessagepublic void onMessage(Session session, ByteBuffer message) {//接收二進(jìn)制信息,也可以用byte[]接收}@OnErrorpublic void onError(Session session, Throwable e) {//異常處理}
}
我們?cè)陬惿咸砑?@ServerEndpoint
注解來(lái)表示這是一個(gè)服務(wù)端點(diǎn),同時(shí)可以在注解中配置路徑,這個(gè)路徑可以配置成動(dòng)態(tài)的,使用{}
包起來(lái)就可以了
@OnOpen
:用來(lái)標(biāo)記對(duì)應(yīng)的方法作為客戶端連接上來(lái)之后的回調(diào),Session
就相當(dāng)于和客戶端的連接了,我們可以把它緩存起來(lái)用于發(fā)送消息;通過(guò)@PathParam
注解就可以獲得動(dòng)態(tài)路徑中對(duì)應(yīng)值了@OnClose
:用來(lái)標(biāo)記對(duì)應(yīng)的方法作為客戶端斷開連接之后的回調(diào),我們可以在這個(gè)方法中移除對(duì)應(yīng)Session
的緩存,同時(shí)可以接受一個(gè)CloseReason
的參數(shù)用于獲取關(guān)閉原因@OnMessage
:用來(lái)標(biāo)記對(duì)應(yīng)的方法作為接收到消息之后的回調(diào),我們可以接受文本消息,二進(jìn)制消息和pong消息@OnError
:用來(lái)標(biāo)記對(duì)應(yīng)的方法作為拋出異常之后的回調(diào),可以獲得對(duì)應(yīng)的Session
和異常對(duì)象
1.3.1.2 服務(wù)端集成
@Configuration(proxyBeanMethods = false)
public class JavaxWebSocketConfiguration {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
依賴Spring
的WebSocket
模塊,手動(dòng)注入ServerEndpointExporter
就可以了
需要注意ServerEndpointExporter
是Spring
中的類,算是Spring
為了支持javax.websocket
的原生用法所提供的支持類
javax.websocket
庫(kù)中定義了PongMessage
而沒(méi)有PingMessage
通過(guò)測(cè)試發(fā)現(xiàn)基本上所有的WebSocket
包括前端js
自帶的,都實(shí)現(xiàn)了自動(dòng)回復(fù);也就是說(shuō)當(dāng)接收到一個(gè)ping
消息之后,是會(huì)自動(dòng)回應(yīng)一個(gè)pong
消息,所以沒(méi)有必要再自己接受ping
消息來(lái)處理了,即我們不會(huì)接受到ping
消息;
當(dāng)然我上面講的ping和pong
都是需要使用框架提供的api
,如果是我們自己通過(guò)Message
來(lái)自定義心跳數(shù)據(jù)的話是沒(méi)有任何的處理的,下面是對(duì)應(yīng)的api
//發(fā)送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);//發(fā)送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);
1.3.1.3 ping和pong消息
ping 消息
和 pong 消息
都是 WebSocket
協(xié)議中的特殊消息類型,用于進(jìn)行心跳?;詈蜋z測(cè) WebSocket
連接的健康狀態(tài)。
ping 消息
:由服務(wù)器端(或客戶端)發(fā)送給對(duì)端的消息。它用于發(fā)起一個(gè)心跳檢測(cè)請(qǐng)求,要求對(duì)端回復(fù)一個(gè)pong
消息作為響應(yīng)。ping
消息通常用于檢測(cè)對(duì)端的連接是否仍然處于活動(dòng)狀態(tài),以及測(cè)量網(wǎng)絡(luò)延遲。pong 消息
:由對(duì)端(即客戶端或服務(wù)器端)作為對(duì)ping
消息的響應(yīng)發(fā)送回來(lái)。它用于確認(rèn)接收到ping
消息,并表明連接仍然活躍。
當(dāng)一方發(fā)送一個(gè) ping
消息時(shí),對(duì)端應(yīng)該立即發(fā)送一個(gè) pong
消息作為響應(yīng)。通過(guò)交換 ping 和 pong 消息
,可以檢測(cè)連接是否仍然有效,以及測(cè)量網(wǎng)絡(luò)的延遲時(shí)間。
ping 和 pong
消息通常由 WebSocket
底層協(xié)議處理,開發(fā)人員可以通過(guò)設(shè)置相應(yīng)的參數(shù)來(lái)啟用或禁用這些消息的交換。一般情況下,WebSocket
客戶端和服務(wù)器都會(huì)自動(dòng)處理 ping 和 pong
消息,無(wú)需開發(fā)人員顯式地處理。ping 和 pong
消息是屬于底層協(xié)議層
1.3.2 客戶端
1.3.2.1 客戶端接收
客戶端也是使用注解配置
@ClientEndpoint
public class JavaxWebSocketClientEndpoint {@OnOpenpublic void onOpen(Session session) {//連接建立}@OnClosepublic void onClose(Session session, CloseReason reason) {//連接關(guān)閉}@OnMessagepublic void onMessage(Session session, String message) {//接收文本消息}@OnMessagepublic void onMessage(Session session, PongMessage message) {//接收pong消息}@OnMessagepublic void onMessage(Session session, ByteBuffer message) {//接收二進(jìn)制消息}@OnErrorpublic void onError(Session session, Throwable e) {//異常處理}
}
客戶端使用@ClientEndpoint
來(lái)標(biāo)記,其他的@OnOpen,@OnClose,@OnMessage,@OnError
和服務(wù)端一模一樣
1.3.2.2 客戶端發(fā)送
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);
我們可以通過(guò)ContainerProvider
來(lái)獲得一個(gè)WebSocketContainer
,然后調(diào)用connectToServer
方法將我們的客戶端類和連接的uri傳入就行了
通過(guò)ContainerProvider#getWebSocketContainer
獲得WebSocketContainer
其實(shí)是基于SPI
實(shí)現(xiàn)的
在Spring
的環(huán)境中更推薦大家使用ServletContextAware
來(lái)獲得,代碼如下
@Component
public class JavaxWebSocketContainer implements ServletContextAware {private volatile WebSocketContainer container;public WebSocketContainer getContainer() {if (container == null) {synchronized (this) {if (container == null) {container = ContainerProvider.getWebSocketContainer();}}}return container;}@Overridepublic void setServletContext(@NonNull ServletContext servletContext) {if (container == null) {container = (WebSocketContainer) servletContext.getAttribute("javax.websocket.server.ServerContainer");}}
}
發(fā)消息
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);//發(fā)送文本消息
session.getAsyncRemote().sendText(String message);//發(fā)送二進(jìn)制消息
session.getAsyncRemote().sendBinary(ByteBuffer message);//發(fā)送對(duì)象消息,會(huì)嘗試使用Encoder編碼
session.getAsyncRemote().sendObject(Object message);//發(fā)送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);//發(fā)送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);
1.4 WebMVC
pom依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
1.4.1 服務(wù)端
1.1.4.1 服務(wù)端接收
我們實(shí)現(xiàn)一個(gè)WebSocketHandler
來(lái)處理WebSocket
的連接,關(guān)閉,消息和異常
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;public class ServletWebSocketServerHandler implements WebSocketHandler {@Overridepublic void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {//連接建立}@Overridepublic void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {//接收消息}@Overridepublic void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {//異常處理}@Overridepublic void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {//連接關(guān)閉}@Overridepublic boolean supportsPartialMessages() {//是否支持接收不完整的消息return false;}
}
1.1.4.2 服務(wù)端集成
首先需要添加@EnableWebSocket
來(lái)啟用WebSocket
然后實(shí)現(xiàn)WebSocketConfigurer
來(lái)注冊(cè)WebSocket
路徑以及對(duì)應(yīng)的WebSocketHandler
@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {registry//添加處理器到對(duì)應(yīng)的路徑.addHandler(new ServletWebSocketServerHandler(), "/websocket")//注冊(cè)Handler.setAllowedOrigins("*");}
}
1.1.4.3 服務(wù)器握手?jǐn)r截
提供了HandshakeInterceptor
來(lái)攔截握手
@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {registry//添加處理器到對(duì)應(yīng)的路徑.addHandler(new ServletWebSocketServerHandler(), "/websocket")//添加握手?jǐn)r截器.addInterceptors(new ServletWebSocketHandshakeInterceptor()).setAllowedOrigins("*");}public static class ServletWebSocketHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {//握手之前if (request instanceof ServletServerHttpRequest) {String path = request.getURI().getPath();if(requestIsValid(path)){String[] params = getParams(path);attributes.put("WEBSOCKET_AUTH", params[0]);attributes.put("WEBSOCKET_PID", params[1]);attributes.put("WEBSOCKET_SN", params[2]);attributes.put("WEBSOCKET_OPENID", params[3]);attributes.put("WEBSOCKET_FIRSTONE","yes");}}System.out.println("================Before Handshake================");return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {//握手之后System.out.println("================After Handshake================");if(e!=null) e.printStackTrace();System.out.println("================After Handshake================");}private boolean requestIsValid(String url){//在這里可以寫上具體的鑒權(quán)邏輯boolean isvalid = false;if(StringUtils.isNotEmpty(url)&& url.startsWith("/netgate/")&& url.split("/").length==6){isvalid = true;}return isvalid;}private String[] getParams(String url){url = url.replace("/netgate/","");return url.split("/");}}
}
1.1.4.4 服務(wù)器地址問(wèn)題
當(dāng)在集成的時(shí)候發(fā)現(xiàn)這種方式?jīng)]辦法動(dòng)態(tài)匹配路徑,它的路徑就是固定的,沒(méi)辦法使用如/websocket/**
這樣的通配符
在研究了一下之后發(fā)現(xiàn)可以在UrlPathHelper
上解決
@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {if (registry instanceof ServletWebSocketHandlerRegistry) {//替換UrlPathHelper((ServletWebSocketHandlerRegistry) registry).setUrlPathHelper(new PrefixUrlPathHelper("/websocket"));}registry//添加處理器到對(duì)應(yīng)的路徑.addHandler(new ServletWebSocketServerHandler(), "/websocket/**").setAllowedOrigins("*");}public class PrefixUrlPathHelper extends UrlPathHelper {private String prefix;public PrefixUrlPathHelper(String prefix){this.prefix=prefix;}@Overridepublic @NonNull String resolveAndCacheLookupPath(@NonNull HttpServletRequest request) {//獲得原本的PathString path = super.resolveAndCacheLookupPath(request);//如果是指定前綴就返回對(duì)應(yīng)的通配路徑if (path.startsWith(prefix)) {return prefix + "/**";}return path;}}
}
因?yàn)樗鼉?nèi)部實(shí)際上就是用一個(gè)Map<String, WebSocketHandler>
來(lái)存的,所以沒(méi)有辦法用通配符
1.4.2 客戶端
1.4.2.1 客戶端接收
和服務(wù)端一樣我們需要先實(shí)現(xiàn)一個(gè)WebSocketHandler
來(lái)處理WebSocket
的連接,關(guān)閉,消息和異常
public class ServletWebSocketClientHandler implements WebSocketHandler {@Overridepublic void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {//連接建立}@Overridepublic void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {//接收消息}@Overridepublic void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {//異常處理}@Overridepublic void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {//連接關(guān)閉}@Overridepublic boolean supportsPartialMessages() {//是否支持接收不完整的消息return false;}
}
1.4.2.2 客服端發(fā)送
WebSocketClient client = new StandardWebSocketClient();
WebSocketHandler handler = new ServletWebSocketClientHandler();
WebSocketConnectionManager manager = new WebSocketConnectionManager(client, handler, uri);
manager.start();
首先我們需要先new一個(gè)StandardWebSocketClient
,可以傳入一個(gè)WebSocketContainer
參數(shù),獲得該對(duì)象的方式上面已經(jīng)介紹過(guò)了,這邊就先略過(guò)
然后new一個(gè)WebSocketConnectionManager
傳入WebSocketClient
,WebSocketHandler
還有路徑uri
最后調(diào)用一下WebSocketConnectionManager
的start
方法就可以了
這里如果大家去看WebSocketClient
的實(shí)現(xiàn)類就會(huì)發(fā)現(xiàn)有StandardWebSocketClient
還有JettyWebSocketClient
等等,所以大家可以根據(jù)自身項(xiàng)目所使用的容器來(lái)選擇不同的WebSocketClient
實(shí)現(xiàn)類
這里給大家貼一小段Spring適配不同容器WebSocket的代碼
public abstract class AbstractHandshakeHandler implements HandshakeHandler, Lifecycle {private static final boolean tomcatWsPresent;private static final boolean jettyWsPresent;private static final boolean jetty10WsPresent;private static final boolean undertowWsPresent;private static final boolean glassfishWsPresent;private static final boolean weblogicWsPresent;private static final boolean websphereWsPresent;static {ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader();tomcatWsPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader);jetty10WsPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", classLoader);jettyWsPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader);undertowWsPresent = ClassUtils.isPresent("io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader);glassfishWsPresent = ClassUtils.isPresent("org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler", classLoader);weblogicWsPresent = ClassUtils.isPresent("weblogic.websocket.tyrus.TyrusServletWriter", classLoader);websphereWsPresent = ClassUtils.isPresent("com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader);}
}
發(fā)消息
import org.springframework.web.socket.*;WebSocketSession session = ...//發(fā)送文本消息
session.sendMessage(new TextMessage(CharSequence message);//發(fā)送二進(jìn)制消息
session.sendMessage(new BinaryMessage(ByteBuffer message));//發(fā)送ping
session.sendMessage(new PingMessage(ByteBuffer message));//發(fā)送pong
session.sendMessage(new PongMessage(ByteBuffer message));
1.5 WebFlux
WebFlux
的WebSocket
不需要額外的依賴包
1.5.1 服務(wù)端
1.5.1.1 服務(wù)端發(fā)送接收
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;public class ReactiveWebSocketServerHandler implements WebSocketHandler {@NonNull@Overridepublic Mono<Void> handle(WebSocketSession session) {Mono<Void> send = session.send(Flux.create(sink -> {//可以持有sink對(duì)象在任意時(shí)候調(diào)用next發(fā)送消息sink.next(WebSocketMessage message);})).doOnError(it -> {//異常處理});Mono<Void> receive = session.receive().doOnNext(it -> {//接收消息}).doOnError(it -> {//異常處理}).then();@SuppressWarnings("all")Disposable disposable = session.closeStatus().doOnError(it -> {//異常處理}).subscribe(it -> {//連接關(guān)閉});return Mono.zip(send, receive).then();}
}
首先需要注意這里的WebSocketHandler
和WebSocketSession
是reactive
包下的:
- 通過(guò)
WebSocketSession#send
方法來(lái)持有一個(gè)FluxSink<WebSocketMessage>
來(lái)用于發(fā)送消息 - 通過(guò)
WebSocketSession#receive
來(lái)訂閱消息 - 通過(guò)
WebSocketSession#closeStatus
來(lái)訂閱連接關(guān)閉事件
1.5.1.2 服務(wù)端集成
注入WebSocketHandlerAdapter
@Configuration(proxyBeanMethods = false)
public class ReactiveWebSocketConfiguration {@Beanpublic WebSocketHandlerAdapter webSocketHandlerAdapter() {return new WebSocketHandlerAdapter();}
}
注冊(cè)一個(gè)HandlerMapping
同時(shí)配置路徑和對(duì)應(yīng)的WebSocketHandler
@Order(Ordered.HIGHEST_PRECEDENCE)
@Component
public class ReactiveWebSocketServerHandlerMapping extends SimpleUrlHandlerMapping {public ReactiveWebSocketServerHandlerMapping() {Map<String, WebSocketHandler> map = new HashMap<>();map.put("/websocket/**", new ReactiveWebSocketServerHandler());setUrlMap(map);setOrder(100);}
}
注意
:我們自定義的HandlerMapping
需要設(shè)置order
,如果不設(shè)置,默認(rèn)為Ordered.LOWEST_PRECEDENCE
,會(huì)導(dǎo)致這個(gè)HandlerMapping
被放在最后,當(dāng)有客戶端連接上來(lái)時(shí)會(huì)被其他的HandlerMapping
優(yōu)先匹配上而連接失敗
1.5.2 客戶端
1.5.2.1 客戶端發(fā)送接收
客戶端WebSocketHandler
的寫法和服務(wù)端的一樣
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;public class ReactiveWebSocketClientHandler implements WebSocketHandler {@NonNull@Overridepublic Mono<Void> handle(WebSocketSession session) {Mono<Void> send = session.send(Flux.create(sink -> {//可以持有sink對(duì)象在任意時(shí)候調(diào)用next發(fā)送消息sink.next(WebSocketMessage message);})).doOnError(it -> {//處理異常});Mono<Void> receive = session.receive().doOnNext(it -> {//接收消息}).doOnError(it -> {//異常處理}).then();@SuppressWarnings("all")Disposable disposable = session.closeStatus().doOnError(it -> {//異常處理}).subscribe(it -> {//連接關(guān)閉});return Mono.zip(send, receive).then();}
}
1.5.2.2 客戶端發(fā)送
import org.springframework.web.reactive.socket.client.WebSocketClient;WebSocketClient client = ReactorNettyWebSocketClient();
WebSocketHandler handler = new ReactiveWebSocketClientHandler();
client.execute(uri, handler).subscribe();
首先我們需要先new一個(gè)ReactorNettyWebSocketClient
然后調(diào)用一下WebSocketClient
的execute
方法傳入路徑uri
和WebSocketHandler
并繼續(xù)調(diào)用subscribe
方法就可以了
注意
:WebFlux
和 WebMVC
中的 WebSocketClient
一樣,Reactive
包中的WebSocketClient
也有很多實(shí)現(xiàn)類,比如ReactorNettyWebSocketClient
,JettyWebSocketClient
,UndertowWebSocketClient
,TomcatWebSocketClient
等等,也是需要大家基于自身項(xiàng)目的容器使用不同的實(shí)現(xiàn)類
這里也給大家貼一小段Reactive適配不同容器WebSocket的代碼
public class HandshakeWebSocketService implements WebSocketService, Lifecycle {private static final boolean tomcatPresent;private static final boolean jettyPresent;private static final boolean jetty10Present;private static final boolean undertowPresent;private static final boolean reactorNettyPresent;static {ClassLoader loader = HandshakeWebSocketService.class.getClassLoader();tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", loader);jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", loader);jetty10Present = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", loader);reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", loader);}
}
發(fā)消息
我們需要使用在WebSocketHandler
中獲得的FluxSink<WebSocketMessage>
來(lái)發(fā)送消息
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;public class ReactiveWebSocket {private final WebSocketSession session;private final FluxSink<WebSocketMessage> sender;public ReactiveWebSocket(WebSocketSession session, FluxSink<WebSocketMessage> sender) {this.session = session;this.sender = sender;}public String getId() {return session.getId();}public URI getUri() {return session.getHandshakeInfo().getUri();}public void send(Object message) {if (message instanceof WebSocketMessage) {sender.next((WebSocketMessage) message);} else if (message instanceof String) {//發(fā)送文本消息sender.next(session.textMessage((String) message));} else if (message instanceof DataBuffer) {//發(fā)送二進(jìn)制消息sender.next(session.binaryMessage(factory -> (DataBuffer) message));} else if (message instanceof ByteBuffer) {//發(fā)送二進(jìn)制消息sender.next(session.binaryMessage(factory -> factory.wrap((ByteBuffer) message)));} else if (message instanceof byte[]) {//發(fā)送二進(jìn)制消息sender.next(session.binaryMessage(factory -> factory.wrap((byte[]) message)));} else {throw new IllegalArgumentException("Message type not match");}}public void ping() {//發(fā)送pingsender.next(session.pingMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));}public void pong() {//發(fā)送pongsender.next(session.pongMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));}public void close(CloseStatus reason) {sender.complete();session.close(reason).subscribe();}
}