Skip to content

Commit

Permalink
add last online to archive
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Jul 17, 2024
1 parent 5c539fe commit d8364b5
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 22 deletions.
11 changes: 11 additions & 0 deletions migrations/message_store_postgres/content_script_version_7.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
const ContentScriptVersion_7* =
"""
-- Create a new table to hold the last online timestamp
CREATE TABLE last_online (timestamp BIGINT NOT NULL);
-- Insert a placeholder first value
INSERT INTO last_online (timestamp) VALUES(1);
-- Update to new version
UPDATE version SET version = 7 WHERE version = 6;
"""
4 changes: 3 additions & 1 deletion migrations/message_store_postgres/pg_migration_manager.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import
content_script_version_1, content_script_version_2, content_script_version_3,
content_script_version_4, content_script_version_5, content_script_version_6
content_script_version_4, content_script_version_5, content_script_version_6,
content_script_version_7

type MigrationScript* = object
version*: int
Expand All @@ -17,6 +18,7 @@ const PgMigrationScripts* =
MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
MigrationScript(version: 5, scriptContent: ContentScriptVersion_5),
MigrationScript(version: 6, scriptContent: ContentScriptVersion_6),
MigrationScript(version: 7, scriptContent: ContentScriptVersion_7),
]

proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
Expand Down
3 changes: 1 addition & 2 deletions tests/waku_archive/archive_utils.nim
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
{.used.}

import std/options, stew/results, chronos, libp2p/crypto/crypto
import std/options, results, chronos, libp2p/crypto/crypto

import
waku/[
node/peer_manager,
waku_core,
waku_archive,
waku_archive/common,
waku_archive/driver/sqlite_driver,
common/databases/db_sqlite,
],
Expand Down
13 changes: 13 additions & 0 deletions tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ suite "Postgres driver - queries":

(await driver.close()).expect("driver to close")

asyncTest "check last online":
let timestamp = getNowInNanosecondTime()

let setRes = await driver.setLastOnline(timestamp)
assert setRes.isOk(), setRes.error

let getRes = await driver.getLastOnline()
assert getRes.isOk(), getRes.error

let timeRes = getRes.get()

assert timestamp == timeRes

asyncTest "no content topic":
## Given
const contentTopic = "test-content-topic"
Expand Down
24 changes: 17 additions & 7 deletions tests/waku_archive/test_driver_sqlite_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,7 @@ import
std/[options, sequtils, random, algorithm], testutils/unittests, chronos, chronicles

import
waku/[
common/databases/db_sqlite,
waku_archive,
waku_archive/driver/sqlite_driver,
waku_core,
waku_core/message/digest,
],
waku/[waku_archive, waku_core, waku_core/message/digest],
../testlib/common,
../testlib/wakucore,
../waku_archive/archive_utils
Expand All @@ -21,6 +15,22 @@ logScope:
# Initialize the random number generator
common.randomize()

suite "SQLite driver - query last online":
asyncTest "check last online":
let driver = newSqliteArchiveDriver()

let timestamp = getNowInNanosecondTime()

let setRes = await driver.setLastOnline(timestamp)
assert setRes.isOk(), setRes.error

let getRes = await driver.getLastOnline()
assert getRes.isOk(), getRes.error

let timeRes = getRes.get()

assert timestamp == timeRes

suite "SQLite driver - query by content topic":
asyncTest "no content topic":
## Given
Expand Down
37 changes: 34 additions & 3 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import
std/[times, options, sequtils, algorithm],
stew/[results, byteutils],
stew/[byteutils],
chronicles,
chronos,
metrics
Expand All @@ -28,6 +28,8 @@ const
# Metrics reporting
WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(30)

DefaultLastOnlineInterval = chronos.minutes(1)

# Message validation
# 20 seconds maximum allowable sender timestamp "drift"
MaxMessageTimestampVariance* = getNanoSecondTime(20)
Expand All @@ -47,6 +49,9 @@ type WakuArchive* = ref object
retentionPolicyHandle: Future[void]
metricsHandle: Future[void]

lastOnlineInterval: timer.Duration
onlineHandle: Future[void]

proc validate*(msg: WakuMessage): Result[void, string] =
if msg.ephemeral:
# Ephemeral message, do not store
Expand All @@ -70,12 +75,17 @@ proc new*(
driver: ArchiveDriver,
validator: MessageValidator = validate,
retentionPolicy = none(RetentionPolicy),
lastOnlineInterval = DefaultLastOnlineInterval,
): Result[T, string] =
if driver.isNil():
return err("archive driver is Nil")

