From 4bec625f4f06575ab527a41c1625a94493eee0e6 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Sat, 7 Sep 2024 09:27:07 -0400 Subject: [PATCH 01/15] refactor(backends): clean up memtable resources --- ibis/backends/__init__.py | 7 ++++ ibis/backends/duckdb/__init__.py | 10 +++--- ibis/backends/exasol/__init__.py | 30 ++++++++-------- ibis/backends/mssql/__init__.py | 19 +++++----- ibis/backends/mysql/__init__.py | 19 +++++----- ibis/backends/oracle/__init__.py | 24 +++++++------ ibis/backends/pandas/__init__.py | 3 ++ ibis/backends/polars/__init__.py | 4 +++ ibis/backends/postgres/__init__.py | 12 ++++--- ibis/backends/pyspark/__init__.py | 7 ---- ibis/backends/risingwave/__init__.py | 18 +++++----- ibis/backends/snowflake/__init__.py | 21 +++++++---- ibis/backends/sql/__init__.py | 4 +++ ibis/backends/sqlite/__init__.py | 6 ---- ibis/backends/tests/test_client.py | 52 ++++++++++++++++++++++++++++ ibis/backends/trino/__init__.py | 5 --- 16 files changed, 147 insertions(+), 94 deletions(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index 7db4ba06f4ae..41b3a87eced1 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -1116,6 +1116,7 @@ def _register_in_memory_tables(self, expr: ir.Expr) -> None: for memtable in expr.op().find(ops.InMemoryTable): if not self._in_memory_table_exists(memtable.name): self._register_in_memory_table(memtable) + self._register_memtable_finalizer(memtable) def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: if self.supports_in_memory_tables: @@ -1123,6 +1124,12 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: f"{self.name} must implement `_register_in_memory_table` to support in-memory tables" ) + def _register_memtable_finalizer(self, op: ops.InMemoryTable) -> None: + if self.supports_in_memory_tables: + raise NotImplementedError( + f"{self.name} must implement `_register_memtable_finalizer` to support in-memory tables" + ) + def _run_pre_execute_hooks(self, expr: ir.Expr) -> None: """Backend-specific hooks to run before an expression is executed.""" self._register_udfs(expr) diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py index 73bc9629cdce..d5f29f156f0e 100644 --- a/ibis/backends/duckdb/__init__.py +++ b/ibis/backends/duckdb/__init__.py @@ -160,12 +160,9 @@ def create_table( properties.append(sge.TemporaryProperty()) catalog = "temp" - temp_memtable_view = None - if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -234,9 +231,6 @@ def create_table( ).sql(self.name) ) - if temp_memtable_view is not None: - self.con.unregister(temp_memtable_view) - return self.table(name, database=(catalog, database)) def table( @@ -1620,10 +1614,14 @@ def _in_memory_table_exists(self, name: str) -> bool: def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: self.con.register(op.name, op.data.to_pyarrow(op.schema)) + def _register_memtable_finalizer(self, op: ops.InMemoryTable): # if we don't aggressively unregister tables duckdb will keep a # reference to every memtable ever registered, even if there's no # way for a user to access the operation anymore, resulting in a # memory leak + # + # we can't use drop_table, because self.con.register creates a view, so + # use the corresponding unregister method weakref.finalize(op, self.con.unregister, op.name) def _register_udfs(self, expr: ir.Expr) -> None: diff --git 
a/ibis/backends/exasol/__init__.py b/ibis/backends/exasol/__init__.py index c31d19b334db..8889e1c9105c 100644 --- a/ibis/backends/exasol/__init__.py +++ b/ibis/backends/exasol/__init__.py @@ -1,9 +1,9 @@ from __future__ import annotations -import atexit import contextlib import datetime import re +import weakref from typing import TYPE_CHECKING, Any from urllib.parse import unquote_plus @@ -278,12 +278,14 @@ def process_item(item: Any): with self._safe_raw_sql(create_stmt_sql): if not df.empty: self.con.ext.insert_multi(name, rows) - atexit.register(self._clean_up_tmp_table, ident) - def _clean_up_tmp_table(self, ident: sge.Identifier) -> None: - with self._safe_raw_sql( - sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True) - ): + def _register_memtable_finalizer(self, op: ops.InMemoryTable): + weakref.finalize(op, self._clean_up_tmp_table, op.name) + + def _clean_up_tmp_table(self, name: str) -> None: + ident = sg.to_identifier(name, quoted=self.compiler.quoted) + sql = sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True) + with self._safe_raw_sql(sql): pass def create_table( @@ -334,11 +336,9 @@ def create_table( quoted = self.compiler.quoted - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -356,8 +356,10 @@ def create_table( if not schema: schema = table.schema() - table = sg.table(temp_name, catalog=database, quoted=quoted) - target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect)) + table_expr = sg.table(temp_name, catalog=database, quoted=quoted) + target = sge.Schema( + this=table_expr, expressions=schema.to_sqlglot(self.dialect) + ) create_stmt = sge.Create(kind="TABLE", this=target) @@ -365,7 +367,7 @@ def create_table( with self._safe_raw_sql(create_stmt): if query is not None: self.con.execute( - sge.Insert(this=table, expression=query).sql(self.name) + sge.Insert(this=table_expr, expression=query).sql(self.name) ) if overwrite: @@ -373,14 +375,10 @@ def create_table( sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name) ) self.con.execute( - f"RENAME TABLE {table.sql(self.name)} TO {this.sql(self.name)}" + f"RENAME TABLE {table_expr.sql(self.name)} TO {this.sql(self.name)}" ) if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) return self.table(name, database=database) # preserve the input schema if it was provided diff --git a/ibis/backends/mssql/__init__.py b/ibis/backends/mssql/__init__.py index e42fd6a0d746..e20624e1cd07 100644 --- a/ibis/backends/mssql/__init__.py +++ b/ibis/backends/mssql/__init__.py @@ -625,11 +625,9 @@ def create_table( properties.append(sge.TemporaryProperty()) catalog, db = None, None - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -647,11 +645,14 @@ def create_table( if not schema: schema = table.schema() - table = sg.table( - "#" * temp + temp_name, catalog=catalog, db=db, quoted=self.compiler.quoted - ) + quoted = self.compiler.quoted raw_table = sg.table(temp_name, catalog=catalog, db=db, quoted=False) - target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect)) + target = sge.Schema( + this=sg.table( + "#" * temp + temp_name, catalog=catalog, db=db, quoted=quoted + ), + expressions=schema.to_sqlglot(self.dialect), + ) create_stmt = 
sge.Create( kind="TABLE", @@ -659,7 +660,7 @@ def create_table( properties=sge.Properties(expressions=properties), ) - this = sg.table(name, catalog=catalog, db=db, quoted=self.compiler.quoted) + this = sg.table(name, catalog=catalog, db=db, quoted=quoted) raw_this = sg.table(name, catalog=catalog, db=db, quoted=False) with self._safe_ddl(create_stmt) as cur: if query is not None: @@ -692,10 +693,6 @@ def create_table( db = "dbo" if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) return self.table(name, database=(catalog, db)) # preserve the input schema if it was provided diff --git a/ibis/backends/mysql/__init__.py b/ibis/backends/mysql/__init__.py index 9600f380065e..8f4f5a599c4d 100644 --- a/ibis/backends/mysql/__init__.py +++ b/ibis/backends/mysql/__init__.py @@ -403,11 +403,9 @@ def create_table( if temp: properties.append(sge.TemporaryProperty()) - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -425,8 +423,10 @@ def create_table( if not schema: schema = table.schema() - table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted) - target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect)) + table_expr = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted) + target = sge.Schema( + this=table_expr, expressions=schema.to_sqlglot(self.dialect) + ) create_stmt = sge.Create( kind="TABLE", @@ -437,7 +437,9 @@ def create_table( this = sg.table(name, catalog=database, quoted=self.compiler.quoted) with self._safe_raw_sql(create_stmt) as cur: if query is not None: - insert_stmt = sge.Insert(this=table, expression=query).sql(self.name) + insert_stmt = sge.Insert(this=table_expr, expression=query).sql( + self.name + ) cur.execute(insert_stmt) if overwrite: @@ -445,15 +447,10 @@ def create_table( sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name) ) cur.execute( - f"ALTER TABLE IF EXISTS {table.sql(self.name)} RENAME TO {this.sql(self.name)}" + f"ALTER TABLE IF EXISTS {table_expr.sql(self.name)} RENAME TO {this.sql(self.name)}" ) if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) - return self.table(name, database=database) # preserve the input schema if it was provided diff --git a/ibis/backends/oracle/__init__.py b/ibis/backends/oracle/__init__.py index 8cbfbfa372d3..ddce12d810ff 100644 --- a/ibis/backends/oracle/__init__.py +++ b/ibis/backends/oracle/__init__.py @@ -2,10 +2,10 @@ from __future__ import annotations -import atexit import contextlib import re import warnings +import weakref from functools import cached_property from operator import itemgetter from typing import TYPE_CHECKING, Any @@ -419,11 +419,9 @@ def create_table( if temp: properties.append(sge.TemporaryProperty()) - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -468,10 +466,6 @@ def create_table( ) if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) return self.table(name, database=database) # preserve the input schema if it was provided @@ -527,7 +521,8 @@ def 
_register_in_memory_table(self, op: ops.InMemoryTable) -> None: insert_stmt, list(data.iloc[start:end].itertuples(index=False)) ) - atexit.register(self._clean_up_tmp_table, name) + def _register_memtable_finalizer(self, op: ops.InMemoryTable): + weakref.finalize(op, self._clean_up_tmp_table, op.name) def _get_schema_using_query(self, query: str) -> sch.Schema: name = util.gen_name("oracle_metadata") @@ -608,6 +603,13 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: return OraclePandasData.convert_table(df, schema) def _clean_up_tmp_table(self, name: str) -> None: + dialect = self.dialect + + ident = sg.to_identifier(name, quoted=self.compiler.quoted) + + truncate = sge.TruncateTable(expressions=[ident]).sql(dialect) + drop = sge.Drop(kind="TABLE", this=ident).sql(dialect) + with self.begin() as bind: # global temporary tables cannot be dropped without first truncating them # @@ -616,9 +618,9 @@ def _clean_up_tmp_table(self, name: str) -> None: # ignore DatabaseError exceptions because the table may not exist # because it's already been deleted with contextlib.suppress(oracledb.DatabaseError): - bind.execute(f'TRUNCATE TABLE "{name}"') + bind.execute(truncate) with contextlib.suppress(oracledb.DatabaseError): - bind.execute(f'DROP TABLE "{name}"') + bind.execute(drop) - def _drop_cached_table(self, name): + def _drop_cached_table(self, name: str) -> None: self._clean_up_tmp_table(name) diff --git a/ibis/backends/pandas/__init__.py b/ibis/backends/pandas/__init__.py index b26a6e7ead9f..3e12095f5d56 100644 --- a/ibis/backends/pandas/__init__.py +++ b/ibis/backends/pandas/__init__.py @@ -331,6 +331,9 @@ def execute(self, query, params=None, limit="default", **kwargs): def _create_cached_table(self, name, expr): return self.create_table(name, expr.execute()) + def _register_memtable_finalizer(self, op: ops.InMemoryTable): + """No-op, let Python handle clean up.""" + @lazy_singledispatch def _convert_object(obj: Any, _conn): diff --git a/ibis/backends/polars/__init__.py b/ibis/backends/polars/__init__.py index 8afabfc79b3d..60c33c9a5f87 100644 --- a/ibis/backends/polars/__init__.py +++ b/ibis/backends/polars/__init__.py @@ -1,5 +1,6 @@ from __future__ import annotations +import weakref from collections.abc import Iterable, Mapping from functools import lru_cache from pathlib import Path @@ -81,6 +82,9 @@ def _in_memory_table_exists(self, name: str) -> bool: def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: self._add_table(op.name, op.data.to_polars(op.schema).lazy()) + def _register_memtable_finalizer(self, op: ops.InMemoryTable) -> None: + weakref.finalize(op, self.drop_table, op.name, force=True) + @deprecated( as_of="9.1", instead="use the explicit `read_*` method for the filetype you are trying to read, e.g., read_parquet, read_csv, etc.", diff --git a/ibis/backends/postgres/__init__.py b/ibis/backends/postgres/__init__.py index c36d3874f550..fcfa517a8aff 100644 --- a/ibis/backends/postgres/__init__.py +++ b/ibis/backends/postgres/__init__.py @@ -663,8 +663,10 @@ def create_table( if not schema: schema = table.schema() - table = sg.table(temp_name, db=database, quoted=self.compiler.quoted) - target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect)) + table_expr = sg.table(temp_name, db=database, quoted=self.compiler.quoted) + target = sge.Schema( + this=table_expr, expressions=schema.to_sqlglot(self.dialect) + ) create_stmt = sge.Create( kind="TABLE", @@ -675,7 +677,9 @@ def create_table( this = sg.table(name, 
catalog=database, quoted=self.compiler.quoted) with self._safe_raw_sql(create_stmt) as cur: if query is not None: - insert_stmt = sge.Insert(this=table, expression=query).sql(self.dialect) + insert_stmt = sge.Insert(this=table_expr, expression=query).sql( + self.dialect + ) cur.execute(insert_stmt) if overwrite: @@ -683,7 +687,7 @@ def create_table( sge.Drop(kind="TABLE", this=this, exists=True).sql(self.dialect) ) cur.execute( - f"ALTER TABLE IF EXISTS {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}" + f"ALTER TABLE IF EXISTS {table_expr.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}" ) if schema is None: diff --git a/ibis/backends/pyspark/__init__.py b/ibis/backends/pyspark/__init__.py index 9899f220702f..def05da78f82 100644 --- a/ibis/backends/pyspark/__init__.py +++ b/ibis/backends/pyspark/__init__.py @@ -594,13 +594,11 @@ def create_table( table_loc = self._to_sqlglot_table(database) catalog, db = self._to_catalog_db_tuple(table_loc) - temp_memtable_view = None if obj is not None: if isinstance(obj, ir.Expr): table = obj else: table = ibis.memtable(obj) - temp_memtable_view = table.op().name query = self.compile(table) mode = "overwrite" if overwrite else "error" with self._active_catalog_database(catalog, db): @@ -615,11 +613,6 @@ def create_table( else: raise com.IbisError("The schema or obj parameter is required") - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) - return self.table(name, database=(catalog, db)) def create_view( diff --git a/ibis/backends/risingwave/__init__.py b/ibis/backends/risingwave/__init__.py index 2ccbe3b7a399..e824d93d93a3 100644 --- a/ibis/backends/risingwave/__init__.py +++ b/ibis/backends/risingwave/__init__.py @@ -195,11 +195,9 @@ def create_table( f"Creating temp tables is not supported by {self.name}" ) - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -217,8 +215,10 @@ def create_table( if not schema: schema = table.schema() - table = sg.table(temp_name, db=database, quoted=self.compiler.quoted) - target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect)) + table_expr = sg.table(temp_name, db=database, quoted=self.compiler.quoted) + target = sge.Schema( + this=table_expr, expressions=schema.to_sqlglot(self.dialect) + ) if connector_properties is None: create_stmt = sge.Create( @@ -241,20 +241,18 @@ def create_table( this = sg.table(name, db=database, quoted=self.compiler.quoted) with self._safe_raw_sql(create_stmt) as cur: if query is not None: - insert_stmt = sge.Insert(this=table, expression=query).sql(self.dialect) + insert_stmt = sge.Insert(this=table_expr, expression=query).sql( + self.dialect + ) cur.execute(insert_stmt) if overwrite: self.drop_table(name, database=database, force=True) cur.execute( - f"ALTER TABLE {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}" + f"ALTER TABLE {table_expr.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}" ) if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) return self.table(name, database=database) # preserve the input schema if it was provided diff --git a/ibis/backends/snowflake/__init__.py b/ibis/backends/snowflake/__init__.py index 91dfb8221297..8f77fcc96cc7 100644 --- a/ibis/backends/snowflake/__init__.py +++ 
b/ibis/backends/snowflake/__init__.py @@ -8,6 +8,7 @@ import os import tempfile import warnings +import weakref from operator import itemgetter from pathlib import Path from typing import TYPE_CHECKING, Any @@ -662,6 +663,19 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: pq.write_table(data, path, compression="zstd") self.read_parquet(path, table_name=name) + def _register_memtable_finalizer(self, op: ops.InMemoryTable): + def drop_table(con, sql: str): + if not con.is_closed(): + with con.cursor() as cur: + cur.execute(sql) + + drop_stmt = sg.exp.Drop( + kind="TABLE", + this=sg.table(op.name, quoted=self.compiler.quoted), + exists=True, + ) + weakref.finalize(op, drop_table, self.con, drop_stmt.sql(self.dialect)) + def create_catalog(self, name: str, force: bool = False) -> None: current_catalog = self.current_catalog current_database = self.current_database @@ -821,11 +835,9 @@ def create_table( if comment is not None: properties.append(sge.SchemaCommentProperty(this=sge.convert(comment))) - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) - temp_memtable_view = table.op().name else: table = obj @@ -846,11 +858,6 @@ def create_table( with self._safe_raw_sql(create_stmt): pass - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) - return self.table(name, database=(catalog, db)) def read_csv( diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py index 1b7896fb794d..a5e43c2dfd08 100644 --- a/ibis/backends/sql/__init__.py +++ b/ibis/backends/sql/__init__.py @@ -1,6 +1,7 @@ from __future__ import annotations import abc +import weakref from functools import partial from typing import TYPE_CHECKING, Any, ClassVar @@ -618,3 +619,6 @@ def _register_pandas_udf(self, udf_node: ops.ScalarUDF) -> str: raise NotImplementedError( f"pandas UDFs are not supported in the {self.dialect} backend" ) + + def _register_memtable_finalizer(self, op: ops.InMemoryTable): + weakref.finalize(op, self.drop_table, op.name, force=True) diff --git a/ibis/backends/sqlite/__init__.py b/ibis/backends/sqlite/__init__.py index 042c8168519a..c52c654486ee 100644 --- a/ibis/backends/sqlite/__init__.py +++ b/ibis/backends/sqlite/__init__.py @@ -456,11 +456,9 @@ def create_table( if schema is not None: schema = ibis.schema(schema) - temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): obj = ibis.memtable(obj) - temp_memtable_view = obj.op().name self._run_pre_execute_hooks(obj) @@ -516,10 +514,6 @@ def create_table( ) if schema is None: - # Clean up temporary memtable if we've created one - # for in-memory reads - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) return self.table(name, database=database) # preserve the input schema if it was provided diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 1f8bfc55db87..1346bc5e9b32 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -1767,3 +1767,55 @@ def test_insert_into_table_missing_columns(con, temp_table): expected_result = {"a": [1], "b": [1]} assert result == expected_result + + +@pytest.mark.never( + ["pandas", "dask"], raises=AssertionError, reason="backend is going away" +) +@pytest.mark.notyet(["druid"], raises=AssertionError, reason="can't drop tables") +@pytest.mark.notyet( + ["clickhouse"], raises=AssertionError, reason="memtables are assembled 
every time" +) +def test_memtable_cleanup(con): + t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name="temp_memtable") + + # the table isn't registered until we actually execute, and since we + # haven't yet executed anything, the table shouldn't be there + assert "temp_memtable" not in con.list_tables() + + # execute, which means the table is registered and should be visible in + # con.list_tables() + con.execute(t.select("a")) + assert "temp_memtable" in con.list_tables() + + con.execute(t.select("b")) + assert "temp_memtable" in con.list_tables() + + # remove all references to `t`, which means the `op` shouldn't be reachable + # and the table should thus be dropped and no longer visible in + # con.list_tables() + del t + assert "temp_memtable" not in con.list_tables() + + +@pytest.mark.never( + ["pandas", "dask"], raises=AssertionError, reason="backend is going away" +) +@pytest.mark.notyet(["druid"], raises=AssertionError, reason="can't drop tables") +@pytest.mark.notyet( + ["clickhouse"], raises=AssertionError, reason="memtables are assembled every time" +) +def test_memtable_cleanup_by_overwriting_variable(con): + t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name="temp_memtable") + + assert "temp_memtable" not in con.list_tables() + + con.execute(t.select("a")) + assert "temp_memtable" in con.list_tables() + + con.execute(t.select("b")) + assert "temp_memtable" in con.list_tables() + + # original `t` is gone, so this should drop the table + t = None + assert "temp_memtable" not in con.list_tables() diff --git a/ibis/backends/trino/__init__.py b/ibis/backends/trino/__init__.py index da70f42f40de..7c4ab32ec77b 100644 --- a/ibis/backends/trino/__init__.py +++ b/ibis/backends/trino/__init__.py @@ -483,13 +483,11 @@ def create_table( if comment: property_list.append(sge.SchemaCommentProperty(this=sge.convert(comment))) - temp_memtable_view = None if obj is not None: if isinstance(obj, ir.Table): table = obj else: table = ibis.memtable(obj, schema=schema) - temp_memtable_view = table.op().name self._run_pre_execute_hooks(table) @@ -533,9 +531,6 @@ def create_table( ).sql(self.name) ) - if temp_memtable_view is not None: - self.drop_table(temp_memtable_view) - return self.table(orig_table_ref.name, database=(catalog, db)) def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: From f119e20be02678af78e451b1ec1af4c90154a6d3 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Sun, 8 Sep 2024 10:00:46 -0400 Subject: [PATCH 02/15] test: use special mocker assertion methods that give better error messages --- ibis/backends/snowflake/tests/test_client.py | 8 ++++---- ibis/backends/tests/test_dataframe_interchange.py | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ibis/backends/snowflake/tests/test_client.py b/ibis/backends/snowflake/tests/test_client.py index b286a6239305..1b2b956e30ed 100644 --- a/ibis/backends/snowflake/tests/test_client.py +++ b/ibis/backends/snowflake/tests/test_client.py @@ -312,17 +312,17 @@ def test_compile_does_not_make_requests(con, mocker): expr = astronauts.year_of_selection.value_counts() spy = mocker.spy(con.con, "cursor") assert expr.compile() is not None - assert spy.call_count == 0 + spy.assert_not_called() t = ibis.memtable({"a": [1, 2, 3]}) assert con.compile(t) is not None - assert spy.call_count == 0 + spy.assert_not_called() assert ibis.to_sql(t, dialect="snowflake") is not None - assert spy.call_count == 0 + spy.assert_not_called() assert 
ibis.to_sql(expr) is not None - assert spy.call_count == 0 + spy.assert_not_called() # this won't be hit in CI, but folks can test locally diff --git a/ibis/backends/tests/test_dataframe_interchange.py b/ibis/backends/tests/test_dataframe_interchange.py index 29c14e78ef30..bae32ad6811e 100644 --- a/ibis/backends/tests/test_dataframe_interchange.py +++ b/ibis/backends/tests/test_dataframe_interchange.py @@ -62,12 +62,12 @@ def test_dataframe_interchange_dataframe_methods_execute(con, alltypes, mocker): df = t.__dataframe__() - assert to_pyarrow.call_count == 0 + to_pyarrow.assert_not_called() assert df.metadata == pa_df.metadata assert df.num_rows() == pa_df.num_rows() assert df.num_chunks() == pa_df.num_chunks() assert len(list(df.get_chunks())) == df.num_chunks() - assert to_pyarrow.call_count == 1 + to_pyarrow.assert_called_once() @pytest.mark.notimpl(["flink"]) @@ -81,7 +81,7 @@ def test_dataframe_interchange_column_methods_execute(con, alltypes, mocker): col = df.get_column(0) pa_col = pa_df.get_column(0) - assert to_pyarrow.call_count == 0 + to_pyarrow.assert_not_called() assert col.size() == pa_col.size() assert col.offset == pa_col.offset @@ -91,7 +91,7 @@ def test_dataframe_interchange_column_methods_execute(con, alltypes, mocker): assert col.num_chunks() == pa_col.num_chunks() assert len(list(col.get_chunks())) == pa_col.num_chunks() assert len(list(col.get_buffers())) == len(list(pa_col.get_buffers())) - assert to_pyarrow.call_count == 1 + to_pyarrow.assert_called_once() # Access another column doesn't execute col2 = df.get_column(1) @@ -111,13 +111,13 @@ def test_dataframe_interchange_select_after_execution_no_reexecute( df = t.__dataframe__() # An operation that requires loading data - assert to_pyarrow.call_count == 0 + to_pyarrow.assert_not_called() assert df.num_rows() == pa_df.num_rows() - assert to_pyarrow.call_count == 1 + to_pyarrow.assert_called_once() # Subselect columns doesn't reexecute df2 = df.select_columns([1, 0]) pa_df2 = pa_df.select_columns([1, 0]) assert df2.num_rows() == pa_df2.num_rows() assert df2.column_names() == pa_df2.column_names() - assert to_pyarrow.call_count == 1 + to_pyarrow.assert_called_once() From 85ac3ad64dae199c9d78f68c05df99d108d96fc5 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Mon, 9 Sep 2024 08:27:04 -0400 Subject: [PATCH 03/15] test(flink): memtables are constructed every time --- ibis/backends/tests/test_client.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 1346bc5e9b32..10c3dffd7cd0 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -1774,7 +1774,9 @@ def test_insert_into_table_missing_columns(con, temp_table): ) @pytest.mark.notyet(["druid"], raises=AssertionError, reason="can't drop tables") @pytest.mark.notyet( - ["clickhouse"], raises=AssertionError, reason="memtables are assembled every time" + ["clickhouse", "flink"], + raises=AssertionError, + reason="memtables are assembled every time", ) def test_memtable_cleanup(con): t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name="temp_memtable") @@ -1803,7 +1805,9 @@ def test_memtable_cleanup(con): ) @pytest.mark.notyet(["druid"], raises=AssertionError, reason="can't drop tables") @pytest.mark.notyet( - ["clickhouse"], raises=AssertionError, reason="memtables are assembled every time" + ["clickhouse", "flink"], + raises=AssertionError, + reason="memtables are assembled every time", ) 
def test_memtable_cleanup_by_overwriting_variable(con): t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name="temp_memtable") From ff12db6abf74f27e52569995f1eff9ba18fbfde8 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 08:05:22 -0400 Subject: [PATCH 04/15] test(memtable): generate a table name instead of a fixed name --- ibis/backends/tests/test_client.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 10c3dffd7cd0..6f30a39c3ab7 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -1779,25 +1779,26 @@ def test_insert_into_table_missing_columns(con, temp_table): reason="memtables are assembled every time", ) def test_memtable_cleanup(con): - t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name="temp_memtable") + name = ibis.util.gen_name("temp_memtable") + t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name=name) # the table isn't registered until we actually execute, and since we # haven't yet executed anything, the table shouldn't be there - assert "temp_memtable" not in con.list_tables() + assert name not in con.list_tables() # execute, which means the table is registered and should be visible in # con.list_tables() con.execute(t.select("a")) - assert "temp_memtable" in con.list_tables() + assert name in con.list_tables() con.execute(t.select("b")) - assert "temp_memtable" in con.list_tables() + assert name in con.list_tables() # remove all references to `t`, which means the `op` shouldn't be reachable # and the table should thus be dropped and no longer visible in # con.list_tables() del t - assert "temp_memtable" not in con.list_tables() + assert name not in con.list_tables() @pytest.mark.never( @@ -1810,16 +1811,17 @@ def test_memtable_cleanup(con): reason="memtables are assembled every time", ) def test_memtable_cleanup_by_overwriting_variable(con): - t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name="temp_memtable") + name = ibis.util.gen_name("temp_memtable") + t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name=name) - assert "temp_memtable" not in con.list_tables() + assert name not in con.list_tables() con.execute(t.select("a")) - assert "temp_memtable" in con.list_tables() + assert name in con.list_tables() con.execute(t.select("b")) - assert "temp_memtable" in con.list_tables() + assert name in con.list_tables() # original `t` is gone, so this should drop the table t = None - assert "temp_memtable" not in con.list_tables() + assert name not in con.list_tables() From 22cec690c519ea21ba603930567923e3679a9677 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 08:07:47 -0400 Subject: [PATCH 05/15] test(mysql): acquiesce to the sql gods; live to fight another day --- ibis/backends/mysql/__init__.py | 7 +++++++ ibis/backends/tests/test_client.py | 10 ++++++++++ 2 files changed, 17 insertions(+) diff --git a/ibis/backends/mysql/__init__.py b/ibis/backends/mysql/__init__.py index 8f4f5a599c4d..d9cbf8d064c2 100644 --- a/ibis/backends/mysql/__init__.py +++ b/ibis/backends/mysql/__init__.py @@ -535,3 +535,10 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: raise df = MySQLPandasData.convert_table(df, schema) return df + + def _register_memtable_finalizer(self, op: ops.InMemoryTable): + """No-op. 
+ + Executing **any** SQL in a finalizer causes the underlying connection + socket to be set to `None`. It is unclear why this happens. + """ diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 6f30a39c3ab7..7aea75b953fe 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -1778,6 +1778,11 @@ def test_insert_into_table_missing_columns(con, temp_table): raises=AssertionError, reason="memtables are assembled every time", ) +@pytest.mark.notyet( + ["mysql"], + raises=AssertionError, + reason="can't execute SQL inside of a finalizer without breaking everything", +) def test_memtable_cleanup(con): name = ibis.util.gen_name("temp_memtable") t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name=name) @@ -1810,6 +1815,11 @@ def test_memtable_cleanup(con): raises=AssertionError, reason="memtables are assembled every time", ) +@pytest.mark.notyet( + ["mysql"], + raises=AssertionError, + reason="can't execute SQL inside of a finalizer without breaking everything", +) def test_memtable_cleanup_by_overwriting_variable(con): name = ibis.util.gen_name("temp_memtable") t = ibis.memtable({"a": [1, 2, 3], "b": list("def")}, name=name) From a0d43b515fe8924fd66144b4b2b0523b89478430 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 08:10:50 -0400 Subject: [PATCH 06/15] chore(mysql): bring back annoying memtable cleanup --- ibis/backends/mysql/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ibis/backends/mysql/__init__.py b/ibis/backends/mysql/__init__.py index d9cbf8d064c2..2ebd125d44d0 100644 --- a/ibis/backends/mysql/__init__.py +++ b/ibis/backends/mysql/__init__.py @@ -403,9 +403,11 @@ def create_table( if temp: properties.append(sge.TemporaryProperty()) + temp_memtable_view = None if obj is not None: if not isinstance(obj, ir.Expr): table = ibis.memtable(obj) + temp_memtable_view = table.op().name else: table = obj @@ -451,6 +453,11 @@ def create_table( ) if schema is None: + # Clean up temporary memtable if we've created one + # for in-memory reads + if temp_memtable_view is not None: + self.drop_table(temp_memtable_view) + return self.table(name, database=database) # preserve the input schema if it was provided From a287865b35c9d60449ebcc749be112d0d17f1b41 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 10:49:25 -0400 Subject: [PATCH 07/15] chore: remove redundant test --- ibis/backends/tests/test_client.py | 31 ------------------------------ 1 file changed, 31 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 7aea75b953fe..292a24225fd1 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -1804,34 +1804,3 @@ def test_memtable_cleanup(con): # con.list_tables() del t assert name not in con.list_tables() - - -@pytest.mark.never( - ["pandas", "dask"], raises=AssertionError, reason="backend is going away" -) -@pytest.mark.notyet(["druid"], raises=AssertionError, reason="can't drop tables") -@pytest.mark.notyet( - ["clickhouse", "flink"], - raises=AssertionError, - reason="memtables are assembled every time", -) -@pytest.mark.notyet( - ["mysql"], - raises=AssertionError, - reason="can't execute SQL inside of a finalizer without breaking everything", -) -def test_memtable_cleanup_by_overwriting_variable(con): - name = ibis.util.gen_name("temp_memtable") - t = ibis.memtable({"a": [1, 2, 3], "b": 
list("def")}, name=name) - - assert name not in con.list_tables() - - con.execute(t.select("a")) - assert name in con.list_tables() - - con.execute(t.select("b")) - assert name in con.list_tables() - - # original `t` is gone, so this should drop the table - t = None - assert name not in con.list_tables() From 39dbe026d31b05d7fbc5ac91fe4e0c06b8f51a33 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 10:50:10 -0400 Subject: [PATCH 08/15] chore: make test more robust to interface changes --- ibis/backends/tests/test_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 292a24225fd1..8a9225ae5012 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -1789,18 +1789,18 @@ def test_memtable_cleanup(con): # the table isn't registered until we actually execute, and since we # haven't yet executed anything, the table shouldn't be there - assert name not in con.list_tables() + assert not con._in_memory_table_exists(name) # execute, which means the table is registered and should be visible in # con.list_tables() con.execute(t.select("a")) - assert name in con.list_tables() + assert con._in_memory_table_exists(name) con.execute(t.select("b")) - assert name in con.list_tables() + assert con._in_memory_table_exists(name) # remove all references to `t`, which means the `op` shouldn't be reachable # and the table should thus be dropped and no longer visible in # con.list_tables() del t - assert name not in con.list_tables() + assert not con._in_memory_table_exists(name) From 7bcd3274738e27176f702c1f1136121ff229bea3 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 10:51:40 -0400 Subject: [PATCH 09/15] chore(exasol): remove incorrect flag setting --- ibis/backends/exasol/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ibis/backends/exasol/__init__.py b/ibis/backends/exasol/__init__.py index 8889e1c9105c..b7010e0b5f3e 100644 --- a/ibis/backends/exasol/__init__.py +++ b/ibis/backends/exasol/__init__.py @@ -42,7 +42,6 @@ class Backend(SQLBackend, CanCreateDatabase, CanCreateSchema): compiler = sc.exasol.compiler supports_temporary_tables = False supports_create_or_replace = False - supports_in_memory_tables = False supports_python_udfs = False @property From 191a107a54540cf081a041b540ba0d592d076913 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 11:08:59 -0400 Subject: [PATCH 10/15] chore: factor out finalizer registration --- ibis/backends/__init__.py | 14 +++++++++++--- ibis/backends/duckdb/__init__.py | 5 ++--- ibis/backends/exasol/__init__.py | 6 ++---- ibis/backends/mysql/__init__.py | 2 +- ibis/backends/oracle/__init__.py | 7 +------ ibis/backends/pandas/__init__.py | 2 +- ibis/backends/polars/__init__.py | 5 ++--- ibis/backends/snowflake/__init__.py | 14 -------------- ibis/backends/sql/__init__.py | 5 ++--- 9 files changed, 22 insertions(+), 38 deletions(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index 41b3a87eced1..350937b5938b 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -2,6 +2,7 @@ import abc import collections.abc +import contextlib import functools import importlib.metadata import keyword @@ -1116,7 +1117,9 @@ def _register_in_memory_tables(self, expr: ir.Expr) -> None: for memtable in 
expr.op().find(ops.InMemoryTable): if not self._in_memory_table_exists(memtable.name): self._register_in_memory_table(memtable) - self._register_memtable_finalizer(memtable) + weakref.finalize( + memtable, self._finalize_in_memory_table, memtable.name + ) def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: if self.supports_in_memory_tables: @@ -1124,10 +1127,15 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: f"{self.name} must implement `_register_in_memory_table` to support in-memory tables" ) - def _register_memtable_finalizer(self, op: ops.InMemoryTable) -> None: + def _finalize_in_memory_table(self, name: str) -> None: + """Wrap `_finalize_memtable` to suppress exceptions.""" + with contextlib.suppress(Exception): + self._finalize_memtable(name) + + def _finalize_memtable(self, name: str) -> None: if self.supports_in_memory_tables: raise NotImplementedError( - f"{self.name} must implement `_register_memtable_finalizer` to support in-memory tables" + f"{self.name} must implement `_finalize_memtable` to support in-memory tables" ) def _run_pre_execute_hooks(self, expr: ir.Expr) -> None: diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py index d5f29f156f0e..7dc8bed0f835 100644 --- a/ibis/backends/duckdb/__init__.py +++ b/ibis/backends/duckdb/__init__.py @@ -7,7 +7,6 @@ import os import urllib import warnings -import weakref from operator import itemgetter from pathlib import Path from typing import TYPE_CHECKING, Any @@ -1614,7 +1613,7 @@ def _in_memory_table_exists(self, name: str) -> bool: def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: self.con.register(op.name, op.data.to_pyarrow(op.schema)) - def _register_memtable_finalizer(self, op: ops.InMemoryTable): + def _finalize_memtable(self, name: str) -> None: # if we don't aggressively unregister tables duckdb will keep a # reference to every memtable ever registered, even if there's no # way for a user to access the operation anymore, resulting in a @@ -1622,7 +1621,7 @@ def _register_memtable_finalizer(self, op: ops.InMemoryTable): # # we can't use drop_table, because self.con.register creates a view, so # use the corresponding unregister method - weakref.finalize(op, self.con.unregister, op.name) + self.con.unregister(name) def _register_udfs(self, expr: ir.Expr) -> None: con = self.con diff --git a/ibis/backends/exasol/__init__.py b/ibis/backends/exasol/__init__.py index b7010e0b5f3e..05456db385f5 100644 --- a/ibis/backends/exasol/__init__.py +++ b/ibis/backends/exasol/__init__.py @@ -3,7 +3,6 @@ import contextlib import datetime import re -import weakref from typing import TYPE_CHECKING, Any from urllib.parse import unquote_plus @@ -278,15 +277,14 @@ def process_item(item: Any): if not df.empty: self.con.ext.insert_multi(name, rows) - def _register_memtable_finalizer(self, op: ops.InMemoryTable): - weakref.finalize(op, self._clean_up_tmp_table, op.name) - def _clean_up_tmp_table(self, name: str) -> None: ident = sg.to_identifier(name, quoted=self.compiler.quoted) sql = sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True) with self._safe_raw_sql(sql): pass + _finalize_memtable = _clean_up_tmp_table + def create_table( self, name: str, diff --git a/ibis/backends/mysql/__init__.py b/ibis/backends/mysql/__init__.py index 2ebd125d44d0..eec4df9ac634 100644 --- a/ibis/backends/mysql/__init__.py +++ b/ibis/backends/mysql/__init__.py @@ -543,7 +543,7 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: df = 
MySQLPandasData.convert_table(df, schema) return df - def _register_memtable_finalizer(self, op: ops.InMemoryTable): + def _finalize_memtable(self, name: str) -> None: """No-op. Executing **any** SQL in a finalizer causes the underlying connection diff --git a/ibis/backends/oracle/__init__.py b/ibis/backends/oracle/__init__.py index ddce12d810ff..51d9427a2a6c 100644 --- a/ibis/backends/oracle/__init__.py +++ b/ibis/backends/oracle/__init__.py @@ -5,7 +5,6 @@ import contextlib import re import warnings -import weakref from functools import cached_property from operator import itemgetter from typing import TYPE_CHECKING, Any @@ -521,9 +520,6 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: insert_stmt, list(data.iloc[start:end].itertuples(index=False)) ) - def _register_memtable_finalizer(self, op: ops.InMemoryTable): - weakref.finalize(op, self._clean_up_tmp_table, op.name) - def _get_schema_using_query(self, query: str) -> sch.Schema: name = util.gen_name("oracle_metadata") dialect = self.name @@ -622,5 +618,4 @@ def _clean_up_tmp_table(self, name: str) -> None: with contextlib.suppress(oracledb.DatabaseError): bind.execute(drop) - def _drop_cached_table(self, name: str) -> None: - self._clean_up_tmp_table(name) + _finalize_memtable = _drop_cached_table = _clean_up_tmp_table diff --git a/ibis/backends/pandas/__init__.py b/ibis/backends/pandas/__init__.py index 3e12095f5d56..8581f9d8adab 100644 --- a/ibis/backends/pandas/__init__.py +++ b/ibis/backends/pandas/__init__.py @@ -331,7 +331,7 @@ def execute(self, query, params=None, limit="default", **kwargs): def _create_cached_table(self, name, expr): return self.create_table(name, expr.execute()) - def _register_memtable_finalizer(self, op: ops.InMemoryTable): + def _finalize_memtable(self, op: ops.InMemoryTable): """No-op, let Python handle clean up.""" diff --git a/ibis/backends/polars/__init__.py b/ibis/backends/polars/__init__.py index 60c33c9a5f87..a81eac1bf340 100644 --- a/ibis/backends/polars/__init__.py +++ b/ibis/backends/polars/__init__.py @@ -1,6 +1,5 @@ from __future__ import annotations -import weakref from collections.abc import Iterable, Mapping from functools import lru_cache from pathlib import Path @@ -82,8 +81,8 @@ def _in_memory_table_exists(self, name: str) -> bool: def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: self._add_table(op.name, op.data.to_polars(op.schema).lazy()) - def _register_memtable_finalizer(self, op: ops.InMemoryTable) -> None: - weakref.finalize(op, self.drop_table, op.name, force=True) + def _finalize_memtable(self, op: ops.InMemoryTable) -> None: + self.drop_table(op.name, force=True) @deprecated( as_of="9.1", diff --git a/ibis/backends/snowflake/__init__.py b/ibis/backends/snowflake/__init__.py index 8f77fcc96cc7..5eb378186414 100644 --- a/ibis/backends/snowflake/__init__.py +++ b/ibis/backends/snowflake/__init__.py @@ -8,7 +8,6 @@ import os import tempfile import warnings -import weakref from operator import itemgetter from pathlib import Path from typing import TYPE_CHECKING, Any @@ -663,19 +662,6 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: pq.write_table(data, path, compression="zstd") self.read_parquet(path, table_name=name) - def _register_memtable_finalizer(self, op: ops.InMemoryTable): - def drop_table(con, sql: str): - if not con.is_closed(): - with con.cursor() as cur: - cur.execute(sql) - - drop_stmt = sg.exp.Drop( - kind="TABLE", - this=sg.table(op.name, quoted=self.compiler.quoted), - exists=True, - ) - 
weakref.finalize(op, drop_table, self.con, drop_stmt.sql(self.dialect)) - def create_catalog(self, name: str, force: bool = False) -> None: current_catalog = self.current_catalog current_database = self.current_database diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py index a5e43c2dfd08..2d64efe6076b 100644 --- a/ibis/backends/sql/__init__.py +++ b/ibis/backends/sql/__init__.py @@ -1,7 +1,6 @@ from __future__ import annotations import abc -import weakref from functools import partial from typing import TYPE_CHECKING, Any, ClassVar @@ -620,5 +619,5 @@ def _register_pandas_udf(self, udf_node: ops.ScalarUDF) -> str: f"pandas UDFs are not supported in the {self.dialect} backend" ) - def _register_memtable_finalizer(self, op: ops.InMemoryTable): - weakref.finalize(op, self.drop_table, op.name, force=True) + def _finalize_memtable(self, name: str) -> None: + self.drop_table(name, force=True) From b19cc51e407dc39b46961822008570765f5483ed Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 11:11:31 -0400 Subject: [PATCH 11/15] chore: fix polars --- ibis/backends/polars/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ibis/backends/polars/__init__.py b/ibis/backends/polars/__init__.py index a81eac1bf340..8fe8df3debed 100644 --- a/ibis/backends/polars/__init__.py +++ b/ibis/backends/polars/__init__.py @@ -81,8 +81,8 @@ def _in_memory_table_exists(self, name: str) -> bool: def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: self._add_table(op.name, op.data.to_polars(op.schema).lazy()) - def _finalize_memtable(self, op: ops.InMemoryTable) -> None: - self.drop_table(op.name, force=True) + def _finalize_memtable(self, name: str) -> None: + self.drop_table(name, force=True) @deprecated( as_of="9.1", From 06ae2e4dd7ed6e33cc5392d0290a9b861cb97bd5 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 11:12:18 -0400 Subject: [PATCH 12/15] chore: fix pandas signature --- ibis/backends/pandas/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibis/backends/pandas/__init__.py b/ibis/backends/pandas/__init__.py index 8581f9d8adab..5404e744ed01 100644 --- a/ibis/backends/pandas/__init__.py +++ b/ibis/backends/pandas/__init__.py @@ -331,7 +331,7 @@ def execute(self, query, params=None, limit="default", **kwargs): def _create_cached_table(self, name, expr): return self.create_table(name, expr.execute()) - def _finalize_memtable(self, op: ops.InMemoryTable): + def _finalize_memtable(self, name: str) -> None: """No-op, let Python handle clean up.""" From dd2ee705db11a8b57ee1b97a51b10b5d979a3ca4 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 11:15:39 -0400 Subject: [PATCH 13/15] chore: revert test assertion changes --- ibis/backends/snowflake/tests/test_client.py | 8 ++++---- ibis/backends/tests/test_dataframe_interchange.py | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ibis/backends/snowflake/tests/test_client.py b/ibis/backends/snowflake/tests/test_client.py index 1b2b956e30ed..b286a6239305 100644 --- a/ibis/backends/snowflake/tests/test_client.py +++ b/ibis/backends/snowflake/tests/test_client.py @@ -312,17 +312,17 @@ def test_compile_does_not_make_requests(con, mocker): expr = astronauts.year_of_selection.value_counts() spy = mocker.spy(con.con, "cursor") assert expr.compile() is not None - 
spy.assert_not_called() + assert spy.call_count == 0 t = ibis.memtable({"a": [1, 2, 3]}) assert con.compile(t) is not None - spy.assert_not_called() + assert spy.call_count == 0 assert ibis.to_sql(t, dialect="snowflake") is not None - spy.assert_not_called() + assert spy.call_count == 0 assert ibis.to_sql(expr) is not None - spy.assert_not_called() + assert spy.call_count == 0 # this won't be hit in CI, but folks can test locally diff --git a/ibis/backends/tests/test_dataframe_interchange.py b/ibis/backends/tests/test_dataframe_interchange.py index bae32ad6811e..29c14e78ef30 100644 --- a/ibis/backends/tests/test_dataframe_interchange.py +++ b/ibis/backends/tests/test_dataframe_interchange.py @@ -62,12 +62,12 @@ def test_dataframe_interchange_dataframe_methods_execute(con, alltypes, mocker): df = t.__dataframe__() - to_pyarrow.assert_not_called() + assert to_pyarrow.call_count == 0 assert df.metadata == pa_df.metadata assert df.num_rows() == pa_df.num_rows() assert df.num_chunks() == pa_df.num_chunks() assert len(list(df.get_chunks())) == df.num_chunks() - to_pyarrow.assert_called_once() + assert to_pyarrow.call_count == 1 @pytest.mark.notimpl(["flink"]) @@ -81,7 +81,7 @@ def test_dataframe_interchange_column_methods_execute(con, alltypes, mocker): col = df.get_column(0) pa_col = pa_df.get_column(0) - to_pyarrow.assert_not_called() + assert to_pyarrow.call_count == 0 assert col.size() == pa_col.size() assert col.offset == pa_col.offset @@ -91,7 +91,7 @@ def test_dataframe_interchange_column_methods_execute(con, alltypes, mocker): assert col.num_chunks() == pa_col.num_chunks() assert len(list(col.get_chunks())) == pa_col.num_chunks() assert len(list(col.get_buffers())) == len(list(pa_col.get_buffers())) - to_pyarrow.assert_called_once() + assert to_pyarrow.call_count == 1 # Access another column doesn't execute col2 = df.get_column(1) @@ -111,13 +111,13 @@ def test_dataframe_interchange_select_after_execution_no_reexecute( df = t.__dataframe__() # An operation that requires loading data - to_pyarrow.assert_not_called() + assert to_pyarrow.call_count == 0 assert df.num_rows() == pa_df.num_rows() - to_pyarrow.assert_called_once() + assert to_pyarrow.call_count == 1 # Subselect columns doesn't reexecute df2 = df.select_columns([1, 0]) pa_df2 = pa_df.select_columns([1, 0]) assert df2.num_rows() == pa_df2.num_rows() assert df2.column_names() == pa_df2.column_names() - to_pyarrow.assert_called_once() + assert to_pyarrow.call_count == 1 From b1aab140beaa4d64fedfaf904ff2a309126655eb Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 11:31:15 -0400 Subject: [PATCH 14/15] chore(bigquery): implement `_finalize_memtable` so we can handle the session dataset usage --- ibis/backends/bigquery/__init__.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/ibis/backends/bigquery/__init__.py b/ibis/backends/bigquery/__init__.py index fd81a778b9fa..4d053d605e26 100644 --- a/ibis/backends/bigquery/__init__.py +++ b/ibis/backends/bigquery/__init__.py @@ -180,14 +180,19 @@ def _in_memory_table_exists(self, name: str) -> bool: else: return True + def _finalize_memtable(self, name: str) -> None: + session_dataset = self._session_dataset + table_id = sg.table( + name, db=session_dataset.dataset_id, catalog=session_dataset.project + ) + drop_sql_stmt = sge.Drop(kind="TABLE", this=table_id, exists=True) + self.raw_sql(drop_sql_stmt) + def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: session_dataset = 
self._session_dataset table_id = sg.table( - op.name, - db=session_dataset.dataset_id, - catalog=session_dataset.project, - quoted=False, + op.name, db=session_dataset.dataset_id, catalog=session_dataset.project ).sql(dialect=self.name) bq_schema = BigQuerySchema.from_ibis(op.schema) From 26acf61f84020cfe6cf0e92c86de054f6f3ff0b7 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Tue, 10 Sep 2024 11:54:32 -0400 Subject: [PATCH 15/15] chore: apparently quoting project id is not allowed --- ibis/backends/bigquery/__init__.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ibis/backends/bigquery/__init__.py b/ibis/backends/bigquery/__init__.py index 4d053d605e26..22f6b5e7e479 100644 --- a/ibis/backends/bigquery/__init__.py +++ b/ibis/backends/bigquery/__init__.py @@ -183,7 +183,10 @@ def _in_memory_table_exists(self, name: str) -> bool: def _finalize_memtable(self, name: str) -> None: session_dataset = self._session_dataset table_id = sg.table( - name, db=session_dataset.dataset_id, catalog=session_dataset.project + name, + db=session_dataset.dataset_id, + catalog=session_dataset.project, + quoted=False, ) drop_sql_stmt = sge.Drop(kind="TABLE", this=table_id, exists=True) self.raw_sql(drop_sql_stmt) @@ -192,7 +195,10 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: session_dataset = self._session_dataset table_id = sg.table( - op.name, db=session_dataset.dataset_id, catalog=session_dataset.project + op.name, + db=session_dataset.dataset_id, + catalog=session_dataset.project, + quoted=False, ).sql(dialect=self.name) bq_schema = BigQuerySchema.from_ibis(op.schema)
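
Note on the mechanism (illustrative only, not part of the patch series): the cleanup pattern the series converges on registers a weakref.finalize callback against each ops.InMemoryTable operation, so that the backend-specific _finalize_memtable hook drops the backing table once the operation becomes unreachable. Below is a minimal, self-contained sketch of that pattern; FakeBackend and MemTableOp are hypothetical stand-ins, not the real ibis classes.

import weakref


class FakeBackend:
    """Hypothetical stand-in for a backend; it only tracks registered names."""

    def __init__(self) -> None:
        self.tables: set[str] = set()

    def _register_in_memory_table(self, name: str) -> None:
        self.tables.add(name)

    def _finalize_memtable(self, name: str) -> None:
        # the real hook suppresses exceptions around this call; omitted for brevity
        self.tables.discard(name)


class MemTableOp:
    """Hypothetical stand-in for ops.InMemoryTable."""

    def __init__(self, name: str) -> None:
        self.name = name


backend = FakeBackend()
op = MemTableOp("temp_memtable")

backend._register_in_memory_table(op.name)
# the finalizer captures the table name, not the op itself, so it cannot keep
# the op alive; it runs when the op is garbage collected
weakref.finalize(op, backend._finalize_memtable, op.name)

assert "temp_memtable" in backend.tables
del op  # last reference dropped -> finalizer fires -> table is cleaned up
assert "temp_memtable" not in backend.tables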