Skip to content

Commit

Permalink
[Flyte][3][flytepropeller][Attribute Access][flytectl] Binary IDL Wit…
Browse files Browse the repository at this point in the history
…h MessagePack (flyteorg#5763)
  • Loading branch information
Future-Outlier authored Oct 5, 2024
1 parent e3d99b5 commit 9abfbda
Show file tree
Hide file tree
Showing 18 changed files with 1,454 additions and 71 deletions.
1 change: 1 addition & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/sendgrid/rest v2.6.9+incompatible // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions flytecopilot/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions flytecopilot/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shamaton/msgpack/v2 v2.2.2 h1:GOIg0c9LV04VwzOOqZSrmsv/JzjNOOMxnS/HvOHGdgs=
github.com/shamaton/msgpack/v2 v2.2.2/go.mod h1:6khjYnkx73f7VQU7wjcFS9DFjs+59naVWJv1TB7qdOI=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
Expand Down
1 change: 1 addition & 0 deletions flytectl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions flytectl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shamaton/msgpack/v2 v2.2.2 h1:GOIg0c9LV04VwzOOqZSrmsv/JzjNOOMxnS/HvOHGdgs=
github.com/shamaton/msgpack/v2 v2.2.2/go.mod h1:6khjYnkx73f7VQU7wjcFS9DFjs+59naVWJv1TB7qdOI=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down
2 changes: 2 additions & 0 deletions flyteidl/clients/go/coreutils/extract_literal.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func ExtractFromLiteral(literal *core.Literal) (interface{}, error) {
default:
return nil, fmt.Errorf("unsupported literal scalar primitive type %T", scalarValue)
}
case *core.Scalar_Binary:
return scalarValue.Binary, nil
case *core.Scalar_Blob:
return scalarValue.Blob.Uri, nil
case *core.Scalar_Schema:
Expand Down
30 changes: 1 addition & 29 deletions flyteidl/clients/go/coreutils/extract_literal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestFetchLiteral(t *testing.T) {
s := MakeBinaryLiteral([]byte{'h'})
assert.Equal(t, []byte{'h'}, s.GetScalar().GetBinary().GetValue())
_, err := ExtractFromLiteral(s)
assert.NotNil(t, err)
assert.Nil(t, err)
})

t.Run("NoneType", func(t *testing.T) {
Expand All @@ -124,34 +124,6 @@ func TestFetchLiteral(t *testing.T) {
assert.Nil(t, err)
})

t.Run("Generic", func(t *testing.T) {
literalVal := map[string]interface{}{
"x": 1,
"y": "ystringvalue",
}
var literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRUCT}}
lit, err := MakeLiteralForType(literalType, literalVal)
assert.NoError(t, err)
extractedLiteralVal, err := ExtractFromLiteral(lit)
assert.NoError(t, err)
fieldsMap := map[string]*structpb.Value{
"x": {
Kind: &structpb.Value_NumberValue{NumberValue: 1},
},
"y": {
Kind: &structpb.Value_StringValue{StringValue: "ystringvalue"},
},
}
expectedStructVal := &structpb.Struct{
Fields: fieldsMap,
}
extractedStructValue := extractedLiteralVal.(*structpb.Struct)
assert.Equal(t, len(expectedStructVal.Fields), len(extractedStructValue.Fields))
for key, val := range expectedStructVal.Fields {
assert.Equal(t, val.Kind, extractedStructValue.Fields[key].Kind)
}
})

t.Run("Generic Passed As String", func(t *testing.T) {
literalVal := "{\"x\": 1,\"y\": \"ystringvalue\"}"
var literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRUCT}}
Expand Down
34 changes: 30 additions & 4 deletions flyteidl/clients/go/coreutils/literals.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package coreutils

import (
"encoding/json"
"fmt"
"math"
"reflect"
Expand All @@ -14,11 +13,14 @@ import (
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/pkg/errors"
"github.com/shamaton/msgpack/v2"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

const MESSAGEPACK = "msgpack"

func MakePrimitive(v interface{}) (*core.Primitive, error) {
switch p := v.(type) {
case int:
Expand Down Expand Up @@ -144,6 +146,7 @@ func MakeBinaryLiteral(v []byte) *core.Literal {
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: v,
Tag: MESSAGEPACK,
},
},
},
Expand Down Expand Up @@ -389,7 +392,7 @@ func MakeLiteralForSimpleType(t core.SimpleType, s string) (*core.Literal, error
scalar.Value = &core.Scalar_Binary{
Binary: &core.Binary{
Value: []byte(s),
// TODO Tag not supported at the moment
Tag: MESSAGEPACK,
},
}
case core.SimpleType_ERROR:
Expand Down Expand Up @@ -559,12 +562,35 @@ func MakeLiteralForType(t *core.LiteralType, v interface{}) (*core.Literal, erro
strValue = fmt.Sprintf("%.0f", math.Trunc(f))
}
if newT.Simple == core.SimpleType_STRUCT {
// If the type is a STRUCT, we expect the input to be a complex object
// like the following example:
// inputs:
// dc:
// a: 1
// b: 3.14
// c: "example_string"
// Instead of storing it directly as a structured value, we will serialize
// the input object using MsgPack and return it as a binary IDL object.

// If the value is not already a string (meaning it's not already serialized),
// proceed with serialization.
if _, isValueStringType := v.(string); !isValueStringType {
byteValue, err := json.Marshal(v)
byteValue, err := msgpack.Marshal(v)
if err != nil {
return nil, fmt.Errorf("unable to marshal to json string for struct value %v", v)
}
strValue = string(byteValue)
return &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: byteValue,
Tag: MESSAGEPACK,
},
},
},
},
}, nil
}
}
lv, err := MakeLiteralForSimpleType(newT.Simple, strValue)
Expand Down
73 changes: 73 additions & 0 deletions flyteidl/clients/go/coreutils/literals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/pkg/errors"
"github.com/shamaton/msgpack/v2"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -242,6 +243,16 @@ func TestMakeDefaultLiteralForType(t *testing.T) {
assert.NotNil(t, l.GetScalar().GetError())
})

