Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Edit termination reason metric #6951

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,7 @@ var (
WorkflowFailedCount = NewCounterDef("workflow_failed")
WorkflowTimeoutCount = NewCounterDef("workflow_timeout")
WorkflowTerminateCount = NewCounterDef("workflow_terminate")
WorkflowForceTerminateCount = NewCounterDef("workflow_force_terminate")
WorkflowContinuedAsNewCount = NewCounterDef("workflow_continued_as_new")
ReplicationStreamPanic = NewCounterDef("replication_stream_panic")
ReplicationStreamError = NewCounterDef("replication_stream_error")
Expand Down
84 changes: 68 additions & 16 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

type WorkflowTerminationReason int

const (
persistenceClientRetryInitialInterval = 50 * time.Millisecond
persistenceClientRetryMaxAttempts = 2
Expand Down Expand Up @@ -113,23 +115,25 @@ const (
sdkClientFactoryRetryExpirationInterval = time.Minute

contextExpireThreshold = 10 * time.Millisecond
)

// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
FailureReasonCompleteResultExceedsLimit = "Complete result exceeds size limit."
// FailureReasonFailureDetailsExceedsLimit is failureReason for failure details exceeds limit
FailureReasonFailureExceedsLimit = "Failure exceeds size limit."
// FailureReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit
FailureReasonCancelDetailsExceedsLimit = "Cancel details exceed size limit."
// FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit
FailureReasonHeartbeatExceedsLimit = "Heartbeat details exceed size limit."
// FailureReasonHistorySizeExceedsLimit is reason to fail workflow when history size exceeds limit
FailureReasonHistorySizeExceedsLimit = "Workflow history size exceeds limit."
// FailureReasonHistorySizeExceedsLimit is reason to fail workflow when history count exceeds limit
FailureReasonHistoryCountExceedsLimit = "Workflow history count exceeds limit."
// FailureReasonMutableStateSizeExceedsLimit is reason to fail workflow when mutable state size exceeds limit
FailureReasonMutableStateSizeExceedsLimit = "Workflow mutable state size exceeds limit."
// FailureReasonTransactionSizeExceedsLimit is the failureReason for when transaction cannot be committed because it exceeds size limit
FailureReasonTransactionSizeExceedsLimit = "Transaction size exceeds limit."
const (
// TerminationReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
TerminationReasonCompleteResultExceedsLimit WorkflowTerminationReason = iota
// TerminationReasonFailureExceedsLimit is failureReason for failure details exceeds limit
TerminationReasonFailureExceedsLimit
// TerminationReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit
TerminationReasonCancelDetailsExceedsLimit
// TerminationReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit
TerminationReasonHeartbeatExceedsLimit
// TerminationReasonHistorySizeExceedsLimit is reason to fail workflow when history size exceeds limit
TerminationReasonHistorySizeExceedsLimit
// TerminationReasonHistoryCountExceedsLimit is reason to fail workflow when history count exceeds limit
TerminationReasonHistoryCountExceedsLimit
// TerminationReasonMutableStateSizeExceedsLimit is reason to fail workflow when mutable state size exceeds limit
TerminationReasonMutableStateSizeExceedsLimit
// TerminationReasonTransactionSizeExceedsLimit is the failureReason for when transaction cannot be committed because it exceeds size limit
TerminationReasonTransactionSizeExceedsLimit
)

var (
Expand All @@ -148,6 +152,54 @@ var (
ErrNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String()))
)

// String returns a short tag value for the given WorkflowTerminationReason.
func (r WorkflowTerminationReason) String() string {
switch r {
case TerminationReasonCompleteResultExceedsLimit:
return "Complete result exceeds size limit."
case TerminationReasonFailureExceedsLimit:
return "Failure exceeds size limit."
case TerminationReasonCancelDetailsExceedsLimit:
return "Cancel details exceed size limit."
case TerminationReasonHeartbeatExceedsLimit:
return "Heartbeat details exceed size limit."
case TerminationReasonHistorySizeExceedsLimit:
return "Workflow history size exceeds limit."
case TerminationReasonHistoryCountExceedsLimit:
return "Workflow history count exceeds limit."
case TerminationReasonMutableStateSizeExceedsLimit:
return "Workflow mutable state size exceeds limit."
case TerminationReasonTransactionSizeExceedsLimit:
return "Transaction size exceeds limit."
default:
return "Unknown failure."
}
}

// MetricsTag returns a Reason tag for the given WorkflowTerminationReason.
func (r WorkflowTerminationReason) MetricsTag() metrics.Tag {
switch r {
case TerminationReasonCompleteResultExceedsLimit:
return metrics.ReasonTag("complete_result_exceeds_limit")
case TerminationReasonFailureExceedsLimit:
return metrics.ReasonTag("failure_exceeds_limit")
case TerminationReasonCancelDetailsExceedsLimit:
return metrics.ReasonTag("cancel_details_exceeds_limit")
case TerminationReasonHeartbeatExceedsLimit:
return metrics.ReasonTag("heartbeat_exceeds_limit")
case TerminationReasonHistorySizeExceedsLimit:
return metrics.ReasonTag("history_size_exceeds_limit")
case TerminationReasonHistoryCountExceedsLimit:
return metrics.ReasonTag("history_count_exceeds_limit")
case TerminationReasonMutableStateSizeExceedsLimit:
return metrics.ReasonTag("ms_size_exceeds_limit")
case TerminationReasonTransactionSizeExceedsLimit:
return metrics.ReasonTag("transaction_size_exceeds_limit")
default:
return metrics.ReasonTag("unknown_failure")
}
}

// AwaitWaitGroup calls Wait on the given wait
// Returns true if the Wait() call succeeded before the timeout
// Returns false if the Wait() did not return before the timeout
Expand Down
22 changes: 11 additions & 11 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskFailed(
wh.throttledLogger,
tag.BlobSizeViolationOperation("RespondWorkflowTaskFailed"),
); err != nil {
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, true)
serverFailure := failure.NewServerFailure(common.TerminationReasonFailureExceedsLimit.String(), true)
serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn)
request.Failure = serverFailure
}
Expand Down Expand Up @@ -1173,7 +1173,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(ctx context.Context, requ
// heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
TaskToken: request.TaskToken,
Failure: failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true),
Failure: failure.NewServerFailure(common.TerminationReasonHeartbeatExceedsLimit.String(), true),
Identity: request.Identity,
}
_, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{
Expand Down Expand Up @@ -1263,7 +1263,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatById(ctx context.Context,
// heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
TaskToken: token,
Failure: failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true),
Failure: failure.NewServerFailure(common.TerminationReasonHeartbeatExceedsLimit.String(), true),
Identity: request.Identity,
}
_, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{
Expand Down Expand Up @@ -1338,7 +1338,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(
// result exceeds blob size limit, we would record it as failure
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
TaskToken: request.TaskToken,
Failure: failure.NewServerFailure(common.FailureReasonCompleteResultExceedsLimit, true),
Failure: failure.NewServerFailure(common.TerminationReasonCompleteResultExceedsLimit.String(), true),
Identity: request.Identity,
}
_, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{
Expand Down Expand Up @@ -1430,7 +1430,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedById(ctx context.Context,
// result exceeds blob size limit, we would record it as failure
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
TaskToken: token,
Failure: failure.NewServerFailure(common.FailureReasonCompleteResultExceedsLimit, true),
Failure: failure.NewServerFailure(common.TerminationReasonCompleteResultExceedsLimit.String(), true),
Identity: request.Identity,
}
_, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{
Expand Down Expand Up @@ -1511,7 +1511,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
); err != nil {
// heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
response.Failures = append(response.Failures, failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true))
response.Failures = append(response.Failures, failure.NewServerFailure(common.TerminationReasonHeartbeatExceedsLimit.String(), true))

// do not send heartbeat to history service
request.LastHeartbeatDetails = nil
Expand All @@ -1529,7 +1529,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
wh.throttledLogger,
tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
); err != nil {
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, true)
serverFailure := failure.NewServerFailure(common.TerminationReasonFailureExceedsLimit.String(), true)
serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn)
request.Failure = serverFailure

