Skip to content

Commit

Permalink
Refactor metrics so that everything is sent from Heartbeat in the bac…
Browse files Browse the repository at this point in the history
…kend
  • Loading branch information
rowanseymour committed Dec 17, 2024
1 parent 68d55f9 commit f191230
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 114 deletions.
15 changes: 6 additions & 9 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/urns"
)
Expand Down Expand Up @@ -68,7 +67,7 @@ type Backend interface {
// WriteChannelLog writes the passed in channel log to our backend
WriteChannelLog(context.Context, *ChannelLog) error

// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call MarkOutgoingMsgComplete with the
// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call OnSendComplete with the
// returned message when they have dealt with the message (regardless of whether it was sent or not)
PopNextOutgoingMsg(context.Context) (MsgOut, error)

Expand All @@ -80,10 +79,11 @@ type Backend interface {
// a message is being forced in being resent by a user
ClearMsgSent(context.Context, MsgID) error

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The status parameter can be
// used to determine any sort of deduping of msg sends
MarkOutgoingMsgComplete(context.Context, MsgOut, StatusUpdate)
// OnSendComplete is called when the sender has finished trying to send a message
OnSendComplete(context.Context, MsgOut, StatusUpdate, *ChannelLog)

// OnReceiveComplete is called when the server has finished handling an incoming request
OnReceiveComplete(context.Context, Channel, []Event, *ChannelLog)

// SaveAttachment saves an attachment to backend storage
SaveAttachment(context.Context, Channel, string, []byte, string) (string, error)
Expand All @@ -106,9 +106,6 @@ type Backend interface {

// RedisPool returns the redisPool for this backend
RedisPool() *redis.Pool

// CloudWatch return the CloudWatch service for this backend
CloudWatch() *cwatch.Service
}

// Media is a resolved media object that can be used as a message attachment
Expand Down
93 changes: 56 additions & 37 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ func init() {
courier.RegisterBackend("rapidpro", newBackend)
}

type stats struct {
// both sqlx and redis provide wait stats which are cummulative that we need to convert into increments by
// tracking their previous values
dbWaitDuration time.Duration
redisWaitDuration time.Duration
}

type backend struct {
config *courier.Config

Expand Down Expand Up @@ -94,7 +87,7 @@ type backend struct {
// tracking of external ids of messages we've sent in case we need one before its status update has been written
sentExternalIDs *redisx.IntervalHash

stats stats
stats *StatsCollector
}

// NewBackend creates a new RapidPro backend
Expand Down Expand Up @@ -131,6 +124,8 @@ func newBackend(cfg *courier.Config) courier.Backend {
receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours
sentIDs: redisx.NewIntervalSet("sent-ids", time.Hour, 2), // 1 - 2 hours
sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours

stats: NewStatsCollector(),
}
}

Expand Down Expand Up @@ -194,7 +189,6 @@ func (b *backend) Start() error {
if err != nil {
return err
}
b.cw.StartQueue(time.Second * 3)

// check attachment bucket access
if err := b.s3.Test(ctx, b.config.S3AttachmentsBucket); err != nil {
Expand Down Expand Up @@ -253,8 +247,6 @@ func (b *backend) Stop() error {
// wait for our threads to exit
b.waitGroup.Wait()

// stop cloudwatch service
b.cw.StopQueue()
return nil
}

Expand Down Expand Up @@ -464,8 +456,8 @@ func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error {
return b.sentIDs.Rem(rc, id.String())
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate) {
// OnSendComplete is called when the sender has finished trying to send a message
func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate, clog *courier.ChannelLog) {
rc := b.rp.Get()
defer rc.Close()

Expand All @@ -489,6 +481,13 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOu
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
}

b.stats.RecordSend(msg.Channel().ChannelType(), wasSuccess, clog.Elapsed)
}

// OnReceiveComplete is called when the server has finished handling an incoming request
func (b *backend) OnReceiveComplete(ctx context.Context, ch courier.Channel, events []courier.Event, clog *courier.ChannelLog) {
b.stats.RecordReceive(ch.ChannelType(), events, clog.Elapsed)

Check warning on line 490 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L489-L490

Added lines #L489 - L490 were not covered by tests
}

// WriteMsg writes the passed in message to our store
Expand Down Expand Up @@ -737,7 +736,6 @@ func (b *backend) Health() string {
return health.String()
}

// Heartbeat is called every minute, we log our queue depth to librato
func (b *backend) Heartbeat() error {
rc := b.rp.Get()
defer rc.Close()
Expand Down Expand Up @@ -774,34 +772,60 @@ func (b *backend) Heartbeat() error {
dbStats := b.db.Stats()
redisStats := b.rp.Stats()

dbWaitDurationInPeriod := dbStats.WaitDuration - b.stats.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - b.stats.redisWaitDuration

b.stats.dbWaitDuration = dbStats.WaitDuration
b.stats.redisWaitDuration = redisStats.WaitDuration
stats := b.stats.Stats(dbStats, redisStats)

metrics := make([]cwtypes.MetricDatum, 0, 10)
hostDim := cwatch.Dimension("Host", b.config.InstanceID)

b.CloudWatch().Queue(
metrics = append(metrics,
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(stats.DBWaitDuration/time.Second), cwtypes.StandardUnitSeconds, hostDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
)
cwatch.Datum("RedisConnectionsWaitDuration", float64(stats.RedisWaitDuration/time.Second), cwtypes.StandardUnitSeconds, hostDim),

b.CloudWatch().Queue(
cwatch.Datum("QueuedMsgs", float64(bulkSize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "bulk")),
cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
cwatch.Datum("ContactsCreated", float64(stats.ContactsCreated), cwtypes.StandardUnitCount),
)

slog.Info("current metrics",
"db_inuse", dbStats.InUse,
"db_wait", dbWaitDurationInPeriod,
"redis_inuse", redisStats.ActiveCount,
"redis_wait", redisWaitDurationInPeriod,
"priority_size", prioritySize,
"bulk_size", bulkSize,
)
for cType, count := range stats.ReceiveRequests {
metrics = append(metrics, cwatch.Datum("ReceiveRequests", float64(count), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 793 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L792-L793

Added lines #L792 - L793 were not covered by tests
for cType, count := range stats.ReceiveMessages {
metrics = append(metrics, cwatch.Datum("ReceiveMessages", float64(count), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 796 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L795-L796

Added lines #L795 - L796 were not covered by tests
for cType, count := range stats.ReceiveStatuses {
metrics = append(metrics, cwatch.Datum("ReceiveStatuses", float64(count), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 799 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L798-L799

Added lines #L798 - L799 were not covered by tests
for cType, count := range stats.ReceiveEvents {
metrics = append(metrics, cwatch.Datum("ReceiveEvents", float64(count), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 802 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L801-L802

Added lines #L801 - L802 were not covered by tests
for cType, count := range stats.ReceiveIgnored {
metrics = append(metrics, cwatch.Datum("ReceiveIgnored", float64(count), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 805 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L804-L805

Added lines #L804 - L805 were not covered by tests
for cType, count := range stats.ReceiveDuration {
avgTime := float64(count) / float64(stats.ReceiveRequests[cType])
metrics = append(metrics, cwatch.Datum("ReceiveDuration", float64(avgTime), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 809 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L807-L809

Added lines #L807 - L809 were not covered by tests
for cType, count := range stats.SendSuccesses {
metrics = append(metrics, cwatch.Datum("SendSucceeded", float64(count), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 812 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L811-L812

Added lines #L811 - L812 were not covered by tests
for cType, count := range stats.SendErrors {
metrics = append(metrics, cwatch.Datum("SendErrored", float64(count), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 815 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L814-L815

Added lines #L814 - L815 were not covered by tests
for cType, duration := range stats.SendDuration {
avgTime := float64(duration) / float64(stats.SendSuccesses[cType]+stats.SendErrors[cType])
metrics = append(metrics, cwatch.Datum("SendDuration", avgTime, cwtypes.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 819 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L817-L819

Added lines #L817 - L819 were not covered by tests

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := b.cw.Send(ctx, metrics...); err != nil {
slog.Error("error sending metrics", "error", err)

Check warning on line 823 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L823

Added line #L823 was not covered by tests
} else {
slog.Info("sent metrics to cloudwatch", "metrics", len(metrics))
}
cancel()

return nil
}

Expand Down Expand Up @@ -878,8 +902,3 @@ func (b *backend) Status() string {
func (b *backend) RedisPool() *redis.Pool {
return b.rp
}

// CloudWatch return the cloudwatch service
func (b *backend) CloudWatch() *cwatch.Service {
return b.cw
}
2 changes: 1 addition & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ func (ts *BackendTestSuite) TestOutgoingQueue() {
ts.Equal(msg.Text(), "test message")

// mark this message as dealt with
ts.b.MarkOutgoingMsgComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog))
ts.b.OnSendComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog), clog)

// this message should now be marked as sent
sent, err := ts.b.WasMsgSent(ctx, msg.ID())
Expand Down
6 changes: 1 addition & 5 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import (
"time"
"unicode/utf8"

cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
Expand Down Expand Up @@ -218,9 +216,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// store this URN on our contact
contact.URNID_ = contactURN.ID

// report that we created a new contact
b.cw.Queue(cwatch.Datum("ContactCreated", float64(1), cwtypes.StandardUnitCount))
b.stats.RecordContactCreated()

// and return it
return contact, nil
}
144 changes: 144 additions & 0 deletions backends/rapidpro/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package rapidpro

import (
"database/sql"
"maps"
"sync"
"time"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/courier"
)

type Stats struct {
ReceiveRequests map[courier.ChannelType]int // number of handler requests
ReceiveMessages map[courier.ChannelType]int // number of messages received
ReceiveStatuses map[courier.ChannelType]int // number of status updates received
ReceiveEvents map[courier.ChannelType]int // number of other events received
ReceiveIgnored map[courier.ChannelType]int // number of requests ignored
ReceiveDuration map[courier.ChannelType]time.Duration

SendSuccesses map[courier.ChannelType]int // number of sends that succeeded
SendErrors map[courier.ChannelType]int // number of sends that errored
SendDuration map[courier.ChannelType]time.Duration

ContactsCreated int

DBWaitDuration time.Duration
RedisWaitDuration time.Duration
}

func newStats() *Stats {
return &Stats{
ReceiveRequests: make(map[courier.ChannelType]int),
ReceiveMessages: make(map[courier.ChannelType]int),
ReceiveStatuses: make(map[courier.ChannelType]int),
ReceiveEvents: make(map[courier.ChannelType]int),
ReceiveIgnored: make(map[courier.ChannelType]int),
ReceiveDuration: make(map[courier.ChannelType]time.Duration),

SendSuccesses: make(map[courier.ChannelType]int),
SendErrors: make(map[courier.ChannelType]int),
SendDuration: make(map[courier.ChannelType]time.Duration),

ContactsCreated: 0,
}
}

func (s *Stats) reset(db sql.DBStats, rp redis.PoolStats) {
clear(s.ReceiveRequests)
clear(s.ReceiveMessages)
clear(s.ReceiveStatuses)
clear(s.ReceiveEvents)
clear(s.ReceiveIgnored)
clear(s.ReceiveDuration)

clear(s.SendSuccesses)
clear(s.SendErrors)
clear(s.SendDuration)

s.ContactsCreated = 0

s.DBWaitDuration = db.WaitDuration
s.RedisWaitDuration = rp.WaitDuration
}

// StatsCollector provides threadsafe stats collection
type StatsCollector struct {
mutex sync.Mutex
stats *Stats
}

// NewStatsCollector creates a new stats collector
func NewStatsCollector() *StatsCollector {
return &StatsCollector{stats: newStats()}
}

func (c *StatsCollector) RecordReceive(typ courier.ChannelType, evts []courier.Event, d time.Duration) {
c.mutex.Lock()
c.stats.ReceiveRequests[typ]++

for _, e := range evts {
switch e.(type) {
case courier.MsgIn:
c.stats.ReceiveMessages[typ]++
case courier.StatusUpdate:
c.stats.ReceiveStatuses[typ]++
case courier.ChannelEvent:
c.stats.ReceiveEvents[typ]++

Check warning on line 88 in backends/rapidpro/stats.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/stats.go#L82-L88

Added lines #L82 - L88 were not covered by tests
}
}
if len(evts) == 0 {
c.stats.ReceiveIgnored[typ]++
}

c.stats.ReceiveDuration[typ] += d
c.mutex.Unlock()
}

func (c *StatsCollector) RecordSend(typ courier.ChannelType, success bool, d time.Duration) {
c.mutex.Lock()
if success {
c.stats.SendSuccesses[typ]++
} else {
c.stats.SendErrors[typ]++
}

Check warning on line 105 in backends/rapidpro/stats.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/stats.go#L104-L105

Added lines #L104 - L105 were not covered by tests
c.stats.SendDuration[typ] += d
c.mutex.Unlock()
}

func (c *StatsCollector) RecordContactCreated() {
c.mutex.Lock()
c.stats.ContactsCreated++
c.mutex.Unlock()
}

// Stats returns the stats for the period since the last call
func (c *StatsCollector) Stats(db sql.DBStats, rp redis.PoolStats) *Stats {
c.mutex.Lock()
defer c.mutex.Unlock()

stats := &Stats{
ContactsCreated: c.stats.ContactsCreated,

ReceiveRequests: maps.Clone(c.stats.ReceiveRequests),
ReceiveMessages: maps.Clone(c.stats.ReceiveMessages),
ReceiveStatuses: maps.Clone(c.stats.ReceiveStatuses),
ReceiveEvents: maps.Clone(c.stats.ReceiveEvents),
ReceiveIgnored: maps.Clone(c.stats.ReceiveIgnored),
ReceiveDuration: maps.Clone(c.stats.ReceiveDuration),

SendSuccesses: maps.Clone(c.stats.SendSuccesses),
SendErrors: maps.Clone(c.stats.SendErrors),
SendDuration: maps.Clone(c.stats.SendDuration),

// both sqlx and redis provide wait stats which are cummulative that we need to convert into increments by
// tracking their previous values
DBWaitDuration: db.WaitDuration - c.stats.DBWaitDuration,
RedisWaitDuration: rp.WaitDuration - c.stats.RedisWaitDuration,
}

c.stats.reset(db, rp)

return stats
}
Loading

0 comments on commit f191230

Please sign in to comment.