From 26c66219c0eef146fa3990b16402a811877e54d6 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 5 Oct 2022 18:30:38 +0100 Subject: [PATCH 1/2] Experiments with postgres libraray --- src/datastore/postgres/PgDataStore.ts | 78 ++------------------------ src/datastore/postgres/schema/index.ts | 6 ++ src/datastore/postgres/schema/v1.ts | 13 +++-- src/datastore/postgres/schema/v2.ts | 15 ++++- src/datastore/postgres/schema/v3.ts | 14 ++--- src/datastore/postgres/schema/v4.ts | 14 ++--- src/datastore/postgres/schema/v5.ts | 12 ++-- src/datastore/postgres/schema/v6.ts | 12 ++-- src/datastore/postgres/schema/v7.ts | 13 +++-- src/datastore/postgres/schema/v8.ts | 58 ++++++++----------- 10 files changed, 88 insertions(+), 147 deletions(-) create mode 100644 src/datastore/postgres/schema/index.ts diff --git a/src/datastore/postgres/PgDataStore.ts b/src/datastore/postgres/PgDataStore.ts index a9b7f075e..1985b8d95 100644 --- a/src/datastore/postgres/PgDataStore.ts +++ b/src/datastore/postgres/PgDataStore.ts @@ -13,9 +13,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - -import { Pool } from "pg"; - import { MatrixUser, MatrixRoom, @@ -24,6 +21,7 @@ import { MatrixRoomData, UserActivitySet, UserActivity, + PostgresStore, } from "matrix-appservice-bridge"; import { DataStore, RoomOrigin, ChannelMappings, UserFeatures } from "../DataStore"; import { MatrixDirectoryVisibility } from "../../bridge/IrcHandler"; @@ -37,6 +35,7 @@ import { StringCrypto } from "../StringCrypto"; import { toIrcLowerCase } from "../../irc/formatting"; import { NeDBDataStore } from "../NedbDataStore"; import QuickLRU from "quick-lru"; +import schemas from './schema'; const log = getLogger("PgDatastore"); @@ -52,37 +51,24 @@ interface RoomRecord { origin: RoomOrigin; } -export class PgDataStore implements DataStore { +export class PgDataStore extends PostgresStore implements DataStore { private serverMappings: {[domain: string]: IrcServer} = {}; public static readonly LATEST_SCHEMA = 8; - private pgPool: Pool; - private hasEnded = false; private cryptoStore?: StringCrypto; private userFeatureCache = new QuickLRU({ maxSize: FEATURE_CACHE_SIZE, }); - constructor(private bridgeDomain: string, connectionString: string, pkeyPath?: string, min = 1, max = 4) { - this.pgPool = new Pool({ - connectionString, - min, + constructor(private bridgeDomain: string, connectionString: string, pkeyPath?: string, max = 4) { + super(schemas, { + url: connectionString, max, }); - this.pgPool.on("error", (err) => { - log.error("Postgres Error: %s", err); - }); if (pkeyPath) { this.cryptoStore = new StringCrypto(); this.cryptoStore.load(pkeyPath); } - process.on("beforeExit", () => { - if (this.hasEnded) { - return; - } - // Ensure we clean up on exit - this.pgPool.end(); - }) } public async setServerFromConfig(server: IrcServer, serverConfig: IrcServerConfig): Promise { @@ -689,63 +675,11 @@ export class PgDataStore implements DataStore { await this.pgPool.query("INSERT INTO deactivated_users VALUES ($1, $2)", [userId, Date.now()]); } - public async ensureSchema() { - log.info("Starting postgres database engine"); - let currentVersion = await this.getSchemaVersion(); - while (currentVersion < PgDataStore.LATEST_SCHEMA) { - log.info(`Updating schema to v${currentVersion + 1}`); - // eslint-disable-next-line @typescript-eslint/no-var-requires - const runSchema = require(`./schema/v${currentVersion + 1}`).runSchema; - try { - await runSchema(this.pgPool); - currentVersion++; - await this.updateSchemaVersion(currentVersion); - } - catch (ex) { - log.warn(`Failed to run schema v${currentVersion + 1}:`, ex); - throw Error("Failed to update database schema"); - } - } - log.info(`Database schema is at version v${currentVersion}`); - } - public async getRoomCount(): Promise { const res = await this.pgPool.query(`SELECT COUNT(*) FROM rooms`); return res.rows[0]; } - public async destroy() { - log.info("Destroy called"); - if (this.hasEnded) { - // No-op if end has already been called. - return; - } - this.hasEnded = true; - await this.pgPool.end(); - log.info("PostgresSQL connection ended"); - // This will no-op - } - - private async updateSchemaVersion(version: number) { - log.debug(`updateSchemaVersion: ${version}`); - await this.pgPool.query("UPDATE schema SET version = $1;", [version]); - } - - private async getSchemaVersion(): Promise { - try { - const { rows } = await this.pgPool.query("SELECT version FROM SCHEMA"); - return rows[0].version; - } - catch (ex) { - if (ex.code === "42P01") { // undefined_table - log.warn("Schema table could not be found"); - return 0; - } - log.error("Failed to get schema version: %s", ex); - } - throw Error("Couldn't fetch schema version"); - } - private static BuildUpsertStatement(table: string, constraint: string, keyNames: string[]): string { const keys = keyNames.join(", "); const keysValues = `\$${keyNames.map((k, i) => i + 1).join(", $")}`; diff --git a/src/datastore/postgres/schema/index.ts b/src/datastore/postgres/schema/index.ts new file mode 100644 index 000000000..cdb427185 --- /dev/null +++ b/src/datastore/postgres/schema/index.ts @@ -0,0 +1,6 @@ +import { SchemaUpdateFunction } from "matrix-appservice-bridge"; +import v1 from './v1'; + +export default [ + v1, +] as SchemaUpdateFunction[]; diff --git a/src/datastore/postgres/schema/v1.ts b/src/datastore/postgres/schema/v1.ts index 8e2b11a7a..04abbf86f 100644 --- a/src/datastore/postgres/schema/v1.ts +++ b/src/datastore/postgres/schema/v1.ts @@ -1,8 +1,8 @@ -import { PoolClient } from "pg"; +import { SchemaUpdateFunction } from 'matrix-appservice-bridge'; -export async function runSchema(connection: PoolClient) { +const updateFn: SchemaUpdateFunction = async (sql) => { // Create schema - await connection.query(` + await sql` CREATE TABLE schema ( version INTEGER UNIQUE NOT NULL ); @@ -59,5 +59,8 @@ export async function runSchema(connection: PoolClient) { count INTEGER ); - INSERT INTO ipv6_counter VALUES (0);`); -} + INSERT INTO ipv6_counter VALUES (0);`; + +}; + +export default updateFn; diff --git a/src/datastore/postgres/schema/v2.ts b/src/datastore/postgres/schema/v2.ts index fa91adf12..68b36d92b 100644 --- a/src/datastore/postgres/schema/v2.ts +++ b/src/datastore/postgres/schema/v2.ts @@ -3,9 +3,18 @@ import { PoolClient } from "pg"; export async function runSchema(connection: PoolClient) { // Create schema await connection.query(` + `); +} +import { SchemaUpdateFunction } from 'matrix-appservice-bridge'; + +const updateFn: SchemaUpdateFunction = async (sql) => { + // Create schema + await sql` CREATE TABLE last_seen ( user_id TEXT UNIQUE NOT NULL, ts BIGINT NOT NULL - ); - `); -} + );`; + +}; + +export default updateFn; diff --git a/src/datastore/postgres/schema/v3.ts b/src/datastore/postgres/schema/v3.ts index 8c9f455d1..c7bd429db 100644 --- a/src/datastore/postgres/schema/v3.ts +++ b/src/datastore/postgres/schema/v3.ts @@ -1,11 +1,11 @@ -import { PoolClient } from "pg"; +import { SchemaUpdateFunction } from 'matrix-appservice-bridge'; -export async function runSchema(connection: PoolClient) { - // Create schema - await connection.query(` +const updateFn: SchemaUpdateFunction = async (sql) => { + await sql` CREATE TABLE room_visibility ( room_id TEXT UNIQUE NOT NULL, visibility BOOLEAN NOT NULL - ); - `); -} + );` +}; + +export default updateFn; diff --git a/src/datastore/postgres/schema/v4.ts b/src/datastore/postgres/schema/v4.ts index 2fb991684..41e5f64df 100644 --- a/src/datastore/postgres/schema/v4.ts +++ b/src/datastore/postgres/schema/v4.ts @@ -1,11 +1,11 @@ -import { PoolClient } from "pg"; +import { SchemaUpdateFunction } from 'matrix-appservice-bridge'; -export async function runSchema(connection: PoolClient) { - // Create schema - await connection.query(` +const updateFn: SchemaUpdateFunction = async (sql) => { + await sql` CREATE TABLE deactivated_users ( user_id TEXT UNIQUE NOT NULL, ts BIGINT NOT NULL - ); - `); -} + );` +}; + +export default updateFn; diff --git a/src/datastore/postgres/schema/v5.ts b/src/datastore/postgres/schema/v5.ts index 8f6b92657..420187a99 100644 --- a/src/datastore/postgres/schema/v5.ts +++ b/src/datastore/postgres/schema/v5.ts @@ -1,7 +1,7 @@ -import { PoolClient } from "pg"; +import { SchemaUpdateFunction } from 'matrix-appservice-bridge'; -export async function runSchema(connection: PoolClient) { - await connection.query(` - CREATE INDEX client_config_domain_username_idx ON client_config (domain, (config->>'username')); - `); -} +const updateFn: SchemaUpdateFunction = async (sql) => { + await sql`CREATE INDEX client_config_domain_username_idx ON client_config (domain, (config->>'username'));` +}; + +export default updateFn; diff --git a/src/datastore/postgres/schema/v6.ts b/src/datastore/postgres/schema/v6.ts index cfc5dcc21..c37b1ba87 100644 --- a/src/datastore/postgres/schema/v6.ts +++ b/src/datastore/postgres/schema/v6.ts @@ -1,7 +1,7 @@ -import { PoolClient } from "pg"; +import { SchemaUpdateFunction } from 'matrix-appservice-bridge'; -export async function runSchema(connection: PoolClient) { - await connection.query(` - DROP INDEX client_config_domain_username_idx; - `); -} +const updateFn: SchemaUpdateFunction = async (sql) => { + await sql`DROP INDEX client_config_domain_username_idx;` +}; + +export default updateFn; diff --git a/src/datastore/postgres/schema/v7.ts b/src/datastore/postgres/schema/v7.ts index 6a7a834e2..a7184774a 100644 --- a/src/datastore/postgres/schema/v7.ts +++ b/src/datastore/postgres/schema/v7.ts @@ -1,10 +1,11 @@ -import { PoolClient } from "pg"; +import { SchemaUpdateFunction } from 'matrix-appservice-bridge'; -export async function runSchema(connection: PoolClient) { - await connection.query(` +const updateFn: SchemaUpdateFunction = async (sql) => { + await sql` CREATE TABLE user_activity ( user_id TEXT UNIQUE, data JSON - ); - `); -} + );` +}; + +export default updateFn; diff --git a/src/datastore/postgres/schema/v8.ts b/src/datastore/postgres/schema/v8.ts index 453b80732..35ffe45f3 100644 --- a/src/datastore/postgres/schema/v8.ts +++ b/src/datastore/postgres/schema/v8.ts @@ -1,59 +1,47 @@ -import { PoolClient } from "pg"; -import { Logging } from "matrix-appservice-bridge"; +import { Logger, SchemaUpdateFunction } from "matrix-appservice-bridge"; -const log = Logging.get('postgres/schema/v8'); +const log = new Logger('postgres/schema/v8'); -function domainSetToValues(domains: string[], count: number): [string, Array] { - let res = ""; - const values: Array = []; - // VALUES ... - for (let index = 0; index < domains.length; index++) { - const i = index * 3; - if (res) { - res += ", "; - } - res += `($${i+1}, $${i+2}, $${i+3})`; - values.push(count, "*", domains[index]); - } - return [res, values]; -} - -export async function runSchema(connection: PoolClient) { - - await connection.query(` +const updateFn: SchemaUpdateFunction = async (sql) => { + await sql` ALTER TABLE ipv6_counter ADD COLUMN homeserver TEXT, ADD COLUMN server TEXT; ALTER TABLE ipv6_counter ADD CONSTRAINT cons_ipv6_counter_unique UNIQUE(homeserver, server); - `); - + `; // Migrate data. - const existingCounterRes = await connection.query<{count: string}>("SELECT count FROM ipv6_counter;"); - const existingCounter = existingCounterRes && parseInt(existingCounterRes.rows[0].count, 10); + const existingCounterRes = await sql<{count: string}[]>`SELECT count FROM ipv6_counter;`; + const existingCounter = existingCounterRes && parseInt(existingCounterRes?.[0].count, 10); // If we have a counter value if (existingCounter) { - const serverConfigsRes = await connection.query<{domain: string}>( - "SELECT DISTINCT domain FROM client_config WHERE config->>'ipv6' IS NOT NULL;" - ); - if (serverConfigsRes.rowCount === 0) { + const serverConfigsRes = await sql<{domain: string}[]>` + SELECT DISTINCT domain FROM client_config WHERE config->>'ipv6' IS NOT NULL; + `; + if (serverConfigsRes?.length === 0) { // No servers to migrate? throw Error("No client_configs found with ipv6 addresses, but counter was found"); } - else if (serverConfigsRes.rowCount > 1) { + else if (serverConfigsRes?.length > 1) { log.warn("More than one IPv6 server configured, starting both ipv6 counters from the same value."); } // Because we cannot determine which IRC network(s) are using the existing counter // (owing to a bug where we treated the counter as global across all networks), this assumes // that both networks start from the same counter value. - const [statement, values] = domainSetToValues(serverConfigsRes.rows.map(d => d.domain), existingCounter); - await connection.query(`INSERT INTO ipv6_counter (count, homeserver, server) VALUES ${statement}`, values); + const domains = serverConfigsRes.flatMap(r => ({ + count: existingCounter, + domain: r.domain, + homeserver: '*' + })); + await sql`INSERT INTO ipv6_counter ${sql(domains, 'count', 'domain', 'homeserver')})`; } - await connection.query(` + await sql` DELETE FROM ipv6_counter WHERE server IS NULL; ALTER TABLE ipv6_counter ALTER COLUMN server SET NOT NULL; - `); -} + `; +}; + +export default updateFn; From 0cd75fbbe525b9677dfda96ef6550fdb7d725642 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Thu, 6 Oct 2022 10:40:41 +0100 Subject: [PATCH 2/2] Rewrite PgDatastore --- src/datastore/postgres/PgDataStore.ts | 416 +++++++++++-------------- src/datastore/postgres/schema/index.ts | 14 + 2 files changed, 191 insertions(+), 239 deletions(-) diff --git a/src/datastore/postgres/PgDataStore.ts b/src/datastore/postgres/PgDataStore.ts index 1985b8d95..5c19c36bf 100644 --- a/src/datastore/postgres/PgDataStore.ts +++ b/src/datastore/postgres/PgDataStore.ts @@ -30,7 +30,6 @@ import { IrcClientConfig } from "../../models/IrcClientConfig"; import { IrcServer, IrcServerConfig } from "../../irc/IrcServer"; import { getLogger } from "../../logging"; -import Bluebird from "bluebird"; import { StringCrypto } from "../StringCrypto"; import { toIrcLowerCase } from "../../irc/formatting"; import { NeDBDataStore } from "../NedbDataStore"; @@ -53,8 +52,6 @@ interface RoomRecord { export class PgDataStore extends PostgresStore implements DataStore { private serverMappings: {[domain: string]: IrcServer} = {}; - - public static readonly LATEST_SCHEMA = 8; private cryptoStore?: StringCrypto; private userFeatureCache = new QuickLRU({ maxSize: FEATURE_CACHE_SIZE, @@ -130,9 +127,9 @@ export class PgDataStore extends PostgresStore implements DataStore { irc_json: ircJson, matrix_json: matrixJson, }; - const statement = PgDataStore.BuildUpsertStatement("rooms", - "ON CONSTRAINT cons_rooms_unique", Object.keys(parameters)); - await this.pgPool.query(statement, Object.values(parameters)); + await this.sql`INSERT INTO rooms ${this.sql(parameters)} + ON CONFLICT cons_rooms_unique + DO UPDATE SET ${this.sql(parameters)}`; } private static pgToRoomEntry(pgEntry: RoomRecord): Entry { @@ -158,23 +155,26 @@ export class PgDataStore extends PostgresStore implements DataStore { ircChannel: string, origin?: RoomOrigin ): Promise { - let statement = "SELECT * FROM rooms WHERE room_id = $1 AND irc_domain = $2 AND irc_channel = $3"; - let params = [roomId, ircDomain, ircChannel]; + let statement = `SELECT * FROM rooms + WHERE room_id = ${roomId} + AND irc_domain = ${ircDomain} + AND irc_channel = ${ircChannel}`; if (origin) { - statement += " AND origin = $4"; - params = params.concat(origin); + statement += ` AND origin = ${origin}`; } - const pgEntry = await this.pgPool.query(statement, params); - if (!pgEntry.rowCount) { + const pgEntry = await this.sql`${statement}`; + if (!pgEntry?.length) { return null; } - return PgDataStore.pgToRoomEntry(pgEntry.rows[0]); + return PgDataStore.pgToRoomEntry(pgEntry[0]); } public async getAllChannelMappings(): Promise { - const entries = (await this.pgPool.query( - "SELECT irc_domain, room_id, irc_channel FROM rooms WHERE type = 'channel'" - )).rows; + const entries = this.sql<{ + irc_domain: string, + room_id: string, + irc_channel: string + }[]>`SELECT irc_domain, room_id, irc_channel FROM rooms WHERE type = 'channel'`; const mappings: ChannelMappings = {}; const validDomains = Object.keys(this.serverMappings); @@ -198,37 +198,39 @@ export class PgDataStore extends PostgresStore implements DataStore { return mappings; } - public getEntriesByMatrixId(roomId: string): Bluebird { - return Bluebird.cast(this.pgPool.query("SELECT * FROM rooms WHERE room_id = $1", [ - roomId - ])).then((result) => result.rows).map((e) => PgDataStore.pgToRoomEntry(e)); + public async getEntriesByMatrixId(roomId: string): Promise { + const entries = await this.sql`SELECT * FROM rooms WHERE room_id = ${roomId}`; + return entries.flatMap((e) => PgDataStore.pgToRoomEntry(e)); } public async getProvisionedMappings(roomId: string): Promise { - const res = await this.pgPool.query("SELECT * FROM rooms WHERE room_id = $1 AND origin = 'provision'", [ - roomId - ]).then((result) => result.rows); + const res = await this.sql` + SELECT * + FROM rooms + WHERE room_id = ${roomId} + AND origin = 'provision'`; return res.map((e) => PgDataStore.pgToRoomEntry(e)); } public async removeRoom(roomId: string, ircDomain: string, ircChannel: string, origin?: RoomOrigin): Promise { - let statement = "DELETE FROM rooms WHERE room_id = $1 AND irc_domain = $2 AND irc_channel = $3"; - let params = [roomId, ircDomain, ircChannel]; + let statement = `DELETE FROM rooms + WHERE room_id = ${roomId} + AND irc_domain = ${ircDomain} + AND irc_channel = ${ircChannel}`; if (origin) { - statement += " AND origin = $4"; - params = params.concat(origin); + statement += ` AND origin = ${origin}`; } - await this.pgPool.query(statement, params); + await this.sql`${statement}`; } public async getIrcChannelsForRoomId(roomId: string): Promise { - let entries = await this.pgPool.query("SELECT irc_domain, irc_channel FROM rooms WHERE room_id = $1", [roomId]); - if (entries.rowCount === 0) { + let entries = await this.sql`SELECT irc_domain, irc_channel FROM rooms WHERE room_id = ${roomId}`; + if (entries.length === 0) { // Could be a PM room, if it's not a channel. - entries = await this.pgPool.query("SELECT irc_domain, irc_nick FROM pm_rooms WHERE room_id = $1", [roomId]); + entries = await this.sql`SELECT irc_domain, irc_nick FROM pm_rooms WHERE room_id = ${roomId}`; } const rooms: IrcRoom[] = []; - for (const row of entries.rows) { + for (const row of entries) { const server = this.serverMappings[row.irc_domain]; if (server) { rooms.push(new IrcRoom(server, row.irc_channel || row.irc_nick)); @@ -238,12 +240,9 @@ export class PgDataStore extends PostgresStore implements DataStore { } public async getIrcChannelsForRoomIds(roomIds: string[]): Promise<{ [roomId: string]: IrcRoom[] }> { - const entries = await this.pgPool.query( - "SELECT room_id, irc_domain, irc_channel FROM rooms WHERE room_id IN $1", - [roomIds] - ); + const entries = await this.sql`SELECT room_id, irc_domain, irc_channel FROM rooms WHERE room_id IN ${roomIds}`; const mapping: { [roomId: string]: IrcRoom[] } = {}; - entries.rows.forEach((e) => { + entries.forEach((e) => { const server = this.serverMappings[e.irc_domain]; if (!server) { // ! is used here because typescript doesn't understand the .filter @@ -258,14 +257,11 @@ export class PgDataStore extends PostgresStore implements DataStore { } public async getMatrixRoomsForChannel(server: IrcServer, channel: string): Promise { - const entries = await this.pgPool.query( - "SELECT room_id, matrix_json FROM rooms WHERE irc_domain = $1 AND irc_channel = $2", - [ - server.domain, - // Channels must be lowercase - toIrcLowerCase(channel), - ]); - return entries.rows.map((e) => new MatrixRoom(e.room_id, e.matrix_json)); + const entries = await this.sql`SELECT room_id, matrix_json + FROM rooms + WHERE irc_domain = ${server.domain} + AND irc_channel = ${toIrcLowerCase(channel)}`; + return entries.map((e) => new MatrixRoom(e.room_id, e.matrix_json)); } public async getMappingsForChannelByOrigin( @@ -276,29 +272,22 @@ export class PgDataStore extends PostgresStore implements DataStore { if (!Array.isArray(origin)) { origin = [origin]; } - const inStatement = origin.map((_, i) => `\$${i + 3}`).join(", "); - const entries = await this.pgPool.query( - `SELECT * FROM rooms WHERE irc_domain = $1 AND irc_channel = $2 AND origin IN (${inStatement})`, - [ - server.domain, - // Channels must be lowercase - toIrcLowerCase(channel), - ].concat(origin)); - return entries.rows.map((e) => PgDataStore.pgToRoomEntry(e)); + const entries = await this.sql`SELECT * + FROM rooms + WHERE irc_domain = ${server.domain} + AND irc_channel = ${toIrcLowerCase(channel)} + AND origin IN (${origin})`; + return entries.map((e) => PgDataStore.pgToRoomEntry(e)); } public async getModesForChannel(server: IrcServer, channel: string): Promise<{ [id: string]: string[] }> { log.debug(`Getting modes for ${server.domain} ${channel}`); const mapping: {[id: string]: string[]} = {}; - const entries = await this.pgPool.query( - "SELECT room_id, irc_json->>'modes' AS modes FROM rooms " + - "WHERE irc_domain = $1 AND irc_channel = $2", - [ - server.domain, - // Channels must be lowercase - toIrcLowerCase(channel), - ]); - entries.rows.forEach((e) => { + const entries = await this.sql`SELECT room_id, irc_json->>'modes' AS modes + FROM rooms + WHERE irc_domain = ${server.domain} + AND irc_channel = ${toIrcLowerCase(channel)}`; + entries.forEach((e) => { mapping[e.room_id] = e.modes || []; }); return mapping; @@ -332,15 +321,13 @@ export class PgDataStore extends PostgresStore implements DataStore { delete ircRoomSerial.domain; delete ircRoomSerial.channel; delete ircRoomSerial.type; - await this.pgPool.query( - "UPDATE rooms SET irc_json = $4 WHERE room_id = $1 AND irc_channel = $2 AND irc_domain = $3", - [ - roomId, - entry.remote.get("channel"), - entry.remote.get("domain"), - JSON.stringify(ircRoomSerial), - ] - ); + const channel = entry.remote.get("channel"); + const domain = entry.remote.get("domain"); + await this.sql`UPDATE rooms + WHERE room_id = ${roomId} + SET irc_json = ${JSON.stringify(ircRoomSerial)} + AND irc_channel = ${channel} + AND irc_domain = ${domain}`; } } @@ -349,52 +336,38 @@ export class PgDataStore extends PostgresStore implements DataStore { ): Promise { log.debug(`setPmRoom (matrix_user_id=${userId}, virtual_user_id=${virtualUserId}, ` + `room_id=${matrixRoom.getId()}, irc_nick=${ircRoom.getChannel()})`); - await this.pgPool.query( - PgDataStore.BuildUpsertStatement("pm_rooms", "ON CONSTRAINT cons_pm_rooms_matrix_irc_unique", [ - "room_id", - "irc_domain", - "irc_nick", - "matrix_user_id", - "virtual_user_id", - ]), [ - matrixRoom.getId(), - ircRoom.getDomain(), - ircRoom.getChannel(), - userId, - virtualUserId, - ]); + const data = { + room_id: ircRoom.roomId, + irc_domain: ircRoom.getDomain(), + irc_nick: ircRoom.getChannel(), + matrix_user_id: userId, + virtual_user_id: virtualUserId, + }; + await this.sql`INSERT INTO pm_rooms ${this.sql(data)} + ON CONSTRAINT cons_pm_rooms_matrix_irc_unique + DO UPDATE SET ${this.sql(data)}`; } public async removePmRoom(roomId: string): Promise { log.debug(`removePmRoom (room_id=${roomId}`); - await this.pgPool.query("DELETE FROM pm_rooms WHERE room_id = $1", [roomId]); + await this.sql`DELETE FROM pm_rooms WHERE room_id = ${roomId}`; } public async getMatrixPmRoom(realUserId: string, virtualUserId: string): Promise { log.debug(`getMatrixPmRoom (matrix_user_id=${realUserId}, virtual_user_id=${virtualUserId})`); - const res = await this.pgPool.query( - "SELECT room_id FROM pm_rooms WHERE matrix_user_id = $1 AND virtual_user_id = $2", - [ - realUserId, - virtualUserId, - ] - ); - if (res.rowCount === 0) { - return null; - } - return new MatrixRoom(res.rows[0].room_id); + const res = await this.sql`SELECT room_id + FROM pm_rooms + WHERE matrix_user_id = ${realUserId} + AND virtual_user_id = ${virtualUserId}`; + return res?.[0] ? new MatrixRoom(res[0].room_id) : null; } public async getMatrixPmRoomById(roomId: string): Promise { log.debug(`getMatrixPmRoom (roomId=${roomId})`); - const res = await this.pgPool.query( - "SELECT room_id, matrix_user_id, virtual_user_id FROM pm_rooms WHERE room_id = $1", [ - roomId, - ]); - if (res.rowCount === 0) { - return null; - } - return new MatrixRoom(res.rows[0].room_id); + const res = await this.sql`SELECT room_id, matrix_user_id, virtual_user_id + FROM pm_rooms + WHERE room_id = ${roomId}`; + return res?.[0] ? new MatrixRoom(res[0].room_id) : null; } public async getTrackedChannelsForServer(domain: string): Promise { @@ -403,79 +376,61 @@ export class PgDataStore extends PostgresStore implements DataStore { return []; } log.info(`Fetching all channels for ${domain}`); - const chanSet = await this.pgPool.query( - "SELECT DISTINCT irc_channel FROM rooms WHERE irc_domain = $1", [domain]); - return chanSet.rows.map((e) => e.irc_channel as string); + const chanSet = await this.sql`SELECT DISTINCT irc_channel FROM rooms WHERE irc_domain = ${domain}`; + return chanSet.map((e) => e.irc_channel as string); } public async getRoomIdsFromConfig(): Promise { return ( - await this.pgPool.query("SELECT room_id FROM rooms WHERE origin = 'config'") - ).rows.map((e) => e.room_id); + await this.sql`SELECT room_id FROM rooms WHERE origin = 'config'` + ).map((e) => e.room_id); } public async removeConfigMappings(): Promise { - await this.pgPool.query("DELETE FROM rooms WHERE origin = 'config'"); + await this.sql`DELETE FROM rooms WHERE origin = 'config'`; } public async getIpv6Counter(server: IrcServer, homeserver: string|null): Promise { homeserver = homeserver || "*"; - const res = await this.pgPool.query( - "SELECT count FROM ipv6_counter WHERE server = $1 AND homeserver = $2", - [server.domain, homeserver] - ); - return res.rows[0]?.count !== undefined ? parseInt(res.rows[0].count, 10) : 0; - } - - public async setIpv6Counter(counter: number, server: IrcServer, homeserver: string|null): Promise { - await this.pgPool.query( - PgDataStore.BuildUpsertStatement( - "ipv6_counter", - "ON CONSTRAINT cons_ipv6_counter_unique", [ - "count", - "homeserver", - "server" - ], - ), - [counter, homeserver || "*", server.domain], - ); + const res = await this.sql`SELECT count + FROM ipv6_counter + WHERE server = ${server.domain} + AND homeserver = ${homeserver}`; + return parseInt(res?.[0]?.count, 10) || 0; + } + + public async setIpv6Counter(count: number, server: IrcServer, homeserver: string|null): Promise { + const data = {count, homeserver: homeserver || "*", server: server.domain}; + await this.sql`INSERT INTO ipv6_counter ${this.sql(data)} + ON CONSTRAINT cons_ipv6_counter_unique + DO UPDATE SET count = ${count}`; } public async upsertMatrixRoom(room: MatrixRoom): Promise { // XXX: This is an upsert operation, but we don't have enough details to go on // so this will just update a rooms data entry. We only use this call to update // topics on an existing room. - await this.pgPool.query("UPDATE rooms SET matrix_json = $1 WHERE room_id = $2", [ - JSON.stringify(room.serialize()), - room.getId(), - ]); + await this.sql`UPDATE rooms + SET matrix_json = ${JSON.stringify(room.serialize())} + WHERE room_id = ${room.getId()}`; } public async getAdminRoomById(roomId: string): Promise { - const res = await this.pgPool.query("SELECT room_id FROM admin_rooms WHERE room_id = $1", [roomId]); - if (res.rowCount === 0) { - return null; - } - return new MatrixRoom(roomId); + const res = await this.sql`SELECT room_id FROM admin_rooms WHERE room_id = ${roomId}`; + return res?.[0] ? new MatrixRoom(res?.[0].room_id) : null; } public async storeAdminRoom(room: MatrixRoom, userId: string): Promise { - await this.pgPool.query(PgDataStore.BuildUpsertStatement("admin_rooms", "(room_id)", [ - "room_id", - "user_id", - ]), [room.getId(), userId]); + await this.sql`INSERT INTO admin_rooms ${this.sql({room_id: room.getId(), user_id: userId})}`; } public async getAdminRoomByUserId(userId: string): Promise { - const res = await this.pgPool.query("SELECT room_id FROM admin_rooms WHERE user_id = $1", [userId]); - if (res.rowCount === 0) { - return null; - } - return new MatrixRoom(res.rows[0].room_id); + const res = await this.sql`SELECT room_id FROM admin_rooms WHERE user_id = ${userId}`; + return res?.[0] ? new MatrixRoom(res?.[0].room_id) : null; } public async removeAdminRoom(room: MatrixRoom): Promise { - await this.pgPool.query("DELETE FROM admin_rooms WHERE room_id = $1", [room.roomId]); + await this.sql`DELETE FROM admin_rooms WHERE room_id = ${room.roomId}`; } public async storeMatrixUser(matrixUser: MatrixUser): Promise { @@ -483,21 +438,20 @@ export class PgDataStore extends PostgresStore implements DataStore { user_id: matrixUser.getId(), data: JSON.stringify(matrixUser.serialize()), }; - const statement = PgDataStore.BuildUpsertStatement("matrix_users", "(user_id)", Object.keys(parameters)); - await this.pgPool.query(statement, Object.values(parameters)); + await this.sql`INSERT INTO matrix_users ${this.sql(parameters)} + ON CONFLICT (user_id) + DO UPDATE SET data = ${parameters.data}`; } public async getIrcClientConfig(userId: string, domain: string): Promise { - const res = await this.pgPool.query( - "SELECT config, password FROM client_config WHERE user_id = $1 and domain = $2", - [ - userId, - domain - ]); - if (res.rowCount === 0) { + const res = await this.sql`SELECT config, password + FROM client_config + WHERE user_id = ${userId} + AND domain = ${domain}`; + if (!res?.length) { return null; } - const row = res.rows[0]; + const [row] = res; const config = row.config || {}; // This may not be defined. if (row.password && this.cryptoStore) { config.password = this.cryptoStore.decrypt(row.password); @@ -512,7 +466,7 @@ export class PgDataStore extends PostgresStore implements DataStore { } log.debug(`Storing client configuration for ${userId}`); // We need to make sure we have a matrix user in the store. - await this.pgPool.query("INSERT INTO matrix_users VALUES ($1, NULL) ON CONFLICT DO NOTHING", [userId]); + await this.sql`INSERT INTO matrix_users VALUES (${userId}, NULL) ON CONFLICT DO NOTHING`; let password = config.getPassword(); if (password && this.cryptoStore) { password = this.cryptoStore.encrypt(password); @@ -524,20 +478,15 @@ export class PgDataStore extends PostgresStore implements DataStore { password, config: JSON.stringify(config.serialize(true)), }; - const statement = PgDataStore.BuildUpsertStatement( - "client_config", "ON CONSTRAINT cons_client_config_unique", Object.keys(parameters)); - await this.pgPool.query(statement, Object.values(parameters)); + await this.sql`INSERT INTO client_config ${this.sql(parameters)} + ON CONSTRAINT cons_client_config_unique + DO UPDATE SET ${this.sql(parameters)}`; } public async getMatrixUserByLocalpart(localpart: string): Promise { - const res = await this.pgPool.query("SELECT user_id, data FROM matrix_users WHERE user_id = $1", [ - `@${localpart}:${this.bridgeDomain}`, - ]); - if (res.rowCount === 0) { - return null; - } - const row = res.rows[0]; - return new MatrixUser(row.user_id, row.data); + const userId = `@${localpart}:${this.bridgeDomain}`; + const res = await this.sql`SELECT user_id, data FROM matrix_users WHERE user_id = ${userId}`; + return res?.[0] ? new MatrixUser(res[0].user_id, res[0].data) : null; } public async getUserFeatures(userId: string): Promise { @@ -545,36 +494,36 @@ export class PgDataStore extends PostgresStore implements DataStore { if (existing) { return existing; } - const pgRes = await this.pgPool.query("SELECT features FROM user_features WHERE user_id = $1", [userId]); - const features = (pgRes.rows[0] || {}); + const pgRes = await this.sql`SELECT features FROM user_features WHERE user_id = ${userId}`; + const features = (pgRes?.[0]?.features || {}); this.userFeatureCache.set(userId, features); return features; } public async storeUserFeatures(userId: string, features: UserFeatures): Promise { - const statement = PgDataStore.BuildUpsertStatement("user_features", "(user_id)", [ - "user_id", - "features", - ]); - await this.pgPool.query(statement, [userId, JSON.stringify(features)]); + const parameters = { + user_id: userId, + features: JSON.stringify(features) + }; + await this.sql`INSERT INTO user_features ${this.sql(parameters)} + ON CONFLICT (user_id) + DO UPDATE SET features = ${parameters.features}`; } public async getUserActivity(): Promise { - const res = await this.pgPool.query('SELECT * FROM user_activity'); + const res = await this.sql`SELECT * FROM user_activity`; const users: {[mxid: string]: UserActivity} = {}; - for (const row of res.rows) { + for (const row of res) { users[row['user_id']] = row['data']; } return { users }; } public async storeUserActivity(userId: string, activity: UserActivity) { - const stmt = PgDataStore.BuildUpsertStatement( - 'user_activity', - '(user_id)', - ['user_id', 'data'], - ); - await this.pgPool.query(stmt, [userId, JSON.stringify(activity)]); + const data = JSON.stringify(activity); + await this.sql`INSERT INTO user_activity ${this.sql({user_id: userId, data})} + ON CONFLICT (user_id) + DO UPDATE SET data = ${data}`; } public async storePass(userId: string, domain: string, pass: string, encrypt = true): Promise { @@ -590,102 +539,91 @@ export class PgDataStore extends PostgresStore implements DataStore { domain, password, }; - const statement = PgDataStore.BuildUpsertStatement("client_config", - "ON CONSTRAINT cons_client_config_unique", Object.keys(parameters)); - await this.pgPool.query(statement, Object.values(parameters)); + await this.sql`INSERT INTO client_config ${this.sql(parameters)} + ON CONSTRAINT cons_client_config_unique + DO UPDATE SET password = ${password}`; } public async removePass(userId: string, domain: string): Promise { - await this.pgPool.query("UPDATE client_config SET password = NULL WHERE user_id = $1 AND domain = $2", - [userId, domain]); + await this.sql`UPDATE client_config SET password = NULL WHERE user_id = ${userId} AND domain = ${domain}`; } public async getMatrixUserByUsername(domain: string, username: string): Promise { // This will need a join - const res = await this.pgPool.query( - "SELECT client_config.user_id, matrix_users.data FROM client_config, matrix_users " + - "WHERE config->>'username' = $1 AND domain = $2 AND client_config.user_id = matrix_users.user_id", - [username, domain] - ); - if (res.rowCount === 0) { + const res = await this.sql` + SELECT client_config.user_id, matrix_users.data + FROM client_config, matrix_users + WHERE config->>'username' = ${username} + AND domain = ${domain} + AND client_config.user_id = matrix_users.user_id + `; + if (res?.length === 0) { return undefined; } - else if (res.rowCount > 1) { - log.error("getMatrixUserByUsername returned %s results for %s on %s", res.rowCount, username, domain); + else if (res.length > 1) { + log.error("getMatrixUserByUsername returned %s results for %s on %s", res.length, username, domain); } - return new MatrixUser(res.rows[0].user_id, res.rows[0].data); + return new MatrixUser(res[0].user_id, res[0].data); } public async getCountForUsernamePrefix(domain: string, usernamePrefix: string): Promise { - const res = await this.pgPool.query("SELECT COUNT(*) FROM client_config " + - "WHERE domain = $2 AND config->>'username' LIKE $1 || '%'", - [usernamePrefix, domain]); - const count = parseInt(res.rows[0].count, 10); - return count; + const res = await this.sql<{count: number}[]>`SELECT COUNT(*) + FROM client_config + WHERE domain = ${domain} + AND config->>'username' LIKE ${usernamePrefix} || '%'`; + return res[0].count; } public async roomUpgradeOnRoomMigrated(oldRoomId: string, newRoomId: string) { - await this.pgPool.query("UPDATE rooms SET room_id = $1 WHERE room_id = $2", [newRoomId, oldRoomId]); + await this.sql`UPDATE rooms SET room_id = ${newRoomId} WHERE room_id = ${oldRoomId}`; } public async updateLastSeenTimeForUser(userId: string) { - const statement = PgDataStore.BuildUpsertStatement("last_seen", "(user_id)", [ - "user_id", - "ts", - ]); - await this.pgPool.query(statement, [userId, Date.now()]); + const ts = Date.now(); + await this.sql`INSERT INTO last_seen ${this.sql({userId, ts})} + ON CONFLICT (user_id) + DO UPDATE SET ts = ${ts}`; } public async getLastSeenTimeForUsers(): Promise<{ user_id: string; ts: number }[]> { - const res = await this.pgPool.query(`SELECT * FROM last_seen`); - return res.rows; + const res = await this.sql<{ user_id: string; ts: number }[]>`SELECT * FROM last_seen`; + return res; } public async getAllUserIds() { - const res = await this.pgPool.query(`SELECT user_id FROM matrix_users`); - return res.rows.map((u) => u.user_id); + const res = await this.sql`SELECT user_id FROM matrix_users`; + return res.map((u) => u.user_id); } public async getRoomsVisibility(roomIds: string[]): Promise> { const map: Map = new Map(); - const res = await this.pgPool.query("SELECT room_id, visibility FROM room_visibility WHERE room_id IN $1", [ - roomIds, - ]); - for (const row of res.rows) { + const res = await this.sql< + {room_id: string, visibility: MatrixDirectoryVisibility}[] + >`SELECT room_id, visibility FROM room_visibility WHERE room_id IN ${roomIds}`; + for (const row of res) { map.set(row.room_id, row.visibility ? "public" : "private"); } return map; } public async setRoomVisibility(roomId: string, visibility: MatrixDirectoryVisibility) { - const statement = PgDataStore.BuildUpsertStatement("room_visibility", "(room_id)", [ - "room_id", - "visibility", - ]); - await this.pgPool.query(statement, [roomId, visibility === "public"]); + await this.sql`INSERT INTO room_visibility ${this.sql({roomId, visibility})} + ON CONFLICT (room_id) + DO UPDATE SET visibility = ${visibility}`; log.info(`setRoomVisibility ${roomId} => ${visibility}`); } public async isUserDeactivated(userId: string): Promise { - const res = await this.pgPool.query(`SELECT user_id FROM deactivated_users WHERE user_id = $1`, [userId]); - return res.rowCount > 0; + const res = await this.sql`SELECT user_id FROM deactivated_users WHERE user_id = ${userId}`; + return res?.length > 0; } public async deactivateUser(userId: string) { - await this.pgPool.query("INSERT INTO deactivated_users VALUES ($1, $2)", [userId, Date.now()]); + await this.sql`INSERT INTO deactivated_users VALUES (${userId}, ${Date.now()})`; } public async getRoomCount(): Promise { - const res = await this.pgPool.query(`SELECT COUNT(*) FROM rooms`); - return res.rows[0]; - } - - private static BuildUpsertStatement(table: string, constraint: string, keyNames: string[]): string { - const keys = keyNames.join(", "); - const keysValues = `\$${keyNames.map((k, i) => i + 1).join(", $")}`; - const keysSets = keyNames.map((k, i) => `${k} = \$${i + 1}`).join(", "); - const statement = `INSERT INTO ${table} (${keys}) VALUES (${keysValues}) ` + - `ON CONFLICT ${constraint} DO UPDATE SET ${keysSets}`; - return statement; + const res = await this.sql<{count: number}[]>`SELECT COUNT(*) AS count FROM rooms`; + return res?.[0].count; } } diff --git a/src/datastore/postgres/schema/index.ts b/src/datastore/postgres/schema/index.ts index cdb427185..bab9b0138 100644 --- a/src/datastore/postgres/schema/index.ts +++ b/src/datastore/postgres/schema/index.ts @@ -1,6 +1,20 @@ import { SchemaUpdateFunction } from "matrix-appservice-bridge"; import v1 from './v1'; +import v2 from './v2'; +import v3 from './v3'; +import v4 from './v4'; +import v5 from './v5'; +import v6 from './v6'; +import v7 from './v7'; +import v8 from './v8'; export default [ v1, + v2, + v3, + v4, + v5, + v6, + v7, + v8, ] as SchemaUpdateFunction[];