WebSocket vs SSE: 实时数据推送到前端的选择与实现(详细)

一只牛博 2024-06-30 16:03:02 阅读 77

Websocket和Server-Sent Events 对比推送数据给前端及各自的实现

二者对比WebSocket:Server-Sent Events (SSE):选择 WebSocket 还是 SSE:

Websocket 实现使用原生 WebSocket API:使用 Netty 创建 WebSocket:总结和选择:Netty 实现 Websocket

Server-Sent Events (SSE)实现创建DataManager接口实现实现说明前端实现弊端以及解决方案

在现代 Web 应用程序中,实时数据推送给前端变得越来越重要。无论是实时聊天、实时通知还是仪表板上的实时更新,都需要一种有效的方式来将数据推送给前端。本文将介绍两种常用的实现方法:WebSocket 和 Server-Sent Events(SSE),并提供详细的实现步骤。

二者对比

WebSocket 和 Server-Sent Events (SSE) 都是用于实现实时数据推送的技术,但它们在设计、用途和实现上有一些重要的区别。让我们详细比较这两种技术。

WebSocket:

双向通信

WebSocket 允许双向通信,客户端和服务器都可以在任何时候向对方发送数据。这使得 WebSocket 非常适用于需要双向交互的应用,如在线聊天、多人协作工具等。

持久连接

WebSocket 建立持久连接,客户端和服务器之间的连接保持打开状态。这减少了与建立和关闭连接相关的开销,适用于频繁的数据交换。

低延迟

由于持久连接,WebSocket 可以实现低延迟的实时数据传输,适用于需要快速响应的应用。

复杂性

实现 WebSocket 可能相对复杂,需要更多的服务器资源和额外的协议处理。

跨域通信

WebSocket 通常需要配置服务器以允许跨域通信,因为它们使用自定义协议。

浏览器支持

WebSocket 在现代浏览器中得到广泛支持。

Server-Sent Events (SSE):

单向通信

SSE 是一种单向通信,只允许服务器向客户端发送数据。客户端无法向服务器发送数据。

HTTP 协议

SSE 建立在 HTTP 协议之上,使用标准 HTTP 请求和响应。这使得 SSE 更容易部署,因为它与现有的 HTTP 基础设施兼容。

简单性

SSE 的实现相对简单,服务器和客户端都不需要太多复杂的逻辑。

无需专用库

SSE 不需要额外的库或协议处理,客户端可以使用浏览器的原生 EventSource API 来接收数据。

跨域通信

SSE 支持跨域通信,可以通过 CORS(跨域资源共享)机制进行配置。

浏览器支持

SSE 在现代浏览器中也得到广泛支持,但与 WebSocket 相比,它的历史要长一些。

选择 WebSocket 还是 SSE:

WebSocket 适用于需要双向通信和低延迟的场景,例如在线游戏、实时聊天应用等。

SSE 适用于单向服务器到客户端的实时数据推送,例如新闻更新、实时股票报价、天气预报等,特别是当你希望使用现有的 HTTP 基础设施时。

在某些情况下,你甚至可以同时使用 WebSocket 和 SSE,根据不同的需求选择合适的技术。

无论选择哪种技术,都需要考虑你的应用程序的具体需求和复杂性。WebSocket 提供了更多的灵活性和功能,而 SSE 更加简单和易于部署。最终的选择取决于你的项目目标和资源。

Websocket 实现

使用原生 WebSocket API:

简单性

Spring Boot 提供了对原生 WebSocket API 的支持,使得创建 WebSocket 应用相对简单。开发人员可以直接使用 Java 标准库中的 WebSocket 相关类来处理 WebSocket 通信。

依赖

原生 WebSocket 不需要额外的依赖,因为 WebSocket API 已经包含在 Java 标准库中。

性能

原生 WebSocket API 在性能方面表现良好,适用于大多数中小型应用。

生态系统

使用原生 WebSocket 可以更容易地集成到现有的 Spring Boot 生态系统中,例如 Spring Security 等。

简单应用

当你需要创建相对简单的 WebSocket 应用时,原生 WebSocket 是一个不错的选择。

