Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flyteadmin] Show diff structure when re-registration two different task with same ids #4924

Merged
merged 13 commits into from
Mar 27, 2024
5 changes: 5 additions & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/wI2L/jsondiff v0.5.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1
go.opentelemetry.io/otel v1.21.0
golang.org/x/oauth2 v0.16.0
Expand Down Expand Up @@ -166,6 +167,10 @@ require (
github.com/spf13/viper v1.11.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/tidwall/gjson v1.17.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
Expand Down
11 changes: 11 additions & 0 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1265,13 +1265,22 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
github.com/tidwall/gjson v1.3.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/gjson v1.6.8/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI=
github.com/tidwall/gjson v1.7.1/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ9GImk=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM=
github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.0.4/go.mod h1:bURseu1nuBkFpIES5cz6zBtjmYeOQmEESshn7VpF15Y=
github.com/tidwall/sjson v1.1.5/go.mod h1:VuJzsZnTowhSxWdOgsAnb886i4AjEyTkk7tNtsL7EYE=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
Expand All @@ -1288,6 +1297,8 @@ github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IA
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/wI2L/jsondiff v0.5.0 h1:RRMTi/mH+R2aXcPe1VYyvGINJqQfC3R+KSEakuU1Ikw=
github.com/wI2L/jsondiff v0.5.0/go.mod h1:qqG6hnK0Lsrz2BpIVCxWiK9ItsBCpIZQiv0izJjOZ9s=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
Expand Down
65 changes: 61 additions & 4 deletions flyteadmin/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
"strings"

"github.com/golang/protobuf/proto"
"github.com/wI2L/jsondiff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

Expand Down Expand Up @@ -109,8 +111,47 @@
return statusErr
}

func NewWorkflowExistsDifferentStructureError(ctx context.Context, request *admin.WorkflowCreateRequest) FlyteAdminError {
errorMsg := "workflow with different structure already exists"
func compareJsons(jsonArray1 jsondiff.Patch, jsonArray2 jsondiff.Patch) []string {
results := []string{}
map1 := make(map[string]jsondiff.Operation)
for _, obj := range jsonArray1 {
map1[obj.Path] = obj
}

for _, obj := range jsonArray2 {
if val, ok := map1[obj.Path]; ok {
result := fmt.Sprintf("\t\t- %v: %v -> %v", obj.Path, obj.Value, val.Value)
results = append(results, result)
}
}
return results
}

func NewTaskExistsDifferentStructureError(ctx context.Context, request *admin.TaskCreateRequest, oldSpec *core.CompiledTask, newSpec *core.CompiledTask) FlyteAdminError {
errorMsg := "task with different structure already exists:\n"
diff, _ := jsondiff.Compare(oldSpec, newSpec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since in golang map ordering is random/never guaranteed, I'm worried that converting the specs to bytes here might lead to non deterministic byte arrays

when we compute the digest, we use pbhash to marshal the protos to json

should we do the same here before calling Compare()?

alternatively, do you mind double checking that jsondiff.Compare() returns no diff for multiple calls comparing 2 identical pb objects that contain maps in the message?

Copy link
Contributor Author

@austin362667 austin362667 Mar 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can!
I've carefully double checked that jsondiff.Compare() is an identical operation w.r.t determinism. Also added the test for this function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @austin362667 for the super fast turnaround!

in this case, i was more concerned about identical maps not showing a false diff - would you mind double checking that? otherwise PR looks great!!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

rdiff, _ := jsondiff.Compare(newSpec, oldSpec)
rs := compareJsons(diff, rdiff)

errorMsg += strings.Join(rs, "\n")

return NewFlyteAdminErrorf(codes.InvalidArgument, errorMsg)

}

func NewTaskExistsIdenticalStructureError(ctx context.Context, request *admin.TaskCreateRequest) FlyteAdminError {
errorMsg := "task with identical structure already exists"
return NewFlyteAdminErrorf(codes.AlreadyExists, errorMsg)
}

func NewWorkflowExistsDifferentStructureError(ctx context.Context, request *admin.WorkflowCreateRequest, oldSpec *core.CompiledWorkflowClosure, newSpec *core.CompiledWorkflowClosure) FlyteAdminError {
errorMsg := "workflow with different structure already exists:\n"
diff, _ := jsondiff.Compare(oldSpec, newSpec)
rdiff, _ := jsondiff.Compare(newSpec, oldSpec)
rs := compareJsons(diff, rdiff)

errorMsg += strings.Join(rs, "\n")

statusErr, transformationErr := NewFlyteAdminError(codes.InvalidArgument, errorMsg).WithDetails(&admin.CreateWorkflowFailureReason{
Reason: &admin.CreateWorkflowFailureReason_ExistsDifferentStructure{
ExistsDifferentStructure: &admin.WorkflowErrorExistsDifferentStructure{
Expand All @@ -119,7 +160,7 @@
},
})
if transformationErr != nil {
logger.Panicf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr)
logger.Errorf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr)

Check warning on line 163 in flyteadmin/pkg/errors/errors.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/errors/errors.go#L163

Added line #L163 was not covered by tests
return NewFlyteAdminErrorf(codes.InvalidArgument, errorMsg)
}
return statusErr
Expand All @@ -135,12 +176,28 @@
},
})
if transformationErr != nil {
logger.Panicf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr)
logger.Errorf(ctx, "Failed to wrap grpc status in type 'Error': %v", transformationErr)

Check warning on line 179 in flyteadmin/pkg/errors/errors.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/errors/errors.go#L179

Added line #L179 was not covered by tests
return NewFlyteAdminErrorf(codes.AlreadyExists, errorMsg)
}
return statusErr
}

func NewLaunchPlanExistsDifferentStructureError(ctx context.Context, request *admin.LaunchPlanCreateRequest, oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) FlyteAdminError {
errorMsg := "launch plan with different structure already exists:\n"
diff, _ := jsondiff.Compare(oldSpec, newSpec)
rdiff, _ := jsondiff.Compare(newSpec, oldSpec)
rs := compareJsons(diff, rdiff)

errorMsg += strings.Join(rs, "\n")

return NewFlyteAdminErrorf(codes.InvalidArgument, errorMsg)
}

func NewLaunchPlanExistsIdenticalStructureError(ctx context.Context, request *admin.LaunchPlanCreateRequest) FlyteAdminError {
errorMsg := "launch plan with identical structure already exists"
return NewFlyteAdminErrorf(codes.AlreadyExists, errorMsg)
}

func IsDoesNotExistError(err error) bool {
adminError, ok := err.(FlyteAdminError)
return ok && adminError.Code() == codes.NotFound
Expand Down
Loading
Loading