跳到主要内容

TransformStream 详解:从「背压」到可复用的流式管道实现

在处理 SSE/LLM 流式输出大文件上传/下载边下边解码边读边解析时,如果你仍然在写 while (true) { await reader.read() },那么你会很快遇到这些痛点:

  • chunk 边界不可靠(粘连/截断),需要自己做缓冲与拼接
  • 下游处理慢时,上游继续读导致内存飙升(缺少“刹车”)
  • 解析/转换逻辑揉在一坨 while 循环里,难复用、难测试

TransformStream(Web Streams API)就是为了解决这些问题而生的:把“读取 → 转换 → 输出”拆成可组合的管道,并且天然支持背压(Backpressure)

阅读目标

读完你应该能回答:TransformStream 到底“怎么把背压传递回去”?以及如何用它实现「分行/分帧/解析/过滤」这类常见的流式处理。


1. TransformStream 是什么?(一句话 + 一个心智模型)

一句话:TransformStream 是一个同时拥有可写端(writable)和可读端(readable)的“中间件”,你往它的 writable 写入 chunk,它通过 transform() 处理后,从 readable 输出新 chunk。

心智模型:

你最常见的用法是把它放进 pipeThrough()

const out = inputReadable.pipeThrough(new TransformStream({...}));

2. 最小可运行示例:把输入字符串统一变成大写

const upper = new TransformStream<string, string>({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});

const input = new ReadableStream<string>({
start(controller) {
controller.enqueue('hello');
controller.enqueue('stream');
controller.close();
},
});

const output = input.pipeThrough(upper);

for await (const chunk of output) {
console.log(chunk); // HELLO / STREAM
}
小提示

在不支持 for await...of (ReadableStream) 的环境,你可以用 output.getReader() 循环读取;概念完全一样,只是代码更啰嗦。


3. Transformer 三件套:start / transform / flush

创建 TransformStream 时,你传入的是一个 transformer 对象,最常用的就是三个钩子:

  • start(controller):初始化(可选)
  • transform(chunk, controller):每个输入 chunk 的处理逻辑(核心)
  • flush(controller):上游关闭后、下游关闭前的“收尾”(常用于把缓冲区剩余内容吐出去)

3.1 controller 能做什么?

TransformStreamDefaultController 常用 API:

  • controller.enqueue(chunk):往 readable 侧输出一个 chunk
  • controller.error(err):让流进入 error 状态(writable/readable 都会失败)
  • controller.terminate():直接关闭 readable 侧(常用于“提前结束”)
  • controller.desiredSize下游期望还能再接收多少(理解背压的关键)

3.2 一个非常经典的实现:流式“按行切分”(需要缓冲 + flush)

很多协议/格式都是“按行”组织的(日志、NDJSON、SSE、CSV 的简化场景)。但 chunk 边界不等于行边界,所以需要缓冲。

export function createLineSplitterStream() {
let buffer = '';

return new TransformStream<string, string>({
transform(chunk, controller) {
buffer += chunk;

while (true) {
const lfIndex = buffer.indexOf('\n');
if (lfIndex === -1) break;

const line = buffer.slice(0, lfIndex);
buffer = buffer.slice(lfIndex + 1);

// 兼容 \r\n:把末尾 \r 去掉
controller.enqueue(line.endsWith('\r') ? line.slice(0, -1) : line);
}
},
flush(controller) {
if (buffer) controller.enqueue(buffer);
},
});
}

这段代码的“实现知识点”:

  • 必须有 buffer:因为 chunk 可能截断一行
  • 必须有 flush:否则最后一行如果不以 \n 结尾会丢失
  • 尽量只做一件事:切行是切行,解析是解析,保持 Transform 的职责单一,后面才能组合成管道

4. 背压(Backpressure):TransformStream 最重要的隐藏能力

如果你把流当成“无限 push 的队列”,迟早会内存爆。背压就是“下游慢了,上游要自动减速”的机制。

TransformStream 里,背压来自 readable 侧的内部队列:

  • 下游消费慢 → readable 队列堆积
  • readable 队列达到阈值(highWaterMark) → controller.desiredSize <= 0
  • 这会把压力传回 writable 侧 → writer.ready 变成 pending / write() 变慢
  • 上游 pipeTo/pipeThrough 会被自然“刹车”,不会无限读

4.1 用 controller.desiredSize 做“自适应”优化(可选)

当你的转换逻辑会“爆发式输出”(比如 1 个输入 chunk 解析成 N 条消息)时,可以用 desiredSize 做一个简单的节流策略:当下游已经很满,就少吐一点、或者暂停吐出,等下游消化。

注意

desiredSize提示,不是绝对精确的容量;但在做“避免瞬时爆发”的策略时非常有用。


5. 队列策略(Queuing Strategy):highWaterMark 与 size

