golang使用sse事件流调用AI大模型
十假杰出青年 2024-07-15 16:01:01 阅读 53
目录
前言第一步 解决没有官方SDK的痛第二步 实现流式传输什么是SSE,SSE和WebSocket的区别基于gin实现SSE服务器gin接收AI大模型数据流响应1. 前端携带自定义问题请求后端接口2. 后端接受请求解析问题,然后创建stream对象3. 构建请求参数,调用创建数据流客户端接口4. 调用http客户端,发起请求,将响应结果封装在数据流对象里面5. 获取到数据流,然后就监听返回的数据流6. 具体接受数据流的方法7. 就是把解析到的数据流放入通道,C.Stream监听通道获取流,使用C.sseevent返回前端8. over,你就可以看到效果了。
如有不足,请指正
前言
本次Ai大模型,我们选择清华大学出品的智谱AI大模型,为什么呢?因为chatgpt已经有开源第三方库,可以直接调,但是要科学上网(借助代理也是可是实现的)。只要是公司业务有这方面的需求。。。
第一步 解决没有官方SDK的痛
接口文档
没办法,照着python,java比葫芦画瓢,自己造。
我们来到非SDK用户这边,文档说先获取APIkey,然后根据APIkey生成JWTtoken,把token加到请求头,然后使用http请求就可以了。以下是golang代码:
<code>// 这边我已经封装成了方法,只需要传入apikey,token过期时间
token, err := go_ZhiPuAI.GenerateJwtToken(global.GvaConfig.ZhiPuAI.ApiKey, global.GvaConfig.ZhiPuAI.ExpSeconds)
//这是具体代码
func GenerateJwtToken(apiKey string, expSeconds int) (string, error) {
// 分割apiKey以获取id和secret
parts := strings.Split(apiKey, ".")
if len(parts) != 2 {
return "", fmt.Errorf("invalid apiKey: %v", parts)
}
id, secret := parts[0], parts[1]
// 创建JWT的payload
claims := jwt.MapClaims{
"api_key": id,
"exp": time.Now().Unix()*1000 + int64(expSeconds)*1000,
"timestamp": time.Now().Unix() * 1000,
}
// 创建一个新的Token对象,并指定签名算法和claims
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
// 添加headers
token.Header["alg"] = "HS256"
token.Header["sign_type"] = "SIGN"
// 使用secret对token进行签名
tokenString, err := token.SignedString([]byte(secret))
if err != nil {
return "", err
}
return tokenString, nil
}
将鉴权 token 放入 HTTP 请求的 header 中
用户需要将生成的鉴权 token 放入 HTTP 的 Authorization header 头中:
Authorization: 鉴权token
Example:curl请求中的token参数示例
curl --location 'https://open.bigmodel.cn/api/paas/v4/chat/completions' \
--header 'Authorization: Bearer <你的token>' \
--header 'Content-Type: application/json' \
--data '{
"model": "glm-4",
"messages": [
{
"role": "user",
"content": "你好"
}
]
}'
这样我们就可以访问了。
第二步 实现流式传输
这是python的代码示例,我们只需在请求字段里加上”stream=True“即可开启流式传输
from zhipuai import ZhipuAI
client = ZhipuAI(api_key="") # 请填写您自己的APIKeycode>
response = client.chat.completions.create(
model="glm-4", # 填写需要调用的模型名称code>
messages=[
{ "role": "system", "content": "你是一个乐于解答各种问题的助手,你的任务是为用户提供专业、准确、有见地的建议。"},
{ "role": "user", "content": "我对太阳系的行星非常感兴趣,特别是土星。请提供关于土星的基本信息,包括其大小、组成、环系统和任何独特的天文现象。"},
],
stream=True,
)
for chunk in response:
print(chunk.choices[0].delta)
因为python官方库已经实现好了,只需一个for循环,golang又要自己动手实现,淦。。
什么是SSE,SSE和WebSocket的区别
WebSocket:一种双向通信协议,同时支持服务端和客户端之间的实时交互。WebSocket 是基于 TCP 的长连接,和HTTP 协议相比,它能实现轻量级的、低延迟的数据传输,非常适合实时通信场景,主要用于交互性强的双向通信。
SSE(Server-Sent Events)是一种基于 HTTP 协议的推送技术。服务端可以使用 SSE
来向客户端推送数据,但客户端不能通过SSE向服务端发送数据。相较于 WebSocket,SSE 更简单、更轻量级,但只能实现单向通信。
SSE(Server-Sent Events)和 WebSocket 都是用于实现服务器与客户端之间实时双向通信的技术。虽然它们都可以用于实时更新数据,但它们在实现方式、特点和适用场景上有着明显的区别。
两者的主要区别:
SSE | WebSocket | |
---|---|---|
通信 | 单向通信 | 双向通信 |
协议 | HTTP | WebSocket |
自动重连 | 支持 | 不支持,需要客服端自行支持 |
数据格式 | 文本格式 | 二进制数据、文本格式 |
跨域 | 不支持(若跨域需配置指定的Access-Control-Allow-Origin) | 支持 |
适用场景 | SSE 适用于需要服务器向客户端单向实时推送数据的场景,例如实时更新的新闻、股票行情等。优点:简单易用,对服务器压力小,浏览器兼容性好。缺点:只支持单向通信,无法进行双向交互。 | 适用于需要客户端和服务器之间实时双向通信的场景,例如聊天室、实时协作应用等。优点:支持双向通信,实时性更高,可以实现更丰富的交互效果。缺点:需要独立的 TCP 连接,对服务器压力更大,浏览器兼容性相对较差。 |
基于gin实现SSE服务器
//前端代码
<!DOCTYPE html>
<html>
<head>
<title>SSE test</title>
<script type="text/javascript">code>
// 向后端服务器发起sse请求
const es = new EventSource("http://127.0.0.1:9000/v1/VoiceoverScript/chat");
// 监听事件流
es.onmessage = function (e) {
document.getElementById("test")
.insertAdjacentHTML("beforeend", "<li>" + e.data + "</li>");
console.log(e);
}
// 监听”chat“事件流
es.addEventListener("chat", (e) => {
document.getElementById("test")
.insertAdjacentHTML("beforeend", "<a>" + e.data + "</a>");
console.log(e)
});
es.onerror = function (e) {
// readyState说明
// 0:浏览器与服务端尚未建立连接或连接已被关闭
// 1:浏览器与服务端已成功连接,浏览器正在处理接收到的事件及数据
// 2:浏览器与服务端建立连接失败,客户端不再继续建立与服务端之间的连接
console.log("readyState = " + e.currentTarget.readyState);
}
</script>
</head>
<body>
<h1>SSE test</h1>
<div>
<ul id="test">code>
</ul>
</div>
</body>
</html>
//后端代码
//注意 **我注释的代码,是不使用gin框架封装的Stream方法,也就是C.Stream(func())和C.ssevent(),只是C.Stream要改成for循环持续的从通道里面进行读,直到通道关闭,结束for循环**
package main
import (
"fmt"
"github.com/gin-gonic/gin"
"io"
"testing"
"time"
)
func SSE(c *gin.Context) {
// 设置响应头,告诉前端适用event-stream事件流交互
//c.Writer.Header().Set("Content-Type", "text/event-stream")
//c.Writer.Header().Set("Cache-Control", "no-cache")
//c.Writer.Header().Set("Connection", "keep-alive")
// 判断是否支持sse
//w := c.Writer
//flusher, _ := w.(http.Flusher)
// 接收前端页面关闭连接通知
closeNotify := c.Request.Context().Done()
// 开启协程监听前端页面是否关闭了连接,关闭连接会触发此方法
go func() {
<-closeNotify
fmt.Println("SSE关闭了")
return
}()
//新建一个通道,用于数据接收和响应
Chan := make(chan string, 10)
// 异步接收GPT响应,然后把响应的数据发送到通道Chan
go func() {
// 记得关闭通道
defer close(Chan)
// 模拟gpt回复
s := `在远古时代的一个神秘而神奇的大陆上,有着一座被人们称为“永恒之城”的城市。这座城市建立在一座巍峨的山脉之中,被壮丽的自然景观所环绕。`
//
for _, char := range s {
Chan <- string(char)
// 模拟时间卡顿
time.Sleep(time.Second * 1)
}
}()
// gin框架封装的stream,会持续的调用这个func方法,记得返回true;返回false代表结束调用func方法
c.Stream(func(w io.Writer) bool {
i := <-Chan
c.SSEvent("chat", i) // c.SSEvent会自动修改响应头为事件流,并发送”test“事件流给前端监听”test“的回调方法
//flusher.Flush() // 确保立即发送
return true
})
}
func TestSSE(t *testing.T) {
engine := gin.Default()
// 设置跨域中间件
engine.Use(func(context *gin.Context) {
origin := context.GetHeader("Origin")
// 允许 Origin 字段中的域发送请求
context.Writer.Header().Add("Access-Control-Allow-Origin", origin) // 这边我的前端页面在63342,会涉及跨域,这个根据自己情况设置,或者直接设置为”*“,放行所有的
// 设置预验请求有效期为 86400 秒
context.Writer.Header().Set("Access-Control-Max-Age", "86400")
// 设置允许请求的方法
context.Writer.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE, UPDATE, PATCH")
// 设置允许请求的 Header
context.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Apitoken")
// 设置拿到除基本字段外的其他字段,如上面的Apitoken, 这里通过引用Access-Control-Expose-Headers,进行配置,效果是一样的。
context.Writer.Header().Set("Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Headers")
// 配置是否可以带认证信息
context.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
// OPTIONS请求返回200
if context.Request.Method == "OPTIONS" {
fmt.Println(context.Request.Header)
context.AbortWithStatus(200)
} else {
context.Next()
}
})
engine.GET("/v1/VoiceoverScript/chat", SSE) // 记得适用get请求,我用post前端报404,资料说是SSE只支持get请求
engine.Run(":9000")
}
gin接收AI大模型数据流响应
1. 前端携带自定义问题请求后端接口
// 注意因为sse要求,使用了Get请求
func InitVoiceoverScriptRouter(engine *gin.Engine) {
// 需要登录
tokenGroup := engine.Group("/v1/VoiceoverScript").Use(middleware.JWTAuthMiddleware())
{
tokenGroup.GET("/chat", v1.StartChat)
}
}
2. 后端接受请求解析问题,然后创建stream对象
question := c.Query("question")
user, _ := c.Get("user")
u, ok := user.(*model.User)
if !ok {
response.ResponseError(c, response.CodeInvalidToken)
}
// 获取Ai大模型数据流
stream, err := ai.ProcessService(c, &request.Request{ Prompt: question}, int64(u.UserID))
if err != nil {
global.GvaLogger.Sugar().Error("aiClient Process bizError: %v", err)
response.ResponseErrorWithMsg(c, response.ServerError, "错误:"+err.Error())
return
}
defer stream.Close()
....
3. 构建请求参数,调用创建数据流客户端接口
func ProcessService(ctx *gin.Context, r *Request, uid int64) (stream *go_ZhiPuAI.ChatCompletionStream, err error) {
//TODO 根据用户id更新减少用户的使用次数
chatRequest := go_ZhiPuAI.ChatCompletionRequest{
Model: "glm-4",
Messages: []go_ZhiPuAI.ChatCompletionMessage{
{
Role: go_ZhiPuAI.ChatMessageRoleAssistant,
Content: "你是一个聪明且富有创造力的世界上最通情达理的人"},
{
Role: go_ZhiPuAI.ChatMessageRoleUser,
Content: r.Prompt,
},
},
Stream: true,
}
token, err := go_ZhiPuAI.GenerateJwtToken(global.GvaConfig.ZhiPuAI.ApiKey, global.GvaConfig.ZhiPuAI.ExpSeconds)
if err != nil {
return nil, err
}
client := go_ZhiPuAI.NewClientWithConfig(
go_ZhiPuAI.ClientConfig{
AuthToken: token,
BaseURL: global.GvaConfig.ZhiPuAI.BaseUrl,
HTTPClient: &http.Client{ },
},
)
if client == nil {
global.GvaLogger.Sugar().Error("智普客户端初始化失败!")
return nil, errors.New("key失效")
}
// 主要是这里
// 携带自定义请求参数请求然后返回数据流对象
completionStream, err := client.CreateChatCompletionStream(ctx, chatRequest)
if err != nil {
global.GvaLogger.Sugar().Error("aiClient client.CreateChatCompletion bizError:%v", err)
//goUtil.New(func() { 起一个协程把当前key的状态设为0,不可用
//refreshKey(gptClient)
//})
return nil, err
}
return completionStream, nil
}
4. 调用http客户端,发起请求,将响应结果封装在数据流对象里面
func (c *Client) CreateChatCompletionStream(
ctx context.Context,
request ChatCompletionRequest,
) (stream *ChatCompletionStream, err error) {
urlSuffix := "/chat/completions"
//if !checkEndpointSupportsModel(urlSuffix, request.Model) {
//err = ErrChatCompletionInvalidModel
//return
//}
request.Stream = true
req, err := c.newStreamRequest(ctx, "POST", urlSuffix, request)
if err != nil {
return
}
resp, err := c.config.HTTPClient.Do(req) //nolint:bodyclose // body is closed in stream.Close()
if err != nil {
return
}
// 使用http客户端发起请求,把响应结果使用bufio.NewReader(resp.Body)存入数据流对象
stream = &ChatCompletionStream{
streamReader: &streamReader[ChatCompletionStreamResponse]{
emptyMessagesLimit: c.config.EmptyMessagesLimit,
reader: bufio.NewReader(resp.Body),
response: resp,
errAccumulator: newErrorAccumulator(),
unmarshaler: &jsonUnmarshaler{ },
},
}
return
}
5. 获取到数据流,然后就监听返回的数据流
这里就和之前的SSE服务器差不多的逻辑,就是开启协程监听大模型返回数据流,放入通道里面 主要是Stream.Recv()方法
// 获取Ai大模型数据流
stream, err := ai.ProcessService(c, &request.Request{ Prompt: question}, int64(u.UserID))
if err != nil {
global.GvaLogger.Sugar().Error("aiClient Process bizError: %v", err)
response.ResponseErrorWithMsg(c, response.ServerError, "错误:"+err.Error())
return
}
defer stream.Close()
// 通道
chanStream := make(chan *go_ZhiPuAI.ChatProcessResponse, 100)
// 异步协程
go func() {
defer stream.Close()
defer close(chanStream)
for i := 0; ; i++ {
streamResponse, err := stream.Recv()
if errors.Is(err, io.EOF) {
global.GvaLogger.Sugar().Debug("Stream finished")
chanStream <- go_ZhiPuAI.StopResponse
return
}
if err != nil {
global.GvaLogger.Sugar().Error("Stream error: %v\n", err)
chanStream <- go_ZhiPuAI.ErrorsResponse
return
}
if len(streamResponse.Choices) == 0 {
global.GvaLogger.Sugar().Debug("Stream finished")
chanStream <- go_ZhiPuAI.StopResponse
return
}
choice := streamResponse.Choices[0]
data := &go_ZhiPuAI.ChatProcessResponse{
ID: streamResponse.ID,
Role: choice.Delta.Role,
Segment: go_ZhiPuAI.SegmentText,
DateTime: time.Now().Format("2006-01-02 15:04:05"),
Content: choice.Delta.Content,
ParentMessageID: go_ZhiPuAI.AssistantMessageId,
}
if i == 0 {
data.Segment = go_ZhiPuAI.SegmentStart
}
if choice.FinishReason == go_ZhiPuAI.SegmentStop {
data.Segment = go_ZhiPuAI.SegmentStop
}
chanStream <- data
if choice.FinishReason == go_ZhiPuAI.SegmentStop {
return
}
}
}()
6. 具体接受数据流的方法
SSE数据流格式 以data开头,/n/n结尾
data:{“id”:“chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW”,“object”:“chat.completion.chunk”,“created”:1694144390,“model”:“gpt-3.5-turbo-0613”,“choices”:[{“index”:0,“delta”:{“role”:“assistant”,“content”:“”},“finish_reason”:null}]}
代码大概意思就是使用reader每次读到/n就停止,这代表一条消息,然后去除data,序列化拿到想要的内容
数据流以[DONE]结尾,代表数据流传输结束
func (stream *streamReader[T]) Recv() (response T, err error) {
if stream.isFinished {
err = io.EOF
return
}
var emptyMessagesCount uint
waitForData:
line, err := stream.reader.ReadBytes('\n')
li, _ := stream.reader.ReadBytes('\n')
fmt.Println(li)
if err != nil {
respErr := stream.errAccumulator.unmarshalError()
if respErr != nil {
err = fmt.Errorf("error, %w", respErr.Error)
}
return
}
var headerData = []byte("data: ")
line = bytes.TrimSpace(line)
if !bytes.HasPrefix(line, headerData) {
if writeErr := stream.errAccumulator.write(line); writeErr != nil {
err = writeErr
return
}
emptyMessagesCount++
if emptyMessagesCount > stream.emptyMessagesLimit {
err = ErrTooManyEmptyStreamMessages
return
}
goto waitForData
}
line = bytes.TrimPrefix(line, headerData)
if string(line) == "[DONE]" {
stream.isFinished = true
err = io.EOF
return
}
err = stream.unmarshaler.unmarshal(line, &response)
return
}
7. 就是把解析到的数据流放入通道,C.Stream监听通道获取流,使用C.sseevent返回前端
var msgList []*go_ZhiPuAI.ChatProcessResponse
c.Stream(func(w io.Writer) bool {
if msg, ok := <-chanStream; ok {
if msg == go_ZhiPuAI.ErrorsResponse {
return false
}
msgList = append(msgList, msg)
//marshal, _ := json.Marshal(msg)
c.SSEvent("chat", msg.Content)
//flusher.Flush() //确保立即发送
if msg == go_ZhiPuAI.StopResponse {
return false
}
return true
}
return true
})
// 将会话存入数据库
goUtil.New(func() {
})
8. over,你就可以看到效果了。
如有不足,请指正
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。