Fastapi集成SSE服务后端主动推送消息到前端

blackcatlzm 2024-06-22 08:09:09 阅读 99

sse技术简介

SSE(Server-Sent Events)是一种允许服务器向客户端浏览器推送信息的技术。它是 HTML5 的一部分,专门用于建立一个单向的从服务器到客户端的通信连接。SSE的使用场景非常广泛,包括实时消息推送、实时通知更新等。

严格地说,HTTP 无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。

也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。

SSE 就是利用这种机制,使用流信息向浏览器推送信息。

FastAPI集成SSE功能

配置环境

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple sse-starlette fastapi[all]

代码实现:

try:

from sse_starlette.sse import EventSourceResponse

import asyncio

except:

import os

os.system('pip install -i https://pypi.tuna.tsinghua.edu.cn/simple sse-starlette')

from sse_starlette.sse import EventSourceResponse

import asyncio

@router.get("/sse")

async def root(request: Request):

async def event_generator(request: Request):

while True:

if await request.is_disconnected():

print("连接已中断")

break

sse_test_data = gl.gl_Redis.hgetall('sse:test:data')

sse_test_data['warn_site'] = sse_test_data['warn_site'].split(',') if sse_test_data.get('warn_site') else []

yield {

"event": "message",

# "retry": 15000,

"data": demjson.encode(sse_test_data)

}

await asyncio.sleep(10)

g = event_generator(request)

return EventSourceResponse(g)

SEE客户端测试脚本

import aiohttp

import asyncio

async def test():

headers = {'Content-Type': 'text/event-stream'}

sseresp = aiohttp.request("GET", r"http://127.0.0.1:9000/api/sse", headers=headers)

async with sseresp as r:

async for chunk in r.content.iter_any():

print(chunk.decode())

if __name__ == '__main__':

loop = asyncio.get_event_loop()

loop.run_until_complete(test())

【特别提示】在现在大多数webviews框架打包APP时,sse服务无效!!!

本文由博客一文多发平台 OpenWrite 发布!



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。