chore: Optimize hash queries with lookup table (#2933)
* Upgrade Postgres schema to add messages_lookup table
* Perform optimized query for messageHash-only queries
Ivansete-status committed Aug 8, 2024
1 parent 8584b94 commit 6463885
Showing 8 changed files with 298 additions and 37 deletions.
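In a nutshell: the messages table is partitioned by timestamp, so a query that filters only by message hash gives the planner no time bound and has to touch every partition. The new messages_lookup table maps each message hash to its timestamp, so a hash-only query can first resolve a minimum timestamp from the small lookup table and then join back into messages with that bound, letting Postgres prune partitions. A simplified sketch of the idea follows (hash values are illustrative placeholders; the exact statement the driver generates appears in postgres_driver.nim below):

-- Before: no timestamp bound, so every partition of the partitioned messages table is scanned.
SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta
FROM messages
WHERE messageHash IN ('<hash1>', '<hash2>');

-- After: resolve a lower timestamp bound from the lookup table, then join back into messages.
WITH min_timestamp AS (
  SELECT MIN(timestamp) AS min_ts
  FROM messages_lookup
  WHERE messageHash IN ('<hash1>', '<hash2>')
)
SELECT m.messageHash, m.pubsubTopic, m.contentTopic, m.payload, m.version, m.timestamp, m.meta
FROM messages m
INNER JOIN messages_lookup l
  ON m.timestamp = l.timestamp AND m.messageHash = l.messageHash
WHERE l.timestamp >= (SELECT min_ts FROM min_timestamp)
  AND l.messageHash IN ('<hash1>', '<hash2>');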
30 changes: 30 additions & 0 deletions migrations/message_store_postgres/content_script_version_7.nim
@@ -0,0 +1,30 @@
const ContentScriptVersion_7* =
"""
-- Create lookup table
CREATE TABLE IF NOT EXISTS messages_lookup (
timestamp BIGINT NOT NULL,
messageHash VARCHAR NOT NULL
);
-- Put data into lookup table
INSERT INTO messages_lookup (messageHash, timestamp) SELECT messageHash, timestamp from messages;
ALTER TABLE messages_lookup ADD CONSTRAINT messageIndexLookupTable PRIMARY KEY (messageHash, timestamp);
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_messages_messagehash ON messages (messagehash);
CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages (timestamp);
CREATE INDEX IF NOT EXISTS idx_messages_lookup_messagehash ON messages_lookup (messagehash);
CREATE INDEX IF NOT EXISTS idx_messages_lookup_timestamp ON messages_lookup (timestamp);
DROP INDEX IF EXISTS i_query_storedat;
DROP INDEX IF EXISTS i_query;
CREATE INDEX IF NOT EXISTS idx_query_pubsubtopic ON messages (pubsubTopic);
CREATE INDEX IF NOT EXISTS idx_query_contenttopic ON messages (contentTopic);
-- Update to new version
UPDATE version SET version = 7 WHERE version = 6;
"""
4 changes: 3 additions & 1 deletion migrations/message_store_postgres/pg_migration_manager.nim
@@ -1,6 +1,7 @@
import
content_script_version_1, content_script_version_2, content_script_version_3,
- content_script_version_4, content_script_version_5, content_script_version_6
+ content_script_version_4, content_script_version_5, content_script_version_6,
+ content_script_version_7

type MigrationScript* = object
version*: int
@@ -17,6 +18,7 @@ const PgMigrationScripts* =
MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
MigrationScript(version: 5, scriptContent: ContentScriptVersion_5),
MigrationScript(version: 6, scriptContent: ContentScriptVersion_6),
MigrationScript(version: 7, scriptContent: ContentScriptVersion_7),
]

proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
29 changes: 29 additions & 0 deletions tests/waku_archive/test_driver_postgres_query.nim
@@ -1788,3 +1788,32 @@ suite "Postgres driver - queries":
var existsRes = await driver.existsTable("version")
assert existsRes.isOk(), existsRes.error
check existsRes.get() == true

asyncTest "Query by message hash only":
const contentTopic = "test-content-topic"

let timeOrigin = now()
let expected =
@[
fakeWakuMessage(@[byte 0], contentTopic = contentTopic, ts = ts(00, timeOrigin)),
fakeWakuMessage(@[byte 1], contentTopic = contentTopic, ts = ts(10, timeOrigin)),
fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20, timeOrigin)),
]
var messages = expected

var hashes = newSeq[WakuMessageHash](0)
for msg in messages:
let hash = computeMessageHash(DefaultPubsubTopic, msg)
hashes.add(hash)
let ret = await driver.put(hash, DefaultPubsubTopic, msg)
assert ret.isOk(), ret.error

let ret = (await driver.getMessages(hashes = hashes)).valueOr:
assert false, $error
return

check:
ret.len == 3
ret[2][0] == hashes[0]
ret[1][0] == hashes[1]
ret[0][0] == hashes[2]
33 changes: 33 additions & 0 deletions tests/waku_archive_legacy/test_driver_postgres_query.nim
@@ -1952,3 +1952,36 @@ suite "Postgres driver - queries":
var existsRes = await driver.existsTable("version")
assert existsRes.isOk(), existsRes.error
check existsRes.get() == true

asyncTest "Query by message hash only - legacy":
const contentTopic = "test-content-topic"

let timeOrigin = now()
let expected =
@[
fakeWakuMessage(@[byte 0], contentTopic = contentTopic, ts = ts(00, timeOrigin)),
fakeWakuMessage(@[byte 1], contentTopic = contentTopic, ts = ts(10, timeOrigin)),
fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20, timeOrigin)),
]
var messages = expected

var hashes = newSeq[WakuMessageHash](0)
for msg in messages:
let hash = computeMessageHash(DefaultPubsubTopic, msg)
hashes.add(hash)
require (
await driver.put(
DefaultPubsubTopic, msg, computeDigest(msg), hash, msg.timestamp
)
).isOk()

let ret = (await driver.getMessages(hashes = hashes)).valueOr:
assert false, $error
return

check:
ret.len == 3
## (PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)
ret[2][4] == hashes[0]
ret[1][4] == hashes[1]
ret[0][4] == hashes[2]
2 changes: 1 addition & 1 deletion waku/waku_archive/driver/postgres_driver/migrations.nim
@@ -9,7 +9,7 @@ import
logScope:
topics = "waku archive migration"

- const SchemaVersion* = 6 # increase this when there is an update in the database schema
+ const SchemaVersion* = 7 # increase this when there is an update in the database schema

proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list
waku/waku_archive/driver/postgres_driver/partitions_manager.nim
@@ -15,7 +15,7 @@ logScope:
type TimeRange* = tuple[beginning: int64, `end`: int64]

type
- Partition = object
+ Partition* = object
name: string
timeRange: TimeRange

@@ -132,5 +132,8 @@ proc calcEndPartitionTime*(startTime: Timestamp): Timestamp =
proc getName*(partition: Partition): string =
return partition.name

proc getTimeRange*(partition: Partition): TimeRange =
return partition.timeRange

func `==`*(a, b: Partition): bool {.inline.} =
return a.name == b.name
133 changes: 113 additions & 20 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
@@ -30,6 +30,10 @@ const InsertRowStmtDefinition =
"""INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload,
version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, CASE WHEN $8 = '' THEN NULL ELSE $8 END) ON CONFLICT DO NOTHING;"""

const InsertRowInMessagesLookupStmtName = "InsertRowMessagesLookup"
const InsertRowInMessagesLookupStmtDefinition =
"""INSERT INTO messages_lookup (messageHash, timestamp) VALUES ($1, $2) ON CONFLICT DO NOTHING;"""

const SelectClause =
"""SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta FROM messages """

@@ -301,27 +305,44 @@ method put*(
## until we completely remove the store/archive-v2 logic
let fakeId = "0"

+ (
+ ## Add the row to the messages table
+ await s.writeConnPool.runStmt(
+ InsertRowStmtName,
+ InsertRowStmtDefinition,
+ @[
+ fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp,
+ meta,
+ ],
+ @[
+ int32(fakeId.len),
+ int32(messageHash.len),
+ int32(pubsubTopic.len),
+ int32(contentTopic.len),
+ int32(payload.len),
+ int32(version.len),
+ int32(timestamp.len),
+ int32(meta.len),
+ ],
+ @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
+ )
+ ).isOkOr:
+ return err("could not put msg in messages table: " & $error)
+
+ ## Now add the row to messages_lookup
return await s.writeConnPool.runStmt(
- InsertRowStmtName,
- InsertRowStmtDefinition,
- @[fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta],
- @[
- int32(fakeId.len),
- int32(messageHash.len),
- int32(pubsubTopic.len),
- int32(contentTopic.len),
- int32(payload.len),
- int32(version.len),
- int32(timestamp.len),
- int32(meta.len),
- ],
- @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
+ InsertRowInMessagesLookupStmtName,
+ InsertRowInMessagesLookupStmtDefinition,
+ @[messageHash, timestamp],
+ @[int32(messageHash.len), int32(timestamp.len)],
+ @[int32(0), int32(0)],
)

method getAllMessages*(
s: PostgresDriver
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all messages from the store.
debug "beginning of getAllMessages"

var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc rowCallback(pqResult: ptr PGresult) =
@@ -486,7 +507,7 @@ proc getMessageHashesArbitraryQuery(
.} =
## This proc allows to handle atypical queries. We don't use prepared statements for those.

debug "beginning of getMessagesV2ArbitraryQuery"
debug "beginning of getMessageHashesArbitraryQuery"
var query = """SELECT messageHash FROM messages"""

var statements: seq[string]
@@ -658,7 +679,7 @@ proc getMessageHashesPreparedStmt(

var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]

debug "beginning of getMessagesV2PreparedStmt"
debug "beginning of getMessageHashesPreparedStmt"
proc rowCallback(pqResult: ptr PGresult) =
hashCallbackImpl(pqResult, rows)

@@ -736,6 +757,50 @@

return ok(rows)

proc getMessagesByMessageHashes(
s: PostgresDriver, hashes: string, maxPageSize: uint
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieves messages filtered only by the given list of message hashes.
## This proc leverages the messages_lookup table for better query performance,
## querying only the relevant partitions of the partitioned messages table.
debug "beginning of getMessagesByMessageHashes"
var query =
fmt"""
WITH min_timestamp AS (
SELECT MIN(timestamp) AS min_ts
FROM messages_lookup
WHERE messagehash IN (
{hashes}
)
)
SELECT m.messageHash, pubsubTopic, contentTopic, payload, version, m.timestamp, meta
FROM messages m
INNER JOIN
messages_lookup l
ON
m.timestamp = l.timestamp
AND m.messagehash = l.messagehash
WHERE
l.timestamp >= (SELECT min_ts FROM min_timestamp)
AND l.messagehash IN (
{hashes}
)
ORDER BY
m.timestamp DESC,
m.messagehash DESC
LIMIT {maxPageSize};
"""

var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)

(await s.readConnPool.pgQuery(query = query, rowCallback = rowCallback)).isOkOr:
return err("failed to run query: " & $error)

debug "end of getMessagesByMessageHashes"
return ok(rows)

method getMessages*(
s: PostgresDriver,
includeData = true,
@@ -752,6 +817,11 @@

let hexHashes = hashes.mapIt(toHex(it))

if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and
startTime.isNone() and endTime.isNone() and hexHashes.len > 0:
return
await s.getMessagesByMessageHashes("'" & hexHashes.join("','") & "'", maxPageSize)

if contentTopics.len > 0 and hexHashes.len > 0 and pubsubTopic.isSome() and
startTime.isSome() and endTime.isSome():
## Considered the most common query. Therefore, we use prepared statements to optimize it.
@@ -892,7 +962,7 @@ method deleteOldestMessagesNotWithinLimit*(
): Future[ArchiveDriverResult[void]] {.async.} =
debug "beginning of deleteOldestMessagesNotWithinLimit"

- let execRes = await s.writeConnPool.pgQuery(
+ var execRes = await s.writeConnPool.pgQuery(
"""DELETE FROM messages WHERE messageHash NOT IN
(
SELECT messageHash FROM messages ORDER BY timestamp DESC LIMIT ?
Expand All @@ -902,6 +972,18 @@ method deleteOldestMessagesNotWithinLimit*(
if execRes.isErr():
return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error)

execRes = await s.writeConnPool.pgQuery(
"""DELETE FROM messages_lookup WHERE messageHash NOT IN
(
SELECT messageHash FROM messages ORDER BY timestamp DESC LIMIT ?
);""",
@[$limit],
)
if execRes.isErr():
return err(
"error in deleteOldestMessagesNotWithinLimit messages_lookup: " & execRes.error
)

debug "end of deleteOldestMessagesNotWithinLimit"
return ok()

@@ -1226,8 +1308,12 @@ proc getTableSize*(
return ok(tableSize)

proc removePartition(
- self: PostgresDriver, partitionName: string
+ self: PostgresDriver, partition: Partition
): Future[ArchiveDriverResult[void]] {.async.} =
## Removes the given partition and also deletes the rows in the messages_lookup table
## that fall within the partition's time range.

let partitionName = partition.getName()
debug "beginning of removePartition", partitionName

var partSize = ""
Expand All @@ -1251,6 +1337,13 @@ proc removePartition(
debug "removed partition", partition_name = partitionName, partition_size = partSize
self.partitionMngr.removeOldestPartitionName()

## Now delete rows from the messages_lookup table
let timeRange = partition.getTimeRange()
let `end` = timeRange.`end` * 1_000_000_000
let deleteRowsQuery = "DELETE FROM messages_lookup WHERE timestamp < " & $`end`
(await self.performWriteQuery(deleteRowsQuery)).isOkOr:
return err(fmt"error in {deleteRowsQuery}: " & $error)

return ok()

proc removePartitionsOlderThan(
@@ -1265,7 +1358,7 @@
return err("could not get oldest partition in removePartitionOlderThan: " & $error)

while not oldestPartition.containsMoment(tsInSec):
- (await self.removePartition(oldestPartition.getName())).isOkOr:
+ (await self.removePartition(oldestPartition)).isOkOr:
return err("issue in removePartitionsOlderThan: " & $error)

oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
@@ -1295,7 +1388,7 @@ proc removeOldestPartition(
debug "Skipping to remove the current partition"
return ok()

- return await self.removePartition(oldestPartition.getName())
+ return await self.removePartition(oldestPartition)

proc containsAnyPartition*(self: PostgresDriver): bool =
return not self.partitionMngr.isEmpty()
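Taken together, the driver changes above keep messages_lookup consistent with messages along three paths. Condensed into plain SQL (parameter placeholders as in the driver code above), these are roughly:

-- 1. On put: every message insert is mirrored into the lookup table.
INSERT INTO messages_lookup (messageHash, timestamp) VALUES ($1, $2) ON CONFLICT DO NOTHING;

-- 2. On retention-limit cleanup: rows outside the newest N messages are removed from both tables.
DELETE FROM messages_lookup WHERE messageHash NOT IN
  (SELECT messageHash FROM messages ORDER BY timestamp DESC LIMIT ?);

-- 3. On partition removal: lookup rows older than the dropped partition's end time are deleted
--    (the multiplication in removePartition suggests partition ranges are tracked in seconds
--    while message timestamps are in nanoseconds).
DELETE FROM messages_lookup WHERE timestamp < <partition end time * 1_000_000_000>;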
[diff for the remaining changed file did not load]
