diff --git a/pkg/statistics/BUILD.bazel b/pkg/statistics/BUILD.bazel new file mode 100644 index 0000000000000..a1074ff243673 --- /dev/null +++ b/pkg/statistics/BUILD.bazel @@ -0,0 +1,115 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "statistics", + srcs = [ + "analyze.go", + "analyze_jobs.go", + "builder.go", + "builder_ext_stats.go", + "cmsketch.go", + "cmsketch_util.go", + "column.go", + "debugtrace.go", + "estimate.go", + "fmsketch.go", + "histogram.go", + "index.go", + "row_sampler.go", + "sample.go", + "scalar.go", + "table.go", + ], + importpath = "github.com/pingcap/tidb/pkg/statistics", + visibility = ["//visibility:public"], + deps = [ + "//pkg/expression", + "//pkg/kv", + "//pkg/meta/model", + "//pkg/parser/ast", + "//pkg/parser/charset", + "//pkg/parser/mysql", + "//pkg/parser/terror", + "//pkg/planner/core/resolve", + "//pkg/planner/planctx", + "//pkg/planner/util/debugtrace", + "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", + "//pkg/sessionctx/vardef", + "//pkg/sessionctx/variable", + "//pkg/statistics/asyncload", + "//pkg/statistics/handle/logutil", + "//pkg/tablecodec", + "//pkg/types", + "//pkg/util/chunk", + "//pkg/util/codec", + "//pkg/util/collate", + "//pkg/util/context", + "//pkg/util/dbterror", + "//pkg/util/fastrand", + "//pkg/util/hack", + "//pkg/util/intest", + "//pkg/util/logutil", + "//pkg/util/memory", + "//pkg/util/ranger", + "//pkg/util/sqlexec", + "@com_github_dolthub_swiss//:swiss", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_tipb//go-tipb", + "@com_github_twmb_murmur3//:murmur3", + "@org_golang_x_exp//maps", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "statistics_test", + timeout = "short", + srcs = [ + "bench_daily_test.go", + "builder_test.go", + "cmsketch_test.go", + "fmsketch_test.go", + "histogram_bench_test.go", + "histogram_test.go", + "integration_test.go", + "main_test.go", + "sample_test.go", + "scalar_test.go", + "statistics_test.go", + ], + data = glob(["testdata/**"]), + embed = [":statistics"], + flaky = True, + shard_count = 37, + deps = [ + "//pkg/config", + "//pkg/meta/model", + "//pkg/parser/ast", + "//pkg/parser/mysql", + "//pkg/planner/core/resolve", + "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", + "//pkg/statistics/handle/ddl/testutil", + "//pkg/testkit", + "//pkg/testkit/analyzehelper", + "//pkg/testkit/testdata", + "//pkg/testkit/testmain", + "//pkg/testkit/testsetup", + "//pkg/types", + "//pkg/util/benchdaily", + "//pkg/util/chunk", + "//pkg/util/codec", + "//pkg/util/collate", + "//pkg/util/memory", + "//pkg/util/mock", + "//pkg/util/ranger", + "//pkg/util/sqlexec", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/pkg/statistics/handle/globalstats/topn.go b/pkg/statistics/handle/globalstats/topn.go new file mode 100644 index 0000000000000..b8dd2ce57658b --- /dev/null +++ b/pkg/statistics/handle/globalstats/topn.go @@ -0,0 +1,214 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package globalstats
+
+import (
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/statistics"
+	"github.com/pingcap/tidb/pkg/util/hack"
+	"github.com/pingcap/tidb/pkg/util/sqlkiller"
+	"github.com/tiancaiamao/gp"
+)
+
+func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrapper,
+	timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
+	[]statistics.TopNMeta, []*statistics.Histogram, error) {
+	if statistics.CheckEmptyTopNs(wrapper.AllTopN) {
+		return nil, nil, wrapper.AllHg, nil
+	}
+	mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency
+	killer := &sc.GetSessionVars().SQLKiller
+
+	// Use the original single-threaded method if the concurrency is 1 or for stats version 1.
+	if mergeConcurrency < 2 {
+		return MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killer)
+	}
+	batchSize := len(wrapper.AllTopN) / mergeConcurrency
+	if batchSize < 1 {
+		batchSize = 1
+	} else if batchSize > MaxPartitionMergeBatchSize {
+		batchSize = MaxPartitionMergeBatchSize
+	}
+	return MergeGlobalStatsTopNByConcurrency(gp, mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killer)
+}
+
+// MergeGlobalStatsTopNByConcurrency merges the partition-level TopNs concurrently:
+// the partition TopNs are split into batches, and each batch is handled by a different worker.
+// mergeConcurrency controls how many workers run at the same time, and
+// mergeBatchSize controls how many partitions each worker handles.
+func MergeGlobalStatsTopNByConcurrency(
+	gp *gp.Pool,
+	mergeConcurrency, mergeBatchSize int,
+	wrapper *StatsWrapper,
+	timeZone *time.Location,
+	version int,
+	n uint32,
+	isIndex bool,
+	killer *sqlkiller.SQLKiller,
+) (*statistics.TopN,
+	[]statistics.TopNMeta, []*statistics.Histogram, error) {
+	if len(wrapper.AllTopN) < mergeConcurrency {
+		mergeConcurrency = len(wrapper.AllTopN)
+	}
+	tasks := make([]*TopnStatsMergeTask, 0)
+	for start := 0; start < len(wrapper.AllTopN); {
+		end := start + mergeBatchSize
+		if end > len(wrapper.AllTopN) {
+			end = len(wrapper.AllTopN)
+		}
+		task := NewTopnStatsMergeTask(start, end)
+		tasks = append(tasks, task)
+		start = end
+	}
+	var wg sync.WaitGroup
+	taskNum := len(tasks)
+	taskCh := make(chan *TopnStatsMergeTask, taskNum)
+	respCh := make(chan *TopnStatsMergeResponse, taskNum)
+	worker := NewTopnStatsMergeWorker(taskCh, respCh, wrapper, killer)
+	for i := 0; i < mergeConcurrency; i++ {
+		wg.Add(1)
+		gp.Go(func() {
+			defer wg.Done()
+			worker.Run(timeZone, isIndex, version)
+		})
+	}
+	for _, task := range tasks {
+		taskCh <- task
+	}
+	close(taskCh)
+	wg.Wait()
+	close(respCh)
+	// Collect the errors reported by the workers.
+	hasErr := false
+	errMsg := make([]string, 0)
+	for resp := range respCh {
+		if resp.Err != nil {
+			hasErr = true
+			errMsg = append(errMsg, resp.Err.Error())
+		}
+	}
+	if hasErr {
+		return nil, nil, nil, errors.New(strings.Join(errMsg, ","))
+	}
+
+	// Fetch the merged result from the workers and build the global TopN stats.
+	counter := worker.Result()
+	numTop := len(counter)
+	sorted := make([]statistics.TopNMeta, 0, numTop)
+	for value, cnt := range counter {
+		data := hack.Slice(string(value))
+		sorted = append(sorted, statistics.TopNMeta{Encoded: data, Count: uint64(cnt)})
+	}
+	globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n)
+	return globalTopN, popedTopn, wrapper.AllHg, nil
+}
+
+// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN.
+// The input parameters:
+//  1. `topNs` are the partition-level topNs to be merged.
+//  2. `n` is the size of the global-level topN.
+//     Note: this value can be 0 and has no default, so it must be specified explicitly.
+//  3. `hists` are the partition-level histograms.
+//     Values not in a TopN may still appear in the histograms;
+//     we need them here to make the counts in the global-level TopN more accurate.
+//
+// The output parameters:
+//  1. `*TopN` is the final global-level topN.
+//  2. `[]TopNMeta` holds the leftover values from the partition-level TopNs
+//     that were not placed into the global-level TopN. They should be put back
+//     into the histograms later.
+//  3. `[]*Histogram` are the partition-level histograms, from which some values
+//     were removed while merging the global-level TopN.
+func MergePartTopN2GlobalTopN(
+	loc *time.Location,
+	version int,
+	topNs []*statistics.TopN,
+	n uint32,
+	hists []*statistics.Histogram,
+	isIndex bool,
+	killer *sqlkiller.SQLKiller,
+) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) {
+	partNum := len(topNs)
+	// Different TopN structures may hold the same value; we have to merge them.
+	counter := make(map[hack.MutableString]float64)
+	// datumMap stores the mapping from the encoded string form of a value to its
+	// datum form. The datum is used to look the value up in the histograms.
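+	// For example, if partition p0's TopN contains a value v that p1's TopN does
+	// not, v's count in p1 is fetched from p1's histogram; that count is then
+	// removed from p1's histogram so the value is not counted twice when the
+	// leftover TopN values are merged back into the buckets.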
+	datumMap := statistics.NewDatumMapCache()
+	for i, topN := range topNs {
+		if err := killer.HandleSignal(); err != nil {
+			return nil, nil, nil, err
+		}
+		// Ignore the empty topN.
+		if topN.TotalCount() == 0 {
+			continue
+		}
+
+		for _, val := range topN.TopN {
+			encodedVal := hack.String(val.Encoded)
+			_, exists := counter[encodedVal]
+			counter[encodedVal] += float64(val.Count)
+			if exists {
+				// We have already accumulated the counts for encodedVal, so just continue to the next TopN value.
+				continue
+			}
+
+			// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
+			// 1. Check the topN first.
+			// 2. If the topN doesn't contain the value corresponding to encodedVal, check the histogram.
+			for j := 0; j < partNum; j++ {
+				if err := killer.HandleSignal(); err != nil {
+					return nil, nil, nil, err
+				}
+
+				if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 {
+					continue
+				}
+				// Get the datum corresponding to encodedVal.
+				datum, exists := datumMap.Get(encodedVal)
+				if !exists {
+					d, err := datumMap.Put(val, encodedVal, hists[0].Tp.GetType(), isIndex, loc)
+					if err != nil {
+						return nil, nil, nil, err
+					}
+					datum = d
+				}
+				// Get the row count of the value equal to encodedVal from hists[j].
+				count, _ := hists[j].EqualRowCount(nil, datum, isIndex)
+				if count != 0 {
+					counter[encodedVal] += count
+					// Remove the value corresponding to encodedVal from the histogram.
+					hists[j].BinarySearchRemoveVal(&datum, int64(count))
+				}
+			}
+		}
+	}
+
+	numTop := len(counter)
+	if numTop == 0 {
+		return nil, nil, hists, nil
+	}
+	sorted := make([]statistics.TopNMeta, 0, numTop)
+	for value, cnt := range counter {
+		data := hack.Slice(string(value))
+		sorted = append(sorted, statistics.TopNMeta{Encoded: data, Count: uint64(cnt)})
+	}
+	globalTopN, leftTopN := statistics.GetMergedTopNFromSortedSlice(sorted, n)
+	return globalTopN, leftTopN, hists, nil
+}
diff --git a/pkg/statistics/histogram_test.go b/pkg/statistics/histogram_test.go
new file mode 100644
index 0000000000000..d7d9827c12477
--- /dev/null
+++ b/pkg/statistics/histogram_test.go
@@ -0,0 +1,714 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package statistics + +import ( + "fmt" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/codec" + "github.com/pingcap/tidb/pkg/util/mock" + "github.com/stretchr/testify/require" +) + +func TestTruncateHistogram(t *testing.T) { + hist := NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLonglong), 1, 0) + low, high := types.NewIntDatum(0), types.NewIntDatum(1) + hist.AppendBucket(&low, &high, 0, 1) + newHist := hist.TruncateHistogram(1) + require.True(t, HistogramEqual(hist, newHist, true)) + newHist = hist.TruncateHistogram(0) + require.Equal(t, 0, newHist.Len()) +} + +func TestValueToString4InvalidKey(t *testing.T) { + bytes, err := codec.EncodeKey(time.UTC, nil, types.NewDatum(1), types.NewDatum(0.5)) + require.NoError(t, err) + // Append invalid flag. + bytes = append(bytes, 20) + datum := types.NewDatum(bytes) + res, err := ValueToString(nil, &datum, 3, nil) + require.NoError(t, err) + require.Equal(t, "(1, 0.5, \x14)", res) +} + +type bucket4Test struct { + lower int64 + upper int64 + count int64 + repeat int64 + ndv int64 +} + +type topN4Test struct { + data int64 + count int64 +} + +func genHist4Test(t *testing.T, buckets []*bucket4Test, totColSize int64) *Histogram { + h := NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeBlob), len(buckets), totColSize) + for _, bucket := range buckets { + lower, err := codec.EncodeKey(time.UTC, nil, types.NewIntDatum(bucket.lower)) + require.NoError(t, err) + upper, err := codec.EncodeKey(time.UTC, nil, types.NewIntDatum(bucket.upper)) + require.NoError(t, err) + di, du := types.NewBytesDatum(lower), types.NewBytesDatum(upper) + h.AppendBucketWithNDV(&di, &du, bucket.count, bucket.repeat, bucket.ndv) + } + return h +} + +func TestMergePartitionLevelHist(t *testing.T) { + type testCase struct { + partitionHists [][]*bucket4Test + totColSize []int64 + popedTopN []topN4Test + expHist []*bucket4Test + expBucketNumber int + } + tests := []testCase{ + { + partitionHists: [][]*bucket4Test{ + { + // Col(1) = [1, 4,|| 6, 9, 9,|| 12, 12, 12,|| 13, 14, 15] + { + lower: 1, + upper: 4, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 9, + count: 5, + repeat: 2, + ndv: 2, + }, + { + lower: 12, + upper: 12, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 15, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + // Col(2) = [2, 5,|| 6, 7, 7,|| 11, 11, 11,|| 13, 14, 17] + { + { + lower: 2, + upper: 5, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 7, + count: 5, + repeat: 2, + ndv: 2, + }, + { + lower: 11, + upper: 11, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 17, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + }, + totColSize: []int64{11, 11}, + popedTopN: []topN4Test{}, + expHist: []*bucket4Test{ + { + lower: 1, + upper: 9, + count: 10, + repeat: 2, + ndv: 7, + }, + { + lower: 11, + upper: 17, + count: 22, + repeat: 1, + ndv: 8, + }, + }, + expBucketNumber: 2, + }, + { + partitionHists: [][]*bucket4Test{ + { + // Col(1) = [1, 4,|| 6, 9, 9,|| 12, 12, 12,|| 13, 14, 15] + { + lower: 1, + upper: 4, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 9, + count: 5, + repeat: 2, + ndv: 2, + }, + { + lower: 12, + upper: 12, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 15, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + // Col(2) = [2, 5,|| 6, 7, 
7,|| 11, 11, 11,|| 13, 14, 17] + { + { + lower: 2, + upper: 5, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 7, + count: 5, + repeat: 2, + ndv: 2, + }, + { + lower: 11, + upper: 11, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 17, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + }, + totColSize: []int64{11, 11}, + popedTopN: []topN4Test{ + { + data: 18, + count: 5, + }, + { + data: 4, + count: 6, + }, + }, + expHist: []*bucket4Test{ + { + lower: 1, + upper: 5, + count: 10, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 12, + count: 22, + repeat: 3, + ndv: 6, + }, + { + lower: 13, + upper: 18, + count: 33, + repeat: 5, + ndv: 5, + }, + }, + expBucketNumber: 3, + }, + { + // issue#49023 + partitionHists: [][]*bucket4Test{ + { + // Col(1) = [1, 4,|| 6, 9, 9,|| 12, 12, 12,|| 13, 14, 15] + { + lower: 1, + upper: 4, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 9, + count: 5, + repeat: 2, + ndv: 2, + }, + { + lower: 12, + upper: 12, + count: 5, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 15, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + // Col(2) = [2, 5,|| 6, 7, 7,|| 11, 11, 11,|| 13, 14, 17] + { + { + lower: 2, + upper: 5, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 7, + count: 2, + repeat: 2, + ndv: 2, + }, + { + lower: 11, + upper: 11, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 17, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + // Col(3) = [2, 5,|| 6, 7, 7,|| 11, 11, 11,|| 13, 14, 17] + { + { + lower: 2, + upper: 5, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 7, + count: 2, + repeat: 2, + ndv: 2, + }, + { + lower: 11, + upper: 11, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 17, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + // Col(4) = [2, 5,|| 6, 7, 7,|| 11, 11, 11,|| 13, 14, 17] + { + { + lower: 2, + upper: 5, + count: 2, + repeat: 1, + ndv: 2, + }, + { + lower: 6, + upper: 7, + count: 2, + repeat: 2, + ndv: 2, + }, + { + lower: 11, + upper: 11, + count: 8, + repeat: 3, + ndv: 1, + }, + { + lower: 13, + upper: 17, + count: 11, + repeat: 1, + ndv: 3, + }, + }, + }, + totColSize: []int64{11, 11, 11, 11}, + popedTopN: []topN4Test{ + { + data: 18, + count: 5, + }, + { + data: 4, + count: 6, + }, + }, + expHist: []*bucket4Test{ + { + lower: 1, + upper: 9, + count: 17, + repeat: 2, + ndv: 8, + }, + { + lower: 11, + upper: 11, + count: 35, + repeat: 9, + ndv: 1, + }, + { + lower: 13, + upper: 18, + count: 55, + repeat: 5, + ndv: 6, + }, + }, + expBucketNumber: 3, + }, + } + failpoint.Enable("github.com/pingcap/pkg/statistics/enableTopNNDV", `return(true)`) + + for ii, tt := range tests { + var expTotColSize int64 + hists := make([]*Histogram, 0, len(tt.partitionHists)) + for i := range tt.partitionHists { + hists = append(hists, genHist4Test(t, tt.partitionHists[i], tt.totColSize[i])) + expTotColSize += tt.totColSize[i] + } + ctx := mock.NewContext() + sc := ctx.GetSessionVars().StmtCtx + poped := make([]TopNMeta, 0, len(tt.popedTopN)) + for _, top := range tt.popedTopN { + b, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(top.data)) + require.NoError(t, err) + tmp := TopNMeta{ + Encoded: b, + Count: uint64(top.count), + } + poped = append(poped, tmp) + } + globalHist, err := MergePartitionHist2GlobalHist(sc, hists, poped, int64(tt.expBucketNumber), true, Version2) + require.NoError(t, err) + require.Equal(t, tt.expBucketNumber, len(globalHist.Buckets)) + for i, b := range tt.expHist { + lo, err := 
ValueToString(ctx.GetSessionVars(), globalHist.GetLower(i), 1, []byte{types.KindInt64})
+			require.NoError(t, err, "failed at #%d case, %d bucket", ii, i)
+			up, err := ValueToString(ctx.GetSessionVars(), globalHist.GetUpper(i), 1, []byte{types.KindInt64})
+			require.NoError(t, err, "failed at #%d case, %d bucket", ii, i)
+			require.Equal(t, fmt.Sprintf("%v", b.lower), lo, "failed at #%d case, %d bucket", ii, i)
+			require.Equal(t, fmt.Sprintf("%v", b.upper), up, "failed at #%d case, %d bucket", ii, i)
+			require.Equal(t, b.count, globalHist.Buckets[i].Count, "failed at #%d case, %d bucket", ii, i)
+			require.Equal(t, b.repeat, globalHist.Buckets[i].Repeat, "failed at #%d case, %d bucket", ii, i)
+			require.Equal(t, b.ndv, globalHist.Buckets[i].NDV, "failed at #%d case, %d bucket", ii, i)
+		}
+		require.Equal(t, expTotColSize, globalHist.TotColSize, "failed at #%d case", ii)
+	}
+	failpoint.Disable("github.com/pingcap/pkg/statistics/enableTopNNDV")
+}
+
+func genBucket4Merging4Test(lower, upper, ndv, disjointNDV int64) bucket4Merging {
+	l := types.NewIntDatum(lower)
+	r := types.NewIntDatum(upper)
+	return bucket4Merging{
+		lower: &l,
+		upper: &r,
+		Bucket: Bucket{
+			NDV:   ndv,
+			Count: ndv,
+		},
+		disjointNDV: disjointNDV,
+	}
+}
+
+func TestMergeBucketNDV(t *testing.T) {
+	type testData struct {
+		left   bucket4Merging
+		right  bucket4Merging
+		result bucket4Merging
+	}
+	tests := []testData{
+		{
+			left:   genBucket4Merging4Test(1, 2, 2, 0),
+			right:  genBucket4Merging4Test(1, 2, 3, 0),
+			result: genBucket4Merging4Test(1, 2, 3, 0),
+		},
+		{
+			left:   genBucket4Merging4Test(1, 3, 2, 0),
+			right:  genBucket4Merging4Test(2, 3, 2, 0),
+			result: genBucket4Merging4Test(1, 3, 3, 0),
+		},
+		{
+			left:   genBucket4Merging4Test(1, 3, 2, 0),
+			right:  genBucket4Merging4Test(4, 6, 2, 2),
+			result: genBucket4Merging4Test(1, 3, 2, 4),
+		},
+		{
+			left:   genBucket4Merging4Test(1, 5, 5, 0),
+			right:  genBucket4Merging4Test(2, 6, 5, 0),
+			result: genBucket4Merging4Test(1, 6, 6, 0),
+		},
+		{
+			left:   genBucket4Merging4Test(3, 5, 3, 0),
+			right:  genBucket4Merging4Test(2, 6, 4, 0),
+			result: genBucket4Merging4Test(2, 6, 5, 0),
+		},
+	}
+	sc := mock.NewContext().GetSessionVars().StmtCtx
+	for i, tt := range tests {
+		res, err := mergeBucketNDV(sc, &tt.left, &tt.right)
+		require.NoError(t, err, "failed at #%d case", i)
+		require.Equal(t, res.lower.GetInt64(), tt.result.lower.GetInt64(), "failed at #%d case", i)
+		require.Equal(t, res.upper.GetInt64(), tt.result.upper.GetInt64(), "failed at #%d case", i)
+		require.Equal(t, res.NDV, tt.result.NDV, "failed at #%d case", i)
+		require.Equal(t, res.disjointNDV, tt.result.disjointNDV, "failed at #%d case", i)
+	}
+}
+
+func TestIndexQueryBytes(t *testing.T) {
+	ctx := mock.NewContext()
+	sc := ctx.GetSessionVars().StmtCtx
+	idx := &Index{Info: &model.IndexInfo{Columns: []*model.IndexColumn{{Name: ast.NewCIStr("a"), Offset: 0}}}}
+	idx.Histogram = *NewHistogram(0, 15, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0)
+	low, err1 := codec.EncodeKey(sc.TimeZone(), nil, types.NewBytesDatum([]byte("0")))
+	require.NoError(t, err1)
+	high, err2 := codec.EncodeKey(sc.TimeZone(), nil, types.NewBytesDatum([]byte("3")))
+	require.NoError(t, err2)
+	idx.Bounds.AppendBytes(0, low)
+	idx.Bounds.AppendBytes(0, high)
+	idx.Buckets = append(idx.Buckets, Bucket{Repeat: 10, Count: 20, NDV: 20})
+	idx.PreCalculateScalar()
+	idx.CMSketch = nil
+	// Count / NDV
+	require.Equal(t, idx.QueryBytes(nil, low), uint64(1))
+	// Repeat
+	require.Equal(t, idx.QueryBytes(nil, high), uint64(10))
+}
+
+type
histogramInputAndOutput struct { + inputHist *Histogram + inputHistToStr string + outputHistToStr string +} + +func TestStandardizeForV2AnalyzeIndex(t *testing.T) { + // 1. prepare expected input and output histograms (in string) + testData := []*histogramInputAndOutput{ + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 0 lower_bound: 111 upper_bound: 111 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 123 upper_bound: 123 repeats: 0 ndv: 0\n" + + "num: 10 lower_bound: 34567 upper_bound: 5 repeats: 3 ndv: 2", + outputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 34567 upper_bound: 5 repeats: 3 ndv: 0", + }, + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 0 lower_bound: 111 upper_bound: 111 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 123 upper_bound: 123 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 34567 upper_bound: 5 repeats: 0 ndv: 0", + outputHistToStr: "index:0 ndv:6", + }, + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 34567 upper_bound: 5 repeats: 3 ndv: 2\n" + + "num: 0 lower_bound: 876 upper_bound: 876 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 990 upper_bound: 990 repeats: 0 ndv: 0", + outputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 34567 upper_bound: 5 repeats: 3 ndv: 0", + }, + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 111 upper_bound: 111 repeats: 10 ndv: 1\n" + + "num: 12 lower_bound: 123 upper_bound: 34567 repeats: 4 ndv: 20\n" + + "num: 10 lower_bound: 5 upper_bound: 990 repeats: 6 ndv: 2", + outputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 111 upper_bound: 111 repeats: 10 ndv: 0\n" + + "num: 12 lower_bound: 123 upper_bound: 34567 repeats: 4 ndv: 0\n" + + "num: 10 lower_bound: 5 upper_bound: 990 repeats: 6 ndv: 0", + }, + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 0 lower_bound: 111 upper_bound: 111 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 123 upper_bound: 123 repeats: 0 ndv: 0\n" + + "num: 10 lower_bound: 34567 upper_bound: 34567 repeats: 3 ndv: 2\n" + + "num: 0 lower_bound: 5 upper_bound: 5 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 876 upper_bound: 876 repeats: 0 ndv: 0\n" + + "num: 10 lower_bound: 990 upper_bound: 990 repeats: 3 ndv: 2\n" + + "num: 10 lower_bound: 95 upper_bound: 95 repeats: 3 ndv: 2", + outputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 34567 upper_bound: 34567 repeats: 3 ndv: 0\n" + + "num: 10 lower_bound: 990 upper_bound: 990 repeats: 3 ndv: 0\n" + + "num: 10 lower_bound: 95 upper_bound: 95 repeats: 3 ndv: 0", + }, + { + inputHistToStr: "index:0 ndv:6\n" + + "num: 0 lower_bound: 111 upper_bound: 111 repeats: 0 ndv: 0\n" + + "num: 0 lower_bound: 123 upper_bound: 123 repeats: 0 ndv: 0\n" + + "num: 10 lower_bound: 34567 upper_bound: 34567 repeats: 3 ndv: 2\n" + + "num: 0 lower_bound: 5 upper_bound: 5 repeats: 0 ndv: 0\n" + + "num: 10 lower_bound: 876 upper_bound: 876 repeats: 3 ndv: 2\n" + + "num: 10 lower_bound: 990 upper_bound: 990 repeats: 3 ndv: 2\n" + + "num: 0 lower_bound: 95 upper_bound: 95 repeats: 0 ndv: 0", + outputHistToStr: "index:0 ndv:6\n" + + "num: 10 lower_bound: 34567 upper_bound: 34567 repeats: 3 ndv: 0\n" + + "num: 10 lower_bound: 876 upper_bound: 876 repeats: 3 ndv: 0\n" + + "num: 10 lower_bound: 990 upper_bound: 990 repeats: 3 ndv: 0", + }, + } + // 2. 
prepare the actual Histogram input + ctx := mock.NewContext() + sc := ctx.GetSessionVars().StmtCtx + val0, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(111)) + require.NoError(t, err) + val1, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(123)) + require.NoError(t, err) + val2, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(34567)) + require.NoError(t, err) + val3, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(5)) + require.NoError(t, err) + val4, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(876)) + require.NoError(t, err) + val5, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(990)) + require.NoError(t, err) + val6, err := codec.EncodeKey(sc.TimeZone(), nil, types.NewIntDatum(95)) + require.NoError(t, err) + val0Bytes := types.NewBytesDatum(val0) + val1Bytes := types.NewBytesDatum(val1) + val2Bytes := types.NewBytesDatum(val2) + val3Bytes := types.NewBytesDatum(val3) + val4Bytes := types.NewBytesDatum(val4) + val5Bytes := types.NewBytesDatum(val5) + val6Bytes := types.NewBytesDatum(val6) + hist0 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist0.AppendBucketWithNDV(&val0Bytes, &val0Bytes, 0, 0, 0) + hist0.AppendBucketWithNDV(&val1Bytes, &val1Bytes, 0, 0, 0) + hist0.AppendBucketWithNDV(&val2Bytes, &val3Bytes, 10, 3, 2) + testData[0].inputHist = hist0 + hist1 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist1.AppendBucketWithNDV(&val0Bytes, &val0Bytes, 0, 0, 0) + hist1.AppendBucketWithNDV(&val1Bytes, &val1Bytes, 0, 0, 0) + hist1.AppendBucketWithNDV(&val2Bytes, &val3Bytes, 0, 0, 0) + testData[1].inputHist = hist1 + hist2 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist2.AppendBucketWithNDV(&val2Bytes, &val3Bytes, 10, 3, 2) + hist2.AppendBucketWithNDV(&val4Bytes, &val4Bytes, 10, 0, 0) + hist2.AppendBucketWithNDV(&val5Bytes, &val5Bytes, 10, 0, 0) + testData[2].inputHist = hist2 + hist3 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist3.AppendBucketWithNDV(&val0Bytes, &val0Bytes, 10, 10, 1) + hist3.AppendBucketWithNDV(&val1Bytes, &val2Bytes, 22, 4, 20) + hist3.AppendBucketWithNDV(&val3Bytes, &val5Bytes, 32, 6, 2) + testData[3].inputHist = hist3 + hist4 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist4.AppendBucketWithNDV(&val0Bytes, &val0Bytes, 0, 0, 0) + hist4.AppendBucketWithNDV(&val1Bytes, &val1Bytes, 0, 0, 0) + hist4.AppendBucketWithNDV(&val2Bytes, &val2Bytes, 10, 3, 2) + hist4.AppendBucketWithNDV(&val3Bytes, &val3Bytes, 10, 0, 0) + hist4.AppendBucketWithNDV(&val4Bytes, &val4Bytes, 10, 0, 0) + hist4.AppendBucketWithNDV(&val5Bytes, &val5Bytes, 20, 3, 2) + hist4.AppendBucketWithNDV(&val6Bytes, &val6Bytes, 30, 3, 2) + testData[4].inputHist = hist4 + hist5 := NewHistogram(0, 6, 0, 0, types.NewFieldType(mysql.TypeBlob), 0, 0) + hist5.AppendBucketWithNDV(&val0Bytes, &val0Bytes, 0, 0, 0) + hist5.AppendBucketWithNDV(&val1Bytes, &val1Bytes, 0, 0, 0) + hist5.AppendBucketWithNDV(&val2Bytes, &val2Bytes, 10, 3, 2) + hist5.AppendBucketWithNDV(&val3Bytes, &val3Bytes, 10, 0, 0) + hist5.AppendBucketWithNDV(&val4Bytes, &val4Bytes, 20, 3, 2) + hist5.AppendBucketWithNDV(&val5Bytes, &val5Bytes, 30, 3, 2) + hist5.AppendBucketWithNDV(&val6Bytes, &val6Bytes, 30, 0, 0) + testData[5].inputHist = hist5 + + // 3. 
the actual test
+	for i, test := range testData {
+		require.Equal(t, test.inputHistToStr, test.inputHist.ToString(1))
+		test.inputHist.StandardizeForV2AnalyzeIndex()
+		require.Equal(t, test.outputHistToStr, test.inputHist.ToString(1),
+			fmt.Sprintf("testData[%d].inputHist:%s", i, test.inputHistToStr))
+	}
+}
+
+func generateData(t *testing.T) *Histogram {
+	var data []*bucket4Test
+	sumCount := int64(0)
+	for n := 100; n < 10000; n = n + 100 {
+		sumCount += 100
+		data = append(data, &bucket4Test{
+			lower:  int64(n),
+			upper:  int64(n + 100),
+			count:  sumCount,
+			repeat: 10,
+			ndv:    10,
+		})
+	}
+	return genHist4Test(t, data, 0)
+}
diff --git a/statistics/handle/globalstats/globalstats_internal_test.go b/statistics/handle/globalstats/globalstats_internal_test.go
index ab88e12e48a68..40ef7a1e4ab74 100644
--- a/statistics/handle/globalstats/globalstats_internal_test.go
+++ b/statistics/handle/globalstats/globalstats_internal_test.go
@@ -351,10 +351,51 @@ func testIssues24349(testKit *testkit.TestKit) {
 	testKit.MustExec("create table t (a int, b int) partition by hash(a) partitions 3")
 	testKit.MustExec("insert into t values (0, 3), (0, 3), (0, 3), (0, 2), (1, 1), (1, 2), (1, 2), (1, 2), (1, 3), (1, 4), (2, 1), (2, 1)")
 	testKit.MustExec("analyze table t with 1 topn, 3 buckets")
+<<<<<<< HEAD:statistics/handle/globalstats/globalstats_internal_test.go
 	testKit.MustQuery("show stats_buckets where partition_name='global'").Check(testkit.Rows(
 		"test t global a 0 0 2 2 0 2 0",
 		"test t global b 0 0 3 1 1 2 0",
 		"test t global b 0 1 10 1 4 4 0",
+=======
+	testKit.MustQuery("show stats_topn where partition_name = 'global'").Sort().Check(testkit.Rows(
+		"test t global a 0 1 6",
+		"test t global b 0 2 4",
+	))
+	testKit.MustExec("explain select * from t where a > 0 and b > 0")
+	testKit.MustQuery("show stats_topn where table_name = 't'").Sort().Check(testkit.Rows(
+		"test t global a 0 1 6",
+		"test t global b 0 2 4",
+		"test t p0 a 0 0 4",
+		"test t p0 b 0 3 3",
+		"test t p1 a 0 1 6",
+		"test t p1 b 0 2 3",
+		"test t p2 a 0 2 2",
+		"test t p2 b 0 1 2",
+	))
+	// Column a is trivial.
+	// Column b:
+	// TopN:
+	//   p0: b=3, occurs 3 times
+	//   p1: b=2, occurs 3 times
+	//   p2: b=1, occurs 2 times
+	// Histogram:
+	//   p0: hist of b: [2, 2] count=repeat=2
+	//   p1: hist of b: [1, 3] count=2, repeat=3. [4, 4] count=repeat=1
+	// After merging the global TopN, it should be value 2 with a count of 4 (constructed from p1's TopN and p0's histogram).
+	// After kicking it out, the remaining buckets for b are (treating a TopN entry as a
+	// bucket whose lower bound equals its upper bound and whose count equals its repeat):
+	//   [3, 3] count=repeat=4
+	//   [1, 1] count=repeat=2
+	//   [1, 3] count=1, repeat=0 (merged into TopN)
+	//   [4, 4] count=repeat=1
+	// Finally, we get one global bucket [1, 4] count=8, repeat=1.
+	testKit.MustQuery("show stats_buckets where table_name='t'").Sort().Check(testkit.Rows(
+		"test t global a 0 0 4 4 0 0 0",
+		"test t global a 0 1 6 2 2 2 0",
+		"test t global b 0 0 8 1 1 4 0",
+		"test t p0 b 0 0 1 1 2 2 0",
+		"test t p1 b 0 0 2 1 1 3 0",
+		"test t p1 b 0 1 3 1 4 4 0",
+>>>>>>> 41c3b01dbc1 (statistics: BinarySearchRemoveVal should use decoded val instead of encoded (#59131)):pkg/statistics/handle/globalstats/global_stats_internal_test.go
 	))
 }
diff --git a/statistics/histogram.go b/statistics/histogram.go
index f1c50774ae578..f45f197b289ff 100644
--- a/statistics/histogram.go
+++ b/statistics/histogram.go
@@ -281,12 +281,35 @@ func (hg *Histogram) BucketToString(bktID, idxCols int) string {
 }
 
 // BinarySearchRemoveVal removes the value from the TopN using binary search.
-func (hg *Histogram) BinarySearchRemoveVal(valCntPairs TopNMeta) {
+func (hg *Histogram) BinarySearchRemoveVal(val *types.Datum, count int64) {
 	lowIdx, highIdx := 0, hg.Len()-1
+<<<<<<< HEAD:statistics/histogram.go
 	for lowIdx <= highIdx {
 		midIdx := (lowIdx + highIdx) / 2
 		cmpResult := bytes.Compare(hg.Bounds.Column(0).GetRaw(midIdx*2), valCntPairs.Encoded)
 		if cmpResult > 0 {
+=======
+	// If hg is too small, skip this boundary check, because it costs more than the binary search itself.
+	if hg.Len() > 4 {
+		if cmpResult := chunk.Compare(hg.Bounds.GetRow(highIdx*2+1), 0, val); cmpResult < 0 {
+			return
+		}
+		if cmpResult := chunk.Compare(hg.Bounds.GetRow(lowIdx), 0, val); cmpResult > 0 {
+			return
+		}
+	}
+	var midIdx = 0
+	var found bool
+	for lowIdx <= highIdx {
+		midIdx = (lowIdx + highIdx) / 2
+		cmpResult := chunk.Compare(hg.Bounds.GetRow(midIdx*2), 0, val)
+		if cmpResult > 0 {
+			highIdx = midIdx - 1
+			continue
+		}
+		cmpResult = chunk.Compare(hg.Bounds.GetRow(midIdx*2+1), 0, val)
+		if cmpResult < 0 {
+>>>>>>> 41c3b01dbc1 (statistics: BinarySearchRemoveVal should use decoded val instead of encoded (#59131)):pkg/statistics/histogram.go
 			lowIdx = midIdx + 1
 			continue
 		}
@@ -301,12 +324,29 @@
 		if cmpResult == 0 {
 			hg.Buckets[midIdx].Repeat = 0
 		}
+<<<<<<< HEAD:statistics/histogram.go
 		hg.Buckets[midIdx].Count -= int64(valCntPairs.Count)
 		if hg.Buckets[midIdx].Count < 0 {
 			hg.Buckets[midIdx].Count = 0
+=======
+		midbucket.Count -= count
+		if midbucket.Count < 0 {
+			midbucket.Count = 0
+>>>>>>> 41c3b01dbc1 (statistics: BinarySearchRemoveVal should use decoded val instead of encoded (#59131)):pkg/statistics/histogram.go
 		}
 		break
 	}
+<<<<<<< HEAD:statistics/histogram.go
+=======
+	if found {
+		for midIdx++; midIdx <= hg.Len()-1; midIdx++ {
+			hg.Buckets[midIdx].Count -= count
+			if hg.Buckets[midIdx].Count < 0 {
+				hg.Buckets[midIdx].Count = 0
+			}
+		}
+	}
+>>>>>>> 41c3b01dbc1 (statistics: BinarySearchRemoveVal should use decoded val instead of encoded (#59131)):pkg/statistics/histogram.go
 }
 
 // RemoveVals remove the given values from the histogram.
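The incoming side of this conflict is the heart of #59131: compare the decoded datum against the bucket bounds via chunk.Compare, instead of comparing raw chunk bytes against the codec-encoded TopN key, and once a bucket is hit, propagate the removed count to all later buckets, since bucket counts are cumulative. A minimal, self-contained sketch of that logic follows, with int64 bounds standing in for the decoded datum comparisons; the bucket struct and helper name are illustrative, not the real TiDB types.

package main

import "fmt"

type bucket struct {
	lower, upper int64
	count        int64 // cumulative count up to and including this bucket
	repeat       int64 // occurrences of the upper bound
}

// binarySearchRemoveVal mirrors the new algorithm: binary-search the bucket
// containing val, zero Repeat on an exact upper-bound hit, clamp Count at
// zero, then shrink every later bucket because counts are cumulative.
func binarySearchRemoveVal(buckets []bucket, val, count int64) {
	lowIdx, highIdx := 0, len(buckets)-1
	// Cheap boundary check, mirroring the hg.Len() > 4 fast path.
	if len(buckets) > 4 {
		if buckets[highIdx].upper < val || buckets[lowIdx].lower > val {
			return
		}
	}
	midIdx := 0
	found := false
	for lowIdx <= highIdx {
		midIdx = (lowIdx + highIdx) / 2
		if buckets[midIdx].lower > val {
			highIdx = midIdx - 1
			continue
		}
		if buckets[midIdx].upper < val {
			lowIdx = midIdx + 1
			continue
		}
		found = true
		if buckets[midIdx].upper == val {
			buckets[midIdx].repeat = 0
		}
		buckets[midIdx].count -= count
		if buckets[midIdx].count < 0 {
			buckets[midIdx].count = 0
		}
		break
	}
	if found {
		// Later buckets hold cumulative counts, so they shrink as well.
		for midIdx++; midIdx < len(buckets); midIdx++ {
			buckets[midIdx].count -= count
			if buckets[midIdx].count < 0 {
				buckets[midIdx].count = 0
			}
		}
	}
}

func main() {
	hist := []bucket{{1, 3, 5, 2}, {4, 6, 9, 1}, {7, 9, 12, 3}}
	binarySearchRemoveVal(hist, 6, 1)
	fmt.Println(hist) // [{1 3 5 2} {4 6 8 0} {7 9 11 3}]
}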
diff --git a/statistics/merge_worker.go b/statistics/merge_worker.go index fe85aab841fe0..bae066f105cfd 100644 --- a/statistics/merge_worker.go +++ b/statistics/merge_worker.go @@ -148,7 +148,11 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, v count, _ := allHists[j].equalRowCount(datum, isIndex) if count != 0 { // Remove the value corresponding to encodedVal from the histogram. +<<<<<<< HEAD:statistics/merge_worker.go worker.statsWrapper.AllHg[j].BinarySearchRemoveVal(TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)}) +======= + worker.statsWrapper.AllHg[j].BinarySearchRemoveVal(&datum, int64(count)) +>>>>>>> 41c3b01dbc1 (statistics: BinarySearchRemoveVal should use decoded val instead of encoded (#59131)):pkg/statistics/handle/globalstats/merge_worker.go } worker.shardMutex[j].Unlock() if count != 0 {
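For context, the batching-and-workers shape that MergeGlobalStatsTopNByConcurrency (and the merge_worker.go worker it drives) relies on reduces to the following self-contained sketch. The map-based counter and the task struct are illustrative stand-ins, not the real TopnStatsMergeTask/TopnStatsMergeWorker types, and the mutex plays the role the per-histogram shardMutex plays in the real worker.

package main

import (
	"fmt"
	"sync"
)

type task struct{ start, end int }

func main() {
	// Five partitions' worth of values to merge; stand-ins for partition TopNs.
	partitions := [][]string{{"a", "b"}, {"a"}, {"b", "c"}, {"c"}, {"a", "c"}}
	concurrency, batchSize := 2, 2

	// Split the partition list into batches, one task per batch, the same way
	// MergeGlobalStatsTopNByConcurrency builds its merge tasks.
	tasks := make([]task, 0)
	for start := 0; start < len(partitions); {
		end := start + batchSize
		if end > len(partitions) {
			end = len(partitions)
		}
		tasks = append(tasks, task{start, end})
		start = end
	}

	taskCh := make(chan task, len(tasks))
	counter := make(map[string]int64) // shared merged counts
	var mu sync.Mutex

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for tk := range taskCh {
				for _, part := range partitions[tk.start:tk.end] {
					for _, val := range part {
						mu.Lock()
						counter[val]++
						mu.Unlock()
					}
				}
			}
		}()
	}
	for _, tk := range tasks {
		taskCh <- tk
	}
	close(taskCh)
	wg.Wait()
	fmt.Println(counter) // map[a:3 b:2 c:3]
}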