diff --git a/src/services/__tests__/anchor-service.test.ts b/src/services/__tests__/anchor-service.test.ts index 861c0471..81879a98 100644 --- a/src/services/__tests__/anchor-service.test.ts +++ b/src/services/__tests__/anchor-service.test.ts @@ -378,6 +378,30 @@ describe('anchor service', () => { expect(anchors.length).toEqual(numRequests) }) + test('Retry persisting anchors on serializable erorr', async () => { + const requests = await fake.multipleRequests(4) + + const [candidates] = await anchorService._findCandidates(requests, 0) + const merkleTree = await anchorService._buildMerkleTree(candidates) + const ipfsProofCid = await ipfsService.storeRecord({}) + const anchors = await anchorService._createAnchorCommits(ipfsProofCid, merkleTree) + + // this update will lock the request rows for 1 second + // it is not awaited, so it will run in parallel with the persistAnchorResult + connection.transaction(async (trx) => { + await requestRepository + .withConnection(trx) + .updateRequests({ status: RequestStatus.REPLACED }, requests) + + await Utils.delay(1000) + }) + // an error will be thrown because of the lock from above, but will retry until it works + await anchorService._persistAnchorResult(anchors, candidates) + + const readyRequests = await requestRepository.findByStatus(RequestStatus.READY) + expect(readyRequests.length).toEqual(0) + }) + describe('Request pinning', () => { async function anchorRequests(numRequests: number): Promise { // Create Requests @@ -711,7 +735,7 @@ describe('anchor service', () => { const batch = new MockQueueMessage({ bid: uuidv4(), - rids: requests.map(({id}) => id), + rids: requests.map(({ id }) => id), }) anchorBatchQueueService.receiveMessage.mockReturnValue(Promise.resolve(batch)) diff --git a/src/services/anchor-service.ts b/src/services/anchor-service.ts index ad82e0e7..9c3b32c0 100644 --- a/src/services/anchor-service.ts +++ b/src/services/anchor-service.ts @@ -579,6 +579,7 @@ export class AnchorService { { isolationLevel: 'repeatable read' } ) .catch(async (err) => { + logger.err(`Error persisting anchor results: ${err}`) if (err?.code === REPEATED_READ_SERIALIZATION_ERROR) { Metrics.count(METRIC_NAMES.DB_SERIALIZATION_ERROR, 1) await Utils.delay(100)