Skip to content

Commit

Permalink
Merge branch 'master' into tomnewton/updated_grafana_dashboards
Browse files Browse the repository at this point in the history
  • Loading branch information
eapolinario authored Nov 14, 2023
2 parents eba0493 + 712ee8e commit 54d2242
Show file tree
Hide file tree
Showing 67 changed files with 1,801 additions and 681 deletions.
13 changes: 13 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ Happy contributing!

<!-- Example: Closes #31 -->

## Docs link

<!-- Add documentation link built by CI jobs here, and specify the changed place -->

## Describe your changes

<!-- List all the proposed changes in your PR -->
Expand All @@ -29,10 +33,19 @@ Happy contributing!
- [ ] All new and existing tests passed.
- [ ] All commits are signed-off.

## Setup Process

<!-- Describe how you set up this PR's environment to help maintainers reproduce your results more easily -->

## Screenshots

<!-- Add all the screenshots which support your changes -->

## Note to reviewers

<!-- Add notes to reviewers if applicable -->

## Related PRs

<!-- Add related pull requests for reviewers to check -->

15 changes: 0 additions & 15 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,9 @@ concurrency:

on:
pull_request:
paths:
- 'datacatalog/**'
- 'flyteadmin/**'
- 'flytecopilot/**'
- 'flyteplugins/**'
- 'flytepropeller/**'
- 'flytestdlib/**'
push:
branches:
- master
paths:
- 'datacatalog/**'
- 'flyteadmin/**'
- 'flytecopilot/**'
- 'flyteidl/**'
- 'flyteplugins/**'
- 'flytepropeller/**'
- 'flytestdlib/**'
env:
GO_VERSION: "1.19"
PRIORITIES: "P0"
Expand Down
5 changes: 2 additions & 3 deletions .github/workflows/end2end.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ jobs:
advanced_composition/advanced_composition/dynamics.py \
advanced_composition/advanced_composition/map_task.py \
advanced_composition/advanced_composition/subworkflows.py \
data_types_and_io/data_types_and_io/custom_objects.py \
data_types_and_io/data_types_and_io/schema.py \
data_types_and_io/data_types_and_io/typed_schema.py ;
data_types_and_io/data_types_and_io/dataclass.py \
data_types_and_io/data_types_and_io/structured_dataset.py ;
do
pyflyte --config ./boilerplate/flyte/end2end/functional-test-config.yaml \
register \
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/flyteidl-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@ concurrency:

on:
pull_request:
paths:
- 'flyteidl/**'
push:
branches:
- master
paths:
- 'flyteidl/**'
env:
GO_VERSION: "1.19"
jobs:
Expand Down
5 changes: 2 additions & 3 deletions .github/workflows/single-binary.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,8 @@ jobs:
advanced_composition/advanced_composition/dynamics.py \
advanced_composition/advanced_composition/map_task.py \
advanced_composition/advanced_composition/subworkflows.py \
data_types_and_io/data_types_and_io/custom_objects.py \
data_types_and_io/data_types_and_io/schema.py \
data_types_and_io/data_types_and_io/typed_schema.py ;
data_types_and_io/data_types_and_io/dataclass.py \
data_types_and_io/data_types_and_io/structured_dataset.py ;
do
pyflyte --config ./boilerplate/flyte/end2end/functional-test-config.yaml \
register \
Expand Down
111 changes: 88 additions & 23 deletions boilerplate/flyte/end2end/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
import time
import traceback
from typing import Dict, List, Mapping, Tuple
from typing import Dict, List, Mapping, Tuple, Optional

