From d40d07d9e222a94c3197208ca45bdafec2ffd40f Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 10:32:56 +0100 Subject: [PATCH 1/7] Try to unify queue implementations. --- package.json | 2 + src/pool-service/CommandReader.ts | 122 ++++++++++++++++++++++++++ src/pool-service/IrcConnectionPool.ts | 1 + src/pool-service/IrcPoolClient.ts | 5 ++ yarn.lock | 12 +++ 5 files changed, 142 insertions(+) create mode 100644 src/pool-service/CommandReader.ts diff --git a/package.json b/package.json index abc1f0a6d..f596b08fe 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ }, "dependencies": { "@sentry/node": "^6.17.9", + "@types/semver": "^7.5.0", "ajv": "^8.12.0", "bluebird": "^3.7.2", "classnames": "^2.3.2", @@ -54,6 +55,7 @@ "react": "^18.2.0", "react-dom": "^18.2.0", "sanitize-html": "^2.7.2", + "semver": "^7.5.4", "typed-emitter": "^2.1.0", "typescript": "^5.0.4", "url-join": "^5.0.0", diff --git a/src/pool-service/CommandReader.ts b/src/pool-service/CommandReader.ts new file mode 100644 index 000000000..849d5bbd3 --- /dev/null +++ b/src/pool-service/CommandReader.ts @@ -0,0 +1,122 @@ +import { Redis } from "ioredis"; +import { READ_BUFFER_MAGIC_BYTES } from "./types"; +import semver from "semver"; +import { Logger } from "matrix-appservice-bridge"; + +const TRIM_EVERY_MS = 30000; +const COMMAND_BLOCK_TIMEOUT = 10000; +const TRIM_MAXLEN_COUNT = 100_000; + +const log = new Logger('RedisCommandReader'); + +export class RedisCommandReader { + + private shouldRun = true; + private commandStreamId = "$" + private supportsMinId = false; + private trimInterval?: NodeJS.Timer; + + constructor( + private readonly redis: Redis, + private readonly streamName: string, + private readonly onCommand: (cmdType: CommandType, cmdPayload: PayloadType) => Promise) { + + } + + private updateLastRead(lastRead: string) { + this.commandStreamId = lastRead; + } + + public stop() { + this.shouldRun = false; + clearInterval(this.trimInterval); + } + + public async readQueue() { + const newCmds = await this.redis.xread( + "BLOCK", COMMAND_BLOCK_TIMEOUT, "STREAMS", this.streamName, this.commandStreamId + ).catch(ex => { + log.warn(`Failed to read new command:`, ex); + return null; + }); + if (newCmds === null) { + // This means we've waited for some time and seen no new commands, to be safe revert to the HEAD of the queue. + log.info(`Stream has been idle for ${COMMAND_BLOCK_TIMEOUT}ms, listening for messages at $`); + this.commandStreamId = '$'; + return; + } + // This is a list of keys, containing a list of commands, hence needing to deeply extract the values. + for (const [msgId, [cmdType, payload]] of newCmds[0][1]) { + // If we crash, we don't want to get stuck on this msg. + this.updateLastRead(msgId); + const commandType = cmdType as CommandType; + let commandData: PayloadType|Buffer; + if (typeof payload === 'string' && payload[0] === '{') { + commandData = JSON.parse(payload) as PayloadType; + } + else { + commandData = Buffer.from(payload).subarray(READ_BUFFER_MAGIC_BYTES.length); + } + setImmediate( + () => this.onCommand(commandType, commandData) + .catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex) + ), + ); + } + + } + + public async getSupported() { + const serverLines = (await + this.redis.info("Server")).split('\n').filter(v => !v.startsWith('#')).map(v => v.split(':', 2) + ) as [string, string][]; + const options = new Map(serverLines); + const version = options.get('redis_version'); + this.supportsMinId = !!(version && semver.satisfies(version, '>=6.2')); + } + + private async trimCommandStream() { + if (this.commandStreamId === '$') { + // At the head of the queue, don't trim. + return; + } + try { + let trimCount; + if (this.supportsMinId) { + trimCount = await this.redis.xtrim( + this.streamName, "MINID", this.commandStreamId + ); + } + else { + // If Redis doesn't support minid (requires >=6.2), we can fallback to + // trimming a large amount of messages instead. + trimCount = await this.redis.xtrim( + this.streamName, "MAXLEN", TRIM_MAXLEN_COUNT + ); + } + log.debug(`Trimmed ${trimCount} commands from the OUT stream`); + } + catch (ex) { + log.warn(`Failed to trim commands from the OUT stream`, ex); + } + } + + public async start() { + await this.getSupported(); + this.trimInterval = setInterval(this.trimCommandStream.bind(this), TRIM_EVERY_MS); + log.info(`Listening for new commands`); + let loopCommandCheck: () => void; + // eslint-disable-next-line prefer-const + loopCommandCheck = () => { + if (!this.shouldRun) { + log.info(`Finished`); + return; + } + this.readQueue().finally(() => { + return loopCommandCheck(); + }); + } + + loopCommandCheck(); + } +} diff --git a/src/pool-service/IrcConnectionPool.ts b/src/pool-service/IrcConnectionPool.ts index 124fa7e57..09b5f300b 100644 --- a/src/pool-service/IrcConnectionPool.ts +++ b/src/pool-service/IrcConnectionPool.ts @@ -337,6 +337,7 @@ export class IrcConnectionPool { return; } try { + log.debug(`Trimming up to ${this.commandStreamId}`); const trimCount = await this.cmdWriter.xtrim( REDIS_IRC_POOL_COMMAND_IN_STREAM, "MINID", this.commandStreamId ); diff --git a/src/pool-service/IrcPoolClient.ts b/src/pool-service/IrcPoolClient.ts index 96a440bce..a0013cbe0 100644 --- a/src/pool-service/IrcPoolClient.ts +++ b/src/pool-service/IrcPoolClient.ts @@ -14,6 +14,7 @@ import { ClientId, ConnectionCreateArgs, HEARTBEAT_EVERY_MS, import { Logger } from 'matrix-appservice-bridge'; import { EventEmitter } from "stream"; import TypedEmitter from "typed-emitter"; +import { RedisCommandReader } from "./CommandReader"; const log = new Logger('IrcPoolClient'); @@ -32,6 +33,7 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm private commandStreamId = "$"; private missedHeartbeats = 0; private heartbeatInterval?: NodeJS.Timer; + private commandReader: RedisCommandReader; cmdReader: Redis; constructor(url: string) { @@ -45,6 +47,9 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm this.cmdReader = new Redis(url, { lazyConnect: true, }); + this.commandReader = new RedisCommandReader( + this.cmdReader, REDIS_IRC_POOL_COMMAND_OUT_STREAM, this.handleCommand + ); } public updateLastRead(msgId: string) { diff --git a/yarn.lock b/yarn.lock index 97a029689..62291c1e0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1138,6 +1138,11 @@ resolved "https://registry.yarnpkg.com/@types/semver/-/semver-7.3.13.tgz#da4bfd73f49bd541d28920ab0e2bf0ee80f71c91" integrity sha512-21cFJr9z3g5dW8B0CVI9g2O9beqaThGQ6ZFBqHfwhzLDKUxaqTIy3vnfah/UPkfOiF2pLq+tGz+W8RyCskuslw== +"@types/semver@^7.5.0": + version "7.5.0" + resolved "https://registry.yarnpkg.com/@types/semver/-/semver-7.5.0.tgz#591c1ce3a702c45ee15f47a42ade72c2fd78978a" + integrity sha512-G8hZ6XJiHnuhQKR7ZmysCeJWE08o8T0AXtk5darsCaTVsYZhhgUrq53jizaR2FvsoeCwJhlmwTjkXBY5Pn/ZHw== + "@types/serve-static@*": version "1.15.1" resolved "https://registry.yarnpkg.com/@types/serve-static/-/serve-static-1.15.1.tgz#86b1753f0be4f9a1bee68d459fcda5be4ea52b5d" @@ -5477,6 +5482,13 @@ semver@^6.0.0, semver@^6.3.0: resolved "https://registry.yarnpkg.com/semver/-/semver-6.3.0.tgz#ee0a64c8af5e8ceea67687b133761e1becbd1d3d" integrity sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw== +semver@^7.5.4: + version "7.5.4" + resolved "https://registry.yarnpkg.com/semver/-/semver-7.5.4.tgz#483986ec4ed38e1c6c48c34894a9182dbff68a6e" + integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== + dependencies: + lru-cache "^6.0.0" + send@0.18.0: version "0.18.0" resolved "https://registry.yarnpkg.com/send/-/send-0.18.0.tgz#670167cc654b05f5aa4a767f9113bb371bc706be" From 961469c7071383bfc47e9d2e7171aafaa802c2fd Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 10:43:59 +0100 Subject: [PATCH 2/7] Switch pool client and connection pool to command reader --- src/pool-service/CommandReader.ts | 21 ++----- src/pool-service/IrcConnectionPool.ts | 42 ++++--------- src/pool-service/IrcPoolClient.ts | 91 +++++---------------------- 3 files changed, 36 insertions(+), 118 deletions(-) diff --git a/src/pool-service/CommandReader.ts b/src/pool-service/CommandReader.ts index 849d5bbd3..5a1a1f9d6 100644 --- a/src/pool-service/CommandReader.ts +++ b/src/pool-service/CommandReader.ts @@ -1,5 +1,4 @@ import { Redis } from "ioredis"; -import { READ_BUFFER_MAGIC_BYTES } from "./types"; import semver from "semver"; import { Logger } from "matrix-appservice-bridge"; @@ -9,7 +8,7 @@ const TRIM_MAXLEN_COUNT = 100_000; const log = new Logger('RedisCommandReader'); -export class RedisCommandReader { +export class RedisCommandReader { private shouldRun = true; private commandStreamId = "$" @@ -19,7 +18,7 @@ export class RedisCommandReader { constructor( private readonly redis: Redis, private readonly streamName: string, - private readonly onCommand: (cmdType: CommandType, cmdPayload: PayloadType) => Promise) { + private readonly onCommand: (cmdType: string, cmdPayload: string) => Promise) { } @@ -49,17 +48,9 @@ export class RedisCommandReader { for (const [msgId, [cmdType, payload]] of newCmds[0][1]) { // If we crash, we don't want to get stuck on this msg. this.updateLastRead(msgId); - const commandType = cmdType as CommandType; - let commandData: PayloadType|Buffer; - if (typeof payload === 'string' && payload[0] === '{') { - commandData = JSON.parse(payload) as PayloadType; - } - else { - commandData = Buffer.from(payload).subarray(READ_BUFFER_MAGIC_BYTES.length); - } setImmediate( - () => this.onCommand(commandType, commandData) - .catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex) + () => this.onCommand(cmdType, payload) + .catch(ex => log.warn(`Failed to handle msg ${msgId} (${cmdType}, ${payload})`, ex) ), ); } @@ -94,10 +85,10 @@ export class RedisCommandReader { this.streamName, "MAXLEN", TRIM_MAXLEN_COUNT ); } - log.debug(`Trimmed ${trimCount} commands from the OUT stream`); + log.debug(`Trimmed ${trimCount} commands from the stream`); } catch (ex) { - log.warn(`Failed to trim commands from the OUT stream`, ex); + log.warn(`Failed to trim commands from the stream`, ex); } } diff --git a/src/pool-service/IrcConnectionPool.ts b/src/pool-service/IrcConnectionPool.ts index 09b5f300b..b063612d8 100644 --- a/src/pool-service/IrcConnectionPool.ts +++ b/src/pool-service/IrcConnectionPool.ts @@ -20,6 +20,7 @@ import { OutCommandType, import { parseMessage } from 'matrix-org-irc'; import { collectDefaultMetrics, register, Gauge } from 'prom-client'; import { createServer, Server } from 'http'; +import { RedisCommandReader } from './CommandReader'; collectDefaultMetrics(); @@ -52,6 +53,7 @@ export class IrcConnectionPool { private metricsServer?: Server; private shouldRun = true; private heartbeatTimer?: NodeJS.Timer; + private readonly commandReader: RedisCommandReader; constructor(private readonly config: typeof Config) { this.shouldRun = false; @@ -60,6 +62,9 @@ export class IrcConnectionPool { this.cmdWriter.on('connecting', () => { log.debug('Connecting to', config.redisUri); }); + this.commandReader = new RedisCommandReader( + this.cmdReader, REDIS_IRC_POOL_COMMAND_IN_STREAM, this.handleStreamCommand.bind(this) + ); } private updateLastRead(lastRead: string) { @@ -307,6 +312,12 @@ export class IrcConnectionPool { } } + private async handleStreamCommand(cmdType: string, payload: string) { + const commandType = cmdType as InCommandType; + const commandData = JSON.parse(payload) as IrcConnectionPoolCommandIn; + return this.handleCommand(commandType, commandData); + } + public async handleInternalPing({ info }: IrcConnectionPoolCommandIn) { const { clientId } = info; const conn = this.connections.get(clientId); @@ -411,38 +422,11 @@ export class IrcConnectionPool { void this.trimCommandStream(); }, HEARTBEAT_EVERY_MS); - - log.info(`Listening for new commands`); - setImmediate(async () => { - while (this.shouldRun) { - const newCmds = await this.cmdReader.xread( - "BLOCK", 0, "STREAMS", REDIS_IRC_POOL_COMMAND_IN_STREAM, this.commandStreamId - ).catch(ex => { - log.warn(`Failed to read new command:`, ex); - return null; - }); - if (newCmds === null) { - // Unexpected, this is blocking. - continue; - } - // This is a list of keys, containing a list of commands, hence needing to deeply extract the values. - for (const [msgId, [cmdType, payload]] of newCmds[0][1]) { - const commandType = cmdType as InCommandType; - - // If we crash, we don't want to get stuck on this msg. - await this.updateLastRead(msgId); - const commandData = JSON.parse(payload) as IrcConnectionPoolCommandIn; - setImmediate( - () => this.handleCommand(commandType, commandData) - .catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex) - ), - ); - } - } - }); + return this.commandReader.start(); } public async close() { + this.commandReader.stop(); if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer) } diff --git a/src/pool-service/IrcPoolClient.ts b/src/pool-service/IrcPoolClient.ts index a0013cbe0..789aea572 100644 --- a/src/pool-service/IrcPoolClient.ts +++ b/src/pool-service/IrcPoolClient.ts @@ -8,8 +8,7 @@ import { ClientId, ConnectionCreateArgs, HEARTBEAT_EVERY_MS, PROTOCOL_VERSION, READ_BUFFER_MAGIC_BYTES, REDIS_IRC_POOL_COMMAND_IN_STREAM, REDIS_IRC_POOL_COMMAND_OUT_STREAM, - REDIS_IRC_POOL_CONNECTIONS, REDIS_IRC_POOL_HEARTBEAT_KEY, REDIS_IRC_POOL_VERSION_KEY, - REDIS_IRC_POOL_COMMAND_OUT_STREAM_LAST_READ } from "./types"; + REDIS_IRC_POOL_CONNECTIONS, REDIS_IRC_POOL_HEARTBEAT_KEY, REDIS_IRC_POOL_VERSION_KEY } from "./types"; import { Logger } from 'matrix-appservice-bridge'; import { EventEmitter } from "stream"; @@ -20,7 +19,6 @@ const log = new Logger('IrcPoolClient'); const CONNECTION_TIMEOUT = 40000; const MAX_MISSED_HEARTBEATS = 5; -const COMMAND_BLOCK_TIMEOUT = HEARTBEAT_EVERY_MS * 2; type Events = { lostConnection: () => void, @@ -30,10 +28,9 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm private readonly redis: Redis; private readonly connections = new Map>(); public shouldRun = true; - private commandStreamId = "$"; private missedHeartbeats = 0; private heartbeatInterval?: NodeJS.Timer; - private commandReader: RedisCommandReader; + private commandReader: RedisCommandReader; cmdReader: Redis; constructor(url: string) { @@ -48,17 +45,10 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm lazyConnect: true, }); this.commandReader = new RedisCommandReader( - this.cmdReader, REDIS_IRC_POOL_COMMAND_OUT_STREAM, this.handleCommand + this.cmdReader, REDIS_IRC_POOL_COMMAND_OUT_STREAM, this.handleStreamCommand.bind(this) ); } - public updateLastRead(msgId: string) { - this.commandStreamId = msgId; - this.redis.set(REDIS_IRC_POOL_COMMAND_OUT_STREAM_LAST_READ, msgId).catch((ex) => { - log.error(`Failed to update last-read to ${msgId}`, ex); - }) - } - public async sendCommand(type: T, payload: InCommandPayload[T]) { await this.redis.xadd(REDIS_IRC_POOL_COMMAND_IN_STREAM, "*", type, JSON.stringify({ origin_ts: Date.now(), @@ -146,6 +136,18 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm } } + private async handleStreamCommand(cmdType: string, payload: string) { + const commandType = cmdType as OutCommandType|ClientId; + let commandData: IrcConnectionPoolCommandOut|Buffer; + if (typeof payload === 'string' && payload[0] === '{') { + commandData = JSON.parse(payload) as IrcConnectionPoolCommandOut; + } + else { + commandData = Buffer.from(payload).subarray(READ_BUFFER_MAGIC_BYTES.length); + } + return this.handleCommand(commandType, commandData); + } + private async handleCommand( commandTypeOrClientId: T|ClientId, commandData: IrcConnectionPoolCommandOut|Buffer) { // I apologise about this insanity. @@ -197,6 +199,7 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm } public async close() { + this.commandReader.stop(); if (!this.shouldRun) { // Already killed, just exit. log.warn("close called, but pool client is not running"); @@ -213,57 +216,10 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm this.shouldRun = false; } - public async handleIncomingCommand() { - const newCmds = await this.cmdReader.xread( - "BLOCK", COMMAND_BLOCK_TIMEOUT, "STREAMS", REDIS_IRC_POOL_COMMAND_OUT_STREAM, this.commandStreamId - ).catch(ex => { - log.warn(`Failed to read new command:`, ex); - return null; - }); - if (newCmds === null) { - // Implies we might be listening for stale messages. - this.commandStreamId = '$'; - return; - } - for (const [msgId, [cmdType, payload]] of newCmds[0][1]) { - const commandType = cmdType as OutCommandType|ClientId; - let commandData: IrcConnectionPoolCommandOut|Buffer; - if (typeof payload === 'string' && payload[0] === '{') { - commandData = JSON.parse(payload) as IrcConnectionPoolCommandOut; - } - else { - commandData = Buffer.from(payload).subarray(READ_BUFFER_MAGIC_BYTES.length); - } - setImmediate( - () => this.handleCommand(commandType, commandData) - .catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex) - ), - ); - this.updateLastRead(msgId); - } - } - - private async trimCommandStream() { - if (this.commandStreamId === '$') { - // At the head of the queue, don't trim. - return; - } - try { - const trimCount = await this.redis.xtrim( - REDIS_IRC_POOL_COMMAND_OUT_STREAM, "MINID", this.commandStreamId - ); - log.debug(`Trimmed ${trimCount} commands from the OUT stream`); - } - catch (ex) { - log.warn(`Failed to trim commands from the OUT stream`, ex); - } - } - private async checkHeartbeat() { const lastHeartbeat = parseInt(await this.redis.get(REDIS_IRC_POOL_HEARTBEAT_KEY) ?? '0'); if (lastHeartbeat + HEARTBEAT_EVERY_MS + 1000 > Date.now()) { this.missedHeartbeats = 0; - void this.trimCommandStream(); return; } @@ -279,7 +235,6 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm public async listen() { log.info(`Listening for new commands`); - let loopCommandCheck: () => void; await this.cmdReader.connect(); await this.redis.connect(); @@ -301,18 +256,6 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm } this.heartbeatInterval = setInterval(this.checkHeartbeat.bind(this), HEARTBEAT_EVERY_MS); - - // eslint-disable-next-line prefer-const - loopCommandCheck = () => { - if (!this.shouldRun) { - log.info(`Finished`); - return; - } - this.handleIncomingCommand().finally(() => { - return loopCommandCheck(); - }); - } - - loopCommandCheck(); + await this.commandReader.start(); } } From 12ff0710dcdb131b5baf3b072d1bd9a8b860c38c Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 10:48:44 +0100 Subject: [PATCH 3/7] Document version requirement. --- docs/connection_pooling.md | 4 +++- src/pool-service/CommandReader.ts | 10 +++++++++- src/pool-service/IrcConnectionPool.ts | 4 ---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/docs/connection_pooling.md b/docs/connection_pooling.md index 8145798c0..b1b4e7b11 100644 --- a/docs/connection_pooling.md +++ b/docs/connection_pooling.md @@ -9,7 +9,9 @@ The IRC bridge can be configured to run it's IRC connections through a seperate allowing you to restart and update (in most cases) the main process while keeping connections alive. This in effect allows you to have a bridge that *appears* to not restart (sometimes nicknamed eternal bridges). -To configure the bridge in this mode you will need to setup a [Redis](https://redis.io/) instance. +To configure the bridge in this mode you will need to setup a [Redis](https://redis.io/) instance. Ideally, you +**should** run the bridge with Redis `6.2.0` or greater as it is more efficent when used with streams. The bridge +requires Redis `5.0.0` or greater to run. In your bridge, configure the following: diff --git a/src/pool-service/CommandReader.ts b/src/pool-service/CommandReader.ts index 5a1a1f9d6..085b3b0fb 100644 --- a/src/pool-service/CommandReader.ts +++ b/src/pool-service/CommandReader.ts @@ -63,7 +63,15 @@ export class RedisCommandReader { ) as [string, string][]; const options = new Map(serverLines); const version = options.get('redis_version'); - this.supportsMinId = !!(version && semver.satisfies(version, '>=6.2')); + if (!version) { + log.warn(`Unable to identify Redis version, assuming unsupported version`); + this.supportsMinId = false; + return; + } + if (semver.lt(version, '5.0.0')) { + throw new Error('Redis version is unsupported. The minimum required version is 5.0.0'); + } + this.supportsMinId = !!semver.satisfies(version, '>=6.2'); } private async trimCommandStream() { diff --git a/src/pool-service/IrcConnectionPool.ts b/src/pool-service/IrcConnectionPool.ts index b063612d8..c5d3bb9cc 100644 --- a/src/pool-service/IrcConnectionPool.ts +++ b/src/pool-service/IrcConnectionPool.ts @@ -67,10 +67,6 @@ export class IrcConnectionPool { ); } - private updateLastRead(lastRead: string) { - this.commandStreamId = lastRead; - } - private async sendCommandOut(type: T, payload: OutCommandPayload[T]) { await this.cmdWriter.xadd(REDIS_IRC_POOL_COMMAND_OUT_STREAM, "*", type, JSON.stringify({ info: payload, From cdb426d62c4006e47b43c9ce3f458ff61eb9942d Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 10:49:51 +0100 Subject: [PATCH 4/7] changelog --- changelog.d/1763.bugfix | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changelog.d/1763.bugfix diff --git a/changelog.d/1763.bugfix b/changelog.d/1763.bugfix new file mode 100644 index 000000000..1ea52c0ff --- /dev/null +++ b/changelog.d/1763.bugfix @@ -0,0 +1,2 @@ +Fix Redis <=6.2 failing to clear the command queue in pooling mode. + From 610f6dc31ecbd710d57861608a08d030415adb28 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 10:52:29 +0100 Subject: [PATCH 5/7] A newline --- src/pool-service/CommandReader.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pool-service/CommandReader.ts b/src/pool-service/CommandReader.ts index 085b3b0fb..e94ecf2c5 100644 --- a/src/pool-service/CommandReader.ts +++ b/src/pool-service/CommandReader.ts @@ -9,7 +9,6 @@ const TRIM_MAXLEN_COUNT = 100_000; const log = new Logger('RedisCommandReader'); export class RedisCommandReader { - private shouldRun = true; private commandStreamId = "$" private supportsMinId = false; From 5f651cd000bb17892decd1832e1003550a552430 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 10:53:04 +0100 Subject: [PATCH 6/7] Move types --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index f596b08fe..303902f5b 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,6 @@ }, "dependencies": { "@sentry/node": "^6.17.9", - "@types/semver": "^7.5.0", "ajv": "^8.12.0", "bluebird": "^3.7.2", "classnames": "^2.3.2", @@ -79,6 +78,7 @@ "@types/react": "^18.0.26", "@types/react-dom": "^18.0.9", "@types/sanitize-html": "^2.6.2", + "@types/semver": "^7.5.0", "@typescript-eslint/eslint-plugin": "^5.38.0", "@typescript-eslint/parser": "^5.38.0", "@vitejs/plugin-react": "^3.0.1", From 64b019a64f8605f41fe5c5304266a197bc838f22 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 1 Aug 2023 16:46:51 +0100 Subject: [PATCH 7/7] Tidy up version check --- src/pool-service/CommandReader.ts | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/pool-service/CommandReader.ts b/src/pool-service/CommandReader.ts index e94ecf2c5..334d02b5e 100644 --- a/src/pool-service/CommandReader.ts +++ b/src/pool-service/CommandReader.ts @@ -57,16 +57,26 @@ export class RedisCommandReader { } public async getSupported() { - const serverLines = (await - this.redis.info("Server")).split('\n').filter(v => !v.startsWith('#')).map(v => v.split(':', 2) - ) as [string, string][]; - const options = new Map(serverLines); + let options: Map; + try { + // Fetch the "Server" info block and parse out the various lines. + const serverLines = ( + await this.redis.info("Server") + ).split('\n').filter(v => !v.startsWith('#')).map(v => v.split(':', 2)) as [string, string][]; + options = new Map(serverLines); + } + catch (ex) { + log.error("Failed to fetch server info from Redis", ex); + // Treat it as if we got zero useful options back. + options = new Map(); + } const version = options.get('redis_version'); if (!version) { - log.warn(`Unable to identify Redis version, assuming unsupported version`); + log.warn(`Unable to identify Redis version, assuming unsupported version.`); this.supportsMinId = false; return; } + // We did get a server version back but we know it's unsupported. if (semver.lt(version, '5.0.0')) { throw new Error('Redis version is unsupported. The minimum required version is 5.0.0'); }