Skip to content

Commit

Permalink
pgasyncpool: use thread signal to control when the pool is busy
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Sep 29, 2024
1 parent a0a5650 commit ae5cd5f
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions waku/common/databases/db_postgres/pgasyncpool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
{.push raises: [].}

import std/[sequtils, nre, strformat, sets], results, chronos, chronicles
std/[sequtils, nre, strformat, sets],
results,
chronos,
chronos/threadsync,
chronicles,
strutils,
random
import ./dbconn, ../common, ../../../waku_core/time

type PgAsyncPoolState {.pure.} = enum
Expand All @@ -15,6 +22,7 @@ type PgDbConn = ref object
open: bool
busy: bool
preparedStmts: HashSet[string] ## [stmtName's]
busySignal: ThreadSignalPtr ## signal to wait while the pool is busy

type
# Database connection pool
Expand All @@ -24,6 +32,7 @@ type

state: PgAsyncPoolState
conns: seq[PgDbConn]
busySignal: ThreadSignalPtr

proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResult[T] =
var connString: string
Expand All @@ -42,11 +51,15 @@ proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResu
SyntaxError:
return err("could not parse postgres string: " & getCurrentExceptionMsg())

let busySignal = ThreadSignalPtr.new().valueOr:
return err("error creating busySignal ThreadSignalPtr in PgAsyncPool: " & $error)

let pool = PgAsyncPool(
connString: connString,
maxConnections: maxConnections,
state: PgAsyncPoolState.Live,
conns: newSeq[PgDbConn](0),
busySignal: busySignal,
)

return ok(pool)
Expand All @@ -70,7 +83,8 @@ proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} =
# wait for the connections to be released and close them, without
# blocking the async runtime
while pool.conns.anyIt(it.busy):
await sleepAsync(0.milliseconds)
debug "closing waiting while connections are busy"
await pool.busySignal.wait()

for i in 0 ..< pool.conns.len:
if pool.conns[i].busy:
Expand Down Expand Up @@ -110,8 +124,8 @@ proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} =

if pool.conns.len == pool.maxConnections:
## Can't create more connections. Wait for a free connection without blocking the async runtime.
while pool.isBusy():
await sleepAsync(0.milliseconds)
if pool.isBusy():
await pool.busySignal.wait()

return pool.getFirstFreeConnIndex()
elif pool.conns.len < pool.maxConnections:
Expand Down Expand Up @@ -146,6 +160,10 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
if pool.conns[i].dbConn == conn:
pool.conns[i].busy = false

pool.busySignal.fireSync().isOkOr:
error "error triggering busySignal in releaseConn", error = $error


const SlowQueryThresholdInNanoSeconds = 1_000_000_000

proc pgQuery*(
Expand Down

0 comments on commit ae5cd5f

Please sign in to comment.