Skip to content

Commit

Permalink
feat(stamper): add batch-ids flag to filter per batchID
Browse files Browse the repository at this point in the history
  • Loading branch information
gacevicljubisa committed Jan 18, 2025
1 parent c124175 commit 4366d90
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 67 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,17 @@ beekeeper restart -namespace=default --label-selector="app=bee" --timeout=10m

### stamper

Command **stamper** manage postage batches for nodes and has following subcommands:
Command **stamper** manage postage batches for nodes.

General Notes:

- `namespace` or `cluster-name` must be specified to locate the bee nodes.
- If both are provided, `namespace` takes precedence.
- When `namespace` is set, you can use a `label-selector` to filter specific nodes.
- Use `batch-ids` to target specific postage batches, but this is applied after finding/filtering nodes. If `batch-ids` is not provided, all batches in the filtered nodes are targeted.
- If `timeout` is set to 0 and `periodic-check` is bigger than 0, the operation will run indefinitely with periodic checks.

It has following subcommands:

- **create** - creates a postage batch for selected nodes

Expand Down Expand Up @@ -492,6 +502,7 @@ Command **stamper** manage postage batches for nodes and has following subcomman
It has following flags:

```console
--batch-ids strings Comma separated list of postage batch IDs to top up. If not provided, all batches are topped up.
--cluster-name string Target Beekeeper cluster name.
--geth-url string Geth URL for chain state retrieval.
--help help for topup
Expand Down Expand Up @@ -520,6 +531,7 @@ Command **stamper** manage postage batches for nodes and has following subcomman
It has following flags:

```console
--batch-ids strings Comma separated list of postage batch IDs to dilute. If not provided, all batches are diluted.
--cluster-name string Target Beekeeper cluster name.
--dilution-depth uint8 Number of levels by which to increase the depth of a stamp during dilution. (default 1)
--help help for dilute
Expand Down Expand Up @@ -547,6 +559,7 @@ Command **stamper** manage postage batches for nodes and has following subcomman
It has following flags:

