Skip to content

Commit

Permalink
refactor(backends): clean up memtable resources
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Sep 8, 2024
1 parent 06d20a6 commit fb642a4
Show file tree
Hide file tree
Showing 18 changed files with 187 additions and 144 deletions.
7 changes: 7 additions & 0 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,13 +1116,20 @@ def _register_in_memory_tables(self, expr: ir.Expr) -> None:
for memtable in expr.op().find(ops.InMemoryTable):
if not self._table_exists(memtable.name):
self._register_in_memory_table(memtable)
self._register_memtable_finalizer(memtable)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
if self.supports_in_memory_tables:
raise NotImplementedError(
f"{self.name} must implement `_register_in_memory_table` to support in-memory tables"
)

def _register_memtable_finalizer(self, op: ops.InMemoryTable) -> None:
if self.supports_in_memory_tables:

Check warning on line 1128 in ibis/backends/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/__init__.py#L1128

Added line #L1128 was not covered by tests
raise NotImplementedError(
f"{self.name} must implement `_register_memtable_finalizer` to support in-memory tables"
)

def _run_pre_execute_hooks(self, expr: ir.Expr) -> None:
"""Backend-specific hooks to run before an expression is executed."""
self._register_udfs(expr)
Expand Down
12 changes: 6 additions & 6 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import urllib
import warnings
import weakref
from operator import itemgetter
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -158,12 +159,9 @@ def create_table(
properties.append(sge.TemporaryProperty())
catalog = "temp"

temp_memtable_view = None

if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -242,9 +240,6 @@ def create_table(
).sql(self.name)
)

if temp_memtable_view is not None:
self.con.unregister(temp_memtable_view)

return self.table(name, database=(catalog, database))

def table(
Expand Down Expand Up @@ -1601,6 +1596,11 @@ def _table_exists(self, name: str) -> bool:
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
self.con.register(op.name, op.data.to_pyarrow(op.schema))

Check warning on line 1597 in ibis/backends/duckdb/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/duckdb/__init__.py#L1596-L1597

Added lines #L1596 - L1597 were not covered by tests

def _register_memtable_finalizer(self, op: ops.InMemoryTable):

Check warning on line 1599 in ibis/backends/duckdb/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/duckdb/__init__.py#L1599

Added line #L1599 was not covered by tests
# we can't use drop_table, because self.con.register creates a view, so
# use the corresponding unregister method
weakref.finalize(op, self.con.unregister, op.name)

Check warning on line 1602 in ibis/backends/duckdb/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/duckdb/__init__.py#L1602

Added line #L1602 was not covered by tests

def _register_udfs(self, expr: ir.Expr) -> None:
con = self.con

Expand Down
28 changes: 12 additions & 16 deletions ibis/backends/exasol/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from __future__ import annotations

import atexit
import contextlib
import datetime
import re
import weakref
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote_plus

Expand Down Expand Up @@ -293,12 +293,14 @@ def process_item(item: Any):
with self._safe_raw_sql(create_stmt_sql):
if not df.empty:
self.con.ext.insert_multi(name, rows)
atexit.register(self._clean_up_tmp_table, ident)

def _clean_up_tmp_table(self, ident: sge.Identifier) -> None:
with self._safe_raw_sql(
sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True)
):
def _register_memtable_finalizer(self, op: ops.InMemoryTable):
weakref.finalize(op, self._clean_up_tmp_table, op.name)

def _clean_up_tmp_table(self, name: str) -> None:
ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True)
with self._safe_raw_sql(sql):
pass

