Oasis's Cloud

一个人的首要责任,就是要有雄心。雄心是一种高尚的激情,它可以采取多种合理的形式。
—— 《一个数学家的辩白》

React scheduler

基于 v19.2.0 版本

作者:oasis


调度器的源码位于 React 代码仓库 packages/scheduler 目录。其结构如下:

scheduler
|-src
    |-forks
|-index.js

index.js 作为入口文件,导出了 forks 下 Scheduler.js 的方法和变量。 在 Scheduler.js 的第一行导入了调度器优先级的定义:

export const NoPriority = 0; // 无优先级:默认状态,通常表示任务尚未分配优先级
export const ImmediatePriority = 1; // 立即优先级:最高优先级
export const UserBlockingPriority = 2; // 用户阻塞优先级:高优先级;用于用户交互相关的任务。例如:按钮点击、表单提交等用户直接触发的操作。
export const NormalPriority = 3; // 普通优先级:默认优先级,例如:数据获取、状态更新等
export const LowPriority = 4; // 低优先级:可延迟执行的任务。例如:预加载、分析数据上报等
export const IdlePriority = 5; // 空闲优先级:最低优先级。仅在浏览器空闲时执行

优先级控制算法

Scheduler 的优先级通过 packages/scheduler/src/SchedulerMinHeap.js 实现的最小堆进行管理。最小堆是基于完全二叉树实现的。因为完全二叉树具有如下特点:

  1. 可以用数组存储顺序存储
完全二叉树:
          1[0]
        /     \
      2[1]    3[2]
     /   \   /
   4[3] 5[4] 6[5]

对应的数组:[1, 2, 3, 4, 5, 6]
索引:     [0, 1, 2, 3, 4, 5]
  1. 父子节点索引关系固定

对于索引为 i 的节点: - 父节点索引:(i - 1) >>> 1 或 Math.floor((i - 1) / 2) 在 siftUp 方法中对应的代码:const parentIndex = (index - 1) >>> 1; - 左子节点索引:i * 2 + 1 - 右子节点索引:i * 2 + 2

  1. 数组长度等于节点个数

执行示例可参看:最小堆执行过程

调度器的工作流程

Scheduler.js 中的初始化代码如下:

let schedulePerformWorkUntilDeadline;
if (typeof localSetImmediate === 'function') {
    // Node.js 和旧版 IE。
    // 我们更倾向于使用 setImmediate 有几个原因。
    //
    // 与 MessageChannel 不同,它不会阻止 Node.js 进程退出。
    // (尽管这是 Scheduler 的 DOM 分支,但在 Node.js 15+(它有 MessageChannel)和 jsdom 的混合环境中,你可能会来到这里。)
    // https://github.com/facebook/react/issues/20756
    //
    // 另外,它运行得更早,这正是我们想要的语义。
    // 如果其他浏览器实现了它,最好使用它。
    // 尽管这两种方式都不如原生调度。
    schedulePerformWorkUntilDeadline = () => {
        localSetImmediate(performWorkUntilDeadline);
    };
} else if (typeof MessageChannel !== 'undefined') {
    // DOM 和 Worker 环境。
    // 我们更倾向于使用 MessageChannel,因为它避免了 setTimeout 的 4ms 限制。
    const channel = new MessageChannel();
    const port = channel.port2;
    channel.port1.onmessage = performWorkUntilDeadline;
    schedulePerformWorkUntilDeadline = () => {
        port.postMessage(null);
    };
} else {
    // 我们应该只在非浏览器环境中回退到这里。
    schedulePerformWorkUntilDeadline = () => {
        localSetTimeout(performWorkUntilDeadline, 0);
    };
}

在这段代码中会根据不同的环境进入不同的实现中。 因为本文的实验在Chrome 中运行,所以只关注 MessageChannel 的流程。

在 MessageChannel 的逻辑中初始化了 schedulePerformWorkUnitlDeadline 方法,所以这里需要有方法调用它,MessageChannel 才能工作。

调用schedulePerformWorkUnitlDeadline 的关键是 unstable_scheduleCallback

react-reconciler 包中的 scheduleTaskForRootDuringMicrotask() 会调用 scheduleCallback,给 schedule 增加待处理的任务。

