Skip to content

Commit

Permalink
Merge branch 'main' into schedule_rework_2
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Nov 2, 2023
2 parents 78ecff4 + 46d6b2e commit a03f874
Show file tree
Hide file tree
Showing 13 changed files with 23 additions and 36 deletions.
2 changes: 1 addition & 1 deletion core/hooks/insert_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (h *insertStartHook) Apply(ctx context.Context, rt *runtime.Runtime, tx *sq
}

// create our start
start := models.NewFlowStart(oa.OrgID(), models.StartTypeFlowAction, flow.FlowType(), flow.ID()).
start := models.NewFlowStart(oa.OrgID(), models.StartTypeFlowAction, flow.ID()).
WithGroupIDs(groupIDs).
WithContactIDs(contactIDs).
WithURNs(event.URNs).
Expand Down
1 change: 1 addition & 0 deletions core/models/schedules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestGetExpired(t *testing.T) {

assert.Equal(t, s2, schedules[1].ID())
assert.Nil(t, schedules[1].Broadcast())

trigger := schedules[1].Trigger()
assert.NotNil(t, trigger)
assert.Equal(t, testdata.Favorites.ID, trigger.FlowID())
Expand Down
15 changes: 4 additions & 11 deletions core/models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ type FlowStart struct {
OrgID OrgID `json:"org_id" db:"org_id"`
CreatedByID UserID `json:"created_by_id" db:"created_by_id"`
FlowID FlowID `json:"flow_id" db:"flow_id"`
FlowType FlowType `json:"flow_type"`

URNs []urns.URN `json:"urns,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`
Expand All @@ -97,14 +96,8 @@ type FlowStart struct {
}

// NewFlowStart creates a new flow start objects for the passed in parameters
func NewFlowStart(orgID OrgID, startType StartType, flowType FlowType, flowID FlowID) *FlowStart {
return &FlowStart{
UUID: uuids.New(),
OrgID: orgID,
StartType: startType,
FlowType: flowType,
FlowID: flowID,
}
func NewFlowStart(orgID OrgID, startType StartType, flowID FlowID) *FlowStart {
return &FlowStart{UUID: uuids.New(), OrgID: orgID, StartType: startType, FlowID: flowID}
}

func (s *FlowStart) WithGroupIDs(groupIDs []GroupID) *FlowStart {
Expand Down Expand Up @@ -254,13 +247,13 @@ const sqlInsertStartGroup = `
INSERT INTO flows_flowstart_groups(flowstart_id, contactgroup_id) VALUES(:flowstart_id, :contactgroup_id)`

// CreateBatch creates a batch for this start using the passed in contact ids
func (s *FlowStart) CreateBatch(contactIDs []ContactID, last bool, totalContacts int) *FlowStartBatch {
func (s *FlowStart) CreateBatch(contactIDs []ContactID, flowType FlowType, last bool, totalContacts int) *FlowStartBatch {
return &FlowStartBatch{
StartID: s.ID,
StartType: s.StartType,
OrgID: s.OrgID,
FlowID: s.FlowID,
FlowType: s.FlowType,
FlowType: flowType,
ContactIDs: contactIDs,
ParentSummary: s.ParentSummary,
SessionHistory: s.SessionHistory,
Expand Down
6 changes: 2 additions & 4 deletions core/models/starts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func TestStarts(t *testing.T) {
assert.Equal(t, testdata.Org1.ID, start.OrgID)
assert.Equal(t, testdata.Admin.ID, start.CreatedByID)
assert.Equal(t, testdata.SingleMessage.ID, start.FlowID)
assert.Equal(t, models.FlowTypeMessaging, start.FlowType)
assert.Equal(t, null.NullString, start.Query)
assert.False(t, start.Exclusions.StartedPreviously)
assert.False(t, start.Exclusions.InAFlow)
Expand All @@ -69,7 +68,7 @@ func TestStarts(t *testing.T) {
assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowstart WHERE id = $1 AND status = 'S' AND contact_count = 2`, startID).Returns(1)
assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowstart_contacts WHERE flowstart_id = $1`, startID).Returns(2)

batch := start.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, false, 3)
batch := start.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, models.FlowTypeMessaging, false, 3)
assert.Equal(t, startID, batch.StartID)
assert.Equal(t, models.StartTypeManual, batch.StartType)
assert.Equal(t, testdata.SingleMessage.ID, batch.FlowID)
Expand Down Expand Up @@ -99,7 +98,7 @@ func TestStartsBuilding(t *testing.T) {
uuids.SetGenerator(uuids.NewSeededGenerator(12345))
defer uuids.SetGenerator(uuids.DefaultGenerator)

start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, models.FlowTypeMessaging, testdata.Favorites.ID).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, testdata.Favorites.ID).
WithGroupIDs([]models.GroupID{testdata.DoctorsGroup.ID}).
WithExcludeGroupIDs([]models.GroupID{testdata.TestersGroup.ID}).
WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}).
Expand All @@ -122,7 +121,6 @@ func TestStartsBuilding(t *testing.T) {
"started_previously": false
},
"flow_id": %d,
"flow_type": "M",
"group_ids": [%d],
"org_id": 1,
"params": {
Expand Down
6 changes: 1 addition & 5 deletions core/models/triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/lib/pq"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/triggers"
Expand Down Expand Up @@ -89,13 +88,10 @@ func (t *Trigger) UnmarshalJSON(b []byte) error { return json.Unmarshal(b, &t.t)

// Start generates an insertable flow start for scheduled trigger
func (t *Trigger) FlowStart() *FlowStart {
// TODO remove flow type from start task payload so we don't have to know it here
s := NewFlowStart(t.t.OrgID, StartTypeTrigger, FlowTypeMessaging, t.t.FlowID).
return NewFlowStart(t.t.OrgID, StartTypeTrigger, t.t.FlowID).
WithContactIDs(t.t.ContactIDs).
WithGroupIDs(t.t.IncludeGroupIDs).
WithExcludeGroupIDs(t.t.ExcludeGroupIDs)
s.UUID = uuids.New()
return s
}

// loadTriggers loads all non-schedule triggers for the passed in org
Expand Down
10 changes: 5 additions & 5 deletions core/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ func TestStartFlowBatch(t *testing.T) {
defer testsuite.Reset(testsuite.ResetAll)

// create a start object
start1 := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.FlowTypeMessaging, testdata.SingleMessage.ID).
start1 := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, testdata.SingleMessage.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID, testdata.George.ID, testdata.Alexandria.ID})
err := models.InsertFlowStarts(ctx, rt.DB, []*models.FlowStart{start1})
require.NoError(t, err)

batch1 := start1.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, false, 4)
batch2 := start1.CreateBatch([]models.ContactID{testdata.George.ID, testdata.Alexandria.ID}, true, 4)
batch1 := start1.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, models.FlowTypeBackground, false, 4)
batch2 := start1.CreateBatch([]models.ContactID{testdata.George.ID, testdata.Alexandria.ID}, models.FlowTypeBackground, true, 4)

// start the first batch...
sessions, err := runner.StartFlowBatch(ctx, rt, batch1)
Expand Down Expand Up @@ -65,10 +65,10 @@ func TestStartFlowBatch(t *testing.T) {

// create a start object with params
testdata.InsertFlowStart(rt, testdata.Org1, testdata.IncomingExtraFlow, nil)
start2 := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.FlowTypeMessaging, testdata.IncomingExtraFlow.ID).
start2 := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, testdata.IncomingExtraFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID}).
WithParams([]byte(`{"name":"Fred", "age":33}`))
batch3 := start2.CreateBatch([]models.ContactID{testdata.Cathy.ID}, true, 1)
batch3 := start2.CreateBatch([]models.ContactID{testdata.Cathy.ID}, models.FlowTypeMessaging, true, 1)

