From 42f50720a831fdf0223b1e7f2b53f93465a5f19f Mon Sep 17 00:00:00 2001 From: sandeep6189 Date: Sat, 18 Jan 2025 10:45:24 +0530 Subject: [PATCH] Revert "Setting rawSource to true and fixing the unit tests (#1920)" (#1933) This reverts commit 0999aec4c4fa4245a5882a59fed0869fce9ddc10. --- .../queryplanner/DefaultPlanner.scala | 13 +- .../queryplanner/MultiPartitionPlanner.scala | 46 ++--- .../MultiPartitionPlannerSpec.scala | 195 ++++++++---------- .../queryplanner/PlannerHierarchySpec.scala | 18 +- 4 files changed, 120 insertions(+), 152 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index 3eb14d830..0e459fa65 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -122,8 +122,10 @@ trait DefaultPlanner { } else lp val series = walkLogicalPlanTree(logicalPlanWithoutBucket.series, qContext, forceInProcess) - // the series is raw and supports raw export, it's going to yield an iterator - val rawSource = logicalPlanWithoutBucket.series.isRaw + val rawSource = logicalPlanWithoutBucket.series.isRaw && (logicalPlanWithoutBucket.series match { + case r: RawSeries => !r.supportsRemoteDataCall + case _ => true + }) // the series is raw and supports raw export, its going to yield an iterator /* Last function is used to get the latest value in the window for absent_over_time If no data is present AbsentFunctionMapper will return range vector with value 1 */ @@ -201,7 +203,10 @@ trait DefaultPlanner { } else (None, None, lp) val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess) - val rawSource = lpWithoutBucket.rawSeries.isRaw + val rawSource = lpWithoutBucket.rawSeries.isRaw && (lpWithoutBucket.rawSeries match { + case r: RawSeries => !r.supportsRemoteDataCall + case _ => true + }) rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.startMs, lp.stepMs, lp.endMs, window = None, functionId = None, stepMultipleNotationUsed = false, funcParams = Nil, @@ -827,7 +832,7 @@ object PlannerUtil extends StrictLogging { rewritePlanWithRemoteRawExport(_, rangeSelector, additionalLookbackMs).asInstanceOf[FunctionArgsPlan]), series = rewritePlanWithRemoteRawExport(lp.series, rangeSelector, additionalLookbackMs) .asInstanceOf[RawSeriesLikePlan]) - // won't bother rewriting and adjusting the start and end for metadata calls + // wont bother rewriting and adjusting the start and end for metadata calls case lp: MetadataQueryPlan => lp case lp: TsCardinalities => lp } diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index 9b09eff66..6c114d07c 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -118,23 +118,23 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv // MultiPartitionPlanner has capability to stitch across time partitions, however, the logic is mostly broken // and not well tested. The logic below would not work well for any kind of subquery since their actual // start and ends are different from the start/end parameter of the query context. If we are to implement - // stitching across time, we need to pass proper parameters to getPartitions() call + // stitching across time, we need to to pass proper parameters to getPartitions() call if (forceInProcess) { - // If InProcess is required, we will rely on the DefaultPlanner's implementation as the expectation is that the + // If inprocess is required, we will rely on the DefaultPlanner's implementation as the expectation is that the // raw series is doing a remote call to get all the data. logicalPlan match { - case lp: RawSeries if lp.supportsRemoteDataCall => + case lp: RawSeries if lp.supportsRemoteDataCall=> val params = qContext.origQueryParams.asInstanceOf[PromQlQueryParams] val rs = lp.rangeSelector.asInstanceOf[IntervalSelector] val (rawExportStart, rawExportEnd) = (rs.from - lp.offsetMs.getOrElse(0L) - lp.lookbackMs.getOrElse(0L), rs.to - lp.offsetMs.getOrElse(0L)) - val partitions = getPartitions(lp, params) - assert(partitions.nonEmpty, s"Unexpected to see partitions empty for logicalPlan=$lp and param=$params") + val partition = getPartitions(lp, params) + assert(partition.nonEmpty, s"Unexpected to see partitions empty for logicalPlan=$lp and param=$params") // Raw export from both involved partitions for the entire time duration as the shard-key migration // is not guaranteed to happen exactly at the time of split - val execPlans = partitions.map(pa => { + val execPlans = partition.map(pa => { val (thisPartitionStartMs, thisPartitionEndMs) = (rawExportStart, rawExportEnd) val timeRangeOverride = TimeRange(thisPartitionEndMs, thisPartitionEndMs) val totalOffsetThisPartitionMs = thisPartitionEndMs - thisPartitionStartMs @@ -155,7 +155,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv , inProcessPlanDispatcher, None, execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]), enableApproximatelyEqualCheck = queryConfig.routingConfig.enableApproximatelyEqualCheckInStitch) - } + } ) ) case _ : LogicalPlan => super.defaultWalkLogicalPlanTree(logicalPlan, qContext, forceInProcess) @@ -191,7 +191,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv generateRemoteExecParams(qContext, startMs, endMs) } // Single partition but remote, send the entire plan remotely - if (isPartitionEnabledForGrpc(partitionName, grpcEndpoint)) { + if (grpcEndpoint.isDefined && !(queryConfig.grpcPartitionsDenyList.contains("*") || + queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) { val endpoint = grpcEndpoint.get val channel = channels.getOrElseUpdate(endpoint, GrpcCommonUtils.buildChannelFromEndpoint(endpoint)) PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, remoteContext, inProcessPlanDispatcher, @@ -387,17 +388,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv PlanResult(execPlan:: Nil) } - /** - * @param partitionName partition name - * @param grpcEndPoint grpc endpoint - * @return true if partition is enabled with an grpc endpoint - */ - private def isPartitionEnabledForGrpc(partitionName: String, grpcEndPoint: Option[String]): Boolean = { - grpcEndPoint.isDefined && - !(queryConfig.grpcPartitionsDenyList.contains("*") || - queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase)) - } - /** * If the argument partition is local, materialize the LogicalPlan with the local planner. * Otherwise, create a PromQlRemoteExec. @@ -425,7 +415,9 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv localPartitionPlanner.materialize(lpWithUpdatedTime, qContextWithOverride) } else { val ctx = generateRemoteExecParams(qContextWithOverride, timeRange.startMs, timeRange.endMs) - if (isPartitionEnabledForGrpc(partitionName, grpcEndpoint)) { + if (grpcEndpoint.isDefined && + !(queryConfig.grpcPartitionsDenyList.contains("*") || + queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) { val channel = channels.getOrElseUpdate(grpcEndpoint.get, GrpcCommonUtils.buildChannelFromEndpoint(grpcEndpoint.get)) PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, ctx, inProcessPlanDispatcher, @@ -594,10 +586,10 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv val assignmentRanges = getAssignmentQueryRanges(partitions, timeRange, lookbackMs = lookbackMs, offsetMs = offsetMs, stepMsOpt = stepMsOpt) val execPlans = if (assignmentRanges.isEmpty) { - // Assignment ranges empty means we can't run this query fully on one partition and needs - // remote raw export. Check if the total time of raw export is within the limits, if not return Empty result. + // Assignment ranges empty means we cant run this query fully on one partition and needs + // remote raw export Check if the total time of raw export is within the limits, if not return Empty result // While it may seem we don't tune the lookback of the leaf raw queries to exactly what we need from each - // partition, in reality it doesn't matter as despite a longer lookback, the actual data exported will be at most + // partition, in reality it doesnt matter as despite a longer lookback, the actual data exported will be at most // what that partition contains. val (startTime, endTime) = (qParams.startSecs, qParams.endSecs) val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs.max @@ -617,8 +609,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv } Seq(EmptyResultExec(qContext, dataset.ref, inProcessPlanDispatcher)) } - } - else { + } else { // materialize a plan for each range/assignment pair val (_, execPlans) = assignmentRanges.foldLeft( (None: Option[(PartitionAssignment, TimeRange)], ListBuffer.empty[ExecPlan])) { @@ -661,13 +652,14 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv // leaf logical plan) with supportsRemoteDataCall = true figure out if this range can entirely be selected // from partition p1 or p2 // + // Do not perform raw exports if the export is beyond a certain value for example // foo{}[10d] or foo[2d] offset 8d both will export 10 days of raw data which might cause heap pressure // and OOMs. The max cross partition raw export config can control such queries from bring the process // down but simpler queries with few minutes or even hour or two of lookback/offset will continue to work // seamlessly with no data gaps - // Note that at the moment, while planning, we only can look at what's the max time range we can support. - // We still don't have capabilities to check the expected number of timeseries scanned or bytes scanned + // Note that at the moment, while planning, we only can look at whats the max time range we can support. + // We still dont have capabilities to check the expected number of timeseries scanned or bytes scanned // and adding capabilities to give up a "part" of query execution if the runtime number of bytes of ts // scanned goes high isn't available. To start with the time range scanned as a static configuration will // be good enough and can be enhanced in future as required. diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala index 0c8bf6dd3..f40712935 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala @@ -82,8 +82,8 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida def twoPartitions(timeRange: TimeRange): List[PartitionAssignment] = List( PartitionAssignment("remote", "remote-url", TimeRange(startSeconds * 1000 - lookbackMs, - localPartitionStart * 1000 - 1), Some("grpcEndpoint1")), PartitionAssignment("remote2", "remote-url2", - TimeRange(localPartitionStart * 1000, endSeconds * 1000), Some("grpcEndpoint2"))) + localPartitionStart * 1000 - 1)), PartitionAssignment("remote2", "remote-url2", + TimeRange(localPartitionStart * 1000, endSeconds * 1000))) val partitionLocationProvider = new PartitionLocationProvider { override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { @@ -101,13 +101,13 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams = PlannerParams(processMultiPartition = true))) - val expectedPlanTree = s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job = "app"},1000,100,2999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-T~PeriodicSamplesMapper(start=3000000, step=100000, end=3599000, window=None, functionId=None, rawSource=true, offsetMs=None) - |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |---E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[900s],3599,1,3599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint2.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |---E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[900s],3599,1,3599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job = "app"},3600,100,10000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint2.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048)))""".stripMargin + val expectedPlanTree = s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job = "app"},1000,100,2999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-T~PeriodicSamplesMapper(start=3000000, step=100000, end=3599000, window=None, functionId=None, rawSource=false, offsetMs=None) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[900s],3599,1,3599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[900s],3599,1,3599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job = "app"},3600,100,10000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))""".stripMargin validatePlan(execPlan, expectedPlanTree) } @@ -824,11 +824,10 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { if (routingKey.equals(Map("job" -> "app"))) List(PartitionAssignment("remote1", "remote-url1", TimeRange(startSeconds * 1000 - lookbackMs, - secondPartitionStart * 1000 - 1), Some("grpcEndpoint1")), + secondPartitionStart * 1000 - 1)), PartitionAssignment("remote2", "remote-url2", TimeRange(secondPartitionStart * 1000, - thirdPartitionStart * 1000 - 1), Some("grpcEndpoint2")), - PartitionAssignment("remote3", "remote-url3", TimeRange(thirdPartitionStart * 1000, endSeconds * 1000), - Some("grpcEndpoint3"))) + thirdPartitionStart * 1000 - 1)), + PartitionAssignment("remote3", "remote-url3", TimeRange(thirdPartitionStart * 1000, endSeconds * 1000))) else Nil } override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = @@ -844,20 +843,20 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams = PlannerParams(processMultiPartition = true))) // Even a three partition span works with stitch filling in the missing gaps - val expectedRawPlan = s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job = "app"},1000,100,3999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-T~PeriodicSamplesMapper(start=4000000, step=100000, end=4599000, window=None, functionId=None, rawSource=true, offsetMs=None) - |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |---E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[900s],4599,1,4599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint3.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |---E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[900s],4599,1,4599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint2.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |---E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[900s],4599,1,4599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job = "app"},4600,100,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint2.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-T~PeriodicSamplesMapper(start=7000000, step=100000, end=7599000, window=None, functionId=None, rawSource=true, offsetMs=None) - |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |---E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[900s],7599,1,7599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint3.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |---E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[900s],7599,1,7599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint2.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |---E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[900s],7599,1,7599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job = "app"},7600,100,10000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint3.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048)))""".stripMargin + val expectedRawPlan = s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job = "app"},1000,100,3999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url1, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-T~PeriodicSamplesMapper(start=4000000, step=100000, end=4599000, window=None, functionId=None, rawSource=false, offsetMs=None) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[900s],4599,1,4599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url1, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[900s],4599,1,4599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[900s],4599,1,4599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url3, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job = "app"},4600,100,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-T~PeriodicSamplesMapper(start=7000000, step=100000, end=7599000, window=None, functionId=None, rawSource=false, offsetMs=None) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[900s],7599,1,7599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url1, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[900s],7599,1,7599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url2, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |---E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[900s],7599,1,7599,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url3, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~PromQlRemoteExec(PromQlQueryParams(test{job = "app"},7600,100,10000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url3, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000)))""".stripMargin validatePlan(execPlan, expectedRawPlan) } @@ -1602,15 +1601,18 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida } it("should generate generate a raw export from remote from multiple partitions and stitch") { + val p1StartSecs = 1000 val p1EndSecs = 6999 val p2StartSecs = 7000 val p2EndSecs = 15000 val stepSecs = 100 + val queryStartSecs = 12000 + val subqueryLookbackSecs = 9000 def twoPartitions(): List[PartitionAssignment] = List( - PartitionAssignment("remote", "remote-url", TimeRange(p1StartSecs * 1000, p1EndSecs * 1000), Some("grpcEndpoint1")), - PartitionAssignment("local", "local-url", TimeRange(p2StartSecs * 1000, p2EndSecs * 1000), Some("grpcEndpoint1")) + PartitionAssignment("remote", "remote-url", TimeRange(p1StartSecs * 1000, p1EndSecs * 1000)), + PartitionAssignment("local", "local-url", TimeRange(p2StartSecs * 1000, p2EndSecs * 1000)) ) val partitionLocationProvider = new PartitionLocationProvider { @@ -1630,25 +1632,25 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida } val expectedPlanWithRemoteExport = - s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-E~PromQLGrpcRemoteExec(PromQlQueryParams(sum(rate(test{job = "app"}[10m])),1000,100,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) + s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{job = "app"}[10m])),1000,100,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7000,100,7899)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7899000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1345229141],raw) - |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(6399000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1345229141],raw) - |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(6399000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1345229141],raw) - |------E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[1500s],7899,1,7899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) + |----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7899000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=None) + |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#885676802],raw) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(6399000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#885676802],raw) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(6399000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#885676802],raw) + |------E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1500s],7899,1,7899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7900,100,10000)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1345229141],raw) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#885676802],raw) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1345229141],raw) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#885676802],raw) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1345229141],raw)""".stripMargin + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#885676802],raw)""".stripMargin val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig.copy(routingConfig = queryConfig.routingConfig.copy(supportRemoteRawExport = true))) @@ -1661,44 +1663,44 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida validatePlan(execPlan1, expectedPlanWithRemoteExport) val expectedPlanWithRemoteExec1 = - """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-E~PromQLGrpcRemoteExec(PromQlQueryParams(sum(rate(test{job = "app"}[10m])) + sum(rate(bar{job = "app"}[5m])),1000,100,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) + """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{job = "app"}[10m])) + sum(rate(bar{job = "app"}[5m])),1000,100,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7000,100,7899)) - |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |-----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7899000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) - |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(6399000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) - |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(6399000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) - |-------E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[1500s],7899,1,7899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) + |-----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7899000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=None) + |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(6399000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(6399000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) + |-------E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1500s],7899,1,7899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7000,100,7899)) - |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |-----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7899000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) - |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(6699000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) - |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(6699000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) - |-------E~PromQLGrpcRemoteExec(PromQlQueryParams(bar{job="app"}[1200s],7899,1,7899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000),CachingConfig(true,2048))) - |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) + |-----T~PeriodicSamplesMapper(start=7000000, step=100000, end=7899000, window=Some(300000), functionId=Some(Rate), rawSource=false, offsetMs=None) + |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(6699000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(6699000,7899000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) + |-------E~PromQlRemoteExec(PromQlQueryParams(bar{job="app"}[1200s],7899,1,7899,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,300000))) + |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7900,100,10000)) - |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |-----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |-----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7300000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7900,100,10000)) - |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |-----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(7600000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(7600000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |-----T~PeriodicSamplesMapper(start=7900000, step=100000, end=10000000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(7600000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1680817524],raw)""".stripMargin + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(7600000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-3#479288168],raw)""".stripMargin val query2 = "sum(rate(test{job = \"app\"}[10m])) + sum(rate(bar{job = \"app\"}[5m]))" val lp2 = Parser.queryRangeToLogicalPlan(query2, TimeStepParams(2000, stepSecs, 10000)) @@ -1709,29 +1711,31 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida validatePlan(execPlan2, expectedPlanWithRemoteExec1) + + // Planner with period of uncertainty should still generate steps that are aligned with start and step, // that is should be snapped correctly val expectedPlanWithRemoteExportAndUncertainty = - s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) - |-E~PromQLGrpcRemoteExec(PromQlQueryParams(sum(rate(test{job = "app"}[10m])),1000,400,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + s"""E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(rate(test{job = "app"}[10m])),1000,400,6999,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000))) |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7000,400,7799)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000))) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |----T~PeriodicSamplesMapper(start=7000000, step=400000, end=7799000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) - |------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#654367497],raw) - |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(6399000,7799000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#654367497],raw) - |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(6399000,7799000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#654367497],raw) - |------E~PromQLGrpcRemoteExec(PromQlQueryParams(test{job="app"}[1400s],7799,1,7799,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=grpcEndpoint1.execStreaming, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) + |----T~PeriodicSamplesMapper(start=7000000, step=400000, end=7799000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=None) + |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000))) + |------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1666791464],raw) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(6399000,7799000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1666791464],raw) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(6399000,7799000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1666791464],raw) + |------E~PromQlRemoteExec(PromQlQueryParams(test{job="app"}[1400s],7799,1,7799,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000))) |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(7800,400,10000)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#654367497],raw) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1666791464],raw) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |----T~PeriodicSamplesMapper(start=7800000, step=400000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7200000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#654367497],raw) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(7200000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1666791464],raw) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |----T~PeriodicSamplesMapper(start=7800000, step=400000, end=10000000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=None) - |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7200000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#654367497],raw)""".stripMargin + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(7200000,10000000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1666791464],raw)""".stripMargin val engine1 = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig.copy(routingConfig = queryConfig.routingConfig.copy( @@ -1743,6 +1747,8 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida val execPlan3 = engine1.materialize(lp5, QueryContext(origQueryParams = promQlQueryParam5, plannerParams = PlannerParams(processMultiPartition = true))) validatePlan(execPlan3, expectedPlanWithRemoteExportAndUncertainty) + + } it ("should give the correct routing keys") { @@ -1755,39 +1761,4 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida val expected = Set(Map("job" -> "abc"), Map("job" -> "def"), Map("job" -> "ghi")) mpp.getRoutingKeys(lp) shouldEqual expected } - - it ("Simulate namespace migration with histogram queries") { - val partitionLocationProvider = new PartitionLocationProvider { - override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { - val mid = timeRange.startMs + (timeRange.endMs - timeRange.startMs) / 2 - List(PartitionAssignment("remote-1", "remote-url-1", TimeRange(timeRange.startMs, mid), Some("grpc-1")), - PartitionAssignment("remote-2", "remote-url-2", TimeRange(mid + 1, timeRange.endMs * 100), Some("grpc-2"))) - } - override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = - List(PartitionAssignment("local", "local-url", TimeRange(timeRange.startMs, timeRange.endMs))) - } - val histogramQuery = """histogram_quantile(0.5,sum(rate(mns_gmail_authenticate_request_ms{_ws_="acs-icloud",_ns_="mail-notifications",app="mail-notifications"}[2m])))""" - - val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig) - val lp = Parser.queryRangeToLogicalPlan(histogramQuery, TimeStepParams(1000, 100, 2000)) - val promQlQueryParams = PromQlQueryParams(histogramQuery, 1000, 100, 2000) - val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams = - PlannerParams(processMultiPartition = true))) - - // test the plan parameters - execPlan.isInstanceOf[StitchRvsExec] shouldEqual (true) - execPlan.children.length shouldEqual 3 - execPlan.children(0).isInstanceOf[PromQLGrpcRemoteExec] shouldEqual true - execPlan.children(1).isInstanceOf[LocalPartitionReduceAggregateExec] shouldEqual true - execPlan.children(2).isInstanceOf[PromQLGrpcRemoteExec] shouldEqual true - val innerRawExportPlan = execPlan.children(1).asInstanceOf[LocalPartitionReduceAggregateExec] - innerRawExportPlan.rangeVectorTransformers.length shouldEqual 2 - innerRawExportPlan.childAggregates.length shouldEqual 1 - innerRawExportPlan.childAggregates.head.isInstanceOf[StitchRvsExec] shouldEqual true - val innerStitchPlan = innerRawExportPlan.childAggregates.head.asInstanceOf[StitchRvsExec] - innerStitchPlan.rangeVectorTransformers.length shouldEqual 2 - innerStitchPlan.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper] shouldEqual true - val psmT = innerStitchPlan.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper] - psmT.rawSource shouldEqual true - } } diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index 58d52ab29..8c4bfac26 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -2728,7 +2728,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(5001,3,6803)) |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |----T~PeriodicSamplesMapper(start=5001000, step=3000, end=6803000, window=Some(1800000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |----T~PeriodicSamplesMapper(start=5001000, step=3000, end=6803000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=None) |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],6803,1,6803,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],6803,1,6803,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) @@ -2739,7 +2739,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(6801,3,7403)) |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |----T~PeriodicSamplesMapper(start=6801000, step=3000, end=7403000, window=Some(600000), functionId=Some(Rate), rawSource=true, offsetMs=Some(1800000)) + |----T~PeriodicSamplesMapper(start=6801000, step=3000, end=7403000, window=Some(600000), functionId=Some(Rate), rawSource=false, offsetMs=Some(1800000)) |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3003s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3003s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) @@ -2751,14 +2751,14 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(5601,3,8003)) |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |-----T~PeriodicSamplesMapper(start=5601000, step=3000, end=8003000, window=Some(1800000), functionId=Some(Rate), rawSource=true, offsetMs=Some(1200000)) + |-----T~PeriodicSamplesMapper(start=5601000, step=3000, end=8003000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=Some(1200000)) |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[5403s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[5403s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(5601,3,8003)) |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) - |-----T~PeriodicSamplesMapper(start=5601000, step=3000, end=8003000, window=Some(1800000), functionId=Some(Rate), rawSource=true, offsetMs=Some(600000)) + |-----T~PeriodicSamplesMapper(start=5601000, step=3000, end=8003000, window=Some(1800000), functionId=Some(Rate), rawSource=false, offsetMs=Some(600000)) |------E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |-------E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],8003,1,8003,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) @@ -2767,11 +2767,11 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |-E~PromQlRemoteExec(PromQlQueryParams(sum_over_time(test{_ws_ = "demo", _ns_ = "localNs"}[10m]) + sum_over_time(test{_ws_ = "demo", _ns_ = "localNs"}[30m] offset 10m),0,3,5000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) - |--T~PeriodicSamplesMapper(start=5001000, step=3000, end=7403000, window=Some(600000), functionId=Some(SumOverTime), rawSource=true, offsetMs=None) + |--T~PeriodicSamplesMapper(start=5001000, step=3000, end=7403000, window=Some(600000), functionId=Some(SumOverTime), rawSource=false, offsetMs=None) |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |----E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3003s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |----E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3003s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) - |--T~PeriodicSamplesMapper(start=5001000, step=3000, end=7403000, window=Some(1800000), functionId=Some(SumOverTime), rawSource=true, offsetMs=Some(600000)) + |--T~PeriodicSamplesMapper(start=5001000, step=3000, end=7403000, window=Some(1800000), functionId=Some(SumOverTime), rawSource=false, offsetMs=Some(600000)) |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |----E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |----E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) @@ -2781,7 +2781,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS |-E~PromQlRemoteExec(PromQlQueryParams(123 + sum_over_time(test{_ws_ = "demo", _ns_ = "localNs"}[10m] offset 10m),0,3,5000,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |-T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true) |--FA1~StaticFuncArgs(123.0,RangeParams(5001,3,6203)) - |--T~PeriodicSamplesMapper(start=5001000, step=3000, end=6203000, window=Some(600000), functionId=Some(SumOverTime), rawSource=true, offsetMs=Some(600000)) + |--T~PeriodicSamplesMapper(start=5001000, step=3000, end=6203000, window=Some(600000), functionId=Some(SumOverTime), rawSource=false, offsetMs=Some(600000)) |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |----E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[2403s],6203,1,6203,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |----E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[2403s],6203,1,6203,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) @@ -2792,11 +2792,11 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |--T~ScalarOperationMapper(operator=ADD, scalarOnLhs=true) |---FA1~StaticFuncArgs(123.0,RangeParams(5001,3,7403)) - |---T~PeriodicSamplesMapper(start=5001000, step=3000, end=7403000, window=Some(600000), functionId=Some(SumOverTime), rawSource=true, offsetMs=Some(600000)) + |---T~PeriodicSamplesMapper(start=5001000, step=3000, end=7403000, window=Some(600000), functionId=Some(SumOverTime), rawSource=false, offsetMs=Some(600000)) |----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |-----E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |-----E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[3603s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) - |--T~PeriodicSamplesMapper(start=5001000, step=3000, end=7403000, window=Some(1200000), functionId=Some(SumOverTime), rawSource=true, offsetMs=Some(1200000)) + |--T~PeriodicSamplesMapper(start=5001000, step=3000, end=7403000, window=Some(1200000), functionId=Some(SumOverTime), rawSource=false, offsetMs=Some(1200000)) |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |----E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote0-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048))) |----E~PromQlRemoteExec(PromQlQueryParams(test{_ws_="demo",_ns_="localNs"}[4803s],7403,1,7403,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remote1-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(true,3 days,true,3000),CachingConfig(true,2048)))