From 6dce6bec3d51d2a1cdbaae86da0cf507b720e3e3 Mon Sep 17 00:00:00 2001 From: Nic Pottier Date: Tue, 30 May 2017 10:59:31 -0500 Subject: [PATCH 1/2] Output is now session, more tests, fix index offsets for jsonparser --- .gitignore | 1 + cmd/flowrunner/main.go | 2 +- cmd/flowrunner/runner_test.go | 55 +++- .../testdata/flows/all_actions.json | 2 +- .../testdata/flows/all_actions_test.json | 238 ++++++++++++++++++ .../testdata/flows/subflow_test.json | 8 +- cmd/flowserver/flow.go | 22 +- excellent/evaluator_test.go | 4 + flows/engine/engine.go | 22 +- flows/interfaces.go | 10 +- flows/runs/output.go | 112 --------- flows/runs/run.go | 28 +-- flows/runs/session.go | 112 +++++++++ utils/json.go | 23 +- utils/json_test.go | 36 +++ utils/resolver.go | 21 +- vendor/github.com/buger/jsonparser/parser.go | 5 + 17 files changed, 529 insertions(+), 172 deletions(-) create mode 100644 cmd/flowrunner/testdata/flows/all_actions_test.json delete mode 100644 flows/runs/output.go create mode 100644 flows/runs/session.go create mode 100644 utils/json_test.go diff --git a/.gitignore b/.gitignore index 84e699bca..d01274304 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ # Compiled Object files, Static and Dynamic libs (Shared Objects) +.vscode *~ *.o *.a diff --git a/cmd/flowrunner/main.go b/cmd/flowrunner/main.go index 479b4db19..91f9d7871 100644 --- a/cmd/flowrunner/main.go +++ b/cmd/flowrunner/main.go @@ -186,7 +186,7 @@ func main() { inputs = append(inputs, event) // rebuild our output - output, err = runs.ReadRunOutput(outJSON) + output, err = runs.ReadSession(outJSON) if err != nil { log.Fatalf("Error unmarshalling output: %s", err) } diff --git a/cmd/flowrunner/runner_test.go b/cmd/flowrunner/runner_test.go index eed8c8567..a3319184a 100644 --- a/cmd/flowrunner/runner_test.go +++ b/cmd/flowrunner/runner_test.go @@ -6,10 +6,14 @@ import ( "fmt" "io/ioutil" "log" + "net" + "net/http" + "net/http/httptest" "strings" "testing" "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/flows/actions" "github.com/nyaruka/goflow/flows/definition" "github.com/nyaruka/goflow/flows/engine" "github.com/nyaruka/goflow/flows/events" @@ -26,9 +30,11 @@ var flowTests = []struct { {"two_questions.json", "", "", "two_questions_test.json"}, {"subflow.json", "", "", "subflow_test.json"}, {"brochure.json", "", "", "brochure_test.json"}, + {"all_actions.json", "", "", "all_actions_test.json"}, } var writeOutput bool +var serverURL = "" func init() { flag.BoolVar(&writeOutput, "write", false, "whether to rewrite TestFlow output") @@ -64,6 +70,18 @@ func runFlow(env utils.Environment, flowFilename string, contactFilename string, return nil, fmt.Errorf("Error unmarshalling flows '%s': %s", flowFilename, err) } + // rewrite the URL on any webhook actions + for _, flow := range runnerFlows { + for _, n := range flow.Nodes() { + for _, a := range n.Actions() { + webhook, isWebhook := a.(*actions.WebhookAction) + if isWebhook { + webhook.URL = strings.Replace(webhook.URL, "http://localhost", serverURL, 1) + } + } + } + } + contactJSON, err := readFile("contacts/", contactFilename) if err != nil { return nil, err @@ -99,7 +117,7 @@ func runFlow(env utils.Environment, flowFilename string, contactFilename string, } outputs = append(outputs, outJSON) - output, err = runs.ReadRunOutput(outJSON) + output, err = runs.ReadSession(outJSON) if err != nil { return nil, fmt.Errorf("Error marshalling output: %s", err) } @@ -134,8 +152,39 @@ func runFlow(env utils.Environment, flowFilename string, contactFilename string, return outputs, nil } +// set up a mock server for webhook actions +func startTestHTTPServer() { + server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + cmd := r.URL.Query().Get("cmd") + defer r.Body.Close() + w.Header().Set("Date", "") + + switch cmd { + case "success": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ "ok": "true" }`)) + case "unavailable": + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte(`{ "errors": ["service unavailable"] }`)) + default: + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{ "errors": ["bad_request"] }`)) + } + })) + // manually create a listener for our test server so that our output is predictable + l, err := net.Listen("tcp", "127.0.0.1:49999") + if err != nil { + log.Fatal(err) + } + server.Listener = l + server.Start() + defer server.Close() + serverURL = server.URL +} + func TestFlows(t *testing.T) { env := utils.NewDefaultEnvironment() + startTestHTTPServer() for _, test := range flowTests { testJSON, err := readFile("flows/", test.output) @@ -195,11 +244,11 @@ func TestFlows(t *testing.T) { } for i := range outputs { - o, err := runs.ReadRunOutput(outputs[i]) + o, err := runs.ReadSession(outputs[i]) if err != nil { t.Errorf("Error unmarshalling output: %s\n", err) } - expectedO, err := runs.ReadRunOutput(flowTest.Outputs[i]) + expectedO, err := runs.ReadSession(flowTest.Outputs[i]) if err != nil { t.Errorf("Error unmarshalling output: %s\n", err) } diff --git a/cmd/flowrunner/testdata/flows/all_actions.json b/cmd/flowrunner/testdata/flows/all_actions.json index 4214485ac..198004701 100644 --- a/cmd/flowrunner/testdata/flows/all_actions.json +++ b/cmd/flowrunner/testdata/flows/all_actions.json @@ -80,7 +80,7 @@ "uuid": "06153fbd-3e2c-413a-b0df-ed15d631835a", "type": "webhook", "method": "get", - "url": "http://echo.jsontest.com/key/value/one/two" + "url": "http://localhost/?cmd=success" } ] } diff --git a/cmd/flowrunner/testdata/flows/all_actions_test.json b/cmd/flowrunner/testdata/flows/all_actions_test.json new file mode 100644 index 000000000..834791929 --- /dev/null +++ b/cmd/flowrunner/testdata/flows/all_actions_test.json @@ -0,0 +1,238 @@ +{ + "resume_events": [], + "outputs": [ + { + "runs": [ + { + "uuid": "", + "flow": "flow1", + "channel": "", + "contact": "contact1-test-4b7f-a34b-e37e31e86451", + "path": [ + { + "uuid": "", + "node": "node1", + "arrived_on": "2000-01-01T00:00:00.000000000-00:00", + "left_on": "2000-01-01T00:00:00.000000000-00:00", + "events": [ + { + "type": "add_to_group", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "groups": [ + { + "uuid": "", + "name": "Survey Audience" + } + ] + }, + { + "type": "save_to_contact", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "field": "f06c5b73-eb0d-4417-b7e0-4f650ed30dc8", + "name": "activation_token", + "value": "XXX-YYY-ZZZ" + }, + { + "type": "email", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "email": "", + "subject": "Here is your activation token", + "body": "Your activation token is XXX-YYY-ZZZ" + }, + { + "type": "flow_enter", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "flow": "b7cf0d83-f1c9-411c-96fd-c511a4cfa86d", + "contact": "contact1-test-4b7f-a34b-e37e31e86451" + }, + { + "type": "flow_exit", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "flow": "b7cf0d83-f1c9-411c-96fd-c511a4cfa86d", + "contact": "contact1-test-4b7f-a34b-e37e31e86451", + "status": "C", + "exited_on": "2000-01-01T00:00:00.000000000-00:00" + }, + { + "type": "msg_out", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "urn": "tel:+12065551212", + "text": "Hi Ben Haggerty, are you ready?" + }, + { + "type": "msg_out", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "contact": "contact1-test-4b7f-a34b-e37e31e86451", + "text": "Hi Ben Haggerty, are you ready to complete today's survey?" + }, + { + "type": "save_result", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "node": "node1", + "name": "gender", + "value": "m", + "category": "Male" + }, + { + "type": "save_to_contact", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "field": "0cb17b2a-3bfe-4a19-8c99-98ab9561045d", + "name": "Gender", + "value": "Male" + }, + { + "type": "error", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "text": "Get http://127.0.0.1:49999/?cmd=success: dial tcp 127.0.0.1:49999: getsockopt: connection refused" + }, + { + "type": "webhook", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "url": "http://127.0.0.1:49999/?cmd=success", + "status": "F", + "status_code": 0, + "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: Go-http-client/1.1\r\nAccept-Encoding: gzip\r\n\r\n", + "response": "" + } + ] + } + ], + "status": "C", + "child": "c458a838-efa8-42d7-a655-3e4784bcbf5b", + "results": { + "gender": { + "node": "node1", + "name": "gender", + "value": "m", + "category": "Male", + "created_on": "2000-01-01T00:00:00.000000000-00:00" + } + }, + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "modified_on": "2000-01-01T00:00:00.000000000-00:00", + "expires_on": "2000-01-01T00:00:00.000000000-00:00", + "timesout_on": "2000-01-01T00:00:00.000000000-00:00", + "exited_on": "2000-01-01T00:00:00.000000000-00:00" + }, + { + "uuid": "", + "flow": "b7cf0d83-f1c9-411c-96fd-c511a4cfa86d", + "channel": "", + "contact": "contact1-test-4b7f-a34b-e37e31e86451", + "path": [], + "status": "C", + "parent": "2d17c317-93c1-497d-879d-3ed56cf1b0d4", + "results": {}, + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "modified_on": "2000-01-01T00:00:00.000000000-00:00", + "expires_on": "2000-01-01T00:00:00.000000000-00:00", + "timesout_on": "2000-01-01T00:00:00.000000000-00:00", + "exited_on": "2000-01-01T00:00:00.000000000-00:00" + } + ], + "events": [ + { + "type": "add_to_group", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "groups": [ + { + "uuid": "", + "name": "Survey Audience" + } + ] + }, + { + "type": "save_to_contact", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "field": "f06c5b73-eb0d-4417-b7e0-4f650ed30dc8", + "name": "activation_token", + "value": "XXX-YYY-ZZZ" + }, + { + "type": "email", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "email": "", + "subject": "Here is your activation token", + "body": "Your activation token is XXX-YYY-ZZZ" + }, + { + "type": "flow_enter", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "flow": "b7cf0d83-f1c9-411c-96fd-c511a4cfa86d", + "contact": "contact1-test-4b7f-a34b-e37e31e86451" + }, + { + "type": "flow_exit", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "flow": "b7cf0d83-f1c9-411c-96fd-c511a4cfa86d", + "contact": "contact1-test-4b7f-a34b-e37e31e86451", + "status": "C", + "exited_on": "2000-01-01T00:00:00.000000000-00:00" + }, + { + "type": "msg_out", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "urn": "tel:+12065551212", + "text": "Hi Ben Haggerty, are you ready?" + }, + { + "type": "msg_out", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "contact": "contact1-test-4b7f-a34b-e37e31e86451", + "text": "Hi Ben Haggerty, are you ready to complete today's survey?" + }, + { + "type": "save_result", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "node": "node1", + "name": "gender", + "value": "m", + "category": "Male" + }, + { + "type": "save_to_contact", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "field": "0cb17b2a-3bfe-4a19-8c99-98ab9561045d", + "name": "Gender", + "value": "Male" + }, + { + "type": "error", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "text": "Get http://127.0.0.1:49999/?cmd=success: dial tcp 127.0.0.1:49999: getsockopt: connection refused" + }, + { + "type": "webhook", + "created_on": "2000-01-01T00:00:00.000000000-00:00", + "step": "", + "url": "http://127.0.0.1:49999/?cmd=success", + "status": "F", + "status_code": 0, + "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: Go-http-client/1.1\r\nAccept-Encoding: gzip\r\n\r\n", + "response": "" + } + ] + } + ] +} \ No newline at end of file diff --git a/cmd/flowrunner/testdata/flows/subflow_test.json b/cmd/flowrunner/testdata/flows/subflow_test.json index 039a6883d..1ffe37ce1 100644 --- a/cmd/flowrunner/testdata/flows/subflow_test.json +++ b/cmd/flowrunner/testdata/flows/subflow_test.json @@ -53,7 +53,7 @@ "type": "flow", "flow": "flow2" }, - "child": "55abd5d1-624b-4322-b72d-bc76821b9a11", + "child": "5a08f736-52ac-4934-9d6c-71e3d3929d17", "results": {}, "created_on": "2000-01-01T00:00:00.000000000-00:00", "modified_on": "2000-01-01T00:00:00.000000000-00:00", @@ -92,7 +92,7 @@ "wait": { "type": "msg" }, - "parent": "a4f1694f-565c-466d-b684-19ce11bedc2b", + "parent": "c2bb899c-cccd-4e40-9ada-423a309c0c5f", "results": {}, "created_on": "2000-01-01T00:00:00.000000000-00:00", "modified_on": "2000-01-01T00:00:00.000000000-00:00", @@ -213,7 +213,7 @@ "contact": "contact1-test-4b7f-a34b-e37e31e86451", "text": "Ryan Lewis" }, - "parent": "a4f1694f-565c-466d-b684-19ce11bedc2b", + "parent": "c2bb899c-cccd-4e40-9ada-423a309c0c5f", "results": { "name": { "node": "flow2-node1", @@ -290,7 +290,7 @@ } ], "status": "C", - "child": "55abd5d1-624b-4322-b72d-bc76821b9a11", + "child": "5a08f736-52ac-4934-9d6c-71e3d3929d17", "results": {}, "created_on": "2000-01-01T00:00:00.000000000-00:00", "modified_on": "2000-01-01T00:00:00.000000000-00:00", diff --git a/cmd/flowserver/flow.go b/cmd/flowserver/flow.go index a6867daf8..f186e2b7f 100644 --- a/cmd/flowserver/flow.go +++ b/cmd/flowserver/flow.go @@ -17,8 +17,8 @@ import ( ) type flowResponse struct { - Contact *flows.Contact `json:"contact"` - RunOutput flows.RunOutput `json:"run_output"` + Contact *flows.Contact `json:"contact"` + Session flows.Session `json:"session"` } type startRequest struct { @@ -75,19 +75,19 @@ func handleStart(w http.ResponseWriter, r *http.Request) (interface{}, error) { env := engine.NewFlowEnvironment(utils.NewDefaultEnvironment(), startFlows, []flows.FlowRun{}, []*flows.Contact{contact}) // start our flow - output, err := engine.StartFlow(env, startFlows[0], contact, nil, input) + session, err := engine.StartFlow(env, startFlows[0], contact, nil, input) if err != nil { return nil, fmt.Errorf("error starting flow: %s", err) } - return &flowResponse{Contact: contact, RunOutput: output}, nil + return &flowResponse{Contact: contact, Session: session}, nil } type resumeRequest struct { - Flows json.RawMessage `json:"flows" validate:"required,min=1"` - Contact json.RawMessage `json:"contact" validate:"required"` - RunOutput json.RawMessage `json:"run_output" validate:"required"` - Event *utils.TypedEnvelope `json:"event" validate:"required"` + Flows json.RawMessage `json:"flows" validate:"required,min=1"` + Contact json.RawMessage `json:"contact" validate:"required"` + Session json.RawMessage `json:"session" validate:"required"` + Event *utils.TypedEnvelope `json:"event" validate:"required"` } func handleResume(w http.ResponseWriter, r *http.Request) (interface{}, error) { @@ -116,7 +116,7 @@ func handleResume(w http.ResponseWriter, r *http.Request) (interface{}, error) { } // read our run - runOutput, err := runs.ReadRunOutput(resume.RunOutput) + runOutput, err := runs.ReadSession(resume.Session) if err != nil { return nil, err } @@ -145,10 +145,10 @@ func handleResume(w http.ResponseWriter, r *http.Request) (interface{}, error) { activeRun := runOutput.ActiveRun() // resume our flow - output, err := engine.ResumeFlow(env, activeRun, event) + session, err := engine.ResumeFlow(env, activeRun, event) if err != nil { return nil, fmt.Errorf("error resuming flow: %s", err) } - return &flowResponse{Contact: contact, RunOutput: output}, nil + return &flowResponse{Contact: contact, Session: session}, nil } diff --git a/excellent/evaluator_test.go b/excellent/evaluator_test.go index 1518d5e15..1db5d5c76 100644 --- a/excellent/evaluator_test.go +++ b/excellent/evaluator_test.go @@ -120,6 +120,8 @@ func TestEvaluateTemplate(t *testing.T) { strMap["2"] = "two" strMap["3"] = "three" strMap["four"] = "four" + strMap["with space"] = "spacy" + strMap["with-dash"] = "dashy" intMap := make(map[int]string) intMap[1] = "one" @@ -165,6 +167,8 @@ func TestEvaluateTemplate(t *testing.T) { {"@(str_map[key])", "four", false}, {"@(str_map[lower(key)])", "four", false}, {"@(title(missing))", "", true}, + {`@(str_map["with-dash"])`, "dashy", false}, + {`@(str_map["with space"])`, "spacy", false}, {"@string1 world", "foo world", false}, diff --git a/flows/engine/engine.go b/flows/engine/engine.go index e740f675f..11cbc7200 100644 --- a/flows/engine/engine.go +++ b/flows/engine/engine.go @@ -12,7 +12,7 @@ type VisitedMap map[flows.NodeUUID]bool const noDestination = flows.NodeUUID("") // StartFlow starts the flow for the passed in contact, returning the created FlowRun -func StartFlow(env flows.FlowEnvironment, flow flows.Flow, contact *flows.Contact, parent flows.FlowRun, input flows.Input) (flows.RunOutput, error) { +func StartFlow(env flows.FlowEnvironment, flow flows.Flow, contact *flows.Contact, parent flows.FlowRun, input flows.Input) (flows.Session, error) { // build our run run := flow.CreateRun(env, contact, parent) @@ -24,24 +24,24 @@ func StartFlow(env flows.FlowEnvironment, flow flows.Flow, contact *flows.Contac // no first node, nothing to do (valid but weird) if len(flow.Nodes()) == 0 { run.Exit(flows.RunCompleted) - return run.Output(), nil + return run.Session(), nil } initTranslations(run) // off to the races err := continueRunUntilWait(run, flow.Nodes()[0].UUID(), nil, input) - return run.Output(), err + return run.Session(), err } // ResumeFlow resumes our flow from the last step -func ResumeFlow(env flows.FlowEnvironment, run flows.FlowRun, event flows.Event) (flows.RunOutput, error) { +func ResumeFlow(env flows.FlowEnvironment, run flows.FlowRun, event flows.Event) (flows.Session, error) { // to resume a flow, hydrate our run with the environment run.Hydrate(env) // no steps to resume from, nothing to do, return if len(run.Path()) == 0 { - return run.Output(), nil + return run.Session(), nil } initTranslations(run) @@ -54,17 +54,17 @@ func ResumeFlow(env flows.FlowEnvironment, run flows.FlowRun, event flows.Event) if node == nil { err := fmt.Errorf("cannot resume at node '%s' that no longer exists", step.Node()) run.AddError(step, err) - return run.Output(), err + return run.Session(), err } destination, step, err := resumeNode(run, node, step, event) if err != nil { - return run.Output(), err + return run.Session(), err } err = continueRunUntilWait(run, destination, step, nil) if err != nil { - return run.Output(), err + return run.Session(), err } // if we ran to completion and have a parent, resume that flow @@ -72,13 +72,13 @@ func ResumeFlow(env flows.FlowEnvironment, run flows.FlowRun, event flows.Event) event := events.NewFlowExitEvent(run) parentRun, err := env.GetRun(run.Parent().UUID()) if err != nil { - return run.Output(), err + return run.Session(), err } - parentRun.SetOutput(run.Output()) + parentRun.SetSession(run.Session()) return ResumeFlow(env, parentRun, event) } - return run.Output(), nil + return run.Session(), nil } // initializes our context based on our flow and current context diff --git a/flows/interfaces.go b/flows/interfaces.go index d13e6e4df..0c682ecd0 100644 --- a/flows/interfaces.go +++ b/flows/interfaces.go @@ -164,8 +164,8 @@ type Result interface { utils.VariableResolver } -// RunOutput represents the output of a Run in its last execution cycle -type RunOutput interface { +// Session represents the session of a flow run which may contain many runs +type Session interface { Runs() []FlowRun AddRun(FlowRun) @@ -194,9 +194,9 @@ type FlowRun interface { Results() Results Environment() FlowEnvironment - Output() RunOutput - SetOutput(RunOutput) - ResetOutput() + Session() Session + SetSession(Session) + ResetSession() Status() RunStatus Exit(RunStatus) diff --git a/flows/runs/output.go b/flows/runs/output.go deleted file mode 100644 index 5a0d618d5..000000000 --- a/flows/runs/output.go +++ /dev/null @@ -1,112 +0,0 @@ -package runs - -import ( - "encoding/json" - - "github.com/nyaruka/goflow/flows" - "github.com/nyaruka/goflow/flows/events" - "github.com/nyaruka/goflow/utils" -) - -type runOutput struct { - runs []flows.FlowRun - events []flows.Event -} - -func newRunOutput() *runOutput { - output := runOutput{} - return &output -} - -func (o *runOutput) AddRun(run flows.FlowRun) { o.runs = append(o.runs, run) } -func (o *runOutput) Runs() []flows.FlowRun { return o.runs } - -func (o *runOutput) ActiveRun() flows.FlowRun { - var active flows.FlowRun - mostRecent := utils.ZeroTime - - for _, run := range o.runs { - // We are complete, therefore can't be active - if run.IsComplete() { - continue - } - - // We have a child, and it isn't complete, we can't be active - if run.Child() != nil && run.Child().Status() == flows.RunActive { - continue - } - - // this is more recent than our most recent flow - if run.ModifiedOn().After(mostRecent) { - active = run - mostRecent = run.ModifiedOn() - } - } - return active -} - -func (o *runOutput) AddEvent(event flows.Event) { o.events = append(o.events, event) } -func (o *runOutput) Events() []flows.Event { return o.events } - -//------------------------------------------------------------------------------------------ -// JSON Encoding / Decoding -//------------------------------------------------------------------------------------------ - -// ReadRunOutput decodes a run output from the passed in JSON -func ReadRunOutput(data json.RawMessage) (flows.RunOutput, error) { - runOutput := &runOutput{} - err := json.Unmarshal(data, runOutput) - if err == nil { - // err = run.Validate() - } - return runOutput, err -} - -type outputEnvelope struct { - Runs []*flowRun `json:"runs"` - Events []*utils.TypedEnvelope `json:"events"` -} - -func (o *runOutput) UnmarshalJSON(data []byte) error { - var oe outputEnvelope - var err error - - err = json.Unmarshal(data, &oe) - if err != nil { - return err - } - - o.runs = make([]flows.FlowRun, len(oe.Runs)) - for i := range o.runs { - o.runs[i] = oe.Runs[i] - } - - o.events = make([]flows.Event, len(oe.Events)) - for i := range o.events { - o.events[i], err = events.EventFromEnvelope(oe.Events[i]) - if err != nil { - return err - } - } - return nil -} - -func (o *runOutput) MarshalJSON() ([]byte, error) { - var oe outputEnvelope - - oe.Events = make([]*utils.TypedEnvelope, len(o.events)) - for i, event := range o.events { - eventData, err := json.Marshal(event) - if err != nil { - return nil, err - } - oe.Events[i] = &utils.TypedEnvelope{Type: event.Type(), Data: eventData} - } - - oe.Runs = make([]*flowRun, len(o.runs)) - for i := range o.runs { - oe.Runs[i] = o.runs[i].(*flowRun) - } - - return json.Marshal(oe) -} diff --git a/flows/runs/run.go b/flows/runs/run.go index de6750661..dc5cc9845 100644 --- a/flows/runs/run.go +++ b/flows/runs/run.go @@ -36,7 +36,7 @@ type flowRun struct { parent flows.FlowRunReference child flows.FlowRunReference - output flows.RunOutput + session flows.Session path []flows.Step flowTranslations flows.FlowTranslations @@ -58,8 +58,8 @@ func (r *flowRun) Contact() *flows.Contact { return r.contact } // Hydrate prepares a deserialized run for executions func (r *flowRun) Hydrate(env flows.FlowEnvironment) error { // start with a fresh output if we don't have one - if r.output == nil { - r.ResetOutput() + if r.session == nil { + r.ResetSession() } // save off our environment @@ -115,13 +115,13 @@ func (r *flowRun) Context() flows.Context { return r.context } func (r *flowRun) Environment() flows.FlowEnvironment { return r.environment } func (r *flowRun) Results() flows.Results { return r.results } -func (r *flowRun) Output() flows.RunOutput { return r.output } -func (r *flowRun) SetOutput(output flows.RunOutput) { - r.output = output - r.output.AddRun(r) +func (r *flowRun) Session() flows.Session { return r.session } +func (r *flowRun) SetSession(session flows.Session) { + r.session = session + r.session.AddRun(r) } -func (r *flowRun) ResetOutput() { - r.SetOutput(newRunOutput()) +func (r *flowRun) ResetSession() { + r.SetSession(newSession()) } func (r *flowRun) IsComplete() bool { @@ -158,7 +158,7 @@ func (r *flowRun) AddEvent(s flows.Step, e flows.Event) { fs := s.(*step) fs.addEvent(e) - r.Output().AddEvent(e) + r.Session().AddEvent(e) r.setModifiedOn(now) } @@ -233,17 +233,17 @@ func NewRun(env flows.FlowEnvironment, flow flows.Flow, contact *flows.Contact, // create our new context r.context = NewContextForContact(contact, r) - // set our output + // set our session if parent != nil { parentRun := parent.(*flowRun) r.parent = newReferenceFromRun(parentRun) parentRun.child = newReferenceFromRun(r) - r.output = parent.Output() + r.session = parent.Session() } else { - r.output = newRunOutput() + r.session = newSession() } - r.output.AddRun(r) + r.session.AddRun(r) return r } diff --git a/flows/runs/session.go b/flows/runs/session.go new file mode 100644 index 000000000..80ce00bf7 --- /dev/null +++ b/flows/runs/session.go @@ -0,0 +1,112 @@ +package runs + +import ( + "encoding/json" + + "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/flows/events" + "github.com/nyaruka/goflow/utils" +) + +type session struct { + runs []flows.FlowRun + events []flows.Event +} + +func newSession() *session { + session := session{} + return &session +} + +func (s *session) AddRun(run flows.FlowRun) { s.runs = append(s.runs, run) } +func (s *session) Runs() []flows.FlowRun { return s.runs } + +func (s *session) ActiveRun() flows.FlowRun { + var active flows.FlowRun + mostRecent := utils.ZeroTime + + for _, run := range s.runs { + // We are complete, therefore can't be active + if run.IsComplete() { + continue + } + + // We have a child, and it isn't complete, we can't be active + if run.Child() != nil && run.Child().Status() == flows.RunActive { + continue + } + + // this is more recent than our most recent flow + if run.ModifiedOn().After(mostRecent) { + active = run + mostRecent = run.ModifiedOn() + } + } + return active +} + +func (s *session) AddEvent(event flows.Event) { s.events = append(s.events, event) } +func (s *session) Events() []flows.Event { return s.events } + +//------------------------------------------------------------------------------------------ +// JSON Encoding / Decoding +//------------------------------------------------------------------------------------------ + +// ReadSession decodes a session from the passed in JSON +func ReadSession(data json.RawMessage) (flows.Session, error) { + session := &session{} + err := json.Unmarshal(data, session) + if err == nil { + err = utils.ValidateAll(session) + } + return session, err +} + +type sessionEnvelope struct { + Runs []*flowRun `json:"runs"` + Events []*utils.TypedEnvelope `json:"events"` +} + +func (s *session) UnmarshalJSON(data []byte) error { + var se sessionEnvelope + var err error + + err = json.Unmarshal(data, &se) + if err != nil { + return err + } + + s.runs = make([]flows.FlowRun, len(se.Runs)) + for i := range s.runs { + s.runs[i] = se.Runs[i] + } + + s.events = make([]flows.Event, len(se.Events)) + for i := range s.events { + s.events[i], err = events.EventFromEnvelope(se.Events[i]) + if err != nil { + return err + } + } + return nil +} + +func (s *session) MarshalJSON() ([]byte, error) { + var se sessionEnvelope + + se.Events = make([]*utils.TypedEnvelope, len(s.events)) + for i, event := range s.events { + eventData, err := json.Marshal(event) + if err != nil { + return nil, err + } + se.Events[i] = &utils.TypedEnvelope{Type: event.Type(), Data: eventData} + } + + se.Runs = make([]*flowRun, len(s.runs)) + for i := range s.runs { + se.Runs[i] = s.runs[i].(*flowRun) + } + + return json.Marshal(se) +} diff --git a/utils/json.go b/utils/json.go index 54dbb2832..2b17df0a7 100644 --- a/utils/json.go +++ b/utils/json.go @@ -1,17 +1,36 @@ package utils -import "github.com/buger/jsonparser" +import ( + "strconv" + + "github.com/buger/jsonparser" +) type JSONFragment []byte func (j JSONFragment) Default() interface{} { - return string(j) + return j } func (j JSONFragment) Resolve(key string) interface{} { + _, err := strconv.Atoi(key) + + // this is a numerical index, convert to jsonparser format + if err == nil { + jIdx := "[" + key + "]" + val, _, _, err := jsonparser.Get(j, jIdx) + if err == nil { + return JSONFragment(val) + } + } + val, _, _, err := jsonparser.Get(j, key) if err != nil { return err } return JSONFragment(val) } + +func (j JSONFragment) String() string { + return string(j) +} diff --git a/utils/json_test.go b/utils/json_test.go new file mode 100644 index 000000000..e5f6bdad7 --- /dev/null +++ b/utils/json_test.go @@ -0,0 +1,36 @@ +package utils + +import ( + "testing" +) + +func TestJSON(t *testing.T) { + var jsonTests = []struct { + JSON string + lookup string + expected string + }{ + {`["one", "two", "three"]`, "0", "one"}, + {`{"1": "one"}`, "1", "one"}, + {`{"arr": ["one", "two"]}`, "arr[1]", "two"}, + {`{"arr": ["one", "two"]}`, "arr.1", "two"}, + {`{"key": {"key2": "val2"}}`, "key.key2", "val2"}, + {`{"key": {"key-with-dash": "val2"}}`, `key["key-with-dash"]`, "val2"}, + {`{"key": {"key with space": "val2"}}`, `key["key with space"]`, "val2"}, + } + + env := NewDefaultEnvironment() + for _, test := range jsonTests { + fragment := JSONFragment(test.JSON) + value := ResolveVariable(env, fragment, test.lookup) + + valueStr, err := ToString(env, value) + if err != nil { + t.Errorf("Error getting string value for lookup: '%s' and JSON fragment:\n%s", test.lookup, test.JSON) + continue + } + if valueStr != test.expected { + t.Errorf("FExpected: '%s' Got: '%s' for lookup: '%s' and JSON fragment:\n%s", test.expected, valueStr, test.lookup, test.JSON) + } + } +} diff --git a/utils/resolver.go b/utils/resolver.go index 919799c8b..707de3a05 100644 --- a/utils/resolver.go +++ b/utils/resolver.go @@ -2,6 +2,7 @@ package utils import ( "strconv" + "strings" ) // VariableResolver defines the interface used by Excellent objects that can be indexed into @@ -74,16 +75,17 @@ func ResolveVariable(env Environment, variable interface{}, key string) interfac } // popNextVariable pops the next variable off our string: -// "foo.bar.baz" -> "foo", "bar.baz" -// "foo[0].bar" -> "foo", "[0].baz" -// "foo.0.bar" -> "foo", "0.baz" -// "[0].bar" -> "0", "bar" -func popNextVariable(key string) (string, string) { +// foo.bar.baz -> "foo", "bar.baz" +// foo[0].bar -> "foo", "[0].baz" +// foo.0.bar -> "foo", "0.baz" +// [0].bar -> "0", "bar" +// foo["my key"] -> "foo", "my key" +func popNextVariable(input string) (string, string) { var keyStart = 0 var keyEnd = -1 var restStart = -1 - for i, c := range key { + for i, c := range input { if i == 0 && c == '[' { keyStart++ } else if c == '[' { @@ -102,10 +104,13 @@ func popNextVariable(key string) (string, string) { } if keyEnd == -1 { - return key, "" + return input, "" } - return key[keyStart:keyEnd], key[restStart:] + key := strings.Trim(input[keyStart:keyEnd], "\"") + rest := input[restStart:] + + return key, rest } type mapResolver struct { diff --git a/vendor/github.com/buger/jsonparser/parser.go b/vendor/github.com/buger/jsonparser/parser.go index ca6d4bce1..1ba954d41 100644 --- a/vendor/github.com/buger/jsonparser/parser.go +++ b/vendor/github.com/buger/jsonparser/parser.go @@ -188,7 +188,12 @@ func searchKeys(data []byte, keys ...string) int { if curIdx == aIdx { valueFound = value valueOffset = offset + if dataType == String { + valueOffset = valueOffset - 2 + valueFound = data[i+valueOffset : i+valueOffset+len(value)+2] + } } + curIdx += 1 }) From 0cfc908a2a7022aca83a2db3c21dd8e89a24d19d Mon Sep 17 00:00:00 2001 From: Nic Pottier Date: Tue, 30 May 2017 11:03:58 -0500 Subject: [PATCH 2/2] make sure test server isn't already closed when running tests --- cmd/flowrunner/runner_test.go | 14 ++++++---- .../testdata/flows/all_actions_test.json | 28 ++++++------------- .../testdata/flows/subflow_test.json | 8 +++--- 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/cmd/flowrunner/runner_test.go b/cmd/flowrunner/runner_test.go index a3319184a..be6f579a1 100644 --- a/cmd/flowrunner/runner_test.go +++ b/cmd/flowrunner/runner_test.go @@ -153,7 +153,7 @@ func runFlow(env utils.Environment, flowFilename string, contactFilename string, } // set up a mock server for webhook actions -func startTestHTTPServer() { +func newTestHTTPServer() *httptest.Server { server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { cmd := r.URL.Query().Get("cmd") defer r.Body.Close() @@ -177,14 +177,18 @@ func startTestHTTPServer() { log.Fatal(err) } server.Listener = l - server.Start() - defer server.Close() - serverURL = server.URL + return server } func TestFlows(t *testing.T) { env := utils.NewDefaultEnvironment() - startTestHTTPServer() + + server := newTestHTTPServer() + server.Start() + defer server.Close() + + // save away our server URL so we can rewrite our URLs + serverURL = server.URL for _, test := range flowTests { testJSON, err := readFile("flows/", test.output) diff --git a/cmd/flowrunner/testdata/flows/all_actions_test.json b/cmd/flowrunner/testdata/flows/all_actions_test.json index 834791929..4573ec59b 100644 --- a/cmd/flowrunner/testdata/flows/all_actions_test.json +++ b/cmd/flowrunner/testdata/flows/all_actions_test.json @@ -89,27 +89,21 @@ "name": "Gender", "value": "Male" }, - { - "type": "error", - "created_on": "2000-01-01T00:00:00.000000000-00:00", - "step": "", - "text": "Get http://127.0.0.1:49999/?cmd=success: dial tcp 127.0.0.1:49999: getsockopt: connection refused" - }, { "type": "webhook", "created_on": "2000-01-01T00:00:00.000000000-00:00", "step": "", "url": "http://127.0.0.1:49999/?cmd=success", - "status": "F", - "status_code": 0, + "status": "S", + "status_code": 200, "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: Go-http-client/1.1\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "" + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: \r\n\r\n{ \"ok\": \"true\" }" } ] } ], "status": "C", - "child": "c458a838-efa8-42d7-a655-3e4784bcbf5b", + "child": "c9943827-b80b-4e22-8e46-6be598049f4e", "results": { "gender": { "node": "node1", @@ -132,7 +126,7 @@ "contact": "contact1-test-4b7f-a34b-e37e31e86451", "path": [], "status": "C", - "parent": "2d17c317-93c1-497d-879d-3ed56cf1b0d4", + "parent": "746ab857-7354-4aa9-8b16-a9b62d23c4aa", "results": {}, "created_on": "2000-01-01T00:00:00.000000000-00:00", "modified_on": "2000-01-01T00:00:00.000000000-00:00", @@ -216,21 +210,15 @@ "name": "Gender", "value": "Male" }, - { - "type": "error", - "created_on": "2000-01-01T00:00:00.000000000-00:00", - "step": "", - "text": "Get http://127.0.0.1:49999/?cmd=success: dial tcp 127.0.0.1:49999: getsockopt: connection refused" - }, { "type": "webhook", "created_on": "2000-01-01T00:00:00.000000000-00:00", "step": "", "url": "http://127.0.0.1:49999/?cmd=success", - "status": "F", - "status_code": 0, + "status": "S", + "status_code": 200, "request": "GET /?cmd=success HTTP/1.1\r\nHost: 127.0.0.1:49999\r\nUser-Agent: Go-http-client/1.1\r\nAccept-Encoding: gzip\r\n\r\n", - "response": "" + "response": "HTTP/1.1 200 OK\r\nContent-Length: 16\r\nContent-Type: text/plain; charset=utf-8\r\nDate: \r\n\r\n{ \"ok\": \"true\" }" } ] } diff --git a/cmd/flowrunner/testdata/flows/subflow_test.json b/cmd/flowrunner/testdata/flows/subflow_test.json index 1ffe37ce1..39bc31072 100644 --- a/cmd/flowrunner/testdata/flows/subflow_test.json +++ b/cmd/flowrunner/testdata/flows/subflow_test.json @@ -53,7 +53,7 @@ "type": "flow", "flow": "flow2" }, - "child": "5a08f736-52ac-4934-9d6c-71e3d3929d17", + "child": "ec00435a-38a7-499a-9bf3-0ae8796d5e06", "results": {}, "created_on": "2000-01-01T00:00:00.000000000-00:00", "modified_on": "2000-01-01T00:00:00.000000000-00:00", @@ -92,7 +92,7 @@ "wait": { "type": "msg" }, - "parent": "c2bb899c-cccd-4e40-9ada-423a309c0c5f", + "parent": "b891f53a-9f65-400b-9d48-7fcbfba55e5e", "results": {}, "created_on": "2000-01-01T00:00:00.000000000-00:00", "modified_on": "2000-01-01T00:00:00.000000000-00:00", @@ -213,7 +213,7 @@ "contact": "contact1-test-4b7f-a34b-e37e31e86451", "text": "Ryan Lewis" }, - "parent": "c2bb899c-cccd-4e40-9ada-423a309c0c5f", + "parent": "b891f53a-9f65-400b-9d48-7fcbfba55e5e", "results": { "name": { "node": "flow2-node1", @@ -290,7 +290,7 @@ } ], "status": "C", - "child": "5a08f736-52ac-4934-9d6c-71e3d3929d17", + "child": "ec00435a-38a7-499a-9bf3-0ae8796d5e06", "results": {}, "created_on": "2000-01-01T00:00:00.000000000-00:00", "modified_on": "2000-01-01T00:00:00.000000000-00:00",