[SPARK-52810][SDP][SQL] Spark Pipelines CLI Selection Options #51507
Conversation
Fleshing out some thoughts! I haven't looked at the tests yet.
python/pyspark/pipelines/cli.py (Outdated)
if full_refresh_all:
    if full_refresh:
        raise PySparkException(
            errorClass="CONFLICTING_PIPELINE_REFRESH_OPTIONS", messageParameters={}
Thoughts on having sub-error classes for mismatched combinations? Or maybe just pass along which two configs are conflicting as a message parameter?
Added logic to pass along the conflicting option
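For illustration, a minimal sketch of that validation, assuming a hypothetical conflicting_option message parameter (the real error class in this PR may use a different parameter name):

from pyspark.errors import PySparkException

def validate_refresh_args(full_refresh, full_refresh_all, refresh):
    # Hypothetical helper: --full-refresh-all conflicts with the targeted options.
    if full_refresh_all:
        for flag, value in [("--full-refresh", full_refresh), ("--refresh", refresh)]:
            if value:
                raise PySparkException(
                    errorClass="CONFLICTING_PIPELINE_REFRESH_OPTIONS",
                    messageParameters={"conflicting_option": flag},  # assumed key
                )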
python/pyspark/pipelines/cli.py (Outdated)
result = []
for table_list in table_lists:
    result.extend(table_list)
return result if result else None
If result is an empty list, do we still want to return None? Or should we just return the empty list? What is the implication of either here?
Removed this by using the `extend` option in the arg parser to avoid creating a nested list.
"--full-refresh", | ||
type=parse_table_list, | ||
action="append", | ||
help="List of datasets to reset and recompute (comma-separated).", |
Here and below, should we document the default behavior if this arg is not specified at all?
Will `extend` split using commas?
python/pyspark/pipelines/cli.py (Outdated)
run(spec_path=spec_path)
run(
    spec_path=spec_path,
    full_refresh=flatten_table_lists(args.full_refresh),
Why do we need to flatten `args.full_refresh` and `args.refresh`? I thought we defined their types with the `parse_table_list` function, which returns `List[str]`.
This is for the case where the user provides the same arg multiple times, e.g. `--full-refresh "a,b" --full-refresh "c,d"`. Then we will receive a nested list `[["a","b"],["c","d"]]`, so we need to perform a flattening to transform it into a 1D list.
Ah got it, makes sense
If we were to mark this argument field as `extend` rather than `append`, would we still need to do any manual flattening?
Very good point, `extend` creates a 1D list directly.
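As a quick illustration of the difference (assuming `parse_table_list` simply splits on commas, which may not exactly match the PR's implementation):

import argparse

def parse_table_list(value: str):
    # Assumed behavior: split a comma-separated string into a list of names.
    return [name.strip() for name in value.split(",") if name.strip()]

parser = argparse.ArgumentParser()
# append keeps each parsed list as its own element; extend flattens as it accumulates.
parser.add_argument("--full-refresh", type=parse_table_list, action="append")
parser.add_argument("--refresh", type=parse_table_list, action="extend", default=[])

args = parser.parse_args(
    ["--full-refresh", "a,b", "--full-refresh", "c,d", "--refresh", "a,b", "--refresh", "c,d"]
)
print(args.full_refresh)  # [['a', 'b'], ['c', 'd']]
print(args.refresh)       # ['a', 'b', 'c', 'd']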
sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto (Outdated; resolved)
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala (Outdated; resolved)
@@ -224,6 +225,64 @@ private[connect] object PipelinesHandler extends Logging {
      sessionHolder: SessionHolder): Unit = {
    val dataflowGraphId = cmd.getDataflowGraphId
    val graphElementRegistry = DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
Can we extract all this added logic for deducing the full-refresh and regular-refresh table filters into its own function? And then, as part of the Scaladoc, map the expected filter results depending on which combination of full refresh and partial refresh is selected?
Extracted a `createTableFilters` function.
...ipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala (Outdated; resolved)
if (refreshTables.nonEmpty && fullRefreshTables.nonEmpty) {
  // check if there is an intersection between the subset
  val intersection = refreshTableNames.intersect(fullRefreshTableNames)
  if (intersection.nonEmpty) {
    throw new IllegalArgumentException(
      "Datasets specified for refresh and full refresh cannot overlap: " +
        s"${intersection.mkString(", ")}")
  }
}

val fullRefreshTablesFilter: TableFilter = if (fullRefreshAll) {
  AllTables
} else if (fullRefreshTables.nonEmpty) {
  SomeTables(fullRefreshTableNames)
} else {
  NoTables
}

val refreshTablesFilter: TableFilter =
  if (refreshTables.nonEmpty) {
    SomeTables(refreshTableNames)
  } else if (fullRefreshTablesFilter != NoTables) {
    NoTables
  } else {
    AllTables
  }
Just an optional nit, but as a code reader it's difficult for me to reason about the combinations of `fullRefreshTables` and `refreshTables` when reading them as sequential but related validation here.
My suggestion would be to restructure this as a match statement that explicitly handles each combination, e.g.:
(fullRefreshTables, refreshTableNames) match {
  case (Nil, Nil) => ...
  case (fullRefreshTables, Nil) => ...
  case ...
}
Extracted a `createTableFilters` function.
python/pyspark/pipelines/cli.py (Outdated)
@@ -28,7 +28,7 @@
import yaml
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Generator, Mapping, Optional, Sequence
from typing import Any, Generator, Mapping, Optional, Sequence, List
Out of alphabetical order: you may need to run `dev/reformat-python` to format this.
Actually, it didn't reformat this, but I manually reordered it.
@@ -217,8 +217,30 @@ def change_dir(path: Path) -> Generator[None, None, None]:
        os.chdir(prev)


def run(spec_path: Path) -> None:
    """Run the pipeline defined with the given spec."""
def run(
If we expect it to vary across runs for the same pipeline, it should be a CLI arg. If we expect it to be static for a pipeline, it should live in the spec. I would expect selections to vary across runs.
    not should_test_connect or not have_yaml,
    connect_requirement_message or yaml_requirement_message,
)
class CLIValidationTests(unittest.TestCase):
Is there a meaningful difference between the kinds of tests that are included in this class and the kinds of tests that are included in the other class in this file?
Yeah, I think they can be combined into one.
.../test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerTest.scala (Outdated; resolved)
...r/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineRefreshFunctionalSuite.scala (Outdated; resolved)
...r/src/test/scala/org/apache/spark/sql/connect/pipelines/PipelineRefreshFunctionalSuite.scala (Outdated; resolved)
LGTM!
Merged to master
What changes were proposed in this pull request?
We want to give users the ability to choose a subset of datasets (e.g. tables, materialized views) to include in a run, and the ability to specify whether they should be run as a regular refresh or a full refresh.
The following arguments are being added to the `spark-pipelines` CLI to achieve this:
- `--full-refresh`: List of datasets to reset and recompute.
- `--full-refresh-all`: Boolean; whether to perform a full graph reset and recompute.
- `--refresh`: List of datasets to update.

If no options are specified, the default is to perform a refresh for all datasets in the pipeline.

To enable the above, `TableFilter` is used to control the graph refresh.
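For illustration, invocations with the new flags might look like the following (the flag names come from this PR; the `run` subcommand and spec handling are assumed to follow the existing CLI behavior):

# Refresh only selected datasets
spark-pipelines run --refresh "sales,users"

# Fully reset and recompute one dataset while refreshing another
spark-pipelines run --full-refresh "sales" --refresh "users"

# Reset and recompute the entire graph
spark-pipelines run --full-refresh-all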
Why are the changes needed?
These changes are needed because we want to give users the option to control what to run, and how to run it, for their pipelines.
Does this PR introduce any user-facing change?
Yes, new CLI options are being added. However, SDP hasn't been released yet, so no users should be impacted.
How was this patch tested?
Added a new test suite for the Python CLI to verify argument parsing.
Added a new test suite in the Scala codebase that uses the newly added CLI options to run a full pipeline and verify the behavior.
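Not the PR's actual test suite, but a self-contained sketch of the kind of argument-parsing check described above, using a stand-in parser rather than the real CLI:

import argparse
import unittest

def build_parser():
    # Stand-in parser whose options mirror the ones added in this PR.
    split = lambda s: [t for t in s.split(",") if t]
    parser = argparse.ArgumentParser()
    parser.add_argument("--refresh", type=split, action="extend", default=[])
    parser.add_argument("--full-refresh", type=split, action="extend", default=[])
    parser.add_argument("--full-refresh-all", action="store_true")
    return parser

class RefreshArgsParsingTest(unittest.TestCase):
    def test_repeated_comma_separated_flags_flatten(self):
        args = build_parser().parse_args(["--refresh", "a,b", "--refresh", "c"])
        self.assertEqual(args.refresh, ["a", "b", "c"])

    def test_full_refresh_all_defaults_to_false(self):
        args = build_parser().parse_args([])
        self.assertFalse(args.full_refresh_all)

if __name__ == "__main__":
    unittest.main()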
Was this patch authored or co-authored using generative AI tooling?
No