From cb791f193458412532d31296a99cd3a3868938f2 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 2 Nov 2023 15:11:24 -0500 Subject: [PATCH 1/3] Rework flow start tasks to not require flow_type --- core/hooks/insert_start.go | 2 +- core/models/schedules_test.go | 1 - core/models/starts.go | 15 ++++----------- core/models/starts_test.go | 5 ++--- core/runner/runner_test.go | 10 +++++----- core/tasks/handler/contact_tasks.go | 4 ++-- core/tasks/ivr/cron_test.go | 2 +- core/tasks/ivr/start_ivr_flow_batch_test.go | 2 +- core/tasks/starts/start_flow.go | 4 ++-- core/tasks/starts/start_flow_test.go | 2 +- web/ivr/ivr_test.go | 4 ++-- 11 files changed, 21 insertions(+), 30 deletions(-) diff --git a/core/hooks/insert_start.go b/core/hooks/insert_start.go index cec720f08..0cd33ecf5 100644 --- a/core/hooks/insert_start.go +++ b/core/hooks/insert_start.go @@ -57,7 +57,7 @@ func (h *insertStartHook) Apply(ctx context.Context, rt *runtime.Runtime, tx *sq } // create our start - start := models.NewFlowStart(oa.OrgID(), models.StartTypeFlowAction, flow.FlowType(), flow.ID()). + start := models.NewFlowStart(oa.OrgID(), models.StartTypeFlowAction, flow.ID()). WithGroupIDs(groupIDs). WithContactIDs(contactIDs). WithURNs(event.URNs). diff --git a/core/models/schedules_test.go b/core/models/schedules_test.go index c0e430362..8c6da94ad 100644 --- a/core/models/schedules_test.go +++ b/core/models/schedules_test.go @@ -47,7 +47,6 @@ func TestGetExpired(t *testing.T) { assert.Nil(t, schedules[1].Broadcast()) start := schedules[1].FlowStart() assert.NotNil(t, start) - assert.Equal(t, models.FlowTypeMessaging, start.FlowType) assert.Equal(t, testdata.Favorites.ID, start.FlowID) assert.Equal(t, testdata.Org1.ID, start.OrgID) assert.Equal(t, []models.ContactID{testdata.Cathy.ID, testdata.George.ID}, start.ContactIDs) diff --git a/core/models/starts.go b/core/models/starts.go index 5f09030ed..f71939f27 100644 --- a/core/models/starts.go +++ b/core/models/starts.go @@ -81,7 +81,6 @@ type FlowStart struct { OrgID OrgID `json:"org_id" db:"org_id"` CreatedByID UserID `json:"created_by_id" db:"created_by_id"` FlowID FlowID `json:"flow_id" db:"flow_id"` - FlowType FlowType `json:"flow_type"` URNs []urns.URN `json:"urns,omitempty"` ContactIDs []ContactID `json:"contact_ids,omitempty"` @@ -97,14 +96,8 @@ type FlowStart struct { } // NewFlowStart creates a new flow start objects for the passed in parameters -func NewFlowStart(orgID OrgID, startType StartType, flowType FlowType, flowID FlowID) *FlowStart { - return &FlowStart{ - UUID: uuids.New(), - OrgID: orgID, - StartType: startType, - FlowType: flowType, - FlowID: flowID, - } +func NewFlowStart(orgID OrgID, startType StartType, flowID FlowID) *FlowStart { + return &FlowStart{UUID: uuids.New(), OrgID: orgID, StartType: startType, FlowID: flowID} } func (s *FlowStart) WithGroupIDs(groupIDs []GroupID) *FlowStart { @@ -254,13 +247,13 @@ const sqlInsertStartGroup = ` INSERT INTO flows_flowstart_groups(flowstart_id, contactgroup_id) VALUES(:flowstart_id, :contactgroup_id)` // CreateBatch creates a batch for this start using the passed in contact ids -func (s *FlowStart) CreateBatch(contactIDs []ContactID, last bool, totalContacts int) *FlowStartBatch { +func (s *FlowStart) CreateBatch(contactIDs []ContactID, flowType FlowType, last bool, totalContacts int) *FlowStartBatch { return &FlowStartBatch{ StartID: s.ID, StartType: s.StartType, OrgID: s.OrgID, FlowID: s.FlowID, - FlowType: s.FlowType, + FlowType: flowType, ContactIDs: contactIDs, ParentSummary: s.ParentSummary, SessionHistory: s.SessionHistory, diff --git a/core/models/starts_test.go b/core/models/starts_test.go index fef073086..deb0064fe 100644 --- a/core/models/starts_test.go +++ b/core/models/starts_test.go @@ -51,7 +51,6 @@ func TestStarts(t *testing.T) { assert.Equal(t, testdata.Org1.ID, start.OrgID) assert.Equal(t, testdata.Admin.ID, start.CreatedByID) assert.Equal(t, testdata.SingleMessage.ID, start.FlowID) - assert.Equal(t, models.FlowTypeMessaging, start.FlowType) assert.Equal(t, null.NullString, start.Query) assert.False(t, start.Exclusions.StartedPreviously) assert.False(t, start.Exclusions.InAFlow) @@ -69,7 +68,7 @@ func TestStarts(t *testing.T) { assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowstart WHERE id = $1 AND status = 'S' AND contact_count = 2`, startID).Returns(1) assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowstart_contacts WHERE flowstart_id = $1`, startID).Returns(2) - batch := start.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, false, 3) + batch := start.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, models.FlowTypeMessaging, false, 3) assert.Equal(t, startID, batch.StartID) assert.Equal(t, models.StartTypeManual, batch.StartType) assert.Equal(t, testdata.SingleMessage.ID, batch.FlowID) @@ -99,7 +98,7 @@ func TestStartsBuilding(t *testing.T) { uuids.SetGenerator(uuids.NewSeededGenerator(12345)) defer uuids.SetGenerator(uuids.DefaultGenerator) - start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, models.FlowTypeMessaging, testdata.Favorites.ID). + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, testdata.Favorites.ID). WithGroupIDs([]models.GroupID{testdata.DoctorsGroup.ID}). WithExcludeGroupIDs([]models.GroupID{testdata.TestersGroup.ID}). WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}). diff --git a/core/runner/runner_test.go b/core/runner/runner_test.go index 0ede8d8cd..05c8ccddc 100644 --- a/core/runner/runner_test.go +++ b/core/runner/runner_test.go @@ -29,13 +29,13 @@ func TestStartFlowBatch(t *testing.T) { defer testsuite.Reset(testsuite.ResetAll) // create a start object - start1 := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.FlowTypeMessaging, testdata.SingleMessage.ID). + start1 := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, testdata.SingleMessage.ID). WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID, testdata.George.ID, testdata.Alexandria.ID}) err := models.InsertFlowStarts(ctx, rt.DB, []*models.FlowStart{start1}) require.NoError(t, err) - batch1 := start1.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, false, 4) - batch2 := start1.CreateBatch([]models.ContactID{testdata.George.ID, testdata.Alexandria.ID}, true, 4) + batch1 := start1.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, models.FlowTypeBackground, false, 4) + batch2 := start1.CreateBatch([]models.ContactID{testdata.George.ID, testdata.Alexandria.ID}, models.FlowTypeBackground, true, 4) // start the first batch... sessions, err := runner.StartFlowBatch(ctx, rt, batch1) @@ -65,10 +65,10 @@ func TestStartFlowBatch(t *testing.T) { // create a start object with params testdata.InsertFlowStart(rt, testdata.Org1, testdata.IncomingExtraFlow, nil) - start2 := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.FlowTypeMessaging, testdata.IncomingExtraFlow.ID). + start2 := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, testdata.IncomingExtraFlow.ID). WithContactIDs([]models.ContactID{testdata.Cathy.ID}). WithParams([]byte(`{"name":"Fred", "age":33}`)) - batch3 := start2.CreateBatch([]models.ContactID{testdata.Cathy.ID}, true, 1) + batch3 := start2.CreateBatch([]models.ContactID{testdata.Cathy.ID}, models.FlowTypeMessaging, true, 1) sessions, err = runner.StartFlowBatch(ctx, rt, batch3) require.NoError(t, err) diff --git a/core/tasks/handler/contact_tasks.go b/core/tasks/handler/contact_tasks.go index b1f1d5942..475194457 100644 --- a/core/tasks/handler/contact_tasks.go +++ b/core/tasks/handler/contact_tasks.go @@ -725,7 +725,7 @@ func TriggerIVRFlow(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID tx, _ := rt.DB.BeginTxx(ctx, nil) // create and insert our flow start - start := models.NewFlowStart(orgID, models.StartTypeTrigger, models.FlowTypeVoice, flowID).WithContactIDs(contactIDs) + start := models.NewFlowStart(orgID, models.StartTypeTrigger, flowID).WithContactIDs(contactIDs) err := models.InsertFlowStarts(ctx, tx, []*models.FlowStart{start}) if err != nil { tx.Rollback() @@ -749,7 +749,7 @@ func TriggerIVRFlow(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID } // create our batch of all our contacts - task := &ivr.StartIVRFlowBatchTask{FlowStartBatch: start.CreateBatch(contactIDs, true, len(contactIDs))} + task := &ivr.StartIVRFlowBatchTask{FlowStartBatch: start.CreateBatch(contactIDs, models.FlowTypeVoice, true, len(contactIDs))} // queue this to our ivr starter, it will take care of creating the calls then calling back in rc := rt.RP.Get() diff --git a/core/tasks/ivr/cron_test.go b/core/tasks/ivr/cron_test.go index 50d3be89c..4d3e879e3 100644 --- a/core/tasks/ivr/cron_test.go +++ b/core/tasks/ivr/cron_test.go @@ -30,7 +30,7 @@ func TestRetries(t *testing.T) { rt.DB.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID) // create a flow start for cathy - start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID). + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, testdata.IVRFlow.ID). WithContactIDs([]models.ContactID{testdata.Cathy.ID}) err := tasks.Queue(rc, queue.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queue.DefaultPriority) diff --git a/core/tasks/ivr/start_ivr_flow_batch_test.go b/core/tasks/ivr/start_ivr_flow_batch_test.go index d3a68331e..3a43e0384 100644 --- a/core/tasks/ivr/start_ivr_flow_batch_test.go +++ b/core/tasks/ivr/start_ivr_flow_batch_test.go @@ -34,7 +34,7 @@ func TestIVR(t *testing.T) { rt.DB.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID) // create a flow start for cathy - start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID). + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, testdata.IVRFlow.ID). WithContactIDs([]models.ContactID{testdata.Cathy.ID}) service.callError = errors.Errorf("unable to create call") diff --git a/core/tasks/starts/start_flow.go b/core/tasks/starts/start_flow.go index 2a27ceb33..283590807 100644 --- a/core/tasks/starts/start_flow.go +++ b/core/tasks/starts/start_flow.go @@ -133,11 +133,11 @@ func createFlowStartBatches(ctx context.Context, rt *runtime.Runtime, start *mod for i, idBatch := range idBatches { isLast := (i == len(idBatches)-1) - batch := start.CreateBatch(idBatch, isLast, len(contactIDs)) + batch := start.CreateBatch(idBatch, flow.FlowType(), isLast, len(contactIDs)) // task is different if we are an IVR flow var batchTask tasks.Task - if start.FlowType == models.FlowTypeVoice { + if flow.FlowType() == models.FlowTypeVoice { batchTask = &ivr.StartIVRFlowBatchTask{FlowStartBatch: batch} } else { batchTask = &StartFlowBatchTask{FlowStartBatch: batch} diff --git a/core/tasks/starts/start_flow_test.go b/core/tasks/starts/start_flow_test.go index dcd066dba..fdd17e6e0 100644 --- a/core/tasks/starts/start_flow_test.go +++ b/core/tasks/starts/start_flow_test.go @@ -217,7 +217,7 @@ func TestStartFlowTask(t *testing.T) { testsuite.ReindexElastic(ctx) // handle our start task - start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, models.FlowTypeMessaging, tc.flowID). + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, tc.flowID). WithGroupIDs(tc.groupIDs). WithExcludeGroupIDs(tc.excludeGroupIDs). WithContactIDs(tc.contactIDs). diff --git a/web/ivr/ivr_test.go b/web/ivr/ivr_test.go index dd55db684..01c47b5b7 100644 --- a/web/ivr/ivr_test.go +++ b/web/ivr/ivr_test.go @@ -94,7 +94,7 @@ func TestTwilioIVR(t *testing.T) { }, "results": {} }`) - start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID). + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, testdata.IVRFlow.ID). WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID, testdata.George.ID}). WithParentSummary(parentSummary) @@ -397,7 +397,7 @@ func TestVonageIVR(t *testing.T) { vonage.IgnoreSignatures = true // create a flow start for cathy and george - start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID). + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, testdata.IVRFlow.ID). WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.George.ID}). WithParams(json.RawMessage(`{"ref_id":"123"}`)) From bc374bfd410636287a435439156224ed9060941e Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 2 Nov 2023 15:12:27 -0500 Subject: [PATCH 2/3] go mod tidy --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index fc0332554..0653a4f63 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,6 @@ require ( github.com/nyaruka/ezconf v0.2.1 github.com/nyaruka/gocommon v1.42.2 github.com/nyaruka/goflow v0.197.2 - github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d github.com/nyaruka/null/v3 v3.0.0 github.com/nyaruka/redisx v0.5.0 github.com/nyaruka/rp-indexer/v8 v8.3.0 From c8a4ee0c28cb6771d198791d4e6f7b5937710906 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 2 Nov 2023 15:45:48 -0500 Subject: [PATCH 3/3] Fix test --- core/models/starts_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/models/starts_test.go b/core/models/starts_test.go index deb0064fe..3202cd8f7 100644 --- a/core/models/starts_test.go +++ b/core/models/starts_test.go @@ -121,7 +121,6 @@ func TestStartsBuilding(t *testing.T) { "started_previously": false }, "flow_id": %d, - "flow_type": "M", "group_ids": [%d], "org_id": 1, "params": {