Skip to content

Commit

Permalink
feat: Add metric for Store And Forward queue size (#1538)
Browse files Browse the repository at this point in the history
* feat: Add metric for Store And Forward queue size

closes #1138

Signed-off-by: Leonard Goodell <[email protected]>
  • Loading branch information
Lenny Goodell authored Jan 17, 2024
1 parent 15d59e2 commit 763fccf
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
26 changes: 12 additions & 14 deletions internal/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@

package internal

import (
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
)

const (
ApiTriggerRoute = common.ApiBase + "/trigger"
ApiAddSecretRoute = common.ApiBase + "/secret"
)
import "github.com/edgexfoundry/go-mod-core-contracts/v3/common"

// SDKVersion indicates the version of the SDK - will be overwritten by build
var SDKVersion = "0.0.0"

// ApplicationVersion indicates the version of the application itself, not the SDK - will be overwritten by build
var ApplicationVersion = "0.0.0"

// Names for the Common Application Service Metrics
// Misc Constants
const (
ApiTriggerRoute = common.ApiBase + "/trigger"
MessageBusSubscribeTopics = "SubscribeTopics"
)

// Common Application Service Metrics constants
const (
MessagesReceivedName = "MessagesReceived"
InvalidMessagesReceivedName = "InvalidMessagesReceived"
Expand All @@ -43,9 +42,8 @@ const (
HttpExportErrorsName = "HttpExportErrors"
MqttExportSizeName = "MqttExportSize"
MqttExportErrorsName = "MqttExportErrors"
MessageBusSubscribeTopics = "SubscribeTopics"
MessageBusPublishTopic = "PublishTopic"
)
StoreForwardQueueSizeName = "StoreForwardQueueSize"

// MetricsReservoirSize is the default Metrics Sample Reservoir size
const MetricsReservoirSize = 1028
// MetricsReservoirSize is the default Metrics Sample Reservoir size
MetricsReservoirSize = 1028
)
24 changes: 19 additions & 5 deletions internal/runtime/storeforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"

"github.com/edgexfoundry/app-functions-sdk-go/v3/internal"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/appfunction"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/bootstrap/container"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
Expand All @@ -48,16 +49,29 @@ type storeForwardInfo struct {
}

func newStoreAndForward(runtime *FunctionsPipelineRuntime, dic *di.Container, serviceKey string) *storeForwardInfo {
return &storeForwardInfo{
sf := &storeForwardInfo{
runtime: runtime,
dic: dic,
lc: bootstrapContainer.LoggingClientFrom(dic.Get),
serviceKey: serviceKey,
// Using counter metric now for the retry on success feature because it is thread safe
// and will also be used in future metrics feature to track size of the S&F queue.
// TODO: Register counter metric when using it for the metrics capability
dataCount: gometrics.NewCounter(),
dataCount: gometrics.NewCounter(),
}

metricsManager := bootstrapContainer.MetricsManagerFrom(dic.Get)
if metricsManager == nil {
sf.lc.Errorf("Unable to register %s metric: MetricsManager is not available.", internal.StoreForwardQueueSizeName)
return sf
}

err := metricsManager.Register(internal.StoreForwardQueueSizeName, sf.dataCount, nil)
if err != nil {
sf.lc.Errorf("Unable to register metric %s. Collection will continue, but metric will not be reported: %v",
internal.StoreForwardQueueSizeName, err)
}

sf.lc.Infof("%s metric has been registered and will be reported (if enabled)", internal.StoreForwardQueueSizeName)

return sf
}

func (sf *storeForwardInfo) startStoreAndForwardRetryLoop(
Expand Down
2 changes: 2 additions & 0 deletions internal/runtime/storeforward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ func TestStoreForLaterRetry(t *testing.T) {

func TestTriggerRetry(t *testing.T) {
mockLogger := &loggerMocks.LoggingClient{}
mockLogger.On("Infof", "%s metric has been registered and will be reported (if enabled)", "StoreForwardQueueSize")

dic.Update(di.ServiceConstructorMap{
bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} {
return mockLogger
Expand Down

0 comments on commit 763fccf

Please sign in to comment.