Skip to content

Commit

Permalink
Handle session state sync properly with tx-behavior settings (#8318)
Browse files Browse the repository at this point in the history
  • Loading branch information
fantix authored Feb 7, 2025
1 parent e6a7994 commit 943415d
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 92 deletions.
1 change: 1 addition & 0 deletions edb/server/dbview/dbview.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ cdef class DatabaseConnectionView:
cdef describe_state(self)
cdef encode_state(self)
cdef decode_state(self, type_id, data)
cdef bint needs_commit_after_state_sync(self)

cdef check_capabilities(
self,
Expand Down
10 changes: 10 additions & 0 deletions edb/server/dbview/dbview.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,16 @@ cdef class DatabaseConnectionView:
aliases, session_config, globals_, type_id, data
)

cdef bint needs_commit_after_state_sync(self):
return any(
tx_conf in self._config
for tx_conf in [
"default_transaction_isolation",
"default_transaction_deferrable",
# default_transaction_access_mode is not yet a backend config
]
)

property txid:
def __get__(self):
return self._txid
Expand Down
2 changes: 2 additions & 0 deletions edb/server/pgcon/pgcon.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ cdef class PGConnection:
public object pinned_by

object last_state
bint state_reset_needs_commit
public object last_init_con_data

str last_indirect_return
Expand Down Expand Up @@ -165,6 +166,7 @@ cdef class PGConnection:
object bind_datas, bytes state,
ssize_t start, ssize_t end, int dbver, object parse_array,
object query_prefix,
bint needs_commit_state,
)

cdef _rewrite_copy_data(
Expand Down
66 changes: 64 additions & 2 deletions edb/server/pgcon/pgcon.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,51 @@ cdef class PGConnection:
# by the backend.
self.aborted_with_error = None

# Session State Management
# ------------------------
# Due to the fact that backend sessions are not pinned to frontend
# sessions (EdgeQL, SQL, etc.) out of transactions, we need to sync
# the backend state with the frontend state before executing queries.
#
# For performance reasons, we try to avoid syncing the state by
# remembering the last state we've synced (last_state), and prefer
# backend connection with the same state as the frontend.
#
# Syncing the state is done by resetting the session state as a whole,
# followed by applying the new state, so that we don't have to track
# individual config resets. Again for performance reasons, the state
# sync is usually applied in the same implicit transaction as the
# actual query in order to avoid extra round trips.
#
# Though, there are exceptions when we need to sync the state in a
# separate transaction by inserting a SYNC message before the actual
# query. This is because either that the query itself is a START
# TRANSACTION / non-transactional command and a few other cases (see
# _parse_execute() below), or the state change affects new transaction
# creation like changing the `default_transaction_isolation` or its
# siblings (see `needs_commit_state` parameters). In such cases, we
# remember the `last_state` immediately after we received the
# ReadyForQuery message caused by the SYNC above, if there are no
# errors happened during state sync. Otherwise, we only remember
# `last_state` after the implicit transaction ends successfully, when
# we're sure the state is synced permanently.
#
# The actual queries may also change the session state. Regardless of
# how we synced state previously, we always remember the `last_state`
# after successful executions (also after transactions without errors,
# implicit or explicit).
#
# Finally, resetting an existing session state that was positive in
# `needs_commit_state` also requires a commit, because the new state
# may not have `needs_commit_state`. To achieve this, we remember the
# previous `needs_commit_state` in `state_reset_needs_commit` and
# always insert a SYNC in the next state sync if it's True. Also, if
# the actual queries modified those `default_transaction_*` settings,
# we also need to set `state_reset_needs_commit` to True for the next
# state sync(reset). See `needs_commit_after_state_sync()` functions
# in dbview classes (EdgeQL and SQL).
self.last_state = dbview.DEFAULT_STATE
self.state_reset_needs_commit = False

cpdef set_stmt_cache_size(self, int maxsize):
self.prep_stmts.resize(maxsize)
Expand Down Expand Up @@ -590,6 +634,7 @@ cdef class PGConnection:
object bind_datas, bytes state,
ssize_t start, ssize_t end, int dbver, object parse_array,
object query_prefix,
bint needs_commit_state,
):
# parse_array is an array of booleans for output with the same size as
# the query_unit_group, indicating if each unit is freshly parsed
Expand All @@ -607,6 +652,8 @@ cdef class PGConnection:

if state is not None and start == 0:
self._build_apply_state_req(state, out)
if needs_commit_state or self.state_reset_needs_commit:
self.write_sync(out)

# Build the parse_array first, closing statements if needed before
# actually executing any command that may fail, in order to ensure
Expand Down Expand Up @@ -716,13 +763,16 @@ cdef class PGConnection:
finally:
await self.after_command()

async def wait_for_state_resp(self, bytes state, bint state_sync):
async def wait_for_state_resp(
self, bytes state, bint state_sync, bint needs_commit_state
):
if state_sync:
try:
await self._parse_apply_state_resp(2 if state is None else 3)
finally:
await self.wait_for_sync()
self.last_state = state
self.state_reset_needs_commit = needs_commit_state
else:
await self._parse_apply_state_resp(2 if state is None else 3)

Expand Down Expand Up @@ -973,6 +1023,7 @@ cdef class PGConnection:
tx_isolation,
list param_data_types,
bytes query_prefix,
bint needs_commit_state,
):
cdef:
WriteBuffer out
Expand Down Expand Up @@ -1005,6 +1056,8 @@ cdef class PGConnection:
or not query.is_transactional
or query.run_and_rollback
or tx_isolation is not None
or needs_commit_state
or self.state_reset_needs_commit
):
# This query has START TRANSACTION or non-transactional command
# like CREATE DATABASE in it.
Expand Down Expand Up @@ -1194,7 +1247,8 @@ cdef class PGConnection:

try:
if state is not None:
await self.wait_for_state_resp(state, state_sync)
await self.wait_for_state_resp(
state, state_sync, needs_commit_state)

if query.run_and_rollback or tx_isolation is not None:
await self.wait_for_sync()
Expand Down Expand Up @@ -1304,6 +1358,7 @@ cdef class PGConnection:
bint use_pending_func_cache = 0,
tx_isolation = None,
query_prefix = None,
bint needs_commit_state = False,
):
self.before_command()
started_at = time.monotonic()
Expand All @@ -1319,6 +1374,7 @@ cdef class PGConnection:
tx_isolation,
param_data_types,
query_prefix or b'',
needs_commit_state,
)
finally:
metrics.backend_query_duration.observe(
Expand Down Expand Up @@ -1493,6 +1549,8 @@ cdef class PGConnection:
)
await self.wait_for_sync()
self.last_state = state
self.state_reset_needs_commit = (
dbv.needs_commit_after_state_sync())
finally:
await self.after_command()

Expand All @@ -1512,6 +1570,8 @@ cdef class PGConnection:
)
await self.wait_for_sync()
self.last_state = state
self.state_reset_needs_commit = (
dbv.needs_commit_after_state_sync())
try:
return await self._parse_sql_extended_query(
actions,
Expand All @@ -1522,6 +1582,8 @@ cdef class PGConnection:
finally:
if not dbv.in_tx():
self.last_state = dbv.serialize_state()
self.state_reset_needs_commit = (
dbv.needs_commit_after_state_sync())
finally:
await self.after_command()

Expand Down
34 changes: 30 additions & 4 deletions edb/server/protocol/execute.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ cdef class ExecutionGroup:
object dbv, # can be DatabaseConnectionView or Database
fe_conn: frontend.AbstractFrontendConnection = None,
bytes state = None,
bint needs_commit_state = False,
):
cdef int dbver

