From 3c5d2fc3b6fade865a7118575e8e37514dfee64b Mon Sep 17 00:00:00 2001 From: Prathyush PV Date: Fri, 6 Dec 2024 15:18:12 -0800 Subject: [PATCH] Emit termination reason metric --- common/metrics/metric_defs.go | 1 + common/util.go | 84 +++++++++++++++---- service/frontend/workflow_handler.go | 22 ++--- .../api/respondworkflowtaskcompleted/api.go | 2 +- service/history/consts/const.go | 6 +- service/history/workflow/context.go | 15 ++-- .../history/workflow/mutable_state_impl.go | 2 +- tests/sizelimit_test.go | 6 +- 8 files changed, 97 insertions(+), 41 deletions(-) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 27a8c4cce11..43579b11db0 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -920,6 +920,7 @@ var ( WorkflowFailedCount = NewCounterDef("workflow_failed") WorkflowTimeoutCount = NewCounterDef("workflow_timeout") WorkflowTerminateCount = NewCounterDef("workflow_terminate") + WorkflowForceTerminateCount = NewCounterDef("workflow_force_terminate") WorkflowContinuedAsNewCount = NewCounterDef("workflow_continued_as_new") ReplicationStreamPanic = NewCounterDef("replication_stream_panic") ReplicationStreamError = NewCounterDef("replication_stream_error") diff --git a/common/util.go b/common/util.go index 03b7ed32ae3..35c104f1560 100644 --- a/common/util.go +++ b/common/util.go @@ -58,6 +58,8 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +type WorkflowTerminationReason int + const ( persistenceClientRetryInitialInterval = 50 * time.Millisecond persistenceClientRetryMaxAttempts = 2 @@ -113,23 +115,25 @@ const ( sdkClientFactoryRetryExpirationInterval = time.Minute contextExpireThreshold = 10 * time.Millisecond +) - // FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit - FailureReasonCompleteResultExceedsLimit = "Complete result exceeds size limit." 
- // FailureReasonFailureDetailsExceedsLimit is failureReason for failure details exceeds limit - FailureReasonFailureExceedsLimit = "Failure exceeds size limit." - // FailureReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit - FailureReasonCancelDetailsExceedsLimit = "Cancel details exceed size limit." - // FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit - FailureReasonHeartbeatExceedsLimit = "Heartbeat details exceed size limit." - // FailureReasonHistorySizeExceedsLimit is reason to fail workflow when history size exceeds limit - FailureReasonHistorySizeExceedsLimit = "Workflow history size exceeds limit." - // FailureReasonHistorySizeExceedsLimit is reason to fail workflow when history count exceeds limit - FailureReasonHistoryCountExceedsLimit = "Workflow history count exceeds limit." - // FailureReasonMutableStateSizeExceedsLimit is reason to fail workflow when mutable state size exceeds limit - FailureReasonMutableStateSizeExceedsLimit = "Workflow mutable state size exceeds limit." - // FailureReasonTransactionSizeExceedsLimit is the failureReason for when transaction cannot be committed because it exceeds size limit - FailureReasonTransactionSizeExceedsLimit = "Transaction size exceeds limit." 
+const ( + // TerminationReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit + TerminationReasonCompleteResultExceedsLimit WorkflowTerminationReason = iota + // TerminationReasonFailureExceedsLimit is failureReason for failure details exceeds limit + TerminationReasonFailureExceedsLimit + // TerminationReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit + TerminationReasonCancelDetailsExceedsLimit + // TerminationReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit + TerminationReasonHeartbeatExceedsLimit + // TerminationReasonHistorySizeExceedsLimit is reason to fail workflow when history size exceeds limit + TerminationReasonHistorySizeExceedsLimit + // TerminationReasonHistoryCountExceedsLimit is reason to fail workflow when history count exceeds limit + TerminationReasonHistoryCountExceedsLimit + // TerminationReasonMutableStateSizeExceedsLimit is reason to fail workflow when mutable state size exceeds limit + TerminationReasonMutableStateSizeExceedsLimit + // TerminationReasonTransactionSizeExceedsLimit is the failureReason for when transaction cannot be committed because it exceeds size limit + TerminationReasonTransactionSizeExceedsLimit ) var ( @@ -148,6 +152,54 @@ var ( ErrNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String())) ) +// String returns the human-readable failure message for the given WorkflowTerminationReason; unrecognized values yield "Unknown failure.". +func (r WorkflowTerminationReason) String() string { + switch r { + case TerminationReasonCompleteResultExceedsLimit: + return "Complete result exceeds size limit." + case TerminationReasonFailureExceedsLimit: + return "Failure exceeds size limit." + case TerminationReasonCancelDetailsExceedsLimit: + return "Cancel details exceed size limit." + case TerminationReasonHeartbeatExceedsLimit: + return "Heartbeat details exceed size limit." 
+ case TerminationReasonHistorySizeExceedsLimit: + return "Workflow history size exceeds limit." + case TerminationReasonHistoryCountExceedsLimit: + return "Workflow history count exceeds limit." + case TerminationReasonMutableStateSizeExceedsLimit: + return "Workflow mutable state size exceeds limit." + case TerminationReasonTransactionSizeExceedsLimit: + return "Transaction size exceeds limit." + default: + return "Unknown failure." + } +} + +// MetricsTag returns a Reason tag for the given WorkflowTerminationReason. +func (r WorkflowTerminationReason) MetricsTag() metrics.Tag { + switch r { + case TerminationReasonCompleteResultExceedsLimit: + return metrics.ReasonTag("complete_result_exceeds_limit") + case TerminationReasonFailureExceedsLimit: + return metrics.ReasonTag("failure_exceeds_limit") + case TerminationReasonCancelDetailsExceedsLimit: + return metrics.ReasonTag("cancel_details_exceeds_limit") + case TerminationReasonHeartbeatExceedsLimit: + return metrics.ReasonTag("heartbeat_exceeds_limit") + case TerminationReasonHistorySizeExceedsLimit: + return metrics.ReasonTag("history_size_exceeds_limit") + case TerminationReasonHistoryCountExceedsLimit: + return metrics.ReasonTag("history_count_exceeds_limit") + case TerminationReasonMutableStateSizeExceedsLimit: + return metrics.ReasonTag("ms_size_exceeds_limit") + case TerminationReasonTransactionSizeExceedsLimit: + return metrics.ReasonTag("transaction_size_exceeds_limit") + default: + return metrics.ReasonTag("unknown_failure") + } +} + // AwaitWaitGroup calls Wait on the given wait // Returns true if the Wait() call succeeded before the timeout // Returns false if the Wait() did not return before the timeout diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 67af07f62f0..a0d014da07b 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -1005,7 +1005,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskFailed( 
wh.throttledLogger, tag.BlobSizeViolationOperation("RespondWorkflowTaskFailed"), ); err != nil { - serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, true) + serverFailure := failure.NewServerFailure(common.TerminationReasonFailureExceedsLimit.String(), true) serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn) request.Failure = serverFailure } @@ -1173,7 +1173,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(ctx context.Context, requ // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason failRequest := &workflowservice.RespondActivityTaskFailedRequest{ TaskToken: request.TaskToken, - Failure: failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true), + Failure: failure.NewServerFailure(common.TerminationReasonHeartbeatExceedsLimit.String(), true), Identity: request.Identity, } _, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{ @@ -1263,7 +1263,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatById(ctx context.Context, // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason failRequest := &workflowservice.RespondActivityTaskFailedRequest{ TaskToken: token, - Failure: failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true), + Failure: failure.NewServerFailure(common.TerminationReasonHeartbeatExceedsLimit.String(), true), Identity: request.Identity, } _, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{ @@ -1338,7 +1338,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( // result exceeds blob size limit, we would record it as failure failRequest := &workflowservice.RespondActivityTaskFailedRequest{ TaskToken: request.TaskToken, - Failure: failure.NewServerFailure(common.FailureReasonCompleteResultExceedsLimit, true), + Failure: 
failure.NewServerFailure(common.TerminationReasonCompleteResultExceedsLimit.String(), true), Identity: request.Identity, } _, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{ @@ -1430,7 +1430,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedById(ctx context.Context, // result exceeds blob size limit, we would record it as failure failRequest := &workflowservice.RespondActivityTaskFailedRequest{ TaskToken: token, - Failure: failure.NewServerFailure(common.FailureReasonCompleteResultExceedsLimit, true), + Failure: failure.NewServerFailure(common.TerminationReasonCompleteResultExceedsLimit.String(), true), Identity: request.Identity, } _, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{ @@ -1511,7 +1511,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( tag.BlobSizeViolationOperation("RespondActivityTaskFailed"), ); err != nil { // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason - response.Failures = append(response.Failures, failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true)) + response.Failures = append(response.Failures, failure.NewServerFailure(common.TerminationReasonHeartbeatExceedsLimit.String(), true)) // do not send heartbeat to history service request.LastHeartbeatDetails = nil @@ -1529,7 +1529,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( wh.throttledLogger, tag.BlobSizeViolationOperation("RespondActivityTaskFailed"), ); err != nil { - serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, true) + serverFailure := failure.NewServerFailure(common.TerminationReasonFailureExceedsLimit.String(), true) serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn) request.Failure = serverFailure @@ -1615,7 +1615,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re 
tag.BlobSizeViolationOperation("RespondActivityTaskFailedById"), ); err != nil { // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason - response.Failures = append(response.Failures, failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true)) + response.Failures = append(response.Failures, failure.NewServerFailure(common.TerminationReasonHeartbeatExceedsLimit.String(), true)) // do not send heartbeat to history service request.LastHeartbeatDetails = nil @@ -1633,7 +1633,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re wh.throttledLogger, tag.BlobSizeViolationOperation("RespondActivityTaskFailedById"), ); err != nil { - serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, true) + serverFailure := failure.NewServerFailure(common.TerminationReasonFailureExceedsLimit.String(), true) serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn) request.Failure = serverFailure @@ -1699,7 +1699,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(ctx context.Context, requ // details exceeds blob size limit, we would record it as failure failRequest := &workflowservice.RespondActivityTaskFailedRequest{ TaskToken: request.TaskToken, - Failure: failure.NewServerFailure(common.FailureReasonCancelDetailsExceedsLimit, true), + Failure: failure.NewServerFailure(common.TerminationReasonCancelDetailsExceedsLimit.String(), true), Identity: request.Identity, } _, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{ @@ -1790,7 +1790,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledById(ctx context.Context, // details exceeds blob size limit, we would record it as failure failRequest := &workflowservice.RespondActivityTaskFailedRequest{ TaskToken: token, - Failure: failure.NewServerFailure(common.FailureReasonCancelDetailsExceedsLimit, true), + Failure: 
failure.NewServerFailure(common.TerminationReasonCancelDetailsExceedsLimit.String(), true), Identity: request.Identity, } _, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{ diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index 6b252d3864c..88405552a4a 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -620,7 +620,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( if err := workflow.TerminateWorkflow( ms, - common.FailureReasonTransactionSizeExceedsLimit, + common.TerminationReasonTransactionSizeExceedsLimit.String(), payloads.EncodeString(updateErr.Error()), consts.IdentityHistoryService, false, diff --git a/service/history/consts/const.go b/service/history/consts/const.go index caaf9abf053..2e7285b1710 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -102,11 +102,11 @@ var ( // ErrEmptyHistoryRawEventBatch indicate that one single batch of history raw events is of size 0 ErrEmptyHistoryRawEventBatch = serviceerror.NewInvalidArgument("encountered empty history batch") // ErrHistorySizeExceedsLimit is error indicating workflow execution has exceeded system defined history size limit - ErrHistorySizeExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonHistorySizeExceedsLimit) + ErrHistorySizeExceedsLimit = serviceerror.NewInvalidArgument(common.TerminationReasonHistorySizeExceedsLimit.String()) // ErrHistoryCountExceedsLimit is error indicating workflow execution has exceeded system defined history count limit - ErrHistoryCountExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonHistoryCountExceedsLimit) + ErrHistoryCountExceedsLimit = serviceerror.NewInvalidArgument(common.TerminationReasonHistoryCountExceedsLimit.String()) // ErrMutableStateSizeExceedsLimit is error indicating 
workflow execution has exceeded system defined mutable state size limit - ErrMutableStateSizeExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonMutableStateSizeExceedsLimit) + ErrMutableStateSizeExceedsLimit = serviceerror.NewInvalidArgument(common.TerminationReasonMutableStateSizeExceedsLimit.String()) // ErrUnknownCluster is error indicating unknown cluster ErrUnknownCluster = serviceerror.NewInvalidArgument("unknown cluster") // ErrBufferedQueryCleared is error indicating mutable state is cleared while buffered query is pending diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index 51fcc61adc2..b59d1bedb0e 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -990,7 +990,7 @@ func (c *ContextImpl) enforceHistorySizeCheck( ) (bool, error) { // Hard terminate workflow if still running and breached history size limit if c.maxHistorySizeExceeded(shardContext) { - if err := c.forceTerminateWorkflow(ctx, shardContext, common.FailureReasonHistorySizeExceedsLimit); err != nil { + if err := c.forceTerminateWorkflow(ctx, shardContext, common.TerminationReasonHistorySizeExceedsLimit); err != nil { return false, err } // Return true to caller to indicate workflow state is overwritten to force terminate execution on update @@ -1028,7 +1028,7 @@ func (c *ContextImpl) enforceHistoryCountCheck( ) (bool, error) { // Hard terminate workflow if still running and breached history count limit if c.maxHistoryCountExceeded(shardContext) { - if err := c.forceTerminateWorkflow(ctx, shardContext, common.FailureReasonHistoryCountExceedsLimit); err != nil { + if err := c.forceTerminateWorkflow(ctx, shardContext, common.TerminationReasonHistoryCountExceedsLimit); err != nil { return false, err } // Return true to caller to indicate workflow state is overwritten to force terminate execution on update @@ -1064,7 +1064,7 @@ func (c *ContextImpl) maxHistoryCountExceeded(shardContext shard.Context) bool 
{ // TODO: ideally this check should be after closing mutable state tx, but that would require a large refactor func (c *ContextImpl) enforceMutableStateSizeCheck(ctx context.Context, shardContext shard.Context) (bool, error) { if c.maxMutableStateSizeExceeded() { - if err := c.forceTerminateWorkflow(ctx, shardContext, common.FailureReasonMutableStateSizeExceedsLimit); err != nil { + if err := c.forceTerminateWorkflow(ctx, shardContext, common.TerminationReasonMutableStateSizeExceedsLimit); err != nil { return false, err } // Return true to caller to indicate workflow state is overwritten to force terminate execution on update @@ -1100,7 +1100,7 @@ func (c *ContextImpl) maxMutableStateSizeExceeded() bool { func (c *ContextImpl) forceTerminateWorkflow( ctx context.Context, shardContext shard.Context, - failureReason string, + failureReason common.WorkflowTerminationReason, ) error { if !c.MutableState.IsWorkflowExecutionRunning() { return nil @@ -1120,14 +1120,18 @@ func (c *ContextImpl) forceTerminateWorkflow( return err } - return TerminateWorkflow( + err = TerminateWorkflow( mutableState, - failureReason, + failureReason.String(), nil, consts.IdentityHistoryService, false, nil, // No links necessary. ) + if err != nil { + return err + } + return nil } // CacheSize estimates the in-memory size of the object for cache limits. For proto objects, it uses proto.Size() diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index ed58a3b9a3b..1a24ae054ea 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -5244,7 +5244,7 @@ func (ms *MutableStateImpl) truncateRetryableActivityFailure( // nonRetryable is set to false here as only retryable failures are recorded in mutable state. // also when this method is called, the check for isRetryable is already done, so the value // is only for visibility/debugging purpose. 
- serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, false) + serverFailure := failure.NewServerFailure(common.TerminationReasonFailureExceedsLimit.String(), false) serverFailure.Cause = failure.Truncate(activityFailure, sizeLimitError) return serverFailure } diff --git a/tests/sizelimit_test.go b/tests/sizelimit_test.go index 7897dca1630..7bdab9e98b7 100644 --- a/tests/sizelimit_test.go +++ b/tests/sizelimit_test.go @@ -202,7 +202,7 @@ SignalLoop: } // Signalling workflow should result in force terminating the workflow execution and returns with ResourceExhausted // error. InvalidArgument is returned by the client. - s.EqualError(signalErr, common.FailureReasonHistoryCountExceedsLimit) + s.EqualError(signalErr, common.TerminationReasonHistoryCountExceedsLimit.String()) s.IsType(&serviceerror.InvalidArgument{}, signalErr) historyEvents := s.GetHistory(s.Namespace(), &commonpb.WorkflowExecution{ @@ -451,7 +451,7 @@ func (s *SizeLimitFunctionalSuite) TestTerminateWorkflowCausedByMsSizeLimit() { s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) // Workflow should be force terminated at this point - s.EqualError(err, common.FailureReasonMutableStateSizeExceedsLimit) + s.EqualError(err, common.TerminationReasonMutableStateSizeExceedsLimit.String()) } // Send another signal without RunID @@ -559,7 +559,7 @@ SignalLoop: } // Signalling workflow should result in force terminating the workflow execution and returns with ResourceExhausted // error. InvalidArgument is returned by the client. - s.EqualError(signalErr, common.FailureReasonHistorySizeExceedsLimit) + s.EqualError(signalErr, common.TerminationReasonHistorySizeExceedsLimit.String()) s.IsType(&serviceerror.InvalidArgument{}, signalErr) historyEvents := s.GetHistory(s.Namespace(), &commonpb.WorkflowExecution{