import click
import requests
Expand Down Expand Up @@ -50,15 +50,17 @@
("basics.named_outputs.simple_wf_with_named_outputs", {}),
# # Getting a 403 for the wikipedia image
# # ("basics.reference_task.wf", {}),
("data_types_and_io.custom_objects.wf", {"x": 10, "y": 20}),
("data_types_and_io.dataclass.dataclass_wf", {"x": 10, "y": 20}),
# Enums are not supported in flyteremote
# ("type_system.enums.enum_wf", {"c": "red"}),
("data_types_and_io.schema.df_wf", {"a": 42}),
("data_types_and_io.typed_schema.wf", {}),
("data_types_and_io.structured_dataset.simple_sd_wf", {"a": 42}),
# ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
],
"integrations-k8s-spark": [
("k8s_spark_plugin.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}),
(
"k8s_spark_plugin.pyspark_pi.my_spark",
{"triggered_date": datetime.datetime.now()},
),
],
"integrations-kfpytorch": [
("kfpytorch_plugin.pytorch_mnist.pytorch_training_wf", {}),
Expand Down Expand Up @@ -89,20 +91,30 @@
}


def execute_workflow(remote, version, workflow_name, inputs):
def execute_workflow(
remote: FlyteRemote,
version,
workflow_name,
inputs,
cluster_pool_name: Optional[str] = None,
):
print(f"Fetching workflow={workflow_name} and version={version}")
wf = remote.fetch_workflow(name=workflow_name, version=version)
return remote.execute(wf, inputs=inputs, wait=False)
return remote.execute(wf, inputs=inputs, wait=False, cluster_pool=cluster_pool_name)


def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool:
def executions_finished(
executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]
) -> bool:
for executions in executions_by_wfgroup.values():
if not all([execution.is_done for execution in executions]):
return False
return True


