Skip to content

Commit

Permalink
fix: add informer resync period for node status watcher
Browse files Browse the repository at this point in the history
Also use a single shared constant for the resync period in all informers.

Add some debug logs.

Might fix #9991

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Jan 14, 2025
1 parent 9b957df commit da2e811
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 10 deletions.
2 changes: 1 addition & 1 deletion internal/app/machined/pkg/controllers/k8s/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (ctrl *EndpointController) watchKubernetesEndpoint(ctx context.Context, r c

func kubernetesEndpointWatcher(ctx context.Context, logger *zap.Logger, client *kubernetes.Client) (chan *corev1.Endpoints, func(), error) {
informerFactory := informers.NewSharedInformerFactoryWithOptions(
client.Clientset, 30*time.Second,
client.Clientset, constants.KubernetesInformerDefaultResyncPeriod,
informers.WithNamespace(corev1.NamespaceDefault),
informers.WithTweakListOptions(func(options *v1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", "kubernetes").String()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -17,6 +18,7 @@ import (
"k8s.io/client-go/tools/cache"

"github.com/siderolabs/talos/pkg/kubernetes"
"github.com/siderolabs/talos/pkg/machinery/constants"
)

// NodeWatcher watches a single Kubernetes Node, selected by name, via an informer.
Expand Down Expand Up @@ -46,10 +48,12 @@ func (r *NodeWatcher) Get() (*corev1.Node, error) {
}

// Watch starts watching Node state and notifies on updates via notify channel.
func (r *NodeWatcher) Watch(ctx context.Context) (<-chan struct{}, <-chan error, func(), error) {
func (r *NodeWatcher) Watch(ctx context.Context, logger *zap.Logger) (<-chan struct{}, <-chan error, func(), error) {
logger.Debug("starting node watcher", zap.String("nodename", r.nodename))

informerFactory := informers.NewSharedInformerFactoryWithOptions(
r.client.Clientset,
0,
constants.KubernetesInformerDefaultResyncPeriod,
informers.WithTweakListOptions(
func(opts *metav1.ListOptions) {
opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, r.nodename).String()
Expand Down Expand Up @@ -88,7 +92,11 @@ func (r *NodeWatcher) Watch(ctx context.Context) (<-chan struct{}, <-chan error,

informerFactory.Start(ctx.Done())

logger.Debug("waiting for node cache sync")

informerFactory.WaitForCacheSync(ctx.Done())

logger.Debug("node cache sync done")

return notifyCh, watchErrCh, informerFactory.Shutdown, nil
}
4 changes: 2 additions & 2 deletions internal/app/machined/pkg/controllers/k8s/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ func (ctrl *NodeStatusController) Run(ctx context.Context, r controller.Runtime,
var watchCtx context.Context
watchCtx, watchCtxCancel = context.WithCancel(ctx) //nolint:govet

notifyCh, watchErrCh, notifyCloser, err = nodewatcher.Watch(watchCtx)
notifyCh, watchErrCh, notifyCloser, err = nodewatcher.Watch(watchCtx, logger)
if err != nil {
return fmt.Errorf("error setting up registry watcher: %w", err) //nolint:govet
return fmt.Errorf("error setting up node watcher: %w", err) //nolint:govet
}
}

Expand Down
10 changes: 7 additions & 3 deletions internal/integration/base/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"k8s.io/kubectl/pkg/scheme"

taloskubernetes "github.com/siderolabs/talos/pkg/kubernetes"
"github.com/siderolabs/talos/pkg/machinery/constants"
)

// K8sSuite is a base suite for K8s tests.
Expand Down Expand Up @@ -813,9 +814,12 @@ func (k8sSuite *K8sSuite) SetupNodeInformer(ctx context.Context, nodeName string

watchCh := make(chan *corev1.Node)

informerFactory := informers.NewSharedInformerFactoryWithOptions(k8sSuite.Clientset, 30*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = metadataKeyName + nodeName
}))
informerFactory := informers.NewSharedInformerFactoryWithOptions(
k8sSuite.Clientset, constants.KubernetesInformerDefaultResyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = metadataKeyName + nodeName
}),
)

nodeInformer := informerFactory.Core().V1().Nodes().Informer()
_, err := nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
3 changes: 1 addition & 2 deletions internal/pkg/discovery/registry/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"net/netip"
"strconv"
"strings"
"time"

"github.com/siderolabs/gen/value"
"github.com/siderolabs/gen/xslices"
Expand Down Expand Up @@ -265,7 +264,7 @@ func (r *Kubernetes) List(localNodeName string) ([]*cluster.AffiliateSpec, error

// Watch starts watching Node state and notifies on updates via notify channel.
func (r *Kubernetes) Watch(ctx context.Context, logger *zap.Logger) (<-chan struct{}, func(), error) {
informerFactory := informers.NewSharedInformerFactory(r.client.Clientset, 30*time.Second)
informerFactory := informers.NewSharedInformerFactory(r.client.Clientset, constants.KubernetesInformerDefaultResyncPeriod)

notifyCh := make(chan struct{}, 1)

Expand Down
3 changes: 3 additions & 0 deletions pkg/machinery/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,9 @@ const (

// RegistrydListenAddress is the address to listen on for the registryd service.
RegistrydListenAddress = "127.0.0.1:3172"

// KubernetesInformerDefaultResyncPeriod is the default resync period for Kubernetes informers.
KubernetesInformerDefaultResyncPeriod = 30 * time.Second
)

// See https://linux.die.net/man/3/klogctl
Expand Down

0 comments on commit da2e811

Please sign in to comment.