Skip to content

Commit

Permalink
feat: RequestQueue v2 locking
Browse files Browse the repository at this point in the history
  • Loading branch information
vladfrangu committed Aug 31, 2023
1 parent 4febb3d commit ce9a3cf
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 1 deletion.
132 changes: 131 additions & 1 deletion src/emulators/request_queue_emulator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Database, Statement, Transaction, RunResult } from 'better-sqlite3
import { QueueOperationInfo } from './queue_operation_info';
import { STORAGE_NAMES, TIMESTAMP_SQL, DATABASE_FILE_NAME } from '../consts';
import type { DatabaseConnectionCache } from '../database_connection_cache';
import type { RequestModel } from '../resource_clients/request_queue';
import type { ProlongRequestLockOptions, RequestModel, RequestOptions } from '../resource_clients/request_queue';
import { ProcessedRequest } from './batch_add_requests/processed_request';
import { UnprocessedRequest } from './batch_add_requests/unprocessed_request';

Expand Down Expand Up @@ -103,6 +103,20 @@ export class RequestQueueEmulator {

private _deleteRequestTransaction!: Transaction;

private _fetchRequestNotExpired!: Statement<[id: string]>;

private _fetchRequestNotExpiredAndLocked!: Statement<{ id: string; currentTime: number }>;

private _updateOrderNo!: Statement<{ id: string; orderNo: number }>;

private _prolongRequestLockTransaction!: Transaction<(id: string, prolongOptions: ProlongRequestLockOptions) => Date>;

private _deleteRequestLockTransaction!: Transaction<(id: string, options: RequestOptions) => void>;

private _fetchRequestHeadThatWillBeLocked!: Statement<{ queueId: string; limit: number; currentTime: number; }>;

private _listAndLockHeadTransaction!: Transaction<(queueId: string, limit: number, lockSecs: number) => string[]>;

constructor({ queueDir, dbConnections }: RequestQueueEmulatorOptions) {
this.dbPath = join(queueDir, DATABASE_FILE_NAME);
this.dbConnections = dbConnections;
Expand Down Expand Up @@ -420,6 +434,122 @@ export class RequestQueueEmulator {
return this._deleteRequestTransaction(id);
}

prolongRequestLock(id: string, options: ProlongRequestLockOptions) {
if (!this._fetchRequestNotExpired) {
this._fetchRequestNotExpired = this.db.prepare(/* sql */`
SELECT id, orderNo FROM ${this.requestsTableName}
WHERE id = ?
AND orderNo IS NOT NULL
`);
}

this._initUpdateOrderNo();

if (!this._prolongRequestLockTransaction) {
this._prolongRequestLockTransaction = this.db.transaction((passedId, passedOptions) => {
const existingRequest = this._fetchRequestNotExpired.get(passedId) as { orderNo: number; id: string } | undefined;

if (!existingRequest) {
throw new Error(`Request with ID ${passedId} was already handled or doesn't exist`);
}

const unlockTimestamp = Math.abs(existingRequest.orderNo) + passedOptions.lockSecs * 1000;
const newOrderNo = passedOptions.forefront ? -unlockTimestamp : unlockTimestamp;

this._updateOrderNo.run({ id: passedId, orderNo: newOrderNo });

return new Date(unlockTimestamp);
});
}

return this._prolongRequestLockTransaction(id, options);
}

deleteRequestLock(id: string, options: RequestOptions) {
if (!this._fetchRequestNotExpiredAndLocked) {
this._fetchRequestNotExpiredAndLocked = this.db.prepare(/* sql */`
SELECT id FROM ${this.requestsTableName}
WHERE id = :id
AND orderNo IS NOT NULL
AND (
orderNo > :currentTime
OR orderNo < -(:currentTime)
)
`);
}

this._initUpdateOrderNo();

if (!this._deleteRequestLockTransaction) {
this._deleteRequestLockTransaction = this.db.transaction((passedId, { forefront }) => {
const timestamp = Date.now();

const existingRequest = this._fetchRequestNotExpiredAndLocked.get({
id: passedId,
currentTime: timestamp,
}) as { id: string } | undefined;

if (!existingRequest) {
throw new Error(`Request with ID ${passedId} was already handled, doesn't exist, or is not locked`);
}

this._updateOrderNo.run({ id: passedId, orderNo: forefront ? -timestamp : timestamp });
});
}

return this._deleteRequestLockTransaction(id, options);
}

listAndLockHead(queueId: string, limit: number, lockSecs: number): string[] {
if (!this._fetchRequestHeadThatWillBeLocked) {
this._fetchRequestHeadThatWillBeLocked = this.db.prepare(/* sql */`
SELECT id, "json", orderNo FROM ${this.requestsTableName}
WHERE queueId = CAST(:queueId as INTEGER)
AND orderNo IS NOT NULL
AND orderNo <= :currentTime
AND orderNo >= -(:currentTime)
ORDER BY orderNo ASC
LIMIT :limit
`);
}

this._initUpdateOrderNo();

if (!this._listAndLockHeadTransaction) {
this._listAndLockHeadTransaction = this.db.transaction((passedQueueId, passedLimit, passedLockSecs) => {
const timestamp = Date.now();

const requestsToLock = this._fetchRequestHeadThatWillBeLocked.all({
queueId: passedQueueId,
currentTime: timestamp,
limit: passedLimit,
}) as { id: string; json: string; orderNo: number }[];

if (!requestsToLock.length) {
return [];
}

for (const { id, orderNo } of requestsToLock) {
const newOrderNo = (timestamp + passedLockSecs * 1000) * (orderNo > 0 ? 1 : -1);

this._updateOrderNo.run({ id, orderNo: newOrderNo });
}

return requestsToLock.map(({ json }) => json);
});
}

return this._listAndLockHeadTransaction(queueId, limit, lockSecs);
}

private _initUpdateOrderNo() {
this._updateOrderNo ??= this.db.prepare(/* sql */`
UPDATE ${this.requestsTableName}
SET orderNo = :orderNo
WHERE id = :id
`);
}

private _createTables() {
this.db.prepare(`
CREATE TABLE IF NOT EXISTS ${this.queueTableName}(
Expand Down
63 changes: 63 additions & 0 deletions src/resource_clients/request_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ export interface QueueHead {
items: RequestQueueHeadItem[];
}

export interface ListAndLockHeadResult extends QueueHead {
lockSecs: number;
}

export interface RequestModel {
id?: string;
queueId?: string;
Expand Down Expand Up @@ -69,6 +73,18 @@ export interface RequestOptions {
forefront?: boolean;
}

export interface ProlongRequestLockOptions extends RequestOptions {
lockSecs: number;
}

export interface ProlongRequestLockResult {
lockExpiresAt: Date;
}

export interface ListAndLockOptions extends ListOptions {
lockSecs: number;
}

export class RequestQueueClient {
// Since queues are represented by folders,
// each DB only has one queue with ID 1.
Expand Down Expand Up @@ -233,6 +249,53 @@ export class RequestQueueClient {
throw new Error('This method is not implemented in @apify/storage-local yet.');
}

async prolongRequestLock(id: string, options: ProlongRequestLockOptions): Promise<ProlongRequestLockResult> {
ow(id, ow.string);
ow(options, ow.object.exactShape({
lockSecs: ow.number,
forefront: ow.optional.boolean,
}));

this._getEmulator().updateAccessedAtById(this.id);
const lockExpiresAt = this._getEmulator().prolongRequestLock(id, options);

return { lockExpiresAt };
}

async deleteRequestLock(id: string, options: RequestOptions = {}) {
ow(id, ow.string);
ow(options, ow.object.exactShape({
forefront: ow.optional.boolean,
}));

this._getEmulator().updateAccessedAtById(this.id);
this._getEmulator().deleteRequestLock(id, options);
}

async listAndLockHead(options: ListAndLockOptions): Promise<ListAndLockHeadResult> {
ow(options, ow.object.exactShape({
limit: ow.optional.number.lessThanOrEqual(25),
lockSecs: ow.number,
}));

const {
limit = 25,
lockSecs,
} = options;

this._getEmulator().updateAccessedAtById(this.id);
const requestJsons = this._getEmulator().listAndLockHead(this.id, limit, lockSecs);

const queueModifiedAt = new Date(this._getEmulator().selectModifiedAtById(this.id));
return {
limit,
queueModifiedAt,
hadMultipleClients: false,
items: requestJsons.map((json) => this._jsonToRequest(json)),
lockSecs,
};
}

private _createRequestModel(request: RequestBody, forefront?: boolean): RequestModel {
const orderNo = this._calculateOrderNo(request, forefront);
const id = uniqueKeyToRequestId(request.uniqueKey);
Expand Down

0 comments on commit ce9a3cf

Please sign in to comment.