From 698d6c1cb4cddba3d3fabaf96ef915130305922a Mon Sep 17 00:00:00 2001 From: Norbert Kwizera Date: Tue, 12 Dec 2023 17:01:47 +0200 Subject: [PATCH] Make kannel channel paused when we get Queued message in response --- handlers/kannel/handler.go | 17 +++++++++++++++-- handlers/whatsapp_legacy/handler.go | 7 ++----- queue/queue_test.go | 9 +++------ 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/handlers/kannel/handler.go b/handlers/kannel/handler.go index c249a6fef..d1c352f03 100644 --- a/handlers/kannel/handler.go +++ b/handlers/kannel/handler.go @@ -2,6 +2,7 @@ package kannel import ( "context" + "errors" "fmt" "net/http" "net/url" @@ -200,10 +201,22 @@ func (h *handler) Send(ctx context.Context, msg courier.MsgOut, clog *courier.Ch } var resp *http.Response + var respBody []byte if verifySSL { - resp, _, err = h.RequestHTTP(req, clog) + resp, respBody, err = h.RequestHTTP(req, clog) } else { - resp, _, err = h.RequestHTTPInsecure(req, clog) + resp, respBody, err = h.RequestHTTPInsecure(req, clog) + } + + if strings.Contains(string(respBody), "Queued") { + rc := h.Backend().RedisPool().Get() + defer rc.Close() + rateLimitKey := fmt.Sprintf("rate_limit:%s", msg.Channel().UUID()) + rateLimitBulkKey := fmt.Sprintf("rate_limit_bulk:%s", msg.Channel().UUID()) + // We pause sending 30 seconds so the connection to the SMSC is reset + rc.Do("SET", rateLimitKey, "engaged", "EX", 30) + rc.Do("SET", rateLimitBulkKey, "engaged", "EX", 30) + return nil, errors.New("received Queued response from kannel, we'll pause sending to empty the queue") } status := h.Backend().NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusErrored, clog) diff --git a/handlers/whatsapp_legacy/handler.go b/handlers/whatsapp_legacy/handler.go index 40fffe999..88e8378b7 100644 --- a/handlers/whatsapp_legacy/handler.go +++ b/handlers/whatsapp_legacy/handler.go @@ -906,12 +906,11 @@ func (h *handler) sendWhatsAppMsg(rc redis.Conn, msg courier.MsgOut, sendPath *u if resp != nil && (resp.StatusCode == 429 || resp.StatusCode == 503) { rateLimitKey := fmt.Sprintf("rate_limit:%s", msg.Channel().UUID()) - rc.Do("SET", rateLimitKey, "engaged") // The rate limit is 50 requests per second // We pause sending 2 seconds so the limit count is reset // TODO: In the future we should the header value when available - rc.Do("EXPIRE", rateLimitKey, 2) + rc.Do("SET", rateLimitKey, "engaged", "EX", 2) return "", "", errors.New("received rate-limit response from send endpoint") } @@ -923,12 +922,10 @@ func (h *handler) sendWhatsAppMsg(rc redis.Conn, msg courier.MsgOut, sendPath *u if err == nil && len(errPayload.Errors) > 0 { if hasTiersError(*errPayload) { rateLimitBulkKey := fmt.Sprintf("rate_limit_bulk:%s", msg.Channel().UUID()) - rc.Do("SET", rateLimitBulkKey, "engaged") // The WA tiers spam rate limit hit // We pause the bulk queue for 24 hours and 5min - rc.Do("EXPIRE", rateLimitBulkKey, (60*60*24)+(5*60)) - + rc.Do("SET", rateLimitBulkKey, "engaged", "EX", (60*60*24)+(5*60)) err := errors.Errorf("received error from send endpoint: %s", errPayload.Errors[0].Title) return "", "", err } diff --git a/queue/queue_test.go b/queue/queue_test.go index 5f6d11eeb..caef0b70e 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -60,8 +60,7 @@ func TestLua(t *testing.T) { delay := time.Second*2 - time.Duration(time.Now().UnixNano()%int64(time.Second)) time.Sleep(delay) - conn.Do("SET", "rate_limit_bulk:chan1", "engaged") - conn.Do("EXPIRE", "rate_limit_bulk:chan1", 5) + conn.Do("SET", "rate_limit_bulk:chan1", "engaged", "EX", 5) // we have the rate limit set, queue, value, err := PopFromQueue(conn, "msgs") @@ -120,8 +119,7 @@ func TestLua(t *testing.T) { assert.NoError(err) // make sure pause bulk key do not prevent use to get from the high priority queue - conn.Do("SET", "rate_limit_bulk:chan1", "engaged") - conn.Do("EXPIRE", "rate_limit_bulk:chan1", 5) + conn.Do("SET", "rate_limit_bulk:chan1", "engaged", "EX", 5) queue, value, err = PopFromQueue(conn, "msgs") assert.NoError(err) @@ -194,8 +192,7 @@ func TestLua(t *testing.T) { err = PushOntoQueue(conn, "msgs", "chan1", rate, `[{"id":34}]`, HighPriority) assert.NoError(err) - conn.Do("SET", "rate_limit:chan1", "engaged") - conn.Do("EXPIRE", "rate_limit:chan1", 5) + conn.Do("SET", "rate_limit:chan1", "engaged", "EX", 5) // we have the rate limit set, queue, value, err = PopFromQueue(conn, "msgs")