Skip to content

Commit

Permalink
fix: batch processing in BTC Subscription Poller (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
gusin13 authored Jan 10, 2025
1 parent 710451a commit 3319760
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 83 deletions.
2 changes: 1 addition & 1 deletion cmd/staking-expiry-checker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
defer cancel()

// Create DB client
dbClient, err := db.New(ctx, cfg.Db)
dbClient, err := db.New(ctx, &cfg.Db)
if err != nil {
log.Fatal().Err(err).Msg("error while creating db client")
}
Expand Down
9 changes: 5 additions & 4 deletions config/config-docker.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
pollers:
log-level: debug
expiry-checker:
interval: 5s
timeout: 10s
interval: 10s
timeout: 100s
btc-subscriber:
interval: 5s
timeout: 10s
interval: 10s
timeout: 100s
db:
username: root
password: example
address: "mongodb://localhost:27017"
db-name: staking-api-service
max-pagination-limit: 1000
btc:
rpchost: 127.0.0.1:38332
rpcuser: rpcuser
Expand Down
9 changes: 5 additions & 4 deletions config/config-local.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
pollers:
log-level: debug
expiry-checker:
interval: 5s
timeout: 1000s
interval: 10s
timeout: 100s
btc-subscriber:
interval: 5s
timeout: 1000s
interval: 10s
timeout: 100s
db:
username: root
password: example
address: "mongodb://localhost:27017"
db-name: staking-api-service
max-pagination-limit: 1000
btc:
rpchost: 127.0.0.1:38332
rpcuser: rpcuser
Expand Down
13 changes: 9 additions & 4 deletions internal/config/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
)