def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]):
def sync_executions(
remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]
):
try:
for executions in executions_by_wfgroup.values():
for execution in executions:
Expand All @@ -125,6 +137,7 @@ def schedule_workflow_groups(
workflow_groups: List[str],
remote: FlyteRemote,
terminate_workflow_on_failure: bool,
cluster_pool_name: Optional[str] = None,
) -> Dict[str, bool]:
"""
Schedule workflows executions for all workflow groups and return True if all executions succeed, otherwise
Expand All @@ -135,14 +148,19 @@ def schedule_workflow_groups(
for wf_group in workflow_groups:
workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(wf_group, [])
executions_by_wfgroup[wf_group] = [
execute_workflow(remote, tag, workflow[0], workflow[1]) for workflow in workflows
execute_workflow(remote, tag, workflow[0], workflow[1], cluster_pool_name)
for workflow in workflows
]

# Wait for all executions to finish
attempt = 0
while attempt == 0 or (not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS):
while attempt == 0 or (
not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS
):
attempt += 1
print(f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s")
print(
f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s"
)
time.sleep(WAIT_TIME)
sync_executions(remote, executions_by_wfgroup)

Expand All @@ -158,9 +176,13 @@ def schedule_workflow_groups(
if len(non_succeeded_executions) != 0:
print(f"Failed executions for {wf_group}:")
for execution in non_succeeded_executions:
print(f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}")
print(
f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}"
)
if terminate_workflow_on_failure:
remote.terminate(execution, "aborting execution scheduled in functional test")
remote.terminate(
execution, "aborting execution scheduled in functional test"
)
# A workflow group succeeds iff all of its executions succeed
results[wf_group] = len(non_succeeded_executions) == 0
return results
Expand All @@ -179,25 +201,33 @@ def run(
priorities: List[str],
config_file_path,
terminate_workflow_on_failure: bool,
test_project_name: str,
test_project_domain: str,
cluster_pool_name: Optional[str] = None,
) -> List[Dict[str, str]]:
remote = FlyteRemote(
Config.auto(config_file=config_file_path),
default_project="flytesnacks",
default_domain="development",
test_project_name,
test_project_domain,
)

# For a given release tag and priority, this function filters the workflow groups from the flytesnacks
# manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ].
manifest_url = (
"https://raw.githubusercontent.com/flyteorg/flytesnacks/" f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
"https://raw.githubusercontent.com/flyteorg/flytesnacks/"
f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
)
r = requests.get(manifest_url)
parsed_manifest = r.json()
workflow_groups = []
workflow_groups = (
["lite"]
if "lite" in priorities
else [group["name"] for group in parsed_manifest if group["priority"] in priorities]
else [
group["name"]
for group in parsed_manifest
if group["priority"] in priorities
]
)

results = []
Expand All @@ -215,7 +245,11 @@ def run(
valid_workgroups.append(workflow_group)

results_by_wfgroup = schedule_workflow_groups(
flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure
flytesnacks_release_tag,
valid_workgroups,
remote,
terminate_workflow_on_failure,
cluster_pool_name,
)

for workflow_group, succeeded in results_by_wfgroup.items():
Expand Down Expand Up @@ -246,6 +280,9 @@ def run(


@click.command()
@click.argument("flytesnacks_release_tag")
@click.argument("priorities")
@click.argument("config_file")
@click.option(
"--return_non_zero_on_failure",
default=False,
Expand All @@ -258,18 +295,46 @@ def run(
is_flag=True,
help="Abort failing workflows upon exit",
)
@click.argument("flytesnacks_release_tag")
@click.argument("priorities")
@click.argument("config_file")
@click.option(
"--test_project_name",
default="flytesnacks",
type=str,
is_flag=False,
help="Name of project to run functional tests on",
)
@click.option(
"--test_project_domain",
default="development",
type=str,
is_flag=False,
help="Name of domain in project to run functional tests on",
)
@click.argument(
"cluster_pool_name",
required=False,
type=str,
default=None,
)
def cli(
flytesnacks_release_tag,
priorities,
config_file,
return_non_zero_on_failure,
terminate_workflow_on_failure,
test_project_name,
test_project_domain,
cluster_pool_name,
):
print(f"return_non_zero_on_failure={return_non_zero_on_failure}")
results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure)
results = run(
flytesnacks_release_tag,
priorities,
config_file,
terminate_workflow_on_failure,
test_project_name,
test_project_domain,
cluster_pool_name,
)

# Write a json object in its own line describing the result of this run to stdout
print(f"Result of run:\n{json.dumps(results)}")
Expand Down
3 changes: 2 additions & 1 deletion charts/flyte-binary/eks-starter.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
configuration:
database:
username: <DB_USERNAME>
password: <DB_PASSWORD>
host: <RDS_HOST_DNS>
dbname: app # what is this? why not flyteadmin or flyte?
dbname: flyteadmin (<INITAL_DB>)
storage:
metadataContainer: <BUCKET_NAME>
userDataContainer: <USER_DATA_BUCKET_NAME>
Expand Down
1 change: 1 addition & 0 deletions charts/flyte-binary/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ spec:
httpGet:
path: /healthcheck
port: http
initialDelaySeconds: 30
{{- end }}
{{- if .Values.deployment.resources }}
resources: {{- toYaml .Values.deployment.resources | nindent 12 }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ spec:
httpGet:
path: /healthcheck
port: http
initialDelaySeconds: 30
volumeMounts:
- name: cluster-resource-templates
mountPath: /etc/flyte/cluster-resource-templates
Expand Down
7 changes: 6 additions & 1 deletion flyteadmin/auth/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ func IdentityContextFromIDTokenToken(ctx context.Context, tokenStr, clientID str
logger.Infof(ctx, "Failed to unmarshal claims from id token, err: %v", err)
}

// TODO: Document why automatically specify "all" scope
// This path is used when a user logs into the UI and when you login through the UI, you should have all the capabilities your identity
// allows you to have, which is denoted by the "all" scope.
// There was a plan to one day define one of a handful of scopes (all, proj admin, user, viewer) and if you configure your IDP
// to issue the right scopes, admin can do very light weight 'AuthZ' on admin based on these scopes, but until that plan is effected,
// we just use this single scope that Admin expects for all methods
// And because not all IdPs allow us to configure the Identity Token claims, the scope needs to live here.
return NewIdentityContext(idToken.Audience[0], idToken.Subject, "", idToken.IssuedAt,
sets.NewString(ScopeAll), userInfo, claims)
}
5 changes: 5 additions & 0 deletions flyteadmin/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,8 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi
}
return statusErr
}

func IsDoesNotExistError(err error) bool {
adminError, ok := err.(FlyteAdminError)
return ok && adminError.Code() == codes.NotFound
}
13 changes: 13 additions & 0 deletions flyteadmin/pkg/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package errors

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -90,3 +91,15 @@ func TestNewWorkflowExistsIdenticalStructureError(t *testing.T) {
_, ok = details.GetReason().(*admin.CreateWorkflowFailureReason_ExistsIdenticalStructure)
assert.True(t, ok)
}

func TestIsDoesNotExistError(t *testing.T) {
assert.True(t, IsDoesNotExistError(NewFlyteAdminError(codes.NotFound, "foo")))
}

func TestIsNotDoesNotExistError(t *testing.T) {
assert.False(t, IsDoesNotExistError(NewFlyteAdminError(codes.Canceled, "foo")))
}

func TestIsNotDoesNotExistErrorBecauseOfNoneAdminError(t *testing.T) {
assert.False(t, IsDoesNotExistError(errors.New("foo")))
}
Loading

0 comments on commit 54d2242

Please sign in to comment.