refactor(register): remove deprecated register method #10545

Merged · 4 commits · Dec 17, 2024

147 changes: 64 additions & 83 deletions ibis/backends/datafusion/__init__.py
@@ -9,7 +9,6 @@

 import datafusion as df
 import pyarrow as pa
-import pyarrow.dataset as ds
 import pyarrow_hotfix  # noqa: F401
 import sqlglot as sg
 import sqlglot.expressions as sge
@@ -28,7 +27,7 @@
 from ibis.common.dispatch import lazy_singledispatch
 from ibis.expr.operations.udf import InputType
 from ibis.formats.pyarrow import PyArrowSchema, PyArrowType
-from ibis.util import deprecated, gen_name, normalize_filename, normalize_filenames
+from ibis.util import gen_name, normalize_filename, normalize_filenames, warn_deprecated

 try:
     from datafusion import ExecutionContext as SessionContext
@@ -88,37 +87,30 @@ def do_connect(
         Parameters
         ----------
         config
-            Mapping of table names to files or a `SessionContext`
+            Mapping of table names to files (deprecated in 10.0) or a `SessionContext`
             instance.

         Examples
         --------
+        >>> from datafusion import SessionContext
+        >>> ctx = SessionContext()
+        >>> _ = ctx.from_pydict({"a": [1, 2, 3]}, "mytable")
         >>> import ibis
-        >>> config = {
-        ...     "astronauts": "ci/ibis-testing-data/parquet/astronauts.parquet",
-        ...     "diamonds": "ci/ibis-testing-data/csv/diamonds.csv",
-        ... }
-        >>> con = ibis.datafusion.connect(config)
+        >>> con = ibis.datafusion.connect(ctx)
         >>> con.list_tables()
-        ['astronauts', 'diamonds']
-        >>> con.table("diamonds")
-        DatabaseTable: diamonds
-          carat   float64
-          cut     string
-          color   string
-          clarity string
-          depth   float64
-          table   float64
-          price   int64
-          x       float64
-          y       float64
-          z       float64
+        ['mytable']
         """
         if isinstance(config, SessionContext):
             (self.con, config) = (config, None)
         else:
             if config is not None and not isinstance(config, Mapping):
                 raise TypeError("Input to ibis.datafusion.connect must be a mapping")
+            elif config is not None and config:  # warn if dict is not empty
+                warn_deprecated(
+                    "Passing a mapping of table names to files",
+                    as_of="10.0",
+                    instead="Please use the explicit `read_*` methods for the files you would like to load instead.",
+                )
             if SessionConfig is not None:
                 df_config = SessionConfig(
                     {"datafusion.sql_parser.dialect": "PostgreSQL"}
@@ -178,6 +170,57 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:

         return PyArrowSchema.to_ibis(df.schema())
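For context, `PyArrowSchema.to_ibis` converts a PyArrow schema into an ibis schema. A small illustration (the schema contents here are made up):

```python
import pyarrow as pa
from ibis.formats.pyarrow import PyArrowSchema

# Convert a pyarrow schema to an ibis schema, as _get_schema_using_query
# does with the schema of the executed datafusion query.
pa_schema = pa.schema([("a", pa.int64()), ("b", pa.string())])
ibis_schema = PyArrowSchema.to_ibis(pa_schema)
# ibis_schema now maps 'a' -> int64 and 'b' -> string
```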

+    def _register(
+        self,
+        source: str | Path | pa.Table | pa.RecordBatch | pa.Dataset | pd.DataFrame,
+        table_name: str | None = None,
+        **kwargs: Any,
+    ) -> ir.Table:
+        import pandas as pd
+        import pyarrow.dataset as ds
+
+        if isinstance(source, (str, Path)):
+            first = str(source)
+        elif isinstance(source, pa.Table):
+            self.con.deregister_table(table_name)
+            self.con.register_record_batches(table_name, [source.to_batches()])
+            return self.table(table_name)
+        elif isinstance(source, pa.RecordBatch):
+            self.con.deregister_table(table_name)
+            self.con.register_record_batches(table_name, [[source]])
+            return self.table(table_name)
+        elif isinstance(source, ds.Dataset):
+            self.con.deregister_table(table_name)
+            self.con.register_dataset(table_name, source)
+            return self.table(table_name)
+        elif isinstance(source, pd.DataFrame):
+            return self._register(pa.Table.from_pandas(source), table_name, **kwargs)
+        else:
+            raise ValueError("`source` must be either a string or a pathlib.Path")
+
+        if first.startswith(("parquet://", "parq://")) or first.endswith(
+            ("parq", "parquet")
+        ):
+            return self.read_parquet(source, table_name=table_name, **kwargs)
+        elif first.startswith(("csv://", "txt://")) or first.endswith(
+            ("csv", "tsv", "txt")
+        ):
+            return self.read_csv(source, table_name=table_name, **kwargs)
+        else:
+            self._register_failure()
+            return None
+
+    def _register_failure(self):
+        import inspect
+
+        msg = ", ".join(
+            m[0] for m in inspect.getmembers(self) if m[0].startswith("read_")
+        )
+        raise ValueError(
+            f"Cannot infer appropriate read function for input, "
+            f"please call one of {msg} directly"
+        )
+
     def _register_builtin_udfs(self):
         from ibis.backends.datafusion import udfs
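The string/Path branch of `_register` above dispatches purely on URI prefix and file extension. A standalone sketch of that rule (`infer_reader` is an illustrative name, not an ibis API):

```python
def infer_reader(source: str) -> str:
    # Mirror of the prefix/suffix sniffing in _register above.
    if source.startswith(("parquet://", "parq://")) or source.endswith(
        ("parq", "parquet")
    ):
        return "read_parquet"
    if source.startswith(("csv://", "txt://")) or source.endswith(
        ("csv", "tsv", "txt")
    ):
        return "read_csv"
    raise ValueError("Cannot infer appropriate read function for input")


assert infer_reader("data/astronauts.parquet") == "read_parquet"
assert infer_reader("csv://data/diamonds") == "read_csv"
```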

@@ -345,68 +388,6 @@ def get_schema(
         table = database.table(table_name)
         return sch.schema(table.schema)

-    @deprecated(
-        as_of="9.1",
-        instead="use the explicit `read_*` method for the filetype you are trying to read, e.g., read_parquet, read_csv, etc.",
-    )
-    def register(
-        self,
-        source: str | Path | pa.Table | pa.RecordBatch | pa.Dataset | pd.DataFrame,
-        table_name: str | None = None,
-        **kwargs: Any,
-    ) -> ir.Table:
-        return self._register(source, table_name, **kwargs)
-
-    def _register(
-        self,
-        source: str | Path | pa.Table | pa.RecordBatch | pa.Dataset | pd.DataFrame,
-        table_name: str | None = None,
-        **kwargs: Any,
-    ) -> ir.Table:
-        import pandas as pd
-
-        if isinstance(source, (str, Path)):
-            first = str(source)
-        elif isinstance(source, pa.Table):
-            self.con.deregister_table(table_name)
-            self.con.register_record_batches(table_name, [source.to_batches()])
-            return self.table(table_name)
-        elif isinstance(source, pa.RecordBatch):
-            self.con.deregister_table(table_name)
-            self.con.register_record_batches(table_name, [[source]])
-            return self.table(table_name)
-        elif isinstance(source, ds.Dataset):
-            self.con.deregister_table(table_name)
-            self.con.register_dataset(table_name, source)
-            return self.table(table_name)
-        elif isinstance(source, pd.DataFrame):
-            return self.register(pa.Table.from_pandas(source), table_name, **kwargs)
-        else:
-            raise ValueError("`source` must be either a string or a pathlib.Path")
-
-        if first.startswith(("parquet://", "parq://")) or first.endswith(
-            ("parq", "parquet")
-        ):
-            return self.read_parquet(source, table_name=table_name, **kwargs)
-        elif first.startswith(("csv://", "txt://")) or first.endswith(
-            ("csv", "tsv", "txt")
-        ):
-            return self.read_csv(source, table_name=table_name, **kwargs)
-        else:
-            self._register_failure()
-            return None
-
-    def _register_failure(self):
-        import inspect
-
-        msg = ", ".join(
-            m[0] for m in inspect.getmembers(self) if m[0].startswith("read_")
-        )
-        raise ValueError(
-            f"Cannot infer appropriate read function for input, "
-            f"please call one of {msg} directly"
-        )
-
     def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         # self.con.register_table is broken, so we do this roundabout thing
         # of constructing a datafusion DataFrame, which has a side effect
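The comment above (truncated in this view) describes registering in-memory tables by building a DataFusion DataFrame for its registration side effect. A minimal sketch of that trick, assuming the `from_pydict` API shown in the `do_connect` docstring:

```python
from datafusion import SessionContext

ctx = SessionContext()
# Constructing a DataFrame from a dict registers the table as a side
# effect, sidestepping the broken register_table code path noted above.
_ = ctx.from_pydict({"a": [1, 2, 3]}, "mytable")
df = ctx.table("mytable")  # the table is now queryable by name
```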
6 changes: 4 additions & 2 deletions ibis/backends/datafusion/tests/test_connect.py
@@ -25,13 +25,15 @@ def test_none_config():

 def test_str_config(name_to_path):
     config = {name: str(path) for name, path in name_to_path.items()}
-    conn = ibis.datafusion.connect(config)
+    with pytest.warns(FutureWarning):
+        conn = ibis.datafusion.connect(config)
     assert sorted(conn.list_tables()) == sorted(name_to_path)


 def test_path_config(name_to_path):
     config = name_to_path
-    conn = ibis.datafusion.connect(config)
+    with pytest.warns(FutureWarning):
+        conn = ibis.datafusion.connect(config)
     assert sorted(conn.list_tables()) == sorted(name_to_path)
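Both tests now assert that the deprecated mapping path emits a `FutureWarning`. A self-contained sketch of the pattern; the `warn_deprecated` body below is a stand-in inferred from its call site in this PR, not ibis's actual implementation:

```python
import warnings

import pytest


def warn_deprecated(name: str, *, as_of: str, instead: str) -> None:
    # Stand-in for ibis.util.warn_deprecated, inferred from its call site.
    warnings.warn(
        f"{name} is deprecated as of v{as_of}; {instead}",
        FutureWarning,
        stacklevel=2,
    )


def test_mapping_config_warns():
    with pytest.warns(FutureWarning, match="v10.0"):
        warn_deprecated(
            "Passing a mapping of table names to files",
            as_of="10.0",
            instead="use the explicit `read_*` methods instead.",
        )
```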


72 changes: 0 additions & 72 deletions ibis/backends/datafusion/tests/test_register.py

This file was deleted.

72 changes: 0 additions & 72 deletions ibis/backends/duckdb/__init__.py
@@ -31,7 +31,6 @@
 from ibis.backends.sql.compilers.base import STAR, AlterTable, C, RenameTable
 from ibis.common.dispatch import lazy_singledispatch
 from ibis.expr.operations.udf import InputType
-from ibis.util import deprecated

 if TYPE_CHECKING:
     from collections.abc import Iterable, Mapping, MutableMapping, Sequence
@@ -483,77 +482,6 @@ def drop_database(
         with self._safe_raw_sql(sge.Drop(this=name, kind="SCHEMA", replace=force)):
             pass

-    @deprecated(
-        as_of="9.1",
-        instead="use the explicit `read_*` method for the filetype you are trying to read, e.g., read_parquet, read_csv, etc.",
-    )
-    def register(
-        self,
-        source: str | Path | Any,
-        table_name: str | None = None,
-        **kwargs: Any,
-    ) -> ir.Table:
-        """Register a data source as a table in the current database.
-
-        Parameters
-        ----------
-        source
-            The data source(s). May be a path to a file or directory of
-            parquet/csv files, an iterable of parquet or CSV files, a pandas
-            dataframe, a pyarrow table or dataset, or a postgres URI.
-        table_name
-            An optional name to use for the created table. This defaults to a
-            sequentially generated name.
-        **kwargs
-            Additional keyword arguments passed to DuckDB loading functions for
-            CSV or parquet. See https://duckdb.org/docs/data/csv and
-            https://duckdb.org/docs/data/parquet for more information.
-
-        Returns
-        -------
-        ir.Table
-            The just-registered table
-
-        """
-
-        if isinstance(source, (str, Path)):
-            first = str(source)
-        elif isinstance(source, (list, tuple)):
-            first = source[0]
-        else:
-            try:
-                return self.read_in_memory(source, table_name=table_name, **kwargs)
-            except (duckdb.InvalidInputException, NameError):
-                self._register_failure()
-
-        if first.startswith(("parquet://", "parq://")) or first.endswith(
-            ("parq", "parquet")
-        ):
-            return self.read_parquet(source, table_name=table_name, **kwargs)
-        elif first.startswith(
-            ("csv://", "csv.gz://", "txt://", "txt.gz://")
-        ) or first.endswith(("csv", "csv.gz", "tsv", "tsv.gz", "txt", "txt.gz")):
-            return self.read_csv(source, table_name=table_name, **kwargs)
-        elif first.startswith(("postgres://", "postgresql://")):
-            return self.read_postgres(source, table_name=table_name, **kwargs)
-        elif first.startswith("sqlite://"):
-            return self.read_sqlite(
-                first[len("sqlite://") :], table_name=table_name, **kwargs
-            )
-        else:
-            self._register_failure()  # noqa: RET503
-
-    def _register_failure(self):
-        import inspect
-
-        msg = ", ".join(
-            name for name, _ in inspect.getmembers(self) if name.startswith("read_")
-        )
-        raise ValueError(
-            f"Cannot infer appropriate read function for input, "
-            f"please call one of {msg} directly"
-        )
-
     @util.experimental
     def read_json(
         self,
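With `register` removed from the DuckDB backend as well, each source kind maps to a dedicated reader. A hypothetical migration sketch (file names and URIs are illustrative):

```python
import ibis

con = ibis.duckdb.connect()

# was: con.register("parquet://trips.parquet", "trips")
trips = con.read_parquet("trips.parquet", table_name="trips")

# was: con.register("logs.csv.gz", "logs")
logs = con.read_csv("logs.csv.gz", table_name="logs")

# was: con.register("sqlite://app.db", "t")
t = con.read_sqlite("app.db", table_name="t")

# was: con.register("postgres://user:pass@host/db", "pg_table")
pg = con.read_postgres("postgres://user:pass@host/db", table_name="pg_table")
```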
@@ -229,20 +229,6 @@ def test_read_sqlite_no_table_name(con, tmp_path):
         con.read_sqlite(path)


-@pytest.mark.xfail(
-    LINUX and SANDBOXED,
-    reason="nix on linux cannot download duckdb extensions or data due to sandboxing",
-    raises=duckdb.IOException,
-)
-def test_register_sqlite(con, tmp_path):
-    path = tmp_path / "test.db"
-    sqlite_con = sqlite3.connect(str(path))
-    sqlite_con.execute("CREATE TABLE t AS SELECT 1 a UNION SELECT 2 UNION SELECT 3")
-    with pytest.warns(FutureWarning, match="v9.1"):
-        ft = con.register(f"sqlite://{path}", "t")
-    assert ft.count().execute()
-
-
 # Because we create a new connection and the test requires loading/installing a
 # DuckDB extension, we need to xfail these on Nix.
 @pytest.mark.xfail(