From b781e298b9a002823eb2ccccfd86d9130594db58 Mon Sep 17 00:00:00 2001
From: Alex Theimer
Date: Fri, 14 Jun 2024 02:02:54 -0700
Subject: [PATCH 1/2] feat(query): arbitrary target-schema columns

Target-schemas can now be defined against any labels (including
non-shard-key labels).
---
 .../queryplanner/DefaultPlanner.scala         |  77 ++++++-
 .../queryplanner/LogicalPlanUtils.scala       |  41 ++--
 .../queryplanner/SingleClusterPlanner.scala   | 134 +++--------
 .../queryplanner/PlannerHierarchySpec.scala   | 115 +++++++++
 .../SingleClusterPlannerSpec.scala            | 218 ++++++------------
 5 files changed, 315 insertions(+), 270 deletions(-)

diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala
index b9be8d5906..a69fa9674a 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala
@@ -2,14 +2,13 @@ package filodb.coordinator.queryplanner
 
 import java.util.concurrent.ThreadLocalRandom
 
-
 import akka.serialization.SerializationExtension
 import com.typesafe.scalalogging.StrictLogging
-
 import filodb.coordinator.{ActorPlanDispatcher, ActorSystemHolder, GrpcPlanDispatcher, RemoteActorPlanDispatcher}
+import filodb.core.TargetSchemaProvider
 import filodb.core.metadata.{Dataset, DatasetOptions, Schemas}
 import filodb.core.query._
-import filodb.core.query.Filter.Equals
+import filodb.core.query.Filter.{Equals, EqualsRegex}
 import filodb.core.store.{AllChunkScan, ChunkScanMethod, InMemoryChunkScan, TimeRangeChunkScan, WriteBufferChunkScan}
 import filodb.prometheus.ast.Vectors.PromMetricLabel
 import filodb.query._
@@ -841,4 +840,76 @@ object PlannerUtil extends StrictLogging {
   def replaceLastBucketOccurenceStringFromMetricName(metricName: String): String = {
     metricName.replaceAll("_bucket$", "")
   }
+
+  /**
+   * Returns true iff every argument target-schema column is matched by a filter
+   * with either pure equality or pipe-only regex equality (e.g. "foo|bar").
+   */
+  def hasAllTargetSchemaFilters(targetSchemaCols: Seq[String],
+                                colFilters: Seq[ColumnFilter]): Boolean = {
+    targetSchemaCols.forall { tschemaCol =>
+      colFilters.filter(_.column == tschemaCol).exists {
+        case ColumnFilter(_, regex: EqualsRegex) if QueryUtils.containsPipeOnlyRegex(regex.value.toString) => true
+        case ColumnFilter(_, equals: Equals) => true
+        case _ => false
+      }
+    }
+  }
+
+  // scalastyle:off method.length
+
+  /**
+   * Returns a target-schema iff all of the following are true:
+   *   - When the argument filters are resolved into sets of pure-equality filters,
+   *     a target-schema is defined for every resolved set.
+   *   - All of those target-schemas are defined against the same columns.
+   *   - No target-schema changes during the query range.
+   */
+  def getTargetSchemaColumns(colFilters: Seq[ColumnFilter],
+                             targetSchemaProvider: TargetSchemaProvider,
+                             startMs: Long, endMs: Long): Option[Seq[String]] = {
+    val keyToValues = colFilters.filter {
+      case ColumnFilter(col, regex: EqualsRegex) if QueryUtils.containsPipeOnlyRegex(regex.value.toString) => true
+      case _ => false
+    }.map { colFilter =>
+      val eqRegex = colFilter.filter.asInstanceOf[EqualsRegex]
+      val values = QueryUtils.splitAtUnescapedPipes(eqRegex.value.toString).distinct
+      (colFilter.column, values)
+    }.filter(_._2.nonEmpty)
+      .toMap
+
+    val targetSchemaChanges = QueryUtils.makeAllKeyValueCombos(keyToValues).map { keyToValue =>
+      // Replace pipe-concatenated EqualsRegex filters with Equals filters.
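+      // E.g. {a=~"1|2", b="3"} resolves to the combos {a="1", b="3"} and {a="2", b="3"}.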
+ val equalsFilters = keyToValue.map(entry => ColumnFilter(entry._1, Equals(entry._2))).toSeq + val newFilters = LogicalPlanUtils.upsertFilters(colFilters, equalsFilters) + targetSchemaProvider.targetSchemaFunc(newFilters) + } + + val isChanging = targetSchemaChanges.exists { changes => + changes.nonEmpty && changes.exists(c => c.time >= startMs && c.time <= endMs) + } + if (isChanging) { + return None + } + + val targetSchemaOpts = targetSchemaChanges.map { changes => + val tsIndex = changes.lastIndexWhere(t => t.time <= startMs) + if (tsIndex > -1) Some(changes(tsIndex)) else None + } + if (targetSchemaOpts.exists(_.isEmpty)) { + return None + } + + val targetSchemas = targetSchemaOpts.map(_.get.schema) + val hasAllSameTargetSchemas = { + val headCols = targetSchemas.head.toSet + targetSchemas.tail.forall(_.toSet == headCols) + } + if (!hasAllSameTargetSchemas) { + return None + } + + Some(targetSchemas.head) + } + // scalastyle:on method.length } diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala index d5ed505bf8..cf14285b8b 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LogicalPlanUtils.scala @@ -421,16 +421,15 @@ object LogicalPlanUtils extends StrictLogging { } /** - * Returns a set of target-schema columns iff all of: - * - all plan RawSeries share the same target-schema columns. - * - no target-schema definition changes during the query. + * Returns an occupied set of target-schema columns iff a + * target-schema is applicable to the plan. + * @param getShardKeyFilterGroups returns sets of pure-equals or pipe-concatenated + * regex-equals shard-key filters */ def sameRawSeriesTargetSchemaColumns(plan: LogicalPlan, targetSchemaProvider: TargetSchemaProvider, - getShardKeyFilters: RawSeries => Seq[Seq[ColumnFilter]]) + getShardKeyFilterGroups: RawSeries => Seq[Seq[ColumnFilter]]) : Option[Seq[String]] = { - // compose a stream of Options for each RawSeries-- - // the options contain a target-schema iff it is defined and unchanging. val rawSeries = LogicalPlan.findLeafLogicalPlans(plan) .filter(_.isInstanceOf[RawSeries]) .map(_.asInstanceOf[RawSeries]) @@ -438,26 +437,16 @@ object LogicalPlanUtils extends StrictLogging { // Cannot handle RawSeries without IntervalSelector. return None } - val rsTschemaOpts = rawSeries.flatMap{ rs => - val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs) - val rawShardKeyFilters = getShardKeyFilters(rs) - // The filters might contain pipe-concatenated EqualsRegex values. - // Convert these into sets of single-valued Equals filters. - val resolvedShardKeyFilters = rawShardKeyFilters.flatMap { filters => - val equalsFilters: Seq[Seq[ColumnFilter]] = filters.map { filter => - filter.filter match { - case EqualsRegex(values: String) if QueryUtils.containsPipeOnlyRegex(values) => - QueryUtils.splitAtUnescapedPipes(values).map(value => ColumnFilter(filter.column, Equals(value))) - case _ => Seq(filter) - } - } - // E.g. 
foo{key1=~"baz|bat",key2=~"bar|bak"} would give the following combos: - // [[baz,bar], [baz,bak], [bat,bar], [bat,bak]] - QueryUtils.combinations(equalsFilters) - }.map(_.toSet).distinct.map(_.toSeq) // make sure keys are distinct - resolvedShardKeyFilters.map{ shardKey => - val filters = LogicalPlanUtils.upsertFilters(rs.filters, shardKey) - LogicalPlanUtils.getTargetSchemaIfUnchanging(targetSchemaProvider, filters, interval) + val rsTschemaOpts = rawSeries.flatMap { rs => + val interval = LogicalPlanUtils.getSpanningIntervalSelector(rs) + val rawFilters = LogicalPlan.getColumnFilterGroup(rs) + assert(rawFilters.size == 1, s"expected single RawSeries filter group; rawSeries=$rs; filters=$rawFilters") + val shardKeyFilterGroups = getShardKeyFilterGroups(rs) + shardKeyFilterGroups.map { shardKeyFilterGroup => + // Upsert shard-key equality filters into the full set of filters; + // use this new set to find the target-schema columns. + val newFilters = LogicalPlanUtils.upsertFilters(rawFilters.head.toSeq, shardKeyFilterGroup) + PlannerUtil.getTargetSchemaColumns(newFilters, targetSchemaProvider, interval.from, interval.to) } } if (rsTschemaOpts.isEmpty) { diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala index 73db750ac4..51ad216f50 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala @@ -2,41 +2,29 @@ package filodb.coordinator.queryplanner import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ - import akka.actor.ActorRef import com.typesafe.scalalogging.StrictLogging import kamon.Kamon - import filodb.coordinator.{ActorPlanDispatcher, GrpcPlanDispatcher, RemoteActorPlanDispatcher, ShardMapper} import filodb.coordinator.client.QueryCommands.StaticSpreadProvider -import filodb.coordinator.queryplanner.SingleClusterPlanner.findTargetSchema -import filodb.core.{SpreadProvider, StaticTargetSchemaProvider, TargetSchemaChange, TargetSchemaProvider} +import filodb.coordinator.queryplanner.PlannerUtil.{getTargetSchemaColumns, hasAllTargetSchemaFilters} +import filodb.core.{SpreadProvider, StaticTargetSchemaProvider, TargetSchemaProvider} import filodb.core.binaryrecord2.RecordBuilder import filodb.core.metadata.{Dataset, DatasetOptions, Schemas} -import filodb.core.query.{Filter, _} -import filodb.core.query.Filter.{Equals, EqualsRegex} +import filodb.core.query._ +import filodb.core.query.Filter.Equals import filodb.prometheus.ast.Vectors.{PromMetricLabel, TypeLabel} import filodb.prometheus.ast.WindowConstants -import filodb.query.{exec, _} +import filodb.query._ import filodb.query.InstantFunctionId.HistogramBucket import filodb.query.LogicalPlan._ -import filodb.query.exec.{LocalPartitionDistConcatExec, _} +import filodb.query.exec._ import filodb.query.exec.InternalRangeFunction.Last // scalastyle:off file.size.limit object SingleClusterPlanner { private val mdNoShardKeyFilterRequests = Kamon.counter("queryengine-metadata-no-shardkey-requests").withoutTags - - // Find the TargetSchema that is applicable i.e effective for the current query window - private def findTargetSchema(targetSchemaChanges: Seq[TargetSchemaChange], - startMs: Long, endMs: Long): Option[TargetSchemaChange] = { - val tsIndex = targetSchemaChanges.lastIndexWhere(t => t.time <= startMs) - if(tsIndex > -1) - 
Some(targetSchemaChanges(tsIndex)) - else - None - } } /** @@ -74,65 +62,6 @@ class SingleClusterPlanner(val dataset: Dataset, qContext.plannerParams.targetSchemaProviderOverride.getOrElse(_targetSchemaProvider) } - /** - * Returns true iff a target-schema: - * (1) matches any shard-key matched by the argument filters, and - * (2) changes between the argument timestamps. - */ - def isTargetSchemaChanging(shardKeyFilters: Seq[ColumnFilter], - startMs: Long, endMs: Long, - qContext: QueryContext): Boolean = { - val keyToValues = shardKeyFilters.map { filter => - val values = filter match { - case ColumnFilter(col, regex: EqualsRegex) if QueryUtils.containsPipeOnlyRegex(regex.value.toString) => - QueryUtils.splitAtUnescapedPipes(regex.value.toString).distinct - case ColumnFilter(col, equals: Equals) => - Seq(equals.value.toString) - } - (filter.column, values) - }.toMap - QueryUtils.makeAllKeyValueCombos(keyToValues).exists { shardKeys => - // Replace any EqualsRegex shard-key filters with Equals. - val equalsFilters = shardKeys.map(entry => ColumnFilter(entry._1, Equals(entry._2))).toSeq - val newFilters = LogicalPlanUtils.upsertFilters(shardKeyFilters, equalsFilters) - val targetSchemaChanges = targetSchemaProvider(qContext).targetSchemaFunc(newFilters) - targetSchemaChanges.nonEmpty && targetSchemaChanges.exists(c => c.time >= startMs && c.time <= endMs) - } - } - - /** - * Returns true iff a target-schema should be used to identify query shards. - * A target-schema should be used iff all of: - * (1) A target-schema is defined for the argument filters. - * (2) The target-schema does not change between startMs and endMs. - * (3) All required target-schema columns are present in the argument filters. - * - * @param filters Query Column Filters - * @param targetSchema TargetSchema - * @return useTargetSchema - use target-schema to calculate query shards - */ - def useTargetSchemaForShards(filters: Seq[ColumnFilter], - startMs: Long, endMs: Long, - qContext: QueryContext): Boolean = { - val targetSchemaChanges = targetSchemaProvider(qContext).targetSchemaFunc(filters) - val targetSchemaOpt = findTargetSchema(targetSchemaChanges, startMs, endMs) - if (targetSchemaOpt.isEmpty) { - return false - } - - val shardKeyFilters = { - val filterOpts = dataset.options.nonMetricShardColumns.map(col => filters.find(_.column == col)) - assert(filterOpts.forall(_.isDefined), "expected all shard-key filters present but found: " + filters) - filterOpts.map(_.get) - } - val tsChangeExists = isTargetSchemaChanging(shardKeyFilters, startMs, endMs, qContext) - val allTSColsPresent = targetSchemaOpt.get.schema - .forall(tschemaCol => filters.exists(cf => - cf.column == tschemaCol && cf.filter.isInstanceOf[Equals])) - - !tsChangeExists && allTSColsPresent - } - import SingleClusterPlanner._ private def dispatcherForShard(shard: Int, forceInProcess: Boolean, queryContext: QueryContext): PlanDispatcher = { @@ -341,19 +270,15 @@ class SingleClusterPlanner(val dataset: Dataset, val shardValues = shardPairs.filterNot(_._1 == dsOptions.metricColumn).map(_._2) logger.debug(s"For shardColumns $shardColumns, extracted metric $metric and shard values $shardValues") - val targetSchemaChange = targetSchemaProvider(qContext).targetSchemaFunc(filters) - val targetSchema = { - if (targetSchemaChange.nonEmpty) { - findTargetSchema(targetSchemaChange, startMs, endMs).map(tsc => tsc.schema).getOrElse(Seq.empty) - } else Seq.empty - } - val shardHash = RecordBuilder.shardKeyHash(shardValues, dsOptions.metricColumn, metric, 
targetSchema)
-    if(useTargetSchemaForShards(filters, startMs, endMs, qContext)) {
+    val targetSchema = getTargetSchemaColumns(filters, targetSchemaProvider(qContext), startMs, endMs)
+    val shardHash = RecordBuilder.shardKeyHash(shardValues, dsOptions.metricColumn, metric,
+      targetSchema.getOrElse(Seq.empty))
+    if(targetSchema.isDefined && hasAllTargetSchemaFilters(targetSchema.get, filters)) {
       val nonShardKeyLabelPairs = filters.filter(f => !shardColumns.contains(f.column)
                                                       && f.filter.isInstanceOf[Filter.Equals])
         .map(cf => cf.column -> cf.filter.asInstanceOf[Filter.Equals].value.toString).toMap
-      val partitionHash = RecordBuilder.partitionKeyHash(nonShardKeyLabelPairs, shardPairs.toMap, targetSchema,
+      val partitionHash = RecordBuilder.partitionKeyHash(nonShardKeyLabelPairs, shardPairs.toMap, targetSchema.get,
         dsOptions.metricColumn, metric)
       // since target-schema filter is provided in the query, ingestionShard can be used to find the single shard
       // that can answer the query.
@@ -392,16 +317,35 @@ class SingleClusterPlanner(val dataset: Dataset,
       (shardCol, trimmedValues)
     }
 
-    // Find the union of all shards for each shard-key.
-    val shardKeys = QueryUtils.makeAllKeyValueCombos(shardColToValues.toMap)
-    shardKeys.flatMap{ shardKey =>
+    // Find all *non*-shard-key filters matched by equality (pure or pipe-concatenated regex).
+    // Unlike the shard-key handling above, this neither trims values nor throws exceptions.
+    val nonShardColToValues: Seq[(String, Seq[String])] = filters
+      .filterNot {f => shardColumns.contains(f.column)}
+      .map {
+        case ColumnFilter(col, Filter.Equals(filtVal: String)) =>
+          (col, Seq(filtVal))
+        case ColumnFilter(col, Filter.EqualsRegex(filtVal: String))
+          if QueryUtils.containsPipeOnlyRegex(filtVal) =>
+          val values = QueryUtils.splitAtUnescapedPipes(filtVal).distinct
+          (col, values)
+        case filter => (filter.column, Seq())
+      }
+      .filter(_._2.nonEmpty)
+
+    // Resolve (col -> seq[values]) pairs into all possible combinations of col->value sets.
+    // Create Equals filters for each entry and upsert into the argument filters; each
+    // new set of filters can then be used to identify a set of shards.
+    val colToValues = (shardColToValues ++ nonShardColToValues).toMap
+    val colToValueMaps = QueryUtils.makeAllKeyValueCombos(colToValues)
+    colToValueMaps.flatMap{ colValueMap =>
-      // Replace any EqualsRegex shard-key filters with Equals.
+      // Replace any pipe-only EqualsRegex filters (shard-key or not) with Equals filters.
       val newFilters = filters.map{ filt =>
-        shardKey.get(filt.column)
+        colValueMap.get(filt.column)
           .map(value => ColumnFilter(filt.column, Filter.Equals(value)))
           .getOrElse(filt)
       }
-      shardsFromValues(shardKey.toSeq, newFilters, qContext, startMs, endMs)
+      val newShardKeyFilters = colValueMap.filter(label => shardColumns.contains(label._1))
+      shardsFromValues(newShardKeyFilters.toSeq, newFilters, qContext, startMs, endMs)
     }.distinct
   }
 }
@@ -893,10 +837,6 @@ class SingleClusterPlanner(val dataset: Dataset,
       case _ => (0, Long.MaxValue)
     }
 
-    val shardKeyFilters = LogicalPlan.getNonMetricShardKeyFilters(lp, dataset.options.nonMetricShardColumns)
-    assert(shardKeyFilters.size == 1, "RawSeries with more than one shard-key group: " + lp)
-    val targetSchemaChangesExist = isTargetSchemaChanging(shardKeyFilters.head, startMs, endMs, qContext)
-
     val execPlans = shardsFromFilters(renamedFilters, qContext, startMs, endMs).map { shard =>
       val dispatcher = dispatcherForShard(shard, forceInProcess, qContext)
       val ep = MultiSchemaPartitionsExec(
@@ -913,9 +853,7 @@ class SingleClusterPlanner(val dataset: Dataset,
     }
 
     // Stitch only if spread changes during the query-window.
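+    // A changing target-schema is no longer used for shard routing at all
+    // (getTargetSchemaColumns returns None), so a change no longer forces a stitch.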
- // When a target-schema changes during query window, data might be ingested in - // different shards after the change. - PlanResult(execPlans, needsStitch || targetSchemaChangesExist) + PlanResult(execPlans, needsStitch) } // scalastyle:on method.length diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index e886edefbb..384734ef93 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -3696,6 +3696,121 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS } } + it("should correctly pushdown shard-key-regex queries when target-schema filter labels include non-shard-keys") { + + val SHARD_LABEL = "tschemaShard" + val FILTER_LABEL = "tschemaFilter" + val ENABLED = "enabled" + val timeParams = TimeStepParams(startSeconds, step, endSeconds) + + def tschemaProviderFunc(filters: Seq[ColumnFilter]): Seq[TargetSchemaChange] = { + filters + .find(f => f.column == FILTER_LABEL && f.filter.valuesStrings.toList.head.asInstanceOf[String] == ENABLED) + .map(_ => Seq(TargetSchemaChange(0, Seq("_ns_", SHARD_LABEL)))) + .getOrElse(Nil) + } + val tschema = FunctionalTargetSchemaProvider(tschemaProviderFunc) + + val tests = Seq( + // should pushdown + ("""foo{_ws_ = "demo", _ns_ =~".*Ns", tschemaFilter="enabled", tschemaShard="hello"}""", + """E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |--T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(tschemaFilter,Equals(enabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1315306304],raw) + |--T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(tschemaFilter,Equals(enabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1315306304],downsample) + |-E~PromQlRemoteExec(PromQlQueryParams(foo{tschemaFilter="enabled",tschemaShard="hello",_ws_="demo",_ns_="remoteNs"},1633913330,300,1634777330,None,false), 
PlannerParams(filodb,None,None,None,Some(FunctionalTargetSchemaProvider(~)),60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remotePartition-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin), + // no pushdown; shard label is missing + ("""foo{_ws_ = "demo", _ns_ =~".*Ns", tschemaFilter="enabled"}""", + """E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |--E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1206429904],raw) + |---T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(tschemaFilter,Equals(enabled)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1206429904],raw) + |---T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(tschemaFilter,Equals(enabled)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1206429904],raw) + |--E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1206429904],downsample) + |---T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(tschemaFilter,Equals(enabled)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1206429904],downsample) + |---T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, 
shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(tschemaFilter,Equals(enabled)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1206429904],downsample) + |-E~PromQlRemoteExec(PromQlQueryParams(foo{tschemaFilter="enabled",_ws_="demo",_ns_="remoteNs"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,Some(FunctionalTargetSchemaProvider(~)),60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remotePartition-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin), + // no pushdown; filter label does not match tschema config + ("""foo{_ws_ = "demo", _ns_ =~".*Ns", tschemaFilter="disabled", tschemaShard="hello"}""", + """E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |--E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1215082528],raw) + |---T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(tschemaFilter,Equals(disabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1215082528],raw) + |---T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(tschemaFilter,Equals(disabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1215082528],raw) + |--E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1215082528],downsample) + |---T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, 
offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(tschemaFilter,Equals(disabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1215082528],downsample) + |---T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(tschemaFilter,Equals(disabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1215082528],downsample) + |-E~PromQlRemoteExec(PromQlQueryParams(foo{tschemaFilter="disabled",tschemaShard="hello",_ws_="demo",_ns_="remoteNs"},1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,Some(FunctionalTargetSchemaProvider(~)),60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remotePartition-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin), + // only inner plans should be pushed down + ("""sum by (ohno) (foo{_ws_ = "demo", _ns_ =~".*Ns", tschemaFilter="enabled", tschemaShard="hello"})""", + """T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1633913330,300,1634777330)) + |-E~MultiPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#566593597],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(ohno)) + |-----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(tschemaFilter,Equals(enabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), 
ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#566593597],raw) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#566593597],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(ohno)) + |-----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(tschemaFilter,Equals(enabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#566593597],downsample) + |--E~PromQlRemoteExec(PromQlQueryParams(sum(foo{tschemaFilter="enabled",tschemaShard="hello",_ws_="demo",_ns_="remoteNs"}) by (ohno),1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,Some(FunctionalTargetSchemaProvider(~)),60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remotePartition-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin), + // entire plan should be pushed down + ("""sum by (tschemaShard) (foo{_ws_ = "demo", _ns_ =~".*Ns", tschemaFilter="enabled", tschemaShard="hello"})""", + """E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1634173130,300,1634777330)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1503850392],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaShard)) + |-----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(tschemaFilter,Equals(enabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on 
InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1633913330,300,1634172830)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1503850392],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaShard)) + |-----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(tschemaFilter,Equals(enabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |-E~PromQlRemoteExec(PromQlQueryParams(sum(foo{tschemaFilter="enabled",tschemaShard="hello",_ws_="demo",_ns_="remoteNs"}) by (tschemaShard),1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,Some(FunctionalTargetSchemaProvider(~)),60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,false,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remotePartition-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin), + // nothing pushed down; filter does not match + ("""sum by (tschemaShard) (foo{_ws_ = "demo", _ns_ =~".*Ns", tschemaFilter="disabled", tschemaShard="hello"})""", + """T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1633913330,300,1634777330)) + |-E~MultiPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000))) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-383802190],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), 
without=List(), by=List(tschemaShard)) + |-----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(tschemaFilter,Equals(disabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-383802190],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaShard)) + |-----T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(tschemaFilter,Equals(disabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-383802190],raw) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-383802190],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaShard)) + |-----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(tschemaFilter,Equals(disabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-383802190],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(tschemaShard)) + |-----T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(tschemaFilter,Equals(disabled)), ColumnFilter(tschemaShard,Equals(hello)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-383802190],downsample) + |--E~PromQlRemoteExec(PromQlQueryParams(sum(foo{tschemaFilter="disabled",tschemaShard="hello",_ws_="demo",_ns_="remoteNs"}) by (tschemaShard),1633913330,300,1634777330,None,false), PlannerParams(filodb,None,None,None,Some(FunctionalTargetSchemaProvider(~)),60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remotePartition-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 
seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin), + ) + for ((query, expected) <- tests) { + val lp = Parser.queryRangeToLogicalPlan(query, timeParams) + val context = QueryContext( + origQueryParams = PromQlQueryParams(query, timeParams.start, timeParams.step, timeParams.end), + plannerParams = PlannerParams(processMultiPartition = true, targetSchemaProviderOverride = Some(tschema))) + val ep = rootPlanner.materialize(lp, context) + validatePlan(ep, expected) + } + } + it("should correctly batch local/remote requests and individually handle split keys separately") { val partitionLocationProvider = new PartitionLocationProvider { override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = { diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala index afd3462206..68ad730560 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala @@ -9,7 +9,7 @@ import filodb.core.metadata.Column.ColumnType import filodb.core.{GlobalScheduler, MetricsTestData, SpreadChange, TargetSchemaChange} import filodb.core.metadata.Schemas import filodb.core.query.{ColumnFilter, _} -import filodb.core.query.Filter.{Equals, EqualsRegex, NotEquals} +import filodb.core.query.Filter.{Equals, NotEquals} import filodb.core.store.TimeRangeChunkScan import filodb.prometheus.ast.{TimeStepParams, WindowConstants} import filodb.prometheus.parse.Parser @@ -385,20 +385,21 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture // Target-Schema start - it("should stitch results when target-schema changes during query range") { - val lp = Parser.queryRangeToLogicalPlan("""foo{job="bar"}""", TimeStepParams(20000, 100, 30000)) - def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = { - Seq(SpreadChange(0, 2)) - } - def targetSchema(filter: Seq[ColumnFilter]): Seq[TargetSchemaChange] = { - Seq(TargetSchemaChange(0, Seq("job")), TargetSchemaChange(25000000L, Seq("job"))) - } - val execPlan = engine.materialize(lp, QueryContext(promQlQueryParams, plannerParams = PlannerParams - (spreadOverride = Some(FunctionalSpreadProvider(spread)), - targetSchemaProviderOverride = Some(FunctionalTargetSchemaProvider(targetSchema)), queryTimeoutMillis = 1000000))) - println(execPlan.children.size) - execPlan.rangeVectorTransformers.head.isInstanceOf[StitchRvsMapper] shouldEqual true - } + // Removed for now; target-schemas should not be applied when they change during a query range. 
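+  // (Replacement coverage: see "should not apply target-schema when changes during query range" below.)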
+ // it("should stitch results when target-schema changes during query range") { + // val lp = Parser.queryRangeToLogicalPlan("""foo{job="bar"}""", TimeStepParams(20000, 100, 30000)) + // def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = { + // Seq(SpreadChange(0, 2)) + // } + // def targetSchema(filter: Seq[ColumnFilter]): Seq[TargetSchemaChange] = { + // Seq(TargetSchemaChange(0, Seq("job")), TargetSchemaChange(25000000L, Seq("job"))) + // } + // val execPlan = engine.materialize(lp, QueryContext(promQlQueryParams, plannerParams = PlannerParams + // (spreadOverride = Some(FunctionalSpreadProvider(spread)), + // targetSchemaProviderOverride = Some(FunctionalTargetSchemaProvider(targetSchema)), queryTimeoutMillis = 1000000))) + // println(execPlan.children.size) + // execPlan.rangeVectorTransformers.head.isInstanceOf[StitchRvsMapper] shouldEqual true + // } it("should apply the target schema appropriate to the query range") { @@ -473,19 +474,46 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture execPlan.rangeVectorTransformers.last.isInstanceOf[StitchRvsMapper] shouldEqual true } - it("should stitch results when target-schema has changed but spread did not change in query range") { - val lp = Parser.queryRangeToLogicalPlan("""foo{job="bar", instance="inst1"}""", TimeStepParams(20000, 100, 30000)) - def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = { - Seq(SpreadChange(0, 2)) // Spread 4 - } + // Removed for now; target-schemas should not be applied when they change during a query range. + // it("should stitch results when target-schema has changed but spread did not change in query range") { + // val lp = Parser.queryRangeToLogicalPlan("""foo{job="bar", instance="inst1"}""", TimeStepParams(20000, 100, 30000)) + // def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = { + // Seq(SpreadChange(0, 2)) // Spread 4 + // } + // def targetSchema(filter: Seq[ColumnFilter]): Seq[TargetSchemaChange] = { + // Seq(TargetSchemaChange(0, Seq("job")), TargetSchemaChange(25000000, Seq("job", "instance"))) + // } + // val execPlan = engine.materialize(lp, QueryContext(promQlQueryParams, plannerParams = PlannerParams + // (spreadOverride = Some(FunctionalSpreadProvider(spread)), + // targetSchemaProviderOverride = Some(FunctionalTargetSchemaProvider(targetSchema)), queryTimeoutMillis = 1000000))) + // execPlan.children.size shouldEqual 4 // target-schema does not apply when there are changes during a query-window + // execPlan.rangeVectorTransformers.last.isInstanceOf[StitchRvsMapper] shouldEqual true + // } + + it("should not apply target-schema when changes during query range") { + val query = """foo{job="bar", instance="inst1"}""" + val lpWithChanges = Parser.queryRangeToLogicalPlan(query, TimeStepParams(20000, 100, 30000)) + val lpNoChanges = Parser.queryRangeToLogicalPlan(query, TimeStepParams(26000, 100, 36000)) def targetSchema(filter: Seq[ColumnFilter]): Seq[TargetSchemaChange] = { Seq(TargetSchemaChange(0, Seq("job")), TargetSchemaChange(25000000, Seq("job", "instance"))) } - val execPlan = engine.materialize(lp, QueryContext(promQlQueryParams, plannerParams = PlannerParams - (spreadOverride = Some(FunctionalSpreadProvider(spread)), - targetSchemaProviderOverride = Some(FunctionalTargetSchemaProvider(targetSchema)), queryTimeoutMillis = 1000000))) - execPlan.children.size shouldEqual 4 // target-schema does not apply when there are changes during a query-window - execPlan.rangeVectorTransformers.last.isInstanceOf[StitchRvsMapper] shouldEqual 
true + val qContext = QueryContext(promQlQueryParams, + plannerParams = PlannerParams( + targetSchemaProviderOverride = Some(FunctionalTargetSchemaProvider(targetSchema))) + ) + + val execPlanWithChanges = engine.materialize(lpWithChanges, qContext) + val execPlanNoChanges = engine.materialize(lpNoChanges, qContext) + + validatePlan(execPlanWithChanges, + """E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1700637671],raw) + |-T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=5, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(instance,Equals(inst1)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1700637671],raw) + |-T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--E~MultiSchemaPartitionsExec(dataset=timeseries, shard=21, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(instance,Equals(inst1)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1700637671],raw)""".stripMargin) + validatePlan(execPlanNoChanges, + """T~PeriodicSamplesMapper(start=26000000, step=100000, end=36000000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(25700000,36000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(instance,Equals(inst1)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#849919583],raw)""".stripMargin) } it("should not stitch when all the target-schema labels are present in column filters in a binary join") { @@ -1051,28 +1079,28 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture |-----T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None))""".stripMargin), // Should pushdown with regex when all target-schema cols are given in `by` clause. 
- ("""sum(foo{job="bar",app=~"abc|def"}) by (job,app)""", - """E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1645725393],raw) + ("""sum(foo{job="bar",app=~"abc|defghij"}) by (job,app)""", + """E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1617319944],raw) |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(20000,100,30000)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1645725393],raw) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1617319944],raw) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job, app)) |----T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) - |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|def)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None)) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|defghij)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,25,true,false,true,Set(),None,Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(20000,100,30000)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1645725393],raw) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1617319944],raw) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job, app)) |----T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) - |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|def)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None))""".stripMargin), + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|defghij)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,25,true,false,true,Set(),None,Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin), // Should not pushdown with regex when some target-schema cols are missing from `by` clause. 
- ("""sum(foo{job="bar",app=~"abc|def"}) by (job)""", + ("""sum(foo{job="bar",app=~"abc|defghij"}) by (job)""", """T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(20000,100,30000)) - |-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1376873951],raw) + |-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-481557981],raw) |--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job)) |---T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) - |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|def)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1376873951],raw) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|defghij)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-481557981],raw) |--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job)) |---T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) - |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|def)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1376873951],raw)""".stripMargin), + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|defghij)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-481557981],raw)""".stripMargin), // ============== BEGIN COMPOUND TESTS ================== @@ -1229,7 +1257,7 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture (spreadOverride = Some(FunctionalSpreadProvider(spread)), targetSchemaProviderOverride = Some(FunctionalTargetSchemaProvider(targetSchema)), queryTimeoutMillis = 1000000))) try { - validatePlan(execPlan, expected) + validatePlan(execPlan, expected, sort = true) } catch { case e: TestFailedException => println(s"Plan validation failed for query: $query") @@ -1758,28 +1786,28 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture |-----T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,true,false,true))""".stripMargin), // Should pushdown with regex when all target-schema cols are given in `by` clause. 
- ("""sum(foo{job="bar",app=~"abc|def"}) by (app)""", - """E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1929473151],raw) + ("""sum(foo{job="bar",app=~"abc|defghijk"}) by (app)""", + """E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-593538160],raw) |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(20000,100,30000)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1929473151],raw) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-593538160],raw) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(app)) |----T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) - |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|def)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,true,false,true)) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|defghijk)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,25,true,false,true,Set(),None,Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0))) |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(20000,100,30000)) - |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1929473151],raw) + |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-593538160],raw) |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(app)) |----T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) - |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|def)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,true,false,true))""".stripMargin), + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|defghijk)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,25,true,false,true,Set(),None,Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin), // Should not pushdown with regex when some target-schema cols are missing from `by` clause. 
- ("""sum(foo{job="bar",app=~"abc|def"}) by ()""", + ("""sum(foo{job="bar",app=~"abc|defghijk"}) by ()""", """T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(20000,100,30000)) - |-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1929473151],raw) + |-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1232087542],raw) |--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |---T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) - |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|def)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1929473151],raw) + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|defghijk)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1232087542],raw) |--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) |---T~PeriodicSamplesMapper(start=20000000, step=100000, end=30000000, window=None, functionId=None, rawSource=true, offsetMs=None) - |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=27, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|def)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1929473151],raw)""".stripMargin), + |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=11, chunkMethod=TimeRangeChunkScan(19700000,30000000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(app,EqualsRegex(abc|defghijk)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1232087542],raw)""".stripMargin), // ============== BEGIN COMPOUND TESTS ================== @@ -1936,7 +1964,7 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture (spreadOverride = Some(FunctionalSpreadProvider(spread)), targetSchemaProviderOverride = Some(FunctionalTargetSchemaProvider(targetSchema)), queryTimeoutMillis = 1000000))) try { - validatePlan(execPlan, expected) + validatePlan(execPlan, expected, sort = true) } catch { case e: TestFailedException => println(s"Plan validation failed for query: $query") @@ -2704,102 +2732,6 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture reduced = concat.reduceSchemas(reduced, qres2.resultSchema) } - it ("should correctly determine when target-schema changes") { - val qContext = QueryContext(promQlQueryParams) - val startMs = 1000 * promQlQueryParams.startSecs - val endMs = 1000 * promQlQueryParams.endSecs - val filters = Seq(ColumnFilter("app", EqualsRegex("foo|bar"))) - val isTschemaChanging = (tschemaProviderFunc: Seq[ColumnFilter] => Seq[TargetSchemaChange]) => { - val tschemaProvider = FunctionalTargetSchemaProvider(tschemaProviderFunc) - val scp = new SingleClusterPlanner( - dataset, schemas, 
-        dataset, schemas, mapperRef, earliestRetainedTimestampFn = 0,
-        queryConfig, clusterName = "raw", _targetSchemaProvider = tschemaProvider)
-      scp.isTargetSchemaChanging(filters, startMs, endMs, qContext)
-    }
-
-    val none = (filters: Seq[ColumnFilter]) => Nil
-    isTschemaChanging(none) shouldEqual false
-
-    val unchanging = (filters: Seq[ColumnFilter]) => Seq(TargetSchemaChange(0, Seq("hello")))
-    isTschemaChanging(unchanging) shouldEqual false
-
-    val unchangingMultiple = (filters: Seq[ColumnFilter]) => Seq(TargetSchemaChange(0, Seq("hello", "goodbye")))
-    isTschemaChanging(unchangingMultiple) shouldEqual false
-
-    val unchangingDifferent = (filters: Seq[ColumnFilter]) => {
-      if (filters.exists(_.filter.valuesStrings.head == "foo")) {
-        Seq(TargetSchemaChange(0, Seq("hello")))
-      } else {
-        Seq(TargetSchemaChange(0, Seq("goodbye")))
-      }
-    }
-    isTschemaChanging(unchangingDifferent) shouldEqual false
-
-    val firstTschema = (filters: Seq[ColumnFilter]) => Seq(TargetSchemaChange(startMs + 1000, Seq("hello")))
-    isTschemaChanging(firstTschema) shouldEqual true
-
-    val tschemaChanges = (filters: Seq[ColumnFilter]) => Seq(
-      TargetSchemaChange(0, Seq("hello")),
-      TargetSchemaChange(startMs + 1000, Seq("goodbye")),
-    )
-    isTschemaChanging(tschemaChanges) shouldEqual true
-
-    val tschemaChangesAfter = (filters: Seq[ColumnFilter]) => Seq(
-      TargetSchemaChange(0, Seq("hello")),
-      TargetSchemaChange(endMs + 1000, Seq("goodbye")),
-    )
-    isTschemaChanging(tschemaChangesAfter) shouldEqual false
-
-    val oneChanges = (filters: Seq[ColumnFilter]) => {
-      if (filters.exists(_.filter.valuesStrings.head == "foo")) {
-        Seq(
-          TargetSchemaChange(0, Seq("hello")),
-          TargetSchemaChange(startMs + 1000, Seq("goodbye")))
-      } else {
-        Seq(TargetSchemaChange(0, Seq("goodbye")))
-      }
-    }
-    isTschemaChanging(oneChanges) shouldEqual true
-  }
-
-  it("should correctly determine when to use target-schema to find shards") {
-    val qContext = QueryContext(promQlQueryParams)
-    val startMs = 1000 * promQlQueryParams.startSecs
-    val endMs = 1000 * promQlQueryParams.endSecs
-    val filters = Seq(ColumnFilter("job", Equals("foo")))
-    val useTschemaForShards = (tschemaProviderFunc: Seq[ColumnFilter] => Seq[TargetSchemaChange]) => {
-      val tschemaProvider = FunctionalTargetSchemaProvider(tschemaProviderFunc)
-      val scp = new SingleClusterPlanner(
-        dataset, schemas, mapperRef, earliestRetainedTimestampFn = 0,
-        queryConfig, clusterName = "raw", _targetSchemaProvider = tschemaProvider)
-      scp.useTargetSchemaForShards(filters, startMs, endMs, qContext)
-    }
-
-    val none = (filters: Seq[ColumnFilter]) => Nil
-    useTschemaForShards(none) shouldEqual false
-
-    val unchanging = (filters: Seq[ColumnFilter]) => Seq(TargetSchemaChange(0, Seq("job")))
-    useTschemaForShards(unchanging) shouldEqual true
-
-    val unchangingWrongCol = (filters: Seq[ColumnFilter]) => Seq(TargetSchemaChange(0, Seq("wrong")))
-    useTschemaForShards(unchangingWrongCol) shouldEqual false
-
-    val firstTschema = (filters: Seq[ColumnFilter]) => Seq(TargetSchemaChange(startMs + 1000, Seq("job")))
-    useTschemaForShards(firstTschema) shouldEqual false
-
-    val tschemaChanges = (filters: Seq[ColumnFilter]) => Seq(
-      TargetSchemaChange(0, Seq("job")),
-      TargetSchemaChange(startMs + 1000, Seq("job")),
-    )
-    useTschemaForShards(tschemaChanges) shouldEqual false
-
-    val tschemaChangesAfter = (filters: Seq[ColumnFilter]) => Seq(
-      TargetSchemaChange(0, Seq("job")),
-      TargetSchemaChange(endMs + 1000, Seq("job")),
-    )
-    useTschemaForShards(tschemaChangesAfter) shouldEqual true
-  }
-
   it ("should correctly identify shards to query") {
     val qContext = QueryContext(promQlQueryParams)
     val startMs = 1000 * promQlQueryParams.startSecs

From 4821be7a8c9b47ead434c14eea78cb583ac7c6c4 Mon Sep 17 00:00:00 2001
From: Alex Theimer
Date: Fri, 14 Jun 2024 12:25:35 -0700
Subject: [PATCH 2/2] scalastyle

---
 .../filodb.coordinator/queryplanner/DefaultPlanner.scala | 5 +++++
 .../queryplanner/SingleClusterPlanner.scala              | 2 ++
 2 files changed, 7 insertions(+)

diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala
index a69fa9674a..7407104223 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala
@@ -2,8 +2,10 @@ package filodb.coordinator.queryplanner
 
 import java.util.concurrent.ThreadLocalRandom
 
+
 import akka.serialization.SerializationExtension
 import com.typesafe.scalalogging.StrictLogging
+
 import filodb.coordinator.{ActorPlanDispatcher, ActorSystemHolder, GrpcPlanDispatcher, RemoteActorPlanDispatcher}
 import filodb.core.TargetSchemaProvider
 import filodb.core.metadata.{Dataset, DatasetOptions, Schemas}
@@ -17,6 +19,7 @@ import filodb.query.LogicalPlan._
 import filodb.query.exec._
 import filodb.query.exec.InternalRangeFunction.Last
 
+// scalastyle:off file.size.limit
 
 /**
  * Intermediate Plan Result includes the exec plan(s) along with any state to be passed up the
@@ -913,3 +916,5 @@ object PlannerUtil extends StrictLogging {
   }
   // scalastyle:on method.length
 }
+
+// scalastyle:on file.size.limit
diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
index 51ad216f50..cae04cda31 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
@@ -2,9 +2,11 @@ package filodb.coordinator.queryplanner
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 
+
 import akka.actor.ActorRef
 import com.typesafe.scalalogging.StrictLogging
 import kamon.Kamon
+
 import filodb.coordinator.{ActorPlanDispatcher, GrpcPlanDispatcher, RemoteActorPlanDispatcher, ShardMapper}
 import filodb.coordinator.client.QueryCommands.StaticSpreadProvider
 import filodb.coordinator.queryplanner.PlannerUtil.{getTargetSchemaColumns, hasAllTargetSchemaFilters}
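[Reviewer note] On the test deletions in the first patch: coverage for `isTargetSchemaChanging` and `useTargetSchemaForShards` was removed along with those planner entry points, and the equivalent decisions now flow through the `PlannerUtil.getTargetSchemaColumns` and `hasAllTargetSchemaFilters` helpers imported above. A hedged sketch of how the deleted scenarios could be re-expressed against the new helper; it assumes the spec's existing fixture (promQlQueryParams, ScalaTest matchers, FiloDB imports), and the exact assertions are illustrative:

    // Sketch only: mirrors the deleted "changing tschema" scenarios via the new
    // helper. FunctionalTargetSchemaProvider, TargetSchemaChange, ColumnFilter and
    // Equals are existing FiloDB types; the fixture values below are assumptions.
    val filters = Seq(ColumnFilter("job", Equals("foo")))
    val (startMs, endMs) = (1000L * promQlQueryParams.startSecs, 1000L * promQlQueryParams.endSecs)

    def tschemaCols(f: Seq[ColumnFilter] => Seq[TargetSchemaChange]): Option[Seq[String]] =
      getTargetSchemaColumns(filters, FunctionalTargetSchemaProvider(f), startMs, endMs)

    // Unchanging definition -> the target-schema columns are returned.
    tschemaCols(_ => Seq(TargetSchemaChange(0, Seq("job")))) shouldEqual Some(Seq("job"))
    // Definition changes inside the query range -> None, so no target-schema routing.
    tschemaCols(_ => Seq(TargetSchemaChange(0, Seq("job")),
                         TargetSchemaChange(startMs + 1000, Seq("job")))) shouldEqual None
    // A change strictly after the range does not disqualify the schema.
    tschemaCols(_ => Seq(TargetSchemaChange(0, Seq("job")),
                         TargetSchemaChange(endMs + 1000, Seq("job")))) shouldEqual Some(Seq("job"))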