Commit 22e24df

[SPARK-53812][SDP] Refactor DefineDataset and DefineFlow protos to group related properties and future-proof
### What changes were proposed in this pull request?

- In `DefineDataset`, pulls out all the properties that are table-specific into their own sub-message.
- In `DefineFlow`, pulls out the `relation` property into its own sub-message.

### Why are the changes needed?

The DefineDataset and DefineFlow Spark Connect protos are moshpits of properties that could be refactored into a more coherent structure:

- In `DefineDataset`, there is a set of properties that are only relevant to tables (not views). They can be grouped into their own sub-message.
- In `DefineFlow`, the `relation` property refers to flows that write the results of a relation to a target table. In the future, we may want to introduce additional flow types that mutate the target table in different ways.

### Does this PR introduce _any_ user-facing change?

No, these protos haven't been shipped yet.

### How was this patch tested?

Updated existing tests.

### Was this patch authored or co-authored using generative AI tooling?

Closes #52532 from sryza/table-flow-details.

Lead-authored-by: Sandy Ryza <[email protected]>
Co-authored-by: Sandy Ryza <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
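For illustration, here is a minimal sketch of how a client would build the regrouped messages after this change. It is based only on the field names visible in the diff below; the import paths, placeholder values, and the empty `Relation` are assumptions for the sketch, not code from this PR.

```python
from pyspark.sql.connect.proto import pipelines_pb2 as pb2
from pyspark.sql.connect.proto import relations_pb2

# Table-only properties now live in a DefineDataset.TableDetails sub-message
# rather than as top-level DefineDataset fields.
table_details = pb2.PipelineCommand.DefineDataset.TableDetails(
    table_properties={"some.property": "value"},  # placeholder property
    partition_cols=["date"],
    format="parquet",
    schema_string="id BIGINT, date DATE",
)
define_dataset = pb2.PipelineCommand.DefineDataset(
    dataflow_graph_id="graph-id",  # placeholder id
    dataset_name="my_table",
    dataset_type=pb2.DatasetType.MATERIALIZED_VIEW,
    table_details=table_details,
)

# The relation a flow writes is now wrapped in DefineFlow.WriteRelationFlowDetails,
# leaving room for future flow types that mutate the target table differently.
flow_details = pb2.PipelineCommand.DefineFlow.WriteRelationFlowDetails(
    relation=relations_pb2.Relation(),  # an empty relation, just for illustration
)
define_flow = pb2.PipelineCommand.DefineFlow(
    dataflow_graph_id="graph-id",
    flow_name="my_table_flow",
    target_dataset_name="my_table",
    relation_flow_details=flow_details,
)
```

Construction changes of exactly this shape are what the first file in the diff below applies.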
1 parent d6f713e commit 22e24df

File tree

7 files changed: +358 −214 lines


python/pyspark/pipelines/spark_connect_graph_element_registry.py

Lines changed: 17 additions & 17 deletions
@@ -47,9 +47,6 @@ def __init__(self, spark: SparkSession, dataflow_graph_id: str) -> None:
 
     def register_dataset(self, dataset: Dataset) -> None:
         if isinstance(dataset, Table):
-            table_properties = dataset.table_properties
-            partition_cols = dataset.partition_cols
-
             if isinstance(dataset.schema, str):
                 schema_string = dataset.schema
                 schema_data_type = None
@@ -60,7 +57,15 @@ def register_dataset(self, dataset: Dataset) -> None:
                 schema_string = None
                 schema_data_type = None
 
-            format = dataset.format
+            table_details = pb2.PipelineCommand.DefineDataset.TableDetails(
+                table_properties=dataset.table_properties,
+                partition_cols=dataset.partition_cols,
+                format=dataset.format,
+                # Even though schema_string is not required, the generated Python code seems to
+                # erroneously think it is required.
+                schema_string=schema_string,  # type: ignore[arg-type]
+                schema_data_type=schema_data_type,
+            )
 
             if isinstance(dataset, MaterializedView):
                 dataset_type = pb2.DatasetType.MATERIALIZED_VIEW
@@ -72,12 +77,8 @@ def register_dataset(self, dataset: Dataset) -> None:
                     messageParameters={"dataset_type": type(dataset).__name__},
                 )
         elif isinstance(dataset, TemporaryView):
-            table_properties = None
-            partition_cols = None
-            schema_string = None
-            schema_data_type = None
-            format = None
             dataset_type = pb2.DatasetType.TEMPORARY_VIEW
+            table_details = None
         else:
             raise PySparkTypeError(
                 errorClass="UNSUPPORTED_PIPELINES_DATASET_TYPE",
@@ -89,15 +90,10 @@ def register_dataset(self, dataset: Dataset) -> None:
             dataset_name=dataset.name,
             dataset_type=dataset_type,
             comment=dataset.comment,
-            table_properties=table_properties,
-            partition_cols=partition_cols,
-            format=format,
+            table_details=table_details,
             source_code_location=source_code_location_to_proto(dataset.source_code_location),
-            # Even though schema_string is not required, the generated Python code seems to
-            # erroneously think it is required.
-            schema_string=schema_string,  # type: ignore[arg-type]
-            schema_data_type=schema_data_type,
         )
+
         command = pb2.Command()
         command.pipeline_command.define_dataset.CopyFrom(inner_command)
         self._client.execute_command(command)
@@ -107,11 +103,15 @@ def register_flow(self, flow: Flow) -> None:
         df = flow.func()
         relation = cast(ConnectDataFrame, df)._plan.plan(self._client)
 
+        relation_flow_details = pb2.PipelineCommand.DefineFlow.WriteRelationFlowDetails(
+            relation=relation,
+        )
+
         inner_command = pb2.PipelineCommand.DefineFlow(
             dataflow_graph_id=self._dataflow_graph_id,
             flow_name=flow.name,
             target_dataset_name=flow.target,
-            relation=relation,
+            relation_flow_details=relation_flow_details,
             sql_conf=flow.spark_conf,
             source_code_location=source_code_location_to_proto(flow.source_code_location),
         )
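On the consuming side, grouping the fields gives readers of these commands a single presence check per concern. A short sketch, assuming standard protobuf Python semantics for the generated classes; the helper functions are hypothetical and not part of this commit:

```python
from pyspark.sql.connect.proto import pipelines_pb2 as pb2


def describe_define_dataset(cmd: pb2.PipelineCommand.DefineDataset) -> str:
    # In the diff above, only temporary views leave table_details unset, so the
    # presence of the sub-message tells tables and temporary views apart.
    if cmd.HasField("table_details"):
        cols = list(cmd.table_details.partition_cols)
        return f"table {cmd.dataset_name} partitioned by {cols}"
    return f"view {cmd.dataset_name}"


def describe_define_flow(cmd: pb2.PipelineCommand.DefineFlow) -> str:
    # Today the only flow kind writes a relation to the target; future flow
    # kinds would arrive as new *_flow_details sub-messages to branch on.
    if cmd.HasField("relation_flow_details"):
        return f"flow {cmd.flow_name} writes a relation into {cmd.target_dataset_name}"
    return f"flow {cmd.flow_name} with no relation details"
```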

python/pyspark/sql/connect/proto/pipelines_pb2.py

Lines changed: 60 additions & 47 deletions
Large diffs are not rendered by default.
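Since the regenerated pipelines_pb2.py diff is collapsed, one quick way to confirm the new nesting locally is to print the generated descriptors. A sketch that assumes a PySpark build containing this commit:

```python
from pyspark.sql.connect.proto import pipelines_pb2 as pb2

# List the fields of the two new sub-messages to see which properties moved.
for msg in (
    pb2.PipelineCommand.DefineDataset.TableDetails,
    pb2.PipelineCommand.DefineFlow.WriteRelationFlowDetails,
):
    field_names = [f.name for f in msg.DESCRIPTOR.fields]
    print(msg.DESCRIPTOR.full_name, "->", field_names)
```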