Expand All @@ -94,9 +95,14 @@ cdef class ExecutionGroup:
dbver,
parse_array,
None, # query_prefix
needs_commit_state,
)
if state is not None:
await be_conn.wait_for_state_resp(state, state_sync=0)
await be_conn.wait_for_state_resp(
state,
state_sync=needs_commit_state,
needs_commit_state=needs_commit_state,
)
for i, unit in enumerate(self.group):
ignore_data = unit.output_format == FMT_NONE
rv = await be_conn.wait_for_command(
Expand Down Expand Up @@ -238,11 +244,13 @@ async def execute(
cdef:
bytes state = None, orig_state = None
WriteBuffer bound_args_buf
bint needs_commit_state = False

query_unit = compiled.query_unit_group[0]

if not dbv.in_tx():
orig_state = state = dbv.serialize_state()
needs_commit_state = dbv.needs_commit_after_state_sync()

new_types = None
server = dbv.server
Expand All @@ -269,13 +277,19 @@ async def execute(
close_frontend_conns=query_unit.drop_db_reset_connections,
)
if query_unit.system_config:
# execute_system_config() always sync state in a separate tx,
# so we don't need to pass down the needs_commit_state here
await execute_system_config(be_conn, dbv, query_unit, state)
else:
config_ops = query_unit.config_ops

if query_unit.sql:
if query_unit.user_schema:
await be_conn.parse_execute(query=query_unit, state=state)
await be_conn.parse_execute(
query=query_unit,
state=state,
needs_commit_state=needs_commit_state,
)
if query_unit.ddl_stmt_id is not None:
ddl_ret = be_conn.load_last_ddl_return(query_unit)
if ddl_ret and ddl_ret['new_types']:
Expand All @@ -299,6 +313,7 @@ async def execute(
param_data_types=data_types,
use_prep_stmt=use_prep_stmt,
state=state,
needs_commit_state=needs_commit_state,
dbver=dbv.dbver,
use_pending_func_cache=compiled.use_pending_func_cache,
tx_isolation=tx_isolation,
Expand Down Expand Up @@ -411,6 +426,8 @@ async def execute(
# 1. An orphan ROLLBACK command without a paring start tx
# 2. There was no SQL, so the state can't have been synced.
be_conn.last_state = state
be_conn.state_reset_needs_commit = (
dbv.needs_commit_after_state_sync())
if compiled.recompiled_cache:
for req, qu_group in compiled.recompiled_cache:
dbv.cache_compiled_query(req, qu_group)
Expand All @@ -437,7 +454,7 @@ async def execute_script(
object global_schema, roles
WriteBuffer bind_data
int dbver = dbv.dbver
bint parse
bint parse, needs_commit_state = False

user_schema = extensions = ext_config_settings = cached_reflection = None
feature_used_metrics = None
Expand All @@ -451,6 +468,7 @@ async def execute_script(
in_tx = dbv.in_tx()
if not in_tx:
orig_state = state = dbv.serialize_state()
needs_commit_state = dbv.needs_commit_after_state_sync()

data = None

Expand Down Expand Up @@ -503,10 +521,16 @@ async def execute_script(
dbver,
parse_array,
query_prefix,
needs_commit_state,
)

if idx == 0 and state is not None:
await conn.wait_for_state_resp(state, state_sync=0)
await conn.wait_for_state_resp(
state,
state_sync=needs_commit_state,
needs_commit_state=needs_commit_state,
)
conn.state_reset_needs_commit = needs_commit_state
# state is restored, clear orig_state so that we can
# set conn.last_state correctly later
orig_state = None
Expand Down Expand Up @@ -622,6 +646,8 @@ async def execute_script(
state = dbv.serialize_state()
if state is not orig_state:
conn.last_state = state
conn.state_reset_needs_commit = (
dbv.needs_commit_after_state_sync())
elif updated_user_schema:
dbv._in_tx_user_schema_pickle = user_schema

Expand Down
1 change: 1 addition & 0 deletions edb/server/protocol/pg_ext.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ cdef class ConnectionView:
cdef inline _reset_tx_state(
self, bint chain_implicit, bint chain_explicit
)
cdef bint needs_commit_after_state_sync(self)
cpdef inline close_portal_if_exists(self, str name)
cpdef inline close_portal(self, str name)
cdef inline find_portal(self, str name)
Expand Down
10 changes: 10 additions & 0 deletions edb/server/protocol/pg_ext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,16 @@ cdef class ConnectionView:
self._session_state_db_cache = (self._settings, rv)
return rv

cdef bint needs_commit_after_state_sync(self):
return any(
tx_conf in self._settings
for tx_conf in [
"default_transaction_isolation",
"default_transaction_deferrable",
"default_transaction_read_only",
]
)


cdef class PgConnection(frontend.FrontendConnection):
interface = "sql"
Expand Down
Loading

0 comments on commit 943415d

Please sign in to comment.