使用 Netty 创建 WebSocket:

灵活性

Netty 是一个高度可定制的异步事件驱动框架,它可以用于创建各种网络应用,包括 WebSocket。Netty 提供了更多的灵活性和自定义选项,适用于复杂的 WebSocket 应用。

性能

Netty 以其高性能和低延迟而闻名,适用于需要处理大量并发连接的应用。

协议支持

Netty 支持多种协议,不仅限于 WebSocket。这意味着你可以在同一个应用程序中处理多种网络通信需求。

集成

尽管 Netty 可以集成到 Spring Boot 中,但其集成可能需要更多的配置和代码。

复杂应用

当你需要处理复杂的 WebSocket 场景,如高并发、自定义协议、复杂的消息处理等时,使用 Netty 是更好的选择。

总结和选择:

选择原生 WebSocket 还是使用 Netty 创建 WebSocket 应取决于你的项目需求和复杂性:

如果你的应用相对简单,对性能要求不是很高,可以考虑使用原生 WebSocket API,它更容易上手并且不需要额外的依赖。

如果你的应用需要处理高并发、复杂的协议、自定义消息处理或需要最大程度的性能和灵活性,那么使用 Netty 创建 WebSocket 可能更合适。Netty 为你提供了更多的控制权和自定义选项。

无论你选择哪种方法,Spring Boot 都提供了良好的支持,使得在应用中集成 WebSocket 变得相对容易。因此,你可以根据具体的项目需求来选择适合你的方法。

Netty 实现 Websocket

添加 maven 坐标

<!-- netty -->

<dependency>

<groupId>io.netty</groupId>

<artifactId>netty-common</artifactId>

<version>4.1.79.Final</version>

</dependency>

创建 NettyWebsocketServer

package com.todoitbo.baseSpringbootDasmart.netty.server;

import com.todoitbo.baseSpringbootDasmart.netty.handler.HeartbeatHandler;

import com.todoitbo.baseSpringbootDasmart.netty.handler.WebSocketHandler;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;

import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

import io.netty.handler.stream.ChunkedWriteHandler;

import io.netty.handler.timeout.IdleStateHandler;

import io.netty.handler.traffic.ChannelTrafficShapingHandler;

/**

* @author xiaobo

* @date 2023/9/5

*/

public class NettyWebsocketServer {

private final int port;

public NettyWebsocketServer(int port) {

this.port = port;

}

public void run() throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 创建用于接受客户端连接的 boss 线程池

EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建用于处理客户端请求的 worker 线程池

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelTrafficShapingHandler trafficShapingHandler = new ChannelTrafficShapingHandler(

1, // 读取速率限制(字节/秒)

1, // 写入速率限制(字节/秒)

1, // 流量检查时间间隔(毫秒)

1 // 最大允许的时间窗口(毫秒)

);

ChannelPipeline pipeline = ch.pipeline();

// 添加心跳检测处理器,3秒内没有读写事件将触发 IdleStateEvent,下面的顺序错了也会出现问题的

pipeline.addLast(new IdleStateHandler(30, 0, 0));

pipeline.addLast(new HeartbeatHandler());

pipeline.addLast(new HttpServerCodec()); // 处理 HTTP 请求

pipeline.addLast(new ChunkedWriteHandler()); // 写大数据流的处理器

pipeline.addLast(new HttpObjectAggregator(8192)); // 将 HTTP 消息聚合为 FullHttpRequest 或 FullHttpResponse

// pipeline.addLast(new WebSocketFrameAggregator(8192)); // 将 HTTP 消息聚合为 FullHttpRequest 或 FullHttpResponse

// pipeline.addLast(new WebSocketServerCompressionHandler()); // 消息压缩

pipeline.addLast(new WebSocketHandler()); // 自定义 WebSocket 处理器

pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10)); // 处理 WebSocket 升级握手和数据帧处理

}

})

.option(ChannelOption.SO_BACKLOG, 128) // 设置服务器接受队列大小

.childOption(ChannelOption.SO_KEEPALIVE, true); // 开启 TCP 连接的 Keep-Alive 功能

