Skip to content

Commit

Permalink
support attribute access
Browse files Browse the repository at this point in the history
Signed-off-by: Future-Outlier <[email protected]>
  • Loading branch information
Future-Outlier committed Aug 7, 2024
1 parent 0b32845 commit 1dcfbc4
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 6 deletions.
2 changes: 2 additions & 0 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
github.com/vmihailenco/msgpack/v5 v5.4.1
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
Expand Down Expand Up @@ -123,6 +124,7 @@ require (
github.com/spf13/viper v1.11.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
97 changes: 91 additions & 6 deletions flytepropeller/pkg/controller/nodes/attr_path_resolver.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package nodes

import (
"google.golang.org/protobuf/types/known/structpb"

"encoding/json"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/errors"
"github.com/vmihailenco/msgpack/v5"
"google.golang.org/protobuf/types/known/structpb"
"strings"
)

// resolveAttrPathInPromise resolves the literal with attribute path
// If the promise is chained with attributes (e.g. promise.a["b"][0]), then we need to resolve the promise
func resolveAttrPathInPromise(nodeID string, literal *core.Literal, bindAttrPath []*core.PromiseAttribute) (*core.Literal, error) {
var currVal *core.Literal = literal
var tmpVal *core.Literal
var err error
var exist bool
count := 0

Expand All @@ -38,13 +39,19 @@ func resolveAttrPathInPromise(nodeID string, literal *core.Literal, bindAttrPath
}

// resolve dataclass
if currVal.GetScalar() != nil && currVal.GetScalar().GetGeneric() != nil {
st := currVal.GetScalar().GetGeneric()
if scalar := currVal.GetScalar(); scalar != nil {
// start from index "count"
currVal, err = resolveAttrPathInPbStruct(nodeID, st, bindAttrPath[count:])
var err error

if json := scalar.GetJson(); json != nil {
currVal, err = resolveAttrPathInJson(nodeID, json.GetValue(), bindAttrPath[count:])

Check warning on line 47 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L47

Added line #L47 was not covered by tests
} else if generic := scalar.GetGeneric(); generic != nil {
currVal, err = resolveAttrPathInPbStruct(nodeID, generic, bindAttrPath[count:])
}
if err != nil {
return nil, err
}

}

return currVal, nil
Expand Down Expand Up @@ -84,6 +91,82 @@ func resolveAttrPathInPbStruct(nodeID string, st *structpb.Struct, bindAttrPath
return literal, err
}

// resolveAttrPathInJson resolves the msgpack bytes (e.g. dataclass) with attribute path
func resolveAttrPathInJson(nodeID string, json_byte []byte, bindAttrPath []*core.PromiseAttribute) (*core.Literal,
error) {

Check warning on line 96 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L96

Added line #L96 was not covered by tests

var currVal interface{}
var tmpVal interface{}
var exist bool
var jsonStr string

Check warning on line 101 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L98-L101

Added lines #L98 - L101 were not covered by tests

err := msgpack.Unmarshal(json_byte, &jsonStr)
if err != nil {
return nil, err

Check warning on line 105 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L103-L105

Added lines #L103 - L105 were not covered by tests
}

// Golang has problem with unmarshalling integer as float64
// reference: https://stackoverflow.com/questions/22343083/json-unmarshaling-with-long-numbers-gives-floating-point-number

decoder := json.NewDecoder(strings.NewReader(jsonStr))
decoder.UseNumber()
err = decoder.Decode(&tmpVal)
if err != nil {
return nil, err

Check warning on line 115 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L111-L115

Added lines #L111 - L115 were not covered by tests
}
currVal = convertNumbers(tmpVal)

Check warning on line 117 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L117

Added line #L117 was not covered by tests

// Turn the current value to a map so it can be resolved more easily
for _, attr := range bindAttrPath {
switch resolvedVal := currVal.(type) {

Check warning on line 121 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L120-L121

Added lines #L120 - L121 were not covered by tests
// map
case map[string]interface{}:
tmpVal, exist = resolvedVal[attr.GetStringValue()]
if !exist {
return nil, errors.Errorf(errors.PromiseAttributeResolveError, nodeID, "key [%v] does not exist in literal %v", attr.GetStringValue(), currVal)

Check warning on line 126 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L123-L126

Added lines #L123 - L126 were not covered by tests
}
currVal = tmpVal

Check warning on line 128 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L128

Added line #L128 was not covered by tests
// list
case []interface{}:
if int(attr.GetIntValue()) >= len(resolvedVal) {
return nil, errors.Errorf(errors.PromiseAttributeResolveError, nodeID, "index [%v] is out of range of %v", attr.GetIntValue(), currVal)

Check warning on line 132 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L130-L132

Added lines #L130 - L132 were not covered by tests
}
currVal = resolvedVal[attr.GetIntValue()]

Check warning on line 134 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L134

Added line #L134 was not covered by tests
}
}

// After resolve, convert the interface to literal
literal, err := convertInterfaceToLiteral(nodeID, currVal)

Check warning on line 139 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L139

Added line #L139 was not covered by tests

return literal, err

Check warning on line 141 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L141

Added line #L141 was not covered by tests
}

// convertNumbers recursively converts json.Number to int64 or float64
func convertNumbers(v interface{}) interface{} {
switch vv := v.(type) {
case map[string]interface{}:
for key, value := range vv {
vv[key] = convertNumbers(value)

Check warning on line 149 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L145-L149

Added lines #L145 - L149 were not covered by tests
}
return vv
case []interface{}:
for i, value := range vv {
vv[i] = convertNumbers(value)

Check warning on line 154 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L151-L154

Added lines #L151 - L154 were not covered by tests
}
return vv
case json.Number:

Check warning on line 157 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L156-L157

Added lines #L156 - L157 were not covered by tests
// Try to convert to int64 first
if intVal, err := vv.Int64(); err == nil {
return intVal

Check warning on line 160 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L159-L160

Added lines #L159 - L160 were not covered by tests
}
// If it fails, fall back to float64
if floatVal, err := vv.Float64(); err == nil {
return floatVal

Check warning on line 164 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L163-L164

Added lines #L163 - L164 were not covered by tests
}
}
return v

Check warning on line 167 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L167

Added line #L167 was not covered by tests
}

// convertInterfaceToLiteral converts the protobuf struct (e.g. dataclass) to literal
func convertInterfaceToLiteral(nodeID string, obj interface{}) (*core.Literal, error) {

Expand Down Expand Up @@ -137,6 +220,8 @@ func convertInterfaceToLiteralScalar(nodeID string, obj interface{}) (*core.Lite
value.Value = &core.Primitive_StringValue{StringValue: obj}
case int:
value.Value = &core.Primitive_Integer{Integer: int64(obj)}
case int64:
value.Value = &core.Primitive_Integer{Integer: obj}

Check warning on line 224 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L223-L224

Added lines #L223 - L224 were not covered by tests
case float64:
value.Value = &core.Primitive_FloatValue{FloatValue: obj}
case bool:
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
golang.org/x/sync v0.7.0
gorm.io/driver/postgres v1.5.3
sigs.k8s.io/controller-runtime v0.16.3

)

require (
Expand Down Expand Up @@ -177,6 +178,8 @@ require (
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect; indirects
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/wI2L/jsondiff v0.5.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,10 @@ github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IA
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/wI2L/jsondiff v0.5.0 h1:RRMTi/mH+R2aXcPe1VYyvGINJqQfC3R+KSEakuU1Ikw=
github.com/wI2L/jsondiff v0.5.0/go.mod h1:qqG6hnK0Lsrz2BpIVCxWiK9ItsBCpIZQiv0izJjOZ9s=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
Expand Down

0 comments on commit 1dcfbc4

Please sign in to comment.