跳到主要内容

高性能 SSE 流式解析器:基于 Web Streams 的声明式管道实现

很多 LLM 的实时输出(“一边生成一边显示 token”)本质上都是 SSE(Server-Sent Events):服务端持续往 HTTP 响应里写入文本帧,前端持续消费。

难点不在 “拿到数据”,而在 “稳定地按协议分帧并解析”:

  • TCP 是字节流:一次 read() 可能拿到多条消息(粘连),也可能只拿到半条(截断)。
  • UTF-8 多字节字符:如果你解码方式不对,中文可能被切断导致乱码/替换字符。
  • 工程化诉求:希望像读数组一样 for await...of 消费,而不是到处手写 reader 循环。

这篇文档会从 0 设计并实现一套 高性能 SSE 流式解析器,核心是深度运用 Web Streams API(ReadableStreamTransformStream),搭建一个:

解码 → 分块(按空行分帧) → 解析(event/data/id/retry)
的声明式处理管道,并最终封装成支持 for await...of 的易用工具。


1. SSE 必要知识(只讲解析器需要的部分)

SSE 的响应头通常是:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

一条 SSE 消息由若干行字段组成,以一个 空行 结束:

id: 101
event: message
data: {"delta":"你"}
data: {"delta":"好"}

你需要记住 4 个解析规则:

  1. 消息边界:以 \n\n\r\n\r\n 作为结束标记(空行)。
  2. 多行 data:同一条消息里的多行 data: 要用 \n 拼接成一个整体。
  3. 注释/心跳:以 : 开头的行是注释,可忽略。
  4. 字段格式field: value;冒号后如果有一个空格要去掉(SSE 约定)。
关键提醒

永远不要把“每次读取到的 chunk”当成“完整消息”。
chunk 只是 TCP/中间层缓冲切出来的片段,和 SSE 帧边界没有任何必然关系。


2. 为什么用 Web Streams + TransformStream(而不是手写 while(reader.read()))

你当然可以手写 reader.read() 循环,但 TransformStream 带来的收益很大:

  • 声明式管道pipeThrough() 把“解码/分帧/解析”分层,代码可读、可复用。
  • 天然背压(Backpressure):下游慢,上游会自动减速,减少无意义的内存堆积。
  • 易测试:每段 Transform 都能单测(喂入字符串 → 输出帧/对象)。
  • 更适合工程封装:最终对外暴露 AsyncIterable,业务侧只写 for await...of

3. 设计目标与接口定义

我们希望最终能写出这样的业务代码:

for await (const msg of consumeSSEMessages(res.body)) {
if (msg.data === '[DONE]') break;
// 业务层随意:JSON.parse、拼接字符串、更新 UI……
}

先定义一个最常用的消息结构(够用即可):

export type SSEMessage = {
id?: string;
event?: string;
retry?: number;
data: string;
raw: string; // 保留原始帧,便于排查问题
};

4. 实现一:分块 Transform(解决粘连与截断)

分块阶段的职责只有一个:把连续文本流按 SSE 的空行边界切成“完整帧”

核心技巧:

  • 维护一个 buffer(内部缓冲区),每来一段 chunk 就追加进去。
  • 循环查找分隔符 \n\n\r\n\r\n
  • 找到就切出一帧 frame,剩余留在 buffer 里等待下一段数据补全。
  • 为了防止异常数据把内存撑爆,给 buffer 加一个上限(可配置)。
export function createSSEFrameStream(options?: {maxBufferSize?: number}) {
const maxBufferSize = options?.maxBufferSize ?? 1024 * 1024; // 1MB
let buffer = '';

return new TransformStream<string, string>({
transform(chunk, controller) {
buffer += chunk;

if (buffer.length > maxBufferSize) {
controller.error(new Error(`SSE 缓冲区溢出:${buffer.length} > ${maxBufferSize}`));
return;
}

while (true) {
const lfIndex = buffer.indexOf('\n\n');
const crlfIndex = buffer.indexOf('\r\n\r\n');

if (lfIndex === -1 && crlfIndex === -1) break;

const useCRLF = crlfIndex !== -1 && (lfIndex === -1 || crlfIndex < lfIndex);
const splitIndex = useCRLF ? crlfIndex : lfIndex;
const delimiterLen = useCRLF ? 4 : 2;

const frame = buffer.slice(0, splitIndex);
buffer = buffer.slice(splitIndex + delimiterLen);
controller.enqueue(frame);
}
},
flush(controller) {
const tail = buffer.trim();
if (tail) controller.enqueue(tail);
},
});
}
为什么这一步能“精准解决”粘连与截断?

因为我们不再依赖 chunk 边界,而是依赖 SSE 协议边界(空行)
无论 TCP 怎么分包、代理怎么缓冲,只要字节顺序不乱,最终都能在缓冲区里拼出完整帧。


5. 实现二:解析 Transform(把帧变成结构化消息)

现在我们拿到的是一整帧文本(不含结尾空行),下一步按行解析 event/data/id/retry

export function parseSSEFrame(frame: string): SSEMessage | null {
const lines = frame.split(/\r?\n/);

const dataLines: string[] = [];
let id: string | undefined;
let event: string | undefined;
let retry: number | undefined;

for (const line of lines) {
if (!line || line.startsWith(':')) continue;

const idx = line.indexOf(':');
const field = idx === -1 ? line : line.slice(0, idx);
let value = idx === -1 ? '' : line.slice(idx + 1);
if (value.startsWith(' ')) value = value.slice(1);

if (field === 'data') dataLines.push(value);
else if (field === 'id') id = value;
else if (field === 'event') event = value;
else if (field === 'retry') {
const n = Number(value);
if (Number.isFinite(n)) retry = n;
}
}

// 纯心跳/空帧可以忽略(按你的业务需要也可以选择保留)
if (dataLines.length === 0 && !event && !id && retry === undefined) return null;

return {
raw: frame,
id,
event,
retry,
data: dataLines.join('\n'),
};
}

