Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Mar 28, 2024
1 parent 4fb6050 commit 7c93644
Show file tree
Hide file tree
Showing 7 changed files with 578 additions and 100 deletions.
2 changes: 1 addition & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ proc mountLegacyStore*(node: WakuNode) {.async.} =
return err(error)

let request = request.toArchiveQuery()
let response = await node.wakuArchive.findMessages(request)
let response = await node.wakuArchive.findMessagesV2(request)
return response.toHistoryResult()

node.wakuLegacyStore =
Expand Down
68 changes: 68 additions & 0 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,74 @@ proc findMessages*(

return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor))

proc findMessagesV2*(
self: WakuArchive, query: ArchiveQuery
): Future[ArchiveResult] {.async, gcsafe.} =
## Search the archive to return a single page of messages matching the query criteria

let maxPageSize =
if query.pageSize <= 0:
DefaultPageSize
else:
min(query.pageSize, MaxPageSize)

let isAscendingOrder = query.direction.into()

if query.contentTopics.len > 10:
return err(ArchiveError.invalidQuery("too many content topics"))

let queryStartTime = getTime().toUnixFloat()

let rows = (
await self.driver.getMessagesV2(
contentTopic = query.contentTopics,
pubsubTopic = query.pubsubTopic,
cursor = query.cursor,
startTime = query.startTime,
endTime = query.endTime,
maxPageSize = maxPageSize + 1,
ascendingOrder = isAscendingOrder,
)
).valueOr:
return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error))

let queryDuration = getTime().toUnixFloat() - queryStartTime
waku_archive_query_duration_seconds.observe(queryDuration)

var messages = newSeq[WakuMessage]()
var cursor = none(ArchiveCursor)

if rows.len == 0:
return ok(ArchiveResponse(messages: messages, cursor: cursor))

## Messages
let pageSize = min(rows.len, int(maxPageSize))

messages = rows[0 ..< pageSize].mapIt(it[1])

## Cursor
if rows.len > int(maxPageSize):
## Build last message cursor
## The cursor is built from the last message INCLUDED in the response
## (i.e. the second last message in the rows list)

let (pubsubTopic, message, digest, storeTimestamp, _) = rows[^2]

cursor = some(
ArchiveCursor(
digest: MessageDigest.fromBytes(digest),
storeTime: storeTimestamp,
sendertime: message.timestamp,
pubsubTopic: pubsubTopic,
)
)

# All messages MUST be returned in chronological order
if not isAscendingOrder:
reverse(messages)

return ok(ArchiveResponse(messages: messages, cursor: cursor))

proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
debug "executing message retention policy"

Expand Down
12 changes: 12 additions & 0 deletions waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ method getAllMessages*(
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} =
discard

method getMessagesV2*(
driver: ArchiveDriver,
contentTopic = newSeq[ContentTopic](0),
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} =
discard

method getMessages*(
driver: ArchiveDriver,
contentTopic = newSeq[ContentTopic](0),
Expand Down
Loading

0 comments on commit 7c93644

Please sign in to comment.