高性能 SSE 流式解析器:基于 Web Streams 的声明式管道实现
很多 LLM 的实时输出(“一边生成一边显示 token”)本质上都是 SSE(Server-Sent Events):服务端持续往 HTTP 响应里写入文本帧,前端持续消费。
难点不在 “拿到数据”,而在 “稳定地按协议分帧并解析”:
- TCP 是字节流:一次
read()可能拿到多条消息(粘连),也可能只拿到半条(截断)。 - UTF-8 多字节字符:如果你解码方式不对,中文可能被切断导致乱码/替换字符。
- 工程化诉求:希望像读数组一样
for await...of消费,而不是到处手写 reader 循环。
这篇文档会从 0 设计并实现一套 高性能 SSE 流式解析器,核心是深度运用 Web Streams API(ReadableStream、TransformStream),搭建一个:
解码 → 分块(按空行分帧) → 解析(event/data/id/retry)
的声明式处理管道,并最终封装成支持for await...of的易用工具。
1. SSE 必要知识(只讲解析器需要的部分)
SSE 的响应头通常是:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
一条 SSE 消息由若干行字段组成,以一个 空行 结束:
id: 101
event: message
data: {"delta":"你"}
data: {"delta":"好"}
你需要记住 4 个解析规则:
- 消息边界:以
\n\n或\r\n\r\n作为结束标记(空行)。 - 多行 data:同一条消息里的多行
data:要用\n拼接成一个整体。 - 注释/心跳:以
:开头的行是注释,可忽略。 - 字段格式:
field: value;冒号后如果有一个空格要去掉(SSE 约定)。
永远不要把“每次读取到的 chunk”当成“完整消息”。
chunk 只是 TCP/中间层缓冲切出来的片段,和 SSE 帧边界没有任何必然关系。
2. 为什么用 Web Streams + TransformStream(而不是手写 while(reader.read()))
你当然可以手写 reader.read() 循环,但 TransformStream 带来的收益很大:
- 声明式管道:
pipeThrough()把“解码/分帧/解析”分层,代码可读、可复用。 - 天然背压(Backpressure):下游慢,上游会自动减速,减少无意义的内存堆积。
- 易测试:每段 Transform 都能单测(喂入字符串 → 输出帧/对象)。
- 更适合工程封装:最终对外暴露
AsyncIterable,业务侧只写for await...of。
3. 设计目标与接口定义
我们希望最终能写出这样的业务代码:
for await (const msg of consumeSSEMessages(res.body)) {
if (msg.data === '[DONE]') break;
// 业务层随意:JSON.parse、拼接字符串、更新 UI……
}
先定义一个最常用的消息结构(够用即可):
export type SSEMessage = {
id?: string;
event?: string;
retry?: number;
data: string;
raw: string; // 保留原始帧,便于排查问题
};
4. 实现一:分块 Transform(解决粘连与截断)
分块阶段的职责只有一个:把连续文本流按 SSE 的空行边界切成“完整帧”。
核心技巧:
- 维护一个
buffer(内部缓冲区),每来一段 chunk 就追加进去。 - 循环查找分隔符
\n\n或\r\n\r\n。 - 找到就切出一帧
frame,剩余留在buffer里等待下一段数据补全。 - 为了防止异常数据把内存撑爆,给
buffer加一个上限(可配置)。
export function createSSEFrameStream(options?: {maxBufferSize?: number}) {
const maxBufferSize = options?.maxBufferSize ?? 1024 * 1024; // 1MB
let buffer = '';
return new TransformStream<string, string>({
transform(chunk, controller) {
buffer += chunk;
if (buffer.length > maxBufferSize) {
controller.error(new Error(`SSE 缓冲区溢出:${buffer.length} > ${maxBufferSize}`));
return;
}
while (true) {
const lfIndex = buffer.indexOf('\n\n');
const crlfIndex = buffer.indexOf('\r\n\r\n');
if (lfIndex === -1 && crlfIndex === -1) break;
const useCRLF = crlfIndex !== -1 && (lfIndex === -1 || crlfIndex < lfIndex);
const splitIndex = useCRLF ? crlfIndex : lfIndex;
const delimiterLen = useCRLF ? 4 : 2;
const frame = buffer.slice(0, splitIndex);
buffer = buffer.slice(splitIndex + delimiterLen);
controller.enqueue(frame);
}
},
flush(controller) {
const tail = buffer.trim();
if (tail) controller.enqueue(tail);
},
});
}
因为我们不再依赖 chunk 边界,而是依赖 SSE 协议边界(空行):
无论 TCP 怎么分包、代理怎么缓冲,只要字节顺序不乱,最终都能在缓冲区里拼出完整帧。
5. 实现二:解析 Transform(把帧变成结构化消息)
现在我们拿到的是一整帧文本(不含结尾空行),下一步按行解析 event/data/id/retry。
export function parseSSEFrame(frame: string): SSEMessage | null {
const lines = frame.split(/\r?\n/);
const dataLines: string[] = [];
let id: string | undefined;
let event: string | undefined;
let retry: number | undefined;
for (const line of lines) {
if (!line || line.startsWith(':')) continue;
const idx = line.indexOf(':');
const field = idx === -1 ? line : line.slice(0, idx);
let value = idx === -1 ? '' : line.slice(idx + 1);
if (value.startsWith(' ')) value = value.slice(1);
if (field === 'data') dataLines.push(value);
else if (field === 'id') id = value;
else if (field === 'event') event = value;
else if (field === 'retry') {
const n = Number(value);
if (Number.isFinite(n)) retry = n;
}
}
// 纯心跳/空帧可以忽略(按你的业务需要也可以选择保留)
if (dataLines.length === 0 && !event && !id && retry === undefined) return null;
return {
raw: frame,
id,
event,
retry,
data: dataLines.join('\n'),
};
}
export function createSSEParseStream() {
return new TransformStream<string, SSEMessage>({
transform(frame, controller) {
const msg = parseSSEFrame(frame);
if (msg) controller.enqueue(msg);
},
});
}
6. 组装声明式管道:解码 → 分块 → 解析
这一步把三段拼起来:
export function createSSEMessageStream(body: ReadableStream<Uint8Array>) {
return body
.pipeThrough(new TextDecoderStream()) // 解决 UTF-8 多字节截断
.pipeThrough(createSSEFrameStream()) // 解决粘连/截断(SSE 帧边界)
.pipeThrough(createSSEParseStream()); // 得到结构化消息
}
它专门为“流式 UTF-8 解码”设计:当一个中文字符跨 chunk 时,它会自动缓存不完整字节并在下一个 chunk 补全,不会产生乱码。
7. 封装为 AsyncIterable:支持 for await...of 消费
ReadableStream 在很多环境里还不能直接 for await...of,所以我们自己封装一个小工具:
export async function* streamToAsyncIterable<T>(stream: ReadableStream<T>) {
const reader = stream.getReader();
try {
while (true) {
const {value, done} = await reader.read();
if (done) return;
yield value;
}
} finally {
// 如果业务侧提前 break,这里会执行,用于向上游传播取消信号
await reader.cancel().catch(() => {});
reader.releaseLock();
}
}
export function consumeSSEMessages(body: ReadableStream<Uint8Array>, options?: {maxBufferSize?: number}) {
const msgStream = body
.pipeThrough(new TextDecoderStream())
.pipeThrough(createSSEFrameStream(options))
.pipeThrough(createSSEParseStream());
return streamToAsyncIterable(msgStream);
}
到这里,你就拥有了一个高效、健壮、易用的底层流式数据处理能力。
8. 实战:LLM 流式输出(data: JSON + [DONE])
很多服务会用类似下面的 SSE 格式推送增量 token(示例仅表达结构):
data: {"delta":"你"}
data: {"delta":"好"}
data: [DONE]
前端消费示例:
async function run() {
const controller = new AbortController();
const res = await fetch('/api/chat', {
method: 'POST',
headers: {
Accept: 'text/event-stream',
'Content-Type': 'application/json',
},
body: JSON.stringify({prompt: '你好'}),
signal: controller.signal,
});
if (!res.body) throw new Error('响应体为空,无法进行 SSE 流式消费');
let text = '';
for await (const msg of consumeSSEMessages(res.body)) {
if (msg.data === '[DONE]') break;
// 业务层解析:不保证每条都是 JSON,务必 try/catch
try {
const json = JSON.parse(msg.data) as {delta?: string};
if (json.delta) {
text += json.delta;
// TODO:更新 UI
}
} catch {
// 非 JSON 的 data(比如心跳/日志),按需处理
}
}
}
如果你的上游约定了固定的结束标记(如 [DONE]),建议在业务层尽早 break,让取消信号尽快回传,节省带宽与 CPU。
9. 性能与健壮性 Checklist(面试也常问)
- 不要按 chunk 当消息:一定要缓冲并按
\n\n/\r\n\r\n分帧。 - 解码要“流式”:用
TextDecoderStream或TextDecoder(..., {stream:true})。 - 限制 buffer 最大长度:避免恶意/异常服务端导致内存飙升。
- 利用背压:TransformStream 管道天然背压,比手写循环更不容易堆积。
- 保留 raw:线上排障时能快速确认到底是服务端格式问题还是客户端解析问题。
10. 面试高频问答
Q1:SSE 和 WebSocket 的区别是什么?怎么选?
- SSE:单向(服务端 → 客户端),基于 HTTP,天然穿透代理更友好,适合“实时推送/增量输出”。
- WebSocket:双向,协议升级,适合强交互(游戏、协同编辑),但运维/代理层面成本更高。
- LLM 实时输出:通常 SSE 已足够(尤其是“服务端持续推 token”)。
Q2:为什么“粘连/截断”一定会发生?
因为 TCP 不携带应用层消息边界:你在服务端写 3 次,客户端可能 1 次读完,也可能分 5 次读完,完全正常。
Q3:为什么不能直接 chunk.split('\\n\\n')?
因为一个分隔符 \n\n 可能被拆到两个 chunk 里;同时一个 chunk 里也可能包含多条消息。必须维护跨 chunk 的缓冲区。
Q4:UTF-8 多字节字符被拆开会怎样?怎么解决?
会出现乱码或替换字符(�)。用 TextDecoderStream 或 TextDecoder.decode(value, {stream:true}) 让解码器自动拼接不完整字节。
Q5:TransformStream 的背压是什么?对性能有什么帮助?
下游消费慢时,上游读取会自动减速,减少缓存堆积和 GC 压力,整体更稳定。
Q6:data: 多行为什么要用 \\n 拼接?
这是 SSE 规范:同一事件里的多行 data: 代表一段文本的多行内容,最终以换行拼接为一个 data 字符串。
Q7:如何处理“服务端永远不发空行”的异常情况?
两层防线:
maxBufferSize:到阈值直接controller.error(),避免无限吃内存。- 监控与超时:业务层配合
AbortController超时取消。
Q8:为什么把解析拆成“分块 Transform + 解析 Transform”?
职责更单一:分块只管边界,解析只管语义;可复用、可测试、可替换(比如解析成 JSON、解析 OpenAI 风格协议等)。