跳到主要内容

基于文件 Hash 的高可用上传方案(Verify / Upload / Merge + 并发池 + 暂停恢复)

大文件上传在真实网络环境里非常“折磨人”:

  • 网络一抖就失败;失败后只能重来,浪费时间/流量
  • 同时开很多分片请求,前端和后端都可能被打爆(请求风暴)
  • 用户想暂停一下、切个页面、换个网络,再继续——你得支持

本文给出一套可落地的高可用上传方案:基于文件 Hash + “预检(Verify)-上传(Upload)-合并(Merge)”三步协议,实现精准断点续传;并通过Promise 请求池动态控制并发,结合 CancelToken(取消令牌)实现暂停/恢复,保证弱网下稳定性。


1. 目标与非目标

1.1 目标

  • 断点续传:只上传缺失分片,失败可重试,尽量不重传已成功分片
  • 弱网稳定:通过并发控制 + 重试退避 + 超时机制,提升成功率
  • 可控并发:避免请求风暴,保护后端与浏览器资源
  • 可暂停/恢复:用户随时暂停(立即停)与恢复(继续缺失分片)
  • 可观测:能统计成功率、耗时、失败原因,便于优化与排障

1.2 非目标(边界要写清)

  • 不讨论“秒传/去重”带来的跨用户隐私与权限问题(但会在安全章节提示风险)
  • 不在本文给出特定语言的完整后端代码(只讲协议与关键实现要点)
  • 不追求“极限速度”,更强调“可控、稳定、可恢复”

2. 总体流程(先看懂全局)

核心思想可以用一句话概括:

Verify 决定“传不传、传哪些”,Upload 负责“稳稳地把缺的传完”,Merge 负责“最终一致性”。


3. 三步协议设计(前后端对齐的关键)

下面是一套“够用且可扩展”的接口约定,你可以按团队实际路径/鉴权方式调整。

3.1 Verify(预检)——确定是否秒传/断点续传

请求(JSON)

{
"fileHash": "sha256:xxx",
"fileName": "demo.mp4",
"fileSize": 104857600,
"chunkSize": 5242880
}

响应(两种典型情况)

  1. 秒传(服务端已存在完整文件)
{
"uploaded": true,
"fileUrl": "https://cdn.example.com/files/xxx.mp4"
}
  1. 需要上传(新上传或断点续传)
{
"uploaded": false,
"uploadId": "u_20260304_abcdef",
"uploadedChunkIndexes": [0, 1, 2, 5, 6]
}

关键点

  • fileHash 是这套方案的“指纹”。建议使用 SHA-256(更安全)或 MD5(更快)——无论选哪个,都建议在后端做最终校验。
  • uploadedChunkIndexes 是断点续传的核心:客户端只补传缺失分片。
  • uploadId 用来区分一次上传会话(建议与用户/租户绑定),避免不同用户 hash 相同引发越权/污染。

3.2 Upload(分片上传)——幂等、可取消、可重试

请求(multipart/form-data),字段示例:

  • uploadId: string
  • fileHash: string
  • chunkIndex: number(建议从 0 开始)
  • chunkHash: string(可选,用于分片校验)
  • chunk: Blob(二进制分片)

响应

{ "ok": true }

关键点

  • 必须幂等:同一个 (uploadId, chunkIndex) 重传不应报错(网络重试是常态)。
  • 必须可取消:前端暂停时要能立刻取消正在上传的请求。

3.3 Merge(合并)——生成最终文件并校验一致性

请求(JSON)

{
"uploadId": "u_20260304_abcdef",
"fileHash": "sha256:xxx",
"fileName": "demo.mp4",
"totalChunks": 20
}

响应

{
"fileUrl": "https://cdn.example.com/files/xxx.mp4"
}

关键点

  • 合并前服务端应检查分片是否齐全;合并后建议校验最终文件 Hash 与 fileHash 一致。
  • 合并过程建议加锁(分布式锁/数据库锁)避免并发合并。

4. 客户端实现要点(TypeScript 思路 + 关键代码)

下面的代码以“思路清晰、可直接改造落地”为目标,重点放在:切片、请求池、动态并发、取消令牌、暂停/恢复

4.1 切片:把文件拆成 chunks

export type Chunk = {
index: number
start: number
end: number
blob: Blob
size: number
}

export function createChunks(file: File, chunkSize: number): Chunk[] {
const chunks: Chunk[] = []
let index = 0
for (let start = 0; start < file.size; start += chunkSize) {
const end = Math.min(start + chunkSize, file.size)
const blob = file.slice(start, end)
chunks.push({index, start, end, blob, size: end - start})
index++
}
return chunks
}

建议经验值:

  • chunkSize 通常 2MB~10MB 较常见(太小请求多,太大失败重传成本高)
  • 首次可默认 5MB,后续根据监控数据调整

