Skip to content

Commit

Permalink
feat(dask): add read_csv and read_parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
gforsyth authored and cpcloud committed Oct 31, 2023
1 parent 20fd120 commit e9260af
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
58 changes: 58 additions & 0 deletions ibis/backends/dask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis import util
from ibis.backends.dask.core import execute_and_reset
from ibis.backends.pandas import BasePandasBackend
from ibis.backends.pandas.core import _apply_schema
from ibis.formats.pandas import DaskData

if TYPE_CHECKING:
import pathlib
from collections.abc import Mapping, MutableMapping

# Make sure that the pandas backend options have been loaded
Expand Down Expand Up @@ -114,6 +116,62 @@ def compile(

return execute_and_reset(query.op(), params=params, **kwargs)

def read_csv(
self, source: str | pathlib.Path, table_name: str | None = None, **kwargs: Any
):
"""Register a CSV file as a table in the current session.
Parameters
----------
source
The data source. Can be a local or remote file, pathlike objects
also accepted.
table_name
An optional name to use for the created table. This defaults to
a generated name.
**kwargs
Additional keyword arguments passed to Pandas loading function.
See https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
for more information.
Returns
-------
ir.Table
The just-registered table
"""
table_name = table_name or util.gen_name("read_csv")
df = dd.read_csv(source, **kwargs)
self.dictionary[table_name] = df
return self.table(table_name)

def read_parquet(
self, source: str | pathlib.Path, table_name: str | None = None, **kwargs: Any
):
"""Register a parquet file as a table in the current session.
Parameters
----------
source
The data source(s). May be a path to a file, an iterable of files,
or directory of parquet files.
table_name
An optional name to use for the created table. This defaults to
a generated name.
**kwargs
Additional keyword arguments passed to Pandas loading function.
See https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html
for more information.
Returns
-------
ir.Table
The just-registered table
"""
table_name = table_name or util.gen_name("read_parquet")
df = dd.read_parquet(source, **kwargs)
self.dictionary[table_name] = df
return self.table(table_name)

def table(self, name: str, schema: sch.Schema | None = None):
df = self.dictionary[name]
schema = schema or self.schemas.get(name, None)
Expand Down
2 changes: 0 additions & 2 deletions ibis/backends/tests/test_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,6 @@ def ft_data(data_dir):
@pytest.mark.notyet(
[
"bigquery",
"dask",
"impala",
"mssql",
"mysql",
Expand Down Expand Up @@ -462,7 +461,6 @@ def test_read_parquet_glob(con, tmp_path, ft_data):
@pytest.mark.notyet(
[
"bigquery",
"dask",
"impala",
"mssql",
"mysql",
Expand Down

0 comments on commit e9260af

Please sign in to comment.