diff --git a/flyteadmin/pkg/errors/errors.go b/flyteadmin/pkg/errors/errors.go index 5fc48b0b67..9c3bd14374 100644 --- a/flyteadmin/pkg/errors/errors.go +++ b/flyteadmin/pkg/errors/errors.go @@ -2,14 +2,17 @@ package errors import ( + "bytes" "context" "fmt" - "strings" - + "github.com/flyteorg/flyte/flytestdlib/pbhash" "github.com/golang/protobuf/proto" + "github.com/shamaton/msgpack/v2" "github.com/wI2L/jsondiff" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "reflect" + "strings" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -181,7 +184,147 @@ func NewWorkflowExistsIdenticalStructureError(ctx context.Context, request *admi return statusErr } +func IsSameDCFormat(oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) bool { + oldParams := oldSpec.GetDefaultInputs().GetParameters() + newParams := newSpec.GetDefaultInputs().GetParameters() + + // Step 1: Check if both maps have the same keys + if len(oldParams) != len(newParams) { + return false + } + for key := range oldParams { + if _, exists := newParams[key]; !exists { + return false + } + } + + // Step 2: Compare the corresponding values + for key, oldParam := range oldParams { + newParam := newParams[key] + + // Compare Parameter values + if !parametersAreEqual(oldParam, newParam) { + return false + } + } + + return true +} + +func parametersAreEqual(oldParam, newParam *core.Parameter) bool { + oldDefault := oldParam.GetDefault() + newDefault := newParam.GetDefault() + + // If both defaults are nil, they are equal + if oldDefault == nil && newDefault == nil { + return true + } + + // If one is nil and the other is not, they are not equal + if (oldDefault == nil) != (newDefault == nil) { + return false + } + + // Step 2.1: Use pbhash to compare the two values + oldHash, err1 := pbhash.ComputeHash(context.Background(), oldDefault) + newHash, err2 := pbhash.ComputeHash(context.Background(), newDefault) + if err1 != nil || err2 != nil { + return false + } + if bytes.Equal(oldHash, newHash) { + return true + } + + // Step 2.2: Check for Scalar.Generic and Scalar.Binary cases + oldScalar := oldDefault.GetScalar() + newScalar := newDefault.GetScalar() + + // If both scalars are nil, they are not equal + if oldScalar == nil && newScalar == nil { + return false + } + + // Check if one is Scalar.Generic and the other is Scalar.Binary + if isGenericScalar(oldScalar) && isBinaryScalar(newScalar) { + decodedNew, err := decodeBinaryLiteral(newScalar.GetBinary().GetValue()) + if err != nil { + return false + } + stMap := oldScalar.GetGeneric().AsMap() + return deepEqualMaps(stMap, decodedNew) + } + + if isBinaryScalar(oldScalar) && isGenericScalar(newScalar) { + decodedOld, err := decodeBinaryLiteral(oldScalar.GetBinary().GetValue()) + if err != nil { + return false + } + stMap := newScalar.GetGeneric().AsMap() + return deepEqualMaps(decodedOld, stMap) + } + + // If neither special case applies, they are not equal + return false +} + +func isBinaryScalar(scalar *core.Scalar) bool { + return scalar != nil && scalar.GetBinary() != nil +} + +func isGenericScalar(scalar *core.Scalar) bool { + return scalar != nil && scalar.GetGeneric() != nil +} + +func decodeBinaryLiteral(binaryData []byte) (interface{}, error) { + if binaryData == nil { + return nil, fmt.Errorf("binary data is nil") + } + + // Declare 'decoded' as an empty interface to hold any type + var decoded interface{} + err := msgpack.Unmarshal(binaryData, &decoded) + if err != nil { + return nil, err + } + return decoded, nil +} + +func deepEqualMaps(a, b interface{}) bool { + normalizedA := normalizeMapKeys(a) + normalizedB := normalizeMapKeys(b) + return reflect.DeepEqual(normalizedA, normalizedB) +} + +func normalizeMapKeys(data interface{}) interface{} { + switch v := data.(type) { + case map[interface{}]interface{}: + m := make(map[string]interface{}) + for key, value := range v { + keyStr := fmt.Sprintf("%v", key) + m[keyStr] = normalizeMapKeys(value) + } + return m + case map[string]interface{}: + m := make(map[string]interface{}) + for key, value := range v { + m[key] = normalizeMapKeys(value) + } + return m + case []interface{}: + for i, value := range v { + v[i] = normalizeMapKeys(value) + } + return v + default: + return v + } +} + func NewLaunchPlanExistsDifferentStructureError(ctx context.Context, request *admin.LaunchPlanCreateRequest, oldSpec *admin.LaunchPlanSpec, newSpec *admin.LaunchPlanSpec) FlyteAdminError { + if IsSameDCFormat(oldSpec, newSpec) { + return NewLaunchPlanExistsIdenticalStructureError(ctx, request) + } + errorMsg := fmt.Sprintf("%v launch plan with different structure already exists. (Please register a new version of the launch plan):\n", request.Id.Name) diff, _ := jsondiff.Compare(oldSpec, newSpec) rdiff, _ := jsondiff.Compare(newSpec, oldSpec)