Skip to content

Commit

Permalink
br: Release 8.5 20250114 v8.5.0 (#58981)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Jan 17, 2025
1 parent 417359d commit 0950fe3
Show file tree
Hide file tree
Showing 117 changed files with 9,179 additions and 2,497 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5776,13 +5776,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "6f2a7d747d05ae61a8f4a3c066058fa69f724480f8dc4a427d66fd066ce730c7",
strip_prefix = "github.com/pingcap/[email protected]20240924080114-4a3e17f5e62d",
sha256 = "f032df69d754f19adf7ac245dade0fdaeb12323e314c462feae8d095d777a681",
strip_prefix = "github.com/pingcap/[email protected]20250116085028-ef010e9196a4",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240924080114-4a3e17f5e62d.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240924080114-4a3e17f5e62d.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240924080114-4a3e17f5e62d.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240924080114-4a3e17f5e62d.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250116085028-ef010e9196a4.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250116085028-ef010e9196a4.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250116085028-ef010e9196a4.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250116085028-ef010e9196a4.zip",
],
)
go_repository(
Expand Down
80 changes: 80 additions & 0 deletions br/cmd/br/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func newOperatorCommand() *cobra.Command {
cmd.AddCommand(newPrepareForSnapshotBackupCommand(
"prepare-for-snapshot-backup",
"pause gc, schedulers and importing until the program exits, for snapshot backup."))
cmd.AddCommand(newBase64ifyCommand())
cmd.AddCommand(newListMigrationsCommand())
cmd.AddCommand(newMigrateToCommand())
cmd.AddCommand(newChecksumCommand())
return cmd
}

Expand All @@ -52,3 +56,79 @@ func newPrepareForSnapshotBackupCommand(use string, short string) *cobra.Command
operator.DefineFlagsForPrepareSnapBackup(cmd.Flags())
return cmd
}

// newBase64ifyCommand builds the `operator base64ify` subcommand, which
// encodes a storage configuration as base64 so it can be handed to
// `tikv-ctl compact-log-backup`.
func newBase64ifyCommand() *cobra.Command {
	run := func(cmd *cobra.Command, _ []string) error {
		// Parse the storage flags into the config, then emit the base64 form.
		var cfg operator.Base64ifyConfig
		if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
			return err
		}
		return operator.Base64ify(GetDefaultContext(), cfg)
	}
	cmd := &cobra.Command{
		Use:   "base64ify [-r] -s <storage>",
		Short: "generate base64 for a storage. this may be passed to `tikv-ctl compact-log-backup`.",
		Args:  cobra.NoArgs,
		RunE:  run,
	}
	operator.DefineFlagsForBase64ifyConfig(cmd.Flags())
	return cmd
}

// newListMigrationsCommand builds the `operator list-migrations` subcommand,
// which prints every migration recorded in the target storage.
func newListMigrationsCommand() *cobra.Command {
	run := func(cmd *cobra.Command, _ []string) error {
		var cfg operator.ListMigrationConfig
		if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
			return err
		}
		return operator.RunListMigrations(GetDefaultContext(), cfg)
	}
	cmd := &cobra.Command{
		Use:   "list-migrations",
		Short: "list all migrations",
		Args:  cobra.NoArgs,
		RunE:  run,
	}
	operator.DefineFlagsForListMigrationConfig(cmd.Flags())
	return cmd
}

// newMigrateToCommand builds the `operator migrate-to` subcommand, which
// runs migrations until a specific version is reached.
func newMigrateToCommand() *cobra.Command {
	run := func(cmd *cobra.Command, _ []string) error {
		var cfg operator.MigrateToConfig
		if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
			return err
		}
		return operator.RunMigrateTo(GetDefaultContext(), cfg)
	}
	cmd := &cobra.Command{
		Use:   "migrate-to",
		Short: "migrate to a specific version",
		Args:  cobra.NoArgs,
		RunE:  run,
	}
	operator.DefineFlagsForMigrateToConfig(cmd.Flags())
	return cmd
}