```console
--batch-ids strings Comma separated list of postage batch IDs to set. If not provided, all batches are set.
--cluster-name string Target Beekeeper cluster name.
--dilution-depth uint16 Number of levels by which to increase the depth of a stamp during dilution. (default 1)
--geth-url string Geth URL for chain state retrieval.
Expand Down
92 changes: 47 additions & 45 deletions cmd/beekeeper/cmd/stamper.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ func initStamperDefaultFlags(cmd *cobra.Command) *cobra.Command {

func (c *command) initStamperTopup() *cobra.Command {
const (
optionTTLThreshold = "ttl-threshold"
optionTopUpTo = "topup-to"
optionGethUrl = "geth-url"
optionNameTTLThreshold = "ttl-threshold"
optionNameTopUpTo = "topup-to"
optionNameGethUrl = "geth-url"
optionNameBatchIDs = "batch-ids"
)

cmd := &cobra.Command{
Expand All @@ -58,36 +59,37 @@ func (c *command) initStamperTopup() *cobra.Command {
Long: `Top up the TTL of postage batches.`,
RunE: func(cmd *cobra.Command, args []string) (err error) {
return c.withTimeoutHandler(cmd, func(ctx context.Context) error {
stamper, err := c.createStamperClient(ctx)
stamperClient, err := c.createStamperClient(ctx)
if err != nil {
return fmt.Errorf("failed to create stamper client: %w", err)
}

return c.executePeriodically(ctx, func(ctx context.Context) error {
return stamper.Topup(ctx,
c.globalConfig.GetDuration(optionTTLThreshold),
c.globalConfig.GetDuration(optionTopUpTo),
return stamperClient.Topup(ctx,
c.globalConfig.GetDuration(optionNameTTLThreshold),
c.globalConfig.GetDuration(optionNameTopUpTo),
stamper.WithBatchIDs(c.globalConfig.GetStringSlice(optionNameBatchIDs)),
)
})
})
},
PreRunE: c.preRunE,
}

cmd.Flags().Duration(optionTTLThreshold, 5*24*time.Hour, "Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value.")
cmd.Flags().Duration(optionTopUpTo, 30*24*time.Hour, "Duration to top up the TTL of a stamp to.")
cmd.Flags().String(optionGethUrl, "", "Geth URL for chain state retrieval.")
cmd.Flags().Duration(optionNameTTLThreshold, 5*24*time.Hour, "Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value.")
cmd.Flags().Duration(optionNameTopUpTo, 30*24*time.Hour, "Duration to top up the TTL of a stamp to.")
cmd.Flags().StringSlice(optionNameBatchIDs, nil, "Comma separated list of postage batch IDs to top up. If not provided, all batches are topped up.")
cmd.Flags().String(optionNameGethUrl, "", "Geth URL for chain state retrieval.")
cmd.Flags().Duration(optionNamePeriodicCheck, 0, "Periodic check interval. Default is 0, which means no periodic check.")

c.root.AddCommand(cmd)

return cmd
}

func (c *command) initStamperDilute() *cobra.Command {
const (
optionUsageThreshold = "usage-threshold"
optionDiutionDepth = "dilution-depth"
optionNameUsageThreshold = "usage-threshold"
optionNameDiutionDepth = "dilution-depth"
optionNameBatchIDs = "batch-ids"
)

cmd := &cobra.Command{
Expand All @@ -96,28 +98,28 @@ func (c *command) initStamperDilute() *cobra.Command {
Long: `Dilute postage batches.`,
RunE: func(cmd *cobra.Command, args []string) (err error) {
return c.withTimeoutHandler(cmd, func(ctx context.Context) error {
stamper, err := c.createStamperClient(ctx)
stamperClient, err := c.createStamperClient(ctx)
if err != nil {
return fmt.Errorf("failed to create stamper client: %w", err)
}

return c.executePeriodically(ctx, func(ctx context.Context) error {
return stamper.Dilute(ctx,
c.globalConfig.GetFloat64(optionUsageThreshold),
c.globalConfig.GetUint16(optionDiutionDepth),
return stamperClient.Dilute(ctx,
c.globalConfig.GetFloat64(optionNameUsageThreshold),
c.globalConfig.GetUint16(optionNameDiutionDepth),
stamper.WithBatchIDs(c.globalConfig.GetStringSlice(optionNameBatchIDs)),
)
})
})
},
PreRunE: c.preRunE,
}

cmd.Flags().Float64(optionUsageThreshold, 90, "Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value.")
cmd.Flags().Uint8(optionDiutionDepth, 1, "Number of levels by which to increase the depth of a stamp during dilution.")
cmd.Flags().Float64(optionNameUsageThreshold, 90, "Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value.")
cmd.Flags().Uint8(optionNameDiutionDepth, 1, "Number of levels by which to increase the depth of a stamp during dilution.")
cmd.Flags().StringSlice(optionNameBatchIDs, nil, "Comma separated list of postage batch IDs to dilute. If not provided, all batches are diluted.")
cmd.Flags().Duration(optionNamePeriodicCheck, 0, "Periodic check interval. Default is 0, which means no periodic check.")

c.root.AddCommand(cmd)

return cmd
}

Expand All @@ -133,12 +135,12 @@ func (c *command) initStamperCreate() *cobra.Command {
Long: `Create a postage batch for selected nodes. Nodes are selected by namespace (use label-selector for filtering) or cluster name.`,
RunE: func(cmd *cobra.Command, args []string) (err error) {
return c.withTimeoutHandler(cmd, func(ctx context.Context) error {
stamper, err := c.createStamperClient(ctx)
stamperClient, err := c.createStamperClient(ctx)
if err != nil {
return fmt.Errorf("failed to create stamper client: %w", err)
}

return stamper.Create(ctx,
return stamperClient.Create(ctx,
c.globalConfig.GetUint64(optionNameAmount),
c.globalConfig.GetUint16(optionNameDepth),
)
Expand All @@ -148,20 +150,19 @@ func (c *command) initStamperCreate() *cobra.Command {
}

cmd.Flags().Uint64(optionNameAmount, 100000000, "Amount of BZZ in PLURS added that the postage batch will have.")
cmd.Flags().Uint16(optionNameDepth, 16, "Batch depth which specifies how many chunks can be signed with the batch. It is a logarithm. Must be higher than default bucket depth (16)")

c.root.AddCommand(cmd)
cmd.Flags().Uint16(optionNameDepth, 17, "Batch depth which specifies how many chunks can be signed with the batch. It is a logarithm. Must be higher than default bucket depth (16)")

return cmd
}

func (c *command) initStamperSet() *cobra.Command {
const (
optionTTLThreshold = "ttl-threshold"
optionTopUpTo = "topup-to"
optionUsageThreshold = "usage-threshold"
optionDiutionDepth = "dilution-depth"
optionGethUrl = "geth-url"
optionNameTTLThreshold = "ttl-threshold"
optionNameTopUpTo = "topup-to"
optionNameUsageThreshold = "usage-threshold"
optionNameDiutionDepth = "dilution-depth"
optionNameGethUrl = "geth-url"
optionNameBatchIDs = "batch-ids"
)

cmd := &cobra.Command{
Expand All @@ -170,32 +171,32 @@ func (c *command) initStamperSet() *cobra.Command {
Long: `Set stamper configuration.`,
RunE: func(cmd *cobra.Command, args []string) (err error) {
return c.withTimeoutHandler(cmd, func(ctx context.Context) error {
stamper, err := c.createStamperClient(ctx)
stamperClient, err := c.createStamperClient(ctx)
if err != nil {
return fmt.Errorf("failed to create stamper client: %w", err)
}

return c.executePeriodically(ctx, func(ctx context.Context) error {
return stamper.Set(ctx,
c.globalConfig.GetDuration(optionTTLThreshold),
c.globalConfig.GetDuration(optionTopUpTo),
c.globalConfig.GetFloat64(optionUsageThreshold),
c.globalConfig.GetUint16(optionDiutionDepth),
return stamperClient.Set(ctx,
c.globalConfig.GetDuration(optionNameTTLThreshold),
c.globalConfig.GetDuration(optionNameTopUpTo),
c.globalConfig.GetFloat64(optionNameUsageThreshold),
c.globalConfig.GetUint16(optionNameDiutionDepth),
stamper.WithBatchIDs(c.globalConfig.GetStringSlice(optionNameBatchIDs)),
)
})
})
},
}

cmd.Flags().Duration(optionTTLThreshold, 5*24*time.Hour, "Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value.")
cmd.Flags().Duration(optionTopUpTo, 30*24*time.Hour, "Duration to top up the TTL of a stamp to.")
cmd.Flags().Float64(optionUsageThreshold, 90, "Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value.")
cmd.Flags().Uint16(optionDiutionDepth, 1, "Number of levels by which to increase the depth of a stamp during dilution.")
cmd.Flags().String(optionGethUrl, "", "Geth URL for chain state retrieval.")
cmd.Flags().Duration(optionNameTTLThreshold, 5*24*time.Hour, "Threshold for the remaining TTL of a stamp. Actions are triggered when TTL drops below this value.")
cmd.Flags().Duration(optionNameTopUpTo, 30*24*time.Hour, "Duration to top up the TTL of a stamp to.")
cmd.Flags().Float64(optionNameUsageThreshold, 90, "Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value.")
cmd.Flags().Uint16(optionNameDiutionDepth, 1, "Number of levels by which to increase the depth of a stamp during dilution.")
cmd.Flags().StringSlice(optionNameBatchIDs, nil, "Comma separated list of postage batch IDs to set. If not provided, all batches are set.")
cmd.Flags().String(optionNameGethUrl, "", "Geth URL for chain state retrieval.")
cmd.Flags().Duration(optionNamePeriodicCheck, 0, "Periodic check interval. Default is 0, which means no periodic check.")

c.root.AddCommand(cmd)

return cmd
}

Expand Down Expand Up @@ -226,6 +227,7 @@ func (c *command) createStamperClient(ctx context.Context) (stamper.Client, erro
Namespace: namespace,
K8sClient: c.k8sClient,
BeeClients: beeClients,
SwapClient: c.swapClient,
LabelSelector: c.globalConfig.GetString(optionNameLabelSelector),
InCluster: c.globalConfig.GetBool(optionNameInCluster),
}), nil
Expand Down
29 changes: 25 additions & 4 deletions pkg/stamper/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,17 @@ func (n *node) Create(ctx context.Context, amount uint64, depth uint16) error {
return nil
}

func (n *node) Dilute(ctx context.Context, threshold float64, depthIncrement uint16) error {
func (n *node) Dilute(ctx context.Context, threshold float64, depthIncrement uint16, batchIds []string) error {
batches, err := n.client.Postage.PostageBatches(ctx)
if err != nil {
return fmt.Errorf("node %s: get postage batches: %w", n.name, err)
}

for _, batch := range batches {
if len(batchIds) > 0 && !contains(batchIds, batch.BatchID) {
continue
}

if !batch.Usable || batch.Utilization == 0 {
continue
}
Expand All @@ -67,7 +71,7 @@ func (n *node) Dilute(ctx context.Context, threshold float64, depthIncrement uin
// which directly affects the calculations for Topup by reducing the effective batch TTL.
// Therefore, Topup is handled first, considering the original depth, followed by Dilute
// which accounts for the new depth and utilization threshold.
func (n *node) Set(ctx context.Context, ttlThreshold time.Duration, topupDuration time.Duration, threshold float64, depth uint16, blockTime int64) error {
func (n *node) Set(ctx context.Context, ttlThreshold time.Duration, topupDuration time.Duration, threshold float64, depth uint16, blockTime int64, batchIds []string) error {
chainState, err := n.client.Postage.GetChainState(ctx)
if err != nil {
return fmt.Errorf("node %s: get chain state: %w", n.name, err)
Expand All @@ -84,6 +88,10 @@ func (n *node) Set(ctx context.Context, ttlThreshold time.Duration, topupDuratio
}

for _, batch := range batches {
if len(batchIds) > 0 && !contains(batchIds, batch.BatchID) {
continue
}

if !batch.Usable || batch.Utilization == 0 {
continue
}
Expand Down Expand Up @@ -117,7 +125,7 @@ func (n *node) Set(ctx context.Context, ttlThreshold time.Duration, topupDuratio
}
}

// Dilute logic
// Dilute
usageFactor := batch.Depth - batch.BucketDepth // depth - bucketDepth
divisor := float64(int(1) << usageFactor) // 2^(depth - bucketDepth)
stampsUsage := (float64(batch.Utilization) / divisor) * 100 // (utilization / 2^(depth - bucketDepth)) * 100
Expand All @@ -135,7 +143,7 @@ func (n *node) Set(ctx context.Context, ttlThreshold time.Duration, topupDuratio
return nil
}

func (n *node) Topup(ctx context.Context, ttlThreshold time.Duration, topupDuration time.Duration, blockTime int64) error {
func (n *node) Topup(ctx context.Context, ttlThreshold time.Duration, topupDuration time.Duration, blockTime int64, batchIds []string) error {
chainState, err := n.client.Postage.GetChainState(ctx)
if err != nil {
return fmt.Errorf("node %s: get chain state: %w", n.name, err)
Expand All @@ -152,6 +160,10 @@ func (n *node) Topup(ctx context.Context, ttlThreshold time.Duration, topupDurat
}

for _, batch := range batches {
if len(batchIds) > 0 && !contains(batchIds, batch.BatchID) {
continue
}

if !batch.Usable || batch.Utilization == 0 {
continue
}
Expand Down Expand Up @@ -188,3 +200,12 @@ func (n *node) Topup(ctx context.Context, ttlThreshold time.Duration, topupDurat

return nil
}

func contains(slice []string, value string) bool {
for _, v := range slice {
if v == value {
return true
}
}
return false
}
Loading

0 comments on commit 4366d90

Please sign in to comment.