Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Optimize store #3061

Merged
merged 12 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,15 @@ suite "Postgres driver":
var futures = newSeq[Future[ArchiveDriverResult[void]]](0)

let beforeSleep = now()
for _ in 1 .. 100:

for _ in 1 .. 25:
futures.add(driver.sleep(1))

await allFutures(futures)

let diff = now() - beforeSleep
# Actually, the diff randomly goes between 1 and 2 seconds.
# although in theory it should spend 1s because we establish 100
# connections and we spawn 100 tasks that spend ~1s each.
assert diff < 20_000_000_000

assert diff < 2_000_000_000 ## nanoseconds

asyncTest "Insert a message":
const contentTopic = "test-content-topic"
Expand Down
139 changes: 102 additions & 37 deletions waku/common/databases/db_postgres/dbconn.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import
std/[times, strutils, asyncnet, os, sequtils],
std/[times, strutils, asyncnet, os, sequtils, sets],
results,
chronos,
chronos/threadsync,
metrics,
re,
chronicles
Expand All @@ -11,9 +12,36 @@ include db_connector/db_postgres

type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe, raises: [].}

type DbConnWrapper* = ref object
dbConn: DbConn
open: bool
preparedStmts: HashSet[string] ## [stmtName's]
futBecomeFree*: Future[void]
## to notify the pgasyncpool that this conn is free, i.e. not busy

## Connection management

proc check*(db: DbConn): Result[void, string] =
proc containsPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string): bool =
return dbConnWrapper.preparedStmts.contains(preparedStmt)

proc inclPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string) =
dbConnWrapper.preparedStmts.incl(preparedStmt)

proc getDbConn*(dbConnWrapper: DbConnWrapper): DbConn =
return dbConnWrapper.dbConn

proc isPgDbConnBusy*(dbConnWrapper: DbConnWrapper): bool =
if isNil(dbConnWrapper.futBecomeFree):
return false
return not dbConnWrapper.futBecomeFree.finished()

proc isPgDbConnOpen*(dbConnWrapper: DbConnWrapper): bool =
return dbConnWrapper.open

proc setPgDbConnOpen*(dbConnWrapper: DbConnWrapper, newOpenState: bool) =
dbConnWrapper.open = newOpenState

proc check(db: DbConn): Result[void, string] =
var message: string
try:
message = $db.pqErrorMessage()
Expand All @@ -25,11 +53,11 @@ proc check*(db: DbConn): Result[void, string] =

return ok()

proc open*(connString: string): Result[DbConn, string] =
proc openDbConn(connString: string): Result[DbConn, string] =
## Opens a new connection.
var conn: DbConn = nil
try:
conn = open("", "", "", connString)
conn = open("", "", "", connString) ## included from db_postgres module
except DbError:
return err("exception opening new connection: " & getCurrentExceptionMsg())

Expand All @@ -46,22 +74,35 @@ proc open*(connString: string): Result[DbConn, string] =

return ok(conn)

proc closeDbConn*(db: DbConn) {.raises: [OSError].} =
let fd = db.pqsocket()
if fd != -1:
asyncengine.unregister(cast[asyncengine.AsyncFD](fd))
db.close()
proc new*(T: type DbConnWrapper, connString: string): Result[T, string] =
let dbConn = openDbConn(connString).valueOr:
return err("failed to establish a new connection: " & $error)

return ok(DbConnWrapper(dbConn: dbConn, open: true))

proc closeDbConn*(
dbConnWrapper: DbConnWrapper
): Result[void, string] {.raises: [OSError].} =
let fd = dbConnWrapper.dbConn.pqsocket()
if fd == -1:
return err("error file descriptor -1 in closeDbConn")

asyncengine.unregister(cast[asyncengine.AsyncFD](fd))

dbConnWrapper.dbConn.close()

return ok()

proc `$`(self: SqlQuery): string =
return cast[string](self)

proc sendQuery(
db: DbConn, query: SqlQuery, args: seq[string]
dbConnWrapper: DbConnWrapper, query: SqlQuery, args: seq[string]
): Future[Result[void, string]] {.async.} =
## This proc can be used directly for queries that don't retrieve values back.

if db.status != CONNECTION_OK:
db.check().isOkOr:
if dbConnWrapper.dbConn.status != CONNECTION_OK:
dbConnWrapper.dbConn.check().isOkOr:
return err("failed to connect to database: " & $error)

