diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala
index 6c114d07c..8a7f10fda 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala
@@ -22,15 +22,34 @@ import filodb.query.LogicalPlan._
 import filodb.query.exec._
 
 //scalastyle:off file.size.limit
+case class PartitionDetails(partitionName: String, httpEndPoint: String,
+                            grpcEndPoint: Option[String], proportion: Float)
+trait PartitionAssignmentTrait {
+  val proportionMap: Map[String, PartitionDetails]
+  val timeRange: TimeRange
+}
 case class PartitionAssignment(partitionName: String, httpEndPoint: String, timeRange: TimeRange,
-                               grpcEndPoint: Option[String] = None)
+                               grpcEndPoint: Option[String] = None) extends PartitionAssignmentTrait {
+  val proportionMap: Map[String, PartitionDetails] =
+    Map(partitionName -> PartitionDetails(partitionName, httpEndPoint, grpcEndPoint, 1.0f))
+}
+case class PartitionAssignmentV2(proportionMap: Map[String, PartitionDetails],
+                                 timeRange: TimeRange) extends PartitionAssignmentTrait
 
 trait PartitionLocationProvider {
 
+  // Kept for backward compatibility; the planner now calls the *Trait variants below, which default to delegating here.
   def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment]
   def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment]
+  def getPartitionsTrait(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignmentTrait] = {
+    getPartitions(routingKey, timeRange)
+  }
+  def getMetadataPartitionsTrait(nonMetricShardKeyFilters: Seq[ColumnFilter],
+                                 timeRange: TimeRange): List[PartitionAssignmentTrait] = {
+    getMetadataPartitions(nonMetricShardKeyFilters, timeRange)
+  }
 }
 
 /**
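Note on intended usage (illustrative sketch, not part of this diff): the snippet below shows a location provider answering the new getPartitionsTrait API with a weighted two-way split, while a provider that only implements the legacy getPartitions/getMetadataPartitions keeps compiling because the *Trait methods default to delegating. The endpoint URLs, class name, and 0.8/0.2 weights are made up for the example.

  class WeightedPartitionProvider extends PartitionLocationProvider {

    // Legacy API: still required, answered here with a single full-weight partition.
    override def getPartitions(routingKey: Map[String, String],
                               timeRange: TimeRange): List[PartitionAssignment] =
      List(PartitionAssignment("local", "http://local-host", timeRange))

    override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter],
                                       timeRange: TimeRange): List[PartitionAssignment] =
      getPartitions(Map.empty, timeRange)

    // New API: the same routing key is split 0.8 / 0.2 across two partitions
    // for the same time range.
    override def getPartitionsTrait(routingKey: Map[String, String],
                                    timeRange: TimeRange): List[PartitionAssignmentTrait] =
      List(PartitionAssignmentV2(
        Map("local"  -> PartitionDetails("local",  "http://local-host",  None, 0.8f),
            "remote" -> PartitionDetails("remote", "http://remote-host", None, 0.2f)),
        timeRange))
  }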
@@ -166,9 +185,9 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     val partitions = getPartitions(logicalPlan, paramToCheckPartitions)
     if (isSinglePartition(partitions)) {
       val (partitionName, startMs, endMs, grpcEndpoint) = partitions.headOption match {
-        case Some(pa: PartitionAssignment)
-        => (pa.partitionName, params.startSecs * 1000L,
-            params.endSecs * 1000L, pa.grpcEndPoint)
+        case Some(pa: PartitionAssignmentTrait)
+        => (pa.proportionMap.keys.head, params.startSecs * 1000L,
+            params.endSecs * 1000L, pa.proportionMap.values.head.grpcEndPoint)
         case None => (localPartitionName, params.startSecs * 1000L, params.endSecs * 1000L, None)
       }
 
@@ -198,7 +217,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
         PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, remoteContext, inProcessPlanDispatcher,
           dataset.ref, plannerSelector)
       } else {
-        val remotePartitionEndpoint = partitions.head.httpEndPoint
+        val remotePartitionEndpoint = partitions.head.proportionMap.values.head.httpEndPoint
         val httpEndpoint = remotePartitionEndpoint + params.remoteQueryPath.getOrElse("")
         PromQlRemoteExec(httpEndpoint, remoteHttpTimeoutMs, remoteContext, inProcessPlanDispatcher,
           dataset.ref, remoteExecHttpClient)
@@ -330,7 +349,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     val partitions = if (routingKeys.isEmpty) List.empty
     else {
       routingKeys.flatMap{ keys =>
-        partitionLocationProvider.getPartitions(keys, queryTimeRange).
+        partitionLocationProvider.getPartitionsTrait(keys, queryTimeRange).
           sortBy(_.timeRange.startMs)
       }.toList
     }
@@ -388,6 +407,27 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     PlanResult(execPlan:: Nil)
   }
 
+  private def materializeForPartition(logicalPlan: LogicalPlan,
+                                      partition: PartitionAssignmentTrait,
+                                      queryContext: QueryContext,
+                                      timeRangeOverride: Option[TimeRange] = None): ExecPlan = {
+    partition match {
+      case PartitionAssignment(partitionName, httpEndPoint, _, grpcEndPoint) =>
+        materializeForPartition(logicalPlan, partitionName, grpcEndPoint, httpEndPoint, queryContext, timeRangeOverride)
+      case PartitionAssignmentV2(proportionMap, _) =>
+        val plans = proportionMap.map(entry => {
+          val partitionDetails = entry._2
+          materializeForPartition(logicalPlan, partitionDetails.partitionName,
+            partitionDetails.grpcEndPoint, partitionDetails.httpEndPoint, queryContext, timeRangeOverride)
+        }).toSeq
+        if (plans.size > 1) {
+          val dispatcher = PlannerUtil.pickDispatcher(plans)
+          MultiPartitionDistConcatExec(queryContext, dispatcher, plans)
+        } else {
+          plans.head
+        }
+    }
+  }
   /**
    * If the argument partition is local, materialize the LogicalPlan with the local planner.
    * Otherwise, create a PromQlRemoteExec.
@@ -395,9 +435,11 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
    * range is computed from the PromQlQueryParams.
    */
   private def materializeForPartition(logicalPlan: LogicalPlan,
-                                      partition: PartitionAssignment,
+                                      partitionName: String,
+                                      grpcEndpoint: Option[String],
+                                      httpEndPoint: String,
                                       queryContext: QueryContext,
-                                      timeRangeOverride: Option[TimeRange] = None): ExecPlan = {
+                                      timeRangeOverride: Option[TimeRange]): ExecPlan = {
     val qContextWithOverride = timeRangeOverride.map{ r =>
       val oldParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
       val newParams = oldParams.copy(startSecs = r.startMs / 1000, endSecs = r.endMs / 1000)
@@ -405,7 +447,6 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     }.getOrElse(queryContext)
     val queryParams = qContextWithOverride.origQueryParams.asInstanceOf[PromQlQueryParams]
     val timeRange = timeRangeOverride.getOrElse(TimeRange(1000 * queryParams.startSecs, 1000 * queryParams.endSecs))
-    val (partitionName, grpcEndpoint) = (partition.partitionName, partition.grpcEndPoint)
     if (partitionName.equals(localPartitionName)) {
       // FIXME: the below check is needed because subquery tests fail when their
       // time-ranges are updated even with the original query params.
@@ -423,7 +464,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
       PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, ctx, inProcessPlanDispatcher,
         dataset.ref, plannerSelector)
     } else {
-      val httpEndpoint = partition.httpEndPoint + queryParams.remoteQueryPath.getOrElse("")
+      val httpEndpoint = httpEndPoint + queryParams.remoteQueryPath.getOrElse("")
       PromQlRemoteExec(httpEndpoint, remoteHttpTimeoutMs, ctx, inProcessPlanDispatcher,
         dataset.ref, remoteExecHttpClient)
     }
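Shape of the resulting plan, for orientation (illustrative; the remote endpoint is made up): given a PartitionAssignmentV2 whose proportionMap holds "local" -> 0.5 and "remote" -> 0.5, the new trait-based overload above materializes one child per map entry and concatenates them; a single-entry map skips the concat node, and a legacy PartitionAssignment pattern-matches straight to the single-endpoint overload, so its behavior is unchanged.

  MultiPartitionDistConcatExec                          // dispatcher chosen via PlannerUtil.pickDispatcher
   +- <child materialized by the local partition planner>              // entry "local"
   +- PromQlRemoteExec(remote-url + remoteQueryPath, ...)              // entry "remote", no gRPC endpoint configured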
@@ -442,9 +483,9 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
    * @param stepMsOpt occupied iff the returned ranges should describe periodic steps
    *                  (i.e. all range start times (except the first) should be snapped to a step)
    */
-  private def getAssignmentQueryRanges(assignments: Seq[PartitionAssignment], queryRange: TimeRange,
+  private def getAssignmentQueryRanges(assignments: Seq[PartitionAssignmentTrait], queryRange: TimeRange,
                                        lookbackMs: Long = 0L, offsetMs: Seq[Long] = Seq(0L),
-                                       stepMsOpt: Option[Long] = None): Seq[(PartitionAssignment, TimeRange)] = {
+                                       stepMsOpt: Option[Long] = None): Seq[(PartitionAssignmentTrait, TimeRange)] = {
     // Construct a sequence of Option[TimeRange]; the ith range is None iff the ith partition has no range to query.
     // First partition doesn't need its start snapped to a periodic step, so deal with it separately.
     val filteredAssignments = assignments
@@ -612,7 +653,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     } else {
       // materialize a plan for each range/assignment pair
       val (_, execPlans) = assignmentRanges.foldLeft(
-        (None: Option[(PartitionAssignment, TimeRange)], ListBuffer.empty[ExecPlan])) {
+        (None: Option[(PartitionAssignmentTrait, TimeRange)], ListBuffer.empty[ExecPlan])) {
         case (acc, next) => acc match {
           case (Some((_, prevTimeRange)), ep: ListBuffer[ExecPlan]) =>
             val (currentAssignment, currentTimeRange) = next
@@ -803,7 +844,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
         localPartitionPlanner.materialize(lp, qContext)
       }
       else {
-        val execPlans = partitions.map { p =>
+        val execPlans = partitions.flatMap(ps => ps.proportionMap.values.map(pd =>
+          PartitionAssignment(pd.partitionName, pd.httpEndPoint, ps.timeRange, pd.grpcEndPoint))).map { p =>
           logger.debug(s"partitionInfo=$p; queryParams=$queryParams")
           if (p.partitionName.equals(localPartitionName))
             localPartitionPlanner.materialize(
@@ -848,7 +890,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
       logger.warn(s"no partitions found for $lp; defaulting to local planner")
       localPartitionPlanner.materialize(lp, qContext)
     } else {
-      val execPlans = partitions.map { p =>
+      val execPlans = partitions.flatMap(ps => ps.proportionMap.values.map(pd =>
+        PartitionAssignment(pd.partitionName, pd.httpEndPoint, ps.timeRange, pd.grpcEndPoint))).map { p =>
         logger.debug(s"partition=$p; plan=$lp")
         if (p.partitionName.equals(localPartitionName)) localPartitionPlanner.materialize(lp, qContext)
@@ -867,9 +910,10 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     PlanResult(execPlan::Nil)
   }
 
-  private def getMetadataPartitions(filters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = {
+  private def getMetadataPartitions(filters: Seq[ColumnFilter],
+                                    timeRange: TimeRange): List[PartitionAssignmentTrait] = {
     val nonMetricShardKeyFilters = filters.filter(f => dataset.options.nonMetricShardColumns.contains(f.column))
-    partitionLocationProvider.getMetadataPartitions(nonMetricShardKeyFilters, timeRange)
+    partitionLocationProvider.getMetadataPartitionsTrait(nonMetricShardKeyFilters, timeRange)
   }
 
   private def createMetadataRemoteExec(qContext: QueryContext, partitionAssignment: PartitionAssignment,
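The two metadata hunks above fan out by first flattening each assignment's proportionMap back into legacy PartitionAssignment values, so the per-partition local/remote branch underneath them is untouched; note the proportion itself is dropped on this path, only the endpoints matter. A standalone sketch of that flattening step (endpoint values made up):

  val assignment = PartitionAssignmentV2(
    Map("local"  -> PartitionDetails("local",  "local-url",  None, 0.5f),
        "remote" -> PartitionDetails("remote", "remote-url", None, 0.5f)),
    TimeRange(1000L, 2000L))

  List(assignment).flatMap(ps => ps.proportionMap.values.map(pd =>
    PartitionAssignment(pd.partitionName, pd.httpEndPoint, ps.timeRange, pd.grpcEndPoint)))
  // == List(PartitionAssignment("local",  "local-url",  TimeRange(1000, 2000), None),
  //         PartitionAssignment("remote", "remote-url", TimeRange(1000, 2000), None))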
diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/PartitionLocationPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/PartitionLocationPlanner.scala
index 45f3871bb..cb5f7ba1b 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/PartitionLocationPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/PartitionLocationPlanner.scala
@@ -26,7 +26,7 @@ abstract class PartitionLocationPlanner(dataset: Dataset,
    */
   protected def getPartitions(logicalPlan: LogicalPlan,
                               queryParams: PromQlQueryParams,
-                              infiniteTimeRange: Boolean = false) : Seq[PartitionAssignment] = {
+                              infiniteTimeRange: Boolean = false) : Seq[PartitionAssignmentTrait] = {
 
     //1. Get a Seq of all Leaf node filters
     val leafFilters = LogicalPlan.getColumnFilterGroup(logicalPlan)
@@ -71,19 +71,19 @@ abstract class PartitionLocationPlanner(dataset: Dataset,
     //4. Based on the map in 2 and time range in 5, get the partitions to query
     routingKeyMap.flatMap(metricMap =>
-      partitionLocationProvider.getPartitions(metricMap, queryTimeRange))
+      partitionLocationProvider.getPartitionsTrait(metricMap, queryTimeRange))
   }
   // scalastyle:on method.length
 
   /**
    * Checks if all the PartitionAssignments belong to same partition
    */
-  protected def isSinglePartition(partitions: Seq[PartitionAssignment]) : Boolean = {
+  protected def isSinglePartition(partitions: Seq[PartitionAssignmentTrait]) : Boolean = {
     if (partitions.isEmpty) true
     else {
-      val partName = partitions.head.partitionName
-      partitions.forall(_.partitionName.equals(partName))
+      val pSet = partitions.flatMap(p => p.proportionMap.keys)
+      pSet.forall(p => p.equals(pSet.head))
     }
   }
 
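With the proportionMap-based check above, "single partition" now means a single distinct partition name across every assignment and every map entry, not merely a single assignment object. A quick sketch of both outcomes (values made up; isSinglePartition is protected, so this illustrates behavior rather than calling code):

  // One assignment, two distinct partition names -> not single-partition.
  isSinglePartition(Seq(PartitionAssignmentV2(
    Map("local"  -> PartitionDetails("local",  "local-url",  None, 0.5f),
        "remote" -> PartitionDetails("remote", "remote-url", None, 0.5f)),
    TimeRange(0L, 1000L))))                                              // false

  // Two assignments over time, both resolving only to "local" -> still single-partition.
  isSinglePartition(Seq(
    PartitionAssignment("local", "local-url", TimeRange(0L, 500L)),
    PartitionAssignment("local", "local-url", TimeRange(501L, 1000L))))  // true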
diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala
index 01ba33ca4..a589dee61 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala
@@ -154,7 +154,7 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
     val shardKeys = getShardKeys(logicalPlan)
     val partitions = shardKeys
       .flatMap(filters => getPartitions(logicalPlan.replaceFilters(filters), qParams))
-      .map(_.partitionName)
+      .flatMap(_.proportionMap.keys)
       .distinct
     // NOTE: don't use partitions.size < 2. When partitions == 0, generateExec will not
     //   materialize any plans because there are no partitions against which it should materialize.
@@ -240,7 +240,7 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
       val newLogicalPlan = logicalPlan.replaceFilters(key)
       val newQueryParams = queryParams.copy(promQl = LogicalPlanParser.convertToQuery(newLogicalPlan))
       val partitions = getPartitions(newLogicalPlan, newQueryParams)
-        .map(_.partitionName)
+        .flatMap(_.proportionMap.keys)
         .distinct
       if (partitions.size > 1) {
         partitionSplitKeys.append(key)
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
index f40712935..9a69fd260 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala
@@ -51,6 +51,29 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
   def partitions(timeRange: TimeRange): List[PartitionAssignment] =
     List(PartitionAssignment("local", "local-url", TimeRange(timeRange.startMs, timeRange.endMs)))
 
+  def partitionsV2SinglePartition(timeRange: TimeRange): List[PartitionAssignmentV2] = List(
+    PartitionAssignmentV2(Map("local" -> PartitionDetails("local", "local-url", None, 1.0f)),
+      TimeRange(timeRange.startMs, timeRange.endMs)))
+  def partitionsV2MultiPartition(timeRange: TimeRange): List[PartitionAssignmentV2] = List(
+    PartitionAssignmentV2(Map("local" -> PartitionDetails("local", "local-url", None, .5f),
+      "remote" -> PartitionDetails("remote", "remote-url", None, .5f)), TimeRange(timeRange.startMs, timeRange.endMs)))
+
+  def partitionsV2MultiPartitionAssignments(timeRange: TimeRange): List[PartitionAssignmentV2] = List(
+    PartitionAssignmentV2(Map("local" -> PartitionDetails("local", "local-url", None, .5f),
+      "remote1" -> PartitionDetails("remote1", "remote-url", None, .5f)),
+      TimeRange(timeRange.startMs, timeRange.startMs + (timeRange.endMs - timeRange.startMs) / 2)),
+    PartitionAssignmentV2(Map("local" -> PartitionDetails("local", "local-url", None, .5f),
+      "remote2" -> PartitionDetails("remote2", "remote-url", None, .5f)),
+      TimeRange(timeRange.startMs + (timeRange.endMs - timeRange.startMs) / 2 + 1, timeRange.endMs)))
+
+  def partitionsV2MultiPartitionAssignmentsOnlyWeightChange(timeRange: TimeRange): List[PartitionAssignmentV2] = List(
+    PartitionAssignmentV2(Map("local" -> PartitionDetails("local", "local-url", None, .5f),
+      "remote" -> PartitionDetails("remote1", "remote-url", None, .5f)),
+      TimeRange(timeRange.startMs, timeRange.startMs + (timeRange.endMs - timeRange.startMs) / 2)),
+    PartitionAssignmentV2(Map("local" -> PartitionDetails("local", "local-url", None, .2f),
+      "remote" -> PartitionDetails("remote2", "remote-url", None, .8f)),
+      TimeRange(timeRange.startMs + (timeRange.endMs - timeRange.startMs) / 2 + 1, timeRange.endMs)))
+
   it ("should not generate PromQlExec plan when partitions are local") {
     val partitionLocationProvider = new PartitionLocationProvider {
       override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] =
@@ -611,6 +634,116 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
     execPlan.isInstanceOf[BinaryJoinExec] shouldEqual (true)
   }
 
+  it("single-partition namespace with V2 assignment should not fan out to MultiPartitionDistConcatExec children") {
+    val partitionLocationProvider = new PartitionLocationProvider {
+      override def getPartitionsTrait(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignmentTrait] =
+        partitionsV2SinglePartition(timeRange)
+
+      override def getMetadataPartitionsTrait(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignmentTrait] =
+        partitionsV2SinglePartition(timeRange)
+
+      override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = ???
+
+      override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = ???
+    }
+
+    val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)
+    val lp = Parser.queryRangeToLogicalPlan("test1{job = \"app\"} + test2{job = \"app\"}",
+      TimeStepParams(1000, 100, 2000))
+
+    val promQlQueryParams = PromQlQueryParams("test1{job = \"app\"} + test2{job = \"app\"}", 1000, 100, 2000)
+
+    val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
+      PlannerParams(processMultiPartition = true)))
+
+    execPlan.isInstanceOf[BinaryJoinExec] shouldEqual (true)
+    execPlan.children.forall(plan => plan.isInstanceOf[MultiPartitionDistConcatExec]) shouldEqual (false)
+  }
+
+  it("multi-partition namespace with V2 assignment should fan out to MultiPartitionDistConcatExec children") {
+    val partitionLocationProvider = new PartitionLocationProvider {
+      override def getPartitionsTrait(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignmentTrait] =
+        partitionsV2MultiPartition(timeRange)
+
+      override def getMetadataPartitionsTrait(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignmentTrait] =
+        partitionsV2MultiPartition(timeRange)
+
+      override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = ???
+
+      override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = ???
+    }
+
+    val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)
+    val lp = Parser.queryRangeToLogicalPlan("test1{job = \"app\"} + test2{job = \"app\"}",
+      TimeStepParams(1000, 100, 2000))
+
+    val promQlQueryParams = PromQlQueryParams("test1{job = \"app\"} + test2{job = \"app\"}", 1000, 100, 2000)
+
+    val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
+      PlannerParams(processMultiPartition = true)))
+
+    execPlan.isInstanceOf[BinaryJoinExec] shouldEqual (true)
+    execPlan.children.forall(plan => plan.isInstanceOf[MultiPartitionDistConcatExec]) shouldEqual (true)
+  }
+
+  it("multi-partition namespace with V2 assignment should stitch plans across a partition assignment change") {
+    val partitionLocationProvider = new PartitionLocationProvider {
+      override def getPartitionsTrait(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignmentTrait] =
+        partitionsV2MultiPartitionAssignments(timeRange)
+
+      override def getMetadataPartitionsTrait(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignmentTrait] =
+        partitionsV2MultiPartitionAssignments(timeRange)
+
+      override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = ???
+
+      override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = ???
+    }
+
+    val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)
+    val lp = Parser.queryRangeToLogicalPlan("test1{job = \"app\"} + test2{job = \"app\"}",
+      TimeStepParams(1000, 100, 2000))
+
+    val promQlQueryParams = PromQlQueryParams("test1{job = \"app\"} + test2{job = \"app\"}", 1000, 100, 2000)
+
+    val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
+      PlannerParams(processMultiPartition = true)))
+
+    execPlan.isInstanceOf[StitchRvsExec] shouldEqual (true)
+    // Should have two MultiPartitionDistConcatExec children, one for the period before and one for the period after the assignment change.
+    execPlan.children.count(plan => plan.isInstanceOf[MultiPartitionDistConcatExec]) shouldEqual 2
+    // Should have one BinaryJoinExec child covering the period in which the assignment change happens.
+    execPlan.children.count(plan => plan.isInstanceOf[BinaryJoinExec]) shouldEqual 1
+  }
+
+  it("multi-partition namespace with V2 assignment should stitch plans across a partition weight change") {
+    val partitionLocationProvider = new PartitionLocationProvider {
+      override def getPartitionsTrait(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignmentTrait] =
+        partitionsV2MultiPartitionAssignmentsOnlyWeightChange(timeRange)
+
+      override def getMetadataPartitionsTrait(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignmentTrait] =
+        partitionsV2MultiPartitionAssignmentsOnlyWeightChange(timeRange)
+
+      override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] = ???
+
+      override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter], timeRange: TimeRange): List[PartitionAssignment] = ???
+    }
+
+    val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)
+    val lp = Parser.queryRangeToLogicalPlan("test1{job = \"app\"} + test2{job = \"app\"}",
+      TimeStepParams(1000, 100, 2000))
+
+    val promQlQueryParams = PromQlQueryParams("test1{job = \"app\"} + test2{job = \"app\"}", 1000, 100, 2000)
+
+    val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
+      PlannerParams(processMultiPartition = true)))
+
+    execPlan.isInstanceOf[StitchRvsExec] shouldEqual (true)
+    // Should have two MultiPartitionDistConcatExec children, one for the period before and one for the period after the weight change.
+    execPlan.children.count(plan => plan.isInstanceOf[MultiPartitionDistConcatExec]) shouldEqual 2
+    // Should have one BinaryJoinExec child covering the period in which the weight change happens.
+    execPlan.children.count(plan => plan.isInstanceOf[BinaryJoinExec]) shouldEqual 1
+  }
+
   it ("should have equal hashcode for identical getColumnFilterGroup") {
     val lp1 = Parser.queryRangeToLogicalPlan("test1{inst = \"inst-001\", job = \"app\", host = \"localhost\"}",