Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #38 Auto discover masters when using sentinel #31

Merged
merged 14 commits into from
Jun 4, 2024
24 changes: 24 additions & 0 deletions examples/connect.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { FalkorDB } from 'falkordb';

const db = await FalkorDB.connect({
// username: 'myUsername',
// password: 'myPassword',
socket: {
host: 'localhost',
port: 26379
}
})
db.on('error', console.error)

console.log('Connected to FalkorDB')

const graph = db.selectGraph('myGraph')
const result = await graph.query('MATCH (n) RETURN n')

console.log(result)

console.log(await db.list())
console.log(await db.info())
console.log(await db.connection.info())

db.close()
7 changes: 7 additions & 0 deletions src/commands/SENTINEL_MASTER.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export const IS_READ_ONLY = true;

export function transformArguments(dbname: string): Array<string> {
return ['SENTINEL', 'MASTER', dbname];
}

export declare function transformReply(): Array<string>;
7 changes: 7 additions & 0 deletions src/commands/SENTINEL_MASTERS.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export const IS_READ_ONLY = true;

export function transformArguments(): Array<string> {
return ['SENTINEL', 'MASTERS'];
}

export declare function transformReply(): Array<Array<string>>;
8 changes: 7 additions & 1 deletion src/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import * as SLOWLOG from './SLOWLOG';
import * as CONSTRAINT_CREATE from './CONSTRAINT_CREATE';
import * as CONSTRAINT_DROP from './CONSTRAINT_DROP';
import * as COPY from './COPY';
import * as SENTINEL_MASTER from './SENTINEL_MASTER';
import * as SENTINEL_MASTERS from './SENTINEL_MASTERS';

import { RedisCommandArgument, RedisCommandArguments } from '@redis/client/dist/lib/commands';

Expand Down Expand Up @@ -40,7 +42,11 @@ export default {
CONSTRAINT_DROP,
constraintDrop: CONSTRAINT_DROP,
COPY,
copy: COPY
copy: COPY,
SENTINEL_MASTER,
sentinelMaster: SENTINEL_MASTER,
SENTINEL_MASTERS,
sentinelMasters: SENTINEL_MASTERS
};

type QueryParam = null | string | number | boolean | QueryParams | Array<QueryParam>;
Expand Down
80 changes: 78 additions & 2 deletions src/falkordb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ import * as tls from 'tls';
import * as net from 'net';
import { EventEmitter } from 'events';

import { RedisClientOptions, RedisFunctions, RedisScripts, createClient } from 'redis';
import { RedisClientOptions, RedisDefaultModules, RedisFunctions, RedisScripts, createClient } from 'redis';

import Graph, { GraphConnection } from './graph';
import commands from './commands';
import { RedisClientType } from '@redis/client';

type NetSocketOptions = Partial<net.SocketConnectOpts> & {
tls?: false;
};

type TypedRedisClientOptions = RedisClientOptions<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>;
type TypedRedisClientType = RedisClientType<RedisDefaultModules & { falkordb: typeof commands }, RedisFunctions, RedisScripts>

interface TlsSocketOptions extends tls.ConnectionOptions {
tls: true;
}
Expand Down Expand Up @@ -91,17 +95,82 @@ export interface FalkorDBOptions {
clientInfoTag?: string;
}

function extractDetails(masters: Array<Array<string>>) {
const allDetails: Record<string, string>[] = [];
for (const master of masters) {
const details: Record<string, string> = {};
for (let i = 0; i < master.length; i += 2) {
details[master[i]] = master[i + 1];
}
allDetails.push(details);
}
return allDetails;
}

export default class FalkorDB extends EventEmitter {

#client: GraphConnection;
#sentinel?: GraphConnection;

private constructor(client: GraphConnection) {
super();
this.#client = client;
}

private async connectServer(client: TypedRedisClientType, redisOption: TypedRedisClientOptions) {

// If not connected to sentinel, throws an error on missing command
const masters = await client.falkordb.sentinelMasters();
const details = extractDetails(masters);

if (details.length > 1) {
throw new Error('Multiple masters are not supported');
}

// Connect to the server with the details from sentinel
const socketOptions: tls.ConnectionOptions = {
...redisOption.socket,
host: details[0]['ip'] as string,
port: parseInt(details[0]['port'])
};
const serverOptions: TypedRedisClientOptions = {
...redisOption,
socket: socketOptions
};
const realClient = createClient<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>(serverOptions)

// Set original client as sentinel and server client as client
this.#sentinel = client;
this.#client = realClient;

await realClient
.on('error', async err => {

console.debug('Error on server connection', err)

// Disconnect the client to avoid further errors and retries
realClient.disconnect();

// If error occurs on previous server connection, no need to reconnect
if (this.#client !== realClient) {
return;
}

try {
await this.connectServer(client, redisOption)
console.debug('Connected to server')
} catch (e) {
console.debug('Error on server reconnect', e)

// Forward errors if reconnection fails
this.emit('error', err)
}
})
.connect();
}
Comment on lines +120 to +170
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connectServer method is a significant addition and is central to the new functionality for connecting to Redis through Sentinel. Here are a few observations and suggestions:

  1. The method handles the scenario of multiple masters appropriately by throwing an error, which is good for avoiding unexpected behavior in environments that do not support multiple masters.
  2. The use of spread syntax for combining options (...redisOption.socket) is clean and effective.
  3. The error handling within the .on('error', ...) callback is robust, including reconnection attempts and error forwarding.

However, there are a couple of areas that could be improved:

  • The method assumes that the details array will always have at least one element (details[0]). It would be safer to add a check to ensure that details is not empty before accessing its elements.
  • The reconnection logic inside the error handler could potentially lead to unbounded recursion if connectServer keeps failing. Consider implementing a maximum retry limit or using a more sophisticated backoff strategy.
130a131,133
+        if (details.length === 0) {
+            throw new Error('No master details were found');
+        }
160a164,166
+        // Implementing a retry limit
+        if (retryCount >= MAX_RETRIES) throw new Error('Max retries reached');
+        retryCount++;

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
private async connectServer(client: TypedRedisClientType, redisOption: TypedRedisClientOptions) {
// If not connected to sentinel, throws an error on missing command
const masters = await client.falkordb.sentinelMasters();
const details = extractDetails(masters);
if (details.length > 1) {
throw new Error('Multiple masters are not supported');
}
// Connect to the server with the details from sentinel
const socketOptions: tls.ConnectionOptions = {
...redisOption.socket,
host: details[0]['ip'] as string,
port: parseInt(details[0]['port'])
};
const serverOptions: TypedRedisClientOptions = {
...redisOption,
socket: socketOptions
};
const realClient = createClient<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>(serverOptions)
// Set original client as sentinel and server client as client
this.#sentinel = client;
this.#client = realClient;
await realClient
.on('error', async err => {
console.debug('Error on server connection', err)
// Disconnect the client to avoid further errors and retries
realClient.disconnect();
// If error occurs on previous server connection, no need to reconnect
if (this.#client !== realClient) {
return;
}
try {
await this.connectServer(client, redisOption)
console.debug('Connected to server')
} catch (e) {
console.debug('Error on server reconnect', e)
// Forward errors if reconnection fails
this.emit('error', err)
}
})
.connect();
}
private async connectServer(client: TypedRedisClientType, redisOption: TypedRedisClientOptions, retryCount = 0) {
// If not connected to sentinel, throws an error on missing command
const masters = await client.falkordb.sentinelMasters();
const details = extractDetails(masters);
if (details.length > 1) {
throw new Error('Multiple masters are not supported');
}
if (details.length === 0) {
throw new Error('No master details were found');
}
// Connect to the server with the details from sentinel
const socketOptions: tls.ConnectionOptions = {
...redisOption.socket,
host: details[0]['ip'] as string,
port: parseInt(details[0]['port'])
};
const serverOptions: TypedRedisClientOptions = {
...redisOption,
socket: socketOptions
};
const realClient = createClient<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>(serverOptions)
// Set original client as sentinel and server client as client
this.#sentinel = client;
this.#client = realClient;
await realClient
.on('error', async err => {
console.debug('Error on server connection', err)
// Disconnect the client to avoid further errors and retries
realClient.disconnect();
// If error occurs on previous server connection, no need to reconnect
if (this.#client !== realClient) {
return;
}
// Implementing a retry limit
if (retryCount >= MAX_RETRIES) throw new Error('Max retries reached');
retryCount++;
try {
await this.connectServer(client, redisOption, retryCount)
console.debug('Connected to server')
} catch (e) {
console.debug('Error on server reconnect', e)
// Forward errors if reconnection fails
this.emit('error', err)
}
})
.connect();
}


static async connect(options?: FalkorDBOptions) {
const redisOption = (options ?? {}) as RedisClientOptions<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>;
const redisOption = (options ?? {}) as TypedRedisClientOptions;

// If the URL is provided, and the protocol is `falkor` replaces it with `redis` for the underline redis client
// e.g. falkor://localhost:6379 -> redis://localhost:6379
Expand All @@ -114,12 +183,19 @@ export default class FalkorDB extends EventEmitter {
}

const client = createClient<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>(redisOption)

const falkordb = new FalkorDB(client);

await client
.on('error', err => falkordb.emit('error', err)) // Forward errors
.connect();

try {
await falkordb.connectServer(client, redisOption)
} catch (e) {
console.debug('Error in connecting to sentinel, connecting to server directly');
}

return falkordb
}

Expand Down
Loading