return err("unknown reason")
Expand All @@ -72,17 +113,16 @@ proc sendQuery(
except DbError:
return err("exception formatting the query: " & getCurrentExceptionMsg())

let success = db.pqsendQuery(cstring(wellFormedQuery))
let success = dbConnWrapper.dbConn.pqsendQuery(cstring(wellFormedQuery))
if success != 1:
db.check().isOkOr:
dbConnWrapper.dbConn.check().isOkOr:
return err("failed pqsendQuery: " & $error)

return err("failed pqsendQuery: unknown reason")

return ok()

proc sendQueryPrepared(
db: DbConn,
dbConnWrapper: DbConnWrapper,
stmtName: string,
paramValues: openArray[string],
paramLengths: openArray[int32],
Expand All @@ -96,8 +136,8 @@ proc sendQueryPrepared(
$paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len
return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg)

if db.status != CONNECTION_OK:
db.check().isOkOr:
if dbConnWrapper.dbConn.status != CONNECTION_OK:
dbConnWrapper.dbConn.check().isOkOr:
return err("failed to connect to database: " & $error)

return err("unknown reason")
Expand All @@ -110,7 +150,7 @@ proc sendQueryPrepared(

const ResultFormat = 0 ## 0 for text format, 1 for binary format.

let success = db.pqsendQueryPrepared(
let success = dbConnWrapper.dbConn.pqsendQueryPrepared(
stmtName,
nParams,
cstrArrayParams,
Expand All @@ -119,49 +159,63 @@ proc sendQueryPrepared(
ResultFormat,
)
if success != 1:
db.check().isOkOr:
dbConnWrapper.dbConn.check().isOkOr:
return err("failed pqsendQueryPrepared: " & $error)

return err("failed pqsendQueryPrepared: unknown reason")

return ok()

proc waitQueryToFinish(
db: DbConn, rowCallback: DataProc = nil
dbConnWrapper: DbConnWrapper, rowCallback: DataProc = nil
): Future[Result[void, string]] {.async.} =
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
## For other queries, like "INSERT", 'rowCallback' should be nil.

var dataAvailable = false
let futDataAvailable = newFuture[void]("futDataAvailable")

proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} =
dataAvailable = true
if not futDataAvailable.completed():
futDataAvailable.complete()

let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db))
let asyncFd = cast[asyncengine.AsyncFD](pqsocket(dbConnWrapper.dbConn))

asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr:
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
return err("failed to add event reader in waitQueryToFinish: " & $error)
defer:
asyncengine.removeReader2(asyncFd).isOkOr:
return err("failed to remove event reader in waitQueryToFinish: " & $error)

while not dataAvailable:
await sleepAsync(timer.milliseconds(1))
await futDataAvailable

## Now retrieve the result
## Now retrieve the result from the database
while true:
let pqResult = db.pqgetResult()
let pqResult = dbConnWrapper.dbConn.pqgetResult()

if pqResult == nil:
db.check().isOkOr:
dbConnWrapper.dbConn.check().isOkOr:
if not dbConnWrapper.futBecomeFree.failed():
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
return err("error in query: " & $error)

return ok() # reached the end of the results
dbConnWrapper.futBecomeFree.complete()
return ok() # reached the end of the results. The query is completed

if not rowCallback.isNil():
rowCallback(pqResult)

pqclear(pqResult)

proc dbConnQuery*(
db: DbConn, query: SqlQuery, args: seq[string], rowCallback: DataProc
dbConnWrapper: DbConnWrapper,
query: SqlQuery,
args: seq[string],
rowCallback: DataProc,
requestId: string,
): Future[Result[void, string]] {.async, gcsafe.} =
dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQuery")

let cleanedQuery = ($query).replace(" ", "").replace("\n", "")
## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition
var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "")
Expand All @@ -170,15 +224,17 @@ proc dbConnQuery*(

var queryStartTime = getTime().toUnixFloat()

(await db.sendQuery(query, args)).isOkOr:
(await dbConnWrapper.sendQuery(query, args)).isOkOr:
error "error in dbConnQuery", error = $error
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
return err("error in dbConnQuery calling sendQuery: " & $error)

let sendDuration = getTime().toUnixFloat() - queryStartTime
query_time_secs.set(sendDuration, [querySummary, "sendQuery"])

queryStartTime = getTime().toUnixFloat()

(await db.waitQueryToFinish(rowCallback)).isOkOr:
(await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)

let waitDuration = getTime().toUnixFloat() - queryStartTime
Expand All @@ -188,6 +244,7 @@ proc dbConnQuery*(

if "insert" notin ($query).toLower():
debug "dbConnQuery",
requestId,
query = $query,
querySummary,
waitDurationSecs = waitDuration,
Expand All @@ -196,23 +253,28 @@ proc dbConnQuery*(
return ok()

proc dbConnQueryPrepared*(
db: DbConn,
dbConnWrapper: DbConnWrapper,
stmtName: string,
paramValues: seq[string],
paramLengths: seq[int32],
paramFormats: seq[int32],
rowCallback: DataProc,
requestId: string,
): Future[Result[void, string]] {.async, gcsafe.} =
dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQueryPrepared")
var queryStartTime = getTime().toUnixFloat()
db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:

dbConnWrapper.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
error "error in dbConnQueryPrepared", error = $error
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)

let sendDuration = getTime().toUnixFloat() - queryStartTime
query_time_secs.set(sendDuration, [stmtName, "sendQuery"])

queryStartTime = getTime().toUnixFloat()

(await db.waitQueryToFinish(rowCallback)).isOkOr:
(await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)

let waitDuration = getTime().toUnixFloat() - queryStartTime
Expand All @@ -222,6 +284,9 @@ proc dbConnQueryPrepared*(

if "insert" notin stmtName.toLower():
debug "dbConnQueryPrepared",
stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration
requestId,
stmtName,
waitDurationSecs = waitDuration,
sendDurationSecs = sendDuration

return ok()
Loading
Loading