Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix unary expressions. #1697

Merged
merged 2 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,158 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS

private val queryParams = PromQlQueryParams("notUsedQuery", 100, 1, 1000)

it("Plan with unary expression should be equals to its binary counterpart.") {
yu-shipit marked this conversation as resolved.
Show resolved Hide resolved
val lp = Parser.queryRangeToLogicalPlan(
"""-foo{_ws_ = "demo", _ns_ = "localNs"} > -1""",
TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))

val lp2 = Parser.queryRangeToLogicalPlan(
"""(0 - foo{_ws_ = "demo", _ns_ = "localNs"}) > (0 - 1)""",
TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan2 = rootPlanner.materialize(lp2, QueryContext(origQueryParams = queryParams))

validatePlan(execPlan, execPlan2.printTree())
val expected =
"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192)))
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],raw)
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Left(1.0)) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1613234495],downsample)""".stripMargin
validatePlan(execPlan, expected)
}

it("Plan with unary expression should be equals to its binary counterpart2.") {
val unaryExpressions = List("""-foo{_ws_ = "demo", _ns_ = "localNs"}""", """+foo{_ws_ = "demo", _ns_ = "localNs"}""",
"""-(foo{_ws_ = "demo", _ns_ = "localNs"} - bar{_ws_ = "demo", _ns_ = "localNs"})""",
"""+(foo{_ws_ = "demo", _ns_ = "localNs"} - bar{_ws_ = "demo", _ns_ = "localNs"})""")
val binaryExpressions = List("""(0 -foo{_ws_ = "demo", _ns_ = "localNs"})""", """(0 + foo{_ws_ = "demo", _ns_ = "localNs"})""",
"""(0 -(foo{_ws_ = "demo", _ns_ = "localNs"} - bar{_ws_ = "demo", _ns_ = "localNs"}))""",
"""(0 + (foo{_ws_ = "demo", _ns_ = "localNs"} - bar{_ws_ = "demo", _ns_ = "localNs"}))""")
unaryExpressions.zip(binaryExpressions).foreach(
pair => {
val lp1 = Parser.queryRangeToLogicalPlan(pair._1, TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan1 = rootPlanner.materialize(lp1, QueryContext(origQueryParams = queryParams))
val lp2 = Parser.queryRangeToLogicalPlan(pair._2, TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan2 = rootPlanner.materialize(lp2, QueryContext(origQueryParams = queryParams))
validatePlan(execPlan1, execPlan2.printTree())
}
)
}

it("Should be able to handle multiple unary signs.") {
val lp = Parser.queryRangeToLogicalPlan(
"""-+---+-foo{_ws_ = "demo", _ns_ = "localNs"} > ---+--+--1""",
yu-shipit marked this conversation as resolved.
Show resolved Hide resolved
TimeStepParams(startSeconds, step, endSeconds), Antlr)
val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams))
val expected =
"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192)))
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],raw)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],raw)
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],downsample)
|--T~ScalarOperationMapper(operator=GTR, scalarOnLhs=false)
|---FA1~
|---E~ScalarBinaryOperationExec(params = RangeParams(1633913330,300,1634777330), operator = SUB, lhs = Left(0.0), rhs = Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=ADD, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Right(params = RangeParams(1633913330,300,1634777330), operator=SUB, lhs=Left(0.0), rhs=Left(1.0)))))))))) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536)))
|---T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|-----FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-----T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|-------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|-------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|--------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|--------T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true)
|---------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|---------T~ScalarOperationMapper(operator=SUB, scalarOnLhs=true)
|----------FA1~StaticFuncArgs(0.0,RangeParams(1633913330,300,1634777330))
|----------T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None)
|-----------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#699836628],downsample)""".stripMargin
validatePlan(execPlan, expected)
}

it("should generate plan for one namespace query across raw/downsample") {
val lp = Parser.queryRangeToLogicalPlan(
Expand Down
Loading
Loading