Skip to content

Commit

Permalink
controller: do not propagate cache events when there were no updates …
Browse files Browse the repository at this point in the history
…to the snapshot

Signed-off-by: Guilherme Cassolato <[email protected]>
  • Loading branch information
guicassolato committed Oct 4, 2024
1 parent d4be292 commit ee6fcfe
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 43 deletions.
101 changes: 58 additions & 43 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-logr/logr"
"github.com/samber/lo"
"github.com/telepresenceio/watchable"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -264,54 +265,68 @@ func (c *Controller) propagate(resourceEvents []ResourceEvent) {
}

func (c *Controller) subscribe(ctx context.Context) {
oldObjs := make(Store)
// init and subscribe resource store
c.cache.LoadOrStore(resourceStoreId, Store{})
subscription := c.cache.SubscribeSubset(ctx, func(storeId string, _ Store) bool {
return storeId == resourceStoreId
})
// handle cache events
objs := make(Store)
go func() {
for snapshot := range subscription {
c.Lock()

newObjs := snapshot.State[resourceStoreId]

events := lo.FilterMap(lo.Keys(newObjs), func(uid string, _ int) (ResourceEvent, bool) {
newObj := newObjs[uid]
event := ResourceEvent{
Kind: newObj.GetObjectKind().GroupVersionKind().GroupKind(),
NewObject: newObj,
}
if oldObj, exists := oldObjs[uid]; !exists {
event.EventType = CreateEvent
oldObjs[uid] = newObj
return event, true
} else if !reflect.DeepEqual(oldObj, newObj) {
event.EventType = UpdateEvent
event.OldObject = oldObj
oldObjs[uid] = newObj
return event, true
}
return event, false
})

deleteEvents := lo.FilterMap(lo.Keys(oldObjs), func(uid string, _ int) (ResourceEvent, bool) {
oldObj := oldObjs[uid]
event := ResourceEvent{
EventType: DeleteEvent,
Kind: oldObj.GetObjectKind().GroupVersionKind().GroupKind(),
OldObject: oldObj,
}
_, exists := newObjs[uid]
if !exists {
delete(oldObjs, uid)
}
return event, !exists
})

events = append(events, deleteEvents...)

c.propagate(events)

c.Unlock()
objs = c.handleCacheEvent(snapshot, objs)
}
}()
}

func (c *Controller) handleCacheEvent(snapshot watchable.Snapshot[string, Store], objs Store) Store {
c.Lock()
defer c.Unlock()

if len(snapshot.Updates) == 0 {
return objs
}

newObjs := snapshot.State[resourceStoreId]

events := lo.FilterMap(lo.Keys(newObjs), func(uid string, _ int) (ResourceEvent, bool) {
newObj := newObjs[uid]
event := ResourceEvent{
Kind: newObj.GetObjectKind().GroupVersionKind().GroupKind(),
NewObject: newObj,
}
if obj, exists := objs[uid]; !exists {
event.EventType = CreateEvent
objs[uid] = newObj
return event, true
} else if !reflect.DeepEqual(obj, newObj) {
event.EventType = UpdateEvent
event.OldObject = obj
objs[uid] = newObj
return event, true
}
return event, false
})

deleteEvents := lo.FilterMap(lo.Keys(objs), func(uid string, _ int) (ResourceEvent, bool) {
obj := objs[uid]
event := ResourceEvent{
EventType: DeleteEvent,
Kind: obj.GetObjectKind().GroupVersionKind().GroupKind(),
OldObject: obj,
}
_, exists := newObjs[uid]
if !exists {
delete(objs, uid)
}
return event, !exists
})

events = append(events, deleteEvents...)

if len(events) > 0 { // this condition is actually redundant; if the snapshot has updates, there must be events
c.propagate(events)
}

return objs
}
22 changes: 22 additions & 0 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,25 @@ func TestStartControllerUnmanaged(t *testing.T) {
}()
time.Sleep(3 * time.Second)
}

func TestCacheSubscription(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
count := 0
c := NewController(WithReconcile(func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error {
count++
return nil
}))

c.subscribe(ctx)
time.Sleep(1 * time.Second)
if count != 0 {
t.Errorf("expected no reconcile call, got %d", count)
}

c.add(&corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-service", UID: "7ed703a2-635d-4002-a825-5624823760a5"}})
time.Sleep(1 * time.Second)
if count != 1 {
t.Errorf("expected 1 reconcile call, got %d", count)
}
}

0 comments on commit ee6fcfe

Please sign in to comment.