From 7124bd67eb5f22d785d90455c0db6c06b1730b10 Mon Sep 17 00:00:00 2001 From: Ben Butler-Cole Date: Tue, 24 Sep 2024 10:00:01 +0100 Subject: [PATCH] Reuse connections when resetting tables --- metrics/timescaledb/db.py | 57 ++++++++++++++-------------- tests/metrics/timescaledb/test_db.py | 30 +++++++-------- 2 files changed, 41 insertions(+), 46 deletions(-) diff --git a/metrics/timescaledb/db.py b/metrics/timescaledb/db.py index 3309550..ac284cf 100644 --- a/metrics/timescaledb/db.py +++ b/metrics/timescaledb/db.py @@ -13,9 +13,10 @@ def reset_table(table, batch_size=None): - _drop_table(table, batch_size) - _ensure_table(table) - log.info("Reset table", table=table.name) + with _get_engine().begin() as connection: + _drop_table(connection, table, batch_size) + _ensure_table(connection, table) + log.info("Reset table", table=table.name) def write(table, rows): @@ -68,30 +69,29 @@ def _batch_size(table): return max_params // len(table.columns) -def _drop_table(table, batch_size): - with _get_engine().begin() as connection: - log.debug("Removing table: %s", table.name) +def _drop_table(connection, table, batch_size, cascade=False): + log.debug("Removing table: %s", table.name) - if not _has_table(connection, table): - return + if not _has_table(connection, table): + return - if _is_hypertable(table): - # We have limited shared memory in our hosted database, so we can't DROP or - # TRUNCATE our hypertables. Instead for each "raw" table we need to: - # * empty the raw rows (from the named table) in batches - # * drop the sharded "child" tables in batches - # * drop the now empty raw table - while _has_rows(connection, table): - _delete_rows(connection, table, batch_size) + if _is_hypertable(table): + # We have limited shared memory in our hosted database, so we can't DROP or + # TRUNCATE our hypertables. Instead for each "raw" table we need to: + # * empty the raw rows (from the named table) in batches + # * drop the sharded "child" tables in batches + # * drop the now empty raw table + while _has_rows(connection, table): + _delete_rows(connection, table, batch_size) - log.debug("Removed all raw rows", table=table.name) + log.debug("Removed all raw rows", table=table.name) - _drop_child_tables(connection, table) - log.debug("Removed all child tables", table=table.name) + _drop_child_tables(connection, table) + log.debug("Removed all child tables", table=table.name) - connection.execute(text(f"DROP TABLE {table.name}")) + connection.execute(text(f"DROP TABLE {table.name}")) - log.debug("Removed raw table", table=table.name) + log.debug("Removed raw table", table=table.name) def _has_table(connection, table): @@ -142,16 +142,15 @@ def _drop_child_tables(connection, table): connection.execute(text(f"DROP TABLE IF EXISTS {tables}")) -def _ensure_table(table): - with _get_engine().begin() as connection: - connection.execute(schema.CreateTable(table, if_not_exists=True)) +def _ensure_table(connection, table): + connection.execute(schema.CreateTable(table, if_not_exists=True)) - if _is_hypertable(table): - connection.execute( - text( - f"SELECT create_hypertable('{table.name}', 'time', if_not_exists => TRUE);" - ) + if _is_hypertable(table): + connection.execute( + text( + f"SELECT create_hypertable('{table.name}', 'time', if_not_exists => TRUE);" ) + ) @functools.cache diff --git a/tests/metrics/timescaledb/test_db.py b/tests/metrics/timescaledb/test_db.py index 58f17e8..a16976e 100644 --- a/tests/metrics/timescaledb/test_db.py +++ b/tests/metrics/timescaledb/test_db.py @@ -34,7 +34,7 @@ def get_rows(engine, table): return connection.execute(select(table)).all() -def assert_is_hypertable(engine, table): +def assert_is_hypertable(connection, engine, table): sql = """ SELECT count(*) @@ -46,8 +46,7 @@ def assert_is_hypertable(engine, table): trigger_name = 'ts_insert_blocker'; """ - with engine.connect() as connection: - result = connection.execute(text(sql), {"table_name": table.name}).fetchone() + result = connection.execute(text(sql), {"table_name": table.name}).fetchone() # We should have one trigger called ts_insert_blocker for a hypertable. assert result[0] == 1, result @@ -75,25 +74,19 @@ def hypertable(request): def test_ensure_table(engine, table): with engine.begin() as connection: assert not db._has_table(connection, table) - - db._ensure_table(table) - - with engine.begin() as connection: + db._ensure_table(connection, table) assert db._has_table(connection, table) def test_ensure_hypertable(engine, hypertable): with engine.begin() as connection: assert not db._has_table(connection, hypertable) - - db._ensure_table(hypertable) - - with engine.begin() as connection: + db._ensure_table(connection, hypertable) assert db._has_table(connection, hypertable) - # check there are timescaledb child tables - # https://stackoverflow.com/questions/1461722/how-to-find-child-tables-that-inherit-from-another-table-in-psql - assert_is_hypertable(engine, hypertable) + # check there are timescaledb child tables + # https://stackoverflow.com/questions/1461722/how-to-find-child-tables-that-inherit-from-another-table-in-psql + assert_is_hypertable(connection, engine, hypertable) def test_get_url(monkeypatch): @@ -114,7 +107,8 @@ def test_get_url_with_prefix(monkeypatch): def test_reset_table(engine, table): - db._ensure_table(table) + with engine.begin() as connection: + db._ensure_table(connection, table) # put enough rows in the db to make sure we exercise the batch removal of rows batch_size = 5 @@ -126,7 +120,8 @@ def test_reset_table(engine, table): def test_reset_hypertable(engine, hypertable): - db._ensure_table(hypertable) + with engine.begin() as connection: + db._ensure_table(connection, hypertable) # put enough rows in the db to make sure we exercise the batch removal of rows batch_size = 5 @@ -156,7 +151,8 @@ def check_reset(batch_size, engine, rows, table): def test_write(engine, table): # set up a table to write to - db._ensure_table(table) + with engine.begin() as connection: + db._ensure_table(connection, table) rows = [{"value": "write" + str(i)} for i in range(1, 4)] db.write(table, rows)