Skip to content

Commit

Permalink
postgres_driver: better partition creation without exclusive access
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Jul 7, 2024
1 parent c93f804 commit 1014de2
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,9 @@ const COULD_NOT_ACQUIRE_ADVISORY_LOCK* = "could not acquire advisory lock"
proc performWriteQueryWithLock*(
self: PostgresDriver, queryToProtect: string
): Future[ArchiveDriverResult[void]] {.async.} =
## This wraps the original query in a script so that we make sure a pg_advisory lock protects it
## This wraps the original query in a script so that we make sure a pg_advisory lock protects it.
## The purpose of this proc is to protect write queries that might be performed simultaneously
## to the same database, from different store nodes.
debug "performWriteQueryWithLock", queryToProtect
let query =
fmt"""
Expand Down Expand Up @@ -944,6 +946,11 @@ proc performWriteQueryWithLock*(
END $$;
"""
(await self.performWriteQuery(query)).isOkOr:
if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK):
## We don't consider this as an error. Just someone else acquired the advisory lock
debug "skip performWriteQuery because the advisory lock is acquired by other"
return ok()

debug "protected query ended with error", error = $error
return err("protected query ended with error:" & $error)

Expand All @@ -968,22 +975,44 @@ proc addPartition(

let partitionName = "messages_" & fromInSec & "_" & untilInSec

## Create the partition table but not attach it yet to the main table
let createPartitionQuery =
"CREATE TABLE IF NOT EXISTS " & partitionName & " PARTITION OF " &
"messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');"
"CREATE TABLE IF NOT EXISTS " & partitionName &
" (LIKE messages INCLUDING DEFAULTS INCLUDING CONSTRAINTS);"

(await self.performWriteQueryWithLock(createPartitionQuery)).isOkOr:
if error.contains("already exists"):
debug "skip create new partition as it already exists: ", skipped_error = $error
return ok()

if error.contains(COULD_NOT_ACQUIRE_ADVISORY_LOCK):
debug "skip create new partition because the advisory lock is acquired by other"
return ok()

## for any different error, just consider it
return err(fmt"error adding partition [{partitionName}]: " & $error)

## Add constraint to the partition table so that EXCLUSIVE ACCESS is not performed when
## the partition is attached to the main table.
let constraintName = partitionName & "_by_range_check"
let addTimeConstraintQuery =
"ALTER TABLE " & partitionName & " ADD CONSTRAINT " & constraintName &
" CHECK ( storedAt >= " & fromInNanoSec & " AND storedAt < " & untilInNanoSec & " );"

(await self.performWriteQueryWithLock(addTimeConstraintQuery)).isOkOr:
return err(fmt"error creating constraint [{partitionName}]: " & $error)

## Attaching the new created table as a new partition. That does not require EXCLUSIVE ACCESS.
let attachPartitionQuery =
"ALTER TABLE messages ATTACH PARTITION " & partitionName & " FOR VALUES FROM (" &
fromInNanoSec & ") TO (" & untilInNanoSec & ");"

(await self.performWriteQueryWithLock(attachPartitionQuery)).isOkOr:
return err(fmt"error attaching partition [{partitionName}]: " & $error)

## Dropping the check constraint as it was only necessary to prevent full scan,
## and EXCLUSIVE ACCESS, to the whole messages table, when the new partition was attached.
let dropConstraint =
"ALTER TABLE " & partitionName & " DROP CONSTRAINT " & constraintName & ";"

(await self.performWriteQueryWithLock(dropConstraint)).isOkOr:
return err(fmt"error dropping constraint [{partitionName}]: " & $error)

debug "new partition added", query = createPartitionQuery

self.partitionMngr.addPartitionInfo(partitionName, beginning, `end`)
Expand Down

0 comments on commit 1014de2

Please sign in to comment.