let archive =
WakuArchive(driver: driver, validator: validator, retentionPolicy: retentionPolicy)
let archive = WakuArchive(
driver: driver,
validator: validator,
retentionPolicy: retentionPolicy,
lastOnlineInterval: lastOnlineInterval,
)

return ok(archive)

Expand Down Expand Up @@ -184,6 +194,12 @@ proc findMessages*(
ArchiveResponse(cursor: cursor, topics: topics, hashes: hashes, messages: messages)
)

proc getLastOnline*(self: WakuArchive): Future[Result[Timestamp, string]] {.async.} =
let time = (await self.driver.getLastOnline()).valueOr:
return err($error)

return ok(time)

proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
let policy = self.retentionPolicy.get()

Expand All @@ -207,11 +223,23 @@ proc periodicMetricReport(self: WakuArchive) {.async.} =

await sleepAsync(WakuArchiveDefaultMetricsReportInterval)

proc periodicSetLastOnline(self: WakuArchive) {.async.} =
## Save a timestamp periodically
## so that a node can know when it was last online
while true:
let ts = getNowInNanosecondTime()

(await self.driver.setLastOnline(ts)).isOkOr:
error "Failed to set last online timestamp", error, time = ts

await sleepAsync(self.lastOnlineInterval)

proc start*(self: WakuArchive) =
if self.retentionPolicy.isSome():
self.retentionPolicyHandle = self.periodicRetentionPolicy()

self.metricsHandle = self.periodicMetricReport()
self.onlineHandle = self.periodicSetLastOnline()

proc stopWait*(self: WakuArchive) {.async.} =
var futures: seq[Future[void]]
Expand All @@ -222,4 +250,7 @@ proc stopWait*(self: WakuArchive) {.async.} =
if not self.metricsHandle.isNil:
futures.add(self.metricsHandle.cancelAndWait())

if not self.onlineHandle.isNil:
futures.add(self.onlineHandle.cancelAndWait())

await noCancel(allFutures(futures))
2 changes: 1 addition & 1 deletion waku/waku_archive/common.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{.push raises: [].}

import std/options, results, stew/byteutils, stew/arrayops, nimcrypto/sha2
import std/options, results
import ../waku_core, ../common/paging

## Public API types
Expand Down
10 changes: 10 additions & 0 deletions waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,13 @@ method existsTable*(
driver: ArchiveDriver, tableName: string
): Future[ArchiveDriverResult[bool]] {.base, async.} =
discard

method setLastOnline*(
driver: ArchiveDriver, time: Timestamp
): Future[ArchiveDriverResult[void]] {.base, async.} =
discard

method getLastOnline*(
driver: ArchiveDriver
): Future[ArchiveDriverResult[Timestamp]] {.base, async.} =
discard
2 changes: 1 addition & 1 deletion waku/waku_archive/driver/postgres_driver/migrations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import
logScope:
topics = "waku archive migration"

const SchemaVersion* = 6 # increase this when there is an update in the database schema
const SchemaVersion* = 7 # increase this when there is an update in the database schema

proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list
Expand Down
24 changes: 24 additions & 0 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ const SelectCursorByHashDef =
"""SELECT timestamp FROM messages
WHERE messageHash = $1"""

const UpdateLastOnlineName = "UpdateLastOnline"
const UpdateLastOnlineDef = """UPDATE last_online SET timestamp = ($1);"""

const DefaultMaxNumConns = 50

