Skip to content

Commit

Permalink
Performance improvements for pooling stream reader (#1751)
Browse files Browse the repository at this point in the history
* Read all messages from the polling in the IrcPoolClient

* Read all messages from a poll from the connection pool

* Keep more history in the trim job

* Don't cache message head for the pool between restarts

* Cache the read head for the client

* Trim up to last message, and trim on the correct sides.

* changelog

* Don't trim on $

* Try tracking the correct dir

* Fix archive path

* Wait for charlie to join before setting a PL

* Trace basic to file

* Don't trim $
  • Loading branch information
Half-Shot authored Jul 26, 2023
1 parent 7cfb057 commit 61af8b4
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 58 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ jobs:
with:
name: e2e-traces
path: |
.e2e-traces
matrix-appservice-irc/.e2e-traces
integration-test-pool:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -221,4 +221,4 @@ jobs:
with:
name: e2e-traces-pool
path: |
.e2e-traces
matrix-appservice-irc/.e2e-traces
1 change: 1 addition & 0 deletions changelog.d/1751.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve processing speed of commands sent to and from the proxy when the bridge is configured in pooling mode.
1 change: 1 addition & 0 deletions spec/e2e/basic.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ describe('Basic bridge usage', () => {
testEnv = await IrcBridgeE2ETest.createTestEnv({
matrixLocalparts: ['alice'],
ircNicks: ['bob'],
traceToFile: true,
});
await testEnv.setUp();
});
Expand Down
6 changes: 4 additions & 2 deletions spec/e2e/powerlevels.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,20 @@ describe('Ensure powerlevels are appropriately applied', () => {
const cRoomId = await testEnv.joinChannelHelper(alice, await testEnv.createAdminRoomHelper(alice), channel);

// Now have charlie join and be opped.
const charlieJoinPromise = bob.waitForEvent('join');
await charlie.join(channel);
const operatorPL = testEnv.ircBridge.config.ircService.servers.localhost.modePowerMap!.o;
const plEvent = alice.waitForPowerLevel(
cRoomId, {
users: {
[bobUserId]: operatorPL,
[charlieUserId]: operatorPL,
[testEnv.ircBridge.appServiceUserId]: 100,
[bobUserId]: operatorPL,
},
}
},
);

await charlieJoinPromise;
await bob.send('MODE', channel, '+o', charlie.nick);
await plEvent;
});
Expand Down
1 change: 0 additions & 1 deletion spec/util/e2e-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ export class E2ETestMatrixClient extends MatrixClient {
const sortOrder = value !== null && typeof value === "object" ? Object.keys(value).sort() : undefined;
const jsonLeft = JSON.stringify(evValue, sortOrder);
const jsonRight = JSON.stringify(value, sortOrder);
console.log(jsonLeft, "---", jsonRight);
if (jsonLeft !== jsonRight) {
return undefined;
}
Expand Down
68 changes: 34 additions & 34 deletions src/pool-service/IrcConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Redis } from 'ioredis';
import { Logger, LogLevel } from 'matrix-appservice-bridge';
import { createConnection, Socket } from 'net';
import tls from 'tls';
import { REDIS_IRC_POOL_COMMAND_IN_STREAM_LAST_READ, OutCommandType,
import { OutCommandType,
REDIS_IRC_POOL_COMMAND_OUT_STREAM, IrcConnectionPoolCommandIn,
ConnectionCreateArgs, InCommandType, CommandError,
REDIS_IRC_POOL_HEARTBEAT_KEY,
Expand All @@ -25,7 +25,7 @@ collectDefaultMetrics();

const log = new Logger('IrcConnectionPool');
const TIME_TO_WAIT_BEFORE_PONG = 10000;
const STREAM_HISTORY_MAXLEN = 50;


const Config = {
redisUri: process.env.REDIS_URL ?? 'redis://localhost:6379',
Expand Down Expand Up @@ -64,9 +64,6 @@ export class IrcConnectionPool {

private updateLastRead(lastRead: string) {
this.commandStreamId = lastRead;
this.cmdWriter.set(REDIS_IRC_POOL_COMMAND_IN_STREAM_LAST_READ, lastRead).catch((ex) => {
log.warn(`Unable to update last-read for command.in`, ex);
})
}

private async sendCommandOut<T extends OutCommandType>(type: T, payload: OutCommandPayload[T]) {
Expand Down Expand Up @@ -334,6 +331,22 @@ export class IrcConnectionPool {
});
}

private async trimCommandStream() {
if (this.commandStreamId === '$') {
// At the head of the queue, don't trim.
return;
}
try {
const trimCount = await this.cmdWriter.xtrim(
REDIS_IRC_POOL_COMMAND_IN_STREAM, "MINID", this.commandStreamId
);
log.debug(`Trimmed ${trimCount} commands from the IN stream`);
}
catch (ex) {
log.warn(`Failed to trim commands from the IN stream`, ex);
}
}

public async start() {
if (this.shouldRun) {
// Is already running!
Expand Down Expand Up @@ -382,7 +395,7 @@ export class IrcConnectionPool {
await this.sendHeartbeat();

// Fetch the last read index.
this.commandStreamId = await this.cmdWriter.get(REDIS_IRC_POOL_COMMAND_IN_STREAM_LAST_READ) || "$";
this.commandStreamId = "$";

// Warn of any existing connections.
await this.cmdWriter.del(REDIS_IRC_POOL_CONNECTIONS);
Expand All @@ -394,49 +407,36 @@ export class IrcConnectionPool {
this.sendHeartbeat().catch((ex) => {
log.warn(`Failed to send heartbeat`, ex);
});
this.cmdWriter.xtrim(
REDIS_IRC_POOL_COMMAND_IN_STREAM, "MAXLEN", "~", STREAM_HISTORY_MAXLEN
).then(trimCount => {
log.debug(`Trimmed ${trimCount} commands from the IN stream`);
}).catch((ex) => {
log.warn(`Failed to trim commands from the IN stream`, ex);
});
this.cmdWriter.xtrim(
REDIS_IRC_POOL_COMMAND_OUT_STREAM, "MAXLEN", "~", STREAM_HISTORY_MAXLEN
).then(trimCount => {
log.debug(`Trimmed ${trimCount} commands from the OUT stream`);
}).catch((ex) => {
log.warn(`Failed to trim commands from the OUT stream`, ex);
});
void this.trimCommandStream();
}, HEARTBEAT_EVERY_MS);


log.info(`Listening for new commands`);
setImmediate(async () => {
while (this.shouldRun) {
const newCmd = await this.cmdReader.xread(
const newCmds = await this.cmdReader.xread(
"BLOCK", 0, "STREAMS", REDIS_IRC_POOL_COMMAND_IN_STREAM, this.commandStreamId
).catch(ex => {
log.warn(`Failed to read new command:`, ex);
return null;
});
if (newCmd === null) {
if (newCmds === null) {
// Unexpected, this is blocking.
continue;
}
// This is a list of keys, containing a list of commands, hence needing to deeply extract the values.
const [msgId, [cmdType, payload]] = newCmd[0][1][0];

const commandType = cmdType as InCommandType;

// If we crash, we don't want to get stuck on this msg.
await this.updateLastRead(msgId);
const commandData = JSON.parse(payload) as IrcConnectionPoolCommandIn<InCommandType>;
setImmediate(
() => this.handleCommand(commandType, commandData)
.catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex)
),
);
for (const [msgId, [cmdType, payload]] of newCmds[0][1]) {
const commandType = cmdType as InCommandType;

// If we crash, we don't want to get stuck on this msg.
await this.updateLastRead(msgId);
const commandData = JSON.parse(payload) as IrcConnectionPoolCommandIn<InCommandType>;
setImmediate(
() => this.handleCommand(commandType, commandData)
.catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex)
),
);
}
}
});
}
Expand Down
63 changes: 45 additions & 18 deletions src/pool-service/IrcPoolClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import { ClientId, ConnectionCreateArgs, HEARTBEAT_EVERY_MS,
PROTOCOL_VERSION,
READ_BUFFER_MAGIC_BYTES,
REDIS_IRC_POOL_COMMAND_IN_STREAM, REDIS_IRC_POOL_COMMAND_OUT_STREAM,
REDIS_IRC_POOL_CONNECTIONS, REDIS_IRC_POOL_HEARTBEAT_KEY, REDIS_IRC_POOL_VERSION_KEY } from "./types";
REDIS_IRC_POOL_CONNECTIONS, REDIS_IRC_POOL_HEARTBEAT_KEY, REDIS_IRC_POOL_VERSION_KEY,
REDIS_IRC_POOL_COMMAND_OUT_STREAM_LAST_READ } from "./types";

import { Logger } from 'matrix-appservice-bridge';
import { EventEmitter } from "stream";
Expand All @@ -18,6 +19,7 @@ const log = new Logger('IrcPoolClient');

const CONNECTION_TIMEOUT = 40000;
const MAX_MISSED_HEARTBEATS = 5;
const COMMAND_BLOCK_TIMEOUT = HEARTBEAT_EVERY_MS * 2;

type Events = {
lostConnection: () => void,
Expand Down Expand Up @@ -45,6 +47,13 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm
});
}

