Skip to content

Commit

Permalink
[examples] added some docs for sample-app
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak committed Nov 12, 2018
1 parent 121abe5 commit 35190ab
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 6 deletions.
2 changes: 2 additions & 0 deletions _examples/simple-app/app2/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type PostsCounter struct {
}

func (p PostsCounter) Count(msg *message.Message) ([]*message.Message, error) {
// in production use when implementing counter we probably want to make some kind of deduplication here

newCount, err := p.countStorage.CountAdd()
if err != nil {
return nil, errors.Wrap(err, "cannot add count")
Expand Down
7 changes: 7 additions & 0 deletions _examples/simple-app/app2/feed_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import (
"github.com/pkg/errors"
)

// intentionally not importing type from app1, because we don't need all data and we want to avoid coupling
type postAdded struct {
OccurredOn time.Time `json:"occurred_on"`
Author string `json:"author"`
Title string `json:"title"`
}

type feedStorage interface {
AddToFeed(title, author string, time time.Time) error
}
Expand Down
26 changes: 20 additions & 6 deletions _examples/simple-app/app2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ import (
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"
)

type postAdded struct {
OccurredOn time.Time `json:"occurred_on"`
Author string `json:"author"`
Title string `json:"title"`
}

var (
marshaler = kafka.DefaultMarshaler{}
brokers = []string{"localhost:9092"}
Expand Down Expand Up @@ -51,23 +45,43 @@ func main() {
}

h.AddMiddleware(
// some, simple metrics
newMetricsMiddleware().Middleware,

// retry middleware retries message processing if error occurred in handler
poisonQueue.Middleware,

// if retries limit was exceeded, message is sent to poison queue (poison_queue topic)
retryMiddleware.Middleware,

// recovered recovers panic from handlers
middleware.Recoverer,

// correlation ID middleware adds to every produced message correlation id of consumed message,
// useful for debugging
middleware.CorrelationID,

// simulating error or panic from handler
middleware.RandomFail(0.1),
middleware.RandomPanic(0.1),
)

// close router when SIGTERM is sent
h.AddPlugin(plugin.SignalsHandler)

// handler which just counts added posts
h.AddHandler(
"posts_counter",
"app1-posts_published",
"posts_count",
message.NewPubSub(pub, createSubscriber("app2-posts_counter_v2", logger)),
PostsCounter{memoryCountStorage{new(int64)}}.Count,
)

// handler which generates "feed" from events post
//
// this implementation just prints it to stdout,
// but production ready implementation would save posts to some persistent storage
h.AddNoPublisherHandler(
"feed_generator",
"app1-posts_published",
Expand Down

0 comments on commit 35190ab

Please sign in to comment.