From 9fae078a51403cb610e5cb39ac237114977e810d Mon Sep 17 00:00:00 2001 From: stephhuynh18 Date: Tue, 7 May 2024 11:55:39 -0700 Subject: [PATCH 1/2] fix: add tests and log when there is an error when persisting anchors --- src/services/__tests__/anchor-service.test.ts | 19 ++++++++++++++++++- src/services/anchor-service.ts | 1 + 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/services/__tests__/anchor-service.test.ts b/src/services/__tests__/anchor-service.test.ts index 861c0471..585ec63c 100644 --- a/src/services/__tests__/anchor-service.test.ts +++ b/src/services/__tests__/anchor-service.test.ts @@ -378,6 +378,23 @@ describe('anchor service', () => { expect(anchors.length).toEqual(numRequests) }) + test('Retry persisting anchors on serializable erorr', async () => { + const requests = await fake.multipleRequests(4) + + const [candidates] = await anchorService._findCandidates(requests, 0) + const merkleTree = await anchorService._buildMerkleTree(candidates) + const ipfsProofCid = await ipfsService.storeRecord({}) + const anchors = await anchorService._createAnchorCommits(ipfsProofCid, merkleTree) + + await Promise.all([ + requestRepository.updateRequests({ status: RequestStatus.REPLACED }, requests), + anchorService._persistAnchorResult(anchors, candidates), + ]) + + const readyRequests = await requestRepository.findByStatus(RequestStatus.READY) + expect(readyRequests.length).toEqual(0) + }) + describe('Request pinning', () => { async function anchorRequests(numRequests: number): Promise { // Create Requests @@ -711,7 +728,7 @@ describe('anchor service', () => { const batch = new MockQueueMessage({ bid: uuidv4(), - rids: requests.map(({id}) => id), + rids: requests.map(({ id }) => id), }) anchorBatchQueueService.receiveMessage.mockReturnValue(Promise.resolve(batch)) diff --git a/src/services/anchor-service.ts b/src/services/anchor-service.ts index ad82e0e7..9c3b32c0 100644 --- a/src/services/anchor-service.ts +++ b/src/services/anchor-service.ts @@ -579,6 +579,7 @@ export class AnchorService { { isolationLevel: 'repeatable read' } ) .catch(async (err) => { + logger.err(`Error persisting anchor results: ${err}`) if (err?.code === REPEATED_READ_SERIALIZATION_ERROR) { Metrics.count(METRIC_NAMES.DB_SERIALIZATION_ERROR, 1) await Utils.delay(100) From 12e247fd5e9b2da309e699422d30e238390f79fd Mon Sep 17 00:00:00 2001 From: stephhuynh18 Date: Tue, 7 May 2024 13:14:14 -0700 Subject: [PATCH 2/2] chore: improve test and add comments --- src/services/__tests__/anchor-service.test.ts | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/services/__tests__/anchor-service.test.ts b/src/services/__tests__/anchor-service.test.ts index 585ec63c..81879a98 100644 --- a/src/services/__tests__/anchor-service.test.ts +++ b/src/services/__tests__/anchor-service.test.ts @@ -386,10 +386,17 @@ describe('anchor service', () => { const ipfsProofCid = await ipfsService.storeRecord({}) const anchors = await anchorService._createAnchorCommits(ipfsProofCid, merkleTree) - await Promise.all([ - requestRepository.updateRequests({ status: RequestStatus.REPLACED }, requests), - anchorService._persistAnchorResult(anchors, candidates), - ]) + // this update will lock the request rows for 1 second + // it is not awaited, so it will run in parallel with the persistAnchorResult + connection.transaction(async (trx) => { + await requestRepository + .withConnection(trx) + .updateRequests({ status: RequestStatus.REPLACED }, requests) + + await Utils.delay(1000) + }) + // an error will be thrown because of the lock from above, but will retry until it works + await anchorService._persistAnchorResult(anchors, candidates) const readyRequests = await requestRepository.findByStatus(RequestStatus.READY) expect(readyRequests.length).toEqual(0)