Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/default to maindb if read replica errs #1221

Merged
merged 4 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 72 additions & 44 deletions src/__tests__/ceramic_integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async function makeCAS(
container: Injector,
dbConnection: Knex,
minConfig: MinimalCASConfig,
replicaDbConnection: Knex
replicaDbConnection: { connection: Knex; type: string }
): Promise<CeramicAnchorApp> {
const configCopy = cloneDeep(config)
configCopy.mode = minConfig.mode
Expand All @@ -205,7 +205,10 @@ async function makeCAS(
mode: 'inmemory',
}
return new CeramicAnchorApp(
container.provideValue('config', configCopy).provideValue('dbConnection', dbConnection).provideValue('replicaDbConnection', replicaDbConnection)
container
.provideValue('config', configCopy)
.provideValue('dbConnection', dbConnection)
.provideValue('replicaDbConnection', replicaDbConnection)
)
}

Expand Down Expand Up @@ -257,8 +260,8 @@ describe('Ceramic Integration Test', () => {
let dbConnection1: Knex
let dbConnection2: Knex

let replicaDbConnection1: Knex
let replicaDbConnection2: Knex
let replicaDbConnection1: { connection: Knex; type: string }
let replicaDbConnection2: { connection: Knex; type: string }

let casPort1: number
let cas1: CeramicAnchorApp
Expand Down Expand Up @@ -334,25 +337,35 @@ describe('Ceramic Integration Test', () => {
replicaDbConnection1 = await createReplicaDbConnection()
casPort1 = await getPort()

cas1 = await makeCAS(createInjector(), dbConnection1, {
mode: 'server',
ipfsPort: ipfsApiPort1,
ganachePort: ganacheServer.port,
port: casPort1,
useSmartContractAnchors,
}, replicaDbConnection1)
cas1 = await makeCAS(
createInjector(),
dbConnection1,
{
mode: 'server',
ipfsPort: ipfsApiPort1,
ganachePort: ganacheServer.port,
port: casPort1,
useSmartContractAnchors,
},
replicaDbConnection1
)
await cas1.start()
anchorService1 = cas1.container.resolve('anchorService')
dbConnection2 = await teeDbConnection(dbConnection1)
replicaDbConnection2 = await createReplicaDbConnection()
const casPort2 = await getPort()
cas2 = await makeCAS(createInjector(), dbConnection2, {
mode: 'server',
ipfsPort: ipfsApiPort2,
ganachePort: ganacheServer.port,
port: casPort2,
useSmartContractAnchors,
}, replicaDbConnection2)
cas2 = await makeCAS(
createInjector(),
dbConnection2,
{
mode: 'server',
ipfsPort: ipfsApiPort2,
ganachePort: ganacheServer.port,
port: casPort2,
useSmartContractAnchors,
},
replicaDbConnection2
)
await cas2.start()
anchorService2 = cas2.container.resolve('anchorService')

Expand All @@ -376,7 +389,7 @@ describe('Ceramic Integration Test', () => {
await cas1.stop()
await cas2.stop()
await Promise.all([dbConnection1.destroy(), dbConnection2.destroy()])
await Promise.all([replicaDbConnection1.destroy(), replicaDbConnection2.destroy()])
await Promise.all([replicaDbConnection1.connection.destroy(), replicaDbConnection2.connection.destroy()])
await Promise.all([ceramic1.close(), ceramic2.close()])
})

Expand Down Expand Up @@ -543,13 +556,18 @@ describe('CAR file', () => {
const dbConnection = await createDbConnection()
const dummyReplicaDbConnection = await createReplicaDbConnection()
const casPort = await getPort()
const cas = await makeCAS(createInjector(), dbConnection, {
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
}, dummyReplicaDbConnection)
const cas = await makeCAS(
createInjector(),
dbConnection,
{
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
},
dummyReplicaDbConnection
)
await cas.start()

const ceramicIPFS = await createIPFS(await getPort())
Expand Down Expand Up @@ -620,30 +638,40 @@ describe('Metrics Options', () => {
const dbConnection = await createDbConnection()
const dummyReplicaDbConnection = await createReplicaDbConnection()
const casPort = await getPort()
const cas = await makeCAS(createInjector(), dbConnection, {
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
metrics: {
instanceIdentifier: '234fffffffffffffffffffffffffffffffff9726129',
const cas = await makeCAS(
createInjector(),
dbConnection,
{
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
metrics: {
instanceIdentifier: '234fffffffffffffffffffffffffffffffff9726129',
},
},
}, dummyReplicaDbConnection)
dummyReplicaDbConnection
)
await cas.start()
// Teardown
await cas.stop()

const cas2 = await makeCAS(createInjector(), dbConnection, {
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
metrics: {
instanceIdentifier: '',
const cas2 = await makeCAS(
createInjector(),
dbConnection,
{
mode: 'server',
ipfsPort: ipfsApiPort,
ganachePort: ganacheServer.port,
port: casPort,
useSmartContractAnchors: true,
metrics: {
instanceIdentifier: '',
},
},
}, dummyReplicaDbConnection)
dummyReplicaDbConnection
)
await cas2.start()
await cas2.stop()

Expand Down
2 changes: 1 addition & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import { makeWitnessService, type IWitnessService } from './services/witness-ser
type DependenciesContext = {
config: Config
dbConnection: Knex
replicaDbConnection: Knex
replicaDbConnection: { connection: Knex; type: string }
}

type ProvidedContext = {
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/__tests__/request-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class MockMetadataService implements IMetadataService {
// TODO: WS2-3238 Add calls to replica db connection in the test as well
describe('createRequest', () => {
let dbConnection: Knex
let replicaDbConnection: Knex
let replicaDbConnection: { connection: Knex; type: string }
let container: Injector<Tokens>
let controller: RequestController

Expand Down
18 changes: 14 additions & 4 deletions src/db-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export async function createDbConnection(dbConfig: Db = config.db): Promise<Knex

export async function createReplicaDbConnection(
replica_db_config: Replicadb = config.replica_db
): Promise<Knex> {
): Promise<{ connection: Knex; type: string }> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dav1do this change makes the readReplica optional and by introducing a connectionType we have captured the two modes it can work in.
The type is also useful to avoid misleading logs in the connection failure mode giving devs more insight into what is happening in the background

const replicaKnexConfig: Knex.Config = {
client: replica_db_config.client,
connection: replica_db_config.connection.connectionString || {
Expand All @@ -93,12 +93,22 @@ export async function createReplicaDbConnection(
}
let connection
try {
// Validation that the config has all the required replica db fields else it throws
const { host, port, user, password, database } = replica_db_config.connection
if (!host || !port || !user || !password || !database) {
throw new Error(
'Missing required database connection parameters. Parameters: host, port, user, password, database'
)
}
connection = knex(replicaKnexConfig)
return { connection, type: 'replica' }
} catch (e) {
throw new Error(`Replica database connection failed: ${e}`)
logger.imp(
`Not connecting to replica db with config ${replica_db_config}, error: ${e}. Connecting to the main db for reads`
)
connection = await createDbConnection()
}

return connection
return { connection, type: 'main' }
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ async function startApp() {
logger.imp('Connecting to database...')
const connection = await createDbConnection()
logger.imp(`Connected to database: ${config.db.client}`)
const replicaConnection = await createReplicaDbConnection()
const { connection: replicaConnection, type: replicaDbType } = await createReplicaDbConnection()
logger.imp(`Connected to replica database: ${config.db.client}`)

const container = createInjector()
.provideValue('config', config)
.provideValue('dbConnection', connection)
.provideValue('replicaDbConnection', replicaConnection)
.provideValue('replicaDbConnection', { connection: replicaConnection, type: replicaDbType })

const app = new CeramicAnchorApp(container)
await app.start()
Expand Down
10 changes: 8 additions & 2 deletions src/repositories/replication-request-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const TABLE_NAME = 'request'
* Replication request repository.
*/
export interface IReplicationRequestRepository {
readonly connectionType: string
readonly table: Knex.QueryBuilder
/**
* Finds a request with the given CID if exists using the replica database.
Expand All @@ -20,11 +21,16 @@ export interface IReplicationRequestRepository {
export class ReplicationRequestRepository implements IReplicationRequestRepository {
static inject = ['replicaDbConnection'] as const

constructor(private readonly connection: Knex) {}
constructor(private readonly connection: { connection: Knex; type: string }) {}

get table(): Knex.QueryBuilder {
return this.connection(TABLE_NAME)
return this.connection.connection(TABLE_NAME)
}

get connectionType(): string {
return this.connection.type
}

/**
* Finds a request with the given CID if exists using the replica database.
* @param cid CID the request is for
Expand Down
20 changes: 17 additions & 3 deletions src/services/request-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,14 @@ export class RequestService {
* @returns The request.
*/
async getStatusForCid(cid: CID): Promise<OutputOf<typeof CASResponse> | { error: string }> {
let request = await this.replicationRequestRepository.findByCid(cid)
let request
try {
request = await this.replicationRequestRepository.findByCid(cid)
} catch (e) {
logger.err(
`Error fetching request from db with connecion: ${this.replicationRequestRepository.connectionType} for ${cid}, error: ${e}`
)
}
if (!request) {
logger.debug(`Request not found in replica db for ${cid}, fetching from main_db`)
Metrics.count(METRIC_NAMES.REPLICA_DB_REQUEST_NOT_FOUND, 1)
Expand All @@ -79,11 +86,18 @@ export class RequestService {
* @returns The request.
*/
async findByCid(cid: CID): Promise<OutputOf<typeof CASResponse> | undefined> {
let found = await this.replicationRequestRepository.findByCid(cid)
let found
try {
found = await this.replicationRequestRepository.findByCid(cid)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference would be to expect the replica to work if it exists, but default the config to main if there is none defined. I think we should expect it to work if it's defined, but it should be optional.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of implementation, I'd expect we either set up a RO connection pool, or we just use the main pool and the app still calls replicationRequestRepository for all reads because it doesn't require the writer, without needing to know anything about the actual DB used.

This also avoids doing double reads for things that don't actually exist and potentially getting hung trying to talk to a non-exists RO instance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it all around a better design to make the read replication optional. FYI, currently all our reads do not got to the replica. If the configuration for the read replica is not specified or if any of the config params are missing, we default to the maindb. Will add a commit for this and tag you, let me know if this might fix it or there might be something else I missed

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the changes you made, do we still need this fallback? if it fails now, it seems like it's failing for a reason so there's not sure we need to try another database.

Copy link
Contributor Author

@samika98 samika98 Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, in the case of the read replica not having the request. If sync b/w the db's is slow

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, what I was trying to say is that if the RR doesn't have the request, it's unlikely the main DB does so there's no reason to ask. If for some reason we're far enough behind on replicating (i.e. minutes), we probably don't want to load the main database more and should investigate. Otherwise, we can tolerate the client retrying again in a few seconds and the RR will respond with data.

} catch (e) {
logger.err(
`Error fetching request from db with connecion: ${this.replicationRequestRepository.connectionType} for ${cid}, error: ${e}`
)
}
if (!found) {
found = await this.requestRepository.findByCid(cid)
logger.debug(`Request not found in replica db for ${cid}, fetching from main_db`)
Metrics.count(METRIC_NAMES.REPLICA_DB_REQUEST_NOT_FOUND, 1)
found = await this.requestRepository.findByCid(cid)
if (!found) {
throw new RequestDoesNotExistError(cid)
}
Expand Down