Skip to content

Commit

Permalink
Use sys_pgcon for long-term advisory locks (#8320)
Browse files Browse the repository at this point in the history
* Fixes the issue of advisory locks getting leaked back to the
connection pool.
* Fixes the `test_ext_ai_indexing_*` failures in CI.
  • Loading branch information
fantix authored and msullivan committed Feb 20, 2025
1 parent da9b73f commit a90e7aa
Showing 1 changed file with 59 additions and 8 deletions.
67 changes: 59 additions & 8 deletions edb/server/protocol/ai_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ async def _ext_ai_index_builder_controller_loop(
models = await _ext_ai_fetch_active_models(pgconn)
if models:
if not holding_lock:
holding_lock = await _ext_ai_lock(pgconn)
holding_lock = await _ext_ai_lock(tenant, pgconn)
if holding_lock:
try:
processed, errors = (
Expand All @@ -166,7 +166,7 @@ async def _ext_ai_index_builder_controller_loop(
finally:
if processed == 0 or errors != 0:
await asyncutil.deferred_shield(
_ext_ai_unlock(pgconn))
_ext_ai_unlock(tenant))
holding_lock = False
finally:
tenant.release_pgcon(dbname, pgconn)
Expand Down Expand Up @@ -214,22 +214,73 @@ async def _ext_ai_fetch_active_models(
return result


# _ext_ai_lock() takes a long-term lock held in the system pgcon. It is used
# by the index builder job above to guard a sequence of alternating database
# pgcons and outgoing HTTP requests (pgcons are freed up while waiting for a
# response from external services), so that different Gel tenants on the same
# backend run this job mutually exclusively.
#
# The following implementation is also safe to use from multiple tasks within
# the same tenant (though at the time of writing, there is only one such task
# per tenant). To achieve this, we added an extra query on pg_locks to check
# whether the lock is already held by another task, because advisory locks are
# reentrant within the same session (the same sys_pgcon). To avoid race
# conditions, a second advisory lock guards the two check-and-lock queries in
# the local session. This also means that callers must always go through
# _ext_ai_lock() rather than taking either of the two locks directly.
#
# If you are editing the magic numbers here: make sure each one fits in a
# Postgres Oid type (uint32), or you'll need to change the `classid` condition
# in the pg_locks query in _ext_ai_lock(), which currently expects classid = 0.
# (Per the PostgreSQL pg_locks documentation, a single-bigint advisory lock key
# is displayed with its high-order half in `classid` and its low-order half in
# `objid`.)
#
# Key of the long-term lock: passed to pg_try_advisory_lock() and
# pg_advisory_unlock(), and matched against pg_locks.objid.
_EXT_AI_ADVISORY_LOCK = b"3987734540"
# Key of the guard lock: passed to pg_try_advisory_xact_lock() in
# _ext_ai_lock().
_EXT_AI_ADVISORY_LOCK_LOCK = b"3987734541"


# Non-blocking attempt to acquire the long-term ext-AI advisory lock
# (_EXT_AI_ADVISORY_LOCK); returns a bool telling whether it was acquired.
# Every query result is compared against b'\x01' -- presumably the binary
# encoding of boolean true as returned by sql_fetch_val(); confirm.
#
# NOTE(review): this span is rendered from a commit diff without +/- markers
# or indentation. The three lines right after the signature (a plain
# pg_try_advisory_lock() on `pgconn` followed by a return) look like the
# pre-commit body, and everything from START TRANSACTION onwards like the
# post-commit body -- confirm against the repository before relying on it.
async def _ext_ai_lock(
tenant: srv_tenant.Tenant,
pgconn: pgcon.PGConnection,
) -> bool:
b = await pgconn.sql_fetch_val(
b"SELECT pg_try_advisory_lock(" + _EXT_AI_ADVISORY_LOCK + b")")
return b == b'\x01'
# The guard lock below is transaction-level, so that the ROLLBACK in the
# `finally` clause is enough to guarantee it gets released.
await pgconn.sql_execute(b"START TRANSACTION")
try:
# Take the guard lock (_EXT_AI_ADVISORY_LOCK_LOCK) without blocking.
b = await pgconn.sql_fetch_val(
b"SELECT pg_try_advisory_xact_lock("
+ _EXT_AI_ADVISORY_LOCK_LOCK
+ b")"
)
if b == b'\x01':
# Ask pg_locks whether any advisory lock with classid 0 and the main lock's
# key as objid currently exists; the query yields true when none is found.
lock_free = await pgconn.sql_fetch_val(
b'''
SELECT NOT EXISTS (
SELECT
1
FROM
pg_locks
WHERE
locktype = 'advisory'
AND classid = 0
AND objid = \
''' + _EXT_AI_ADVISORY_LOCK + b')')
if lock_free == b'\x01':
# Acquire the main lock on the tenant's system connection, not on `pgconn`.
async with tenant.use_sys_pgcon() as syscon:
# The long-term lock must be session-level (not transaction-level).
b = await syscon.sql_fetch_val(
b"SELECT pg_try_advisory_lock("
+ _EXT_AI_ADVISORY_LOCK
+ b")"
)
return b == b'\x01'
finally:
# Always end the transaction opened on `pgconn` at the top.
await pgconn.sql_execute(b"ROLLBACK")

# Fallback result when no earlier return was taken.
return False


# Release the long-term ext-AI advisory lock (_EXT_AI_ADVISORY_LOCK) by
# calling pg_advisory_unlock(); the query result is not inspected.
#
# NOTE(review): this span is rendered from a commit diff without +/- markers
# or indentation. The `pgconn` parameter and the `pgconn.sql_fetch_val` call
# look like the pre-commit version; the `tenant` parameter and the
# `use_sys_pgcon()` block look like the post-commit version -- confirm
# against the repository before relying on it.
async def _ext_ai_unlock(
pgconn: pgcon.PGConnection,
tenant: srv_tenant.Tenant,
) -> None:
await pgconn.sql_fetch_val(
b"SELECT pg_advisory_unlock(" + _EXT_AI_ADVISORY_LOCK + b")")
# Unlock through the tenant's system connection, the same kind of connection
# _ext_ai_lock() uses for pg_try_advisory_lock().
async with tenant.use_sys_pgcon() as syscon:
await syscon.sql_fetch_val(
b"SELECT pg_advisory_unlock(" + _EXT_AI_ADVISORY_LOCK + b")")


async def _ext_ai_index_builder_work(
Expand Down

0 comments on commit a90e7aa

Please sign in to comment.