refactor(register): remove deprecated register method (#10545)
BREAKING CHANGE: The deprecated `register` method has been removed. Please use the file-specific `read_*` methods instead. For in-memory objects, pass them to `ibis.memtable` or `create_table`.
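For readers migrating, a minimal sketch of the replacement calls (the DuckDB backend, file path, and table names here are illustrative assumptions, not part of this commit): file sources go through the explicit `read_*` readers, and in-memory objects go through `ibis.memtable` or `create_table`.

>>> import ibis
>>> import pandas as pd
>>> con = ibis.duckdb.connect()
>>> # before: con.register("data/events.parquet", table_name="events")
>>> events = con.read_parquet("data/events.parquet", table_name="events")
>>> # before: con.register(pd.DataFrame({"a": [1, 2, 3]}), table_name="t")
>>> t = con.create_table("t", ibis.memtable(pd.DataFrame({"a": [1, 2, 3]})))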
gforsyth authored Dec 17, 2024
1 parent 0c57e8b commit aa60584
Showing 8 changed files with 215 additions and 708 deletions.
147 changes: 64 additions & 83 deletions ibis/backends/datafusion/__init__.py
@@ -9,7 +9,6 @@

import datafusion as df
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow_hotfix # noqa: F401
import sqlglot as sg
import sqlglot.expressions as sge
@@ -28,7 +27,7 @@
from ibis.common.dispatch import lazy_singledispatch
from ibis.expr.operations.udf import InputType
from ibis.formats.pyarrow import PyArrowSchema, PyArrowType
from ibis.util import deprecated, gen_name, normalize_filename, normalize_filenames
from ibis.util import gen_name, normalize_filename, normalize_filenames, warn_deprecated

try:
from datafusion import ExecutionContext as SessionContext
@@ -88,37 +87,30 @@ def do_connect(
Parameters
----------
config
Mapping of table names to files or a `SessionContext`
Mapping of table names to files (deprecated in 10.0) or a `SessionContext`
instance.
Examples
--------
>>> from datafusion import SessionContext
>>> ctx = SessionContext()
>>> _ = ctx.from_pydict({"a": [1, 2, 3]}, "mytable")
>>> import ibis
>>> config = {
... "astronauts": "ci/ibis-testing-data/parquet/astronauts.parquet",
... "diamonds": "ci/ibis-testing-data/csv/diamonds.csv",
... }
>>> con = ibis.datafusion.connect(config)
>>> con = ibis.datafusion.connect(ctx)
>>> con.list_tables()
['astronauts', 'diamonds']
>>> con.table("diamonds")
DatabaseTable: diamonds
carat float64
cut string
color string
clarity string
depth float64
table float64
price int64
x float64
y float64
z float64
['mytable']
"""
if isinstance(config, SessionContext):
(self.con, config) = (config, None)
else:
if config is not None and not isinstance(config, Mapping):
raise TypeError("Input to ibis.datafusion.connect must be a mapping")
elif config is not None and config: # warn if dict is not empty
warn_deprecated(
"Passing a mapping of tables names to files",
as_of="10.0",
instead="Please use the explicit `read_*` methods for the files you would like to load instead.",
)
if SessionConfig is not None:
df_config = SessionConfig(
{"datafusion.sql_parser.dialect": "PostgreSQL"}
@@ -178,6 +170,57 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:

return PyArrowSchema.to_ibis(df.schema())

def _register(
self,
source: str | Path | pa.Table | pa.RecordBatch | pa.Dataset | pd.DataFrame,
table_name: str | None = None,
**kwargs: Any,
) -> ir.Table:
import pandas as pd
import pyarrow.dataset as ds

if isinstance(source, (str, Path)):
first = str(source)
elif isinstance(source, pa.Table):
self.con.deregister_table(table_name)
self.con.register_record_batches(table_name, [source.to_batches()])
return self.table(table_name)
elif isinstance(source, pa.RecordBatch):
self.con.deregister_table(table_name)
self.con.register_record_batches(table_name, [[source]])
return self.table(table_name)
elif isinstance(source, ds.Dataset):
self.con.deregister_table(table_name)
self.con.register_dataset(table_name, source)
return self.table(table_name)
elif isinstance(source, pd.DataFrame):
return self._register(pa.Table.from_pandas(source), table_name, **kwargs)
else:
raise ValueError("`source` must be either a string or a pathlib.Path")

if first.startswith(("parquet://", "parq://")) or first.endswith(
("parq", "parquet")
):
return self.read_parquet(source, table_name=table_name, **kwargs)
elif first.startswith(("csv://", "txt://")) or first.endswith(
("csv", "tsv", "txt")
):
return self.read_csv(source, table_name=table_name, **kwargs)
else:
self._register_failure()
return None

def _register_failure(self):
import inspect

msg = ", ".join(
m[0] for m in inspect.getmembers(self) if m[0].startswith("read_")
)
raise ValueError(
f"Cannot infer appropriate read function for input, "
f"please call one of {msg} directly"
)

def _register_builtin_udfs(self):
from ibis.backends.datafusion import udfs

@@ -345,68 +388,6 @@ def get_schema(
table = database.table(table_name)
return sch.schema(table.schema)

@deprecated(
as_of="9.1",
instead="use the explicit `read_*` method for the filetype you are trying to read, e.g., read_parquet, read_csv, etc.",
)
def register(
self,
source: str | Path | pa.Table | pa.RecordBatch | pa.Dataset | pd.DataFrame,
table_name: str | None = None,
**kwargs: Any,
) -> ir.Table:
return self._register(source, table_name, **kwargs)

def _register(
self,
source: str | Path | pa.Table | pa.RecordBatch | pa.Dataset | pd.DataFrame,
table_name: str | None = None,
**kwargs: Any,
) -> ir.Table:
import pandas as pd

if isinstance(source, (str, Path)):
first = str(source)
elif isinstance(source, pa.Table):
self.con.deregister_table(table_name)
self.con.register_record_batches(table_name, [source.to_batches()])
return self.table(table_name)
elif isinstance(source, pa.RecordBatch):
self.con.deregister_table(table_name)
self.con.register_record_batches(table_name, [[source]])
return self.table(table_name)
elif isinstance(source, ds.Dataset):
self.con.deregister_table(table_name)
self.con.register_dataset(table_name, source)
return self.table(table_name)
elif isinstance(source, pd.DataFrame):
return self.register(pa.Table.from_pandas(source), table_name, **kwargs)
else:
raise ValueError("`source` must be either a string or a pathlib.Path")

if first.startswith(("parquet://", "parq://")) or first.endswith(
("parq", "parquet")
):
return self.read_parquet(source, table_name=table_name, **kwargs)
elif first.startswith(("csv://", "txt://")) or first.endswith(
("csv", "tsv", "txt")
):
return self.read_csv(source, table_name=table_name, **kwargs)
else:
self._register_failure()
return None

def _register_failure(self):
import inspect

msg = ", ".join(
m[0] for m in inspect.getmembers(self) if m[0].startswith("read_")
)
raise ValueError(
f"Cannot infer appropriate read function for input, "
f"please call one of {msg} directly"
)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
# self.con.register_table is broken, so we do this roundabout thing
# of constructing a datafusion DataFrame, which has a side effect
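With the table-mapping form of `connect` deprecated above, the DataFusion flow becomes connect-then-read; a brief sketch reusing the file paths from the removed docstring example (those paths come from that example and are not guaranteed to exist locally):

>>> import ibis
>>> con = ibis.datafusion.connect()
>>> astronauts = con.read_parquet(
...     "ci/ibis-testing-data/parquet/astronauts.parquet", table_name="astronauts"
... )
>>> diamonds = con.read_csv(
...     "ci/ibis-testing-data/csv/diamonds.csv", table_name="diamonds"
... )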
6 changes: 4 additions & 2 deletions ibis/backends/datafusion/tests/test_connect.py
@@ -25,13 +25,15 @@ def test_none_config():

def test_str_config(name_to_path):
config = {name: str(path) for name, path in name_to_path.items()}
conn = ibis.datafusion.connect(config)
with pytest.warns(FutureWarning):
conn = ibis.datafusion.connect(config)
assert sorted(conn.list_tables()) == sorted(name_to_path)


def test_path_config(name_to_path):
config = name_to_path
conn = ibis.datafusion.connect(config)
with pytest.warns(FutureWarning):
conn = ibis.datafusion.connect(config)
assert sorted(conn.list_tables()) == sorted(name_to_path)


72 changes: 0 additions & 72 deletions ibis/backends/datafusion/tests/test_register.py

This file was deleted.

72 changes: 0 additions & 72 deletions ibis/backends/duckdb/__init__.py
@@ -31,7 +31,6 @@
from ibis.backends.sql.compilers.base import STAR, AlterTable, C, RenameTable
from ibis.common.dispatch import lazy_singledispatch
from ibis.expr.operations.udf import InputType
from ibis.util import deprecated

if TYPE_CHECKING:
from collections.abc import Iterable, Mapping, MutableMapping, Sequence
@@ -483,77 +482,6 @@ def drop_database(
with self._safe_raw_sql(sge.Drop(this=name, kind="SCHEMA", replace=force)):
pass

@deprecated(
as_of="9.1",
instead="use the explicit `read_*` method for the filetype you are trying to read, e.g., read_parquet, read_csv, etc.",
)
def register(
self,
source: str | Path | Any,
table_name: str | None = None,
**kwargs: Any,
) -> ir.Table:
"""Register a data source as a table in the current database.
Parameters
----------
source
The data source(s). May be a path to a file or directory of
parquet/csv files, an iterable of parquet or CSV files, a pandas
dataframe, a pyarrow table or dataset, or a postgres URI.
table_name
An optional name to use for the created table. This defaults to a
sequentially generated name.
**kwargs
Additional keyword arguments passed to DuckDB loading functions for
CSV or parquet. See https://duckdb.org/docs/data/csv and
https://duckdb.org/docs/data/parquet for more information.
Returns
-------
ir.Table
The just-registered table
"""

if isinstance(source, (str, Path)):
first = str(source)
elif isinstance(source, (list, tuple)):
first = source[0]
else:
try:
return self.read_in_memory(source, table_name=table_name, **kwargs)
except (duckdb.InvalidInputException, NameError):
self._register_failure()

if first.startswith(("parquet://", "parq://")) or first.endswith(
("parq", "parquet")
):
return self.read_parquet(source, table_name=table_name, **kwargs)
elif first.startswith(
("csv://", "csv.gz://", "txt://", "txt.gz://")
) or first.endswith(("csv", "csv.gz", "tsv", "tsv.gz", "txt", "txt.gz")):
return self.read_csv(source, table_name=table_name, **kwargs)
elif first.startswith(("postgres://", "postgresql://")):
return self.read_postgres(source, table_name=table_name, **kwargs)
elif first.startswith("sqlite://"):
return self.read_sqlite(
first[len("sqlite://") :], table_name=table_name, **kwargs
)
else:
self._register_failure() # noqa: RET503

def _register_failure(self):
import inspect

msg = ", ".join(
name for name, _ in inspect.getmembers(self) if name.startswith("read_")
)
raise ValueError(
f"Cannot infer appropriate read function for input, "
f"please call one of {msg} directly"
)

@util.experimental
def read_json(
self,
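The removed DuckDB `register` dispatched on URI prefixes (parquet://, csv://, postgres://, sqlite://); each prefix now maps to its explicit reader. A short sketch for the sqlite:// case, assuming a hypothetical local file test.db containing a table t:

>>> import ibis
>>> con = ibis.duckdb.connect()
>>> # before: con.register("sqlite://test.db", "t")
>>> t = con.read_sqlite("test.db", table_name="t")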
@@ -229,20 +229,6 @@ def test_read_sqlite_no_table_name(con, tmp_path):
con.read_sqlite(path)


@pytest.mark.xfail(
LINUX and SANDBOXED,
reason="nix on linux cannot download duckdb extensions or data due to sandboxing",
raises=duckdb.IOException,
)
def test_register_sqlite(con, tmp_path):
path = tmp_path / "test.db"
sqlite_con = sqlite3.connect(str(path))
sqlite_con.execute("CREATE TABLE t AS SELECT 1 a UNION SELECT 2 UNION SELECT 3")
with pytest.warns(FutureWarning, match="v9.1"):
ft = con.register(f"sqlite://{path}", "t")
assert ft.count().execute()


# Because we create a new connection and the test requires loading/installing a
# DuckDB extension, we need to xfail these on Nix.
@pytest.mark.xfail(