Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sjpotter committed Nov 5, 2024
1 parent 901f197 commit 5a2b91c
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 121 deletions.
107 changes: 14 additions & 93 deletions packages/client/lib/client/cache.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { EventEmitter } from 'stream';
import RedisClient, { RedisClientType } from '.';
import RedisClient from '.';
import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types';
import { BasicCommandParser } from './parser';

type CachingClient = RedisClient<any, any, any, any, any>;
type CachingClientType = RedisClientType<any, any, any, any, any>;
type CmdFunc = () => Promise<ReplyUnion>;

export interface ClientSideCacheConfig {
Expand All @@ -23,6 +22,17 @@ interface ClientSideCacheEntry {
validate(): boolean;
}

function generateCacheKey(redisArgs: ReadonlyArray<RedisArgument>): string {
const tmp = new Array(redisArgs.length*2);

for (let i = 0; i < redisArgs.length; i++) {
tmp[i] = redisArgs[i].length;
tmp[i+redisArgs.length] = redisArgs[i];
}

return tmp.join('_');
}

abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry {
#invalidated = false;
readonly #expireTime: number;
Expand Down Expand Up @@ -125,7 +135,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
) {
let reply: ReplyUnion;

const cacheKey = parser.cacheKey;
const cacheKey = generateCacheKey(parser.redisArgs);

// "2"
let cacheEntry = this.get(cacheKey);
Expand Down Expand Up @@ -339,10 +349,6 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
export abstract class PooledClientSideCacheProvider extends BasicClientSideCache {
#disabled = false;

abstract updateRedirect(id: number): void;
abstract addClient(client: CachingClientType): void;
abstract removeClient(client: CachingClientType): void;

disable() {
this.#disabled = true;
}
Expand All @@ -367,27 +373,13 @@ export abstract class PooledClientSideCacheProvider extends BasicClientSideCache
return super.has(cacheKey);
}

onPoolConnect(factory: () => CachingClientType) {};

onPoolClose() {
this.clear();
};
}

// doesn't do anything special in pooling, clears cache on every client disconnect
export class BasicPooledClientSideCache extends PooledClientSideCacheProvider {

override updateRedirect(id: number): void {
return;
}

override addClient(client: CachingClientType): void {
return;
}
override removeClient(client: CachingClientType): void {
return;
}

override onError() {
this.clear(false);
}
Expand Down Expand Up @@ -459,75 +451,4 @@ export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache
override onError() {}

override onClose() {}
}

// Only clears cache on "management"/"redirect" client disconnect
export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider {
#id?: number;
#clients: Set<CachingClientType> = new Set();
#redirectClient?: CachingClientType;

constructor(config: ClientSideCacheConfig) {
super(config);
this.disable();
}

override trackingOn(): string[] {
if (this.#id) {
return ['CLIENT', 'TRACKING', 'ON', 'REDIRECT', this.#id.toString()];
} else {
return [];
}
}

override updateRedirect(id: number) {
this.#id = id;
for (const client of this.#clients) {
client.sendCommand(this.trackingOn()).catch(() => {});
}
}

override addClient(client: CachingClientType) {
this.#clients.add(client);
}

override removeClient(client: CachingClientType) {
this.#clients.delete(client);
}

override onError(): void {};

override async onPoolConnect(factory: () => CachingClientType) {
const client = factory();
this.#redirectClient = client;

client.on("error", () => {
this.disable();
this.clear();
}).on("ready", async () => {
const clientId = await client.withTypeMapping({}).clientId();
this.updateRedirect(clientId);
this.enable();
})

try {
await client.connect();
} catch (err) {
throw err;
}
}

override onClose() {};

override onPoolClose() {
super.onPoolClose();

if (this.#redirectClient) {
this.#id = undefined;
const client = this.#redirectClient;
this.#redirectClient = undefined;

return client.close();
}
}
}
}
1 change: 1 addition & 0 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export default class RedisCommandsQueue {
onErrorReply: err => this.#onErrorReply(err),
onPush: push => {
if (!this.#onPush(push)) {
// currently only supporting "invalidate" over RESP3 push messages
switch (push[0].toString()) {
case "invalidate": {
if (this.#invalidateCallback) {
Expand Down
11 changes: 5 additions & 6 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ export default class RedisClient<
#watchEpoch?: number;

#clientSideCache?: ClientSideCacheProvider;
get clientSideCache() {
return this._self.#clientSideCache;
}

get options(): RedisClientOptions<M, F, S, RESP> | undefined {
return this._self.#options;
Expand All @@ -311,7 +314,6 @@ export default class RedisClient<
return this._self.#socket.socketEpoch;
}


get isWatching() {
return this._self.#watchEpoch !== undefined;
}
Expand Down Expand Up @@ -414,10 +416,7 @@ export default class RedisClient<
}

if (this.#clientSideCache) {
const tracking = this.#clientSideCache.trackingOn();
if (tracking) {
commands.push(tracking);
}
commands.push(this.#clientSideCache.trackingOn());
}

return commands;
Expand Down Expand Up @@ -855,7 +854,7 @@ export default class RedisClient<
}

const chainId = Symbol('Pipeline Chain'),
promise = Promise.allSettled(
promise = Promise.all(
commands.map(({ args }) => this._self.#queue.addCommand(args, {
chainId,
typeMapping: this._commandOptions?.typeMapping
Expand Down
10 changes: 8 additions & 2 deletions packages/client/lib/client/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@ export class BasicCommandParser implements CommandParser {
}

get cacheKey() {
let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_');
return cacheKey + '_' + this.#redisArgs.join('_');
const tmp = new Array(this.#redisArgs.length*2);

for (let i = 0; i < this.#redisArgs.length; i++) {
tmp[i] = this.#redisArgs[i].length;
tmp[i+this.#redisArgs.length] = this.#redisArgs[i];
}

return tmp.join('_');
}

push(...arg: Array<RedisArgument>) {
Expand Down
22 changes: 6 additions & 16 deletions packages/client/lib/client/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { TimeoutError } from '../errors';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { CommandOptions } from './commands-queue';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from './cache';
import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider } from './cache';
import { BasicCommandParser } from './parser';

export interface RedisPoolOptions {
Expand Down Expand Up @@ -215,6 +215,9 @@ export class RedisClientPool<
}

#clientSideCache?: PooledClientSideCacheProvider;
get clientSideCache() {
return this._self.#clientSideCache;
}

/**
* You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`},
Expand All @@ -241,8 +244,7 @@ export class RedisClientPool<
} else {
const cscConfig = options.clientSideCache;
this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig);
this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
this.#clientSideCache = clientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig);
// this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
}
}

Expand Down Expand Up @@ -312,13 +314,6 @@ export class RedisClientPool<
if (this._self.#isOpen) return; // TODO: throw error?
this._self.#isOpen = true;

try {
this._self.#clientSideCache?.onPoolConnect(this._self.#clientFactory);
} catch (err) {
this.destroy();
throw err;
}

const promises = [];
while (promises.length < this._self.#options.minimum) {
promises.push(this._self.#create());
Expand All @@ -334,18 +329,14 @@ export class RedisClientPool<
return this as unknown as RedisClientPoolType<M, F, S, RESP, TYPE_MAPPING>;
}

async #create(redirect?: boolean) {
async #create() {
const node = this._self.#clientsInUse.push(
this._self.#clientFactory()
.on('error', (err: Error) => this.emit('error', err))
);

try {
const client = node.value;
if (this._self.#clientSideCache) {
this._self.#clientSideCache.addClient(node.value);
}

await client.connect();
} catch (err) {
this._self.#clientsInUse.remove(node);
Expand Down Expand Up @@ -436,7 +427,6 @@ export class RedisClientPool<
for (let i = 0; i < toDestroy; i++) {
// TODO: shift vs pop
const client = this.#idleClients.shift()!
this.#clientSideCache?.removeClient(client);
client.destroy();
}
}
Expand Down
4 changes: 4 additions & 0 deletions packages/client/lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ export default class RedisCluster<
return this._self.#slots.slots;
}

get clientSideCache() {
return this._self.#slots.clientSideCache;
}

/**
* An array of the cluster masters.
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node.
Expand Down
11 changes: 7 additions & 4 deletions packages/client/lib/sentinel/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { RedisVariadicArgument } from '../commands/generic-transformers';
import { WaitQueue } from './wait-queue';
import { TcpNetConnectOpts } from 'node:net';
import { RedisTcpSocketOptions } from '../client/socket';
import { BasicPooledClientSideCache, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from '../client/cache';
import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache';

interface ClientInfo {
id: number;
Expand Down Expand Up @@ -266,6 +266,10 @@ export default class RedisSentinel<
#masterClientCount = 0;
#masterClientInfo?: ClientInfo;

get clientSideCache() {
return this._self.#internal.clientSideCache;
}

constructor(options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) {
super();

Expand Down Expand Up @@ -558,7 +562,7 @@ class RedisSentinelInternal<

readonly #name: string;
readonly #nodeClientOptions: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
readonly #sentinelClientOptions: RedisClientOptions<typeof RedisSentinelModule, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
readonly #sentinelClientOptions: RedisClientOptions<typeof RedisSentinelModule, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisTcpSocketOptions>;
readonly #scanInterval: number;
readonly #passthroughClientErrorEvents: boolean;

Expand Down Expand Up @@ -619,8 +623,7 @@ class RedisSentinelInternal<
} else {
const cscConfig = options.clientSideCache;
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig);
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig);
// this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
}
}

Expand Down

0 comments on commit 5a2b91c

Please sign in to comment.