Skip to content

Commit

Permalink
feat: read replica db for findByCid
Browse files Browse the repository at this point in the history
  • Loading branch information
Samika Kashyap committed May 17, 2024
1 parent 9e952f0 commit 0c3fb85
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 7 deletions.
7 changes: 6 additions & 1 deletion config/env/dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,15 @@
"db": {
"connection": {
"database": "@@DB_NAME",
"replicaDatabase": "@@REPLICA_DB_NAME",
"host": "@@DB_HOST",
"replicaHost": "@@REPLICA_DB_HOST",
"user": "@@DB_USERNAME",
"replicaUser": "@@REPLICA_DB_USERNAME",
"password": "@@DB_PASSWORD",
"port": "@@DB_PORT"
"replicaPassword": "@@DB_PASSWORD",
"port": "@@DB_PORT",
"replicaPort": "@@REPLICA_DB_PORT"
}
},
"queue": {
Expand Down
6 changes: 5 additions & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ import {
} from './services/queue/sqs-queue-service.js'
import { makeMerkleCarService, type IMerkleCarService } from './services/merkle-car-service.js'
import { makeWitnessService, type IWitnessService } from './services/witness-service.js'
import { ReplicationRequestRepository } from './repositories/replication-request-repository.js'

type DependenciesContext = {
config: Config
dbConnection: Knex
replicaDbConnection: Knex
}

type ProvidedContext = {
anchorService: AnchorService
requestRepository: RequestRepository
replicationRequestRepository: ReplicationRequestRepository
anchorRepository: AnchorRepository
transactionRepository: TransactionRepository
blockchainService: BlockchainService
Expand Down Expand Up @@ -96,6 +99,7 @@ export class CeramicAnchorApp {
// register repositories
.provideClass('metadataRepository', MetadataRepository)
.provideFactory('requestRepository', RequestRepository.make)
.provideFactory('replicationRequestRepository', ReplicationRequestRepository.make)
.provideClass('anchorRepository', AnchorRepository)
.provideClass('transactionRepository', TransactionRepository)
// register services
Expand Down Expand Up @@ -130,7 +134,7 @@ export class CeramicAnchorApp {
Metrics.count('HELLO', 1)
logger.imp('Metrics exporter started')
if (this.config.metrics.instanceIdentifier) {
Metrics.setInstanceIdentifier(this.config.metrics.instanceIdentifier)
Metrics.setInstanceIdentifier(this.config.metrics.instanceIdentifier)
}
} catch (e: any) {
logger.imp('ERROR: Metrics exporter failed to start. Continuing anyway.')
Expand Down
29 changes: 29 additions & 0 deletions src/db-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,35 @@ export async function createDbConnection(dbConfig: Db = config.db): Promise<Knex
return connection
}

export async function createReplicaDbConnection(dbConfig: Db = config.db): Promise<Knex> {
const replicaKnexConfig: Knex.Config = {
client: dbConfig.client,
connection: dbConfig.replicaConnectionString,
debug: dbConfig.debug,
migrations: dbConfig.migrations,
pool: { min: 3, max: 30 },
// In our DB, identifiers have snake case formatting while in our code identifiers have camel case formatting.
// We use the following transformers so we can always use camel case formatting in our code.

// transforms identifier names in our queries from camel case to snake case. This is because the DB uses snake case identifiers.
wrapIdentifier: (value, origWrap): string => origWrap(snakeCase(value)),
// modifies returned rows from the DB. This transforms identifiers from snake case to camel case.
postProcessResponse: (result) => toCamelCase(result),
}
let connection
try {
connection = knex(replicaKnexConfig)
} catch (e) {
throw new Error(`Database connection failed: ${e}`)
}

await runMigrations(connection).catch((err) => {
throw new Error(`Migrations have failed: ${err}`)
})

return connection
}

/**
* USED FOR TESTING
* Clears all tables
Expand Down
5 changes: 4 additions & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ import 'reflect-metadata'
import { CeramicAnchorApp } from './app.js'
import { logger } from './logger/index.js'
import { config } from 'node-config-ts'
import { createDbConnection } from './db-connection.js'
import { createDbConnection, createReplicaDbConnection } from './db-connection.js'
import { createInjector } from 'typed-inject'

async function startApp() {
logger.imp('Connecting to database...')
const connection = await createDbConnection()
logger.imp(`Connected to database: ${config.db.client}`)
const replicaConnection = await createReplicaDbConnection()
logger.imp(`Connected to replica database: ${config.db.client}`)

const container = createInjector()
.provideValue('dbConnection', connection)
.provideValue('replicaDbConnection', replicaConnection)
.provideValue('config', config)

const app = new CeramicAnchorApp(container)
Expand Down
40 changes: 40 additions & 0 deletions src/repositories/replication-request-repository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Knex } from 'knex'
import { Request } from '../models/request'
import { CID } from 'multiformats/cid'
import { Config } from 'node-config-ts'

export class ReplicationRequestRepository {
static make = make
private readonly connection: Knex

constructor(connection: Knex) {
this.connection = connection
}

/**
* Finds a request with the given CID if exists using the replica database.
* @param cid CID the request is for
* @returns Promise for the associated request
*/
async findByCid(cid: CID | string): Promise<Request | undefined> {
const found = await this.connection
.table('requests')
.where({ cid: String(cid) })
.first()
if (found) {
return new Request(found)
}
return undefined
}

// Add more methods that utilize the replica connection here
}

/**
* Injection factory.
*/
function make(config: Config, replicaConnection: Knex) {
return new ReplicationRequestRepository(replicaConnection)
}

make.inject = ['config', 'replicaDbConnection'] as const
12 changes: 8 additions & 4 deletions src/services/request-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { IQueueProducerService } from './queue/queue-service.type.js'
import { RequestQMessage } from '../models/queue-message.js'
import type { OutputOf } from 'codeco'
import type { CASResponse } from '@ceramicnetwork/codecs'
import { ReplicationRequestRepository } from '../repositories/replication-request-repository.js'

const ISO8601_DATE_FORMAT = new Intl.DateTimeFormat('sv-SE', {
month: '2-digit',
Expand All @@ -32,6 +33,7 @@ export class RequestService {
static inject = [
'config',
'requestRepository',
'replicationRequestRepository',
'requestPresentationService',
'metadataService',
'validationQueueService',
Expand All @@ -40,6 +42,7 @@ export class RequestService {
constructor(
config: Config,
private readonly requestRepository: RequestRepository,
private readonly replicationRequestRepository: ReplicationRequestRepository,
private readonly requestPresentationService: RequestPresentationService,
private readonly metadataService: IMetadataService,
private readonly validationQueueService: IQueueProducerService<RequestQMessage>
Expand All @@ -63,7 +66,8 @@ export class RequestService {
}

async findByCid(cid: CID): Promise<OutputOf<typeof CASResponse> | undefined> {
const found = await this.requestRepository.findByCid(cid)
// TODO: updated the call only here for now, upadte for calls
const found = await this.replicationRequestRepository.findByCid(cid)
if (!found) return undefined
return this.requestPresentationService.body(found)
}
Expand Down Expand Up @@ -109,7 +113,7 @@ export class RequestService {
crt: storedRequest.createdAt,
org: origin,
})
Metrics.count(METRIC_NAMES.PUBLISH_TO_QUEUE, 1)
Metrics.count(METRIC_NAMES.PUBLISH_TO_QUEUE, 1)
} else {
await this.requestRepository.markReplaced(storedRequest)
Metrics.count(METRIC_NAMES.UPDATED_STORED_REQUEST, 1)
Expand All @@ -126,10 +130,10 @@ export class RequestService {
stream: request.streamId,
origin: request.origin,
cacao: 'cacaoDomain' in params ? params.cacaoDomain : '',
};
}

// DO NOT REMOVE - this logging is used by business metrics
logger.imp(`Anchor request received: ${JSON.stringify(logData)}`);
logger.imp(`Anchor request received: ${JSON.stringify(logData)}`)

return this.requestPresentationService.body(storedRequest)
}
Expand Down

0 comments on commit 0c3fb85

Please sign in to comment.