一個(gè)公網(wǎng)ip可以做幾個(gè)網(wǎng)站青島官網(wǎng)seo方法
背景
接上一篇《LLM大模型統(tǒng)一封裝接口解決方案》架構(gòu)確定后,流式方案非常規(guī)請(qǐng)求,需要特殊處理。
本解決方案就是針對(duì)上一篇中所需要的流式(打字機(jī)效果進(jìn)行編碼)
什么是SSE
SSE(Server-Sent Events,服務(wù)器發(fā)送事件)是一種基于HTTP的服務(wù)器到客戶端的單向通信技術(shù),用于實(shí)現(xiàn)服務(wù)器向客戶端推送數(shù)據(jù)的功能。SSE協(xié)議標(biāo)準(zhǔn)由HTML5規(guī)范定義,并且其定義被包含在HTML Living Standard中。
SSE允許服務(wù)器通過HTTP連接向客戶端發(fā)送數(shù)據(jù),而無需客戶端發(fā)起請(qǐng)求。這使得SSE非常適合于實(shí)時(shí)通信或推送通知給客戶端的應(yīng)用程序,例如實(shí)時(shí)股票報(bào)價(jià)、即時(shí)通訊、實(shí)時(shí)監(jiān)控等場景。
基本上,SSE由以下要素組成:
- 服務(wù)器:負(fù)責(zé)向客戶端發(fā)送事件流的HTTP服務(wù)器。
- 客戶端:通過瀏覽器中的EventSource API與服務(wù)器建立連接,接收服務(wù)器發(fā)送的事件。
- 事件流(Event Stream):服務(wù)器向客戶端發(fā)送的數(shù)據(jù)流,格式為純文本,使用一種特定的格式進(jìn)行編碼,例如MIME類型為"text/event-stream"。
SSE的優(yōu)點(diǎn)包括簡單易用、實(shí)現(xiàn)方便、跨瀏覽器支持良好等。然而,它也有一些限制,例如不能支持雙向通信,與WebSocket相比,SSE的實(shí)時(shí)性稍遜一籌。
Java框架說明
pom 文件引入的核心依賴包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>aip.com</groupId><artifactId>aip-com</artifactId><version>0.0.1</version><name>aip-com</name><description>aip com project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
Java后端核心代碼
本方法是標(biāo)準(zhǔn)的SSE協(xié)議標(biāo)準(zhǔn)
private final ExecutorService executorService = Executors.newFixedThreadPool(5);/*** 會(huì)話請(qǐng)求** @return String*/@PostMapping(value = "/completions", consumes = MediaType.APPLICATION_JSON_VALUE)@Operation(summary = "會(huì)話請(qǐng)求")public SseEmitter completions(@RequestBody CompletionRequest completionRequest) {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);SseEmitter emitter = new SseEmitter();executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {// 向客戶端發(fā)送事件emitter.send(SseEmitter.event().name("message").data(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build())));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;/*** 會(huì)話請(qǐng)求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "會(huì)話請(qǐng)求")public SseEmitter stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);SseEmitter emitter = new SseEmitter();executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {// 向客戶端發(fā)送事件emitter.send(SseEmitter.event().name("message").data(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build())));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;
Flux 和 Flowable 對(duì)比
Flux 和 Flowable 都是響應(yīng)式編程庫中的數(shù)據(jù)流類型,用于處理異步和基于事件的流式數(shù)據(jù)。它們分別來自于不同的庫,Flux 是 Reactor 庫的一部分,而 Flowable 則是 RxJava 庫的一部分。以下是它們之間的一些區(qū)別:
-
庫的來源:
- Flux 來自于 Reactor 庫,是 Reactor 的核心組件之一,React的核心模塊用于基于反應(yīng)式流規(guī)范處理數(shù)據(jù)流。
- Flowable 來自于 RxJava 庫,是 RxJava 的核心類之一,RxJava 是 Java 平臺(tái)的反應(yīng)式擴(kuò)展庫,用于處理異步和基于事件的編程。
-
背壓策略:
- Flux 默認(rèn)采用背壓策略為 BUFFER,可以通過 onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest 等方法來指定不同的背壓策略。
- Flowable 默認(rèn)也是支持背壓的,但是相比 Flux,Flowable 提供了更多的背壓策略,如 BUFFER、DROP、LATEST、ERROR、MISSING。
-
反應(yīng)式規(guī)范:
- Flux 遵循 Reactor 庫的反應(yīng)式流規(guī)范,使用 Mono 和 Flux 來表示異步流和單個(gè)結(jié)果。
- Flowable 遵循 RxJava 庫的反應(yīng)式流規(guī)范,使用 Observable 和 Flowable 來表示異步流和單個(gè)結(jié)果。
-
生態(tài)系統(tǒng):
- Reactor 生態(tài)系統(tǒng)主要用于基于 Reactor 的應(yīng)用程序。
- RxJava 生態(tài)系統(tǒng)則更廣泛,它是 ReactiveX 的一部分,支持多種語言和平臺(tái),并有許多衍生項(xiàng)目。
總的來說,Flux 和 Flowable 在概念上很相似,都用于處理異步和基于事件的流式數(shù)據(jù),但它們來自于不同的庫,并且有一些細(xì)微的區(qū)別,如背壓策略和生態(tài)系統(tǒng)支持。您可以根據(jù)項(xiàng)目需求選擇適合的庫和數(shù)據(jù)流類型。
Java后端Flowable方式
本方法是Flowable方式,非標(biāo)準(zhǔn)流式規(guī)則
/*** 會(huì)話請(qǐng)求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "會(huì)話請(qǐng)求")public Flowable<String> stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);Flowable<String> typingFlow = Flowable.create(emitter -> {executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {emitter.onNext(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build()));Thread.sleep(1000);}emitter.onComplete();} catch (Exception e) {}});}, BackpressureStrategy.BUFFER);return typingFlow;}
Java后端Flux方式
本方法是Flux方式,非標(biāo)準(zhǔn)流式規(guī)則
/*** 會(huì)話請(qǐng)求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "會(huì)話請(qǐng)求")public Flux<String> stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);Flux<String> typingFlow = Flux.create(emitter -> {executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {emitter.next(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build()));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {}});}, FluxSink.OverflowStrategy.BUFFER);return typingFlow;}
}
HTML 客戶端接收示例程序
function EventSourceGetRequest() SSE 默認(rèn)方法,只支持GET請(qǐng)求,適合演示用途以及后端包裝好服務(wù)
function fetchPostRequest() fetch POST 請(qǐng)求實(shí)現(xiàn)SSE,支持所有請(qǐng)求(POST,GET等)以及傳遞參數(shù)
sse.html 內(nèi)容
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SEE Example</title><script>// SSE 默認(rèn)方法,只支持GET請(qǐng)求function EventSourceGetRequest() {if(typeof(EventSource)!=="undefined"){var eventSource = new EventSource('http://127.0.0.1:8090/v1/chat/stream');eventSource.onmessage = function(event){document.getElementById('result').insertAdjacentHTML('beforeend', `${event.data}<br/><br/>`);console.log(event)};}else{document.getElementById("result").innerHTML="抱歉,你的瀏覽器不支持 server-sent 事件...";}}// fetch POST 請(qǐng)求實(shí)現(xiàn)SSEfunction fetchPostRequest() {fetch('http://127.0.0.1:8090/v1/chat/completions', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({}),}).then(response => {// 檢查響應(yīng)是否成功if (!response.ok) {throw new Error('Network response was not ok');}// 返回 ReadableStream 對(duì)象return response.body;}).then(stream => {// 創(chuàng)建一個(gè)新的文本解碼器const decoder = new TextDecoder();// 獲取一個(gè) reader 對(duì)象const reader = stream.getReader();let chunk = ''// 逐塊讀取數(shù)據(jù)function read() {reader.read().then(({ done, value }) => {if (done) {document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);console.log('Stream has ended');return;}// 將數(shù)據(jù)塊轉(zhuǎn)換為字符串并顯示const tmp = decoder.decode(value, { stream: true });if (tmp.startsWith('event:') && chunk!='') {document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);chunk = tmp}else{chunk = chunk + tmp}// 繼續(xù)讀取下一塊數(shù)據(jù)read();});}// 開始讀取數(shù)據(jù)read();}).catch(error => {// 處理錯(cuò)誤console.error('There was a problem with the fetch operation:', error);});}// EventSourceGetRequest();fetchPostRequest();</script>
</head>
<body><h1>SEE result</h1><div id="result"></div>
</body>
</html>
- 標(biāo)準(zhǔn)SSE示例
- 擴(kuò)展SSE