// Bind and start to accept incoming connections.

System.out.println("TCP server started successfully");

ChannelFuture f = b.bind(port).sync(); // 绑定端口并等待绑定完成

// Wait until the server socket is closed.

// In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); // 阻塞直到服务器关闭

} finally {

// 优雅地关闭线程池

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

}

}

这里需要注意一下,pipeline.addLast的顺序不一致可能会导致程序报错,运行时

创建心跳 handle

package com.todoitbo.baseSpringbootDasmart.netty.handler;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

int readTimeOut = 0;

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

IdleStateEvent event = (IdleStateEvent) evt;

if(event.state() == IdleState.READER_IDLE){

readTimeOut++;

}

if(readTimeOut >= 3){

System.out.println("超时超过3次,断开连接");

ctx.close();

}

}

}

创建WebSocketHandler

package com.todoitbo.baseSpringbootDasmart.netty.handler;

import cn.hutool.core.collection.CollectionUtil;

import com.todoitbo.baseSpringbootDasmart.netty.NamedChannelGroup;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.handler.codec.http.*;

import io.netty.handler.codec.http.websocketx.*;

import io.netty.util.AttributeKey;

import io.netty.util.CharsetUtil;

import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

/**

* @author xiaobo

*/

@Slf4j

public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {

private WebSocketServerHandshaker handshaker;

public static final AttributeKey<String> USER_ID_KEY = AttributeKey.valueOf("userId");

public static final AttributeKey<String> GROUP_ID_KEY = AttributeKey.valueOf("groupId");

private static final Map<Channel, String> WORK_CHANNEL_MAP = new HashMap<Channel,String>();

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

log.info("与客户端建立连接,通道开启!");

// 添加到channelGroup通道组(广播)

// 之后可以根据ip来进行分组

NamedChannelGroup.getChannelGroup("default").add(ctx.channel());

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

log.info("与客户端断开连接,通道关闭!");

// 从channelGroup通道组(广播)中删除

// 之后可以根据ip来进行分组

Channel channel = ctx.channel();

NamedChannelGroup.getChannelGroup("default").remove(channel);

WORK_CHANNEL_MAP.remove(channel);

}

public boolean userAuthentication(ChannelHandlerContext ctx,FullHttpRequest req) {

// 提取URI参数

QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());

Map<String, List<String>> parameters = queryStringDecoder.parameters();

// 根据参数进行处理

List<String> userId = parameters.get("userId");

List<String> groupId = parameters.get("groupId");

if (CollectionUtil.isNotEmpty(userId) && CollectionUtil.isNotEmpty(groupId)) {

ctx.channel().attr(USER_ID_KEY).set(userId.get(0));

ctx.channel().attr(GROUP_ID_KEY).set(groupId.get(0));

return true;

}else {

return false;

}

}

private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {

// 检查是否升级到WebSocket

if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {

// 如果不是WebSocket协议的握手请求,返回400 Bad Request响应

sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));

return;

}

// 构建握手响应

WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(

getWebSocketLocation(req), null, true);

handshaker = wsFactory.newHandshaker(req);

if (handshaker == null) {

// 如果不支持WebSocket版本,返回HTTP 426 Upgrade Required响应

WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());

} else {

handshaker.handshake(ctx.channel(), req);

// 进行WebSocket握手

// 在认证成功后,设置用户ID到Channel属性中

boolean authentication = userAuthentication(ctx,req);// 这里需要实现用户认证逻辑

if (!authentication) {

// 用户认证失败,可能需要关闭连接或发送认证失败消息

// 1. 关闭连接:

ctx.close();

// 2. 发送认证失败消息给客户端:

String failureMessage = "认证失败,请提供有效的身份验证信息。";

ctx.writeAndFlush(failureMessage);

return;

}

// 其他逻辑...

WORK_CHANNEL_MAP.put(ctx.channel(), ctx.channel().attr(GROUP_ID_KEY).get());

}

}

