Skip to content

Commit

Permalink
planner: fix cascades about the XFormed operator should derive their …
Browse files Browse the repository at this point in the history
…stats own (#58904)

close #58905
  • Loading branch information
AilinKid authored Jan 23, 2025
1 parent cf4d252 commit 6812b17
Show file tree
Hide file tree
Showing 49 changed files with 448 additions and 148 deletions.
5 changes: 4 additions & 1 deletion pkg/planner/cascades/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ go_test(
timeout = "short",
srcs = ["cascades_test.go"],
flaky = True,
deps = ["//pkg/testkit"],
deps = [
"//pkg/testkit",
"@com_github_stretchr_testify//require",
],
)
4 changes: 2 additions & 2 deletions pkg/planner/cascades/cascades.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type Optimizer struct {
ctx cascadesctx.Context
}

// NewCascades return a new cascades obj for logical alternative searching.
func NewCascades(lp corebase.LogicalPlan) (*Optimizer, error) {
// NewOptimizer return a new cascades obj for logical alternative searching.
func NewOptimizer(lp corebase.LogicalPlan) (*Optimizer, error) {
cas := &Optimizer{
logic: lp,
ctx: NewContext(lp.SCtx()),
Expand Down
41 changes: 41 additions & 0 deletions pkg/planner/cascades/cascades_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)

func TestCascadesDrive(t *testing.T) {
Expand All @@ -36,3 +37,43 @@ func TestCascadesDrive(t *testing.T) {
"Projection_3 1.00 root 1->Column#1",
"└─TableDual_4 1.00 root rows:1"))
}

func TestXFormedOperatorShouldDeriveTheirStatsOwn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("CREATE TABLE t1 ( a1 int DEFAULT NULL, b1 int DEFAULT NULL, c1 int DEFAULT NULL)")
tk.MustExec("CREATE TABLE t2 ( a2 int DEFAULT NULL, b2 int DEFAULT NULL, KEY idx (a2))")
// currently we only pull the correlated condition up to apply itself, but we don't convert it to join actively.
// we just left cascades to it to xform and generate a join operator.
tk.MustExec("INSERT INTO t1 (a1, b1, c1) VALUES (1, 2, 3), (4, NULL, 5), (NULL, 6, 7), (8, 9, NULL), (10, 11, 12);")
tk.MustExec("INSERT INTO t2 values (1,1),(2,2),(3,3)")
for i := 0; i < 10; i++ {
tk.MustExec("INSERT INTO t2 select * from t2")
}
tk.MustExec("analyze table t1, t2")
tk.Session().GetSessionVars().SetEnableCascadesPlanner(false)
res1 := tk.MustQuery("explain format=\"brief\" SELECT 1 FROM t1 AS tab WHERE (EXISTS(SELECT 1 FROM t2 WHERE a2 = a1 ))").String()
tk.Session().GetSessionVars().SetEnableCascadesPlanner(true)
res2 := tk.MustQuery("explain format=\"brief\" SELECT 1 FROM t1 AS tab WHERE (EXISTS(SELECT 1 FROM t2 WHERE a2 = a1 ))").String()
require.Equal(t, res1, res2)

tk.Session().GetSessionVars().SetEnableCascadesPlanner(false)
res1 = tk.MustQuery("explain format=\"brief\" SELECT /*+ inl_join(tab, t2@sel_2) */ 1 FROM t1 AS tab WHERE (EXISTS(SELECT 1 FROM t2 WHERE a2 = a1 ))").String()
tk.Session().GetSessionVars().SetEnableCascadesPlanner(true)
res2 = tk.MustQuery("explain format=\"brief\" SELECT /*+ inl_join(tab, t2@sel_2) */ 1 FROM t1 AS tab WHERE (EXISTS(SELECT 1 FROM t2 WHERE a2 = a1 ))").String()
require.Equal(t, res1, res2)

tk.Session().GetSessionVars().SetEnableCascadesPlanner(false)
res1 = tk.MustQuery("explain format=\"brief\" SELECT /*+ inl_hash_join(tab, t2@sel_2) */ 1 FROM t1 AS tab WHERE (EXISTS(SELECT 1 FROM t2 WHERE a2 = a1 ))").String()
tk.Session().GetSessionVars().SetEnableCascadesPlanner(true)
res2 = tk.MustQuery("explain format=\"brief\" SELECT /*+ inl_hash_join(tab, t2@sel_2) */ 1 FROM t1 AS tab WHERE (EXISTS(SELECT 1 FROM t2 WHERE a2 = a1 ))").String()
require.Equal(t, res1, res2)

tk.Session().GetSessionVars().SetEnableCascadesPlanner(false)
res1 = tk.MustQuery("explain format=\"brief\" SELECT /*+ hash_join(tab, t2@sel_2) */ 1 FROM t1 AS tab WHERE (EXISTS(SELECT 1 FROM t2 WHERE a2 = a1 ))").String()
tk.Session().GetSessionVars().SetEnableCascadesPlanner(true)
res2 = tk.MustQuery("explain format=\"brief\" SELECT /*+ hash_join(tab, t2@sel_2) */ 1 FROM t1 AS tab WHERE (EXISTS(SELECT 1 FROM t2 WHERE a2 = a1 ))").String()
require.Equal(t, res1, res2)
}
7 changes: 6 additions & 1 deletion pkg/planner/cascades/memo/group_expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ func (e *GroupExpression) addr() unsafe.Pointer {
return unsafe.Pointer(e)
}

// GetWrappedLogicalPlan overrides the logical plan interface implemented by BaseLogicalPlan.
func (e *GroupExpression) GetWrappedLogicalPlan() base.LogicalPlan {
return e.LogicalPlan
}

// DeriveLogicalProp derive the new group's logical property from a specific GE.
// DeriveLogicalProp is not called with recursive, because we only examine and
// init new group from bottom-up, so we can sure that this new group's children
Expand Down Expand Up @@ -195,7 +200,7 @@ func (e *GroupExpression) DeriveLogicalProp() (err error) {
})
if !skipDeriveStats {
// here can only derive the basic stats from bottom up, we can't pass any colGroups required by parents.
tmpStats, err = e.LogicalPlan.DeriveStats(childStats, tmpSchema, childSchema)
tmpStats, _, err = e.LogicalPlan.DeriveStats(childStats, tmpSchema, childSchema, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cascades/old/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (opt *Optimizer) fillGroupStats(g *memo.Group) (err error) {
childSchema[i] = childGroup.Prop.Schema
}
planNode := expr.ExprNode
g.Prop.Stats, err = planNode.DeriveStats(childStats, g.Prop.Schema, childSchema)
g.Prop.Stats, _, err = planNode.DeriveStats(childStats, g.Prop.Schema, childSchema, nil)
return err
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/planner/cascades/rule/apply/decorrelate_apply/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "decorrelate_apply",
srcs = ["xf_decorrelate_apply.go"],
importpath = "github.com/pingcap/tidb/pkg/planner/cascades/rule/apply/decorrelate_apply",
visibility = ["//visibility:public"],
deps = [
"//pkg/planner/cascades/pattern",
"//pkg/planner/cascades/rule",
"//pkg/planner/core/base",
"//pkg/planner/core/operator/logicalop",
"//pkg/planner/util/coreusage",
"//pkg/util/intest",
"//pkg/util/plancodec",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package decorrelateapply

import (
"github.com/pingcap/tidb/pkg/planner/cascades/pattern"
"github.com/pingcap/tidb/pkg/planner/cascades/rule"
corebase "github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/util/coreusage"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/plancodec"
)

var _ rule.Rule = &XFDeCorrelateApply{}

// XFDeCorrelateApply pull the correlated expression from projection as child of apply.
type XFDeCorrelateApply struct {
*rule.BaseRule
}

// NewXFDeCorrelateApply creates a new XFDeCorrelateApply rule.
func NewXFDeCorrelateApply() *XFDeCorrelateApply {
pa := pattern.NewPattern(pattern.OperandApply, pattern.EngineTiDBOnly)
pa.SetChildren(pattern.NewPattern(pattern.OperandAny, pattern.EngineTiDBOnly), pattern.NewPattern(pattern.OperandAny, pattern.EngineTiDBOnly))
return &XFDeCorrelateApply{
BaseRule: rule.NewBaseRule(rule.XFDeCorrelateApply, pa),
}
}

// Match implements the Rule interface.
func (*XFDeCorrelateApply) Match(_ corebase.LogicalPlan) bool {
return true
}

// XForm implements the Rule interface.
func (*XFDeCorrelateApply) XForm(applyGE corebase.LogicalPlan) ([]corebase.LogicalPlan, error) {
children := applyGE.Children()
outerPlanGE := children[0]
innerPlanGE := children[1]
// don't modify the apply op's CorCols in-place, which will change the hash64, apply should be re-inserted into the group otherwise.
corCols := coreusage.ExtractCorColumnsBySchema4LogicalPlan(innerPlanGE.GetWrappedLogicalPlan(), outerPlanGE.GetWrappedLogicalPlan().Schema())
if len(corCols) == 0 {
apply := applyGE.GetWrappedLogicalPlan().(*logicalop.LogicalApply)
// If the inner plan is non-correlated, this apply will be simplified to join.
clonedJoin := apply.LogicalJoin
// Reset4Cascades is to reset the plan for cascades.
// reset the tp and self, stats to nil, recreate the task map, re-alloc the plan id and so on.
// set the new GE's stats to nil, since the inherited stats is not precious, which will be filled in physicalOpt.
clonedJoin.Reset4Cascades(plancodec.TypeJoin, &clonedJoin)
intest.Assert(clonedJoin.Children() != nil)
return []corebase.LogicalPlan{&clonedJoin}, nil
}
return nil, nil
}
22 changes: 11 additions & 11 deletions pkg/planner/cascades/rule/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package rule
import (
"github.com/pingcap/tidb/pkg/planner/cascades/pattern"
"github.com/pingcap/tidb/pkg/planner/cascades/util"
"github.com/pingcap/tidb/pkg/planner/core/base"
corebase "github.com/pingcap/tidb/pkg/planner/core/base"
)

var _ Rule = &BaseRule{}
Expand All @@ -34,26 +34,26 @@ type Rule interface {
Pattern() *pattern.Pattern

// PreCheck check the admission of the group expression output from binder.
PreCheck(base.LogicalPlan) bool
PreCheck(corebase.LogicalPlan) bool

// Match checks whether the GroupExpr satisfies all the requirements of the specific rule.
// The pattern only identifies the operator type, some transformation rules also need
// detailed information for certain plan operators to decide whether it is applicable.
Match(base.LogicalPlan) bool
Match(corebase.LogicalPlan) bool

// XForm does the real work of the optimization rule. The returned group expressions list
// indicates the new GroupExprs generated by the transformation rule.
XForm(_ base.LogicalPlan) ([]base.LogicalPlan, error)
// XForm does the real work of the optimization rule.
// first value: GE list indicates the new GroupExprs generated by the transformation rule.
XForm(_ corebase.LogicalPlan) ([]corebase.LogicalPlan, error)
}

// BaseRule is the abstract parent class of rule.
type BaseRule struct {
tp ruleType
tp Type
pattern *pattern.Pattern
}

// NewBaseRule creates a new BaseRule.
func NewBaseRule(tp ruleType, pattern *pattern.Pattern) *BaseRule {
func NewBaseRule(tp Type, pattern *pattern.Pattern) *BaseRule {
return &BaseRule{
tp: tp,
pattern: pattern,
Expand All @@ -77,16 +77,16 @@ func (r *BaseRule) Pattern() *pattern.Pattern {
}

// PreCheck implements the Rule interface.
func (*BaseRule) PreCheck(_ base.LogicalPlan) bool {
func (*BaseRule) PreCheck(_ corebase.LogicalPlan) bool {
return true
}

// Match implements Rule interface.
func (*BaseRule) Match(_ base.LogicalPlan) bool {
func (*BaseRule) Match(_ corebase.LogicalPlan) bool {
return true
}

// XForm implements Rule interface.
func (*BaseRule) XForm(_ base.LogicalPlan) ([]base.LogicalPlan, error) {
func (*BaseRule) XForm(_ corebase.LogicalPlan) ([]corebase.LogicalPlan, error) {
return nil, nil
}
11 changes: 8 additions & 3 deletions pkg/planner/cascades/rule/rule_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@

package rule

type ruleType int
// Type indicates the rule type.
type Type int

const (
// DefaultNone indicates this is none rule.
DefaultNone ruleType = iota
DefaultNone Type = iota
// XFJoinToApply refers to join to a apply rule.
XFJoinToApply
// XFDeCorrelateApply try to decorate apply to a join.
XFDeCorrelateApply
// XFPullCorrExprsFromProj try to pull correlated expression from proj from outer child of a apply.
XFPullCorrExprsFromProj
)

// String implements the fmt.Stringer interface.
func (tp *ruleType) String() string {
func (tp *Type) String() string {
switch *tp {
case XFJoinToApply:
return "join_to_apply"
Expand Down
13 changes: 13 additions & 0 deletions pkg/planner/cascades/rule/ruleset/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "ruleset",
srcs = ["rule_set.go"],
importpath = "github.com/pingcap/tidb/pkg/planner/cascades/rule/ruleset",
visibility = ["//visibility:public"],
deps = [
"//pkg/planner/cascades/pattern",
"//pkg/planner/cascades/rule",
"//pkg/planner/cascades/rule/apply/decorrelate_apply",
],
)
28 changes: 28 additions & 0 deletions pkg/planner/cascades/rule/ruleset/rule_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ruleset

import (
"github.com/pingcap/tidb/pkg/planner/cascades/pattern"
"github.com/pingcap/tidb/pkg/planner/cascades/rule"
"github.com/pingcap/tidb/pkg/planner/cascades/rule/apply/decorrelate_apply"
)

// DefaultRuleSet is default set of a series of rules.
var DefaultRuleSet = map[pattern.Operand][]rule.Rule{
pattern.OperandApply: {
decorrelateapply.NewXFDeCorrelateApply(),
},
}
2 changes: 2 additions & 0 deletions pkg/planner/cascades/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ go_library(
"//pkg/planner/cascades/base",
"//pkg/planner/cascades/base/cascadesctx",
"//pkg/planner/cascades/memo",
"//pkg/planner/cascades/pattern",
"//pkg/planner/cascades/rule",
"//pkg/planner/cascades/rule/ruleset",
"//pkg/planner/cascades/util",
],
)
Expand Down
3 changes: 2 additions & 1 deletion pkg/planner/cascades/task/task_apply_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (a *ApplyRuleTask) Execute() error {
}
pa := a.rule.Pattern()
binder := rule.NewBinder(pa, a.gE)
for holder := binder.Next(); holder != nil; {
holder := binder.Next()
for ; holder != nil; holder = binder.Next() {
if !a.rule.PreCheck(holder) {
continue
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/planner/cascades/task/task_opt_group_expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/pingcap/tidb/pkg/planner/cascades/base"
"github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx"
"github.com/pingcap/tidb/pkg/planner/cascades/memo"
"github.com/pingcap/tidb/pkg/planner/cascades/pattern"
"github.com/pingcap/tidb/pkg/planner/cascades/rule"
"github.com/pingcap/tidb/pkg/planner/cascades/rule/ruleset"
"github.com/pingcap/tidb/pkg/planner/cascades/util"
)

Expand All @@ -42,8 +44,8 @@ func NewOptGroupExpressionTask(ctx cascadesctx.Context, ge *memo.GroupExpression

// Execute implements the task.Execute interface.
func (ge *OptGroupExpressionTask) Execute() error {
ruleList := ge.getValidRules()
for _, one := range ruleList {
ruleMap := ge.getValidRules()
for _, one := range ruleMap[pattern.GetOperand(ge.groupExpression.GetWrappedLogicalPlan())] {
ge.Push(NewApplyRuleTask(ge.ctx, ge.groupExpression, one))
}
// since it's a stack-order, LIFO, when we want to apply a rule for a specific group expression,
Expand All @@ -62,7 +64,6 @@ func (ge *OptGroupExpressionTask) Desc(w util.StrBufferWriter) {
}

// getValidRules filter the allowed rule from session variable, and system config.
func (*OptGroupExpressionTask) getValidRules() []rule.Rule {
// todo: add rule set
return []rule.Rule{}
func (*OptGroupExpressionTask) getValidRules() map[pattern.Operand][]rule.Rule {
return ruleset.DefaultRuleSet
}
8 changes: 6 additions & 2 deletions pkg/planner/core/base/plan_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,12 @@ type LogicalPlan interface {
PullUpConstantPredicates() []expression.Expression

// RecursiveDeriveStats derives statistic info between plans.
RecursiveDeriveStats(colGroups [][]*expression.Column) (*property.StatsInfo, error)
RecursiveDeriveStats(colGroups [][]*expression.Column) (*property.StatsInfo, bool, error)

// DeriveStats derives statistic info for current plan node given child stats.
// We need selfSchema, childSchema here because it makes this method can be used in
// cascades planner, where LogicalPlan might not record its children or schema.
DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema) (*property.StatsInfo, error)
DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema, reloads []bool) (*property.StatsInfo, bool, error)

// ExtractColGroups extracts column groups from child operator whose DNVs are required by the current operator.
// For example, if current operator is LogicalAggregation of `Group By a, b`, we indicate the child operators to maintain
Expand Down Expand Up @@ -299,4 +299,8 @@ type LogicalPlan interface {

// GetPlanIDsHash set sub operator tree's ids hash64
GetPlanIDsHash() uint64

// GetWrappedLogicalPlan return the wrapped logical plan inside a group expression.
// For logicalPlan implementation, it just returns itself as well.
GetWrappedLogicalPlan() LogicalPlan
}
Loading

0 comments on commit 6812b17

Please sign in to comment.