public updateLastRead(msgId: string) {
this.commandStreamId = msgId;
this.redis.set(REDIS_IRC_POOL_COMMAND_OUT_STREAM_LAST_READ, msgId).catch((ex) => {
log.error(`Failed to update last-read to ${msgId}`, ex);
})
}

public async sendCommand<T extends InCommandType>(type: T, payload: InCommandPayload[T]) {
await this.redis.xadd(REDIS_IRC_POOL_COMMAND_IN_STREAM, "*", type, JSON.stringify({
origin_ts: Date.now(),
Expand Down Expand Up @@ -200,38 +209,56 @@ export class IrcPoolClient extends (EventEmitter as unknown as new () => TypedEm
}

public async handleIncomingCommand() {
const newCmd = await this.cmdReader.xread(
"BLOCK", 0, "STREAMS", REDIS_IRC_POOL_COMMAND_OUT_STREAM, this.commandStreamId
const newCmds = await this.cmdReader.xread(
"BLOCK", COMMAND_BLOCK_TIMEOUT, "STREAMS", REDIS_IRC_POOL_COMMAND_OUT_STREAM, this.commandStreamId
).catch(ex => {
log.warn(`Failed to read new command:`, ex);
return null;
});
if (newCmd === null) {
// Unexpected, this is blocking.
if (newCmds === null) {
// Implies we might be listening for stale messages.
this.commandStreamId = '$';
return;
}
const [msgId, [cmdType, payload]] = newCmd[0][1][0];
this.commandStreamId = msgId;
for (const [msgId, [cmdType, payload]] of newCmds[0][1]) {
const commandType = cmdType as OutCommandType|ClientId;
let commandData: IrcConnectionPoolCommandOut|Buffer;
if (typeof payload === 'string' && payload[0] === '{') {
commandData = JSON.parse(payload) as IrcConnectionPoolCommandOut;
}
else {
commandData = Buffer.from(payload).subarray(READ_BUFFER_MAGIC_BYTES.length);
}
setImmediate(
() => this.handleCommand(commandType, commandData)
.catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex)
),
);
this.updateLastRead(msgId);
}
}

const commandType = cmdType as OutCommandType|ClientId;
let commandData: IrcConnectionPoolCommandOut|Buffer;
if (typeof payload === 'string' && payload[0] === '{') {
commandData = JSON.parse(payload) as IrcConnectionPoolCommandOut;
private async trimCommandStream() {
if (this.commandStreamId === '$') {
// At the head of the queue, don't trim.
return;
}
else {
commandData = Buffer.from(payload).subarray(READ_BUFFER_MAGIC_BYTES.length);
try {
const trimCount = await this.redis.xtrim(
REDIS_IRC_POOL_COMMAND_OUT_STREAM, "MINID", this.commandStreamId
);
log.debug(`Trimmed ${trimCount} commands from the OUT stream`);
}
catch (ex) {
log.warn(`Failed to trim commands from the OUT stream`, ex);
}
setImmediate(
() => this.handleCommand(commandType, commandData)
.catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex)
),
);
}

private async checkHeartbeat() {
const lastHeartbeat = parseInt(await this.redis.get(REDIS_IRC_POOL_HEARTBEAT_KEY) ?? '0');
if (lastHeartbeat + HEARTBEAT_EVERY_MS + 1000 > Date.now()) {
this.missedHeartbeats = 0;
void this.trimCommandStream();
return;
}

Expand Down
3 changes: 2 additions & 1 deletion src/pool-service/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ export const REDIS_IRC_POOL_VERSION_KEY = "ircbridge.poolversion";
export const REDIS_IRC_POOL_HEARTBEAT_KEY = "ircbridge.pool.💓";
export const REDIS_IRC_POOL_COMMAND_OUT_STREAM = "ircbridge.stream.command.out";
export const REDIS_IRC_POOL_COMMAND_IN_STREAM = "ircbridge.stream.command.in";
export const REDIS_IRC_POOL_COMMAND_IN_STREAM_LAST_READ = "ircbridge.stream.command.last-read";

export const REDIS_IRC_POOL_COMMAND_OUT_STREAM_LAST_READ = "ircbridge.stream.out.command.last-read";

export const REDIS_IRC_POOL_CONNECTIONS = "ircbridge.connections";
export const REDIS_IRC_CLIENT_STATE_KEY = `ircbridge.clientstate`; //client-id
Expand Down

0 comments on commit 61af8b4

Please sign in to comment.