Skip to content

Commit

Permalink
Throw exception when number of nodes in logical plan exceeds threshold
Browse files Browse the repository at this point in the history
For some queries, the logical plan can be huge which slows or even kills coordinator during planning phase. In this PR,
I added support which throws exception when number of scan nodes exceed a predefined threshold.
  • Loading branch information
feilong-liu authored and rschlussel committed Sep 8, 2022
1 parent e1ef7db commit 69bd48f
Show file tree
Hide file tree
Showing 14 changed files with 291 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.SingleStreamSpillerChoice;
import com.facebook.presto.sql.planner.CompilerConfig;
import com.facebook.presto.tracing.TracingConfig;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
Expand Down Expand Up @@ -232,6 +233,8 @@ public final class SystemSessionProperties
public static final String SEGMENTED_AGGREGATION_ENABLED = "segmented_aggregation_enabled";
public static final String USE_HISTORY_BASED_PLAN_STATISTICS = "use_history_based_plan_statistics";
public static final String TRACK_HISTORY_BASED_PLAN_STATISTICS = "track_history_based_plan_statistics";
public static final String MAX_LEAF_NODES_IN_PLAN = "max_leaf_nodes_in_plan";
public static final String LEAF_NODE_LIMIT_ENABLED = "leaf_node_limit_enabled";

//TODO: Prestissimo related session properties that are temporarily put here. They will be relocated in the future
public static final String PRESTISSIMO_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled";
Expand All @@ -255,7 +258,8 @@ public SystemSessionProperties()
new WarningCollectorConfig(),
new NodeSchedulerConfig(),
new NodeSpillConfig(),
new TracingConfig());
new TracingConfig(),
new CompilerConfig());
}

@Inject
Expand All @@ -268,7 +272,8 @@ public SystemSessionProperties(
WarningCollectorConfig warningCollectorConfig,
NodeSchedulerConfig nodeSchedulerConfig,
NodeSpillConfig nodeSpillConfig,
TracingConfig tracingConfig)
TracingConfig tracingConfig,
CompilerConfig compilerConfig)
{
sessionProperties = ImmutableList.of(
stringProperty(
Expand Down Expand Up @@ -1319,6 +1324,20 @@ public SystemSessionProperties(
TRACK_HISTORY_BASED_PLAN_STATISTICS,
"Track history based plan statistics service in query optimizer",
featuresConfig.isTrackHistoryBasedPlanStatistics(),
false),
new PropertyMetadata<>(
MAX_LEAF_NODES_IN_PLAN,
"Maximum number of leaf nodes in the logical plan of SQL statement",
INTEGER,
Integer.class,
compilerConfig.getLeafNodeLimit(),
false,
value -> validateIntegerValue(value, MAX_LEAF_NODES_IN_PLAN, 0, false),
object -> object),
booleanProperty(
LEAF_NODE_LIMIT_ENABLED,
"Throw exception if the number of leaf nodes in logical plan exceeds threshold set in max_leaf_nodes_in_plan",
compilerConfig.getLeafNodeLimitEnabled(),
false));
}

Expand Down Expand Up @@ -2165,6 +2184,16 @@ public static boolean isVerboseRuntimeStatsEnabled(Session session)
return session.getSystemProperty(VERBOSE_RUNTIME_STATS_ENABLED, Boolean.class);
}

public static boolean isLeafNodeLimitEnabled(Session session)
{
return session.getSystemProperty(LEAF_NODE_LIMIT_ENABLED, Boolean.class);
}

public static int getMaxLeafNodesInPlan(Session session)
{
return session.getSystemProperty(MAX_LEAF_NODES_IN_PLAN, Integer.class);
}

