Atomics.waitAtomics.notifyAtomics.waitAsync

发布于 · 标签:ECMAScript ES2020

Atomics.waitAtomics.notify 是低级同步原语,可用于实现互斥锁和其他同步方法。但是,由于 Atomics.wait 是阻塞的,因此无法在主线程上调用它(尝试这样做会抛出 TypeError)。

从 8.7 版开始,V8 支持一个非阻塞版本,Atomics.waitAsync,它也可以在主线程上使用。

在这篇文章中,我们将解释如何使用这些低级 API 来实现一个既可以同步(用于工作线程)也可以异步(用于工作线程或主线程)的互斥锁。

Atomics.waitAtomics.waitAsync 接受以下参数

  • buffer:一个由 SharedArrayBuffer 支持的 Int32ArrayBigInt64Array
  • index:数组中的有效索引
  • expectedValue:我们期望在由 (buffer, index) 描述的内存位置中存在的值
  • timeout:以毫秒为单位的超时(可选,默认为 Infinity

Atomics.wait 的返回值是一个字符串。如果内存位置不包含预期值,Atomics.wait 会立即返回 'not-equal' 值。否则,线程将被阻塞,直到另一个线程使用相同的内存位置调用 Atomics.notify 或超时到达。在前一种情况下,Atomics.wait 返回 'ok' 值,在后一种情况下,Atomics.wait 返回 'timed-out' 值。

Atomics.notify 接受以下参数

  • 一个由 SharedArrayBuffer 支持的 Int32ArrayBigInt64Array
  • 一个索引(在数组中有效)
  • 要通知的等待者数量(可选,默认为 Infinity

它按 FIFO 顺序通知给定数量的等待者,这些等待者正在等待由 (buffer, index) 描述的内存位置。如果有多个与同一位置相关的 Atomics.wait 调用或 Atomics.waitAsync 调用挂起,它们都在同一个 FIFO 队列中。

Atomics.wait 相反,Atomics.waitAsync 始终立即返回。返回值是以下之一

  • { async: false, value: 'not-equal' }(如果内存位置不包含预期值)
  • { async: false, value: 'timed-out' }(仅适用于立即超时 0)
  • { async: true, value: promise }

该 Promise 稍后可能会解析为字符串值 'ok'(如果 Atomics.notify 使用相同的内存位置调用)或 'timed-out'(如果超时到达)。该 Promise 永远不会被拒绝。

以下示例演示了 Atomics.waitAsync 的基本用法

const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ timeout (opt)
// | ^ expected value
// ^ index

if (result.value === 'not-equal') {
// The value in the SharedArrayBuffer was not the expected one.
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* notified */ }
else { /* value is 'timed-out' */ }
});
}

// In this thread, or in another thread:
Atomics.notify(i32a, 0);

接下来,我们将展示如何实现一个既可以同步也可以异步使用的互斥锁。同步版本的互斥锁的实现之前已经讨论过,例如 在这篇博文中

在这个例子中,我们没有在 Atomics.waitAtomics.waitAsync 中使用超时参数。该参数可用于实现具有超时的条件变量。

我们的互斥锁类 AsyncLockSharedArrayBuffer 上运行,并实现了以下方法

  • lock — 阻塞线程,直到我们能够锁定互斥锁(仅可用于工作线程)
  • unlock — 解锁互斥锁(lock 的对应方法)
  • executeLocked(callback) — 非阻塞锁,主线程可以使用;安排 callback 在我们成功获取锁后执行

让我们看看如何实现这些方法。类定义包括常量和一个以 SharedArrayBuffer 作为参数的构造函数。

class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;

constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}

lock() {
/* … */
}

unlock() {
/* … */
}

executeLocked(f) {
/* … */
}
}

这里 i32a[0] 包含 LOCKEDUNLOCKED 值。它也是 Atomics.waitAtomics.waitAsync 的等待位置。AsyncLock 类确保以下不变式

  1. 如果 i32a[0] == LOCKED,并且一个线程开始等待(通过 Atomics.waitAtomics.waitAsynci32a[0],它最终将被通知。
  2. 在收到通知后,线程尝试获取锁。如果它获取了锁,它将在释放锁时再次通知。

同步锁定和解锁 #

接下来,我们将展示阻塞的 lock 方法,该方法只能从工作线程调用

lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< expected value at start
}
}

