Skip to content

Commit

Permalink
api command errors metric
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Dec 10, 2024
1 parent 6df4c9d commit 7a62f22
Show file tree
Hide file tree
Showing 15 changed files with 336 additions and 20 deletions.
39 changes: 33 additions & 6 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

. "github.com/centrifugal/centrifugo/v5/internal/apiproto"
"github.com/centrifugal/centrifugo/v5/internal/config"
"github.com/centrifugal/centrifugo/v5/internal/configtypes"
"github.com/centrifugal/centrifugo/v5/internal/subsource"

"github.com/centrifugal/centrifuge"
Expand Down Expand Up @@ -56,48 +57,66 @@ func (h *Executor) SetRPCExtension(method string, handler RPCHandler) {
}

func (h *Executor) processCmd(ctx context.Context, cmd *Command, i int, replies []*Reply) {
var method string
if cmd.Publish != nil {
method = "publish"
res := h.Publish(ctx, cmd.Publish)
replies[i].Publish, replies[i].Error = res.Result, res.Error
} else if cmd.Broadcast != nil {
method = "broadcast"
res := h.Broadcast(ctx, cmd.Broadcast)
replies[i].Broadcast, replies[i].Error = res.Result, res.Error
} else if cmd.Subscribe != nil {
method = "subscribe"
res := h.Subscribe(ctx, cmd.Subscribe)
replies[i].Subscribe, replies[i].Error = res.Result, res.Error
} else if cmd.Unsubscribe != nil {
method = "unsubscribe"
res := h.Unsubscribe(ctx, cmd.Unsubscribe)
replies[i].Unsubscribe, replies[i].Error = res.Result, res.Error
} else if cmd.Disconnect != nil {
method = "disconnect"
res := h.Disconnect(ctx, cmd.Disconnect)
replies[i].Disconnect, replies[i].Error = res.Result, res.Error
} else if cmd.History != nil {
method = "history"
res := h.History(ctx, cmd.History)
replies[i].History, replies[i].Error = res.Result, res.Error
} else if cmd.HistoryRemove != nil {
method = "history_remove"
res := h.HistoryRemove(ctx, cmd.HistoryRemove)
replies[i].HistoryRemove, replies[i].Error = res.Result, res.Error
} else if cmd.Presence != nil {
method = "presence"
res := h.Presence(ctx, cmd.Presence)
replies[i].Presence, replies[i].Error = res.Result, res.Error
} else if cmd.PresenceStats != nil {
method = "presence_stats"
res := h.PresenceStats(ctx, cmd.PresenceStats)
replies[i].PresenceStats, replies[i].Error = res.Result, res.Error
} else if cmd.Info != nil {
method = "info"
res := h.Info(ctx, cmd.Info)
replies[i].Info, replies[i].Error = res.Result, res.Error
} else if cmd.Rpc != nil {
method = "rpc"
res := h.RPC(ctx, cmd.Rpc)
replies[i].Rpc, replies[i].Error = res.Result, res.Error
} else if cmd.Refresh != nil {
method = "refresh"
res := h.Refresh(ctx, cmd.Refresh)
replies[i].Refresh, replies[i].Error = res.Result, res.Error
} else if cmd.Channels != nil {
method = "channels"
res := h.Channels(ctx, cmd.Channels)
replies[i].Channels, replies[i].Error = res.Result, res.Error
} else {
method = "unknown"
replies[i].Error = ErrorNotFound
}
if replies[i].Error != nil {
incError(h.config.Protocol, method, replies[i].Error.Code)
}
}

// batchRequestMaxConcurrency is applied for the parallel batch request.
Expand Down Expand Up @@ -183,7 +202,7 @@ func (h *Executor) Publish(ctx context.Context, cmd *PublishRequest) *PublishRes
historyMetaTTL := chOpts.HistoryMetaTTL
if cmd.SkipHistory {
historySize = 0
historyTTL = 0
historyTTL = configtypes.Duration(0)
}

delta := cmd.Delta
Expand Down Expand Up @@ -260,20 +279,26 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
defer func() { <-sem }()
defer wg.Done()
if ch == "" {
respError := ErrorBadRequest
incError(h.config.Protocol, "broadcast_publish", respError.Code)
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "channel can not be blank in broadcast", nil))
responses[i] = &PublishResponse{Error: ErrorBadRequest}
responses[i] = &PublishResponse{Error: respError}
return
}

_, _, chOpts, found, err := h.cfgContainer.ChannelOptions(ch)
if err != nil {
respError := ErrorInternal
incError(h.config.Protocol, "broadcast_publish", respError.Code)
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error getting options for channel", map[string]any{"channel": ch, "error": err.Error()}))
responses[i] = &PublishResponse{Error: ErrorInternal}
responses[i] = &PublishResponse{Error: respError}
return
}
if !found {
respError := ErrorUnknownChannel
incError(h.config.Protocol, "broadcast_publish", respError.Code)
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "can't find namespace for channel", map[string]any{"channel": ch}))
responses[i] = &PublishResponse{Error: ErrorUnknownChannel}
responses[i] = &PublishResponse{Error: respError}
return
}

Expand All @@ -282,7 +307,7 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
historyMetaTTL := chOpts.HistoryMetaTTL
if cmd.SkipHistory {
historySize = 0
historyTTL = 0
historyTTL = configtypes.Duration(0)
}

delta := cmd.Delta
Expand All @@ -304,8 +329,10 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
Epoch: result.StreamPosition.Epoch,
}
} else {
respError := ErrorInternal
incError(h.config.Protocol, "publish", respError.Code)
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error publishing data to channel during broadcast", map[string]any{"channel": ch, "error": err.Error()}))
resp.Error = ErrorInternal
resp.Error = respError
}
responses[i] = resp
}(i, ch)
Expand Down
7 changes: 7 additions & 0 deletions internal/api/consuming_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 52 additions & 0 deletions internal/api/grpc_handler_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7a62f22

Please sign in to comment.