Expand Down Expand Up @@ -1615,7 +1615,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re
tag.BlobSizeViolationOperation("RespondActivityTaskFailedById"),
); err != nil {
// heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
response.Failures = append(response.Failures, failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true))
response.Failures = append(response.Failures, failure.NewServerFailure(common.TerminationReasonHeartbeatExceedsLimit.String(), true))

// do not send heartbeat to history service
request.LastHeartbeatDetails = nil
Expand All @@ -1633,7 +1633,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re
wh.throttledLogger,
tag.BlobSizeViolationOperation("RespondActivityTaskFailedById"),
); err != nil {
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, true)
serverFailure := failure.NewServerFailure(common.TerminationReasonFailureExceedsLimit.String(), true)
serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn)
request.Failure = serverFailure

Expand Down Expand Up @@ -1699,7 +1699,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(ctx context.Context, requ
// details exceeds blob size limit, we would record it as failure
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
TaskToken: request.TaskToken,
Failure: failure.NewServerFailure(common.FailureReasonCancelDetailsExceedsLimit, true),
Failure: failure.NewServerFailure(common.TerminationReasonCancelDetailsExceedsLimit.String(), true),
Identity: request.Identity,
}
_, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{
Expand Down Expand Up @@ -1790,7 +1790,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledById(ctx context.Context,
// details exceeds blob size limit, we would record it as failure
failRequest := &workflowservice.RespondActivityTaskFailedRequest{
TaskToken: token,
Failure: failure.NewServerFailure(common.FailureReasonCancelDetailsExceedsLimit, true),
Failure: failure.NewServerFailure(common.TerminationReasonCancelDetailsExceedsLimit.String(), true),
Identity: request.Identity,
}
_, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(

if err := workflow.TerminateWorkflow(
ms,
common.FailureReasonTransactionSizeExceedsLimit,
common.TerminationReasonTransactionSizeExceedsLimit.String(),
payloads.EncodeString(updateErr.Error()),
consts.IdentityHistoryService,
false,
Expand Down
6 changes: 3 additions & 3 deletions service/history/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ var (
// ErrEmptyHistoryRawEventBatch indicate that one single batch of history raw events is of size 0
ErrEmptyHistoryRawEventBatch = serviceerror.NewInvalidArgument("encountered empty history batch")
// ErrHistorySizeExceedsLimit is error indicating workflow execution has exceeded system defined history size limit
ErrHistorySizeExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonHistorySizeExceedsLimit)
ErrHistorySizeExceedsLimit = serviceerror.NewInvalidArgument(common.TerminationReasonHistorySizeExceedsLimit.String())
// ErrHistoryCountExceedsLimit is error indicating workflow execution has exceeded system defined history count limit
ErrHistoryCountExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonHistoryCountExceedsLimit)
ErrHistoryCountExceedsLimit = serviceerror.NewInvalidArgument(common.TerminationReasonHistoryCountExceedsLimit.String())
// ErrMutableStateSizeExceedsLimit is error indicating workflow execution has exceeded system defined mutable state size limit
ErrMutableStateSizeExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonMutableStateSizeExceedsLimit)
ErrMutableStateSizeExceedsLimit = serviceerror.NewInvalidArgument(common.TerminationReasonMutableStateSizeExceedsLimit.String())
// ErrUnknownCluster is error indicating unknown cluster
ErrUnknownCluster = serviceerror.NewInvalidArgument("unknown cluster")
// ErrBufferedQueryCleared is error indicating mutable state is cleared while buffered query is pending
Expand Down
15 changes: 9 additions & 6 deletions service/history/workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@
) (bool, error) {
// Hard terminate workflow if still running and breached history size limit
if c.maxHistorySizeExceeded(shardContext) {
if err := c.forceTerminateWorkflow(ctx, shardContext, common.FailureReasonHistorySizeExceedsLimit); err != nil {
if err := c.forceTerminateWorkflow(ctx, shardContext, common.TerminationReasonHistorySizeExceedsLimit); err != nil {
return false, err
}
// Return true to caller to indicate workflow state is overwritten to force terminate execution on update
Expand Down Expand Up @@ -1028,7 +1028,7 @@
) (bool, error) {
// Hard terminate workflow if still running and breached history count limit
if c.maxHistoryCountExceeded(shardContext) {
if err := c.forceTerminateWorkflow(ctx, shardContext, common.FailureReasonHistoryCountExceedsLimit); err != nil {
if err := c.forceTerminateWorkflow(ctx, shardContext, common.TerminationReasonHistoryCountExceedsLimit); err != nil {
return false, err
}
// Return true to caller to indicate workflow state is overwritten to force terminate execution on update
Expand Down Expand Up @@ -1064,7 +1064,7 @@
// TODO: ideally this check should be after closing mutable state tx, but that would require a large refactor
func (c *ContextImpl) enforceMutableStateSizeCheck(ctx context.Context, shardContext shard.Context) (bool, error) {
if c.maxMutableStateSizeExceeded() {
if err := c.forceTerminateWorkflow(ctx, shardContext, common.FailureReasonMutableStateSizeExceedsLimit); err != nil {
if err := c.forceTerminateWorkflow(ctx, shardContext, common.TerminationReasonMutableStateSizeExceedsLimit); err != nil {
return false, err
}
// Return true to caller to indicate workflow state is overwritten to force terminate execution on update
Expand Down Expand Up @@ -1100,7 +1100,7 @@
func (c *ContextImpl) forceTerminateWorkflow(
ctx context.Context,
shardContext shard.Context,
failureReason string,
failureReason common.WorkflowTerminationReason,
) error {
if !c.MutableState.IsWorkflowExecutionRunning() {
return nil
Expand All @@ -1120,15 +1120,18 @@
return err
}

return TerminateWorkflow(
err = TerminateWorkflow(
mutableState,
failureReason,
failureReason.String(),
nil,
consts.IdentityHistoryService,
false,
nil, // No links necessary.
)
if err != nil {
return err
}
}

Check failure on line 1134 in service/history/workflow/context.go

View workflow job for this annotation

GitHub Actions / golangci

missing return) (typecheck)

Check failure on line 1134 in service/history/workflow/context.go

View workflow job for this annotation

GitHub Actions / golangci

missing return) (typecheck)

Check failure on line 1134 in service/history/workflow/context.go

View workflow job for this annotation

GitHub Actions / golangci

missing return) (typecheck)

Check failure on line 1134 in service/history/workflow/context.go

View workflow job for this annotation

GitHub Actions / Pre-build for cache

missing return

// CacheSize estimates the in-memory size of the object for cache limits. For proto objects, it uses proto.Size()
// which returns the serialized size. Note: In-memory size will be slightly larger than the serialized size.
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -5244,7 +5244,7 @@ func (ms *MutableStateImpl) truncateRetryableActivityFailure(
// nonRetryable is set to false here as only retryable failures are recorded in mutable state.
// also when this method is called, the check for isRetryable is already done, so the value
// is only for visibility/debugging purpose.
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, false)
serverFailure := failure.NewServerFailure(common.TerminationReasonFailureExceedsLimit.String(), false)
serverFailure.Cause = failure.Truncate(activityFailure, sizeLimitError)
return serverFailure
}
Expand Down
6 changes: 3 additions & 3 deletions tests/sizelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ SignalLoop:
}
// Signalling workflow should result in force terminating the workflow execution and returns with ResourceExhausted
// error. InvalidArgument is returned by the client.
s.EqualError(signalErr, common.FailureReasonHistoryCountExceedsLimit)
s.EqualError(signalErr, common.TerminationReasonHistoryCountExceedsLimit.String())
s.IsType(&serviceerror.InvalidArgument{}, signalErr)

historyEvents := s.GetHistory(s.Namespace(), &commonpb.WorkflowExecution{
Expand Down Expand Up @@ -451,7 +451,7 @@ func (s *SizeLimitFunctionalSuite) TestTerminateWorkflowCausedByMsSizeLimit() {
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))

// Workflow should be force terminated at this point
s.EqualError(err, common.FailureReasonMutableStateSizeExceedsLimit)
s.EqualError(err, common.TerminationReasonMutableStateSizeExceedsLimit.String())
}

// Send another signal without RunID
Expand Down Expand Up @@ -559,7 +559,7 @@ SignalLoop:
}
// Signalling workflow should result in force terminating the workflow execution and returns with ResourceExhausted
// error. InvalidArgument is returned by the client.
s.EqualError(signalErr, common.FailureReasonHistorySizeExceedsLimit)
s.EqualError(signalErr, common.TerminationReasonHistorySizeExceedsLimit.String())
s.IsType(&serviceerror.InvalidArgument{}, signalErr)

historyEvents := s.GetHistory(s.Namespace(), &commonpb.WorkflowExecution{
Expand Down
Loading