From 61af8b4ceb666c3b4531338b4368413c286ccd0d Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Wed, 26 Jul 2023 14:18:49 +0100 Subject: [PATCH] Performance improvements for pooling stream reader (#1751) * Read all messages from the polling in the IrcPoolClient * Read all messages from a poll from the connection pool * Keep more history in the trim job * Don't cache message head for the pool between restarts * Cache the read head for the client * Trim up to last message, and trim on the correct sides. * changelog * Don't trim on $ * Try tracking the correct dir * Fix archive path * Wait for charlie to join before setting a PL * Trace basic to file * Don't trim $ --- .github/workflows/e2e-test.yml | 4 +- changelog.d/1751.bugfix | 1 + spec/e2e/basic.spec.ts | 1 + spec/e2e/powerlevels.spec.ts | 6 ++- spec/util/e2e-test.ts | 1 - src/pool-service/IrcConnectionPool.ts | 68 +++++++++++++-------------- src/pool-service/IrcPoolClient.ts | 63 ++++++++++++++++++------- src/pool-service/types.ts | 3 +- 8 files changed, 89 insertions(+), 58 deletions(-) create mode 100644 changelog.d/1751.bugfix diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index 26fe9a2df..472ab9b22 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -144,7 +144,7 @@ jobs: with: name: e2e-traces path: | - .e2e-traces + matrix-appservice-irc/.e2e-traces integration-test-pool: runs-on: ubuntu-latest @@ -221,4 +221,4 @@ jobs: with: name: e2e-traces-pool path: | - .e2e-traces \ No newline at end of file + matrix-appservice-irc/.e2e-traces \ No newline at end of file diff --git a/changelog.d/1751.bugfix b/changelog.d/1751.bugfix new file mode 100644 index 000000000..08ad0a378 --- /dev/null +++ b/changelog.d/1751.bugfix @@ -0,0 +1 @@ +Improve processing speed of commands sent to and from the proxy when the bridge is configured in pooling mode. \ No newline at end of file diff --git a/spec/e2e/basic.spec.ts b/spec/e2e/basic.spec.ts index 6215786da..c62dadf19 100644 --- a/spec/e2e/basic.spec.ts +++ b/spec/e2e/basic.spec.ts @@ -9,6 +9,7 @@ describe('Basic bridge usage', () => { testEnv = await IrcBridgeE2ETest.createTestEnv({ matrixLocalparts: ['alice'], ircNicks: ['bob'], + traceToFile: true, }); await testEnv.setUp(); }); diff --git a/spec/e2e/powerlevels.spec.ts b/spec/e2e/powerlevels.spec.ts index e9ec0858f..2e4ae0009 100644 --- a/spec/e2e/powerlevels.spec.ts +++ b/spec/e2e/powerlevels.spec.ts @@ -31,18 +31,20 @@ describe('Ensure powerlevels are appropriately applied', () => { const cRoomId = await testEnv.joinChannelHelper(alice, await testEnv.createAdminRoomHelper(alice), channel); // Now have charlie join and be opped. + const charlieJoinPromise = bob.waitForEvent('join'); await charlie.join(channel); const operatorPL = testEnv.ircBridge.config.ircService.servers.localhost.modePowerMap!.o; const plEvent = alice.waitForPowerLevel( cRoomId, { users: { + [bobUserId]: operatorPL, [charlieUserId]: operatorPL, [testEnv.ircBridge.appServiceUserId]: 100, - [bobUserId]: operatorPL, }, - } + }, ); + await charlieJoinPromise; await bob.send('MODE', channel, '+o', charlie.nick); await plEvent; }); diff --git a/spec/util/e2e-test.ts b/spec/util/e2e-test.ts index 2ab90fdd5..071d5d8fc 100644 --- a/spec/util/e2e-test.ts +++ b/spec/util/e2e-test.ts @@ -60,7 +60,6 @@ export class E2ETestMatrixClient extends MatrixClient { const sortOrder = value !== null && typeof value === "object" ? Object.keys(value).sort() : undefined; const jsonLeft = JSON.stringify(evValue, sortOrder); const jsonRight = JSON.stringify(value, sortOrder); - console.log(jsonLeft, "---", jsonRight); if (jsonLeft !== jsonRight) { return undefined; } diff --git a/src/pool-service/IrcConnectionPool.ts b/src/pool-service/IrcConnectionPool.ts index 38bc84e7f..124fa7e57 100644 --- a/src/pool-service/IrcConnectionPool.ts +++ b/src/pool-service/IrcConnectionPool.ts @@ -2,7 +2,7 @@ import { Redis } from 'ioredis'; import { Logger, LogLevel } from 'matrix-appservice-bridge'; import { createConnection, Socket } from 'net'; import tls from 'tls'; -import { REDIS_IRC_POOL_COMMAND_IN_STREAM_LAST_READ, OutCommandType, +import { OutCommandType, REDIS_IRC_POOL_COMMAND_OUT_STREAM, IrcConnectionPoolCommandIn, ConnectionCreateArgs, InCommandType, CommandError, REDIS_IRC_POOL_HEARTBEAT_KEY, @@ -25,7 +25,7 @@ collectDefaultMetrics(); const log = new Logger('IrcConnectionPool'); const TIME_TO_WAIT_BEFORE_PONG = 10000; -const STREAM_HISTORY_MAXLEN = 50; + const Config = { redisUri: process.env.REDIS_URL ?? 'redis://localhost:6379', @@ -64,9 +64,6 @@ export class IrcConnectionPool { private updateLastRead(lastRead: string) { this.commandStreamId = lastRead; - this.cmdWriter.set(REDIS_IRC_POOL_COMMAND_IN_STREAM_LAST_READ, lastRead).catch((ex) => { - log.warn(`Unable to update last-read for command.in`, ex); - }) } private async sendCommandOut(type: T, payload: OutCommandPayload[T]) { @@ -334,6 +331,22 @@ export class IrcConnectionPool { }); } + private async trimCommandStream() { + if (this.commandStreamId === '$') { + // At the head of the queue, don't trim. + return; + } + try { + const trimCount = await this.cmdWriter.xtrim( + REDIS_IRC_POOL_COMMAND_IN_STREAM, "MINID", this.commandStreamId + ); + log.debug(`Trimmed ${trimCount} commands from the IN stream`); + } + catch (ex) { + log.warn(`Failed to trim commands from the IN stream`, ex); + } + } + public async start() { if (this.shouldRun) { // Is already running! @@ -382,7 +395,7 @@ export class IrcConnectionPool { await this.sendHeartbeat(); // Fetch the last read index. - this.commandStreamId = await this.cmdWriter.get(REDIS_IRC_POOL_COMMAND_IN_STREAM_LAST_READ) || "$"; + this.commandStreamId = "$"; // Warn of any existing connections. await this.cmdWriter.del(REDIS_IRC_POOL_CONNECTIONS); @@ -394,49 +407,36 @@ export class IrcConnectionPool { this.sendHeartbeat().catch((ex) => { log.warn(`Failed to send heartbeat`, ex); }); - this.cmdWriter.xtrim( - REDIS_IRC_POOL_COMMAND_IN_STREAM, "MAXLEN", "~", STREAM_HISTORY_MAXLEN - ).then(trimCount => { - log.debug(`Trimmed ${trimCount} commands from the IN stream`); - }).catch((ex) => { - log.warn(`Failed to trim commands from the IN stream`, ex); - }); - this.cmdWriter.xtrim( - REDIS_IRC_POOL_COMMAND_OUT_STREAM, "MAXLEN", "~", STREAM_HISTORY_MAXLEN - ).then(trimCount => { - log.debug(`Trimmed ${trimCount} commands from the OUT stream`); - }).catch((ex) => { - log.warn(`Failed to trim commands from the OUT stream`, ex); - }); + void this.trimCommandStream(); }, HEARTBEAT_EVERY_MS); log.info(`Listening for new commands`); setImmediate(async () => { while (this.shouldRun) { - const newCmd = await this.cmdReader.xread( + const newCmds = await this.cmdReader.xread( "BLOCK", 0, "STREAMS", REDIS_IRC_POOL_COMMAND_IN_STREAM, this.commandStreamId ).catch(ex => { log.warn(`Failed to read new command:`, ex); return null; }); - if (newCmd === null) { + if (newCmds === null) { // Unexpected, this is blocking. continue; } // This is a list of keys, containing a list of commands, hence needing to deeply extract the values. - const [msgId, [cmdType, payload]] = newCmd[0][1][0]; - - const commandType = cmdType as InCommandType; - - // If we crash, we don't want to get stuck on this msg. - await this.updateLastRead(msgId); - const commandData = JSON.parse(payload) as IrcConnectionPoolCommandIn; - setImmediate( - () => this.handleCommand(commandType, commandData) - .catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex) - ), - ); + for (const [msgId, [cmdType, payload]] of newCmds[0][1]) { + const commandType = cmdType as InCommandType; + + // If we crash, we don't want to get stuck on this msg. + await this.updateLastRead(msgId); + const commandData = JSON.parse(payload) as IrcConnectionPoolCommandIn; + setImmediate( + () => this.handleCommand(commandType, commandData) + .catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex) + ), + ); + } } }); } diff --git a/src/pool-service/IrcPoolClient.ts b/src/pool-service/IrcPoolClient.ts index 286f64238..96a440bce 100644 --- a/src/pool-service/IrcPoolClient.ts +++ b/src/pool-service/IrcPoolClient.ts @@ -8,7 +8,8 @@ import { ClientId, ConnectionCreateArgs, HEARTBEAT_EVERY_MS, PROTOCOL_VERSION, READ_BUFFER_MAGIC_BYTES, REDIS_IRC_POOL_COMMAND_IN_STREAM, REDIS_IRC_POOL_COMMAND_OUT_STREAM, - REDIS_IRC_POOL_CONNECTIONS, REDIS_IRC_POOL_HEARTBEAT_KEY, REDIS_IRC_POOL_VERSION_KEY } from "./types"; + REDIS_IRC_POOL_CONNECTIONS, REDIS_IRC_POOL_HEARTBEAT_KEY, REDIS_IRC_POOL_VERSION_KEY, + REDIS_IRC_POOL_COMMAND_OUT_STREAM_LAST_READ } from "./types"; import { Logger } from 'matrix-appservice-bridge'; import { EventEmitter } from "stream"; @@ -18,6 +19,7 @@ const log = new Logger('IrcPoolClient'); const CONNECTION_TIMEOUT = 40000; const MAX_MISSED_HEARTBEATS = 5; +const COMMAND_BLOCK_TIMEOUT = HEARTBEAT_EVERY_MS * 2; type Events = { lostConnection: () => void, @@ -45,6 +47,13 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm }); } + public updateLastRead(msgId: string) { + this.commandStreamId = msgId; + this.redis.set(REDIS_IRC_POOL_COMMAND_OUT_STREAM_LAST_READ, msgId).catch((ex) => { + log.error(`Failed to update last-read to ${msgId}`, ex); + }) + } + public async sendCommand(type: T, payload: InCommandPayload[T]) { await this.redis.xadd(REDIS_IRC_POOL_COMMAND_IN_STREAM, "*", type, JSON.stringify({ origin_ts: Date.now(), @@ -200,38 +209,56 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm } public async handleIncomingCommand() { - const newCmd = await this.cmdReader.xread( - "BLOCK", 0, "STREAMS", REDIS_IRC_POOL_COMMAND_OUT_STREAM, this.commandStreamId + const newCmds = await this.cmdReader.xread( + "BLOCK", COMMAND_BLOCK_TIMEOUT, "STREAMS", REDIS_IRC_POOL_COMMAND_OUT_STREAM, this.commandStreamId ).catch(ex => { log.warn(`Failed to read new command:`, ex); return null; }); - if (newCmd === null) { - // Unexpected, this is blocking. + if (newCmds === null) { + // Implies we might be listening for stale messages. + this.commandStreamId = '$'; return; } - const [msgId, [cmdType, payload]] = newCmd[0][1][0]; - this.commandStreamId = msgId; + for (const [msgId, [cmdType, payload]] of newCmds[0][1]) { + const commandType = cmdType as OutCommandType|ClientId; + let commandData: IrcConnectionPoolCommandOut|Buffer; + if (typeof payload === 'string' && payload[0] === '{') { + commandData = JSON.parse(payload) as IrcConnectionPoolCommandOut; + } + else { + commandData = Buffer.from(payload).subarray(READ_BUFFER_MAGIC_BYTES.length); + } + setImmediate( + () => this.handleCommand(commandType, commandData) + .catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex) + ), + ); + this.updateLastRead(msgId); + } + } - const commandType = cmdType as OutCommandType|ClientId; - let commandData: IrcConnectionPoolCommandOut|Buffer; - if (typeof payload === 'string' && payload[0] === '{') { - commandData = JSON.parse(payload) as IrcConnectionPoolCommandOut; + private async trimCommandStream() { + if (this.commandStreamId === '$') { + // At the head of the queue, don't trim. + return; } - else { - commandData = Buffer.from(payload).subarray(READ_BUFFER_MAGIC_BYTES.length); + try { + const trimCount = await this.redis.xtrim( + REDIS_IRC_POOL_COMMAND_OUT_STREAM, "MINID", this.commandStreamId + ); + log.debug(`Trimmed ${trimCount} commands from the OUT stream`); + } + catch (ex) { + log.warn(`Failed to trim commands from the OUT stream`, ex); } - setImmediate( - () => this.handleCommand(commandType, commandData) - .catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex) - ), - ); } private async checkHeartbeat() { const lastHeartbeat = parseInt(await this.redis.get(REDIS_IRC_POOL_HEARTBEAT_KEY) ?? '0'); if (lastHeartbeat + HEARTBEAT_EVERY_MS + 1000 > Date.now()) { this.missedHeartbeats = 0; + void this.trimCommandStream(); return; } diff --git a/src/pool-service/types.ts b/src/pool-service/types.ts index 12b342da2..140333fa8 100644 --- a/src/pool-service/types.ts +++ b/src/pool-service/types.ts @@ -12,7 +12,8 @@ export const REDIS_IRC_POOL_VERSION_KEY = "ircbridge.poolversion"; export const REDIS_IRC_POOL_HEARTBEAT_KEY = "ircbridge.pool.💓"; export const REDIS_IRC_POOL_COMMAND_OUT_STREAM = "ircbridge.stream.command.out"; export const REDIS_IRC_POOL_COMMAND_IN_STREAM = "ircbridge.stream.command.in"; -export const REDIS_IRC_POOL_COMMAND_IN_STREAM_LAST_READ = "ircbridge.stream.command.last-read"; + +export const REDIS_IRC_POOL_COMMAND_OUT_STREAM_LAST_READ = "ircbridge.stream.out.command.last-read"; export const REDIS_IRC_POOL_CONNECTIONS = "ircbridge.connections"; export const REDIS_IRC_CLIENT_STATE_KEY = `ircbridge.clientstate`; //client-id