Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-204 / Fix event publisher in aggregatestore/model #205

Merged
merged 1 commit into from
Mar 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion aggregatestore/model/aggregatestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func (r *AggregateStore) Save(ctx context.Context, aggregate eh.Aggregate) error

// Publish events if supported by the aggregate.
if publisher, ok := aggregate.(EventPublisher); ok && r.bus != nil {
for _, e := range publisher.EventsToPublish() {
events := publisher.EventsToPublish()
publisher.ClearEvents()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we lost all of events if an error happen?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line should move to after the loop

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I choose the same "guarantee" as the ES version, which is to clear all events before publishing, no matter what the result of publishing is. If an error happens here it will trickle to the command handler anyway, and should be handled by the client with a new request. There is a much broader issue about what to do with failing events in #181.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it will work. I'll apply and check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything checks out. The old code crashed without my hack. Everything works ok with upstream master now. Thanks.

for _, e := range events {
if err := r.bus.PublishEvent(ctx, e); err != nil {
return err
}
Expand Down
15 changes: 13 additions & 2 deletions aggregatestore/model/aggregatestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,15 @@ func TestAggregateStore_SaveWithPublish(t *testing.T) {
store, repo, bus := createStore(t)

ctx := context.Background()

id := eh.NewUUID()
agg := NewAggregate(id)
event := eh.NewEvent("test", nil, time.Now())

// Normal publish should publish events on the bus.
agg.PublishEvent(event)
if len(agg.SliceEventPublisher) != 1 {
t.Error("there should be one event to publish")
}
err := store.Save(ctx, agg)
if err != nil {
t.Error("there should be no error:", err)
Expand All @@ -154,13 +158,20 @@ func TestAggregateStore_SaveWithPublish(t *testing.T) {
if !reflect.DeepEqual(bus.Events, []eh.Event{event}) {
t.Error("there should be an event on the bus:", bus.Events)
}
if len(agg.SliceEventPublisher) != 0 {
t.Error("there should be no events to publish")
}

// Bus error.
// Simulate a bus error.
bus.Err = errors.New("bus error")
agg.PublishEvent(event)
err = store.Save(ctx, agg)
if err == nil || err.Error() != "bus error" {
t.Error("there should be an error named 'error':", err)
}
if len(agg.SliceEventPublisher) != 0 {
t.Error("there should be no events to publish")
}
}

func createStore(t *testing.T) (*AggregateStore, *mocks.Repo, *mocks.EventBus) {
Expand Down
7 changes: 7 additions & 0 deletions aggregatestore/model/eventpublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
type EventPublisher interface {
// EventsToPublish returns all events to publish.
EventsToPublish() []eh.Event
// ClearEvents clears all events after a publish.
ClearEvents()
}

// SliceEventPublisher is an EventPublisher using a slice to store events.
Expand All @@ -38,3 +40,8 @@ func (a *SliceEventPublisher) PublishEvent(e eh.Event) {
func (a *SliceEventPublisher) EventsToPublish() []eh.Event {
return *a
}

// ClearEvents implements the ClearEvents method of the EventPublisher interface.
func (a *SliceEventPublisher) ClearEvents() {
*a = nil
}