type DbConfig struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
DbName string `mapstructure:"db-name"`
Address string `mapstructure:"address"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
DbName string `mapstructure:"db-name"`
Address string `mapstructure:"address"`
MaxPaginationLimit int64 `mapstructure:"max-pagination-limit"`
}

func (cfg *DbConfig) Validate() error {
Expand Down Expand Up @@ -57,5 +58,9 @@ func (cfg *DbConfig) Validate() error {
return fmt.Errorf("port number must be between 1024 and 65535 (inclusive)")
}

if cfg.MaxPaginationLimit < 2 {
return fmt.Errorf("max pagination limit must be greater than 1")
}

return nil
}
61 changes: 60 additions & 1 deletion internal/db/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package db
import (
"context"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

Expand All @@ -12,9 +13,15 @@ import (
type Database struct {
dbName string
client *mongo.Client
cfg *config.DbConfig
}

func New(ctx context.Context, cfg config.DbConfig) (*Database, error) {
type DbResultMap[T any] struct {
Data []T `json:"data"`
PaginationToken string `json:"paginationToken"`
}

func New(ctx context.Context, cfg *config.DbConfig) (*Database, error) {
credential := options.Credential{
Username: cfg.Username,
Password: cfg.Password,
Expand All @@ -28,6 +35,7 @@ func New(ctx context.Context, cfg config.DbConfig) (*Database, error) {
return &Database{
dbName: cfg.DbName,
client: client,
cfg: cfg,
}, nil
}

Expand All @@ -42,3 +50,54 @@ func (db *Database) Ping(ctx context.Context) error {
func (db *Database) Shutdown(ctx context.Context) error {
return db.client.Disconnect(ctx)
}

/*
Builds the result map with a pagination token.
If the result length exceeds the maximum limit, it returns the map with a token.
Otherwise, it returns the map with an empty token. Note that the pagination
limit is the maximum number of results to return.
For example, if the limit is 10, it fetches 11 but returns only 10.
The last result is used to generate the pagination token.
*/
func toResultMapWithPaginationToken[T any](paginationLimit int64, result []T, paginationKeyBuilder func(T) (string, error)) (*DbResultMap[T], error) {
if len(result) > int(paginationLimit) {
result = result[:paginationLimit]
paginationToken, err := paginationKeyBuilder(result[len(result)-1])
if err != nil {
return nil, err
}
return &DbResultMap[T]{
Data: result,
PaginationToken: paginationToken,
}, nil
}

return &DbResultMap[T]{
Data: result,
PaginationToken: "",
}, nil
}

// Finds documents in the collection with pagination in returned results.
func findWithPagination[T any](
ctx context.Context, client *mongo.Collection, filter bson.M,
options *options.FindOptions, limit int64,
paginationKeyBuilder func(T) (string, error),
) (*DbResultMap[T], error) {
// Always fetch one more than the limit to check if there are more results
// this is used to generate the pagination token
options.SetLimit(limit + 1)

cursor, err := client.Find(ctx, filter, options)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)

var result []T
if err = cursor.All(ctx, &result); err != nil {
return nil, err
}

return toResultMapWithPaginationToken(limit, result, paginationKeyBuilder)
}
42 changes: 27 additions & 15 deletions internal/db/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,30 +107,42 @@ func (db *Database) GetBTCDelegationByStakingTxHash(
func (db *Database) GetBTCDelegationsByStates(
ctx context.Context,
states []types.DelegationState,
) ([]*model.DelegationDocument, error) {
// Convert states to a slice of strings
paginationToken string,
) (*DbResultMap[model.DelegationDocument], error) {
// Convert states to strings
stateStrings := make([]string, len(states))
for i, state := range states {
stateStrings[i] = state.ToString()
}

filter := bson.M{"state": bson.M{"$in": stateStrings}}
opts := options.Find().SetLimit(200) // to prevent large result sets

cursor, err := db.client.Database(db.dbName).
Collection(model.DelegationsCollection).
Find(ctx, filter, opts)
if err != nil {
return nil, err
// Build filter
filter := bson.M{
"state": bson.M{"$in": stateStrings},
}
defer cursor.Close(ctx)

var delegations []*model.DelegationDocument
if err := cursor.All(ctx, &delegations); err != nil {
return nil, err
// Setup options
options := options.Find()
options.SetSort(bson.M{"_id": 1})

// Decode pagination token if it exists
if paginationToken != "" {
decodedToken, err := model.DecodePaginationToken[model.DelegationScanPagination](paginationToken)
if err != nil {
return nil, &InvalidPaginationTokenError{
Message: "Invalid pagination token",
}
}
filter["_id"] = bson.M{"$gt": decodedToken.StakingTxHashHex}
}

return delegations, nil
return findWithPagination(
ctx,
db.client.Database(db.dbName).Collection(model.DelegationsCollection),
filter,
options,
db.cfg.MaxPaginationLimit,
model.BuildDelegationScanPaginationToken,
)
}

func (db *Database) GetBTCDelegationState(
Expand Down
6 changes: 5 additions & 1 deletion internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type DbInterface interface {
GetBTCDelegationByStakingTxHash(
ctx context.Context, stakingTxHash string,
) (*model.DelegationDocument, error)
GetBTCDelegationsByStates(ctx context.Context, states []types.DelegationState) ([]*model.DelegationDocument, error)
GetBTCDelegationsByStates(
ctx context.Context,
states []types.DelegationState,
paginationToken string,
) (*DbResultMap[model.DelegationDocument], error)
GetBTCDelegationState(ctx context.Context, stakingTxHash string) (*types.DelegationState, error)
}
15 changes: 15 additions & 0 deletions internal/db/model/delegation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,18 @@ type DelegationDocument struct {
StakingTx *TimelockTransaction `bson:"staking_tx"` // Always exist
UnbondingTx *TimelockTransaction `bson:"unbonding_tx,omitempty"`
}

type DelegationScanPagination struct {
StakingTxHashHex string `json:"staking_tx_hash_hex"`
}

func BuildDelegationScanPaginationToken(d DelegationDocument) (string, error) {
page := &DelegationScanPagination{
StakingTxHashHex: d.StakingTxHashHex,
}
token, err := GetPaginationToken(page)
if err != nil {
return "", err
}
return token, nil
}
27 changes: 27 additions & 0 deletions internal/db/model/pagination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package model

import (
"encoding/base64"
"encoding/json"
)

func DecodePaginationToken[T any](token string) (*T, error) {
tokenBytes, err := base64.URLEncoding.DecodeString(token)
if err != nil {
return nil, err
}
var d T
err = json.Unmarshal(tokenBytes, &d)
if err != nil {
return nil, err
}
return &d, nil
}

func GetPaginationToken[PaginationType any](d PaginationType) (string, error) {
tokenBytes, err := json.Marshal(d)
if err != nil {
return "", err
}
return base64.URLEncoding.EncodeToString(tokenBytes), nil
}
16 changes: 12 additions & 4 deletions internal/observability/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (O Outcome) String() string {
var (
once sync.Once
metricsRouter *chi.Mux
pollDurationHistogram *prometheus.HistogramVec
pollerDurationHistogram *prometheus.HistogramVec
btcClientDurationHistogram *prometheus.HistogramVec
invalidTransactionsCounter *prometheus.CounterVec
failedVerifyingUnbondingTxsCounter prometheus.Counter
Expand Down Expand Up @@ -74,13 +74,13 @@ func initMetricsRouter(metricsPort int) {
// registerMetrics initializes and register the Prometheus metrics.
func registerMetrics() {
defaultHistogramBucketsSeconds := []float64{0.1, 0.5, 1, 2.5, 5, 10, 30}
pollDurationHistogram = prometheus.NewHistogramVec(
pollerDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "poll_duration_seconds",
Help: "Histogram of poll durations in seconds.",
Buckets: defaultHistogramBucketsSeconds,
},
[]string{"status"},
[]string{"poller_name", "status"},
)

btcClientDurationHistogram = prometheus.NewHistogramVec(
Expand Down Expand Up @@ -124,7 +124,7 @@ func registerMetrics() {
)

prometheus.MustRegister(
pollDurationHistogram,
pollerDurationHistogram,
btcClientDurationHistogram,
invalidTransactionsCounter,
failedVerifyingUnbondingTxsCounter,
Expand Down Expand Up @@ -179,3 +179,11 @@ func IncrementFailedVerifyingStakingWithdrawalTxCounter() {
func IncrementFailedVerifyingUnbondingWithdrawalTxCounter() {
failedVerifyingUnbondingWithdrawalTxsCounter.Inc()
}

func ObservePollerDuration(pollerName string, duration time.Duration, err error) {
status := "success"
if err != nil {
status = "failure"
}
pollerDurationHistogram.WithLabelValues(pollerName, status).Observe(duration.Seconds())
}
Loading

0 comments on commit 3319760

Please sign in to comment.