diff --git a/.eslintignore b/.eslintignore index 1521c8b7..357df9cd 100644 --- a/.eslintignore +++ b/.eslintignore @@ -1 +1,2 @@ dist +locks.js diff --git a/lib/locks.js b/lib/locks.js new file mode 100644 index 00000000..aac4ca2d --- /dev/null +++ b/lib/locks.js @@ -0,0 +1,99 @@ +'use strict'; + +const { Worker, isMainThread, parentPort } = require('worker_threads'); + +const threads = new Set(); + +const LOCKED = 0; +const UNLOCKED = 1; + +const sendMessage = message => { + if (isMainThread) { + for (const thread of threads) { + thread.worker.postMessage(message); + } + } else { + parentPort.postMessage(message); + } +}; + +class Mutex { + constructor(resourceName, shared, initial = false) { + this.resourceName = resourceName; + this.lock = new Int32Array(shared, 0, 1); + if (initial) Atomics.store(this.lock, 0, UNLOCKED); + this.owner = false; + this.trying = false; + this.callback = null; + } + + enter(callback) { + this.callback = callback; + this.trying = true; + this.tryEnter(); + } + + tryEnter() { + if (!this.callback) return; + const prev = Atomics.exchange(this.lock, 0, LOCKED); + if (prev === UNLOCKED) { + this.owner = true; + this.trying = false; + this.callback(this).then(() => { + this.leave(); + }); + this.callback = null; + } + } + + leave() { + if (!this.owner) return; + Atomics.store(this.lock, 0, UNLOCKED); + this.owner = false; + sendMessage({ kind: 'leave', resourceName: this.resourceName }); + } +} + +const resources = new Map(); + +const request = (resourceName, callback) => { + let lock = resources.get(resourceName); + if (!lock) { + const buffer = new SharedArrayBuffer(4); + lock = new Mutex(resourceName, buffer, true); + resources.set(resourceName, lock); + sendMessage({ kind: 'create', resourceName, buffer }); + } + lock.enter(callback); + return lock; +}; + +const receiveMessage = message => { + const { kind, resourceName, buffer } = message; + if (kind === 'create') { + const lock = new Mutex(resourceName, buffer); + resources.set(resourceName, lock); + } +}; + +if (!isMainThread) { + parentPort.on('message', receiveMessage); +} + +class Thread { + constructor() { + const worker = new Worker(__filename); + this.worker = worker; + threads.add(this); + worker.on('message', message => { + for (const thread of threads) { + if (thread.worker !== worker) { + thread.worker.postMessage(message); + } + } + receiveMessage(message); + }); + } +} + +module.exports = { resources, request, sendMessage, receiveMessage, Thread }; diff --git a/metasync.js b/metasync.js index eb8cb10e..90f868ce 100644 --- a/metasync.js +++ b/metasync.js @@ -16,10 +16,12 @@ const submodules = [ 'memoize', // Async memoization 'do', // Simple chain/do 'poolify', // Create pool from factory + 'locks', // Web locks API for node.js ].map(path => require('./lib/' + path)); if (nodeVerion >= 10) { submodules.push(require('./lib/async-iterator')); + submodules.push(require('./lib/locks')); } const { compose } = submodules[0];