From 18bcd04305bba235f0c319a6f1eba841e217cd1f Mon Sep 17 00:00:00 2001 From: tshemsedinov Date: Thu, 28 Mar 2019 01:01:36 +0200 Subject: [PATCH] Implement Lock and LockManager Refs: https://github.com/metarhia/metasync/issues/416 --- lib/locks.js | 127 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 94 insertions(+), 33 deletions(-) diff --git a/lib/locks.js b/lib/locks.js index 9ead2eb7..4e3e5de3 100644 --- a/lib/locks.js +++ b/lib/locks.js @@ -3,6 +3,7 @@ const { Worker, isMainThread, parentPort } = require('worker_threads'); const threads = new Set(); +const resources = new Map(); const LOCKED = 0; const UNLOCKED = 1; @@ -17,62 +18,95 @@ const sendMessage = message => { } }; +class Lock { + constructor(name, options, callback) { + if (typeof options === 'function') { + callback = options; + options = {}; + } + const { mode, ifAvailable, steal } = options; + this.name = name; + this.mode = mode || 'exclusive'; + this.ifAvailable = ifAvailable || false; + this.steal = steal || false; + this.callback = callback; + } +} + class Mutex { - constructor(resourceName, shared, initial = false) { - this.resourceName = resourceName; - this.lock = new Int32Array(shared, 0, 1); - if (initial) Atomics.store(this.lock, 0, UNLOCKED); + constructor(resourceName, buffer, initial = false) { + this.name = resourceName; + this.flag = new Int32Array(buffer, 0, 1); + if (initial) Atomics.store(this.flag, 0, UNLOCKED); this.owner = false; this.trying = false; - this.callback = null; + this.queue = []; + this.current = null; } - enter(callback) { - this.callback = callback; + enter(lock) { + this.queue.push(lock); this.trying = true; - this.tryEnter(); + return this.tryEnter(); } tryEnter() { - if (!this.callback) return; - const prev = Atomics.exchange(this.lock, 0, LOCKED); - if (prev === UNLOCKED) { - this.owner = true; - this.trying = false; - this.callback(this).then(() => { - this.leave(); - }); - this.callback = null; - } + if (this.queue.length === 0) return; + const prev = Atomics.exchange(this.flag, 0, LOCKED); + if (prev === LOCKED) return; + this.owner = true; + this.trying = false; + const lock = this.queue.shift(); + this.current = lock; + return lock.callback(lock).then(() => { + this.leave(); + }); + } + + enterIfAvailable(lock) { + if (this.owner) return lock.callback(); + const prev = Atomics.exchange(this.flag, 0, LOCKED); + if (prev === LOCKED) return lock.callback(); + this.owner = true; + this.trying = false; + this.current = lock; + return lock.callback(lock).then(() => { + this.leave(); + }); } leave() { if (!this.owner) return; - Atomics.store(this.lock, 0, UNLOCKED); + Atomics.store(this.flag, 0, UNLOCKED); this.owner = false; - sendMessage({ kind: 'leave', resourceName: this.resourceName }); + this.current = null; + sendMessage({ kind: 'leave', resourceName: this.name }); + this.tryEnter(); } } -const resources = new Map(); - -const request = (resourceName, callback) => { - let lock = resources.get(resourceName); - if (!lock) { +const request = (resourceName, options, callback) => { + const lock = new Lock(resourceName, options, callback); + let mutex = resources.get(resourceName); + if (!mutex) { const buffer = new SharedArrayBuffer(4); - lock = new Mutex(resourceName, buffer, true); - resources.set(resourceName, lock); + mutex = new Mutex(resourceName, buffer, true); + resources.set(resourceName, mutex); sendMessage({ kind: 'create', resourceName, buffer }); } - lock.enter(callback); - return lock; + if (lock.ifAvailable) return mutex.enterIfAvailable(lock); + return mutex.enter(lock); }; const receiveMessage = message => { const { kind, resourceName, buffer } = message; if (kind === 'create') { - const lock = new Mutex(resourceName, buffer); - resources.set(resourceName, lock); + const mutex = new Mutex(resourceName, buffer); + resources.set(resourceName, mutex); + } else if (kind === 'leave') { + for (const mutex of resources) { + if (mutex.trying) mutex.tryEnter(); + } } }; @@ -96,6 +130,33 @@ class Thread { } } -const locks = { resources, request, sendMessage, receiveMessage, Thread }; +class LockManagerSnapshot { + constructor() { + const held = []; + const pending = []; + this.held = held; + this.pending = pending; + + for (const mutex of resources) { + if (mutex.queue.length > 0) { + pending.push(...mutex.queue); + } + if (mutex.current) { + held.push(mutex.current); + } + } + } +} + +class LockManager { + constructor() { + this.request = request; + this.Thread = Thread; + } + query() { + const snapshot = new LockManagerSnapshot(); + return Promise.resolve(snapshot); + } +} -module.exports = { locks }; +module.exports = { locks: new LockManager() };