From 28bdb70be46d3fb3a6f992b3f9f2de1defd85a30 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Sun, 7 Jul 2024 15:57:54 +0200 Subject: [PATCH 1/2] postgres_driver: better partition creation without exclusive access --- .../postgres_driver/postgres_driver.nim | 45 +++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 516d8d70e3..5793139e22 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -918,7 +918,9 @@ const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock" proc performWriteQueryWithLock*( self: PostgresDriver, queryToProtect: string ): Future[ArchiveDriverResult[void]] {.async.} = - ## This wraps the original query in a script so that we make sure a pg_advisory lock protects it + ## This wraps the original query in a script so that we make sure a pg_advisory lock protects it. + ## The purpose of this proc is to protect write queries that might be performed simultaneously + ## to the same database, from different store nodes. debug "performWriteQueryWithLock", queryToProtect let query = fmt""" @@ -947,6 +949,11 @@ proc performWriteQueryWithLock*( END $$; """ (await self.performWriteQuery(query)).isOkOr: + if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK): + ## We don't consider this as an error. Just someone else acquired the advisory lock + debug "skip performWriteQuery because the advisory lock is acquired by other" + return ok() + debug "protected query ended with error", error = $error return err("protected query ended with error:" & $error) @@ -971,22 +978,44 @@ proc addPartition( let partitionName = "messages_" & fromInSec & "_" & untilInSec + ## Create the partition table but not attach it yet to the main table let createPartitionQuery = - "CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " & - "messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');" + "CREATE TABLE IF NOT EXISTS " & partitionName & + " (LIKE messages INCLUDING DEFAULTS INCLUDING CONSTRAINTS);" (await self.performWriteQueryWithLock(createPartitionQuery)).isOkOr: if error.contains("already exists"): debug "skip create new partition as it already exists: ", skipped_error = $error return ok() - if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK): - debug "skip create new partition because the advisory lock is acquired by other" - return ok() - - ## for any different error, just consider it return err(fmt"error adding partition [{partitionName}]: " & $error) + ## Add constraint to the partition table so that EXCLUSIVE ACCESS is not performed when + ## the partition is attached to the main table. + let constraintName = partitionName & "_by_range_check" + let addTimeConstraintQuery = + "ALTER TABLE " & partitionName & " ADD CONSTRAINT " & constraintName & + " CHECK ( storedAt >= " & fromInNanoSec & " AND storedAt < " & untilInNanoSec & " );" + + (await self.performWriteQueryWithLock(addTimeConstraintQuery)).isOkOr: + return err(fmt"error creating constraint [{partitionName}]: " & $error) + + ## Attaching the new created table as a new partition. That does not require EXCLUSIVE ACCESS. + let attachPartitionQuery = + "ALTER TABLE messages ATTACH PARTITION " & partitionName & " FOR VALUES FROM (" & + fromInNanoSec & ") TO (" & untilInNanoSec & ");" + + (await self.performWriteQueryWithLock(attachPartitionQuery)).isOkOr: + return err(fmt"error attaching partition [{partitionName}]: " & $error) + + ## Dropping the check constraint as it was only necessary to prevent full scan, + ## and EXCLUSIVE ACCESS, to the whole messages table, when the new partition was attached. + let dropConstraint = + "ALTER TABLE " & partitionName & " DROP CONSTRAINT " & constraintName & ";" + + (await self.performWriteQueryWithLock(dropConstraint)).isOkOr: + return err(fmt"error dropping constraint [{partitionName}]: " & $error) + debug "new partition added", query = createPartitionQuery self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`) From ffc2fc84d54de7564605ca6ac4e977cced62e21c Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Sun, 7 Jul 2024 23:27:19 +0200 Subject: [PATCH 2/2] CHANGELOG.md update for v0.30.2 --- CHANGELOG.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fae1476903..bb7bbfa3d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,22 @@ +## v0.30.2 (2024-07-08) + +### Release highlights + +* Patch to avoid exclusive access when creating new partitions in the postgres messages table. + +### Changes + +- fix: postgres_driver better partition creation without exclusive access [28bdb70b](https://github.com/waku-org/nwaku/commit/28bdb70be46d3fb3a6f992b3f9f2de1defd85a30) + +This release supports the following [libp2p protocols](https://docs.libp2p.io/concepts/protocols/): +| Protocol | Spec status | Protocol id | +| ---: | :---: | :--- | +| [`11/WAKU2-RELAY`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/11/relay.md) | `stable` | `/vac/waku/relay/2.0.0` | +| [`12/WAKU2-FILTER`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/12/filter.md) | `draft` | `/vac/waku/filter/2.0.0-beta1`
`/vac/waku/filter-subscribe/2.0.0-beta1`
`/vac/waku/filter-push/2.0.0-beta1` | +| [`13/WAKU2-STORE`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/13/store.md) | `draft` | `/vac/waku/store/2.0.0-beta4` | +| [`19/WAKU2-LIGHTPUSH`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/19/lightpush.md) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` | +| [`66/WAKU2-METADATA`](https://github.com/waku-org/specs/blob/master/standards/core/metadata.md) | `raw` | `/vac/waku/metadata/1.0.0` | + ## v0.30.1 (2024-07-03) ### Notes