Skip to content

Commit

Permalink
statistics: optimize stats delta dumping with batch processing (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Jan 23, 2025
1 parent 3ee53bb commit b313166
Show file tree
Hide file tree
Showing 56 changed files with 549 additions and 638 deletions.
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type SharedVars struct {
TableImporter *importer.TableImporter
DataEngine *backend.OpenedEngine
IndexEngine *backend.OpenedEngine
Progress *importer.Progress

mu sync.Mutex
Checksum *verification.KVGroupChecksum
Expand Down Expand Up @@ -183,5 +182,4 @@ type Checksum struct {
// This portion of the code may be implemented uniformly in the framework in the future.
type Result struct {
LoadedRowCnt uint64
ColSizeMap map[int64]int64
}
8 changes: 1 addition & 7 deletions pkg/disttask/importinto/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,14 +577,9 @@ func updateResult(handle storage.TaskHandle, task *proto.Task, taskMeta *TaskMet
}
subtaskMetas = append(subtaskMetas, &subtaskMeta)
}
columnSizeMap := make(map[int64]int64)
for _, subtaskMeta := range subtaskMetas {
taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt
for key, val := range subtaskMeta.Result.ColSizeMap {
columnSizeMap[key] += val
}
}
taskMeta.Result.ColSizeMap = columnSizeMap

if globalSort {
taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, task)
Expand Down Expand Up @@ -662,8 +657,7 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger
func(ctx context.Context) (bool, error) {
return true, taskHandle.WithNewSession(func(se sessionctx.Context) error {
if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{
Affected: taskMeta.Result.LoadedRowCnt,
ColSizeMap: taskMeta.Result.ColSizeMap,
Affected: taskMeta.Result.LoadedRowCnt,
}); err != nil {
logger.Warn("flush table stats failed", zap.Error(err))
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
sharedVars.DataEngine,
sharedVars.IndexEngine,
sharedVars.Progress,
logger,
checksum,
); err != nil {
Expand All @@ -82,7 +81,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
dataWriter,
indexWriter,
sharedVars.Progress,
logger,
checksum,
); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
TableImporter: s.tableImporter,
DataEngine: dataEngine,
IndexEngine: indexEngine,
Progress: importer.NewProgress(),
Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()),
SortedDataMeta: &external.SortedKVMeta{},
SortedIndexMetas: make(map[int64]*external.SortedKVMeta),
Expand Down Expand Up @@ -251,7 +250,6 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt
}
subtaskMeta.Result = Result{
LoadedRowCnt: dataKVCount,
ColSizeMap: sharedVars.Progress.GetColSize(),
}
allocators := sharedVars.TableImporter.Allocators()
subtaskMeta.MaxIDs = map[autoid.AllocatorType]int64{
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl
return err
}

err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.ExtraPartialRowOption)
err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.IndexesRowLayout)
if err != nil {
return err
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,39 @@ PARTITION p0 VALUES LESS THAN (6)
require.NotNil(t, jt)
require.False(t, jt.IsHistoricalStats)
}

