From ad5045cd65358ff3a3648185c1f4afbfad5ec086 Mon Sep 17 00:00:00 2001 From: justinp <174377431+justinp-tt@users.noreply.github.com> Date: Mon, 23 Dec 2024 12:51:55 -0600 Subject: [PATCH 1/6] Ignore state machine not found during sync --- .../history/replication/executable_sync_hsm_task.go | 12 ++++++++++++ .../replication/executable_sync_hsm_task_test.go | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/service/history/replication/executable_sync_hsm_task.go b/service/history/replication/executable_sync_hsm_task.go index f9ad26dab95..d81835ee372 100644 --- a/service/history/replication/executable_sync_hsm_task.go +++ b/service/history/replication/executable_sync_hsm_task.go @@ -24,6 +24,7 @@ package replication import ( "context" + "errors" "time" "go.temporal.io/api/serviceerror" @@ -37,6 +38,7 @@ import ( "go.temporal.io/server/common/namespace" serviceerrors "go.temporal.io/server/common/serviceerror" ctasks "go.temporal.io/server/common/tasks" + "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/shard" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -166,6 +168,16 @@ func (e *ExecutableSyncHSMTask) HandleErr(err error) error { } return e.Execute() default: + if errors.Is(err, hsm.ErrStateMachineNotFound) { + e.Logger.Debug("Dropped sync HSM task due to missing state machine - likely deleted", + tag.WorkflowNamespaceID(e.NamespaceID), + tag.WorkflowID(e.WorkflowID), + tag.WorkflowRunID(e.RunID), + tag.TaskID(e.ExecutableTask.TaskID()), + ) + return nil + } + e.Logger.Error("Sync HSM replication task encountered error", tag.WorkflowNamespaceID(e.NamespaceID), tag.WorkflowID(e.WorkflowID), diff --git a/service/history/replication/executable_sync_hsm_task_test.go b/service/history/replication/executable_sync_hsm_task_test.go index 076f4584b5b..fa1a5fb3d95 100644 --- a/service/history/replication/executable_sync_hsm_task_test.go +++ b/service/history/replication/executable_sync_hsm_task_test.go @@ -24,6 +24,7 @@ package replication import ( "errors" + "fmt" "math/rand" "testing" "time" @@ -45,6 +46,7 @@ import ( serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -294,3 +296,13 @@ func (s *executableSyncHSMTaskSuite) TestMarkPoisonPill() { VisibilityTime: timestamppb.New(s.task.TaskCreationTime()), }, replicationTask.RawTaskInfo) } + +func (s *executableSyncHSMTaskSuite) TestHandleErr_StateMachineNotFound() { + s.executableTask.EXPECT().GetNamespaceInfo(gomock.Any(), s.task.NamespaceID).Return(uuid.NewString(), true, nil).AnyTimes() + + err := fmt.Errorf("wrapper: %w", hsm.ErrStateMachineNotFound) + + actualErr := s.task.HandleErr(err) + + s.NoError(actualErr) +} From bc2fdd2e91872d185abcf9f051c478077ef4dbf6 Mon Sep 17 00:00:00 2001 From: justinp <174377431+justinp-tt@users.noreply.github.com> Date: Mon, 23 Dec 2024 13:34:16 -0600 Subject: [PATCH 2/6] Revert "Ignore state machine not found during sync" This reverts commit ad5045cd65358ff3a3648185c1f4afbfad5ec086. --- .../history/replication/executable_sync_hsm_task.go | 12 ------------ .../replication/executable_sync_hsm_task_test.go | 12 ------------ 2 files changed, 24 deletions(-) diff --git a/service/history/replication/executable_sync_hsm_task.go b/service/history/replication/executable_sync_hsm_task.go index d81835ee372..f9ad26dab95 100644 --- a/service/history/replication/executable_sync_hsm_task.go +++ b/service/history/replication/executable_sync_hsm_task.go @@ -24,7 +24,6 @@ package replication import ( "context" - "errors" "time" "go.temporal.io/api/serviceerror" @@ -38,7 +37,6 @@ import ( "go.temporal.io/server/common/namespace" serviceerrors "go.temporal.io/server/common/serviceerror" ctasks "go.temporal.io/server/common/tasks" - "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/shard" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -168,16 +166,6 @@ func (e *ExecutableSyncHSMTask) HandleErr(err error) error { } return e.Execute() default: - if errors.Is(err, hsm.ErrStateMachineNotFound) { - e.Logger.Debug("Dropped sync HSM task due to missing state machine - likely deleted", - tag.WorkflowNamespaceID(e.NamespaceID), - tag.WorkflowID(e.WorkflowID), - tag.WorkflowRunID(e.RunID), - tag.TaskID(e.ExecutableTask.TaskID()), - ) - return nil - } - e.Logger.Error("Sync HSM replication task encountered error", tag.WorkflowNamespaceID(e.NamespaceID), tag.WorkflowID(e.WorkflowID), diff --git a/service/history/replication/executable_sync_hsm_task_test.go b/service/history/replication/executable_sync_hsm_task_test.go index fa1a5fb3d95..076f4584b5b 100644 --- a/service/history/replication/executable_sync_hsm_task_test.go +++ b/service/history/replication/executable_sync_hsm_task_test.go @@ -24,7 +24,6 @@ package replication import ( "errors" - "fmt" "math/rand" "testing" "time" @@ -46,7 +45,6 @@ import ( serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" - "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -296,13 +294,3 @@ func (s *executableSyncHSMTaskSuite) TestMarkPoisonPill() { VisibilityTime: timestamppb.New(s.task.TaskCreationTime()), }, replicationTask.RawTaskInfo) } - -func (s *executableSyncHSMTaskSuite) TestHandleErr_StateMachineNotFound() { - s.executableTask.EXPECT().GetNamespaceInfo(gomock.Any(), s.task.NamespaceID).Return(uuid.NewString(), true, nil).AnyTimes() - - err := fmt.Errorf("wrapper: %w", hsm.ErrStateMachineNotFound) - - actualErr := s.task.HandleErr(err) - - s.NoError(actualErr) -} From 442ef67c6b860e9674133a2bf2b0a22bd2c0d13c Mon Sep 17 00:00:00 2001 From: justinp <174377431+justinp-tt@users.noreply.github.com> Date: Mon, 23 Dec 2024 13:34:35 -0600 Subject: [PATCH 3/6] Ignore state machine not found during sync --- service/history/ndc/hsm_state_replicator.go | 17 ++++++-- .../history/ndc/hsm_state_replicator_test.go | 41 +++++++++++++++++++ 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/service/history/ndc/hsm_state_replicator.go b/service/history/ndc/hsm_state_replicator.go index 722a9c2ce9d..abb9c447711 100644 --- a/service/history/ndc/hsm_state_replicator.go +++ b/service/history/ndc/hsm_state_replicator.go @@ -183,10 +183,19 @@ func (r *HSMStateReplicatorImpl) syncHSMNode( incomingNodePath := incomingNode.Path() currentNode, err := currentHSM.Child(incomingNodePath) if err != nil { - // 1. Already done history resend if needed before, - // and node creation today always associated with an event - // 2. Node deletion is not supported right now. - // Based on 1 and 2, node should always be found here. + // The node may not be found if: + // 1. The state machine was deleted (e.g. terminal state cleanup) + // 2. We're missing events that created this node + if errors.Is(err, hsm.ErrStateMachineNotFound) { + // In terminal state, nodes can be deleted + // Ignore the error and continue processing other nodes + r.logger.Debug("State machine not found - likely deleted in terminal state", + tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().NamespaceId), + tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(mutableState.GetExecutionInfo().OriginalExecutionRunId), + ) + return nil + } return err } diff --git a/service/history/ndc/hsm_state_replicator_test.go b/service/history/ndc/hsm_state_replicator_test.go index c623627c448..38fd47a2554 100644 --- a/service/history/ndc/hsm_state_replicator_test.go +++ b/service/history/ndc/hsm_state_replicator_test.go @@ -702,6 +702,47 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingStateNewer_WorkflowClosed( s.NoError(err) } +func (s *hsmStateReplicatorSuite) TestSyncHSM_StateMachineNotFound() { + persistedState := s.buildWorkflowMutableState() + // Remove the child1 state machine so it doesn't exist + delete(persistedState.ExecutionInfo.SubStateMachinesByType[s.stateMachineDef.Type()].MachinesById, "child1") + + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), &persistence.GetWorkflowExecutionRequest{ + ShardID: s.mockShard.GetShardID(), + NamespaceID: s.workflowKey.NamespaceID, + WorkflowID: s.workflowKey.WorkflowID, + RunID: s.workflowKey.RunID, + }).Return(&persistence.GetWorkflowExecutionResponse{ + State: persistedState, + DBRecordVersion: 777, + }, nil).Times(1) + + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + WorkflowKey: s.workflowKey, + EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0], + StateMachineNode: &persistencespb.StateMachineNode{ + Children: map[string]*persistencespb.StateMachineMap{ + s.stateMachineDef.Type(): { + MachinesById: map[string]*persistencespb.StateMachineNode{ + "child1": { + Data: []byte(hsmtest.State3), + InitialVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(), + }, + LastUpdateVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion() + 100, + }, + TransitionCount: 50, + }, + }, + }, + }, + }, + }) + + s.NoError(err) // Expect no error as we should gracefully handle missing state machines +} + func (s *hsmStateReplicatorSuite) buildWorkflowMutableState() *persistencespb.WorkflowMutableState { info := &persistencespb.WorkflowExecutionInfo{ From d0460e66820b60fb6293eb1529f16796d2c09dc0 Mon Sep 17 00:00:00 2001 From: justinp <174377431+justinp-tt@users.noreply.github.com> Date: Wed, 22 Jan 2025 16:26:57 -0600 Subject: [PATCH 4/6] Ignore state machine not found during sync --- service/history/ndc/hsm_state_replicator.go | 40 ++++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/service/history/ndc/hsm_state_replicator.go b/service/history/ndc/hsm_state_replicator.go index abb9c447711..e7da5b95443 100644 --- a/service/history/ndc/hsm_state_replicator.go +++ b/service/history/ndc/hsm_state_replicator.go @@ -184,17 +184,39 @@ func (r *HSMStateReplicatorImpl) syncHSMNode( currentNode, err := currentHSM.Child(incomingNodePath) if err != nil { // The node may not be found if: - // 1. The state machine was deleted (e.g. terminal state cleanup) - // 2. We're missing events that created this node + // State machine was deleted in terminal state. + // Both state machine creation and deletion are always associated with an event, so any missing state + // machine must have a corresponding event in history. if errors.Is(err, hsm.ErrStateMachineNotFound) { - // In terminal state, nodes can be deleted - // Ignore the error and continue processing other nodes - r.logger.Debug("State machine not found - likely deleted in terminal state", - tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().NamespaceId), - tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowId), - tag.WorkflowRunID(mutableState.GetExecutionInfo().OriginalExecutionRunId), + notFoundErr := err + // Get the last items from both version histories + currentVersionHistory, err := versionhistory.GetCurrentVersionHistory( + mutableState.GetExecutionInfo().GetVersionHistories(), ) - return nil + if err != nil { + return err + } + lastLocalItem, err := versionhistory.GetLastVersionHistoryItem(currentVersionHistory) + if err != nil { + return err + } + lastIncomingItem, err := versionhistory.GetLastVersionHistoryItem(request.EventVersionHistory) + if err != nil { + return err + } + + // Only accept "not found" if our version history is ahead + if versionhistory.CompareVersionHistoryItem(lastLocalItem, lastIncomingItem) > 0 { + r.logger.Debug("State machine not found - likely deleted in terminal state", + tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().NamespaceId), + tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowId), + tag.WorkflowRunID(mutableState.GetExecutionInfo().OriginalExecutionRunId), + ) + return nil + } + + // Otherwise, we might be missing events + return notFoundErr } return err } From 18071c1d471acd6a0f153a927ed62b1a726dc6d0 Mon Sep 17 00:00:00 2001 From: justinp <174377431+justinp-tt@users.noreply.github.com> Date: Thu, 23 Jan 2025 16:32:57 -0600 Subject: [PATCH 5/6] Ignore state machine not found during sync --- .../history/ndc/hsm_state_replicator_test.go | 103 ++++++++++++++---- 1 file changed, 80 insertions(+), 23 deletions(-) diff --git a/service/history/ndc/hsm_state_replicator_test.go b/service/history/ndc/hsm_state_replicator_test.go index 38fd47a2554..fb57e67f65c 100644 --- a/service/history/ndc/hsm_state_replicator_test.go +++ b/service/history/ndc/hsm_state_replicator_test.go @@ -24,6 +24,7 @@ package ndc import ( "context" + "errors" "testing" "github.com/pborman/uuid" @@ -703,9 +704,14 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingStateNewer_WorkflowClosed( } func (s *hsmStateReplicatorSuite) TestSyncHSM_StateMachineNotFound() { + const ( + deletedMachineID = "child1" + initialCount = 50 + ) + persistedState := s.buildWorkflowMutableState() - // Remove the child1 state machine so it doesn't exist - delete(persistedState.ExecutionInfo.SubStateMachinesByType[s.stateMachineDef.Type()].MachinesById, "child1") + // Remove the state machine to simulate deletion + delete(persistedState.ExecutionInfo.SubStateMachinesByType[s.stateMachineDef.Type()].MachinesById, deletedMachineID) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), &persistence.GetWorkflowExecutionRequest{ ShardID: s.mockShard.GetShardID(), @@ -715,32 +721,83 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_StateMachineNotFound() { }).Return(&persistence.GetWorkflowExecutionResponse{ State: persistedState, DBRecordVersion: 777, - }, nil).Times(1) + }, nil).AnyTimes() + + baseVersionHistory := persistedState.ExecutionInfo.VersionHistories.Histories[persistedState.ExecutionInfo.VersionHistories.CurrentVersionHistoryIndex] + + testCases := []struct { + name string + versionHistory *historyspb.VersionHistory + expectError bool + }{ + { + name: "older incoming version - ignore missing state machine since local version is newer", + versionHistory: &historyspb.VersionHistory{ + Items: []*historyspb.VersionHistoryItem{ + {EventId: 50, Version: s.namespaceEntry.FailoverVersion() - 100}, + }, + }, + expectError: false, + }, + { + name: "newer incoming version - return original notFoundErr since local version is behind", + versionHistory: func() *historyspb.VersionHistory { + // Start by copying ALL items from base history + vh := &historyspb.VersionHistory{ + Items: make([]*historyspb.VersionHistoryItem, len(baseVersionHistory.Items)), + } + // Copy all but last item exactly to establish joint point + for i := 0; i < len(baseVersionHistory.Items)-1; i++ { + vh.Items[i] = &historyspb.VersionHistoryItem{ + EventId: baseVersionHistory.Items[i].EventId, + Version: baseVersionHistory.Items[i].Version, + } + } + // Add final item with higher version but next event ID + vh.Items[len(vh.Items)-1] = &historyspb.VersionHistoryItem{ + EventId: baseVersionHistory.Items[len(baseVersionHistory.Items)-1].EventId + 1, + Version: s.namespaceEntry.FailoverVersion() + 100, + } + return vh + }(), + expectError: true, + }, + } - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ - WorkflowKey: s.workflowKey, - EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0], - StateMachineNode: &persistencespb.StateMachineNode{ - Children: map[string]*persistencespb.StateMachineMap{ - s.stateMachineDef.Type(): { - MachinesById: map[string]*persistencespb.StateMachineNode{ - "child1": { - Data: []byte(hsmtest.State3), - InitialVersionedTransition: &persistencespb.VersionedTransition{ - NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(), - }, - LastUpdateVersionedTransition: &persistencespb.VersionedTransition{ - NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion() + 100, + for _, tc := range testCases { + tc := tc + s.Run(tc.name, func() { + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + WorkflowKey: s.workflowKey, + EventVersionHistory: tc.versionHistory, + StateMachineNode: &persistencespb.StateMachineNode{ + Children: map[string]*persistencespb.StateMachineMap{ + s.stateMachineDef.Type(): { + MachinesById: map[string]*persistencespb.StateMachineNode{ + deletedMachineID: { + Data: []byte(hsmtest.State3), + InitialVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: tc.versionHistory.GetItems()[len(tc.versionHistory.GetItems())-1].GetVersion(), + }, + LastUpdateVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: tc.versionHistory.GetItems()[len(tc.versionHistory.GetItems())-1].GetVersion(), + }, + TransitionCount: initialCount, + }, }, - TransitionCount: 50, }, }, }, - }, - }, - }) - - s.NoError(err) // Expect no error as we should gracefully handle missing state machines + }) + + if tc.expectError { + s.Error(err) + s.True(errors.Is(err, hsm.ErrStateMachineNotFound), "expected ErrStateMachineNotFound error") + } else { + s.NoError(err) + } + }) + } } func (s *hsmStateReplicatorSuite) buildWorkflowMutableState() *persistencespb.WorkflowMutableState { From d1b0d7a4a52ba04476efd8a3f386d56489fefd86 Mon Sep 17 00:00:00 2001 From: justinp <174377431+justinp-tt@users.noreply.github.com> Date: Fri, 24 Jan 2025 16:44:55 -0600 Subject: [PATCH 6/6] Ignore state machine not found during sync --- .../history/ndc/hsm_state_replicator_test.go | 42 +++++++------------ 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/service/history/ndc/hsm_state_replicator_test.go b/service/history/ndc/hsm_state_replicator_test.go index fb57e67f65c..a126d8d05ac 100644 --- a/service/history/ndc/hsm_state_replicator_test.go +++ b/service/history/ndc/hsm_state_replicator_test.go @@ -709,7 +709,9 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_StateMachineNotFound() { initialCount = 50 ) + baseVersion := s.namespaceEntry.FailoverVersion() persistedState := s.buildWorkflowMutableState() + // Remove the state machine to simulate deletion delete(persistedState.ExecutionInfo.SubStateMachinesByType[s.stateMachineDef.Type()].MachinesById, deletedMachineID) @@ -723,43 +725,29 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_StateMachineNotFound() { DBRecordVersion: 777, }, nil).AnyTimes() - baseVersionHistory := persistedState.ExecutionInfo.VersionHistories.Histories[persistedState.ExecutionInfo.VersionHistories.CurrentVersionHistoryIndex] - testCases := []struct { name string versionHistory *historyspb.VersionHistory expectError bool }{ { - name: "older incoming version - ignore missing state machine since local version is newer", + name: "local version higher - ignore missing state machine", versionHistory: &historyspb.VersionHistory{ Items: []*historyspb.VersionHistoryItem{ - {EventId: 50, Version: s.namespaceEntry.FailoverVersion() - 100}, + {EventId: 50, Version: baseVersion - 100}, + {EventId: 102, Version: baseVersion - 50}, }, }, expectError: false, }, { - name: "newer incoming version - return original notFoundErr since local version is behind", - versionHistory: func() *historyspb.VersionHistory { - // Start by copying ALL items from base history - vh := &historyspb.VersionHistory{ - Items: make([]*historyspb.VersionHistoryItem, len(baseVersionHistory.Items)), - } - // Copy all but last item exactly to establish joint point - for i := 0; i < len(baseVersionHistory.Items)-1; i++ { - vh.Items[i] = &historyspb.VersionHistoryItem{ - EventId: baseVersionHistory.Items[i].EventId, - Version: baseVersionHistory.Items[i].Version, - } - } - // Add final item with higher version but next event ID - vh.Items[len(vh.Items)-1] = &historyspb.VersionHistoryItem{ - EventId: baseVersionHistory.Items[len(baseVersionHistory.Items)-1].EventId + 1, - Version: s.namespaceEntry.FailoverVersion() + 100, - } - return vh - }(), + name: "incoming version higher - return notFoundErr", + versionHistory: &historyspb.VersionHistory{ + Items: []*historyspb.VersionHistoryItem{ + {EventId: 50, Version: baseVersion - 100}, + {EventId: 102, Version: baseVersion}, + }, + }, expectError: true, }, } @@ -767,6 +755,8 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_StateMachineNotFound() { for _, tc := range testCases { tc := tc s.Run(tc.name, func() { + lastVersion := tc.versionHistory.Items[len(tc.versionHistory.Items)-1].Version + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: tc.versionHistory, @@ -777,10 +767,10 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_StateMachineNotFound() { deletedMachineID: { Data: []byte(hsmtest.State3), InitialVersionedTransition: &persistencespb.VersionedTransition{ - NamespaceFailoverVersion: tc.versionHistory.GetItems()[len(tc.versionHistory.GetItems())-1].GetVersion(), + NamespaceFailoverVersion: lastVersion, }, LastUpdateVersionedTransition: &persistencespb.VersionedTransition{ - NamespaceFailoverVersion: tc.versionHistory.GetItems()[len(tc.versionHistory.GetItems())-1].GetVersion(), + NamespaceFailoverVersion: lastVersion, }, TransitionCount: initialCount, },