Spring Boot WebFlux:实现web(Server-Sent Events)事件异步推送

泰山AI 2024-06-22 15:33:04 阅读 59

WebFlux

Spring Boot中,Flux是一个重要的概念,它是Spring Framework 5.0以后引入的响应式编程框架WebFlux的核心组件之一。FluxReactor项目的一部分,它实现了Reactive Streams规范,用于处理异步、非阻塞的数据流。

与传统的Spring MVC不同,WebFlux不需要Servlet API,而是通过Reactor项目提供的FluxMono来构建响应式应用程序。Flux用于表示包含0到N个元素的异步序列,而Mono则表示包含0到1个元素的异步序列。这使得WebFlux可以在有限的资源下提高系统的吞吐量和伸缩性,从而在处理大量请求时表现出更好的性能。

WebFlux中,所有的操作都是非阻塞的,并且基于Reactor的背压机制来控制数据流的速度,从而防止系统资源的过度消耗。这使得WebFlux在处理高并发请求时具有更好的稳定性和可扩展性。

Spring Boot中使用Flux时,通常需要在控制器方法中返回一个Flux对象,以表示一个异步的数据流。然后,可以使用各种操作符来对这个数据流进行转换、过滤、聚合等操作,以满足业务需求。例如,可以使用map操作符将数据流中的每个元素转换为另一种形式,使用filter操作符过滤掉不符合条件的元素,使用reduce操作符将数据流中的所有元素聚合成一个结果等。

总之,FluxSpring Boot中实现响应式编程的重要组件之一,它使得应用程序能够更好地处理高并发请求,提高系统的吞吐量和伸缩性。

Flux 的主要特性包括:

异步非阻塞:Flux 旨在以非阻塞的方式处理数据,这意味着它不会阻塞调用线程,而是立即返回一个表示异步计算结果的对象。这使得它能够在高并发环境下提供更好的性能和响应能力。

背压(Backpressure):背压是响应式编程中的一个重要概念,它允许接收方控制发送方的数据发送速率,以防止数据过快产生而导致的资源耗尽。Flux 提供了背压机制,使得接收方可以通知发送方其处理能力,从而避免数据丢失或资源耗尽。

冷流与热流:Flux 中的流可以是冷的(Cold)或热的(Hot)。冷流会为每个订阅者重新生成数据,而热流则会广播数据给所有订阅者,无论订阅者何时订阅。

操作符:Flux 提供了一系列的操作符,这些操作符可以链式调用,用于对流中的数据进行转换、过滤、聚合等操作。这些操作符使得在流上执行复杂的逻辑变得简单且直观。

错误处理:Flux 支持错误处理机制,可以在流中出现错误时进行适当的处理,如重试、忽略错误或传播错误。

与其他响应式库的集成:Flux 可以与其他遵循 Reactive Streams 规范的响应式库无缝集成,如 RxJava、Rx.NET 等。

Spring Boot 应用程序中,Flux 通常用于构建响应式 Web 服务。你可以使用 Spring WebFlux基于 Project Reactor)来创建异步的、非阻塞的 Web 控制器,这些控制器可以返回 Flux 或 Mono 对象作为响应。这种模型非常适合处理大量并发请求,特别是在微服务架构中。

WebFlux和sse

FluxSpring Framework 5.0中引入的响应式编程库Project Reactor的核心组件之一。它是一个用于表示异步、非阻塞数据流的类型,你可以在其上应用各种操作符来进行数据转换、过滤和聚合等操作。在Spring Boot应用程序中,Flux通常用于构建响应式Web服务,返回给客户端的是一个数据流而不是一次性的数据包。这使得客户端可以持续接收服务器的更新,而不需要频繁地发起新的请求。

SSE(Server-Sent Events)是另一种在服务器端和客户端之间建立异步通信的机制。与WebSocket不同,SSE是一种单向通信协议,只允许服务器向客户端发送数据。当服务器有新的数据产生时,它会通过SSE连接将数据发送给客户端。SSE基于HTTP协议,这意味着它可以在现有的HTTP基础设施上工作,而不需要额外的配置或协议支持。此外,SSE使用简单的文本格式来表示传输的数据,这使得它在浏览器端的支持非常广泛,除了IE浏览器外,其他现代浏览器都支持SSE