4.2 计算 Hash:别把主线程卡死

最简单的 SHA-256 示例(注意:会把整个文件读入内存,大文件不推荐):

export async function sha256(file: Blob): Promise<string> {
const buffer = await file.arrayBuffer()
const digest = await crypto.subtle.digest('SHA-256', buffer)
const bytes = new Uint8Array(digest)
const hex = [...bytes].map((b) => b.toString(16).padStart(2, '0')).join('')
return `sha256:${hex}`
}

大文件更推荐:

  • Web Worker 做 Hash 计算(避免阻塞 UI)
  • 使用支持增量 Hash的库(例如增量 MD5 / 增量 SHA-256),边读边算,减少内存峰值
  • 如果你们业务只需要“断点续传定位”而不追求强校验,可采用“抽样 Hash”(需评估碰撞风险)

4.3 CancelToken:让请求“可被用户控制”

这里的 CancelToken 是一种通用模式:底层使用 AbortController,上层统一拿 signal 传给请求函数。

export class CancelToken {
private controller = new AbortController()

get signal() {
return this.controller.signal
}

cancel(reason: string = 'canceled') {
this.controller.abort(reason)
}
}

暂停的本质通常是两件事:

  1. 不再发起新的分片请求(暂停调度)
  2. 立刻取消正在进行的请求(取消令牌)

4.4 Promise 请求池:限制并发,避免请求风暴

如果你直接 Promise.all(chunks.map(uploadChunk)),浏览器会同时发出海量请求:抢占带宽、占满连接、拖垮后端,弱网下更容易连环失败。

一个“够用”的请求池需要:

  • 队列 + 并发上限
  • pause()/resume()
  • setConcurrency() 动态调节
  • AbortSignal 配合,支持取消
type PoolTask<T> = (signal: AbortSignal) => Promise<T>

export class PromisePool {
private concurrency: number
private running = 0
private paused = false
private queue: Array<() => void> = []
private idleResolvers: Array<() => void> = []
private controller = new AbortController()

constructor(concurrency: number) {
this.concurrency = Math.max(1, concurrency)
}

get signal() {
return this.controller.signal
}

setConcurrency(concurrency: number) {
this.concurrency = Math.max(1, concurrency)
this.drain()
}

pause() {
this.paused = true
}

resume() {
this.paused = false
this.drain()
}

cancelAll(reason = 'canceled') {
this.controller.abort(reason)
this.queue = []
this.maybeResolveIdle()
}

add<T>(task: PoolTask<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
const run = () => {
if (this.signal.aborted) {
reject(this.signal.reason ?? new Error('aborted'))
return
}

this.running++
task(this.signal)
.then(resolve, reject)
.finally(() => {
this.running--
this.drain()
this.maybeResolveIdle()
})
}

this.queue.push(run)
this.drain()
})
}

async waitForIdle(): Promise<void> {
if (this.running === 0 && this.queue.length === 0) return
await new Promise<void>((resolve) => this.idleResolvers.push(resolve))
}

private drain() {
while (!this.paused && this.running < this.concurrency && this.queue.length > 0) {
const run = this.queue.shift()!
run()
}
}

private maybeResolveIdle() {
if (this.running !== 0) return
if (this.queue.length !== 0) return
const resolvers = this.idleResolvers
this.idleResolvers = []
resolvers.forEach((r) => r())
}
}

4.5 动态并发:弱网/抖动时自动“降档”

推荐一个简单且有效的思路:AIMD(加性增、乘性减),和 TCP 拥塞控制很像:

  • 稳定时:并发 +1(慢慢加速)
  • 一旦出现明显失败/超时:并发 ×0.5(快速降速止血)
export class ConcurrencyTuner {
private readonly min: number
private readonly max: number
private current: number
private window: Array<{ok: boolean; ms: number; status?: number}> = []
private readonly windowSize = 20

constructor(opts: {min: number; max: number; initial: number}) {
this.min = opts.min
this.max = opts.max
this.current = Math.min(this.max, Math.max(this.min, opts.initial))
}

onResult(result: {ok: boolean; ms: number; status?: number}) {
this.window.push(result)
if (this.window.length > this.windowSize) this.window.shift()

const fails = this.window.filter((x) => !x.ok).length
const avg = this.window.reduce((s, x) => s + x.ms, 0) / this.window.length

// 失败明显 or 平均耗时飙升:快速降并发
if (fails >= 3 || avg > 4000) {
this.current = Math.max(this.min, Math.floor(this.current * 0.5))
return this.current
}

// 稳定:慢慢升并发
if (fails === 0 && avg < 1500) {
this.current = Math.min(this.max, this.current + 1)
}

return this.current
}

getConcurrency() {
return this.current
}
}

