Skip to content

Commit

Permalink
feat: make replica connection optional
Browse files Browse the repository at this point in the history
  • Loading branch information
Samika Kashyap authored and Samika Kashyap committed Jun 4, 2024
1 parent 0fd79ff commit c10f1c5
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 56 deletions.
116 changes: 72 additions & 44 deletions src/__tests__/ceramic_integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async function makeCAS(
container: Injector,
dbConnection: Knex,
minConfig: MinimalCASConfig,
replicaDbConnection: Knex
replicaDbConnection: { connection: Knex; type: string }
): Promise<CeramicAnchorApp> {
const configCopy = cloneDeep(config)
configCopy.mode = minConfig.mode
Expand All @@ -205,7 +205,10 @@ async function makeCAS(
mode: 'inmemory',
}
return new CeramicAnchorApp(
container.provideValue('config', configCopy).provideValue('dbConnection', dbConnection).provideValue('replicaDbConnection', replicaDbConnection)
container
.provideValue('config', configCopy)
.provideValue('dbConnection', dbConnection)
.provideValue('replicaDbConnection', replicaDbConnection)
)
}

Expand Down Expand Up @@ -257,8 +260,8 @@ describe('Ceramic Integration Test', () => {
let dbConnection1: Knex
let dbConnection2: Knex

let replicaDbConnection1: Knex
let replicaDbConnection2: Knex
let replicaDbConnection1: { connection: Knex; type: string }
let replicaDbConnection2: { connection: Knex; type: string }

let casPort1: number
let cas1: CeramicAnchorApp
Expand Down Expand Up @@ -334,25 +337,35 @@ describe('Ceramic Integration Test', () => {
replicaDbConnection1 = await createReplicaDbConnection()
casPort1 = await getPort()

cas1 = await makeCAS(createInjector(), dbConnection1, {
mode: 'server',
ipfsPort: ipfsApiPort1,
ganachePort: ganacheServer.port,
port: casPort1,
useSmartContractAnchors,
}, replicaDbConnection1)
cas1 = await makeCAS(
createInjector(),
dbConnection1,
{
mode: 'server',
ipfsPort: ipfsApiPort1,
ganachePort: ganacheServer.port,
port: casPort1,
useSmartContractAnchors,
},
replicaDbConnection1
)
await cas1.start()
anchorService1 = cas1.container.resolve('anchorService')
dbConnection2 = await teeDbConnection(dbConnection1)
replicaDbConnection2 = await createReplicaDbConnection()
const casPort2 = await getPort()
cas2 = await makeCAS(createInjector(), dbConnection2, {
mode: 'server',
ipfsPort: ipfsApiPort2,
ganachePort: ganacheServer.port,
port: casPort2,
useSmartContractAnchors,
}, replicaDbConnection2)
cas2 = await makeCAS(
createInjector(),
dbConnection2,
{
mode: 'server',
ipfsPort: ipfsApiPort2,
ganachePort: ganacheServer.port,
port: casPort2,
useSmartContractAnchors,
},
replicaDbConnection2
)
await cas2.start()
anchorService2 = cas2.container.resolve('anchorService')

Expand All @@ -376,7 +389,7 @@ describe('Ceramic Integration Test', () => {
await cas1.stop()
await cas2.stop()
await Promise.all([dbConnection1.destroy(), dbConnection2.destroy()])
await Promise.all([replicaDbConnection1.destroy(), replicaDbConnection2.destroy()])
await Promise.all([replicaDbConnection1.connection.destroy(), replicaDbConnection2.connection.destroy()])
await Promise.all([ceramic1.close(), ceramic2.close()])
})

Expand Down Expand Up @@ -543,13 +556,18 @@ describe('CAR file', () => {
const dbConnection = await createDbConnection()
const dummyReplicaDbConnection = await createReplicaDbConnection()
const casPort = await getPort()
const cas = await makeCAS(createInjector(), dbConnection, {
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
}, dummyReplicaDbConnection)
const cas = await makeCAS(
createInjector(),
dbConnection,
{
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
},
dummyReplicaDbConnection
)
await cas.start()

const ceramicIPFS = await createIPFS(await getPort())
Expand Down Expand Up @@ -620,30 +638,40 @@ describe('Metrics Options', () => {
const dbConnection = await createDbConnection()
const dummyReplicaDbConnection = await createReplicaDbConnection()
const casPort = await getPort()
const cas = await makeCAS(createInjector(), dbConnection, {
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
metrics: {
instanceIdentifier: '234fffffffffffffffffffffffffffffffff9726129',
const cas = await makeCAS(
createInjector(),
dbConnection,
{
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
metrics: {
instanceIdentifier: '234fffffffffffffffffffffffffffffffff9726129',
},
},
}, dummyReplicaDbConnection)
dummyReplicaDbConnection
)
await cas.start()
// Teardown
await cas.stop()

const cas2 = await makeCAS(createInjector(), dbConnection, {
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
metrics: {
instanceIdentifier: '',
const cas2 = await makeCAS(
createInjector(),
dbConnection,
{
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
metrics: {
instanceIdentifier: '',
},
},
}, dummyReplicaDbConnection)
dummyReplicaDbConnection
)
await cas2.start()
await cas2.stop()

Expand Down
2 changes: 1 addition & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import { makeWitnessService, type IWitnessService } from './services/witness-ser
type DependenciesContext = {
config: Config
dbConnection: Knex
replicaDbConnection: Knex
replicaDbConnection: { connection: Knex; type: string }
}

type ProvidedContext = {
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/__tests__/request-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class MockMetadataService implements IMetadataService {
// TODO: WS2-3238 Add calls to replica db connection in the test as well
describe('createRequest', () => {
let dbConnection: Knex
let replicaDbConnection: Knex
let replicaDbConnection: { connection: Knex; type: string }
let container: Injector<Tokens>
let controller: RequestController

Expand Down
18 changes: 14 additions & 4 deletions src/db-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export async function createDbConnection(dbConfig: Db = config.db): Promise<Knex

export async function createReplicaDbConnection(
replica_db_config: Replicadb = config.replica_db
): Promise<Knex> {
): Promise<{ connection: Knex; type: string }> {
const replicaKnexConfig: Knex.Config = {
client: replica_db_config.client,
connection: replica_db_config.connection.connectionString || {
Expand All @@ -93,12 +93,22 @@ export async function createReplicaDbConnection(
}
let connection
try {
// Validation that the config has all the required replica db fields else it throws
const { host, port, user, password, database } = replica_db_config.connection
if (!host || !port || !user || !password || !database) {
throw new Error(
'Missing required database connection parameters. Parameters: host, port, user, password, database'
)
}
connection = knex(replicaKnexConfig)
return { connection, type: 'replica' }
} catch (e) {
throw new Error(`Replica database connection failed: ${e}`)
logger.imp(
`Not connecting to replica db with config ${replica_db_config}, error: ${e}. Connecting to the main db for reads`
)
connection = await createDbConnection()
}

return connection
return { connection, type: 'main' }
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ async function startApp() {
logger.imp('Connecting to database...')
const connection = await createDbConnection()
logger.imp(`Connected to database: ${config.db.client}`)
const replicaConnection = await createReplicaDbConnection()
const { connection: replicaConnection, type: replicaDbType } = await createReplicaDbConnection()
logger.imp(`Connected to replica database: ${config.db.client}`)

const container = createInjector()
.provideValue('config', config)
.provideValue('dbConnection', connection)
.provideValue('replicaDbConnection', replicaConnection)
.provideValue('replicaDbConnection', { connection: replicaConnection, type: replicaDbType })

const app = new CeramicAnchorApp(container)
await app.start()
Expand Down
10 changes: 8 additions & 2 deletions src/repositories/replication-request-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const TABLE_NAME = 'request'
* Replication request repository.
*/
export interface IReplicationRequestRepository {
readonly connectionType: string
readonly table: Knex.QueryBuilder
/**
* Finds a request with the given CID if exists using the replica database.
Expand All @@ -20,11 +21,16 @@ export interface IReplicationRequestRepository {
export class ReplicationRequestRepository implements IReplicationRequestRepository {
static inject = ['replicaDbConnection'] as const

constructor(private readonly connection: Knex) {}
constructor(private readonly connection: { connection: Knex; type: string }) {}

get table(): Knex.QueryBuilder {
return this.connection(TABLE_NAME)
return this.connection.connection(TABLE_NAME)
}

get connectionType(): string {
return this.connection.type
}

/**
* Finds a request with the given CID if exists using the replica database.
* @param cid CID the request is for
Expand Down
8 changes: 6 additions & 2 deletions src/services/request-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ export class RequestService {
try {
request = await this.replicationRequestRepository.findByCid(cid)
} catch (e) {
logger.err(`Error fetching request from replica db for ${cid}, error: ${e}`)
logger.err(
`Error fetching request from db with connecion: ${this.replicationRequestRepository.connectionType} for ${cid}, error: ${e}`
)
}
if (!request) {
logger.debug(`Request not found in replica db for ${cid}, fetching from main_db`)
Expand Down Expand Up @@ -88,7 +90,9 @@ export class RequestService {
try {
found = await this.replicationRequestRepository.findByCid(cid)
} catch (e) {
logger.err(`Error fetching request from replica db for ${cid}, error: ${e}`)
logger.err(
`Error fetching request from db with connecion: ${this.replicationRequestRepository.connectionType} for ${cid}, error: ${e}`
)
}
if (!found) {
logger.debug(`Request not found in replica db for ${cid}, fetching from main_db`)
Expand Down

0 comments on commit c10f1c5

Please sign in to comment.