Skip to content

Commit

Permalink
IWF-364: Fix conditional complete on messages of internalChannels fro…
Browse files Browse the repository at this point in the history
…m state APIs (#490)
  • Loading branch information
longquanzheng authored Nov 22, 2024
1 parent 9739cc1 commit 425478b
Show file tree
Hide file tree
Showing 7 changed files with 1,492 additions and 34 deletions.
24 changes: 19 additions & 5 deletions integ/conditional_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,14 @@ func doTestConditionalForceCompleteOnChannelEmptyWorkflow(
history, _ := wfHandler.GetTestResult()

expectMap := map[string]int64{
"S1_start": 3,
"S1_decide": 3,
"S1_start": 4,
"S1_decide": 4,
}
if useSignalChannel {
expectMap = map[string]int64{
"S1_start": 3,
"S1_decide": 3,
}
}
if !useSignalChannel {
expectMap[conditionalClose.RpcPublishInternalChannel] = 3
Expand All @@ -153,9 +159,17 @@ func doTestConditionalForceCompleteOnChannelEmptyWorkflow(

assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus())
assertions.Equal(1, len(resp2.GetResults()))
assertions.Equal(iwfidl.StateCompletionOutput{
expectedOutput := iwfidl.StateCompletionOutput{
CompletedStateId: "S1",
CompletedStateExecutionId: "S1-3",
CompletedStateExecutionId: "S1-4",
CompletedStateOutput: &conditionalClose.TestInput,
}, resp2.GetResults()[0])
}
if useSignalChannel {
expectedOutput = iwfidl.StateCompletionOutput{
CompletedStateId: "S1",
CompletedStateExecutionId: "S1-3",
CompletedStateOutput: &conditionalClose.TestInput,
}
}
assertions.Equal(expectedOutput, resp2.GetResults()[0])
}
10 changes: 10 additions & 0 deletions integ/workflow/conditional_close/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,19 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if req.GetWorkflowStateId() == State1 {

var internalChanPub []iwfidl.InterStateChannelPublishing
context := req.GetContext()
if context.GetStateExecutionId() == "S1-1" {
// wait for 3 seconds so that the channel can have a new message
time.Sleep(time.Second * 3)
} else if context.GetStateExecutionId() == "S1-3" {
// send internal channel message within the state execution
// and expecting the messages are processed by the conditional check
internalChanPub = []iwfidl.InterStateChannelPublishing{
{
ChannelName: TestChannelName,
Value: &TestInput,
}}
}

conditionalClose := &iwfidl.WorkflowConditionalClose{
Expand All @@ -131,6 +140,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
}

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
PublishToInterStateChannel: internalChanPub,
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
Expand Down
Loading

0 comments on commit 425478b

Please sign in to comment.