Skip to content

Commit

Permalink
feat: read replica db for findByCid (#1212)
Browse files Browse the repository at this point in the history
* feat: read replica db for findByCid

* fix: env var passing

* chore: repo static inject

* feat: read replica db for findByCid

* fix: env var passing

* feat: make findByCid calls go through read replica

* fix: failing tests

* fix: provide db before service, order matters

* fix: provide replica db connection

* fix: add a spy on findByCid replica

* fix: add .js to imports

* chore: add debug logs

* chore: add lockfile cuz CI needs it for some reason

* feat: add metrics when request is not found in replica db

* fix: move logs + metrics above the call

---------

Co-authored-by: Samika Kashyap <[email protected]>
  • Loading branch information
samika98 and Samika Kashyap authored May 31, 2024
1 parent c2121a0 commit e8b6558
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 36 deletions.
12 changes: 12 additions & 0 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@
"tableName": "knex_migrations"
}
},
"replica_db": {
"client": "postgresql",
"connection": {
"database": "@@REPLICA_DB_NAME",
"host": "@@REPLICA_DB_HOST",
"user": "@@REPLICA_DB_USERNAME",
"password": "@@REPLICA_DB_PASSWORD",
"port": "@@REPLICA_DB_PORT",
"connectionString": ""
},
"debug": false
},
"queue": {
"type": "sqs",
"awsRegion": "us-east-1",
Expand Down
9 changes: 9 additions & 0 deletions config/env/dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@
"port": "@@DB_PORT"
}
},
"replica_db": {
"connection": {
"database": "@@REPLICA_DB_NAME",
"host": "@@REPLICA_DB_HOST",
"user": "@@REPLICA_DB_USERNAME",
"password": "@@REPLICA_DB_PASSWORD",
"port": "@@REPLICA_DB_PORT"
}
},
"queue": {
"type": "sqs",
"awsRegion": "@@AWS_REGION",
Expand Down
9 changes: 9 additions & 0 deletions config/env/prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@
"port": "@@DB_PORT"
}
},
"replica_db": {
"connection": {
"database": "@@REPLICA_DB_NAME",
"host": "@@REPLICA_DB_HOST",
"user": "@@REPLICA_DB_USERNAME",
"password": "@@REPLICA_DB_PASSWORD",
"port": "@@REPLICA_DB_PORT"
}
},
"queue": {
"type": "sqs",
"awsRegion": "@@AWS_REGION",
Expand Down
10 changes: 10 additions & 0 deletions config/env/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@
"tableName": "knex_migrations"
}
},
"replica_db": {
"connection": {
"database": "@@REPLICA_DB_NAME",
"host": "@@REPLICA_DB_HOST",
"user": "@@REPLICA_DB_USERNAME",
"password": "@@REPLICA_DB_PASSWORD",
"port": "@@REPLICA_DB_PORT",
"connectionString": "@@DATABASE_URL"
}
},
"queue": {
"type": "sqs",
"awsRegion": "us-east-1",
Expand Down
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
"supertest": "^6.3.3",
"tmp-promise": "^3.0.3",
"ts-essentials": "^9.3.2",
"typescript": "^5.0.4"
"typescript": "^5.4.5"
},
"release-it": {
"git": {
Expand Down
27 changes: 19 additions & 8 deletions src/__tests__/ceramic_integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import type { GanacheServer } from './make-ganache.util.js'
import tmp from 'tmp-promise'
import getPort from 'get-port'
import type { Knex } from 'knex'
import { clearTables, createDbConnection } from '../db-connection.js'
import { clearTables, createDbConnection, createReplicaDbConnection } from '../db-connection.js'
import { CeramicAnchorApp } from '../app.js'
import { config } from 'node-config-ts'
import cloneDeep from 'lodash.clonedeep'
Expand Down Expand Up @@ -184,7 +184,8 @@ interface MinimalCASConfig {
async function makeCAS(
container: Injector,
dbConnection: Knex,
minConfig: MinimalCASConfig
minConfig: MinimalCASConfig,
replicaDbConnection: Knex
): Promise<CeramicAnchorApp> {
const configCopy = cloneDeep(config)
configCopy.mode = minConfig.mode
Expand All @@ -204,7 +205,7 @@ async function makeCAS(
mode: 'inmemory',
}
return new CeramicAnchorApp(
container.provideValue('config', configCopy).provideValue('dbConnection', dbConnection)
container.provideValue('config', configCopy).provideValue('dbConnection', dbConnection).provideValue('replicaDbConnection', replicaDbConnection)
)
}

Expand Down Expand Up @@ -256,6 +257,9 @@ describe('Ceramic Integration Test', () => {
let dbConnection1: Knex
let dbConnection2: Knex

let replicaDbConnection1: Knex
let replicaDbConnection2: Knex

let casPort1: number
let cas1: CeramicAnchorApp
let anchorService1: AnchorService
Expand Down Expand Up @@ -319,12 +323,15 @@ describe('Ceramic Integration Test', () => {
await anchorLauncher.stop()
})

// TODO_WS2-3238_1 : update tests to test with replica db connection as well
// TODO_WS2-3238_2 : make hermetic env have replica db connection
describe('Using anchor version 1', () => {
beforeAll(async () => {
const useSmartContractAnchors = true

// Start anchor services
dbConnection1 = await createDbConnection()
replicaDbConnection1 = await createReplicaDbConnection()
casPort1 = await getPort()

cas1 = await makeCAS(createInjector(), dbConnection1, {
Expand All @@ -333,18 +340,19 @@ describe('Ceramic Integration Test', () => {
ganachePort: ganacheServer.port,
port: casPort1,
useSmartContractAnchors,
})
}, replicaDbConnection1)
await cas1.start()
anchorService1 = cas1.container.resolve('anchorService')
dbConnection2 = await teeDbConnection(dbConnection1)
replicaDbConnection2 = await createReplicaDbConnection()
const casPort2 = await getPort()
cas2 = await makeCAS(createInjector(), dbConnection2, {
mode: 'server',
ipfsPort: ipfsApiPort2,
ganachePort: ganacheServer.port,
port: casPort2,
useSmartContractAnchors,
})
}, replicaDbConnection2)
await cas2.start()
anchorService2 = cas2.container.resolve('anchorService')

Expand All @@ -368,6 +376,7 @@ describe('Ceramic Integration Test', () => {
await cas1.stop()
await cas2.stop()
await Promise.all([dbConnection1.destroy(), dbConnection2.destroy()])
await Promise.all([replicaDbConnection1.destroy(), replicaDbConnection2.destroy()])
await Promise.all([ceramic1.close(), ceramic2.close()])
})

Expand Down Expand Up @@ -532,14 +541,15 @@ describe('CAR file', () => {
const casIPFS = await createIPFS(ipfsApiPort)
const ganacheServer = await makeGanache()
const dbConnection = await createDbConnection()
const dummyReplicaDbConnection = await createReplicaDbConnection()
const casPort = await getPort()
const cas = await makeCAS(createInjector(), dbConnection, {
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
})
}, dummyReplicaDbConnection)
await cas.start()

const ceramicIPFS = await createIPFS(await getPort())
Expand Down Expand Up @@ -608,6 +618,7 @@ describe('Metrics Options', () => {
const casIPFS = await createIPFS(ipfsApiPort)
const ganacheServer = await makeGanache()
const dbConnection = await createDbConnection()
const dummyReplicaDbConnection = await createReplicaDbConnection()
const casPort = await getPort()
const cas = await makeCAS(createInjector(), dbConnection, {
mode: 'server',
Expand All @@ -618,7 +629,7 @@ describe('Metrics Options', () => {
metrics: {
instanceIdentifier: '234fffffffffffffffffffffffffffffffff9726129',
},
})
}, dummyReplicaDbConnection)
await cas.start()
// Teardown
await cas.stop()
Expand All @@ -632,7 +643,7 @@ describe('Metrics Options', () => {
metrics: {
instanceIdentifier: '',
},
})
}, dummyReplicaDbConnection)
await cas2.start()
await cas2.stop()

Expand Down
8 changes: 6 additions & 2 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { BlockchainService } from './services/blockchain/blockchain-service.js'
import { HTTPEventProducerService } from './services/event-producer/http/http-event-producer-service.js'
import { AnchorRepository } from './repositories/anchor-repository.js'
import { RequestRepository } from './repositories/request-repository.js'
import { ReplicationRequestRepository } from './repositories/replication-request-repository.js'
import { TransactionRepository } from './repositories/transaction-repository.js'
import { HealthcheckController } from './controllers/healthcheck-controller.js'
import { AnchorController } from './controllers/anchor-controller.js'
Expand Down Expand Up @@ -48,11 +49,13 @@ import { makeWitnessService, type IWitnessService } from './services/witness-ser
type DependenciesContext = {
config: Config
dbConnection: Knex
replicaDbConnection: Knex
}

type ProvidedContext = {
anchorService: AnchorService
requestRepository: RequestRepository
replicationRequestRepository: ReplicationRequestRepository
anchorRepository: AnchorRepository
transactionRepository: TransactionRepository
blockchainService: BlockchainService
Expand Down Expand Up @@ -98,6 +101,7 @@ export class CeramicAnchorApp {
.provideFactory('requestRepository', RequestRepository.make)
.provideClass('anchorRepository', AnchorRepository)
.provideClass('transactionRepository', TransactionRepository)
.provideClass('replicationRequestRepository', ReplicationRequestRepository)
// register services
.provideFactory('blockchainService', EthereumBlockchainService.make)
.provideClass('eventProducerService', HTTPEventProducerService)
Expand All @@ -113,8 +117,8 @@ export class CeramicAnchorApp {
.provideClass('healthcheckService', HealthcheckService)
.provideClass('requestPresentationService', RequestPresentationService)
.provideClass('anchorRequestParamsParser', AnchorRequestParamsParser)
.provideClass('requestService', RequestService)
.provideClass('continualAnchoringScheduler', TaskSchedulerService)
.provideClass('requestService', RequestService)

try {
Metrics.start(
Expand All @@ -130,7 +134,7 @@ export class CeramicAnchorApp {
Metrics.count('HELLO', 1)
logger.imp('Metrics exporter started')
if (this.config.metrics.instanceIdentifier) {
Metrics.setInstanceIdentifier(this.config.metrics.instanceIdentifier)
Metrics.setInstanceIdentifier(this.config.metrics.instanceIdentifier)
}
} catch (e: any) {
logger.imp('ERROR: Metrics exporter failed to start. Continuing anyway.')
Expand Down
23 changes: 19 additions & 4 deletions src/controllers/__tests__/request-controller.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, jest, test, beforeAll, afterAll } from '@jest/globals'
import { createDbConnection, clearTables } from '../../db-connection.js'
import { createDbConnection, clearTables, createReplicaDbConnection } from '../../db-connection.js'
import { createInjector, Injector } from 'typed-inject'
import { config } from 'node-config-ts'
import { RequestController } from '../request-controller.js'
Expand Down Expand Up @@ -32,11 +32,13 @@ import { RequestService } from '../../services/request-service.js'
import { ValidationSqsQueueService } from '../../services/queue/sqs-queue-service.js'
import { makeWitnessService } from '../../services/witness-service.js'
import { makeMerkleCarService } from '../../services/merkle-car-service.js'
import { ReplicationRequestRepository } from '../../repositories/replication-request-repository.js'

type Tokens = {
requestController: RequestController
requestRepository: RequestRepository
metadataService: IMetadataService
replicationRequestRepository: ReplicationRequestRepository
}

const FAKE_STREAM_ID_1 = StreamID.fromString(
Expand Down Expand Up @@ -80,19 +82,24 @@ class MockMetadataService implements IMetadataService {

// TODO: CDB-2287 Add tests checking for expected errors when missing/malformed CID/StreamID/GenesisCommit
// are detected in a CAR file
// TODO: WS2-3238 Add calls to replica db connection in the test as well
describe('createRequest', () => {
let dbConnection: Knex
let replicaDbConnection: Knex
let container: Injector<Tokens>
let controller: RequestController

beforeAll(async () => {
dbConnection = await createDbConnection()
replicaDbConnection = await createReplicaDbConnection()
await clearTables(dbConnection)
container = createInjector()
.provideValue('config', config)
.provideValue('dbConnection', dbConnection)
.provideValue('replicaDbConnection', replicaDbConnection)
.provideClass('metadataRepository', MetadataRepository)
.provideFactory('requestRepository', RequestRepository.make)
.provideClass('replicationRequestRepository', ReplicationRequestRepository)
.provideClass('anchorRepository', AnchorRepository)
.provideClass('ipfsService', MockIpfsService)
.provideFactory('merkleCarService', makeMerkleCarService)
Expand Down Expand Up @@ -343,13 +350,19 @@ describe('createRequest', () => {

const requestRepository = container.resolve('requestRepository')
const findByCidSpy = jest.spyOn(requestRepository, 'findByCid')
const replicaRequestRepository = container.resolve('replicationRequestRepository')
const findByCidSpyReplica = jest.spyOn(replicaRequestRepository, 'findByCid')
const res0 = mockResponse()
const res1 = mockResponse()

await Promise.all([controller.createRequest(req, res0), controller.createRequest(req, res1)])

expect(findByCidSpy).toBeCalledTimes(1)
expect(findByCidSpy).toBeCalledWith(cid)
try {
expect(findByCidSpyReplica).toBeCalledTimes(1)
expect(findByCidSpyReplica).toBeCalledWith(cid)
} catch (err) {
expect(findByCidSpy).toBeCalledTimes(1)
expect(findByCidSpy).toBeCalledWith(cid)
}

const status0 = res0.status.mock.calls[0][0]
const status1 = res1.status.mock.calls[0][0]
Expand All @@ -370,8 +383,10 @@ describe('createRequest', () => {
queue: { sqsQueueUrl: 'testurl' },
})
.provideValue('dbConnection', dbConnection)
.provideValue('replicaDbConnection', replicaDbConnection)
.provideClass('metadataRepository', MetadataRepository)
.provideFactory('requestRepository', RequestRepository.make)
.provideClass('replicationRequestRepository', ReplicationRequestRepository)
.provideClass('anchorRepository', AnchorRepository)
.provideClass('ipfsService', MockIpfsService)
.provideFactory('merkleCarService', makeMerkleCarService)
Expand Down
Loading

0 comments on commit e8b6558

Please sign in to comment.