From 748ff8e284792351825d1578b743245092888289 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 21 Feb 2022 13:40:16 +1100 Subject: [PATCH] WIP - #305 locking --- src/vaults/VaultInternal.ts | 22 +-- src/vaults/VaultManager.ts | 213 +++++++++++++++++------------- src/vaults/types.ts | 3 +- tests/vaults/VaultManager.test.ts | 15 ++- 4 files changed, 145 insertions(+), 108 deletions(-) diff --git a/src/vaults/VaultInternal.ts b/src/vaults/VaultInternal.ts index 3f21e18390..c795a684f5 100644 --- a/src/vaults/VaultInternal.ts +++ b/src/vaults/VaultInternal.ts @@ -17,7 +17,6 @@ import type { NodeConnectionManager } from '../nodes'; import type { ResourceAcquire } from '../utils'; import path from 'path'; import git from 'isomorphic-git'; -import { Mutex } from 'async-mutex'; import Logger from '@matrixai/logger'; import { CreateDestroyStartStop, @@ -189,11 +188,16 @@ class VaultInternal { protected vaultsNamesDomain: DBDomain; protected efs: EncryptedFS; protected efsVault: EncryptedFS; - protected _lock: Mutex = new Mutex(); + protected _lock: RWLock = new RWLock(); - public lock: ResourceAcquire = async () => { - const release = await this._lock.acquire(); - return [async () => release(), this._lock]; + public lockRead: ResourceAcquire = async () => { + const release = await this._lock.acquireRead(); + return [async () => release()]; + }; + + public lockWrite: ResourceAcquire = async () => { + const release = await this._lock.acquireWrite(); + return [async () => release()]; }; constructor({ @@ -381,7 +385,7 @@ class VaultInternal { @ready(new vaultsErrors.ErrorVaultNotRunning()) public async readF(f: (fs: FileSystemReadable) => Promise): Promise { - return withF([this.lock], async () => { + return withF([this.lockRead], async () => { return await f(this.efsVault); }); } @@ -391,7 +395,7 @@ class VaultInternal { g: (fs: FileSystemReadable) => AsyncGenerator, ): AsyncGenerator { const efsVault = this.efsVault; - return withG([this.lock], async function* () { + return withG([this.lockRead], async function* () { return yield* g(efsVault); }); } @@ -413,7 +417,7 @@ class VaultInternal { // Mirrored vaults are immutable throw new vaultsErrors.ErrorVaultImmutable(); } - return withF([this.lock], async () => { + return withF([this.lockWrite], async () => { await this.db.put( this.vaultMetadataDbDomain, VaultInternal.dirtyKey, @@ -585,7 +589,7 @@ class VaultInternal { const efsVault = this.efsVault; const db = this.db; const vaultDbDomain = this.vaultMetadataDbDomain; - return withG([this.lock], async function* () { + return withG([this.lockWrite], async function* () { if ((await db.get(vaultDbDomain, VaultInternal.remoteKey)) != null) { // Mirrored vaults are immutable throw new vaultsErrors.ErrorVaultImmutable(); diff --git a/src/vaults/VaultManager.ts b/src/vaults/VaultManager.ts index de02f07c42..78f6d9ea08 100644 --- a/src/vaults/VaultManager.ts +++ b/src/vaults/VaultManager.ts @@ -1,6 +1,5 @@ -import type { MutexInterface } from 'async-mutex'; import type { DB, DBDomain, DBLevel } from '@matrixai/db'; -import type { VaultId, VaultName, VaultActions } from './types'; +import type { VaultId, VaultName, VaultActions, VaultIdString } from './types'; import type { Vault } from './Vault'; import type { FileSystem } from '../types'; @@ -14,9 +13,9 @@ import type { ACL } from '../acl'; import type { NotificationsManager } from '../notifications'; import type { RemoteInfo } from './VaultInternal'; +import type { ResourceAcquire } from '../utils'; import path from 'path'; import { PassThrough } from 'readable-stream'; -import { Mutex } from 'async-mutex'; import { EncryptedFS, errors as encryptedfsErrors } from 'encryptedfs'; import Logger from '@matrixai/logger'; import { @@ -33,14 +32,14 @@ import { utils as nodesUtils } from '../nodes'; import { utils as keysUtils } from '../keys'; import * as validationUtils from '../validation/utils'; import config from '../config'; -import { mkdirExists, ResourceAcquire, RWLock } from '../utils'; +import { mkdirExists, RWLock, withF, withG } from '../utils'; import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb'; /** * Object map pattern for each vault */ type VaultMap = Map< - VaultId, + VaultIdString, { vault?: VaultInternal; lock: RWLock; @@ -225,7 +224,7 @@ class VaultManager { // Iterate over vaults in memory and destroy them, ensuring that // the working directory commit state is saved - for (const [vaultId, vaultAndLock] of this.vaultMap) { + for (const [vaultIdString, vaultAndLock] of this.vaultMap) { // This is locking each vault... before it tries to do this // but if we are calling stop now // we will have blocked all the other methods @@ -235,13 +234,10 @@ class VaultManager { // this this applies already just be calling stop // in that it waits for stop to finish - await this.transact(async () => { - // Think about it, maybe we should use stop instead - // it will be clearer!! - // await vaultAndLock.vault?.stop(); - + const vaultId = IdInternal.fromString(vaultIdString); + await withF([this.getWriteLock(vaultId)], async () => { await vaultAndLock.vault?.stop(); - }, [vaultId]); + }); } // Need to figure out if this id thing is a good idea @@ -259,7 +255,9 @@ class VaultManager { // If the DB was stopped, the existing sublevel `this.vaultsDb` will not be valid // Therefore we recreate the sublevel here const vaultsDb = await this.db.level(this.vaultsDbDomain[0]); + // Clearing all vaults db data await vaultsDb.clear(); + // Is it necessary to remove the vaults domain? await this.fs.promises.rm(this.vaultsPath, { force: true, recursive: true, @@ -286,22 +284,30 @@ class VaultManager { // replace this transact with our new withF and withG mechanisms // all we need to do is create `ResourceAcquire` types in this domain - protected getLock(vaultId: VaultId): RWLock { - const vaultAndLock = this.vaultMap.get(vaultId); + const vaultIdString = vaultId.toString() as VaultIdString; + const vaultAndLock = this.vaultMap.get(vaultIdString); if (vaultAndLock != null) return vaultAndLock.lock; const lock = new RWLock(); - this.vaultMap.set(vaultId, { lock }); + this.vaultMap.set(vaultIdString, { lock }); return lock; } - protected getReadLock(vaultId: VaultId): ResourceAcquire { + protected getReadLock(vaultId: VaultId): ResourceAcquire { const lock = this.getLock(vaultId); - const release = lock.acquireRead(); - return [async () => release(),]; + return async () => { + const release = await lock.acquireRead(); + return [async () => release()]; + }; } - + protected getWriteLock(vaultId: VaultId): ResourceAcquire { + const lock = this.getLock(vaultId); + return async () => { + const release = await lock.acquireWrite(); + return [async () => release()]; + }; + } /** * Constructs a new vault instance with a given name and @@ -318,8 +324,9 @@ class VaultManager { } const vaultId = await this.generateVaultId(); const lock = new RWLock(); - this.vaultMap.set(vaultId, { lock }); - return await this.transact(async () => { + const vaultIdString = vaultId.toString() as VaultIdString; + this.vaultMap.set(vaultIdString, { lock }); + return await withF([this.getWriteLock(vaultId)], async () => { // Adding vault to name map await this.db.put( this.vaultsNamesDbDomain, @@ -340,9 +347,9 @@ class VaultManager { fresh: true, }); // Adding vault to object map - this.vaultMap.set(vaultId, { lock, vault }); + this.vaultMap.set(vaultIdString, { lock, vault }); return vault.vaultId; - }, [vaultId]); + }); } /** @@ -389,16 +396,17 @@ class VaultManager { if (vaultMeta == null) return; const vaultName = vaultMeta.vaultName; this.logger.info(`Destroying Vault ${vaultsUtils.encodeVaultId(vaultId)}`); - await this.transact(async () => { + const vaultIdString = vaultId.toString() as VaultIdString; + await withF([this.getWriteLock(vaultId)], async () => { const vault = await this.getVault(vaultId); // Destroying vault state and metadata await vault.stop(); await vault.destroy(); // Removing from map - this.vaultMap.delete(vaultId); + this.vaultMap.delete(vaultIdString); // Removing name->id mapping await this.db.del(this.vaultsNamesDbDomain, vaultName); - }, [vaultId]); + }); this.logger.info(`Destroyed Vault ${vaultsUtils.encodeVaultId(vaultId)}`); } @@ -410,9 +418,12 @@ class VaultManager { if ((await this.getVaultName(vaultId)) == null) { throw new vaultsErrors.ErrorVaultsVaultUndefined(); } - const vault = await this.getVault(vaultId); - await vault.stop(); - this.vaultMap.delete(vaultId); + const vaultIdString = vaultId.toString() as VaultIdString; + await withF([this.getWriteLock(vaultId)], async () => { + const vault = await this.getVault(vaultId); + await vault.stop(); + this.vaultMap.delete(vaultIdString); + }); } /** @@ -444,7 +455,7 @@ class VaultManager { vaultId: VaultId, newVaultName: VaultName, ): Promise { - await this.transact(async () => { + await withF([this.getWriteLock(vaultId)], async () => { this.logger.info(`Renaming Vault ${vaultsUtils.encodeVaultId(vaultId)}`); // Checking if new name exists if (await this.getVaultId(newVaultName)) { @@ -470,7 +481,7 @@ class VaultManager { vaultId.toBuffer(), true, ); - }, [vaultId]); + }); } /** @@ -521,7 +532,9 @@ class VaultManager { public async shareVault(vaultId: VaultId, nodeId: NodeId): Promise { const vaultMeta = await this.getVaultMeta(vaultId); if (!vaultMeta) throw new vaultsErrors.ErrorVaultsVaultUndefined(); - await this.transact(async () => { + // FIXME: does this need locking? + // We don't mutate the vault and the domains have their own locking + await withF([this.getWriteLock(vaultId)], async () => { await this.gestaltGraph._transaction(async () => { await this.acl._transaction(async () => { // Node Id permissions translated to other nodes in @@ -540,7 +553,7 @@ class VaultManager { }); }); }); - }, [vaultId]); + }); } /** @@ -570,12 +583,13 @@ class VaultManager { vaultNameOrId: VaultId | VaultName, ): Promise { const vaultId = await this.generateVaultId(); - const lock = new Mutex(); - this.vaultMap.set(vaultId, { lock }); + const lock = new RWLock(); + const vaultIdString = vaultId.toString() as VaultIdString; + this.vaultMap.set(vaultIdString, { lock }); this.logger.info( `Cloning Vault ${vaultsUtils.encodeVaultId(vaultId)} on Node ${nodeId}`, ); - return await this.transact(async () => { + return await withF([this.getWriteLock(vaultId)], async () => { const vault = await VaultInternal.cloneVaultInternal({ targetNodeId: nodeId, targetVaultNameOrId: vaultNameOrId, @@ -589,12 +603,12 @@ class VaultManager { logger: this.logger.getChild(VaultInternal.name), }); // TODO: We need to add the cloned vaultName to the name->id mapping - this.vaultMap.set(vaultId, { lock, vault }); + this.vaultMap.set(vaultIdString, { lock, vault }); this.logger.info( `Cloned Vault ${vaultsUtils.encodeVaultId(vaultId)} on Node ${nodeId}`, ); return vault.vaultId; - }, [vaultId]); + }); } /** @@ -611,14 +625,14 @@ class VaultManager { pullVaultNameOrId?: VaultId | VaultName; }): Promise { if ((await this.getVaultName(vaultId)) == null) return; - await this.withLocks(async () => { + await withF([this.getWriteLock(vaultId)], async () => { const vault = await this.getVault(vaultId); await vault.pullVault({ nodeConnectionManager: this.nodeConnectionManager, pullNodeId, pullVaultNameOrId, }); - }, [this.getLock(vaultId)]); + }); } /** @@ -626,24 +640,28 @@ class VaultManager { * cloned or pulled from */ @ready(new vaultsErrors.ErrorVaultManagerNotRunning()) - public async *handleInfoRequest( - vaultId: VaultId, - ): AsyncGenerator { - // Adherence to git protocol - yield Buffer.from( - gitUtils.createGitPacketLine('# service=git-upload-pack\n'), + public async *handleInfoRequest(vaultId: VaultId): AsyncGenerator { + const efs = this.efs; + return yield* withG( + [this.getReadLock(vaultId)], + async function* (): AsyncGenerator { + // Adherence to git protocol + yield Buffer.from( + gitUtils.createGitPacketLine('# service=git-upload-pack\n'), + ); + yield Buffer.from('0000'); + // Read the commit state of the vault + const uploadPack = await gitUtils.uploadPack({ + fs: efs, + dir: path.join(vaultsUtils.encodeVaultId(vaultId), 'contents'), + gitdir: path.join(vaultsUtils.encodeVaultId(vaultId), '.git'), + advertiseRefs: true, + }); + for (const buffer of uploadPack) { + yield buffer; + } + }, ); - yield Buffer.from('0000'); - // Read the commit state of the vault - const uploadPack = await gitUtils.uploadPack({ - fs: this.efs, - dir: path.join(vaultsUtils.encodeVaultId(vaultId), 'contents'), - gitdir: path.join(vaultsUtils.encodeVaultId(vaultId), '.git'), - advertiseRefs: true, - }); - for (const buffer of uploadPack) { - yield buffer; - } } /** @@ -655,32 +673,34 @@ class VaultManager { vaultId: VaultId, body: Buffer, ): Promise<[PassThrough, PassThrough]> { - if (body.toString().slice(4, 8) === 'want') { - // Parse the request to get the wanted git object - const wantedObjectId = body.toString().slice(9, 49); - const packResult = await gitUtils.packObjects({ - fs: this.efs, - dir: path.join(vaultsUtils.encodeVaultId(vaultId), 'contents'), - gitdir: path.join(vaultsUtils.encodeVaultId(vaultId), '.git'), - refs: [wantedObjectId], - }); - // Generate a contents and progress stream - const readable = new PassThrough(); - const progressStream = new PassThrough(); - const sideBand = gitUtils.mux( - 'side-band-64', - readable, - packResult.packstream, - progressStream, - ); - return [sideBand, progressStream]; - } else { - throw new gitErrors.ErrorGitUnimplementedMethod( - `Request of type '${body - .toString() - .slice(4, 8)}' not valid, expected 'want'`, - ); - } + return await withF([this.getReadLock(vaultId)], async () => { + if (body.toString().slice(4, 8) === 'want') { + // Parse the request to get the wanted git object + const wantedObjectId = body.toString().slice(9, 49); + const packResult = await gitUtils.packObjects({ + fs: this.efs, + dir: path.join(vaultsUtils.encodeVaultId(vaultId), 'contents'), + gitdir: path.join(vaultsUtils.encodeVaultId(vaultId), '.git'), + refs: [wantedObjectId], + }); + // Generate a contents and progress stream + const readable = new PassThrough(); + const progressStream = new PassThrough(); + const sideBand = gitUtils.mux( + 'side-band-64', + readable, + packResult.packstream, + progressStream, + ); + return [sideBand, progressStream]; + } else { + throw new gitErrors.ErrorGitUnimplementedMethod( + `Request of type '${body + .toString() + .slice(4, 8)}' not valid, expected 'want'`, + ); + } + }); } /** @@ -730,8 +750,9 @@ class VaultManager { @ready(new vaultsErrors.ErrorVaultManagerNotRunning()) protected async getVault(vaultId: VaultId): Promise { let vault: VaultInternal | undefined; - let lock: MutexInterface; - let vaultAndLock = this.vaultMap.get(vaultId); + let lock: RWLock; + const vaultIdString = vaultId.toString() as VaultIdString; + let vaultAndLock = this.vaultMap.get(vaultIdString); if (vaultAndLock != null) { ({ vault, lock } = vaultAndLock); // Lock and vault exist @@ -741,8 +762,8 @@ class VaultManager { // Only lock exists let release; try { - release = await lock.acquire(); - ({ vault, lock } = vaultAndLock); + release = await lock.acquireWrite(); + ({ vault } = vaultAndLock); if (vault != null) { return vault; } @@ -756,19 +777,19 @@ class VaultManager { vaultsDbDomain: this.vaultsDbDomain, }); vaultAndLock.vault = vault; - this.vaultMap.set(vaultId, vaultAndLock); + this.vaultMap.set(vaultIdString, vaultAndLock); return vault; } finally { release(); } } else { // Neither vault nor lock exists - lock = new Mutex(); + lock = new RWLock(); vaultAndLock = { lock }; - this.vaultMap.set(vaultId, vaultAndLock); + this.vaultMap.set(vaultIdString, vaultAndLock); let release; try { - release = await lock.acquire(); + release = await lock.acquireWrite(); vault = await VaultInternal.createVaultInternal({ vaultId, keyManager: this.keyManager, @@ -779,7 +800,7 @@ class VaultManager { logger: this.logger.getChild(VaultInternal.name), }); vaultAndLock.vault = vault; - this.vaultMap.set(vaultId, vaultAndLock); + this.vaultMap.set(vaultIdString, vaultAndLock); return vault; } finally { release(); @@ -813,13 +834,15 @@ class VaultManager { // Obtaining locks. const vaultLocks = vaultIds.map((vaultId) => { - return this.getLock(vaultId); + return this.getWriteLock(vaultId); }); // Running the function with locking. - return await this.withLocks(() => { + // FIXME: This currently deadlocks since the locking for the object map + // and `VaultInternal` is shared now + return await withF(vaultLocks, () => { return f(...vaults); - }, vaultLocks); + }); } protected async setupKey(bits: 128 | 192 | 256): Promise { diff --git a/src/vaults/types.ts b/src/vaults/types.ts index 66e053ebfb..8635f526a3 100644 --- a/src/vaults/types.ts +++ b/src/vaults/types.ts @@ -21,8 +21,8 @@ const tagLast = 'last'; const refs = ['HEAD', tagLast] as const; type VaultId = Opaque<'VaultId', Id>; - type VaultIdEncoded = Opaque<'VaultIdEncoded', string>; +type VaultIdString = Opaque<'VaultIdString', string>; type VaultRef = typeof refs[number]; @@ -161,6 +161,7 @@ export { vaultActions }; export type { VaultId, VaultIdEncoded, + VaultIdString, VaultRef, VaultAction, CommitId, diff --git a/tests/vaults/VaultManager.test.ts b/tests/vaults/VaultManager.test.ts index 3700c51208..c09cd9ae69 100644 --- a/tests/vaults/VaultManager.test.ts +++ b/tests/vaults/VaultManager.test.ts @@ -470,6 +470,7 @@ describe('VaultManager', () => { allDb = await DB.createDB({ dbPath: path.join(allDataDir, 'alldb'), + logger, }); nodeGraph = await NodeGraph.createNodeGraph({ @@ -589,9 +590,17 @@ describe('VaultManager', () => { } }, ); - }); + }, 50000); afterAll(async () => { + await nodeConnectionManager.stop(); + await fwdProxy.stop(); + await nodeGraph.stop(); + await nodeGraph.destroy(); + await allDb.stop(); + await allDb.destroy(); + await keyManager.stop(); + await keyManager.destroy(); await remoteKeynode2.stop(); await remoteKeynode2.destroy(); await remoteKeynode1.stop(); @@ -600,14 +609,14 @@ describe('VaultManager', () => { recursive: true, force: true, }); - }); + }, 25000); test('clone vaults from a remote keynode using a vault name', async () => { const vaultManager = await VaultManager.createVaultManager({ vaultsPath, keyManager: dummyKeyManager, gestaltGraph: {} as GestaltGraph, - nodeConnectionManager: {} as NodeConnectionManager, + nodeConnectionManager, acl: {} as ACL, notificationsManager: {} as NotificationsManager, db,