Skip to content

Commit

Permalink
Merge pull request #205 from maxekman/ISSUE-204/fix-eventpublisher
Browse files Browse the repository at this point in the history
ISSUE-204 / Fix event publisher in aggregatestore/model
  • Loading branch information
maxekman authored Mar 28, 2018
2 parents e1bbbab + ef1541c commit 7067a22
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 3 deletions.
4 changes: 3 additions & 1 deletion aggregatestore/model/aggregatestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func (r *AggregateStore) Save(ctx context.Context, aggregate eh.Aggregate) error

// Publish events if supported by the aggregate.
if publisher, ok := aggregate.(EventPublisher); ok && r.bus != nil {
for _, e := range publisher.EventsToPublish() {
events := publisher.EventsToPublish()
publisher.ClearEvents()
for _, e := range events {
if err := r.bus.PublishEvent(ctx, e); err != nil {
return err
}
Expand Down
15 changes: 13 additions & 2 deletions aggregatestore/model/aggregatestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,15 @@ func TestAggregateStore_SaveWithPublish(t *testing.T) {
store, repo, bus := createStore(t)

ctx := context.Background()

id := eh.NewUUID()
agg := NewAggregate(id)
event := eh.NewEvent("test", nil, time.Now())

// Normal publish should publish events on the bus.
agg.PublishEvent(event)
if len(agg.SliceEventPublisher) != 1 {
t.Error("there should be one event to publish")
}
err := store.Save(ctx, agg)
if err != nil {
t.Error("there should be no error:", err)
Expand All @@ -154,13 +158,20 @@ func TestAggregateStore_SaveWithPublish(t *testing.T) {
if !reflect.DeepEqual(bus.Events, []eh.Event{event}) {
t.Error("there should be an event on the bus:", bus.Events)
}
if len(agg.SliceEventPublisher) != 0 {
t.Error("there should be no events to publish")
}

// Bus error.
// Simulate a bus error.
bus.Err = errors.New("bus error")
agg.PublishEvent(event)
err = store.Save(ctx, agg)
if err == nil || err.Error() != "bus error" {
t.Error("there should be an error named 'error':", err)
}
if len(agg.SliceEventPublisher) != 0 {
t.Error("there should be no events to publish")
}
}

func createStore(t *testing.T) (*AggregateStore, *mocks.Repo, *mocks.EventBus) {
Expand Down
7 changes: 7 additions & 0 deletions aggregatestore/model/eventpublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
type EventPublisher interface {
// EventsToPublish returns all events to publish.
EventsToPublish() []eh.Event
// ClearEvents clears all events after a publish.
ClearEvents()
}

// SliceEventPublisher is an EventPublisher using a slice to store events.
Expand All @@ -38,3 +40,8 @@ func (a *SliceEventPublisher) PublishEvent(e eh.Event) {
func (a *SliceEventPublisher) EventsToPublish() []eh.Event {
return *a
}

// ClearEvents implements the ClearEvents method of the EventPublisher interface.
func (a *SliceEventPublisher) ClearEvents() {
*a = nil
}

0 comments on commit 7067a22

Please sign in to comment.