From efad4e6bea2336f7c2112e4cb631f21cbfe234ec Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Wed, 18 Dec 2024 11:14:29 +0100 Subject: [PATCH] feat: add scheduler for periodic checks --- cmd/beekeeper/cmd/cmd.go | 1 + cmd/beekeeper/cmd/stamper.go | 93 ++++++++++++++++++++++++------------ go.mod | 10 ++-- go.sum | 38 ++++++++------- pkg/scheduler/scheduler.go | 47 ++++++++++++++++++ pkg/stamper/node.go | 4 +- pkg/stamper/stamper.go | 6 +-- 7 files changed, 143 insertions(+), 56 deletions(-) create mode 100644 pkg/scheduler/scheduler.go diff --git a/cmd/beekeeper/cmd/cmd.go b/cmd/beekeeper/cmd/cmd.go index 120e9559..343ac7a0 100644 --- a/cmd/beekeeper/cmd/cmd.go +++ b/cmd/beekeeper/cmd/cmd.go @@ -358,6 +358,7 @@ func (c *command) preRunE(cmd *cobra.Command, args []string) (err error) { if err := c.setSwapClient(); err != nil { return err } + return nil } diff --git a/cmd/beekeeper/cmd/stamper.go b/cmd/beekeeper/cmd/stamper.go index 2b61d357..419925ef 100644 --- a/cmd/beekeeper/cmd/stamper.go +++ b/cmd/beekeeper/cmd/stamper.go @@ -1,55 +1,49 @@ package cmd import ( + "context" "time" + "github.com/ethersphere/beekeeper/pkg/scheduler" "github.com/ethersphere/beekeeper/pkg/stamper" "github.com/spf13/cobra" ) -func (c *command) initStamperCmd() (err error) { - const ( - optionNameNamespace = "namespace" - optionNameTimeout = "timeout" - optionNameLabelSelector = "label-selector" - ) +const ( + optionNamePeriodicCheck string = "periodic-check" + optionNameNamespace string = "namespace" + optionNameLabelSelector string = "label-selector" +) +func (c *command) initStamperCmd() (err error) { cmd := &cobra.Command{ Use: "stamper", Short: "Manage postage batches for nodes", Long: `Use the stamper command to manage postage batches for nodes. Topup, dilution and creation of postage batches are supported.`, RunE: func(cmd *cobra.Command, args []string) (err error) { - namespace := c.globalConfig.GetString(optionNameNamespace) - // clusterName := c.globalConfig.GetString(optionNameClusterName) - - c.stamper = stamper.NewStamperClient(&stamper.ClientConfig{ - Log: c.log, - Namespace: namespace, - K8sClient: c.k8sClient, - LabelSelector: c.globalConfig.GetString(optionNameLabelSelector), - InCluster: c.globalConfig.GetBool(optionNameInCluster), - }) - - return + return cmd.Help() }, - PreRunE: c.preRunE, } - cmd.PersistentFlags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace. Overrides cluster name if set.") - cmd.PersistentFlags().String(optionNameClusterName, "", "Name of the Beekeeper cluster to target. Ignored if a namespace is specified, in which case the namespace from the cluster configuration is used.") - cmd.PersistentFlags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources within the specified namespace. Use an empty string to select all resources.") - cmd.PersistentFlags().Duration(optionNameTimeout, 0*time.Minute, "Maximum duration to wait for the operation to complete. Default is no timeout.") - - cmd.AddCommand(c.initStamperTopup()) - cmd.AddCommand(c.initStamperDilute()) - cmd.AddCommand(c.initStamperCreate()) - cmd.AddCommand(c.initStamperSet()) + cmd.AddCommand(initStamperDefaultFlags(c.initStamperTopup())) + cmd.AddCommand(initStamperDefaultFlags(c.initStamperDilute())) + cmd.AddCommand(initStamperDefaultFlags(c.initStamperCreate())) + cmd.AddCommand(initStamperDefaultFlags(c.initStamperSet())) c.root.AddCommand(cmd) return nil } +func initStamperDefaultFlags(cmd *cobra.Command) *cobra.Command { + cmd.PersistentFlags().StringP(optionNameNamespace, "n", "", "Kubernetes namespace (overrides cluster name).") + cmd.PersistentFlags().String(optionNameClusterName, "", "Target Beekeeper cluster name.") + cmd.PersistentFlags().String(optionNameLabelSelector, nodeFunderLabelSelector, "Kubernetes label selector for filtering resources (use empty string for all).") + cmd.PersistentFlags().Duration(optionNameTimeout, 0*time.Minute, "Operation timeout (no timeout by default).") + cmd.PersistentFlags().Duration(optionNamePeriodicCheck, 0*time.Minute, "Periodic stamper check interval (none by default).") + return cmd +} + func (c *command) initStamperTopup() *cobra.Command { const ( optionTTLThreshold = "ttl-threshold" @@ -84,12 +78,51 @@ func (c *command) initStamperDilute() *cobra.Command { Short: "Dilute postage batches", Long: `Dilute postage batches.`, RunE: func(cmd *cobra.Command, args []string) (err error) { - return + var ctx context.Context + var cancel context.CancelFunc + timeout := c.globalConfig.GetDuration(optionNameTimeout) + if timeout > 0 { + ctx, cancel = context.WithTimeout(cmd.Context(), timeout) + defer cancel() + } else { + ctx = context.Background() + } + + namespace := c.globalConfig.GetString(optionNameNamespace) + // clusterName := c.globalConfig.GetString(optionNameClusterName) + + c.stamper = stamper.NewStamperClient(&stamper.ClientConfig{ + Log: c.log, + Namespace: namespace, + K8sClient: c.k8sClient, + LabelSelector: c.globalConfig.GetString(optionNameLabelSelector), + InCluster: c.globalConfig.GetBool(optionNameInCluster), + }) + + periodicCheck := c.globalConfig.GetDuration(optionNamePeriodicCheck) + + if periodicCheck == 0 { + c.stamper.Dilute(ctx, c.globalConfig.GetFloat64(optionUsageThreshold), c.globalConfig.GetUint16(optionDiutionDepth)) + return nil + } + + diluteExecutor := scheduler.NewPeriodicExecutor(periodicCheck, c.log) + diluteExecutor.Start(ctx, func(ctx context.Context) error { + return c.stamper.Dilute(ctx, c.globalConfig.GetFloat64(optionUsageThreshold), c.globalConfig.GetUint16(optionDiutionDepth)) + }) + defer diluteExecutor.Stop() + + <-ctx.Done() + + c.log.Infof("dilution stopped: %v", ctx.Err()) + + return nil }, + PreRunE: c.preRunE, } cmd.Flags().Float64(optionUsageThreshold, 90, "Percentage threshold for stamp utilization. Triggers dilution when usage exceeds this value.") - cmd.Flags().Uint16(optionDiutionDepth, 1, "Number of levels by which to increase the depth of a stamp during dilution.") + cmd.Flags().Uint8(optionDiutionDepth, 1, "Number of levels by which to increase the depth of a stamp during dilution.") c.root.AddCommand(cmd) diff --git a/go.mod b/go.mod index c87b7ebd..d17c3c89 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,8 @@ require ( github.com/prometheus/client_golang v1.19.0 github.com/prometheus/common v0.50.0 github.com/sirupsen/logrus v1.9.3 - github.com/spf13/cobra v1.8.0 - github.com/spf13/viper v1.18.2 + github.com/spf13/cobra v1.8.1 + github.com/spf13/viper v1.19.0 github.com/uber/jaeger-client-go v2.30.0+incompatible golang.org/x/crypto v0.24.0 golang.org/x/sync v0.7.0 @@ -65,7 +65,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/holiman/uint256 v1.2.4 // indirect @@ -99,7 +99,7 @@ require ( github.com/multiformats/go-multistream v0.5.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.6.0 // indirect @@ -133,7 +133,7 @@ require ( golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect - google.golang.org/appengine v1.6.7 // indirect + google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index 32b0208b..72b06b42 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/Yj github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M= github.com/consensys/gnark-crypto v0.12.1/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= -github.com/cpuguy83/go-md2man/v2 v2.0.3 h1:qMCsGGgs+MAzDFyp9LpAe1Lqy/fY/qCovCm0qnXZOBM= -github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233 h1:d28BXYi+wUpz1KBmiF9bWrjEMacUEREV6MBi2ODnrfQ= github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233/go.mod h1:geZJZH3SzKCqnz5VT0q/DyIG/tvu/dZk+VIfXicupJs= github.com/crate-crypto/go-kzg-4844 v1.0.0 h1:TsSgHwrkTKecKJ4kadtHi4b3xHW5dCFUDFnUp1TsawI= @@ -137,13 +137,15 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -157,8 +159,9 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= -github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= @@ -276,8 +279,8 @@ github.com/onsi/gomega v1.31.0 h1:54UJxxj6cPInHS3a35wm6BK/F9nHYueZ1NVujHDrnXE= github.com/onsi/gomega v1.31.0/go.mod h1:DW9aCi7U6Yi40wNVAvT6kzFnEVEI5n3DloYBiKiT6zk= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= -github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= -github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= +github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4= github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFzPPsI= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -322,18 +325,19 @@ github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= -github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= -github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= +github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= +github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ= -github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk= +github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= +github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobtDnDzA= github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9+mHxBEeo3Hbg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -342,8 +346,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4= @@ -401,7 +406,6 @@ golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -453,10 +457,10 @@ golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= @@ -476,8 +480,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= -google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go new file mode 100644 index 00000000..f1a086e7 --- /dev/null +++ b/pkg/scheduler/scheduler.go @@ -0,0 +1,47 @@ +package scheduler + +import ( + "context" + "time" + + "github.com/ethersphere/beekeeper/pkg/logging" +) + +type PeriodicExecutor struct { + ticker *time.Ticker + interval time.Duration + log logging.Logger + stopChan chan struct{} +} + +func NewPeriodicExecutor(interval time.Duration, log logging.Logger) *PeriodicExecutor { + return &PeriodicExecutor{ + ticker: time.NewTicker(interval), + interval: interval, + log: log, + stopChan: make(chan struct{}), + } +} + +func (pe *PeriodicExecutor) Start(ctx context.Context, task func(ctx context.Context) error) { + go func() { + for { + select { + case <-pe.ticker.C: + pe.log.Tracef("Executing task") + if err := task(ctx); err != nil { + pe.log.Errorf("Task execution failed: %v", err) + } + case <-pe.stopChan: + return + case <-ctx.Done(): + return + } + } + }() +} + +func (pe *PeriodicExecutor) Stop() { + pe.ticker.Stop() + close(pe.stopChan) +} diff --git a/pkg/stamper/node.go b/pkg/stamper/node.go index 5febea1b..eff7d414 100644 --- a/pkg/stamper/node.go +++ b/pkg/stamper/node.go @@ -31,7 +31,7 @@ func (n *Node) Create(ctx context.Context, amount uint64, depth uint8) error { return nil } -func (n *Node) Dilute(ctx context.Context, threshold float64, depthIncrement uint8) error { +func (n *Node) Dilute(ctx context.Context, threshold float64, depthIncrement uint16) error { batches, err := n.client.Postage.PostageBatches(ctx) if err != nil { return fmt.Errorf("node %s: get postage batches: %w", n.Name, err) @@ -47,7 +47,7 @@ func (n *Node) Dilute(ctx context.Context, threshold float64, depthIncrement uin stampsUsage := (float64(batch.Utilization) / divisor) * 100 // (utilization / 2^(depth - bucketDepth)) * 100 if stampsUsage >= threshold { - newDepth := batch.Depth + depthIncrement + newDepth := uint16(batch.Depth) + depthIncrement if err := n.client.Postage.DilutePostageBatch(ctx, batch.BatchID, uint64(newDepth), ""); err != nil { return fmt.Errorf("node %s: dilute batch %s: %w", n.Name, batch.BatchID, err) } diff --git a/pkg/stamper/stamper.go b/pkg/stamper/stamper.go index 8e550b40..7c7bd37a 100644 --- a/pkg/stamper/stamper.go +++ b/pkg/stamper/stamper.go @@ -15,7 +15,7 @@ import ( type Client interface { Create(ctx context.Context, amount uint64, depth uint8) error - Dilute(ctx context.Context, threshold float64, depth uint8) error + Dilute(ctx context.Context, threshold float64, depth uint16) error Set(ctx context.Context, ttlThreshold, topupDuration time.Duration, threshold float64, depth uint16) error Topup(ctx context.Context, ttlThreshold, topupDuration time.Duration) error } @@ -61,7 +61,7 @@ func (s *StamperClient) Create(ctx context.Context, amount uint64, depth uint8) } // Dilute implements Client. -func (s *StamperClient) Dilute(ctx context.Context, usageThreshold float64, dilutionDepth uint8) error { +func (s *StamperClient) Dilute(ctx context.Context, usageThreshold float64, dilutionDepth uint16) error { nodes, err := s.getNamespaceNodes(ctx) if err != nil { return fmt.Errorf("get namespace nodes: %w", err) @@ -151,7 +151,7 @@ func (sc *StamperClient) getIngressNodes(ctx context.Context) ([]Node, error) { allNodes := append(ingressNodes, ingressRouteNodes...) nodes := make([]Node, len(allNodes)) for i, node := range allNodes { - parsedURL, err := url.Parse(node.Host) + parsedURL, err := url.Parse(fmt.Sprintf("http://%s", node.Host)) if err != nil { return nil, fmt.Errorf("extract base URL: %w", err) }