Mobx 源码解读(四) Reaction

Reaction 是一类的特殊的 Derivation,可以注册响应函数,使之在条件满足时自动执行。常用于触发副作用,比如打印日志、更新 DOM 或者发送网络请求。

Reaction 的生命周期可以用下图表示:

Reaction示意

  1. Reaction 在创建之后初次执行或依赖过期时,会加入到全局的 pendingReactions 队列中
  2. pendingReactions 中的 Reaction 重新执行
  3. onInvalidate 调用,执行响应函数。
  4. 响应函数执行,同时收集依赖,即「运行时依赖收集」

使用 autorun, when 等 api,创建 Reaction 后,会立即执行该 Reaction 的 schedule 方法。第二篇中提到,Observable 在 reportChanged 时,也会调用这个方法将 Reaction 加入到全局的待执行队列中:

1
2
3
4
5
6
7
schedule() {
if (!this._isScheduled) {
this._isScheduled = true
globalState.pendingReactions.push(this)
runReactions()
}
}

与之前不同,这时的 runReactions 会立即执行 pendingReactions 中的所有 Reaction:

1
2
3
4
5
function runReactions() {
// 此时不在事务当中,且没有 Reaction 正在执行
if (globalState.inBatch > 0 || globalState.isRunningReactions) return
reactionScheduler(runReactionsHelper)
}

Reaction 初次执行时会第一次收集依赖,此后当依赖发生变化时,它被加入 pendingReactions 中,并在下一次事务结束时重新执行,更新依赖。接下来看看 Reaction 执行过程中是如何完成依赖的收集和更新的:

Reaction 执行过程

runReactionsHelper 依次执行 pendingReactions 中所有的 Reaction:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 设定 Reaction 计算的最大迭代次数,避免 Reaction 重新触发自身造成死循环
const MAX_REACTION_ITERATIONS = 100

// 实际执行 Reaction 重新计算的函数
function runReactionsHelper() {
globalState.isRunningReactions = true
const allReactions = globalState.pendingReactions
let iterations = 0

// 当执行 Reaction 时,可能触发新的 Reaction(Reaction 内允许设置 Observable的值),加入到 pendingReactions 中
// 所以这里使用了两层循环,外层检查确保新加入的 Reaction 也会得到执行
// 内层每次清空当前 Reaction 数组,并迭代处理当前所有 Reaction
while (allReactions.length > 0) {
// 限制最大迭代次数
if (++iterations === MAX_REACTION_ITERATIONS) {
console.error(
`Reaction doesn't converge to a stable state after ${MAX_REACTION_ITERATIONS} iterations.` +
` Probably there is a cycle in the reactive function: ${allReactions[0]}`
)
allReactions.splice(0) // 清空 pendingReactions 数组
}
// 清空 pendingReactions 数组,遍历所有 pendingReactions
let remainingReactions = allReactions.splice(0)
for (let i = 0, l = remainingReactions.length; i < l; i++)
// 依次调用 runReaction 方法
remainingReactions[i].runReaction()
}
globalState.isRunningReactions = false
}

pendingReactions 中的 Reaction 依次调用 runReaction 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
runReaction() {
if (!this.isDisposed) {
startBatch()
// 「是否待重新计算」的标志置为 false
this._isScheduled = false
// 根据 dependenciesState 判断是否需要重新计算,-1,0,2三种状态好判断,
// 计算值特有的1状态参看本系列第三篇文章
if (shouldCompute(this)) {
// 「正在收集依赖过程中」的标志置为 true
this._isTrackPending = true

// 构造时传入的 onInvalidate
this.onInvalidate()
// 通知 spy
if (this._isTrackPending && isSpyEnabled()) {
spyReport({
object: this,
type: "scheduled-reaction"
})
}
}
endBatch()
}
}

注意这里调用了 onInvalidate,该函数可以在响应函数执行之前做一些判断,控制响应函数执行的「时机」。

Mobx 提供了 autorun, when, autorunAsync 等 api 用于创建 Reaction,它们的区别就在于构造 Reaction 时传入的 onInvalidate 函数不同:

不同类型的 Reaction

autorun

autorun 的 onInvalidate 会直接执行 Reaction 的 track 方法,也就是说,只要 Reaction 执行,就会执行响应函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
function autorun(arg1: any, arg2: any, arg3?: any) {
let name: string, view: (r: IReactionPublic) => any, scope: any
// 参数处理...

// 可以提供 this 值作为第三个参数
if (scope) view = view.bind(scope)

// 第二个参数即`onInvalidate`
// onInvalidate 未进行任何判断,直接调用 track 方法
const reaction = new Reaction(name, function() {
this.track(reactionRunner)
})

// view 即我们传入的函数,可以拿到 Reaction 实例作为参数
function reactionRunner() {
view(reaction)
}

// 调用 schedule 方法,第一次执行并收集依赖
reaction.schedule()

// 返回该 Reaction 的 Disposer
return reaction.getDisposer()
}

autorunAsync

autorunAsync(action: () => void, minimumDelay?: number, scope?)

节流版 autorun,在 minimumDelay 时间内只会执行一次响应函数。通过在 onInvalidate 内做节流处理来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
function autorunAsync(arg1: any, arg2: any, arg3?: any, arg4?: any) {
let name: string, func: (r: IReactionPublic) => any, delay: number, scope: any
// 参数处理...

if (scope) func = func.bind(scope)

// 节流处理
let isScheduled = false
const r = new Reaction(name, () => {
if (!isScheduled) {
isScheduled = true
setTimeout(() => {
isScheduled = false
if (!r.isDisposed) r.track(reactionRunner)
}, delay)
}
})

function reactionRunner() {
func(r)
}

r.schedule()
return r.getDisposer()
}

