From 6463885bf16514c32e79e190b615edaaa2e4d20b Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Thu, 8 Aug 2024 21:46:08 +0200 Subject: [PATCH] chore: Optimize hash queries with lookup table (#2933) * Upgrade Postgres schema to add messages_lookup table * Perform optimized query for messageHash-only queries --- .../content_script_version_7.nim | 30 ++++ .../pg_migration_manager.nim | 4 +- .../test_driver_postgres_query.nim | 29 ++++ .../test_driver_postgres_query.nim | 33 +++++ .../driver/postgres_driver/migrations.nim | 2 +- .../postgres_driver/partitions_manager.nim | 5 +- .../postgres_driver/postgres_driver.nim | 133 +++++++++++++++--- .../postgres_driver/postgres_driver.nim | 99 +++++++++++-- 8 files changed, 298 insertions(+), 37 deletions(-) create mode 100644 migrations/message_store_postgres/content_script_version_7.nim diff --git a/migrations/message_store_postgres/content_script_version_7.nim b/migrations/message_store_postgres/content_script_version_7.nim new file mode 100644 index 0000000000..01d7ad84e1 --- /dev/null +++ b/migrations/message_store_postgres/content_script_version_7.nim @@ -0,0 +1,30 @@ +const ContentScriptVersion_7* = + """ + +-- Create lookup table +CREATE TABLE IF NOT EXISTS messages_lookup ( + timestamp BIGINT NOT NULL, + messageHash VARCHAR NOT NULL + ); + +-- Put data into lookup table +INSERT INTO messages_lookup (messageHash, timestamp) SELECT messageHash, timestamp from messages; + +ALTER TABLE messages_lookup ADD CONSTRAINT messageIndexLookupTable PRIMARY KEY (messageHash, timestamp); + +-- Create indexes +CREATE INDEX IF NOT EXISTS idx_messages_messagehash ON messages (messagehash); +CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages (timestamp); +CREATE INDEX IF NOT EXISTS idx_messages_lookup_messagehash ON messages_lookup (messagehash); +CREATE INDEX IF NOT EXISTS idx_messages_lookup_timestamp ON messages_lookup (timestamp); + +DROP INDEX IF EXISTS i_query_storedat; 
+DROP INDEX IF EXISTS i_query; + +CREATE INDEX IF NOT EXISTS idx_query_pubsubtopic ON messages (pubsubTopic); +CREATE INDEX IF NOT EXISTS idx_query_contenttopic ON messages (contentTopic); + +-- Update to new version +UPDATE version SET version = 7 WHERE version = 6; + +""" diff --git a/migrations/message_store_postgres/pg_migration_manager.nim b/migrations/message_store_postgres/pg_migration_manager.nim index e90a51fc24..051ac9e79a 100644 --- a/migrations/message_store_postgres/pg_migration_manager.nim +++ b/migrations/message_store_postgres/pg_migration_manager.nim @@ -1,6 +1,7 @@ import content_script_version_1, content_script_version_2, content_script_version_3, - content_script_version_4, content_script_version_5, content_script_version_6 + content_script_version_4, content_script_version_5, content_script_version_6, + content_script_version_7 type MigrationScript* = object version*: int @@ -17,6 +18,7 @@ const PgMigrationScripts* = MigrationScript(version: 4, scriptContent: ContentScriptVersion_4), MigrationScript(version: 5, scriptContent: ContentScriptVersion_5), MigrationScript(version: 6, scriptContent: ContentScriptVersion_6), + MigrationScript(version: 7, scriptContent: ContentScriptVersion_7), ] proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] = diff --git a/tests/waku_archive/test_driver_postgres_query.nim b/tests/waku_archive/test_driver_postgres_query.nim index 15c3e2c975..7a135a7a46 100644 --- a/tests/waku_archive/test_driver_postgres_query.nim +++ b/tests/waku_archive/test_driver_postgres_query.nim @@ -1788,3 +1788,32 @@ suite "Postgres driver - queries": var existsRes = await driver.existsTable("version") assert existsRes.isOk(), existsRes.error check existsRes.get() == true + + asyncTest "Query by message hash only": + const contentTopic = "test-content-topic" + + let timeOrigin = now() + let expected = + @[ + fakeWakuMessage(@[byte 0], contentTopic = contentTopic, ts = ts(00, timeOrigin)), + 
fakeWakuMessage(@[byte 1], contentTopic = contentTopic, ts = ts(10, timeOrigin)), + fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20, timeOrigin)), + ] + var messages = expected + + var hashes = newSeq[WakuMessageHash](0) + for msg in messages: + let hash = computeMessageHash(DefaultPubsubTopic, msg) + hashes.add(hash) + let ret = await driver.put(hash, DefaultPubsubTopic, msg) + assert ret.isOk(), ret.error + + let ret = (await driver.getMessages(hashes = hashes)).valueOr: + assert false, $error + return + + check: + ret.len == 3 + ret[2][0] == hashes[0] + ret[1][0] == hashes[1] + ret[0][0] == hashes[2] diff --git a/tests/waku_archive_legacy/test_driver_postgres_query.nim b/tests/waku_archive_legacy/test_driver_postgres_query.nim index 05ccebafc1..29c8e07c22 100644 --- a/tests/waku_archive_legacy/test_driver_postgres_query.nim +++ b/tests/waku_archive_legacy/test_driver_postgres_query.nim @@ -1952,3 +1952,36 @@ suite "Postgres driver - queries": var existsRes = await driver.existsTable("version") assert existsRes.isOk(), existsRes.error check existsRes.get() == true + + asyncTest "Query by message hash only - legacy": + const contentTopic = "test-content-topic" + + let timeOrigin = now() + let expected = + @[ + fakeWakuMessage(@[byte 0], contentTopic = contentTopic, ts = ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], contentTopic = contentTopic, ts = ts(10, timeOrigin)), + fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20, timeOrigin)), + ] + var messages = expected + + var hashes = newSeq[WakuMessageHash](0) + for msg in messages: + let hash = computeMessageHash(DefaultPubsubTopic, msg) + hashes.add(hash) + require ( + await driver.put( + DefaultPubsubTopic, msg, computeDigest(msg), hash, msg.timestamp + ) + ).isOk() + + let ret = (await driver.getMessages(hashes = hashes)).valueOr: + assert false, $error + return + + check: + ret.len == 3 + ## (PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash) + ret[2][4] == 
hashes[0] + ret[1][4] == hashes[1] + ret[0][4] == hashes[2] diff --git a/waku/waku_archive/driver/postgres_driver/migrations.nim b/waku/waku_archive/driver/postgres_driver/migrations.nim index f99146bbfd..6409a07069 100644 --- a/waku/waku_archive/driver/postgres_driver/migrations.nim +++ b/waku/waku_archive/driver/postgres_driver/migrations.nim @@ -9,7 +9,7 @@ import logScope: topics = "waku archive migration" -const SchemaVersion* = 6 # increase this when there is an update in the database schema +const SchemaVersion* = 7 # increase this when there is an update in the database schema proc breakIntoStatements*(script: string): seq[string] = ## Given a full migration script, that can potentially contain a list diff --git a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim index 0591209ce8..fe8209d51b 100644 --- a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim +++ b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim @@ -15,7 +15,7 @@ logScope: type TimeRange* = tuple[beginning: int64, `end`: int64] type - Partition = object + Partition* = object name: string timeRange: TimeRange @@ -132,5 +132,8 @@ proc calcEndPartitionTime*(startTime: Timestamp): Timestamp = proc getName*(partition: Partition): string = return partition.name +proc getTimeRange*(partition: Partition): TimeRange = + return partition.timeRange + func `==`*(a, b: Partition): bool {.inline.} = return a.name == b.name diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 351d0f925b..42c56b5a6e 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -30,6 +30,10 @@ const InsertRowStmtDefinition = """INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta) VALUES ($1, $2, $3, $4, 
$5, $6, $7, CASE WHEN $8 = '' THEN NULL ELSE $8 END) ON CONFLICT DO NOTHING;""" +const InsertRowInMessagesLookupStmtName = "InsertRowMessagesLookup" +const InsertRowInMessagesLookupStmtDefinition = + """INSERT INTO messages_lookup (messageHash, timestamp) VALUES ($1, $2) ON CONFLICT DO NOTHING;""" + const SelectClause = """SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta FROM messages """ @@ -301,27 +305,44 @@ method put*( ## until we completely remove the store/archive-v2 logic let fakeId = "0" + ( + ## Add the row to the messages table + await s.writeConnPool.runStmt( + InsertRowStmtName, + InsertRowStmtDefinition, + @[ + fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp, + meta, + ], + @[ + int32(fakeId.len), + int32(messageHash.len), + int32(pubsubTopic.len), + int32(contentTopic.len), + int32(payload.len), + int32(version.len), + int32(timestamp.len), + int32(meta.len), + ], + @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], + ) + ).isOkOr: + return err("could not put msg in messages table: " & $error) + + ## Now add the row to messages_lookup return await s.writeConnPool.runStmt( - InsertRowStmtName, - InsertRowStmtDefinition, - @[fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta], - @[ - int32(fakeId.len), - int32(messageHash.len), - int32(pubsubTopic.len), - int32(contentTopic.len), - int32(payload.len), - int32(version.len), - int32(timestamp.len), - int32(meta.len), - ], - @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], + InsertRowInMessagesLookupStmtName, + InsertRowInMessagesLookupStmtDefinition, + @[messageHash, timestamp], + @[int32(messageHash.len), int32(timestamp.len)], + @[int32(0), int32(0)], ) method getAllMessages*( s: PostgresDriver ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## Retrieve all messages from the store. 
+ debug "beginning of getAllMessages" var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)] proc rowCallback(pqResult: ptr PGresult) = @@ -486,7 +507,7 @@ proc getMessageHashesArbitraryQuery( .} = ## This proc allows to handle atypical queries. We don't use prepared statements for those. - debug "beginning of getMessagesV2ArbitraryQuery" + debug "beginning of getMessageHashesArbitraryQuery" var query = """SELECT messageHash FROM messages""" var statements: seq[string] @@ -658,7 +679,7 @@ proc getMessageHashesPreparedStmt( var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)] - debug "beginning of getMessagesV2PreparedStmt" + debug "beginning of getMessageHashesPreparedStmt" proc rowCallback(pqResult: ptr PGresult) = hashCallbackImpl(pqResult, rows) @@ -736,6 +757,50 @@ proc getMessageHashesPreparedStmt( return ok(rows) +proc getMessagesByMessageHashes( + s: PostgresDriver, hashes: string, maxPageSize: uint +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + ## Retrieves information only filtering by a given messageHashes list. 
+ ## This proc leverages the messages_lookup table to have better query performance + ## and only query the desired partitions in the partitioned messages table + debug "beginning of getMessagesByMessageHashes" + var query = + fmt""" + WITH min_timestamp AS ( + SELECT MIN(timestamp) AS min_ts + FROM messages_lookup + WHERE messagehash IN ( + {hashes} + ) + ) + SELECT m.messageHash, pubsubTopic, contentTopic, payload, version, m.timestamp, meta + FROM messages m + INNER JOIN + messages_lookup l + ON + m.timestamp = l.timestamp + AND m.messagehash = l.messagehash + WHERE + l.timestamp >= (SELECT min_ts FROM min_timestamp) + AND l.messagehash IN ( + {hashes} + ) + ORDER BY + m.timestamp DESC, + m.messagehash DESC + LIMIT {maxPageSize}; + """ + + var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)] + proc rowCallback(pqResult: ptr PGresult) = + rowCallbackImpl(pqResult, rows) + + (await s.readConnPool.pgQuery(query = query, rowCallback = rowCallback)).isOkOr: + return err("failed to run query: " & $error) + + debug "end of getMessagesByMessageHashes" + return ok(rows) + method getMessages*( s: PostgresDriver, includeData = true, @@ -752,6 +817,11 @@ method getMessages*( let hexHashes = hashes.mapIt(toHex(it)) + if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and + startTime.isNone() and endTime.isNone() and hexHashes.len > 0: + return + await s.getMessagesByMessageHashes("'" & hexHashes.join("','") & "'", maxPageSize) + if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and startTime.isSome() and endTime.isSome(): ## Considered the most common query. Therefore, we use prepared statements to optimize it. 
@@ -892,7 +962,7 @@ method deleteOldestMessagesNotWithinLimit*( ): Future[ArchiveDriverResult[void]] {.async.} = debug "beginning of deleteOldestMessagesNotWithinLimit" - let execRes = await s.writeConnPool.pgQuery( + var execRes = await s.writeConnPool.pgQuery( """DELETE FROM messages WHERE messageHash NOT IN ( SELECT messageHash FROM messages ORDER BY timestamp DESC LIMIT ? @@ -902,6 +972,18 @@ method deleteOldestMessagesNotWithinLimit*( if execRes.isErr(): return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error) + execRes = await s.writeConnPool.pgQuery( + """DELETE FROM messages_lookup WHERE messageHash NOT IN + ( + SELECT messageHash FROM messages ORDER BY timestamp DESC LIMIT ? + );""", + @[$limit], + ) + if execRes.isErr(): + return err( + "error in deleteOldestMessagesNotWithinLimit messages_lookup: " & execRes.error + ) + debug "end of deleteOldestMessagesNotWithinLimit" return ok() @@ -1226,8 +1308,12 @@ proc getTableSize*( return ok(tableSize) proc removePartition( - self: PostgresDriver, partitionName: string + self: PostgresDriver, partition: Partition ): Future[ArchiveDriverResult[void]] {.async.} = + ## Removes the desired partition and also removes the rows from messages_lookup table + ## whose rows belong to the partition time range + + let partitionName = partition.getName() debug "beginning of removePartition", partitionName var partSize = "" @@ -1251,6 +1337,13 @@ proc removePartition( debug "removed partition", partition_name = partitionName, partition_size = partSize self.partitionMngr.removeOldestPartitionName() + ## Now delete rows from the messages_lookup table + let timeRange = partition.getTimeRange() + let `end` = timeRange.`end` * 1_000_000_000 + let deleteRowsQuery = "DELETE FROM messages_lookup WHERE timestamp < " & $`end` + (await self.performWriteQuery(deleteRowsQuery)).isOkOr: + return err(fmt"error in {deleteRowsQuery}: " & $error) + return ok() proc removePartitionsOlderThan( @@ -1265,7 +1358,7 @@ proc 
removePartitionsOlderThan( return err("could not get oldest partition in removePartitionOlderThan: " & $error) while not oldestPartition.containsMoment(tsInSec): - (await self.removePartition(oldestPartition.getName())).isOkOr: + (await self.removePartition(oldestPartition)).isOkOr: return err("issue in removePartitionsOlderThan: " & $error) oldestPartition = self.partitionMngr.getOldestPartition().valueOr: @@ -1295,7 +1388,7 @@ proc removeOldestPartition( debug "Skipping to remove the current partition" return ok() - return await self.removePartition(oldestPartition.getName()) + return await self.removePartition(oldestPartition) proc containsAnyPartition*(self: PostgresDriver): bool = return not self.partitionMngr.isEmpty() diff --git a/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim index 8f76c4d2da..f9dd05a61c 100644 --- a/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim @@ -27,6 +27,10 @@ const InsertRowStmtDefinition = # TODO: get the sql queries from a file """INSERT INTO messages (id, messageHash, contentTopic, payload, pubsubTopic, version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, CASE WHEN $8 = '' THEN NULL ELSE $8 END) ON CONFLICT DO NOTHING;""" +const InsertRowInMessagesLookupStmtName = "InsertRowMessagesLookup" +const InsertRowInMessagesLookupStmtDefinition = + """INSERT INTO messages_lookup (messageHash, timestamp) VALUES ($1, $2) ON CONFLICT DO NOTHING;""" + const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc" const SelectNoCursorAscStmtDef = """SELECT contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages @@ -212,27 +216,43 @@ method put*( trace "put PostgresDriver", timestamp = timestamp + ( + await s.writeConnPool.runStmt( + InsertRowStmtName, + InsertRowStmtDefinition, + @[ + digest, messageHash, contentTopic, 
payload, pubsubTopic, version, timestamp, + meta, + ], + @[ + int32(digest.len), + int32(messageHash.len), + int32(contentTopic.len), + int32(payload.len), + int32(pubsubTopic.len), + int32(version.len), + int32(timestamp.len), + int32(meta.len), + ], + @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], + ) + ).isOkOr: + return err("could not put msg in messages table: " & $error) + + ## Now add the row to messages_lookup return await s.writeConnPool.runStmt( - InsertRowStmtName, - InsertRowStmtDefinition, - @[digest, messageHash, contentTopic, payload, pubsubTopic, version, timestamp, meta], - @[ - int32(digest.len), - int32(messageHash.len), - int32(contentTopic.len), - int32(payload.len), - int32(pubsubTopic.len), - int32(version.len), - int32(timestamp.len), - int32(meta.len), - ], - @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], + InsertRowInMessagesLookupStmtName, + InsertRowInMessagesLookupStmtDefinition, + @[messageHash, timestamp], + @[int32(messageHash.len), int32(timestamp.len)], + @[int32(0), int32(0)], ) method getAllMessages*( s: PostgresDriver ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## Retrieve all messages from the store. + debug "beginning of getAllMessages" var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] proc rowCallback(pqResult: ptr PGresult) = @@ -591,6 +611,50 @@ proc getMessagesV2PreparedStmt( return ok(rows) +proc getMessagesByMessageHashes( + s: PostgresDriver, hashes: string, maxPageSize: uint +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + ## Retrieves information only filtering by a given messageHashes list. 
+ ## This proc leverages the messages_lookup table to have better query performance + ## and only query the desired partitions in the partitioned messages table + debug "beginning of getMessagesByMessageHashes" + var query = + fmt""" + WITH min_timestamp AS ( + SELECT MIN(timestamp) AS min_ts + FROM messages_lookup + WHERE messagehash IN ( + {hashes} + ) + ) + SELECT contentTopic, payload, pubsubTopic, version, m.timestamp, id, m.messageHash, meta + FROM messages m + INNER JOIN + messages_lookup l + ON + m.timestamp = l.timestamp + AND m.messagehash = l.messagehash + WHERE + l.timestamp >= (SELECT min_ts FROM min_timestamp) + AND l.messagehash IN ( + {hashes} + ) + ORDER BY + m.timestamp DESC, + m.messagehash DESC + LIMIT {maxPageSize}; + """ + + var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] + proc rowCallback(pqResult: ptr PGresult) = + rowCallbackImpl(pqResult, rows) + + (await s.readConnPool.pgQuery(query = query, rowCallback = rowCallback)).isOkOr: + return err("failed to run query: " & $error) + + debug "end of getMessagesByMessageHashes" + return ok(rows) + method getMessages*( s: PostgresDriver, includeData = true, @@ -603,8 +667,15 @@ method getMessages*( maxPageSize = DefaultPageSize, ascendingOrder = true, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + debug "beginning of getMessages" + let hexHashes = hashes.mapIt(toHex(it)) + if cursor.isNone() and pubsubTopic.isNone() and contentTopicSeq.len == 0 and + startTime.isNone() and endTime.isNone() and hexHashes.len > 0: + return + await s.getMessagesByMessageHashes("'" & hexHashes.join("','") & "'", maxPageSize) + if contentTopicSeq.len == 1 and hexHashes.len == 1 and pubsubTopic.isSome() and startTime.isSome() and endTime.isSome(): ## Considered the most common query. Therefore, we use prepared statements to optimize it.