实际项目里你可以把 avg > 4000avg < 1500 这类阈值做成配置,并结合后端 429/503 等状态码更精细地降速。

4.6 上传分片:可取消 + 可重试(带退避)

一个可落地的 uploadChunk(这里用 fetch 示例,生产可替换为你们的请求库):

export async function uploadChunk(params: {
endpoint: string
uploadId: string
fileHash: string
chunkIndex: number
chunk: Blob
signal: AbortSignal
}): Promise<void> {
const form = new FormData()
form.append('uploadId', params.uploadId)
form.append('fileHash', params.fileHash)
form.append('chunkIndex', String(params.chunkIndex))
form.append('chunk', params.chunk)

const res = await fetch(params.endpoint, {
method: 'POST',
body: form,
signal: params.signal
})

if (!res.ok) throw new Error(`uploadChunk failed: ${res.status}`)
}

重试建议:指数退避 + 抖动(避免一堆客户端同时重试再次打爆服务端)。

function sleep(ms: number, signal?: AbortSignal) {
return new Promise<void>((resolve, reject) => {
const id = setTimeout(resolve, ms)
signal?.addEventListener(
'abort',
() => {
clearTimeout(id)
reject(signal.reason ?? new Error('aborted'))
},
{once: true}
)
})
}

export async function withRetry<T>(
fn: () => Promise<T>,
opts: {retries: number; baseDelayMs: number; signal?: AbortSignal}
): Promise<T> {
let attempt = 0
while (true) {
try {
return await fn()
} catch (e) {
if (opts.signal?.aborted) throw e
if (attempt >= opts.retries) throw e
const jitter = Math.random() * 200
const delay = opts.baseDelayMs * Math.pow(2, attempt) + jitter
attempt++
await sleep(delay, opts.signal)
}
}
}

4.7 串起来:Verify → 上传缺失分片(并发池)→ Merge

下面是“最核心”的调度骨架(省略 UI 绑定与错误提示)。

type VerifyResp =
| {uploaded: true; fileUrl: string}
| {uploaded: false; uploadId: string; uploadedChunkIndexes: number[]}

export async function runUpload(params: {
file: File
chunkSize: number
verifyEndpoint: string
uploadEndpoint: string
mergeEndpoint: string
pool: PromisePool
tuner: ConcurrencyTuner
}): Promise<string> {
const fileHash = await sha256(params.file)
const chunks = createChunks(params.file, params.chunkSize)

// 1) Verify
const verifyRes = (await fetch(params.verifyEndpoint, {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
fileHash,
fileName: params.file.name,
fileSize: params.file.size,
chunkSize: params.chunkSize
}),
signal: params.pool.signal
}).then((r) => r.json())) as VerifyResp

if (verifyRes.uploaded) return verifyRes.fileUrl

const uploaded = new Set<number>(verifyRes.uploadedChunkIndexes)
const missing = chunks.filter((c) => !uploaded.has(c.index))

// 2) Upload(受并发池控制)
const jobs = missing.map((c) =>
params.pool.add(async (signal) => {
const start = performance.now()
try {
await withRetry(
() =>
uploadChunk({
endpoint: params.uploadEndpoint,
uploadId: verifyRes.uploadId,
fileHash,
chunkIndex: c.index,
chunk: c.blob,
signal
}),
{retries: 3, baseDelayMs: 300, signal}
)
const ms = performance.now() - start
params.pool.setConcurrency(params.tuner.onResult({ok: true, ms}))
} catch (e) {
const ms = performance.now() - start
params.pool.setConcurrency(params.tuner.onResult({ok: false, ms}))
throw e
}
})
)

await Promise.all(jobs)
await params.pool.waitForIdle()

// 3) Merge
const mergeRes = await fetch(params.mergeEndpoint, {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
uploadId: verifyRes.uploadId,
fileHash,
fileName: params.file.name,
totalChunks: chunks.length
}),
signal: params.pool.signal
}).then((r) => r.json())

return mergeRes.fileUrl as string
}

4.8 暂停 / 恢复:实践建议