proc new*(
Expand Down Expand Up @@ -1402,3 +1405,24 @@ method deleteMessagesOlderThanTimestamp*(
return err("error in deleteMessagesOlderThanTimestamp: " & $error)

return ok()

method setLastOnline*(
self: PostgresDriver, timestamp: Timestamp
): Future[ArchiveDriverResult[void]] {.async.} =
let timestamp = $timestamp

return await self.writeConnPool.runStmt(
UpdateLastOnlineName,
UpdateLastOnlineDef,
@[timestamp],
@[int32(timestamp.len)],
@[int32(0)],
)

method getLastOnline*(
self: PostgresDriver
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
let time = (await self.getInt("SELECT MAX(timestamp) FROM last_online")).valueOr:
return err("error in getLastOnline: " & $error)

return ok(time)
6 changes: 6 additions & 0 deletions waku/waku_archive/driver/queue_driver/queue_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,9 @@ method decreaseDatabaseSize*(

method close*(driver: QueueDriver): Future[ArchiveDriverResult[void]] {.async.} =
return ok()

method setLastOnline*(
driver: QueueDriver, time: Timestamp
): Future[ArchiveDriverResult[void]] {.async.} =
## return ok as it only make sense to save a timestamp to disk
return ok()
50 changes: 50 additions & 0 deletions waku/waku_archive/driver/sqlite_driver/queries.nim
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,53 @@ proc selectMessageHashesByStoreQueryWithLimit*(
dbStmt.dispose()

return ok(rows)

proc createLastOnlineTableQuery*(): SqlQueryStr =
return "CREATE TABLE IF NOT EXISTS last_online (timestamp BIGINT NOT NULL);"

proc insertDefaultQuery*(): SqlQueryStr =
return "INSERT INTO last_online VALUES(1);"

proc createLastOnlineTable*(db: SqliteDatabase): DatabaseResult[void] =
var query = createLastOnlineTableQuery()
discard
?db.query(
query,
proc(s: ptr sqlite3_stmt) =
discard
,
)

query = insertDefaultQuery()

discard
?db.query(
query,
proc(s: ptr sqlite3_stmt) =
discard
,
)

return ok()

proc selectLastOnlineQuery*(): SqlQueryStr =
return "SELECT MAX(timestamp) FROM last_online"

proc execLastOnlineQuery*(db: SqliteDatabase): DatabaseResult[Timestamp] =
var timestamp: Timestamp
proc queryRowCallback(s: ptr sqlite3_stmt) =
timestamp = queryRowTimestampCallback(s, 0)

let query = selectLastOnlineQuery()
let res = db.query(query, queryRowCallback)
if res.isErr():
return err("failed to get the last online timestamp from the database")

return ok(timestamp)

proc updateLastOnlineQuery*(): SqlQueryStr =
return "UPDATE last_online SET timestamp = (?);"

proc prepareUpdateLastOnlineStmt*(db: SqliteDatabase): SqliteStmt[(Timestamp), void] =
let query = updateLastOnlineQuery()
return db.prepareStmt(query, (Timestamp), void).expect("this is a valid statement")
33 changes: 26 additions & 7 deletions waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@ proc init(db: SqliteDatabase): ArchiveDriverResult[void] =
return err("db not initialized")

# Create table, if doesn't exist
let resCreate = createTable(db)
if resCreate.isErr():
return err("failed to create table: " & resCreate.error())
db.createTable().isOkOr:
return err("failed to create table: " & $error)

# Create indices, if don't exist
let resRtIndex = createOldestMessageTimestampIndex(db)
if resRtIndex.isErr():
return err("failed to create i_ts index: " & resRtIndex.error())
db.createOldestMessageTimestampIndex().isOkOr:
return err("failed to create i_ts index: " & $error)

# Create another table, if doesn't exist
db.createLastOnlineTable().isOkOr:
return err("failed to create last online table: " & $error)

return ok()

type SqliteDriver* = ref object of ArchiveDriver
db: SqliteDatabase
insertStmt: SqliteStmt[InsertMessageParams, void]
lastOnlineStmt: SqliteStmt[(Timestamp), void]

proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] =
# Database initialization
Expand All @@ -43,7 +46,10 @@ proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] =

# General initialization
let insertStmt = db.prepareInsertMessageStmt()
return ok(SqliteDriver(db: db, insertStmt: insertStmt))
let lastOnlineStmt = db.prepareUpdateLastOnlineStmt()

return
ok(SqliteDriver(db: db, insertStmt: insertStmt, lastOnlineStmt: lastOnlineStmt))

method put*(
s: SqliteDriver,
Expand Down Expand Up @@ -189,3 +195,16 @@ method existsTable*(
s: SqliteDriver, tableName: string
): Future[ArchiveDriverResult[bool]] {.async.} =
return err("existsTable method not implemented in sqlite_driver")

method setLastOnline*(
self: SqliteDriver, time: Timestamp
): Future[ArchiveDriverResult[void]] {.async.} =
return self.lastOnlineStmt.exec((time))

method getLastOnline*(
self: SqliteDriver
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
let time = self.db.execLastOnlineQuery().valueOr:
return err(error)

return ok(time)

0 comments on commit d8364b5

Please sign in to comment.