diff --git a/boilerplate/flyte/end2end/run-tests.py b/boilerplate/flyte/end2end/run-tests.py index 5365da006e..9f8a8e85cb 100644 --- a/boilerplate/flyte/end2end/run-tests.py +++ b/boilerplate/flyte/end2end/run-tests.py @@ -5,7 +5,7 @@ import sys import time import traceback -from typing import Dict, List, Mapping, Tuple +from typing import Dict, List, Mapping, Tuple, Optional import click import requests @@ -50,15 +50,17 @@ ("basics.named_outputs.simple_wf_with_named_outputs", {}), # # Getting a 403 for the wikipedia image # # ("basics.reference_task.wf", {}), - ("data_types_and_io.custom_objects.wf", {"x": 10, "y": 20}), + ("data_types_and_io.dataclass.dataclass_wf", {"x": 10, "y": 20}), # Enums are not supported in flyteremote # ("type_system.enums.enum_wf", {"c": "red"}), - ("data_types_and_io.schema.df_wf", {"a": 42}), - ("data_types_and_io.typed_schema.wf", {}), + ("data_types_and_io.structured_dataset.simple_sd_wf", {"a": 42}), # ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), ], "integrations-k8s-spark": [ - ("k8s_spark_plugin.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}), + ( + "k8s_spark_plugin.pyspark_pi.my_spark", + {"triggered_date": datetime.datetime.now()}, + ), ], "integrations-kfpytorch": [ ("kfpytorch_plugin.pytorch_mnist.pytorch_training_wf", {}), @@ -89,20 +91,30 @@ } -def execute_workflow(remote, version, workflow_name, inputs): +def execute_workflow( + remote: FlyteRemote, + version, + workflow_name, + inputs, + cluster_pool_name: Optional[str] = None, +): print(f"Fetching workflow={workflow_name} and version={version}") wf = remote.fetch_workflow(name=workflow_name, version=version) - return remote.execute(wf, inputs=inputs, wait=False) + return remote.execute(wf, inputs=inputs, wait=False, cluster_pool=cluster_pool_name) -def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool: +def executions_finished( + executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]] +) -> bool: for executions in executions_by_wfgroup.values(): if not all([execution.is_done for execution in executions]): return False return True -def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]): +def sync_executions( + remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]] +): try: for executions in executions_by_wfgroup.values(): for execution in executions: @@ -125,6 +137,7 @@ def schedule_workflow_groups( workflow_groups: List[str], remote: FlyteRemote, terminate_workflow_on_failure: bool, + cluster_pool_name: Optional[str] = None, ) -> Dict[str, bool]: """ Schedule workflows executions for all workflow groups and return True if all executions succeed, otherwise @@ -135,14 +148,19 @@ def schedule_workflow_groups( for wf_group in workflow_groups: workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(wf_group, []) executions_by_wfgroup[wf_group] = [ - execute_workflow(remote, tag, workflow[0], workflow[1]) for workflow in workflows + execute_workflow(remote, tag, workflow[0], workflow[1], cluster_pool_name) + for workflow in workflows ] # Wait for all executions to finish attempt = 0 - while attempt == 0 or (not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS): + while attempt == 0 or ( + not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS + ): attempt += 1 - print(f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s") + print( + f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s" + ) time.sleep(WAIT_TIME) sync_executions(remote, executions_by_wfgroup) @@ -158,9 +176,13 @@ def schedule_workflow_groups( if len(non_succeeded_executions) != 0: print(f"Failed executions for {wf_group}:") for execution in non_succeeded_executions: - print(f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}") + print( + f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}" + ) if terminate_workflow_on_failure: - remote.terminate(execution, "aborting execution scheduled in functional test") + remote.terminate( + execution, "aborting execution scheduled in functional test" + ) # A workflow group succeeds iff all of its executions succeed results[wf_group] = len(non_succeeded_executions) == 0 return results @@ -179,17 +201,21 @@ def run( priorities: List[str], config_file_path, terminate_workflow_on_failure: bool, + test_project_name: str, + test_project_domain: str, + cluster_pool_name: Optional[str] = None, ) -> List[Dict[str, str]]: remote = FlyteRemote( Config.auto(config_file=config_file_path), - default_project="flytesnacks", - default_domain="development", + test_project_name, + test_project_domain, ) # For a given release tag and priority, this function filters the workflow groups from the flytesnacks # manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ]. manifest_url = ( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/" f"{flytesnacks_release_tag}/flyte_tests_manifest.json" + "https://raw.githubusercontent.com/flyteorg/flytesnacks/" + f"{flytesnacks_release_tag}/flyte_tests_manifest.json" ) r = requests.get(manifest_url) parsed_manifest = r.json() @@ -197,7 +223,11 @@ def run( workflow_groups = ( ["lite"] if "lite" in priorities - else [group["name"] for group in parsed_manifest if group["priority"] in priorities] + else [ + group["name"] + for group in parsed_manifest + if group["priority"] in priorities + ] ) results = [] @@ -215,7 +245,11 @@ def run( valid_workgroups.append(workflow_group) results_by_wfgroup = schedule_workflow_groups( - flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure + flytesnacks_release_tag, + valid_workgroups, + remote, + terminate_workflow_on_failure, + cluster_pool_name, ) for workflow_group, succeeded in results_by_wfgroup.items(): @@ -246,6 +280,9 @@ def run( @click.command() +@click.argument("flytesnacks_release_tag") +@click.argument("priorities") +@click.argument("config_file") @click.option( "--return_non_zero_on_failure", default=False, @@ -258,18 +295,46 @@ def run( is_flag=True, help="Abort failing workflows upon exit", ) -@click.argument("flytesnacks_release_tag") -@click.argument("priorities") -@click.argument("config_file") +@click.option( + "--test_project_name", + default="flytesnacks", + type=str, + is_flag=False, + help="Name of project to run functional tests on", +) +@click.option( + "--test_project_domain", + default="development", + type=str, + is_flag=False, + help="Name of domain in project to run functional tests on", +) +@click.argument( + "cluster_pool_name", + required=False, + type=str, + default=None, +) def cli( flytesnacks_release_tag, priorities, config_file, return_non_zero_on_failure, terminate_workflow_on_failure, + test_project_name, + test_project_domain, + cluster_pool_name, ): print(f"return_non_zero_on_failure={return_non_zero_on_failure}") - results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure) + results = run( + flytesnacks_release_tag, + priorities, + config_file, + terminate_workflow_on_failure, + test_project_name, + test_project_domain, + cluster_pool_name, + ) # Write a json object in its own line describing the result of this run to stdout print(f"Result of run:\n{json.dumps(results)}") diff --git a/boilerplate/flyte/golang_test_targets/Makefile b/boilerplate/flyte/golang_test_targets/Makefile index d3ab9817f7..280e1e55e4 100644 --- a/boilerplate/flyte/golang_test_targets/Makefile +++ b/boilerplate/flyte/golang_test_targets/Makefile @@ -3,9 +3,6 @@ # # TO OPT OUT OF UPDATES, SEE https://github.com/flyteorg/boilerplate/blob/master/Readme.rst -.PHONY: codespell -codespell: - git ls-files | grep -vE 'go\.mod|go\.sum|flyteidl/|\.pb$$|\.git|\.pdf|\.svg|requirements\.txt|gen/' | xargs codespell -w .PHONY: download_tooling download_tooling: #download dependencies (including test deps) for the package @@ -16,8 +13,8 @@ generate: download_tooling #generate go code @boilerplate/flyte/golang_test_targets/go-gen.sh .PHONY: lint -lint: codespell download_tooling #lints the package for common code smells - GL_DEBUG=linters_output,env golangci-lint run --fix --deadline=5m --exclude deprecated -v +lint: download_tooling #lints the package for common code smells + GL_DEBUG=linters_output,env golangci-lint run --deadline=5m --exclude deprecated -v # If code is failing goimports linter, this will fix. # skips 'vendor' diff --git a/boilerplate/flyte/golang_test_targets/download_tooling.sh b/boilerplate/flyte/golang_test_targets/download_tooling.sh index c7e5577ef3..9cd49959f4 100755 --- a/boilerplate/flyte/golang_test_targets/download_tooling.sh +++ b/boilerplate/flyte/golang_test_targets/download_tooling.sh @@ -19,6 +19,7 @@ tools=( "github.com/EngHabu/mockery/cmd/mockery" "github.com/flyteorg/flytestdlib/cli/pflags@latest" "github.com/golangci/golangci-lint/cmd/golangci-lint" + "github.com/daixiang0/gci" "github.com/alvaroloes/enumer" "github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc" ) diff --git a/boilerplate/flyte/golang_test_targets/goimports b/boilerplate/flyte/golang_test_targets/goimports index af1829036c..40f50d106e 100755 --- a/boilerplate/flyte/golang_test_targets/goimports +++ b/boilerplate/flyte/golang_test_targets/goimports @@ -6,3 +6,4 @@ # TO OPT OUT OF UPDATES, SEE https://github.com/flyteorg/boilerplate/blob/master/Readme.rst goimports -w $(find . -type f -name '*.go' -not -path "./vendor/*" -not -path "./pkg/client/*" -not -path "./boilerplate/*") +gci write -s standard -s default -s "prefix(github.com/flyteorg)" --custom-order --skip-generated .