1112 字
6 分钟
JavaScript Promise 并发限制

题目描述#

实现一个 Promise 并发控制器,要求:

  1. 支持限制同时执行的 Promise 数量
  2. 任务队列管理,按添加顺序执行
  3. 支持动态添加新任务
  4. 返回每个任务的结果(按执行顺序)
  5. 支持暂停/恢复执行
  6. 提供任务状态监控

要求完整实现并发控制逻辑,并通过测试用例验证。

解题思路#

  1. 队列管理:使用任务队列存储待执行函数
  2. 并发控制:维护当前执行中的任务计数
  3. 任务调度:当执行数小于限制数时取出任务执行
  4. 结果收集:按任务顺序存储结果
  5. 状态管理:跟踪任务完成/失败状态
  6. 流程控制:提供暂停/恢复机制

关键洞察#

  1. 生产者-消费者模型:任务队列作为缓冲区
  2. 递归调度:任务完成时自动触发下一个任务
  3. 状态分离:任务状态与执行结果独立管理
  4. 原子操作:使用锁机制防止并发问题
  5. 错误隔离:单个任务失败不影响整体执行
  6. 资源优化:避免内存泄漏和资源耗尽

代码流程#

flowchart TD 
	A[添加任务] --> B[加入队列] 
	C[执行器] --> D{并发数未满?} 
	D -->|是| E[取出任务执行] 
	D -->|否| F[等待] 
	E --> G[更新执行计数] 
	G --> H[任务完成] 
	H --> I[保存结果] 
	I --> J[减少执行计数] 
	J --> D

代码实现#

class PromisePool {
constructor(concurrency = 5) {
this.concurrency = concurrency;
this.queue = [];
this.running = 0;
this.results = [];
this.paused = false;
this.pendingResolve = null;
this.drainedResolve = null;
this.taskStates = new Map(); // 任务ID -> 状态
this.nextTaskId = 1;
}
// 添加任务
add(taskFn, ...args) {
const taskId = this.nextTaskId++;
this.taskStates.set(taskId, 'pending');
return new Promise((resolve, reject) => {
this.queue.push({
taskId,
task: () => taskFn(...args),
resolve,
reject,
args
});
this.taskStates.set(taskId, 'queued');
this._maybeExecute();
});
}
// 执行任务
_execute({ taskId, task, resolve, reject }) {
if (this.paused) return;
this.running++;
this.taskStates.set(taskId, 'running');
Promise.resolve()
.then(task)
.then(result => {
resolve(result);
this.results.push(result);
this.taskStates.set(taskId, 'fulfilled');
return result;
})
.catch(error => {
reject(error);
this.taskStates.set(taskId, 'rejected');
return error;
})
.finally(() => {
this.running--;
this.taskStates.delete(taskId);
if (this.pendingResolve) {
this.pendingResolve();
this.pendingResolve = null;
}
this._maybeExecute();
if (this.queue.length === 0 && this.running === 0 && this.drainedResolve) {
this.drainedResolve(this.results);
}
});
}
// 调度任务
_maybeExecute() {
while (this.running < this.concurrency && this.queue.length > 0 && !this.paused) {
const task = this.queue.shift();
this._execute(task);
}
}
// 暂停执行
pause() {
this.paused = true;
}
// 恢复执行
resume() {
this.paused = false;
this._maybeExecute();
}
// 等待所有任务完成
async drain() {
if (this.queue.length === 0 && this.running === 0) {
return this.results;
}
return new Promise(resolve => {
this.drainedResolve = resolve;
});
}
// 等待空闲槽位
async onIdle() {
if (this.running < this.concurrency) return;
return new Promise(resolve => {
this.pendingResolve = resolve;
});
}
// 获取任务状态
getTaskState(taskId) {
return this.taskStates.get(taskId) || 'unknown';
}
// 获取统计信息
getStats() {
return {
total: this.results.length + this.running + this.queue.length,
completed: this.results.length,
running: this.running,
queued: this.queue.length,
concurrency: this.concurrency
};
}
}

使用示例#

// 模拟异步任务
const delay = (ms, value) =>
new Promise(resolve => setTimeout(() => {
console.log(`Completed: ${value}`);
resolve(value);
}, ms));
// 创建并发池(最大并发数=2)
const pool = new PromisePool(2);
// 添加任务
pool.add(delay, 1000, 'Task 1');
pool.add(delay, 1500, 'Task 2');
pool.add(delay, 500, 'Task 3');
pool.add(delay, 800, 'Task 4');
// 获取任务状态
console.log(pool.getTaskState(3)); // "queued"
// 动态添加任务
setTimeout(() => {
pool.add(delay, 300, 'Task 5');
}, 1200);
// 暂停/恢复
setTimeout(() => {
console.log('Pausing pool');
pool.pause();
setTimeout(() => {
console.log('Resuming pool');
pool.resume();
}, 1000);
}, 800);
// 等待所有任务完成
pool.drain().then(results => {
console.log('All tasks completed:', results);
// 输出: ['Task 1', 'Task 2', 'Task 3', 'Task 4', 'Task 5']
});
// 输出:
// Completed: Task 1 (1000ms)
// Completed: Task 3 (500ms)
// Completed: Task 2 (1500ms)
// Completed: Task 4 (800ms)
// Completed: Task 5 (300ms)

业务场景#

  1. API 请求控制:限制同时发出的 HTTP 请求数量
  2. 文件处理:控制同时读取/写入的文件数量
  3. 爬虫程序:限制并发爬取页面数量
  4. 数据库操作:防止过多并发连接拖垮数据库
  5. 图片批量处理:限制同时处理的图片数量
  6. 批量数据处理:大数据处理时控制内存使用

性能优势

  • 防止资源耗尽导致系统崩溃
  • 优化网络请求避免触发限流
  • 平衡负载提高整体处理效率
  • 避免浏览器标签页卡顿或无响应
  • 提高系统稳定性与可靠性

相似题目#

  1. Promise 链式控制(中等) 核心:Promise.then 顺序执行控制
  2. 请求重试机制(中等) 核心:错误捕获 + 指数退避重试
  3. 任务优先级队列(困难) 核心:优先级队列 + 抢占式调度
  4. 异步任务超时控制(中等) 核心:Promise.race + 超时拒绝
  5. 实现 Promise.all(中等) 核心:多 Promise 并行执行与结果收集
JavaScript Promise 并发限制
https://website-truelovings-projects.vercel.app/posts/code/javascript/javascript-promise-并发限制/
作者
欢迎来到StarSky的网站!
发布于
2025-08-27
许可协议
CC BY-NC-SA 4.0