From 7c9364443838c4b1c0eb139a86419258a49fa5e1 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 28 Mar 2024 10:44:16 -0400 Subject: [PATCH] more fixes --- waku/node/waku_node.nim | 2 +- waku/waku_archive/archive.nim | 68 ++++ waku/waku_archive/driver.nim | 12 + .../postgres_driver/postgres_driver.nim | 237 ++++++++++++- .../driver/sqlite_driver/cursor.nim | 4 +- .../driver/sqlite_driver/queries.nim | 321 +++++++++++++----- .../driver/sqlite_driver/sqlite_driver.nim | 34 +- 7 files changed, 578 insertions(+), 100 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 3fd2ef7cdd..aba0d54f38 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -720,7 +720,7 @@ proc mountLegacyStore*(node: WakuNode) {.async.} = return err(error) let request = request.toArchiveQuery() - let response = await node.wakuArchive.findMessages(request) + let response = await node.wakuArchive.findMessagesV2(request) return response.toHistoryResult() node.wakuLegacyStore = diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 6f5330f652..efbf17e0a9 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -193,6 +193,74 @@ proc findMessages*( return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor)) +proc findMessagesV2*( + self: WakuArchive, query: ArchiveQuery +): Future[ArchiveResult] {.async, gcsafe.} = + ## Search the archive to return a single page of messages matching the query criteria + + let maxPageSize = + if query.pageSize <= 0: + DefaultPageSize + else: + min(query.pageSize, MaxPageSize) + + let isAscendingOrder = query.direction.into() + + if query.contentTopics.len > 10: + return err(ArchiveError.invalidQuery("too many content topics")) + + let queryStartTime = getTime().toUnixFloat() + + let rows = ( + await self.driver.getMessagesV2( + contentTopic = query.contentTopics, + pubsubTopic = query.pubsubTopic, + cursor = query.cursor, + startTime = query.startTime, + endTime = query.endTime, + maxPageSize = maxPageSize + 1, + ascendingOrder = isAscendingOrder, + ) + ).valueOr: + return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error)) + + let queryDuration = getTime().toUnixFloat() - queryStartTime + waku_archive_query_duration_seconds.observe(queryDuration) + + var messages = newSeq[WakuMessage]() + var cursor = none(ArchiveCursor) + + if rows.len == 0: + return ok(ArchiveResponse(messages: messages, cursor: cursor)) + + ## Messages + let pageSize = min(rows.len, int(maxPageSize)) + + messages = rows[0 ..< pageSize].mapIt(it[1]) + + ## Cursor + if rows.len > int(maxPageSize): + ## Build last message cursor + ## The cursor is built from the last message INCLUDED in the response + ## (i.e. the second last message in the rows list) + + let (pubsubTopic, message, digest, storeTimestamp, _) = rows[^2] + + cursor = some( + ArchiveCursor( + digest: MessageDigest.fromBytes(digest), + storeTime: storeTimestamp, + sendertime: message.timestamp, + pubsubTopic: pubsubTopic, + ) + ) + + # All messages MUST be returned in chronological order + if not isAscendingOrder: + reverse(messages) + + return ok(ArchiveResponse(messages: messages, cursor: cursor)) + proc periodicRetentionPolicy(self: WakuArchive) {.async.} = debug "executing message retention policy" diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index e91a98ff78..6b7f28fce2 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -32,6 +32,18 @@ method getAllMessages*( ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard +method getMessagesV2*( + driver: ArchiveDriver, + contentTopic = newSeq[ContentTopic](0), + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = DefaultPageSize, + ascendingOrder = true, +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = + discard + method getMessages*( driver: ArchiveDriver, contentTopic = newSeq[ContentTopic](0), diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 756590cf1a..0d38320e02 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -35,6 +35,48 @@ const InsertRowStmtDefinition = # TODO: get the sql queries from a file const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc" const SelectNoCursorAscStmtDef = + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages + WHERE contentTopic IN ($1) AND + messageHash IN ($2) AND + pubsubTopic = $3 AND + storedAt >= $4 AND + storedAt <= $5 + ORDER BY storedAt ASC, messageHash ASC LIMIT $6;""" + +const SelectNoCursorDescStmtName = "SelectWithoutCursorDesc" +const SelectNoCursorDescStmtDef = + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages + WHERE contentTopic IN ($1) AND + messageHash IN ($2) AND + pubsubTopic = $3 AND + storedAt >= $4 AND + storedAt <= $5 + ORDER BY storedAt DESC, messageHash DESC LIMIT $6;""" + +const SelectWithCursorDescStmtName = "SelectWithCursorDesc" +const SelectWithCursorDescStmtDef = + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages + WHERE contentTopic IN ($1) AND + messageHash IN ($2) AND + pubsubTopic = $3 AND + (storedAt, messageHash) < ($4,$5) AND + storedAt >= $6 AND + storedAt <= $7 + ORDER BY storedAt DESC, messageHash DESC LIMIT $8;""" + +const SelectWithCursorAscStmtName = "SelectWithCursorAsc" +const SelectWithCursorAscStmtDef = + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages + WHERE contentTopic IN ($1) AND + messageHash IN ($2) AND + pubsubTopic = $3 AND + (storedAt, messageHash) > ($4,$5) AND + storedAt >= $6 AND + storedAt <= $7 + ORDER BY storedAt ASC, messageHash ASC LIMIT $8;""" + +const SelectNoCursorV2AscStmtName = "SelectWithoutCursorV2Asc" +const SelectNoCursorV2AscStmtDef = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages WHERE contentTopic IN ($1) AND pubsubTopic = $2 AND @@ -42,8 +84,8 @@ const SelectNoCursorAscStmtDef = storedAt <= $4 ORDER BY storedAt ASC LIMIT $5;""" -const SelectNoCursorDescStmtName = "SelectWithoutCursorDesc" -const SelectNoCursorDescStmtDef = +const SelectNoCursorV2DescStmtName = "SelectWithoutCursorV2Desc" +const SelectNoCursorV2DescStmtDef = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages WHERE contentTopic IN ($1) AND pubsubTopic = $2 AND @@ -51,8 +93,8 @@ const SelectNoCursorDescStmtDef = storedAt <= $4 ORDER BY storedAt DESC LIMIT $5;""" -const SelectWithCursorDescStmtName = "SelectWithCursorDesc" -const SelectWithCursorDescStmtDef = +const SelectWithCursorV2DescStmtName = "SelectWithCursorV2Desc" +const SelectWithCursorV2DescStmtDef = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages WHERE contentTopic IN ($1) AND pubsubTopic = $2 AND @@ -61,8 +103,8 @@ const SelectWithCursorDescStmtDef = storedAt <= $6 ORDER BY storedAt DESC LIMIT $7;""" -const SelectWithCursorAscStmtName = "SelectWithCursorAsc" -const SelectWithCursorAscStmtDef = +const SelectWithCursorV2AscStmtName = "SelectWithCursorV2Asc" +const SelectWithCursorV2AscStmtDef = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages WHERE contentTopic IN ($1) AND pubsubTopic = $2 AND @@ -289,6 +331,70 @@ proc getMessagesArbitraryQuery( statements.add("pubsubTopic = ?") args.add(pubsubTopic.get()) + if cursor.isSome(): + let comp = if ascendingOrder: ">" else: "<" + statements.add("(storedAt, messageHash) " & comp & " (?,?)") + args.add($cursor.get().storeTime) + args.add(toHex(cursor.get().hash)) + + if startTime.isSome(): + statements.add("storedAt >= ?") + args.add($startTime.get()) + + if endTime.isSome(): + statements.add("storedAt <= ?") + args.add($endTime.get()) + + if statements.len > 0: + query &= " WHERE " & statements.join(" AND ") + + var direction: string + if ascendingOrder: + direction = "ASC" + else: + direction = "DESC" + + query &= " ORDER BY storedAt " & direction & ", messageHash " & direction + + query &= " LIMIT ?" + args.add($maxPageSize) + + var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] + proc rowCallback(pqResult: ptr PGresult) = + rowCallbackImpl(pqResult, rows) + + (await s.readConnPool.pgQuery(query, args, rowCallback)).isOkOr: + return err("failed to run query: " & $error) + + return ok(rows) + +proc getMessagesV2ArbitraryQuery( + s: PostgresDriver, + contentTopic: seq[ContentTopic] = @[], + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = DefaultPageSize, + ascendingOrder = true, +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + ## This proc allows to handle atypical queries. We don't use prepared statements for those. + + var query = + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages""" + var statements: seq[string] + var args: seq[string] + + if contentTopic.len > 0: + let cstmt = "contentTopic IN (" & "?".repeat(contentTopic.len).join(",") & ")" + statements.add(cstmt) + for t in contentTopic: + args.add(t) + + if pubsubTopic.isSome(): + statements.add("pubsubTopic = ?") + args.add(pubsubTopic.get()) + if cursor.isSome(): let comp = if ascendingOrder: ">" else: "<" statements.add("(storedAt, id) " & comp & " (?,?)") @@ -333,6 +439,7 @@ proc getMessagesPreparedStmt( cursor = none(ArchiveCursor), startTime: Timestamp, endTime: Timestamp, + hashes: string, maxPageSize = DefaultPageSize, ascOrder = true, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = @@ -355,6 +462,85 @@ proc getMessagesPreparedStmt( var stmtDef = if ascOrder: SelectWithCursorAscStmtDef else: SelectWithCursorDescStmtDef + let hash = toHex(cursor.get().hash) + let storeTime = $cursor.get().storeTime + + ( + await s.readConnPool.runStmt( + stmtName, + stmtDef, + @[ + contentTopic, hashes, pubsubTopic, storeTime, hash, startTimeStr, endTimeStr, + limit, + ], + @[ + int32(contentTopic.len), + int32(pubsubTopic.len), + int32(storeTime.len), + int32(hash.len), + int32(startTimeStr.len), + int32(endTimeStr.len), + int32(limit.len), + ], + @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], + rowCallback, + ) + ).isOkOr: + return err("failed to run query with cursor: " & $error) + else: + var stmtName = + if ascOrder: SelectNoCursorAscStmtName else: SelectNoCursorDescStmtName + var stmtDef = if ascOrder: SelectNoCursorAscStmtDef else: SelectNoCursorDescStmtDef + + ( + await s.readConnPool.runStmt( + stmtName, + stmtDef, + @[contentTopic, hashes, pubsubTopic, startTimeStr, endTimeStr, limit], + @[ + int32(contentTopic.len), + int32(pubsubTopic.len), + int32(startTimeStr.len), + int32(endTimeStr.len), + int32(limit.len), + ], + @[int32(0), int32(0), int32(0), int32(0), int32(0)], + rowCallback, + ) + ).isOkOr: + return err("failed to run query without cursor: " & $error) + + return ok(rows) + +proc getMessagesV2PreparedStmt( + s: PostgresDriver, + contentTopic: string, + pubsubTopic: PubsubTopic, + cursor = none(ArchiveCursor), + startTime: Timestamp, + endTime: Timestamp, + maxPageSize = DefaultPageSize, + ascOrder = true, +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + ## This proc aims to run the most typical queries in a more performant way, i.e. by means of + ## prepared statements. + ## + ## contentTopic - string with list of conten topics. e.g: "'ctopic1','ctopic2','ctopic3'" + + var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] + proc rowCallback(pqResult: ptr PGresult) = + rowCallbackImpl(pqResult, rows) + + let startTimeStr = $startTime + let endTimeStr = $endTime + let limit = $maxPageSize + + if cursor.isSome(): + var stmtName = + if ascOrder: SelectWithCursorV2AscStmtName else: SelectWithCursorV2DescStmtName + var stmtDef = + if ascOrder: SelectWithCursorV2AscStmtDef else: SelectWithCursorV2DescStmtDef + let digest = toHex(cursor.get().digest.data) let storeTime = $cursor.get().storeTime @@ -379,8 +565,9 @@ proc getMessagesPreparedStmt( return err("failed to run query with cursor: " & $error) else: var stmtName = - if ascOrder: SelectNoCursorAscStmtName else: SelectNoCursorDescStmtName - var stmtDef = if ascOrder: SelectNoCursorAscStmtDef else: SelectNoCursorDescStmtDef + if ascOrder: SelectNoCursorV2AscStmtName else: SelectNoCursorV2DescStmtName + var stmtDef = + if ascOrder: SelectNoCursorV2AscStmtDef else: SelectNoCursorV2DescStmtDef ( await s.readConnPool.runStmt( @@ -415,8 +602,8 @@ method getMessages*( ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = let hexHashes = hashes.mapIt(toHex(it)) - if contentTopicSeq.len == 1 and pubsubTopic.isSome() and startTime.isSome() and - endTime.isSome(): + if contentTopicSeq.len == 1 and hexHashes.len == 1 and pubsubTopic.isSome() and + startTime.isSome() and endTime.isSome(): ## Considered the most common query. Therefore, we use prepared statements to optimize it. return await s.getMessagesPreparedStmt( contentTopicSeq.join(","), @@ -424,6 +611,7 @@ method getMessages*( cursor, startTime.get(), endTime.get(), + hexHashes.join(","), maxPageSize, ascendingOrder, ) @@ -434,6 +622,35 @@ method getMessages*( ascendingOrder, ) +method getMessagesV2*( + s: PostgresDriver, + contentTopicSeq = newSeq[ContentTopic](0), + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = DefaultPageSize, + ascendingOrder = true, +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + if contentTopicSeq.len == 1 and pubsubTopic.isSome() and startTime.isSome() and + endTime.isSome(): + ## Considered the most common query. Therefore, we use prepared statements to optimize it. + return await s.getMessagesV2PreparedStmt( + contentTopicSeq.join(","), + PubsubTopic(pubsubTopic.get()), + cursor, + startTime.get(), + endTime.get(), + maxPageSize, + ascendingOrder, + ) + else: + ## We will run atypical query. In this case we don't use prepared statemets + return await s.getMessagesV2ArbitraryQuery( + contentTopicSeq, pubsubTopic, cursor, startTime, endTime, maxPageSize, + ascendingOrder, + ) + proc getStr( s: PostgresDriver, query: string ): Future[ArchiveDriverResult[string]] {.async.} = diff --git a/waku/waku_archive/driver/sqlite_driver/cursor.nim b/waku/waku_archive/driver/sqlite_driver/cursor.nim index 0936f97477..9729f0ff79 100644 --- a/waku/waku_archive/driver/sqlite_driver/cursor.nim +++ b/waku/waku_archive/driver/sqlite_driver/cursor.nim @@ -5,7 +5,7 @@ else: import ../../../waku_core, ../../common -type DbCursor* = (Timestamp, seq[byte], PubsubTopic, WakuMessageHash) +type DbCursor* = (Timestamp, seq[byte], PubsubTopic) proc toDbCursor*(c: ArchiveCursor): DbCursor = - (c.storeTime, @(c.digest.data), c.pubsubTopic, c.hash) + (c.storeTime, @(c.digest.data), c.pubsubTopic) diff --git a/waku/waku_archive/driver/sqlite_driver/queries.nim b/waku/waku_archive/driver/sqlite_driver/queries.nim index 7588ef576a..30e8af02b2 100644 --- a/waku/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/waku_archive/driver/sqlite_driver/queries.nim @@ -285,24 +285,21 @@ proc combineClauses(clauses: varargs[Option[string]]): Option[string] = where &= " AND " & clause return some(where) -proc whereClause( - cursor: Option[bool], # true v3, false v2 +proc whereClausev2( + cursor: bool, pubsubTopic: Option[PubsubTopic], contentTopic: seq[ContentTopic], startTime: Option[Timestamp], endTime: Option[Timestamp], - hashes: seq[WakuMessageHash], ascending: bool, ): Option[string] = let cursorClause = - if cursor.isNone(): - none(string) - else: + if cursor: let comp = if ascending: ">" else: "<" - if cursor.get(): - some("(timestamp, messageHash) " & comp & " (?, ?)") - else: - some("(storedAt, id) " & comp & " (?, ?)") + + some("(storedAt, id) " & comp & " (?, ?)") + else: + none(string) let pubsubTopicClause = if pubsubTopic.isNone(): @@ -333,23 +330,11 @@ proc whereClause( else: some("storedAt <= (?)") - let hashesClause = - if hashes.len <= 0: - none(string) - else: - var where = "messageHash IN (" - where &= "?" - for _ in 1 ..< hashes.len: - where &= ", ?" - where &= ")" - some(where) - return combineClauses( - cursorClause, pubsubTopicClause, contentTopicClause, startTimeClause, endTimeClause, - hashesClause, + cursorClause, pubsubTopicClause, contentTopicClause, startTimeClause, endTimeClause ) -proc selectMessagesWithLimitQuery( +proc selectMessagesWithLimitQueryv2( table: string, where: Option[string], limit: uint, ascending = true, v3 = false ): SqlQueryStr = let order = if ascending: "ASC" else: "DESC" @@ -363,27 +348,12 @@ proc selectMessagesWithLimitQuery( if where.isSome(): query &= " WHERE " & where.get() - query &= " ORDER BY storedAt " & order - - # either order by messageHash (v3) or digest (v2) - if v3: - query &= ", messageHash " & order - else: - query &= ", id " & order + query &= " ORDER BY storedAt " & order & ", id " & order query &= " LIMIT " & $limit & ";" return query -proc selectMessageByHashQuery(): SqlQueryStr = - var query: string - - query = "SELECT contentTopic, payload, version, timestamp, messageHash" - query &= " FROM " & DbTable - query &= " WHERE messageHash = (?)" - - return query - proc prepareStmt( db: SqliteDatabase, stmt: string ): DatabaseResult[SqliteStmt[void, void]] = @@ -391,15 +361,13 @@ proc prepareStmt( checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil) return ok(SqliteStmt[void, void](s)) -proc execSelectMessagesWithLimitStmt( +proc execSelectMessagesV2WithLimitStmt( s: SqliteStmt, - cursorv2: Option[DbCursor], - cursorv3: Option[(Timestamp, WakuMessageHash)], + cursor: Option[DbCursor], pubsubTopic: Option[PubsubTopic], contentTopic: seq[ContentTopic], startTime: Option[Timestamp], endTime: Option[Timestamp], - hashes: seq[WakuMessageHash], onRowCallback: DataProc, ): DatabaseResult[void] = let s = RawStmtPtr(s) @@ -407,14 +375,8 @@ proc execSelectMessagesWithLimitStmt( # Bind params var paramIndex = 1 - if cursorv3.isSome(): - let (time, hash) = cursorv3.get() - checkErr bindParam(s, paramIndex, time) - paramIndex += 1 - checkErr bindParam(s, paramIndex, toSeq(hash)) - paramIndex += 1 - elif cursorv2.isSome(): # cursorv2 = storedAt, id, pubsubTopic - let (storedAt, id, _, _) = cursorv2.get() + if cursor.isSome(): + let (storedAt, id, _) = cursor.get() checkErr bindParam(s, paramIndex, storedAt) paramIndex += 1 checkErr bindParam(s, paramIndex, id) @@ -429,10 +391,6 @@ proc execSelectMessagesWithLimitStmt( checkErr bindParam(s, paramIndex, topic.toBytes()) paramIndex += 1 - for hash in hashes: - checkErr bindParam(s, paramIndex, toSeq(hash)) - paramIndex += 1 - if startTime.isSome(): let time = startTime.get() checkErr bindParam(s, paramIndex, time) @@ -488,38 +446,233 @@ proc selectMessagesByHistoryQueryWithLimit*( cursor: Option[DbCursor], startTime: Option[Timestamp], endTime: Option[Timestamp], + limit: uint, + ascending: bool, +): DatabaseResult[ + seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] +] = + var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] = + @[] + + proc queryRowCallback(s: ptr sqlite3_stmt) = + let + pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3) + message = queryRowWakuMessageCallback( + s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5 + ) + digest = queryRowDigestCallback(s, digestCol = 6) + storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0) + hash = queryRowWakuMessageHashCallback(s, hashCol = 7) + + messages.add((pubsubTopic, message, digest, storedAt, hash)) + + let query = block: + let where = whereClausev2( + cursor.isSome(), pubsubTopic, contentTopic, startTime, endTime, ascending + ) + + selectMessagesWithLimitQueryv2(DbTable, where, limit, ascending) + + let dbStmt = ?db.prepareStmt(query) + ?dbStmt.execSelectMessagesV2WithLimitStmt( + cursor, pubsubTopic, contentTopic, startTime, endTime, queryRowCallback + ) + dbStmt.dispose() + + return ok(messages) + +### Store v3 ### + +proc selectMessageByHashQuery(): SqlQueryStr = + var query: string + + query = "SELECT contentTopic, payload, version, timestamp, messageHash" + query &= " FROM " & DbTable + query &= " WHERE messageHash = (?)" + + return query + +proc whereClause( + cursor: bool, + pubsubTopic: Option[PubsubTopic], + contentTopic: seq[ContentTopic], + startTime: Option[Timestamp], + endTime: Option[Timestamp], + hashes: seq[WakuMessageHash], + ascending: bool, +): Option[string] = + let cursorClause = + if cursor: + let comp = if ascending: ">" else: "<" + + some("(timestamp, messageHash) " & comp & " (?, ?)") + else: + none(string) + + let pubsubTopicClause = + if pubsubTopic.isNone(): + none(string) + else: + some("pubsubTopic = (?)") + + let contentTopicClause = + if contentTopic.len <= 0: + none(string) + else: + var where = "contentTopic IN (" + where &= "?" + for _ in 1 ..< contentTopic.len: + where &= ", ?" + where &= ")" + some(where) + + let startTimeClause = + if startTime.isNone(): + none(string) + else: + some("storedAt >= (?)") + + let endTimeClause = + if endTime.isNone(): + none(string) + else: + some("storedAt <= (?)") + + let hashesClause = + if hashes.len <= 0: + none(string) + else: + var where = "messageHash IN (" + where &= "?" + for _ in 1 ..< hashes.len: + where &= ", ?" + where &= ")" + some(where) + + return combineClauses( + cursorClause, pubsubTopicClause, contentTopicClause, startTimeClause, endTimeClause, + hashesClause, + ) + +proc execSelectMessagesWithLimitStmt( + s: SqliteStmt, + cursor: Option[(Timestamp, WakuMessageHash)], + pubsubTopic: Option[PubsubTopic], + contentTopic: seq[ContentTopic], + startTime: Option[Timestamp], + endTime: Option[Timestamp], + hashes: seq[WakuMessageHash], + onRowCallback: DataProc, +): DatabaseResult[void] = + let s = RawStmtPtr(s) + + # Bind params + var paramIndex = 1 + + if cursor.isSome(): + let (time, hash) = cursor.get() + checkErr bindParam(s, paramIndex, time) + paramIndex += 1 + checkErr bindParam(s, paramIndex, toSeq(hash)) + paramIndex += 1 + + if pubsubTopic.isSome(): + let pubsubTopic = toBytes(pubsubTopic.get()) + checkErr bindParam(s, paramIndex, pubsubTopic) + paramIndex += 1 + + for topic in contentTopic: + checkErr bindParam(s, paramIndex, topic.toBytes()) + paramIndex += 1 + + for hash in hashes: + checkErr bindParam(s, paramIndex, toSeq(hash)) + paramIndex += 1 + + if startTime.isSome(): + let time = startTime.get() + checkErr bindParam(s, paramIndex, time) + paramIndex += 1 + + if endTime.isSome(): + let time = endTime.get() + checkErr bindParam(s, paramIndex, time) + paramIndex += 1 + + try: + while true: + let v = sqlite3_step(s) + case v + of SQLITE_ROW: + onRowCallback(s) + of SQLITE_DONE: + return ok() + else: + return err($sqlite3_errstr(v)) + finally: + # release implicit transaction + discard sqlite3_reset(s) # same return information as step + discard sqlite3_clear_bindings(s) # no errors possible + +proc selectMessagesWithLimitQuery( + table: string, where: Option[string], limit: uint, ascending = true, v3 = false +): SqlQueryStr = + let order = if ascending: "ASC" else: "DESC" + + var query: string + + query = + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" + query &= " FROM " & table + + if where.isSome(): + query &= " WHERE " & where.get() + + query &= " ORDER BY storedAt " & order & ", messageHash " & order + + query &= " LIMIT " & $limit & ";" + + return query + +proc selectMessagesByStoreQueryWithLimit*( + db: SqliteDatabase, + contentTopic: seq[ContentTopic], + pubsubTopic: Option[PubsubTopic], + cursor: Option[WakuMessageHash], + startTime: Option[Timestamp], + endTime: Option[Timestamp], hashes: seq[WakuMessageHash], limit: uint, ascending: bool, ): DatabaseResult[ seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] ] = - # Store v3 cursor is only the hash of the message # Must first get the message timestamp before paginating by time - var cursorv2 = cursor - var cursorv3 = none((Timestamp, WakuMessageHash)) - if cursor.isSome() and cursor.get()[3] != EmptyWakuMessageHash: - let hash: WakuMessageHash = cursor.get()[3] + let newCursor = + if cursor.isSome() and cursor.get() != EmptyWakuMessageHash: + let hash: WakuMessageHash = cursor.get() - var wakuMessage: WakuMessage + var wakuMessage: WakuMessage - proc queryRowCallback(s: ptr sqlite3_stmt) = - wakuMessage = queryRowWakuMessageCallback( - s, contentTopicCol = 0, payloadCol = 1, versionCol = 2, senderTimestampCol = 3 - ) + proc queryRowCallback(s: ptr sqlite3_stmt) = + wakuMessage = queryRowWakuMessageCallback( + s, contentTopicCol = 0, payloadCol = 1, versionCol = 2, senderTimestampCol = 3 + ) - let query = selectMessageByHashQuery() - let dbStmt = ?db.prepareStmt(query) - ?dbStmt.execSelectMessageByHash(hash, queryRowCallback) - dbStmt.dispose() + let query = selectMessageByHashQuery() + let dbStmt = ?db.prepareStmt(query) + ?dbStmt.execSelectMessageByHash(hash, queryRowCallback) + dbStmt.dispose() - let time: Timestamp = wakuMessage.timestamp + let time: Timestamp = wakuMessage.timestamp - cursorv3 = some((time, hash)) - cursorv2 = none(DbCursor) + some((time, hash)) + else: + none((Timestamp, WakuMessageHash)) var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] = @[] + proc queryRowCallback(s: ptr sqlite3_stmt) = let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3) @@ -533,23 +686,21 @@ proc selectMessagesByHistoryQueryWithLimit*( messages.add((pubsubTopic, message, digest, storedAt, hash)) let query = block: - let cursorState = - if cursorv3.isSome(): - some(true) - elif cursorv2.isSome(): - some(false) - else: - none(bool) - let where = whereClause( - cursorState, pubsubTopic, contentTopic, startTime, endTime, hashes, ascending + newCursor.isSome(), + pubsubTopic, + contentTopic, + startTime, + endTime, + hashes, + ascending, ) - selectMessagesWithLimitQuery(DbTable, where, limit, ascending, cursorv3.isSome()) + + selectMessagesWithLimitQuery(DbTable, where, limit, ascending, true) let dbStmt = ?db.prepareStmt(query) ?dbStmt.execSelectMessagesWithLimitStmt( - cursorv2, cursorv3, pubsubTopic, contentTopic, startTime, endTime, hashes, - queryRowCallback, + newCursor, pubsubTopic, contentTopic, startTime, endTime, hashes, queryRowCallback ) dbStmt.dispose() diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 2190ce8fb6..80bd52308f 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -83,20 +83,50 @@ method getAllMessages*( ## Retrieve all messages from the store. return s.db.selectAllMessages() -method getMessages*( +method getMessagesV2*( s: SqliteDriver, contentTopic = newSeq[ContentTopic](0), pubsubTopic = none(PubsubTopic), cursor = none(ArchiveCursor), startTime = none(Timestamp), endTime = none(Timestamp), - hashes = newSeq[WakuMessageHash](0), maxPageSize = DefaultPageSize, ascendingOrder = true, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + echo "here" + let cursor = cursor.map(toDbCursor) let rowsRes = s.db.selectMessagesByHistoryQueryWithLimit( + contentTopic, + pubsubTopic, + cursor, + startTime, + endTime, + limit = maxPageSize, + ascending = ascendingOrder, + ) + + return rowsRes + +method getMessages*( + s: SqliteDriver, + contentTopic = newSeq[ContentTopic](0), + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + hashes = newSeq[WakuMessageHash](0), + maxPageSize = DefaultPageSize, + ascendingOrder = true, +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + let cursor = + if cursor.isSome(): + some(cursor.get().hash) + else: + none(WakuMessageHash) + + let rowsRes = s.db.selectMessagesByStoreQueryWithLimit( contentTopic, pubsubTopic, cursor,