当一个线程调用 lock() 时,它首先尝试通过使用 Atomics.compareExchange 将锁状态从 UNLOCKED 更改为 LOCKED 来获取锁。Atomics.compareExchange 尝试以原子方式进行状态更改,并返回内存位置的原始值。如果原始值为 UNLOCKED,我们知道状态更改成功,并且线程获取了锁。不需要做任何其他操作。

如果 Atomics.compareExchange 无法更改锁状态,则另一个线程必须持有锁。因此,此线程尝试 Atomics.wait 以等待另一个线程释放锁。如果内存位置仍然保存预期值(在本例中为 AsyncLock.LOCKED),则调用 Atomics.wait 将阻塞线程,并且 Atomics.wait 调用将仅在另一个线程调用 Atomics.notify 时返回。

unlock 方法将锁设置为 UNLOCKED 状态,并调用 Atomics.notify 以唤醒一个正在等待锁的等待者。状态更改始终预期会成功,因为此线程持有锁,并且在此期间没有人应该调用 unlock()

unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.LOCKED,
/* new value >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}

简单的情况如下:锁是空闲的,线程 T1 通过使用 Atomics.compareExchange 更改锁状态来获取它。线程 T2 尝试通过调用 Atomics.compareExchange 获取锁,但它无法更改锁状态。然后 T2 调用 Atomics.wait,这会阻塞线程。在某个时刻,T1 释放锁并调用 Atomics.notify。这使得 T2 中的 Atomics.wait 调用返回 'ok',唤醒 T2。然后 T2 再次尝试获取锁,这次成功了。

还有 2 种可能的极端情况——这些情况说明了 Atomics.waitAtomics.waitAsync 检查索引处特定值的原因

  • T1 持有锁,T2 尝试获取它。首先,T2 尝试使用 Atomics.compareExchange 更改锁状态,但没有成功。但随后 T1 在 T2 设法调用 Atomics.wait 之前释放了锁。当 T2 调用 Atomics.wait 时,它会立即返回 'not-equal' 值。在这种情况下,T2 继续执行下一个循环迭代,再次尝试获取锁。
  • T1 持有锁,T2 使用 Atomics.wait 等待它。T1 释放锁——T2 醒来(Atomics.wait 调用返回)并尝试执行 Atomics.compareExchange 以获取锁,但另一个线程 T3 速度更快,已经获取了锁。因此,对 Atomics.compareExchange 的调用无法获取锁,T2 再次调用 Atomics.wait,阻塞直到 T3 释放锁。

由于后一种极端情况,互斥锁不是“公平的”。T2 可能一直在等待锁被释放,但 T3 来了并立即获取了它。更现实的锁实现可能会使用多个状态来区分“锁定”和“锁定并存在争用”。

异步锁定 #

非阻塞的 executeLocked 方法可以从主线程调用,与阻塞的 lock 方法不同。它获取一个回调函数作为其唯一参数,并安排在成功获取锁后执行回调。

executeLocked(f) {
const self = this;

async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ expected value at start
await result.value;
}
}

tryGetLock();
}

内部函数 tryGetLock 首先尝试使用 Atomics.compareExchange 获取锁,如前所述。如果它成功更改了锁状态,它可以执行回调、解锁并返回。

如果 Atomics.compareExchange 无法获取锁,我们需要在锁可能空闲时再次尝试。我们不能阻塞并等待锁变为空闲——相反,我们使用 Atomics.waitAsync 和它返回的 Promise 来安排新的尝试。

如果我们成功启动了 Atomics.waitAsync,则返回的 Promise 在持有锁的线程执行 Atomics.notify 时解析。然后,正在等待锁的线程再次尝试获取锁,如前所述。

在异步版本中,也可能出现相同的极端情况(锁在 Atomics.compareExchange 调用和 Atomics.waitAsync 调用之间被释放,以及锁在 Promise 解析和 Atomics.compareExchange 调用之间再次被获取),因此代码必须以稳健的方式处理它们。

结论 #

在这篇文章中,我们展示了如何使用同步原语 Atomics.waitAtomics.waitAsyncAtomics.notify 来实现一个既可以在主线程中使用也可以在工作线程中使用的互斥锁。

功能支持 #

Atomics.waitAtomics.notify #

  • Chrome: 68 版开始支持
  • Firefox: 78 版开始支持
  • Safari: 不支持
  • Node.js: 8.10.0 版开始支持
  • Babel: 不支持

Atomics.waitAsync #

  • Chrome: 87 版开始支持
  • Firefox: 不支持
  • Safari: 不支持
  • Node.js: 16 版开始支持
  • Babel: 不支持