Skip to content

Commit

Permalink
Make kannel channel paused when we get Queued message in response
Browse files Browse the repository at this point in the history
  • Loading branch information
norkans7 committed Dec 13, 2023
1 parent 86ae080 commit 698d6c1
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
17 changes: 15 additions & 2 deletions handlers/kannel/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kannel

import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -200,10 +201,22 @@ func (h *handler) Send(ctx context.Context, msg courier.MsgOut, clog *courier.Ch
}

var resp *http.Response
var respBody []byte
if verifySSL {
resp, _, err = h.RequestHTTP(req, clog)
resp, respBody, err = h.RequestHTTP(req, clog)
} else {
resp, _, err = h.RequestHTTPInsecure(req, clog)
resp, respBody, err = h.RequestHTTPInsecure(req, clog)
}

if strings.Contains(string(respBody), "Queued") {
rc := h.Backend().RedisPool().Get()
defer rc.Close()
rateLimitKey := fmt.Sprintf("rate_limit:%s", msg.Channel().UUID())
rateLimitBulkKey := fmt.Sprintf("rate_limit_bulk:%s", msg.Channel().UUID())
// We pause sending 30 seconds so the connection to the SMSC is reset
rc.Do("SET", rateLimitKey, "engaged", "EX", 30)
rc.Do("SET", rateLimitBulkKey, "engaged", "EX", 30)
return nil, errors.New("received Queued response from kannel, we'll pause sending to empty the queue")

Check warning on line 219 in handlers/kannel/handler.go

View check run for this annotation

Codecov / codecov/patch

handlers/kannel/handler.go#L212-L219

Added lines #L212 - L219 were not covered by tests
}

status := h.Backend().NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusErrored, clog)
Expand Down
7 changes: 2 additions & 5 deletions handlers/whatsapp_legacy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,12 +906,11 @@ func (h *handler) sendWhatsAppMsg(rc redis.Conn, msg courier.MsgOut, sendPath *u

if resp != nil && (resp.StatusCode == 429 || resp.StatusCode == 503) {
rateLimitKey := fmt.Sprintf("rate_limit:%s", msg.Channel().UUID())
rc.Do("SET", rateLimitKey, "engaged")

// The rate limit is 50 requests per second
// We pause sending 2 seconds so the limit count is reset
// TODO: In the future we should the header value when available
rc.Do("EXPIRE", rateLimitKey, 2)
rc.Do("SET", rateLimitKey, "engaged", "EX", 2)

return "", "", errors.New("received rate-limit response from send endpoint")
}
Expand All @@ -923,12 +922,10 @@ func (h *handler) sendWhatsAppMsg(rc redis.Conn, msg courier.MsgOut, sendPath *u
if err == nil && len(errPayload.Errors) > 0 {
if hasTiersError(*errPayload) {
rateLimitBulkKey := fmt.Sprintf("rate_limit_bulk:%s", msg.Channel().UUID())
rc.Do("SET", rateLimitBulkKey, "engaged")

// The WA tiers spam rate limit hit
// We pause the bulk queue for 24 hours and 5min
rc.Do("EXPIRE", rateLimitBulkKey, (60*60*24)+(5*60))

rc.Do("SET", rateLimitBulkKey, "engaged", "EX", (60*60*24)+(5*60))

Check warning on line 928 in handlers/whatsapp_legacy/handler.go

View check run for this annotation

Codecov / codecov/patch

handlers/whatsapp_legacy/handler.go#L928

Added line #L928 was not covered by tests
err := errors.Errorf("received error from send endpoint: %s", errPayload.Errors[0].Title)
return "", "", err
}
Expand Down
9 changes: 3 additions & 6 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func TestLua(t *testing.T) {
delay := time.Second*2 - time.Duration(time.Now().UnixNano()%int64(time.Second))
time.Sleep(delay)

conn.Do("SET", "rate_limit_bulk:chan1", "engaged")
conn.Do("EXPIRE", "rate_limit_bulk:chan1", 5)
conn.Do("SET", "rate_limit_bulk:chan1", "engaged", "EX", 5)

// we have the rate limit set,
queue, value, err := PopFromQueue(conn, "msgs")
Expand Down Expand Up @@ -120,8 +119,7 @@ func TestLua(t *testing.T) {
assert.NoError(err)

// make sure pause bulk key do not prevent use to get from the high priority queue
conn.Do("SET", "rate_limit_bulk:chan1", "engaged")
conn.Do("EXPIRE", "rate_limit_bulk:chan1", 5)
conn.Do("SET", "rate_limit_bulk:chan1", "engaged", "EX", 5)

queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
Expand Down Expand Up @@ -194,8 +192,7 @@ func TestLua(t *testing.T) {
err = PushOntoQueue(conn, "msgs", "chan1", rate, `[{"id":34}]`, HighPriority)
assert.NoError(err)

conn.Do("SET", "rate_limit:chan1", "engaged")
conn.Do("EXPIRE", "rate_limit:chan1", 5)
conn.Do("SET", "rate_limit:chan1", "engaged", "EX", 5)

// we have the rate limit set,
queue, value, err = PopFromQueue(conn, "msgs")
Expand Down

0 comments on commit 698d6c1

Please sign in to comment.