Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(backends): clean up resources produced by memtable #10055

Merged
merged 15 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,13 +1116,20 @@
for memtable in expr.op().find(ops.InMemoryTable):
if not self._in_memory_table_exists(memtable.name):
self._register_in_memory_table(memtable)
self._register_memtable_finalizer(memtable)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
if self.supports_in_memory_tables:
raise NotImplementedError(
f"{self.name} must implement `_register_in_memory_table` to support in-memory tables"
)

def _register_memtable_finalizer(self, op: ops.InMemoryTable) -> None:
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
if self.supports_in_memory_tables:

Check warning on line 1128 in ibis/backends/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/__init__.py#L1128

Added line #L1128 was not covered by tests
raise NotImplementedError(
f"{self.name} must implement `_register_memtable_finalizer` to support in-memory tables"
)

def _run_pre_execute_hooks(self, expr: ir.Expr) -> None:
"""Backend-specific hooks to run before an expression is executed."""
self._register_udfs(expr)
Expand Down
10 changes: 4 additions & 6 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,9 @@ def create_table(
properties.append(sge.TemporaryProperty())
catalog = "temp"

temp_memtable_view = None

if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -234,9 +231,6 @@ def create_table(
).sql(self.name)
)

if temp_memtable_view is not None:
self.con.unregister(temp_memtable_view)

return self.table(name, database=(catalog, database))

def table(
Expand Down Expand Up @@ -1620,10 +1614,14 @@ def _in_memory_table_exists(self, name: str) -> bool:
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
self.con.register(op.name, op.data.to_pyarrow(op.schema))

def _register_memtable_finalizer(self, op: ops.InMemoryTable):
# if we don't aggressively unregister tables duckdb will keep a
# reference to every memtable ever registered, even if there's no
# way for a user to access the operation anymore, resulting in a
# memory leak
#
# we can't use drop_table, because self.con.register creates a view, so
# use the corresponding unregister method
weakref.finalize(op, self.con.unregister, op.name)
cpcloud marked this conversation as resolved.
Show resolved Hide resolved

def _register_udfs(self, expr: ir.Expr) -> None:
Expand Down
30 changes: 14 additions & 16 deletions ibis/backends/exasol/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from __future__ import annotations

import atexit
import contextlib
import datetime
import re
import weakref
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote_plus

Expand Down Expand Up @@ -278,12 +278,14 @@ def process_item(item: Any):
with self._safe_raw_sql(create_stmt_sql):
if not df.empty:
self.con.ext.insert_multi(name, rows)
atexit.register(self._clean_up_tmp_table, ident)

def _clean_up_tmp_table(self, ident: sge.Identifier) -> None:
with self._safe_raw_sql(
sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True)
):
def _register_memtable_finalizer(self, op: ops.InMemoryTable):
weakref.finalize(op, self._clean_up_tmp_table, op.name)
cpcloud marked this conversation as resolved.
Show resolved Hide resolved

def _clean_up_tmp_table(self, name: str) -> None:
ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True)
with self._safe_raw_sql(sql):
pass

