控制并发执行函数
typescript
/** A unit of work: a function that, when called, returns a promise. */
type TTask = (...args: any[]) => Promise<any>;

/**
 * Outcome of a single task.
 * `value` is set when `status` is 'resolved'; `reason` is set when it is 'rejected'.
 */
type TTaskResult = {
  status: 'resolved' | 'rejected';
  value?: any;
  reason?: any;
};

/** Aggregated outcomes of a batch of tasks, split by success and failure. */
type TParallelCallResult = {
  // Was `ITaskResult[]`, a name that is not declared anywhere in this snippet.
  resolved: TTaskResult[];
  rejected: TTaskResult[];
};
/**
 * Runs every task in `tasks` while keeping at most `concurrentNum` of them in
 * flight at the same time.
 *
 * The returned promise never rejects: a task that rejects (or throws
 * synchronously) is recorded in `rejected`, a task that fulfils is recorded in
 * `resolved`. Entries are appended in completion order, not input order.
 *
 * @param tasks - Functions to call; each is invoked with no arguments.
 * @param concurrentNum - Maximum number of tasks running at once. If it is not
 *   a positive number (0, negative, NaN) no task is started and an empty result
 *   is returned.
 * @returns A promise for the collected outcomes of all tasks.
 */
export const parallelCallTasks = (
  tasks: TTask[],
  concurrentNum: number
): Promise<TParallelCallResult> => {
  return new Promise<TParallelCallResult>((resolve) => {
    const resolved: TTaskResult[] = [];
    const rejected: TTaskResult[] = [];
    const total = tasks.length;

    // With nothing to run, or a limit that would never let a task start, no
    // completion handler would ever fire — resolve now instead of hanging.
    if (total === 0 || !(concurrentNum > 0)) {
      resolve({ resolved, rejected });
      return;
    }

    const waiting: TTask[] = []; // tasks that could not start yet
    let running = 0; // tasks currently in flight
    let finished = 0; // tasks that have settled

    const next = (task: TTask): void => {
      running++;
      // Calling the task inside a promise executor keeps the start synchronous
      // while turning a synchronous throw into an ordinary rejection.
      new Promise<unknown>((start) => start(task()))
        .then(
          (value) => {
            resolved.push({ status: 'resolved', value });
          },
          (reason) => {
            rejected.push({ status: 'rejected', reason });
          }
        )
        .then(() => {
          finished++;
          if (finished === total) {
            resolve({ resolved, rejected });
            return;
          }
          // A slot has been freed: hand it to the oldest waiting task, if any.
          running--;
          const queued = waiting.shift();
          if (queued) {
            next(queued);
          }
        });
    };

    tasks.forEach((task) => {
      if (running >= concurrentNum) {
        waiting.push(task);
        return;
      }
      next(task);
    });
  });
};
(控制并发执行函数)优化版本
typescript
/** A unit of work: a function that, when called, returns a promise. */
type TTask = (...args: any[]) => Promise<any>;

/**
 * Outcome record for one task: `value` accompanies a 'resolved' status,
 * `reason` accompanies a 'rejected' status.
 */
type TTaskResult = {
  status: 'rejected' | 'resolved';
  value?: any;
  reason?: any;
};

/** Outcomes of a whole batch, grouped into successes and failures. */
type TParallelCallResult = {
  resolved: Array<TTaskResult>;
  rejected: Array<TTaskResult>;
};
/**
 * Runs every task in `tasks` with at most `concurrentNum` of them in flight at
 * once, using a fixed pool of workers that each pull the next unclaimed task.
 *
 * The returned promise does not reject because of a failing task: a task that
 * rejects or throws is recorded in `rejected`, a task that fulfils is recorded
 * in `resolved`. Entries are appended in completion order, not input order.
 *
 * @param tasks - Functions to call; each is invoked with no arguments.
 * @param concurrentNum - Maximum number of tasks running at once. If it is not
 *   a positive number (0, negative, NaN) no task is started and an empty result
 *   is returned.
 * @returns A promise for the collected outcomes of all tasks.
 */
export const parallelCallTasks = async (
  tasks: TTask[],
  concurrentNum: number
): Promise<TParallelCallResult> => {
  const resolved: TTaskResult[] = [];
  const rejected: TTaskResult[] = [];

  // Edge cases: nothing to run, or an unusable concurrency limit. The check is
  // written as `!(x > 0)` so that NaN is rejected too; with `x <= 0` a NaN
  // limit slipped through, started zero workers and never settled.
  if (tasks.length === 0 || !(concurrentNum > 0)) {
    return { resolved, rejected };
  }

  let taskIndex = 0; // index of the next task nobody has claimed yet

  // Worker: keeps claiming and running tasks until none are left.
  const run = async (): Promise<void> => {
    while (taskIndex < tasks.length) {
      // Claim the current index and advance it in one synchronous step so two
      // workers can never pick the same task.
      const task = tasks[taskIndex++];
      try {
        const value = await task();
        resolved.push({ status: 'resolved', value });
      } catch (reason) {
        rejected.push({ status: 'rejected', reason });
      }
    }
  };

  // Start the workers; there is no point in having more workers than tasks.
  const concurrency = Math.min(concurrentNum, tasks.length);
  const workers: Promise<void>[] = [];
  for (let i = 0; i < concurrency; i++) {
    workers.push(run());
  }

  // Every task has settled once every worker has run out of tasks to claim.
  await Promise.all(workers);
  return { resolved, rejected };
};