diff --git a/config/env/dev.json b/config/env/dev.json index bde134af..862fa10b 100644 --- a/config/env/dev.json +++ b/config/env/dev.json @@ -69,10 +69,15 @@ "db": { "connection": { "database": "@@DB_NAME", + "replicaDatabase": "@@REPLICA_DB_NAME", "host": "@@DB_HOST", + "replicaHost": "@@REPLICA_DB_HOST", "user": "@@DB_USERNAME", + "replicaUser": "@@REPLICA_DB_USERNAME", "password": "@@DB_PASSWORD", - "port": "@@DB_PORT" + "replicaPassword": "@@DB_PASSWORD", + "port": "@@DB_PORT", + "replicaPort": "@@REPLICA_DB_PORT" } }, "queue": { diff --git a/src/app.ts b/src/app.ts index e4d8856f..1c8473b8 100644 --- a/src/app.ts +++ b/src/app.ts @@ -44,15 +44,18 @@ import { } from './services/queue/sqs-queue-service.js' import { makeMerkleCarService, type IMerkleCarService } from './services/merkle-car-service.js' import { makeWitnessService, type IWitnessService } from './services/witness-service.js' +import { ReplicationRequestRepository } from './repositories/replication-request-repository.js' type DependenciesContext = { config: Config dbConnection: Knex + replicaDbConnection: Knex } type ProvidedContext = { anchorService: AnchorService requestRepository: RequestRepository + replicationRequestRepository: ReplicationRequestRepository anchorRepository: AnchorRepository transactionRepository: TransactionRepository blockchainService: BlockchainService @@ -96,6 +99,7 @@ export class CeramicAnchorApp { // register repositories .provideClass('metadataRepository', MetadataRepository) .provideFactory('requestRepository', RequestRepository.make) + .provideFactory('replicationRequestRepository', ReplicationRequestRepository.make) .provideClass('anchorRepository', AnchorRepository) .provideClass('transactionRepository', TransactionRepository) // register services @@ -130,7 +134,7 @@ export class CeramicAnchorApp { Metrics.count('HELLO', 1) logger.imp('Metrics exporter started') if (this.config.metrics.instanceIdentifier) { - Metrics.setInstanceIdentifier(this.config.metrics.instanceIdentifier) + Metrics.setInstanceIdentifier(this.config.metrics.instanceIdentifier) } } catch (e: any) { logger.imp('ERROR: Metrics exporter failed to start. Continuing anyway.') diff --git a/src/db-connection.ts b/src/db-connection.ts index e8c6a2fa..fa1df44f 100644 --- a/src/db-connection.ts +++ b/src/db-connection.ts @@ -69,6 +69,35 @@ export async function createDbConnection(dbConfig: Db = config.db): Promise { + const replicaKnexConfig: Knex.Config = { + client: dbConfig.client, + connection: dbConfig.replicaConnectionString, + debug: dbConfig.debug, + migrations: dbConfig.migrations, + pool: { min: 3, max: 30 }, + // In our DB, identifiers have snake case formatting while in our code identifiers have camel case formatting. + // We use the following transformers so we can always use camel case formatting in our code. + + // transforms identifier names in our queries from camel case to snake case. This is because the DB uses snake case identifiers. + wrapIdentifier: (value, origWrap): string => origWrap(snakeCase(value)), + // modifies returned rows from the DB. This transforms identifiers from snake case to camel case. + postProcessResponse: (result) => toCamelCase(result), + } + let connection + try { + connection = knex(replicaKnexConfig) + } catch (e) { + throw new Error(`Database connection failed: ${e}`) + } + + await runMigrations(connection).catch((err) => { + throw new Error(`Migrations have failed: ${err}`) + }) + + return connection +} + /** * USED FOR TESTING * Clears all tables diff --git a/src/main.ts b/src/main.ts index 46f6b41e..130630fb 100644 --- a/src/main.ts +++ b/src/main.ts @@ -3,16 +3,19 @@ import 'reflect-metadata' import { CeramicAnchorApp } from './app.js' import { logger } from './logger/index.js' import { config } from 'node-config-ts' -import { createDbConnection } from './db-connection.js' +import { createDbConnection, createReplicaDbConnection } from './db-connection.js' import { createInjector } from 'typed-inject' async function startApp() { logger.imp('Connecting to database...') const connection = await createDbConnection() logger.imp(`Connected to database: ${config.db.client}`) + const replicaConnection = await createReplicaDbConnection() + logger.imp(`Connected to replica database: ${config.db.client}`) const container = createInjector() .provideValue('dbConnection', connection) + .provideValue('replicaDbConnection', replicaConnection) .provideValue('config', config) const app = new CeramicAnchorApp(container) diff --git a/src/repositories/replication-request-repository.ts b/src/repositories/replication-request-repository.ts new file mode 100644 index 00000000..17c3ba2f --- /dev/null +++ b/src/repositories/replication-request-repository.ts @@ -0,0 +1,40 @@ +import { Knex } from 'knex' +import { Request } from '../models/request' +import { CID } from 'multiformats/cid' +import { Config } from 'node-config-ts' + +export class ReplicationRequestRepository { + static make = make + private readonly connection: Knex + + constructor(connection: Knex) { + this.connection = connection + } + + /** + * Finds a request with the given CID if exists using the replica database. + * @param cid CID the request is for + * @returns Promise for the associated request + */ + async findByCid(cid: CID | string): Promise { + const found = await this.connection + .table('requests') + .where({ cid: String(cid) }) + .first() + if (found) { + return new Request(found) + } + return undefined + } + + // Add more methods that utilize the replica connection here +} + +/** + * Injection factory. + */ +function make(config: Config, replicaConnection: Knex) { + return new ReplicationRequestRepository(replicaConnection) +} + +make.inject = ['config', 'replicaDbConnection'] as const diff --git a/src/services/request-service.ts b/src/services/request-service.ts index a60f6a7b..d3c55757 100644 --- a/src/services/request-service.ts +++ b/src/services/request-service.ts @@ -13,6 +13,7 @@ import { IQueueProducerService } from './queue/queue-service.type.js' import { RequestQMessage } from '../models/queue-message.js' import type { OutputOf } from 'codeco' import type { CASResponse } from '@ceramicnetwork/codecs' +import { ReplicationRequestRepository } from '../repositories/replication-request-repository.js' const ISO8601_DATE_FORMAT = new Intl.DateTimeFormat('sv-SE', { month: '2-digit', @@ -32,6 +33,7 @@ export class RequestService { static inject = [ 'config', 'requestRepository', + 'replicationRequestRepository', 'requestPresentationService', 'metadataService', 'validationQueueService', @@ -40,6 +42,7 @@ export class RequestService { constructor( config: Config, private readonly requestRepository: RequestRepository, + private readonly replicationRequestRepository: ReplicationRequestRepository, private readonly requestPresentationService: RequestPresentationService, private readonly metadataService: IMetadataService, private readonly validationQueueService: IQueueProducerService @@ -63,7 +66,8 @@ export class RequestService { } async findByCid(cid: CID): Promise | undefined> { - const found = await this.requestRepository.findByCid(cid) + // TODO: updated the call only here for now, upadte for calls + const found = await this.replicationRequestRepository.findByCid(cid) if (!found) return undefined return this.requestPresentationService.body(found) } @@ -109,7 +113,7 @@ export class RequestService { crt: storedRequest.createdAt, org: origin, }) - Metrics.count(METRIC_NAMES.PUBLISH_TO_QUEUE, 1) + Metrics.count(METRIC_NAMES.PUBLISH_TO_QUEUE, 1) } else { await this.requestRepository.markReplaced(storedRequest) Metrics.count(METRIC_NAMES.UPDATED_STORED_REQUEST, 1) @@ -126,10 +130,10 @@ export class RequestService { stream: request.streamId, origin: request.origin, cacao: 'cacaoDomain' in params ? params.cacaoDomain : '', - }; + } // DO NOT REMOVE - this logging is used by business metrics - logger.imp(`Anchor request received: ${JSON.stringify(logData)}`); + logger.imp(`Anchor request received: ${JSON.stringify(logData)}`) return this.requestPresentationService.body(storedRequest) }