1112 字
6 分钟
JavaScript Promise 并发限制

题目描述
实现一个 Promise 并发控制器,要求:
- 支持限制同时执行的 Promise 数量
- 任务队列管理,按添加顺序执行
- 支持动态添加新任务
- 返回每个任务的结果(按执行顺序)
- 支持暂停/恢复执行
- 提供任务状态监控
要求完整实现并发控制逻辑,并通过测试用例验证。
解题思路
- 队列管理:使用任务队列存储待执行函数
- 并发控制:维护当前执行中的任务计数
- 任务调度:当执行数小于限制数时取出任务执行
- 结果收集:按任务顺序存储结果
- 状态管理:跟踪任务完成/失败状态
- 流程控制:提供暂停/恢复机制
关键洞察
- 生产者-消费者模型:任务队列作为缓冲区
- 递归调度:任务完成时自动触发下一个任务
- 状态分离:任务状态与执行结果独立管理
- 原子操作:使用锁机制防止并发问题
- 错误隔离:单个任务失败不影响整体执行
- 资源优化:避免内存泄漏和资源耗尽
代码流程
flowchart TD A[添加任务] --> B[加入队列] C[执行器] --> D{并发数未满?} D -->|是| E[取出任务执行] D -->|否| F[等待] E --> G[更新执行计数] G --> H[任务完成] H --> I[保存结果] I --> J[减少执行计数] J --> D
代码实现
class PromisePool { constructor(concurrency = 5) { this.concurrency = concurrency; this.queue = []; this.running = 0; this.results = []; this.paused = false; this.pendingResolve = null; this.drainedResolve = null; this.taskStates = new Map(); // 任务ID -> 状态 this.nextTaskId = 1; }
// 添加任务 add(taskFn, ...args) { const taskId = this.nextTaskId++; this.taskStates.set(taskId, 'pending');
return new Promise((resolve, reject) => { this.queue.push({ taskId, task: () => taskFn(...args), resolve, reject, args });
this.taskStates.set(taskId, 'queued'); this._maybeExecute(); }); }
// 执行任务 _execute({ taskId, task, resolve, reject }) { if (this.paused) return;
this.running++; this.taskStates.set(taskId, 'running');
Promise.resolve() .then(task) .then(result => { resolve(result); this.results.push(result); this.taskStates.set(taskId, 'fulfilled'); return result; }) .catch(error => { reject(error); this.taskStates.set(taskId, 'rejected'); return error; }) .finally(() => { this.running--; this.taskStates.delete(taskId);
if (this.pendingResolve) { this.pendingResolve(); this.pendingResolve = null; }
this._maybeExecute();
if (this.queue.length === 0 && this.running === 0 && this.drainedResolve) { this.drainedResolve(this.results); } }); }
// 调度任务 _maybeExecute() { while (this.running < this.concurrency && this.queue.length > 0 && !this.paused) { const task = this.queue.shift(); this._execute(task); } }
// 暂停执行 pause() { this.paused = true; }
// 恢复执行 resume() { this.paused = false; this._maybeExecute(); }
// 等待所有任务完成 async drain() { if (this.queue.length === 0 && this.running === 0) { return this.results; }
return new Promise(resolve => { this.drainedResolve = resolve; }); }
// 等待空闲槽位 async onIdle() { if (this.running < this.concurrency) return;
return new Promise(resolve => { this.pendingResolve = resolve; }); }
// 获取任务状态 getTaskState(taskId) { return this.taskStates.get(taskId) || 'unknown'; }
// 获取统计信息 getStats() { return { total: this.results.length + this.running + this.queue.length, completed: this.results.length, running: this.running, queued: this.queue.length, concurrency: this.concurrency }; }}
使用示例
// 模拟异步任务const delay = (ms, value) => new Promise(resolve => setTimeout(() => { console.log(`Completed: ${value}`); resolve(value); }, ms));
// 创建并发池(最大并发数=2)const pool = new PromisePool(2);
// 添加任务pool.add(delay, 1000, 'Task 1');pool.add(delay, 1500, 'Task 2');pool.add(delay, 500, 'Task 3');pool.add(delay, 800, 'Task 4');
// 获取任务状态console.log(pool.getTaskState(3)); // "queued"
// 动态添加任务setTimeout(() => { pool.add(delay, 300, 'Task 5');}, 1200);
// 暂停/恢复setTimeout(() => { console.log('Pausing pool'); pool.pause();
setTimeout(() => { console.log('Resuming pool'); pool.resume(); }, 1000);}, 800);
// 等待所有任务完成pool.drain().then(results => { console.log('All tasks completed:', results); // 输出: ['Task 1', 'Task 2', 'Task 3', 'Task 4', 'Task 5']});
// 输出:// Completed: Task 1 (1000ms)// Completed: Task 3 (500ms)// Completed: Task 2 (1500ms)// Completed: Task 4 (800ms)// Completed: Task 5 (300ms)
业务场景
- API 请求控制:限制同时发出的 HTTP 请求数量
- 文件处理:控制同时读取/写入的文件数量
- 爬虫程序:限制并发爬取页面数量
- 数据库操作:防止过多并发连接拖垮数据库
- 图片批量处理:限制同时处理的图片数量
- 批量数据处理:大数据处理时控制内存使用
性能优势:
- 防止资源耗尽导致系统崩溃
- 优化网络请求避免触发限流
- 平衡负载提高整体处理效率
- 避免浏览器标签页卡顿或无响应
- 提高系统稳定性与可靠性
相似题目
- Promise 链式控制(中等) 核心:Promise.then 顺序执行控制
- 请求重试机制(中等) 核心:错误捕获 + 指数退避重试
- 任务优先级队列(困难) 核心:优先级队列 + 抢占式调度
- 异步任务超时控制(中等) 核心:Promise.race + 超时拒绝
- 实现 Promise.all(中等) 核心:多 Promise 并行执行与结果收集
JavaScript Promise 并发限制
https://website-truelovings-projects.vercel.app/posts/code/javascript/javascript-promise-并发限制/