ReadableStream:从使用到“实现原理”的一篇讲透(Web Streams)
当你在前端做过下面这些事情时,你其实已经在和 ReadableStream 打交道了:
fetch()逐块读取响应体(比如 SSE、LLM 流式输出、下载大文件)TextDecoderStream/TransformStream搭建“声明式管道”- 想要“边到边处理”:边下载边解码、边解析边渲染,而不是等全量数据到齐
这篇文档不仅告诉你怎么用,更重点讲清它在内部是怎么运转的:队列、读请求、背压(Backpressure)、pull() 何时触发、为什么 desiredSize 能“让上游慢下来”。
1. ReadableStream 是什么?一句话先定性
ReadableStream 表示一个可被按需读取的数据源:数据不是一次性给完,而是“分块(chunk)”逐步产生、逐步消费。
它解决的核心问题是:让生产速度自动匹配消费速度,避免“生产太快 → 内存堆爆 / 卡顿”,这就是背压。
- 上游往里倒水(
enqueue) - 下游从里舀水(
read) - 水位太高就告诉上游“先别倒了”(背压)
2. 先会用:最小可运行示例(默认流)
下面是一个最小的 ReadableStream:产生 3 个字符串后关闭。
const stream = new ReadableStream<string>({
start(controller) {
controller.enqueue('A');
controller.enqueue('B');
controller.enqueue('C');
controller.close();
},
});
const reader = stream.getReader();
while (true) {
const { value, done } = await reader.read();
if (done) break;
console.log(value);
}
reader.releaseLock();
你必须记住 3 个关键点:
getReader()会**锁定(lock)**流:同一时间只能有一个 reader 读这个流。read()返回的是Promise<{ value, done }>:没有数据时会挂起等待。- 用完要
releaseLock():否则后续pipeThrough()/ 再次getReader()可能失败或被迫等待。
3. UnderlyingSource 与 Controller:你写的“生产者接口”
创建 ReadableStream 时你传入的对象叫 UnderlyingSource,它的核心方法是:
start(controller):初始化时调用一次(可选)pull(controller):当系统“需要更多数据”时调用(可选,但最关键)cancel(reason):消费者不读了/管道被取消时调用(可选)
而 controller 是你向流里“喂数据”的控制器:
controller.enqueue(chunk):推入一个 chunkcontroller.close():声明后续不再有数据controller.error(err):让流进入 errored 状态controller.desiredSize:背压指标(越小表示下游越慢 / 队列越满)
如果你在 start() 里无限 enqueue(),等价于“无脑生产”,背压很难发挥作用。
4. “实现视角”核心:两套队列 + 三态机
为了理解 ReadableStream 的行为,你脑中只要有这张结构图就够了:
- 内部 chunk 队列:保存已经生产但尚未被消费的数据
- 读请求队列:当消费者先
read()、但队列里没数据时,把 promise 的 resolve/reject 暂存起来 - 状态机:
readable/closed/errored
5. 读与写到底怎么“对上号”?两段伪代码讲透
5.1 消费者调用 read()
当你执行 reader.read() 时,内部大致做的是:
- 队列有 chunk:直接出队一个 chunk,立刻 resolve
- 队列为空但已关闭:立刻返回
{ done: true } - 队列为空且仍可读:把这次 read 的 resolve/reject 放进“读请求队列”,等未来
enqueue()来唤醒
5.2 生产者调用 enqueue()
当你 controller.enqueue(chunk) 时,内部大致做的是:
- 存在挂起的 read 请求:直接把 chunk 交给最早的那个 read(不进队列)
- 没有挂起 read:chunk 进入内部队列(队列“水位”上升)
- 队列变满时(
desiredSize <= 0),系统会开始对上游施加背压:减少/暂停对pull()的调用
read() 与 enqueue() 谁先来都行:
先 read → 挂起等数据;先 enqueue → 缓存在队列里等消费。
6. 背压(Backpressure):为什么 desiredSize 能“让上游慢下来”
ReadableStream 内部会维护一个“队列总大小”(可以是 chunk 个数,也可以是字节数),并用队列策略计算一个阈值:
highWaterMark:你允许队列最多“囤多少”size(chunk):每个 chunk 占用多少“配额”(可选;字节流通常按byteLength)
然后得到一个很关键的数:
desiredSize = highWaterMark - 队列当前总大小
直觉解释:
desiredSize大于 0:下游还有余力,上游可以继续产出desiredSize接近 0 / 小于等于 0:队列满了,下游来不及,上游该“刹车”
6.1 一个“正确使用 pull()”的生产者
下面这个例子会在 pull() 里按需生产,并且用 desiredSize 做节流提示:
function sleep(ms: number) {
return new Promise((r) => setTimeout(r, ms));
}
let i = 0;
const stream = new ReadableStream<string>(
{
async pull(controller) {
// 模拟上游生产开销
await sleep(50);
controller.enqueue(`chunk-${i++}`);
// 生产到 5 个就收工
if (i >= 5) controller.close();
// 观察背压信号(数值越小表示越“满”)
console.log('desiredSize:', controller.desiredSize);
},
cancel(reason) {
console.log('cancel:', reason);
},
},
{
highWaterMark: 2,
size() {
return 1; // 每个 chunk 算 1 个单位
},
},
);
如果消费者很慢(比如每 500ms 才 read 一次),你会看到 desiredSize 经常变小甚至为 0:这就是背压在告诉你“别再囤货了”。
7. 默认流 vs 字节流(BYOB):为什么有时你会看到 Uint8Array
在浏览器里最常见的场景是:
fetch()的Response.body通常是ReadableStream<Uint8Array>(字节流)
字节流除了默认 reader,还可能使用 BYOB Reader(Bring Your Own Buffer):
- 由消费者提供一块 buffer
- 底层直接把数据填进这块 buffer,减少分配与拷贝
大多数业务开发只用默认 reader 就够了。
当你在做高性能 I/O(音视频、加密、压缩、大文件)且频繁分配 Uint8Array 成为瓶颈时,再考虑 BYOB。
8. 与 fetch 流式读取的最佳实践:解码要“流式”
很多人第一次读 Uint8Array 会踩坑:中文被切半导致乱码。原因是 UTF-8 是多字节编码,一个字符可能跨 chunk。
正确解法有两个:
8.1 用 TextDecoderStream(推荐)
const res = await fetch('/api/stream');
const textStream = res.body!.pipeThrough(new TextDecoderStream());
for await (const textChunk of readableStreamToAsyncIterable(textStream)) {
console.log('文本块:', textChunk);
}
8.2 手动 TextDecoder,记得开启 stream 模式
const decoder = new TextDecoder('utf-8');
const reader = res.body!.getReader();
let out = '';
while (true) {
const { value, done } = await reader.read();
if (done) break;
out += decoder.decode(value, { stream: true });
}
out += decoder.decode(); // flush
9. pipeThrough / pipeTo / tee:工程化“流式管道”的三件套
pipeThrough(transformStream):ReadableStream -> TransformStream -> ReadableStreampipeTo(writableStream):ReadableStream -> WritableStream(返回 Promise,表示写完/失败)tee():把一个可读流分裂成两个可读流(通常用于“一份数据两路消费”)
如果你已经 getReader() 了,这个流就被锁定;此时再 pipeThrough() / tee() 可能会报错。
解决方案:要么全程用 pipe 管道;要么读取完及时 releaseLock()。
10. 把 ReadableStream 变成 AsyncIterable:for await...of 的通用适配器
有些 API(例如 for await...of)更适合业务消费。下面是一个通用转换函数:
export async function* readableStreamToAsyncIterable<T>(stream: ReadableStream<T>) {
const reader = stream.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) return;
yield value;
}
} finally {
reader.releaseLock();
}
}
这样你就能写出“像读数组一样读流”的代码:
for await (const chunk of readableStreamToAsyncIterable(res.body!)) {
// chunk: Uint8Array
}
11. 常见问题与排查清单(非常实用)
- 报错:ReadableStream is locked
说明你已经拿了 reader 或正在 pipe,先releaseLock()或不要混用两种消费方式。 - 内存越来越大
常见原因是上游在start()里无限enqueue();把生产移到pull(),并正确设置highWaterMark。 - 中文乱码/丢字
你在逐块decode()时没用流式解码;用TextDecoderStream或decode(..., {stream:true})。 - 消费者提前退出但网络还在下载
记得在不需要时调用reader.cancel()或让pipeTo绑定AbortSignal。
12. 面试高频问答
Q1:ReadableStream 解决的核心问题是什么?
A:让“生产数据”和“消费数据”解耦,同时通过背压自动调速,避免一次性加载带来的内存与卡顿问题。
Q2:什么是背压?它是怎么产生作用的?
A:背压是“下游慢 → 上游自动慢”的反馈机制。ReadableStream 通过队列水位(highWaterMark/size)计算 desiredSize,当队列接近满时减少 pull() 的触发,从而抑制继续生产。
Q3:为什么建议把生产逻辑写在 pull(),而不是 start()?
A:pull() 只会在“需要更多数据”时被调用,更容易和背压配合;start() 适合做初始化,不适合无节制生产。
Q4:read() 在没有数据时为什么不会返回空值,而是会 await?
A:因为 ReadableStream 允许“先发起 read 请求再等待数据到来”。内部会把这次 read 的 resolve 暂存到“读请求队列”,等未来 enqueue/close/error 再统一唤醒。
Q5:ReadableStream 的三种状态是什么?如何转换?
A:readable / closed / errored。调用 close() 会走向 closed(通常会等队列清空);调用 error() 直接进入 errored,后续读会 reject。
Q6:什么是“锁定(lock)”?为什么会报 is locked?
A:同一个流同一时间只能被一个 reader 或一个 pipe 管道独占消费。你 getReader() 后流会被锁定;未释放前再去 getReader() / tee() / pipeThrough() 就可能报错。
Q7:pipeThrough 和 pipeTo 有什么区别?
A:pipeThrough 连接一个 TransformStream 并返回新的 ReadableStream;pipeTo 把数据写入 WritableStream 并返回一个表示完成/失败的 Promise。
Q8:tee() 的典型使用场景是什么?
A:一份输入需要两路消费,例如“边解析边缓存”、或“一路显示进度一路写入文件”。注意 tee 会让背压关系变复杂,两个分支都会影响上游节奏。
Q9:为什么流式解码 UTF-8 要用 TextDecoderStream 或 stream:true?
A:UTF-8 字符可能跨 chunk;非流式 decode 会把被切开的字节当成非法序列,导致乱码或替换字符。流式解码会把不完整字节暂存,等下个 chunk 补齐再输出。
Q10:ReadableStream 和 AsyncIterator/Generator 的关系是什么?
A:两者都能“逐步产出数据”,但 ReadableStream 自带背压与管道(pipe/transform),更适合 I/O;AsyncIterator 更轻量、语义简单。工程里常把 ReadableStream 适配成 AsyncIterable 方便消费。