Skip to content

Commit

Permalink
fix: only create request if it does not exist
Browse files Browse the repository at this point in the history
  • Loading branch information
stephhuynh18 committed Apr 4, 2024
1 parent 14771a4 commit 0b745f7
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 62 deletions.
35 changes: 35 additions & 0 deletions src/controllers/__tests__/request-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,41 @@ describe('createRequest', () => {
await controller.createRequest(req, mockResponse())
expect(markPreviousReplacedSpy).toBeCalledTimes(1)
})

test('simultaenous requests', async () => {
const cid = randomCID()
const streamId = randomStreamID()
const timestamp = new Date()
const origin = '203.0.113.195'
const req = mockRequest({
headers: {
'Content-type': 'application/json',
'X-Forwarded-For': [` ${origin}`, `${origin}, 2001:db8:85a3:8d3:1319:8a2e:370:7348`],
},
body: {
cid: cid.toString(),
streamId: streamId.toString(),
timestamp: timestamp.toISOString(),
},
})

const requestRepository = container.resolve('requestRepository')
const findByCidSpy = jest.spyOn(requestRepository, 'findByCid')
const res0 = mockResponse()
const res1 = mockResponse()

await Promise.all([controller.createRequest(req, res0), controller.createRequest(req, res1)])

expect(findByCidSpy).toBeCalledTimes(1)
expect(findByCidSpy).toBeCalledWith(cid)

const status0 = res0.status.mock.calls[0][0]
const status1 = res1.status.mock.calls[0][0]
const onlyOneCreated =
(status0 === StatusCodes.CREATED && status1 === StatusCodes.ACCEPTED) ||
(status1 === StatusCodes.CREATED && status0 === StatusCodes.ACCEPTED)
expect(onlyOneCreated).toBeTruthy()
})
})

