Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry handover error during graceful failover #7028

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions common/rpc/interceptor/namespace_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package interceptor

import (
"context"
"time"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
Expand All @@ -34,11 +35,15 @@ import (
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/api"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/namespace"
"google.golang.org/grpc"
)

// Use the max allowed handover timeout as the retry expiration interval
var handoverRetryPolicy = backoff.NewExponentialRetryPolicy(time.Millisecond * 100).WithExpirationInterval(time.Second * 10)

type (
TaskTokenGetter interface {
GetTaskToken() []byte
Expand Down Expand Up @@ -227,7 +232,7 @@ func (ni *NamespaceValidatorInterceptor) StateValidationIntercept(
return nil, err
}

return handler(ctx, req)
return ni.handleRequestWithHandoverRetry(ctx, req, info, handler, namespaceEntry)
}

// ValidateState validates:
Expand All @@ -237,10 +242,35 @@ func (ni *NamespaceValidatorInterceptor) StateValidationIntercept(
// 4. Namespace from request match namespace from task token, if check is enabled with dynamic config.
// 5. Namespace is in correct state.
func (ni *NamespaceValidatorInterceptor) ValidateState(namespaceEntry *namespace.Namespace, fullMethod string) error {
if err := ni.checkNamespaceState(namespaceEntry, fullMethod); err != nil {
return ni.checkNamespaceState(namespaceEntry, fullMethod)
}

func (ni *NamespaceValidatorInterceptor) handleRequestWithHandoverRetry(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
nsEntry *namespace.Namespace,
) (interface{}, error) {

var response interface{}
op := func(ctx context.Context) error {
err := ni.checkReplicationState(nsEntry, info.FullMethod)
if err != nil {
var nsErr error
nsEntry, nsErr = ni.namespaceRegistry.RefreshNamespaceById(nsEntry.ID())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm this will keep refreshing the namespace? I mean all requests will perform a refresh upon retry.

also if the handover error is from history service then the refresh here is unnecessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. We can also just wait for the ns refresh instead of doing this proactive refresh.

if nsErr != nil {
return nsErr
}
return err
}

response, err = handler(ctx, req)
return err
}
return ni.checkReplicationState(namespaceEntry, fullMethod)
// Only retry on ErrNamespaceHandover, other errors will be handled by the retry interceptor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we just let the retry interceptor handle the error, and have this interceptor just block until the ns is no longer in handover state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not go with this because retry interceptor should be the most inner interceptor. If that is the case, can I add a handover interceptor after the retry interceptor?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't remember why we moved retry interceptor to be the most inner interceptor, but I think there must be a reason. I think we should fully understand that to make sure the retry logic here won't cause any issue.

can I add a handover interceptor after the retry interceptor?

hmmm then why not just retry handover error in retry interceptor?

I guess I was proposing something different in my comment, i.e. block on ns state change instead of retry (by registering a callback to namespace registry).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot wait for ns callback as the error might return from history nodes and there is no callback from frontend.

err := backoff.ThrottleRetryContext(ctx, op, handoverRetryPolicy, common.IsNamespaceHandoverError)
yycptt marked this conversation as resolved.
Show resolved Hide resolved
return response, err
}

func (ni *NamespaceValidatorInterceptor) extractNamespace(req interface{}) (*namespace.Namespace, error) {
Expand Down
107 changes: 106 additions & 1 deletion common/rpc/interceptor/namespace_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package interceptor

import (
"context"
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -209,7 +210,7 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp
{
state: enumspb.NAMESPACE_STATE_REGISTERED,
replicationState: enumspb.REPLICATION_STATE_HANDOVER,
expectedErr: common.ErrNamespaceHandover,
expectedErr: errors.New("error refresh"),
method: api.WorkflowServicePrefix + "StartWorkflowExecution",
req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"},
},
Expand Down Expand Up @@ -409,6 +410,10 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp
},
}), nil)
}
if _, allow := allowedMethodsDuringHandover[api.MethodName(testCase.method)]; !allow &&
testCase.replicationState == enumspb.REPLICATION_STATE_HANDOVER {
s.mockRegistry.EXPECT().RefreshNamespaceById(namespace.ID("")).Return(nil, errors.New("error refresh"))
}

nvi := NewNamespaceValidatorInterceptor(
s.mockRegistry,
Expand Down Expand Up @@ -953,3 +958,103 @@ func (s *namespaceValidatorSuite) TestSetNamespace() {
nvi.setNamespace(namespaceEntry, failActivityTaskReq)
s.Equal(namespaceRequestName, failActivityTaskReq.Namespace)
}

// Test_HandleRequestWithHandoverRetry calls handleRequestWithHandoverRetry
// directly with namespace entries in NORMAL and HANDOVER replication state.
//
// Cases covered:
//   - namespace not in handover: the handler is expected to run;
//   - namespace in handover, method presumably listed in
//     allowedMethodsDuringHandover: the handler is expected to run;
//   - namespace in handover, blocked method, registry refresh reports NORMAL:
//     the handler is expected to run once the retry sees the refreshed entry;
//   - namespace in handover, blocked method, registry refresh fails: the
//     refresh error is returned and the handler must not be invoked.
func (s *namespaceValidatorSuite) Test_HandleRequestWithHandoverRetry() {
	// newEntry builds a REGISTERED namespace whose ID and name are both `name`
	// and whose replication state is `state`.
	newEntry := func(name string, state enumspb.ReplicationState) *namespace.Namespace {
		return namespace.FromPersistentState(
			&persistence.GetNamespaceResponse{
				Namespace: &persistencespb.NamespaceDetail{
					Config: &persistencespb.NamespaceConfig{},
					ReplicationConfig: &persistencespb.NamespaceReplicationConfig{
						State: state,
					},
					Info: &persistencespb.NamespaceInfo{
						Id:    name,
						Name:  name,
						State: enumspb.NAMESPACE_STATE_REGISTERED,
					},
				},
			},
		)
	}

	testCases := []struct {
		replicationState enumspb.ReplicationState
		expectedErr      error
		method           string
		req              any
	}{
		{
			replicationState: enumspb.REPLICATION_STATE_NORMAL,
			expectedErr:      nil,
			method:           api.WorkflowServicePrefix + "StartWorkflowExecution",
			req:              &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"},
		},
		{
			replicationState: enumspb.REPLICATION_STATE_HANDOVER,
			expectedErr:      nil,
			method:           api.WorkflowServicePrefix + "GetReplicationMessages",
			req:              &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"},
		},
		{
			replicationState: enumspb.REPLICATION_STATE_HANDOVER,
			expectedErr:      nil,
			method:           api.WorkflowServicePrefix + "StartWorkflowExecution",
			req:              &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"},
		},
		{
			replicationState: enumspb.REPLICATION_STATE_HANDOVER,
			expectedErr:      errors.New("refresh fail"),
			method:           api.WorkflowServicePrefix + "StartWorkflowExecution",
			req:              &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace-2"},
		},
	}

	for _, testCase := range testCases {
		nvi := NewNamespaceValidatorInterceptor(
			s.mockRegistry,
			dynamicconfig.GetBoolPropertyFn(false),
			dynamicconfig.GetIntPropertyFn(100))
		serverInfo := &grpc.UnaryServerInfo{
			FullMethod: testCase.method,
		}

		reqWithNamespace, hasNamespace := testCase.req.(NamespaceNameGetter)
		if !hasNamespace {
			// Abort here: reqWithNamespace is a nil interface, so continuing
			// would panic on GetNamespace() instead of reporting the bad case.
			s.FailNow("invalid test case without namespace")
		}
		namespaceName := reqWithNamespace.GetNamespace()
		namespaceID := namespace.ID(namespaceName)

		// Exactly one refresh expectation (or none) is registered per case.
		_, allowed := allowedMethodsDuringHandover[api.MethodName(testCase.method)]
		switch {
		case testCase.expectedErr != nil:
			// The refresh itself fails; that error must reach the caller.
			s.mockRegistry.EXPECT().RefreshNamespaceById(namespaceID).
				Return(nil, testCase.expectedErr)
		case !allowed && testCase.replicationState == enumspb.REPLICATION_STATE_HANDOVER:
			// The refresh reports that handover has finished, so the retry
			// should pass the replication-state check and reach the handler.
			s.mockRegistry.EXPECT().RefreshNamespaceById(namespaceID).
				Return(newEntry(namespaceName, enumspb.REPLICATION_STATE_NORMAL), nil)
		}

		handlerCalled := false
		handler := func(ctx context.Context, req any) (any, error) {
			handlerCalled = true
			return &workflowservice.StartWorkflowExecutionResponse{}, nil
		}

		resp, err := nvi.handleRequestWithHandoverRetry(
			context.Background(),
			testCase.req,
			serverInfo,
			handler,
			newEntry(namespaceName, testCase.replicationState),
		)

		if testCase.expectedErr != nil {
			// Pin the message, not just the dynamic type: every errors.New
			// value shares the same type, so a type check alone proves little.
			s.EqualError(err, testCase.expectedErr.Error())
			s.False(handlerCalled)
			continue
		}
		s.NoError(err)
		s.True(handlerCalled)
		s.NotNil(resp)
	}
}
4 changes: 4 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@ func IsServiceHandlerRetryableError(err error) bool {
return false
}

func IsNamespaceHandoverError(err error) bool {
return err.Error() == ErrNamespaceHandover.Error()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you replace other places that perform the same check to use this func? e.g. L341 above.

}

func IsStickyWorkerUnavailable(err error) bool {
switch err.(type) {
case *serviceerrors.StickyWorkerUnavailable:
Expand Down
Loading