Skip to content

Commit

Permalink
[kafka subscriber] moved marshaler to main package
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak committed Nov 12, 2018
1 parent bca267a commit 121abe5
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 108 deletions.
3 changes: 1 addition & 2 deletions _examples/http-to-kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/http"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal"
)

type GitlabWebhook struct {
Expand All @@ -27,7 +26,7 @@ type GitlabWebhook struct {
func main() {
logger := watermill.NewStdLogger(true, true)

kafkaPublisher, err := kafka.NewPublisher([]string{"localhost:9092"}, marshal.ConfluentKafka{}, nil)
kafkaPublisher, err := kafka.NewPublisher([]string{"localhost:9092"}, kafka.DefaultMarshaler{}, nil)
if err != nil {
panic(err)
}
Expand Down
16 changes: 10 additions & 6 deletions _examples/simple-app/app1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/renstrom/shortuuid"
)
Expand All @@ -27,6 +26,7 @@ type postAdded struct {

var letters = []rune("abcdefghijklmnopqrstuvwxyz")

// randString generates random string of len n
func randString(n int) string {
b := make([]rune, n)
for i := range b {
Expand All @@ -36,13 +36,14 @@ func randString(n int) string {
}

func main() {
publisher, err := kafka.NewPublisher([]string{"localhost:9092"}, marshal.ConfluentKafka{}, nil)
publisher, err := kafka.NewPublisher([]string{"localhost:9092"}, kafka.DefaultMarshaler{}, nil)
if err != nil {
panic(err)
}
defer publisher.Close()

messagesToAdd := 1000
workers := 25

msgAdded := make(chan struct{})
allMessagesAdded := make(chan struct{})
Expand All @@ -60,7 +61,7 @@ func main() {
}
}()

for num := 0; num < 25; num++ {
for num := 0; num < workers; num++ {
go func() {
var msgPayload postAdded
var msg *message.Message
Expand All @@ -71,15 +72,17 @@ func main() {
msgPayload.Title = randString(15)
msgPayload.Content = randString(30)

b, err := json.Marshal(msgPayload)
payload, err := json.Marshal(msgPayload)
if err != nil {
panic(err)
}
msg = message.NewMessage(uuid.NewV4().String(), b)

msg = message.NewMessage(uuid.NewV4().String(), payload)

// using function from middleware to set correlation id, useful for debugging
middleware.SetCorrelationID(shortuuid.New(), msg)

err = publisher.Publish("test_topic", msg)
err = publisher.Publish("app1-posts_published", msg)
if err != nil {
log.Println("cannot publish message:", err)
continue
Expand All @@ -89,5 +92,6 @@ func main() {
}()
}

// waiting to all being produced
<-allMessagesAdded
}
7 changes: 3 additions & 4 deletions _examples/simple-app/app2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal"
)

type postAdded struct {
Expand All @@ -20,7 +19,7 @@ type postAdded struct {
}

var (
marshaler = marshal.ConfluentKafka{}
marshaler = kafka.DefaultMarshaler{}
brokers = []string{"localhost:9092"}

logger = watermill.NewStdLogger(false, false)
Expand Down Expand Up @@ -64,14 +63,14 @@ func main() {

h.AddHandler(
"posts_counter",
"test_topic",
"app1-posts_published",
"posts_count",
message.NewPubSub(pub, createSubscriber("app2-posts_counter_v2", logger)),
PostsCounter{memoryCountStorage{new(int64)}}.Count,
)
h.AddNoPublisherHandler(
"feed_generator",
"test_topic",
"app1-posts_published",
createSubscriber("app2-feed_generator_v2", logger),
FeedGenerator{printFeedStorage{}}.UpdateFeed,
)
Expand Down
83 changes: 0 additions & 83 deletions message/infrastructure/kafka/marshal/confluent.go

This file was deleted.

82 changes: 79 additions & 3 deletions message/infrastructure/kafka/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,94 @@ package kafka

import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/confluentinc/confluent-kafka-go/kafka"
confluentKafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/pkg/errors"
)

type Marshaler interface {
Marshal(topic string, msg *message.Message) (*kafka.Message, error)
Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error)
}

type Unmarshaler interface {
Unmarshal(*kafka.Message) (*message.Message, error)
Unmarshal(*confluentKafka.Message) (*message.Message, error)
}

type MarshalerUnmarshaler interface {
Marshaler
Unmarshaler
}

const UUIDHeaderKey = "_watermill_message_uuid"

type DefaultMarshaler struct{}

func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) {
if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
return nil, errors.Errorf("metadata %s is reserved by watermil for message UUID", UUIDHeaderKey)
}

headers := []confluentKafka.Header{{
Key: UUIDHeaderKey,
Value: []byte(msg.UUID),
}}
for key, value := range msg.Metadata {
headers = append(headers, confluentKafka.Header{
Key: key,
Value: []byte(value),
})
}

return &confluentKafka.Message{
TopicPartition: confluentKafka.TopicPartition{Topic: &topic, Partition: confluentKafka.PartitionAny},
Value: msg.Payload,
Headers: headers,
}, nil
}

func (DefaultMarshaler) Unmarshal(kafkaMsg *confluentKafka.Message) (*message.Message, error) {
var messageID string
metadata := make(message.Metadata, len(kafkaMsg.Headers))

for _, header := range kafkaMsg.Headers {
if header.Key == UUIDHeaderKey {
messageID = string(header.Value)
} else {
metadata.Set(header.Key, string(header.Value))
}
}

msg := message.NewMessage(
messageID,
kafkaMsg.Value,
)
msg.Metadata = metadata

return msg, nil
}

type GeneratePartitionKey func(topic string, msg *message.Message) (string, error)

type kafkaJsonWithPartitioning struct {
DefaultMarshaler

generatePartitionKey GeneratePartitionKey
}

func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler {
return kafkaJsonWithPartitioning{generatePartitionKey: generatePartitionKey}
}

func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) {
kafkaMsg, err := j.DefaultMarshaler.Marshal(topic, msg)
if err != nil {
return nil, err
}

key, err := j.generatePartitionKey(topic, msg)
if err != nil {
return nil, errors.Wrap(err, "cannot generate partition key")
}
kafkaMsg.Key = []byte(key)

return kafkaMsg, nil
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package marshal_test
package kafka_test

import (
"testing"

"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal"
"github.com/satori/go.uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestJson(t *testing.T) {
m := marshal.ConfluentKafka{}
m := kafka.DefaultMarshaler{}

msg := message.NewMessage(uuid.NewV4().String(), []byte("payload"))
msg.Metadata.Set("foo", "bar")
Expand All @@ -28,7 +29,7 @@ func TestJson(t *testing.T) {
}

func TestJsonWithPartitioning(t *testing.T) {
m := marshal.NewKafkaJsonWithPartitioning(func(topic string, msg *message.Message) (string, error) {
m := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
return msg.Metadata.Get("partition"), nil
})

Expand Down
3 changes: 1 addition & 2 deletions message/infrastructure/kafka/pubsub_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/infrastructure"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal"
)

func BenchmarkSubscriber(b *testing.B) {
infrastructure.BenchSubscriber(b, func(n int) message.PubSub {
logger := watermill.NopLogger{}
marshaler := marshal.ConfluentKafka{}
marshaler := kafka.DefaultMarshaler{}

publisher, err := kafka.NewPublisher(brokers, marshaler, nil)
if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions message/infrastructure/kafka/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/infrastructure"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka"
"github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -39,21 +38,21 @@ func generatePartitionKey(topic string, msg *message.Message) (string, error) {
}

func createPubSubWithConsumerGrup(t *testing.T, consumerGroup string) message.PubSub {
return newPubSub(t, marshal.ConfluentKafka{}, consumerGroup)
return newPubSub(t, kafka.DefaultMarshaler{}, consumerGroup)
}

func createPubSub(t *testing.T) message.PubSub {
return createPubSubWithConsumerGrup(t, "test")
}

func createPartitionedPubSub(t *testing.T) message.PubSub {
return newPubSub(t, marshal.NewKafkaJsonWithPartitioning(generatePartitionKey), "test")
return newPubSub(t, kafka.NewWithPartitioningMarshaler(generatePartitionKey), "test")
}

func createNoGroupSubscriberConstructor(t *testing.T) message.Subscriber {
logger := watermill.NewStdLogger(true, true)

marshaler := marshal.ConfluentKafka{}
marshaler := kafka.DefaultMarshaler{}

sub, err := kafka.NewConfluentSubscriber(
kafka.SubscriberConfig{
Expand Down

0 comments on commit 121abe5

Please sign in to comment.