Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic jsonendpoint adapter #333

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions components/jsonendpoint/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*

Package jsonendpoint provides a generic adapter that converts a validated JSON request into a [message.Message] published to a certain topic. Use together with an HTTP router to build gateways to any [message.Publisher].

# Usage Example

Imagine a fairly standard situation of collecting newsletter signups from a Restful API:

type NewsletterSignup struct {
Name string
Email string
}

// Validate satisfies [Validator] interface.
func (n *NewsletterSignup) Validate() error {
if n.Name == "" || n.Email == "" {
return errors.New("newsletter signup requires both name and email address")
}
return nil
}

func main() {
pubSub := gochannel.NewGoChannel(gochannel.Config{
OutputChannelBuffer: 100,
Persistent: true,
},
watermill.NewStdLogger(true, true),
)

endpoint := New(
1024*1024, // HTTP readLimit
func(m *NewsletterSignup) (*message.Message, error) { // converter
payload, err := json.Marshal(m)
if err != nil {
return nil, fmt.Errorf("failed to encode: %w", err)
}
return message.NewMessage(watermill.NewUUID(), payload), nil
},
"newsletter/signup", // Watermill topic
pubSub)

// ... setup HTTP server and router
router.Post("/api/v1/newsletter/signup", endpoint)
}

JSON HTTP post requests that hit "/api/v1/newsletter/signup" will get parsed, validated, and converted into a [message.Message].
*/
package jsonendpoint

import (
"encoding/json"
"errors"
"fmt"
"net/http"

"github.com/ThreeDotsLabs/watermill/message"
)

// Validatable is a generic interface that requires type T to be a pointer and implement the Validate method. It complements the adapter definitions. See <https://stackoverflow.com/questions/72090387/what-is-the-generic-type-for-a-pointer-that-implements-an-interface>.
type Validatable[T any] interface {
*T
Validate() error
}

// New creates an adapter that converts an HTTP request to a [message.Message] and sends it to the [message.Publisher] topic. Enforces message validation on the generic type.
func New[T any, P Validatable[T]](readLimit int64, converter func(P) (*message.Message, error), topic string, p message.Publisher) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var (
in = new(T) //*IN
err error
)
defer func() {
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{
"Error": err.Error(),
})
}
}()

if r.Method != http.MethodPost {
err = fmt.Errorf("request method %q is not supported", r.Method)
return
}

reader := http.MaxBytesReader(w, r.Body, readLimit)
defer reader.Close()

err = json.NewDecoder(reader).Decode(in)
if err != nil {
r.Body.Close()
err = errors.New("JSON decoding failure: " + err.Error())
return
}
r.Body.Close()

if in == nil {
err = errors.New("no post data provided")
return
}

if err = P(in).Validate(); err != nil {
err = fmt.Errorf("failed to validate: %w", err)
return
}

message, err := converter(in)
if err != nil {
err = fmt.Errorf("failed to construct a message: %w", err)
return
}
message.SetContext(r.Context())

if err = p.Publish(topic, message); err != nil {
err = fmt.Errorf("publisher rejected the message: %w", err)
return
}

response, err := json.Marshal(map[string]string{
"UUID": message.UUID,
})
if err != nil {
r.Body.Close()
err = errors.New("JSON encoding failure: " + err.Error())
return
}

_, err = w.Write(response)
}
}
117 changes: 117 additions & 0 deletions components/jsonendpoint/endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package jsonendpoint

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

const testTopic = "testTopic"

type testMessage struct {
Title string
Number int
}

func (m *testMessage) ToBytes(t *testing.T) []byte {
result, err := json.Marshal(m)
if err != nil {
t.Fatalf("failed to encode to bytes: %v", err)
}
return result
}

func (m *testMessage) Validate() error {
if m.Title == "" {
return errors.New("title is required")
}
if m.Number == 0 {
return errors.New("number is required")
}
return nil
}

func TestEndpointCreationIntegration(t *testing.T) {
pubSub := gochannel.NewGoChannel(gochannel.Config{
OutputChannelBuffer: 100,
Persistent: false,
},
watermill.NewStdLogger(true, true),
)

m := &testMessage{
Title: "title",
Number: 9,
}

// err := pubSub.Publish(testTopic, m)
// if err != nil {
// t.Fatalf("cannot publish the test message: %v", err)
// }

messages, err := pubSub.Subscribe(context.Background(), testTopic)
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}

endpoint := New(999999, func(m *testMessage) (*message.Message, error) {
payload, err := json.Marshal(m)
if err != nil {
return nil, fmt.Errorf("failed to encode: %w", err)
}
return message.NewMessage(watermill.NewUUID(), payload), nil
}, testTopic, pubSub)

request := httptest.NewRequest(
http.MethodPost,
"/url/path",
bytes.NewReader(m.ToBytes(t)),
)
w := httptest.NewRecorder()
endpoint(w, request)

if err = validateEndpointResponse(w.Result()); err != nil {
t.Fatalf("HTTP test request failed: %v", err)
}

replayedMessage := <-messages
var rm *testMessage
if err = json.Unmarshal(replayedMessage.Payload, &rm); err != nil {
t.Fatalf("cannot decode replayed message: %v", err)
}

if rm.Title != m.Title {
t.Fatalf("title mismatch: %q vs %q", rm.Title, rm.Title)
}
if rm.Number != m.Number {
t.Fatalf("number mismatch: %q vs %q", rm.Number, rm.Number)
}
}

func validateEndpointResponse(response *http.Response) (err error) {
defer response.Body.Close()

data, err := ioutil.ReadAll(response.Body)
if response.StatusCode != http.StatusOK {
return fmt.Errorf("HTTP test request failed: status code is not OK: %d", response.StatusCode)
}

var values map[string]string
if err = json.Unmarshal(data, &values); err != nil {
return fmt.Errorf("JSON decoding failure for %q: %w", data, err)
}
if _, ok := values["UUID"]; !ok {
return errors.New("response does not contain new message UUID")
}
return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ThreeDotsLabs/watermill

go 1.17
go 1.18
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Required for generics support


require (
github.com/cenkalti/backoff/v3 v3.2.2
Expand Down