From 5ac2404dca1f8bcba458d4bac8f16c894e8d3360 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 25 Dec 2024 01:13:39 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #58507 Signed-off-by: ti-chi-bot --- pkg/planner/core/issuetest/BUILD.bazel | 27 ++ .../core/issuetest/planner_issue_test.go | 179 +++++++ pkg/planner/core/task_base.go | 435 ++++++++++++++++++ 3 files changed, 641 insertions(+) create mode 100644 pkg/planner/core/issuetest/BUILD.bazel create mode 100644 pkg/planner/core/issuetest/planner_issue_test.go create mode 100644 pkg/planner/core/task_base.go diff --git a/pkg/planner/core/issuetest/BUILD.bazel b/pkg/planner/core/issuetest/BUILD.bazel new file mode 100644 index 0000000000000..2a3b07f0c3748 --- /dev/null +++ b/pkg/planner/core/issuetest/BUILD.bazel @@ -0,0 +1,27 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "issuetest_test", + timeout = "short", + srcs = [ + "main_test.go", + "planner_issue_test.go", + ], + data = glob(["testdata/**"]), + flaky = True, + race = "on", + shard_count = 6, + deps = [ + "//pkg/parser", + "//pkg/planner", + "//pkg/planner/core", + "//pkg/planner/core/base", + "//pkg/planner/core/resolve", + "//pkg/testkit", + "//pkg/testkit/testdata", + "//pkg/testkit/testmain", + "//pkg/testkit/testsetup", + "@com_github_stretchr_testify//require", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/pkg/planner/core/issuetest/planner_issue_test.go b/pkg/planner/core/issuetest/planner_issue_test.go new file mode 100644 index 0000000000000..ca0b80fb7504f --- /dev/null +++ b/pkg/planner/core/issuetest/planner_issue_test.go @@ -0,0 +1,179 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package issuetest + +import ( + "context" + "testing" + + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/planner" + "github.com/pingcap/tidb/pkg/planner/core" + "github.com/pingcap/tidb/pkg/planner/core/base" + "github.com/pingcap/tidb/pkg/planner/core/resolve" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/stretchr/testify/require" +) + +// It's a case for Columns in tableScan and indexScan with double reader +func TestIssue43461(t *testing.T) { + store, domain := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, c int, index b(b), index b_c(b, c)) partition by hash(a) partitions 4;") + tk.MustExec("analyze table t") + + stmt, err := parser.New().ParseOneStmt("select * from t use index(b) where b > 1 order by b limit 1", "", "") + require.NoError(t, err) + + nodeW := resolve.NewNodeW(stmt) + p, _, err := planner.Optimize(context.TODO(), tk.Session(), nodeW, domain.InfoSchema()) + require.NoError(t, err) + require.NotNil(t, p) + + var idxLookUpPlan *core.PhysicalIndexLookUpReader + var ok bool + + for { + idxLookUpPlan, ok = p.(*core.PhysicalIndexLookUpReader) + if ok { + break + } + p = p.(base.PhysicalPlan).Children()[0] + } + require.True(t, ok) + + is := idxLookUpPlan.IndexPlans[0].(*core.PhysicalIndexScan) + ts := idxLookUpPlan.TablePlans[0].(*core.PhysicalTableScan) + + require.NotEqual(t, is.Columns, ts.Columns) +} + +func Test53726(t *testing.T) { + // test for RemoveUnnecessaryFirstRow + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t7(c int); ") + tk.MustExec("insert into t7 values (575932053), (-258025139);") + tk.MustQuery("select distinct cast(c as decimal), cast(c as signed) from t7"). + Sort().Check(testkit.Rows("-258025139 -258025139", "575932053 575932053")) + tk.MustQuery("explain select distinct cast(c as decimal), cast(c as signed) from t7"). + Check(testkit.Rows( + "HashAgg_8 8000.00 root group by:Column#7, Column#8, funcs:firstrow(Column#7)->Column#3, funcs:firstrow(Column#8)->Column#4", + "└─TableReader_9 8000.00 root data:HashAgg_4", + " └─HashAgg_4 8000.00 cop[tikv] group by:cast(test.t7.c, bigint(22) BINARY), cast(test.t7.c, decimal(10,0) BINARY), ", + " └─TableFullScan_7 10000.00 cop[tikv] table:t7 keep order:false, stats:pseudo")) + + tk.MustExec("analyze table t7 all columns") + tk.MustQuery("select distinct cast(c as decimal), cast(c as signed) from t7"). + Sort(). + Check(testkit.Rows("-258025139 -258025139", "575932053 575932053")) + tk.MustQuery("explain select distinct cast(c as decimal), cast(c as signed) from t7"). + Check(testkit.Rows( + "HashAgg_6 2.00 root group by:Column#11, Column#12, funcs:firstrow(Column#11)->Column#3, funcs:firstrow(Column#12)->Column#4", + "└─Projection_12 2.00 root cast(test.t7.c, decimal(10,0) BINARY)->Column#11, cast(test.t7.c, bigint(22) BINARY)->Column#12", + " └─TableReader_11 2.00 root data:TableFullScan_10", + " └─TableFullScan_10 2.00 cop[tikv] table:t7 keep order:false")) +} + +func TestIssue54535(t *testing.T) { + // test for tidb_enable_inl_join_inner_multi_pattern system variable + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set session tidb_enable_inl_join_inner_multi_pattern='ON'") + tk.MustExec("create table ta(a1 int, a2 int, a3 int, index idx_a(a1))") + tk.MustExec("create table tb(b1 int, b2 int, b3 int, index idx_b(b1))") + tk.MustExec("analyze table ta") + tk.MustExec("analyze table tb") + + tk.MustQuery("explain SELECT /*+ inl_join(tmp) */ * FROM ta, (SELECT b1, COUNT(b3) AS cnt FROM tb GROUP BY b1, b2) as tmp where ta.a1 = tmp.b1"). + Check(testkit.Rows( + "Projection_9 9990.00 root test.ta.a1, test.ta.a2, test.ta.a3, test.tb.b1, Column#9", + "└─IndexJoin_16 9990.00 root inner join, inner:HashAgg_14, outer key:test.ta.a1, inner key:test.tb.b1, equal cond:eq(test.ta.a1, test.tb.b1)", + " ├─TableReader_43(Build) 9990.00 root data:Selection_42", + " │ └─Selection_42 9990.00 cop[tikv] not(isnull(test.ta.a1))", + " │ └─TableFullScan_41 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo", + " └─HashAgg_14(Probe) 79840080.00 root group by:test.tb.b1, test.tb.b2, funcs:count(Column#11)->Column#9, funcs:firstrow(test.tb.b1)->test.tb.b1", + " └─IndexLookUp_15 79840080.00 root ", + " ├─Selection_12(Build) 9990.00 cop[tikv] not(isnull(test.tb.b1))", + " │ └─IndexRangeScan_10 10000.00 cop[tikv] table:tb, index:idx_b(b1) range: decided by [eq(test.tb.b1, test.ta.a1)], keep order:false, stats:pseudo", + " └─HashAgg_13(Probe) 79840080.00 cop[tikv] group by:test.tb.b1, test.tb.b2, funcs:count(test.tb.b3)->Column#11", + " └─TableRowIDScan_11 9990.00 cop[tikv] table:tb keep order:false, stats:pseudo")) + // test for issues/55169 + tk.MustExec("create table t1(col_1 int, index idx_1(col_1));") + tk.MustExec("create table t2(col_1 int, col_2 int, index idx_2(col_1));") + tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows()) + tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(distinct col_2 order by col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows()) +} + +func TestIssue53175(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`create table t(a int)`) + tk.MustExec(`set @@sql_mode = default`) + tk.MustQuery(`select @@sql_mode REGEXP 'ONLY_FULL_GROUP_BY'`).Check(testkit.Rows("1")) + tk.MustContainErrMsg(`select * from t group by null`, "[planner:1055]Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'test.t.a' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by") + tk.MustExec(`create view v as select * from t group by null`) + tk.MustContainErrMsg(`select * from v`, "[planner:1055]Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'test.t.a' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by") + tk.MustExec(`set @@sql_mode = ''`) + tk.MustQuery(`select * from t group by null`) + tk.MustQuery(`select * from v`) +} + +func TestIssues57583(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("create table t1(id int, v1 int, v2 int, v3 int);") + tk.MustExec(" create table t2(id int, v1 int, v2 int, v3 int);") + tk.MustQuery("explain select t1.id from t1 join t2 on t1.v1 = t2.v2 intersect select t1.id from t1 join t2 on t1.v1 = t2.v2;").Check(testkit.Rows( + "HashJoin_15 6393.60 root semi join, left side:HashAgg_16, equal:[nulleq(test.t1.id, test.t1.id)]", + "├─HashJoin_26(Build) 12487.50 root inner join, equal:[eq(test.t1.v1, test.t2.v2)]", + "│ ├─TableReader_33(Build) 9990.00 root data:Selection_32", + "│ │ └─Selection_32 9990.00 cop[tikv] not(isnull(test.t2.v2))", + "│ │ └─TableFullScan_31 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", + "│ └─TableReader_30(Probe) 9990.00 root data:Selection_29", + "│ └─Selection_29 9990.00 cop[tikv] not(isnull(test.t1.v1))", + "│ └─TableFullScan_28 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo", + "└─HashAgg_16(Probe) 7992.00 root group by:test.t1.id, funcs:firstrow(test.t1.id)->test.t1.id", + " └─HashJoin_17 12487.50 root inner join, equal:[eq(test.t1.v1, test.t2.v2)]", + " ├─TableReader_24(Build) 9990.00 root data:Selection_23", + " │ └─Selection_23 9990.00 cop[tikv] not(isnull(test.t2.v2))", + " │ └─TableFullScan_22 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", + " └─TableReader_21(Probe) 9990.00 root data:Selection_20", + " └─Selection_20 9990.00 cop[tikv] not(isnull(test.t1.v1))", + " └─TableFullScan_19 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo")) +} + +func TestIssue58476(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("CREATE TABLE t3 (id int PRIMARY KEY,c1 varchar(256),c2 varchar(256) GENERATED ALWAYS AS (concat(c1, c1)) VIRTUAL,KEY (id));") + tk.MustExec("insert into t3(id, c1) values (50, 'c');") + tk.MustQuery("SELECT /*+ USE_INDEX_MERGE(`t3`)*/ id FROM `t3` WHERE c2 BETWEEN 'a' AND 'b' GROUP BY id HAVING id < 100 or id > 0;").Check(testkit.Rows()) + tk.MustQuery("explain format='brief' SELECT /*+ USE_INDEX_MERGE(`t3`)*/ id FROM `t3` WHERE c2 BETWEEN 'a' AND 'b' GROUP BY id HAVING id < 100 or id > 0;"). + Check(testkit.Rows( + `Projection 249.75 root test.t3.id`, + `└─Selection 249.75 root ge(test.t3.c2, "a"), le(test.t3.c2, "b")`, + ` └─Projection 9990.00 root test.t3.id, test.t3.c2`, + ` └─IndexMerge 9990.00 root type: union`, + ` ├─IndexRangeScan(Build) 3323.33 cop[tikv] table:t3, index:id(id) range:[-inf,100), keep order:false, stats:pseudo`, + ` ├─TableRangeScan(Build) 3333.33 cop[tikv] table:t3 range:(0,+inf], keep order:false, stats:pseudo`, + ` └─TableRowIDScan(Probe) 9990.00 cop[tikv] table:t3 keep order:false, stats:pseudo`)) +} diff --git a/pkg/planner/core/task_base.go b/pkg/planner/core/task_base.go new file mode 100644 index 0000000000000..6b32b7b67a034 --- /dev/null +++ b/pkg/planner/core/task_base.go @@ -0,0 +1,435 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "github.com/pingcap/tidb/pkg/expression" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/planner/cardinality" + "github.com/pingcap/tidb/pkg/planner/core/base" + "github.com/pingcap/tidb/pkg/planner/core/cost" + "github.com/pingcap/tidb/pkg/planner/property" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/size" + "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" +) + +var ( + _ base.Task = &RootTask{} + _ base.Task = &MppTask{} + _ base.Task = &CopTask{} +) + +// ************************************* RootTask Start ****************************************** + +// RootTask is the final sink node of a plan graph. It should be a single goroutine on tidb. +type RootTask struct { + p base.PhysicalPlan + isEmpty bool // isEmpty indicates if this task contains a dual table and returns empty data. + // TODO: The flag 'isEmpty' is only checked by Projection and UnionAll. We should support more cases in the future. +} + +// GetPlan returns the root task's plan. +func (t *RootTask) GetPlan() base.PhysicalPlan { + return t.p +} + +// SetPlan sets the root task' plan. +func (t *RootTask) SetPlan(p base.PhysicalPlan) { + t.p = p +} + +// IsEmpty indicates whether root task is empty. +func (t *RootTask) IsEmpty() bool { + return t.isEmpty +} + +// SetEmpty set the root task as empty. +func (t *RootTask) SetEmpty(x bool) { + t.isEmpty = x +} + +// Copy implements Task interface. +func (t *RootTask) Copy() base.Task { + return &RootTask{ + p: t.p, + } +} + +// ConvertToRootTask implements Task interface. +func (t *RootTask) ConvertToRootTask(_ base.PlanContext) base.Task { + return t.Copy().(*RootTask) +} + +// Invalid implements Task interface. +func (t *RootTask) Invalid() bool { + return t.p == nil +} + +// Count implements Task interface. +func (t *RootTask) Count() float64 { + return t.p.StatsInfo().RowCount +} + +// Plan implements Task interface. +func (t *RootTask) Plan() base.PhysicalPlan { + return t.p +} + +// MemoryUsage return the memory usage of rootTask +func (t *RootTask) MemoryUsage() (sum int64) { + if t == nil { + return + } + sum = size.SizeOfInterface + size.SizeOfBool + if t.p != nil { + sum += t.p.MemoryUsage() + } + return sum +} + +// ************************************* RootTask End ****************************************** + +// ************************************* MPPTask Start ****************************************** + +// MppTask can not : +// 1. keep order +// 2. support double read +// 3. consider virtual columns. +// 4. TODO: partition prune after close +type MppTask struct { + p base.PhysicalPlan + + partTp property.MPPPartitionType + hashCols []*property.MPPPartitionColumn + + // rootTaskConds record filters of TableScan that cannot be pushed down to TiFlash. + + // For logical plan like: HashAgg -> Selection -> TableScan, if filters in Selection cannot be pushed to TiFlash. + // Planner will generate physical plan like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> PhysicalTableScan(cop tiflash) + // Because planner will make mppTask invalid directly then use copTask directly. + + // But in DisaggregatedTiFlash mode, cop and batchCop protocol is disabled, so we have to consider this situation for mppTask. + // When generating PhysicalTableScan, if prop.TaskTp is RootTaskType, mppTask will be converted to rootTask, + // and filters in rootTaskConds will be added in a Selection which will be executed in TiDB. + // So physical plan be like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> ExchangeSender -> PhysicalTableScan(mpp tiflash) + rootTaskConds []expression.Expression + tblColHists *statistics.HistColl +} + +// Count implements Task interface. +func (t *MppTask) Count() float64 { + return t.p.StatsInfo().RowCount +} + +// Copy implements Task interface. +func (t *MppTask) Copy() base.Task { + nt := *t + return &nt +} + +// Plan implements Task interface. +func (t *MppTask) Plan() base.PhysicalPlan { + return t.p +} + +// Invalid implements Task interface. +func (t *MppTask) Invalid() bool { + return t.p == nil +} + +// ConvertToRootTask implements Task interface. +func (t *MppTask) ConvertToRootTask(ctx base.PlanContext) base.Task { + return t.Copy().(*MppTask).ConvertToRootTaskImpl(ctx) +} + +// MemoryUsage return the memory usage of mppTask +func (t *MppTask) MemoryUsage() (sum int64) { + if t == nil { + return + } + + sum = size.SizeOfInterface + size.SizeOfInt + size.SizeOfSlice + int64(cap(t.hashCols))*size.SizeOfPointer + if t.p != nil { + sum += t.p.MemoryUsage() + } + return +} + +// ConvertToRootTaskImpl implements Task interface. +func (t *MppTask) ConvertToRootTaskImpl(ctx base.PlanContext) *RootTask { + // In disaggregated-tiflash mode, need to consider generated column. + tryExpandVirtualColumn(t.p) + sender := PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_PassThrough, + }.Init(ctx, t.p.StatsInfo()) + sender.SetChildren(t.p) + + p := PhysicalTableReader{ + tablePlan: sender, + StoreType: kv.TiFlash, + }.Init(ctx, t.p.QueryBlockOffset()) + p.SetStats(t.p.StatsInfo()) + collectPartitionInfosFromMPPPlan(p, t.p) + rt := &RootTask{} + rt.SetPlan(p) + + if len(t.rootTaskConds) > 0 { + // Some Filter cannot be pushed down to TiFlash, need to add Selection in rootTask, + // so this Selection will be executed in TiDB. + _, isTableScan := t.p.(*PhysicalTableScan) + _, isSelection := t.p.(*PhysicalSelection) + if isSelection { + _, isTableScan = t.p.Children()[0].(*PhysicalTableScan) + } + if !isTableScan { + // Need to make sure oriTaskPlan is TableScan, because rootTaskConds is part of TableScan.FilterCondition. + // It's only for TableScan. This is ensured by converting mppTask to rootTask just after TableScan is built, + // so no other operators are added into this mppTask. + logutil.BgLogger().Error("expect Selection or TableScan for mppTask.p", zap.String("mppTask.p", t.p.TP())) + return base.InvalidTask.(*RootTask) + } + selectivity, _, err := cardinality.Selectivity(ctx, t.tblColHists, t.rootTaskConds, nil) + if err != nil { + logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err)) + selectivity = cost.SelectionFactor + } + sel := PhysicalSelection{Conditions: t.rootTaskConds}.Init(ctx, rt.GetPlan().StatsInfo().Scale(selectivity), rt.GetPlan().QueryBlockOffset()) + sel.fromDataSource = true + sel.SetChildren(rt.GetPlan()) + rt.SetPlan(sel) + } + return rt +} + +// ************************************* MPPTask End ****************************************** + +// ************************************* CopTask Start ****************************************** + +// CopTask is a task that runs in a distributed kv store. +// TODO: In future, we should split copTask to indexTask and tableTask. +type CopTask struct { + indexPlan base.PhysicalPlan + tablePlan base.PhysicalPlan + // indexPlanFinished means we have finished index plan. + indexPlanFinished bool + // keepOrder indicates if the plan scans data by order. + keepOrder bool + // needExtraProj means an extra prune is needed because + // in double read / index merge cases, they may output one more column for handle(row id). + needExtraProj bool + // originSchema is the target schema to be projected to when needExtraProj is true. + originSchema *expression.Schema + + extraHandleCol *expression.Column + commonHandleCols []*expression.Column + // tblColHists stores the original stats of DataSource, it is used to get + // average row width when computing network cost. + tblColHists *statistics.HistColl + // tblCols stores the original columns of DataSource before being pruned, it + // is used to compute average row width when computing scan cost. + tblCols []*expression.Column + + idxMergePartPlans []base.PhysicalPlan + idxMergeIsIntersection bool + idxMergeAccessMVIndex bool + + // rootTaskConds stores select conditions containing virtual columns. + // These conditions can't push to TiKV, so we have to add a selection for rootTask + rootTaskConds []expression.Expression + + // For table partition. + physPlanPartInfo *PhysPlanPartInfo + + // expectCnt is the expected row count of upper task, 0 for unlimited. + // It's used for deciding whether using paging distsql. + expectCnt uint64 +} + +// Invalid implements Task interface. +func (t *CopTask) Invalid() bool { + return t.tablePlan == nil && t.indexPlan == nil && len(t.idxMergePartPlans) == 0 +} + +// Count implements Task interface. +func (t *CopTask) Count() float64 { + if t.indexPlanFinished { + return t.tablePlan.StatsInfo().RowCount + } + return t.indexPlan.StatsInfo().RowCount +} + +// Copy implements Task interface. +func (t *CopTask) Copy() base.Task { + nt := *t + return &nt +} + +// Plan implements Task interface. +// copTask plan should be careful with indexMergeReader, whose real plan is stored in +// idxMergePartPlans, when its indexPlanFinished is marked with false. +func (t *CopTask) Plan() base.PhysicalPlan { + if t.indexPlanFinished { + return t.tablePlan + } + return t.indexPlan +} + +// MemoryUsage return the memory usage of copTask +func (t *CopTask) MemoryUsage() (sum int64) { + if t == nil { + return + } + + sum = size.SizeOfInterface*(2+int64(cap(t.idxMergePartPlans)+cap(t.rootTaskConds))) + size.SizeOfBool*3 + size.SizeOfUint64 + + size.SizeOfPointer*(3+int64(cap(t.commonHandleCols)+cap(t.tblCols))) + size.SizeOfSlice*4 + t.physPlanPartInfo.MemoryUsage() + if t.indexPlan != nil { + sum += t.indexPlan.MemoryUsage() + } + if t.tablePlan != nil { + sum += t.tablePlan.MemoryUsage() + } + if t.originSchema != nil { + sum += t.originSchema.MemoryUsage() + } + if t.extraHandleCol != nil { + sum += t.extraHandleCol.MemoryUsage() + } + + for _, col := range t.commonHandleCols { + sum += col.MemoryUsage() + } + for _, col := range t.tblCols { + sum += col.MemoryUsage() + } + for _, p := range t.idxMergePartPlans { + sum += p.MemoryUsage() + } + for _, expr := range t.rootTaskConds { + sum += expr.MemoryUsage() + } + return +} + +// ConvertToRootTask implements Task interface. +func (t *CopTask) ConvertToRootTask(ctx base.PlanContext) base.Task { + // copy one to avoid changing itself. + return t.Copy().(*CopTask).convertToRootTaskImpl(ctx) +} + +func (t *CopTask) convertToRootTaskImpl(ctx base.PlanContext) *RootTask { + // copTasks are run in parallel, to make the estimated cost closer to execution time, we amortize + // the cost to cop iterator workers. According to `CopClient::Send`, the concurrency + // is Min(DistSQLScanConcurrency, numRegionsInvolvedInScan), since we cannot infer + // the number of regions involved, we simply use DistSQLScanConcurrency. + t.finishIndexPlan() + // Network cost of transferring rows of table scan to TiDB. + if t.tablePlan != nil { + tp := t.tablePlan + for len(tp.Children()) > 0 { + if len(tp.Children()) == 1 { + tp = tp.Children()[0] + } else { + join := tp.(*PhysicalHashJoin) + tp = join.Children()[1-join.InnerChildIdx] + } + } + ts := tp.(*PhysicalTableScan) + prevColumnLen := len(ts.Columns) + prevSchema := ts.schema.Clone() + ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns) + if !t.needExtraProj && len(ts.Columns) > prevColumnLen { + // Add a projection to make sure not to output extract columns. + t.needExtraProj = true + t.originSchema = prevSchema + } + } + newTask := &RootTask{} + if t.idxMergePartPlans != nil { + p := PhysicalIndexMergeReader{ + partialPlans: t.idxMergePartPlans, + tablePlan: t.tablePlan, + IsIntersectionType: t.idxMergeIsIntersection, + AccessMVIndex: t.idxMergeAccessMVIndex, + KeepOrder: t.keepOrder, + }.Init(ctx, t.idxMergePartPlans[0].QueryBlockOffset()) + p.PlanPartInfo = t.physPlanPartInfo + setTableScanToTableRowIDScan(p.tablePlan) + newTask.SetPlan(p) + if t.needExtraProj { + schema := t.originSchema + proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.StatsInfo(), t.idxMergePartPlans[0].QueryBlockOffset(), nil) + proj.SetSchema(schema) + proj.SetChildren(p) + newTask.SetPlan(proj) + } + t.handleRootTaskConds(ctx, newTask) + return newTask + } + if t.indexPlan != nil && t.tablePlan != nil { + newTask = buildIndexLookUpTask(ctx, t) + } else if t.indexPlan != nil { + p := PhysicalIndexReader{indexPlan: t.indexPlan}.Init(ctx, t.indexPlan.QueryBlockOffset()) + p.PlanPartInfo = t.physPlanPartInfo + p.SetStats(t.indexPlan.StatsInfo()) + newTask.SetPlan(p) + } else { + tp := t.tablePlan + for len(tp.Children()) > 0 { + if len(tp.Children()) == 1 { + tp = tp.Children()[0] + } else { + join := tp.(*PhysicalHashJoin) + tp = join.Children()[1-join.InnerChildIdx] + } + } + ts := tp.(*PhysicalTableScan) + p := PhysicalTableReader{ + tablePlan: t.tablePlan, + StoreType: ts.StoreType, + IsCommonHandle: ts.Table.IsCommonHandle, + }.Init(ctx, t.tablePlan.QueryBlockOffset()) + p.PlanPartInfo = t.physPlanPartInfo + p.SetStats(t.tablePlan.StatsInfo()) + + // If agg was pushed down in Attach2Task(), the partial agg was placed on the top of tablePlan, the final agg was + // placed above the PhysicalTableReader, and the schema should have been set correctly for them, the schema of + // partial agg contains the columns needed by the final agg. + // If we add the projection here, the projection will be between the final agg and the partial agg, then the + // schema will be broken, the final agg will fail to find needed columns in ResolveIndices(). + // Besides, the agg would only be pushed down if it doesn't contain virtual columns, so virtual column should not be affected. + aggPushedDown := false + switch p.tablePlan.(type) { + case *PhysicalHashAgg, *PhysicalStreamAgg: + aggPushedDown = true + } + + if t.needExtraProj && !aggPushedDown { + proj := PhysicalProjection{Exprs: expression.Column2Exprs(t.originSchema.Columns)}.Init(ts.SCtx(), ts.StatsInfo(), ts.QueryBlockOffset(), nil) + proj.SetSchema(t.originSchema) + proj.SetChildren(p) + newTask.SetPlan(proj) + } else { + newTask.SetPlan(p) + } + } + + t.handleRootTaskConds(ctx, newTask) + return newTask +} + +// ************************************* CopTask End ****************************************** From b8741da166ab94daba4f859a5b4817979cb3135b Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 23 Jan 2025 17:28:16 +0800 Subject: [PATCH 2/2] update Signed-off-by: Weizhen Wang --- pkg/planner/core/issuetest/BUILD.bazel | 27 -- .../core/issuetest/planner_issue_test.go | 179 ------- pkg/planner/core/task_base.go | 435 ------------------ planner/core/issuetest/BUILD.bazel | 2 +- planner/core/issuetest/planner_issue_test.go | 18 + planner/core/task.go | 2 +- 6 files changed, 20 insertions(+), 643 deletions(-) delete mode 100644 pkg/planner/core/issuetest/BUILD.bazel delete mode 100644 pkg/planner/core/issuetest/planner_issue_test.go delete mode 100644 pkg/planner/core/task_base.go diff --git a/pkg/planner/core/issuetest/BUILD.bazel b/pkg/planner/core/issuetest/BUILD.bazel deleted file mode 100644 index 2a3b07f0c3748..0000000000000 --- a/pkg/planner/core/issuetest/BUILD.bazel +++ /dev/null @@ -1,27 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_test") - -go_test( - name = "issuetest_test", - timeout = "short", - srcs = [ - "main_test.go", - "planner_issue_test.go", - ], - data = glob(["testdata/**"]), - flaky = True, - race = "on", - shard_count = 6, - deps = [ - "//pkg/parser", - "//pkg/planner", - "//pkg/planner/core", - "//pkg/planner/core/base", - "//pkg/planner/core/resolve", - "//pkg/testkit", - "//pkg/testkit/testdata", - "//pkg/testkit/testmain", - "//pkg/testkit/testsetup", - "@com_github_stretchr_testify//require", - "@org_uber_go_goleak//:goleak", - ], -) diff --git a/pkg/planner/core/issuetest/planner_issue_test.go b/pkg/planner/core/issuetest/planner_issue_test.go deleted file mode 100644 index ca0b80fb7504f..0000000000000 --- a/pkg/planner/core/issuetest/planner_issue_test.go +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package issuetest - -import ( - "context" - "testing" - - "github.com/pingcap/tidb/pkg/parser" - "github.com/pingcap/tidb/pkg/planner" - "github.com/pingcap/tidb/pkg/planner/core" - "github.com/pingcap/tidb/pkg/planner/core/base" - "github.com/pingcap/tidb/pkg/planner/core/resolve" - "github.com/pingcap/tidb/pkg/testkit" - "github.com/stretchr/testify/require" -) - -// It's a case for Columns in tableScan and indexScan with double reader -func TestIssue43461(t *testing.T) { - store, domain := testkit.CreateMockStoreAndDomain(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("create table t(a int, b int, c int, index b(b), index b_c(b, c)) partition by hash(a) partitions 4;") - tk.MustExec("analyze table t") - - stmt, err := parser.New().ParseOneStmt("select * from t use index(b) where b > 1 order by b limit 1", "", "") - require.NoError(t, err) - - nodeW := resolve.NewNodeW(stmt) - p, _, err := planner.Optimize(context.TODO(), tk.Session(), nodeW, domain.InfoSchema()) - require.NoError(t, err) - require.NotNil(t, p) - - var idxLookUpPlan *core.PhysicalIndexLookUpReader - var ok bool - - for { - idxLookUpPlan, ok = p.(*core.PhysicalIndexLookUpReader) - if ok { - break - } - p = p.(base.PhysicalPlan).Children()[0] - } - require.True(t, ok) - - is := idxLookUpPlan.IndexPlans[0].(*core.PhysicalIndexScan) - ts := idxLookUpPlan.TablePlans[0].(*core.PhysicalTableScan) - - require.NotEqual(t, is.Columns, ts.Columns) -} - -func Test53726(t *testing.T) { - // test for RemoveUnnecessaryFirstRow - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("create table t7(c int); ") - tk.MustExec("insert into t7 values (575932053), (-258025139);") - tk.MustQuery("select distinct cast(c as decimal), cast(c as signed) from t7"). - Sort().Check(testkit.Rows("-258025139 -258025139", "575932053 575932053")) - tk.MustQuery("explain select distinct cast(c as decimal), cast(c as signed) from t7"). - Check(testkit.Rows( - "HashAgg_8 8000.00 root group by:Column#7, Column#8, funcs:firstrow(Column#7)->Column#3, funcs:firstrow(Column#8)->Column#4", - "└─TableReader_9 8000.00 root data:HashAgg_4", - " └─HashAgg_4 8000.00 cop[tikv] group by:cast(test.t7.c, bigint(22) BINARY), cast(test.t7.c, decimal(10,0) BINARY), ", - " └─TableFullScan_7 10000.00 cop[tikv] table:t7 keep order:false, stats:pseudo")) - - tk.MustExec("analyze table t7 all columns") - tk.MustQuery("select distinct cast(c as decimal), cast(c as signed) from t7"). - Sort(). - Check(testkit.Rows("-258025139 -258025139", "575932053 575932053")) - tk.MustQuery("explain select distinct cast(c as decimal), cast(c as signed) from t7"). - Check(testkit.Rows( - "HashAgg_6 2.00 root group by:Column#11, Column#12, funcs:firstrow(Column#11)->Column#3, funcs:firstrow(Column#12)->Column#4", - "└─Projection_12 2.00 root cast(test.t7.c, decimal(10,0) BINARY)->Column#11, cast(test.t7.c, bigint(22) BINARY)->Column#12", - " └─TableReader_11 2.00 root data:TableFullScan_10", - " └─TableFullScan_10 2.00 cop[tikv] table:t7 keep order:false")) -} - -func TestIssue54535(t *testing.T) { - // test for tidb_enable_inl_join_inner_multi_pattern system variable - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set session tidb_enable_inl_join_inner_multi_pattern='ON'") - tk.MustExec("create table ta(a1 int, a2 int, a3 int, index idx_a(a1))") - tk.MustExec("create table tb(b1 int, b2 int, b3 int, index idx_b(b1))") - tk.MustExec("analyze table ta") - tk.MustExec("analyze table tb") - - tk.MustQuery("explain SELECT /*+ inl_join(tmp) */ * FROM ta, (SELECT b1, COUNT(b3) AS cnt FROM tb GROUP BY b1, b2) as tmp where ta.a1 = tmp.b1"). - Check(testkit.Rows( - "Projection_9 9990.00 root test.ta.a1, test.ta.a2, test.ta.a3, test.tb.b1, Column#9", - "└─IndexJoin_16 9990.00 root inner join, inner:HashAgg_14, outer key:test.ta.a1, inner key:test.tb.b1, equal cond:eq(test.ta.a1, test.tb.b1)", - " ├─TableReader_43(Build) 9990.00 root data:Selection_42", - " │ └─Selection_42 9990.00 cop[tikv] not(isnull(test.ta.a1))", - " │ └─TableFullScan_41 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo", - " └─HashAgg_14(Probe) 79840080.00 root group by:test.tb.b1, test.tb.b2, funcs:count(Column#11)->Column#9, funcs:firstrow(test.tb.b1)->test.tb.b1", - " └─IndexLookUp_15 79840080.00 root ", - " ├─Selection_12(Build) 9990.00 cop[tikv] not(isnull(test.tb.b1))", - " │ └─IndexRangeScan_10 10000.00 cop[tikv] table:tb, index:idx_b(b1) range: decided by [eq(test.tb.b1, test.ta.a1)], keep order:false, stats:pseudo", - " └─HashAgg_13(Probe) 79840080.00 cop[tikv] group by:test.tb.b1, test.tb.b2, funcs:count(test.tb.b3)->Column#11", - " └─TableRowIDScan_11 9990.00 cop[tikv] table:tb keep order:false, stats:pseudo")) - // test for issues/55169 - tk.MustExec("create table t1(col_1 int, index idx_1(col_1));") - tk.MustExec("create table t2(col_1 int, col_2 int, index idx_2(col_1));") - tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows()) - tk.MustQuery("select /*+ inl_join(tmp) */ * from t1 inner join (select col_1, group_concat(distinct col_2 order by col_2) from t2 group by col_1) tmp on t1.col_1 = tmp.col_1;").Check(testkit.Rows()) -} - -func TestIssue53175(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec(`create table t(a int)`) - tk.MustExec(`set @@sql_mode = default`) - tk.MustQuery(`select @@sql_mode REGEXP 'ONLY_FULL_GROUP_BY'`).Check(testkit.Rows("1")) - tk.MustContainErrMsg(`select * from t group by null`, "[planner:1055]Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'test.t.a' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by") - tk.MustExec(`create view v as select * from t group by null`) - tk.MustContainErrMsg(`select * from v`, "[planner:1055]Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'test.t.a' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by") - tk.MustExec(`set @@sql_mode = ''`) - tk.MustQuery(`select * from t group by null`) - tk.MustQuery(`select * from v`) -} - -func TestIssues57583(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test;") - tk.MustExec("create table t1(id int, v1 int, v2 int, v3 int);") - tk.MustExec(" create table t2(id int, v1 int, v2 int, v3 int);") - tk.MustQuery("explain select t1.id from t1 join t2 on t1.v1 = t2.v2 intersect select t1.id from t1 join t2 on t1.v1 = t2.v2;").Check(testkit.Rows( - "HashJoin_15 6393.60 root semi join, left side:HashAgg_16, equal:[nulleq(test.t1.id, test.t1.id)]", - "├─HashJoin_26(Build) 12487.50 root inner join, equal:[eq(test.t1.v1, test.t2.v2)]", - "│ ├─TableReader_33(Build) 9990.00 root data:Selection_32", - "│ │ └─Selection_32 9990.00 cop[tikv] not(isnull(test.t2.v2))", - "│ │ └─TableFullScan_31 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", - "│ └─TableReader_30(Probe) 9990.00 root data:Selection_29", - "│ └─Selection_29 9990.00 cop[tikv] not(isnull(test.t1.v1))", - "│ └─TableFullScan_28 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo", - "└─HashAgg_16(Probe) 7992.00 root group by:test.t1.id, funcs:firstrow(test.t1.id)->test.t1.id", - " └─HashJoin_17 12487.50 root inner join, equal:[eq(test.t1.v1, test.t2.v2)]", - " ├─TableReader_24(Build) 9990.00 root data:Selection_23", - " │ └─Selection_23 9990.00 cop[tikv] not(isnull(test.t2.v2))", - " │ └─TableFullScan_22 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", - " └─TableReader_21(Probe) 9990.00 root data:Selection_20", - " └─Selection_20 9990.00 cop[tikv] not(isnull(test.t1.v1))", - " └─TableFullScan_19 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo")) -} - -func TestIssue58476(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test;") - tk.MustExec("CREATE TABLE t3 (id int PRIMARY KEY,c1 varchar(256),c2 varchar(256) GENERATED ALWAYS AS (concat(c1, c1)) VIRTUAL,KEY (id));") - tk.MustExec("insert into t3(id, c1) values (50, 'c');") - tk.MustQuery("SELECT /*+ USE_INDEX_MERGE(`t3`)*/ id FROM `t3` WHERE c2 BETWEEN 'a' AND 'b' GROUP BY id HAVING id < 100 or id > 0;").Check(testkit.Rows()) - tk.MustQuery("explain format='brief' SELECT /*+ USE_INDEX_MERGE(`t3`)*/ id FROM `t3` WHERE c2 BETWEEN 'a' AND 'b' GROUP BY id HAVING id < 100 or id > 0;"). - Check(testkit.Rows( - `Projection 249.75 root test.t3.id`, - `└─Selection 249.75 root ge(test.t3.c2, "a"), le(test.t3.c2, "b")`, - ` └─Projection 9990.00 root test.t3.id, test.t3.c2`, - ` └─IndexMerge 9990.00 root type: union`, - ` ├─IndexRangeScan(Build) 3323.33 cop[tikv] table:t3, index:id(id) range:[-inf,100), keep order:false, stats:pseudo`, - ` ├─TableRangeScan(Build) 3333.33 cop[tikv] table:t3 range:(0,+inf], keep order:false, stats:pseudo`, - ` └─TableRowIDScan(Probe) 9990.00 cop[tikv] table:t3 keep order:false, stats:pseudo`)) -} diff --git a/pkg/planner/core/task_base.go b/pkg/planner/core/task_base.go deleted file mode 100644 index 6b32b7b67a034..0000000000000 --- a/pkg/planner/core/task_base.go +++ /dev/null @@ -1,435 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "github.com/pingcap/tidb/pkg/expression" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/planner/cardinality" - "github.com/pingcap/tidb/pkg/planner/core/base" - "github.com/pingcap/tidb/pkg/planner/core/cost" - "github.com/pingcap/tidb/pkg/planner/property" - "github.com/pingcap/tidb/pkg/statistics" - "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/size" - "github.com/pingcap/tipb/go-tipb" - "go.uber.org/zap" -) - -var ( - _ base.Task = &RootTask{} - _ base.Task = &MppTask{} - _ base.Task = &CopTask{} -) - -// ************************************* RootTask Start ****************************************** - -// RootTask is the final sink node of a plan graph. It should be a single goroutine on tidb. -type RootTask struct { - p base.PhysicalPlan - isEmpty bool // isEmpty indicates if this task contains a dual table and returns empty data. - // TODO: The flag 'isEmpty' is only checked by Projection and UnionAll. We should support more cases in the future. -} - -// GetPlan returns the root task's plan. -func (t *RootTask) GetPlan() base.PhysicalPlan { - return t.p -} - -// SetPlan sets the root task' plan. -func (t *RootTask) SetPlan(p base.PhysicalPlan) { - t.p = p -} - -// IsEmpty indicates whether root task is empty. -func (t *RootTask) IsEmpty() bool { - return t.isEmpty -} - -// SetEmpty set the root task as empty. -func (t *RootTask) SetEmpty(x bool) { - t.isEmpty = x -} - -// Copy implements Task interface. -func (t *RootTask) Copy() base.Task { - return &RootTask{ - p: t.p, - } -} - -// ConvertToRootTask implements Task interface. -func (t *RootTask) ConvertToRootTask(_ base.PlanContext) base.Task { - return t.Copy().(*RootTask) -} - -// Invalid implements Task interface. -func (t *RootTask) Invalid() bool { - return t.p == nil -} - -// Count implements Task interface. -func (t *RootTask) Count() float64 { - return t.p.StatsInfo().RowCount -} - -// Plan implements Task interface. -func (t *RootTask) Plan() base.PhysicalPlan { - return t.p -} - -// MemoryUsage return the memory usage of rootTask -func (t *RootTask) MemoryUsage() (sum int64) { - if t == nil { - return - } - sum = size.SizeOfInterface + size.SizeOfBool - if t.p != nil { - sum += t.p.MemoryUsage() - } - return sum -} - -// ************************************* RootTask End ****************************************** - -// ************************************* MPPTask Start ****************************************** - -// MppTask can not : -// 1. keep order -// 2. support double read -// 3. consider virtual columns. -// 4. TODO: partition prune after close -type MppTask struct { - p base.PhysicalPlan - - partTp property.MPPPartitionType - hashCols []*property.MPPPartitionColumn - - // rootTaskConds record filters of TableScan that cannot be pushed down to TiFlash. - - // For logical plan like: HashAgg -> Selection -> TableScan, if filters in Selection cannot be pushed to TiFlash. - // Planner will generate physical plan like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> PhysicalTableScan(cop tiflash) - // Because planner will make mppTask invalid directly then use copTask directly. - - // But in DisaggregatedTiFlash mode, cop and batchCop protocol is disabled, so we have to consider this situation for mppTask. - // When generating PhysicalTableScan, if prop.TaskTp is RootTaskType, mppTask will be converted to rootTask, - // and filters in rootTaskConds will be added in a Selection which will be executed in TiDB. - // So physical plan be like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> ExchangeSender -> PhysicalTableScan(mpp tiflash) - rootTaskConds []expression.Expression - tblColHists *statistics.HistColl -} - -// Count implements Task interface. -func (t *MppTask) Count() float64 { - return t.p.StatsInfo().RowCount -} - -// Copy implements Task interface. -func (t *MppTask) Copy() base.Task { - nt := *t - return &nt -} - -// Plan implements Task interface. -func (t *MppTask) Plan() base.PhysicalPlan { - return t.p -} - -// Invalid implements Task interface. -func (t *MppTask) Invalid() bool { - return t.p == nil -} - -// ConvertToRootTask implements Task interface. -func (t *MppTask) ConvertToRootTask(ctx base.PlanContext) base.Task { - return t.Copy().(*MppTask).ConvertToRootTaskImpl(ctx) -} - -// MemoryUsage return the memory usage of mppTask -func (t *MppTask) MemoryUsage() (sum int64) { - if t == nil { - return - } - - sum = size.SizeOfInterface + size.SizeOfInt + size.SizeOfSlice + int64(cap(t.hashCols))*size.SizeOfPointer - if t.p != nil { - sum += t.p.MemoryUsage() - } - return -} - -// ConvertToRootTaskImpl implements Task interface. -func (t *MppTask) ConvertToRootTaskImpl(ctx base.PlanContext) *RootTask { - // In disaggregated-tiflash mode, need to consider generated column. - tryExpandVirtualColumn(t.p) - sender := PhysicalExchangeSender{ - ExchangeType: tipb.ExchangeType_PassThrough, - }.Init(ctx, t.p.StatsInfo()) - sender.SetChildren(t.p) - - p := PhysicalTableReader{ - tablePlan: sender, - StoreType: kv.TiFlash, - }.Init(ctx, t.p.QueryBlockOffset()) - p.SetStats(t.p.StatsInfo()) - collectPartitionInfosFromMPPPlan(p, t.p) - rt := &RootTask{} - rt.SetPlan(p) - - if len(t.rootTaskConds) > 0 { - // Some Filter cannot be pushed down to TiFlash, need to add Selection in rootTask, - // so this Selection will be executed in TiDB. - _, isTableScan := t.p.(*PhysicalTableScan) - _, isSelection := t.p.(*PhysicalSelection) - if isSelection { - _, isTableScan = t.p.Children()[0].(*PhysicalTableScan) - } - if !isTableScan { - // Need to make sure oriTaskPlan is TableScan, because rootTaskConds is part of TableScan.FilterCondition. - // It's only for TableScan. This is ensured by converting mppTask to rootTask just after TableScan is built, - // so no other operators are added into this mppTask. - logutil.BgLogger().Error("expect Selection or TableScan for mppTask.p", zap.String("mppTask.p", t.p.TP())) - return base.InvalidTask.(*RootTask) - } - selectivity, _, err := cardinality.Selectivity(ctx, t.tblColHists, t.rootTaskConds, nil) - if err != nil { - logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err)) - selectivity = cost.SelectionFactor - } - sel := PhysicalSelection{Conditions: t.rootTaskConds}.Init(ctx, rt.GetPlan().StatsInfo().Scale(selectivity), rt.GetPlan().QueryBlockOffset()) - sel.fromDataSource = true - sel.SetChildren(rt.GetPlan()) - rt.SetPlan(sel) - } - return rt -} - -// ************************************* MPPTask End ****************************************** - -// ************************************* CopTask Start ****************************************** - -// CopTask is a task that runs in a distributed kv store. -// TODO: In future, we should split copTask to indexTask and tableTask. -type CopTask struct { - indexPlan base.PhysicalPlan - tablePlan base.PhysicalPlan - // indexPlanFinished means we have finished index plan. - indexPlanFinished bool - // keepOrder indicates if the plan scans data by order. - keepOrder bool - // needExtraProj means an extra prune is needed because - // in double read / index merge cases, they may output one more column for handle(row id). - needExtraProj bool - // originSchema is the target schema to be projected to when needExtraProj is true. - originSchema *expression.Schema - - extraHandleCol *expression.Column - commonHandleCols []*expression.Column - // tblColHists stores the original stats of DataSource, it is used to get - // average row width when computing network cost. - tblColHists *statistics.HistColl - // tblCols stores the original columns of DataSource before being pruned, it - // is used to compute average row width when computing scan cost. - tblCols []*expression.Column - - idxMergePartPlans []base.PhysicalPlan - idxMergeIsIntersection bool - idxMergeAccessMVIndex bool - - // rootTaskConds stores select conditions containing virtual columns. - // These conditions can't push to TiKV, so we have to add a selection for rootTask - rootTaskConds []expression.Expression - - // For table partition. - physPlanPartInfo *PhysPlanPartInfo - - // expectCnt is the expected row count of upper task, 0 for unlimited. - // It's used for deciding whether using paging distsql. - expectCnt uint64 -} - -// Invalid implements Task interface. -func (t *CopTask) Invalid() bool { - return t.tablePlan == nil && t.indexPlan == nil && len(t.idxMergePartPlans) == 0 -} - -// Count implements Task interface. -func (t *CopTask) Count() float64 { - if t.indexPlanFinished { - return t.tablePlan.StatsInfo().RowCount - } - return t.indexPlan.StatsInfo().RowCount -} - -// Copy implements Task interface. -func (t *CopTask) Copy() base.Task { - nt := *t - return &nt -} - -// Plan implements Task interface. -// copTask plan should be careful with indexMergeReader, whose real plan is stored in -// idxMergePartPlans, when its indexPlanFinished is marked with false. -func (t *CopTask) Plan() base.PhysicalPlan { - if t.indexPlanFinished { - return t.tablePlan - } - return t.indexPlan -} - -// MemoryUsage return the memory usage of copTask -func (t *CopTask) MemoryUsage() (sum int64) { - if t == nil { - return - } - - sum = size.SizeOfInterface*(2+int64(cap(t.idxMergePartPlans)+cap(t.rootTaskConds))) + size.SizeOfBool*3 + size.SizeOfUint64 + - size.SizeOfPointer*(3+int64(cap(t.commonHandleCols)+cap(t.tblCols))) + size.SizeOfSlice*4 + t.physPlanPartInfo.MemoryUsage() - if t.indexPlan != nil { - sum += t.indexPlan.MemoryUsage() - } - if t.tablePlan != nil { - sum += t.tablePlan.MemoryUsage() - } - if t.originSchema != nil { - sum += t.originSchema.MemoryUsage() - } - if t.extraHandleCol != nil { - sum += t.extraHandleCol.MemoryUsage() - } - - for _, col := range t.commonHandleCols { - sum += col.MemoryUsage() - } - for _, col := range t.tblCols { - sum += col.MemoryUsage() - } - for _, p := range t.idxMergePartPlans { - sum += p.MemoryUsage() - } - for _, expr := range t.rootTaskConds { - sum += expr.MemoryUsage() - } - return -} - -// ConvertToRootTask implements Task interface. -func (t *CopTask) ConvertToRootTask(ctx base.PlanContext) base.Task { - // copy one to avoid changing itself. - return t.Copy().(*CopTask).convertToRootTaskImpl(ctx) -} - -func (t *CopTask) convertToRootTaskImpl(ctx base.PlanContext) *RootTask { - // copTasks are run in parallel, to make the estimated cost closer to execution time, we amortize - // the cost to cop iterator workers. According to `CopClient::Send`, the concurrency - // is Min(DistSQLScanConcurrency, numRegionsInvolvedInScan), since we cannot infer - // the number of regions involved, we simply use DistSQLScanConcurrency. - t.finishIndexPlan() - // Network cost of transferring rows of table scan to TiDB. - if t.tablePlan != nil { - tp := t.tablePlan - for len(tp.Children()) > 0 { - if len(tp.Children()) == 1 { - tp = tp.Children()[0] - } else { - join := tp.(*PhysicalHashJoin) - tp = join.Children()[1-join.InnerChildIdx] - } - } - ts := tp.(*PhysicalTableScan) - prevColumnLen := len(ts.Columns) - prevSchema := ts.schema.Clone() - ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns) - if !t.needExtraProj && len(ts.Columns) > prevColumnLen { - // Add a projection to make sure not to output extract columns. - t.needExtraProj = true - t.originSchema = prevSchema - } - } - newTask := &RootTask{} - if t.idxMergePartPlans != nil { - p := PhysicalIndexMergeReader{ - partialPlans: t.idxMergePartPlans, - tablePlan: t.tablePlan, - IsIntersectionType: t.idxMergeIsIntersection, - AccessMVIndex: t.idxMergeAccessMVIndex, - KeepOrder: t.keepOrder, - }.Init(ctx, t.idxMergePartPlans[0].QueryBlockOffset()) - p.PlanPartInfo = t.physPlanPartInfo - setTableScanToTableRowIDScan(p.tablePlan) - newTask.SetPlan(p) - if t.needExtraProj { - schema := t.originSchema - proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.StatsInfo(), t.idxMergePartPlans[0].QueryBlockOffset(), nil) - proj.SetSchema(schema) - proj.SetChildren(p) - newTask.SetPlan(proj) - } - t.handleRootTaskConds(ctx, newTask) - return newTask - } - if t.indexPlan != nil && t.tablePlan != nil { - newTask = buildIndexLookUpTask(ctx, t) - } else if t.indexPlan != nil { - p := PhysicalIndexReader{indexPlan: t.indexPlan}.Init(ctx, t.indexPlan.QueryBlockOffset()) - p.PlanPartInfo = t.physPlanPartInfo - p.SetStats(t.indexPlan.StatsInfo()) - newTask.SetPlan(p) - } else { - tp := t.tablePlan - for len(tp.Children()) > 0 { - if len(tp.Children()) == 1 { - tp = tp.Children()[0] - } else { - join := tp.(*PhysicalHashJoin) - tp = join.Children()[1-join.InnerChildIdx] - } - } - ts := tp.(*PhysicalTableScan) - p := PhysicalTableReader{ - tablePlan: t.tablePlan, - StoreType: ts.StoreType, - IsCommonHandle: ts.Table.IsCommonHandle, - }.Init(ctx, t.tablePlan.QueryBlockOffset()) - p.PlanPartInfo = t.physPlanPartInfo - p.SetStats(t.tablePlan.StatsInfo()) - - // If agg was pushed down in Attach2Task(), the partial agg was placed on the top of tablePlan, the final agg was - // placed above the PhysicalTableReader, and the schema should have been set correctly for them, the schema of - // partial agg contains the columns needed by the final agg. - // If we add the projection here, the projection will be between the final agg and the partial agg, then the - // schema will be broken, the final agg will fail to find needed columns in ResolveIndices(). - // Besides, the agg would only be pushed down if it doesn't contain virtual columns, so virtual column should not be affected. - aggPushedDown := false - switch p.tablePlan.(type) { - case *PhysicalHashAgg, *PhysicalStreamAgg: - aggPushedDown = true - } - - if t.needExtraProj && !aggPushedDown { - proj := PhysicalProjection{Exprs: expression.Column2Exprs(t.originSchema.Columns)}.Init(ts.SCtx(), ts.StatsInfo(), ts.QueryBlockOffset(), nil) - proj.SetSchema(t.originSchema) - proj.SetChildren(p) - newTask.SetPlan(proj) - } else { - newTask.SetPlan(p) - } - } - - t.handleRootTaskConds(ctx, newTask) - return newTask -} - -// ************************************* CopTask End ****************************************** diff --git a/planner/core/issuetest/BUILD.bazel b/planner/core/issuetest/BUILD.bazel index a6c9ab8a9e24f..ac31cded370be 100644 --- a/planner/core/issuetest/BUILD.bazel +++ b/planner/core/issuetest/BUILD.bazel @@ -6,6 +6,6 @@ go_test( srcs = ["planner_issue_test.go"], flaky = True, race = "on", - shard_count = 8, + shard_count = 9, deps = ["//testkit"], ) diff --git a/planner/core/issuetest/planner_issue_test.go b/planner/core/issuetest/planner_issue_test.go index 4d0397d93b899..7e699b0a30b70 100644 --- a/planner/core/issuetest/planner_issue_test.go +++ b/planner/core/issuetest/planner_issue_test.go @@ -285,3 +285,21 @@ func Test53726(t *testing.T) { " └─TableReader_11 2.00 root data:TableFullScan_10", " └─TableFullScan_10 2.00 cop[tikv] table:t7 keep order:false")) } + +func TestIssue58476(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("CREATE TABLE t3 (id int PRIMARY KEY,c1 varchar(256),c2 varchar(256) GENERATED ALWAYS AS (concat(c1, c1)) VIRTUAL,KEY (id));") + tk.MustExec("insert into t3(id, c1) values (50, 'c');") + tk.MustQuery("SELECT /*+ USE_INDEX_MERGE(`t3`)*/ id FROM `t3` WHERE c2 BETWEEN 'a' AND 'b' GROUP BY id HAVING id < 100 or id > 0;").Check(testkit.Rows()) + tk.MustQuery("explain format='brief' SELECT /*+ USE_INDEX_MERGE(`t3`)*/ id FROM `t3` WHERE c2 BETWEEN 'a' AND 'b' GROUP BY id HAVING id < 100 or id > 0;"). + Check(testkit.Rows( + `Projection 249.75 root test.t3.id`, + `└─Selection 249.75 root ge(test.t3.c2, "a"), le(test.t3.c2, "b")`, + ` └─Projection 9990.00 root test.t3.id, test.t3.c2`, + ` └─IndexMerge 9990.00 root type: union`, + ` ├─IndexRangeScan(Build) 3323.33 cop[tikv] table:t3, index:id(id) range:[-inf,100), keep order:false, stats:pseudo`, + ` ├─TableRangeScan(Build) 3333.33 cop[tikv] table:t3 range:(0,+inf], keep order:false, stats:pseudo`, + ` └─TableRowIDScan(Probe) 9990.00 cop[tikv] table:t3 keep order:false, stats:pseudo`)) +} diff --git a/planner/core/task.go b/planner/core/task.go index c805f7b7718d6..813571fe3592a 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -696,7 +696,6 @@ func (t *copTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { p.PartitionInfo = t.partitionInfo setTableScanToTableRowIDScan(p.tablePlan) newTask.p = p - t.handleRootTaskConds(ctx, newTask) if t.needExtraProj { schema := t.originSchema proj := PhysicalProjection{Exprs: expression.Column2Exprs(schema.Columns)}.Init(ctx, p.stats, t.idxMergePartPlans[0].SelectBlockOffset(), nil) @@ -704,6 +703,7 @@ func (t *copTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { proj.SetChildren(p) newTask.p = proj } + t.handleRootTaskConds(ctx, newTask) return newTask } if t.indexPlan != nil && t.tablePlan != nil {