Skip to content

Commit 201d813

Browse files
authored
fix: handle pg disconnections and transaction management (#92)
* chore: upgrade postgres.js * chore: add auth failed to connection errors * fix: pg transaction and conn error handling * fix: error messages * test: job pg errors * test: queue loop pg error
1 parent 1fee391 commit 201d813

File tree

8 files changed

+103
-48
lines changed

8 files changed

+103
-48
lines changed

package-lock.json

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/pg/postgres-tools/errors.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ export function isPgConnectionError(error: any): string | false {
3939
return 'Postgres connection failed due to a DNS lookup error';
4040
} else if (msg.includes('terminating connection due to administrator command')) {
4141
return 'Postgres connection closed due to administrator command';
42+
} else if (msg.includes('password authentication failed')) {
43+
return 'Postgres authentication failed';
4244
}
4345
}
4446
return false;

src/token-processor/queue/job-queue.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,19 +85,22 @@ export class JobQueue {
8585
) {
8686
return;
8787
}
88-
this.jobIds.add(job.id);
8988
await this.db.updateJobStatus({ id: job.id, status: DbJobStatus.queued });
89+
this.jobIds.add(job.id);
9090
void this.queue.add(async () => {
91-
if (this.isRunning) {
92-
if (job.token_id) {
93-
await new ProcessTokenJob({ db: this.db, job: job }).work();
94-
} else if (job.smart_contract_id) {
95-
await new ProcessSmartContractJob({ db: this.db, apiDb: this.apiDb, job: job }).work();
91+
try {
92+
if (this.isRunning) {
93+
if (job.token_id) {
94+
await new ProcessTokenJob({ db: this.db, job: job }).work();
95+
} else if (job.smart_contract_id) {
96+
await new ProcessSmartContractJob({ db: this.db, apiDb: this.apiDb, job: job }).work();
97+
}
98+
} else {
99+
logger.info(`JobQueue cancelling job ${job.id}, queue is now closed`);
96100
}
97-
} else {
98-
logger.info(`JobQueue cancelling job ${job.id}, queue is now closed`);
101+
} finally {
102+
this.jobIds.delete(job.id);
99103
}
100-
this.jobIds.delete(job.id);
101104
});
102105
}
103106

src/token-processor/queue/job/job.ts

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,13 @@ export abstract class Job {
3838
let finishedWithError = false;
3939
const sw = stopwatch();
4040

41-
// This sql transaction will catch any and all errors that are generated while processing the
42-
// job. Each of them were previously tagged as retryable or not retryable so we'll make a
43-
// decision here about what to do in each case. If we choose to retry, this queue entry will
44-
// simply not be marked as `processed = true` so it can be picked up by the queue at a later
45-
// time.
41+
// This block will catch any and all errors that are generated while processing the job. Each of
42+
// them were previously tagged as retryable or not retryable so we'll make a decision here about
43+
// what to do in each case. If we choose to retry, this queue entry will simply not be marked as
44+
// `processed = true` so it can be picked up by the queue at a later time.
4645
try {
47-
await this.db.sqlWriteTransaction(async sql => {
48-
await this.handler();
49-
processingFinished = true;
50-
});
46+
await this.handler();
47+
processingFinished = true;
5148
} catch (error) {
5249
if (error instanceof RetryableJobError) {
5350
const retries = await this.db.increaseJobRetryCount({ id: this.job.id });
@@ -59,7 +56,7 @@ export abstract class Job {
5956
error,
6057
`Job ${this.description()} recoverable error after ${sw.getElapsed()}ms, trying again later`
6158
);
62-
await this.db.updateJobStatus({ id: this.job.id, status: DbJobStatus.pending });
59+
await this.updateStatus(DbJobStatus.pending);
6360
} else {
6461
logger.warn(error, `Job ${this.description()} max retries reached, giving up`);
6562
processingFinished = true;
@@ -74,12 +71,20 @@ export abstract class Job {
7471
} finally {
7572
if (processingFinished) {
7673
const status = finishedWithError ? DbJobStatus.failed : DbJobStatus.done;
77-
await this.db.updateJobStatus({
78-
id: this.job.id,
79-
status: status,
80-
});
81-
logger.info(`Job ${this.description()} ${status} in ${sw.getElapsed()}ms`);
74+
if (await this.updateStatus(status)) {
75+
logger.info(`Job ${this.description()} ${status} in ${sw.getElapsed()}ms`);
76+
}
8277
}
8378
}
8479
}
80+
81+
private async updateStatus(status: DbJobStatus): Promise<boolean> {
82+
try {
83+
await this.db.updateJobStatus({ id: this.job.id, status: status });
84+
return true;
85+
} catch (error) {
86+
logger.error(`Job ${this.description()} could not update status to ${status}: ${error}`);
87+
return false;
88+
}
89+
}
8590
}

src/token-processor/queue/job/process-smart-contract-job.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,16 @@ export class ProcessSmartContractJob extends Job {
105105
);
106106
return;
107107
}
108-
await this.db.updateSmartContractTokenCount({ id: contract.id, count: tokenCount });
109-
logger.info(
110-
`ProcessSmartContractJob enqueueing ${tokenCount} tokens for ${this.description()}`
111-
);
112-
await this.db.insertAndEnqueueSequentialTokens({
113-
smart_contract_id: contract.id,
114-
token_count: tokenCount,
115-
type: dbSipNumberToDbTokenType(contract.sip),
108+
await this.db.sqlWriteTransaction(async sql => {
109+
logger.info(
110+
`ProcessSmartContractJob enqueueing ${tokenCount} tokens for ${this.description()}`
111+
);
112+
await this.db.updateSmartContractTokenCount({ id: contract.id, count: tokenCount });
113+
await this.db.insertAndEnqueueSequentialTokens({
114+
smart_contract_id: contract.id,
115+
token_count: tokenCount,
116+
type: dbSipNumberToDbTokenType(contract.sip),
117+
});
116118
});
117119
}
118120
}

src/token-processor/queue/job/process-token-job.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,21 @@ export class ProcessTokenJob extends Job {
3030
private contract?: DbSmartContract;
3131

3232
async handler(): Promise<void> {
33-
if (!this.job.token_id) {
33+
const tokenId = this.job.token_id;
34+
if (!tokenId) {
3435
return;
3536
}
36-
const token = await this.db.getToken({ id: this.job.token_id });
37-
if (!token) {
38-
throw Error(`ProcessTokenJob token not found with id ${this.job.token_id}`);
39-
}
40-
const contract = await this.db.getSmartContract({ id: token.smart_contract_id });
41-
if (!contract) {
42-
throw Error(`ProcessTokenJob contract not found with id ${token.smart_contract_id}`);
43-
}
37+
const [token, contract] = await this.db.sqlTransaction(async sql => {
38+
const token = await this.db.getToken({ id: tokenId });
39+
if (!token) {
40+
throw Error(`ProcessTokenJob token not found with id ${tokenId}`);
41+
}
42+
const contract = await this.db.getSmartContract({ id: token.smart_contract_id });
43+
if (!contract) {
44+
throw Error(`ProcessTokenJob contract not found with id ${token.smart_contract_id}`);
45+
}
46+
return [token, contract];
47+
});
4448
this.token = token;
4549
this.contract = contract;
4650

tests/job-queue.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { DbJob, DbJobStatus, DbSipNumber, DbSmartContractInsert } from '../src/p
55
import { JobQueue } from '../src/token-processor/queue/job-queue';
66
import { cycleMigrations } from '../src/pg/migrations';
77
import { PgBlockchainApiStore } from '../src/pg/blockchain-api/pg-blockchain-api-store';
8+
import { MockPgBlockchainApiStore } from './helpers';
89

910
class TestJobQueue extends JobQueue {
1011
constructor(args: { db: PgStore; apiDb: PgBlockchainApiStore }) {
@@ -115,4 +116,27 @@ describe('JobQueue', () => {
115116
expect((await db.getJob({ id: job2.id }))?.status).toBe('queued');
116117
expect((await db.getJob({ id: job3.id }))?.status).toBe('queued');
117118
});
119+
120+
test('pg connection errors are not re-thrown', async () => {
121+
const values1: DbSmartContractInsert = {
122+
principal: 'ABCD.test-ft',
123+
sip: DbSipNumber.sip010,
124+
abi: '"some"',
125+
tx_id: '0x123456',
126+
block_height: 1,
127+
};
128+
await db.insertAndEnqueueSmartContract({ values: values1 });
129+
130+
const queue = new JobQueue({ db, apiDb: new MockPgBlockchainApiStore() });
131+
const sleep = (time: number) => {
132+
return new Promise(resolve => setTimeout(resolve, time));
133+
};
134+
135+
// Close DB and start the queue. If the error is not handled correctly, the test will fail.
136+
await db.close();
137+
queue.start();
138+
// Wait 2 seconds and kill the queue.
139+
await sleep(2000);
140+
await queue.close();
141+
});
118142
});

tests/job.test.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,20 @@ class TestRetryableJob extends Job {
99
description(): string {
1010
return 'test';
1111
}
12-
protected handler(): Promise<void> {
12+
handler(): Promise<void> {
1313
throw new RetryableJobError('test');
1414
}
1515
}
1616

17+
class TestDbJob extends Job {
18+
description(): string {
19+
return 'test';
20+
}
21+
async handler(): Promise<void> {
22+
await this.db.sql<{ version: string }[]>`SELECT version()`;
23+
}
24+
}
25+
1726
describe('Job', () => {
1827
let db: PgStore;
1928
let dbJob: DbJob;
@@ -70,4 +79,10 @@ describe('Job', () => {
7079
expect(jobs1[0].retry_count).toBe(1);
7180
expect(jobs1[0].status).toBe('pending');
7281
});
82+
83+
test('db errors are not re-thrown', async () => {
84+
await db.close();
85+
const job = new TestDbJob({ db, job: dbJob });
86+
await expect(job.work()).resolves.not.toThrow();
87+
});
7388
});

0 commit comments

Comments
 (0)