sessions, err = runner.StartFlowBatch(ctx, rt, batch3)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions core/tasks/handler/contact_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ func TriggerIVRFlow(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID
tx, _ := rt.DB.BeginTxx(ctx, nil)

// create and insert our flow start
start := models.NewFlowStart(orgID, models.StartTypeTrigger, models.FlowTypeVoice, flowID).WithContactIDs(contactIDs)
start := models.NewFlowStart(orgID, models.StartTypeTrigger, flowID).WithContactIDs(contactIDs)
err := models.InsertFlowStarts(ctx, tx, []*models.FlowStart{start})
if err != nil {
tx.Rollback()
Expand All @@ -749,7 +749,7 @@ func TriggerIVRFlow(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID
}

// create our batch of all our contacts
task := &ivr.StartIVRFlowBatchTask{FlowStartBatch: start.CreateBatch(contactIDs, true, len(contactIDs))}
task := &ivr.StartIVRFlowBatchTask{FlowStartBatch: start.CreateBatch(contactIDs, models.FlowTypeVoice, true, len(contactIDs))}

// queue this to our ivr starter, it will take care of creating the calls then calling back in
rc := rt.RP.Get()
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/ivr/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestRetries(t *testing.T) {
rt.DB.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

// create a flow start for cathy
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})

err := tasks.Queue(rc, queue.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queue.DefaultPriority)
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/ivr/start_ivr_flow_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestIVR(t *testing.T) {
rt.DB.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

// create a flow start for cathy
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})

service.callError = errors.Errorf("unable to create call")
Expand Down
4 changes: 2 additions & 2 deletions core/tasks/starts/start_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ func createFlowStartBatches(ctx context.Context, rt *runtime.Runtime, start *mod
for i, idBatch := range idBatches {
isLast := (i == len(idBatches)-1)

batch := start.CreateBatch(idBatch, isLast, len(contactIDs))
batch := start.CreateBatch(idBatch, flow.FlowType(), isLast, len(contactIDs))

// task is different if we are an IVR flow
var batchTask tasks.Task
if start.FlowType == models.FlowTypeVoice {
if flow.FlowType() == models.FlowTypeVoice {
batchTask = &ivr.StartIVRFlowBatchTask{FlowStartBatch: batch}
} else {
batchTask = &StartFlowBatchTask{FlowStartBatch: batch}
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/starts/start_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestStartFlowTask(t *testing.T) {
testsuite.ReindexElastic(ctx)

// handle our start task
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, models.FlowTypeMessaging, tc.flowID).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, tc.flowID).
WithGroupIDs(tc.groupIDs).
WithExcludeGroupIDs(tc.excludeGroupIDs).
WithContactIDs(tc.contactIDs).
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.42.2
github.com/nyaruka/goflow v0.197.2
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null/v3 v3.0.0
github.com/nyaruka/redisx v0.5.0
github.com/nyaruka/rp-indexer/v8 v8.3.0
Expand Down
4 changes: 2 additions & 2 deletions web/ivr/ivr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestTwilioIVR(t *testing.T) {
},
"results": {}
}`)
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID, testdata.George.ID}).
WithParentSummary(parentSummary)

Expand Down Expand Up @@ -397,7 +397,7 @@ func TestVonageIVR(t *testing.T) {
vonage.IgnoreSignatures = true

// create a flow start for cathy and george
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.George.ID}).
WithParams(json.RawMessage(`{"ref_id":"123"}`))

Expand Down

0 comments on commit a03f874

Please sign in to comment.