From ddf98a754290057cd71acd4b88efb1514921aa2b Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 7 Jan 2025 08:10:46 -0800 Subject: [PATCH 01/10] adding items to cart works, need to implement remove and payment/checkout --- go.mod | 1 + go.sum | 2 + shoppingcart/activities.go | 54 ++++++++++ shoppingcart/starter/main.go | 33 ++++++ shoppingcart/webapp/main.go | 183 +++++++++++++++++++++++++++++++++ shoppingcart/websocket/main.go | 183 +++++++++++++++++++++++++++++++++ shoppingcart/worker/main.go | 30 ++++++ shoppingcart/workflow.go | 66 ++++++++++++ shoppingcart/workflow_test.go | 1 + 9 files changed, 553 insertions(+) create mode 100644 shoppingcart/activities.go create mode 100644 shoppingcart/starter/main.go create mode 100644 shoppingcart/webapp/main.go create mode 100644 shoppingcart/websocket/main.go create mode 100644 shoppingcart/worker/main.go create mode 100644 shoppingcart/workflow.go create mode 100644 shoppingcart/workflow_test.go diff --git a/go.mod b/go.mod index 541dca17..873cb8e3 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang/mock v1.7.0-rc.1 github.com/golang/snappy v0.0.4 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-plugin v1.4.5 github.com/nexus-rpc/sdk-go v0.1.0 github.com/opentracing/opentracing-go v1.2.0 diff --git a/go.sum b/go.sum index 4d00d696..887baadb 100644 --- a/go.sum +++ b/go.sum @@ -141,6 +141,8 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= diff --git a/shoppingcart/activities.go b/shoppingcart/activities.go new file mode 100644 index 00000000..e74ddfcc --- /dev/null +++ b/shoppingcart/activities.go @@ -0,0 +1,54 @@ +package shoppingcart + +// +//import ( +// "context" +// "errors" +// "io" +// "net/http" +// //"go.temporal.io/sdk/activity" +//) +// +////func ValidateProductAvailability(ctx context.Context, itemID string) { +//// +////} +// +////func CreateCart(ctx context.Context) (string, error) { +//// resp, err := http.Get(shoppingServerHostPort + "/initialize_cart?is_api_call=true") +//// if err != nil { +//// return "", err +//// } +//// body, err := io.ReadAll(resp.Body) +//// _ = resp.Body.Close() +//// if err != nil { +//// return err +//// } +//// activity.GetLogger(ctx).Info("Cart initialized") +//// // server should generate unique ID for cart +//// return nil +////} +// +//func AddToCart(ctx context.Context, itemID string) error { +// if itemID == "" { +// return errors.New("itemID cannot be blank") +// } +// +// resp, err := http.Get("http://localhost:8099" + "/add?is_api_call=true&item_id=" + itemID) +// if err != nil { +// return err +// } +// body, err := io.ReadAll(resp.Body) +// _ = resp.Body.Close() +// if err != nil { +// return err +// } +// +// // TODO: process body +// //if string(body) +//} +// +//func ProcessPayment(ctx context.Context) {} +// +//func UpdateOrderStatus(ctx context.Context, orderID string, status string) {} +// +//func UpdateShippingStatus(ctx context.Context, shippingStatus string) {} diff --git a/shoppingcart/starter/main.go b/shoppingcart/starter/main.go new file mode 100644 index 00000000..de1c7942 --- /dev/null +++ b/shoppingcart/starter/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "context" + "log" + + shoppingcart "github.com/temporalio/samples-go/shoppingcart" + "go.temporal.io/sdk/client" +) + +func main() { + // The client is a heavyweight object that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + //shoppingCartID := uuid.New() + workflowOptions := client.StartWorkflowOptions{ + ID: "shopping_cart_1", // + shoppingCartID, + TaskQueue: "shopping_cart", + } + + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, shoppingcart.CartWorkflow) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + +} diff --git a/shoppingcart/webapp/main.go b/shoppingcart/webapp/main.go new file mode 100644 index 00000000..03530533 --- /dev/null +++ b/shoppingcart/webapp/main.go @@ -0,0 +1,183 @@ +package main + +import ( + "fmt" + "github.com/gorilla/websocket" + "go.temporal.io/sdk/client" + "net/http" + "os" + "sort" + "sync" +) + +var ( + cartState = make(map[string]int) // id -> itemName -> number + workflowClient client.Client + itemCosts = map[string]int{ + "apple": 2, + "banana": 1, + "watermelon": 5, + "television": 1000, + "house": 10000000, + "car": 50000, + "binder": 10, + } +) + +type WebSocketServer struct { + clients map[string]*websocket.Conn + mu sync.Mutex + broadcast chan WebSocketMessage +} + +type WebSocketMessage struct { + UserID string `json:"user_id"` + Event string `json:"event"` + Data any `json:"data"` +} + +func NewWebSocketServer() *WebSocketServer { + return &WebSocketServer{ + clients: make(map[string]*websocket.Conn), + broadcast: make(chan WebSocketMessage), + } +} + +func (s *WebSocketServer) handleConnections(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + fmt.Println("Error upgrading connection:", err) + return + } + defer conn.Close() + + // Assume a user ID is passed as a query param for simplicity + userID := r.URL.Query().Get("user_id") + if userID == "" { + fmt.Println("Missing user_id in query parameters") + return + } + + // Register the client + s.mu.Lock() + s.clients[userID] = conn + s.mu.Unlock() + + fmt.Printf("Client connected: %s\n", userID) + + // Keep connection open until closed by the client + for { + if _, _, err := conn.NextReader(); err != nil { + break + } + } + + // Unregister the client + s.mu.Lock() + delete(s.clients, userID) + s.mu.Unlock() + fmt.Printf("Client disconnected: %s\n", userID) +} + +func (s *WebSocketServer) handleMessages() { + for msg := range s.broadcast { + s.mu.Lock() + conn, exists := s.clients[msg.UserID] + s.mu.Unlock() + if !exists { + fmt.Printf("User %s not connected\n", msg.UserID) + continue + } + + if err := conn.WriteJSON(msg); err != nil { + fmt.Printf("Error sending message to user %s: %v\n", msg.UserID, err) + s.mu.Lock() + delete(s.clients, msg.UserID) + s.mu.Unlock() + } + } +} + +func (s *WebSocketServer) SendMessage(userID, event string, data any) { + s.broadcast <- WebSocketMessage{UserID: userID, Event: event, Data: data} +} + +func main() { + wsServer := NewWebSocketServer() + + var err error + workflowClient, err = client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + panic(err) + } + + fmt.Println("Starting dummy server...") + http.HandleFunc("/", listHandler) + http.HandleFunc("/ws", wsServer.handleConnections) + + go wsServer.handleMessages() + + fmt.Println("WebSocket server started on :8080") + if err := http.ListenAndServe(":8080", nil); err != nil { + fmt.Println("Error starting WebSocket server:", err) + } +} + +func listHandler(w http.ResponseWriter, r *http.Request) { + // read in javascript that handles websocket + fileContents, err := os.ReadFile("shoppingcart/home.html") + if err != nil { + http.Error(w, "Could not read shoppingcart/home.html", http.StatusInternalServerError) + fmt.Println("Error reading shoppingcart/home.html:", err) + return + } + + // Write the contents to the HTTP response + w.Header().Set("Content-Type", "text/html") // Set the content type to HTML + _, _ = fmt.Fprint(w, "") + _, _ = fmt.Fprintf(w, "%s", fileContents) + _, _ = fmt.Fprint(w, "

DUMMY SHOPPING WEBSITE

"+ + "HOME"+ + "TODO:Payment"+ + "TODO:Shipment"+ + "

Available Items to Purchase

") + + // and at the end of the workflow the server will send return data to the client/website + keys := make([]string, 0) + count := 0 + for k, _ := range itemCosts { + count += 1 + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + //actionButton := fmt.Sprintf(""+ + // "", k) + actionButton := fmt.Sprintf("", k) + + _, _ = fmt.Fprintf(w, "", k, itemCosts[k], actionButton) + } + + _, _ = fmt.Fprint(w, "
ItemCostAction
%s%d%s
") + + //_, _ = fmt.Fprint(w, "

Current items in cart:

