Skip to content

Commit

Permalink
Refactor metrics so that everything is sent from Heartbeat in the bac…
Browse files Browse the repository at this point in the history
…kend
  • Loading branch information
rowanseymour committed Dec 17, 2024
1 parent 68d55f9 commit 3ea1c7d
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 114 deletions.
15 changes: 6 additions & 9 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/urns"
)
Expand Down Expand Up @@ -68,7 +67,7 @@ type Backend interface {
// WriteChannelLog writes the passed in channel log to our backend
WriteChannelLog(context.Context, *ChannelLog) error

// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call MarkOutgoingMsgComplete with the
// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call OnSendComplete with the
// returned message when they have dealt with the message (regardless of whether it was sent or not)
PopNextOutgoingMsg(context.Context) (MsgOut, error)

Expand All @@ -80,10 +79,11 @@ type Backend interface {
// a message is being forced to be resent by a user
ClearMsgSent(context.Context, MsgID) error

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The status parameter can be
// used to determine any sort of deduping of msg sends
MarkOutgoingMsgComplete(context.Context, MsgOut, StatusUpdate)
// OnSendComplete is called when the sender has finished trying to send a message
OnSendComplete(context.Context, MsgOut, StatusUpdate, *ChannelLog)

// OnReceiveComplete is called when the server has finished handling an incoming request
OnReceiveComplete(context.Context, Channel, []Event, *ChannelLog)

// SaveAttachment saves an attachment to backend storage
SaveAttachment(context.Context, Channel, string, []byte, string) (string, error)
Expand All @@ -106,9 +106,6 @@ type Backend interface {

// RedisPool returns the redisPool for this backend
RedisPool() *redis.Pool

// CloudWatch returns the CloudWatch service for this backend
CloudWatch() *cwatch.Service
}

// Media is a resolved media object that can be used as a message attachment
Expand Down
81 changes: 44 additions & 37 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ func init() {
courier.RegisterBackend("rapidpro", newBackend)
}

type stats struct {
// both sqlx and redis provide wait stats which are cumulative that we need to convert into increments by
// tracking their previous values
dbWaitDuration time.Duration
redisWaitDuration time.Duration
}

type backend struct {
config *courier.Config

Expand Down Expand Up @@ -94,7 +87,7 @@ type backend struct {
// tracking of external ids of messages we've sent in case we need one before its status update has been written
sentExternalIDs *redisx.IntervalHash

stats stats
stats *StatsCollector
}

// NewBackend creates a new RapidPro backend
Expand Down Expand Up @@ -131,6 +124,8 @@ func newBackend(cfg *courier.Config) courier.Backend {
receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours
sentIDs: redisx.NewIntervalSet("sent-ids", time.Hour, 2), // 1 - 2 hours
sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours

stats: NewStatsCollector(),
}
}

Expand Down Expand Up @@ -194,7 +189,6 @@ func (b *backend) Start() error {
if err != nil {
return err
}
b.cw.StartQueue(time.Second * 3)

// check attachment bucket access
if err := b.s3.Test(ctx, b.config.S3AttachmentsBucket); err != nil {
Expand Down Expand Up @@ -253,8 +247,6 @@ func (b *backend) Stop() error {
// wait for our threads to exit
b.waitGroup.Wait()

// stop cloudwatch service
b.cw.StopQueue()
return nil
}

Expand Down Expand Up @@ -464,8 +456,8 @@ func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error {
return b.sentIDs.Rem(rc, id.String())
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate) {
// OnSendComplete is called when the sender has finished trying to send a message
func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate, clog *courier.ChannelLog) {
rc := b.rp.Get()
defer rc.Close()

Expand All @@ -489,6 +481,13 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOu
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
}

b.stats.RecordOutgoing(msg.Channel().ChannelType(), wasSuccess, clog.Elapsed)
}

// OnReceiveComplete is called when the server has finished handling an incoming request
func (b *backend) OnReceiveComplete(ctx context.Context, ch courier.Channel, events []courier.Event, clog *courier.ChannelLog) {
b.stats.RecordIncoming(ch.ChannelType(), events, clog.Elapsed)

Check warning on line 490 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L489-L490

Added lines #L489 - L490 were not covered by tests
}

// WriteMsg writes the passed in message to our store
Expand Down Expand Up @@ -737,7 +736,6 @@ func (b *backend) Health() string {
return health.String()
}

// Heartbeat is called every minute, we log our queue depth to librato
func (b *backend) Heartbeat() error {
rc := b.rp.Get()
defer rc.Close()
Expand Down Expand Up @@ -774,34 +772,48 @@ func (b *backend) Heartbeat() error {
dbStats := b.db.Stats()
redisStats := b.rp.Stats()

dbWaitDurationInPeriod := dbStats.WaitDuration - b.stats.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - b.stats.redisWaitDuration

b.stats.dbWaitDuration = dbStats.WaitDuration
b.stats.redisWaitDuration = redisStats.WaitDuration
stats := b.stats.Stats(dbStats, redisStats)

metrics := make([]cwtypes.MetricDatum, 0, 10)
hostDim := cwatch.Dimension("Host", b.config.InstanceID)

b.CloudWatch().Queue(
metrics = append(metrics,
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(stats.DBWaitDuration/time.Second), cwtypes.StandardUnitSeconds, hostDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
)
cwatch.Datum("RedisConnectionsWaitDuration", float64(stats.RedisWaitDuration/time.Second), cwtypes.StandardUnitSeconds, hostDim),

b.CloudWatch().Queue(
cwatch.Datum("QueuedMsgs", float64(bulkSize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "bulk")),
cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
cwatch.Datum("ContactsCreated", float64(stats.ContactsCreated), cwtypes.StandardUnitCount),
)

slog.Info("current metrics",
"db_inuse", dbStats.InUse,
"db_wait", dbWaitDurationInPeriod,
"redis_inuse", redisStats.ActiveCount,
"redis_wait", redisWaitDurationInPeriod,
"priority_size", prioritySize,
"bulk_size", bulkSize,
)
metrics = append(metrics, stats.IncomingRequests.Metrics("IncomingRequests")...)
metrics = append(metrics, stats.IncomingMessages.Metrics("IncomingMessages")...)
metrics = append(metrics, stats.IncomingStatuses.Metrics("IncomingStatuses")...)
metrics = append(metrics, stats.IncomingEvents.Metrics("IncomingEvents")...)
metrics = append(metrics, stats.IncomingIgnored.Metrics("IncomingIgnored")...)
metrics = append(metrics, stats.OutgoingSends.Metrics("OutgoingSends")...)
metrics = append(metrics, stats.OutgoingErrors.Metrics("OutgoingErrors")...)

// turn our duration stats into averages for metrics
for cType, count := range stats.IncomingDuration {
avgTime := float64(count) / float64(stats.IncomingRequests[cType])
metrics = append(metrics, cwatch.Datum("IncomingDuration", float64(avgTime), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 803 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L801-L803

Added lines #L801 - L803 were not covered by tests
for cType, duration := range stats.OutgoingDuration {
avgTime := float64(duration) / float64(stats.OutgoingSends[cType]+stats.OutgoingErrors[cType])
metrics = append(metrics, cwatch.Datum("OutgoingDuration", avgTime, cwtypes.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 807 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L805-L807

Added lines #L805 - L807 were not covered by tests

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := b.cw.Send(ctx, metrics...); err != nil {
slog.Error("error sending metrics", "error", err)

Check warning on line 811 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L811

Added line #L811 was not covered by tests
} else {
slog.Info("sent metrics to cloudwatch", "metrics", len(metrics))
}
cancel()

return nil
}

Expand Down Expand Up @@ -878,8 +890,3 @@ func (b *backend) Status() string {
func (b *backend) RedisPool() *redis.Pool {
return b.rp
}

// CloudWatch returns the cloudwatch service
func (b *backend) CloudWatch() *cwatch.Service {
return b.cw
}
2 changes: 1 addition & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ func (ts *BackendTestSuite) TestOutgoingQueue() {
ts.Equal(msg.Text(), "test message")

// mark this message as dealt with
ts.b.MarkOutgoingMsgComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog))
ts.b.OnSendComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog), clog)

// this message should now be marked as sent
sent, err := ts.b.WasMsgSent(ctx, msg.ID())
Expand Down
6 changes: 1 addition & 5 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import (
"time"
"unicode/utf8"

cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
Expand Down Expand Up @@ -218,9 +216,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// store this URN on our contact
contact.URNID_ = contactURN.ID

// report that we created a new contact
b.cw.Queue(cwatch.Datum("ContactCreated", float64(1), cwtypes.StandardUnitCount))
b.stats.RecordContactCreated()

// and return it
return contact, nil
}
158 changes: 158 additions & 0 deletions backends/rapidpro/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package rapidpro

import (
"database/sql"
"maps"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/gomodule/redigo/redis"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/aws/cwatch"
)

// CountByType is a set of integer counters keyed by channel type.
type CountByType map[courier.ChannelType]int

func (c CountByType) Metrics(name string) []types.MetricDatum {
m := make([]types.MetricDatum, 0, len(c))
for typ, count := range c {
m = append(m, cwatch.Datum(name, float64(count), types.StandardUnitCount, cwatch.Dimension("ChannelType", string(typ))))
}

Check warning on line 21 in backends/rapidpro/stats.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/stats.go#L20-L21

Added lines #L20 - L21 were not covered by tests
return m
}

// DurationByType is a set of accumulated durations keyed by channel type.
type DurationByType map[courier.ChannelType]time.Duration

// Stats holds the counters and durations that a StatsCollector accumulates between
// calls to StatsCollector.Stats. Per-channel values are keyed by channel type.
type Stats struct {
IncomingRequests CountByType // number of handler requests
IncomingMessages CountByType // number of messages received
IncomingStatuses CountByType // number of status updates received
IncomingEvents CountByType // number of other events received
IncomingIgnored CountByType // number of requests ignored
IncomingDuration DurationByType // total time spent handling requests

OutgoingSends CountByType // number of sends that succeeded
OutgoingErrors CountByType // number of sends that errored
OutgoingDuration DurationByType // total time spent sending messages

// number of contacts created (incremented by StatsCollector.RecordContactCreated)
ContactsCreated int

// In a snapshot returned by StatsCollector.Stats these are the connection wait times
// added since the previous call. Inside the collector they instead hold the cumulative
// pool values recorded at the last reset, which serve as the baseline for the next call.
DBWaitDuration time.Duration
RedisWaitDuration time.Duration
}

// newStats returns a Stats whose per-channel maps are all allocated and empty,
// so they can be written to immediately. The scalar fields start at zero.
func newStats() *Stats {
	s := new(Stats)

	s.IncomingRequests = CountByType{}
	s.IncomingMessages = CountByType{}
	s.IncomingStatuses = CountByType{}
	s.IncomingEvents = CountByType{}
	s.IncomingIgnored = CountByType{}
	s.IncomingDuration = DurationByType{}

	s.OutgoingSends = CountByType{}
	s.OutgoingErrors = CountByType{}
	s.OutgoingDuration = DurationByType{}

	return s
}

// reset empties every per-channel map in place and zeroes the contact counter.
// It then stores the current cumulative wait durations reported by the database
// and redis pools, so the next period's wait can be computed as a difference.
func (s *Stats) reset(db sql.DBStats, rp redis.PoolStats) {
	counters := []CountByType{
		s.IncomingRequests,
		s.IncomingMessages,
		s.IncomingStatuses,
		s.IncomingEvents,
		s.IncomingIgnored,
		s.OutgoingSends,
		s.OutgoingErrors,
	}
	for _, counts := range counters {
		clear(counts)
	}

	for _, durations := range []DurationByType{s.IncomingDuration, s.OutgoingDuration} {
		clear(durations)
	}

	s.ContactsCreated = 0

	// remember the cumulative values as the baseline for the next period
	s.DBWaitDuration, s.RedisWaitDuration = db.WaitDuration, rp.WaitDuration
}

// StatsCollector provides threadsafe stats collection. Its Record* methods and
// Stats all take the mutex before touching the underlying Stats value.
type StatsCollector struct {
// mutex guards stats
mutex sync.Mutex
// stats holds the values accumulated since the last call to Stats
stats *Stats
}

// NewStatsCollector returns a collector that is ready for use, backed by a
// freshly initialized, empty Stats.
func NewStatsCollector() *StatsCollector {
	collector := &StatsCollector{}
	collector.stats = newStats()
	return collector
}

func (c *StatsCollector) RecordIncoming(typ courier.ChannelType, evts []courier.Event, d time.Duration) {
c.mutex.Lock()
c.stats.IncomingRequests[typ]++

for _, e := range evts {
switch e.(type) {
case courier.MsgIn:
c.stats.IncomingMessages[typ]++
case courier.StatusUpdate:
c.stats.IncomingStatuses[typ]++
case courier.ChannelEvent:
c.stats.IncomingEvents[typ]++

Check warning on line 102 in backends/rapidpro/stats.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/stats.go#L96-L102

Added lines #L96 - L102 were not covered by tests
}
}
if len(evts) == 0 {
c.stats.IncomingIgnored[typ]++
}

c.stats.IncomingDuration[typ] += d
c.mutex.Unlock()
}

func (c *StatsCollector) RecordOutgoing(typ courier.ChannelType, success bool, d time.Duration) {
c.mutex.Lock()
if success {
c.stats.OutgoingSends[typ]++
} else {
c.stats.OutgoingErrors[typ]++
}

Check warning on line 119 in backends/rapidpro/stats.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/stats.go#L118-L119

Added lines #L118 - L119 were not covered by tests
c.stats.OutgoingDuration[typ] += d
c.mutex.Unlock()
}

// RecordContactCreated increments the count of contacts created in the current period.
func (c *StatsCollector) RecordContactCreated() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.stats.ContactsCreated++
}

// Stats returns a snapshot of everything recorded since the previous call and then
// resets the collector for the next period. The returned maps are independent
// copies, so the caller may read them without holding the collector's lock.
//
// db and rp are the current pool statistics. Their wait durations are cumulative,
// so the snapshot reports the difference from the values seen on the previous
// call, and the current values become the new baseline.
func (c *StatsCollector) Stats(db sql.DBStats, rp redis.PoolStats) *Stats {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	current := c.stats

	// start from a shallow copy (carries ContactsCreated), then detach every map
	snapshot := *current

	snapshot.IncomingRequests = maps.Clone(current.IncomingRequests)
	snapshot.IncomingMessages = maps.Clone(current.IncomingMessages)
	snapshot.IncomingStatuses = maps.Clone(current.IncomingStatuses)
	snapshot.IncomingEvents = maps.Clone(current.IncomingEvents)
	snapshot.IncomingIgnored = maps.Clone(current.IncomingIgnored)
	snapshot.IncomingDuration = maps.Clone(current.IncomingDuration)

	snapshot.OutgoingSends = maps.Clone(current.OutgoingSends)
	snapshot.OutgoingErrors = maps.Clone(current.OutgoingErrors)
	snapshot.OutgoingDuration = maps.Clone(current.OutgoingDuration)

	// both sqlx and redis provide wait stats which are cumulative, so convert them
	// into increments by subtracting the values tracked from the previous call
	snapshot.DBWaitDuration = db.WaitDuration - current.DBWaitDuration
	snapshot.RedisWaitDuration = rp.WaitDuration - current.RedisWaitDuration

	current.reset(db, rp)

	return &snapshot
}
Loading

0 comments on commit 3ea1c7d

Please sign in to comment.