diff --git a/components/jsonendpoint/endpoint.go b/components/jsonendpoint/endpoint.go new file mode 100644 index 000000000..4d2496ba3 --- /dev/null +++ b/components/jsonendpoint/endpoint.go @@ -0,0 +1,131 @@ +/* + +Package jsonendpoint provides a generic adapter that converts a validated JSON request into a [message.Message] published to a certain topic. Use together with an HTTP router to build gateways to any [message.Publisher]. + +# Usage Example + +Imagine a fairly standard situation of collecting newsletter signups from a Restful API: + + type NewsletterSignup struct { + Name string + Email string + } + + // Validate satisfies [Validator] interface. + func (n *NewsletterSignup) Validate() error { + if n.Name == "" || n.Email == "" { + return errors.New("newsletter signup requires both name and email address") + } + return nil + } + + func main() { + pubSub := gochannel.NewGoChannel(gochannel.Config{ + OutputChannelBuffer: 100, + Persistent: true, + }, + watermill.NewStdLogger(true, true), + ) + + endpoint := New( + 1024*1024, // HTTP readLimit + func(m *NewsletterSignup) (*message.Message, error) { // converter + payload, err := json.Marshal(m) + if err != nil { + return nil, fmt.Errorf("failed to encode: %w", err) + } + return message.NewMessage(watermill.NewUUID(), payload), nil + }, + "newsletter/signup", // Watermill topic + pubSub) + + // ... setup HTTP server and router + router.Post("/api/v1/newsletter/signup", endpoint) + } + +JSON HTTP post requests that hit "/api/v1/newsletter/signup" will get parsed, validated, and converted into a [message.Message]. +*/ +package jsonendpoint + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + + "github.com/ThreeDotsLabs/watermill/message" +) + +// Validatable is a generic interface that requires type T to be a pointer and implement the Validate method. It complements the adapter definitions. See . +type Validatable[T any] interface { + *T + Validate() error +} + +// New creates an adapter that converts an HTTP request to a [message.Message] and sends it to the [message.Publisher] topic. Enforces message validation on the generic type. +func New[T any, P Validatable[T]](readLimit int64, converter func(P) (*message.Message, error), topic string, p message.Publisher) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + var ( + in = new(T) //*IN + err error + ) + defer func() { + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{ + "Error": err.Error(), + }) + } + }() + + if r.Method != http.MethodPost { + err = fmt.Errorf("request method %q is not supported", r.Method) + return + } + + reader := http.MaxBytesReader(w, r.Body, readLimit) + defer reader.Close() + + err = json.NewDecoder(reader).Decode(in) + if err != nil { + r.Body.Close() + err = errors.New("JSON decoding failure: " + err.Error()) + return + } + r.Body.Close() + + if in == nil { + err = errors.New("no post data provided") + return + } + + if err = P(in).Validate(); err != nil { + err = fmt.Errorf("failed to validate: %w", err) + return + } + + message, err := converter(in) + if err != nil { + err = fmt.Errorf("failed to construct a message: %w", err) + return + } + message.SetContext(r.Context()) + + if err = p.Publish(topic, message); err != nil { + err = fmt.Errorf("publisher rejected the message: %w", err) + return + } + + response, err := json.Marshal(map[string]string{ + "UUID": message.UUID, + }) + if err != nil { + r.Body.Close() + err = errors.New("JSON encoding failure: " + err.Error()) + return + } + + _, err = w.Write(response) + } +} diff --git a/components/jsonendpoint/endpoint_test.go b/components/jsonendpoint/endpoint_test.go new file mode 100644 index 000000000..a7664e0be --- /dev/null +++ b/components/jsonendpoint/endpoint_test.go @@ -0,0 +1,117 @@ +package jsonendpoint + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" +) + +const testTopic = "testTopic" + +type testMessage struct { + Title string + Number int +} + +func (m *testMessage) ToBytes(t *testing.T) []byte { + result, err := json.Marshal(m) + if err != nil { + t.Fatalf("failed to encode to bytes: %v", err) + } + return result +} + +func (m *testMessage) Validate() error { + if m.Title == "" { + return errors.New("title is required") + } + if m.Number == 0 { + return errors.New("number is required") + } + return nil +} + +func TestEndpointCreationIntegration(t *testing.T) { + pubSub := gochannel.NewGoChannel(gochannel.Config{ + OutputChannelBuffer: 100, + Persistent: false, + }, + watermill.NewStdLogger(true, true), + ) + + m := &testMessage{ + Title: "title", + Number: 9, + } + + // err := pubSub.Publish(testTopic, m) + // if err != nil { + // t.Fatalf("cannot publish the test message: %v", err) + // } + + messages, err := pubSub.Subscribe(context.Background(), testTopic) + if err != nil { + t.Fatalf("failed to subscribe: %v", err) + } + + endpoint := New(999999, func(m *testMessage) (*message.Message, error) { + payload, err := json.Marshal(m) + if err != nil { + return nil, fmt.Errorf("failed to encode: %w", err) + } + return message.NewMessage(watermill.NewUUID(), payload), nil + }, testTopic, pubSub) + + request := httptest.NewRequest( + http.MethodPost, + "/url/path", + bytes.NewReader(m.ToBytes(t)), + ) + w := httptest.NewRecorder() + endpoint(w, request) + + if err = validateEndpointResponse(w.Result()); err != nil { + t.Fatalf("HTTP test request failed: %v", err) + } + + replayedMessage := <-messages + var rm *testMessage + if err = json.Unmarshal(replayedMessage.Payload, &rm); err != nil { + t.Fatalf("cannot decode replayed message: %v", err) + } + + if rm.Title != m.Title { + t.Fatalf("title mismatch: %q vs %q", rm.Title, rm.Title) + } + if rm.Number != m.Number { + t.Fatalf("number mismatch: %q vs %q", rm.Number, rm.Number) + } +} + +func validateEndpointResponse(response *http.Response) (err error) { + defer response.Body.Close() + + data, err := ioutil.ReadAll(response.Body) + if response.StatusCode != http.StatusOK { + return fmt.Errorf("HTTP test request failed: status code is not OK: %d", response.StatusCode) + } + + var values map[string]string + if err = json.Unmarshal(data, &values); err != nil { + return fmt.Errorf("JSON decoding failure for %q: %w", data, err) + } + if _, ok := values["UUID"]; !ok { + return errors.New("response does not contain new message UUID") + } + return nil +} diff --git a/go.mod b/go.mod index 694d1b08c..d59afca59 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ThreeDotsLabs/watermill -go 1.17 +go 1.18 require ( github.com/cenkalti/backoff/v3 v3.2.2