diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 58542f1793803..ab7c3ff9ada0b 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2521,7 +2521,7 @@ func (rc *Client) restoreMetaKvEntries( log.Debug("after rewrite entry", zap.Int("new-key-len", len(newEntry.Key)), zap.Int("new-value-len", len(entry.e.Value)), zap.ByteString("new-key", newEntry.Key)) - if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil { + if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.ts); err != nil { return 0, 0, errors.Trace(err) } @@ -2933,3 +2933,13 @@ func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration { func (b *waitTiFlashBackoffer) Attempt() int { return b.Attempts } + +func PutRawKvWithRetry(ctx context.Context, client *RawKVBatchClient, key, value []byte, originTs uint64) error { + err := utils.WithRetry(ctx, func() error { + return client.Put(ctx, key, value, originTs) + }, utils.NewRawClientBackoffStrategy()) + if err != nil { + return errors.Errorf("failed to put raw kv after retry") + } + return nil +} diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index f208371563f22..fefa7682bb71a 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tidb/tablecodec" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/rawkv" pd "github.com/tikv/pd/client" "golang.org/x/exp/slices" "google.golang.org/grpc/keepalive" @@ -1865,3 +1866,69 @@ func TestCheckNewCollationEnable(t *testing.T) { require.Equal(t, ca.newCollationEnableInCluster == "True", enabled) } } + +type mockRawKVClient struct { + restore.RawkvClient + putCount int + errThreshold int +} + +func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error { + m.putCount += 1 + if m.errThreshold >= m.putCount { + return errors.New("rpcClient is idle") + } + return nil +} + +func TestPutRawKvWithRetry(t *testing.T) { + tests := []struct { + name string + errThreshold int + cancelAfter time.Duration + wantErr string + wantPuts int + }{ + { + name: "success on first try", + errThreshold: 0, + wantPuts: 1, + }, + { + name: "success on after failure", + errThreshold: 2, + wantPuts: 3, + }, + { + name: "fails all retries", + errThreshold: 5, + wantErr: "failed to put raw kv after retry", + wantPuts: 5, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockRawClient := &mockRawKVClient{ + errThreshold: tt.errThreshold, + } + client := restore.NewRawKVBatchClient(mockRawClient, 1) + + ctx := context.Background() + if tt.cancelAfter > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter) + defer cancel() + } + + err := restore.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1) + + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + } else { + require.NoError(t, err) + } + require.Equal(t, tt.wantPuts, mockRawClient.putCount) + }) + } +} diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index f2d6ddfba2f13..196e717d8c11b 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -41,6 +41,10 @@ const ( flashbackRetryTime = 3 flashbackWaitInterval = 3000 * time.Millisecond flashbackMaxWaitInterval = 15 * time.Second + + rawClientMaxAttempts = 5 + rawClientDelayTime = 500 * time.Millisecond + rawClientMaxDelayTime = 5 * time.Second ) // ConstantBackoff is a backoffer that retry forever until success. @@ -289,3 +293,35 @@ func (bo *flashbackBackoffer) NextBackoff(err error) time.Duration { func (bo *flashbackBackoffer) Attempt() int { return bo.attempt } + +type RawClientBackoffStrategy struct { + Attempts int + BaseBackoff time.Duration + MaxBackoff time.Duration +} + +func NewRawClientBackoffStrategy() Backoffer { + return &RawClientBackoffStrategy{ + Attempts: rawClientMaxAttempts, + BaseBackoff: rawClientDelayTime, + MaxBackoff: rawClientMaxDelayTime, + } +} + +// NextBackoff returns a duration to wait before retrying again +func (b *RawClientBackoffStrategy) NextBackoff(error) time.Duration { + bo := b.BaseBackoff + b.Attempts-- + if b.Attempts == 0 { + return 0 + } + b.BaseBackoff *= 2 + if b.BaseBackoff > b.MaxBackoff { + b.BaseBackoff = b.MaxBackoff + } + return bo +} + +func (b *RawClientBackoffStrategy) Attempt() int { + return b.Attempts +} diff --git a/build/BUILD.bazel b/build/BUILD.bazel index aa885fdec93c0..feeea36cef866 100644 --- a/build/BUILD.bazel +++ b/build/BUILD.bazel @@ -1,9 +1,9 @@ -package(default_visibility = ["//visibility:public"]) - -load("@io_bazel_rules_go//go:def.bzl", "nogo") load("@bazel_skylib//rules:common_settings.bzl", "bool_flag") +load("@io_bazel_rules_go//go:def.bzl", "nogo") load("//build/linter/staticcheck:def.bzl", "staticcheck_analyzers") +package(default_visibility = ["//visibility:public"]) + bool_flag( name = "with_nogo_flag", build_setting_default = False,