Skip to content

Commit

Permalink
added convertToLowerCamel option in proto udf
Browse files Browse the repository at this point in the history
  • Loading branch information
adranwit committed Jun 1, 2020
1 parent 3030153 commit f0bc6e1
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 165 deletions.
3 changes: 2 additions & 1 deletion shared/meta/deployment/go.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"Targets": [
{
"MinReleaseVersion": {
"1.14": "1",
"1.15": "1",
"1.14": "3",
"1.13": "2",
"1.12": "7",
"1.11": "5",
Expand Down
20 changes: 10 additions & 10 deletions shared/static/meta.go

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions shared/static/req.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions shared/static/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
)

func init() {
var memStorage = storage.NewMemoryService()
var memStorage = storage.NewMemoryService();
{
err := memStorage.Upload("mem://github.com/viant/endly/Version", bytes.NewReader([]byte{48, 46, 52, 57, 46, 49, 10}))
err := memStorage.Upload("mem://github.com/viant/endly/Version", bytes.NewReader([]byte{48,46,52,57,46,49,10}))
if err != nil {
log.Printf("failed to upload: mem://github.com/viant/endly/Version %v", err)
}
Expand Down
258 changes: 129 additions & 129 deletions shared/static/workflow.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion shared/workflow/req/tomcat_start.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"Directory": "$appDirectory",
"Commands": [
{
"Command": "tomcat/bin/catalina.sh jpda start",
"Command": "tomcat/bin/catalina.sh jpda start",
"Success": [
"Tomcat started."
],
Expand Down
12 changes: 10 additions & 2 deletions testing/runner/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,18 +165,26 @@ func (s *service) handleRequest(client *http.Client, metric *runtimeMetric, trip
trip.timeout = true
atomic.AddUint32(&metric.timeouts, 1)
} else if err != nil {
fmt.Printf("%v\n", err)
trip.err = err
metric.err = err
atomic.AddUint32(&metric.errors, 1)
return
}
defer response.Body.Close()
if trip.err != nil || trip.timeout {
defer func() {
if response != nil && response.Body != nil {
response.Body.Close()
}
}()

if trip.err != nil || trip.timeout || response == nil {
return
}
var content []byte
if response.ContentLength > 0 {
content, err = ioutil.ReadAll(response.Body)
}

if trip.expected {
trip.response = &http.Response{
Header: response.Header,
Expand Down
57 changes: 48 additions & 9 deletions udf/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package udf

import (
"bytes"
"encoding/json"
"fmt"
"github.com/jhump/protoreflect/desc/protoparse"
"github.com/jhump/protoreflect/dynamic"
"github.com/jhump/protoreflect/dynamic/msgregistry"
"github.com/pkg/errors"
"github.com/viant/toolbox"
"github.com/viant/toolbox/data"
"io"
Expand All @@ -16,8 +18,9 @@ import (

//ProtoCodec represent a proto codec
type ProtoCodec struct {
registry *msgregistry.MessageRegistry
msgType string
registry *msgregistry.MessageRegistry
msgType string
convertToLowerCamel bool
}

func (c *ProtoCodec) AsMessage(msgType string, data []byte) (interface{}, error) {
Expand Down Expand Up @@ -60,15 +63,45 @@ func (c *ProtoCodec) AsBinary(msgType string, msg interface{}) ([]byte, error) {
if err != nil {
return nil, err
}
if c.convertToLowerCamel {
data, err = c.toLowerCamel(err, data)
if err != nil {
err = errors.Wrapf(err, "failed to convert to lowerCase fields")
return nil, err
}
}
protoMsg := dynamic.NewMessage(msgDescriptor)
if err = protoMsg.UnmarshalJSON(data); err != nil {
err = protoMsg.UnmarshalJSON(data)
if err != nil {
err = errors.Wrapf(err, "failed to UnmarshalJSON")
return nil, err
}
return protoMsg.Marshal()
}


func (c *ProtoCodec) toLowerCamel(err error, data []byte) ([]byte, error) {
aMap := map[string]interface{}{}
err = json.Unmarshal(data, &aMap)
if err != nil {
return nil, errors.Wrapf(err, "invalid JSON")
}
transformed := map[string]interface{}{}
err = toolbox.CopyMap(aMap, transformed, func(key, value interface{}) (interface{}, interface{}, bool) {
if value == nil || toolbox.AsString(value) == "" || key == nil {
return nil, nil, false
}
return toolbox.ToCaseFormat(toolbox.AsString(key), toolbox.CaseUpperCamel, toolbox.CaseLowerCamel), value, true
})
if err != nil {
return nil, err
}
return json.Marshal(transformed)
}


//NewProtoCodec creates a new protobuf codec
func NewProtoCodec(schemaFile, importPath string, msgType string) (*ProtoCodec, error) {
func NewProtoCodec(schemaFile, importPath string, msgType string, lowercaseKey bool) (*ProtoCodec, error) {
parser := protoparse.Parser{ImportPaths: []string{importPath}, IncludeSourceCodeInfo: true}
descriptors, err := parser.ParseFiles(schemaFile)
if err != nil {
Expand All @@ -80,8 +113,9 @@ func NewProtoCodec(schemaFile, importPath string, msgType string) (*ProtoCodec,
registry.AddFile(baseURL, desc)
}
return &ProtoCodec{
registry: registry,
msgType: msgType,
registry: registry,
msgType: msgType,
convertToLowerCamel: lowercaseKey,
}, nil

}
Expand All @@ -93,12 +127,16 @@ func getProtoCodec(source string, args []interface{}) (*ProtoCodec, error) {
schemaFile := toolbox.AsString(args[0])
messageType := toolbox.AsString(args[1])
importPath, filename := path.Split(schemaFile)
if len(args) > 2 {
if len(args) > 2 && args[2] != nil {
importPath = toolbox.AsString(args[2])
} else {
schemaFile = filename
}
return NewProtoCodec(schemaFile, importPath, messageType)
lowercaseKey := false
if len(args) > 3 {
lowercaseKey = toolbox.AsBoolean(args[3])
}
return NewProtoCodec(schemaFile, importPath, messageType, lowercaseKey)
}

//NewProtoWriter creates a new proto writer provider
Expand All @@ -108,7 +146,8 @@ func NewProtoWriter(args ...interface{}) (func(source interface{}, state data.Ma
return nil, err
}
return func(source interface{}, state data.Map) (interface{}, error) {
return codec.AsBinary(codec.msgType, source)
data, err := codec.AsBinary(codec.msgType, source)
return data, err
}, nil
}

Expand Down

0 comments on commit f0bc6e1

Please sign in to comment.