describe('Publish to queue', () => {
Expand Down
24 changes: 15 additions & 9 deletions src/controllers/request-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,23 @@ export class RequestController {
)

try {
const found = await this.requestService.findByCid(requestParams.cid)
logger.debug(`Found request for ${requestParams.cid} of stream ${requestParams.streamId}`)
if (found) {
return res.status(StatusCodes.ACCEPTED).json(found)
}

const body = await this.requestService.createOrUpdate(requestParams, origin)
const body = await this.requestService.create(requestParams, origin)

Metrics.count(METRIC_NAMES.ANCHOR_REQUESTED, 1, { source: parseOrigin(req) })
// request was newly created
if (body) {
Metrics.count(METRIC_NAMES.ANCHOR_REQUESTED, 1, { source: parseOrigin(req) })
return res.status(StatusCodes.CREATED).json(body)
}

return res.status(StatusCodes.CREATED).json(body)
// request already exists so retrieve it
const found = await this.requestService.findByCid(requestParams.cid)
if (!found) {
throw new Error(
`Request with cid ${requestParams.cid} was not created and not found. This should not happen`
)
}
logger.debug(`Found request for ${requestParams.cid} of stream ${requestParams.streamId}`)
return res.status(StatusCodes.ACCEPTED).json(found)
} catch (err: any) {
Metrics.count(METRIC_NAMES.REQUEST_NOT_CREATED, 1, { source: parseOrigin(req) })
return this.getBadRequestResponse(
Expand Down
74 changes: 36 additions & 38 deletions src/repositories/__tests__/request-repository.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,19 @@ describe('request repository test', () => {
await connection2.destroy()
})

test('createOrUpdate: can createOrUpdate simultaneously', async () => {
test('create: simultaneous creates will only create one request', async () => {
const request = generateRequest({
status: RequestStatus.READY,
})

const [result1, result2] = await Promise.all([
requestRepository.createOrUpdate(request),
requestRepository.createOrUpdate(request),
requestRepository.create(request),
requestRepository.create(request),
])
expect(result1).toEqual(result2)
expect(
(result1 === null && result2?.cid === request.cid) ||
(result2 === null && result1?.cid === request.cid)
).toBeTruthy()
})

describe('findRequestsToGarbageCollect', () => {
Expand Down Expand Up @@ -801,41 +804,41 @@ describe('request repository test', () => {
const streamId = randomStreamID()

// Create three COMPLETED requests. These should not be changed
const completedRequests = await Promise.all(
times(3).map(async (n) => {
const request = new Request({
const completedRequests = await requestRepository.createRequests(
times(3).map((n) => {
return new Request({
cid: randomCID().toString(),
streamId: streamId.toString(),
timestamp: ONE_HOUR_AGO.minus({ minute: n }).toJSDate(),
status: RequestStatus.COMPLETED,
origin: 'same-origin',
})
return requestRepository.createOrUpdate(request)
})
)

// PENDING but with a different streamId
const unrelatedStreamRequest = await requestRepository.createOrUpdate(
const [unrelatedStreamRequest] = await requestRepository.createRequests([
new Request({
cid: randomCID().toString(),
streamId: randomStreamID().toString(),
timestamp: ONE_HOUR_AGO.toJSDate(),
status: RequestStatus.PENDING,
origin: 'same-origin',
})
)
}),
])

// Create three PENDING requests at `oneHourAgo` plus some minutes
const requestsP = times(3).map(async (n) => {
const request = new Request({
cid: randomCID().toString(),
streamId: streamId.toString(),
timestamp: ONE_HOUR_AGO.plus({ minute: n }).toJSDate(),
status: RequestStatus.PENDING,
origin: 'same-origin',
const requestsP = await requestRepository.createRequests(
times(3).map((n) => {
return new Request({
cid: randomCID().toString(),
streamId: streamId.toString(),
timestamp: ONE_HOUR_AGO.plus({ minute: n }).toJSDate(),
status: RequestStatus.PENDING,
origin: 'same-origin',
})
})
return requestRepository.createOrUpdate(request)
})
)
const requests = await Promise.all(requestsP)
const last = requests[requests.length - 1]
expectPresent(last)
Expand Down Expand Up @@ -867,17 +870,18 @@ describe('request repository test', () => {

test('mark regardless of time', async () => {
const streamId = randomStreamID()
const requestsP = times(3).map(async (n) => {
const request = new Request({
cid: randomCID().toString(),
streamId: streamId.toString(),
timestamp: ONE_HOUR_AGO.plus({ minute: n }).toJSDate(),
status: RequestStatus.PENDING,
origin: 'same-origin',
const requestsP = requestRepository.createRequests(
times(3).map((n) => {
return new Request({
cid: randomCID().toString(),
streamId: streamId.toString(),
timestamp: ONE_HOUR_AGO.plus({ minute: n }).toJSDate(),
status: RequestStatus.PENDING,
origin: 'same-origin',
})
})
return requestRepository.createOrUpdate(request)
})
const requests: Array<Request> = await Promise.all(requestsP)
)
const requests: Array<Request> = await requestsP
expectPresent(requests[0])
const rowsAffected = await requestRepository.markReplaced(requests[0])
expect(rowsAffected).toEqual(requests.length - 1) // Mark every request except the last one
Expand All @@ -900,14 +904,8 @@ describe('request repository test', () => {
const MAX_LIMIT = 3

function createRequests(n: number): Promise<Request[]> {
return Promise.all(
times(n).map(() => {
return requestRepository.createOrUpdate(
generateRequest({
status: RequestStatus.READY,
})
)
})
return requestRepository.createRequests(
times(n).map(() => generateRequest({ status: RequestStatus.READY }))
)
}

Expand Down
22 changes: 14 additions & 8 deletions src/repositories/request-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,19 @@ export class RequestRepository {
}

/**
* Create/update client request
* @returns A promise that resolves to the created request
* Create a client request
* @returns A promise that resolves to the created request if it doesn't already exist
*/
async createOrUpdate(request: Request): Promise<Request> {
const keys = Object.keys(request).filter((key) => key !== 'id') // all keys except ID
async create(request: Request): Promise<Request | null> {
const [created] = await this.table
.insert(request.toDB(), ['id'])
.returning(Object.keys(RequestCodec.props))
.onConflict('cid')
.merge(keys)
.ignore()

if (!created) {
return null
}

logEvent.db({
type: 'request',
Expand All @@ -139,10 +142,13 @@ export class RequestRepository {
/**
* For test use. Creates an array of requests.
* @param requests array of requests
* @returns
* @returns array of the stored requests
*/
async createRequests(requests: Array<Request>): Promise<void> {
await this.table.insert(requests.map((request) => request.toDB()))
async createRequests(requests: Array<Request>): Promise<Request[]> {
const stored = await this.table
.insert(requests.map((request) => request.toDB()))
.returning(Object.keys(RequestCodec.props))
return stored.map((r) => new Request(r))
}

/**
Expand Down
5 changes: 4 additions & 1 deletion src/services/__tests__/fake-factory.util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ export class FakeFactory {
request.message = 'Request is pending.'
request.pinned = true

const stored = await this.requestRepository.createOrUpdate(request)
const stored = await this.requestRepository.create(request)
if (!stored) {
throw new Error(`Request with cid ${request.cid} already exists. Cannot create again`)
}
await this.requestRepository.markReplaced(stored)
return stored
}
Expand Down
10 changes: 8 additions & 2 deletions src/services/__tests__/ipfs-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,15 @@ describe('pubsub', () => {
}
)
const requestRepository = injector.resolve('requestRepository')
const createdRequest = await requestRepository.createOrUpdate(
const createdRequest = await requestRepository.create(
generateRequest({
streamId: pubsubMessage.stream.toString(),
status: RequestStatus.COMPLETED,
})
)
if (!createdRequest) {
throw new Error('Failed to create request because it already exists')
}
const anchorRepository = injector.resolve('anchorRepository')
const anchorCid = randomCID()
await anchorRepository.createAnchors([
Expand Down Expand Up @@ -324,7 +327,7 @@ describe('pubsub', () => {
// @ts-ignore
const beforeWindow = new Date(Date.now() - service.pubsubResponderWindowMs - 1000)
const requestRepository = injector.resolve('requestRepository')
const createdRequest = await requestRepository.createOrUpdate(
const createdRequest = await requestRepository.create(
generateRequest({
streamId: pubsubMessage.stream.toString(),
status: RequestStatus.COMPLETED,
Expand All @@ -333,6 +336,9 @@ describe('pubsub', () => {
updatedAt: beforeWindow,
})
)
if (!createdRequest) {
throw new Error('Failed to create request because it already exists')
}
const anchorRepository = injector.resolve('anchorRepository')
const anchorCid = randomCID()
await anchorRepository.createAnchors([
Expand Down
13 changes: 9 additions & 4 deletions src/services/request-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class RequestService {
private readonly metadataService: IMetadataService,
private readonly validationQueueService: IQueueProducerService<RequestQMessage>
) {
this.publishToQueue = Boolean(config.queue.sqsQueueUrl)
this.publishToQueue = Boolean(config.queue.sqsQueueUrl)
}

async getStatusForCid(cid: CID): Promise<OutputOf<typeof CASResponse> | { error: string }> {
Expand All @@ -68,10 +68,10 @@ export class RequestService {
return this.requestPresentationService.body(found)
}

async createOrUpdate(
async create(
params: RequestAnchorParams,
origin: string
): Promise<OutputOf<typeof CASResponse>> {
): Promise<OutputOf<typeof CASResponse> | null> {
let genesisFields: GenesisFields
if ('genesisFields' in params) {
genesisFields = params.genesisFields
Expand All @@ -92,7 +92,12 @@ export class RequestService {
request.pinned = true
request.timestamp = params.timestamp ?? new Date()

const storedRequest = await this.requestRepository.createOrUpdate(request)
const storedRequest = await this.requestRepository.create(request)

// request already exists
if (!storedRequest) {
return null
}

if (this.publishToQueue) {
// the validation worker will handle replacing requests
Expand Down

0 comments on commit 0b745f7

Please sign in to comment.