Skip to content
On this page

控制并发执行函数

typescript
type TTask = (...args: any[]) => Promise<any>;

type TTaskResult = {
	status: 'resolved' | 'rejected';
	value?: any;
	reason?: any;
}

type TParallelCallResult = {
	resolved: ITaskResult[];
	rejected: ITaskResult[];
}

export const parallelCallTasks = (tasks: TTask[], concurrentNum: number) => {
	return new Promise((resolve, reject) => {
	  const queue: (() => TTask)[] = [];
	  const resolved: ITaskResult[] = [];
	  const rejected: ITaskResult[] = [];
	  let idx = 0;
	  let finishedIdx = 0;
	  const next = (task: TTask) => {
		  idx++;
		  task()
          .then(res => {
              resolved.push({ status: 'resolved', value: res })
          })
          .catch(reason => {
              rejected.push({ status: 'rejected', reason })
          })
          .finally(() => {
	          if(finishedIdx++ === tasks.length - 1) {
		          resolve({ resolved, rejected });
		          return;
	          }
              idx--;
              const fn = queue.shift();
              if(!!fn) {
                  fn();
              }
          })
    }
	tasks.forEach(task => {
		if (idx >= concurrentNum) {
			queue.push(() => next(task))
            return
        }
		next(task)
    })
  })
}

(控制并发执行函数)优化版本

typescript
type TTask = (...args: any[]) => Promise<any>;

type TTaskResult = {
    status: 'resolved' | 'rejected';
    value?: any;
    reason?: any;
};

type TParallelCallResult = {
    resolved: TTaskResult[];
    rejected: TTaskResult[];
};

export const parallelCallTasks = (
    tasks: TTask[],
    concurrentNum: number
): Promise<TParallelCallResult> => {
    // 返回一个 Promise,并明确其解析值的类型
    return new Promise((resolve) => {
        // 1. 边界条件处理:任务为空或并发数无效,直接返回空结果
        if (tasks.length === 0 || concurrentNum <= 0) {
            resolve({ resolved: [], rejected: [] });
            return;
        }

        const resolved: TTaskResult[] = [];
        const rejected: TTaskResult[] = [];
        let taskIndex = 0; // 下一个要执行的任务索引
        let finishedCount = 0; // 已完成的任务数量

        // 2. “工人”函数,负责执行任务
        const run = async () => {
            // 当还有任务需要执行时
            while (taskIndex < tasks.length) {
                const currentIndex = taskIndex++; // 领取当前任务的索引,并预备下一个
                const task = tasks[currentIndex];

                try {
                    const value = await task();
                    resolved.push({ status: 'resolved', value });
                } catch (reason) {
                    rejected.push({ status: 'rejected', reason });
                } finally {
                    finishedCount++;
                    // 3. 检查是否所有任务都已完成
                    if (finishedCount === tasks.length) {
                        resolve({ resolved, rejected });
                    }
                }
            }
        };

        // 4. 启动指定数量的“工人”开始并行处理任务
        const concurrency = Math.min(concurrentNum, tasks.length);
        for (let i = 0; i < concurrency; i++) {
            run();
        }
    });
};