From ef1541cfd0d7e4d7d4bf99477d3398f7071c7ab8 Mon Sep 17 00:00:00 2001 From: Max Ekman Date: Wed, 28 Mar 2018 10:14:20 +0200 Subject: [PATCH] Fix event publisher in aggregatestore/model --- aggregatestore/model/aggregatestore.go | 4 +++- aggregatestore/model/aggregatestore_test.go | 15 +++++++++++++-- aggregatestore/model/eventpublisher.go | 7 +++++++ 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/aggregatestore/model/aggregatestore.go b/aggregatestore/model/aggregatestore.go index 30fc0a89..c87af864 100644 --- a/aggregatestore/model/aggregatestore.go +++ b/aggregatestore/model/aggregatestore.go @@ -75,7 +75,9 @@ func (r *AggregateStore) Save(ctx context.Context, aggregate eh.Aggregate) error // Publish events if supported by the aggregate. if publisher, ok := aggregate.(EventPublisher); ok && r.bus != nil { - for _, e := range publisher.EventsToPublish() { + events := publisher.EventsToPublish() + publisher.ClearEvents() + for _, e := range events { if err := r.bus.PublishEvent(ctx, e); err != nil { return err } diff --git a/aggregatestore/model/aggregatestore_test.go b/aggregatestore/model/aggregatestore_test.go index d6d9dec4..919cc202 100644 --- a/aggregatestore/model/aggregatestore_test.go +++ b/aggregatestore/model/aggregatestore_test.go @@ -139,11 +139,15 @@ func TestAggregateStore_SaveWithPublish(t *testing.T) { store, repo, bus := createStore(t) ctx := context.Background() - id := eh.NewUUID() agg := NewAggregate(id) event := eh.NewEvent("test", nil, time.Now()) + + // Normal publish should publish events on the bus. agg.PublishEvent(event) + if len(agg.SliceEventPublisher) != 1 { + t.Error("there should be one event to publish") + } err := store.Save(ctx, agg) if err != nil { t.Error("there should be no error:", err) @@ -154,13 +158,20 @@ func TestAggregateStore_SaveWithPublish(t *testing.T) { if !reflect.DeepEqual(bus.Events, []eh.Event{event}) { t.Error("there should be an event on the bus:", bus.Events) } + if len(agg.SliceEventPublisher) != 0 { + t.Error("there should be no events to publish") + } - // Bus error. + // Simulate a bus error. bus.Err = errors.New("bus error") + agg.PublishEvent(event) err = store.Save(ctx, agg) if err == nil || err.Error() != "bus error" { t.Error("there should be an error named 'error':", err) } + if len(agg.SliceEventPublisher) != 0 { + t.Error("there should be no events to publish") + } } func createStore(t *testing.T) (*AggregateStore, *mocks.Repo, *mocks.EventBus) { diff --git a/aggregatestore/model/eventpublisher.go b/aggregatestore/model/eventpublisher.go index b0940410..7a98330c 100644 --- a/aggregatestore/model/eventpublisher.go +++ b/aggregatestore/model/eventpublisher.go @@ -23,6 +23,8 @@ import ( type EventPublisher interface { // EventsToPublish returns all events to publish. EventsToPublish() []eh.Event + // ClearEvents clears all events after a publish. + ClearEvents() } // SliceEventPublisher is an EventPublisher using a slice to store events. @@ -38,3 +40,8 @@ func (a *SliceEventPublisher) PublishEvent(e eh.Event) { func (a *SliceEventPublisher) EventsToPublish() []eh.Event { return *a } + +// ClearEvents implements the ClearEvents method of the EventPublisher interface. +func (a *SliceEventPublisher) ClearEvents() { + *a = nil +}