new TransformStream(transformer, writableStrategy, readableStrategy) 允许你分别控制两端队列的策略:

  • highWaterMark:队列阈值,超过就开始背压
  • size(chunk):每个 chunk 的“体积”如何计算(默认每个 chunk 记为 1)

5.1 常见场景:对象流(按条数背压,而不是按字节)

例如你在解析 NDJSON(每行一个 JSON 对象),希望下游一次最多只积压 1~2 个对象:

const parseJSONLines = new TransformStream<string, unknown>(
{
transform(line, controller) {
if (!line.trim()) return;
controller.enqueue(JSON.parse(line));
},
},
undefined,
{highWaterMark: 2},
);

此时当下游处理慢,最多就积压 2 条对象,背压会立刻传回上游,避免解析侧把内存打爆。


6. 错误、取消与“提前结束”:怎么写才稳?

6.1 transform 抛错 vs controller.error

两者效果类似:都会让流进入 error 状态,下游读取会抛错/拒绝,上游写入也会失败。

更推荐的做法是:

  • 可预期的校验失败:用 controller.error(new Error(...)),表达“这是流级错误”
  • 真正的异常:让它抛出也行,但要确保你理解错误会穿透整条管道

6.2 controller.terminate:只想停止输出(常用于 take / 找到目标就结束)

例如:只取前 3 个 chunk(之后就不再输出):

export function createTakeStream<T>(n: number) {
let left = n;

return new TransformStream<T, T>({
transform(chunk, controller) {
if (left-- > 0) controller.enqueue(chunk);
if (left <= 0) controller.terminate();
},
});
}
terminate 的影响

terminate() 会关闭 readable 侧,但上游 writable 侧未必立刻停止“写入尝试”。在管道里使用时,建议把“只取前 N 个”的逻辑放在靠近消费端的位置,并正确处理上游的取消/关闭。


7. 组合管道的最佳实践:小 Transform 叠罗汉

不要试图写一个“万能 TransformStream”。更推荐写一堆小的、可测试的 Transform,然后用 pipeThrough() 组合:

const stream = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(createLineSplitterStream())
.pipeThrough(
new TransformStream<string, unknown>({
transform(line, controller) {
controller.enqueue({line, ts: Date.now()});
},
}),
);

这也是 Web Streams 特别适合做“声明式流式处理”的原因:每段 Transform 都像一个纯函数(输入 → 输出),非常工程化。

延伸阅读

如果你正在做 SSE/LLM 增量输出解析,可以直接参考同目录的实战文档:docs/frontend/javascript/api/sse-stream-parser.md


8. 常见坑位清单(写之前先过一遍)

  • 忘了 flush:涉及缓冲的 Transform(切行/分帧)如果没有 flush,很容易丢最后一段
  • 把 chunk 当消息:chunk 只是“传输切片”,协议消息必须靠分隔符/长度字段解析
  • 字符串拼接无上限:buffer 一定要有上限(尤其是网络流),否则异常数据会撑爆内存
  • 编码问题:字节流转文本请优先用 TextDecoderStream,不要自己 new TextDecoder().decode() 乱切 UTF-8
  • 输出爆发:1 入 N 出的 Transform,要考虑 desiredSize 与下游处理能力

9. 面试高频问答

9.1 TransformStream 解决了什么问题?

它把“读取/转换/输出”拆成可组合管道,并且在管道里自动处理背压:下游慢了,上游会被自然减速,避免内存无限堆积。

9.2 背压是什么?TransformStream 如何把背压传回上游?

背压就是“下游消费能力不足时,限制上游继续生产”。TransformStream 的 readable 侧有内部队列,当队列达到 highWaterMarkdesiredSize 变小甚至 ≤ 0,这会让 writable 侧的 writer.ready/write() 变慢或挂起,从而让 pipeTo/pipeThrough 的上游自动减速。

9.3 transform 和 flush 的区别?

  • transform(chunk):每个输入 chunk 都会调用
  • flush():上游 close 后调用一次,用来把缓冲区剩余内容“吐干净”

9.4 controller.terminate 和 controller.error 有什么区别?

  • terminate():正常结束输出(readable 关闭),常用于“满足条件就停止”
  • error():异常结束(readable/writable 都进入 error),表示整条流失败

9.5 pipeThrough 和 pipeTo 怎么选?

  • pipeThrough(transform):用于“中间变换”,输入 ReadableStream → 输出 ReadableStream
  • pipeTo(writable):用于“最终落地”,把 ReadableStream 写到 WritableStream,并返回一个 Promise 表示写入完成/失败

9.6 什么时候需要配置 highWaterMark?

当你的 chunk 不是“等价大小”(比如对象、变长字符串、解析后 1 入 N 出),或者你希望更快触发背压(减少积压),就应该调 highWaterMark(必要时配合 size())。