diff --git a/libraries/dagster-delta-polars/dagster_delta_polars/deltalake_polars_type_handler.py b/libraries/dagster-delta-polars/dagster_delta_polars/deltalake_polars_type_handler.py index 8eda082..7770aad 100644 --- a/libraries/dagster-delta-polars/dagster_delta_polars/deltalake_polars_type_handler.py +++ b/libraries/dagster-delta-polars/dagster_delta_polars/deltalake_polars_type_handler.py @@ -58,11 +58,17 @@ def load_input( """Loads the input as a Polars DataFrame or LazyFrame.""" metadata = context.metadata if context.metadata is not None else {} date_format = extract_date_format_from_partition_definition(context) + parquet_read_options = ( + context.resource_config.get("parquet_read_options", None) + if context.resource_config is not None + else None + ) dataset = _table_reader( table_slice, connection, version=metadata.get("table_version"), date_format=date_format, + parquet_read_options=parquet_read_options, ) if table_slice.columns is not None: diff --git a/libraries/dagster-delta-polars/dagster_delta_polars/lakefs_io/deltalake_polars_lakefs_type_handler.py b/libraries/dagster-delta-polars/dagster_delta_polars/lakefs_io/deltalake_polars_lakefs_type_handler.py index e2eeda6..1a80ade 100644 --- a/libraries/dagster-delta-polars/dagster_delta_polars/lakefs_io/deltalake_polars_lakefs_type_handler.py +++ b/libraries/dagster-delta-polars/dagster_delta_polars/lakefs_io/deltalake_polars_lakefs_type_handler.py @@ -125,7 +125,6 @@ def handle_output( table_uri=new_table_uri, storage_options=connection.storage_options, table_config=connection.table_config, - parquet_read_options=connection.parquet_read_options, ) super().handle_output(context, table_slice, obj, new_connection) self.repository.branch(step_branch_name).commit( diff --git a/libraries/dagster-delta/dagster_delta/handler.py b/libraries/dagster-delta/dagster_delta/handler.py index a217fdf..0d91cb5 100644 --- a/libraries/dagster-delta/dagster_delta/handler.py +++ b/libraries/dagster-delta/dagster_delta/handler.py @@ -297,7 +297,12 @@ def load_input( connection: TableConnection, ) -> T: """Loads the input as a pyarrow Table or RecordBatchReader.""" - dataset = _table_reader(table_slice, connection) + parquet_read_options = ( + context.resource_config.get("parquet_read_options", None) + if context.resource_config is not None + else None + ) + dataset = _table_reader(table_slice, connection, parquet_read_options=parquet_read_options) if context.dagster_type.typing_type == ds.Dataset: if table_slice.columns is not None: @@ -512,6 +517,7 @@ def _table_reader( connection: TableConnection, version: Optional[int] = None, date_format: Optional[dict[str, str]] = None, + parquet_read_options: Optional[ds.ParquetReadOptions] = None, ) -> ds.Dataset: table = DeltaTable( table_uri=connection.table_uri, @@ -534,7 +540,7 @@ def _table_reader( partition_expr = filters_to_expression([partition_filters]) logger.debug("Dataset input predicate %s", partition_expr) - dataset = table.to_pyarrow_dataset(parquet_read_options=connection.parquet_read_options) + dataset = table.to_pyarrow_dataset(parquet_read_options=parquet_read_options) if partition_expr is not None: dataset = dataset.filter(expression=partition_expr) diff --git a/libraries/dagster-delta/dagster_delta/io_manager.py b/libraries/dagster-delta/dagster_delta/io_manager.py index f41d37e..ba84149 100644 --- a/libraries/dagster-delta/dagster_delta/io_manager.py +++ b/libraries/dagster-delta/dagster_delta/io_manager.py @@ -37,7 +37,6 @@ class TableConnection: # noqa: D101 table_uri: str storage_options: dict[str, str] table_config: Optional[dict[str, str]] - parquet_read_options: Optional[ds.ParquetReadOptions] class _StorageOptionsConfig(TypedDict, total=False): @@ -250,7 +249,6 @@ def connect( # noqa: D102 resource_config = cast(_DeltaTableIOManagerResourceConfig, context.resource_config) root_uri = resource_config["root_uri"].rstrip("/") storage_options = resource_config["storage_options"] - parquet_read_options = resource_config.get("parquet_read_options", None) if "local" in storage_options: storage_options = storage_options["local"] @@ -281,7 +279,6 @@ def connect( # noqa: D102 table_uri=table_uri, storage_options=storage_options or {}, table_config=table_config, - parquet_read_options=parquet_read_options, ) yield conn