const newCallbackNode = scheduleCallback(
      schedulerPriorityLevel,
      performWorkOnRootViaSchedulerTask.bind(null, root),
    );

这样 react-reconciler 和 scheduler 就协同起来了。

除了 scheduleTaskForRootDuringMicrotask外,还有其他地方会调用 scheduleCallback,例如:commitRootflushPassiveEffectsImpl等。

unstable_scheduleCallback() 会创建 Task 对象,计算过期时间,根据 startTime 决定放入哪个队列: - startTime > currentTime → timerQueue(延迟任务) - startTime <= currentTime → taskQueue(立即任务)

之后调用 requestHostCallback()

调用 schedulePerformWorkUntilDeadline() - Node.js: setImmediate() - 浏览器: MessageChannel.postMessage() - 回退: setTimeout()

performWorkUntilDeadline() [异步执行]

const performWorkUntilDeadline = () => {
  if (enableRequestPaint) {
    needsPaint = false;
  }
  if (isMessageLoopRunning) {
    const currentTime = getCurrentTime();
    // 记录开始时间,以便我们可以测量主线程被阻塞了多长时间。
    startTime = currentTime;

    // 如果调度器任务抛出错误,退出当前浏览器任务,以便可以观察到错误。
    //
    // 故意不使用 try-catch,因为这会使某些调试技术变得更困难。
    // 相反,如果 `flushWork` 出错,那么 `hasMoreWork` 将保持为 true,我们会继续工作循环。
    let hasMoreWork = true;
    try {
      hasMoreWork = flushWork(currentTime);
    } finally {
      if (hasMoreWork) {
        // 如果还有更多工作,在前一个消息事件的末尾调度下一个消息事件。
        schedulePerformWorkUntilDeadline();
      } else {
        isMessageLoopRunning = false;
      }
    }
  }
};
function flushWork(initialTime: number) {
  return workLoop(initialTime);
}

flushWork(initialTime) - 设置 isPerformingWork = true - 调用 workLoop(initialTime) - 清理状态和标记

function workLoop(initialTime: number) {
  let currentTime = initialTime;
  advanceTimers(currentTime);
  currentTask = peek(taskQueue);
  while (currentTask !== null) {
    if (!enableAlwaysYieldScheduler) {
      if (currentTask.expirationTime > currentTime && shouldYieldToHost()) {
        break;
      }
    }
    const callback = currentTask.callback;
    if (typeof callback === 'function') {
      currentTask.callback = null;
      currentPriorityLevel = currentTask.priorityLevel;
      const didUserCallbackTimeout = currentTask.expirationTime <= currentTime;
      const continuationCallback = callback(didUserCallbackTimeout);
      currentTime = getCurrentTime();
      if (typeof continuationCallback === 'function') {
        // If a continuation is returned, immediately yield to the main thread
        // regardless of how much time is left in the current time slice.
        currentTask.callback = continuationCallback;
        advanceTimers(currentTime);
        return true;
      } else {
        if (currentTask === peek(taskQueue)) {
          pop(taskQueue);
        }
        advanceTimers(currentTime);
      }
    } else {
      pop(taskQueue);
    }
    currentTask = peek(taskQueue);
    if (enableAlwaysYieldScheduler) {
      if (currentTask === null || currentTask.expirationTime > currentTime) {
        // This currentTask hasn't expired we yield to the browser task.
        break;
      }
    }
  }
  // Return whether there's additional work
  if (currentTask !== null) {
    return true;
  } else {
    return false;
  }
}

workLoop(currentTime) [核心工作循环] - advanceTimers(): 将到期的延迟任务移到 taskQueue - 循环处理 taskQueue 中的任务: - peek(taskQueue) 获取最高优先级任务 - 检查是否应该让出控制权 (shouldYieldToHost) - 执行任务的 callback - 如果 callback 返回 continuation,继续调度 - 任务完成后从队列移除 - 返回是否有更多任务 (hasMoreWork)

如果有更多任务,继续调度 schedulePerformWorkUntilDeadline, 否则停止工作循环:isMessageLoopRuning = false