暂停与恢复最好按“以服务端为准”来做:

  • 暂停
    • pool.pause():停止调度新任务
    • pool.cancelAll():取消正在上传的请求(立即停)
  • 恢复
    • 重新创建 PromisePool(新 AbortController
    • 再次调用 Verify 获取 uploadedChunkIndexes(服务端是事实来源)
    • 只上传缺失分片,然后 Merge

如果你希望“刷新页面也能恢复”,把 uploadId/fileHash/chunkSize/文件元信息 持久化到 IndexedDB(推荐)或 localStorage,恢复时按同样逻辑走 Verify 即可。


5. 服务端实现要点(高可用的关键在这里)

即使本文主要讲前端,高可用要成立,后端也必须配合做对:

5.1 幂等与一致性

  • UploadChunk(uploadId, chunkIndex) 作为幂等键:重复上传直接返回 ok
  • 分片写入建议先落到临时文件,再原子 rename(防止半写入)
  • Merge 做“齐全性检查” + “合并锁”:
    • 检查 0..totalChunks-1 是否都存在
    • 加锁防止多实例并发 merge
    • 合并完成后校验最终 fileHash

5.2 多实例与共享存储

如果上传服务是多实例(k8s/多机房),分片存储不能放在单机本地盘:

  • 优先:对象存储(S3/OSS/COS)+ 元数据存 DB/Redis
  • 或:共享文件系统(NFS/CephFS)+ 元数据存 DB

目标是:任何一台上传实例都能回答 Verify,并能继续接收上传与发起合并。

5.3 垃圾回收(GC)

断点续传会产生“未完成的上传会话”,必须有清理策略:

  • uploadId 设置 TTL(例如 24h/72h)
  • 定时任务清理过期分片与元数据
  • Merge 成功后立即清理分片

6. 安全与风控(别忽略)

  • 鉴权:Verify/Upload/Merge 都必须校验用户身份与权限(尤其是 merge)
  • 隔离:分片存储路径应包含 userId/tenantId,避免 hash 相同导致跨用户读取
  • Hash 碰撞:如果用 MD5 仅作“定位”,后端仍应做更强的完整性校验(如 SHA-256 + 文件大小)
  • 文件类型与大小限制:后端必须做校验,避免被当作“免费网盘/木马分发”
  • 限流:后端按用户/IP 限流,前端并发池是“礼貌”,后端限流是“底线”

7. 可观测与验收(上线是否真变好)

建议埋点/监控指标:

  • Verify 命中率:秒传率、续传率
  • 分片上传成功率、平均耗时、P95/P99
  • 失败原因分布:超时/断网/4xx/5xx/429
  • 并发度变化曲线:是否频繁降档(弱网用户占比)
  • Merge 成功率与耗时

验收建议(可以写进方案验收标准):

  • 弱网模拟(限速/高延迟/丢包)下成功率提升多少
  • 暂停后 1 秒内是否能“立即停”(请求取消是否生效)
  • 断网重连后是否能继续(Verify 能否正确返回已上传分片)

8. 常见坑与排查思路

  • Hash 计算卡 UI:把 Hash 放 Worker;或者分片/抽样计算;给用户进度提示
  • 并发过大反而更慢:带宽被切碎、重传变多;并发池 + 动态并发才是正解
  • 暂停只停“新任务”不取消“在途请求”:用户体验会很差;必须配合取消令牌
  • Merge 时分片不齐:通常是重试/幂等没做对;Verify/Upload 返回值要可靠
  • 跨用户秒传越权:不要用单纯 fileHash 作为“谁都能拿到同一文件”的依据

面试高频问答

1) 为什么要设计 Verify 预检?

因为它决定了是否秒传是否续传以及需要上传哪些分片。没有 Verify,你只能“盲传”,失败后也很难精确续传。

2) 断点续传的“精准”体现在哪里?

服务端返回 uploadedChunkIndexes,客户端只补传缺的分片;并且 Upload 端点幂等,重试不会造成重复数据或异常。

3) 为什么不能直接 Promise.all 并发上传所有分片?

会造成请求风暴:浏览器连接被占满、带宽被切碎、弱网下失败率飙升,同时后端也会被大量小请求压垮。请求池把并发控制在合理阈值内,整体更稳更快。

4) 并发数一般怎么选?为什么要“动态并发”?

常见范围 3~6(与分片大小、后端能力、用户网络相关)。动态并发能在弱网/抖动时自动降档止血,在稳定时逐步升档,提高吞吐与成功率的综合表现。

5) 暂停/恢复怎么实现才算“真暂停”?

真暂停要同时做到:停止调度新任务 + 取消在途请求(Abort/CancelToken)。恢复时以 Verify 为准,避免重复上传或漏传。

6) Upload 接口为什么必须幂等?

因为重试是常态:超时、断网、页面切后台都会导致重试。幂等能保证重复请求不会破坏状态,提升系统鲁棒性。

7) 分片大小如何影响体验?

分片越小:请求越多、开销越大,但失败重传成本低;分片越大:请求更少、吞吐更高,但失败重传成本大。一般在 2MB~10MB 之间找平衡,并用监控数据校准。

8) 只用 MD5 做 fileHash 有问题吗?

MD5 更快但抗碰撞弱。很多业务用它做“定位/续传”是可以的,但后端最好再做强校验(如 SHA-256 + 文件大小),并做好鉴权隔离,避免安全风险。