"+ + // "") + // + //// TODO: List current items in cart + //// TODO: query from websocket? + //for key, val := range cartState { + // // TODO: add remove action + // _, _ = fmt.Fprintf(w, "", key, val) + //} + //_, _ = fmt.Fprint(w, "
ItemQuantityAction
%s%d
") +} diff --git a/shoppingcart/websocket/main.go b/shoppingcart/websocket/main.go new file mode 100644 index 00000000..cb75c864 --- /dev/null +++ b/shoppingcart/websocket/main.go @@ -0,0 +1,183 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "sync" + + "github.com/gorilla/websocket" + shoppingcart "github.com/temporalio/samples-go/shoppingcart" + "go.temporal.io/sdk/client" +) + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true // Adjust for production + }, +} + +// WebSocketMessage defines the structure of the message sent by the web app +type WebSocketMessage struct { + Action string `json:"action"` // "add" or "remove" + ItemID string `json:"item_id"` + Quantity int `json:"quantity"` +} + +type CartStatusMessage struct { + Action string `json:"action"` + Data CartState `json:"data"` +} + +// CartSignalPayload is the payload structure for Temporal signals +//type CartSignalPayload struct { +// Action string `json:"action"` // "add" or "remove" +// ItemID string `json:"item_id"` +// Quantity int `json:"quantity"` +//} + +// WebSocketServer holds the WebSocket connections and Temporal client +type WebSocketServer struct { + connections map[string]*websocket.Conn // Map of user_id to WebSocket connection + mu sync.Mutex + temporalClient client.Client +} + +type CartState map[string]int // itemID -> quantity + +// NewWebSocketServer creates a new WebSocket server instance +func NewWebSocketServer(temporalClient client.Client) *WebSocketServer { + return &WebSocketServer{ + connections: make(map[string]*websocket.Conn), + temporalClient: temporalClient, + } +} + +// HandleConnections manages incoming WebSocket connections +func (s *WebSocketServer) HandleConnections(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println("Error upgrading connection:", err) + return + } + defer conn.Close() + + userID := r.URL.Query().Get("user_id") + if userID == "" { + log.Println("user_id is missing in the query parameters") + return + } + + // Register the connection + s.mu.Lock() + s.connections[userID] = conn + s.mu.Unlock() + defer func() { + s.mu.Lock() + delete(s.connections, userID) + s.mu.Unlock() + }() + + log.Printf("WebSocket connection established for user: %s", userID) + + // Handle incoming messages + for { + log.Println("calling conn.ReadMessage()") + _, message, err := conn.ReadMessage() + if err != nil { + log.Println("Error reading WebSocket message:", err) + break + } + + // Handle the WebSocket message + s.handleMessage(userID, message) + } +} + +// handleMessage processes incoming WebSocket messages and triggers Temporal signals +func (s *WebSocketServer) handleMessage(userID string, message []byte) { + // Parse the WebSocket message + var msg WebSocketMessage + if err := json.Unmarshal(message, &msg); err != nil { + log.Println("Error parsing WebSocket message:", err) + return + } + workflowID := fmt.Sprintf("shopping_cart_%s", userID) + + switch msg.Action { + case "add": + // Signal to add an item to the cart + signalPayload := shoppingcart.CartSignalPayload{ + Action: "add", + ItemID: msg.ItemID, + Quantity: msg.Quantity, + } + log.Println("Sending signal payload", workflowID, signalPayload) + err := s.temporalClient.SignalWorkflow(context.Background(), workflowID, "", "cart_signal", signalPayload) + if err != nil { + log.Println("Error signaling workflow:", err) + } + + // TODO: query the cart and push signal back to webapp + var cartState CartState + resp, err := s.temporalClient.QueryWorkflow(context.Background(), workflowID, "", "get_cart") + if err != nil { + log.Println("Error querying workflow:", err) + return + } + if err := resp.Get(&cartState); err != nil { + log.Fatalln("Unable to decode query result", err) + } + + // Send the cart state back to the WebSocket client + response := CartStatusMessage{ + Action: "cart_state", + Data: cartState, + } + conn := s.connections[userID] + if conn != nil { + conn.WriteJSON(response) + } + case "get_cart": + // Query the cart state + var cartState CartState + resp, err := s.temporalClient.QueryWorkflow(context.Background(), workflowID, "", "get_cart") + if err != nil { + log.Println("Error querying workflow:", err) + return + } + if err := resp.Get(&cartState); err != nil { + log.Fatalln("Unable to decode query result", err) + } + + // Send the cart state back to the WebSocket client + response := CartStatusMessage{ + Action: "cart_state", + Data: cartState, + } + conn := s.connections[userID] + if conn != nil { + conn.WriteJSON(response) + } + default: + log.Printf("Unknown action: %s\n", msg.Action) + } +} + +func main() { + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalf("Error creating Temporal client: %v", err) + } + defer c.Close() + + server := NewWebSocketServer(c) + + http.HandleFunc("/ws", server.HandleConnections) + log.Println("WebSocket server is running on ws://localhost:8089/ws") + if err := http.ListenAndServe(":8089", nil); err != nil { + log.Fatalf("Error starting WebSocket server: %v", err) + } +} diff --git a/shoppingcart/worker/main.go b/shoppingcart/worker/main.go new file mode 100644 index 00000000..e4c393b7 --- /dev/null +++ b/shoppingcart/worker/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "log" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + + shoppingcart "github.com/temporalio/samples-go/shoppingcart" +) + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, "shopping_cart", worker.Options{}) + + w.RegisterWorkflow(shoppingcart.CartWorkflow) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +} diff --git a/shoppingcart/workflow.go b/shoppingcart/workflow.go new file mode 100644 index 00000000..567e8f6c --- /dev/null +++ b/shoppingcart/workflow.go @@ -0,0 +1,66 @@ +package shoppingcart + +import ( + "fmt" + "go.temporal.io/sdk/workflow" +) + +var ( + shoppingServerHostPort = "http://localhost:8099" +) + +type CartSignalPayload struct { + Action string `json:"action"` // "add" or "remove" + ItemID string `json:"item_id"` + Quantity int `json:"quantity"` +} + +type CartState map[string]int // itemID -> quantity + +func CartWorkflow(ctx workflow.Context) error { + cart := make(CartState) + cart["apple"] = 1 + + // Signal channel for cart updates + signalChannel := workflow.GetSignalChannel(ctx, "cart_signal") + + // Register a query handler to get the cart state + workflow.SetQueryHandler(ctx, "get_cart", func() (CartState, error) { + return cart, nil + }) + + workflow.GetLogger(ctx).Info("CartWorkflow started. Listening for signals...") + + // Listen for signals and update the cart state in a loop + for { + var payload CartSignalPayload + fmt.Println("[SignalPayload]", payload) + // Block until a signal is received + signalChannel.Receive(ctx, &payload) + + // Process the received signal + switch payload.Action { + case "add": + if payload.Quantity <= 0 { + delete(cart, payload.ItemID) + } else { + cart[payload.ItemID] += payload.Quantity + } + workflow.GetLogger(ctx).Info("Item added to cart", "item_id", payload.ItemID, "quantity", payload.Quantity) + + case "remove": + delete(cart, payload.ItemID) + workflow.GetLogger(ctx).Info("Item removed from cart", "item_id", payload.ItemID) + + default: + workflow.GetLogger(ctx).Warn("Unknown action received", "action", payload.Action) + } + + // Yield control to allow Temporal to process other tasks + //workflow.Yield(ctx) + } + + // This return statement is unreachable because the loop runs indefinitely. + // You can add logic to break the loop if needed (e.g., based on a "stop" signal). + return nil +} diff --git a/shoppingcart/workflow_test.go b/shoppingcart/workflow_test.go new file mode 100644 index 00000000..03c234c0 --- /dev/null +++ b/shoppingcart/workflow_test.go @@ -0,0 +1 @@ +package shoppingcart From 8a0ab4f34077ee8d92b9361f40bfa69da55b61b9 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 7 Jan 2025 08:42:46 -0800 Subject: [PATCH 02/10] clean up code, add a README --- shoppingcart/README.md | 29 +++++++++++ shoppingcart/activities.go | 54 -------------------- shoppingcart/starter/main.go | 5 +- shoppingcart/webapp/main.go | 86 ++++++++++++++------------------ shoppingcart/websocket/main.go | 91 ++++++++++++---------------------- shoppingcart/worker/main.go | 2 +- 6 files changed, 102 insertions(+), 165 deletions(-) create mode 100644 shoppingcart/README.md delete mode 100644 shoppingcart/activities.go diff --git a/shoppingcart/README.md b/shoppingcart/README.md new file mode 100644 index 00000000..8ab7dd8c --- /dev/null +++ b/shoppingcart/README.md @@ -0,0 +1,29 @@ +# Shopping Cart + +This sample workflow shows how a shopping cart application can be implemented. +Note that this program uses websockets to communicate between the webapp and +the Temporal service. + +The shopping cart is represented as a workflow, maintaining the state of the +cart, and the web socket server updates the carts with signals, and retrieves +the cart state with a query. See [workflow message passing](https://docs.temporal.io/encyclopedia/workflow-message-passing) +on the difference between queries and signals. + +### Steps to run this sample: +1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). +2) Run the following command to start the worker +``` +go run shoppingcart/worker/main.go +``` +3) Run the following command to start the web socket server +``` +go run shoppingcart/websocket/main.go +``` +4) Run the following command to start the web app +``` +go run shoppingcart/webapp/main.go +``` +5) Run the following command to start the workflow execution +``` +go run shoppingcart/starter/main.go +``` \ No newline at end of file diff --git a/shoppingcart/activities.go b/shoppingcart/activities.go deleted file mode 100644 index e74ddfcc..00000000 --- a/shoppingcart/activities.go +++ /dev/null @@ -1,54 +0,0 @@ -package shoppingcart - -// -//import ( -// "context" -// "errors" -// "io" -// "net/http" -// //"go.temporal.io/sdk/activity" -//) -// -////func ValidateProductAvailability(ctx context.Context, itemID string) { -//// -////} -// -////func CreateCart(ctx context.Context) (string, error) { -//// resp, err := http.Get(shoppingServerHostPort + "/initialize_cart?is_api_call=true") -//// if err != nil { -//// return "", err -//// } -//// body, err := io.ReadAll(resp.Body) -//// _ = resp.Body.Close() -//// if err != nil { -//// return err -//// } -//// activity.GetLogger(ctx).Info("Cart initialized") -//// // server should generate unique ID for cart -//// return nil -////} -// -//func AddToCart(ctx context.Context, itemID string) error { -// if itemID == "" { -// return errors.New("itemID cannot be blank") -// } -// -// resp, err := http.Get("http://localhost:8099" + "/add?is_api_call=true&item_id=" + itemID) -// if err != nil { -// return err -// } -// body, err := io.ReadAll(resp.Body) -// _ = resp.Body.Close() -// if err != nil { -// return err -// } -// -// // TODO: process body -// //if string(body) -//} -// -//func ProcessPayment(ctx context.Context) {} -// -//func UpdateOrderStatus(ctx context.Context, orderID string, status string) {} -// -//func UpdateShippingStatus(ctx context.Context, shippingStatus string) {} diff --git a/shoppingcart/starter/main.go b/shoppingcart/starter/main.go index de1c7942..c081878c 100644 --- a/shoppingcart/starter/main.go +++ b/shoppingcart/starter/main.go @@ -4,7 +4,7 @@ import ( "context" "log" - shoppingcart "github.com/temporalio/samples-go/shoppingcart" + "github.com/temporalio/samples-go/shoppingcart" "go.temporal.io/sdk/client" ) @@ -18,9 +18,8 @@ func main() { } defer c.Close() - //shoppingCartID := uuid.New() workflowOptions := client.StartWorkflowOptions{ - ID: "shopping_cart_1", // + shoppingCartID, + ID: "shopping_cart_1", TaskQueue: "shopping_cart", } diff --git a/shoppingcart/webapp/main.go b/shoppingcart/webapp/main.go index 03530533..71bc3740 100644 --- a/shoppingcart/webapp/main.go +++ b/shoppingcart/webapp/main.go @@ -11,7 +11,7 @@ import ( ) var ( - cartState = make(map[string]int) // id -> itemName -> number + cartState = make(map[string]int) workflowClient client.Client itemCosts = map[string]int{ "apple": 2, @@ -25,21 +25,22 @@ var ( ) type WebSocketServer struct { - clients map[string]*websocket.Conn - mu sync.Mutex - broadcast chan WebSocketMessage + // TODO: This can probably be simplified to a single websocket.Conn + clients map[string]*websocket.Conn + mu sync.Mutex + //broadcast chan WebSocketMessage } -type WebSocketMessage struct { - UserID string `json:"user_id"` - Event string `json:"event"` - Data any `json:"data"` -} +//type WebSocketMessage struct { +// UserID string `json:"user_id"` +// Event string `json:"event"` +// Data any `json:"data"` +//} func NewWebSocketServer() *WebSocketServer { return &WebSocketServer{ - clients: make(map[string]*websocket.Conn), - broadcast: make(chan WebSocketMessage), + clients: make(map[string]*websocket.Conn), + //broadcast: make(chan WebSocketMessage), } } @@ -84,28 +85,28 @@ func (s *WebSocketServer) handleConnections(w http.ResponseWriter, r *http.Reque fmt.Printf("Client disconnected: %s\n", userID) } -func (s *WebSocketServer) handleMessages() { - for msg := range s.broadcast { - s.mu.Lock() - conn, exists := s.clients[msg.UserID] - s.mu.Unlock() - if !exists { - fmt.Printf("User %s not connected\n", msg.UserID) - continue - } - - if err := conn.WriteJSON(msg); err != nil { - fmt.Printf("Error sending message to user %s: %v\n", msg.UserID, err) - s.mu.Lock() - delete(s.clients, msg.UserID) - s.mu.Unlock() - } - } -} - -func (s *WebSocketServer) SendMessage(userID, event string, data any) { - s.broadcast <- WebSocketMessage{UserID: userID, Event: event, Data: data} -} +//func (s *WebSocketServer) handleMessages() { +// for msg := range s.broadcast { +// s.mu.Lock() +// conn, exists := s.clients[msg.UserID] +// s.mu.Unlock() +// if !exists { +// fmt.Printf("User %s not connected\n", msg.UserID) +// continue +// } +// +// if err := conn.WriteJSON(msg); err != nil { +// fmt.Printf("Error sending message to user %s: %v\n", msg.UserID, err) +// s.mu.Lock() +// delete(s.clients, msg.UserID) +// s.mu.Unlock() +// } +// } +//} + +//func (s *WebSocketServer) SendMessage(userID, event string, data any) { +// s.broadcast <- WebSocketMessage{UserID: userID, Event: event, Data: data} +//} func main() { wsServer := NewWebSocketServer() @@ -122,7 +123,7 @@ func main() { http.HandleFunc("/", listHandler) http.HandleFunc("/ws", wsServer.handleConnections) - go wsServer.handleMessages() + //go wsServer.handleMessages() fmt.Println("WebSocket server started on :8080") if err := http.ListenAndServe(":8080", nil); err != nil { @@ -131,7 +132,7 @@ func main() { } func listHandler(w http.ResponseWriter, r *http.Request) { - // read in javascript that handles websocket + // read in javascript that handles websocket logic fileContents, err := os.ReadFile("shoppingcart/home.html") if err != nil { http.Error(w, "Could not read shoppingcart/home.html", http.StatusInternalServerError) @@ -139,7 +140,7 @@ func listHandler(w http.ResponseWriter, r *http.Request) { return } - // Write the contents to the HTTP response + // Generate HTML w.Header().Set("Content-Type", "text/html") // Set the content type to HTML _, _ = fmt.Fprint(w, "") _, _ = fmt.Fprintf(w, "%s", fileContents) @@ -149,7 +150,6 @@ func listHandler(w http.ResponseWriter, r *http.Request) { "TODO:Shipment"+ "