private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {

// 发送HTTP响应

if (res.status().code() != 200) {

ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);

res.content().writeBytes(buf);

buf.release();

HttpUtil.setContentLength(res, res.content().readableBytes());

}

ChannelFuture future = ctx.channel().writeAndFlush(res);

if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {

future.addListener(ChannelFutureListener.CLOSE);

}

}

private String getWebSocketLocation(FullHttpRequest req) {

return "ws://" + req.headers().get(HttpHeaderNames.HOST) + req.uri();

}

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

// 处理WebSocket消息,可以根据实际需求进行处理

if (frame instanceof TextWebSocketFrame) {

// 处理文本消息

String text = ((TextWebSocketFrame) frame).text();

System.out.println("Received message: " + text);

// 可以在这里处理WebSocket消息并发送响应

// ...

} else if (frame instanceof BinaryWebSocketFrame) {

// 处理二进制WebSocket消息

// ...

System.out.println("123");

} else if (frame instanceof CloseWebSocketFrame) {

// 处理WebSocket关闭请求

handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());

} else if (frame instanceof PingWebSocketFrame) {

// 处理WebSocket Ping消息

System.out.println("cs");

ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));

}

}

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

if (msg instanceof FullHttpRequest) {

// 处理HTTP握手请求

handleHttpRequest(ctx, (FullHttpRequest) msg);

} else if (msg instanceof WebSocketFrame) {

// 处理WebSocket消息

handleWebSocketFrame(ctx, (WebSocketFrame) msg);

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

// 发生异常时的处理

log.error(cause.getMessage());

ctx.close();

}

}

创建NamedChannelGroup

package com.todoitbo.baseSpringbootDasmart.netty;

import io.netty.channel.group.ChannelGroup;

import io.netty.channel.group.DefaultChannelGroup;

import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

public class NamedChannelGroup{

private String groupName;

public static Map<String,ChannelGroup> channelGroupMap = new ConcurrentHashMap<>();

static {

channelGroupMap.put("default", new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));

}

public static void setGroupName(String groupName){

channelGroupMap.put(groupName, new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));

}

public static ChannelGroup getChannelGroup(String groupName){

return channelGroupMap.get(groupName);

}

}

Server-Sent Events (SSE)实现

创建DataManager

package com.todoitbo.baseSpringbootDasmart.sse;

import org.springframework.http.MediaType;

import org.springframework.stereotype.Component;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

/**

* 数据管理器用于管理Server-Sent Events (SSE) 的订阅和数据推送。

* @author xiaobo

*/

@Component

public class DataManager {

private final Map<String, List<SseEmitter>> dataEmitters = new HashMap<>();

/**

* 订阅特定数据类型的SSE连接。

*

* @param dataType 要订阅的数据类型

* @param emitter SSE连接

*/

public void subscribe(String dataType, SseEmitter emitter) {

dataEmitters.computeIfAbsent(dataType, k -> new ArrayList<>()).add(emitter);

emitter.onCompletion(() -> removeEmitter(dataType, emitter));

emitter.onTimeout(() -> removeEmitter(dataType, emitter));

}

/**

* 推送特定数据类型的数据给所有已订阅的连接。

*

* @param dataType 要推送的数据类型

* @param data 要推送的数据

*/

public void pushData(String dataType, String data) {

List<SseEmitter> emitters = dataEmitters.getOrDefault(dataType, new ArrayList<>());

emitters.forEach(emitter -> {

try {

emitter.send(SseEmitter.event().data(data, MediaType.TEXT_PLAIN));

} catch (IOException e) {

removeEmitter(dataType, emitter);

}

});

}

private void removeEmitter(String dataType, SseEmitter emitter) {

List<SseEmitter> emitters = dataEmitters.get(dataType);

if (emitters != null) {

emitters.remove(emitter);

}

}

}

接口实现

package com.todoitbo.baseSpringbootDasmart.controller;

import com.todoitbo.baseSpringbootDasmart.sse.DataManager;

import org.springframework.http.MediaType;

import org.springframework.http.ResponseEntity;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.Resource;

/**

* @author xiaobo

*/

@RestController

@RequestMapping("/environment")

