From 0b745f7734e270781919dab9eb2d25a7a1b48c51 Mon Sep 17 00:00:00 2001 From: stephhuynh18 Date: Thu, 4 Apr 2024 15:11:09 -0700 Subject: [PATCH] fix: only create request if it does not exist --- .../__tests__/request-controller.test.ts | 35 +++++++++ src/controllers/request-controller.ts | 24 +++--- .../__tests__/request-repository.test.ts | 74 +++++++++---------- src/repositories/request-repository.ts | 22 ++++-- src/services/__tests__/fake-factory.util.ts | 5 +- src/services/__tests__/ipfs-service.test.ts | 10 ++- src/services/request-service.ts | 13 +++- 7 files changed, 121 insertions(+), 62 deletions(-) diff --git a/src/controllers/__tests__/request-controller.test.ts b/src/controllers/__tests__/request-controller.test.ts index d67bbc9ed..eb1a90c07 100644 --- a/src/controllers/__tests__/request-controller.test.ts +++ b/src/controllers/__tests__/request-controller.test.ts @@ -323,6 +323,41 @@ describe('createRequest', () => { await controller.createRequest(req, mockResponse()) expect(markPreviousReplacedSpy).toBeCalledTimes(1) }) + + test('simultaenous requests', async () => { + const cid = randomCID() + const streamId = randomStreamID() + const timestamp = new Date() + const origin = '203.0.113.195' + const req = mockRequest({ + headers: { + 'Content-type': 'application/json', + 'X-Forwarded-For': [` ${origin}`, `${origin}, 2001:db8:85a3:8d3:1319:8a2e:370:7348`], + }, + body: { + cid: cid.toString(), + streamId: streamId.toString(), + timestamp: timestamp.toISOString(), + }, + }) + + const requestRepository = container.resolve('requestRepository') + const findByCidSpy = jest.spyOn(requestRepository, 'findByCid') + const res0 = mockResponse() + const res1 = mockResponse() + + await Promise.all([controller.createRequest(req, res0), controller.createRequest(req, res1)]) + + expect(findByCidSpy).toBeCalledTimes(1) + expect(findByCidSpy).toBeCalledWith(cid) + + const status0 = res0.status.mock.calls[0][0] + const status1 = res1.status.mock.calls[0][0] + const onlyOneCreated = + (status0 === StatusCodes.CREATED && status1 === StatusCodes.ACCEPTED) || + (status1 === StatusCodes.CREATED && status0 === StatusCodes.ACCEPTED) + expect(onlyOneCreated).toBeTruthy() + }) }) describe('Publish to queue', () => { diff --git a/src/controllers/request-controller.ts b/src/controllers/request-controller.ts index 608151453..be186b58d 100644 --- a/src/controllers/request-controller.ts +++ b/src/controllers/request-controller.ts @@ -119,17 +119,23 @@ export class RequestController { ) try { - const found = await this.requestService.findByCid(requestParams.cid) - logger.debug(`Found request for ${requestParams.cid} of stream ${requestParams.streamId}`) - if (found) { - return res.status(StatusCodes.ACCEPTED).json(found) - } - - const body = await this.requestService.createOrUpdate(requestParams, origin) + const body = await this.requestService.create(requestParams, origin) - Metrics.count(METRIC_NAMES.ANCHOR_REQUESTED, 1, { source: parseOrigin(req) }) + // request was newly created + if (body) { + Metrics.count(METRIC_NAMES.ANCHOR_REQUESTED, 1, { source: parseOrigin(req) }) + return res.status(StatusCodes.CREATED).json(body) + } - return res.status(StatusCodes.CREATED).json(body) + // request already exists so retrieve it + const found = await this.requestService.findByCid(requestParams.cid) + if (!found) { + throw new Error( + `Request with cid ${requestParams.cid} was not created and not found. This should not happen` + ) + } + logger.debug(`Found request for ${requestParams.cid} of stream ${requestParams.streamId}`) + return res.status(StatusCodes.ACCEPTED).json(found) } catch (err: any) { Metrics.count(METRIC_NAMES.REQUEST_NOT_CREATED, 1, { source: parseOrigin(req) }) return this.getBadRequestResponse( diff --git a/src/repositories/__tests__/request-repository.test.ts b/src/repositories/__tests__/request-repository.test.ts index f2076eef3..68629abf9 100644 --- a/src/repositories/__tests__/request-repository.test.ts +++ b/src/repositories/__tests__/request-repository.test.ts @@ -76,16 +76,19 @@ describe('request repository test', () => { await connection2.destroy() }) - test('createOrUpdate: can createOrUpdate simultaneously', async () => { + test('create: simultaneous creates will only create one request', async () => { const request = generateRequest({ status: RequestStatus.READY, }) const [result1, result2] = await Promise.all([ - requestRepository.createOrUpdate(request), - requestRepository.createOrUpdate(request), + requestRepository.create(request), + requestRepository.create(request), ]) - expect(result1).toEqual(result2) + expect( + (result1 === null && result2?.cid === request.cid) || + (result2 === null && result1?.cid === request.cid) + ).toBeTruthy() }) describe('findRequestsToGarbageCollect', () => { @@ -801,41 +804,41 @@ describe('request repository test', () => { const streamId = randomStreamID() // Create three COMPLETED requests. These should not be changed - const completedRequests = await Promise.all( - times(3).map(async (n) => { - const request = new Request({ + const completedRequests = await requestRepository.createRequests( + times(3).map((n) => { + return new Request({ cid: randomCID().toString(), streamId: streamId.toString(), timestamp: ONE_HOUR_AGO.minus({ minute: n }).toJSDate(), status: RequestStatus.COMPLETED, origin: 'same-origin', }) - return requestRepository.createOrUpdate(request) }) ) // PENDING but with a different streamId - const unrelatedStreamRequest = await requestRepository.createOrUpdate( + const [unrelatedStreamRequest] = await requestRepository.createRequests([ new Request({ cid: randomCID().toString(), streamId: randomStreamID().toString(), timestamp: ONE_HOUR_AGO.toJSDate(), status: RequestStatus.PENDING, origin: 'same-origin', - }) - ) + }), + ]) // Create three PENDING requests at `oneHourAgo` plus some minutes - const requestsP = times(3).map(async (n) => { - const request = new Request({ - cid: randomCID().toString(), - streamId: streamId.toString(), - timestamp: ONE_HOUR_AGO.plus({ minute: n }).toJSDate(), - status: RequestStatus.PENDING, - origin: 'same-origin', + const requestsP = await requestRepository.createRequests( + times(3).map((n) => { + return new Request({ + cid: randomCID().toString(), + streamId: streamId.toString(), + timestamp: ONE_HOUR_AGO.plus({ minute: n }).toJSDate(), + status: RequestStatus.PENDING, + origin: 'same-origin', + }) }) - return requestRepository.createOrUpdate(request) - }) + ) const requests = await Promise.all(requestsP) const last = requests[requests.length - 1] expectPresent(last) @@ -867,17 +870,18 @@ describe('request repository test', () => { test('mark regardless of time', async () => { const streamId = randomStreamID() - const requestsP = times(3).map(async (n) => { - const request = new Request({ - cid: randomCID().toString(), - streamId: streamId.toString(), - timestamp: ONE_HOUR_AGO.plus({ minute: n }).toJSDate(), - status: RequestStatus.PENDING, - origin: 'same-origin', + const requestsP = requestRepository.createRequests( + times(3).map((n) => { + return new Request({ + cid: randomCID().toString(), + streamId: streamId.toString(), + timestamp: ONE_HOUR_AGO.plus({ minute: n }).toJSDate(), + status: RequestStatus.PENDING, + origin: 'same-origin', + }) }) - return requestRepository.createOrUpdate(request) - }) - const requests: Array = await Promise.all(requestsP) + ) + const requests: Array = await requestsP expectPresent(requests[0]) const rowsAffected = await requestRepository.markReplaced(requests[0]) expect(rowsAffected).toEqual(requests.length - 1) // Mark every request except the last one @@ -900,14 +904,8 @@ describe('request repository test', () => { const MAX_LIMIT = 3 function createRequests(n: number): Promise { - return Promise.all( - times(n).map(() => { - return requestRepository.createOrUpdate( - generateRequest({ - status: RequestStatus.READY, - }) - ) - }) + return requestRepository.createRequests( + times(n).map(() => generateRequest({ status: RequestStatus.READY })) ) } diff --git a/src/repositories/request-repository.ts b/src/repositories/request-repository.ts index 5a48cc89e..2f0209714 100644 --- a/src/repositories/request-repository.ts +++ b/src/repositories/request-repository.ts @@ -110,16 +110,19 @@ export class RequestRepository { } /** - * Create/update client request - * @returns A promise that resolves to the created request + * Create a client request + * @returns A promise that resolves to the created request if it doesn't already exist */ - async createOrUpdate(request: Request): Promise { - const keys = Object.keys(request).filter((key) => key !== 'id') // all keys except ID + async create(request: Request): Promise { const [created] = await this.table .insert(request.toDB(), ['id']) .returning(Object.keys(RequestCodec.props)) .onConflict('cid') - .merge(keys) + .ignore() + + if (!created) { + return null + } logEvent.db({ type: 'request', @@ -139,10 +142,13 @@ export class RequestRepository { /** * For test use. Creates an array of requests. * @param requests array of requests - * @returns + * @returns array of the stored requests */ - async createRequests(requests: Array): Promise { - await this.table.insert(requests.map((request) => request.toDB())) + async createRequests(requests: Array): Promise { + const stored = await this.table + .insert(requests.map((request) => request.toDB())) + .returning(Object.keys(RequestCodec.props)) + return stored.map((r) => new Request(r)) } /** diff --git a/src/services/__tests__/fake-factory.util.ts b/src/services/__tests__/fake-factory.util.ts index 5b5a44a2d..a9adf07c3 100644 --- a/src/services/__tests__/fake-factory.util.ts +++ b/src/services/__tests__/fake-factory.util.ts @@ -40,7 +40,10 @@ export class FakeFactory { request.message = 'Request is pending.' request.pinned = true - const stored = await this.requestRepository.createOrUpdate(request) + const stored = await this.requestRepository.create(request) + if (!stored) { + throw new Error(`Request with cid ${request.cid} already exists. Cannot create again`) + } await this.requestRepository.markReplaced(stored) return stored } diff --git a/src/services/__tests__/ipfs-service.test.ts b/src/services/__tests__/ipfs-service.test.ts index be5ff3748..af09c6d92 100644 --- a/src/services/__tests__/ipfs-service.test.ts +++ b/src/services/__tests__/ipfs-service.test.ts @@ -273,12 +273,15 @@ describe('pubsub', () => { } ) const requestRepository = injector.resolve('requestRepository') - const createdRequest = await requestRepository.createOrUpdate( + const createdRequest = await requestRepository.create( generateRequest({ streamId: pubsubMessage.stream.toString(), status: RequestStatus.COMPLETED, }) ) + if (!createdRequest) { + throw new Error('Failed to create request because it already exists') + } const anchorRepository = injector.resolve('anchorRepository') const anchorCid = randomCID() await anchorRepository.createAnchors([ @@ -324,7 +327,7 @@ describe('pubsub', () => { // @ts-ignore const beforeWindow = new Date(Date.now() - service.pubsubResponderWindowMs - 1000) const requestRepository = injector.resolve('requestRepository') - const createdRequest = await requestRepository.createOrUpdate( + const createdRequest = await requestRepository.create( generateRequest({ streamId: pubsubMessage.stream.toString(), status: RequestStatus.COMPLETED, @@ -333,6 +336,9 @@ describe('pubsub', () => { updatedAt: beforeWindow, }) ) + if (!createdRequest) { + throw new Error('Failed to create request because it already exists') + } const anchorRepository = injector.resolve('anchorRepository') const anchorCid = randomCID() await anchorRepository.createAnchors([ diff --git a/src/services/request-service.ts b/src/services/request-service.ts index 95135721e..032e16486 100644 --- a/src/services/request-service.ts +++ b/src/services/request-service.ts @@ -44,7 +44,7 @@ export class RequestService { private readonly metadataService: IMetadataService, private readonly validationQueueService: IQueueProducerService ) { - this.publishToQueue = Boolean(config.queue.sqsQueueUrl) + this.publishToQueue = Boolean(config.queue.sqsQueueUrl) } async getStatusForCid(cid: CID): Promise | { error: string }> { @@ -68,10 +68,10 @@ export class RequestService { return this.requestPresentationService.body(found) } - async createOrUpdate( + async create( params: RequestAnchorParams, origin: string - ): Promise> { + ): Promise | null> { let genesisFields: GenesisFields if ('genesisFields' in params) { genesisFields = params.genesisFields @@ -92,7 +92,12 @@ export class RequestService { request.pinned = true request.timestamp = params.timestamp ?? new Date() - const storedRequest = await this.requestRepository.createOrUpdate(request) + const storedRequest = await this.requestRepository.create(request) + + // request already exists + if (!storedRequest) { + return null + } if (this.publishToQueue) { // the validation worker will handle replacing requests