Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Redis <=6.2 throwing errors when trying to clear the command queue in pooling mode. #1763

Merged
merged 7 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.d/1763.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix Redis <=6.2 failing to clear the command queue in pooling mode.

4 changes: 3 additions & 1 deletion docs/connection_pooling.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ The IRC bridge can be configured to run it's IRC connections through a seperate
allowing you to restart and update (in most cases) the main process while keeping connections alive. This in
effect allows you to have a bridge that *appears* to not restart (sometimes nicknamed eternal bridges).

To configure the bridge in this mode you will need to setup a [Redis](https://redis.io/) instance.
To configure the bridge in this mode you will need to setup a [Redis](https://redis.io/) instance. Ideally, you
**should** run the bridge with Redis `6.2.0` or greater as it is more efficent when used with streams. The bridge
requires Redis `5.0.0` or greater to run.

In your bridge, configure the following:

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"react": "^18.2.0",
"react-dom": "^18.2.0",
"sanitize-html": "^2.7.2",
"semver": "^7.5.4",
"typed-emitter": "^2.1.0",
"typescript": "^5.0.4",
"url-join": "^5.0.0",
Expand All @@ -77,6 +78,7 @@
"@types/react": "^18.0.26",
"@types/react-dom": "^18.0.9",
"@types/sanitize-html": "^2.6.2",
"@types/semver": "^7.5.0",
"@typescript-eslint/eslint-plugin": "^5.38.0",
"@typescript-eslint/parser": "^5.38.0",
"@vitejs/plugin-react": "^3.0.1",
Expand Down
130 changes: 130 additions & 0 deletions src/pool-service/CommandReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { Redis } from "ioredis";
import semver from "semver";
import { Logger } from "matrix-appservice-bridge";

const TRIM_EVERY_MS = 30000;
const COMMAND_BLOCK_TIMEOUT = 10000;
const TRIM_MAXLEN_COUNT = 100_000;

const log = new Logger('RedisCommandReader');

export class RedisCommandReader {
private shouldRun = true;
private commandStreamId = "$"
private supportsMinId = false;
private trimInterval?: NodeJS.Timer;

constructor(
private readonly redis: Redis,
private readonly streamName: string,
private readonly onCommand: (cmdType: string, cmdPayload: string) => Promise<void>) {

}

private updateLastRead(lastRead: string) {
this.commandStreamId = lastRead;
}

public stop() {
this.shouldRun = false;
clearInterval(this.trimInterval);
}

public async readQueue() {
const newCmds = await this.redis.xread(
"BLOCK", COMMAND_BLOCK_TIMEOUT, "STREAMS", this.streamName, this.commandStreamId
).catch(ex => {
log.warn(`Failed to read new command:`, ex);
return null;
});
if (newCmds === null) {
// This means we've waited for some time and seen no new commands, to be safe revert to the HEAD of the queue.
log.info(`Stream has been idle for ${COMMAND_BLOCK_TIMEOUT}ms, listening for messages at $`);
this.commandStreamId = '$';
return;
}
// This is a list of keys, containing a list of commands, hence needing to deeply extract the values.
for (const [msgId, [cmdType, payload]] of newCmds[0][1]) {
// If we crash, we don't want to get stuck on this msg.
this.updateLastRead(msgId);
setImmediate(
() => this.onCommand(cmdType, payload)
.catch(ex => log.warn(`Failed to handle msg ${msgId} (${cmdType}, ${payload})`, ex)
),
);
}

}

public async getSupported() {
let options: Map<string, string>;
try {
// Fetch the "Server" info block and parse out the various lines.
const serverLines = (
await this.redis.info("Server")
).split('\n').filter(v => !v.startsWith('#')).map(v => v.split(':', 2)) as [string, string][];
options = new Map(serverLines);
}
catch (ex) {
log.error("Failed to fetch server info from Redis", ex);
// Treat it as if we got zero useful options back.
options = new Map();
}
const version = options.get('redis_version');
if (!version) {
log.warn(`Unable to identify Redis version, assuming unsupported version.`);
this.supportsMinId = false;
return;
}
// We did get a server version back but we know it's unsupported.
if (semver.lt(version, '5.0.0')) {
throw new Error('Redis version is unsupported. The minimum required version is 5.0.0');
}
this.supportsMinId = !!semver.satisfies(version, '>=6.2');
}

private async trimCommandStream() {
if (this.commandStreamId === '$') {
// At the head of the queue, don't trim.
return;
}
try {
let trimCount;
if (this.supportsMinId) {
trimCount = await this.redis.xtrim(
this.streamName, "MINID", this.commandStreamId
);
}
else {
// If Redis doesn't support minid (requires >=6.2), we can fallback to
// trimming a large amount of messages instead.
trimCount = await this.redis.xtrim(
this.streamName, "MAXLEN", TRIM_MAXLEN_COUNT
);
}
log.debug(`Trimmed ${trimCount} commands from the stream`);
}
catch (ex) {
log.warn(`Failed to trim commands from the stream`, ex);
}
}

public async start() {
await this.getSupported();
this.trimInterval = setInterval(this.trimCommandStream.bind(this), TRIM_EVERY_MS);
log.info(`Listening for new commands`);
let loopCommandCheck: () => void;
// eslint-disable-next-line prefer-const
loopCommandCheck = () => {
if (!this.shouldRun) {
log.info(`Finished`);
return;
}
this.readQueue().finally(() => {
return loopCommandCheck();
});
}

loopCommandCheck();
}
}
47 changes: 14 additions & 33 deletions src/pool-service/IrcConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { OutCommandType,
import { parseMessage } from 'matrix-org-irc';
import { collectDefaultMetrics, register, Gauge } from 'prom-client';
import { createServer, Server } from 'http';
import { RedisCommandReader } from './CommandReader';

collectDefaultMetrics();

Expand Down Expand Up @@ -52,6 +53,7 @@ export class IrcConnectionPool {
private metricsServer?: Server;
private shouldRun = true;
private heartbeatTimer?: NodeJS.Timer;
private readonly commandReader: RedisCommandReader;

constructor(private readonly config: typeof Config) {
this.shouldRun = false;
Expand All @@ -60,10 +62,9 @@ export class IrcConnectionPool {
this.cmdWriter.on('connecting', () => {
log.debug('Connecting to', config.redisUri);
});
}

private updateLastRead(lastRead: string) {
this.commandStreamId = lastRead;
this.commandReader = new RedisCommandReader(
this.cmdReader, REDIS_IRC_POOL_COMMAND_IN_STREAM, this.handleStreamCommand.bind(this)
);
}

private async sendCommandOut<T extends OutCommandType>(type: T, payload: OutCommandPayload[T]) {
Expand Down Expand Up @@ -307,6 +308,12 @@ export class IrcConnectionPool {
}
}

private async handleStreamCommand(cmdType: string, payload: string) {
const commandType = cmdType as InCommandType;
const commandData = JSON.parse(payload) as IrcConnectionPoolCommandIn<InCommandType>;
return this.handleCommand(commandType, commandData);
}

public async handleInternalPing({ info }: IrcConnectionPoolCommandIn<InCommandType.ConnectionPing>) {
const { clientId } = info;
const conn = this.connections.get(clientId);
Expand Down Expand Up @@ -337,6 +344,7 @@ export class IrcConnectionPool {
return;
}
try {
log.debug(`Trimming up to ${this.commandStreamId}`);
const trimCount = await this.cmdWriter.xtrim(
REDIS_IRC_POOL_COMMAND_IN_STREAM, "MINID", this.commandStreamId
);
Expand Down Expand Up @@ -410,38 +418,11 @@ export class IrcConnectionPool {
void this.trimCommandStream();
}, HEARTBEAT_EVERY_MS);


log.info(`Listening for new commands`);
setImmediate(async () => {
while (this.shouldRun) {
const newCmds = await this.cmdReader.xread(
"BLOCK", 0, "STREAMS", REDIS_IRC_POOL_COMMAND_IN_STREAM, this.commandStreamId
).catch(ex => {
log.warn(`Failed to read new command:`, ex);
return null;
});
if (newCmds === null) {
// Unexpected, this is blocking.
continue;
}
// This is a list of keys, containing a list of commands, hence needing to deeply extract the values.
for (const [msgId, [cmdType, payload]] of newCmds[0][1]) {
const commandType = cmdType as InCommandType;

// If we crash, we don't want to get stuck on this msg.
await this.updateLastRead(msgId);
const commandData = JSON.parse(payload) as IrcConnectionPoolCommandIn<InCommandType>;
setImmediate(
() => this.handleCommand(commandType, commandData)
.catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex)
),
);
}
}
});
return this.commandReader.start();
}

public async close() {
this.commandReader.stop();
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer)
}
Expand Down
Loading
Loading