Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use attempt_to_set_autocommit everywhere. (#16615)
Browse files Browse the repository at this point in the history
To avoid asserting the type of the database connection.
  • Loading branch information
clokep authored Nov 9, 2023
1 parent dc7f068 commit 2c6a7df
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 22 deletions.
1 change: 1 addition & 0 deletions changelog.d/16615.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use more generic database methods.
18 changes: 12 additions & 6 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@

if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -746,10 +750,10 @@ async def create_index_in_background(
The named index will be dropped upon completion of the new index.
"""

def create_index_psql(conn: Connection) -> None:
def create_index_psql(conn: "LoggingDatabaseConnection") -> None:
conn.rollback()
# postgres insists on autocommit for the index
conn.set_session(autocommit=True) # type: ignore
conn.engine.attempt_to_set_autocommit(conn.conn, True)

try:
c = conn.cursor()
Expand Down Expand Up @@ -793,9 +797,9 @@ def create_index_psql(conn: Connection) -> None:
undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
conn.cursor().execute(undo_timeout_sql)

conn.set_session(autocommit=False) # type: ignore
conn.engine.attempt_to_set_autocommit(conn.conn, False)

def create_index_sqlite(conn: Connection) -> None:
def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
# Sqlite doesn't support concurrent creation of indexes.
#
# We assume that sqlite doesn't give us invalid indices; however
Expand Down Expand Up @@ -825,7 +829,9 @@ def create_index_sqlite(conn: Connection) -> None:
c.execute(sql)

if isinstance(self.db_pool.engine, engines.PostgresEngine):
runner: Optional[Callable[[Connection], None]] = create_index_psql
runner: Optional[
Callable[[LoggingDatabaseConnection], None]
] = create_index_psql
elif psql_only:
runner = None
else:
Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def create_index(conn: LoggingDatabaseConnection) -> None:

# we have to set autocommit, because postgres refuses to
# CREATE INDEX CONCURRENTLY without it.
conn.set_session(autocommit=True)
conn.engine.attempt_to_set_autocommit(conn.conn, True)

try:
c = conn.cursor()
Expand All @@ -301,7 +301,7 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
# we should now be able to delete the GIST index.
c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
finally:
conn.set_session(autocommit=False)
conn.engine.attempt_to_set_autocommit(conn.conn, False)

if isinstance(self.database_engine, PostgresEngine):
await self.db_pool.runWithConnection(create_index)
Expand All @@ -323,7 +323,7 @@ async def _background_reindex_search_order(

def create_index(conn: LoggingDatabaseConnection) -> None:
conn.rollback()
conn.set_session(autocommit=True)
conn.engine.attempt_to_set_autocommit(conn.conn, True)
c = conn.cursor()

# We create with NULLS FIRST so that when we search *backwards*
Expand All @@ -340,7 +340,7 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
"""
)
conn.set_session(autocommit=False)
conn.engine.attempt_to_set_autocommit(conn.conn, False)

await self.db_pool.runWithConnection(create_index)

Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/state/bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ def reindex_txn(conn: LoggingDatabaseConnection) -> None:
conn.rollback()
if isinstance(self.database_engine, PostgresEngine):
# postgres insists on autocommit for the index
conn.set_session(autocommit=True)
conn.engine.attempt_to_set_autocommit(conn.conn, True)
try:
txn = conn.cursor()
txn.execute(
Expand All @@ -501,7 +501,7 @@ def reindex_txn(conn: LoggingDatabaseConnection) -> None:
)
txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
finally:
conn.set_session(autocommit=False)
conn.engine.attempt_to_set_autocommit(conn.conn, False)
else:
txn = conn.cursor()
txn.execute(
Expand Down
15 changes: 5 additions & 10 deletions tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.engines import PostgresEngine, create_engine
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.types import ISynapseReactor, JsonDict
from synapse.util import Clock
Expand Down Expand Up @@ -1029,18 +1029,15 @@ def setup_test_homeserver(

# Create the database before we actually try and connect to it, based off
# the template database we generate in setupdb()
if isinstance(db_engine, PostgresEngine):
import psycopg2.extensions

if USE_POSTGRES_FOR_TESTS:
db_conn = db_engine.module.connect(
dbname=POSTGRES_BASE_DB,
user=POSTGRES_USER,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
password=POSTGRES_PASSWORD,
)
assert isinstance(db_conn, psycopg2.extensions.connection)
db_conn.autocommit = True
db_engine.attempt_to_set_autocommit(db_conn, True)
cur = db_conn.cursor()
cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
cur.execute(
Expand All @@ -1065,13 +1062,12 @@ def setup_test_homeserver(

hs.setup()

if isinstance(db_engine, PostgresEngine):
if USE_POSTGRES_FOR_TESTS:
database_pool = hs.get_datastores().databases[0]

# We need to do cleanup on PostgreSQL
def cleanup() -> None:
import psycopg2
import psycopg2.extensions

# Close all the db pools
database_pool._db_pool.close()
Expand All @@ -1086,8 +1082,7 @@ def cleanup() -> None:
port=POSTGRES_PORT,
password=POSTGRES_PASSWORD,
)
assert isinstance(db_conn, psycopg2.extensions.connection)
db_conn.autocommit = True
db_engine.attempt_to_set_autocommit(db_conn, True)
cur = db_conn.cursor()

# Try a few times to drop the DB. Some things may hold on to the
Expand Down

0 comments on commit 2c6a7df

Please sign in to comment.