
Commit

Revert "Revert "Merge remote-tracking branch 'upstream/develop' into …
Browse files Browse the repository at this point in the history
…develop""

This reverts commit f63de91.
Brian-Yu committed Jan 16, 2025
1 parent f63de91 commit 276ad42
Showing 22 changed files with 793 additions and 277 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/scala.yml
@@ -9,7 +9,7 @@ on:
jobs:
test:

runs-on: ubuntu-latest
runs-on: ubuntu-22.04

steps:
- name: Checkout
@@ -1,6 +1,5 @@
package filodb.coordinator

import com.typesafe.scalalogging.StrictLogging
import io.grpc.Metadata
import io.grpc.stub.{MetadataUtils, StreamObserver}
import java.net.InetAddress
@@ -29,7 +28,7 @@ object GrpcPlanDispatcher {
Runtime.getRuntime.addShutdownHook(new Thread(() => channelMap.values.foreach(_.shutdown())))
}

case class GrpcPlanDispatcher(endpoint: String, requestTimeoutMs: Long) extends PlanDispatcher with StrictLogging {
case class GrpcPlanDispatcher(endpoint: String, requestTimeoutMs: Long) extends PlanDispatcher {

val clusterName = InetAddress.getLocalHost().getHostName()

@@ -67,8 +66,6 @@ case class GrpcPlanDispatcher(endpoint: String, requestTimeoutMs: Long) extends
val genericRemoteExec = plan.execPlan.asInstanceOf[GenericRemoteExec]
import filodb.coordinator.ProtoConverters._
val protoPlan = genericRemoteExec.execPlan.toExecPlanContainerProto
logger.debug(s"Query ${plan.execPlan.queryContext.queryId} proto plan size is ${protoPlan.toByteArray.length}B")
logger.debug(s"Query ${plan.execPlan.queryContext.queryId} exec plan ${genericRemoteExec.execPlan.printTree()}")
val queryContextProto = genericRemoteExec.execPlan.queryContext.toProto
val remoteExecPlan : RemoteExecPlan = RemoteExecPlan.newBuilder()
.setExecPlan(protoPlan)
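The GrpcPlanDispatcher hunks above cache one gRPC channel per endpoint in a shared channelMap and register a JVM shutdown hook that closes them. Below is a minimal sketch of that caching and cleanup pattern, using only the public grpc-java API; the object name, endpoint string, and plaintext transport are illustrative assumptions, not part of the commit.

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._
import io.grpc.{ManagedChannel, ManagedChannelBuilder}

object ChannelCacheSketch {
  // One channel per endpoint, reused across dispatches; channels are too expensive to create per request.
  private val channelMap = new ConcurrentHashMap[String, ManagedChannel]()

  // Mirror of the shutdown hook in the hunk: close every cached channel when the JVM exits.
  Runtime.getRuntime.addShutdownHook(new Thread(() => channelMap.values.asScala.foreach(_.shutdown())))

  def channelFor(endpoint: String): ManagedChannel =
    channelMap.computeIfAbsent(endpoint,
      ep => ManagedChannelBuilder.forTarget(ep).usePlaintext().build())  // plaintext only for the sketch
}

Calling ChannelCacheSketch.channelFor("remote-query-host:8080") repeatedly returns the same channel; the real dispatcher additionally converts the ExecPlan to its protobuf form before issuing the remote call, as the truncated hunk shows.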
@@ -95,13 +95,20 @@ object NodeClusterActor {
final case class DatasetUnknown(ref: DatasetRef) extends ErrorResponse
final case class BadSchema(message: String) extends ErrorResponse
final case class BadData(message: String) extends ErrorResponse
final case class InternalServiceError(message: String) extends ErrorResponse

// Cluste state info commands
// Cluster state info commands
// Returns a Seq[DatasetRef]
case object ListRegisteredDatasets
// Returns CurrentShardSnapshot or DatasetUnknown
final case class GetShardMap(ref: DatasetRef)

/**
* @param ref compressed ShardMap information for sending over the wire.
* IMPORTANT: Only works with ClusteringV2 shard assignment strategy.
*/
final case class GetShardMapV2(ref: DatasetRef)

/** Registers sending actor to receive `ShardMapUpdate` whenever it changes. DeathWatch
* will be used on the sending actors to watch for updates. On subscribe, will
* immediately send back the current state via a `ShardMapUpdate` message.
24 changes: 24 additions & 0 deletions coordinator/src/main/scala/filodb.coordinator/ShardStatus.scala
@@ -11,6 +11,30 @@ sealed trait ShardAction extends Serializable
final case class CurrentShardSnapshot(ref: DatasetRef,
map: ShardMapper) extends ShardAction with Response

/**
* Optimized form of the ShardMapper state representation.
* NOTE: It doesn't track the shard status updates from coordinator or Ingestion actors. It is just
* a wrapper which compresses the response of ShardMapper state to reduce network transmission costs.
*
* @param nodeCountInCluster Number of replicas in the filodb cluster
* @param numShards Number of shards in the filodb cluster
* @param k8sHostFormat K8s host format. Valid ONLY for ClusterV2 shard assignment strategy
* @param shardState ByteArray. Each bit of the byte represents the shard status.
* For example: lets say we have 4 shards with following status:
* Seq[ShardStatusAssigned, ShardStatusRecovery, ShardStatusAssigned, ShardStatusAssigned]
* Then the shardState would be an array of single byte whose bit representation is - 1000 0000
* Explanation - corresponding bit is set to 1 if the shard is assigned, else 0
*/
final case class ShardMapperV2(nodeCountInCluster: Int, numShards: Int, k8sHostFormat: String,
shardState: Array[Byte])

/**
* Response to GetShardMapV2 request. Uses the optimized ShardMapperV2 representation. Only applicable
* for ClusterV2 shard assignment strategy.
* @param map ShardMapperV2
*/
final case class ShardSnapshot(map: ShardMapperV2) extends ShardAction with Response

/**
* Full state of all shards, sent to all ingestion actors. They react by starting/stopping
* ingestion for the shards they own or no longer own. The version is expected to be global
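The ShardMapperV2 scaladoc above describes shardState as a bit-per-shard encoding (bit set to 1 when a shard is assigned). The following is a minimal sketch of such a packing and its lookup, assuming most-significant-bit-first order within each byte; the helper names are made up, and the exact FiloDB encoding (bit order, treatment of statuses such as recovery) may differ.

object ShardStateBitsSketch {
  // Pack one boolean per shard into a compact byte array: shard i lives in byte i / 8, bit 7 - (i % 8).
  def pack(assigned: Seq[Boolean]): Array[Byte] = {
    val bytes = new Array[Byte]((assigned.length + 7) / 8)
    assigned.zipWithIndex.foreach { case (isSet, i) =>
      if (isSet) bytes(i / 8) = (bytes(i / 8) | (1 << (7 - (i % 8)))).toByte
    }
    bytes
  }

  // Read a single shard's flag back out of the packed array.
  def isAssigned(shardState: Array[Byte], shard: Int): Boolean =
    (shardState(shard / 8) & (1 << (7 - (shard % 8)))) != 0

  def main(args: Array[String]): Unit = {
    val packed = pack(Seq(true, false, true, true))    // 4 shards fit in a single byte
    println((0 until 4).map(isAssigned(packed, _)))    // Vector(true, false, true, true)
  }
}

Packed this way, 256 shards cost 32 bytes instead of a full ShardMapper serialization, which is the network-transmission saving the scaladoc refers to.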
@@ -47,4 +47,20 @@ trait ClusterOps extends ClientBase with StrictLogging {
}
}
}

/**
* ShardMapperV2 is an optimization of the response size over the ShardMapper and GetShardMap ask call
* @return Some(ShardMapperV2) if the dataset is registered, None if dataset not found
*/
def getShardMapperV2(dataset: DatasetRef, v2Enabled: Boolean,
timeout: FiniteDuration = 30.seconds): Option[ShardMapperV2] = {
require(v2Enabled, s"ClusterV2 ShardAssignment is must for this operation")
val actor = Some(nodeCoordinator)
actor.flatMap { ref =>
Client.actorAsk(ref, GetShardMapV2(dataset), timeout) {
case ShardSnapshot(shardMapperV2) => Some(shardMapperV2)
case _ => None
}
}
}
}
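A sketch of how a caller might use the new getShardMapperV2 helper above. The import path for ClusterOps and the dataset name are assumptions for this sketch; v2Enabled must be true or the require in the helper throws.

import scala.concurrent.duration._
import filodb.core.DatasetRef
import filodb.coordinator.client.ClusterOps   // package path assumed for this sketch

object ShardMapV2ClientSketch {
  // Prints a short summary of the compressed shard map for one dataset, or a message if it is unknown.
  def printShardMapSummary(client: ClusterOps, dataset: DatasetRef): Unit =
    client.getShardMapperV2(dataset, v2Enabled = true, timeout = 10.seconds) match {
      case Some(m) => println(s"shards=${m.numShards} replicas=${m.nodeCountInCluster} hostFormat=${m.k8sHostFormat}")
      case None    => println(s"dataset $dataset is not registered")
    }
}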
@@ -18,7 +18,7 @@ import filodb.query.LogicalPlan._
import filodb.query.exec._
import filodb.query.exec.InternalRangeFunction.Last


//scalastyle:off file.size.limit
/**
* Intermediate Plan Result includes the exec plan(s) along with any state to be passed up the
* plan building call tree during query planning.
@@ -122,10 +122,8 @@ trait DefaultPlanner {
} else lp

val series = walkLogicalPlanTree(logicalPlanWithoutBucket.series, qContext, forceInProcess)
val rawSource = logicalPlanWithoutBucket.series.isRaw && (logicalPlanWithoutBucket.series match {
case r: RawSeries => !r.supportsRemoteDataCall
case _ => true
}) // the series is raw and supports raw export, its going to yield an iterator
// the series is raw and supports raw export, it's going to yield an iterator
val rawSource = logicalPlanWithoutBucket.series.isRaw

/* Last function is used to get the latest value in the window for absent_over_time
If no data is present AbsentFunctionMapper will return range vector with value 1 */
@@ -203,10 +201,7 @@ trait DefaultPlanner {
} else (None, None, lp)

val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess)
val rawSource = lpWithoutBucket.rawSeries.isRaw && (lpWithoutBucket.rawSeries match {
case r: RawSeries => !r.supportsRemoteDataCall
case _ => true
})
val rawSource = lpWithoutBucket.rawSeries.isRaw
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.startMs, lp.stepMs, lp.endMs,
window = None, functionId = None,
stepMultipleNotationUsed = false, funcParams = Nil,
@@ -234,11 +229,17 @@ trait DefaultPlanner {
lp: ApplyMiscellaneousFunction,
forceInProcess: Boolean = false): PlanResult = {
val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess)
if (lp.function == MiscellaneousFunctionId.HistToPromVectors)
vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part)))
else
vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs)))
vectors
if (lp.function == MiscellaneousFunctionId.OptimizeWithAgg) {
// Optimize with aggregation is a no-op, doing no transformation. It must pass through
// the execution plan to apply optimization logic correctly during aggregation.
vectors
} else {
if (lp.function == MiscellaneousFunctionId.HistToPromVectors)
vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part)))
else
vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs)))
vectors
}
}

def materializeApplyInstantFunctionRaw(qContext: QueryContext,
Expand Down Expand Up @@ -826,7 +827,7 @@ object PlannerUtil extends StrictLogging {
rewritePlanWithRemoteRawExport(_, rangeSelector, additionalLookbackMs).asInstanceOf[FunctionArgsPlan]),
series = rewritePlanWithRemoteRawExport(lp.series, rangeSelector, additionalLookbackMs)
.asInstanceOf[RawSeriesLikePlan])
// wont bother rewriting and adjusting the start and end for metadata calls
// won't bother rewriting and adjusting the start and end for metadata calls
case lp: MetadataQueryPlan => lp
case lp: TsCardinalities => lp
}
@@ -118,23 +118,23 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// MultiPartitionPlanner has capability to stitch across time partitions, however, the logic is mostly broken
// and not well tested. The logic below would not work well for any kind of subquery since their actual
// start and ends are different from the start/end parameter of the query context. If we are to implement
// stitching across time, we need to to pass proper parameters to getPartitions() call
// stitching across time, we need to pass proper parameters to getPartitions() call
if (forceInProcess) {
// If inprocess is required, we will rely on the DefaultPlanner's implementation as the expectation is that the
// If InProcess is required, we will rely on the DefaultPlanner's implementation as the expectation is that the
// raw series is doing a remote call to get all the data.
logicalPlan match {
case lp: RawSeries if lp.supportsRemoteDataCall=>
case lp: RawSeries if lp.supportsRemoteDataCall =>
val params = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val rs = lp.rangeSelector.asInstanceOf[IntervalSelector]

val (rawExportStart, rawExportEnd) =
(rs.from - lp.offsetMs.getOrElse(0L) - lp.lookbackMs.getOrElse(0L), rs.to - lp.offsetMs.getOrElse(0L))

val partition = getPartitions(lp, params)
assert(partition.nonEmpty, s"Unexpected to see partitions empty for logicalPlan=$lp and param=$params")
val partitions = getPartitions(lp, params)
assert(partitions.nonEmpty, s"Unexpected to see partitions empty for logicalPlan=$lp and param=$params")
// Raw export from both involved partitions for the entire time duration as the shard-key migration
// is not guaranteed to happen exactly at the time of split
val execPlans = partition.map(pa => {
val execPlans = partitions.map(pa => {
val (thisPartitionStartMs, thisPartitionEndMs) = (rawExportStart, rawExportEnd)
val timeRangeOverride = TimeRange(thisPartitionEndMs, thisPartitionEndMs)
val totalOffsetThisPartitionMs = thisPartitionEndMs - thisPartitionStartMs
@@ -155,7 +155,7 @@
, inProcessPlanDispatcher, None,
execPlans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]),
enableApproximatelyEqualCheck = queryConfig.routingConfig.enableApproximatelyEqualCheckInStitch)
}
}
)
)
case _ : LogicalPlan => super.defaultWalkLogicalPlanTree(logicalPlan, qContext, forceInProcess)
Expand Down Expand Up @@ -191,8 +191,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
generateRemoteExecParams(qContext, startMs, endMs)
}
// Single partition but remote, send the entire plan remotely
if (grpcEndpoint.isDefined && !(queryConfig.grpcPartitionsDenyList.contains("*") ||
queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) {
if (isPartitionEnabledForGrpc(partitionName, grpcEndpoint)) {
val endpoint = grpcEndpoint.get
val channel = channels.getOrElseUpdate(endpoint, GrpcCommonUtils.buildChannelFromEndpoint(endpoint))
PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, remoteContext, inProcessPlanDispatcher,
@@ -388,6 +387,17 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
PlanResult(execPlan:: Nil)
}

/**
* @param partitionName partition name
* @param grpcEndPoint grpc endpoint
* @return true if partition is enabled with an grpc endpoint
*/
private def isPartitionEnabledForGrpc(partitionName: String, grpcEndPoint: Option[String]): Boolean = {
grpcEndPoint.isDefined &&
!(queryConfig.grpcPartitionsDenyList.contains("*") ||
queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))
}

/**
* If the argument partition is local, materialize the LogicalPlan with the local planner.
* Otherwise, create a PromQlRemoteExec.
@@ -415,9 +425,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
localPartitionPlanner.materialize(lpWithUpdatedTime, qContextWithOverride)
} else {
val ctx = generateRemoteExecParams(qContextWithOverride, timeRange.startMs, timeRange.endMs)
if (grpcEndpoint.isDefined &&
!(queryConfig.grpcPartitionsDenyList.contains("*") ||
queryConfig.grpcPartitionsDenyList.contains(partitionName.toLowerCase))) {
if (isPartitionEnabledForGrpc(partitionName, grpcEndpoint)) {
val channel = channels.getOrElseUpdate(grpcEndpoint.get,
GrpcCommonUtils.buildChannelFromEndpoint(grpcEndpoint.get))
PromQLGrpcRemoteExec(channel, remoteHttpTimeoutMs, ctx, inProcessPlanDispatcher,
@@ -586,10 +594,10 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
val assignmentRanges = getAssignmentQueryRanges(partitions, timeRange,
lookbackMs = lookbackMs, offsetMs = offsetMs, stepMsOpt = stepMsOpt)
val execPlans = if (assignmentRanges.isEmpty) {
// Assignment ranges empty means we cant run this query fully on one partition and needs
// remote raw export Check if the total time of raw export is within the limits, if not return Empty result
// Assignment ranges empty means we can't run this query fully on one partition and needs
// remote raw export. Check if the total time of raw export is within the limits, if not return Empty result.
// While it may seem we don't tune the lookback of the leaf raw queries to exactly what we need from each
// partition, in reality it doesnt matter as despite a longer lookback, the actual data exported will be at most
// partition, in reality it doesn't matter as despite a longer lookback, the actual data exported will be at most
// what that partition contains.
val (startTime, endTime) = (qParams.startSecs, qParams.endSecs)
val totalExpectedRawExport = (endTime - startTime) + lookbackMs + offsetMs.max
@@ -609,7 +617,8 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
}
Seq(EmptyResultExec(qContext, dataset.ref, inProcessPlanDispatcher))
}
} else {
}
else {
// materialize a plan for each range/assignment pair
val (_, execPlans) = assignmentRanges.foldLeft(
(None: Option[(PartitionAssignment, TimeRange)], ListBuffer.empty[ExecPlan])) {
@@ -652,14 +661,13 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// leaf logical plan) with supportsRemoteDataCall = true figure out if this range can entirely be selected
// from partition p1 or p2
//

// Do not perform raw exports if the export is beyond a certain value for example
// foo{}[10d] or foo[2d] offset 8d both will export 10 days of raw data which might cause heap pressure
// and OOMs. The max cross partition raw export config can control such queries from bring the process
// down but simpler queries with few minutes or even hour or two of lookback/offset will continue to work
// seamlessly with no data gaps
// Note that at the moment, while planning, we only can look at whats the max time range we can support.
// We still dont have capabilities to check the expected number of timeseries scanned or bytes scanned
// Note that at the moment, while planning, we only can look at what's the max time range we can support.
// We still don't have capabilities to check the expected number of timeseries scanned or bytes scanned
// and adding capabilities to give up a "part" of query execution if the runtime number of bytes of ts
// scanned goes high isn't available. To start with the time range scanned as a static configuration will
// be good enough and can be enhanced in future as required.
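The new isPartitionEnabledForGrpc helper in the diff above centralizes the deny-list check that previously appeared inline at two call sites. Below is a self-contained sketch of its semantics with a made-up deny-list value and partition names; the real list comes from queryConfig.grpcPartitionsDenyList.

object GrpcDenyListSketch extends App {
  val grpcPartitionsDenyList: Set[String] = Set("partition-b")   // hypothetical config value

  // gRPC remote execution is used only when the partition has an endpoint and is not denied,
  // either explicitly by name or for all partitions via the "*" wildcard.
  def isPartitionEnabledForGrpc(partitionName: String, grpcEndpoint: Option[String]): Boolean =
    grpcEndpoint.isDefined &&
      !(grpcPartitionsDenyList.contains("*") || grpcPartitionsDenyList.contains(partitionName.toLowerCase))

  println(isPartitionEnabledForGrpc("partition-a", Some("dns:///remote-host:8080")))  // true
  println(isPartitionEnabledForGrpc("Partition-B", Some("dns:///remote-host:8080")))  // false: denied, case-insensitive
  println(isPartitionEnabledForGrpc("partition-c", None))                             // false: no gRPC endpoint
}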
@@ -1075,6 +1075,8 @@ object ProtoConverters {
case filodb.query.MiscellaneousFunctionId.LabelJoin => GrpcMultiPartitionQueryService.MiscellaneousFunctionId.LABEL_JOIN
case filodb.query.MiscellaneousFunctionId.HistToPromVectors =>
GrpcMultiPartitionQueryService.MiscellaneousFunctionId.HIST_TO_PROM_VECTORS
case filodb.query.MiscellaneousFunctionId.OptimizeWithAgg =>
GrpcMultiPartitionQueryService.MiscellaneousFunctionId.OPTIMIZE_WITH_AGG
}
function
}
@@ -1087,6 +1089,8 @@
case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.LABEL_JOIN => filodb.query.MiscellaneousFunctionId.LabelJoin
case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.HIST_TO_PROM_VECTORS =>
filodb.query.MiscellaneousFunctionId.HistToPromVectors
case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.OPTIMIZE_WITH_AGG =>
filodb.query.MiscellaneousFunctionId.OptimizeWithAgg
case GrpcMultiPartitionQueryService.MiscellaneousFunctionId.UNRECOGNIZED =>
throw new IllegalArgumentException(s"Unrecognized MiscellaneousFunctionId ${f}")
}
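The ProtoConverters hunks above add the OptimizeWithAgg case to both directions of the MiscellaneousFunctionId mapping. A tiny self-contained sketch of that symmetric round-trip pattern with made-up enums (not the actual FiloDB or generated gRPC types), showing why both sides must be extended together:

object EnumRoundTripSketch {
  sealed trait FunctionId
  case object OptimizeWithAgg extends FunctionId
  case object HistToPromVectors extends FunctionId

  sealed trait ProtoFunctionId
  case object OPTIMIZE_WITH_AGG extends ProtoFunctionId
  case object HIST_TO_PROM_VECTORS extends ProtoFunctionId
  case object UNRECOGNIZED extends ProtoFunctionId

  def toProto(f: FunctionId): ProtoFunctionId = f match {
    case OptimizeWithAgg   => OPTIMIZE_WITH_AGG
    case HistToPromVectors => HIST_TO_PROM_VECTORS
  }

  def fromProto(p: ProtoFunctionId): FunctionId = p match {
    case OPTIMIZE_WITH_AGG    => OptimizeWithAgg
    case HIST_TO_PROM_VECTORS => HistToPromVectors
    case UNRECOGNIZED         => throw new IllegalArgumentException(s"Unrecognized MiscellaneousFunctionId $p")
  }

  def main(args: Array[String]): Unit = {
    // Every Scala value must survive the round trip; omitting one side of the match (as would have
    // happened without the OPTIMIZE_WITH_AGG cases added in this commit) breaks this property.
    val all: Seq[FunctionId] = Seq(OptimizeWithAgg, HistToPromVectors)
    assert(all.forall(f => fromProto(toProto(f)) == f))
  }
}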