Skip to content

Commit

Permalink
Merge pull request #658 from nyaruka/slog
Browse files Browse the repository at this point in the history
Replace logrus with slog in rapidpro backend
  • Loading branch information
rowanseymour authored Oct 12, 2023
2 parents edb2fc8 + 56a4ad4 commit 1b14a95
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 66 deletions.
50 changes: 24 additions & 26 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"net/url"
"path"
"path/filepath"
Expand All @@ -29,7 +30,6 @@ import (
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/redisx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// the name for our message queue
Expand Down Expand Up @@ -105,10 +105,10 @@ func newBackend(cfg *courier.Config) courier.Backend {
// Start starts our RapidPro backend, this tests our various connections and starts our spool flushers
func (b *backend) Start() error {
// parse and test our redis config
log := logrus.WithFields(logrus.Fields{
"comp": "backend",
"state": "starting",
})
log := slog.With(
"comp", "backend",
"state", "starting",
)
log.Info("starting backend")

// parse and test our db config
Expand Down Expand Up @@ -137,7 +137,7 @@ func (b *backend) Start() error {
err = b.db.PingContext(ctx)
cancel()
if err != nil {
log.WithError(err).Error("db not reachable")
log.Error("db not reachable", "error", err)
} else {
log.Info("db ok")
}
Expand Down Expand Up @@ -183,7 +183,7 @@ func (b *backend) Start() error {
defer conn.Close()
_, err = conn.Do("PING")
if err != nil {
log.WithError(err).Error("redis not reachable")
log.Error("redis not reachable", "error", err)
} else {
log.Info("redis ok")
}
Expand Down Expand Up @@ -221,12 +221,12 @@ func (b *backend) Start() error {

// check our storages
if err := checkStorage(b.attachmentStorage); err != nil {
log.WithError(err).Error(b.attachmentStorage.Name() + " attachment storage not available")
log.Error(b.attachmentStorage.Name()+" attachment storage not available", "error", err)
} else {
log.Info(b.attachmentStorage.Name() + " attachment storage ok")
}
if err := checkStorage(b.logStorage); err != nil {
log.WithError(err).Error(b.logStorage.Name() + " log storage not available")
log.Error(b.logStorage.Name()+" log storage not available", "error", err)
} else {
log.Info(b.logStorage.Name() + " log storage ok")
}
Expand All @@ -240,7 +240,7 @@ func (b *backend) Start() error {
err = courier.EnsureSpoolDirPresent(b.config.SpoolDir, "events")
}
if err != nil {
log.WithError(err).Error("spool directories not writable")
log.Error("spool directories not writable", "error", err)
} else {
log.Info("spool directories ok")
}
Expand All @@ -260,7 +260,7 @@ func (b *backend) Start() error {
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "statuses"), b.flushStatusFile)
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "events"), b.flushChannelEventFile)

logrus.WithFields(logrus.Fields{"comp": "backend", "state": "started"}).Info("backend started")
slog.Info("backend started", "comp", "backend", "state", "started")
return nil
}

Expand Down Expand Up @@ -496,14 +496,14 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOu
rc.Send("expire", dateKey, 60*60*24*2)
_, err := rc.Do("")
if err != nil {
logrus.WithError(err).WithField("sent_msgs_key", dateKey).Error("unable to add new unsent message")
slog.Error("unable to add new unsent message", "error", err, "sent_msgs_key", dateKey)
}

// if our msg has an associated session and timeout, update that
if dbMsg.SessionWaitStartedOn_ != nil {
err = updateSessionTimeout(ctx, b, dbMsg.SessionID_, *dbMsg.SessionWaitStartedOn_, dbMsg.SessionTimeout_)
if err != nil {
logrus.WithError(err).WithField("session_id", dbMsg.SessionID_).Error("unable to update session timeout")
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
}
}
Expand All @@ -529,7 +529,7 @@ func (b *backend) NewStatusUpdateByExternalID(channel courier.Channel, externalI

// WriteStatusUpdate writes the passed in MsgStatus to our store
func (b *backend) WriteStatusUpdate(ctx context.Context, status courier.StatusUpdate) error {
log := logrus.WithFields(logrus.Fields{"msg_id": status.MsgID(), "msg_external_id": status.ExternalID(), "status": status.Status()})
log := slog.With("msg_id", status.MsgID(), "msg_external_id", status.ExternalID(), "status", status.Status())
su := status.(*StatusUpdate)

if status.MsgID() == courier.NilMsgID && status.ExternalID() == "" {
Expand All @@ -553,15 +553,15 @@ func (b *backend) WriteStatusUpdate(ctx context.Context, status courier.StatusUp

err := b.sentExternalIDs.Set(rc, fmt.Sprintf("%d|%s", su.ChannelID_, su.ExternalID_), fmt.Sprintf("%d", status.MsgID()))
if err != nil {
log.WithError(err).Error("error recording external id")
log.Error("error recording external id", "error", err)
}
}

// we sent a message that errored so clear our sent flag to allow it to be retried
if status.Status() == courier.MsgStatusErrored {
err := b.ClearMsgSent(ctx, status.MsgID())
if err != nil {
log.WithError(err).Error("error clearing sent flags")
log.Error("error clearing sent flags", "error", err)
}
}
}
Expand Down Expand Up @@ -799,16 +799,14 @@ func (b *backend) Heartbeat() error {
analytics.Gauge("courier.bulk_queue", float64(bulkSize))
analytics.Gauge("courier.priority_queue", float64(prioritySize))

logrus.WithFields(logrus.Fields{
"db_busy": dbStats.InUse,
"db_idle": dbStats.Idle,
"db_wait_time": dbWaitDurationInPeriod,
"db_wait_count": dbWaitCountInPeriod,
"redis_wait_time": dbWaitDurationInPeriod,
"redis_wait_count": dbWaitCountInPeriod,
"priority_size": prioritySize,
"bulk_size": bulkSize,
}).Info("current analytics")
slog.Info("current analytics", "db_busy", dbStats.InUse,
"db_idle", dbStats.Idle,
"db_wait_time", dbWaitDurationInPeriod,
"db_wait_count", dbWaitCountInPeriod,
"redis_wait_time", dbWaitDurationInPeriod,
"redis_wait_count", dbWaitCountInPeriod,
"priority_size", prioritySize,
"bulk_size", bulkSize)

return nil
}
Expand Down
3 changes: 1 addition & 2 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/null/v3"
"github.com/nyaruka/redisx/assertredis"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
)

Expand All @@ -47,7 +46,7 @@ func (ts *BackendTestSuite) SetupSuite() {
storageDir = "_test_storage"

// turn off logging
logrus.SetOutput(io.Discard)
log.SetOutput(io.Discard)

b, err := courier.NewBackend(testConfig())
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions backends/rapidpro/channel_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"log"
"log/slog"
"os"
"strconv"
"time"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/null/v3"
"github.com/sirupsen/logrus"
)

// ChannelEventID is the type of our channel event ids
Expand Down Expand Up @@ -124,7 +124,7 @@ func writeChannelEvent(ctx context.Context, b *backend, event courier.ChannelEve

// failed writing, write to our spool instead
if err != nil {
logrus.WithError(err).WithField("channel_id", dbEvent.ChannelID).WithField("event_type", dbEvent.EventType_).Error("error writing channel event to db")
slog.Error("error writing channel event to db", "error", err, "channel_id", dbEvent.ChannelID, "event_type", dbEvent.EventType_)
}

if err != nil {
Expand Down Expand Up @@ -171,7 +171,7 @@ func writeChannelEventToDB(ctx context.Context, b *backend, e *ChannelEvent, clo
// if we had a problem queueing the event, log it
err = queueChannelEvent(rc, contact, e)
if err != nil {
logrus.WithError(err).WithField("evt_id", e.ID_).Error("error queueing channel event")
slog.Error("error queueing channel event", "error", err, "evt_id", e.ID_)
}

return nil
Expand Down
16 changes: 8 additions & 8 deletions backends/rapidpro/channel_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"path"
"sync"
"time"
Expand All @@ -15,7 +16,6 @@ import (
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/storage"
"github.com/nyaruka/gocommon/syncx"
"github.com/sirupsen/logrus"
)

const sqlInsertChannelLog = `
Expand Down Expand Up @@ -57,7 +57,7 @@ type channelError struct {

// queues the passed in channel log to a writer
func queueChannelLog(ctx context.Context, b *backend, clog *courier.ChannelLog) {
log := logrus.WithFields(logrus.Fields{"log_uuid": clog.UUID(), "log_type": clog.Type(), "channel_uuid": clog.Channel().UUID()})
log := slog.With("log_uuid", clog.UUID(), "log_type", clog.Type(), "channel_uuid", clog.Channel().UUID())
dbChan := clog.Channel().(*Channel)

// so that we don't save null
Expand All @@ -79,7 +79,7 @@ func queueChannelLog(ctx context.Context, b *backend, clog *courier.ChannelLog)

// if log is attached to a call or message, only write to storage
if clog.Attached() {
log = log.WithField("storage", "s3")
log = log.With("storage", "s3")
v := &stChannelLog{
UUID: clog.UUID(),
Type: clog.Type(),
Expand All @@ -94,7 +94,7 @@ func queueChannelLog(ctx context.Context, b *backend, clog *courier.ChannelLog)
}
} else {
// otherwise write to database so it's retrievable
log = log.WithField("storage", "db")
log = log.With("storage", "db")
v := &dbChannelLog{
UUID: clog.UUID(),
Type: clog.Type(),
Expand Down Expand Up @@ -136,14 +136,14 @@ func writeDBChannelLogs(ctx context.Context, db *sqlx.DB, batch []*dbChannelLog)
for _, v := range batch {
err = dbutil.BulkQuery(ctx, db, sqlInsertChannelLog, []*dbChannelLog{v})
if err != nil {
log := logrus.WithField("comp", "log writer").WithField("log_uuid", v.UUID)
log := slog.With("comp", "log writer", "log_uuid", v.UUID)

if qerr := dbutil.AsQueryError(err); qerr != nil {
query, params := qerr.Query()
log = log.WithFields(logrus.Fields{"sql": query, "sql_params": params})
log = log.With("sql", query, "sql_params", params)
}

log.WithError(err).Error("error writing channel log")
log.Error("error writing channel log", "error", err)
}
}
}
Expand Down Expand Up @@ -174,6 +174,6 @@ func writeStorageChannelLogs(ctx context.Context, st storage.Storage, batch []*s
}
}
if err := st.BatchPut(ctx, uploads); err != nil {
logrus.WithField("comp", "storage log writer").Error("error writing channel logs")
slog.Error("error writing channel logs", "comp", "storage log writer")
}
}
10 changes: 5 additions & 5 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"database/sql/driver"
"log/slog"
"strconv"
"time"
"unicode/utf8"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/null/v3"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// used by unit tests to slow down urn operations to test races
Expand Down Expand Up @@ -107,7 +107,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
contact := &Contact{}
err := b.db.GetContext(ctx, contact, lookupContactFromURNSQL, urn.Identity(), org)
if err != nil && err != sql.ErrNoRows {
logrus.WithError(err).WithField("urn", urn.Identity()).WithField("org_id", org).Error("error looking up contact")
slog.Error("error looking up contact", "error", err, "urn", urn.Identity(), "org_id", org)
return nil, errors.Wrap(err, "error looking up contact by URN")
}

Expand All @@ -116,13 +116,13 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// insert it
tx, err := b.db.BeginTxx(ctx, nil)
if err != nil {
logrus.WithError(err).WithField("urn", urn.Identity()).WithField("org_id", org).Error("error looking up contact")
slog.Error("error looking up contact", "error", err, "urn", urn.Identity(), "org_id", org)
return nil, errors.Wrap(err, "error beginning transaction")
}

err = setDefaultURN(tx, channel, contact, urn, authTokens)
if err != nil {
logrus.WithError(err).WithField("urn", urn.Identity()).WithField("org_id", org).Error("error looking up contact")
slog.Error("error looking up contact", "error", err, "urn", urn.Identity(), "org_id", org)
tx.Rollback()
return nil, errors.Wrap(err, "error setting default URN for contact")
}
Expand All @@ -148,7 +148,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,

// in the case of errors, we log the error but move onwards anyways
if err != nil {
logrus.WithField("channel_uuid", channel.UUID()).WithField("channel_type", channel.ChannelType()).WithField("urn", urn).WithError(err).Error("unable to describe URN")
slog.Error("unable to describe URN", "error", err, "channel_uuid", channel.UUID(), "channel_type", channel.ChannelType(), "urn", urn)
} else {
name = attrs["name"]
}
Expand Down
6 changes: 3 additions & 3 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"log"
"log/slog"
"os"
"strings"
"time"
Expand All @@ -22,7 +23,6 @@ import (
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/null/v3"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
filetype "gopkg.in/h2non/filetype.v1"
)

Expand Down Expand Up @@ -228,7 +228,7 @@ func writeMsg(ctx context.Context, b *backend, msg courier.MsgIn, clog *courier.

// fail? log
if err != nil {
logrus.WithError(err).WithField("msg", m.UUID()).Error("error writing to db")
slog.Error("error writing to db", "error", err, "msg", m.UUID())
}

// if we failed write to spool
Expand Down Expand Up @@ -282,7 +282,7 @@ func writeMsgToDB(ctx context.Context, b *backend, m *Msg, clog *courier.Channel
// if we had a problem queueing the handling, log it, but our message is written, it'll
// get picked up by our rapidpro catch-all after a period
if err != nil {
logrus.WithError(err).WithField("msg_id", m.ID_).Error("error queueing msg handling")
slog.Error("error queueing msg handling", "error", err, "msg_id", m.ID_)
}

return nil
Expand Down
Loading

0 comments on commit 1b14a95

Please sign in to comment.