diff --git a/tests/waku_archive_legacy/test_driver_postgres.nim b/tests/waku_archive_legacy/test_driver_postgres.nim
index b83897a331..7657b6e1f1 100644
--- a/tests/waku_archive_legacy/test_driver_postgres.nim
+++ b/tests/waku_archive_legacy/test_driver_postgres.nim
@@ -4,11 +4,13 @@ import
   std/[sequtils, options], testutils/unittests, chronos
 import
   waku/waku_archive_legacy,
   waku/waku_archive_legacy/driver/postgres_driver,
+  waku/waku_archive/driver/postgres_driver as new_postgres_driver,
   waku/waku_core,
   waku/waku_core/message/digest,
   ../testlib/wakucore,
   ../testlib/testasync,
-  ../testlib/postgres_legacy
+  ../testlib/postgres_legacy,
+  ../testlib/postgres as new_postgres
 
 proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor =
   ArchiveCursor(
@@ -21,22 +23,39 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
 suite "Postgres driver":
   ## Unique driver instance
-  var driver {.threadvar.}: PostgresDriver
+  var driver {.threadvar.}: postgres_driver.PostgresDriver
+
+  ## We need to artificially create an instance of the "newDriver"
+  ## because this is the only one in charge of creating partitions
+  ## We will clean legacy store soon and this file will get removed.
+  var newDriver {.threadvar.}: new_postgres_driver.PostgresDriver
 
   asyncSetup:
-    let driverRes = await newTestPostgresDriver()
+    let driverRes = await postgres_legacy.newTestPostgresDriver()
+    if driverRes.isErr():
+      assert false, driverRes.error
+
+    driver = postgres_driver.PostgresDriver(driverRes.get())
+
+    let newDriverRes = await new_postgres.newTestPostgresDriver()
     if driverRes.isErr():
       assert false, driverRes.error
 
-    driver = PostgresDriver(driverRes.get())
+    newDriver = new_postgres_driver.PostgresDriver(newDriverRes.get())
 
   asyncTeardown:
-    let resetRes = await driver.reset()
+    var resetRes = await driver.reset()
     if resetRes.isErr():
       assert false, resetRes.error
 
     (await driver.close()).expect("driver to close")
 
+    resetRes = await newDriver.reset()
+    if resetRes.isErr():
+      assert false, resetRes.error
+
+    (await newDriver.close()).expect("driver to close")
+
   asyncTest "Asynchronous queries":
     var futures = newSeq[Future[ArchiveDriverResult[void]]](0)
diff --git a/tests/waku_archive_legacy/test_driver_postgres_query.nim b/tests/waku_archive_legacy/test_driver_postgres_query.nim
index 0889434529..05ccebafc1 100644
--- a/tests/waku_archive_legacy/test_driver_postgres_query.nim
+++ b/tests/waku_archive_legacy/test_driver_postgres_query.nim
@@ -9,12 +9,15 @@ import
   waku/waku_archive_legacy,
   waku/waku_archive_legacy/driver as driver_module,
   waku/waku_archive_legacy/driver/postgres_driver,
+  waku/waku_archive/driver/postgres_driver as new_postgres_driver,
   waku/waku_core,
   waku/waku_core/message/digest,
   ../testlib/common,
   ../testlib/wakucore,
   ../testlib/testasync,
-  ../testlib/postgres_legacy
+  ../testlib/postgres_legacy,
+  ../testlib/postgres as new_postgres,
+  ../testlib/testutils
 
 logScope:
   topics = "test archive postgres driver"
@@ -36,22 +39,39 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
 suite "Postgres driver - queries":
   ## Unique driver instance
-  var driver {.threadvar.}: PostgresDriver
+  var driver {.threadvar.}: postgres_driver.PostgresDriver
+
+  ## We need to artificially create an instance of the "newDriver"
+  ## because this is the only one in charge of creating partitions
+  ## We will clean legacy store soon and this file will get removed.
+  var newDriver {.threadvar.}: new_postgres_driver.PostgresDriver
 
   asyncSetup:
-    let driverRes = await newTestPostgresDriver()
+    let driverRes = await postgres_legacy.newTestPostgresDriver()
+    if driverRes.isErr():
+      assert false, driverRes.error
+
+    driver = postgres_driver.PostgresDriver(driverRes.get())
+
+    let newDriverRes = await new_postgres.newTestPostgresDriver()
     if driverRes.isErr():
       assert false, driverRes.error
 
-    driver = PostgresDriver(driverRes.get())
+    newDriver = new_postgres_driver.PostgresDriver(newDriverRes.get())
 
   asyncTeardown:
-    let resetRes = await driver.reset()
+    var resetRes = await driver.reset()
     if resetRes.isErr():
       assert false, resetRes.error
 
     (await driver.close()).expect("driver to close")
 
+    resetRes = await newDriver.reset()
+    if resetRes.isErr():
+      assert false, resetRes.error
+
+    (await newDriver.close()).expect("driver to close")
+
   asyncTest "no content topic":
     ## Given
     const contentTopic = "test-content-topic"
@@ -1790,7 +1810,8 @@ suite "Postgres driver - queries":
     check:
       filteredMessages.len == 0
 
-  asyncTest "Get oldest and newest message timestamp":
+  xasyncTest "Get oldest and newest message timestamp":
+    ## This test no longer makes sense because that will always be controlled by the newDriver
     const contentTopic = "test-content-topic"
 
     let timeOrigin = now()
@@ -1842,7 +1863,8 @@ suite "Postgres driver - queries":
     assert res.isOk(), res.error
     assert res.get() == newestTime, "Failed to retrieve the newest timestamp"
 
-  asyncTest "Delete messages older than certain timestamp":
+  xasyncTest "Delete messages older than certain timestamp":
+    ## This test no longer makes sense because that will always be controlled by the newDriver
     const contentTopic = "test-content-topic"
 
     let timeOrigin = now()
@@ -1884,7 +1906,8 @@ suite "Postgres driver - queries":
     assert res.isOk(), res.error
     assert res.get() == 3, "Failed to retrieve the # of messages after deletion"
 
-  asyncTest "Keep last n messages":
+  xasyncTest "Keep last n messages":
+    ## This test no longer makes sense because that will always be controlled by the newDriver
     const contentTopic = "test-content-topic"
 
     let timeOrigin = now()
diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim
index 80e59cd987..e681e4f957 100644
--- a/waku/factory/node_factory.nim
+++ b/waku/factory/node_factory.nim
@@ -21,8 +21,6 @@ import
   ../waku_archive/retention_policy/builder as policy_builder,
   ../waku_archive/driver as driver,
   ../waku_archive/driver/builder as driver_builder,
-  ../waku_archive_legacy/retention_policy as legacy_policy,
-  ../waku_archive_legacy/retention_policy/builder as legacy_policy_builder,
   ../waku_archive_legacy/driver as legacy_driver,
   ../waku_archive_legacy/driver/builder as legacy_driver_builder,
   ../waku_store,
@@ -232,13 +230,7 @@ proc setupProtocols(
     if archiveDriverRes.isErr():
       return err("failed to setup legacy archive driver: " & archiveDriverRes.error)
 
-    let retPolicyRes =
-      legacy_policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
-    if retPolicyRes.isErr():
-      return err("failed to create retention policy: " & retPolicyRes.error)
-
-    let mountArcRes =
-      node.mountLegacyArchive(archiveDriverRes.get(), retPolicyRes.get())
+    let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get())
 
     if mountArcRes.isErr():
       return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index 3a1e0ba455..2a6f5a42e0 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -701,17 +701,11 @@ proc mountArchive*(
   return ok()
 
 proc mountLegacyArchive*(
-    node: WakuNode,
-    driver: waku_archive_legacy.ArchiveDriver,
-    retentionPolicy = none(waku_archive_legacy.RetentionPolicy),
+    node: WakuNode, driver: waku_archive_legacy.ArchiveDriver
 ): Result[void, string] =
-  node.wakuLegacyArchive = waku_archive_legacy.WakuArchive.new(
-    driver = driver, retentionPolicy = retentionPolicy
-  ).valueOr:
+  node.wakuLegacyArchive = waku_archive_legacy.WakuArchive.new(driver = driver).valueOr:
     return err("error in mountLegacyArchive: " & error)
 
-  node.wakuLegacyArchive.start()
-
   return ok()
 
 ## Legacy Waku Store
diff --git a/waku/waku_archive_legacy.nim b/waku/waku_archive_legacy.nim
index e1d7df776a..bcb6b6a54a 100644
--- a/waku/waku_archive_legacy.nim
+++ b/waku/waku_archive_legacy.nim
@@ -1,7 +1,6 @@
 import
   ./waku_archive_legacy/common,
   ./waku_archive_legacy/archive,
-  ./waku_archive_legacy/driver,
-  ./waku_archive_legacy/retention_policy
+  ./waku_archive_legacy/driver
 
-export common, archive, driver, retention_policy
+export common, archive, driver
diff --git a/waku/waku_archive_legacy/archive.nim b/waku/waku_archive_legacy/archive.nim
index adef8bdff4..690257ff59 100644
--- a/waku/waku_archive_legacy/archive.nim
+++ b/waku/waku_archive_legacy/archive.nim
@@ -12,7 +12,6 @@ import
 import
   ../common/paging,
   ./driver,
-  ./retention_policy,
   ../waku_core,
   ../waku_core/message/digest,
   ./common,
@@ -45,11 +44,6 @@ type
   WakuArchive* = ref object
     validator: MessageValidator
 
-    retentionPolicy: Option[RetentionPolicy]
-
-    retentionPolicyHandle: Future[void]
-    metricsHandle: Future[void]
-
 proc validate*(msg: WakuMessage): Result[void, string] =
   if msg.ephemeral:
     # Ephemeral message, do not store
@@ -72,16 +66,12 @@ proc validate*(msg: WakuMessage): Result[void, string] =
   return ok()
 
 proc new*(
-    T: type WakuArchive,
-    driver: ArchiveDriver,
-    validator: MessageValidator = validate,
-    retentionPolicy = none(RetentionPolicy),
+    T: type WakuArchive, driver: ArchiveDriver, validator: MessageValidator = validate
 ): Result[T, string] =
   if driver.isNil():
     return err("archive driver is Nil")
 
-  let archive =
-    WakuArchive(driver: driver, validator: validator, retentionPolicy: retentionPolicy)
+  let archive = WakuArchive(driver: driver, validator: validator)
 
   return ok(archive)
 
@@ -280,44 +270,3 @@ proc findMessagesV2*(
     reverse(messages)
 
   return ok(ArchiveResponse(messages: messages, cursor: cursor))
-
-proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
-  debug "executing message retention policy"
-
-  let policy = self.retentionPolicy.get()
-
-  while true:
-    (await policy.execute(self.driver)).isOkOr:
-      waku_legacy_archive_errors.inc(labelValues = [retPolicyFailure])
-      error "failed execution of retention policy", error = error
-
-    await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval)
-
-proc periodicMetricReport(self: WakuArchive) {.async.} =
-  while true:
-    let countRes = (await self.driver.getMessagesCount())
-    if countRes.isErr():
-      error "loopReportStoredMessagesMetric failed to get messages count",
-        error = countRes.error
-    else:
-      let count = countRes.get()
-      waku_legacy_archive_messages.set(count, labelValues = ["stored"])
-
-    await sleepAsync(WakuArchiveDefaultMetricsReportInterval)
-
-proc start*(self: WakuArchive) =
-  if self.retentionPolicy.isSome():
-    self.retentionPolicyHandle = self.periodicRetentionPolicy()
-
-  self.metricsHandle = self.periodicMetricReport()
-
-proc stopWait*(self: WakuArchive) {.async.} =
-  var futures: seq[Future[void]]
-
-  if self.retentionPolicy.isSome() and not self.retentionPolicyHandle.isNil():
-    futures.add(self.retentionPolicyHandle.cancelAndWait())
-
-  if not self.metricsHandle.isNil:
-    futures.add(self.metricsHandle.cancelAndWait())
-
-  await noCancel(allFutures(futures))
diff --git a/waku/waku_archive_legacy/driver/builder.nim b/waku/waku_archive_legacy/driver/builder.nim
index c05e25eec8..77bd46fa44 100644
--- a/waku/waku_archive_legacy/driver/builder.nim
+++ b/waku/waku_archive_legacy/driver/builder.nim
@@ -17,7 +17,6 @@ export sqlite_driver, queue_driver
 when defined(postgres):
   import
     ## These imports add dependency with an external libpq library
-    ./postgres_driver/migrations as archive_postgres_driver_migrations,
     ./postgres_driver
   export postgres_driver
 
@@ -94,26 +93,6 @@ proc new*(
         return err("failed to init postgres archive driver: " & res.error)
 
       let driver = res.get()
-
-      # Database migration
-      if migrate:
-        let migrateRes = await archive_postgres_driver_migrations.migrate(driver)
-        if migrateRes.isErr():
-          return err("ArchiveDriver build failed in migration: " & $migrateRes.error)
-
-      ## This should be started once we make sure the 'messages' table exists
-      ## Hence, this should be run after the migration is completed.
-      asyncSpawn driver.startPartitionFactory(onFatalErrorAction)
-
-      info "waiting for a partition to be created"
-      for i in 0 ..< 100:
-        if driver.containsAnyPartition():
-          break
-        await sleepAsync(chronos.milliseconds(100))
-
-      if not driver.containsAnyPartition():
-        onFatalErrorAction("a partition could not be created")
-
       return ok(driver)
     else:
       return err(
diff --git a/waku/waku_archive_legacy/driver/postgres_driver.nim b/waku/waku_archive_legacy/driver/postgres_driver.nim
index a106eb2c40..496005cbec 100644
--- a/waku/waku_archive_legacy/driver/postgres_driver.nim
+++ b/waku/waku_archive_legacy/driver/postgres_driver.nim
@@ -3,9 +3,6 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}
 
-import
-  ./postgres_driver/postgres_driver,
-  ./postgres_driver/partitions_manager,
-  ./postgres_driver/postgres_healthcheck
+import ./postgres_driver/postgres_driver
 
-export postgres_driver, partitions_manager, postgres_healthcheck
+export postgres_driver
diff --git a/waku/waku_archive_legacy/driver/postgres_driver/migrations.nim b/waku/waku_archive_legacy/driver/postgres_driver/migrations.nim
deleted file mode 100644
index e404d46482..0000000000
--- a/waku/waku_archive_legacy/driver/postgres_driver/migrations.nim
+++ /dev/null
@@ -1,89 +0,0 @@
-{.push raises: [].}
-
-import std/strutils, results, chronicles, chronos
-import
-  ../../../common/databases/common,
-  ../../../../migrations/message_store_postgres/pg_migration_manager,
-  ../postgres_driver
-
-logScope:
-  topics = "waku archive migration"
-
-const SchemaVersion* = 6 # increase this when there is an update in the database schema
-
-proc breakIntoStatements*(script: string): seq[string] =
-  ## Given a full migration script, that can potentially contain a list
-  ## of SQL statements, this proc splits it into the contained isolated statements
-  ## that should be executed one after the other.
-  var statements = newSeq[string]()
-
-  let lines = script.split('\n')
-
-  var simpleStmt: string
-  var plSqlStatement: string
-  var insidePlSqlScript = false
-  for line in lines:
-    if line.strip().len == 0:
-      continue
-
-    if insidePlSqlScript:
-      if line.contains("END $$"):
-        ## End of the Pl/SQL script
-        plSqlStatement &= line
-        statements.add(plSqlStatement)
-        plSqlStatement = ""
-        insidePlSqlScript = false
-        continue
-      else:
-        plSqlStatement &= line & "\n"
-
-    if line.contains("DO $$"):
-      ## Beginning of the Pl/SQL script
-      insidePlSqlScript = true
-      plSqlStatement &= line & "\n"
-
-    if not insidePlSqlScript:
-      if line.contains(';'):
-        ## End of simple statement
-        simpleStmt &= line
-        statements.add(simpleStmt)
-        simpleStmt = ""
-      else:
-        simpleStmt &= line & "\n"
-
-  return statements
-
-proc migrate*(
-    driver: PostgresDriver, targetVersion = SchemaVersion
-): Future[DatabaseResult[void]] {.async.} =
-  debug "starting message store's postgres database migration"
-
-  let currentVersion = (await driver.getCurrentVersion()).valueOr:
-    return err("migrate error could not retrieve current version: " & $error)
-
-  if currentVersion == targetVersion:
-    debug "database schema is up to date",
-      currentVersion = currentVersion, targetVersion = targetVersion
-    return ok()
-
-  info "database schema is outdated",
-    currentVersion = currentVersion, targetVersion = targetVersion
-
-  # Load migration scripts
-  let scripts = pg_migration_manager.getMigrationScripts(currentVersion, targetVersion)
-
-  # Run the migration scripts
-  for script in scripts:
-    for statement in script.breakIntoStatements():
-      debug "executing migration statement", statement = statement
-
-      (await driver.performWriteQuery(statement)).isOkOr:
-        error "failed to execute migration statement",
-          statement = statement, error = error
-        return err("failed to execute migration statement")
-
-      debug "migration statement executed succesfully", statement = statement
-
-  debug "finished message store's postgres database migration"
-
-  return ok()
diff --git a/waku/waku_archive_legacy/driver/postgres_driver/partitions_manager.nim b/waku/waku_archive_legacy/driver/postgres_driver/partitions_manager.nim
deleted file mode 100644
index 52a01cef85..0000000000
--- a/waku/waku_archive_legacy/driver/postgres_driver/partitions_manager.nim
+++ /dev/null
@@ -1,102 +0,0 @@
-## This module is aimed to handle the creation and truncation of partition tables
-## in order to limit the space occupied in disk by the database.
-##
-## The created partitions are referenced by the 'storedAt' field.
-##
-
-import std/deques
-import chronos, chronicles
-
-logScope:
-  topics = "waku archive partitions_manager"
-
-## The time range has seconds resolution
-type TimeRange* = tuple[beginning: int64, `end`: int64]
-
-type
-  Partition = object
-    name: string
-    timeRange: TimeRange
-
-  PartitionManager* = ref object
-    partitions: Deque[Partition]
-      # FIFO of partition table names. The first is the oldest partition
-
-proc new*(T: type PartitionManager): T =
-  return PartitionManager()
-
-proc getPartitionFromDateTime*(
-    self: PartitionManager, targetMoment: int64
-): Result[Partition, string] =
-  ## Returns the partition name that might store a message containing the passed timestamp.
-  ## In order words, it simply returns the partition name which contains the given timestamp.
-  ## targetMoment - represents the time of interest, measured in seconds since epoch.
-
-  if self.partitions.len == 0:
-    return err("There are no partitions")
-
-  for partition in self.partitions:
-    let timeRange = partition.timeRange
-
-    let beginning = timeRange.beginning
-    let `end` = timeRange.`end`
-
-    if beginning <= targetMoment and targetMoment < `end`:
-      return ok(partition)
-
-  return err("Couldn't find a partition table for given time: " & $targetMoment)
-
-proc getNewestPartition*(self: PartitionManager): Result[Partition, string] =
-  if self.partitions.len == 0:
-    return err("there are no partitions allocated")
-
-  let newestPartition = self.partitions.peekLast
-  return ok(newestPartition)
-
-proc getOldestPartition*(self: PartitionManager): Result[Partition, string] =
-  if self.partitions.len == 0:
-    return err("there are no partitions allocated")
-
-  let oldestPartition = self.partitions.peekFirst
-  return ok(oldestPartition)
-
-proc addPartitionInfo*(
-    self: PartitionManager, partitionName: string, beginning: int64, `end`: int64
-) =
-  ## The given partition range has seconds resolution.
-  ## We just store information of the new added partition merely to keep track of it.
-  let partitionInfo = Partition(name: partitionName, timeRange: (beginning, `end`))
-  trace "Adding partition info"
-  self.partitions.addLast(partitionInfo)
-
-proc removeOldestPartitionName*(self: PartitionManager) =
-  ## Simply removed the partition from the tracked/known partitions queue.
-  ## Just remove it and ignore it.
-  discard self.partitions.popFirst()
-
-proc isEmpty*(self: PartitionManager): bool =
-  return self.partitions.len == 0
-
-proc getLastMoment*(partition: Partition): int64 =
-  ## Considering the time range covered by the partition, this
-  ## returns the `end` time (number of seconds since epoch) of such range.
-  let lastTimeInSec = partition.timeRange.`end`
-  return lastTimeInSec
-
-proc getPartitionStartTimeInNanosec*(partition: Partition): int64 =
-  return partition.timeRange.beginning * 1_000_000_000
-
-proc containsMoment*(partition: Partition, time: int64): bool =
-  ## Returns true if the given moment is contained within the partition window,
-  ## 'false' otherwise.
-  ## time - number of seconds since epoch
-  if partition.timeRange.beginning <= time and time < partition.timeRange.`end`:
-    return true
-
-  return false
-
-proc getName*(partition: Partition): string =
-  return partition.name
-
-func `==`*(a, b: Partition): bool {.inline.} =
-  return a.name == b.name
diff --git a/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim
index 6895813f35..8f76c4d2da 100644
--- a/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim
+++ b/waku/waku_archive_legacy/driver/postgres_driver/postgres_driver.nim
@@ -15,19 +15,13 @@ import
   ../../../waku_core,
   ../../common,
   ../../driver,
-  ../../../common/databases/db_postgres as waku_postgres,
-  ./postgres_healthcheck,
-  ./partitions_manager
+  ../../../common/databases/db_postgres as waku_postgres
 
 type PostgresDriver* = ref object of ArchiveDriver
   ## Establish a separate pools for read/write operations
   writeConnPool: PgAsyncPool
   readConnPool: PgAsyncPool
 
-  ## Partition container
-  partitionMngr: PartitionManager
-  futLoopPartitionFactory: Future[void]
-
 const InsertRowStmtName = "InsertRow"
 const InsertRowStmtDefinition = # TODO: get the sql queries from a file
   """INSERT INTO messages (id, messageHash, contentTopic, payload, pubsubTopic,
@@ -134,17 +128,7 @@ proc new*(
   let writeConnPool = PgAsyncPool.new(dbUrl, maxNumConnOnEachPool).valueOr:
     return err("error creating write conn pool PgAsyncPool")
 
-  if not isNil(onFatalErrorAction):
-    asyncSpawn checkConnectivity(readConnPool, onFatalErrorAction)
-
-  if not isNil(onFatalErrorAction):
-    asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction)
-
-  let driver = PostgresDriver(
-    writeConnPool: writeConnPool,
-    readConnPool: readConnPool,
-    partitionMngr: PartitionManager.new(),
-  )
+  let driver = PostgresDriver(writeConnPool: writeConnPool, readConnPool: readConnPool)
   return ok(driver)
 
 proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
@@ -267,38 +251,6 @@ method getAllMessages*(
 
   return ok(rows)
 
-proc getPartitionsList(
-    s: PostgresDriver
-): Future[ArchiveDriverResult[seq[string]]] {.async.} =
-  ## Retrieves the seq of partition table names.
-  ## e.g: @["messages_1708534333_1708534393", "messages_1708534273_1708534333"]
-
-  var partitions: seq[string]
-  proc rowCallback(pqResult: ptr PGresult) =
-    for iRow in 0 ..< pqResult.pqNtuples():
-      let partitionName = $(pqgetvalue(pqResult, iRow, 0))
-      partitions.add(partitionName)
-
-  (
-    await s.readConnPool.pgQuery(
-      """
-      SELECT child.relname AS partition_name
-      FROM pg_inherits
-      JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
-      JOIN pg_class child ON pg_inherits.inhrelid = child.oid
-      JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
-      JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
-      WHERE parent.relname='messages'
-      ORDER BY partition_name ASC
-      """,
-      newSeq[string](0),
-      rowCallback,
-    )
-  ).isOkOr:
-    return err("getPartitionsList failed in query: " & $error)
-
-  return ok(partitions)
-
 proc getMessagesArbitraryQuery(
   s: PostgresDriver,
   contentTopic: seq[ContentTopic] = @[],
@@ -764,20 +716,7 @@ method getMessagesCount*(
 method getOldestMessageTimestamp*(
     s: PostgresDriver
 ): Future[ArchiveDriverResult[Timestamp]] {.async.} =
-  ## In some cases it could happen that we have
-  ## empty partitions which are older than the current stored rows.
-  ## In those cases we want to consider those older partitions as the oldest considered timestamp.
-  let oldestPartition = s.partitionMngr.getOldestPartition().valueOr:
-    return err("could not get oldest partition: " & $error)
-
-  let oldestPartitionTimeNanoSec = oldestPartition.getPartitionStartTimeInNanosec()
-
-  let intRes = await s.getInt("SELECT MIN(timestamp) FROM messages")
-  if intRes.isErr():
-    ## Just return the oldest partition time considering the partitions set
-    return ok(Timestamp(oldestPartitionTimeNanoSec))
-
-  return ok(Timestamp(min(intRes.get(), oldestPartitionTimeNanoSec)))
+  return err("not implemented because legacy will get deprecated")
 
 method getNewestMessageTimestamp*(
   s: PostgresDriver
@@ -791,22 +730,20 @@ method getNewestMessageTimestamp*(
 method deleteOldestMessagesNotWithinLimit*(
     s: PostgresDriver, limit: int
 ): Future[ArchiveDriverResult[void]] {.async.} =
-  let execRes = await s.writeConnPool.pgQuery(
-    """DELETE FROM messages WHERE id NOT IN
-      (
-        SELECT id FROM messages ORDER BY timestamp DESC LIMIT ?
-      );""",
-    @[$limit],
-  )
-  if execRes.isErr():
-    return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error)
+  ## Will be completely removed when deprecating store legacy
+  # let execRes = await s.writeConnPool.pgQuery(
+  #   """DELETE FROM messages WHERE id NOT IN
+  #     (
+  #       SELECT id FROM messages ORDER BY timestamp DESC LIMIT ?
+  #     );""",
+  #   @[$limit],
+  # )
+  # if execRes.isErr():
+  #   return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error)
 
   return ok()
 
 method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
-  ## Cancel the partition factory loop
-  s.futLoopPartitionFactory.cancel()
-
   ## Close the database connection
   let writeCloseRes = await s.writeConnPool.close()
   let readCloseRes = await s.readConnPool.close()
@@ -850,250 +787,41 @@ proc performWriteQuery*(
 
   return ok()
 
-proc addPartition(
-    self: PostgresDriver, startTime: Timestamp, duration: timer.Duration
-): Future[ArchiveDriverResult[void]] {.async.} =
-  ## Creates a partition table that will store the messages that fall in the range
-  ## `startTime` <= timestamp < `startTime + duration`.
-  ## `startTime` is measured in seconds since epoch
-
-  let beginning = startTime
-  let `end` = (startTime + duration.seconds)
-
-  let fromInSec: string = $beginning
-  let untilInSec: string = $`end`
-
-  let fromInNanoSec: string = fromInSec & "000000000"
-  let untilInNanoSec: string = untilInSec & "000000000"
-
-  let partitionName = "messages_" & fromInSec & "_" & untilInSec
-
-  let createPartitionQuery =
-    "CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " &
-    "messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');"
-
-  (await self.performWriteQuery(createPartitionQuery)).isOkOr:
-    return err(fmt"error adding partition [{partitionName}]: " & $error)
-
-  debug "new partition added", query = createPartitionQuery
-
-  self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
-  return ok()
-
-proc initializePartitionsInfo(
-    self: PostgresDriver
-): Future[ArchiveDriverResult[void]] {.async.} =
-  let partitionNamesRes = await self.getPartitionsList()
-  if not partitionNamesRes.isOk():
-    return err("Could not retrieve partitions list: " & $partitionNamesRes.error)
-  else:
-    let partitionNames = partitionNamesRes.get()
-    for partitionName in partitionNames:
-      ## partitionName contains something like 'messages_1708449815_1708449875'
-      let bothTimes = partitionName.replace("messages_", "")
-      let times = bothTimes.split("_")
-      if times.len != 2:
-        return err(fmt"loopPartitionFactory wrong partition name {partitionName}")
-
-      var beginning: int64
-      try:
-        beginning = parseInt(times[0])
-      except ValueError:
-        return err("Could not parse beginning time: " & getCurrentExceptionMsg())
-
-      var `end`: int64
-      try:
-        `end` = parseInt(times[1])
-      except ValueError:
-        return err("Could not parse end time: " & getCurrentExceptionMsg())
-
-      self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
-
-    return ok()
-
-const DefaultDatabasePartitionCheckTimeInterval = timer.minutes(10)
-const PartitionsRangeInterval = timer.hours(1) ## Time range covered by each parition
-
-proc loopPartitionFactory(
-    self: PostgresDriver, onFatalError: OnFatalErrorHandler
-) {.async.} =
-  ## Loop proc that continuously checks whether we need to create a new partition.
-  ## Notice that the deletion of partitions is handled by the retention policy modules.
-
-  debug "starting loopPartitionFactory"
-
-  if PartitionsRangeInterval < DefaultDatabasePartitionCheckTimeInterval:
-    onFatalError(
-      "partition factory partition range interval should be bigger than check interval"
-    )
-
-  ## First of all, let's make the 'partition_manager' aware of the current partitions
-  (await self.initializePartitionsInfo()).isOkOr:
-    onFatalError("issue in loopPartitionFactory: " & $error)
-
-  while true:
-    trace "Check if we need to create a new partition"
-
-    let now = times.now().toTime().toUnix()
-
-    if self.partitionMngr.isEmpty():
-      debug "adding partition because now there aren't more partitions"
-      (await self.addPartition(now, PartitionsRangeInterval)).isOkOr:
-        onFatalError("error when creating a new partition from empty state: " & $error)
-    else:
-      let newestPartitionRes = self.partitionMngr.getNewestPartition()
-      if newestPartitionRes.isErr():
-        onFatalError("could not get newest partition: " & $newestPartitionRes.error)
-
-      let newestPartition = newestPartitionRes.get()
-      if newestPartition.containsMoment(now):
-        debug "creating a new partition for the future"
-        ## The current used partition is the last one that was created.
-        ## Thus, let's create another partition for the future.
-
-        (
-          await self.addPartition(
-            newestPartition.getLastMoment(), PartitionsRangeInterval
-          )
-        ).isOkOr:
-          onFatalError("could not add the next partition for 'now': " & $error)
-      elif now >= newestPartition.getLastMoment():
-        debug "creating a new partition to contain current messages"
-        ## There is no partition to contain the current time.
-        ## This happens if the node has been stopped for quite a long time.
-        ## Then, let's create the needed partition to contain 'now'.
-        (await self.addPartition(now, PartitionsRangeInterval)).isOkOr:
-          onFatalError("could not add the next partition: " & $error)
-
-    await sleepAsync(DefaultDatabasePartitionCheckTimeInterval)
-
-proc startPartitionFactory*(
-    self: PostgresDriver, onFatalError: OnFatalErrorHandler
-) {.async.} =
-  self.futLoopPartitionFactory = self.loopPartitionFactory(onFatalError)
-
-proc getTableSize*(
-    self: PostgresDriver, tableName: string
-): Future[ArchiveDriverResult[string]] {.async.} =
-  ## Returns a human-readable representation of the size for the requested table.
-  ## tableName - table of interest.
-
-  let tableSize = (
-    await self.getStr(
-      fmt"""
-      SELECT pg_size_pretty(pg_total_relation_size(C.oid)) AS "total_size"
-      FROM pg_class C
-      where relname = '{tableName}'"""
-    )
-  ).valueOr:
-    return err("error in getDatabaseSize: " & error)
-
-  return ok(tableSize)
-
-proc removePartition(
-    self: PostgresDriver, partitionName: string
-): Future[ArchiveDriverResult[void]] {.async.} =
-  var partSize = ""
-  let partSizeRes = await self.getTableSize(partitionName)
-  if partSizeRes.isOk():
-    partSize = partSizeRes.get()
-
-  ## Detach and remove the partition concurrently to not block the parent table (messages)
-  let detachPartitionQuery =
-    "ALTER TABLE messages DETACH PARTITION " & partitionName & " CONCURRENTLY;"
-  debug "removeOldestPartition", query = detachPartitionQuery
-  (await self.performWriteQuery(detachPartitionQuery)).isOkOr:
-    return err(fmt"error in {detachPartitionQuery}: " & $error)
-
-  ## Drop the partition
-  let dropPartitionQuery = "DROP TABLE " & partitionName
-  debug "removeOldestPartition drop partition", query = dropPartitionQuery
-  (await self.performWriteQuery(dropPartitionQuery)).isOkOr:
-    return err(fmt"error in {dropPartitionQuery}: " & $error)
-
-  debug "removed partition", partition_name = partitionName, partition_size = partSize
-  self.partitionMngr.removeOldestPartitionName()
-
-  return ok()
-
-proc removePartitionsOlderThan(
-    self: PostgresDriver, tsInNanoSec: Timestamp
-): Future[ArchiveDriverResult[void]] {.async.} =
-  ## Removes old partitions that don't contain the specified timestamp
-
-  let tsInSec = Timestamp(float(tsInNanoSec) / 1_000_000_000)
-
-  var oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
-    return err("could not get oldest partition in removePartitionOlderThan: " & $error)
-
-  while not oldestPartition.containsMoment(tsInSec):
-    (await self.removePartition(oldestPartition.getName())).isOkOr:
-      return err("issue in removePartitionsOlderThan: " & $error)
-
-    oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
-      return err(
-        "could not get partition in removePartitionOlderThan in while loop: " & $error
-      )
-
-  ## We reached the partition that contains the target timestamp plus don't want to remove it
-  return ok()
-
-proc removeOldestPartition(
-    self: PostgresDriver, forceRemoval: bool = false, ## To allow cleanup in tests
-): Future[ArchiveDriverResult[void]] {.async.} =
-  ## Indirectly called from the retention policy
-
-  let oldestPartition = self.partitionMngr.getOldestPartition().valueOr:
-    return err("could not remove oldest partition: " & $error)
-
-  if not forceRemoval:
-    let now = times.now().toTime().toUnix()
-    let currentPartitionRes = self.partitionMngr.getPartitionFromDateTime(now)
-    if currentPartitionRes.isOk():
-      ## The database contains a partition that would store current messages.
-
-      if currentPartitionRes.get() == oldestPartition:
-        debug "Skipping to remove the current partition"
-        return ok()
-
-  return await self.removePartition(oldestPartition.getName())
-
-proc containsAnyPartition*(self: PostgresDriver): bool =
-  return not self.partitionMngr.isEmpty()
-
 method decreaseDatabaseSize*(
   driver: PostgresDriver, targetSizeInBytes: int64, forceRemoval: bool = false
 ): Future[ArchiveDriverResult[void]] {.async.} =
-  var dbSize = (await driver.getDatabaseSize()).valueOr:
-    return err("decreaseDatabaseSize failed to get database size: " & $error)
+  ## This is completely disabled and only the non-legacy driver
+  ## will take care of that
+  # var dbSize = (await driver.getDatabaseSize()).valueOr:
+  #   return err("decreaseDatabaseSize failed to get database size: " & $error)
 
-  ## database size in bytes
-  var totalSizeOfDB: int64 = int64(dbSize)
+  # ## database size in bytes
+  # var totalSizeOfDB: int64 = int64(dbSize)
 
-  if totalSizeOfDB <= targetSizeInBytes:
-    return ok()
+  # if totalSizeOfDB <= targetSizeInBytes:
+  #   return ok()
 
-  debug "start reducing database size",
-    targetSize = $targetSizeInBytes, currentSize = $totalSizeOfDB
+  # debug "start reducing database size",
+  #   targetSize = $targetSizeInBytes, currentSize = $totalSizeOfDB
 
-  while totalSizeOfDB > targetSizeInBytes and driver.containsAnyPartition():
-    (await driver.removeOldestPartition(forceRemoval)).isOkOr:
-      return err(
-        "decreaseDatabaseSize inside loop failed to remove oldest partition: " & $error
-      )
+  # while totalSizeOfDB > targetSizeInBytes and driver.containsAnyPartition():
+  #   (await driver.removeOldestPartition(forceRemoval)).isOkOr:
+  #     return err(
+  #       "decreaseDatabaseSize inside loop failed to remove oldest partition: " & $error
+  #     )
 
-    dbSize = (await driver.getDatabaseSize()).valueOr:
-      return
-        err("decreaseDatabaseSize inside loop failed to get database size: " & $error)
+  #   dbSize = (await driver.getDatabaseSize()).valueOr:
+  #     return
+  #       err("decreaseDatabaseSize inside loop failed to get database size: " & $error)
 
-    let newCurrentSize = int64(dbSize)
-    if newCurrentSize == totalSizeOfDB:
-      return err("the previous partition removal didn't clear database size")
+  #   let newCurrentSize = int64(dbSize)
+  #   if newCurrentSize == totalSizeOfDB:
+  #     return err("the previous partition removal didn't clear database size")
 
-    totalSizeOfDB = newCurrentSize
+  #   totalSizeOfDB = newCurrentSize
 
-    debug "reducing database size",
-      targetSize = $targetSizeInBytes, newCurrentSize = $totalSizeOfDB
+  #   debug "reducing database size",
+  #     targetSize = $targetSizeInBytes, newCurrentSize = $totalSizeOfDB
 
   return ok()
@@ -1146,14 +874,14 @@ method deleteMessagesOlderThanTimestamp*(
 ): Future[ArchiveDriverResult[void]] {.async.} =
   ## First of all, let's remove the older partitions so that we can reduce
   ## the database size.
-  (await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
-    return err("error while removing older partitions: " & $error)
-
-  (
-    await s.writeConnPool.pgQuery(
-      "DELETE FROM messages WHERE timestamp < " & $tsNanoSec
-    )
-  ).isOkOr:
-    return err("error in deleteMessagesOlderThanTimestamp: " & $error)
+  # (await s.removePartitionsOlderThan(tsNanoSec)).isOkOr:
+  #   return err("error while removing older partitions: " & $error)
+
+  # (
+  #   await s.writeConnPool.pgQuery(
+  #     "DELETE FROM messages WHERE timestamp < " & $tsNanoSec
+  #   )
+  # ).isOkOr:
+  #   return err("error in deleteMessagesOlderThanTimestamp: " & $error)
 
   return ok()
diff --git a/waku/waku_archive_legacy/driver/postgres_driver/postgres_healthcheck.nim b/waku/waku_archive_legacy/driver/postgres_driver/postgres_healthcheck.nim
deleted file mode 100644
index ff9dff8f79..0000000000
--- a/waku/waku_archive_legacy/driver/postgres_driver/postgres_healthcheck.nim
+++ /dev/null
@@ -1,41 +0,0 @@
-when (NimMajor, NimMinor) < (1, 4):
-  {.push raises: [Defect].}
-else:
-  {.push raises: [].}
-
-import chronos, results
-import ../../../common/databases/db_postgres, ../../../common/error_handling
-
-## Simple query to validate that the postgres is working and attending requests
-const HealthCheckQuery = "SELECT version();"
-const CheckConnectivityInterval = 60.seconds
-const MaxNumTrials = 20
-const TrialInterval = 1.seconds
-
-proc checkConnectivity*(
-    connPool: PgAsyncPool, onFatalErrorAction: OnFatalErrorHandler
-) {.async.} =
-  while true:
-    (await connPool.pgQuery(HealthCheckQuery)).isOkOr:
-      ## The connection failed once. Let's try reconnecting for a while.
-      ## Notice that the 'exec' proc tries to establish a new connection.
-
-      block errorBlock:
-        ## Force close all the opened connections. No need to close gracefully.
-        (await connPool.resetConnPool()).isOkOr:
-          onFatalErrorAction("checkConnectivity resetConnPool error: " & error)
-
-        var numTrial = 0
-        while numTrial < MaxNumTrials:
-          let res = await connPool.pgQuery(HealthCheckQuery)
-          if res.isOk():
-            ## Connection resumed. Let's go back to the normal healthcheck.
-            break errorBlock
-
-          await sleepAsync(TrialInterval)
-          numTrial.inc()
-
-        ## The connection couldn't be resumed. Let's inform the upper layers.
-        onFatalErrorAction("postgres health check error: " & error)
-
-    await sleepAsync(CheckConnectivityInterval)
diff --git a/waku/waku_archive_legacy/retention_policy.nim b/waku/waku_archive_legacy/retention_policy.nim
deleted file mode 100644
index 26916d0dda..0000000000
--- a/waku/waku_archive_legacy/retention_policy.nim
+++ /dev/null
@@ -1,16 +0,0 @@
-when (NimMajor, NimMinor) < (1, 4):
-  {.push raises: [Defect].}
-else:
-  {.push raises: [].}
-
-import results, chronos
-import ./driver
-
-type RetentionPolicyResult*[T] = Result[T, string]
-
-type RetentionPolicy* = ref object of RootObj
-
-method execute*(
-    p: RetentionPolicy, store: ArchiveDriver
-): Future[RetentionPolicyResult[void]] {.base, async.} =
-  discard
diff --git a/waku/waku_archive_legacy/retention_policy/builder.nim b/waku/waku_archive_legacy/retention_policy/builder.nim
deleted file mode 100644
index b7469220f4..0000000000
--- a/waku/waku_archive_legacy/retention_policy/builder.nim
+++ /dev/null
@@ -1,88 +0,0 @@
-when (NimMajor, NimMinor) < (1, 4):
-  {.push raises: [Defect].}
-else:
-  {.push raises: [].}
-
-import std/[strutils, options], regex, results
-import
-  ../retention_policy,
-  ./retention_policy_time,
-  ./retention_policy_capacity,
-  ./retention_policy_size
-
-proc new*(
-    T: type RetentionPolicy, retPolicy: string
-): RetentionPolicyResult[Option[RetentionPolicy]] =
-  let retPolicy = retPolicy.toLower
-
-  # Validate the retention policy format
-  if retPolicy == "" or retPolicy == "none":
-    return ok(none(RetentionPolicy))
-
-  const StoreMessageRetentionPolicyRegex = re2"^\w+:\d*\.?\d+((g|m)b)?$"
-  if not retPolicy.match(StoreMessageRetentionPolicyRegex):
-    return err("invalid 'store message retention policy' format: " & retPolicy)
-
-  # Apply the retention policy, if any
-  let rententionPolicyParts = retPolicy.split(":", 1)
-  let
-    policy = rententionPolicyParts[0]
-    policyArgs = rententionPolicyParts[1]
-
-  if policy == "time":
-    var retentionTimeSeconds: int64
-    try:
-      retentionTimeSeconds = parseInt(policyArgs)
-    except ValueError:
-      return err("invalid time retention policy argument")
-
-    let retPolicy: RetentionPolicy = TimeRetentionPolicy.new(retentionTimeSeconds)
-    return ok(some(retPolicy))
-  elif policy == "capacity":
-    var retentionCapacity: int
-    try:
-      retentionCapacity = parseInt(policyArgs)
-    except ValueError:
-      return err("invalid capacity retention policy argument")
-
-    let retPolicy: RetentionPolicy = CapacityRetentionPolicy.new(retentionCapacity)
-    return ok(some(retPolicy))
-  elif policy == "size":
-    var retentionSize: string
-    retentionSize = policyArgs
-
-    # captures the size unit such as GB or MB
-    let sizeUnit = retentionSize.substr(retentionSize.len - 2)
-    # captures the string type number data of the size provided
-    let sizeQuantityStr = retentionSize.substr(0, retentionSize.len - 3)
-    # to hold the numeric value data of size
-    var inptSizeQuantity: float
-    var sizeQuantity: int64
-    var sizeMultiplier: float
-
-    try:
-      inptSizeQuantity = parseFloat(sizeQuantityStr)
-    except ValueError:
-      return err("invalid size retention policy argument: " & getCurrentExceptionMsg())
-
-    case sizeUnit
-    of "gb":
-      sizeMultiplier = 1024.0 * 1024.0 * 1024.0
-    of "mb":
-      sizeMultiplier = 1024.0 * 1024.0
-    else:
-      return err (
-        """invalid size retention value unit: expected "Mb" or "Gb" but got """ &
-        sizeUnit
-      )
-
-    # quantity is converted into bytes for uniform processing
-    sizeQuantity = int64(inptSizeQuantity * sizeMultiplier)
-
-    if sizeQuantity <= 0:
-      return err("invalid size retention policy argument: a non-zero value is required")
-
-    let retPolicy: RetentionPolicy = SizeRetentionPolicy.new(sizeQuantity)
-    return ok(some(retPolicy))
-  else:
-    return err("unknown retention policy")
diff --git a/waku/waku_archive_legacy/retention_policy/retention_policy_capacity.nim b/waku/waku_archive_legacy/retention_policy/retention_policy_capacity.nim
deleted file mode 100644
index e679e9f167..0000000000
--- a/waku/waku_archive_legacy/retention_policy/retention_policy_capacity.nim
+++ /dev/null
@@ -1,68 +0,0 @@
-when (NimMajor, NimMinor) < (1, 4):
-  {.push raises: [Defect].}
-else:
-  {.push raises: [].}
-
-import results, chronicles, chronos
-import ../driver, ../retention_policy
-
-logScope:
-  topics = "waku archive retention_policy"
-
-const DefaultCapacity*: int = 25_000
-
-const MaxOverflow = 1.3
-
-type
-  # CapacityRetentionPolicy implements auto deletion as follows:
-  #  - The sqlite DB will driver up to `totalCapacity = capacity` * `MaxOverflow` messages,
-  #    giving an overflowWindow of `capacity * (MaxOverflow - 1) = overflowWindow`.
-  #
-  #  - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are
-  #    deleted. The number of messages that get deleted is `(overflowWindow / 2) = deleteWindow`,
-  #    bringing the total number of driverd messages back to `capacity + (overflowWindow / 2)`.
-  #
-  # The rationale for batch deleting is efficiency. We keep half of the overflow window in addition
-  # to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of
-  # `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting
-  # `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message.
-  # So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we
-  # compensate that by keeping half of the overflow window.
-  CapacityRetentionPolicy* = ref object of RetentionPolicy
-    capacity: int
-      # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
-    totalCapacity: int # = capacity * MaxOverflow
-    deleteWindow: int
-      # = capacity * (MaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs
-
-proc calculateTotalCapacity(capacity: int, overflow: float): int =
-  int(float(capacity) * overflow)
-
-proc calculateOverflowWindow(capacity: int, overflow: float): int =
-  int(float(capacity) * (overflow - 1))
-
-proc calculateDeleteWindow(capacity: int, overflow: float): int =
-  calculateOverflowWindow(capacity, overflow) div 2
-
-proc new*(T: type CapacityRetentionPolicy, capacity = DefaultCapacity): T =
-  let
-    totalCapacity = calculateTotalCapacity(capacity, MaxOverflow)
-    deleteWindow = calculateDeleteWindow(capacity, MaxOverflow)
-
-  CapacityRetentionPolicy(
-    capacity: capacity, totalCapacity: totalCapacity, deleteWindow: deleteWindow
-  )
-
-method execute*(
-    p: CapacityRetentionPolicy, driver: ArchiveDriver
-): Future[RetentionPolicyResult[void]] {.async.} =
-  let numMessages = (await driver.getMessagesCount()).valueOr:
-    return err("failed to get messages count: " & error)
-
-  if numMessages < p.totalCapacity:
-    return ok()
-
-  (await driver.deleteOldestMessagesNotWithinLimit(limit = p.capacity + p.deleteWindow)).isOkOr:
-    return err("deleting oldest messages failed: " & error)
-
-  return ok()
diff --git a/waku/waku_archive_legacy/retention_policy/retention_policy_size.nim b/waku/waku_archive_legacy/retention_policy/retention_policy_size.nim
deleted file mode 100644
index 9f710f028c..0000000000
--- a/waku/waku_archive_legacy/retention_policy/retention_policy_size.nim
+++ /dev/null
@@ -1,27 +0,0 @@
-when (NimMajor, NimMinor) < (1, 4):
-  {.push raises: [Defect].}
-else:
-  {.push raises: [].}
-
-import results, chronicles, chronos
-import ../driver, ../retention_policy
-
-logScope:
-  topics = "waku archive retention_policy"
-
-# default size is 30 GiB or 32212254720.0 in bytes
-const DefaultRetentionSize*: int64 = 32212254720
-
-type SizeRetentionPolicy* = ref object of RetentionPolicy
-  sizeLimit: int64
-
-proc new*(T: type SizeRetentionPolicy, size = DefaultRetentionSize): T =
-  SizeRetentionPolicy(sizeLimit: size)
-
-method execute*(
-    p: SizeRetentionPolicy, driver: ArchiveDriver
-): Future[RetentionPolicyResult[void]] {.async.} =
-  (await driver.decreaseDatabaseSize(p.sizeLimit)).isOkOr:
-    return err("decreaseDatabaseSize failed: " & $error)
-
-  return ok()
diff --git a/waku/waku_archive_legacy/retention_policy/retention_policy_time.nim b/waku/waku_archive_legacy/retention_policy/retention_policy_time.nim
deleted file mode 100644
index b5f096e645..0000000000
--- a/waku/waku_archive_legacy/retention_policy/retention_policy_time.nim
+++ /dev/null
@@ -1,40 +0,0 @@
-when (NimMajor, NimMinor) < (1, 4):
-  {.push raises: [Defect].}
-else:
-  {.push raises: [].}
-
-import std/times, results, chronicles, chronos
-import ../../waku_core, ../driver, ../retention_policy
-
-logScope:
-  topics = "waku archive retention_policy"
-
-const DefaultRetentionTime*: int64 = 30.days.seconds
-
-type TimeRetentionPolicy* = ref object of RetentionPolicy
-  retentionTime: chronos.Duration
-
-proc new*(T: type TimeRetentionPolicy, retentionTime = DefaultRetentionTime): T =
-  TimeRetentionPolicy(retentionTime: retentionTime.seconds)
-
-method execute*(
-    p: TimeRetentionPolicy, driver: ArchiveDriver
-): Future[RetentionPolicyResult[void]] {.async.} =
-  ## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency)
-
-  let omtRes = await driver.getOldestMessageTimestamp()
-  if omtRes.isErr():
-    return err("failed to get oldest message timestamp: " & omtRes.error)
-
-  let now = getNanosecondTime(getTime().toUnixFloat())
-  let retentionTimestamp = now - p.retentionTime.nanoseconds
-  let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10
-
-  if thresholdTimestamp <= omtRes.value:
-    return ok()
-
-  let res = await driver.deleteMessagesOlderThanTimestamp(ts = retentionTimestamp)
-  if res.isErr():
-    return err("failed to delete oldest messages: " & res.error)
-
-  return ok()