From 0d18ea0616c2c9c8235daa7c5249e5a04354d89f Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Thu, 2 Jan 2025 18:04:05 +0800 Subject: [PATCH 01/15] Add hint for check SQL --- sync_diff_inspector/chunk/chunk.go | 7 ++- sync_diff_inspector/diff.go | 1 - sync_diff_inspector/source/mysql_shard.go | 4 +- sync_diff_inspector/source/tidb.go | 4 +- sync_diff_inspector/splitter/bucket.go | 10 ++++ sync_diff_inspector/splitter/index_fields.go | 2 +- sync_diff_inspector/splitter/random.go | 49 ++++++++++++------- sync_diff_inspector/splitter/splitter_test.go | 2 +- sync_diff_inspector/utils/utils.go | 33 +++++++++++-- sync_diff_inspector/utils/utils_test.go | 2 +- 10 files changed, 83 insertions(+), 31 deletions(-) diff --git a/sync_diff_inspector/chunk/chunk.go b/sync_diff_inspector/chunk/chunk.go index 6ac71cf51..439e7cfff 100644 --- a/sync_diff_inspector/chunk/chunk.go +++ b/sync_diff_inspector/chunk/chunk.go @@ -147,8 +147,9 @@ type Range struct { IsFirst bool `json:"is-first"` IsLast bool `json:"is-last"` - Where string `json:"where"` - Args []interface{} `json:"args"` + Where string `json:"where"` + Args []interface{} `json:"args"` + IndexHint string `json:"index-hint"` columnOffset map[string]int } @@ -386,6 +387,7 @@ func (c *Range) Update(column, lower, upper string, updateLower, updateUpper boo func (c *Range) Copy() *Range { newChunk := NewChunkRange() + newChunk.IndexHint = c.IndexHint for _, bound := range c.Bounds { newChunk.addBound(&Bound{ Column: bound.Column, @@ -401,6 +403,7 @@ func (c *Range) Copy() *Range { func (c *Range) Clone() *Range { newChunk := NewChunkRange() + newChunk.IndexHint = c.IndexHint for _, bound := range c.Bounds { newChunk.addBound(&Bound{ Column: bound.Column, diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index b46885d0c..c0efc364b 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -609,7 +609,6 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli if downstreamInfo.Err != nil { log.Warn("failed to compare downstream checksum") return false, -1, -1, errors.Trace(downstreamInfo.Err) - } if upstreamInfo.Count == downstreamInfo.Count && upstreamInfo.Checksum == downstreamInfo.Checksum { diff --git a/sync_diff_inspector/source/mysql_shard.go b/sync_diff_inspector/source/mysql_shard.go index afeb056c4..59e5e1adf 100644 --- a/sync_diff_inspector/source/mysql_shard.go +++ b/sync_diff_inspector/source/mysql_shard.go @@ -103,7 +103,9 @@ func (s *MySQLSources) GetCountAndMd5(ctx context.Context, tableRange *splitter. 
for _, ms := range matchSources { go func(ms *common.TableShardSource) { - count, checksum, err := utils.GetCountAndMd5Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args) + count, checksum, err := utils.GetCountAndMd5Checksum( + ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, + chunk.Where, chunk.IndexHint, chunk.Args) infoCh <- &ChecksumInfo{ Checksum: checksum, Count: count, diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index 4504b27c2..a181546d7 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go @@ -126,7 +126,9 @@ func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.Ra chunk := tableRange.GetChunk() matchSource := getMatchSource(s.sourceTableMap, table) - count, checksum, err := utils.GetCountAndMd5Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args) + count, checksum, err := utils.GetCountAndMd5Checksum( + ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, + chunk.Where, chunk.IndexHint, chunk.Args) cost := time.Since(beginTime) return &ChecksumInfo{ diff --git a/sync_diff_inspector/splitter/bucket.go b/sync_diff_inspector/splitter/bucket.go index 9d886494c..ac0d9e921 100644 --- a/sync_diff_inspector/splitter/bucket.go +++ b/sync_diff_inspector/splitter/bucket.go @@ -16,6 +16,7 @@ package splitter import ( "context" "database/sql" + "fmt" "sync" "github.com/pingcap/errors" @@ -48,6 +49,7 @@ type BucketIterator struct { errCh chan error cancel context.CancelFunc indexID int64 + indexName string progressID string dbConn *sql.DB @@ -192,6 +194,7 @@ NEXTINDEX: s.buckets = bucket s.indexColumns = indexColumns s.indexID = index.ID + s.indexName = index.Name.L break } @@ -242,9 +245,13 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf }() var ( lowerValues, upperValues []string + indexHint string latestCount int64 err error ) + if len(s.indexName) > 0 { + indexHint = fmt.Sprintf("force index(%s)", s.indexName) + } firstBucket := 0 if startRange != nil { c := startRange.GetChunk() @@ -278,6 +285,7 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf leftCnt := c.Index.ChunkCnt - c.Index.ChunkIndex - 1 if leftCnt > 0 { chunkRange := chunk.NewChunkRange() + chunkRange.IndexHint = indexHint for i, column := range s.indexColumns { chunkRange.Update(column.Name.O, "", nextUpperValues[i], false, true) @@ -310,6 +318,7 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf } chunkRange := chunk.NewChunkRange() + chunkRange.IndexHint = indexHint for j, column := range s.indexColumns { var lowerValue, upperValue string if len(lowerValues) > 0 { @@ -347,6 +356,7 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf // merge the rest keys into one chunk chunkRange := chunk.NewChunkRange() + chunkRange.IndexHint = indexHint if len(lowerValues) > 0 { for j, column := range s.indexColumns { chunkRange.Update(column.Name.O, lowerValues[j], "", true, false) diff --git a/sync_diff_inspector/splitter/index_fields.go b/sync_diff_inspector/splitter/index_fields.go index eb9659813..c1d9ea5be 100644 --- a/sync_diff_inspector/splitter/index_fields.go +++ b/sync_diff_inspector/splitter/index_fields.go @@ -47,7 +47,7 @@ func indexFieldsFromConfigString(strFields string, tableInfo *model.TableInfo) ( splitFieldArr[i] = strings.TrimSpace(splitFieldArr[i]) } 
- fields, err := GetSplitFields(tableInfo, splitFieldArr) + fields, _, err := GetSplitFields(tableInfo, splitFieldArr) if err != nil { return nil, errors.Trace(err) } diff --git a/sync_diff_inspector/splitter/random.go b/sync_diff_inspector/splitter/random.go index 33fe7272b..64bba5896 100644 --- a/sync_diff_inspector/splitter/random.go +++ b/sync_diff_inspector/splitter/random.go @@ -17,6 +17,7 @@ import ( "context" "database/sql" "fmt" + "slices" "strings" "github.com/pingcap/errors" @@ -55,12 +56,14 @@ func NewRandomIteratorWithCheckpoint(ctx context.Context, progressID string, tab splitFieldArr[i] = strings.TrimSpace(splitFieldArr[i]) } - fields, err := GetSplitFields(table.Info, splitFieldArr) + fields, indexName, err := GetSplitFields(table.Info, splitFieldArr) if err != nil { return nil, errors.Trace(err) } chunkRange := chunk.NewChunkRange() + chunkRange.IndexHint = fmt.Sprintf("force index(%s)", indexName) + beginIndex := 0 bucketChunkCnt := 0 chunkCnt := 0 @@ -165,48 +168,58 @@ func (s *RandomIterator) Close() { } // GetSplitFields returns fields to split chunks, order by pk, uk, index, columns. -func GetSplitFields(table *model.TableInfo, splitFields []string) ([]*model.ColumnInfo, error) { +// Return the columns, corresponding index name and error +func GetSplitFields(table *model.TableInfo, splitFields []string) ([]*model.ColumnInfo, string, error) { colsMap := make(map[string]*model.ColumnInfo) splitCols := make([]*model.ColumnInfo, 0, 2) for _, splitField := range splitFields { col := dbutil.FindColumnByName(table.Columns, splitField) if col == nil { - return nil, errors.NotFoundf("column %s in table %s", splitField, table.Name) + return nil, "", errors.NotFoundf("column %s in table %s", splitField, table.Name) } splitCols = append(splitCols, col) } if len(splitCols) != 0 { - return splitCols, nil + return splitCols, "", nil } for _, col := range table.Columns { colsMap[col.Name.O] = col } + + // First try to get column from index indices := dbutil.FindAllIndex(table) - if len(indices) != 0 { - NEXTINDEX: - for _, idx := range indices { - cols := make([]*model.ColumnInfo, 0, len(table.Columns)) - for _, icol := range idx.Columns { - col := colsMap[icol.Name.O] - if col.Hidden { - continue NEXTINDEX - } - cols = append(cols, col) - } - return cols, nil + slices.SortFunc(indices, func(i1, i2 *model.IndexInfo) int { + if i1.Primary { + return -1 + } + if i2.Primary { + return 1 + } + if i1.Unique { + return -1 + } + if i2.Unique { + return -1 + } + return 0 + }) + for _, idx := range indices { + icol := idx.Columns[0] + if col, ok := colsMap[icol.Name.O]; ok && !col.Hidden { + return []*model.ColumnInfo{col}, idx.Name.L, nil } } for _, col := range table.Columns { if !col.Hidden { - return []*model.ColumnInfo{col}, nil + return []*model.ColumnInfo{col}, "", nil } } - return nil, errors.NotFoundf("not found column") + return nil, "", errors.NotFoundf("not found column") } // splitRangeByRandom splits a chunk to multiple chunks by random diff --git a/sync_diff_inspector/splitter/splitter_test.go b/sync_diff_inspector/splitter/splitter_test.go index 18577c60c..7c4156c43 100644 --- a/sync_diff_inspector/splitter/splitter_test.go +++ b/sync_diff_inspector/splitter/splitter_test.go @@ -142,7 +142,7 @@ func TestSplitRangeByRandom(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(testCase.createTableSQL, parser.New()) require.NoError(t, err) - splitCols, err := GetSplitFields(tableInfo, nil) + splitCols, _, err := GetSplitFields(tableInfo, nil) require.NoError(t, err) 
createFakeResultForRandomSplit(mock, 0, testCase.randomValues) chunks, err := splitRangeByRandom(context.Background(), db, testCase.originChunk, testCase.splitCount, "test", "test", splitCols, "", "") diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index 382bfe02d..e8dac8fe6 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -766,7 +766,15 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string) } // GetCountAndMd5Checksum returns checksum code and count of some data by given condition -func GetCountAndMd5Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, uint64, error) { +func GetCountAndMd5Checksum( + ctx context.Context, + db *sql.DB, + schemaName, tableName string, + tbInfo *model.TableInfo, + limitRange string, + indexHint string, + args []interface{}, +) (int64, uint64, error) { /* calculate MD5 checksum and count example: mysql> SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM `a`.`t`; @@ -796,19 +804,34 @@ func GetCountAndMd5Checksum(ctx context.Context, db *sql.DB, schemaName, tableNa columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name)) } - query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;", - strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange) + query := fmt.Sprintf(`SELECT + COUNT(*) as CNT, + BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED) ^ + CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM + FROM %s %s WHERE %s`, + strings.Join(columnNames, ", "), + strings.Join(columnIsNull, ", "), + strings.Join(columnNames, ", "), + strings.Join(columnIsNull, ", "), + dbutil.TableName(schemaName, tableName), + indexHint, + limitRange, + ) log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args)) var count sql.NullInt64 var checksum uint64 err := db.QueryRowContext(ctx, query, args...).Scan(&count, &checksum) if err != nil { - log.Warn("execute checksum query fail", zap.String("query", query), zap.Reflect("args", args), zap.Error(err)) + log.Warn("execute checksum query fail", + zap.String("query", query), + zap.Reflect("args", args), + zap.Error(err), + ) return -1, 0, errors.Trace(err) } if !count.Valid { - // if don't have any data, the checksum will be `NULL` + // If there are no data, the checksum will be `NULL` log.Warn("get empty count", zap.String("sql", query), zap.Reflect("args", args)) return 0, 0, nil } diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index 2131fa22c..9c64a56b7 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -271,7 +271,7 @@ func TestGetCountAndMd5Checksum(t *testing.T) { mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 
45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) - count, checksum, err := GetCountAndMd5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"}) + count, checksum, err := GetCountAndMd5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", "", []interface{}{"123", "234"}) require.NoError(t, err) require.Equal(t, count, int64(123)) require.Equal(t, checksum, uint64(0x1c8)) From 081c93b125826f6af9acd967348e471cc21f856b Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Mon, 6 Jan 2025 12:09:33 +0800 Subject: [PATCH 02/15] Fix and add UT --- pkg/diff/spliter_test.go | 89 ++++++++++++++++++- sync_diff_inspector/splitter/bucket.go | 13 +-- sync_diff_inspector/splitter/index_fields.go | 45 ++++++++-- .../splitter/index_fields_test.go | 6 +- sync_diff_inspector/splitter/random.go | 84 +++++++++-------- 5 files changed, 175 insertions(+), 62 deletions(-) diff --git a/pkg/diff/spliter_test.go b/pkg/diff/spliter_test.go index 96b50f9d6..2ce6aa922 100644 --- a/pkg/diff/spliter_test.go +++ b/pkg/diff/spliter_test.go @@ -14,12 +14,19 @@ package diff import ( + "context" "fmt" + "strings" + "testing" sqlmock "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" + "github.com/pingcap/tidb-tools/sync_diff_inspector/splitter" + "github.com/pingcap/tidb-tools/sync_diff_inspector/utils" "github.com/pingcap/tidb/pkg/parser" + "github.com/stretchr/testify/require" ) var _ = Suite(&testSpliterSuite{}) @@ -438,6 +445,78 @@ func (s *testSpliterSuite) TestBucketSpliter(c *C) { } } +func TestBucketSpliterHint(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + ctx := context.Background() + + testCases := []struct { + tableSQL string + indexCount int + expectedHint string + }{ + { + "create table `test`.`test`(`a` int, `b` int, `c` int, primary key(`a`, `b`), unique key i1(`c`))", + 0, + "force index(PRIMARY)", + }, + { + "create table `test`.`test`(`a` int, `b` int, `c` int, unique key i1(`c`))", + 0, + "force index(i1)", + }, + { + "create table `test`.`test`(`a` int, `b` int, `c` int, key i2(`b`))", + 1, + "force index(i2)", + }, + } + + for _, tc := range testCases { + tableInfo, err := dbutil.GetTableInfoBySQL(tc.tableSQL, parser.New()) + require.NoError(t, err) + + tableDiff := &common.TableDiff{ + Schema: "test", + Table: "test", + Info: tableInfo, + } + + createFakeResultForRangeHint(mock, tc.indexCount) + + iter, err := splitter.NewBucketIteratorWithCheckpoint(ctx, "", tableDiff, db, nil, utils.NewWorkerPool(1, "bucketIter")) + require.NoError(t, err) + chunk, err := iter.Next() + require.NoError(t, err) + require.Equal(t, strings.ToLower(tc.expectedHint), strings.ToLower(chunk.IndexHint)) + } +} + +func createFakeResultForRangeHint(mock sqlmock.Sqlmock, indexCount int) { + /* + +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ + | Db_name | Table_name | Column_name | Is_index | Bucket_id | Count | Repeats | Lower_Bound | Upper_Bound | + +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ + | test | test | PRIMARY | 1 | 0 | 64 | 1 | (0, 0) | (63, 11) | + | test | test | PRIMARY | 1 | 1 | 128 | 1 | (64, 12) | (127, 23) | + | test | test | PRIMARY | 1 | 2 | 192 | 1 | (128, 24) | (191, 35) | + | test | test | PRIMARY | 1 | 3 | 256 | 1 | 
(192, 36) | (255, 47) | + | test | test | PRIMARY | 1 | 4 | 320 | 1 | (256, 48) | (319, 59) | + +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ + */ + for i := 0; i < indexCount; i++ { + mock.ExpectQuery("SELECT COUNT\\(DISTINCT *").WillReturnRows(sqlmock.NewRows([]string{"SEL"}).AddRow("5")) + } + + statsRows := sqlmock.NewRows([]string{"Db_name", "Table_name", "Column_name", "Is_index", "Bucket_id", "Count", "Repeats", "Lower_Bound", "Upper_Bound"}) + for _, indexName := range []string{"PRIMARY", "i1", "i2", "i3", "i4"} { + for i := 0; i < 5; i++ { + statsRows.AddRow("test", "test", indexName, 1, (i+1)*64, (i+1)*64, 1, fmt.Sprintf("(%d, %d)", i*64, i*12), fmt.Sprintf("(%d, %d)", (i+1)*64-1, (i+1)*12-1)) + } + } + mock.ExpectQuery("SHOW STATS_BUCKETS").WillReturnRows(statsRows) +} + func createFakeResultForBucketSplit(mock sqlmock.Sqlmock, aRandomValues, bRandomValues []interface{}) { /* +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ @@ -452,10 +531,14 @@ func createFakeResultForBucketSplit(mock sqlmock.Sqlmock, aRandomValues, bRandom */ statsRows := sqlmock.NewRows([]string{"Db_name", "Table_name", "Column_name", "Is_index", "Bucket_id", "Count", "Repeats", "Lower_Bound", "Upper_Bound"}) - for i := 0; i < 5; i++ { - statsRows.AddRow("test", "test", "PRIMARY", 1, (i+1)*64, (i+1)*64, 1, fmt.Sprintf("(%d, %d)", i*64, i*12), fmt.Sprintf("(%d, %d)", (i+1)*64-1, (i+1)*12-1)) + for _, indexName := range []string{"PRIMARY", "i1", "i2", "i3", "i4"} { + for i := 0; i < 5; i++ { + statsRows.AddRow("test", "test", indexName, 1, (i+1)*64, (i+1)*64, 1, fmt.Sprintf("(%d, %d)", i*64, i*12), fmt.Sprintf("(%d, %d)", (i+1)*64-1, (i+1)*12-1)) + } } + mock.ExpectQuery("SHOW STATS_BUCKETS").WillReturnRows(statsRows) + mock.ExpectQuery("SELECT COUNT\\(DISTINCT *").WillReturnRows(sqlmock.NewRows([]string{"SEL"}).AddRow("5")) for i := 0; i < len(aRandomValues); i++ { aRandomRows := sqlmock.NewRows([]string{"a"}) @@ -466,6 +549,4 @@ func createFakeResultForBucketSplit(mock sqlmock.Sqlmock, aRandomValues, bRandom bRandomRows.AddRow(bRandomValues[i]) mock.ExpectQuery("ORDER BY rand_value").WillReturnRows(bRandomRows) } - - return } diff --git a/sync_diff_inspector/splitter/bucket.go b/sync_diff_inspector/splitter/bucket.go index ac0d9e921..195edaa9f 100644 --- a/sync_diff_inspector/splitter/bucket.go +++ b/sync_diff_inspector/splitter/bucket.go @@ -132,7 +132,7 @@ func (s *BucketIterator) Next() (*chunk.Range, error) { } func (s *BucketIterator) init(ctx context.Context, startRange *RangeInfo) error { - fields, err := indexFieldsFromConfigString(s.table.Fields, s.table.Info) + fields, indices, err := getFieldsAndIndex(s.table, s.dbConn, true) if err != nil { return err } @@ -143,17 +143,6 @@ func (s *BucketIterator) init(ctx context.Context, startRange *RangeInfo) error return errors.Trace(err) } - var indices []*model.IndexInfo - if fields.IsEmpty() { - indices, err = utils.GetBetterIndex(context.Background(), s.dbConn, s.table.Schema, s.table.Table, s.table.Info) - if err != nil { - return errors.Trace(err) - } - } else { - // There are user configured "index-fields", so we will try to match from all indices. 
- indices = dbutil.FindAllIndex(s.table.Info) - } - NEXTINDEX: for _, index := range indices { if index == nil { diff --git a/sync_diff_inspector/splitter/index_fields.go b/sync_diff_inspector/splitter/index_fields.go index c1d9ea5be..00343a2a4 100644 --- a/sync_diff_inspector/splitter/index_fields.go +++ b/sync_diff_inspector/splitter/index_fields.go @@ -14,11 +14,15 @@ package splitter import ( + "context" + "database/sql" "sort" "strings" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" "github.com/pingcap/tidb-tools/sync_diff_inspector/utils" "github.com/pingcap/tidb/pkg/meta/model" "go.uber.org/zap" @@ -31,23 +35,26 @@ type indexFields struct { empty bool } -func indexFieldsFromConfigString(strFields string, tableInfo *model.TableInfo) (*indexFields, error) { - if len(strFields) == 0 { - // Empty option - return &indexFields{empty: true}, nil - } - +func indexFieldsFromConfigString(strFields string, tableInfo *model.TableInfo, enableEmpty bool) (*indexFields, error) { if tableInfo == nil { log.Panic("parsing index fields with empty tableInfo", zap.String("index-fields", strFields)) } - splitFieldArr := strings.Split(strFields, ",") + if len(strFields) == 0 && enableEmpty { + // Empty option + return &indexFields{empty: true}, nil + } + + var splitFieldArr []string + if len(strFields) > 0 { + splitFieldArr = strings.Split(strFields, ",") + } for i := range splitFieldArr { splitFieldArr[i] = strings.TrimSpace(splitFieldArr[i]) } - fields, _, err := GetSplitFields(tableInfo, splitFieldArr) + fields, err := GetSplitFields(tableInfo, splitFieldArr) if err != nil { return nil, errors.Trace(err) } @@ -109,3 +116,25 @@ func sortColsInPlace(cols []*model.ColumnInfo) { return cols[i].ID < cols[j].ID }) } + +func getFieldsAndIndex(table *common.TableDiff, dbConn *sql.DB, enableEmpty bool) ( + *indexFields, []*model.IndexInfo, error, +) { + fields, err := indexFieldsFromConfigString(table.Fields, table.Info, enableEmpty) + if err != nil { + return nil, nil, err + } + + var indices []*model.IndexInfo + if fields.IsEmpty() { + indices, err = utils.GetBetterIndex(context.Background(), dbConn, table.Schema, table.Table, table.Info) + if err != nil { + return nil, nil, errors.Trace(err) + } + } else { + // There are user configured "index-fields", so we will try to match from all indices. 
+ indices = dbutil.FindAllIndex(table.Info) + } + + return fields, indices, nil +} diff --git a/sync_diff_inspector/splitter/index_fields_test.go b/sync_diff_inspector/splitter/index_fields_test.go index 2b41b1226..5192e52ee 100644 --- a/sync_diff_inspector/splitter/index_fields_test.go +++ b/sync_diff_inspector/splitter/index_fields_test.go @@ -33,7 +33,7 @@ func TestIndexFieldsSimple(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) - fields, err := indexFieldsFromConfigString("k", tableInfo) + fields, err := indexFieldsFromConfigString("k", tableInfo, true) require.NoError(t, err) require.False(t, fields.IsEmpty()) require.Len(t, fields.Cols(), 1) @@ -64,7 +64,7 @@ func TestIndexFieldsComposite(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) - fields, err := indexFieldsFromConfigString("id, k", tableInfo) + fields, err := indexFieldsFromConfigString("id, k", tableInfo, true) require.NoError(t, err) require.False(t, fields.IsEmpty()) require.Len(t, fields.Cols(), 2) @@ -95,7 +95,7 @@ func TestIndexFieldsEmpty(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) - fields, err := indexFieldsFromConfigString("", tableInfo) + fields, err := indexFieldsFromConfigString("", tableInfo, true) require.NoError(t, err) require.True(t, fields.IsEmpty()) diff --git a/sync_diff_inspector/splitter/random.go b/sync_diff_inspector/splitter/random.go index 64bba5896..51a981c7c 100644 --- a/sync_diff_inspector/splitter/random.go +++ b/sync_diff_inspector/splitter/random.go @@ -17,8 +17,6 @@ import ( "context" "database/sql" "fmt" - "slices" - "strings" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -47,18 +45,43 @@ func NewRandomIterator(ctx context.Context, progressID string, table *common.Tab func NewRandomIteratorWithCheckpoint(ctx context.Context, progressID string, table *common.TableDiff, dbConn *sql.DB, startRange *RangeInfo) (*RandomIterator, error) { // get the chunk count by data count and chunk size - var splitFieldArr []string - if len(table.Fields) != 0 { - splitFieldArr = strings.Split(table.Fields, ",") + fields, indices, err := getFieldsAndIndex(table, dbConn, true) + if err != nil { + return nil, errors.Trace(err) } - for i := range splitFieldArr { - splitFieldArr[i] = strings.TrimSpace(splitFieldArr[i]) - } + indexName := "" +NEXTINDEX: + for _, index := range indices { + if index == nil { + continue + } + if startRange != nil && startRange.IndexID != index.ID { + continue + } - fields, indexName, err := GetSplitFields(table.Info, splitFieldArr) - if err != nil { - return nil, errors.Trace(err) + indexColumns := utils.GetColumnsFromIndex(index, table.Info) + + if len(indexColumns) < len(index.Columns) { + // some column in index is ignored. + continue + } + + if !fields.MatchesIndex(index) { + // We are enforcing user configured "index-fields" settings. + continue + } + + // skip the index that has expression column + for _, col := range indexColumns { + if col.Hidden { + continue NEXTINDEX + } + } + + // Found the index, use it as index hint. 
+ indexName = index.Name.O + break } chunkRange := chunk.NewChunkRange() @@ -120,7 +143,7 @@ func NewRandomIteratorWithCheckpoint(ctx context.Context, progressID string, tab bucketChunkCnt = chunkCnt } - chunks, err := splitRangeByRandom(ctx, dbConn, chunkRange, chunkCnt, table.Schema, table.Table, fields, table.Range, table.Collation) + chunks, err := splitRangeByRandom(ctx, dbConn, chunkRange, chunkCnt, table.Schema, table.Table, fields.cols, table.Range, table.Collation) if err != nil { return nil, errors.Trace(err) } @@ -169,21 +192,21 @@ func (s *RandomIterator) Close() { // GetSplitFields returns fields to split chunks, order by pk, uk, index, columns. // Return the columns, corresponding index name and error -func GetSplitFields(table *model.TableInfo, splitFields []string) ([]*model.ColumnInfo, string, error) { +func GetSplitFields(table *model.TableInfo, splitFields []string) ([]*model.ColumnInfo, error) { colsMap := make(map[string]*model.ColumnInfo) splitCols := make([]*model.ColumnInfo, 0, 2) for _, splitField := range splitFields { col := dbutil.FindColumnByName(table.Columns, splitField) if col == nil { - return nil, "", errors.NotFoundf("column %s in table %s", splitField, table.Name) + return nil, errors.NotFoundf("column %s in table %s", splitField, table.Name) } splitCols = append(splitCols, col) } if len(splitCols) != 0 { - return splitCols, "", nil + return splitCols, nil } for _, col := range table.Columns { @@ -192,34 +215,25 @@ func GetSplitFields(table *model.TableInfo, splitFields []string) ([]*model.Colu // First try to get column from index indices := dbutil.FindAllIndex(table) - slices.SortFunc(indices, func(i1, i2 *model.IndexInfo) int { - if i1.Primary { - return -1 - } - if i2.Primary { - return 1 - } - if i1.Unique { - return -1 - } - if i2.Unique { - return -1 - } - return 0 - }) +NEXTINDEX: for _, idx := range indices { - icol := idx.Columns[0] - if col, ok := colsMap[icol.Name.O]; ok && !col.Hidden { - return []*model.ColumnInfo{col}, idx.Name.L, nil + cols := make([]*model.ColumnInfo, 0, len(table.Columns)) + for _, icol := range idx.Columns { + col := colsMap[icol.Name.O] + if col.Hidden { + continue NEXTINDEX + } + cols = append(cols, col) } + return cols, nil } for _, col := range table.Columns { if !col.Hidden { - return []*model.ColumnInfo{col}, "", nil + return []*model.ColumnInfo{col}, nil } } - return nil, "", errors.NotFoundf("not found column") + return nil, errors.NotFoundf("not found column") } // splitRangeByRandom splits a chunk to multiple chunks by random From 3b0088141087911b20e6a024e8c9a84ba21b3129 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Mon, 6 Jan 2025 14:10:05 +0800 Subject: [PATCH 03/15] Fix --- sync_diff_inspector/splitter/bucket.go | 13 +++++- sync_diff_inspector/splitter/index_fields.go | 43 +++---------------- .../splitter/index_fields_test.go | 6 +-- sync_diff_inspector/splitter/random.go | 18 ++++++-- sync_diff_inspector/splitter/splitter_test.go | 2 +- sync_diff_inspector/utils/utils_test.go | 2 +- 6 files changed, 39 insertions(+), 45 deletions(-) diff --git a/sync_diff_inspector/splitter/bucket.go b/sync_diff_inspector/splitter/bucket.go index 195edaa9f..ac0d9e921 100644 --- a/sync_diff_inspector/splitter/bucket.go +++ b/sync_diff_inspector/splitter/bucket.go @@ -132,7 +132,7 @@ func (s *BucketIterator) Next() (*chunk.Range, error) { } func (s *BucketIterator) init(ctx context.Context, startRange *RangeInfo) error { - fields, indices, err := getFieldsAndIndex(s.table, s.dbConn, true) + fields, err := 
indexFieldsFromConfigString(s.table.Fields, s.table.Info) if err != nil { return err } @@ -143,6 +143,17 @@ func (s *BucketIterator) init(ctx context.Context, startRange *RangeInfo) error return errors.Trace(err) } + var indices []*model.IndexInfo + if fields.IsEmpty() { + indices, err = utils.GetBetterIndex(context.Background(), s.dbConn, s.table.Schema, s.table.Table, s.table.Info) + if err != nil { + return errors.Trace(err) + } + } else { + // There are user configured "index-fields", so we will try to match from all indices. + indices = dbutil.FindAllIndex(s.table.Info) + } + NEXTINDEX: for _, index := range indices { if index == nil { diff --git a/sync_diff_inspector/splitter/index_fields.go b/sync_diff_inspector/splitter/index_fields.go index 00343a2a4..eb9659813 100644 --- a/sync_diff_inspector/splitter/index_fields.go +++ b/sync_diff_inspector/splitter/index_fields.go @@ -14,15 +14,11 @@ package splitter import ( - "context" - "database/sql" "sort" "strings" "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tidb-tools/pkg/dbutil" - "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" "github.com/pingcap/tidb-tools/sync_diff_inspector/utils" "github.com/pingcap/tidb/pkg/meta/model" "go.uber.org/zap" @@ -35,21 +31,18 @@ type indexFields struct { empty bool } -func indexFieldsFromConfigString(strFields string, tableInfo *model.TableInfo, enableEmpty bool) (*indexFields, error) { - if tableInfo == nil { - log.Panic("parsing index fields with empty tableInfo", - zap.String("index-fields", strFields)) - } - - if len(strFields) == 0 && enableEmpty { +func indexFieldsFromConfigString(strFields string, tableInfo *model.TableInfo) (*indexFields, error) { + if len(strFields) == 0 { // Empty option return &indexFields{empty: true}, nil } - var splitFieldArr []string - if len(strFields) > 0 { - splitFieldArr = strings.Split(strFields, ",") + if tableInfo == nil { + log.Panic("parsing index fields with empty tableInfo", + zap.String("index-fields", strFields)) } + + splitFieldArr := strings.Split(strFields, ",") for i := range splitFieldArr { splitFieldArr[i] = strings.TrimSpace(splitFieldArr[i]) } @@ -116,25 +109,3 @@ func sortColsInPlace(cols []*model.ColumnInfo) { return cols[i].ID < cols[j].ID }) } - -func getFieldsAndIndex(table *common.TableDiff, dbConn *sql.DB, enableEmpty bool) ( - *indexFields, []*model.IndexInfo, error, -) { - fields, err := indexFieldsFromConfigString(table.Fields, table.Info, enableEmpty) - if err != nil { - return nil, nil, err - } - - var indices []*model.IndexInfo - if fields.IsEmpty() { - indices, err = utils.GetBetterIndex(context.Background(), dbConn, table.Schema, table.Table, table.Info) - if err != nil { - return nil, nil, errors.Trace(err) - } - } else { - // There are user configured "index-fields", so we will try to match from all indices. 
- indices = dbutil.FindAllIndex(table.Info) - } - - return fields, indices, nil -} diff --git a/sync_diff_inspector/splitter/index_fields_test.go b/sync_diff_inspector/splitter/index_fields_test.go index 5192e52ee..2b41b1226 100644 --- a/sync_diff_inspector/splitter/index_fields_test.go +++ b/sync_diff_inspector/splitter/index_fields_test.go @@ -33,7 +33,7 @@ func TestIndexFieldsSimple(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) - fields, err := indexFieldsFromConfigString("k", tableInfo, true) + fields, err := indexFieldsFromConfigString("k", tableInfo) require.NoError(t, err) require.False(t, fields.IsEmpty()) require.Len(t, fields.Cols(), 1) @@ -64,7 +64,7 @@ func TestIndexFieldsComposite(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) - fields, err := indexFieldsFromConfigString("id, k", tableInfo, true) + fields, err := indexFieldsFromConfigString("id, k", tableInfo) require.NoError(t, err) require.False(t, fields.IsEmpty()) require.Len(t, fields.Cols(), 2) @@ -95,7 +95,7 @@ func TestIndexFieldsEmpty(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) - fields, err := indexFieldsFromConfigString("", tableInfo, true) + fields, err := indexFieldsFromConfigString("", tableInfo) require.NoError(t, err) require.True(t, fields.IsEmpty()) diff --git a/sync_diff_inspector/splitter/random.go b/sync_diff_inspector/splitter/random.go index 51a981c7c..560d00cda 100644 --- a/sync_diff_inspector/splitter/random.go +++ b/sync_diff_inspector/splitter/random.go @@ -17,6 +17,7 @@ import ( "context" "database/sql" "fmt" + "strings" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -45,11 +46,22 @@ func NewRandomIterator(ctx context.Context, progressID string, table *common.Tab func NewRandomIteratorWithCheckpoint(ctx context.Context, progressID string, table *common.TableDiff, dbConn *sql.DB, startRange *RangeInfo) (*RandomIterator, error) { // get the chunk count by data count and chunk size - fields, indices, err := getFieldsAndIndex(table, dbConn, true) + var splitFieldArr []string + if len(table.Fields) != 0 { + splitFieldArr = strings.Split(table.Fields, ",") + } + + for i := range splitFieldArr { + splitFieldArr[i] = strings.TrimSpace(splitFieldArr[i]) + } + + fields, err := GetSplitFields(table.Info, splitFieldArr) if err != nil { return nil, errors.Trace(err) } + iFields := &indexFields{cols: fields, tableInfo: table.Info} + var indices = dbutil.FindAllIndex(table.Info) indexName := "" NEXTINDEX: for _, index := range indices { @@ -67,7 +79,7 @@ NEXTINDEX: continue } - if !fields.MatchesIndex(index) { + if !iFields.MatchesIndex(index) { // We are enforcing user configured "index-fields" settings. 
continue } @@ -143,7 +155,7 @@ NEXTINDEX: bucketChunkCnt = chunkCnt } - chunks, err := splitRangeByRandom(ctx, dbConn, chunkRange, chunkCnt, table.Schema, table.Table, fields.cols, table.Range, table.Collation) + chunks, err := splitRangeByRandom(ctx, dbConn, chunkRange, chunkCnt, table.Schema, table.Table, iFields.cols, table.Range, table.Collation) if err != nil { return nil, errors.Trace(err) } diff --git a/sync_diff_inspector/splitter/splitter_test.go b/sync_diff_inspector/splitter/splitter_test.go index 7c4156c43..18577c60c 100644 --- a/sync_diff_inspector/splitter/splitter_test.go +++ b/sync_diff_inspector/splitter/splitter_test.go @@ -142,7 +142,7 @@ func TestSplitRangeByRandom(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(testCase.createTableSQL, parser.New()) require.NoError(t, err) - splitCols, _, err := GetSplitFields(tableInfo, nil) + splitCols, err := GetSplitFields(tableInfo, nil) require.NoError(t, err) createFakeResultForRandomSplit(mock, 0, testCase.randomValues) chunks, err := splitRangeByRandom(context.Background(), db, testCase.originChunk, testCase.splitCount, "test", "test", splitCols, "", "") diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index 9c64a56b7..a4c0f4131 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -269,7 +269,7 @@ func TestGetCountAndMd5Checksum(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) - mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) + mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table`").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) count, checksum, err := GetCountAndMd5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", "", []interface{}{"123", "234"}) require.NoError(t, err) From f36eb9e404523c7078ee74ddfce9a6c29e19f594 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Mon, 6 Jan 2025 14:20:00 +0800 Subject: [PATCH 04/15] Fix hint on MySQL --- sync_diff_inspector/splitter/random.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sync_diff_inspector/splitter/random.go b/sync_diff_inspector/splitter/random.go index 560d00cda..3369ee207 100644 --- a/sync_diff_inspector/splitter/random.go +++ b/sync_diff_inspector/splitter/random.go @@ -97,7 +97,9 @@ NEXTINDEX: } chunkRange := chunk.NewChunkRange() - chunkRange.IndexHint = fmt.Sprintf("force index(%s)", indexName) + if len(indexName) > 0 { + chunkRange.IndexHint = fmt.Sprintf("force index(%s)", indexName) + } beginIndex := 0 bucketChunkCnt := 0 From 392aea378167e2877062c2c5280a2ac93639b50c Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Mon, 6 Jan 2025 19:00:49 +0800 Subject: [PATCH 05/15] Update test --- pkg/diff/spliter_test.go | 62 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 6 deletions(-) diff --git a/pkg/diff/spliter_test.go b/pkg/diff/spliter_test.go index 2ce6aa922..40ee00d64 100644 --- a/pkg/diff/spliter_test.go +++ b/pkg/diff/spliter_test.go @@ -482,7 +482,7 @@ func TestBucketSpliterHint(t *testing.T) { Info: tableInfo, } - createFakeResultForRangeHint(mock, tc.indexCount) + createFakeResultForBucketIterator(mock, tc.indexCount) iter, err := splitter.NewBucketIteratorWithCheckpoint(ctx, "", tableDiff, db, nil, utils.NewWorkerPool(1, 
"bucketIter")) require.NoError(t, err) @@ -492,7 +492,57 @@ func TestBucketSpliterHint(t *testing.T) { } } -func createFakeResultForRangeHint(mock sqlmock.Sqlmock, indexCount int) { +func TestRandomSpliterHint(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + ctx := context.Background() + + testCases := []struct { + tableSQL string + expectedHint string + }{ + { + "create table `test`.`test`(`a` int, `b` int, `c` int, primary key(`a`, `b`), unique key i1(`c`))", + "force index(PRIMARY)", + }, + { + "create table `test`.`test`(`a` int, `b` int, `c` int, unique key i1(`c`), key i2(`b`))", + "force index(i1)", + }, + { + "create table `test`.`test`(`a` int, `b` int, `c` int, key i2(`b`))", + "force index(i2)", + }, + { + "create table `test`.`test`(`a` int, `b` int, `c` int)", + "", + }, + } + + for _, tc := range testCases { + tableInfo, err := dbutil.GetTableInfoBySQL(tc.tableSQL, parser.New()) + require.NoError(t, err) + + for _, tableRange := range []string{"", "c > 100"} { + tableDiff := &common.TableDiff{ + Schema: "test", + Table: "test", + Info: tableInfo, + Range: tableRange, + } + + mock.ExpectQuery("SELECT COUNT*").WillReturnRows(sqlmock.NewRows([]string{"CNT"}).AddRow("320")) + + iter, err := splitter.NewRandomIteratorWithCheckpoint(ctx, "", tableDiff, db, nil) + require.NoError(t, err) + chunk, err := iter.Next() + require.NoError(t, err) + require.Equal(t, strings.ToLower(tc.expectedHint), strings.ToLower(chunk.IndexHint)) + } + } +} + +func createFakeResultForBucketIterator(mock sqlmock.Sqlmock, indexCount int) { /* +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ | Db_name | Table_name | Column_name | Is_index | Bucket_id | Count | Repeats | Lower_Bound | Upper_Bound | @@ -504,10 +554,6 @@ func createFakeResultForRangeHint(mock sqlmock.Sqlmock, indexCount int) { | test | test | PRIMARY | 1 | 4 | 320 | 1 | (256, 48) | (319, 59) | +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ */ - for i := 0; i < indexCount; i++ { - mock.ExpectQuery("SELECT COUNT\\(DISTINCT *").WillReturnRows(sqlmock.NewRows([]string{"SEL"}).AddRow("5")) - } - statsRows := sqlmock.NewRows([]string{"Db_name", "Table_name", "Column_name", "Is_index", "Bucket_id", "Count", "Repeats", "Lower_Bound", "Upper_Bound"}) for _, indexName := range []string{"PRIMARY", "i1", "i2", "i3", "i4"} { for i := 0; i < 5; i++ { @@ -515,6 +561,10 @@ func createFakeResultForRangeHint(mock sqlmock.Sqlmock, indexCount int) { } } mock.ExpectQuery("SHOW STATS_BUCKETS").WillReturnRows(statsRows) + + for i := 0; i < indexCount; i++ { + mock.ExpectQuery("SELECT COUNT\\(DISTINCT *").WillReturnRows(sqlmock.NewRows([]string{"SEL"}).AddRow("5")) + } } func createFakeResultForBucketSplit(mock sqlmock.Sqlmock, aRandomValues, bRandomValues []interface{}) { From 7148d6ab9d4a94b9ddeaa8cd3d0b99eada6960c4 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Thu, 9 Jan 2025 13:30:56 +0800 Subject: [PATCH 06/15] Only apply hint for TiDB --- pkg/diff/spliter_test.go | 12 ++++++------ sync_diff_inspector/chunk/chunk.go | 8 +++++--- sync_diff_inspector/source/mysql_shard.go | 2 +- sync_diff_inspector/source/tidb.go | 12 +++++++++++- sync_diff_inspector/splitter/bucket.go | 12 ++++-------- sync_diff_inspector/splitter/random.go | 4 +--- sync_diff_inspector/utils/utils.go | 6 +++--- 7 files changed, 31 insertions(+), 25 deletions(-) diff --git a/pkg/diff/spliter_test.go b/pkg/diff/spliter_test.go index 
40ee00d64..9bb24f797 100644 --- a/pkg/diff/spliter_test.go +++ b/pkg/diff/spliter_test.go @@ -458,17 +458,17 @@ func TestBucketSpliterHint(t *testing.T) { { "create table `test`.`test`(`a` int, `b` int, `c` int, primary key(`a`, `b`), unique key i1(`c`))", 0, - "force index(PRIMARY)", + "PRIMARY", }, { "create table `test`.`test`(`a` int, `b` int, `c` int, unique key i1(`c`))", 0, - "force index(i1)", + "i1", }, { "create table `test`.`test`(`a` int, `b` int, `c` int, key i2(`b`))", 1, - "force index(i2)", + "i2", }, } @@ -503,15 +503,15 @@ func TestRandomSpliterHint(t *testing.T) { }{ { "create table `test`.`test`(`a` int, `b` int, `c` int, primary key(`a`, `b`), unique key i1(`c`))", - "force index(PRIMARY)", + "PRIMARY", }, { "create table `test`.`test`(`a` int, `b` int, `c` int, unique key i1(`c`), key i2(`b`))", - "force index(i1)", + "i1", }, { "create table `test`.`test`(`a` int, `b` int, `c` int, key i2(`b`))", - "force index(i2)", + "i2", }, { "create table `test`.`test`(`a` int, `b` int, `c` int)", diff --git a/sync_diff_inspector/chunk/chunk.go b/sync_diff_inspector/chunk/chunk.go index 439e7cfff..d8d2c83ce 100644 --- a/sync_diff_inspector/chunk/chunk.go +++ b/sync_diff_inspector/chunk/chunk.go @@ -147,9 +147,11 @@ type Range struct { IsFirst bool `json:"is-first"` IsLast bool `json:"is-last"` - Where string `json:"where"` - Args []interface{} `json:"args"` - IndexHint string `json:"index-hint"` + Where string `json:"where"` + Args []interface{} `json:"args"` + + // IndexHint is the index for the checksum query hint, it's only used in TiDB source. + IndexHint string `json:"index-hint"` columnOffset map[string]int } diff --git a/sync_diff_inspector/source/mysql_shard.go b/sync_diff_inspector/source/mysql_shard.go index 59e5e1adf..5f7aa2e08 100644 --- a/sync_diff_inspector/source/mysql_shard.go +++ b/sync_diff_inspector/source/mysql_shard.go @@ -105,7 +105,7 @@ func (s *MySQLSources) GetCountAndMd5(ctx context.Context, tableRange *splitter. 
go func(ms *common.TableShardSource) { count, checksum, err := utils.GetCountAndMd5Checksum( ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, - chunk.Where, chunk.IndexHint, chunk.Args) + chunk.Where, "", chunk.Args) infoCh <- &ChecksumInfo{ Checksum: checksum, Count: count, diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index a181546d7..0ac1dcf90 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go @@ -17,6 +17,7 @@ import ( "context" "database/sql" "fmt" + "strings" "time" "github.com/coreos/go-semver/semver" @@ -126,9 +127,18 @@ func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.Ra chunk := tableRange.GetChunk() matchSource := getMatchSource(s.sourceTableMap, table) + + indexHint := "" + if chunk.IndexHint != "" { + indexHint = fmt.Sprintf("/*+ USE_INDEX(`%s`.`%s`, `%s`) */", + matchSource.OriginSchema, matchSource.OriginTable, + strings.Replace(chunk.IndexHint, "`", "``", -1), + ) + } + count, checksum, err := utils.GetCountAndMd5Checksum( ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, - chunk.Where, chunk.IndexHint, chunk.Args) + chunk.Where, indexHint, chunk.Args) cost := time.Since(beginTime) return &ChecksumInfo{ diff --git a/sync_diff_inspector/splitter/bucket.go b/sync_diff_inspector/splitter/bucket.go index ac0d9e921..759fcc8c4 100644 --- a/sync_diff_inspector/splitter/bucket.go +++ b/sync_diff_inspector/splitter/bucket.go @@ -16,7 +16,6 @@ package splitter import ( "context" "database/sql" - "fmt" "sync" "github.com/pingcap/errors" @@ -245,13 +244,10 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf }() var ( lowerValues, upperValues []string - indexHint string latestCount int64 err error ) - if len(s.indexName) > 0 { - indexHint = fmt.Sprintf("force index(%s)", s.indexName) - } + firstBucket := 0 if startRange != nil { c := startRange.GetChunk() @@ -285,7 +281,7 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf leftCnt := c.Index.ChunkCnt - c.Index.ChunkIndex - 1 if leftCnt > 0 { chunkRange := chunk.NewChunkRange() - chunkRange.IndexHint = indexHint + chunkRange.IndexHint = s.indexName for i, column := range s.indexColumns { chunkRange.Update(column.Name.O, "", nextUpperValues[i], false, true) @@ -318,7 +314,7 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf } chunkRange := chunk.NewChunkRange() - chunkRange.IndexHint = indexHint + chunkRange.IndexHint = s.indexName for j, column := range s.indexColumns { var lowerValue, upperValue string if len(lowerValues) > 0 { @@ -356,7 +352,7 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf // merge the rest keys into one chunk chunkRange := chunk.NewChunkRange() - chunkRange.IndexHint = indexHint + chunkRange.IndexHint = s.indexName if len(lowerValues) > 0 { for j, column := range s.indexColumns { chunkRange.Update(column.Name.O, lowerValues[j], "", true, false) diff --git a/sync_diff_inspector/splitter/random.go b/sync_diff_inspector/splitter/random.go index 3369ee207..e5c24ffdf 100644 --- a/sync_diff_inspector/splitter/random.go +++ b/sync_diff_inspector/splitter/random.go @@ -97,9 +97,7 @@ NEXTINDEX: } chunkRange := chunk.NewChunkRange() - if len(indexName) > 0 { - chunkRange.IndexHint = fmt.Sprintf("force index(%s)", indexName) - } + chunkRange.IndexHint = indexName beginIndex := 0 bucketChunkCnt := 0 diff --git a/sync_diff_inspector/utils/utils.go 
b/sync_diff_inspector/utils/utils.go index e8dac8fe6..49f622b0e 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -804,17 +804,17 @@ func GetCountAndMd5Checksum( columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name)) } - query := fmt.Sprintf(`SELECT + query := fmt.Sprintf(`SELECT %s COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM - FROM %s %s WHERE %s`, + FROM %s WHERE %s`, + indexHint, strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), - indexHint, limitRange, ) log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args)) From dbd303bebf54c16955c4d65caa37b48ce0b84cb2 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Thu, 9 Jan 2025 13:56:11 +0800 Subject: [PATCH 07/15] Move test --- pkg/diff/spliter_test.go | 138 +----------------- sync_diff_inspector/splitter/splitter_test.go | 123 ++++++++++++++++ 2 files changed, 127 insertions(+), 134 deletions(-) diff --git a/pkg/diff/spliter_test.go b/pkg/diff/spliter_test.go index 9bb24f797..c71320f17 100644 --- a/pkg/diff/spliter_test.go +++ b/pkg/diff/spliter_test.go @@ -14,19 +14,12 @@ package diff import ( - "context" "fmt" - "strings" - "testing" sqlmock "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" "github.com/pingcap/tidb-tools/pkg/dbutil" - "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" - "github.com/pingcap/tidb-tools/sync_diff_inspector/splitter" - "github.com/pingcap/tidb-tools/sync_diff_inspector/utils" "github.com/pingcap/tidb/pkg/parser" - "github.com/stretchr/testify/require" ) var _ = Suite(&testSpliterSuite{}) @@ -445,128 +438,6 @@ func (s *testSpliterSuite) TestBucketSpliter(c *C) { } } -func TestBucketSpliterHint(t *testing.T) { - db, mock, err := sqlmock.New() - require.NoError(t, err) - ctx := context.Background() - - testCases := []struct { - tableSQL string - indexCount int - expectedHint string - }{ - { - "create table `test`.`test`(`a` int, `b` int, `c` int, primary key(`a`, `b`), unique key i1(`c`))", - 0, - "PRIMARY", - }, - { - "create table `test`.`test`(`a` int, `b` int, `c` int, unique key i1(`c`))", - 0, - "i1", - }, - { - "create table `test`.`test`(`a` int, `b` int, `c` int, key i2(`b`))", - 1, - "i2", - }, - } - - for _, tc := range testCases { - tableInfo, err := dbutil.GetTableInfoBySQL(tc.tableSQL, parser.New()) - require.NoError(t, err) - - tableDiff := &common.TableDiff{ - Schema: "test", - Table: "test", - Info: tableInfo, - } - - createFakeResultForBucketIterator(mock, tc.indexCount) - - iter, err := splitter.NewBucketIteratorWithCheckpoint(ctx, "", tableDiff, db, nil, utils.NewWorkerPool(1, "bucketIter")) - require.NoError(t, err) - chunk, err := iter.Next() - require.NoError(t, err) - require.Equal(t, strings.ToLower(tc.expectedHint), strings.ToLower(chunk.IndexHint)) - } -} - -func TestRandomSpliterHint(t *testing.T) { - db, mock, err := sqlmock.New() - require.NoError(t, err) - ctx := context.Background() - - testCases := []struct { - tableSQL string - expectedHint string - }{ - { - "create table `test`.`test`(`a` int, `b` int, `c` int, primary key(`a`, `b`), unique key i1(`c`))", - "PRIMARY", - }, - { - "create table `test`.`test`(`a` int, `b` int, `c` int, unique key i1(`c`), key i2(`b`))", - "i1", - }, - 
{ - "create table `test`.`test`(`a` int, `b` int, `c` int, key i2(`b`))", - "i2", - }, - { - "create table `test`.`test`(`a` int, `b` int, `c` int)", - "", - }, - } - - for _, tc := range testCases { - tableInfo, err := dbutil.GetTableInfoBySQL(tc.tableSQL, parser.New()) - require.NoError(t, err) - - for _, tableRange := range []string{"", "c > 100"} { - tableDiff := &common.TableDiff{ - Schema: "test", - Table: "test", - Info: tableInfo, - Range: tableRange, - } - - mock.ExpectQuery("SELECT COUNT*").WillReturnRows(sqlmock.NewRows([]string{"CNT"}).AddRow("320")) - - iter, err := splitter.NewRandomIteratorWithCheckpoint(ctx, "", tableDiff, db, nil) - require.NoError(t, err) - chunk, err := iter.Next() - require.NoError(t, err) - require.Equal(t, strings.ToLower(tc.expectedHint), strings.ToLower(chunk.IndexHint)) - } - } -} - -func createFakeResultForBucketIterator(mock sqlmock.Sqlmock, indexCount int) { - /* - +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ - | Db_name | Table_name | Column_name | Is_index | Bucket_id | Count | Repeats | Lower_Bound | Upper_Bound | - +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ - | test | test | PRIMARY | 1 | 0 | 64 | 1 | (0, 0) | (63, 11) | - | test | test | PRIMARY | 1 | 1 | 128 | 1 | (64, 12) | (127, 23) | - | test | test | PRIMARY | 1 | 2 | 192 | 1 | (128, 24) | (191, 35) | - | test | test | PRIMARY | 1 | 3 | 256 | 1 | (192, 36) | (255, 47) | - | test | test | PRIMARY | 1 | 4 | 320 | 1 | (256, 48) | (319, 59) | - +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ - */ - statsRows := sqlmock.NewRows([]string{"Db_name", "Table_name", "Column_name", "Is_index", "Bucket_id", "Count", "Repeats", "Lower_Bound", "Upper_Bound"}) - for _, indexName := range []string{"PRIMARY", "i1", "i2", "i3", "i4"} { - for i := 0; i < 5; i++ { - statsRows.AddRow("test", "test", indexName, 1, (i+1)*64, (i+1)*64, 1, fmt.Sprintf("(%d, %d)", i*64, i*12), fmt.Sprintf("(%d, %d)", (i+1)*64-1, (i+1)*12-1)) - } - } - mock.ExpectQuery("SHOW STATS_BUCKETS").WillReturnRows(statsRows) - - for i := 0; i < indexCount; i++ { - mock.ExpectQuery("SELECT COUNT\\(DISTINCT *").WillReturnRows(sqlmock.NewRows([]string{"SEL"}).AddRow("5")) - } -} - func createFakeResultForBucketSplit(mock sqlmock.Sqlmock, aRandomValues, bRandomValues []interface{}) { /* +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ @@ -581,14 +452,11 @@ func createFakeResultForBucketSplit(mock sqlmock.Sqlmock, aRandomValues, bRandom */ statsRows := sqlmock.NewRows([]string{"Db_name", "Table_name", "Column_name", "Is_index", "Bucket_id", "Count", "Repeats", "Lower_Bound", "Upper_Bound"}) - for _, indexName := range []string{"PRIMARY", "i1", "i2", "i3", "i4"} { - for i := 0; i < 5; i++ { - statsRows.AddRow("test", "test", indexName, 1, (i+1)*64, (i+1)*64, 1, fmt.Sprintf("(%d, %d)", i*64, i*12), fmt.Sprintf("(%d, %d)", (i+1)*64-1, (i+1)*12-1)) - } + for i := 0; i < 5; i++ { + statsRows.AddRow("test", "test", "PRIMARY", 1, (i+1)*64, (i+1)*64, 1, fmt.Sprintf("(%d, %d)", i*64, i*12), fmt.Sprintf("(%d, %d)", (i+1)*64-1, (i+1)*12-1)) } mock.ExpectQuery("SHOW STATS_BUCKETS").WillReturnRows(statsRows) - mock.ExpectQuery("SELECT COUNT\\(DISTINCT *").WillReturnRows(sqlmock.NewRows([]string{"SEL"}).AddRow("5")) for i := 0; i < len(aRandomValues); i++ { aRandomRows := sqlmock.NewRows([]string{"a"}) @@ -599,4 
+467,6 @@ func createFakeResultForBucketSplit(mock sqlmock.Sqlmock, aRandomValues, bRandom bRandomRows.AddRow(bRandomValues[i]) mock.ExpectQuery("ORDER BY rand_value").WillReturnRows(bRandomRows) } + + return } diff --git a/sync_diff_inspector/splitter/splitter_test.go b/sync_diff_inspector/splitter/splitter_test.go index 18577c60c..2f94ef293 100644 --- a/sync_diff_inspector/splitter/splitter_test.go +++ b/sync_diff_inspector/splitter/splitter_test.go @@ -19,6 +19,7 @@ import ( "fmt" "sort" "strconv" + "strings" "testing" sqlmock "github.com/DATA-DOG/go-sqlmock" @@ -934,3 +935,125 @@ func TestChunkSize(t *testing.T) { require.NoError(t, err) } + +func TestBucketSpliterHint(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + ctx := context.Background() + + testCases := []struct { + tableSQL string + indexCount int + expectedHint string + }{ + { + "create table `test`.`test`(`a` int, `b` int, `c` int, primary key(`a`, `b`), unique key i1(`c`))", + 0, + "PRIMARY", + }, + { + "create table `test`.`test`(`a` int, `b` int, `c` int, unique key i1(`c`))", + 0, + "i1", + }, + { + "create table `test`.`test`(`a` int, `b` int, `c` int, key i2(`b`))", + 1, + "i2", + }, + } + + for _, tc := range testCases { + tableInfo, err := dbutil.GetTableInfoBySQL(tc.tableSQL, parser.New()) + require.NoError(t, err) + + tableDiff := &common.TableDiff{ + Schema: "test", + Table: "test", + Info: tableInfo, + } + + createFakeResultForBucketIterator(mock, tc.indexCount) + + iter, err := NewBucketIteratorWithCheckpoint(ctx, "", tableDiff, db, nil, utils.NewWorkerPool(1, "bucketIter")) + require.NoError(t, err) + chunk, err := iter.Next() + require.NoError(t, err) + require.Equal(t, strings.ToLower(tc.expectedHint), strings.ToLower(chunk.IndexHint)) + } +} + +func TestRandomSpliterHint(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + ctx := context.Background() + + testCases := []struct { + tableSQL string + expectedHint string + }{ + { + "create table `test`.`test`(`a` int, `b` int, `c` int, primary key(`a`, `b`), unique key i1(`c`))", + "PRIMARY", + }, + { + "create table `test`.`test`(`a` int, `b` int, `c` int, unique key i1(`c`), key i2(`b`))", + "i1", + }, + { + "create table `test`.`test`(`a` int, `b` int, `c` int, key i2(`b`))", + "i2", + }, + { + "create table `test`.`test`(`a` int, `b` int, `c` int)", + "", + }, + } + + for _, tc := range testCases { + tableInfo, err := dbutil.GetTableInfoBySQL(tc.tableSQL, parser.New()) + require.NoError(t, err) + + for _, tableRange := range []string{"", "c > 100"} { + tableDiff := &common.TableDiff{ + Schema: "test", + Table: "test", + Info: tableInfo, + Range: tableRange, + } + + mock.ExpectQuery("SELECT COUNT*").WillReturnRows(sqlmock.NewRows([]string{"CNT"}).AddRow("320")) + + iter, err := NewRandomIteratorWithCheckpoint(ctx, "", tableDiff, db, nil) + require.NoError(t, err) + chunk, err := iter.Next() + require.NoError(t, err) + require.Equal(t, strings.ToLower(tc.expectedHint), strings.ToLower(chunk.IndexHint)) + } + } +} + +func createFakeResultForBucketIterator(mock sqlmock.Sqlmock, indexCount int) { + /* + +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ + | Db_name | Table_name | Column_name | Is_index | Bucket_id | Count | Repeats | Lower_Bound | Upper_Bound | + +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ + | test | test | PRIMARY | 1 | 0 | 64 | 1 | (0, 0) | (63, 11) | + | test | test | 
PRIMARY | 1 | 1 | 128 | 1 | (64, 12) | (127, 23) | + | test | test | PRIMARY | 1 | 2 | 192 | 1 | (128, 24) | (191, 35) | + | test | test | PRIMARY | 1 | 3 | 256 | 1 | (192, 36) | (255, 47) | + | test | test | PRIMARY | 1 | 4 | 320 | 1 | (256, 48) | (319, 59) | + +---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+ + */ + statsRows := sqlmock.NewRows([]string{"Db_name", "Table_name", "Column_name", "Is_index", "Bucket_id", "Count", "Repeats", "Lower_Bound", "Upper_Bound"}) + for _, indexName := range []string{"PRIMARY", "i1", "i2", "i3", "i4"} { + for i := 0; i < 5; i++ { + statsRows.AddRow("test", "test", indexName, 1, (i+1)*64, (i+1)*64, 1, fmt.Sprintf("(%d, %d)", i*64, i*12), fmt.Sprintf("(%d, %d)", (i+1)*64-1, (i+1)*12-1)) + } + } + mock.ExpectQuery("SHOW STATS_BUCKETS").WillReturnRows(statsRows) + + for i := 0; i < indexCount; i++ { + mock.ExpectQuery("SELECT COUNT\\(DISTINCT *").WillReturnRows(sqlmock.NewRows([]string{"SEL"}).AddRow("5")) + } +} From d6a68f28acdcaedbe2a661cd571c72c3284f20b5 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Thu, 9 Jan 2025 14:02:13 +0800 Subject: [PATCH 08/15] Remove some code --- pkg/diff/spliter_test.go | 1 - sync_diff_inspector/splitter/random.go | 22 +++++++++++----------- sync_diff_inspector/utils/utils_test.go | 3 +-- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/pkg/diff/spliter_test.go b/pkg/diff/spliter_test.go index c71320f17..96b50f9d6 100644 --- a/pkg/diff/spliter_test.go +++ b/pkg/diff/spliter_test.go @@ -455,7 +455,6 @@ func createFakeResultForBucketSplit(mock sqlmock.Sqlmock, aRandomValues, bRandom for i := 0; i < 5; i++ { statsRows.AddRow("test", "test", "PRIMARY", 1, (i+1)*64, (i+1)*64, 1, fmt.Sprintf("(%d, %d)", i*64, i*12), fmt.Sprintf("(%d, %d)", (i+1)*64-1, (i+1)*12-1)) } - mock.ExpectQuery("SHOW STATS_BUCKETS").WillReturnRows(statsRows) for i := 0; i < len(aRandomValues); i++ { diff --git a/sync_diff_inspector/splitter/random.go b/sync_diff_inspector/splitter/random.go index e5c24ffdf..80d40380a 100644 --- a/sync_diff_inspector/splitter/random.go +++ b/sync_diff_inspector/splitter/random.go @@ -203,7 +203,6 @@ func (s *RandomIterator) Close() { } // GetSplitFields returns fields to split chunks, order by pk, uk, index, columns. 
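(Aside, not part of the patch: a minimal sketch of the selection order the comment above describes, assuming dbutil.FindAllIndex lists the primary key and unique keys before ordinary indices; pickSplitColumns and its parameter names are made up for illustration.)

// Sketch only: take the first index whose columns are all visible,
// otherwise fall back to every column of the table.
func pickSplitColumns(indices []*model.IndexInfo, byName map[string]*model.ColumnInfo, all []*model.ColumnInfo) []*model.ColumnInfo {
	for _, idx := range indices {
		cols := make([]*model.ColumnInfo, 0, len(idx.Columns))
		usable := true
		for _, ic := range idx.Columns {
			col := byName[ic.Name.O]
			if col == nil || col.Hidden {
				usable = false
				break
			}
			cols = append(cols, col)
		}
		if usable {
			return cols
		}
	}
	return all
}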
-// Return the columns, corresponding index name and error func GetSplitFields(table *model.TableInfo, splitFields []string) ([]*model.ColumnInfo, error) { colsMap := make(map[string]*model.ColumnInfo) @@ -225,19 +224,20 @@ func GetSplitFields(table *model.TableInfo, splitFields []string) ([]*model.Colu colsMap[col.Name.O] = col } - // First try to get column from index indices := dbutil.FindAllIndex(table) -NEXTINDEX: - for _, idx := range indices { - cols := make([]*model.ColumnInfo, 0, len(table.Columns)) - for _, icol := range idx.Columns { - col := colsMap[icol.Name.O] - if col.Hidden { - continue NEXTINDEX + if len(indices) != 0 { + NEXTINDEX: + for _, idx := range indices { + cols := make([]*model.ColumnInfo, 0, len(table.Columns)) + for _, icol := range idx.Columns { + col := colsMap[icol.Name.O] + if col.Hidden { + continue NEXTINDEX + } + cols = append(cols, col) } - cols = append(cols, col) + return cols, nil } - return cols, nil } for _, col := range table.Columns { diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index a4c0f4131..dd2e4bf03 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -269,8 +269,7 @@ func TestGetCountAndMd5Checksum(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) - mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table`").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) - + mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) count, checksum, err := GetCountAndMd5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", "", []interface{}{"123", "234"}) require.NoError(t, err) require.Equal(t, count, int64(123)) From fb057d550acae0ae106f2585193f9ed4c3a32382 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Thu, 9 Jan 2025 14:05:13 +0800 Subject: [PATCH 09/15] Remove some code --- sync_diff_inspector/diff.go | 1 + sync_diff_inspector/splitter/bucket.go | 1 - sync_diff_inspector/splitter/random.go | 1 - sync_diff_inspector/utils/utils_test.go | 1 + 4 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index c0efc364b..b46885d0c 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -609,6 +609,7 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli if downstreamInfo.Err != nil { log.Warn("failed to compare downstream checksum") return false, -1, -1, errors.Trace(downstreamInfo.Err) + } if upstreamInfo.Count == downstreamInfo.Count && upstreamInfo.Checksum == downstreamInfo.Checksum { diff --git a/sync_diff_inspector/splitter/bucket.go b/sync_diff_inspector/splitter/bucket.go index 759fcc8c4..6a2fa6737 100644 --- a/sync_diff_inspector/splitter/bucket.go +++ b/sync_diff_inspector/splitter/bucket.go @@ -247,7 +247,6 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf latestCount int64 err error ) - firstBucket := 0 if startRange != nil { c := startRange.GetChunk() diff --git a/sync_diff_inspector/splitter/random.go b/sync_diff_inspector/splitter/random.go index 80d40380a..44dd00b41 100644 --- a/sync_diff_inspector/splitter/random.go +++ b/sync_diff_inspector/splitter/random.go @@ -223,7 +223,6 @@ func GetSplitFields(table *model.TableInfo, 
splitFields []string) ([]*model.Colu for _, col := range table.Columns { colsMap[col.Name.O] = col } - indices := dbutil.FindAllIndex(table) if len(indices) != 0 { NEXTINDEX: diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index dd2e4bf03..9c64a56b7 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -270,6 +270,7 @@ func TestGetCountAndMd5Checksum(t *testing.T) { require.NoError(t, err) mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) + count, checksum, err := GetCountAndMd5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", "", []interface{}{"123", "234"}) require.NoError(t, err) require.Equal(t, count, int64(123)) From 32d429e08b15a55077a4a3b508795d4b9cf161a1 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Thu, 9 Jan 2025 14:27:38 +0800 Subject: [PATCH 10/15] Fix test --- sync_diff_inspector/chunk/chunk_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sync_diff_inspector/chunk/chunk_test.go b/sync_diff_inspector/chunk/chunk_test.go index b5d62dd94..16a4518c3 100644 --- a/sync_diff_inspector/chunk/chunk_test.go +++ b/sync_diff_inspector/chunk/chunk_test.go @@ -122,7 +122,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (1,3,5) < (a,b,c) <= (2,4,6)") // upper @@ -157,7 +157,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":false,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":false,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (a,b,c) <= (2,4,6)") // lower @@ -199,7 +199,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), 
`{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":true,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":true,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (1,3,5) < (a,b,c)") // none @@ -232,7 +232,7 @@ func TestChunkToString(t *testing.T) { for i, arg := range args { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":false,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":false,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: Full") // same & lower & upper @@ -274,7 +274,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":true},{"column":"c","lower":"5","upper":"5","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":true},{"column":"c","lower":"5","upper":"5","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (1,3,5) < (a,b,c) <= (1,4,5)") // same & upper @@ -309,7 +309,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"2","upper":"2","has-lower":false,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"2","upper":"2","has-lower":false,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (a,b,c) <= (2,4,6)") // same & lower @@ -351,7 +351,7 @@ func 
TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (1,3,5) < (a,b,c)") // same & none @@ -384,7 +384,7 @@ func TestChunkToString(t *testing.T) { for i, arg := range args { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":false,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":false,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: Full") // all equal @@ -417,7 +417,7 @@ func TestChunkToString(t *testing.T) { for i, arg := range args { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"3","has-lower":true,"has-upper":true},{"column":"c","lower":"6","upper":"6","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"3","has-lower":true,"has-upper":true},{"column":"c","lower":"6","upper":"6","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (1,3,6) < (a,b,c) <= (1,3,6)") } From 94674f80a27fdb4f207f71d62f0fd9eb642d1e8c Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Thu, 9 Jan 2025 15:17:28 +0800 Subject: [PATCH 11/15] Fix test --- sync_diff_inspector/utils/utils.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index 49f622b0e..c6edd942d 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -804,11 +804,7 @@ func GetCountAndMd5Checksum( columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name)) } - query := fmt.Sprintf(`SELECT %s - COUNT(*) as CNT, - BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED) ^ - CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM 
- FROM %s WHERE %s`, + query := fmt.Sprintf("SELECT %s COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;", indexHint, strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), From e3bd5959935c00b552639ee17a4895cb06645a17 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Wed, 15 Jan 2025 15:34:52 +0800 Subject: [PATCH 12/15] Update --- sync_diff_inspector/chunk/chunk.go | 4 +++- sync_diff_inspector/source/tidb.go | 18 +++++++++++++----- sync_diff_inspector/splitter/bucket.go | 5 +++++ sync_diff_inspector/splitter/random.go | 9 ++++----- sync_diff_inspector/utils/utils.go | 12 ++++++++++++ 5 files changed, 37 insertions(+), 11 deletions(-) diff --git a/sync_diff_inspector/chunk/chunk.go b/sync_diff_inspector/chunk/chunk.go index d8d2c83ce..d32d654d8 100644 --- a/sync_diff_inspector/chunk/chunk.go +++ b/sync_diff_inspector/chunk/chunk.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/tidb/pkg/meta/model" "go.uber.org/zap" ) @@ -151,7 +152,8 @@ type Range struct { Args []interface{} `json:"args"` // IndexHint is the index for the checksum query hint, it's only used in TiDB source. - IndexHint string `json:"index-hint"` + IndexHint string `json:"index-hint"` + IndexColumns []*model.ColumnInfo `json:"-"` columnOffset map[string]int } diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index 0ac1dcf90..5c47035b9 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go @@ -17,7 +17,6 @@ import ( "context" "database/sql" "fmt" - "strings" "time" "github.com/coreos/go-semver/semver" @@ -130,10 +129,19 @@ func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.Ra indexHint := "" if chunk.IndexHint != "" { - indexHint = fmt.Sprintf("/*+ USE_INDEX(`%s`.`%s`, `%s`) */", - matchSource.OriginSchema, matchSource.OriginTable, - strings.Replace(chunk.IndexHint, "`", "``", -1), - ) + // Handle the case that index columns of upstream and downstream are the same, but the index names are different. 
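(Aside, not part of the patch: a small, made-up illustration of the case this comment describes; the downstream index is matched by its ordered column names rather than by its name, the same idea as the IsSameIndex helper added to utils.go later in this patch.)

// Illustrative only: the chunk was split with upstream index idx1(c1, c2) and
// the downstream table defines idx2 over the same columns; the lookup settles
// on idx2 purely by comparing the ordered column names.
func sameColumns(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}
// sameColumns([]string{"c1", "c2"}, []string{"c1", "c2"}) is true, so idx2 is
// hinted on the downstream side even though the upstream index was named idx1.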
+ if tableInfos, err := s.GetSourceStructInfo(ctx, tableRange.GetTableIndex()); err == nil { + for _, index := range tableInfos[0].Indices { + if utils.IsSameIndex(index, chunk.IndexColumns) { + indexHint = fmt.Sprintf("/*+ USE_INDEX(`%s`.`%s`, `%s`) */", + matchSource.OriginSchema, + matchSource.OriginTable, + index.Name.L, + ) + break + } + } + } } count, checksum, err := utils.GetCountAndMd5Checksum( diff --git a/sync_diff_inspector/splitter/bucket.go b/sync_diff_inspector/splitter/bucket.go index 6a2fa6737..6cc61eb06 100644 --- a/sync_diff_inspector/splitter/bucket.go +++ b/sync_diff_inspector/splitter/bucket.go @@ -281,6 +281,7 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf if leftCnt > 0 { chunkRange := chunk.NewChunkRange() chunkRange.IndexHint = s.indexName + chunkRange.IndexColumns = s.indexColumns for i, column := range s.indexColumns { chunkRange.Update(column.Name.O, "", nextUpperValues[i], false, true) @@ -314,6 +315,8 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf chunkRange := chunk.NewChunkRange() chunkRange.IndexHint = s.indexName + chunkRange.IndexColumns = s.indexColumns + for j, column := range s.indexColumns { var lowerValue, upperValue string if len(lowerValues) > 0 { @@ -352,6 +355,8 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf // merge the rest keys into one chunk chunkRange := chunk.NewChunkRange() chunkRange.IndexHint = s.indexName + chunkRange.IndexColumns = s.indexColumns + if len(lowerValues) > 0 { for j, column := range s.indexColumns { chunkRange.Update(column.Name.O, lowerValues[j], "", true, false) diff --git a/sync_diff_inspector/splitter/random.go b/sync_diff_inspector/splitter/random.go index 44dd00b41..1fe580ce3 100644 --- a/sync_diff_inspector/splitter/random.go +++ b/sync_diff_inspector/splitter/random.go @@ -60,9 +60,10 @@ func NewRandomIteratorWithCheckpoint(ctx context.Context, progressID string, tab return nil, errors.Trace(err) } + chunkRange := chunk.NewChunkRange() + iFields := &indexFields{cols: fields, tableInfo: table.Info} var indices = dbutil.FindAllIndex(table.Info) - indexName := "" NEXTINDEX: for _, index := range indices { if index == nil { @@ -92,13 +93,11 @@ NEXTINDEX: } // Found the index, use it as index hint. 
- indexName = index.Name.O + chunkRange.IndexHint = index.Name.O + chunkRange.IndexColumns = indexColumns break } - chunkRange := chunk.NewChunkRange() - chunkRange.IndexHint = indexName - beginIndex := 0 bucketChunkCnt := 0 chunkCnt := 0 diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index c6edd942d..d2e34eebf 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -1066,3 +1066,15 @@ func IsBinaryColumn(col *model.ColumnInfo) bool { // varbinary or binary return (col.GetType() == mysql.TypeVarchar || col.GetType() == mysql.TypeString) && mysql.HasBinaryFlag(col.GetFlag()) } + +func IsSameIndex(index *model.IndexInfo, columns []*model.ColumnInfo) bool { + if len(index.Columns) != len(columns) { + return false + } + for i, col := range index.Columns { + if col.Name.L != columns[i].Name.L { + return false + } + } + return true +} From 4b71979f20eaf32028197eac49ec7340c39c2864 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Mon, 20 Jan 2025 10:58:08 +0800 Subject: [PATCH 13/15] Add config --- sync_diff_inspector/chunk/chunk.go | 5 +- sync_diff_inspector/config/config.go | 2 + sync_diff_inspector/diff.go | 3 ++ sync_diff_inspector/source/mysql_shard.go | 14 +++++- sync_diff_inspector/source/source.go | 3 ++ sync_diff_inspector/source/tidb.go | 57 ++++++++++++++++++++++- sync_diff_inspector/utils/utils.go | 4 +- sync_diff_inspector/utils/utils_test.go | 6 ++- 8 files changed, 86 insertions(+), 8 deletions(-) diff --git a/sync_diff_inspector/chunk/chunk.go b/sync_diff_inspector/chunk/chunk.go index d32d654d8..ebfac2f07 100644 --- a/sync_diff_inspector/chunk/chunk.go +++ b/sync_diff_inspector/chunk/chunk.go @@ -152,7 +152,8 @@ type Range struct { Args []interface{} `json:"args"` // IndexHint is the index for the checksum query hint, it's only used in TiDB source. - IndexHint string `json:"index-hint"` + IndexHint string `json:"index-hint"` + // IndexColumns is the columns used to split chunks. 
IndexColumns []*model.ColumnInfo `json:"-"` columnOffset map[string]int @@ -392,6 +393,7 @@ func (c *Range) Update(column, lower, upper string, updateLower, updateUpper boo func (c *Range) Copy() *Range { newChunk := NewChunkRange() newChunk.IndexHint = c.IndexHint + newChunk.IndexColumns = c.IndexColumns for _, bound := range c.Bounds { newChunk.addBound(&Bound{ Column: bound.Column, @@ -408,6 +410,7 @@ func (c *Range) Copy() *Range { func (c *Range) Clone() *Range { newChunk := NewChunkRange() newChunk.IndexHint = c.IndexHint + newChunk.IndexColumns = c.IndexColumns for _, bound := range c.Bounds { newChunk.addBound(&Bound{ Column: bound.Column, diff --git a/sync_diff_inspector/config/config.go b/sync_diff_inspector/config/config.go index 7fd99cca6..f04c09b4f 100644 --- a/sync_diff_inspector/config/config.go +++ b/sync_diff_inspector/config/config.go @@ -374,6 +374,8 @@ type Config struct { CheckStructOnly bool `toml:"check-struct-only" json:"check-struct-only"` // experimental feature: only check table data without table struct CheckDataOnly bool `toml:"check-data-only" json:"-"` + // the mode of hint + HintMode string `toml:"hint-mode" json:"hint-mode"` // skip validation for tables that don't exist upstream or downstream SkipNonExistingTable bool `toml:"skip-non-existing-table" json:"-"` // DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261" diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index b46885d0c..9f9af65eb 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -135,6 +135,9 @@ func (df *Diff) init(ctx context.Context, cfg *config.Config) (err error) { setTiDBCfg() df.downstream, df.upstream, err = source.NewSources(ctx, cfg) + df.downstream.SetHintMode(cfg.HintMode) + df.upstream.SetHintMode(cfg.HintMode) + if err != nil { return errors.Trace(err) } diff --git a/sync_diff_inspector/source/mysql_shard.go b/sync_diff_inspector/source/mysql_shard.go index 5f7aa2e08..fca658963 100644 --- a/sync_diff_inspector/source/mysql_shard.go +++ b/sync_diff_inspector/source/mysql_shard.go @@ -64,6 +64,11 @@ type MySQLSources struct { sourceTablesMap map[string][]*common.TableShardSource } +// SetHintMode does nothing for MySQL source +func (*MySQLSources) SetHintMode(string) error { + return nil +} + func getMatchedSourcesForTable(sourceTablesMap map[string][]*common.TableShardSource, table *common.TableDiff) []*common.TableShardSource { if sourceTablesMap == nil { log.Fatal("unreachable, source tables map shouldn't be nil.") @@ -103,8 +108,15 @@ func (s *MySQLSources) GetCountAndMd5(ctx context.Context, tableRange *splitter. for _, ms := range matchSources { go func(ms *common.TableShardSource) { + conn, err := ms.DBConn.Conn(ctx) + if err != nil { + infoCh <- &ChecksumInfo{ + Err: err, + } + return + } count, checksum, err := utils.GetCountAndMd5Checksum( - ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, + ctx, conn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, "", chunk.Args) infoCh <- &ChecksumInfo{ Checksum: checksum, diff --git a/sync_diff_inspector/source/source.go b/sync_diff_inspector/source/source.go index c0c827105..8ef4d0eb9 100644 --- a/sync_diff_inspector/source/source.go +++ b/sync_diff_inspector/source/source.go @@ -72,6 +72,9 @@ type TableAnalyzer interface { } type Source interface { + // SetHintMode set the hint mode + SetHintMode(string) error + // GetTableAnalyzer pick the proper analyzer for different source. // the implement of this function is different in mysql/tidb. 
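(Aside, not part of the patch: a rough sketch of how the new knob is threaded through, based on the hint-mode key added to config.go and the SetHintMode calls added to diff.go above; the error handling here is illustrative.)

// Illustrative only: hint-mode comes from the TOML config ("", "none", "sql"
// or "sessionvar", as parsed by TiDBSource.SetHintMode below) and is pushed
// into both sources once they are created; the MySQL implementation is a no-op.
if err := downstream.SetHintMode(cfg.HintMode); err != nil {
	return errors.Trace(err)
}
if err := upstream.SetHintMode(cfg.HintMode); err != nil {
	return errors.Trace(err)
}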
GetTableAnalyzer() TableAnalyzer diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index 5c47035b9..c15c06adb 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go @@ -17,6 +17,7 @@ import ( "context" "database/sql" "fmt" + "strings" "time" "github.com/coreos/go-semver/semver" @@ -82,10 +83,36 @@ func (s *TiDBRowsIterator) Next() (map[string]*dbutil.ColumnData, error) { return nil, nil } +type hintMode int + +const ( + // hintNone does nothing + hintNone hintMode = iota + // hintSQL indicates using SQL hints to force index scan + hintSQL + // hintSessionVar indicates using session variable to force index scan + hintSessionVar +) + +// String implements the fmt.Stringer interface. +func (m hintMode) String() string { + switch m { + case hintNone: + return "None" + case hintSQL: + return "HintSQL" + case hintSessionVar: + return "HintSessionVar" + default: + panic(fmt.Sprintf("invalid hint mode '%d'", m)) + } +} + type TiDBSource struct { tableDiffs []*common.TableDiff sourceTableMap map[string]*common.TableSource snapshot string + mode hintMode // bucketSpliterPool is the shared pool to produce chunks using bucket bucketSpliterPool *utils.WorkerPool dbConn *sql.DB @@ -93,6 +120,23 @@ type TiDBSource struct { version *semver.Version } +// SetHintMode parses the string value to the hintMode. +func (s *TiDBSource) SetHintMode(ss string) error { + switch strings.ToLower(ss) { + case "", "none": + s.mode = hintNone + case "sql": + s.mode = hintSQL + case "sessionvar": + s.mode = hintSessionVar + default: + return errors.Errorf("invalid hint mode '%s', please choose valid option between ['', 'sql', 'session']", ss) + } + + log.Info("get hint mode", zap.String("hint mode", s.mode.String())) + return nil +} + func (s *TiDBSource) GetTableAnalyzer() TableAnalyzer { return &TiDBTableAnalyzer{ s.dbConn, @@ -127,8 +171,15 @@ func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.Ra matchSource := getMatchSource(s.sourceTableMap, table) + conn, err := s.dbConn.Conn(ctx) + if err != nil { + return &ChecksumInfo{ + Err: err, + } + } + indexHint := "" - if chunk.IndexHint != "" { + if s.mode == hintSQL && len(chunk.IndexColumns) > 0 { // Handle the case that index columns of upstream and downstream are the same, but the index names are different. 
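(Aside, not part of the patch: once a hint is resolved it becomes the first %s of the checksum statement built by utils.GetCountAndMd5Checksum, so in hintSQL mode the query sent to TiDB starts roughly as below; schema, table, index and the WHERE clause are made-up values, and the BIT_XOR/MD5 column expression is elided.)

// Illustrative only: where the resolved hint lands in the checksum query.
hint := fmt.Sprintf("/*+ USE_INDEX(`%s`.`%s`, `%s`) */", "test", "t", "idx2")
query := fmt.Sprintf("SELECT %s COUNT(*) as CNT FROM `%s`.`%s` WHERE %s;",
	hint, "test", "t", "`a` > ? AND `a` <= ?")
// query: SELECT /*+ USE_INDEX(`test`.`t`, `idx2`) */ COUNT(*) as CNT FROM `test`.`t` WHERE `a` > ? AND `a` <= ?;
_ = query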
if tableInfos, err := s.GetSourceStructInfo(ctx, tableRange.GetTableIndex()); err == nil { for _, index := range tableInfos[0].Indices { @@ -142,10 +193,12 @@ func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.Ra } } } + } else if s.mode == hintSessionVar { + conn.ExecContext(ctx, "set session tidb_opt_prefer_range_scan = 1") } count, checksum, err := utils.GetCountAndMd5Checksum( - ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, + ctx, conn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, indexHint, chunk.Args) cost := time.Since(beginTime) diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index d2e34eebf..ca9a22459 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -768,7 +768,7 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string) // GetCountAndMd5Checksum returns checksum code and count of some data by given condition func GetCountAndMd5Checksum( ctx context.Context, - db *sql.DB, + conn *sql.Conn, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, @@ -817,7 +817,7 @@ func GetCountAndMd5Checksum( var count sql.NullInt64 var checksum uint64 - err := db.QueryRowContext(ctx, query, args...).Scan(&count, &checksum) + err := conn.QueryRowContext(ctx, query, args...).Scan(&count, &checksum) if err != nil { log.Warn("execute checksum query fail", zap.String("query", query), diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index 9c64a56b7..cea44165c 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -261,9 +261,9 @@ func TestGetCountAndMd5Checksum(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - conn, mock, err := sqlmock.New() + db, mock, err := sqlmock.New() require.NoError(t, err) - defer conn.Close() + defer db.Close() createTableSQL := "create table `test`.`test`(`a` int, `c` float, `b` varchar(10), `d` datetime, primary key(`a`, `b`), key(`c`, `d`))" tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL, parser.New()) @@ -271,6 +271,8 @@ func TestGetCountAndMd5Checksum(t *testing.T) { mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) + conn, err := db.Conn(ctx) + require.NoError(t, err) count, checksum, err := GetCountAndMd5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", "", []interface{}{"123", "234"}) require.NoError(t, err) require.Equal(t, count, int64(123)) From eb933f6b094302987019fd4620904683a6aa1530 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Tue, 21 Jan 2025 14:13:03 +0800 Subject: [PATCH 14/15] Add comments --- sync_diff_inspector/chunk/chunk.go | 4 ++-- sync_diff_inspector/source/tidb.go | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/sync_diff_inspector/chunk/chunk.go b/sync_diff_inspector/chunk/chunk.go index ebfac2f07..ce75346a4 100644 --- a/sync_diff_inspector/chunk/chunk.go +++ b/sync_diff_inspector/chunk/chunk.go @@ -151,9 +151,9 @@ type Range struct { Where string `json:"where"` Args []interface{} `json:"args"` - // IndexHint is the index for the checksum query hint, it's only used in TiDB source. + // IndexHint is the index found in chunk splitting, it's only used for test. 
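(Aside, not part of the patch: a short note on how these two fields serialize, assuming standard encoding/json behaviour; this is why the expected strings in chunk_test.go above end with "index-hint":"" while IndexColumns, tagged json:"-", never shows up in the chunk meta.)

// Illustrative only: IndexHint is kept in the JSON form of a Range,
// IndexColumns is not.
b, _ := json.Marshal(&chunk.Range{IndexHint: "PRIMARY"})
// the marshalled bytes end with ..."where":"","args":null,"index-hint":"PRIMARY"}
_ = b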
IndexHint string `json:"index-hint"` - // IndexColumns is the columns used to split chunks. + // IndexColumns is the columns used to split chunks, and it's used to find index hint in checksum query. IndexColumns []*model.ColumnInfo `json:"-"` columnOffset map[string]int diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index c15c06adb..4d1c959a2 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go @@ -180,9 +180,14 @@ func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.Ra indexHint := "" if s.mode == hintSQL && len(chunk.IndexColumns) > 0 { - // Handle the case that index columns of upstream and downstream are the same, but the index names are different. + // Since the index name is extracted from one data source, + // while another data source may have an index with same columns but a different index name, + // we use the index columns to get the actual index name here. + // For example: + // Upstream: idx1(c1, c2) + // Downstream: idx2(c1, c2) if tableInfos, err := s.GetSourceStructInfo(ctx, tableRange.GetTableIndex()); err == nil { - for _, index := range tableInfos[0].Indices { + for _, index := range dbutil.FindAllIndex(tableInfos[0]) { if utils.IsSameIndex(index, chunk.IndexColumns) { indexHint = fmt.Sprintf("/*+ USE_INDEX(`%s`.`%s`, `%s`) */", matchSource.OriginSchema, From 2c49ce3f982ffbd2ce4a1f3250fb6af968c21c37 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Tue, 21 Jan 2025 15:09:16 +0800 Subject: [PATCH 15/15] Fix tests --- sync_diff_inspector/config/config_test.go | 2 +- sync_diff_inspector/source/mysql_shard.go | 1 + sync_diff_inspector/source/tidb.go | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sync_diff_inspector/config/config_test.go b/sync_diff_inspector/config/config_test.go index 7c12c260b..1145a0c6d 100644 --- a/sync_diff_inspector/config/config_test.go +++ b/sync_diff_inspector/config/config_test.go @@ -50,7 +50,7 @@ func TestParseConfig(t *testing.T) { // we might not use the same config to run this test. e.g. 
MYSQL_PORT can be 4000 require.JSONEq(t, cfg.String(), - "{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}") + 
"{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"hint-mode\":\"\",\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}") hash, err := cfg.Task.ComputeConfigHash() require.NoError(t, err) require.Equal(t, hash, 
"c080f9894ec24aadb4aaec1109cd1951454f09a1233f2034bc3b06e0903cb289") diff --git a/sync_diff_inspector/source/mysql_shard.go b/sync_diff_inspector/source/mysql_shard.go index fca658963..cece39fab 100644 --- a/sync_diff_inspector/source/mysql_shard.go +++ b/sync_diff_inspector/source/mysql_shard.go @@ -115,6 +115,7 @@ func (s *MySQLSources) GetCountAndMd5(ctx context.Context, tableRange *splitter. } return } + defer conn.Close() count, checksum, err := utils.GetCountAndMd5Checksum( ctx, conn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, "", chunk.Args) diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index 4d1c959a2..c7a3dcbe4 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go @@ -177,6 +177,7 @@ func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.Ra Err: err, } } + defer conn.Close() indexHint := "" if s.mode == hintSQL && len(chunk.IndexColumns) > 0 {