def create_table(
Expand Down Expand Up @@ -349,11 +351,9 @@ def create_table(

quoted = self.compiler.quoted

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -382,31 +382,27 @@ def create_table(
else:
temp_name = name

table = sg.table(temp_name, catalog=database, quoted=quoted)
target = sge.Schema(this=table, expressions=column_defs)
table_expr = sg.table(temp_name, catalog=database, quoted=quoted)
target = sge.Schema(this=table_expr, expressions=column_defs)

create_stmt = sge.Create(kind="TABLE", this=target)

this = sg.table(name, catalog=database, quoted=quoted)
with self._safe_raw_sql(create_stmt):
if query is not None:
self.con.execute(
sge.Insert(this=table, expression=query).sql(self.name)
sge.Insert(this=table_expr, expression=query).sql(self.name)
)

if overwrite:
self.con.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
)
self.con.execute(
f"RENAME TABLE {table.sql(self.name)} TO {this.sql(self.name)}"
f"RENAME TABLE {table_expr.sql(self.name)} TO {this.sql(self.name)}"
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=database)

# preserve the input schema if it was provided
Expand Down
19 changes: 8 additions & 11 deletions ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,11 +625,9 @@ def create_table(
properties.append(sge.TemporaryProperty())
catalog, db = None, None

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -657,19 +655,22 @@ def create_table(
else:
temp_name = name

table = sg.table(
"#" * temp + temp_name, catalog=catalog, db=db, quoted=self.compiler.quoted
)
quoted = self.compiler.quoted
raw_table = sg.table(temp_name, catalog=catalog, db=db, quoted=False)
target = sge.Schema(this=table, expressions=column_defs)
target = sge.Schema(
this=sg.table(
"#" * temp + temp_name, catalog=catalog, db=db, quoted=quoted
),
expressions=column_defs,
)

create_stmt = sge.Create(
kind="TABLE",
this=target,
properties=sge.Properties(expressions=properties),
)

this = sg.table(name, catalog=catalog, db=db, quoted=self.compiler.quoted)
this = sg.table(name, catalog=catalog, db=db, quoted=quoted)
raw_this = sg.table(name, catalog=catalog, db=db, quoted=False)
with self._safe_ddl(create_stmt) as cur:
if query is not None:
Expand Down Expand Up @@ -702,10 +703,6 @@ def create_table(
db = "dbo"

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=(catalog, db))

# preserve the input schema if it was provided
Expand Down
17 changes: 6 additions & 11 deletions ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,9 @@ def create_table(
if temp:
properties.append(sge.TemporaryProperty())

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -435,8 +433,8 @@ def create_table(
else:
temp_name = name

table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table, expressions=column_defs)
table_expr = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table_expr, expressions=column_defs)

create_stmt = sge.Create(
kind="TABLE",
Expand All @@ -447,23 +445,20 @@ def create_table(
this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.Insert(this=table, expression=query).sql(self.name)
insert_stmt = sge.Insert(this=table_expr, expression=query).sql(
self.name
)
cur.execute(insert_stmt)

if overwrite:
cur.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
)
cur.execute(
f"ALTER TABLE IF EXISTS {table.sql(self.name)} RENAME TO {this.sql(self.name)}"
f"ALTER TABLE IF EXISTS {table_expr.sql(self.name)} RENAME TO {this.sql(self.name)}"
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)

return self.table(name, database=database)

# preserve the input schema if it was provided
Expand Down
24 changes: 13 additions & 11 deletions ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

from __future__ import annotations

import atexit
import contextlib
import re
import warnings
import weakref
from functools import cached_property
from operator import itemgetter
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -419,11 +419,9 @@ def create_table(
if temp:
properties.append(sge.TemporaryProperty())

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -478,10 +476,6 @@ def create_table(
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=database)

# preserve the input schema if it was provided
Expand Down Expand Up @@ -551,7 +545,8 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
insert_stmt, list(data.iloc[start:end].itertuples(index=False))
)

atexit.register(self._clean_up_tmp_table, name)
def _register_memtable_finalizer(self, op: ops.InMemoryTable):
weakref.finalize(op, self._clean_up_tmp_table, op.name)

def _get_schema_using_query(self, query: str) -> sch.Schema:
name = util.gen_name("oracle_metadata")
Expand Down Expand Up @@ -632,6 +627,13 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
return OraclePandasData.convert_table(df, schema)

def _clean_up_tmp_table(self, name: str) -> None:
dialect = self.dialect

ident = sg.to_identifier(name, quoted=self.compiler.quoted)

truncate = sge.TruncateTable(expressions=[ident]).sql(dialect)
drop = sge.Drop(kind="TABLE", this=ident).sql(dialect)

with self.begin() as bind:
# global temporary tables cannot be dropped without first truncating them
#
Expand All @@ -640,9 +642,9 @@ def _clean_up_tmp_table(self, name: str) -> None:
# ignore DatabaseError exceptions because the table may not exist
# because it's already been deleted
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f'TRUNCATE TABLE "{name}"')
bind.execute(truncate)
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f'DROP TABLE "{name}"')
bind.execute(drop)

def _drop_cached_table(self, name):
def _drop_cached_table(self, name: str) -> None:
self._clean_up_tmp_table(name)
3 changes: 3 additions & 0 deletions ibis/backends/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ def execute(self, query, params=None, limit="default", **kwargs):
def _create_cached_table(self, name, expr):
return self.create_table(name, expr.execute())

def _register_memtable_finalizer(self, op: ops.InMemoryTable):
"""No-op, let Python handle clean up."""


@lazy_singledispatch
def _convert_object(obj: Any, _conn):
Expand Down
10 changes: 6 additions & 4 deletions ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,8 +701,8 @@ def create_table(
else:
temp_name = name

table = sg.table(temp_name, db=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table, expressions=column_defs)
table_expr = sg.table(temp_name, db=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table_expr, expressions=column_defs)

create_stmt = sge.Create(
kind="TABLE",
Expand All @@ -713,15 +713,17 @@ def create_table(
this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.Insert(this=table, expression=query).sql(self.dialect)
insert_stmt = sge.Insert(this=table_expr, expression=query).sql(
self.dialect
)
cur.execute(insert_stmt)

if overwrite:
cur.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.dialect)
)
cur.execute(
f"ALTER TABLE IF EXISTS {table.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}"
f"ALTER TABLE IF EXISTS {table_expr.sql(self.dialect)} RENAME TO {this.sql(self.dialect)}"
)

if schema is None:
Expand Down
7 changes: 0 additions & 7 deletions ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,13 +597,11 @@ def create_table(
table_loc = self._to_sqlglot_table(database)
catalog, db = self._to_catalog_db_tuple(table_loc)

temp_memtable_view = None
if obj is not None:
if isinstance(obj, ir.Expr):
table = obj
else:
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
query = self.compile(table)
mode = "overwrite" if overwrite else "error"
with self._active_catalog_database(catalog, db):
Expand All @@ -618,11 +616,6 @@ def create_table(
else:
raise com.IbisError("The schema or obj parameter is required")

# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)

return self.table(name, database=(catalog, db))

def create_view(
Expand Down
Loading

0 comments on commit fb642a4

Please sign in to comment.