From 1a937c924cb6ba69d6eb2adf76014b0648bb181c Mon Sep 17 00:00:00 2001 From: yux0 Date: Fri, 20 Dec 2024 15:55:00 -0800 Subject: [PATCH 1/3] Retry handover error during graceful failover --- common/rpc/interceptor/namespace_validator.go | 36 +++++- .../interceptor/namespace_validator_test.go | 107 +++++++++++++++++- common/util.go | 4 + 3 files changed, 143 insertions(+), 4 deletions(-) diff --git a/common/rpc/interceptor/namespace_validator.go b/common/rpc/interceptor/namespace_validator.go index eea16a7a623..3c5d2d85e5f 100644 --- a/common/rpc/interceptor/namespace_validator.go +++ b/common/rpc/interceptor/namespace_validator.go @@ -26,6 +26,7 @@ package interceptor import ( "context" + "time" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/operatorservice/v1" @@ -34,11 +35,15 @@ import ( "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/api" + "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/namespace" "google.golang.org/grpc" ) +// Use max handover allow timeout as retry interval +var handoverRetryPolicy = backoff.NewExponentialRetryPolicy(time.Millisecond * 100).WithExpirationInterval(time.Second * 10) + type ( TaskTokenGetter interface { GetTaskToken() []byte @@ -227,7 +232,7 @@ func (ni *NamespaceValidatorInterceptor) StateValidationIntercept( return nil, err } - return handler(ctx, req) + return ni.handleRequestWithHandoverRetry(ctx, req, info, handler, namespaceEntry) } // ValidateState validates: @@ -237,10 +242,35 @@ func (ni *NamespaceValidatorInterceptor) StateValidationIntercept( // 4. Namespace from request match namespace from task token, if check is enabled with dynamic config. // 5. Namespace is in correct state. func (ni *NamespaceValidatorInterceptor) ValidateState(namespaceEntry *namespace.Namespace, fullMethod string) error { - if err := ni.checkNamespaceState(namespaceEntry, fullMethod); err != nil { + return ni.checkNamespaceState(namespaceEntry, fullMethod) +} + +func (ni *NamespaceValidatorInterceptor) handleRequestWithHandoverRetry( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + nsEntry *namespace.Namespace, +) (interface{}, error) { + + var response interface{} + op := func(ctx context.Context) error { + err := ni.checkReplicationState(nsEntry, info.FullMethod) + if err != nil { + var nsErr error + nsEntry, nsErr = ni.namespaceRegistry.RefreshNamespaceById(nsEntry.ID()) + if nsErr != nil { + return nsErr + } + return err + } + + response, err = handler(ctx, req) return err } - return ni.checkReplicationState(namespaceEntry, fullMethod) + // Only retry on ErrNamespaceHandover, other errors will be handled by the retry interceptor + err := backoff.ThrottleRetryContext(ctx, op, handoverRetryPolicy, common.IsNamespaceHandoverError) + return response, err } func (ni *NamespaceValidatorInterceptor) extractNamespace(req interface{}) (*namespace.Namespace, error) { diff --git a/common/rpc/interceptor/namespace_validator_test.go b/common/rpc/interceptor/namespace_validator_test.go index d66246b34c1..4140e96608a 100644 --- a/common/rpc/interceptor/namespace_validator_test.go +++ b/common/rpc/interceptor/namespace_validator_test.go @@ -26,6 +26,7 @@ package interceptor import ( "context" + "errors" "fmt" "testing" @@ -209,7 +210,7 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp { state: enumspb.NAMESPACE_STATE_REGISTERED, replicationState: enumspb.REPLICATION_STATE_HANDOVER, - expectedErr: common.ErrNamespaceHandover, + expectedErr: errors.New("error refresh"), method: api.WorkflowServicePrefix + "StartWorkflowExecution", req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"}, }, @@ -409,6 +410,10 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp }, }), nil) } + if _, allow := allowedMethodsDuringHandover[api.MethodName(testCase.method)]; !allow && + testCase.replicationState == enumspb.REPLICATION_STATE_HANDOVER { + s.mockRegistry.EXPECT().RefreshNamespaceById(namespace.ID("")).Return(nil, errors.New("error refresh")) + } nvi := NewNamespaceValidatorInterceptor( s.mockRegistry, @@ -953,3 +958,103 @@ func (s *namespaceValidatorSuite) TestSetNamespace() { nvi.setNamespace(namespaceEntry, failActivityTaskReq) s.Equal(namespaceRequestName, failActivityTaskReq.Namespace) } + +func (s *namespaceValidatorSuite) Test_HandleRequestWithHandoverRetry() { + testCases := []struct { + replicationState enumspb.ReplicationState + expectedErr error + method string + req any + }{ + { + replicationState: enumspb.REPLICATION_STATE_NORMAL, + expectedErr: nil, + method: api.WorkflowServicePrefix + "StartWorkflowExecution", + req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"}, + }, + { + replicationState: enumspb.REPLICATION_STATE_HANDOVER, + expectedErr: nil, + method: api.WorkflowServicePrefix + "GetReplicationMessages", + req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"}, + }, + { + replicationState: enumspb.REPLICATION_STATE_HANDOVER, + expectedErr: nil, + method: api.WorkflowServicePrefix + "StartWorkflowExecution", + req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"}, + }, + { + replicationState: enumspb.REPLICATION_STATE_HANDOVER, + expectedErr: errors.New("refresh fail"), + method: api.WorkflowServicePrefix + "StartWorkflowExecution", + req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace-2"}, + }, + } + + for _, testCase := range testCases { + nvi := NewNamespaceValidatorInterceptor( + s.mockRegistry, + dynamicconfig.GetBoolPropertyFn(false), + dynamicconfig.GetIntPropertyFn(100)) + serverInfo := &grpc.UnaryServerInfo{ + FullMethod: testCase.method, + } + reqWithNamespace, hasNamespace := testCase.req.(NamespaceNameGetter) + if !hasNamespace { + s.Fail("invalid test case without namespace") + } + namespaceName := reqWithNamespace.GetNamespace() + + if testCase.expectedErr != nil { + s.mockRegistry.EXPECT().RefreshNamespaceById(namespace.ID(namespaceName)).Return(nil, errors.New("refresh fail")) + } + + if _, allow := allowedMethodsDuringHandover[api.MethodName(testCase.method)]; !allow && testCase.replicationState == enumspb.REPLICATION_STATE_HANDOVER && testCase.expectedErr == nil { + s.mockRegistry.EXPECT().RefreshNamespaceById(namespace.ID(namespaceName)).Return(namespace.FromPersistentState( + &persistence.GetNamespaceResponse{ + Namespace: &persistencespb.NamespaceDetail{ + Config: &persistencespb.NamespaceConfig{}, + ReplicationConfig: &persistencespb.NamespaceReplicationConfig{ + State: enumspb.REPLICATION_STATE_NORMAL, + }, + Info: &persistencespb.NamespaceInfo{ + Id: namespaceName, + Name: namespaceName, + State: enumspb.NAMESPACE_STATE_REGISTERED, + }, + }, + }, + ), nil) + } + + handlerCalled := false + _, err := nvi.handleRequestWithHandoverRetry(context.Background(), testCase.req, serverInfo, func(ctx context.Context, req interface{}) (interface{}, error) { + handlerCalled = true + return &workflowservice.StartWorkflowExecutionResponse{}, nil + }, + namespace.FromPersistentState( + &persistence.GetNamespaceResponse{ + Namespace: &persistencespb.NamespaceDetail{ + Config: &persistencespb.NamespaceConfig{}, + ReplicationConfig: &persistencespb.NamespaceReplicationConfig{ + State: testCase.replicationState, + }, + Info: &persistencespb.NamespaceInfo{ + Id: namespaceName, + Name: namespaceName, + State: enumspb.NAMESPACE_STATE_REGISTERED, + }, + }, + }, + )) + + if testCase.expectedErr != nil { + s.IsType(testCase.expectedErr, err) + s.False(handlerCalled) + } else { + s.NoError(err) + s.True(handlerCalled) + } + } +} diff --git a/common/util.go b/common/util.go index 03b7ed32ae3..44dc9d57013 100644 --- a/common/util.go +++ b/common/util.go @@ -357,6 +357,10 @@ func IsServiceHandlerRetryableError(err error) bool { return false } +func IsNamespaceHandoverError(err error) bool { + return err.Error() == ErrNamespaceHandover.Error() +} + func IsStickyWorkerUnavailable(err error) bool { switch err.(type) { case *serviceerrors.StickyWorkerUnavailable: From 08accbbbe0c55803039b03d891a2538977f84b16 Mon Sep 17 00:00:00 2001 From: yux0 Date: Thu, 23 Jan 2025 10:14:36 -0800 Subject: [PATCH 2/3] address comment --- common/metrics/metric_defs.go | 2 +- common/rpc/interceptor/namespace_validator.go | 21 +++++++---- .../interceptor/namespace_validator_test.go | 37 +------------------ common/rpc/interceptor/telemetry.go | 6 ++- 4 files changed, 22 insertions(+), 44 deletions(-) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 7360e81c825..7a30159ae1d 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1182,8 +1182,8 @@ var ( // Replication NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level") - NamespaceReplicationDLQAckLevelGauge = NewGaugeDef("namespace_dlq_ack_level") NamespaceReplicationDLQMaxLevelGauge = NewGaugeDef("namespace_dlq_max_level") + NamespaceHandoverRetryLatency = NewTimerDef("namespace_handover_retry_latency") // Persistence PersistenceRequests = NewCounterDef( diff --git a/common/rpc/interceptor/namespace_validator.go b/common/rpc/interceptor/namespace_validator.go index 3c5d2d85e5f..a18b1ec6e1c 100644 --- a/common/rpc/interceptor/namespace_validator.go +++ b/common/rpc/interceptor/namespace_validator.go @@ -26,6 +26,7 @@ package interceptor import ( "context" + "go.temporal.io/server/common/metrics" "time" enumspb "go.temporal.io/api/enums/v1" @@ -41,8 +42,8 @@ import ( "google.golang.org/grpc" ) -// Use max handover allow timeout as retry interval -var handoverRetryPolicy = backoff.NewExponentialRetryPolicy(time.Millisecond * 100).WithExpirationInterval(time.Second * 10) +// Limit the max attempts to reduce the handover error return to users +var handoverRetryPolicy = backoff.NewConstantDelayRetryPolicy(time.Millisecond * 100).WithMaximumAttempts(20) type ( TaskTokenGetter interface { @@ -254,22 +255,28 @@ func (ni *NamespaceValidatorInterceptor) handleRequestWithHandoverRetry( ) (interface{}, error) { var response interface{} + var retryCount int + var retryLatency time.Duration op := func(ctx context.Context) error { err := ni.checkReplicationState(nsEntry, info.FullMethod) if err != nil { - var nsErr error - nsEntry, nsErr = ni.namespaceRegistry.RefreshNamespaceById(nsEntry.ID()) - if nsErr != nil { - return nsErr - } return err } response, err = handler(ctx, req) + if retryCount > 1 { + retryLatency = retryLatency + handoverRetryPolicy.ComputeNextDelay(0, retryCount, nil) + } + retryCount++ return err } // Only retry on ErrNamespaceHandover, other errors will be handled by the retry interceptor err := backoff.ThrottleRetryContext(ctx, op, handoverRetryPolicy, common.IsNamespaceHandoverError) + + if retryCount > 1 { + // API Latency compensate logic on handover error retry + metrics.ContextCounterAdd(ctx, metrics.NamespaceHandoverRetryLatency.Name(), retryLatency.Nanoseconds()) + } return response, err } diff --git a/common/rpc/interceptor/namespace_validator_test.go b/common/rpc/interceptor/namespace_validator_test.go index 4140e96608a..e4cc6ef524e 100644 --- a/common/rpc/interceptor/namespace_validator_test.go +++ b/common/rpc/interceptor/namespace_validator_test.go @@ -26,7 +26,6 @@ package interceptor import ( "context" - "errors" "fmt" "testing" @@ -210,7 +209,7 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp { state: enumspb.NAMESPACE_STATE_REGISTERED, replicationState: enumspb.REPLICATION_STATE_HANDOVER, - expectedErr: errors.New("error refresh"), + expectedErr: common.ErrNamespaceHandover, method: api.WorkflowServicePrefix + "StartWorkflowExecution", req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"}, }, @@ -410,10 +409,6 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp }, }), nil) } - if _, allow := allowedMethodsDuringHandover[api.MethodName(testCase.method)]; !allow && - testCase.replicationState == enumspb.REPLICATION_STATE_HANDOVER { - s.mockRegistry.EXPECT().RefreshNamespaceById(namespace.ID("")).Return(nil, errors.New("error refresh")) - } nvi := NewNamespaceValidatorInterceptor( s.mockRegistry, @@ -980,16 +975,10 @@ func (s *namespaceValidatorSuite) Test_HandleRequestWithHandoverRetry() { }, { replicationState: enumspb.REPLICATION_STATE_HANDOVER, - expectedErr: nil, + expectedErr: common.ErrNamespaceHandover, method: api.WorkflowServicePrefix + "StartWorkflowExecution", req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"}, }, - { - replicationState: enumspb.REPLICATION_STATE_HANDOVER, - expectedErr: errors.New("refresh fail"), - method: api.WorkflowServicePrefix + "StartWorkflowExecution", - req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace-2"}, - }, } for _, testCase := range testCases { @@ -1006,28 +995,6 @@ func (s *namespaceValidatorSuite) Test_HandleRequestWithHandoverRetry() { } namespaceName := reqWithNamespace.GetNamespace() - if testCase.expectedErr != nil { - s.mockRegistry.EXPECT().RefreshNamespaceById(namespace.ID(namespaceName)).Return(nil, errors.New("refresh fail")) - } - - if _, allow := allowedMethodsDuringHandover[api.MethodName(testCase.method)]; !allow && testCase.replicationState == enumspb.REPLICATION_STATE_HANDOVER && testCase.expectedErr == nil { - s.mockRegistry.EXPECT().RefreshNamespaceById(namespace.ID(namespaceName)).Return(namespace.FromPersistentState( - &persistence.GetNamespaceResponse{ - Namespace: &persistencespb.NamespaceDetail{ - Config: &persistencespb.NamespaceConfig{}, - ReplicationConfig: &persistencespb.NamespaceReplicationConfig{ - State: enumspb.REPLICATION_STATE_NORMAL, - }, - Info: &persistencespb.NamespaceInfo{ - Id: namespaceName, - Name: namespaceName, - State: enumspb.NAMESPACE_STATE_REGISTERED, - }, - }, - }, - ), nil) - } - handlerCalled := false _, err := nvi.handleRequestWithHandoverRetry(context.Background(), testCase.req, serverInfo, func(ctx context.Context, req interface{}) (interface{}, error) { handlerCalled = true diff --git a/common/rpc/interceptor/telemetry.go b/common/rpc/interceptor/telemetry.go index 0b789f08d87..be2371272fe 100644 --- a/common/rpc/interceptor/telemetry.go +++ b/common/rpc/interceptor/telemetry.go @@ -215,10 +215,14 @@ func (ti *TelemetryInterceptor) RecordLatencyMetrics(ctx context.Context, startT userLatencyDuration = time.Duration(val) metrics.ServiceLatencyUserLatency.With(metricsHandler).Record(userLatencyDuration) } + handoverRetryLatency := time.Duration(0) + if val, ok := metrics.ContextCounterGet(ctx, metrics.NamespaceHandoverRetryLatency.Name()); ok { + handoverRetryLatency = time.Duration(val) + } latency := time.Since(startTime) metrics.ServiceLatency.With(metricsHandler).Record(latency) - noUserLatency := max(0, latency-userLatencyDuration) + noUserLatency := max(0, latency-userLatencyDuration-handoverRetryLatency) metrics.ServiceLatencyNoUserLatency.With(metricsHandler).Record(noUserLatency) } From d2e278190d4f152bf70839b6f1bb78b008e5eeb5 Mon Sep 17 00:00:00 2001 From: yux0 Date: Thu, 23 Jan 2025 10:19:30 -0800 Subject: [PATCH 3/3] sort import --- common/rpc/interceptor/namespace_validator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/rpc/interceptor/namespace_validator.go b/common/rpc/interceptor/namespace_validator.go index a18b1ec6e1c..b3b11913908 100644 --- a/common/rpc/interceptor/namespace_validator.go +++ b/common/rpc/interceptor/namespace_validator.go @@ -26,7 +26,6 @@ package interceptor import ( "context" - "go.temporal.io/server/common/metrics" "time" enumspb "go.temporal.io/api/enums/v1" @@ -38,6 +37,7 @@ import ( "go.temporal.io/server/common/api" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "google.golang.org/grpc" ) @@ -256,7 +256,7 @@ func (ni *NamespaceValidatorInterceptor) handleRequestWithHandoverRetry( var response interface{} var retryCount int - var retryLatency time.Duration + retryLatency := time.Duration(0) op := func(ctx context.Context) error { err := ni.checkReplicationState(nsEntry, info.FullMethod) if err != nil {