// TestDumpHistoricalStatsMetaForMultiTables verifies that, after a single
// DumpStatsDeltaToKV call covering modifications on two tables, the newest
// row in mysql.stats_meta_history carries the same version for both tables.
func TestDumpHistoricalStatsMetaForMultiTables(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("set global tidb_enable_historical_stats = 1")
	tk.MustExec("use test")
	// Drop the tables this test actually creates (t1 and t2), not an unrelated "t".
	tk.MustExec("drop table if exists t1, t2")
	tk.MustExec("create table t1(a int, b varchar(10), index idx(a, b))")
	tk.MustExec("create table t2(a int, b varchar(10), index idx(a, b))")
	// Insert some data.
	tk.MustExec("insert into t1 values (1, 'a'), (2, 'b'), (3, 'c')")
	tk.MustExec("insert into t2 values (1, 'a'), (2, 'b'), (3, 'c')")
	// Analyze the tables.
	tk.MustExec("analyze table t1")
	tk.MustExec("analyze table t2")
	h := dom.StatsHandle()
	// Update the stats cache.
	require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))

	// Insert more data.
	tk.MustExec("insert into t1 values (4, 'd'), (5, 'e'), (6, 'f')")
	tk.MustExec("insert into t2 values (4, 'd'), (5, 'e'), (6, 'f')")
	// Dump stats delta to kv.
	require.NoError(t, h.DumpStatsDeltaToKV(true))

	// latestVersion returns the newest version recorded in
	// mysql.stats_meta_history for the given table. It fails the test with a
	// clear message instead of panicking when no history row exists or the
	// column is not rendered as a string.
	latestVersion := func(tableID int64) string {
		rows := tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tableID).Rows()
		require.Len(t, rows, 1)
		require.Len(t, rows[0], 1)
		version, ok := rows[0][0].(string)
		require.True(t, ok)
		return version
	}

	// Check historical stats meta.
	tbl1, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t1"))
	require.NoError(t, err)
	tbl2, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t2"))
	require.NoError(t, err)
	version1 := latestVersion(tbl1.Meta().ID)
	version2 := latestVersion(tbl2.Meta().ID)
	require.Equal(t, version1, version2)
}
1 change: 0 additions & 1 deletion pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ go_library(
"job.go",
"kv_encode.go",
"precheck.go",
"progress.go",
"table_import.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor/importer",
Expand Down
5 changes: 1 addition & 4 deletions pkg/executor/importer/engine_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func ProcessChunk(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataEngine, indexEngine *backend.OpenedEngine,
progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
Expand Down Expand Up @@ -65,7 +64,7 @@ func ProcessChunk(
}
}()

return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum)
return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, logger, groupChecksum)
}

// ProcessChunkWithWriter processes a chunk and writes KV pairs to dataWriter and indexWriter.
Expand All @@ -74,7 +73,6 @@ func ProcessChunkWithWriter(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataWriter, indexWriter backend.EngineWriter,
progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
Expand Down Expand Up @@ -116,6 +114,5 @@ func ProcessChunkWithWriter(
if err != nil {
return err
}
progress.AddColSize(encoder.GetColumnSize())
return nil
}
5 changes: 2 additions & 3 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,9 +1395,8 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType {

// JobImportResult is the result of the job import.
type JobImportResult struct {
Affected uint64
Warnings []contextutil.SQLWarn
ColSizeMap variable.DeltaColsMap
Affected uint64
Warnings []contextutil.SQLWarn
}

// GetMsgFromBRError gets the message from a BR error.
Expand Down
8 changes: 2 additions & 6 deletions pkg/executor/importer/importer_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,9 @@ func TestProcessChunkWith(t *testing.T) {
defer ti.Backend().CloseEngineMgr()
kvWriter := mock.NewMockEngineWriter(ctrl)
kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
progress := importer.NewProgress()
checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
require.NoError(t, err)
require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID])
Expand Down Expand Up @@ -346,11 +344,9 @@ func TestProcessChunkWith(t *testing.T) {
ti.SetSelectedRowCh(rowsCh)
kvWriter := mock.NewMockEngineWriter(ctrl)
kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
progress := importer.NewProgress()
checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
require.NoError(t, err)
require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(111, 3, 14585065391351463171), *checksumMap[verify.DataKVGroupID])
Expand Down
6 changes: 0 additions & 6 deletions pkg/executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
// KVEncoder encodes a row of data into a KV pair.
type KVEncoder interface {
Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
// GetColumnSize returns the size of each column in the current encoder.
GetColumnSize() map[int64]int64
io.Closer
}

Expand Down Expand Up @@ -91,10 +89,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
return en.Record2KV(record, row, rowID)
}

func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
return en.SessionCtx.GetColumnSize(en.TableMeta().ID)
}

// todo merge with code in load_data.go
func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) {
row := make([]types.Datum, 0, len(en.insertColumns))
Expand Down
49 changes: 0 additions & 49 deletions pkg/executor/importer/progress.go

This file was deleted.

16 changes: 5 additions & 11 deletions pkg/executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,25 +654,20 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C
}

