Skip to content

Commit

Permalink
Make kannel channel paused when we get Queued message in response
Browse files Browse the repository at this point in the history
  • Loading branch information
norkans7 committed Jan 3, 2024
1 parent 23e0c2a commit e8096c0
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 14 deletions.
18 changes: 15 additions & 3 deletions handlers/kannel/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,25 @@ func (h *handler) Send(ctx context.Context, msg courier.MsgOut, clog *courier.Ch
}

var resp *http.Response
var respBody []byte
if verifySSL {
resp, _, err = h.RequestHTTP(req, clog)
resp, respBody, err = h.RequestHTTP(req, clog)
} else {
resp, _, err = h.RequestHTTPInsecure(req, clog)
resp, respBody, err = h.RequestHTTPInsecure(req, clog)
}

status := h.Backend().NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusErrored, clog)

if strings.Contains(string(respBody), "Queued") {
rc := h.Backend().RedisPool().Get()
defer rc.Close()
rateLimitKey := fmt.Sprintf("rate_limit:%s", msg.Channel().UUID())
rateLimitBulkKey := fmt.Sprintf("rate_limit_bulk:%s", msg.Channel().UUID())
// We pause sending 30 seconds so the connection to the SMSC is reset
rc.Do("SET", rateLimitKey, "engaged", "EX", 30)
rc.Do("SET", rateLimitBulkKey, "engaged", "EX", 30)
// the message reached kannel and we should not return an error here, so continue to get the status below
}

if err == nil && resp.StatusCode/100 == 2 {
status.SetStatus(courier.MsgStatusWired)
}
Expand Down
11 changes: 11 additions & 0 deletions handlers/kannel/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ var defaultSendTestCases = []OutgoingTestCase{
ExpectedURLParams: map[string]string{"text": `Error Message`, "to": "+250788383383", "coding": "", "priority": ""},
SendPrep: setSendURL,
},
{
Label: "Rate Limit Engaged",
MsgText: "Hello",
MsgURN: "tel:+250788383383",
MsgHighPriority: false,
ExpectedMsgStatus: "W",
MockResponseBody: "3: Queued for later delivery",
MockResponseStatus: 202,
ExpectedURLParams: map[string]string{"text": `Hello`, "to": "+250788383383", "coding": "", "priority": ""},
SendPrep: setSendURL,
},
{
Label: "Custom Params",
MsgText: "Custom Params",
Expand Down
7 changes: 2 additions & 5 deletions handlers/whatsapp_legacy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,12 +906,11 @@ func (h *handler) sendWhatsAppMsg(rc redis.Conn, msg courier.MsgOut, sendPath *u

if resp != nil && (resp.StatusCode == 429 || resp.StatusCode == 503) {
rateLimitKey := fmt.Sprintf("rate_limit:%s", msg.Channel().UUID())
rc.Do("SET", rateLimitKey, "engaged")

// The rate limit is 50 requests per second
// We pause sending 2 seconds so the limit count is reset
// TODO: In the future we should the header value when available
rc.Do("EXPIRE", rateLimitKey, 2)
rc.Do("SET", rateLimitKey, "engaged", "EX", 2)

return "", "", errors.New("received rate-limit response from send endpoint")
}
Expand All @@ -923,12 +922,10 @@ func (h *handler) sendWhatsAppMsg(rc redis.Conn, msg courier.MsgOut, sendPath *u
if err == nil && len(errPayload.Errors) > 0 {
if hasTiersError(*errPayload) {
rateLimitBulkKey := fmt.Sprintf("rate_limit_bulk:%s", msg.Channel().UUID())
rc.Do("SET", rateLimitBulkKey, "engaged")

// The WA tiers spam rate limit hit
// We pause the bulk queue for 24 hours and 5min
rc.Do("EXPIRE", rateLimitBulkKey, (60*60*24)+(5*60))

rc.Do("SET", rateLimitBulkKey, "engaged", "EX", (60*60*24)+(5*60))

Check warning on line 928 in handlers/whatsapp_legacy/handler.go

View check run for this annotation

Codecov / codecov/patch

handlers/whatsapp_legacy/handler.go#L928

Added line #L928 was not covered by tests
err := errors.Errorf("received error from send endpoint: %s", errPayload.Errors[0].Title)
return "", "", err
}
Expand Down
9 changes: 3 additions & 6 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func TestLua(t *testing.T) {
delay := time.Second*2 - time.Duration(time.Now().UnixNano()%int64(time.Second))
time.Sleep(delay)

conn.Do("SET", "rate_limit_bulk:chan1", "engaged")
conn.Do("EXPIRE", "rate_limit_bulk:chan1", 5)
conn.Do("SET", "rate_limit_bulk:chan1", "engaged", "EX", 5)

// we have the rate limit set,
queue, value, err := PopFromQueue(conn, "msgs")
Expand Down Expand Up @@ -120,8 +119,7 @@ func TestLua(t *testing.T) {
assert.NoError(err)

// make sure pause bulk key do not prevent use to get from the high priority queue
conn.Do("SET", "rate_limit_bulk:chan1", "engaged")
conn.Do("EXPIRE", "rate_limit_bulk:chan1", 5)
conn.Do("SET", "rate_limit_bulk:chan1", "engaged", "EX", 5)

queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
Expand Down Expand Up @@ -194,8 +192,7 @@ func TestLua(t *testing.T) {
err = PushOntoQueue(conn, "msgs", "chan1", rate, `[{"id":34}]`, HighPriority)
assert.NoError(err)

conn.Do("SET", "rate_limit:chan1", "engaged")
conn.Do("EXPIRE", "rate_limit:chan1", 5)
conn.Do("SET", "rate_limit:chan1", "engaged", "EX", 5)

// we have the rate limit set,
queue, value, err = PopFromQueue(conn, "msgs")
Expand Down

0 comments on commit e8096c0

Please sign in to comment.