diff --git a/_examples/http-to-kafka/main.go b/_examples/http-to-kafka/main.go index 9eb3a6cfa..14ee6ef47 100644 --- a/_examples/http-to-kafka/main.go +++ b/_examples/http-to-kafka/main.go @@ -17,7 +17,6 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message/infrastructure/http" "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal" ) type GitlabWebhook struct { @@ -27,7 +26,7 @@ type GitlabWebhook struct { func main() { logger := watermill.NewStdLogger(true, true) - kafkaPublisher, err := kafka.NewPublisher([]string{"localhost:9092"}, marshal.ConfluentKafka{}, nil) + kafkaPublisher, err := kafka.NewPublisher([]string{"localhost:9092"}, kafka.DefaultMarshaler{}, nil) if err != nil { panic(err) } diff --git a/_examples/simple-app/app1/main.go b/_examples/simple-app/app1/main.go index 91f8d7ae7..c918ceb7a 100644 --- a/_examples/simple-app/app1/main.go +++ b/_examples/simple-app/app1/main.go @@ -11,7 +11,6 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/renstrom/shortuuid" ) @@ -27,6 +26,7 @@ type postAdded struct { var letters = []rune("abcdefghijklmnopqrstuvwxyz") +// randString generates random string of len n func randString(n int) string { b := make([]rune, n) for i := range b { @@ -36,13 +36,14 @@ func randString(n int) string { } func main() { - publisher, err := kafka.NewPublisher([]string{"localhost:9092"}, marshal.ConfluentKafka{}, nil) + publisher, err := kafka.NewPublisher([]string{"localhost:9092"}, kafka.DefaultMarshaler{}, nil) if err != nil { panic(err) } defer publisher.Close() messagesToAdd := 1000 + workers := 25 msgAdded := make(chan struct{}) allMessagesAdded := make(chan struct{}) @@ -60,7 +61,7 @@ func main() { } }() - for num := 0; num < 25; num++ { + for num := 0; num < workers; num++ { go func() { var msgPayload postAdded var msg *message.Message @@ -71,15 +72,17 @@ func main() { msgPayload.Title = randString(15) msgPayload.Content = randString(30) - b, err := json.Marshal(msgPayload) + payload, err := json.Marshal(msgPayload) if err != nil { panic(err) } - msg = message.NewMessage(uuid.NewV4().String(), b) + msg = message.NewMessage(uuid.NewV4().String(), payload) + + // using function from middleware to set correlation id, useful for debugging middleware.SetCorrelationID(shortuuid.New(), msg) - err = publisher.Publish("test_topic", msg) + err = publisher.Publish("app1-posts_published", msg) if err != nil { log.Println("cannot publish message:", err) continue @@ -89,5 +92,6 @@ func main() { }() } + // waiting to all being produced <-allMessagesAdded } diff --git a/_examples/simple-app/app2/main.go b/_examples/simple-app/app2/main.go index c2f8571ac..54e7733f7 100644 --- a/_examples/simple-app/app2/main.go +++ b/_examples/simple-app/app2/main.go @@ -10,7 +10,6 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal" ) type postAdded struct { @@ -20,7 +19,7 @@ type postAdded struct { } var ( - marshaler = marshal.ConfluentKafka{} + marshaler = kafka.DefaultMarshaler{} brokers = []string{"localhost:9092"} logger = watermill.NewStdLogger(false, false) @@ -64,14 +63,14 @@ func main() { h.AddHandler( "posts_counter", - "test_topic", + "app1-posts_published", "posts_count", message.NewPubSub(pub, createSubscriber("app2-posts_counter_v2", logger)), PostsCounter{memoryCountStorage{new(int64)}}.Count, ) h.AddNoPublisherHandler( "feed_generator", - "test_topic", + "app1-posts_published", createSubscriber("app2-feed_generator_v2", logger), FeedGenerator{printFeedStorage{}}.UpdateFeed, ) diff --git a/message/infrastructure/kafka/marshal/confluent.go b/message/infrastructure/kafka/marshal/confluent.go deleted file mode 100644 index 09747a859..000000000 --- a/message/infrastructure/kafka/marshal/confluent.go +++ /dev/null @@ -1,83 +0,0 @@ -package marshal - -import ( - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka" - confluentKafka "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/pkg/errors" -) - -const UUIDHeaderKey = "_watermill_message_uuid" - -type ConfluentKafka struct{} - -func (ConfluentKafka) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) { - if value := msg.Metadata.Get(UUIDHeaderKey); value != "" { - return nil, errors.Errorf("metadata %s is reserved by watermil for message UUID", UUIDHeaderKey) - } - - headers := []confluentKafka.Header{{ - Key: UUIDHeaderKey, - Value: []byte(msg.UUID), - }} - for key, value := range msg.Metadata { - headers = append(headers, confluentKafka.Header{ - Key: key, - Value: []byte(value), - }) - } - - return &confluentKafka.Message{ - TopicPartition: confluentKafka.TopicPartition{Topic: &topic, Partition: confluentKafka.PartitionAny}, - Value: msg.Payload, - Headers: headers, - }, nil -} - -func (ConfluentKafka) Unmarshal(kafkaMsg *confluentKafka.Message) (*message.Message, error) { - var messageID string - metadata := make(message.Metadata, len(kafkaMsg.Headers)) - - for _, header := range kafkaMsg.Headers { - if header.Key == UUIDHeaderKey { - messageID = string(header.Value) - } else { - metadata.Set(header.Key, string(header.Value)) - } - } - - msg := message.NewMessage( - messageID, - kafkaMsg.Value, - ) - msg.Metadata = metadata - - return msg, nil -} - -type GeneratePartitionKey func(topic string, msg *message.Message) (string, error) - -type kafkaJsonWithPartitioning struct { - ConfluentKafka - - generatePartitionKey GeneratePartitionKey -} - -func NewKafkaJsonWithPartitioning(generatePartitionKey GeneratePartitionKey) kafka.MarshalerUnmarshaler { - return kafkaJsonWithPartitioning{generatePartitionKey: generatePartitionKey} -} - -func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) { - kafkaMsg, err := j.ConfluentKafka.Marshal(topic, msg) - if err != nil { - return nil, err - } - - key, err := j.generatePartitionKey(topic, msg) - if err != nil { - return nil, errors.Wrap(err, "cannot generate partition key") - } - kafkaMsg.Key = []byte(key) - - return kafkaMsg, nil -} diff --git a/message/infrastructure/kafka/marshaler.go b/message/infrastructure/kafka/marshaler.go index f4601d535..8b87c598b 100644 --- a/message/infrastructure/kafka/marshaler.go +++ b/message/infrastructure/kafka/marshaler.go @@ -2,18 +2,94 @@ package kafka import ( "github.com/ThreeDotsLabs/watermill/message" - "github.com/confluentinc/confluent-kafka-go/kafka" + confluentKafka "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/pkg/errors" ) type Marshaler interface { - Marshal(topic string, msg *message.Message) (*kafka.Message, error) + Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) } type Unmarshaler interface { - Unmarshal(*kafka.Message) (*message.Message, error) + Unmarshal(*confluentKafka.Message) (*message.Message, error) } type MarshalerUnmarshaler interface { Marshaler Unmarshaler } + +const UUIDHeaderKey = "_watermill_message_uuid" + +type DefaultMarshaler struct{} + +func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) { + if value := msg.Metadata.Get(UUIDHeaderKey); value != "" { + return nil, errors.Errorf("metadata %s is reserved by watermil for message UUID", UUIDHeaderKey) + } + + headers := []confluentKafka.Header{{ + Key: UUIDHeaderKey, + Value: []byte(msg.UUID), + }} + for key, value := range msg.Metadata { + headers = append(headers, confluentKafka.Header{ + Key: key, + Value: []byte(value), + }) + } + + return &confluentKafka.Message{ + TopicPartition: confluentKafka.TopicPartition{Topic: &topic, Partition: confluentKafka.PartitionAny}, + Value: msg.Payload, + Headers: headers, + }, nil +} + +func (DefaultMarshaler) Unmarshal(kafkaMsg *confluentKafka.Message) (*message.Message, error) { + var messageID string + metadata := make(message.Metadata, len(kafkaMsg.Headers)) + + for _, header := range kafkaMsg.Headers { + if header.Key == UUIDHeaderKey { + messageID = string(header.Value) + } else { + metadata.Set(header.Key, string(header.Value)) + } + } + + msg := message.NewMessage( + messageID, + kafkaMsg.Value, + ) + msg.Metadata = metadata + + return msg, nil +} + +type GeneratePartitionKey func(topic string, msg *message.Message) (string, error) + +type kafkaJsonWithPartitioning struct { + DefaultMarshaler + + generatePartitionKey GeneratePartitionKey +} + +func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler { + return kafkaJsonWithPartitioning{generatePartitionKey: generatePartitionKey} +} + +func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) { + kafkaMsg, err := j.DefaultMarshaler.Marshal(topic, msg) + if err != nil { + return nil, err + } + + key, err := j.generatePartitionKey(topic, msg) + if err != nil { + return nil, errors.Wrap(err, "cannot generate partition key") + } + kafkaMsg.Key = []byte(key) + + return kafkaMsg, nil +} diff --git a/message/infrastructure/kafka/marshal/confluent_test.go b/message/infrastructure/kafka/marshaler_test.go similarity index 88% rename from message/infrastructure/kafka/marshal/confluent_test.go rename to message/infrastructure/kafka/marshaler_test.go index 90fe993ab..2cd0e7d9d 100644 --- a/message/infrastructure/kafka/marshal/confluent_test.go +++ b/message/infrastructure/kafka/marshaler_test.go @@ -1,17 +1,18 @@ -package marshal_test +package kafka_test import ( "testing" + "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka" + "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal" "github.com/satori/go.uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestJson(t *testing.T) { - m := marshal.ConfluentKafka{} + m := kafka.DefaultMarshaler{} msg := message.NewMessage(uuid.NewV4().String(), []byte("payload")) msg.Metadata.Set("foo", "bar") @@ -28,7 +29,7 @@ func TestJson(t *testing.T) { } func TestJsonWithPartitioning(t *testing.T) { - m := marshal.NewKafkaJsonWithPartitioning(func(topic string, msg *message.Message) (string, error) { + m := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) { return msg.Metadata.Get("partition"), nil }) diff --git a/message/infrastructure/kafka/pubsub_bench_test.go b/message/infrastructure/kafka/pubsub_bench_test.go index e3989264a..b5b72a7f0 100644 --- a/message/infrastructure/kafka/pubsub_bench_test.go +++ b/message/infrastructure/kafka/pubsub_bench_test.go @@ -7,13 +7,12 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/infrastructure" "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal" ) func BenchmarkSubscriber(b *testing.B) { infrastructure.BenchSubscriber(b, func(n int) message.PubSub { logger := watermill.NopLogger{} - marshaler := marshal.ConfluentKafka{} + marshaler := kafka.DefaultMarshaler{} publisher, err := kafka.NewPublisher(brokers, marshaler, nil) if err != nil { diff --git a/message/infrastructure/kafka/pubsub_test.go b/message/infrastructure/kafka/pubsub_test.go index 5bfef4acf..5d98c1be7 100644 --- a/message/infrastructure/kafka/pubsub_test.go +++ b/message/infrastructure/kafka/pubsub_test.go @@ -7,7 +7,6 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/infrastructure" "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka" - "github.com/ThreeDotsLabs/watermill/message/infrastructure/kafka/marshal" "github.com/stretchr/testify/require" ) @@ -39,7 +38,7 @@ func generatePartitionKey(topic string, msg *message.Message) (string, error) { } func createPubSubWithConsumerGrup(t *testing.T, consumerGroup string) message.PubSub { - return newPubSub(t, marshal.ConfluentKafka{}, consumerGroup) + return newPubSub(t, kafka.DefaultMarshaler{}, consumerGroup) } func createPubSub(t *testing.T) message.PubSub { @@ -47,13 +46,13 @@ func createPubSub(t *testing.T) message.PubSub { } func createPartitionedPubSub(t *testing.T) message.PubSub { - return newPubSub(t, marshal.NewKafkaJsonWithPartitioning(generatePartitionKey), "test") + return newPubSub(t, kafka.NewWithPartitioningMarshaler(generatePartitionKey), "test") } func createNoGroupSubscriberConstructor(t *testing.T) message.Subscriber { logger := watermill.NewStdLogger(true, true) - marshaler := marshal.ConfluentKafka{} + marshaler := kafka.DefaultMarshaler{} sub, err := kafka.NewConfluentSubscriber( kafka.SubscriberConfig{