Available Items to Purchase

") - // and at the end of the workflow the server will send return data to the client/website keys := make([]string, 0) count := 0 for k, _ := range itemCosts { @@ -158,8 +158,6 @@ func listHandler(w http.ResponseWriter, r *http.Request) { } sort.Strings(keys) for _, k := range keys { - //actionButton := fmt.Sprintf(""+ - // "", k) actionButton := fmt.Sprintf("
ItemCostAction
") - //_, _ = fmt.Fprint(w, "

Current items in cart:

"+ - // "") - // - //// TODO: List current items in cart - //// TODO: query from websocket? - //for key, val := range cartState { - // // TODO: add remove action - // _, _ = fmt.Fprintf(w, "", key, val) - //} - //_, _ = fmt.Fprint(w, "
ItemQuantityAction
%s%d
") } diff --git a/shoppingcart/websocket/main.go b/shoppingcart/websocket/main.go index cb75c864..72c891dc 100644 --- a/shoppingcart/websocket/main.go +++ b/shoppingcart/websocket/main.go @@ -9,17 +9,10 @@ import ( "sync" "github.com/gorilla/websocket" - shoppingcart "github.com/temporalio/samples-go/shoppingcart" + "github.com/temporalio/samples-go/shoppingcart" "go.temporal.io/sdk/client" ) -var upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return true // Adjust for production - }, -} - -// WebSocketMessage defines the structure of the message sent by the web app type WebSocketMessage struct { Action string `json:"action"` // "add" or "remove" ItemID string `json:"item_id"` @@ -31,23 +24,14 @@ type CartStatusMessage struct { Data CartState `json:"data"` } -// CartSignalPayload is the payload structure for Temporal signals -//type CartSignalPayload struct { -// Action string `json:"action"` // "add" or "remove" -// ItemID string `json:"item_id"` -// Quantity int `json:"quantity"` -//} - -// WebSocketServer holds the WebSocket connections and Temporal client type WebSocketServer struct { connections map[string]*websocket.Conn // Map of user_id to WebSocket connection mu sync.Mutex temporalClient client.Client } -type CartState map[string]int // itemID -> quantity +type CartState map[string]int -// NewWebSocketServer creates a new WebSocket server instance func NewWebSocketServer(temporalClient client.Client) *WebSocketServer { return &WebSocketServer{ connections: make(map[string]*websocket.Conn), @@ -57,6 +41,11 @@ func NewWebSocketServer(temporalClient client.Client) *WebSocketServer { // HandleConnections manages incoming WebSocket connections func (s *WebSocketServer) HandleConnections(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true // Adjust for production + }, + } conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println("Error upgrading connection:", err) @@ -120,52 +109,38 @@ func (s *WebSocketServer) handleMessage(userID string, message []byte) { log.Println("Error signaling workflow:", err) } - // TODO: query the cart and push signal back to webapp - var cartState CartState - resp, err := s.temporalClient.QueryWorkflow(context.Background(), workflowID, "", "get_cart") - if err != nil { - log.Println("Error querying workflow:", err) - return - } - if err := resp.Get(&cartState); err != nil { - log.Fatalln("Unable to decode query result", err) - } - - // Send the cart state back to the WebSocket client - response := CartStatusMessage{ - Action: "cart_state", - Data: cartState, - } - conn := s.connections[userID] - if conn != nil { - conn.WriteJSON(response) - } + s.getCart(workflowID, userID) case "get_cart": - // Query the cart state - var cartState CartState - resp, err := s.temporalClient.QueryWorkflow(context.Background(), workflowID, "", "get_cart") - if err != nil { - log.Println("Error querying workflow:", err) - return - } - if err := resp.Get(&cartState); err != nil { - log.Fatalln("Unable to decode query result", err) - } - - // Send the cart state back to the WebSocket client - response := CartStatusMessage{ - Action: "cart_state", - Data: cartState, - } - conn := s.connections[userID] - if conn != nil { - conn.WriteJSON(response) - } + s.getCart(workflowID, userID) default: log.Printf("Unknown action: %s\n", msg.Action) } } +// Query the cart workflow and state back to the webapp +func (s *WebSocketServer) getCart(workflowID string, userID string) { + // Query the cart state + var cartState CartState + resp, err := s.temporalClient.QueryWorkflow(context.Background(), workflowID, "", "get_cart") + if err != nil { + log.Println("Error querying workflow:", err) + return + } + if err := resp.Get(&cartState); err != nil { + log.Fatalln("Unable to decode query result", err) + } + + // Send the cart state back to the WebSocket client + response := CartStatusMessage{ + Action: "cart_state", + Data: cartState, + } + conn := s.connections[userID] + if conn != nil { + conn.WriteJSON(response) + } +} + func main() { c, err := client.Dial(client.Options{}) if err != nil { diff --git a/shoppingcart/worker/main.go b/shoppingcart/worker/main.go index e4c393b7..39dfb213 100644 --- a/shoppingcart/worker/main.go +++ b/shoppingcart/worker/main.go @@ -6,7 +6,7 @@ import ( "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" - shoppingcart "github.com/temporalio/samples-go/shoppingcart" + "github.com/temporalio/samples-go/shoppingcart" ) func main() { From 9e4ddae1704ecb63c996584a78e742b7f52fc05d Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 9 Jan 2025 10:20:39 -0800 Subject: [PATCH 03/10] Stop using websockets, use Update-with-start to interact between web app and temporal workflow --- early-return/README.md | 9 +- shoppingcart/README.md | 29 +++--- shoppingcart/server/main.go | 137 ++++++++++++++++++++++++++ shoppingcart/starter/main.go | 32 ------ shoppingcart/webapp/main.go | 171 --------------------------------- shoppingcart/websocket/main.go | 158 ------------------------------ shoppingcart/worker/main.go | 2 +- shoppingcart/workflow.go | 58 ++++------- shoppingcart/workflow_test.go | 51 ++++++++++ 9 files changed, 229 insertions(+), 418 deletions(-) create mode 100644 shoppingcart/server/main.go delete mode 100644 shoppingcart/starter/main.go delete mode 100644 shoppingcart/webapp/main.go delete mode 100644 shoppingcart/websocket/main.go diff --git a/early-return/README.md b/early-return/README.md index ace156e9..24444062 100644 --- a/early-return/README.md +++ b/early-return/README.md @@ -5,9 +5,16 @@ This sample demonstrates an early-return from a workflow. By utilizing Update-with-Start, a client can start a new workflow and synchronously receive a response mid-workflow, while the workflow continues to run to completion. +See [shopping cart](https://github.com/temporalio/samples-go/tree/main/shoppingcart) +for Update-with-Start being used for lazy initialization. + ### Steps to run this sample: 1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). -2) Run the following command to start the worker + + NOTE: frontend.enableExecuteMultiOperation=true must be configured for the server +in order to use Update-with-Start. + +5) Run the following command to start the worker ``` go run early-return/worker/main.go ``` diff --git a/shoppingcart/README.md b/shoppingcart/README.md index 8ab7dd8c..0517d8bb 100644 --- a/shoppingcart/README.md +++ b/shoppingcart/README.md @@ -1,29 +1,26 @@ # Shopping Cart This sample workflow shows how a shopping cart application can be implemented. -Note that this program uses websockets to communicate between the webapp and -the Temporal service. +This sample utilizes Update-with-Start and the `WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING` +option to start and continually update the workflow with the same Update-with-Start +call. This is also known as lazy-init. -The shopping cart is represented as a workflow, maintaining the state of the -cart, and the web socket server updates the carts with signals, and retrieves -the cart state with a query. See [workflow message passing](https://docs.temporal.io/encyclopedia/workflow-message-passing) -on the difference between queries and signals. +Another interesting Update-with-Start use case is +[early return](https://github.com/temporalio/samples-go/tree/main/early-return), +which supplements this sample and can be used to handle the transaction and payment +portion of this shopping cart scenario. ### Steps to run this sample: 1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). + + NOTE: frontend.enableExecuteMultiOperation=true must be configured for the server +in order to use Update-with-Start. + 2) Run the following command to start the worker ``` go run shoppingcart/worker/main.go ``` -3) Run the following command to start the web socket server -``` -go run shoppingcart/websocket/main.go -``` -4) Run the following command to start the web app -``` -go run shoppingcart/webapp/main.go +3) Run the following command to start the web app ``` -5) Run the following command to start the workflow execution +go run shoppingcart/server/main.go ``` -go run shoppingcart/starter/main.go -``` \ No newline at end of file diff --git a/shoppingcart/server/main.go b/shoppingcart/server/main.go new file mode 100644 index 00000000..e495f1a9 --- /dev/null +++ b/shoppingcart/server/main.go @@ -0,0 +1,137 @@ +package main + +import ( + "context" + "fmt" + "github.com/temporalio/samples-go/shoppingcart" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/client" + "log" + "net/http" + "sort" +) + +var ( + cartState = make(map[string]int) + workflowClient client.Client + itemCosts = map[string]int{ + "apple": 2, + "banana": 1, + "watermelon": 5, + "television": 1000, + "house": 10000000, + "car": 50000, + "binder": 10, + } +) + +func main() { + var err error + workflowClient, err = client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + panic(err) + } + + fmt.Println("Starting dummy server...") + http.HandleFunc("/", listHandler) + http.HandleFunc("/action", actionHandler) + + fmt.Println("Server started on :8080") + if err := http.ListenAndServe(":8080", nil); err != nil { + fmt.Println("Error starting WebSocket server:", err) + } +} + +func listHandler(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/html") // Set the content type to HTML + _, _ = fmt.Fprint(w, "

DUMMY SHOPPING WEBSITE

"+ + "HOME"+ + "TODO:Payment"+ + "

Available Items to Purchase

") + + keys := make([]string, 0) + for k := range itemCosts { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + actionButton := fmt.Sprintf(""+ + "", k) + _, _ = fmt.Fprintf(w, "", k, itemCosts[k], actionButton) + } + _, _ = fmt.Fprint(w, "
ItemCostAction
%s%d%s

Current items in cart:

"+ + "") + + if len(cartState) == 0 { + updateWithStartCart("", "") + } + + // List current items in cart + keys = make([]string, 0) + for k := range cartState { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + removeButton := fmt.Sprintf(""+ + "", k) + _, _ = fmt.Fprintf(w, "", k, cartState[k], removeButton) + } + _, _ = fmt.Fprint(w, "
ItemQuantityAction
%s%d%s
") +} + +func actionHandler(w http.ResponseWriter, r *http.Request) { + actionType := r.URL.Query().Get("type") + if actionType != "add" && actionType != "remove" && actionType != "" { + log.Fatalln("Invalid action type:", actionType) + } + id := r.URL.Query().Get("id") + + updateWithStartCart(actionType, id) + + if actionType != "" { + listHandler(w, r) + } +} + +func updateWithStartCart(actionType string, id string) { + ctx := context.Background() + startWorkflowOp := workflowClient.NewWithStartWorkflowOperation(client.StartWorkflowOptions{ + ID: "shopping-cart-workflow", + TaskQueue: shoppingcart.TaskQueueName, + // WorkflowIDConflictPolicy is required when using UpdateWithStartWorkflow. + // Here we use USE_EXISTING, because we want to reuse the running workflow, as it + // is long-running and keeping track of our cart state. + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, + }, shoppingcart.CartWorkflow) + + updateOptions := client.UpdateWorkflowOptions{ + UpdateName: shoppingcart.UpdateName, + WaitForStage: client.WorkflowUpdateStageCompleted, + Args: []interface{}{actionType, id}, + } + option := client.UpdateWithStartWorkflowOptions{ + StartWorkflowOperation: startWorkflowOp, + UpdateOptions: updateOptions, + } + updateHandle, err := workflowClient.UpdateWithStartWorkflow(ctx, option) + if err != nil { + // For example, a client-side validation error (e.g. missing conflict + // policy or invalid workflow argument types in the start operation), or + // a server-side failure (e.g. failed to start workflow, or exceeded + // limit on concurrent update per workflow execution). + log.Fatalln("Error issuing update-with-start:", err) + } + + log.Println("Started workflow", + "WorkflowID:", updateHandle.WorkflowID(), + "RunID:", updateHandle.RunID()) + + // Always use a zero variable before calling Get for any Go SDK API + cartState = make(map[string]int) + if err = updateHandle.Get(ctx, &cartState); err != nil { + log.Fatalln("Error obtaining update result:", err) + } +} diff --git a/shoppingcart/starter/main.go b/shoppingcart/starter/main.go deleted file mode 100644 index c081878c..00000000 --- a/shoppingcart/starter/main.go +++ /dev/null @@ -1,32 +0,0 @@ -package main - -import ( - "context" - "log" - - "github.com/temporalio/samples-go/shoppingcart" - "go.temporal.io/sdk/client" -) - -func main() { - // The client is a heavyweight object that should be created once per process. - c, err := client.Dial(client.Options{ - HostPort: client.DefaultHostPort, - }) - if err != nil { - log.Fatalln("Unable to create client", err) - } - defer c.Close() - - workflowOptions := client.StartWorkflowOptions{ - ID: "shopping_cart_1", - TaskQueue: "shopping_cart", - } - - we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, shoppingcart.CartWorkflow) - if err != nil { - log.Fatalln("Unable to execute workflow", err) - } - log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) - -} diff --git a/shoppingcart/webapp/main.go b/shoppingcart/webapp/main.go deleted file mode 100644 index 71bc3740..00000000 --- a/shoppingcart/webapp/main.go +++ /dev/null @@ -1,171 +0,0 @@ -package main - -import ( - "fmt" - "github.com/gorilla/websocket" - "go.temporal.io/sdk/client" - "net/http" - "os" - "sort" - "sync" -) - -var ( - cartState = make(map[string]int) - workflowClient client.Client - itemCosts = map[string]int{ - "apple": 2, - "banana": 1, - "watermelon": 5, - "television": 1000, - "house": 10000000, - "car": 50000, - "binder": 10, - } -) - -type WebSocketServer struct { - // TODO: This can probably be simplified to a single websocket.Conn - clients map[string]*websocket.Conn - mu sync.Mutex - //broadcast chan WebSocketMessage -} - -//type WebSocketMessage struct { -// UserID string `json:"user_id"` -// Event string `json:"event"` -// Data any `json:"data"` -//} - -func NewWebSocketServer() *WebSocketServer { - return &WebSocketServer{ - clients: make(map[string]*websocket.Conn), - //broadcast: make(chan WebSocketMessage), - } -} - -func (s *WebSocketServer) handleConnections(w http.ResponseWriter, r *http.Request) { - upgrader := websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return true - }, - } - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - fmt.Println("Error upgrading connection:", err) - return - } - defer conn.Close() - - // Assume a user ID is passed as a query param for simplicity - userID := r.URL.Query().Get("user_id") - if userID == "" { - fmt.Println("Missing user_id in query parameters") - return - } - - // Register the client - s.mu.Lock() - s.clients[userID] = conn - s.mu.Unlock() - - fmt.Printf("Client connected: %s\n", userID) - - // Keep connection open until closed by the client - for { - if _, _, err := conn.NextReader(); err != nil { - break - } - } - - // Unregister the client - s.mu.Lock() - delete(s.clients, userID) - s.mu.Unlock() - fmt.Printf("Client disconnected: %s\n", userID) -} - -//func (s *WebSocketServer) handleMessages() { -// for msg := range s.broadcast { -// s.mu.Lock() -// conn, exists := s.clients[msg.UserID] -// s.mu.Unlock() -// if !exists { -// fmt.Printf("User %s not connected\n", msg.UserID) -// continue -// } -// -// if err := conn.WriteJSON(msg); err != nil { -// fmt.Printf("Error sending message to user %s: %v\n", msg.UserID, err) -// s.mu.Lock() -// delete(s.clients, msg.UserID) -// s.mu.Unlock() -// } -// } -//} - -//func (s *WebSocketServer) SendMessage(userID, event string, data any) { -// s.broadcast <- WebSocketMessage{UserID: userID, Event: event, Data: data} -//} - -func main() { - wsServer := NewWebSocketServer() - - var err error - workflowClient, err = client.Dial(client.Options{ - HostPort: client.DefaultHostPort, - }) - if err != nil { - panic(err) - } - - fmt.Println("Starting dummy server...") - http.HandleFunc("/", listHandler) - http.HandleFunc("/ws", wsServer.handleConnections) - - //go wsServer.handleMessages() - - fmt.Println("WebSocket server started on :8080") - if err := http.ListenAndServe(":8080", nil); err != nil { - fmt.Println("Error starting WebSocket server:", err) - } -} - -func listHandler(w http.ResponseWriter, r *http.Request) { - // read in javascript that handles websocket logic - fileContents, err := os.ReadFile("shoppingcart/home.html") - if err != nil { - http.Error(w, "Could not read shoppingcart/home.html", http.StatusInternalServerError) - fmt.Println("Error reading shoppingcart/home.html:", err) - return - } - - // Generate HTML - w.Header().Set("Content-Type", "text/html") // Set the content type to HTML - _, _ = fmt.Fprint(w, "") - _, _ = fmt.Fprintf(w, "%s", fileContents) - _, _ = fmt.Fprint(w, "

DUMMY SHOPPING WEBSITE

"+ - "HOME"+ - "TODO:Payment"+ - "TODO:Shipment"+ - "

Available Items to Purchase

") - - keys := make([]string, 0) - count := 0 - for k, _ := range itemCosts { - count += 1 - keys = append(keys, k) - } - sort.Strings(keys) - for _, k := range keys { - actionButton := fmt.Sprintf("", k) - - _, _ = fmt.Fprintf(w, "", k, itemCosts[k], actionButton) - } - - _, _ = fmt.Fprint(w, "
ItemCostAction
%s%d%s
") - -} diff --git a/shoppingcart/websocket/main.go b/shoppingcart/websocket/main.go deleted file mode 100644 index 72c891dc..00000000 --- a/shoppingcart/websocket/main.go +++ /dev/null @@ -1,158 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "log" - "net/http" - "sync" - - "github.com/gorilla/websocket" - "github.com/temporalio/samples-go/shoppingcart" - "go.temporal.io/sdk/client" -) - -type WebSocketMessage struct { - Action string `json:"action"` // "add" or "remove" - ItemID string `json:"item_id"` - Quantity int `json:"quantity"` -} - -type CartStatusMessage struct { - Action string `json:"action"` - Data CartState `json:"data"` -} - -type WebSocketServer struct { - connections map[string]*websocket.Conn // Map of user_id to WebSocket connection - mu sync.Mutex - temporalClient client.Client -} - -type CartState map[string]int - -func NewWebSocketServer(temporalClient client.Client) *WebSocketServer { - return &WebSocketServer{ - connections: make(map[string]*websocket.Conn), - temporalClient: temporalClient, - } -} - -// HandleConnections manages incoming WebSocket connections -func (s *WebSocketServer) HandleConnections(w http.ResponseWriter, r *http.Request) { - upgrader := websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return true // Adjust for production - }, - } - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Println("Error upgrading connection:", err) - return - } - defer conn.Close() - - userID := r.URL.Query().Get("user_id") - if userID == "" { - log.Println("user_id is missing in the query parameters") - return - } - - // Register the connection - s.mu.Lock() - s.connections[userID] = conn - s.mu.Unlock() - defer func() { - s.mu.Lock() - delete(s.connections, userID) - s.mu.Unlock() - }() - - log.Printf("WebSocket connection established for user: %s", userID) - - // Handle incoming messages - for { - log.Println("calling conn.ReadMessage()") - _, message, err := conn.ReadMessage() - if err != nil { - log.Println("Error reading WebSocket message:", err) - break - } - - // Handle the WebSocket message - s.handleMessage(userID, message) - } -} - -// handleMessage processes incoming WebSocket messages and triggers Temporal signals -func (s *WebSocketServer) handleMessage(userID string, message []byte) { - // Parse the WebSocket message - var msg WebSocketMessage - if err := json.Unmarshal(message, &msg); err != nil { - log.Println("Error parsing WebSocket message:", err) - return - } - workflowID := fmt.Sprintf("shopping_cart_%s", userID) - - switch msg.Action { - case "add": - // Signal to add an item to the cart - signalPayload := shoppingcart.CartSignalPayload{ - Action: "add", - ItemID: msg.ItemID, - Quantity: msg.Quantity, - } - log.Println("Sending signal payload", workflowID, signalPayload) - err := s.temporalClient.SignalWorkflow(context.Background(), workflowID, "", "cart_signal", signalPayload) - if err != nil { - log.Println("Error signaling workflow:", err) - } - - s.getCart(workflowID, userID) - case "get_cart": - s.getCart(workflowID, userID) - default: - log.Printf("Unknown action: %s\n", msg.Action) - } -} - -// Query the cart workflow and state back to the webapp -func (s *WebSocketServer) getCart(workflowID string, userID string) { - // Query the cart state - var cartState CartState - resp, err := s.temporalClient.QueryWorkflow(context.Background(), workflowID, "", "get_cart") - if err != nil { - log.Println("Error querying workflow:", err) - return - } - if err := resp.Get(&cartState); err != nil { - log.Fatalln("Unable to decode query result", err) - } - - // Send the cart state back to the WebSocket client - response := CartStatusMessage{ - Action: "cart_state", - Data: cartState, - } - conn := s.connections[userID] - if conn != nil { - conn.WriteJSON(response) - } -} - -func main() { - c, err := client.Dial(client.Options{}) - if err != nil { - log.Fatalf("Error creating Temporal client: %v", err) - } - defer c.Close() - - server := NewWebSocketServer(c) - - http.HandleFunc("/ws", server.HandleConnections) - log.Println("WebSocket server is running on ws://localhost:8089/ws") - if err := http.ListenAndServe(":8089", nil); err != nil { - log.Fatalf("Error starting WebSocket server: %v", err) - } -} diff --git a/shoppingcart/worker/main.go b/shoppingcart/worker/main.go index 39dfb213..4917a6da 100644 --- a/shoppingcart/worker/main.go +++ b/shoppingcart/worker/main.go @@ -19,7 +19,7 @@ func main() { } defer c.Close() - w := worker.New(c, "shopping_cart", worker.Options{}) + w := worker.New(c, shoppingcart.TaskQueueName, worker.Options{}) w.RegisterWorkflow(shoppingcart.CartWorkflow) diff --git a/shoppingcart/workflow.go b/shoppingcart/workflow.go index 567e8f6c..99d32163 100644 --- a/shoppingcart/workflow.go +++ b/shoppingcart/workflow.go @@ -3,10 +3,12 @@ package shoppingcart import ( "fmt" "go.temporal.io/sdk/workflow" + "log" ) var ( - shoppingServerHostPort = "http://localhost:8099" + UpdateName = "shopping-cart" + TaskQueueName = "shopping-cart-tq" ) type CartSignalPayload struct { @@ -19,48 +21,26 @@ type CartState map[string]int // itemID -> quantity func CartWorkflow(ctx workflow.Context) error { cart := make(CartState) - cart["apple"] = 1 - // Signal channel for cart updates - signalChannel := workflow.GetSignalChannel(ctx, "cart_signal") - - // Register a query handler to get the cart state - workflow.SetQueryHandler(ctx, "get_cart", func() (CartState, error) { - return cart, nil - }) - - workflow.GetLogger(ctx).Info("CartWorkflow started. Listening for signals...") - - // Listen for signals and update the cart state in a loop - for { - var payload CartSignalPayload - fmt.Println("[SignalPayload]", payload) - // Block until a signal is received - signalChannel.Receive(ctx, &payload) - - // Process the received signal - switch payload.Action { - case "add": - if payload.Quantity <= 0 { - delete(cart, payload.ItemID) + if err := workflow.SetUpdateHandler(ctx, UpdateName, func(ctx workflow.Context, actionType string, itemID string) (CartState, error) { + fmt.Println("Received update,", actionType, itemID) + if itemID != "" { + if actionType == "add" { + cart[itemID] += 1 + } else if actionType == "remove" { + cart[itemID] -= 1 + if cart[itemID] <= 0 { + delete(cart, itemID) + } } else { - cart[payload.ItemID] += payload.Quantity + log.Fatalln("Unknown action type:", actionType) } - workflow.GetLogger(ctx).Info("Item added to cart", "item_id", payload.ItemID, "quantity", payload.Quantity) - - case "remove": - delete(cart, payload.ItemID) - workflow.GetLogger(ctx).Info("Item removed from cart", "item_id", payload.ItemID) - - default: - workflow.GetLogger(ctx).Warn("Unknown action received", "action", payload.Action) } - - // Yield control to allow Temporal to process other tasks - //workflow.Yield(ctx) + return cart, nil + }); err != nil { + return err } - // This return statement is unreachable because the loop runs indefinitely. - // You can add logic to break the loop if needed (e.g., based on a "stop" signal). - return nil + // Keep workflow alive to continue to listen receive updates. + return workflow.Await(ctx, func() bool { return false }) } diff --git a/shoppingcart/workflow_test.go b/shoppingcart/workflow_test.go index 03c234c0..f3aa6ebb 100644 --- a/shoppingcart/workflow_test.go +++ b/shoppingcart/workflow_test.go @@ -1 +1,52 @@ package shoppingcart + +import ( + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "testing" + + "go.temporal.io/sdk/testsuite" +) + +func Test_ShoppingCartWorkflow(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + updatesCompleted := 0 + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ + OnAccept: func() {}, + OnReject: func(err error) { panic("unexpected rejection") }, + OnComplete: func(i interface{}, err error) { + require.NoError(t, err) + cartState, ok := i.(CartState) + if !ok { + require.Fail(t, "Invalid return type") + } + require.Equal(t, cartState["apple"], 1) + updatesCompleted++ + }, + }, "add", "apple") + }, 0) + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ + OnAccept: func() {}, + OnReject: func(err error) { panic("unexpected rejection") }, + OnComplete: func(i interface{}, err error) { + require.NoError(t, err) + cartState, ok := i.(CartState) + if !ok { + require.Fail(t, "Invalid return type") + } + _, ok = cartState["apple"] + require.False(t, ok) + updatesCompleted++ + }, + }, "remove", "apple") + }, 0) + env.ExecuteWorkflow(CartWorkflow) + + require.True(t, env.IsWorkflowCompleted()) + require.Equal(t, updatesCompleted, 2) +} From 5cc6c248f2f6e86a94754e24b0013cd4a64d37b4 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 9 Jan 2025 13:40:23 -0800 Subject: [PATCH 04/10] remove websocket references --- go.mod | 1 - go.sum | 2 -- shoppingcart/server/main.go | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 873cb8e3..541dca17 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/golang/mock v1.7.0-rc.1 github.com/golang/snappy v0.0.4 github.com/google/uuid v1.6.0 - github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-plugin v1.4.5 github.com/nexus-rpc/sdk-go v0.1.0 github.com/opentracing/opentracing-go v1.2.0 diff --git a/go.sum b/go.sum index 887baadb..4d00d696 100644 --- a/go.sum +++ b/go.sum @@ -141,8 +141,6 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= diff --git a/shoppingcart/server/main.go b/shoppingcart/server/main.go index e495f1a9..36ba4f11 100644 --- a/shoppingcart/server/main.go +++ b/shoppingcart/server/main.go @@ -40,7 +40,7 @@ func main() { fmt.Println("Server started on :8080") if err := http.ListenAndServe(":8080", nil); err != nil { - fmt.Println("Error starting WebSocket server:", err) + fmt.Println("Error starting server:", err) } } From c11eefb004459e1c950d214dff0ade87d7f5cd21 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 9 Jan 2025 14:44:43 -0800 Subject: [PATCH 05/10] Fix CI --- shoppingcart/workflow.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/shoppingcart/workflow.go b/shoppingcart/workflow.go index 99d32163..dbd486a8 100644 --- a/shoppingcart/workflow.go +++ b/shoppingcart/workflow.go @@ -1,9 +1,7 @@ package shoppingcart import ( - "fmt" "go.temporal.io/sdk/workflow" - "log" ) var ( @@ -23,7 +21,8 @@ func CartWorkflow(ctx workflow.Context) error { cart := make(CartState) if err := workflow.SetUpdateHandler(ctx, UpdateName, func(ctx workflow.Context, actionType string, itemID string) (CartState, error) { - fmt.Println("Received update,", actionType, itemID) + logger := workflow.GetLogger(ctx) + logger.Info("Received update,", actionType, itemID) if itemID != "" { if actionType == "add" { cart[itemID] += 1 @@ -33,7 +32,7 @@ func CartWorkflow(ctx workflow.Context) error { delete(cart, itemID) } } else { - log.Fatalln("Unknown action type:", actionType) + logger.Error("Unknown action type:", actionType) } } return cart, nil From 5d48f5eb95aae3d3f3b9efe321168a7792eb857e Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 13 Jan 2025 09:10:13 -0800 Subject: [PATCH 06/10] change cost units to cents --- shoppingcart/server/main.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/shoppingcart/server/main.go b/shoppingcart/server/main.go index 36ba4f11..a11a1c74 100644 --- a/shoppingcart/server/main.go +++ b/shoppingcart/server/main.go @@ -14,14 +14,15 @@ import ( var ( cartState = make(map[string]int) workflowClient client.Client - itemCosts = map[string]int{ - "apple": 2, - "banana": 1, - "watermelon": 5, - "television": 1000, - "house": 10000000, - "car": 50000, - "binder": 10, + // Units are in cents + itemCosts = map[string]int{ + "apple": 200, + "banana": 100, + "watermelon": 500, + "television": 100000, + "house": 100000000, + "car": 5000000, + "binder": 1000, } ) @@ -48,7 +49,6 @@ func listHandler(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "text/html") // Set the content type to HTML _, _ = fmt.Fprint(w, "

DUMMY SHOPPING WEBSITE

"+ "HOME"+ - "TODO:Payment"+ "

Available Items to Purchase

") keys := make([]string, 0) @@ -59,7 +59,8 @@ func listHandler(w http.ResponseWriter, _ *http.Request) { for _, k := range keys { actionButton := fmt.Sprintf(""+ "", k) - _, _ = fmt.Fprintf(w, "", k, itemCosts[k], actionButton) + dollars := float64(itemCosts[k]) / 100 + _, _ = fmt.Fprintf(w, "", k, dollars, actionButton) } _, _ = fmt.Fprint(w, "
ItemCostAction
%s%d%s
%s$%.2f%s

Current items in cart:

"+ "") From 17476f1a1eabf8af2bfab1822237c5f7976e8bd7 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Tue, 28 Jan 2025 11:04:27 -0800 Subject: [PATCH 07/10] Add Update validator, workflow completion/CAN support --- early-return/README.md | 2 +- early-return/workflow_test.go | 8 +++--- shoppingcart/README.md | 3 ++- shoppingcart/server/main.go | 29 ++++++++++++++------ shoppingcart/workflow.go | 50 ++++++++++++++++++++++++++--------- shoppingcart/workflow_test.go | 33 +++++++++++++++++++---- 6 files changed, 94 insertions(+), 31 deletions(-) diff --git a/early-return/README.md b/early-return/README.md index 24444062..5f353dbb 100644 --- a/early-return/README.md +++ b/early-return/README.md @@ -14,7 +14,7 @@ for Update-with-Start being used for lazy initialization. NOTE: frontend.enableExecuteMultiOperation=true must be configured for the server in order to use Update-with-Start. -5) Run the following command to start the worker +2) Run the following command to start the worker ``` go run early-return/worker/main.go ``` diff --git a/early-return/workflow_test.go b/early-return/workflow_test.go index 27f1e972..a8aab98a 100644 --- a/early-return/workflow_test.go +++ b/early-return/workflow_test.go @@ -22,7 +22,7 @@ func Test_CompleteTransaction_Succeeds(t *testing.T) { env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ OnAccept: func() {}, OnReject: func(err error) { - panic("unexpected rejection") + require.Fail(t, "unexpected rejection") }, OnComplete: func(i interface{}, err error) { require.NoError(t, err) @@ -50,7 +50,7 @@ func Test_CompleteTransaction_Fails(t *testing.T) { env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ OnAccept: func() {}, OnReject: func(err error) { - panic("unexpected rejection") + require.Fail(t, "unexpected rejection") }, OnComplete: func(i interface{}, err error) { require.NoError(t, err) @@ -76,7 +76,7 @@ func Test_CancelTransaction(t *testing.T) { env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ OnAccept: func() {}, OnReject: func(err error) { - panic("unexpected rejection") + require.Fail(t, "unexpected rejection") }, OnComplete: func(i interface{}, err error) { require.ErrorContains(t, err, "invalid Amount") @@ -103,7 +103,7 @@ func Test_CancelTransaction_Fails(t *testing.T) { env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ OnAccept: func() {}, OnReject: func(err error) { - panic("unexpected rejection") + require.Fail(t, "unexpected rejection") }, OnComplete: func(i interface{}, err error) { require.ErrorContains(t, err, "invalid Amount") diff --git a/shoppingcart/README.md b/shoppingcart/README.md index 0517d8bb..ecda1545 100644 --- a/shoppingcart/README.md +++ b/shoppingcart/README.md @@ -3,7 +3,8 @@ This sample workflow shows how a shopping cart application can be implemented. This sample utilizes Update-with-Start and the `WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING` option to start and continually update the workflow with the same Update-with-Start -call. This is also known as lazy-init. +call. This is also known as lazy-init. You will see in the Temporal UI, when you checkout +your cart, the current workflow will complete and a new workflow will be created. Another interesting Update-with-Start use case is [early return](https://github.com/temporalio/samples-go/tree/main/early-return), diff --git a/shoppingcart/server/main.go b/shoppingcart/server/main.go index a11a1c74..a2de4c3f 100644 --- a/shoppingcart/server/main.go +++ b/shoppingcart/server/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/pborman/uuid" "github.com/temporalio/samples-go/shoppingcart" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/client" @@ -12,7 +13,7 @@ import ( ) var ( - cartState = make(map[string]int) + cartState = shoppingcart.CartState{Items: make(map[string]int)} workflowClient client.Client // Units are in cents itemCosts = map[string]int{ @@ -24,6 +25,7 @@ var ( "car": 5000000, "binder": 1000, } + workflowIdNumber = uuid.New() ) func main() { @@ -48,7 +50,7 @@ func main() { func listHandler(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "text/html") // Set the content type to HTML _, _ = fmt.Fprint(w, "

DUMMY SHOPPING WEBSITE

"+ - "HOME"+ + "HOME Checkout"+ "

Available Items to Purchase

ItemQuantityAction
") keys := make([]string, 0) @@ -65,27 +67,29 @@ func listHandler(w http.ResponseWriter, _ *http.Request) { _, _ = fmt.Fprint(w, "
ItemCostAction

Current items in cart:

"+ "") - if len(cartState) == 0 { + if len(cartState.Items) == 0 { updateWithStartCart("", "") } // List current items in cart keys = make([]string, 0) - for k := range cartState { + for k := range cartState.Items { keys = append(keys, k) } sort.Strings(keys) for _, k := range keys { removeButton := fmt.Sprintf(""+ "", k) - _, _ = fmt.Fprintf(w, "", k, cartState[k], removeButton) + _, _ = fmt.Fprintf(w, "", k, cartState.Items[k], removeButton) } _, _ = fmt.Fprint(w, "
ItemQuantityAction
%s%d%s
%s%d%s
") } func actionHandler(w http.ResponseWriter, r *http.Request) { actionType := r.URL.Query().Get("type") - if actionType != "add" && actionType != "remove" && actionType != "" { + switch actionType { + case "add", "remove", "checkout", "": + default: log.Fatalln("Invalid action type:", actionType) } id := r.URL.Query().Get("id") @@ -100,7 +104,7 @@ func actionHandler(w http.ResponseWriter, r *http.Request) { func updateWithStartCart(actionType string, id string) { ctx := context.Background() startWorkflowOp := workflowClient.NewWithStartWorkflowOperation(client.StartWorkflowOptions{ - ID: "shopping-cart-workflow", + ID: "shopping-cart-workflow" + workflowIdNumber, TaskQueue: shoppingcart.TaskQueueName, // WorkflowIDConflictPolicy is required when using UpdateWithStartWorkflow. // Here we use USE_EXISTING, because we want to reuse the running workflow, as it @@ -126,12 +130,21 @@ func updateWithStartCart(actionType string, id string) { log.Fatalln("Error issuing update-with-start:", err) } + // If someone has checked out their cart, this completes the workflow. + // We then want to create a new workflow for the next user to shop. + if actionType == "checkout" { + workflowIdNumber = uuid.New() + cartState = shoppingcart.CartState{Items: make(map[string]int)} + log.Println("Items checked out and workflow completed, starting new workflow") + return + } + log.Println("Started workflow", "WorkflowID:", updateHandle.WorkflowID(), "RunID:", updateHandle.RunID()) // Always use a zero variable before calling Get for any Go SDK API - cartState = make(map[string]int) + cartState = shoppingcart.CartState{Items: make(map[string]int)} if err = updateHandle.Get(ctx, &cartState); err != nil { log.Fatalln("Error obtaining update result:", err) } diff --git a/shoppingcart/workflow.go b/shoppingcart/workflow.go index dbd486a8..3979d918 100644 --- a/shoppingcart/workflow.go +++ b/shoppingcart/workflow.go @@ -1,6 +1,7 @@ package shoppingcart import ( + "fmt" "go.temporal.io/sdk/workflow" ) @@ -15,31 +16,56 @@ type CartSignalPayload struct { Quantity int `json:"quantity"` } -type CartState map[string]int // itemID -> quantity +type CartState struct { + Items map[string]int // itemID -> quantity +} func CartWorkflow(ctx workflow.Context) error { - cart := make(CartState) + cart := CartState{make(map[string]int)} + logger := workflow.GetLogger(ctx) + var checkout bool - if err := workflow.SetUpdateHandler(ctx, UpdateName, func(ctx workflow.Context, actionType string, itemID string) (CartState, error) { - logger := workflow.GetLogger(ctx) + if err := workflow.SetUpdateHandlerWithOptions(ctx, UpdateName, func(ctx workflow.Context, actionType string, itemID string) (CartState, error) { logger.Info("Received update,", actionType, itemID) + if actionType == "checkout" { + checkout = true + } if itemID != "" { if actionType == "add" { - cart[itemID] += 1 + cart.Items[itemID] += 1 } else if actionType == "remove" { - cart[itemID] -= 1 - if cart[itemID] <= 0 { - delete(cart, itemID) + cart.Items[itemID] -= 1 + if cart.Items[itemID] <= 0 { + delete(cart.Items, itemID) } - } else { - logger.Error("Unknown action type:", actionType) } } return cart, nil + }, workflow.UpdateHandlerOptions{ + Validator: func(ctx workflow.Context, actionType string, itemID string) error { + switch actionType { + case "add", "remove", "checkout", "": + return nil + default: + return fmt.Errorf("unsupported action type: %s", actionType) + } + }, }); err != nil { return err } - // Keep workflow alive to continue to listen receive updates. - return workflow.Await(ctx, func() bool { return false }) + err := workflow.Await(ctx, func() bool { return workflow.GetInfo(ctx).GetContinueAsNewSuggested() || checkout }) + if err != nil { + return err + } + if workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + logger.Info("Continuing as new") + return workflow.NewContinueAsNewError(ctx, CartWorkflow) + } + if checkout { + return nil + } + + return fmt.Errorf("unreachable") + } diff --git a/shoppingcart/workflow_test.go b/shoppingcart/workflow_test.go index f3aa6ebb..fcd6cb21 100644 --- a/shoppingcart/workflow_test.go +++ b/shoppingcart/workflow_test.go @@ -1,6 +1,7 @@ package shoppingcart import ( + "fmt" "github.com/pborman/uuid" "github.com/stretchr/testify/require" "testing" @@ -16,14 +17,14 @@ func Test_ShoppingCartWorkflow(t *testing.T) { env.RegisterDelayedCallback(func() { env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ OnAccept: func() {}, - OnReject: func(err error) { panic("unexpected rejection") }, + OnReject: func(err error) { require.Fail(t, "unexpected rejection") }, OnComplete: func(i interface{}, err error) { require.NoError(t, err) cartState, ok := i.(CartState) if !ok { require.Fail(t, "Invalid return type") } - require.Equal(t, cartState["apple"], 1) + require.Equal(t, cartState.Items["apple"], 1) updatesCompleted++ }, }, "add", "apple") @@ -32,21 +33,43 @@ func Test_ShoppingCartWorkflow(t *testing.T) { env.RegisterDelayedCallback(func() { env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ OnAccept: func() {}, - OnReject: func(err error) { panic("unexpected rejection") }, + OnReject: func(err error) { require.Fail(t, "unexpected rejection") }, OnComplete: func(i interface{}, err error) { require.NoError(t, err) cartState, ok := i.(CartState) if !ok { require.Fail(t, "Invalid return type") } - _, ok = cartState["apple"] + _, ok = cartState.Items["apple"] require.False(t, ok) updatesCompleted++ }, }, "remove", "apple") }, 0) + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ + OnAccept: func() { require.Fail(t, "unexpected accept") }, + OnReject: func(err error) { + require.Error(t, err) + require.Equal(t, fmt.Errorf("unsupported action type: invalid"), err) + }, + OnComplete: func(i interface{}, err error) { + }, + }, "invalid", "apple") + }, 0) + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ + OnAccept: func() {}, + OnReject: func(err error) { + require.Fail(t, "unexpected rejection") + }, + OnComplete: func(i interface{}, err error) {}, + }, "checkout", "") + }, 0) env.ExecuteWorkflow(CartWorkflow) require.True(t, env.IsWorkflowCompleted()) - require.Equal(t, updatesCompleted, 2) + require.Equal(t, 2, updatesCompleted) } From 68abaa7a6ada7ef9cfd0d5f309d401c61dcc223d Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 30 Jan 2025 10:22:55 -0800 Subject: [PATCH 08/10] Add sample temporal server command to README, use sessionId to mimic sessions by cookies --- early-return/README.md | 5 +++- shoppingcart/README.md | 9 ++++-- shoppingcart/server/main.go | 55 ++++++++++++++++++++--------------- shoppingcart/workflow.go | 57 ++++++++++++++++++++----------------- 4 files changed, 74 insertions(+), 52 deletions(-) diff --git a/early-return/README.md b/early-return/README.md index 5f353dbb..3bc735e3 100644 --- a/early-return/README.md +++ b/early-return/README.md @@ -12,7 +12,10 @@ for Update-with-Start being used for lazy initialization. 1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). NOTE: frontend.enableExecuteMultiOperation=true must be configured for the server -in order to use Update-with-Start. +in order to use Update-with-Start. For example: +``` +temporal server start-dev --dynamic-config-value frontend.enableExecuteMultiOperation=true +``` 2) Run the following command to start the worker ``` diff --git a/shoppingcart/README.md b/shoppingcart/README.md index ecda1545..527df830 100644 --- a/shoppingcart/README.md +++ b/shoppingcart/README.md @@ -4,7 +4,9 @@ This sample workflow shows how a shopping cart application can be implemented. This sample utilizes Update-with-Start and the `WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING` option to start and continually update the workflow with the same Update-with-Start call. This is also known as lazy-init. You will see in the Temporal UI, when you checkout -your cart, the current workflow will complete and a new workflow will be created. +your cart, the current workflow will complete and a new workflow will be created. This sample +only supports a single concurrent shopper, but can be extended to support concurrent shoppers, +identified with the `sessionId` infrastructure shown in this sample. Another interesting Update-with-Start use case is [early return](https://github.com/temporalio/samples-go/tree/main/early-return), @@ -15,7 +17,10 @@ portion of this shopping cart scenario. 1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). NOTE: frontend.enableExecuteMultiOperation=true must be configured for the server -in order to use Update-with-Start. +in order to use Update-with-Start. For example: +``` +temporal server start-dev --dynamic-config-value frontend.enableExecuteMultiOperation=true +``` 2) Run the following command to start the worker ``` diff --git a/shoppingcart/server/main.go b/shoppingcart/server/main.go index a2de4c3f..3e1fc94b 100644 --- a/shoppingcart/server/main.go +++ b/shoppingcart/server/main.go @@ -13,7 +13,6 @@ import ( ) var ( - cartState = shoppingcart.CartState{Items: make(map[string]int)} workflowClient client.Client // Units are in cents itemCosts = map[string]int{ @@ -25,7 +24,7 @@ var ( "car": 5000000, "binder": 1000, } - workflowIdNumber = uuid.New() + sessionId = newSession() ) func main() { @@ -67,9 +66,7 @@ func listHandler(w http.ResponseWriter, _ *http.Request) { _, _ = fmt.Fprint(w, "

Current items in cart:

"+ "") - if len(cartState.Items) == 0 { - updateWithStartCart("", "") - } + cartState := updateWithStartCart("list", "") // List current items in cart keys = make([]string, 0) @@ -88,23 +85,39 @@ func listHandler(w http.ResponseWriter, _ *http.Request) { func actionHandler(w http.ResponseWriter, r *http.Request) { actionType := r.URL.Query().Get("type") switch actionType { - case "add", "remove", "checkout", "": + case "checkout": + err := workflowClient.SignalWorkflow(context.Background(), sessionId, "", "checkout", nil) + if err != nil { + log.Fatalln("Error signaling checkout:", err) + } + sessionId = newSession() + log.Println("Items checked out and workflow completed, starting new workflow") + case "add", "remove", "list": + id := r.URL.Query().Get("id") + updateWithStartCart(actionType, id) default: log.Fatalln("Invalid action type:", actionType) } - id := r.URL.Query().Get("id") - updateWithStartCart(actionType, id) - - if actionType != "" { + // Generate the HTML after communicating with the Temporal workflow. + // "list" already generates HTML, so skip for that scenario + if actionType != "list" { listHandler(w, r) } } -func updateWithStartCart(actionType string, id string) { +func updateWithStartCart(actionType string, id string) shoppingcart.CartState { + // Handle a client request to add an item to the shopping cart. The user is not logged in, but a session ID is + // available from a cookie, and we use this as the cart ID. The Temporal client was created at service-start + // time and is shared by all request handlers. + // + // A Workflow Type exists that can be used to represent a shopping cart. The method uses update-with-start to + // add an item to the shopping cart, creating the cart if it doesn't already exist. + // + // Note that the workflow handle is available, even if the Update fails. ctx := context.Background() startWorkflowOp := workflowClient.NewWithStartWorkflowOperation(client.StartWorkflowOptions{ - ID: "shopping-cart-workflow" + workflowIdNumber, + ID: sessionId, TaskQueue: shoppingcart.TaskQueueName, // WorkflowIDConflictPolicy is required when using UpdateWithStartWorkflow. // Here we use USE_EXISTING, because we want to reuse the running workflow, as it @@ -130,22 +143,18 @@ func updateWithStartCart(actionType string, id string) { log.Fatalln("Error issuing update-with-start:", err) } - // If someone has checked out their cart, this completes the workflow. - // We then want to create a new workflow for the next user to shop. - if actionType == "checkout" { - workflowIdNumber = uuid.New() - cartState = shoppingcart.CartState{Items: make(map[string]int)} - log.Println("Items checked out and workflow completed, starting new workflow") - return - } - - log.Println("Started workflow", + log.Println("Updated workflow", "WorkflowID:", updateHandle.WorkflowID(), "RunID:", updateHandle.RunID()) // Always use a zero variable before calling Get for any Go SDK API - cartState = shoppingcart.CartState{Items: make(map[string]int)} + cartState := shoppingcart.CartState{Items: make(map[string]int)} if err = updateHandle.Get(ctx, &cartState); err != nil { log.Fatalln("Error obtaining update result:", err) } + return cartState +} + +func newSession() string { + return "session-" + uuid.New() } diff --git a/shoppingcart/workflow.go b/shoppingcart/workflow.go index 3979d918..ead6a5e0 100644 --- a/shoppingcart/workflow.go +++ b/shoppingcart/workflow.go @@ -10,12 +10,6 @@ var ( TaskQueueName = "shopping-cart-tq" ) -type CartSignalPayload struct { - Action string `json:"action"` // "add" or "remove" - ItemID string `json:"item_id"` - Quantity int `json:"quantity"` -} - type CartState struct { Items map[string]int // itemID -> quantity } @@ -23,49 +17,60 @@ type CartState struct { func CartWorkflow(ctx workflow.Context) error { cart := CartState{make(map[string]int)} logger := workflow.GetLogger(ctx) - var checkout bool if err := workflow.SetUpdateHandlerWithOptions(ctx, UpdateName, func(ctx workflow.Context, actionType string, itemID string) (CartState, error) { logger.Info("Received update,", actionType, itemID) - if actionType == "checkout" { - checkout = true - } - if itemID != "" { - if actionType == "add" { - cart.Items[itemID] += 1 - } else if actionType == "remove" { - cart.Items[itemID] -= 1 - if cart.Items[itemID] <= 0 { - delete(cart.Items, itemID) - } + switch actionType { + case "add": + cart.Items[itemID] += 1 + case "remove": + cart.Items[itemID] -= 1 + if cart.Items[itemID] <= 0 { + delete(cart.Items, itemID) } + case "list": + default: + logger.Error("Unsupported action type.") } + return cart, nil }, workflow.UpdateHandlerOptions{ - Validator: func(ctx workflow.Context, actionType string, itemID string) error { + Validator: func(ctx workflow.Context, actionType string, itemId string) error { switch actionType { - case "add", "remove", "checkout", "": - return nil + case "add", "remove": + if itemId == "" { + return fmt.Errorf("itemId must be specified for add or remove actionType") + } + case "list": + if itemId != "" { + logger.Warn("ItemId not needed for \"list\" actionType.") + } default: return fmt.Errorf("unsupported action type: %s", actionType) } + return nil }, }); err != nil { return err } - err := workflow.Await(ctx, func() bool { return workflow.GetInfo(ctx).GetContinueAsNewSuggested() || checkout }) + signalChan := workflow.GetSignalChannel(ctx, "checkout") + + err := workflow.Await(ctx, func() bool { return workflow.GetInfo(ctx).GetContinueAsNewSuggested() || signalChan.Receive(ctx, nil) }) if err != nil { return err } if workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + err := workflow.Await(ctx, func() bool { + return workflow.AllHandlersFinished(ctx) + }) + if err != nil { + return err + } logger.Info("Continuing as new") return workflow.NewContinueAsNewError(ctx, CartWorkflow) } - if checkout { - return nil - } - return fmt.Errorf("unreachable") + return nil } From 2b7166d2b058e1e6a99e62eb94a209c6efeb6bca Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 30 Jan 2025 13:48:15 -0800 Subject: [PATCH 09/10] Address more PR feedback, rename server to UI, remove references to anything "dummy" --- expense/README.md | 10 +++++----- expense/activities.go | 2 +- expense/{server => ui}/main.go | 9 +++++---- shoppingcart/README.md | 2 +- shoppingcart/{server => ui}/main.go | 20 +++++++++----------- shoppingcart/workflow.go | 6 ++++++ 6 files changed, 27 insertions(+), 22 deletions(-) rename expense/{server => ui}/main.go (92%) rename shoppingcart/{server => ui}/main.go (92%) diff --git a/expense/README.md b/expense/README.md index b34d4988..e79ae02d 100644 --- a/expense/README.md +++ b/expense/README.md @@ -9,18 +9,18 @@ method has to return before it is actually approved. This is done by returning a the activity is not completed yet. * When the expense is approved (or rejected), somewhere in the world needs to be notified, and it will need to call `client.CompleteActivity()` to tell Temporal service that that activity is now completed. - In this sample case, the dummy server does this job. In real world, you will need to register some listener + In this sample case, the sample expense system does this job. In real world, you will need to register some listener to the expense system or you will need to have your own polling agent to check for the expense status periodically. -* After the wait activity is completed, it does the payment for the expense (dummy step in this sample case). +* After the wait activity is completed, it does the payment for the expense (UI step in this sample case). -This sample relies on an a dummy expense server to work. +This sample relies on an a sample expense system to work. Get a Temporal service running [here](https://github.com/temporalio/samples-go/tree/main/#how-to-use). # Steps To Run Sample * You need a Temporal service running. README.md for more details. -* Start the dummy server +* Start the sample expense system UI ``` -go run expense/server/main.go +go run expense/ui/main.go ``` * Start workflow and activity workers ``` diff --git a/expense/activities.go b/expense/activities.go index 0eb6b1e6..876a669f 100644 --- a/expense/activities.go +++ b/expense/activities.go @@ -37,7 +37,7 @@ func CreateExpenseActivity(ctx context.Context, expenseID string) error { // waitForDecisionActivity waits for the expense decision. This activity will complete asynchronously. When this method // returns error activity.ErrResultPending, the Temporal Go SDK recognize this error, and won't mark this activity // as failed or completed. The Temporal server will wait until Client.CompleteActivity() is called or timeout happened -// whichever happen first. In this sample case, the CompleteActivity() method is called by our dummy expense server when +// whichever happen first. In this sample case, the CompleteActivity() method is called by our sample expense system when // the expense is approved. func WaitForDecisionActivity(ctx context.Context, expenseID string) (string, error) { if len(expenseID) == 0 { diff --git a/expense/server/main.go b/expense/ui/main.go similarity index 92% rename from expense/server/main.go rename to expense/ui/main.go index c343e710..9ee874db 100644 --- a/expense/server/main.go +++ b/expense/ui/main.go @@ -10,7 +10,7 @@ import ( ) /** - * Dummy server that support to list expenses, create new expense, update expense state and checking expense state. + * Sample expense system that support to list expenses, create new expense, update expense state and checking expense state. */ type expenseState string @@ -22,7 +22,7 @@ const ( completed expenseState = "COMPLETED" ) -// use memory store for this dummy server +// use memory store for this sample expense system var ( allExpense = make(map[string]expenseState) tokenMap = make(map[string][]byte) @@ -39,18 +39,19 @@ func main() { panic(err) } - fmt.Println("Starting dummy server...") http.HandleFunc("/", listHandler) http.HandleFunc("/list", listHandler) http.HandleFunc("/create", createHandler) http.HandleFunc("/action", actionHandler) http.HandleFunc("/status", statusHandler) http.HandleFunc("/registerCallback", callbackHandler) + + fmt.Println("Expense system UI available at http://localhost:8099") _ = http.ListenAndServe(":8099", nil) } func listHandler(w http.ResponseWriter, _ *http.Request) { - _, _ = fmt.Fprint(w, "

DUMMY EXPENSE SYSTEM

"+"HOME"+ + _, _ = fmt.Fprint(w, "

SAMPLE EXPENSE SYSTEM

"+"HOME"+ "

All expense requests:

ItemQuantityAction
") var keys []string for k := range allExpense { diff --git a/shoppingcart/README.md b/shoppingcart/README.md index 527df830..3c333bfd 100644 --- a/shoppingcart/README.md +++ b/shoppingcart/README.md @@ -28,5 +28,5 @@ go run shoppingcart/worker/main.go ``` 3) Run the following command to start the web app ``` -go run shoppingcart/server/main.go +go run shoppingcart/ui/main.go ``` diff --git a/shoppingcart/server/main.go b/shoppingcart/ui/main.go similarity index 92% rename from shoppingcart/server/main.go rename to shoppingcart/ui/main.go index 3e1fc94b..c5a405b8 100644 --- a/shoppingcart/server/main.go +++ b/shoppingcart/ui/main.go @@ -36,11 +36,10 @@ func main() { panic(err) } - fmt.Println("Starting dummy server...") http.HandleFunc("/", listHandler) http.HandleFunc("/action", actionHandler) - fmt.Println("Server started on :8080") + fmt.Println("Shopping Cart UI available at http://localhost:8080") if err := http.ListenAndServe(":8080", nil); err != nil { fmt.Println("Error starting server:", err) } @@ -48,7 +47,7 @@ func main() { func listHandler(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "text/html") // Set the content type to HTML - _, _ = fmt.Fprint(w, "

DUMMY SHOPPING WEBSITE

"+ + _, _ = fmt.Fprint(w, "

SAMPLE SHOPPING WEBSITE

"+ "HOMECheckout"+ "

Available Items to Purchase

Expense IDStatusAction
") @@ -125,16 +124,15 @@ func updateWithStartCart(actionType string, id string) shoppingcart.CartState { WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, }, shoppingcart.CartWorkflow) - updateOptions := client.UpdateWorkflowOptions{ - UpdateName: shoppingcart.UpdateName, - WaitForStage: client.WorkflowUpdateStageCompleted, - Args: []interface{}{actionType, id}, - } - option := client.UpdateWithStartWorkflowOptions{ + updateWithStartOptions := client.UpdateWithStartWorkflowOptions{ StartWorkflowOperation: startWorkflowOp, - UpdateOptions: updateOptions, + UpdateOptions: client.UpdateWorkflowOptions{ + UpdateName: shoppingcart.UpdateName, + WaitForStage: client.WorkflowUpdateStageCompleted, + Args: []interface{}{actionType, id}, + }, } - updateHandle, err := workflowClient.UpdateWithStartWorkflow(ctx, option) + updateHandle, err := workflowClient.UpdateWithStartWorkflow(ctx, updateWithStartOptions) if err != nil { // For example, a client-side validation error (e.g. missing conflict // policy or invalid workflow argument types in the start operation), or diff --git a/shoppingcart/workflow.go b/shoppingcart/workflow.go index ead6a5e0..3c07b879 100644 --- a/shoppingcart/workflow.go +++ b/shoppingcart/workflow.go @@ -68,6 +68,12 @@ func CartWorkflow(ctx workflow.Context) error { return err } logger.Info("Continuing as new") + + // NOTE: In this sample workflow, using ContinueAsNew does not properly + // pass the cart state to the new workflow. This was omitted for simplicity. + // In practice, you could use an activity to query a database for your cart + // state at the beginning of the workflow, or you could change CartWorkflow + // to accept an optional CartState input param to handle this scenario. return workflow.NewContinueAsNewError(ctx, CartWorkflow) } From 35aa4f1a70da47b26d21be15feafbd61e1007cff Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 30 Jan 2025 13:58:14 -0800 Subject: [PATCH 10/10] fix test --- shoppingcart/workflow_test.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/shoppingcart/workflow_test.go b/shoppingcart/workflow_test.go index fcd6cb21..8079e0ea 100644 --- a/shoppingcart/workflow_test.go +++ b/shoppingcart/workflow_test.go @@ -60,13 +60,7 @@ func Test_ShoppingCartWorkflow(t *testing.T) { }, 0) env.RegisterDelayedCallback(func() { - env.UpdateWorkflow(UpdateName, uuid.New(), &testsuite.TestUpdateCallback{ - OnAccept: func() {}, - OnReject: func(err error) { - require.Fail(t, "unexpected rejection") - }, - OnComplete: func(i interface{}, err error) {}, - }, "checkout", "") + env.SignalWorkflow("checkout", nil) }, 0) env.ExecuteWorkflow(CartWorkflow)
ItemCostAction