Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Silence NotFound when get task resource #4388

Merged
merged 4 commits into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions flyteadmin/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,8 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi
}
return statusErr
}

func IsDoesNotExistError(err error) bool {
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
adminError, ok := err.(FlyteAdminError)
return ok && adminError.Code() == codes.NotFound
}
13 changes: 13 additions & 0 deletions flyteadmin/pkg/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package errors

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -90,3 +91,15 @@ func TestNewWorkflowExistsIdenticalStructureError(t *testing.T) {
_, ok = details.GetReason().(*admin.CreateWorkflowFailureReason_ExistsIdenticalStructure)
assert.True(t, ok)
}

func TestIsDoesNotExistError(t *testing.T) {
assert.True(t, IsDoesNotExistError(NewFlyteAdminError(codes.NotFound, "foo")))
}

func TestIsNotDoesNotExistError(t *testing.T) {
assert.False(t, IsDoesNotExistError(NewFlyteAdminError(codes.Canceled, "foo")))
}

func TestIsNotDoesNotExistErrorBecauseOfNoneAdminError(t *testing.T) {
assert.False(t, IsDoesNotExistError(errors.New("foo")))
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
"hash/fnv"
"math/rand"

"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster"
"github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces"
Expand Down Expand Up @@ -102,10 +100,8 @@
LaunchPlan: spec.LaunchPlan,
ResourceType: admin.MatchableResource_EXECUTION_CLUSTER_LABEL,
})
if err != nil {
if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
return nil, err
}
if err != nil && !errors.IsDoesNotExistError(err) {
return nil, err

Check warning on line 104 in flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go#L104

Added line #L104 was not covered by tests
}

var weightedRandomList random.WeightedRandomList
Expand Down
15 changes: 5 additions & 10 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,8 @@
LaunchPlan: launchPlanName,
ResourceType: admin.MatchableResource_PLUGIN_OVERRIDE,
})
if err != nil {
ec, ok := err.(errors.FlyteAdminError)
if !ok || ec.Code() != codes.NotFound {
return nil, err
}
if err != nil && !errors.IsDoesNotExistError(err) {
return nil, err
}
if override != nil && override.Attributes != nil && override.Attributes.GetPluginOverrides() != nil {
return override.Attributes.GetPluginOverrides().Overrides, nil
Expand Down Expand Up @@ -427,11 +424,9 @@
Domain: request.Domain,
ResourceType: admin.MatchableResource_CLUSTER_ASSIGNMENT,
})
if err != nil {
if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
logger.Errorf(ctx, "Failed to get cluster assignment overrides with error: %v", err)
return nil, err
}
if err != nil && !errors.IsDoesNotExistError(err) {
logger.Errorf(ctx, "Failed to get cluster assignment overrides with error: %v", err)
return nil, err

Check warning on line 429 in flyteadmin/pkg/manager/impl/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/execution_manager.go#L428-L429

Added lines #L428 - L429 were not covered by tests
}
if resource != nil && resource.Attributes.GetClusterAssignment() != nil {
return resource.Attributes.GetClusterAssignment(), nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
ResourceType: admin.MatchableResource_QUALITY_OF_SERVICE_SPECIFICATION,
})
if err != nil {
if _, ok := err.(errors.FlyteAdminError); !ok || err.(errors.FlyteAdminError).Code() != codes.NotFound {
if !errors.IsDoesNotExistError(err) {

Check warning on line 46 in flyteadmin/pkg/manager/impl/executions/quality_of_service.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/executions/quality_of_service.go#L46

Added line #L46 was not covered by tests
logger.Warningf(ctx,
"Failed to fetch override values when assigning quality of service values for [%+v] with err: %v",
workflowIdentifier, err)
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/manager/impl/executions/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"math/rand"

"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/resources"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/interfaces"
repoInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (q *queueAllocatorImpl) GetQueue(ctx context.Context, identifier core.Ident
ResourceType: admin.MatchableResource_EXECUTION_QUEUE,
})

if err != nil {
if err != nil && !errors.IsDoesNotExistError(err) {
logger.Warningf(ctx, "Failed to fetch override values when assigning execution queue for [%+v] with err: %v",
identifier, err)
}
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/manager/impl/util/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"k8s.io/apimachinery/pkg/api/resource"

"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
workflowengineInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
Expand Down Expand Up @@ -100,7 +101,7 @@ func GetTaskResources(ctx context.Context, id *core.Identifier, resourceManager
}

resource, err := resourceManager.GetResource(ctx, request)
if err != nil {
if err != nil && !errors.IsDoesNotExistError(err) {
logger.Infof(ctx, "Failed to fetch override values when assigning task resource default values for [%+v]: %v",
id, err)
}
Expand Down
10 changes: 4 additions & 6 deletions flyteadmin/pkg/manager/impl/util/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,10 @@ func GetMatchableResource(ctx context.Context, resourceManager interfaces.Resour
Workflow: workflowName,
ResourceType: resourceType,
})
if err != nil {
if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain %s workflow with error: %v", resourceType,
project, domain, workflowName, err)
return nil, err
}
if err != nil && !errors.IsDoesNotExistError(err) {
logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain %s workflow with error: %v", resourceType,
project, domain, workflowName, err)
return nil, err
}
return matchableResource, nil
}
Expand Down
Loading