var (
mu sync.Mutex
checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
colSizeMap = make(map[int64]int64)
mu sync.Mutex
checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
)
eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx)
for i := 0; i < ti.ThreadCnt; i++ {
eg.Go(func() error {
chunkCheckpoint := checkpoints.ChunkCheckpoint{}
chunkChecksum := verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
progress := NewProgress()
defer func() {
mu.Lock()
defer mu.Unlock()
checksum.Add(chunkChecksum)
for k, v := range progress.GetColSize() {
colSizeMap[k] += v
}
}()
return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, progress, ti.logger, chunkChecksum)
return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, ti.logger, chunkChecksum)
})
}
if err = eg.Wait(); err != nil {
Expand Down Expand Up @@ -716,8 +711,7 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C
}

return &JobImportResult{
Affected: uint64(dataKVCount),
ColSizeMap: colSizeMap,
Affected: uint64(dataKVCount),
}, nil
}

Expand Down Expand Up @@ -976,7 +970,7 @@ func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64,
sessionVars := se.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
defer sessionVars.TxnCtxMu.Unlock()
sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected), result.ColSizeMap)
sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected))
se.StmtCommit(ctx)
return se.CommitTxn(ctx)
}
39 changes: 23 additions & 16 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,20 +182,23 @@ func TestDataForTableStatsField(t *testing.T) {
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("3 18 54 6"))
testkit.Rows("3 16 48 0"))
tk.MustExec(`insert into t(c, d, e) values(4, 5, "f")`)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("4 18 72 8"))
testkit.Rows("4 16 64 0"))
tk.MustExec("delete from t where c >= 3")
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("2 18 36 4"))
testkit.Rows("2 16 32 0"))
tk.MustExec("delete from t where c=3")
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("2 16 32 0"))
tk.MustExec("analyze table t all columns")
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("2 18 36 4"))

Expand All @@ -207,6 +210,9 @@ func TestDataForTableStatsField(t *testing.T) {
tk.MustExec(`insert into t(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e")`)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("3 16 48 0"))
tk.MustExec("analyze table t all columns")
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("3 18 54 6"))
}
Expand All @@ -227,23 +233,24 @@ func TestPartitionsTable(t *testing.T) {
tk.MustExec(`insert into test_partitions(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`)

tk.MustQuery("select PARTITION_NAME, PARTITION_DESCRIPTION from information_schema.PARTITIONS where table_name='test_partitions';").Check(
testkit.Rows("" +
"p0 6]\n" +
"[p1 11]\n" +
"[p2 16"))
testkit.Rows("p0 6", "p1 11", "p2 16"))

tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.PARTITIONS where table_name='test_partitions';").Check(
testkit.Rows("" +
"0 0 0 0]\n" +
"[0 0 0 0]\n" +
"[0 0 0 0"))
testkit.Rows(
"0 0 0 0",
"0 0 0 0",
"0 0 0 0",
),
)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.PARTITIONS where table_name='test_partitions';").Check(
testkit.Rows("" +
"1 18 18 2]\n" +
"[1 18 18 2]\n" +
"[1 18 18 2"))
testkit.Rows(
"1 16 16 0",
"1 16 16 0",
"1 16 16 0",
),
)
})

// Test for table has no partitions.
Expand All @@ -255,7 +262,7 @@ func TestPartitionsTable(t *testing.T) {
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
tk.MustQuery("select PARTITION_NAME, TABLE_ROWS, AVG_ROW_LENGTH, DATA_LENGTH, INDEX_LENGTH from information_schema.PARTITIONS where table_name='test_partitions_1';").Check(
testkit.Rows("<nil> 3 18 54 6"))
testkit.Rows("<nil> 3 16 48 0"))

tk.MustExec("DROP TABLE IF EXISTS `test_partitions`;")
tk.MustExec(`CREATE TABLE test_partitions1 (id int, b int, c varchar(5), primary key(id), index idx(c)) PARTITION BY RANGE COLUMNS(id) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16));`)
Expand Down
Loading

0 comments on commit b313166

Please sign in to comment.