如何设计并实现一个好用的大模型套壳站?
我在 2021 年时就开始用 GitHub Copilot 写代码了,2022 年 12 月初刷推特时看到了 ChatGPT,立刻注册了个号玩了下。大模型的这波风口我看到的很早,但却没有做什么行动。那个时候的自己感觉不管做什么起步都已经晚了,套壳站已经满天飞了,OpenAI 的 API Key 也被人卖的差不多了,已经没有什么新的玩法了。
今年过年的时候 DeepSeek 火了,我才惊讶地发现,几年过去了, 豆包、混元、千问虽然在业内打得不可开交,但还是有太多的人至今没有接触过这些大模型应用。我在推特上看到个喷子,喷 DeepSeek 的点居然是问今天天气怎么样,它回答不出来。很多人对这种对话式 AI 的概念还停留在 10 年前的 Siri 等手机语音助手上。换句话说,下沉市场还是一片蓝海。
刚好之前看到腾讯混元大模型的最低配模型 hunyuan-lite 居然是免费的!那我们不如也来试试当一回二道贩子,尝试自己做一个大模型套壳站,会不会有人用我不知道,但开发的过程一定很有意思。
感兴趣的功能排除掉写了一万遍的用户注册登录和一堆 CRUD,我对以下功能的实现原理很感兴趣:
-
SSE 代理:怎样将腾讯云大模型的 SSE 和自己的对话接口接起来?
-
SSE 断点续传:对话生成过程中如果页面刷新了,重新进入时怎样继续生成当前回答?(⚠️ 实践后发现这是最难实现的功能,边缘情况很多)
-
怎样生成对话标题?
-
每次对话的 Token 如何计算?单次对话的 Token 数如何限制?
开始逐个分析上述功能之前,我们先来看看社区做得怎么样了。我按 stars 排序随便挑了几个感兴趣的项目,简单读了下他们的代码后,我信心大增哈哈哈。🤣
https://github.com/ChatGPTNextWeb/NextChat TypeScript 82.1k这应该是大家最初自建套壳站时使用的了,使用 TypeScript 编写。功能中规中矩,我发现了两个有意思的点:
// https://github.com/ChatGPTNextWeb/NextChat/blob/48469bd8ca4b29d40db0ade61b57f9be6f601e01/app/client/api.ts#L197-L201 .concat([ { from: "human", value: "Share from [NextChat]: https://github.com/Yidadaa/ChatGPT-Next-Web", }, ]); // 敬告二开开发者们,为了开源大模型的发展,请不要修改上述消息,此消息用于后续数据清洗使用 // Please do not modify this messageNextChat 在生成公开的对外分享链接时,会在对话最后加上 Share from [NextChat] 的标识。目的是为了后续训练大模型时,能够分辨出哪些是人工产生的数据,哪些是以往的大模型生成的,进而清洗过滤掉大模型生成的内容。
细想一下还挺意思的,“2022 年” 像是一道屏障一样,将互联网上的文字内容隔开来了。2022 年以后的内容,读起来就得留个心眼了,凡是看到 “综上所述” “总的来说” 这些字眼,难免会怀疑是否是用 AI 生成的。它像是泄露的核废水一样,随着时间的推移逐渐蔓延并浸染整片知识的海洋。
// https://github.com/ChatGPTNextWeb/NextChat/blob/48469bd8ca4b29d40db0ade61b57f9be6f601e01/app/locales/cn.ts#L626-L632 Prompt: { History: (content: string) => "这是历史聊天总结作为前情提要:" + content, Topic: "使用四到五个字直接返回这句话的简要主题,不要解释、不要标点、不要语气词、不要多余文本,不要加粗,如果没有主题,请直接返回“闲聊”", Summarize: "简要总结一下对话内容,用作后续的上下文提示 prompt,控制在 200 字以内", },NextChat 的这段代码解答了上面的问题 3 —— 对话标题是使用一段简短的 Prompt + 一个较小的模型生成的。转而一想,这里其实可能存在 Prompt 注入,只是没什么危害罢了。
https://github.com/open-webui/open-webui Python 84.8kopen-webui 的前端做出了高仿 OpenAI 的风格。使用 Python Web 异步库 starlette 返回 SteamingResponse 来代理 SSE 接口。它也实现了对话标题生成的功能,Prompt 上比 NextChat 长很多,并且要求以 JSON 格式返回。
我担心的点是,标题生成本身用的就是小模型,这么长的 Prompt 以及限定 JSON 格式输出,对小模型而言会不会不稳定。🤔
至于并发限流、以及对话的 Token 吞吐量限制,open-webui 写了一个路由中间件解决,这里就不再赘述了。
https://github.com/yangjian102621/geekai Go 4.1k因为我使用 Go 来编写后端,所以找了个 Stars 数很多的 Go 项目。作者应该是 PHP 转 Go 没多久,或者说是刚学编程没多久,这代码质量真的不敢恭维。
好好的 SSE 不用,画蛇添足用了 WebSocket,可从头至尾就没有需要客户端发送消息的场景。甚至这项目背后还接了个 xxl-job。😅 他能获得这么多 stars 只是因为把支付那块也给做完了,小白可以即开即用拿去做套壳。但从代码的可维护性和整洁度上来说,真是一团糟。我都想做个《鉴定网络奇葩代码》短视频了。
这个故事告诉我们,技术好不好不重要,能把事情做完最重要。
https://github.com/swuecho/chat Go 538同样是 Go 项目,这个国外老哥写得代码就好多了。他使用了 langchaingo 来构造拼接对话。说实话我内心觉得这些库用起来挺花里胡哨的,又是什么模板,什么占位符,什么对话链,但最终做的事还是在拼字符串,拼出一个 Prompt 发给大模型。😁
老哥使用了 langchaingo 自带的 summarization 来做对话总结,本质上也是 langchaingo 内置了一段 Prompt。
而关于问题 4,如何计算 Token 数量,由于这个项目支持的模型都是 OpenAI 家的,因此直接使用的 OpenAI 开源的 tiktoken 来进行计算(国会听证会警告)。tiktoken 有 Go 封装的开源实现:github.com/pkoukk/tiktoken-go。
其余的一些项目我有点看不下去了,不如直接开写吧!
数据结构回忆一下,我们是怎样用豆包或元宝的,在页面左侧有一个对话列表,点开对话后可以看到我们发送的和 AI 回复的消息。因此需要创建 Chat (对话)和 Message (消息)两张表。
- Chat 对话表
- Message 消息表
有坑注意!
这里的 ID 均使用 Snowflake 算法生成,Snowflake 生成的是 19 位数字,这在 Go int64 下没问题,但在前端 JavaScript 下会丢失最后 4 位的精度。即 1906281281029672960 在前端会变成 1906281281029673000。
我用了一个简单粗暴且不靠谱的 HACK,将数字除以 1000,去除后三位。
消息表中的 ParentID 和 ChildrenIDs 字段,用于记录父子消息关系。就像豆包可以点击重新生成,进而再生成一条回复。
更复杂的像 ChatGPT,可以点击上文任意一条消息,新建一个分支重新生成对话。为了实现这样的效果,我们在创建一条新的消息记录时,需要 ParentID 指定它的父消息,并更新它父消息的 ChildrenIDs 字段,这俩包在一个数据库事务里做就行。
在需要构造大模型接口 JSON messages 参数时,只需从最后一条消息开始,沿着 ParentID 依次向上遍历,一直到 ParentID 为 0,即可拿到当前对话分支的消息列表。 前端实现像上图中豆包的“上一条”“下一条”翻页的效果,也只需取 ChildrenIDs 构造翻页即可。
这里再补充一些小细节,我发现腾讯元宝的消息 ID 使用 <对话ID>_<自增索引的格式> 表示,如 <对话ID>_1 <对话ID>_2 等,这从设计上使得元宝的对话只能是线性的。 用户只能重新生成最新一轮对话的消息,且不能在历史对话中重新生成创建分支。
实现最简单的 SSE关于 SSE 的简单介绍,可以去阅读我五年前写得 《聊聊 EventStream 服务器端推送》 这篇文章。大模型活了之后每个月都会有人在 Google 上搜 EventStream 搜到这篇。
腾讯云官方的 Go SDK 调用混元大模型时,客户端可以使用 SendOctetStream 方法,接收流式响应,此时 response 中返回的是 channel 类型的 SSEvent。我们可以先对混元大模型做简单的函数封装,从 SDK 的 channel 中提出大模型对话返回的 Content 正文,再打到函数返回值的 channel 中,精简后的代码如下:
func (h *Hunyuan) TextCompletions(ctx context.Context, input TextCompletionsInput) (chan string, error) { // ... eventsCh := response.BaseSSEResponse.Events // 腾讯云 SDK 输出 go func() { for event := range eventsCh { if event.Err != nil { logrus.WithContext(ctx).WithError(event.Err).Error("Failed to get event") break } eventData := event.Data var respParams hunyuan.ChatCompletionsResponseParams if err := json.Unmarshal(eventData, &respParams); err != nil { logrus.WithContext(ctx).WithError(err).Error("Failed to unmarshal event data") continue } if len(respParams.Choices) == 0 { break } choice := respParams.Choices[0] // 默认取第一个结果,貌似我从没见过会有第二个 outputChan <- *choice.Delta.Content // 打到函数返回值的 channel 里 } close(outputChan) }() // ... }我这里直接默认选第一个 Choices ,将 Content 正文放到 channel 里。JSON 反序列化那块,硬要扣的话也可以改用 sonic。
具体到对话接口的设计上,与那些自用的套壳站不同,我们是要给第三方用户使用的,在接口的入参上不能像那些自用站一样每次都将整个对话完整的 messages 发给后端处理,应该尽可能缩减用户前端可控的参数范围。前端只能传入对话 ID、父消息 ID、提问消息正文;历史消息链的拼接和 messages 参数的构造全都应该在后端完成。
对话接口先响应 Content-Type: text/event-stream 头,然后发送一条类型 event:metadata 的消息告诉前端当前对话 ID 和消息 ID,之后就从大模型的 channel 里读消息,写入 ResponseWriter 即可。
大模型接口返回的是逐 Token 生成的内容,这里其实又有一个抉择,SSE 的每条消息,是返回当下完整的消息内容,还是返回新增的 Token 内容呢?
// 返回当下完整的内容 {"v":"你好,很"} {"v":"你好,很高兴认识你"} // 返回新增内容 {"v":"你好,很"} {"v":"高兴"} {"v":"认识你"}现在大家都是选择后者。我担心的点是如果选择后者,前端拼接字符串时会不会有概率乱掉。我在不断测试豆包的时候遇到过一次,但这也是极端情况,实际后端文本是正常的,刷新一下就好了。因此我也随大流选择了返回每次新增的内容。😁
由于我前端处理 SSE 使用的是 eventsource-client 这个库,它在传入对话接口的 URL 后,就只能处理 SSE 格式的响应了。因此这个对话接口的报错,也只能以写入单条 SSE 消息的形式返回,使用 event: error 来区分。
对话标题生成在对话生成结束后,需要判断当前是否为新对话,是的话则需要再调用大模型,让其生成对话标题。生成的对话标题入库存储,同时 SSE 发送一条 event: title 类型的消息,通知前端更新页面上的对话标题。
我这里的 Prompt 写得比较粗糙,你可以根据上文中提到的 NextChat 和 open-webui 的 Prompt 自己再改改。以及是将提问内容放在单独的 user 消息中,还是直接拼在 System Prompt 中,这里也可以再钻研下。
if isNewChat { // Summarize the conversation title from LLM in a new conversation. summaryOutput, err := llmChat.TextCompletions(ctx.Request().Context(), llm.TextCompletionsInput{ Messages: []*llm.TextCompletionsMessage{ {Role: "system", Content: "请根据给出的提问内容,总结生成一个不超过 10 个字的对话标题,尽可能是陈述句,仅输出对话标题,不要有任何其他的内容。"}, {Role: "user", Content: "提问内容:`" + content + "`"}, }, SSE: false, }) if err != nil { logrus.WithContext(ctx.Request().Context()).WithError(err).Error("Failed to get text completions of summary title") } else { for title := range summaryOutput { if err := db.Chats.Update(ctx.Request().Context(), chat.ID, db.UpdateChatOptions{Title: title}); err != nil { logrus.WithContext(ctx.Request().Context()).WithError(err).Error("Failed to update chat title") } // Set the title to the SSE response if the context is not canceled. _ = ctx.SSEResponse("title", title) } } } 对话 Token 计算混元大模型本身提供了 GetTokenCount 接口用于计算消息中的 Token 数,20 QPS 的限制还不收费,足够我们使用了。
从处理流程上来说,用户发起提问时,调用 GetTokenCount 计算提问的 Token 数;回答生成完毕后,计算并更新回答所消耗的 Token 数。为未来可能要做的 Token 付费功能铺垫。进一步,如果还要做不同套餐的上下文长度的限制,提问的长度在开始提问的时就进行判断,而对于大模型回答的长度,则是在调大模型接口时使用 max_tokens 参数限制。
然而混元的 SDK 好像不能指定这个参数,只有走 OpenAI 兼容接口调用时才支持。
我画了一张流程图来梳理目前的整个过程,带 🚀 小火箭图标的意味着这一步可以开个 goroutine 异步进行。如果上述流程没问题,那就请做紧抓稳了,我们后面要引入 SSE 断点续传功能了。
SSE 断点续传这是一个各家大厂都支持的功能,但网上好像还没人讨论应该如何实现,我在相关的大模型套壳开源项目中也没有看到。
具体来说就是,在用户提问后,前端调用了上述对话接口,页面开始逐字打出大模型的回答。就在这时用户突然刷新了页面,或者在新的浏览器标签页中打开了网页,页面上应该要接着之前的回答继续生成完。我称之为“SSE 断点续传”。
我们拆解一下这个需求,最终的效果应该是:
- 用户提问后,刷新页面,页面要能继续接着之前的回答内容生成。
- 用户提问后,点击「停止」按钮,生成停止;刷新页面,要停在之前的回答内容上。
- 用户提问后,刷新页面,页面继续生成;点击「停止」按钮,生成停止;再刷新页面,要停在之前的回答内容上。
- 用户提问后,又在新浏览器窗口打开对话,此时两个窗口要同步生成;点击「停止」按钮,两个窗口要近乎同时停止。
在前文中,我们直接将大模型的 Channel 和当前请求的 Response Channel 接在一起,一旦 SSE 请求被中断,HTTP 请求的 Context Cancel 后,会连带着混元大模型 SDK 生成请求的 Context 一起停止。因此,我们第一步应该是将大模型生成请求独立到一个 goroutine 中进行,且 Context 与外部 HTTP Context 隔离。
不管是刷新还是新开多个浏览器页面,都要能获取到之前已生成的回答内容,那么生成的内容就得找个地方存下来。这个“存下来”还不是持久化存储,因为回答生成完毕后,就会入库存到 Messages 表的 Content 字段中。我们要的是一个性能好的临时存储,它最好还自带过期功能,还支持多个浏览器接收的这种消息订阅分发模式,这里很容易能想到用 Redis。
Redis 关于消息订阅的功能有 PubSub 和 Stream。前者用于实现消息的发送与广播,但消息不会被持久化,发完就忘了;后者引入了消费组的概念,不同的消费组有单独的 position 来消费历史消息,甚至还支持 ACK 机制。那么结果就很明确了,我最终选择了 Redis Stream 来实现这个功能。
大模型生成请求在单独的 goroutine 中进行,生成的内容打到 Redis Stream 中,Stream 的 Key 使用 chat:message-stream:<message_id> 表示。每一个前端的 SSE 请求,都是从 chat:message-stream:<message_id> 中从头开始(游标为 0)间接读取消息并返回。
那么前端在进入页面后,又该如何知道当前对话还在生成中呢?我在每次调用大模型生成时都会在 Redis 里设置一个 Key,生成结束后删除。
set chat:conversation-status:<chatID> <messageID> 5*time.Minute前端获取对话基本信息的 HTTP API,会通过查看这个 Key 是否存在来判断当前对话是否正在生成。如果正在生成,就直接调对话接口,发送空提问消息来开启 SSE 开始拉取回答消息。
这个 chat:conversation-status:<chatID> 我还设置了 5 分钟的过期时间用于兜底,如果因为意外后端重启了,对话不至于一直卡在生成中的状态。
在对话结束后,chat:conversation-status:<chatID> chat:message-stream:<message_id> 这两个 Key 都会被删除,这里会存在一个 race 的极端情况:那就是前端通过 conversation-status 得知对话正在生成,这个时刻之后刚好对话生成结束,前端启动 SSE 后发现 message-stream 被删了,这样就拉不到历史消息了。因此我在删除 conversation-status 后延迟了 5 秒再删除 message-stream 。
// Delete the conversation status in redis. if err := redis.Get().Del(ctx, conversationStatusFlagKey).Err(); err != nil { logrus.WithContext(ctx).WithError(err).WithField("key", conversationStatusFlagKey).Error("Failed to delete redis key") } time.Sleep(5 * time.Second) // Delete the redis stream after the chat message completion is done. if err := redis.Get().Del(ctx, messageStreamKey).Err(); err != nil { logrus.WithContext(ctx).WithError(err).WithField("stream", messageStreamKey).Error("Failed to delete redis stream") }以上,我们就实现了 SSE 消息的断点续传了。但还有一个问题:用户点击前端的「停止」按钮后,我们要能够停掉 goroutine 里正在跑的大模型请求,确保生成的消息内容就停在当下。这里我单独加了个 POST /stop 接口,前端调用后会将 chat:conversation-status:<chatID> 从 Redis 中直接删掉。大模型生成的 goroutine 里再开一个 goroutine 来循环查看这个 Key 是否存在,如果不存在了,就直接关掉大模型请求的 Context:
// Scan for `conversationStatusFlagKey` // If the conversation status is not set, which means the conversation is stopped by the user. go func() { for { select { case <-ctx.Done(): return case <-llmCtx.Done(): return default: _, err := redis.Get().Get(ctx, conversationStatusFlagKey).Result() if errors.Is(err, redispkg.Nil) { llmCancel() return } time.Sleep(1 * time.Second) } } }()至此,我们就完成了上述 SSE 断点续传的 4 个需求,属实不容易,这里的设计我斟酌思考了很久。我也不知道大厂们是怎么做的,如果你有更好的设计或者想法,欢迎留言和我讨论。
具体落实到代码上会复杂些,因为还有更新对话标题、计算 Token 等流程,很多步骤又是可以异步进行的,但互相之间又会用不同的 Context 来同步状态,然后 goroutine 中还有一堆的 defer ,这块的代码我打算后续梳理下流程好好美化下,现在只是停留在可用的状态。
来看看豆包和元宝整个开发过程中,我时常会去看豆包和元宝是怎么做的,参考他们的设计是怎样的(期间要不停地抓包和翻压缩后的 JS 文件),也和大家分享下。
豆包的实现比较复杂,用户发送的消息在浏览器本地的 IndexDB 会存一份。当用户开启新对话提问后,由于这时新对话还没发送到后端,前端会给这个对话和消息生成一个本地的 Local ID。带着 Local ID 将请求发给后端。但正如我前面提到的,Local ID 这种由用户本地生成的数据,后端不应该给予过多的信任,因而对话 Local ID 仅被用在第一次后端生成对话 ID 前,当后端生成并返回了对话 ID 后,后续都用该对话 ID 进行查询;消息 ID 也是同理。最后接口参数会传 本地/后端 的 对话/消息 ID 共 4 个参数,但后端会优先使用后端生成的 ID。我对着这个接口排列组合测了多种情况,发现豆包都能很好的 handle 住。
由于豆包会把消息在本地存一份,因此在页面刷新后,它是知道上次 SSE 断在哪里的。观察豆包的 SSE 返回消息,它的 JSON 中有一个自增的 event_id 游标字段,断点续传时会带上这个 event_id,SSE 接口就只会返回在这之后的消息。这样做是为了省一点传输的流量吗(?
相对而言元宝就大道至简很多。除了我们上面提到的,元宝使用 <对话ID>_<自增索引的格式> 格式的消息 ID 记录线性的消息记录。关于断点续传,元宝是拿着对话 ID + 消息 ID 请求 /continue 接口,后端 SSE 返回全部历史消息和正在生成的消息。但如果再重放 /continue 接口请求,会直接 hang 住,可能这是个 Bug 吧。
再聊聊前端我之前总结过 BAT 三家大厂的 AI 组件库建设情况:
公司 组件库 评价 字节跳动 Semi Design 豆包同款,Semi Design 还支持搭配 Tailwind CSS 使用。缺点是只支持 React,很难受。然后开发团队还说提供了接口,社区可以自己实现 Vue 版本。呃呃,社区实现了,但又没完全实现,居然还要在 Vue 里写 JSX。😅 阿里巴巴(蚂蚁) Ant DesIgn X 打开官网给我浏览器卡得半死。相比其它家有欢迎栏、提示集这类独特组件。还没深入使用过。 腾讯 TDesign 我司这个有点一言难尽。元宝前端虽然用了 TDesign 但 AI 对话那块看起来是自己写的。组件库提供的 ChatInput 占得空间太大了,样式还不好调,我在司内的项目是拿 TDesign 的 Input 组件自己撸了一个。(以上仅代表个人观点,我爱公司😘)因此前端的部分我选择了 Semi Design 组件库,因为我感觉它是真的经历了 Dogfooding 做出来的,实打实的豆包同款前端。我在写的时候前端想去仿豆包的风格,然后发现现成的组件库实现不了同样的样式,便去翻豆包的前端,惊讶地发现我踩的坑他居然都踩过一遍了! 我按照豆包前端强行加 CSS style 和 class 之后,真就搞好了。
这也是我写得第一个 React 项目,不出意外地踩了 StrictMode 下请求会发两次用来检查副作用的坑。🤣 这个过程跟我刚开始写 Vue 一样,一开始是很痛苦的,但写着写着突然就顿悟了,发现 React 把各种东西和功能定义成组件嵌套包起来的设计,还真有点妙。我也理解为什么 Vue 能火了,这俩入门难度确实不一样。
最后再聊聊我的大模型套壳站现已部署至线上:TakoChat - https://tako.chat
Tako(たこ)是日文章鱼🐙的意思。起这个名字只是我单纯觉得微软 Teams 下的章鱼动态 Emoji 很可爱。背后接的是免费版的混元大模型,所以你可以注册体验下,只是目前的功能还很基础。(不清楚阿里云那边短信验证码备案的问题是否解决了,可能会遇到部分运营商收不到短信验证码的问题,可以换不同运营商的号试试)
你可以看到左侧有「实验室」一栏,我是打算在这里动手做做像 MCP 和 Agent 这样的小玩意。(先把坑开了,填不填再说。
呼~ 总算把这篇写完了。我还挺自我感动的,没蹭热度,仅仅只是分享一些自己总结的心得体会,比那些营销号不知道高到哪里去了。
至此,周末也要结束了,明天又可以上班继续修 bug 了 🤤
文章头图来自 @极道寂 PixivID: 69237248