Spring Boot应用程序中,你可以使用WebFlux框架来实现SSE。在WebFlux中,你可以返回一个类型为Flux<ServerSentEvent>的对象来创建一个SSE端点。ServerSentEvent是一个特殊的类型,用于表示服务器发送给客户端的事件。当这个Flux对象发出事件时,它们将作为SSE事件发送给客户端。

总结起来,FluxSSE都是用于在服务器端和客户端之间建立异步通信的技术。Flux是一个更通用的响应式编程库,可以用于构建各种异步场景,而SSE则是一种专门用于服务器向客户端推送事件的机制。在Spring Boot应用程序中,你可以使用Flux来实现SSE端点,从而利用SSE的优势来提供实时数据更新给客户端。

教程

springboot 项目引入webflux依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>

创建flux控制器,FluxController 类,一下代码是接入文心一言大模型web流式接口,并使用flux返回给前端。

import com.alibaba.fastjson.JSON;import lombok.extern.slf4j.Slf4j;import org.json.JSONArray;import org.json.JSONObject;import org.springframework.http.HttpHeaders;import org.springframework.http.MediaType;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import org.springframework.web.reactive.function.client.WebClient;import reactor.core.publisher.Flux;import java.io.IOException;/** * @author tarzan */@RestController@RequestMapping@Slf4jpublic class FluxController { public static final String API_KEY = ""; public static final String SECRET_KEY = ""; private static final String ACCESS_TOKEN_URL = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=" + API_KEY + "&client_secret=" + SECRET_KEY; private static final String CHAT_URL = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant"; @PostMapping(path = "/event-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> helloWorld() throws IOException { /* return Flux.fromArray("hello world".split(" ")) // 每个元素延迟500毫秒,以模拟逐字效果 .delayElements(Duration.ofMillis(500));*/ // 构造聊天请求参数 JSONObject chatPayload = new JSONObject(); JSONArray messagesArray = new JSONArray(); JSONObject message = new JSONObject(); message.put("role", "user"); message.put("content", "为什么现在的农村出来的大学生,混的都不好?"); messagesArray.put(message); chatPayload.put("messages", messagesArray); chatPayload.put("stream", true); StringBuffer answer=new StringBuffer(); return WEB_CLIENT.post().uri("rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant?access_token=" + getAccessToken()).contentType(MediaType.APPLICATION_JSON) .bodyValue(chatPayload.toString()) .retrieve() .bodyToFlux(String.class) // 可能需要其他流处理,比如map、filter等 .map(data -> { String result=JSON.parseObject(data).getString("result"); answer.append(result); return result; }).doOnComplete(() -> { // 当Flux完成时,输出结束消息 System.out.println("处理完毕,流已关闭。"); System.out.println(answer); }); } private static final WebClient WEB_CLIENT = WebClient.builder().baseUrl("https://aip.baidubce.com").defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_STREAM_JSON_VALUE).build(); static String getAccessToken() { // 构建请求体 String requestBody = "grant_type=client_credentials" + "&client_id=" + API_KEY + "&client_secret=" + SECRET_KEY; return WEB_CLIENT.post().uri("/oauth/2.0/token").bodyValue(requestBody).retrieve() // 假设服务器返回的是 JSON 格式的字符串 .bodyToMono(String.class) .map(data -> JSON.parseObject(data).getString("access_token") ).block(); }}

前端代码如下:

<html lang="zh"><head><meta charset="UTF-8"/><title>es</title><script src="js/jquery-3.1.1.js"></script></head><noscript><h2 style="color: #e80b0a;">Sorry,浏览器不支持WebSocket</h2></noscript><body><div><p id="data"></p></div></body><script type="text/javascript"> $(function() { /* const eventSource = new EventSource("/event-stream"); const dataDiv = $('#data'); eventSource.onmessage = function(event) { console.log('Received message:'+event.data); dataDiv.append(event.data); }; eventSource.onerror = function(error) { console.error('EventSource failed:'+error); dataDiv.append(error); eventSource.close(); };*/ const dataDiv = $('#data'); // 准备要发送的数据 const data = new FormData(); data.append('key1', 'value1'); data.append('key2', 'value2');// 发送 POST 请求并处理响应 fetch('/event-stream', { method: 'POST', body: data }).then(response => { // 检查响应是否为 SSE 流 if (response.headers.get('Content-Type') === 'text/event-stream') { // 返回一个可读流以读取响应体 return response.body.getReader(); } else { throw new Error('Response is not an SSE stream'); } }).then(reader => { // 递归处理 SSE 事件的函数 function processSSEEvent(reader) { return reader.read().then(({ done, value}) => { if (done) { console.log('SSE stream ended'); return; } // value 是一个包含数据的 Uint8Array const eventStr = new TextDecoder().decode(value).trim(); // const event = eventStr.trim(); // 处理事件(例如,解析为 JSON,如果事件是 JSON 格式) console.log('Received SSE event:', eventStr); const event = eventStr.replace(/^data:/, ''); dataDiv.append(event); // 继续读取下一个事件 return processSSEEvent(reader); }); } // 开始处理 SSE 事件 return processSSEEvent(reader); }).catch(error => { console.error('Error fetching SSE stream:', error); }) });</script></html>

前端代码接收flux流式返回结果,将响应结果不断输入到前端页面上。代码中注释部分EventSource,只支持get请求,如果是调用get请求的接口可以使用注释的代码,实现flux返回的结果。

补充

上面代码只适合接收纯文本的数据,如果是json数据请修改代码如下:

let aiType = chatType.val(); const data = new FormData(); data.append('qa', qa); fetch(`/chat/${ user.id}/${ aiType}`, { method: 'POST', body: data, // 你的POST数据 }).then(response => { if (!response.body || !response.body.getReader) { chatNormal(); throw new Error('Response does not support streaming'); } const reader = response.body.getReader(); let buffer = ''; let fullAnswer = ''; let isStart = true; return new ReadableStream({ start(controller) { function readChunk() { return reader.read().then(async ({ done, value}) => { if (done) { controller.close(); chatNormal(); // 复制功能 copy(); // 分享功能 share(); return; } const chunk = new TextDecoder().decode(value); buffer += chunk; // 分割并处理每一行 const lines = buffer.split('\n'); for (let i = 0; i < lines.length - 1; i++) { const eventStr = lines[i]; if (eventStr !== '') { let event = eventStr.replace(/^data:/, ''); let res = JSON.parse(event); await printCharacters(res); if (isStart) { fullAnswer = ''; isStart = false; } } } buffer = lines[lines.length - 1]; // 保留未完整的一行 await readChunk(); // 继续读取下一块数据 }); } async function printCharacters(res) { let sentence = res.answer; for (let word of sentence.split('')) { await new Promise(resolve => setTimeout(() => { fullAnswer = fullAnswer + word; subscribe(res, fullAnswer); resolve(); }, 25)); } } readChunk(); }, }); });

以上代码是我接入AI聊天接口,通过event-stream 返回给前端,实现逐字输出的部分代码片段

EventSource简介

EventSourceHTML5中引入的一种新的API,它允许服务器向客户端推送实时事件。这种推送是基于HTTP协议的,并且使用一种特殊的MIME类型,即"text/event-stream",这使得服务器能够发送一系列的事件到客户端。

EventSource主要用途是实现服务器和客户端之间的实时通信。客户端通过创建一个EventSource对象并指定一个URL,就可以开始监听服务器在该URL上发送的事件。一旦服务器有新的事件数据要发送,它就会将这些数据以特定的格式(即"event: data"的形式)发送给客户端。客户端在接收到这些数据后,可以通过注册的事件处理函数来处理这些数据。

EventSource具有一些重要的特性,包括实时性、低延迟和易用性。由于它使用长连接的方式进行数据传输,因此相比于传统的轮询方式,它能够更加高效地传输数据。此外,由于它是基于HTTP协议的,因此它可以在各种不同的场景下使用,并且与WebSocket和长轮询等方式兼容。

总的来说,EventSource是一种非常有用的技术,它使得服务器能够实时地向客户端推送事件,从而提高了Web应用程序的实时性和响应性。

实现效果

在这里插入图片描述



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。