export function createSSEParseStream() {
return new TransformStream<string, SSEMessage>({
transform(frame, controller) {
const msg = parseSSEFrame(frame);
if (msg) controller.enqueue(msg);
},
});
}

6. 组装声明式管道:解码 → 分块 → 解析

这一步把三段拼起来:

export function createSSEMessageStream(body: ReadableStream<Uint8Array>) {
return body
.pipeThrough(new TextDecoderStream()) // 解决 UTF-8 多字节截断
.pipeThrough(createSSEFrameStream()) // 解决粘连/截断(SSE 帧边界)
.pipeThrough(createSSEParseStream()); // 得到结构化消息
}
为什么推荐 TextDecoderStream?

它专门为“流式 UTF-8 解码”设计:当一个中文字符跨 chunk 时,它会自动缓存不完整字节并在下一个 chunk 补全,不会产生乱码。


7. 封装为 AsyncIterable:支持 for await...of 消费

ReadableStream 在很多环境里还不能直接 for await...of,所以我们自己封装一个小工具:

export async function* streamToAsyncIterable<T>(stream: ReadableStream<T>) {
const reader = stream.getReader();
try {
while (true) {
const {value, done} = await reader.read();
if (done) return;
yield value;
}
} finally {
// 如果业务侧提前 break,这里会执行,用于向上游传播取消信号
await reader.cancel().catch(() => {});
reader.releaseLock();
}
}

export function consumeSSEMessages(body: ReadableStream<Uint8Array>, options?: {maxBufferSize?: number}) {
const msgStream = body
.pipeThrough(new TextDecoderStream())
.pipeThrough(createSSEFrameStream(options))
.pipeThrough(createSSEParseStream());

return streamToAsyncIterable(msgStream);
}

到这里,你就拥有了一个高效、健壮、易用的底层流式数据处理能力。


8. 实战:LLM 流式输出(data: JSON + [DONE])

很多服务会用类似下面的 SSE 格式推送增量 token(示例仅表达结构):

data: {"delta":"你"}

data: {"delta":"好"}

data: [DONE]

前端消费示例:

async function run() {
const controller = new AbortController();

const res = await fetch('/api/chat', {
method: 'POST',
headers: {
Accept: 'text/event-stream',
'Content-Type': 'application/json',
},
body: JSON.stringify({prompt: '你好'}),
signal: controller.signal,
});

if (!res.body) throw new Error('响应体为空,无法进行 SSE 流式消费');

let text = '';
for await (const msg of consumeSSEMessages(res.body)) {
if (msg.data === '[DONE]') break;

// 业务层解析:不保证每条都是 JSON,务必 try/catch
try {
const json = JSON.parse(msg.data) as {delta?: string};
if (json.delta) {
text += json.delta;
// TODO:更新 UI
}
} catch {
// 非 JSON 的 data(比如心跳/日志),按需处理
}
}
}
工程建议

如果你的上游约定了固定的结束标记(如 [DONE]),建议在业务层尽早 break,让取消信号尽快回传,节省带宽与 CPU。


9. 性能与健壮性 Checklist(面试也常问)

  • 不要按 chunk 当消息:一定要缓冲并按 \n\n/\r\n\r\n 分帧。
  • 解码要“流式”:用 TextDecoderStreamTextDecoder(..., {stream:true})
  • 限制 buffer 最大长度:避免恶意/异常服务端导致内存飙升。
  • 利用背压:TransformStream 管道天然背压,比手写循环更不容易堆积。
  • 保留 raw:线上排障时能快速确认到底是服务端格式问题还是客户端解析问题。

10. 面试高频问答

Q1:SSE 和 WebSocket 的区别是什么?怎么选?

  • SSE:单向(服务端 → 客户端),基于 HTTP,天然穿透代理更友好,适合“实时推送/增量输出”。
  • WebSocket:双向,协议升级,适合强交互(游戏、协同编辑),但运维/代理层面成本更高。
  • LLM 实时输出:通常 SSE 已足够(尤其是“服务端持续推 token”)。

Q2:为什么“粘连/截断”一定会发生?

因为 TCP 不携带应用层消息边界:你在服务端写 3 次,客户端可能 1 次读完,也可能分 5 次读完,完全正常。

Q3:为什么不能直接 chunk.split('\\n\\n')

因为一个分隔符 \n\n 可能被拆到两个 chunk 里;同时一个 chunk 里也可能包含多条消息。必须维护跨 chunk 的缓冲区。

Q4:UTF-8 多字节字符被拆开会怎样?怎么解决?

会出现乱码或替换字符(�)。用 TextDecoderStreamTextDecoder.decode(value, {stream:true}) 让解码器自动拼接不完整字节。

Q5:TransformStream 的背压是什么?对性能有什么帮助?

下游消费慢时,上游读取会自动减速,减少缓存堆积和 GC 压力,整体更稳定。

Q6:data: 多行为什么要用 \\n 拼接?

这是 SSE 规范:同一事件里的多行 data: 代表一段文本的多行内容,最终以换行拼接为一个 data 字符串。

Q7:如何处理“服务端永远不发空行”的异常情况?

两层防线:

  1. maxBufferSize:到阈值直接 controller.error(),避免无限吃内存。
  2. 监控与超时:业务层配合 AbortController 超时取消。

Q8:为什么把解析拆成“分块 Transform + 解析 Transform”?

职责更单一:分块只管边界,解析只管语义;可复用、可测试、可替换(比如解析成 JSON、解析 OpenAI 风格协议等)。