when

when(debugName?, predicate: () => boolean, effect: () => void, scope?)

when 在 predicate 函数返回 true 时执行,通过在 onInvalidate 进行这个判断来实现。另外,它只执行一次就销毁,所以它的 onInvalidate 不调用 track 去收集依赖,直接执行响应函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function when(arg1: any, arg2: any, arg3?: any, arg4?: any) {
let name: string, predicate: () => boolean, effect: Lambda, scope: any
// 参数处理...

const disposer = autorun(name, r => {
// 满足 predicate
if (predicate.call(scope)) {
r.dispose()
const prevUntracked = untrackedStart()
// 直接执行 effect
;(effect as any).call(scope)
untrackedEnd(prevUntracked)
}
})
return disposer
}

运行时依赖收集

Reaction 的 track 方法会执行响应函数并进行依赖收集,也就是「运行时依赖收集」的过程。

track 方法本身只是做了开始一个新事务,一些标志属性的修改和通知 spy 等工作。

运行时依赖收集的核心步骤在 trackDerivedFunction 函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 执行响应函数,同时收集依赖
function trackDerivedFunction<T>(derivation: IDerivation, f: () => T, context) {
// 将该 Derivation 的 dependenciesState 和当前所有依赖的 lowestObserverState 设为最新
changeDependenciesStateTo0(derivation)

// 预分配足够的空间??
derivation.newObserving = new Array(derivation.observing.length + 100)
// 记录新的依赖的数量
derivation.unboundDepsCount = 0
// 每次执行都分配一个 uid
derivation.runId = ++globalState.runId
// 当前 Derivation 记录到全局的 trackingDerivation 中,这样被观察的 Observable 在其 reportObserved 方法中
// 就能获取到该 Derivation
const prevTracking = globalState.trackingDerivation
globalState.trackingDerivation = derivation
let result
try {
// 执行响应函数,收集使用到的所有依赖,加入 newObserving 数组中
result = f.call(context)
} catch (e) {
result = new CaughtException(e)
}
globalState.trackingDerivation = prevTracking

// 比较新旧依赖,更新依赖
bindDependencies(derivation)
return result
}

注意响应函数调用时不仅仅是执行了该函数,还触发了所有被观察的 Observable 的 reportObserved 方法,从而更新了当前 Derivation 的 newObserving 数组。这部分内容可以回顾第二篇。

再来看看 bindDependencies 方法如何更新依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
function bindDependencies(derivation: IDerivation) {
const prevObserving = derivation.observing
const observing = (derivation.observing = derivation.newObserving!)
// 记录更新依赖过程中,新观察的 Derivation 的最新状态
let lowestNewObservingDerivationState = IDerivationState.UP_TO_DATE

// 遍历新的 observing 数组,使用 diffValue 这个属性来辅助 diff 过程:
// 所有 Observable 的 diffValue 初值都是0(要么刚被创建,继承自 BaseAtom 的初值0;
// 要么经过上次的 bindDependencies 后,置为了0)
// 如果 diffValue 为0,保留该 Observable,并将 diffValue 置为1
// 如果 diffValue 为1,说明是重复的依赖,无视掉
let i0 = 0,
l = derivation.unboundDepsCount
for (let i = 0; i < l; i++) {
const dep = observing[i]
if (dep.diffValue === 0) {
dep.diffValue = 1
// i0 不等于 i,即前面有重复的 dep 被无视,依次往前移覆盖
if (i0 !== i) observing[i0] = dep
i0++
}

// 更新 lowestNewObservingDerivationState
if (((dep as any) as IDerivation).dependenciesState > lowestNewObservingDerivationState) {
lowestNewObservingDerivationState = ((dep as any) as IDerivation).dependenciesState
}
}
observing.length = i0

derivation.newObserving = null
// 遍历 prevObserving 数组,检查 diffValue:(经过上一次的 bindDependencies后,该数组中不会有重复)
// 如果为0,说明没有在 newObserving 中出现,调用 removeObserver 将 dep 和 derivation 间的联系移除
// 如果为1,依然被观察,将 diffValue 置为0(在下面的循环有用处)
l = prevObserving.length
while (l--) {
const dep = prevObserving[l]
if (dep.diffValue === 0) {
removeObserver(dep, derivation)
}
dep.diffValue = 0
}

// 再次遍历新的 observing 数组,检查 diffValue
// 如果为0,说明是在上面的循环中置为了0,即是本来就被观察的依赖,什么都不做
// 如果为1,说明是新增的依赖,调用 addObserver 新增依赖,并将 diffValue 置为0,为下一次 bindDependencies 做准备
while (i0--) {
const dep = observing[i0]
if (dep.diffValue === 1) {
dep.diffValue = 0
addObserver(dep, derivation)
}
}

// 某些新观察的 Derivation 可能在依赖更新过程中过期
// 避免这些 Derivation 没有机会传播过期的信息(#916)
if (lowestNewObservingDerivationState !== IDerivationState.UP_TO_DATE) {
derivation.dependenciesState = lowestNewObservingDerivationState
derivation.onBecomeStale()
}
}

朴素算法比较新旧 observing 数组的时间复杂度为 O(n^2),这里借助 diffValue 属性的辅助将复杂度降到了 O(n)。

这样 Reaction 的执行完成,其依赖也得到了更新。当依赖发生变化后,Reaction 会被加入 pendingReactions 中,并重复上述过程。