Skip to content

Commit

Permalink
[flyteadmin] Show diff structure when re-registration two different t…
Browse files Browse the repository at this point in the history
…ask with same ids (#4924)

* add json diff module

Signed-off-by: Austin Liu <[email protected]>

* add CreateTaskFailureReason

Signed-off-by: Austin Liu <[email protected]>

* update task proto buff gen code

Signed-off-by: Austin Liu <[email protected]>

* add task diff DifferentStructureError msg

Signed-off-by: Austin Liu <[email protected]>

* prettify json error message & rollback idl change

Signed-off-by: Austin Liu <[email protected]>

* refactor to compiledWorkflow

Signed-off-by: Austin Liu <[email protected]>

* clean up

Signed-off-by: Austin Liu <[email protected]>

* add create task errors test

Signed-off-by: Austin Liu <[email protected]>

format

Signed-off-by: Austin Liu <[email protected]>

* cleanup error & test

Signed-off-by: Austin Liu <[email protected]>

* rollback proto change

Signed-off-by: Austin Liu <[email protected]>

rollback proto change

Signed-off-by: Austin Liu <[email protected]>

* add launchplan

Signed-off-by: Austin Liu <[email protected]>

* cleanup

Signed-off-by: Austin Liu <[email protected]>

* test: add no diff case

Signed-off-by: Austin Liu <[email protected]>

---------

Signed-off-by: Austin Liu <[email protected]>
  • Loading branch information
austin362667 authored Mar 27, 2024
1 parent ec0bc4c commit a5e2733
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 39 deletions.
5 changes: 5 additions & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/wI2L/jsondiff v0.5.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1
go.opentelemetry.io/otel v1.21.0
golang.org/x/oauth2 v0.16.0
Expand Down Expand Up @@ -166,6 +167,10 @@ require (
github.com/spf13/viper v1.11.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/tidwall/gjson v1.17.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
Expand Down
11 changes: 11 additions & 0 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1265,13 +1265,22 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
github.com/tidwall/gjson v1.3.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/gjson v1.6.8/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI=
github.com/tidwall/gjson v1.7.1/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ9GImk=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM=
github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.0.4/go.mod h1:bURseu1nuBkFpIES5cz6zBtjmYeOQmEESshn7VpF15Y=
github.com/tidwall/sjson v1.1.5/go.mod h1:VuJzsZnTowhSxWdOgsAnb886i4AjEyTkk7tNtsL7EYE=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
Expand All @@ -1288,6 +1297,8 @@ github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IA
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/wI2L/jsondiff v0.5.0 h1:RRMTi/mH+R2aXcPe1VYyvGINJqQfC3R+KSEakuU1Ikw=
github.com/wI2L/jsondiff v0.5.0/go.mod h1:qqG6hnK0Lsrz2BpIVCxWiK9ItsBCpIZQiv0izJjOZ9s=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
Expand Down
65 changes: 61 additions & 4 deletions flyteadmin/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"strings"

"github.com/golang/protobuf/proto"
"github.com/wI2L/jsondiff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

Expand Down Expand Up @@ -109,8 +111,47 @@ func NewIncompatibleClusterError(ctx context.Context, errorMsg, curCluster strin
return statusErr
}

func NewWorkflowExistsDifferentStructureError(ctx context.Context, request *admin.WorkflowCreateRequest) FlyteAdminError {
errorMsg := "workflow with different structure already exists"
func compareJsons(jsonArray1 jsondiff.Patch, jsonArray2 jsondiff.Patch) []string {
results := []string{}
map1 := make(map[string]jsondiff.Operation)
for _, obj := range jsonArray1 {
map1[obj.Path] = obj
}

for _, obj := range jsonArray2 {
if val, ok := map1[obj.Path]; ok {
result := fmt.Sprintf("\t\t- %v: %v -> %v", obj.Path, obj.Value, val.Value)
results = append(results, result)
}
}
return results
}

func NewTaskExistsDifferentStructureError(ctx context.Context, request *admin.TaskCreateRequest, oldSpec *core.CompiledTask, newSpec *core.CompiledTask) FlyteAdminError {
errorMsg := "task with different structure already exists:\n"
diff, _ := jsondiff.Compare(oldSpec, newSpec)
rdiff, _ := jsondiff.Compare(newSpec, oldSpec)
rs := compareJsons(diff, rdiff)

errorMsg += strings.Join(rs, "\n")

return NewFlyteAdminErrorf(codes.InvalidArgument, errorMsg)

}

func NewTaskExistsIdenticalStructureError(ctx context.Context, request *admin.TaskCreateRequest) FlyteAdminError {
errorMsg := "task with identical structure already exists"
return NewFlyteAdminErrorf(codes.AlreadyExists, errorMsg)
}

func NewWorkflowExistsDifferentStructureError(ctx context.Context, request *admin.WorkflowCreateRequest, oldSpec *core.CompiledWorkflowClosure, newSpec *core.CompiledWorkflowClosure) FlyteAdminError {
errorMsg := "workflow with different structure already exists:\n"
diff, _ := jsondiff.Compare(oldSpec, newSpec)
rdiff, _ := jsondiff.Compare(newSpec, oldSpec)
rs := compareJsons(diff, rdiff)

errorMsg += strings.Join(rs, "\n")

statusErr, transformationErr := NewFlyteAdminError(codes.InvalidArgument, errorMsg).WithDetails(&admin.CreateWorkflowFailureReason{
Reason: &admin.CreateWorkflowFailureReason_ExistsDifferentStructure{
ExistsDifferentStructure: &admin.WorkflowErrorExistsDifferentStructure{
Expand All @@ -119,7 +160,7 @@ func NewWorkflowExistsDifferentStructureError(ctx context.Context, request *admi
},
})
if transformationErr != nil {
logger.Panicf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr)
logger.Errorf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr)
return NewFlyteAdminErrorf(codes.InvalidArgument, errorMsg)
}
return statusErr
Expand All @@ -135,12 +176,28 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi
},
})
if transformationErr != nil {
logger.Panicf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr)
logger.Errorf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr)
return NewFlyteAdminErrorf(codes.AlreadyExists, errorMsg)
}
return statusErr
}

func NewLaunchPlanExistsDifferentStructureError(ctx context.Context, request *admin.LaunchPlanCreateRequest, oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) FlyteAdminError {
errorMsg := "launch plan with different structure already exists:\n"
diff, _ := jsondiff.Compare(oldSpec, newSpec)
rdiff, _ := jsondiff.Compare(newSpec, oldSpec)
rs := compareJsons(diff, rdiff)

errorMsg += strings.Join(rs, "\n")

return NewFlyteAdminErrorf(codes.InvalidArgument, errorMsg)
}

func NewLaunchPlanExistsIdenticalStructureError(ctx context.Context, request *admin.LaunchPlanCreateRequest) FlyteAdminError {
errorMsg := "launch plan with identical structure already exists"
return NewFlyteAdminErrorf(codes.AlreadyExists, errorMsg)
}

func IsDoesNotExistError(err error) bool {
adminError, ok := err.(FlyteAdminError)
return ok && adminError.Code() == codes.NotFound
Expand Down
Loading

0 comments on commit a5e2733

Please sign in to comment.