diff --git a/backend.go b/backend.go index 5fa2b0422..6ee985706 100644 --- a/backend.go +++ b/backend.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/gomodule/redigo/redis" - "github.com/nyaruka/gocommon/aws/cwatch" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/urns" ) @@ -68,7 +67,7 @@ type Backend interface { // WriteChannelLog writes the passed in channel log to our backend WriteChannelLog(context.Context, *ChannelLog) error - // PopNextOutgoingMsg returns the next message that needs to be sent, callers should call MarkOutgoingMsgComplete with the + // PopNextOutgoingMsg returns the next message that needs to be sent, callers should call OnSendComplete with the // returned message when they have dealt with the message (regardless of whether it was sent or not) PopNextOutgoingMsg(context.Context) (MsgOut, error) @@ -80,10 +79,11 @@ type Backend interface { // a message is being forced in being resent by a user ClearMsgSent(context.Context, MsgID) error - // MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case - // of errors during sending as it will manage the number of active workers per channel. The status parameter can be - // used to determine any sort of deduping of msg sends - MarkOutgoingMsgComplete(context.Context, MsgOut, StatusUpdate) + // OnSendComplete is called when the sender has finished trying to send a message + OnSendComplete(context.Context, MsgOut, StatusUpdate, *ChannelLog) + + // OnReceiveComplete is called when the server has finished handling an incoming request + OnReceiveComplete(context.Context, Channel, []Event, *ChannelLog) // SaveAttachment saves an attachment to backend storage SaveAttachment(context.Context, Channel, string, []byte, string) (string, error) @@ -106,9 +106,6 @@ type Backend interface { // RedisPool returns the redisPool for this backend RedisPool() *redis.Pool - - // CloudWatch return the CloudWatch service for this backend - CloudWatch() *cwatch.Service } // Media is a resolved media object that can be used as a message attachment diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index 709d30abb..b399cdd1b 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -50,13 +50,6 @@ func init() { courier.RegisterBackend("rapidpro", newBackend) } -type stats struct { - // both sqlx and redis provide wait stats which are cummulative that we need to convert into increments by - // tracking their previous values - dbWaitDuration time.Duration - redisWaitDuration time.Duration -} - type backend struct { config *courier.Config @@ -94,7 +87,7 @@ type backend struct { // tracking of external ids of messages we've sent in case we need one before its status update has been written sentExternalIDs *redisx.IntervalHash - stats stats + stats *StatsCollector } // NewBackend creates a new RapidPro backend @@ -131,6 +124,8 @@ func newBackend(cfg *courier.Config) courier.Backend { receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours sentIDs: redisx.NewIntervalSet("sent-ids", time.Hour, 2), // 1 - 2 hours sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours + + stats: NewStatsCollector(), } } @@ -194,7 +189,6 @@ func (b *backend) Start() error { if err != nil { return err } - b.cw.StartQueue(time.Second * 3) // check attachment bucket access if err := b.s3.Test(ctx, b.config.S3AttachmentsBucket); err != nil { @@ -253,8 +247,6 @@ func (b *backend) Stop() error { // wait for our threads to exit b.waitGroup.Wait() - // stop cloudwatch service - b.cw.StopQueue() return nil } @@ -464,8 +456,8 @@ func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error { return b.sentIDs.Rem(rc, id.String()) } -// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel -func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate) { +// OnSendComplete is called when the sender has finished trying to send a message +func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate, clog *courier.ChannelLog) { rc := b.rp.Get() defer rc.Close() @@ -489,6 +481,13 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOu slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_) } } + + b.stats.RecordOutgoing(msg.Channel().ChannelType(), wasSuccess, clog.Elapsed) +} + +// OnReceiveComplete is called when the server has finished handling an incoming request +func (b *backend) OnReceiveComplete(ctx context.Context, ch courier.Channel, events []courier.Event, clog *courier.ChannelLog) { + b.stats.RecordIncoming(ch.ChannelType(), events, clog.Elapsed) } // WriteMsg writes the passed in message to our store @@ -737,7 +736,6 @@ func (b *backend) Health() string { return health.String() } -// Heartbeat is called every minute, we log our queue depth to librato func (b *backend) Heartbeat() error { rc := b.rp.Get() defer rc.Close() @@ -774,34 +772,48 @@ func (b *backend) Heartbeat() error { dbStats := b.db.Stats() redisStats := b.rp.Stats() - dbWaitDurationInPeriod := dbStats.WaitDuration - b.stats.dbWaitDuration - redisWaitDurationInPeriod := redisStats.WaitDuration - b.stats.redisWaitDuration - - b.stats.dbWaitDuration = dbStats.WaitDuration - b.stats.redisWaitDuration = redisStats.WaitDuration + stats := b.stats.Stats(dbStats, redisStats) + metrics := make([]cwtypes.MetricDatum, 0, 10) hostDim := cwatch.Dimension("Host", b.config.InstanceID) - b.CloudWatch().Queue( + metrics = append(metrics, cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim), - cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim), + cwatch.Datum("DBConnectionWaitDuration", float64(stats.DBWaitDuration/time.Second), cwtypes.StandardUnitSeconds, hostDim), cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim), - cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim), - ) + cwatch.Datum("RedisConnectionsWaitDuration", float64(stats.RedisWaitDuration/time.Second), cwtypes.StandardUnitSeconds, hostDim), - b.CloudWatch().Queue( cwatch.Datum("QueuedMsgs", float64(bulkSize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "bulk")), cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")), + cwatch.Datum("ContactsCreated", float64(stats.ContactsCreated), cwtypes.StandardUnitCount), ) - slog.Info("current metrics", - "db_inuse", dbStats.InUse, - "db_wait", dbWaitDurationInPeriod, - "redis_inuse", redisStats.ActiveCount, - "redis_wait", redisWaitDurationInPeriod, - "priority_size", prioritySize, - "bulk_size", bulkSize, - ) + metrics = append(metrics, stats.IncomingRequests.Metrics("IncomingRequests")...) + metrics = append(metrics, stats.IncomingMessages.Metrics("IncomingMessages")...) + metrics = append(metrics, stats.IncomingStatuses.Metrics("IncomingStatuses")...) + metrics = append(metrics, stats.IncomingEvents.Metrics("IncomingEvents")...) + metrics = append(metrics, stats.IncomingIgnored.Metrics("IncomingIgnored")...) + metrics = append(metrics, stats.OutgoingSends.Metrics("OutgoingSends")...) + metrics = append(metrics, stats.OutgoingErrors.Metrics("OutgoingErrors")...) + + // turn our duration stats into averages for metrics + for cType, count := range stats.IncomingDuration { + avgTime := float64(count) / float64(stats.IncomingRequests[cType]) + metrics = append(metrics, cwatch.Datum("IncomingDuration", float64(avgTime), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType)))) + } + for cType, duration := range stats.OutgoingDuration { + avgTime := float64(duration) / float64(stats.OutgoingSends[cType]+stats.OutgoingErrors[cType]) + metrics = append(metrics, cwatch.Datum("OutgoingDuration", avgTime, cwtypes.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(cType)))) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + if err := b.cw.Send(ctx, metrics...); err != nil { + slog.Error("error sending metrics", "error", err) + } else { + slog.Info("sent metrics to cloudwatch", "metrics", len(metrics)) + } + cancel() + return nil } @@ -878,8 +890,3 @@ func (b *backend) Status() string { func (b *backend) RedisPool() *redis.Pool { return b.rp } - -// CloudWatch return the cloudwatch service -func (b *backend) CloudWatch() *cwatch.Service { - return b.cw -} diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 49393ed01..ff322ad76 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -900,7 +900,7 @@ func (ts *BackendTestSuite) TestOutgoingQueue() { ts.Equal(msg.Text(), "test message") // mark this message as dealt with - ts.b.MarkOutgoingMsgComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog)) + ts.b.OnSendComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog), clog) // this message should now be marked as sent sent, err := ts.b.WasMsgSent(ctx, msg.ID()) diff --git a/backends/rapidpro/contact.go b/backends/rapidpro/contact.go index c30fe4001..e9fd8d709 100644 --- a/backends/rapidpro/contact.go +++ b/backends/rapidpro/contact.go @@ -10,10 +10,8 @@ import ( "time" "unicode/utf8" - cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" "github.com/jmoiron/sqlx" "github.com/nyaruka/courier" - "github.com/nyaruka/gocommon/aws/cwatch" "github.com/nyaruka/gocommon/dbutil" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/gocommon/uuids" @@ -218,9 +216,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel, // store this URN on our contact contact.URNID_ = contactURN.ID - // report that we created a new contact - b.cw.Queue(cwatch.Datum("ContactCreated", float64(1), cwtypes.StandardUnitCount)) + b.stats.RecordContactCreated() - // and return it return contact, nil } diff --git a/backends/rapidpro/stats.go b/backends/rapidpro/stats.go new file mode 100644 index 000000000..c50747239 --- /dev/null +++ b/backends/rapidpro/stats.go @@ -0,0 +1,158 @@ +package rapidpro + +import ( + "database/sql" + "maps" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + "github.com/gomodule/redigo/redis" + "github.com/nyaruka/courier" + "github.com/nyaruka/gocommon/aws/cwatch" +) + +type CountByType map[courier.ChannelType]int + +func (c CountByType) Metrics(name string) []types.MetricDatum { + m := make([]types.MetricDatum, 0, len(c)) + for typ, count := range c { + m = append(m, cwatch.Datum(name, float64(count), types.StandardUnitCount, cwatch.Dimension("ChannelType", string(typ)))) + } + return m +} + +type DurationByType map[courier.ChannelType]time.Duration + +type Stats struct { + IncomingRequests CountByType // number of handler requests + IncomingMessages CountByType // number of messages received + IncomingStatuses CountByType // number of status updates received + IncomingEvents CountByType // number of other events received + IncomingIgnored CountByType // number of requests ignored + IncomingDuration DurationByType // total time spent handling requests + + OutgoingSends CountByType // number of sends that succeeded + OutgoingErrors CountByType // number of sends that errored + OutgoingDuration DurationByType // total time spent sending messages + + ContactsCreated int + + DBWaitDuration time.Duration + RedisWaitDuration time.Duration +} + +func newStats() *Stats { + return &Stats{ + IncomingRequests: make(CountByType), + IncomingMessages: make(CountByType), + IncomingStatuses: make(CountByType), + IncomingEvents: make(CountByType), + IncomingIgnored: make(CountByType), + IncomingDuration: make(DurationByType), + + OutgoingSends: make(CountByType), + OutgoingErrors: make(CountByType), + OutgoingDuration: make(DurationByType), + + ContactsCreated: 0, + } +} + +func (s *Stats) reset(db sql.DBStats, rp redis.PoolStats) { + clear(s.IncomingRequests) + clear(s.IncomingMessages) + clear(s.IncomingStatuses) + clear(s.IncomingEvents) + clear(s.IncomingIgnored) + clear(s.IncomingDuration) + + clear(s.OutgoingSends) + clear(s.OutgoingErrors) + clear(s.OutgoingDuration) + + s.ContactsCreated = 0 + + s.DBWaitDuration = db.WaitDuration + s.RedisWaitDuration = rp.WaitDuration +} + +// StatsCollector provides threadsafe stats collection +type StatsCollector struct { + mutex sync.Mutex + stats *Stats +} + +// NewStatsCollector creates a new stats collector +func NewStatsCollector() *StatsCollector { + return &StatsCollector{stats: newStats()} +} + +func (c *StatsCollector) RecordIncoming(typ courier.ChannelType, evts []courier.Event, d time.Duration) { + c.mutex.Lock() + c.stats.IncomingRequests[typ]++ + + for _, e := range evts { + switch e.(type) { + case courier.MsgIn: + c.stats.IncomingMessages[typ]++ + case courier.StatusUpdate: + c.stats.IncomingStatuses[typ]++ + case courier.ChannelEvent: + c.stats.IncomingEvents[typ]++ + } + } + if len(evts) == 0 { + c.stats.IncomingIgnored[typ]++ + } + + c.stats.IncomingDuration[typ] += d + c.mutex.Unlock() +} + +func (c *StatsCollector) RecordOutgoing(typ courier.ChannelType, success bool, d time.Duration) { + c.mutex.Lock() + if success { + c.stats.OutgoingSends[typ]++ + } else { + c.stats.OutgoingErrors[typ]++ + } + c.stats.OutgoingDuration[typ] += d + c.mutex.Unlock() +} + +func (c *StatsCollector) RecordContactCreated() { + c.mutex.Lock() + c.stats.ContactsCreated++ + c.mutex.Unlock() +} + +// Stats returns the stats for the period since the last call +func (c *StatsCollector) Stats(db sql.DBStats, rp redis.PoolStats) *Stats { + c.mutex.Lock() + defer c.mutex.Unlock() + + stats := &Stats{ + ContactsCreated: c.stats.ContactsCreated, + + IncomingRequests: maps.Clone(c.stats.IncomingRequests), + IncomingMessages: maps.Clone(c.stats.IncomingMessages), + IncomingStatuses: maps.Clone(c.stats.IncomingStatuses), + IncomingEvents: maps.Clone(c.stats.IncomingEvents), + IncomingIgnored: maps.Clone(c.stats.IncomingIgnored), + IncomingDuration: maps.Clone(c.stats.IncomingDuration), + + OutgoingSends: maps.Clone(c.stats.OutgoingSends), + OutgoingErrors: maps.Clone(c.stats.OutgoingErrors), + OutgoingDuration: maps.Clone(c.stats.OutgoingDuration), + + // both sqlx and redis provide wait stats which are cummulative that we need to convert into increments by + // tracking their previous values + DBWaitDuration: db.WaitDuration - c.stats.DBWaitDuration, + RedisWaitDuration: rp.WaitDuration - c.stats.RedisWaitDuration, + } + + c.stats.reset(db, rp) + + return stats +} diff --git a/backends/rapidpro/stats_test.go b/backends/rapidpro/stats_test.go new file mode 100644 index 000000000..336f0065b --- /dev/null +++ b/backends/rapidpro/stats_test.go @@ -0,0 +1,60 @@ +package rapidpro_test + +import ( + "database/sql" + "testing" + "time" + + "github.com/gomodule/redigo/redis" + "github.com/nyaruka/courier" + "github.com/nyaruka/courier/backends/rapidpro" + "github.com/stretchr/testify/assert" +) + +func TestStats(t *testing.T) { + db := sql.DBStats{WaitDuration: time.Second} + rp := redis.PoolStats{WaitDuration: time.Second * 3} + + sc := rapidpro.NewStatsCollector() + sc.RecordContactCreated() + sc.RecordContactCreated() + sc.RecordIncoming("T", []courier.Event{}, time.Second) + sc.RecordOutgoing("T", true, time.Second) + sc.RecordOutgoing("T", true, time.Second) + sc.RecordOutgoing("FBA", true, time.Second) + sc.RecordOutgoing("FBA", true, time.Second) + sc.RecordOutgoing("FBA", true, time.Second) + + stats := sc.Stats(db, rp) + + assert.Equal(t, 2, stats.ContactsCreated) + assert.Equal(t, rapidpro.CountByType{"T": 1}, stats.IncomingRequests) + assert.Equal(t, rapidpro.CountByType{}, stats.IncomingMessages) + assert.Equal(t, rapidpro.CountByType{}, stats.IncomingStatuses) + assert.Equal(t, rapidpro.CountByType{}, stats.IncomingEvents) + assert.Equal(t, rapidpro.DurationByType{"T": time.Second}, stats.IncomingDuration) + assert.Equal(t, rapidpro.CountByType{"T": 2, "FBA": 3}, stats.OutgoingSends) + assert.Equal(t, rapidpro.CountByType{}, stats.OutgoingErrors) + assert.Equal(t, rapidpro.DurationByType{"T": time.Second * 2, "FBA": time.Second * 3}, stats.OutgoingDuration) + assert.Equal(t, time.Second, stats.DBWaitDuration) + assert.Equal(t, time.Second*3, stats.RedisWaitDuration) + + sc.RecordOutgoing("FBA", true, time.Second) + sc.RecordOutgoing("FBA", true, time.Second) + db.WaitDuration = time.Second * 3 + rp.WaitDuration = time.Second * 3 + + stats = sc.Stats(db, rp) + + assert.Equal(t, 0, stats.ContactsCreated) + assert.Equal(t, rapidpro.CountByType{}, stats.IncomingRequests) + assert.Equal(t, rapidpro.CountByType{}, stats.IncomingMessages) + assert.Equal(t, rapidpro.CountByType{}, stats.IncomingStatuses) + assert.Equal(t, rapidpro.CountByType{}, stats.IncomingEvents) + assert.Equal(t, rapidpro.DurationByType{}, stats.IncomingDuration) + assert.Equal(t, rapidpro.CountByType{"FBA": 2}, stats.OutgoingSends) + assert.Equal(t, rapidpro.CountByType{}, stats.OutgoingErrors) + assert.Equal(t, rapidpro.DurationByType{"FBA": time.Second * 2}, stats.OutgoingDuration) + assert.Equal(t, time.Second*2, stats.DBWaitDuration) + assert.Equal(t, time.Duration(0), stats.RedisWaitDuration) +} diff --git a/sender.go b/sender.go index 6a6c20a2a..e5d3ea843 100644 --- a/sender.go +++ b/sender.go @@ -7,9 +7,7 @@ import ( "log/slog" "time" - "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" "github.com/nyaruka/courier/utils/clogs" - "github.com/nyaruka/gocommon/aws/cwatch" "github.com/nyaruka/gocommon/urns" ) @@ -289,8 +287,6 @@ func (w *Sender) sendMessage(msg MsgOut) { log = log.With("quick_replies", msg.QuickReplies()) } - start := time.Now() - // if this is a resend, clear our sent status if msg.IsResend() { err := backend.ClearMsgSent(sendCTX, msg.ID()) @@ -327,26 +323,7 @@ func (w *Sender) sendMessage(msg MsgOut) { log.Warn("duplicate send, marking as wired") } else { - status = w.sendByHandler(sendCTX, handler, msg, clog, log) - - duration := time.Since(start) - secondDuration := float64(duration) / float64(time.Second) - log.Debug("send complete", "status", status.Status(), "elapsed", duration) - - channelTypeDim := cwatch.Dimension("ChannelType", string(msg.Channel().ChannelType())) - - // report to librato - if status.Status() == MsgStatusErrored || status.Status() == MsgStatusFailed { - backend.CloudWatch().Queue( - cwatch.Datum("MsgSendError", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim), - ) - - } else { - backend.CloudWatch().Queue( - cwatch.Datum("MsgSend", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim), - ) - } } // we allot 10 seconds to write our status to the db @@ -367,7 +344,7 @@ func (w *Sender) sendMessage(msg MsgOut) { } // mark our send task as complete - backend.MarkOutgoingMsgComplete(writeCTX, msg, status) + backend.OnSendComplete(writeCTX, msg, status, clog) } func (w *Sender) sendByHandler(ctx context.Context, h ChannelHandler, m MsgOut, clog *ChannelLog, log *slog.Logger) StatusUpdate { diff --git a/server.go b/server.go index 556bae733..c807a4ca4 100644 --- a/server.go +++ b/server.go @@ -16,11 +16,9 @@ import ( "sync" "time" - "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/nyaruka/courier/utils/clogs" - "github.com/nyaruka/gocommon/aws/cwatch" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/jsonx" ) @@ -248,8 +246,6 @@ func (s *server) initializeChannelHandlers() { func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc ChannelHandleFunc, logType clogs.LogType) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - start := time.Now() - // stuff a few things in our context that help with logging baseCtx := context.WithValue(r.Context(), contextRequestURL, r.URL.String()) baseCtx = context.WithValue(baseCtx, contextRequestStart, time.Now()) @@ -290,8 +286,6 @@ func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc Channe clog := NewChannelLogForIncoming(logType, channel, recorder, handler.RedactValues(channel)) events, hErr := handlerFunc(ctx, channel, recorder.ResponseWriter, r, clog) - duration := time.Since(start) - secondDuration := float64(duration) / float64(time.Second) // if we received an error, write it out and report it if hErr != nil { @@ -306,30 +300,15 @@ func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc Channe } if channel != nil { - cw := s.Backend().CloudWatch() - channelTypeDim := cwatch.Dimension("ChannelType", string(channel.ChannelType())) - - // if we have a channel but no events were created, we still log this to metrics - if len(events) == 0 { - if hErr != nil { - cw.Queue(cwatch.Datum("ChannelError", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim)) - } else { - cw.Queue(cwatch.Datum("ChannelIgnored", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim)) - } - } - for _, event := range events { switch e := event.(type) { case MsgIn: clog.SetAttached(true) - cw.Queue(cwatch.Datum("MsgReceive", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim)) LogMsgReceived(r, e) case StatusUpdate: clog.SetAttached(true) - cw.Queue(cwatch.Datum("MsgStatus", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim)) LogMsgStatusReceived(r, e) case ChannelEvent: - cw.Queue(cwatch.Datum("EventReceive", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim)) LogChannelEventReceived(r, e) } } @@ -339,9 +318,10 @@ func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc Channe if err := s.backend.WriteChannelLog(ctx, clog); err != nil { slog.Error("error writing channel log", "error", err) } + + s.backend.OnReceiveComplete(ctx, channel, events, clog) } else { slog.Info("non-channel specific request", "error", err, "channel_type", handler.ChannelType(), "request", recorder.Trace.RequestTrace, "status", recorder.Trace.Response.StatusCode) - } } } diff --git a/test/backend.go b/test/backend.go index c7b3219eb..836cbbd30 100644 --- a/test/backend.go +++ b/test/backend.go @@ -13,7 +13,6 @@ import ( _ "github.com/lib/pq" "github.com/nyaruka/courier" "github.com/nyaruka/courier/utils" - "github.com/nyaruka/gocommon/aws/cwatch" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/gocommon/uuids" @@ -46,8 +45,6 @@ type MockBackend struct { mutex sync.RWMutex redisPool *redis.Pool - cw *cwatch.Service - writtenMsgs []courier.MsgIn writtenMsgStatuses []courier.StatusUpdate writtenChannelEvents []courier.ChannelEvent @@ -86,11 +83,6 @@ func NewMockBackend() *MockBackend { log.Fatal(err) } - cw, err := cwatch.NewService("root", "tembatemba", "us-east-1", "Temba", "test") - if err != nil { - log.Fatal(err) - } - return &MockBackend{ channels: make(map[courier.ChannelUUID]courier.Channel), channelsByAddress: make(map[courier.ChannelAddress]courier.Channel), @@ -99,7 +91,6 @@ func NewMockBackend() *MockBackend { sentMsgs: make(map[courier.MsgID]bool), seenExternalIDs: make(map[string]courier.MsgUUID), redisPool: redisPool, - cw: cw, } } @@ -179,14 +170,17 @@ func (mb *MockBackend) ClearMsgSent(ctx context.Context, id courier.MsgID) error return nil } -// MarkOutgoingMsgComplete marks the passed msg as having been dealt with -func (mb *MockBackend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOut, s courier.StatusUpdate) { +// OnSendComplete marks the passed msg as having been dealt with +func (mb *MockBackend) OnSendComplete(ctx context.Context, msg courier.MsgOut, s courier.StatusUpdate, clog *courier.ChannelLog) { mb.mutex.Lock() defer mb.mutex.Unlock() mb.sentMsgs[msg.ID()] = true } +func (mb *MockBackend) OnReceiveComplete(ctx context.Context, ch courier.Channel, events []courier.Event, clog *courier.ChannelLog) { +} + // WriteChannelLog writes the passed in channel log to the DB func (mb *MockBackend) WriteChannelLog(ctx context.Context, clog *courier.ChannelLog) error { mb.mutex.Lock() @@ -391,11 +385,6 @@ func (mb *MockBackend) RedisPool() *redis.Pool { return mb.redisPool } -// CloudWatch returns the cloudwatch service for this backend -func (mb *MockBackend) CloudWatch() *cwatch.Service { - return mb.cw -} - //////////////////////////////////////////////////////////////////////////////// // Methods not part of the backed interface but used in tests ////////////////////////////////////////////////////////////////////////////////