From 5a2b91ca904cdc49383370f9b25c90ccd41b0381 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 5 Nov 2024 11:48:50 +0200 Subject: [PATCH] address review comments --- packages/client/lib/client/cache.ts | 107 +++---------------- packages/client/lib/client/commands-queue.ts | 1 + packages/client/lib/client/index.ts | 11 +- packages/client/lib/client/parser.ts | 10 +- packages/client/lib/client/pool.ts | 22 ++-- packages/client/lib/cluster/index.ts | 4 + packages/client/lib/sentinel/index.ts | 11 +- 7 files changed, 45 insertions(+), 121 deletions(-) diff --git a/packages/client/lib/client/cache.ts b/packages/client/lib/client/cache.ts index 3a8fd951b4..03eee829b6 100644 --- a/packages/client/lib/client/cache.ts +++ b/packages/client/lib/client/cache.ts @@ -1,10 +1,9 @@ import { EventEmitter } from 'stream'; -import RedisClient, { RedisClientType } from '.'; +import RedisClient from '.'; import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types'; import { BasicCommandParser } from './parser'; type CachingClient = RedisClient; -type CachingClientType = RedisClientType; type CmdFunc = () => Promise; export interface ClientSideCacheConfig { @@ -23,6 +22,17 @@ interface ClientSideCacheEntry { validate(): boolean; } +function generateCacheKey(redisArgs: ReadonlyArray): string { + const tmp = new Array(redisArgs.length*2); + + for (let i = 0; i < redisArgs.length; i++) { + tmp[i] = redisArgs[i].length; + tmp[i+redisArgs.length] = redisArgs[i]; + } + + return tmp.join('_'); +} + abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry { #invalidated = false; readonly #expireTime: number; @@ -125,7 +135,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider { ) { let reply: ReplyUnion; - const cacheKey = parser.cacheKey; + const cacheKey = generateCacheKey(parser.redisArgs); // "2" let cacheEntry = this.get(cacheKey); @@ -339,10 +349,6 @@ export class BasicClientSideCache extends ClientSideCacheProvider { export abstract class PooledClientSideCacheProvider extends BasicClientSideCache { #disabled = false; - abstract updateRedirect(id: number): void; - abstract addClient(client: CachingClientType): void; - abstract removeClient(client: CachingClientType): void; - disable() { this.#disabled = true; } @@ -367,8 +373,6 @@ export abstract class PooledClientSideCacheProvider extends BasicClientSideCache return super.has(cacheKey); } - onPoolConnect(factory: () => CachingClientType) {}; - onPoolClose() { this.clear(); }; @@ -376,18 +380,6 @@ export abstract class PooledClientSideCacheProvider extends BasicClientSideCache // doesn't do anything special in pooling, clears cache on every client disconnect export class BasicPooledClientSideCache extends PooledClientSideCacheProvider { - - override updateRedirect(id: number): void { - return; - } - - override addClient(client: CachingClientType): void { - return; - } - override removeClient(client: CachingClientType): void { - return; - } - override onError() { this.clear(false); } @@ -459,75 +451,4 @@ export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache override onError() {} override onClose() {} -} - -// Only clears cache on "management"/"redirect" client disconnect -export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider { - #id?: number; - #clients: Set = new Set(); - #redirectClient?: CachingClientType; - - constructor(config: ClientSideCacheConfig) { - super(config); - this.disable(); - } - - override trackingOn(): string[] { - if (this.#id) { - return ['CLIENT', 'TRACKING', 'ON', 'REDIRECT', this.#id.toString()]; - } else { - return []; - } - } - - override updateRedirect(id: number) { - this.#id = id; - for (const client of this.#clients) { - client.sendCommand(this.trackingOn()).catch(() => {}); - } - } - - override addClient(client: CachingClientType) { - this.#clients.add(client); - } - - override removeClient(client: CachingClientType) { - this.#clients.delete(client); - } - - override onError(): void {}; - - override async onPoolConnect(factory: () => CachingClientType) { - const client = factory(); - this.#redirectClient = client; - - client.on("error", () => { - this.disable(); - this.clear(); - }).on("ready", async () => { - const clientId = await client.withTypeMapping({}).clientId(); - this.updateRedirect(clientId); - this.enable(); - }) - - try { - await client.connect(); - } catch (err) { - throw err; - } - } - - override onClose() {}; - - override onPoolClose() { - super.onPoolClose(); - - if (this.#redirectClient) { - this.#id = undefined; - const client = this.#redirectClient; - this.#redirectClient = undefined; - - return client.close(); - } - } -} +} \ No newline at end of file diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 0199aa92c1..0af8113506 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -111,6 +111,7 @@ export default class RedisCommandsQueue { onErrorReply: err => this.#onErrorReply(err), onPush: push => { if (!this.#onPush(push)) { + // currently only supporting "invalidate" over RESP3 push messages switch (push[0].toString()) { case "invalidate": { if (this.#invalidateCallback) { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index c256f3a1a3..437f9f2e43 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -290,6 +290,9 @@ export default class RedisClient< #watchEpoch?: number; #clientSideCache?: ClientSideCacheProvider; + get clientSideCache() { + return this._self.#clientSideCache; + } get options(): RedisClientOptions | undefined { return this._self.#options; @@ -311,7 +314,6 @@ export default class RedisClient< return this._self.#socket.socketEpoch; } - get isWatching() { return this._self.#watchEpoch !== undefined; } @@ -414,10 +416,7 @@ export default class RedisClient< } if (this.#clientSideCache) { - const tracking = this.#clientSideCache.trackingOn(); - if (tracking) { - commands.push(tracking); - } + commands.push(this.#clientSideCache.trackingOn()); } return commands; @@ -855,7 +854,7 @@ export default class RedisClient< } const chainId = Symbol('Pipeline Chain'), - promise = Promise.allSettled( + promise = Promise.all( commands.map(({ args }) => this._self.#queue.addCommand(args, { chainId, typeMapping: this._commandOptions?.typeMapping diff --git a/packages/client/lib/client/parser.ts b/packages/client/lib/client/parser.ts index 76251ea67a..3e82023042 100644 --- a/packages/client/lib/client/parser.ts +++ b/packages/client/lib/client/parser.ts @@ -34,8 +34,14 @@ export class BasicCommandParser implements CommandParser { } get cacheKey() { - let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_'); - return cacheKey + '_' + this.#redisArgs.join('_'); + const tmp = new Array(this.#redisArgs.length*2); + + for (let i = 0; i < this.#redisArgs.length; i++) { + tmp[i] = this.#redisArgs[i].length; + tmp[i+this.#redisArgs.length] = this.#redisArgs[i]; + } + + return tmp.join('_'); } push(...arg: Array) { diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index 400268f4bb..07553775ec 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -7,7 +7,7 @@ import { TimeoutError } from '../errors'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { CommandOptions } from './commands-queue'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; -import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from './cache'; +import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider } from './cache'; import { BasicCommandParser } from './parser'; export interface RedisPoolOptions { @@ -215,6 +215,9 @@ export class RedisClientPool< } #clientSideCache?: PooledClientSideCacheProvider; + get clientSideCache() { + return this._self.#clientSideCache; + } /** * You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`}, @@ -241,8 +244,7 @@ export class RedisClientPool< } else { const cscConfig = options.clientSideCache; this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); - this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); - this.#clientSideCache = clientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig); +// this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); } } @@ -312,13 +314,6 @@ export class RedisClientPool< if (this._self.#isOpen) return; // TODO: throw error? this._self.#isOpen = true; - try { - this._self.#clientSideCache?.onPoolConnect(this._self.#clientFactory); - } catch (err) { - this.destroy(); - throw err; - } - const promises = []; while (promises.length < this._self.#options.minimum) { promises.push(this._self.#create()); @@ -334,7 +329,7 @@ export class RedisClientPool< return this as unknown as RedisClientPoolType; } - async #create(redirect?: boolean) { + async #create() { const node = this._self.#clientsInUse.push( this._self.#clientFactory() .on('error', (err: Error) => this.emit('error', err)) @@ -342,10 +337,6 @@ export class RedisClientPool< try { const client = node.value; - if (this._self.#clientSideCache) { - this._self.#clientSideCache.addClient(node.value); - } - await client.connect(); } catch (err) { this._self.#clientsInUse.remove(node); @@ -436,7 +427,6 @@ export class RedisClientPool< for (let i = 0; i < toDestroy; i++) { // TODO: shift vs pop const client = this.#idleClients.shift()! - this.#clientSideCache?.removeClient(client); client.destroy(); } } diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index cfe6719398..fdb13df35f 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -270,6 +270,10 @@ export default class RedisCluster< return this._self.#slots.slots; } + get clientSideCache() { + return this._self.#slots.clientSideCache; + } + /** * An array of the cluster masters. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node. diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index a2037533c9..519c2828b5 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -16,7 +16,7 @@ import { RedisVariadicArgument } from '../commands/generic-transformers'; import { WaitQueue } from './wait-queue'; import { TcpNetConnectOpts } from 'node:net'; import { RedisTcpSocketOptions } from '../client/socket'; -import { BasicPooledClientSideCache, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from '../client/cache'; +import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; interface ClientInfo { id: number; @@ -266,6 +266,10 @@ export default class RedisSentinel< #masterClientCount = 0; #masterClientInfo?: ClientInfo; + get clientSideCache() { + return this._self.#internal.clientSideCache; + } + constructor(options: RedisSentinelOptions) { super(); @@ -558,7 +562,7 @@ class RedisSentinelInternal< readonly #name: string; readonly #nodeClientOptions: RedisClientOptions; - readonly #sentinelClientOptions: RedisClientOptions; + readonly #sentinelClientOptions: RedisClientOptions; readonly #scanInterval: number; readonly #passthroughClientErrorEvents: boolean; @@ -619,8 +623,7 @@ class RedisSentinelInternal< } else { const cscConfig = options.clientSideCache; this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); - this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); - this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig); +// this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); } }