带并发限制的异步调度器

@温叶博客  May 28, 2023

题目

题目

分析

这道题的特点就是限制了同时执行的并发数目,如果一个任务完成了就取剩下的任务继续执行。我们就可以很容易的想到,定义一个队列,将一时间执行不了的任务先放到任务队列中存着呗,等之前任务执行完再取一个执行,以此类推直至队列执行完。

需要注意到的难点是add方法是会返回一个promise的,但是任务啥时候执行到是不确定的,因此这里实现方法就是将返回promise的resolve方法传递给任务本身,任务执行完后调用下resolve即可。

代码实现

type Task<T> = (() => Promise<T>) & { resolve?: Function };

class Scheduler<T = number> {
  private restCount: number; // 剩余可执行次数
  private waitQueue: Task<T>[] = []; // 等待队列

  constructor(maxCount: number) {
    this.restCount = maxCount;
  }

  // 运行任务
  private runTask(task: Task<T>) {
    task().then((data) => {
      task.resolve!(data); // 执行resolve
      this.restCount++; // 执行后次数+1
      // 此时还有更多任务则递归执行
      if (this.restCount > 0 && this.waitQueue.length > 0) {
        const task = this.waitQueue.shift()!;
        this.runTask(task);
        this.restCount--; // 添加执行次数-1
      }
    });
  }

  // 添加任务
  add(task: Task<T>): Promise<T> {
    return new Promise((resolve) => {
      task.resolve = resolve; // 将resolve传递
      if (this.restCount > 0) {
        this.runTask(task);
        this.restCount--;
      } else {
        this.waitQueue.push(task);
      }
    });
  }
}

function timeout(ms: number, data: number): Promise<number> {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve(data);
    }, ms);
  });
}

const scheduler = new Scheduler(2);
function addTask(ms: number, data: number) {
  return scheduler.add(() => timeout(ms, data)).then((data) => console.log(data));
}

addTask(1000, 1);
addTask(500, 2);
addTask(300, 3);
addTask(400, 4);
// 2 3 1 4

添加新评论