Skip to content

Commit

Permalink
fix unary expressions.
Browse files Browse the repository at this point in the history
1. support queries with multiple unary sign such as -+--foo
2. fix rhs scalar unary expression. Basically, the right plan for this case should be
ScalarBinaryOperationExec instead of BinaryJoinExec.
  • Loading branch information
Yu Zhang committed Dec 14, 2023
1 parent 0a91fd8 commit af30a69
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,140 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS

private val queryParams = PromQlQueryParams("notUsedQuery", 100, 1, 1000)

it("Plan with unary expression should be equals to its binary counterpart.") {
val lp = Parser.queryRangeToLogicalPlan(
"""-foo{_ws_ = "demo", _ns_ = "localNs"} > -1""",
TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))

val lp2 = Parser.queryRangeToLogicalPlan(
"""(0 - foo{_ws_ = "demo", _ns_ = "localNs"}) > (0 - 1)""",
TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan2 = rootPlanner.materialize(lp2, QueryContext(origQueryParams = queryParams))

validatePlan(execPlan, execPlan2.printTree())
val expected =
"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192)))
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],raw)
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],downsample)""".stripMargin
validatePlan(execPlan, expected)
}

it("Should be able to handle multiple unary signs.") {
val lp = Parser.queryRangeToLogicalPlan(
"""-+---+-foo{_ws_ = "demo", _ns_ = "localNs"} > ---+--+--1""",
TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
val expected =
"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192)))
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],raw)
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],downsample)""".stripMargin
validatePlan(execPlan, expected)
}

it("should generate plan for one namespace query across raw/downsample") {
val lp = Parser.queryRangeToLogicalPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,19 @@ case class UnaryExpression(operator: Operator, operand: Expression) extends Expr
if (operator != Add && operator != Sub) {
throw new IllegalArgumentException(s"operator=$operator is not allowed in expression=$operand")
}
convertUnaryExpression().toSeriesPlan(timeParams)
}

private def convertUnaryExpression() : BinaryExpression = {
// use binary expression to implement the unary operators.
// eg. -foo is implemented through (0 - foo).
BinaryExpression(Scalar(0), operator, None, operand).toSeriesPlan(timeParams)
operand match {
case unaryExpression: UnaryExpression =>
// recursively convert unary expression to a binary expression.
BinaryExpression(Scalar(0), operator, None, unaryExpression.convertUnaryExpression())
case _ =>
BinaryExpression(Scalar(0), operator, None, operand)
}
}
}

Expand Down Expand Up @@ -43,6 +53,7 @@ case class BinaryExpression(lhs: Expression,
def hasScalarResult(expression: Expression): Boolean = {
expression match {
case scalarExpression: ScalarExpression => true
case unaryExpression: UnaryExpression => hasScalarResult(unaryExpression.operand)
case binaryExpression: BinaryExpression => hasScalarResult(binaryExpression.lhs) &&
hasScalarResult(binaryExpression.rhs)
case _ => false
Expand Down
Loading

0 comments on commit af30a69

Please sign in to comment.