diff --git a/pkg/api/run_http.go b/pkg/api/run_http.go index a6608fc3..97de784b 100644 --- a/pkg/api/run_http.go +++ b/pkg/api/run_http.go @@ -89,7 +89,7 @@ func ConstructHttpEndpoint(env *Environ, port int, operator string, gens core.Ge } httpDef.InstanceDefs = append(httpDef.InstanceDefs, unpackerIns) httpDef.Connections["httpServer.handler)body"] = []string{"(unpacker"} - httpDef.Connections["unpacker)"] = []string{"(operator"} + httpDef.Connections["unpacker)item"] = []string{"(operator"} } if outDef.Equals(elem.HTTP_RESPONSE_DEF) { diff --git a/pkg/elem/encoding_json_read.go b/pkg/elem/encoding_json_read.go index f22fbb16..a0fb0e7d 100644 --- a/pkg/elem/encoding_json_read.go +++ b/pkg/elem/encoding_json_read.go @@ -14,8 +14,16 @@ var encodingJSONReadCfg = &builtinConfig{ Type: "binary", }, Out: core.TypeDef{ - Type: "generic", - Generic: "itemType", + Type: "map", + Map: map[string]*core.TypeDef{ + "valid": { + Type: "boolean", + }, + "item": { + Type: "generic", + Generic: "itemType", + }, + }, }, }, }, @@ -24,6 +32,8 @@ var encodingJSONReadCfg = &builtinConfig{ opFunc: func(op *core.Operator) { in := op.Main().In() out := op.Main().Out() + def, _ := op.Define() + itemDef := def.ServiceDefs[core.MAIN_SERVICE].Out.Map["item"] for !op.CheckStop() { i := in.Pull() if core.IsMarker(i) { @@ -33,11 +43,19 @@ var encodingJSONReadCfg = &builtinConfig{ var obj interface{} err := json.Unmarshal([]byte(i.(utils.Binary)), &obj) if err != nil { - out.Push(nil) + out.Map("item").Push(nil) + out.Map("valid").Push(false) continue } obj = utils.CleanValue(obj) - out.Push(obj) // TODO: Make this safer + err = itemDef.VerifyData(obj) + if err == nil { + out.Map("item").Push(obj) + out.Map("valid").Push(true) + } else { + out.Map("item").Push(nil) + out.Map("valid").Push(false) + } } }, } diff --git a/pkg/elem/encoding_json_read_test.go b/pkg/elem/encoding_json_read_test.go index 80566d8f..4a2c8cd8 100644 --- a/pkg/elem/encoding_json_read_test.go +++ b/pkg/elem/encoding_json_read_test.go @@ -33,7 +33,39 @@ func Test_JsonRead__String(t *testing.T) { o.Main().Out().Bufferize() o.Start() o.Main().In().Push(utils.Binary("\"test\"")) - a.PortPushes("test", o.Main().Out()) + a.PortPushes("test", o.Main().Out().Map("item")) + a.PortPushes(true, o.Main().Out().Map("valid")) +} + +func Test_JsonRead__Invalid(t *testing.T) { + a := assertions.New(t) + + o, err := buildOperator( + core.InstanceDef{ + Operator: "slang.encoding.JSONRead", + Generics: map[string]*core.TypeDef{ + "itemType": { + Type: "map", + Map: map[string]*core.TypeDef{ + "a": { + Type: "number", + }, + "b": { + Type: "boolean", + }, + }, + }, + }, + }, + ) + require.NoError(t, err) + + o.Main().Out().Bufferize() + o.Start() + o.Main().In().Push(utils.Binary("\"test\"")) + a.PortPushes(nil, o.Main().Out().Map("item").Map("a")) + a.PortPushes(nil, o.Main().Out().Map("item").Map("b")) + a.PortPushes(false, o.Main().Out().Map("valid")) } func Test_JsonRead__Complex(t *testing.T) { @@ -65,5 +97,6 @@ func Test_JsonRead__Complex(t *testing.T) { o.Main().Out().Bufferize() o.Start() o.Main().In().Push(utils.Binary("{\"a\":[1,2,3],\"b\":true}")) - a.PortPushes(map[string]interface{}{"a": []interface{}{1.0, 2.0, 3.0}, "b": true}, o.Main().Out()) + a.PortPushes(map[string]interface{}{"a": []interface{}{1.0, 2.0, 3.0}, "b": true}, o.Main().Out().Map("item")) + a.PortPushes(true, o.Main().Out().Map("valid")) }