PR Feedback - don't store parquet read options in table connection
Stefan Verbruggen committed Aug 23, 2024
1 parent 9dc8cf3 commit edc6a04
Showing 4 changed files with 14 additions and 6 deletions.

@@ -58,11 +58,17 @@ def load_input(
         """Loads the input as a Polars DataFrame or LazyFrame."""
         metadata = context.metadata if context.metadata is not None else {}
         date_format = extract_date_format_from_partition_definition(context)
+        parquet_read_options = (
+            context.resource_config.get("parquet_read_options", None)
+            if context.resource_config is not None
+            else None
+        )
         dataset = _table_reader(
             table_slice,
             connection,
             version=metadata.get("table_version"),
             date_format=date_format,
+            parquet_read_options=parquet_read_options,
         )
 
         if table_slice.columns is not None:

@@ -125,7 +125,6 @@ def handle_output(
             table_uri=new_table_uri,
             storage_options=connection.storage_options,
             table_config=connection.table_config,
-            parquet_read_options=connection.parquet_read_options,
         )
         super().handle_output(context, table_slice, obj, new_connection)
         self.repository.branch(step_branch_name).commit(

libraries/dagster-delta/dagster_delta/handler.py (8 additions, 2 deletions)

@@ -297,7 +297,12 @@ def load_input(
         connection: TableConnection,
     ) -> T:
         """Loads the input as a pyarrow Table or RecordBatchReader."""
-        dataset = _table_reader(table_slice, connection)
+        parquet_read_options = (
+            context.resource_config.get("parquet_read_options", None)
+            if context.resource_config is not None
+            else None
+        )
+        dataset = _table_reader(table_slice, connection, parquet_read_options=parquet_read_options)
 
         if context.dagster_type.typing_type == ds.Dataset:
             if table_slice.columns is not None:

@@ -512,6 +517,7 @@ def _table_reader(
     connection: TableConnection,
     version: Optional[int] = None,
     date_format: Optional[dict[str, str]] = None,
+    parquet_read_options: Optional[ds.ParquetReadOptions] = None,
 ) -> ds.Dataset:
     table = DeltaTable(
         table_uri=connection.table_uri,

@@ -534,7 +540,7 @@
         partition_expr = filters_to_expression([partition_filters])
 
     logger.debug("Dataset input predicate %s", partition_expr)
-    dataset = table.to_pyarrow_dataset(parquet_read_options=connection.parquet_read_options)
+    dataset = table.to_pyarrow_dataset(parquet_read_options=parquet_read_options)
     if partition_expr is not None:
         dataset = dataset.filter(expression=partition_expr)
 
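
For context on the threaded-through parameter: parquet_read_options is a pyarrow.dataset.ParquetReadOptions, which deltalake's DeltaTable.to_pyarrow_dataset accepts directly. A minimal sketch of the resulting call path (the table URI and the chosen option are illustrative, not taken from this repository):

import pyarrow.dataset as ds
from deltalake import DeltaTable

# Illustrative option: read INT96 timestamps at millisecond resolution.
opts = ds.ParquetReadOptions(coerce_int96_timestamp_unit="ms")

# After this commit the options travel with each read call instead of
# being stored on the shared TableConnection.
table = DeltaTable(table_uri="path/to/table", storage_options={})
dataset = table.to_pyarrow_dataset(parquet_read_options=opts)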

libraries/dagster-delta/dagster_delta/io_manager.py (0 additions, 3 deletions)

@@ -37,7 +37,6 @@ class TableConnection: # noqa: D101
     table_uri: str
     storage_options: dict[str, str]
     table_config: Optional[dict[str, str]]
-    parquet_read_options: Optional[ds.ParquetReadOptions]
 
 
 class _StorageOptionsConfig(TypedDict, total=False):

@@ -250,7 +249,6 @@ def connect( # noqa: D102
         resource_config = cast(_DeltaTableIOManagerResourceConfig, context.resource_config)
         root_uri = resource_config["root_uri"].rstrip("/")
         storage_options = resource_config["storage_options"]
-        parquet_read_options = resource_config.get("parquet_read_options", None)
 
         if "local" in storage_options:
             storage_options = storage_options["local"]

@@ -281,7 +279,6 @@
             table_uri=table_uri,
             storage_options=storage_options or {},
             table_config=table_config,
-            parquet_read_options=parquet_read_options,
        )
 
        yield conn
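
Net effect on the connection object: TableConnection now carries only the table location and storage details, while per-read Parquet options are resolved from resource_config inside load_input. Reconstructed from the diff above (the @dataclass decorator is an assumption suggested by the field-only body; it is not shown in this diff):

from dataclasses import dataclass
from typing import Optional

@dataclass
class TableConnection:
    table_uri: str
    storage_options: dict[str, str]
    table_config: Optional[dict[str, str]]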
