Atomics.wait
和 Atomics.notify
是低级同步原语,可用于实现互斥锁和其他同步方法。但是,由于 Atomics.wait
是阻塞的,因此无法在主线程上调用它(尝试这样做会抛出 TypeError
)。
从 8.7 版开始,V8 支持一个非阻塞版本,Atomics.waitAsync
,它也可以在主线程上使用。
在这篇文章中,我们将解释如何使用这些低级 API 来实现一个既可以同步(用于工作线程)也可以异步(用于工作线程或主线程)的互斥锁。
Atomics.wait
和 Atomics.waitAsync
接受以下参数
buffer
:一个由SharedArrayBuffer
支持的Int32Array
或BigInt64Array
index
:数组中的有效索引expectedValue
:我们期望在由(buffer, index)
描述的内存位置中存在的值timeout
:以毫秒为单位的超时(可选,默认为Infinity
)
Atomics.wait
的返回值是一个字符串。如果内存位置不包含预期值,Atomics.wait
会立即返回 'not-equal'
值。否则,线程将被阻塞,直到另一个线程使用相同的内存位置调用 Atomics.notify
或超时到达。在前一种情况下,Atomics.wait
返回 'ok'
值,在后一种情况下,Atomics.wait
返回 'timed-out'
值。
Atomics.notify
接受以下参数
- 一个由
SharedArrayBuffer
支持的Int32Array
或BigInt64Array
- 一个索引(在数组中有效)
- 要通知的等待者数量(可选,默认为
Infinity
)
它按 FIFO 顺序通知给定数量的等待者,这些等待者正在等待由 (buffer, index)
描述的内存位置。如果有多个与同一位置相关的 Atomics.wait
调用或 Atomics.waitAsync
调用挂起,它们都在同一个 FIFO 队列中。
与 Atomics.wait
相反,Atomics.waitAsync
始终立即返回。返回值是以下之一
{ async: false, value: 'not-equal' }
(如果内存位置不包含预期值){ async: false, value: 'timed-out' }
(仅适用于立即超时 0){ async: true, value: promise }
该 Promise 稍后可能会解析为字符串值 'ok'
(如果 Atomics.notify
使用相同的内存位置调用)或 'timed-out'
(如果超时到达)。该 Promise 永远不会被拒绝。
以下示例演示了 Atomics.waitAsync
的基本用法
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ timeout (opt)
// | ^ expected value
// ^ index
if (result.value === 'not-equal') {
// The value in the SharedArrayBuffer was not the expected one.
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* notified */ }
else { /* value is 'timed-out' */ }
});
}
// In this thread, or in another thread:
Atomics.notify(i32a, 0);
接下来,我们将展示如何实现一个既可以同步也可以异步使用的互斥锁。同步版本的互斥锁的实现之前已经讨论过,例如 在这篇博文中。
在这个例子中,我们没有在 Atomics.wait
和 Atomics.waitAsync
中使用超时参数。该参数可用于实现具有超时的条件变量。
我们的互斥锁类 AsyncLock
在 SharedArrayBuffer
上运行,并实现了以下方法
lock
— 阻塞线程,直到我们能够锁定互斥锁(仅可用于工作线程)unlock
— 解锁互斥锁(lock
的对应方法)executeLocked(callback)
— 非阻塞锁,主线程可以使用;安排callback
在我们成功获取锁后执行
让我们看看如何实现这些方法。类定义包括常量和一个以 SharedArrayBuffer
作为参数的构造函数。
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
这里 i32a[0]
包含 LOCKED
或 UNLOCKED
值。它也是 Atomics.wait
和 Atomics.waitAsync
的等待位置。AsyncLock
类确保以下不变式
- 如果
i32a[0] == LOCKED
,并且一个线程开始等待(通过Atomics.wait
或Atomics.waitAsync
)i32a[0]
,它最终将被通知。 - 在收到通知后,线程尝试获取锁。如果它获取了锁,它将在释放锁时再次通知。
同步锁定和解锁 #
接下来,我们将展示阻塞的 lock
方法,该方法只能从工作线程调用
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< expected value at start
}
}
当一个线程调用 lock()
时,它首先尝试通过使用 Atomics.compareExchange
将锁状态从 UNLOCKED
更改为 LOCKED
来获取锁。Atomics.compareExchange
尝试以原子方式进行状态更改,并返回内存位置的原始值。如果原始值为 UNLOCKED
,我们知道状态更改成功,并且线程获取了锁。不需要做任何其他操作。
如果 Atomics.compareExchange
无法更改锁状态,则另一个线程必须持有锁。因此,此线程尝试 Atomics.wait
以等待另一个线程释放锁。如果内存位置仍然保存预期值(在本例中为 AsyncLock.LOCKED
),则调用 Atomics.wait
将阻塞线程,并且 Atomics.wait
调用将仅在另一个线程调用 Atomics.notify
时返回。
unlock
方法将锁设置为 UNLOCKED
状态,并调用 Atomics.notify
以唤醒一个正在等待锁的等待者。状态更改始终预期会成功,因为此线程持有锁,并且在此期间没有人应该调用 unlock()
。
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.LOCKED,
/* new value >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
简单的情况如下:锁是空闲的,线程 T1 通过使用 Atomics.compareExchange
更改锁状态来获取它。线程 T2 尝试通过调用 Atomics.compareExchange
获取锁,但它无法更改锁状态。然后 T2 调用 Atomics.wait
,这会阻塞线程。在某个时刻,T1 释放锁并调用 Atomics.notify
。这使得 T2 中的 Atomics.wait
调用返回 'ok'
,唤醒 T2。然后 T2 再次尝试获取锁,这次成功了。
还有 2 种可能的极端情况——这些情况说明了 Atomics.wait
和 Atomics.waitAsync
检查索引处特定值的原因
- T1 持有锁,T2 尝试获取它。首先,T2 尝试使用
Atomics.compareExchange
更改锁状态,但没有成功。但随后 T1 在 T2 设法调用Atomics.wait
之前释放了锁。当 T2 调用Atomics.wait
时,它会立即返回'not-equal'
值。在这种情况下,T2 继续执行下一个循环迭代,再次尝试获取锁。 - T1 持有锁,T2 使用
Atomics.wait
等待它。T1 释放锁——T2 醒来(Atomics.wait
调用返回)并尝试执行Atomics.compareExchange
以获取锁,但另一个线程 T3 速度更快,已经获取了锁。因此,对Atomics.compareExchange
的调用无法获取锁,T2 再次调用Atomics.wait
,阻塞直到 T3 释放锁。
由于后一种极端情况,互斥锁不是“公平的”。T2 可能一直在等待锁被释放,但 T3 来了并立即获取了它。更现实的锁实现可能会使用多个状态来区分“锁定”和“锁定并存在争用”。
异步锁定 #
非阻塞的 executeLocked
方法可以从主线程调用,与阻塞的 lock
方法不同。它获取一个回调函数作为其唯一参数,并安排在成功获取锁后执行回调。
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* old value >>> */ AsyncLock.UNLOCKED,
/* new value >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ expected value at start
await result.value;
}
}
tryGetLock();
}
内部函数 tryGetLock
首先尝试使用 Atomics.compareExchange
获取锁,如前所述。如果它成功更改了锁状态,它可以执行回调、解锁并返回。
如果 Atomics.compareExchange
无法获取锁,我们需要在锁可能空闲时再次尝试。我们不能阻塞并等待锁变为空闲——相反,我们使用 Atomics.waitAsync
和它返回的 Promise 来安排新的尝试。
如果我们成功启动了 Atomics.waitAsync
,则返回的 Promise 在持有锁的线程执行 Atomics.notify
时解析。然后,正在等待锁的线程再次尝试获取锁,如前所述。
在异步版本中,也可能出现相同的极端情况(锁在 Atomics.compareExchange
调用和 Atomics.waitAsync
调用之间被释放,以及锁在 Promise 解析和 Atomics.compareExchange
调用之间再次被获取),因此代码必须以稳健的方式处理它们。
结论 #
在这篇文章中,我们展示了如何使用同步原语 Atomics.wait
、Atomics.waitAsync
和 Atomics.notify
来实现一个既可以在主线程中使用也可以在工作线程中使用的互斥锁。
功能支持 #
Atomics.wait
和 Atomics.notify
#
- Chrome: 从 68 版开始支持
- Firefox: 从 78 版开始支持
- Safari: 不支持
- Node.js: 从 8.10.0 版开始支持
- Babel: 不支持
Atomics.waitAsync
#
- Chrome: 从 87 版开始支持
- Firefox: 不支持
- Safari: 不支持
- Node.js: 从 16 版开始支持
- Babel: 不支持