t.Run("binary", func(t *testing.T) {
l, err := MakeDefaultLiteralForType(&core.LiteralType{Type: &core.LiteralType_Simple{
Simple: core.SimpleType_BINARY,
}})
assert.NoError(t, err)
assert.NotNil(t, l.GetScalar().GetBinary())
assert.NotNil(t, l.GetScalar().GetBinary().GetValue())
assert.NotNil(t, l.GetScalar().GetBinary().GetTag())
})

t.Run("struct", func(t *testing.T) {
l, err := MakeDefaultLiteralForType(&core.LiteralType{Type: &core.LiteralType_Simple{
Simple: core.SimpleType_STRUCT,
Expand Down Expand Up @@ -444,6 +455,68 @@ func TestMakeLiteralForType(t *testing.T) {
assert.Equal(t, expectedVal, actualVal)
})

t.Run("SimpleBinary", func(t *testing.T) {
// We compare the deserialized values instead of the raw msgpack bytes because Go does not guarantee the order
// of map keys during serialization. This means that while the serialized bytes may differ, the deserialized
// values should be logically equivalent.

var literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRUCT}}
v := map[string]interface{}{
"a": int64(1),
"b": 3.14,
"c": "example_string",
"d": map[string]interface{}{
"1": int64(100),
"2": int64(200),
},
"e": map[string]interface{}{
"a": int64(1),
"b": 3.14,
},
"f": []string{"a", "b", "c"},
}

val, err := MakeLiteralForType(literalType, v)
assert.NoError(t, err)

msgpackBytes, err := msgpack.Marshal(v)
assert.NoError(t, err)

literalVal := &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: msgpackBytes,
Tag: MESSAGEPACK,
},
},
},
},
}

expectedLiteralVal, err := ExtractFromLiteral(literalVal)
assert.NoError(t, err)
actualLiteralVal, err := ExtractFromLiteral(val)
assert.NoError(t, err)

// Check if the extracted value is of type *core.Binary (not []byte)
expectedBinary, ok := expectedLiteralVal.(*core.Binary)
assert.True(t, ok, "expectedLiteralVal is not of type *core.Binary")
actualBinary, ok := actualLiteralVal.(*core.Binary)
assert.True(t, ok, "actualLiteralVal is not of type *core.Binary")

// Now check if the Binary values match
var expectedVal, actualVal map[string]interface{}
err = msgpack.Unmarshal(expectedBinary.Value, &expectedVal)
assert.NoError(t, err)
err = msgpack.Unmarshal(actualBinary.Value, &actualVal)
assert.NoError(t, err)

// Finally, assert that the deserialized values are equal
assert.Equal(t, expectedVal, actualVal)
})

t.Run("ArrayStrings", func(t *testing.T) {
var literalType = &core.LiteralType{Type: &core.LiteralType_CollectionType{
CollectionType: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}}}}
Expand Down
1 change: 1 addition & 0 deletions flyteidl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
github.com/pkg/errors v0.9.1
github.com/shamaton/msgpack/v2 v2.2.2
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.27.0
Expand Down
2 changes: 2 additions & 0 deletions flyteidl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPH
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shamaton/msgpack/v2 v2.2.2 h1:GOIg0c9LV04VwzOOqZSrmsv/JzjNOOMxnS/HvOHGdgs=
github.com/shamaton/msgpack/v2 v2.2.2/go.mod h1:6khjYnkx73f7VQU7wjcFS9DFjs+59naVWJv1TB7qdOI=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shamaton/msgpack/v2 v2.2.2 h1:GOIg0c9LV04VwzOOqZSrmsv/JzjNOOMxnS/HvOHGdgs=
github.com/shamaton/msgpack/v2 v2.2.2/go.mod h1:6khjYnkx73f7VQU7wjcFS9DFjs+59naVWJv1TB7qdOI=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
Expand Down
3 changes: 2 additions & 1 deletion flytepropeller/pkg/compiler/validators/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"golang.org/x/exp/slices"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flyteidl/clients/go/coreutils"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
)

Expand Down Expand Up @@ -47,7 +48,7 @@ func literalTypeForScalar(scalar *core.Scalar) *core.LiteralType {
// If the binary has a tag, treat it as a structured type (e.g., dict, dataclass, Pydantic BaseModel).
// Otherwise, treat it as raw binary data.
// Reference: https://github.com/flyteorg/flyte/blob/master/rfc/system/5741-binary-idl-with-message-pack.md
if len(v.Binary.Tag) > 0 {
if v.Binary.Tag == coreutils.MESSAGEPACK {
literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRUCT}}
} else {
literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_BINARY}}
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/compiler/validators/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestLiteralTypeForLiterals(t *testing.T) {
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: serializedBinaryData,
Tag: "msgpack",
Tag: coreutils.MESSAGEPACK,
},
},
},
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestLiteralTypeForLiterals(t *testing.T) {
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: serializedBinaryData,
Tag: "msgpack",
Tag: coreutils.MESSAGEPACK,
},
},
},
Expand Down
Loading

0 comments on commit 9abfbda

Please sign in to comment.