// newChecksumCommand builds the `operator checksum-as` subcommand, which
// checksums the current cluster (`-u`) after applying the rewrite rules
// generated from a backup (`-s`).
func newChecksumCommand() *cobra.Command {
	run := func(cmd *cobra.Command, _ []string) error {
		var cfg operator.ChecksumWithRewriteRulesConfig
		if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
			return err
		}
		// tidbGlue is the file-level glue shared by the br commands.
		return operator.RunChecksumTable(GetDefaultContext(), tidbGlue, cfg)
	}
	cmd := &cobra.Command{
		Use:   "checksum-as",
		Short: "calculate the checksum with rewrite rules",
		Long: "Calculate the checksum of the current cluster (specified by `-u`) " +
			"with applying the rewrite rules generated from a backup (specified by `-s`). " +
			"This can be used when you have the checksum of upstream elsewhere.",
		Args: cobra.NoArgs,
		RunE: run,
	}
	// By default filter out every table; the user opts tables in explicitly.
	task.DefineFilterFlags(cmd, []string{"!*.*"}, false)
	operator.DefineFlagsForChecksumTableConfig(cmd.Flags())
	return cmd
}
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go_test(
srcs = ["checkpoint_test.go"],
flaky = True,
race = "on",
shard_count = 8,
shard_count = 9,
deps = [
":checkpoint",
"//br/pkg/gluetidb",
Expand Down
9 changes: 1 addition & 8 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,7 @@ type RangeType struct {
*rtree.Range
}

// IdentKey returns the key identifying this range in checkpoint data:
// the range's start key.
func (r RangeType) IdentKey() []byte {
	return r.StartKey
}

type ValueType interface {
IdentKey() []byte
}
type ValueType any

type CheckpointMessage[K KeyType, V ValueType] struct {
// start-key of the origin range
Expand Down Expand Up @@ -261,7 +255,6 @@ func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)
// wait the range flusher exit
r.wg.Wait()
// remove the checkpoint lock
r.checkpointStorage.deleteLock(ctx)
r.checkpointStorage.close()
}

Expand Down
115 changes: 87 additions & 28 deletions br/pkg/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
},
},
}
err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, checkpointMetaForSnapshotRestore)
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, checkpointMetaForSnapshotRestore)
require.NoError(t, err)
checkpointMetaForSnapshotRestore2, err := checkpoint.LoadCheckpointMetadataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
Expand Down Expand Up @@ -278,9 +278,9 @@ func TestCheckpointRestoreRunner(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 5*time.Second, 3*time.Second)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 5*time.Second, 3*time.Second)
require.NoError(t, err)

data := map[string]struct {
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
}

for _, d := range data {
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, d.RangeKey)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(1, d.RangeKey))
require.NoError(t, err)
}

Expand All @@ -320,7 +320,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
checkpointRunner.FlushChecksum(ctx, 4, 4, 4, 4)

for _, d := range data2 {
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, d.RangeKey)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(2, d.RangeKey))
require.NoError(t, err)
}

Expand All @@ -343,7 +343,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
respCount += 1
}

_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checker)
_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.SnapshotRestoreCheckpointDatabaseName, checker)
require.NoError(t, err)
require.Equal(t, 4, respCount)

Expand All @@ -355,10 +355,10 @@ func TestCheckpointRestoreRunner(t *testing.T) {
require.Equal(t, checksum[i].Crc64xor, uint64(i))
}

err = checkpoint.RemoveCheckpointDataForSnapshotRestore(ctx, s.Mock.Domain, se)
err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
require.NoError(t, err)

exists := checkpoint.ExistsSnapshotRestoreCheckpoint(ctx, s.Mock.Domain)
exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
require.False(t, exists)
exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.SnapshotRestoreCheckpointDatabaseName))
require.False(t, exists)
Expand All @@ -371,9 +371,9 @@ func TestCheckpointRunnerRetry(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond)
require.NoError(t, err)

err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes", "return(true)")
Expand All @@ -382,32 +382,33 @@ func TestCheckpointRunnerRetry(t *testing.T) {
err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
require.NoError(t, err)
}()
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(1, "123"))
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(2, "456"))
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2)
time.Sleep(time.Second)
err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 3, "789")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(3, "789"))
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 3, 3, 3, 3)
require.NoError(t, err)
checkpointRunner.WaitForFinish(ctx, true)
se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
recordSet := make(map[string]int)
_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
func(tableID int64, rangeKey checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
checkpoint.SnapshotRestoreCheckpointDatabaseName,
func(tableID int64, v checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, v.RangeKey)] += 1
})
require.NoError(t, err)
require.LessOrEqual(t, 1, recordSet["1_{123}"])
require.LessOrEqual(t, 1, recordSet["2_{456}"])
require.LessOrEqual(t, 1, recordSet["3_{789}"])
require.LessOrEqual(t, 1, recordSet["1_123"])
require.LessOrEqual(t, 1, recordSet["2_456"])
require.LessOrEqual(t, 1, recordSet["3_789"])
items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1")
Expand All @@ -422,14 +423,14 @@ func TestCheckpointRunnerNoRetry(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond)
require.NoError(t, err)

