diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index ac74384250..b7d4364542 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -175,6 +175,8 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/sjson v1.2.5 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index dba9da2e86..1e8e7cc367 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -1295,6 +1295,10 @@ github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IA github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/wI2L/jsondiff v0.5.0 h1:RRMTi/mH+R2aXcPe1VYyvGINJqQfC3R+KSEakuU1Ikw= github.com/wI2L/jsondiff v0.5.0/go.mod h1:qqG6hnK0Lsrz2BpIVCxWiK9ItsBCpIZQiv0izJjOZ9s= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= diff --git a/flytepropeller/pkg/controller/nodes/attr_path_resolver.go b/flytepropeller/pkg/controller/nodes/attr_path_resolver.go index 204b0d7190..cec6c35dce 100644 --- a/flytepropeller/pkg/controller/nodes/attr_path_resolver.go +++ b/flytepropeller/pkg/controller/nodes/attr_path_resolver.go @@ -86,7 +86,7 @@ func resolveAttrPathInPbStruct(nodeID string, st *structpb.Struct, bindAttrPath } // After resolve, convert the interface to literal - literal, err := convertInterfaceToLiteral(nodeID, currVal) + literal, err := convertInterfaceToLiteral(nodeID, currVal, false) return literal, err } @@ -136,7 +136,7 @@ func resolveAttrPathInJson(nodeID string, json_byte []byte, bindAttrPath []*core } // After resolve, convert the interface to literal - literal, err := convertInterfaceToLiteral(nodeID, currVal) + literal, err := convertInterfaceToLiteral(nodeID, currVal, true) return literal, err } @@ -168,34 +168,48 @@ func convertNumbers(v interface{}) interface{} { } // convertInterfaceToLiteral converts the protobuf struct (e.g. dataclass) to literal -func convertInterfaceToLiteral(nodeID string, obj interface{}) (*core.Literal, error) { +func convertInterfaceToLiteral(nodeID string, obj interface{}, isJson bool) (*core.Literal, error) { literal := &core.Literal{} switch obj := obj.(type) { case map[string]interface{}: - jsonBytes, err := json.Marshal(obj) - if err != nil { - return nil, err - } - jsonBytes, err = msgpack.Marshal(jsonBytes) - if err != nil { - return nil, err - } - literal.Value = &core.Literal_Scalar{ - Scalar: &core.Scalar{ - Value: &core.Scalar_Json{ - Json: &core.Json{ - Value: jsonBytes, + if isJson { + jsonBytes, err := json.Marshal(obj) + if err != nil { + return nil, err + } + jsonBytes, err = msgpack.Marshal(jsonBytes) + if err != nil { + return nil, err + } + literal.Value = &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Json{ + Json: &core.Json{ + Value: jsonBytes, + }, }, }, - }, + } + } else { + newSt, err := structpb.NewStruct(obj) + if err != nil { + return nil, err + } + literal.Value = &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Generic{ + Generic: newSt, + }, + }, + } } case []interface{}: literals := []*core.Literal{} for _, v := range obj { // recursively convert the interface to literal - literal, err := convertInterfaceToLiteral(nodeID, v) + literal, err := convertInterfaceToLiteral(nodeID, v, isJson) if err != nil { return nil, err }