chore!: remove pyarrow-based file reader (#3587)
BREAKING CHANGE: removes reading via PyArrow, the `PythonStorageConfig` type and
the `StorageConfig.python` function, and `daft.table.schema_inference`, and
refactors `StorageConfig` and `NativeStorageConfig` into a single type.
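
A minimal before/after sketch of the storage config migration, based on the call
sites updated in this diff (the `IOConfig` value is a placeholder):

```python
from daft.daft import IOConfig, StorageConfig

io_config = IOConfig()  # placeholder; fill in S3/GCS/Azure settings as needed

# Before this change (both paths removed):
#   StorageConfig.native(NativeStorageConfig(True, io_config))
#   StorageConfig.python(PythonStorageConfig(io_config))

# After this change there is a single constructor: (multithreaded_io, io_config)
storage_config = StorageConfig(True, io_config)
```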

Also includes two small fixes:
- `Schema` struct equality now also depends on the ordering of the fields
- The monotonically increasing ID column is now always inserted as the first
column of the schema

todo:
- [x] refactor StorageConfig and NativeStorageConfig into one struct
- [x] clean up table_io.py
kevinzwang authored Dec 18, 2024
1 parent 6602502 commit 3a3707a
Showing 41 changed files with 467 additions and 1,391 deletions.
31 changes: 4 additions & 27 deletions daft/daft/__init__.pyi
@@ -586,37 +586,14 @@ class IOConfig:
"""Replaces values if provided, returning a new IOConfig."""
...

class NativeStorageConfig:
"""Storage configuration for the Rust-native I/O layer."""
class StorageConfig:
"""Configuration for interacting with a particular storage backend."""

# Whether or not to use a multithreaded tokio runtime for processing I/O
multithreaded_io: bool
io_config: IOConfig

def __init__(self, multithreaded_io: bool, io_config: IOConfig): ...

class PythonStorageConfig:
"""Storage configuration for the legacy Python I/O layer."""

io_config: IOConfig

def __init__(self, io_config: IOConfig): ...

class StorageConfig:
"""Configuration for interacting with a particular storage backend, using a particular I/O layer implementation."""

@staticmethod
def native(config: NativeStorageConfig) -> StorageConfig:
"""Create from a native storage config."""
...

@staticmethod
def python(config: PythonStorageConfig) -> StorageConfig:
"""Create from a Python storage config."""
...

@property
def config(self) -> NativeStorageConfig | PythonStorageConfig: ...
def __init__(self, multithreaded_io: bool, io_config: IOConfig | None): ...

class ScanTask:
"""A batch of scan tasks for reading data from an external source."""
@@ -650,8 +627,8 @@ class ScanTask:
url: str,
file_format: FileFormatConfig,
schema: PySchema,
num_rows: int | None,
storage_config: StorageConfig,
num_rows: int | None,
size_bytes: int | None,
pushdowns: Pushdowns | None,
stats: PyTable | None,
2 changes: 1 addition & 1 deletion daft/delta_lake/delta_lake_scan.py
@@ -41,7 +41,7 @@ def __init__(
# Thus, if we don't detect any credentials being available, we attempt to detect it from the environment using our Daft credentials chain.
#
# See: https://github.com/delta-io/delta-rs/issues/2117
deltalake_sdk_io_config = storage_config.config.io_config
deltalake_sdk_io_config = storage_config.io_config
scheme = urlparse(table_uri).scheme
if scheme == "s3" or scheme == "s3a":
# Try to get region from boto3
38 changes: 13 additions & 25 deletions daft/expressions/expressions.py
@@ -1333,7 +1333,6 @@ def download(
max_connections: int = 32,
on_error: Literal["raise", "null"] = "raise",
io_config: IOConfig | None = None,
use_native_downloader: bool = True,
) -> Expression:
"""Treats each string as a URL, and downloads the bytes contents as a bytes column.
@@ -1351,37 +1350,26 @@ def download(
the error but fallback to a Null value. Defaults to "raise".
io_config: IOConfig to use when accessing remote storage. Note that the S3Config's `max_connections` parameter will be overridden
with `max_connections` that is passed in as a kwarg.
use_native_downloader (bool): Use the native downloader rather than python based one.
Defaults to True.
Returns:
Expression: a Binary expression which is the bytes contents of the URL, or None if an error occurred during download
"""
if use_native_downloader:
raise_on_error = False
if on_error == "raise":
raise_on_error = True
elif on_error == "null":
raise_on_error = False
if on_error == "raise":
raise_on_error = True
elif on_error == "null":
raise_on_error = False
else:
raise NotImplementedError(f"Unimplemented on_error option: {on_error}.")

if not (isinstance(max_connections, int) and max_connections > 0):
raise ValueError(f"Invalid value for `max_connections`: {max_connections}")

multi_thread = ExpressionUrlNamespace._should_use_multithreading_tokio_runtime()
io_config = ExpressionUrlNamespace._override_io_config_max_connections(max_connections, io_config)
return Expression._from_pyexpr(
_url_download(self._expr, max_connections, raise_on_error, multi_thread, io_config)
)
else:
from daft.udf_library import url_udfs
raise NotImplementedError(f"Unimplemented on_error option: {on_error}.")

return url_udfs.download_udf(
Expression._from_pyexpr(self._expr),
max_worker_threads=max_connections,
on_error=on_error,
)
if not (isinstance(max_connections, int) and max_connections > 0):
raise ValueError(f"Invalid value for `max_connections`: {max_connections}")

multi_thread = ExpressionUrlNamespace._should_use_multithreading_tokio_runtime()
io_config = ExpressionUrlNamespace._override_io_config_max_connections(max_connections, io_config)
return Expression._from_pyexpr(
_url_download(self._expr, max_connections, raise_on_error, multi_thread, io_config)
)

def upload(
self,
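
With `use_native_downloader` removed from `Expression.url.download`, downloads
always go through the native path; `on_error` still controls whether failures
raise or become nulls. A minimal usage sketch (column name and URLs are
hypothetical):

```python
import daft

df = daft.from_pydict({"urls": ["https://example.com/a.bin", "https://example.com/b.bin"]})

# Fetch each URL as a binary column; failed downloads become nulls instead of raising.
df = df.with_column("data", df["urls"].url.download(on_error="null", max_connections=8))
df.collect()
```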
2 changes: 1 addition & 1 deletion daft/hudi/hudi_scan.py
@@ -25,7 +25,7 @@
class HudiScanOperator(ScanOperator):
def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
super().__init__()
resolved_path, self._resolved_fs = _resolve_paths_and_filesystem(table_uri, storage_config.config.io_config)
resolved_path, self._resolved_fs = _resolve_paths_and_filesystem(table_uri, storage_config.io_config)
self._table = HudiTable(table_uri, self._resolved_fs, resolved_path[0])
self._storage_config = storage_config
self._schema = Schema.from_pyarrow_schema(self._table.schema)
11 changes: 2 additions & 9 deletions daft/io/_csv.py
@@ -8,8 +8,6 @@
CsvSourceConfig,
FileFormatConfig,
IOConfig,
NativeStorageConfig,
PythonStorageConfig,
StorageConfig,
)
from daft.dataframe import DataFrame
@@ -32,7 +30,6 @@ def read_csv(
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
hive_partitioning: bool = False,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
_chunk_size: Optional[int] = None,
@@ -58,8 +55,6 @@
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
hive_partitioning: Whether to infer hive_style partitions from file paths and include them as columns in the Dataframe. Defaults to False.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
returns:
DataFrame: parsed DataFrame
@@ -91,10 +86,8 @@ def read_csv(
chunk_size=_chunk_size,
)
file_format_config = FileFormatConfig.from_csv_config(csv_config)
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(io_config=io_config))
storage_config = StorageConfig(True, io_config)

builder = get_tabular_files_scan(
path=path,
infer_schema=infer_schema,
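
With the `use_native_downloader` flag gone from `read_csv`, remote reads are
configured solely through `io_config`. A sketch with a hypothetical bucket; the
`S3Config` fields shown are assumptions about a typical setup:

```python
import daft
from daft.io import IOConfig, S3Config

# Hypothetical anonymous-access bucket, shown only to illustrate passing an IOConfig.
io_config = IOConfig(s3=S3Config(region_name="us-west-2", anonymous=True))
df = daft.read_csv("s3://my-bucket/data/*.csv", io_config=io_config)
```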
6 changes: 3 additions & 3 deletions daft/io/_deltalake.py
@@ -4,7 +4,7 @@

from daft import context
from daft.api_annotations import PublicAPI
from daft.daft import IOConfig, NativeStorageConfig, ScanOperatorHandle, StorageConfig
from daft.daft import IOConfig, ScanOperatorHandle, StorageConfig
from daft.dataframe import DataFrame
from daft.dependencies import unity_catalog
from daft.io.catalog import DataCatalogTable
@@ -60,7 +60,7 @@ def read_deltalake(
)

io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))
storage_config = StorageConfig(multithreaded_io, io_config)

if isinstance(table, str):
table_uri = table
@@ -72,7 +72,7 @@
# Override the storage_config with the one provided by Unity catalog
table_io_config = table.io_config
if table_io_config is not None:
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, table_io_config))
storage_config = StorageConfig(multithreaded_io, table_io_config)
else:
raise ValueError(
f"table argument must be a table URI string, DataCatalogTable or UnityCatalogTable instance, but got: {type(table)}, {table}"
4 changes: 2 additions & 2 deletions daft/io/_hudi.py
@@ -4,7 +4,7 @@

from daft import context
from daft.api_annotations import PublicAPI
from daft.daft import IOConfig, NativeStorageConfig, ScanOperatorHandle, StorageConfig
from daft.daft import IOConfig, ScanOperatorHandle, StorageConfig
from daft.dataframe import DataFrame
from daft.logical.builder import LogicalPlanBuilder

@@ -33,7 +33,7 @@ def read_hudi(
io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config

multithreaded_io = context.get_context().get_or_create_runner().name != "ray"
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))
storage_config = StorageConfig(multithreaded_io, io_config)

hudi_operator = HudiScanOperator(table_uri, storage_config=storage_config)

4 changes: 2 additions & 2 deletions daft/io/_iceberg.py
@@ -4,7 +4,7 @@

from daft import context
from daft.api_annotations import PublicAPI
from daft.daft import IOConfig, NativeStorageConfig, ScanOperatorHandle, StorageConfig
from daft.daft import IOConfig, ScanOperatorHandle, StorageConfig
from daft.dataframe import DataFrame
from daft.logical.builder import LogicalPlanBuilder

@@ -123,7 +123,7 @@ def read_iceberg(
io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config

multithreaded_io = context.get_context().get_or_create_runner().name != "ray"
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))
storage_config = StorageConfig(multithreaded_io, io_config)

iceberg_operator = IcebergScanOperator(pyiceberg_table, snapshot_id=snapshot_id, storage_config=storage_config)

11 changes: 2 additions & 9 deletions daft/io/_json.py
@@ -8,8 +8,6 @@
FileFormatConfig,
IOConfig,
JsonSourceConfig,
NativeStorageConfig,
PythonStorageConfig,
StorageConfig,
)
from daft.dataframe import DataFrame
@@ -25,7 +23,6 @@ def read_json(
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
hive_partitioning: bool = False,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
_chunk_size: Optional[int] = None,
@@ -45,8 +42,6 @@
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
hive_partitioning: Whether to infer hive_style partitions from file paths and include them as columns in the Dataframe. Defaults to False.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
returns:
DataFrame: parsed DataFrame
@@ -68,10 +63,8 @@

json_config = JsonSourceConfig(_buffer_size, _chunk_size)
file_format_config = FileFormatConfig.from_json_config(json_config)
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(io_config=io_config))
storage_config = StorageConfig(True, io_config)

builder = get_tabular_files_scan(
path=path,
infer_schema=infer_schema,
9 changes: 1 addition & 8 deletions daft/io/_parquet.py
@@ -7,9 +7,7 @@
from daft.daft import (
FileFormatConfig,
IOConfig,
NativeStorageConfig,
ParquetSourceConfig,
PythonStorageConfig,
StorageConfig,
)
from daft.dataframe import DataFrame
@@ -26,7 +24,6 @@ def read_parquet(
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
hive_partitioning: bool = False,
use_native_downloader: bool = True,
coerce_int96_timestamp_unit: Optional[Union[str, TimeUnit]] = None,
schema_hints: Optional[Dict[str, DataType]] = None,
_multithreaded_io: Optional[bool] = None,
@@ -49,7 +46,6 @@
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
hive_partitioning: Whether to infer hive_style partitions from file paths and include them as columns in the Dataframe. Defaults to False.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet.
coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
the amount of system resources (number of connections and thread contention) when running in the Ray runner.
@@ -87,10 +83,7 @@
file_format_config = FileFormatConfig.from_parquet_config(
ParquetSourceConfig(coerce_int96_timestamp_unit=pytimeunit, row_groups=row_groups, chunk_size=_chunk_size)
)
if use_native_downloader:
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(io_config=io_config))
storage_config = StorageConfig(multithreaded_io, io_config)

builder = get_tabular_files_scan(
path=path,
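
`read_parquet` loses the flag in the same way; a sketch of the remaining options
this hunk touches (path and config values are hypothetical):

```python
import daft
from daft.io import IOConfig, S3Config

io_config = IOConfig(s3=S3Config(anonymous=True))  # placeholder credentials
df = daft.read_parquet(
    "s3://my-bucket/data/*.parquet",     # hypothetical path
    io_config=io_config,
    coerce_int96_timestamp_unit="ms",    # still available for legacy Int96 timestamps
)
```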
4 changes: 2 additions & 2 deletions daft/io/_sql.py
@@ -5,7 +5,7 @@

from daft import context, from_pydict
from daft.api_annotations import PublicAPI
from daft.daft import PythonStorageConfig, ScanOperatorHandle, StorageConfig
from daft.daft import ScanOperatorHandle, StorageConfig
from daft.dataframe import DataFrame
from daft.datatype import DataType
from daft.logical.builder import LogicalPlanBuilder
@@ -94,7 +94,7 @@ def read_sql(
)

io_config = context.get_context().daft_planning_config.default_io_config
storage_config = StorageConfig.python(PythonStorageConfig(io_config))
storage_config = StorageConfig(True, io_config)

sql_conn = SQLConnection.from_url(conn) if isinstance(conn, str) else SQLConnection.from_connection_factory(conn)
sql_operator = SQLScanOperator(
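
Note the behavior change in this hunk: `read_sql` previously built a
`PythonStorageConfig`, and now uses the unified `StorageConfig` with
`multithreaded_io=True`. A minimal usage sketch (the query and connection URL
are hypothetical):

```python
import daft

# Any connection URL accepted by SQLConnection.from_url works; SQLite shown for illustration.
df = daft.read_sql("SELECT * FROM events", "sqlite:///my_database.db")
df.show()
```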
2 changes: 1 addition & 1 deletion daft/sql/sql_scan.py
@@ -271,8 +271,8 @@ def _construct_scan_task(
url=self.conn.url,
file_format=file_format_config,
schema=self._schema._schema,
num_rows=num_rows,
storage_config=self.storage_config,
num_rows=num_rows,
size_bytes=size_bytes,
pushdowns=pushdowns if not apply_pushdowns_to_sql else None,
stats=stats,