Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
CMCDragonkai committed Mar 31, 2022
1 parent 7955e52 commit a644cda
Show file tree
Hide file tree
Showing 5 changed files with 376 additions and 88 deletions.
100 changes: 69 additions & 31 deletions src/RWLockReader.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { MutexInterface } from 'async-mutex';
import type { ResourceAcquire } from '@matrixai/resources';
import { Mutex } from 'async-mutex';
import { Mutex, withTimeout } from 'async-mutex';
import { withF, withG } from '@matrixai/resources';
import { ErrorAsyncLocksTimeout } from './errors';

/**
* Read-preferring read write lock
Expand All @@ -12,35 +13,68 @@ class RWLockReader {
protected lock: Mutex = new Mutex();
protected release: MutexInterface.Releaser;

public acquireRead: ResourceAcquire<RWLockReader> = async () => {
const readerCount = ++this._readerCount;
// The first reader locks
if (readerCount === 1) {
this.release = await this.lock.acquire();
}
return [
async () => {
const readerCount = --this._readerCount;
// The last reader unlocks
if (readerCount === 0) {
this.release();
public read(timeout?: number): ResourceAcquire<RWLockReader> {
return async () => {
const readerCount = ++this._readerCount;
// The first reader locks
if (readerCount === 1) {
let lock: MutexInterface = this.lock;
if (timeout != null) {
lock = withTimeout(this.lock, timeout, new ErrorAsyncLocksTimeout());
}
},
this,
];
};
try {
console.log('ATTEMPT READ');
this.release = await lock.acquire();
console.log('ACQUIRED READ');

public acquireWrite: ResourceAcquire<RWLockReader> = async () => {
++this._writerCount;
this.release = await this.lock.acquire();
return [
async () => {
} catch (e) {
console.log('READ LOCK TIMEOUT', e.name);
--this._readerCount;
throw e;
}
}
return [
async () => {
console.log('RELEASE READ LOCK');
const readerCount = --this._readerCount;
// The last reader unlocks
if (readerCount === 0) {
this.release();
}
},
this,
];
};
}

public write(timeout?: number): ResourceAcquire<RWLockReader> {
return async () => {
++this._writerCount;
let lock: MutexInterface = this.lock;
if (timeout != null) {
lock = withTimeout(this.lock, timeout, new ErrorAsyncLocksTimeout);
}
let release: MutexInterface.Releaser;
try {
console.log('ATTEMPT WRITE');
release = await lock.acquire();
console.log('ACQUIRED WRITE');

} catch (e) {
console.log('WRITE LOCK TIMEOUT', e.name);
--this._writerCount;
this.release();
},
this,
];
};
throw e;
}
return [
async () => {
console.log('RELEASE WRITE LOCK');
release();
--this._writerCount;
},
this,
];
};
}

public get readerCount(): number {
return this._readerCount;
Expand All @@ -60,26 +94,30 @@ class RWLockReader {

public async withReadF<T>(
f: (resources: [RWLockReader]) => Promise<T>,
timeout?: number
): Promise<T> {
return withF([this.acquireRead], f);
return withF([this.read(timeout)], f);
}

public async withWriteF<T>(
f: (resources: [RWLockReader]) => Promise<T>,
timeout?: number
): Promise<T> {
return withF([this.acquireWrite], f);
return withF([this.write(timeout)], f);
}

public withReadG<T, TReturn, TNext>(
g: (resources: [RWLockReader]) => AsyncGenerator<T, TReturn, TNext>,
timeout?: number
): AsyncGenerator<T, TReturn, TNext> {
return withG([this.acquireRead], g);
return withG([this.read(timeout)], g);
}

public withWriteG<T, TReturn, TNext>(
g: (resources: [RWLockReader]) => AsyncGenerator<T, TReturn, TNext>,
timeout?: number
): AsyncGenerator<T, TReturn, TNext> {
return withG([this.acquireWrite], g);
return withG([this.write(timeout)], g);
}
}

Expand Down
139 changes: 102 additions & 37 deletions src/RWLockWriter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type { MutexInterface } from 'async-mutex';
import type { ResourceAcquire } from '@matrixai/resources';
import { Mutex } from 'async-mutex';
import { performance } from 'perf_hooks';
import { Mutex, withTimeout } from 'async-mutex';
import { withF, withG } from '@matrixai/resources';
import { ErrorAsyncLocksTimeout } from './errors';

/**
* Write-preferring read write lock
Expand All @@ -14,42 +16,101 @@ class RWLockWriter {
protected _readerCount: number = 0;
protected _writerCount: number = 0;

public acquireRead: ResourceAcquire<RWLockWriter> = async () => {
if (this._writerCount > 0) {
++this.readerCountBlocked;
await this.writersLock.waitForUnlock();
--this.readerCountBlocked;
}
const readerCount = ++this._readerCount;
// The first reader locks
if (readerCount === 1) {
this.readersRelease = await this.readersLock.acquire();
}
return [
async () => {
const readerCount = --this._readerCount;
// The last reader unlocks
if (readerCount === 0) {
this.readersRelease();
public read(timeout?: number): ResourceAcquire<RWLockWriter> {
return async () => {
const t1 = performance.now();
if (this._writerCount > 0) {
++this.readerCountBlocked;
if (timeout != null) {
let timedOut = false;
const timedOutP = new Promise<void>((resolve) => {
setTimeout(() => {
timedOut = true;
resolve();
}, timeout);
});
await Promise.race([
this.writersLock.waitForUnlock(),
timedOutP
]);
if (timedOut) {
--this.readerCountBlocked;
throw new ErrorAsyncLocksTimeout();
}
} else {
await this.writersLock.waitForUnlock();
}
--this.readerCountBlocked;
}
const readerCount = ++this._readerCount;
// The first reader locks
if (readerCount === 1) {
let readersLock: MutexInterface = this.readersLock;
if (timeout != null) {
timeout = timeout - (performance.now() - t1);
readersLock = withTimeout(
this.readersLock,
timeout,
new ErrorAsyncLocksTimeout()
);
}
},
this,
];
};
try {
this.readersRelease = await readersLock.acquire();
} catch (e) {
--this._readerCount;
throw e;
}
}
return [
async () => {
const readerCount = --this._readerCount;
// The last reader unlocks
if (readerCount === 0) {
this.readersRelease();
}
},
this,
];
};
}

public acquireWrite: ResourceAcquire<RWLockWriter> = async () => {
++this._writerCount;
const writersRelease = await this.writersLock.acquire();
this.readersRelease = await this.readersLock.acquire();
return [
async () => {
this.readersRelease();
public write(timeout?: number): ResourceAcquire<RWLockWriter> {
return async () => {
++this._writerCount;
let writersLock: MutexInterface = this.writersLock;
if (timeout != null) {
writersLock = withTimeout(this.writersLock, timeout, new ErrorAsyncLocksTimeout());
}
const t1 = performance.now();
let writersRelease: MutexInterface.Releaser;
try {
writersRelease = await writersLock.acquire();
} catch (e) {
--this._writerCount;
throw e;
}
let readersLock: MutexInterface = this.readersLock;
if (timeout != null) {
timeout = timeout - (performance.now() - t1);
readersLock = withTimeout(this.readersLock, timeout, new ErrorAsyncLocksTimeout());
}
try {
this.readersRelease = await readersLock.acquire();
} catch (e) {
writersRelease();
--this._writerCount;
},
this,
];
};
throw e;
}
return [
async () => {
this.readersRelease();
writersRelease();
--this._writerCount;
},
this,
];
};
}

public get readerCount(): number {
return this._readerCount + this.readerCountBlocked;
Expand All @@ -73,26 +134,30 @@ class RWLockWriter {

public async withReadF<T>(
f: (resources: [RWLockWriter]) => Promise<T>,
timeout?: number
): Promise<T> {
return withF([this.acquireRead], f);
return withF([this.read(timeout)], f);
}

public async withWriteF<T>(
f: (resources: [RWLockWriter]) => Promise<T>,
timeout?: number
): Promise<T> {
return withF([this.acquireWrite], f);
return withF([this.write(timeout)], f);
}

public withReadG<T, TReturn, TNext>(
g: (resources: [RWLockWriter]) => AsyncGenerator<T, TReturn, TNext>,
timeout?: number
): AsyncGenerator<T, TReturn, TNext> {
return withG([this.acquireRead], g);
return withG([this.read(timeout)], g);
}

public withWriteG<T, TReturn, TNext>(
g: (resources: [RWLockWriter]) => AsyncGenerator<T, TReturn, TNext>,
timeout?: number
): AsyncGenerator<T, TReturn, TNext> {
return withG([this.acquireWrite], g);
return withG([this.write(timeout)], g);
}
}

Expand Down
8 changes: 4 additions & 4 deletions tests/Lock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,19 @@ describe(Lock.name, () => {
expect(lock.count).toBe(0);
await lock.withF(async () => {
const f = jest.fn();
await expect(lock.withF(f, 1000)).rejects.toThrow(errors.ErrorAsyncLocksTimeout);
await expect(lock.withF(f, 100)).rejects.toThrow(errors.ErrorAsyncLocksTimeout);
expect(f).not.toBeCalled();
}, 1000);
}, 100);
const g = lock.withG(async function *() {
expect(lock.isLocked()).toBe(true);
expect(lock.count).toBe(1);
const f = jest.fn();
const g = lock.withG(f, 1000);
const g = lock.withG(f, 100);
await expect(g.next()).rejects.toThrow(errors.ErrorAsyncLocksTimeout);
expect(f).not.toBeCalled();
expect(lock.isLocked()).toBe(true);
expect(lock.count).toBe(1);
}, 1000);
}, 100);
await g.next();
expect(lock.isLocked()).toBe(false);
expect(lock.count).toBe(0);
Expand Down
Loading

0 comments on commit a644cda

Please sign in to comment.