err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(1, "123"))
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(2, "456"))
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
require.NoError(t, err)
Expand All @@ -440,13 +441,14 @@ func TestCheckpointRunnerNoRetry(t *testing.T) {
se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
recordSet := make(map[string]int)
_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
func(tableID int64, rangeKey checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
checkpoint.SnapshotRestoreCheckpointDatabaseName,
func(tableID int64, v checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, v.RangeKey)] += 1
})
require.NoError(t, err)
require.Equal(t, 1, recordSet["1_{123}"])
require.Equal(t, 1, recordSet["2_{456}"])
require.Equal(t, 1, recordSet["1_123"])
require.Equal(t, 1, recordSet["2_456"])
items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1")
Expand Down Expand Up @@ -584,3 +586,60 @@ func TestCheckpointRunnerLock(t *testing.T) {

runner.WaitForFinish(ctx, true)
}

// TestCheckpointCompactedRestoreRunner exercises the checkpoint runner for
// the custom-SST restore flow: it saves metadata, appends file-level
// checkpoint items, reloads them, and finally verifies the checkpoint
// database is removed.
func TestCheckpointCompactedRestoreRunner(t *testing.T) {
	ctx := context.Background()
	suite := utiltest.CreateRestoreSchemaSuite(t)
	glue := gluetidb.New()
	session, err := glue.CreateSession(suite.Mock.Storage)
	require.NoError(t, err)

	err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, session, checkpoint.CustomSSTRestoreCheckpointDatabaseName, nil)
	require.NoError(t, err)
	runner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, session, checkpoint.CustomSSTRestoreCheckpointDatabaseName, 500*time.Millisecond, time.Second)
	require.NoError(t, err)

	cases := map[string]struct {
		Name string
	}{
		"a": {Name: "a"},
		"A": {Name: "A"},
		"1": {Name: "1"},
	}

	for _, tc := range cases {
		require.NoError(t, checkpoint.AppendRangesForRestore(ctx, runner, checkpoint.NewCheckpointFileItem(1, tc.Name)))
	}

	// Best-effort flushes; errors are deliberately not checked here.
	runner.FlushChecksum(ctx, 1, 1, 1, 1)
	runner.FlushChecksum(ctx, 2, 2, 2, 2)

	runner.WaitForFinish(ctx, true)

	// A fresh session is needed after the runner has finished.
	session, err = glue.CreateSession(suite.Mock.Storage)
	require.NoError(t, err)
	seen := 0
	verify := func(_ int64, resp checkpoint.RestoreValueType) {
		require.NotNil(t, resp)
		want, ok := cases[resp.Name]
		require.True(t, ok)
		require.Equal(t, want.Name, resp.Name)
		seen++
	}

	exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, suite.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
	require.True(t, exists)

	_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, session.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.CustomSSTRestoreCheckpointDatabaseName, verify)
	require.NoError(t, err)
	require.Equal(t, 3, seen)

	require.NoError(t, checkpoint.RemoveCheckpointDataForSstRestore(ctx, suite.Mock.Domain, session, checkpoint.CustomSSTRestoreCheckpointDatabaseName))

	// Both the checkpoint bookkeeping and the backing schema must be gone.
	exists = checkpoint.ExistsSstRestoreCheckpoint(ctx, suite.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
	require.False(t, exists)
	exists = suite.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.CustomSSTRestoreCheckpointDatabaseName))
	require.False(t, exists)
}
9 changes: 0 additions & 9 deletions br/pkg/checkpoint/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,3 @@ func (s *externalCheckpointStorage) updateLock(ctx context.Context) error {

return nil
}

// deleteLock removes the checkpoint lock file from external storage when a
// lock was actually taken (lockId > 0). Failure to delete is only logged:
// the lock is advisory and a stale file is harmless.
func (s *externalCheckpointStorage) deleteLock(ctx context.Context) {
	if s.lockId <= 0 {
		return
	}
	if err := s.storage.DeleteFile(ctx, s.CheckpointLockPath); err != nil {
		log.Warn("failed to remove the checkpoint lock", zap.Error(err))
	}
}
Loading

0 comments on commit 0950fe3

Please sign in to comment.