Skip to content

Commit

Permalink
Ensure given event broadcaster is used by manager
Browse files Browse the repository at this point in the history
Fixes an issue where the event broadcaster provided to
'manager.NewManager()' was not being used.
  • Loading branch information
alecmerdler committed Jan 17, 2025
1 parent 8224053 commit 9069b5d
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 5 deletions.
73 changes: 72 additions & 1 deletion manager/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package manager
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
ctrlmanageropts "k8s.io/controller-manager/options"
Expand Down Expand Up @@ -52,7 +56,7 @@ func TestControllerQueueDone(t *testing.T) {
CtxQueue := queue.NewQueueOperationsCtx()
registry := typed.NewRegistry()
broadcaster := record.NewBroadcaster()
eventSink := &typedcorev1.EventSinkImpl{Interface: fake.NewSimpleClientset().CoreV1().Events("")}
eventSink := newFakeEventSink()

controller := NewOwnedResourceController(klogr.New(), "my-controller", gvr, CtxQueue, registry, broadcaster, func(_ context.Context, gvr schema.GroupVersionResource, namespace, name string) {
fmt.Println("processing", gvr, namespace, name)
Expand Down Expand Up @@ -81,3 +85,70 @@ func TestControllerQueueDone(t *testing.T) {
return controller.Queue.Len() == 0
}, 1*time.Second, 1*time.Millisecond)
}

func TestControllerEventsBroadcast(t *testing.T) {
gvr := schema.GroupVersionResource{
Group: "example.com",
Version: "v1",
Resource: "mytypes",
}
CtxQueue := queue.NewQueueOperationsCtx()
registry := typed.NewRegistry()
broadcaster := record.NewBroadcaster()
eventSink := newFakeEventSink()
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "my-controller"})

controller := NewOwnedResourceController(klogr.New(), "my-controller", gvr, CtxQueue, registry, broadcaster, func(_ context.Context, gvr schema.GroupVersionResource, namespace, name string) {
fmt.Println("processing", gvr, namespace, name)
})

mgr := NewManager(ctrlmanageropts.RecommendedDebuggingOptions().DebuggingConfiguration, ":8888", broadcaster, eventSink)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
_ = mgr.Start(ctx, controller)
}()
require.Eventually(t, healthCheckPassing("http://localhost:8888/healthz"), 1*time.Second, 50*time.Millisecond)

recorder.Event(&v1.ObjectReference{Namespace: "test", Name: "a"}, v1.EventTypeNormal, "test", "test")

require.Eventually(t, func() bool {
return len(eventSink.Events) > 0
}, 5*time.Second, 1*time.Millisecond)
}

type fakeEventSink struct {
Events map[types.UID]*v1.Event
}

func newFakeEventSink() *fakeEventSink {
return &fakeEventSink{
Events: make(map[types.UID]*v1.Event),
}
}

func (f *fakeEventSink) Create(event *v1.Event) (*v1.Event, error) {
f.Events[event.UID] = event
return event, nil
}

func (f *fakeEventSink) Update(event *v1.Event) (*v1.Event, error) {
f.Events[event.UID] = event
return event, nil
}

func (f *fakeEventSink) Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error) {

Check warning on line 140 in manager/controller_test.go

View workflow job for this annotation

GitHub Actions / Lint Go

unused-parameter: parameter 'data' seems to be unused, consider removing or renaming it as _ (revive)
f.Events[oldEvent.UID] = oldEvent
return oldEvent, nil
}

func healthCheckPassing(addr string) func() bool {
return func() bool {
resp, err := http.Get(addr)
if err != nil {
return false
}

return resp.StatusCode == http.StatusOK
}
}
6 changes: 2 additions & 4 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ func (m *Manager) Start(ctx context.Context, controllers ...Controller) error {
}
m.RUnlock()

broadcaster := record.NewBroadcaster()

m.once.Do(func() {
m.Lock()
m.errG, ctx = errgroup.WithContext(ctx)
Expand All @@ -88,8 +86,8 @@ func (m *Manager) Start(ctx context.Context, controllers ...Controller) error {

// start broadcaster
m.errG.Go(func() error {
broadcaster.StartStructuredLogging(2)
broadcaster.StartRecordingToSink(m.sink)
m.broadcaster.StartStructuredLogging(2)
m.broadcaster.StartRecordingToSink(m.sink)
return nil
})

Expand Down

0 comments on commit 9069b5d

Please sign in to comment.