
Commit

Revert "Setting rawSource to true and fixing the unit tests (#1920)" (#…
Browse files Browse the repository at this point in the history
…1933)

This reverts commit 0999aec.
sandeep6189 authored Jan 18, 2025
1 parent a610493 commit 42f5072
Showing 4 changed files with 120 additions and 152 deletions.
@@ -122,8 +122,10 @@ trait DefaultPlanner {
} else lp

val series = walkLogicalPlanTree(logicalPlanWithoutBucket.series, qContext, forceInProcess)
// the series is raw and supports raw export, it's going to yield an iterator
val rawSource = logicalPlanWithoutBucket.series.isRaw
val rawSource = logicalPlanWithoutBucket.series.isRaw && (logicalPlanWithoutBucket.series match {
case r: RawSeries => !r.supportsRemoteDataCall
case _ => true
}) // the series is raw and supports raw export, its going to yield an iterator

/* Last function is used to get the latest value in the window for absent_over_time
If no data is present AbsentFunctionMapper will return range vector with value 1 */
@@ -201,7 +203,10 @@ trait DefaultPlanner {
} else (None, None, lp)

val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess)
val rawSource = lpWithoutBucket.rawSeries.isRaw
val rawSource = lpWithoutBucket.rawSeries.isRaw && (lpWithoutBucket.rawSeries match {
case r: RawSeries => !r.supportsRemoteDataCall
case _ => true
})
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.startMs, lp.stepMs, lp.endMs,
window = None, functionId = None,
stepMultipleNotationUsed = false, funcParams = Nil,
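Both hunks above restore the same guard: a plan is treated as a raw source only when the underlying series is raw and does not support a remote data call (a remote raw export yields an iterator instead). A minimal sketch of that predicate, pulled into a standalone helper for illustration only (the helper name is hypothetical; the real code inlines the expression at both call sites):

```scala
// Hypothetical helper; mirrors the inlined expression restored by this revert.
def isLocalRawSource(series: RawSeriesLikePlan): Boolean =
  series.isRaw && (series match {
    case r: RawSeries => !r.supportsRemoteDataCall  // remote export yields an iterator, not a raw source
    case _            => true
  })
```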
@@ -827,7 +832,7 @@ object PlannerUtil extends StrictLogging {
rewritePlanWithRemoteRawExport(_, rangeSelector, additionalLookbackMs).asInstanceOf[FunctionArgsPlan]),
series = rewritePlanWithRemoteRawExport(lp.series, rangeSelector, additionalLookbackMs)
.asInstanceOf[RawSeriesLikePlan])
// won't bother rewriting and adjusting the start and end for metadata calls
// wont bother rewriting and adjusting the start and end for metadata calls
case lp: MetadataQueryPlan => lp
case lp: TsCardinalities => lp
}
@@ -118,23 +118,23 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// MultiPartitionPlanner has capability to stitch across time partitions, however, the logic is mostly broken
// and not well tested. The logic below would not work well for any kind of subquery since their actual
// start and ends are different from the start/end parameter of the query context. If we are to implement
// stitching across time, we need to pass proper parameters to getPartitions() call
// stitching across time, we need to to pass proper parameters to getPartitions() call
if (forceInProcess) {
// If InProcess is required, we will rely on the DefaultPlanner's implementation as the expectation is that the
// If inprocess is required, we will rely on the DefaultPlanner's implementation as the expectation is that the
// raw series is doing a remote call to get all the data.
logicalPlan match {
case lp: RawSeries if lp.supportsRemoteDataCall =>
case lp: RawSeries if lp.supportsRemoteDataCall=>
val params = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val rs = lp.rangeSelector.asInstanceOf[IntervalSelector]

val (rawExportStart, rawExportEnd) =
(rs.from - lp.offsetMs.getOrElse(0L) - lp.lookbackMs.getOrElse(0L), rs.to - lp.offsetMs.getOrElse(0L))

val partitions = getPartitions(lp, params)
assert(partitions.nonEmpty, s"Unexpected to see partitions empty for logicalPlan=$lp and param=$params")
val partition = getPartitions(lp, params)
assert(partition.nonEmpty, s"Unexpected to see partitions empty for logicalPlan=$lp and param=$params")
// Raw export from both involved partitions for the entire time duration as the shard-key migration
// is not guaranteed to happen exactly at the time of split
val execPlans = partitions.map(pa => {
val execPlans = partition.map(pa => {
val (thisPartitionStartMs, thisPartitionEndMs) = (rawExportStart, rawExportEnd)
val timeRangeOverride = TimeRange(thisPartitionEndMs, thisPartitionEndMs)
val totalOffsetThisPartitionMs = thisPartitionEndMs - thisPartitionStartMs
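As a rough illustration of the window arithmetic in this hunk: the export window is widened backwards by the lookback and shifted by the offset, and each partition's plan is then pinned to the window end with an offset large enough to cover the whole window. A sketch with made-up millisecond values (variable names follow the diff; the numbers are illustrative):

```scala
// Made-up values, in milliseconds.
val from       = 1000000L   // rs.from
val to         = 1300000L   // rs.to
val offsetMs   = 60000L     // lp.offsetMs.getOrElse(0L)
val lookbackMs = 300000L    // lp.lookbackMs.getOrElse(0L)

val rawExportStart = from - offsetMs - lookbackMs   // 640000: reach back past both offset and lookback
val rawExportEnd   = to - offsetMs                  // 1240000

// Each partition's plan is issued against an instant-style range at the window
// end (TimeRange(end, end) in the real code), with an offset wide enough to
// pull the entire window in one raw export.
val timeRangeOverride          = (rawExportEnd, rawExportEnd)
val totalOffsetThisPartitionMs = rawExportEnd - rawExportStart   // 600000
```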
@@ -155,7 +155,7 @@ class MultiPartitionPlanner(val partitionLocationProv
, inProcessPlanDispatcher, None,
execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]),
enableApproximatelyEqualCheck = queryConfig.routingConfig.enableApproximatelyEqualCheckInStitch)
}
}
)
)
case _ : LogicalPlan => super.defaultWalkLogicalPlanTree(logicalPlan, qContext, forceInProcess)
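The per-partition plans are stitched together with the plan list sorted so that anything that is not a PromQlRemoteExec comes first. A tiny sketch of that ordering using hypothetical stand-in types (LocalExec/RemoteExec are not real FiloDB classes):

```scala
sealed trait Plan
case object LocalExec  extends Plan   // stands in for any non-remote ExecPlan
case object RemoteExec extends Plan   // stands in for PromQlRemoteExec

val plans   = Seq[Plan](RemoteExec, LocalExec, RemoteExec)
// Same shape as execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]):
// the comparator ignores its second argument, so non-remote plans float to the front.
val ordered = plans.sortWith((x, _) => x != RemoteExec)
// ordered == List(LocalExec, RemoteExec, RemoteExec)
```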
@@ -191,7 +191,8 @@ class MultiPartitionPlanner(val partitionLocationProv
generateRemoteExecParams(qContext, startMs, endMs)
}
// Single partition but remote, send the entire plan remotely
if (isPartitionEnabledForGrpc(partitionName, grpcEndpoint)) {
if (grpcEndpoint.isDefined && !(queryConfig.grpcPartitionsDenyList.contains("*") ||
queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) {
val endpoint = grpcEndpoint.get
val channel = channels.getOrElseUpdate(endpoint, GrpcCommonUtils.buildChannelFromEndpoint(endpoint))
PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, remoteContext, inProcessPlanDispatcher,
@@ -387,17 +388,6 @@ class MultiPartitionPlanner(val partitionLocationProv
PlanResult(execPlan:: Nil)
}

/**
* @param partitionName partition name
* @param grpcEndPoint grpc endpoint
* @return true if partition is enabled with an grpc endpoint
*/
private def isPartitionEnabledForGrpc(partitionName: String, grpcEndPoint: Option[String]): Boolean = {
grpcEndPoint.isDefined &&
!(queryConfig.grpcPartitionsDenyList.contains("*") ||
queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))
}

/**
* If the argument partition is local, materialize the LogicalPlan with the local planner.
* Otherwise, create a PromQlRemoteExec.
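The revert drops the isPartitionEnabledForGrpc helper shown above and goes back to the inline check at both call sites: the gRPC route is taken only when an endpoint is configured and the partition is not on the deny list, with "*" denying every partition. A sketch of the same predicate, with the deny list passed explicitly instead of read from queryConfig.grpcPartitionsDenyList (that parameter is an adaptation for illustration):

```scala
// Illustrative only; the real code reads the deny list from queryConfig.
def isPartitionEnabledForGrpc(partitionName: String,
                              grpcEndPoint: Option[String],
                              denyList: Set[String]): Boolean =
  grpcEndPoint.isDefined &&
    !(denyList.contains("*") || denyList.contains(partitionName.toLowerCase))
```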
@@ -425,7 +415,9 @@ class MultiPartitionPlanner(val partitionLocationProv
localPartitionPlanner.materialize(lpWithUpdatedTime, qContextWithOverride)
} else {
val ctx = generateRemoteExecParams(qContextWithOverride, timeRange.startMs, timeRange.endMs)
if (isPartitionEnabledForGrpc(partitionName, grpcEndpoint)) {
if (grpcEndpoint.isDefined &&
!(queryConfig.grpcPartitionsDenyList.contains("*") ||
queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) {
val channel = channels.getOrElseUpdate(grpcEndpoint.get,
GrpcCommonUtils.buildChannelFromEndpoint(grpcEndpoint.get))
PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, ctx, inProcessPlanDispatcher,
@@ -594,10 +586,10 @@ class MultiPartitionPlanner(val partitionLocationProv
val assignmentRanges = getAssignmentQueryRanges(partitions, timeRange,
lookbackMs = lookbackMs, offsetMs = offsetMs, stepMsOpt = stepMsOpt)
val execPlans = if (assignmentRanges.isEmpty) {
// Assignment ranges empty means we can't run this query fully on one partition and needs
// remote raw export. Check if the total time of raw export is within the limits, if not return Empty result.
// Assignment ranges empty means we cant run this query fully on one partition and needs
// remote raw export Check if the total time of raw export is within the limits, if not return Empty result
// While it may seem we don't tune the lookback of the leaf raw queries to exactly what we need from each
// partition, in reality it doesn't matter as despite a longer lookback, the actual data exported will be at most
// partition, in reality it doesnt matter as despite a longer lookback, the actual data exported will be at most
// what that partition contains.
val (startTime, endTime) = (qParams.startSecs, qParams.endSecs)
val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs.max
@@ -617,8 +609,7 @@ class MultiPartitionPlanner(val partitionLocationProv
}
Seq(EmptyResultExec(qContext, dataset.ref, inProcessPlanDispatcher))
}
}
else {
} else {
// materialize a plan for each range/assignment pair
val (_, execPlans) = assignmentRanges.foldLeft(
(None: Option[(PartitionAssignment, TimeRange)], ListBuffer.empty[ExecPlan])) {
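When no assignment range covers the query, the planner checks whether the raw export it would need stays under a configured ceiling and otherwise returns an empty result. A hedged sketch of the shape of that guard (the names and call-by-name parameters are illustrative; the real code reads the ceiling from its routing config and returns Seq(EmptyResultExec(...)) along with a warning):

```scala
// Illustrative guard: give up rather than export an unbounded amount of raw data.
def planOrEmpty[P](queryRange: Long, lookback: Long, maxOffset: Long, maxRawExport: Long)
                  (materialize: => Seq[P], empty: => Seq[P]): Seq[P] = {
  val totalExpectedRawExport = queryRange + lookback + maxOffset
  if (totalExpectedRawExport <= maxRawExport) materialize else empty
}
```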
@@ -661,13 +652,14 @@ class MultiPartitionPlanner(val partitionLocationProv
// leaf logical plan) with supportsRemoteDataCall = true figure out if this range can entirely be selected
// from partition p1 or p2
//

// Do not perform raw exports if the export is beyond a certain value for example
// foo{}[10d] or foo[2d] offset 8d both will export 10 days of raw data which might cause heap pressure
// and OOMs. The max cross partition raw export config can keep such queries from bringing the process
// down but simpler queries with few minutes or even hour or two of lookback/offset will continue to work
// seamlessly with no data gaps
// Note that at the moment, while planning, we only can look at what's the max time range we can support.
// We still don't have capabilities to check the expected number of timeseries scanned or bytes scanned
// Note that at the moment, while planning, we only can look at whats the max time range we can support.
// We still dont have capabilities to check the expected number of timeseries scanned or bytes scanned
// and adding capabilities to give up a "part" of query execution if the runtime number of bytes of ts
// scanned goes high isn't available. To start with the time range scanned as a static configuration will
// be good enough and can be enhanced in future as required.
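A quick worked check of the example in the comment above, for an instant query (start == end) and using the same totalExpectedRawExport = (end - start) + lookback + offset accounting as the guard earlier in this file:

```scala
val dayMs = 24L * 3600 * 1000

val export10dLookback = 0L + 10 * dayMs + 0 * dayMs   // foo{}[10d]        -> 10 days of raw export
val export2dOffset8d  = 0L + 2 * dayMs + 8 * dayMs    // foo[2d] offset 8d -> 10 days of raw export
```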