public class EnvironmentController {

@Resource private DataManager dataManager;

@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

public SseEmitter subscribe() {

SseEmitter emitter = new SseEmitter();

dataManager.subscribe("environment", emitter);

return emitter;

}

// 示例:推送环境监测数据给前端

@GetMapping("/push/{testText}")

public ResponseEntity<String> pushEnvironmentData(@PathVariable String testText) {

dataManager.pushData("environment", testText);

return ResponseEntity.ok("Data pushed successfully.");

}

}

实现说明

每个不同类型的数据推送都需要一个对应的SSE订阅端点(subscribe)。每个数据类型都有一个对应的订阅端点,用于前端建立SSE连接,并在后端接收和处理特定类型的数据推送。

在你的后端应用中,对于每种数据类型,你需要创建一个对应的Controller或处理器来处理该数据类型的SSE订阅。每个Controller会有自己的SSE订阅端点,前端可以连接到不同的端点以接收相应类型的数据。

这种方式允许你将不同类型的数据推送逻辑分离,使代码更具可维护性和可扩展性。当有新的数据可用时,只需调用相应类型的数据推送方法,而不必修改通用的SSE管理逻辑。

前端实现

<!DOCTYPE html>

<html>

<head>

<title>SSE Data Receiver</title>

</head>

<body>

<h1>Real-time Data Display</h1>

<div id="data-container"></div>

<script>

const dataContainer = document.getElementById('data-container');

// 创建一个 EventSource 对象,指定 SSE 服务器端点的 URL

const eventSource = new EventSource('http://127.0.0.1:13024/environment/subscribe'); // 根据你的控制器端点来设置URL

// 添加事件处理程序,监听服务器端发送的事件

eventSource.onmessage = (event) => {

const data = event.data;

// 在这里处理从服务器接收到的数据

// 可以将数据显示在页面上或进行其他操作

const newDataElement = document.createElement('p');

newDataElement.textContent = data;

dataContainer.appendChild(newDataElement);

};

eventSource.onerror = (error) => {

// 处理连接错误

console.error('Error occurred:', error);

};

</script>

</body>

</html>

弊端以及解决方案

如果你没什么处理的话,在它首次调用subscribe时候可能会出现连接超时的问题,因为这个是一个长连接,出现这种问题是因为,此时并没有数据产生,至此,除非你刷新页面,否则即使有数据产生前端也不会受到了

你希望前端在第一次订阅SSE连接后,即使后端没有数据产生,之后也能接收到数据。这可以通过以下方式来实现:

保持持久连接: 确保前端建立的SSE连接是持久性连接,不会在第一次连接成功后关闭。这可以让连接一直保持打开状态,即使后端没有即时数据产生。你可以在前端代码中使用以下方式来确保连接持久:

const eventSource = new EventSource('/environment/subscribe');

默认情况下,EventSource对象会自动重连,以保持连接的持久性。

定期发送心跳数据: 在后端定期发送一些心跳数据,以确保连接保持活跃。这可以防止连接超时关闭。你可以在后端定期发送一个包含无用信息的SSE事件,例如:

@Scheduled(fixedRate = 30000) // 每30秒发送一次心跳数据

public void sendHeartbeat() {

dataManager.pushData("heartbeat", "Heartbeat data");

}

前端可以忽略这些心跳事件,但它们会保持连接处于活跃状态。

前端自动重连: 在前端代码中添加自动重连逻辑,以处理连接断开的情况。这样,如果连接由于某种原因断开,前端会自动尝试重新建立连接。示例:

const eventSource = new EventSource('/environment/subscribe');

eventSource.onerror = (error) => {

// 处理连接错误

console.error('Error occurred:', error);

// 重新建立连接

eventSource.close();

setTimeout(() => {

// 重新建立连接

eventSource = new EventSource('/environment/subscribe');

}, 1000); // 1秒后重试

};

通过结合上述方法,你可以确保前端能够建立并保持持久SSE连接,即使后端没有即时数据产生。这样,一旦后端有数据产生,前端也可以接收到数据而无需重新订阅。



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。