def create_table(
Expand Down Expand Up @@ -334,11 +336,9 @@ def create_table(

quoted = self.compiler.quoted

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand All @@ -356,31 +356,29 @@ def create_table(
if not schema:
schema = table.schema()

table = sg.table(temp_name, catalog=database, quoted=quoted)
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
table_expr = sg.table(temp_name, catalog=database, quoted=quoted)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed these variable names because table = <not the memtable> causes it to get GC'd. I gave another example of this in the PR description.

target = sge.Schema(
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
)

create_stmt = sge.Create(kind="TABLE", this=target)

this = sg.table(name, catalog=database, quoted=quoted)
with self._safe_raw_sql(create_stmt):
if query is not None:
self.con.execute(
sge.Insert(this=table, expression=query).sql(self.name)
sge.Insert(this=table_expr, expression=query).sql(self.name)
)

if overwrite:
self.con.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
)
self.con.execute(
f"RENAME TABLE {table.sql(self.name)} TO {this.sql(self.name)}"
f"RENAME TABLE {table_expr.sql(self.name)} TO {this.sql(self.name)}"
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=database)

# preserve the input schema if it was provided
Expand Down
19 changes: 8 additions & 11 deletions ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,11 +625,9 @@ def create_table(
properties.append(sge.TemporaryProperty())
catalog, db = None, None

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand All @@ -647,19 +645,22 @@ def create_table(
if not schema:
schema = table.schema()

table = sg.table(
"#" * temp + temp_name, catalog=catalog, db=db, quoted=self.compiler.quoted
)
quoted = self.compiler.quoted
raw_table = sg.table(temp_name, catalog=catalog, db=db, quoted=False)
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
target = sge.Schema(
this=sg.table(
"#" * temp + temp_name, catalog=catalog, db=db, quoted=quoted
),
expressions=schema.to_sqlglot(self.dialect),
)

create_stmt = sge.Create(
kind="TABLE",
this=target,
properties=sge.Properties(expressions=properties),
)

this = sg.table(name, catalog=catalog, db=db, quoted=self.compiler.quoted)
this = sg.table(name, catalog=catalog, db=db, quoted=quoted)
raw_this = sg.table(name, catalog=catalog, db=db, quoted=False)
with self._safe_ddl(create_stmt) as cur:
if query is not None:
Expand Down Expand Up @@ -692,10 +693,6 @@ def create_table(
db = "dbo"

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=(catalog, db))

# preserve the input schema if it was provided
Expand Down
19 changes: 15 additions & 4 deletions ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,10 @@ def create_table(
if not schema:
schema = table.schema()

table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
table_expr = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
target = sge.Schema(
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
)

create_stmt = sge.Create(
kind="TABLE",
Expand All @@ -437,15 +439,17 @@ def create_table(
this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.Insert(this=table, expression=query).sql(self.name)
insert_stmt = sge.Insert(this=table_expr, expression=query).sql(
self.name
)
cur.execute(insert_stmt)

if overwrite:
cur.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
)
cur.execute(
f"ALTER TABLE IF EXISTS {table.sql(self.name)} RENAME TO {this.sql(self.name)}"
f"ALTER TABLE IF EXISTS {table_expr.sql(self.name)} RENAME TO {this.sql(self.name)}"
)

if schema is None:
Expand Down Expand Up @@ -538,3 +542,10 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
raise
df = MySQLPandasData.convert_table(df, schema)
return df

def _register_memtable_finalizer(self, op: ops.InMemoryTable):
"""No-op.

Executing **any** SQL in a finalizer causes the underlying connection
socket to be set to `None`. It is unclear why this happens.
"""
24 changes: 13 additions & 11 deletions ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

from __future__ import annotations

import atexit
import contextlib
import re
import warnings
import weakref
from functools import cached_property
from operator import itemgetter
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -419,11 +419,9 @@ def create_table(
if temp:
properties.append(sge.TemporaryProperty())

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -468,10 +466,6 @@ def create_table(
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=database)

# preserve the input schema if it was provided
Expand Down Expand Up @@ -527,7 +521,8 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
insert_stmt, list(data.iloc[start:end].itertuples(index=False))
)

atexit.register(self._clean_up_tmp_table, name)
def _register_memtable_finalizer(self, op: ops.InMemoryTable):
weakref.finalize(op, self._clean_up_tmp_table, op.name)

def _get_schema_using_query(self, query: str) -> sch.Schema:
name = util.gen_name("oracle_metadata")
Expand Down Expand Up @@ -608,6 +603,13 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
return OraclePandasData.convert_table(df, schema)

def _clean_up_tmp_table(self, name: str) -> None:
dialect = self.dialect

ident = sg.to_identifier(name, quoted=self.compiler.quoted)

truncate = sge.TruncateTable(expressions=[ident]).sql(dialect)
drop = sge.Drop(kind="TABLE", this=ident).sql(dialect)

with self.begin() as bind:
# global temporary tables cannot be dropped without first truncating them
#
Expand All @@ -616,9 +618,9 @@ def _clean_up_tmp_table(self, name: str) -> None:
# ignore DatabaseError exceptions because the table may not exist
# because it's already been deleted
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f'TRUNCATE TABLE "{name}"')
bind.execute(truncate)
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f'DROP TABLE "{name}"')
bind.execute(drop)

def _drop_cached_table(self, name):
def _drop_cached_table(self, name: str) -> None:
self._clean_up_tmp_table(name)
3 changes: 3 additions & 0 deletions ibis/backends/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ def execute(self, query, params=None, limit="default", **kwargs):
def _create_cached_table(self, name, expr):
return self.create_table(name, expr.execute())

def _register_memtable_finalizer(self, op: ops.InMemoryTable):
"""No-op, let Python handle clean up."""


@lazy_singledispatch
def _convert_object(obj: Any, _conn):
Expand Down
4 changes: 4 additions & 0 deletions ibis/backends/polars/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import weakref
from collections.abc import Iterable, Mapping
from functools import lru_cache
from pathlib import Path
Expand Down Expand Up @@ -81,6 +82,9 @@ def _in_memory_table_exists(self, name: str) -> bool:
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
self._add_table(op.name, op.data.to_polars(op.schema).lazy())

def _register_memtable_finalizer(self, op: ops.InMemoryTable) -> None:
weakref.finalize(op, self.drop_table, op.name, force=True)

@deprecated(
as_of="9.1",
instead="use the explicit `read_*` method for the filetype you are trying to read, e.g., read_parquet, read_csv, etc.",
Expand Down
12 changes: 8 additions & 4 deletions ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,8 +663,10 @@ def create_table(
if not schema:
schema = table.schema()

table = sg.table(temp_name, db=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
table_expr = sg.table(temp_name, db=database, quoted=self.compiler.quoted)
target = sge.Schema(
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
)

create_stmt = sge.Create(
kind="TABLE",
Expand All @@ -675,15 +677,17 @@ def create_table(
this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.Insert(this=table, expression=query).sql(self.dialect)
insert_stmt = sge.Insert(this=table_expr, expression=query).sql(
self.dialect
)
cur.execute(insert_stmt)

if overwrite:
cur.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.dialect)
)
cur.execute(
f"ALTER TABLE IF EXISTS {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}"
f"ALTER TABLE IF EXISTS {table_expr.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}"
)

if schema is None:
Expand Down
7 changes: 0 additions & 7 deletions ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,13 +594,11 @@ def create_table(
table_loc = self._to_sqlglot_table(database)
catalog, db = self._to_catalog_db_tuple(table_loc)

temp_memtable_view = None
if obj is not None:
if isinstance(obj, ir.Expr):
table = obj
else:
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
query = self.compile(table)
mode = "overwrite" if overwrite else "error"
with self._active_catalog_database(catalog, db):
Expand All @@ -615,11 +613,6 @@ def create_table(
else:
raise com.IbisError("The schema or obj parameter is required")

# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)

return self.table(name, database=(catalog, db))

def create_view(
Expand Down
Loading