Skip to content

Commit

Permalink
Merge branch 'main' into no_more_ticketers
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Oct 17, 2023
2 parents 728f681 + 92f4d6a commit 6ccc961
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 251 deletions.
20 changes: 10 additions & 10 deletions core/ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"net/url"
"path"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/null/v3"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type CallID string
Expand Down Expand Up @@ -132,7 +132,7 @@ func HangupCall(ctx context.Context, rt *runtime.Runtime, call *models.Call) (*m
}

if err := call.AttachLog(ctx, rt.DB, clog); err != nil {
logrus.WithError(err).Error("error attaching ivr channel log")
slog.Error("error attaching ivr channel log", "error", err)
}

return clog, err
Expand Down Expand Up @@ -199,7 +199,7 @@ func RequestCall(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets,
// log any error inserting our channel log, but continue
if clog != nil {
if err := models.InsertChannelLogs(ctx, rt, []*models.ChannelLog{clog}); err != nil {
logrus.WithError(err).Error("error inserting channel log")
slog.Error("error inserting channel log", "error", err)
}
}

Expand All @@ -224,7 +224,7 @@ func RequestStartForCall(ctx context.Context, rt *runtime.Runtime, channel *mode

// we are at max calls, do not move on
if count >= maxCalls {
logrus.WithField("channel_id", channel.ID()).Info("call being queued, max concurrent reached")
slog.Info("call being queued, max concurrent reached", "channel_id", channel.ID())
err := call.MarkThrottled(ctx, rt.DB, time.Now())
if err != nil {
return nil, errors.Wrapf(err, "error marking call as throttled")
Expand Down Expand Up @@ -275,7 +275,7 @@ func RequestStartForCall(ctx context.Context, rt *runtime.Runtime, channel *mode
return clog, errors.Wrapf(err, "error updating session external id")
}
if err := call.AttachLog(ctx, rt.DB, clog); err != nil {
logrus.WithError(err).Error("error attaching ivr channel log")
slog.Error("error attaching ivr channel log", "error", err)
}

return clog, nil
Expand All @@ -285,7 +285,7 @@ func RequestStartForCall(ctx context.Context, rt *runtime.Runtime, channel *mode
func HandleAsFailure(ctx context.Context, db *sqlx.DB, svc Service, call *models.Call, w http.ResponseWriter, rootErr error) error {
err := call.MarkFailed(ctx, db, time.Now())
if err != nil {
logrus.WithError(err).Error("error marking call as failed")
slog.Error("error marking call as failed", "error", err)
}
return svc.WriteErrorResponse(w, rootErr)
}
Expand Down Expand Up @@ -430,7 +430,7 @@ func ResumeIVRFlow(
if call.Status() == models.CallStatusErrored || call.Status() == models.CallStatusFailed {
err = models.ExitSessions(ctx, rt.DB, []models.SessionID{session.ID()}, models.SessionStatusInterrupted)
if err != nil {
logrus.WithError(err).Error("error interrupting session")
slog.Error("error interrupting session", "error", err)
}

return svc.WriteErrorResponse(w, fmt.Errorf("ending call due to previous status callback"))
Expand Down Expand Up @@ -513,7 +513,7 @@ func ResumeIVRFlow(
} else {
err = models.ExitSessions(ctx, rt.DB, []models.SessionID{session.ID()}, models.SessionStatusCompleted)
if err != nil {
logrus.WithError(err).Error("error closing session")
slog.Error("error closing session", "error", err)
}

return svc.WriteErrorResponse(w, fmt.Errorf("call completed"))
Expand Down Expand Up @@ -545,9 +545,9 @@ func buildMsgResume(
time.Sleep(time.Second)

if resp != nil {
logrus.WithField("retry", retry).WithField("status", resp.StatusCode).WithField("url", resume.Attachment.URL()).Info("retrying download of attachment")
slog.Info("retrying download of attachment", "retry", retry, "status", resp.StatusCode, "url", resume.Attachment.URL())
} else {
logrus.WithError(err).WithField("retry", retry).WithField("url", resume.Attachment.URL()).Info("retrying download of attachment")
slog.Info("retrying download of attachment", "error", err, "retry", retry, "url", resume.Attachment.URL())
}
}

Expand Down
11 changes: 1 addition & 10 deletions core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,6 @@ type contactEnvelope struct {
URNs []ContactURN `json:"urns"`
Tickets []struct {
UUID flows.TicketUUID `json:"uuid"`
TicketerID TicketerID `json:"ticketer_id"`
ExternalID string `json:"external_id"`
TopicID TopicID `json:"topic_id"`
Body string `json:"body"`
AssigneeID UserID `json:"assignee_id"`
Expand Down Expand Up @@ -522,14 +520,7 @@ LEFT JOIN (
SELECT
contact_id,
array_agg(
json_build_object(
'uuid', t.uuid,
'body', t.body,
'external_id', t.external_id,
'ticketer_id', t.ticketer_id,
'topic_id', t.topic_id,
'assignee_id', t.assignee_id
) ORDER BY t.opened_on DESC, t.id DESC
json_build_object('uuid', t.uuid, 'body', t.body, 'topic_id', t.topic_id, 'assignee_id', t.assignee_id) ORDER BY t.opened_on DESC, t.id DESC
) as tickets
FROM
tickets_ticket t
Expand Down
2 changes: 1 addition & 1 deletion core/models/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestContacts(t *testing.T) {
assert.Equal(t, "tel:+16055742222?id=10001", bob.URNs()[0].String())
assert.Equal(t, "whatsapp:250788373373?id=30000", bob.URNs()[1].String())
assert.Equal(t, 0, bob.Groups().Count())
assert.Nil(t, bob.Ticket()) // because ticketer no longer exists
assert.NotNil(t, bob.Ticket())

assert.Equal(t, "George", george.Name())
assert.Equal(t, decimal.RequireFromString("30"), george.Fields()["age"].QueryValue())
Expand Down
8 changes: 2 additions & 6 deletions core/models/http_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ const (
// LogTypeClassifierCalled is our type for when we call a classifier
LogTypeClassifierCalled = "classifier_called"

// LogTypeTicketerCalled is our type for when we call a ticketer
LogTypeTicketerCalled = "ticketer_called"

// LogTypeAirtimeTransferred is our type for when we make an airtime transfer
LogTypeAirtimeTransferred = "airtime_transferred"
)
Expand All @@ -46,7 +43,6 @@ type HTTPLog struct {
CreatedOn time.Time `db:"created_on"`
FlowID FlowID `db:"flow_id"`
ClassifierID ClassifierID `db:"classifier_id"`
TicketerID TicketerID `db:"ticketer_id"`
AirtimeTransferID AirtimeTransferID `db:"airtime_transfer_id"`
}

Expand Down Expand Up @@ -90,8 +86,8 @@ func (h *HTTPLog) SetAirtimeTransferID(tid AirtimeTransferID) {
}

const insertHTTPLogsSQL = `
INSERT INTO request_logs_httplog( log_type, org_id, url, status_code, flow_id, classifier_id, ticketer_id, airtime_transfer_id, request, response, is_error, request_time, num_retries, created_on)
VALUES(:log_type, :org_id, :url, :status_code, :flow_id, :classifier_id, :ticketer_id, :airtime_transfer_id, :request, :response, :is_error, :request_time, :num_retries, :created_on)
INSERT INTO request_logs_httplog( log_type, org_id, url, status_code, flow_id, classifier_id, airtime_transfer_id, request, response, is_error, request_time, num_retries, created_on)
VALUES(:log_type, :org_id, :url, :status_code, :flow_id, :classifier_id, :airtime_transfer_id, :request, :response, :is_error, :request_time, :num_retries, :created_on)
RETURNING id
`

Expand Down
37 changes: 18 additions & 19 deletions core/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -56,7 +55,7 @@ func ResumeFlow(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets,
if err != nil {
// if this flow just isn't available anymore, log this error
if err == models.ErrNotFound {
logrus.WithField("contact_uuid", session.Contact().UUID()).WithField("session_uuid", session.UUID()).WithField("flow_id", session.CurrentFlowID()).Error("unable to find flow for resume")
slog.Error("unable to find flow for resume", "contact_uuid", session.Contact().UUID(), "session_uuid", session.UUID(), "flow_id", session.CurrentFlowID())
return nil, models.ExitSessions(ctx, rt.DB, []models.SessionID{session.ID()}, models.SessionStatusFailed)
}
return nil, errors.Wrapf(err, "error loading session flow: %d", session.CurrentFlowID())
Expand Down Expand Up @@ -118,7 +117,7 @@ func ResumeFlow(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets,
return nil, errors.Wrapf(err, "error committing session changes on resume")
}

logrus.WithField("contact_uuid", resume.Contact().UUID()).WithField("session_uuid", session.UUID()).WithField("resume_type", resume.Type()).WithField("elapsed", time.Since(start)).Info("resumed session")
slog.Info("resumed session", "contact_uuid", resume.Contact().UUID(), "session_uuid", session.UUID(), "resume_type", resume.Type(), "elapsed", time.Since(start))
return session, nil
}

Expand All @@ -131,7 +130,7 @@ func StartFlowBatch(ctx context.Context, rt *runtime.Runtime, batch *models.Flow
defer func() {
err := models.MarkStartComplete(ctx, rt.DB, batch.StartID)
if err != nil {
logrus.WithError(err).WithField("start_id", batch.StartID).Error("error marking start as complete")
slog.Error("error marking start as complete", "error", err, "start_id", batch.StartID)
}
}()
}
Expand All @@ -145,7 +144,7 @@ func StartFlowBatch(ctx context.Context, rt *runtime.Runtime, batch *models.Flow
// try to load our flow
flow, err := oa.FlowByID(batch.FlowID)
if err == models.ErrNotFound {
logrus.WithField("flow_id", batch.FlowID).Info("skipping flow start, flow no longer active or archived")
slog.Info("skipping flow start, flow no longer active or archived", "flow_id", batch.FlowID)
return nil, nil
}
if err != nil {
Expand Down Expand Up @@ -257,7 +256,7 @@ func StartFlow(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, f
}

if len(remaining) > 0 {
logrus.WithField("contacts", remaining).Warn("failed to acquire locks for contacts")
slog.Warn("failed to acquire locks for contacts", "contacts", remaining)
}

return sessions, nil
Expand Down Expand Up @@ -313,23 +312,23 @@ func StartFlowForContacts(
}

start := time.Now()
log := logrus.WithField("flow_name", flow.Name()).WithField("flow_uuid", flow.UUID())
log := slog.With("flow_name", flow.Name(), "flow_uuid", flow.UUID())

// for each trigger start the flow
sessions := make([]flows.Session, 0, len(triggers))
sprints := make([]flows.Sprint, 0, len(triggers))

for _, trigger := range triggers {
// start our flow session
log := log.WithField("contact_uuid", trigger.Contact().UUID())
log := log.With("contact_uuid", trigger.Contact().UUID())
start := time.Now()

session, sprint, err := goflow.Engine(rt.Config).NewSession(sa, trigger)
if err != nil {
log.WithError(err).Errorf("error starting flow")
log.Error("error starting flow", "error", err)
continue
}
log.WithField("elapsed", time.Since(start)).Info("flow engine start")
log.Info("flow engine start", "elapsed", time.Since(start))
analytics.Gauge("mr.flow_start_elapsed", float64(time.Since(start)))

sessions = append(sessions, session)
Expand Down Expand Up @@ -372,7 +371,7 @@ func StartFlowForContacts(
err = tx.Commit()

if err == nil {
logrus.WithField("elapsed", time.Since(commitStart)).WithField("count", len(sessions)).Debug("sessions committed")
slog.Debug("sessions committed", "elapsed", time.Since(commitStart), "count", len(sessions))
}
}

Expand Down Expand Up @@ -401,22 +400,22 @@ func StartFlowForContacts(
err = models.InterruptSessionsForContactsTx(txCTX, tx, []models.ContactID{models.ContactID(session.Contact().ID())})
if err != nil {
tx.Rollback()
log.WithField("contact_uuid", session.Contact().UUID()).WithError(err).Errorf("error interrupting contact")
log.Error("error interrupting contact", "error", err, "contact_uuid", session.Contact().UUID())
continue
}
}

dbSession, err := models.InsertSessions(txCTX, rt, tx, oa, []flows.Session{session}, []flows.Sprint{sprint}, []*models.Contact{contact}, hook)
if err != nil {
tx.Rollback()
log.WithField("contact_uuid", session.Contact().UUID()).WithError(err).Errorf("error writing session to db")
log.Error("error writing session to db", "error", err, "contact_uuid", session.Contact().UUID())
continue
}

err = tx.Commit()
if err != nil {
tx.Rollback()
log.WithField("contact_uuid", session.Contact().UUID()).WithError(err).Errorf("error comitting session to db")
log.Error("error comitting session to db", "error", err, "contact_uuid", session.Contact().UUID())
continue
}

Expand Down Expand Up @@ -448,36 +447,36 @@ func StartFlowForContacts(

// we failed with our post commit hooks, try one at a time, logging those errors
for _, session := range dbSessions {
log = log.WithField("contact_uuid", session.ContactUUID())
log = log.With("contact_uuid", session.ContactUUID())

txCTX, cancel = context.WithTimeout(ctx, postCommitTimeout)
defer cancel()

tx, err := rt.DB.BeginTxx(txCTX, nil)
if err != nil {
tx.Rollback()
log.WithError(err).Error("error starting transaction to retry post commits")
log.Error("error starting transaction to retry post commits", "error", err)
continue
}

err = models.ApplyEventPostCommitHooks(ctx, rt, tx, oa, []*models.Scene{session.Scene()})
if err != nil {
tx.Rollback()
log.WithError(err).Errorf("error applying post commit hook")
log.Error("error applying post commit hook", "error", err)
continue
}

err = tx.Commit()

if err != nil {
tx.Rollback()
log.WithError(err).Errorf("error comitting post commit hook")
log.Error("error comitting post commit hook", "error", err)
continue
}
}
}

// figure out both average and total for total execution and commit time for our flows
log.WithField("elapsed", time.Since(start)).WithField("count", len(dbSessions)).Info("flow started, sessions created")
log.Info("flow started, sessions created", "elapsed", time.Since(start), "count", len(dbSessions))
return dbSessions, nil
}
Binary file modified mailroom_test.dump
Binary file not shown.
20 changes: 0 additions & 20 deletions web/contact/testdata/modify.json
Original file line number Diff line number Diff line change
Expand Up @@ -1409,10 +1409,6 @@
"modifiers": [
{
"type": "ticket",
"ticketer": {
"uuid": "ffc903f7-8cbb-443f-9627-87106842d1aa",
"name": "RapidPro Tickets"
},
"topic": {
"uuid": "0a8f2e00-fef6-402c-bd79-d789446ec0e0",
"name": "Support"
Expand Down Expand Up @@ -1444,10 +1440,6 @@
],
"ticket": {
"uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5",
"ticketer": {
"uuid": "ffc903f7-8cbb-443f-9627-87106842d1aa",
"name": "RapidPro Tickets"
},
"topic": {
"uuid": "0a8f2e00-fef6-402c-bd79-d789446ec0e0",
"name": "Support"
Expand All @@ -1465,10 +1457,6 @@
"created_on": "2018-07-06T12:30:00.123456789Z",
"ticket": {
"uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5",
"ticketer": {
"uuid": "ffc903f7-8cbb-443f-9627-87106842d1aa",
"name": "RapidPro Tickets"
},
"topic": {
"uuid": "0a8f2e00-fef6-402c-bd79-d789446ec0e0",
"name": "Support"
Expand Down Expand Up @@ -1561,10 +1549,6 @@
],
"ticket": {
"uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5",
"ticketer": {
"uuid": "ffc903f7-8cbb-443f-9627-87106842d1aa",
"name": "RapidPro Tickets"
},
"topic": {
"uuid": "0a8f2e00-fef6-402c-bd79-d789446ec0e0",
"name": "Support"
Expand Down Expand Up @@ -1655,10 +1639,6 @@
],
"ticket": {
"uuid": "d2f852ec-7b4e-457f-ae7f-f8b243c49ff5",
"ticketer": {
"uuid": "ffc903f7-8cbb-443f-9627-87106842d1aa",
"name": "RapidPro Tickets"
},
"topic": {
"uuid": "0a8f2e00-fef6-402c-bd79-d789446ec0e0",
"name": "Support"
Expand Down
1 change: 0 additions & 1 deletion web/ticket/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestTicketClose(t *testing.T) {
testdata.InsertOpenTicket(rt, testdata.Org1, testdata.Cathy, testdata.DefaultTopic, "Have you seen my cookies?", time.Now(), testdata.Admin)
testdata.InsertOpenTicket(rt, testdata.Org1, testdata.Cathy, testdata.DefaultTopic, "Have you seen my cookies?", time.Now(), nil)
testdata.InsertClosedTicket(rt, testdata.Org1, testdata.Cathy, testdata.DefaultTopic, "Have you seen my cookies?", testdata.Editor)
testdata.InsertOpenTicket(rt, testdata.Org1, testdata.Cathy, testdata.DefaultTopic, "Have you seen my cookies?", time.Now(), nil)

testsuite.RunWebTests(t, ctx, rt, "testdata/close.json", nil)
}
Expand Down
Loading

0 comments on commit 6ccc961

Please sign in to comment.