public static boolean isStreamingForPartialAggregationEnabled(Session session)
{
return session.getSystemProperty(STREAMING_FOR_PARTIAL_AGGREGATION_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.sql.planner;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.airlift.configuration.DefunctConfig;
import com.facebook.presto.spi.function.Description;

Expand All @@ -23,6 +24,8 @@
public class CompilerConfig
{
private int expressionCacheSize = 10_000;
private int leafNodeLimit = 10_000;
private boolean leafNodeLimitEnabled;

@Min(0)
public int getExpressionCacheSize()
Expand All @@ -37,4 +40,30 @@ public CompilerConfig setExpressionCacheSize(int expressionCacheSize)
this.expressionCacheSize = expressionCacheSize;
return this;
}

public int getLeafNodeLimit()
{
return this.leafNodeLimit;
}

@Config("planner.max-leaf-nodes-in-plan")
@ConfigDescription("Maximum number of leaf nodes in logical plan, throw an exception when exceed if leaf-node-limit-enabled is set true")
public CompilerConfig setLeafNodeLimit(int num)
{
this.leafNodeLimit = num;
return this;
}

public boolean getLeafNodeLimitEnabled()
{
return this.leafNodeLimitEnabled;
}

@Config("planner.leaf-node-limit-enabled")
@ConfigDescription("Throw an exception if number of leaf nodes in logical plan exceeds threshold set in max-leaf-nodes-in-plan")
public CompilerConfig setLeafNodeLimitEnabled(boolean enabled)
{
this.leafNodeLimitEnabled = enabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,8 @@ private RelationPlan createTableWriterPlan(

private RelationPlan createDeletePlan(Analysis analysis, Delete node)
{
DeleteNode deleteNode = new QueryPlanner(analysis, variableAllocator, idAllocator, buildLambdaDeclarationToVariableMap(analysis, variableAllocator), metadata, session)
SqlPlannerContext context = new SqlPlannerContext(0);
DeleteNode deleteNode = new QueryPlanner(analysis, variableAllocator, idAllocator, buildLambdaDeclarationToVariableMap(analysis, variableAllocator), metadata, session, context)
.plan(node);

TableHandle handle = analysis.getTableHandle(node.getTable());
Expand Down Expand Up @@ -584,8 +585,9 @@ private PlanNode createOutputPlan(RelationPlan plan, Analysis analysis)

private RelationPlan createRelationPlan(Analysis analysis, Query query)
{
SqlPlannerContext context = new SqlPlannerContext(0);
return new RelationPlanner(analysis, variableAllocator, idAllocator, buildLambdaDeclarationToVariableMap(analysis, variableAllocator), metadata, session)
.process(query, null);
.process(query, context);
}

private ConnectorTableMetadata createTableMetadata(QualifiedObjectName table, List<ColumnMetadata> columns, Map<String, Expression> propertyExpressions, Map<NodeRef<Parameter>, Expression> parameters, Optional<String> comment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,16 @@ class QueryPlanner
private final Metadata metadata;
private final Session session;
private final SubqueryPlanner subqueryPlanner;
private final SqlPlannerContext sqlPlannerContext;

QueryPlanner(
Analysis analysis,
PlanVariableAllocator variableAllocator,
PlanNodeIdAllocator idAllocator,
Map<NodeRef<LambdaArgumentDeclaration>, VariableReferenceExpression> lambdaDeclarationToVariableMap,
Metadata metadata,
Session session)
Session session,
SqlPlannerContext sqlPlannerContext)
{
requireNonNull(analysis, "analysis is null");
requireNonNull(variableAllocator, "variableAllocator is null");
Expand All @@ -138,6 +140,7 @@ class QueryPlanner
this.metadata = metadata;
this.session = session;
this.subqueryPlanner = new SubqueryPlanner(analysis, variableAllocator, idAllocator, lambdaDeclarationToVariableMap, metadata, session);
this.sqlPlannerContext = sqlPlannerContext;
}

public RelationPlan plan(Query query)
Expand Down Expand Up @@ -266,7 +269,7 @@ private static List<VariableReferenceExpression> computeOutputs(PlanBuilder buil
private PlanBuilder planQueryBody(Query query)
{
RelationPlan relationPlan = new RelationPlanner(analysis, variableAllocator, idAllocator, lambdaDeclarationToVariableMap, metadata, session)
.process(query.getQueryBody(), null);
.process(query.getQueryBody(), sqlPlannerContext);

return planBuilderFor(relationPlan);
}
Expand All @@ -277,7 +280,7 @@ private PlanBuilder planFrom(QuerySpecification node)

if (node.getFrom().isPresent()) {
relationPlan = new RelationPlanner(analysis, variableAllocator, idAllocator, lambdaDeclarationToVariableMap, metadata, session)
.process(node.getFrom().get(), null);
.process(node.getFrom().get(), sqlPlannerContext);
}
else {
relationPlan = planImplicitTable();
Expand Down Expand Up @@ -330,7 +333,7 @@ private PlanBuilder filter(PlanBuilder subPlan, Expression predicate, Node node)

// rewrite expressions which contain already handled subqueries
Expression rewrittenBeforeSubqueries = subPlan.rewrite(predicate);
subPlan = subqueryPlanner.handleSubqueries(subPlan, rewrittenBeforeSubqueries, node);
subPlan = subqueryPlanner.handleSubqueries(subPlan, rewrittenBeforeSubqueries, node, sqlPlannerContext);
Expression rewrittenAfterSubqueries = subPlan.rewrite(predicate);

return subPlan.withNewRoot(new FilterNode(getSourceLocation(node), idAllocator.getNextId(), subPlan.getRoot(), castToRowExpression(rewrittenAfterSubqueries)));
Expand Down Expand Up @@ -872,7 +875,7 @@ private PlanBuilder window(PlanBuilder subPlan, List<FunctionCall> windowFunctio
private PlanBuilder handleSubqueries(PlanBuilder subPlan, Node node, Iterable<Expression> inputs)
{
for (Expression input : inputs) {
subPlan = subqueryPlanner.handleSubqueries(subPlan, subPlan.rewrite(input), node);
subPlan = subqueryPlanner.handleSubqueries(subPlan, subPlan.rewrite(input), node, sqlPlannerContext);
}
return subPlan;
}
Expand Down
Loading

0 comments on commit 69bd48f

Please sign in to comment.