merge integration to main. #1935

Merged: 11 commits into main from integration, Jan 22, 2025
@@ -175,8 +175,6 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
if (execPlans.size == 1) execPlans.head
else stitchPlans(rootLogicalPlan, execPlans, qContext)
}
//scalastyle:on method.length


override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
val origQueryParams = qContext.origQueryParams
@@ -216,17 +214,23 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
// we need to populate planner params with the shard maps
val localActiveShardMapper = getLocalActiveShardMapper(qContext.plannerParams)
val remoteActiveShardMapper = getRemoteActiveShardMapper(qContext.plannerParams)
val plannerParams = qContext.plannerParams.copy(
localShardMapper = Some(localActiveShardMapper),
buddyShardMapper = Some(remoteActiveShardMapper)
)
val q = qContext.copy(plannerParams = plannerParams)
materializeShardLevelFailover(logicalPlan, q)
val shardLevelFailoverIsNeeded =
(!localActiveShardMapper.allShardsActive) && (!remoteActiveShardMapper.allShardsActive)
if (shardLevelFailoverIsNeeded) {
val plannerParams = qContext.plannerParams.copy(
localShardMapper = Some(localActiveShardMapper),
buddyShardMapper = Some(remoteActiveShardMapper)
)
val q = qContext.copy(plannerParams = plannerParams)
materializeShardLevelFailover(logicalPlan, q)
} else {
materializeLegacy(logicalPlan, qContext)
}
} else {
materializeLegacy(logicalPlan, qContext)
}

}
//scalastyle:on method.length

def materializeLegacy(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
// lazy because we want to fetch failures only if needed
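For context, here is a minimal, self-contained sketch of the failover decision introduced in materialize() above. The names below (ShardHealth, choosePath) are hypothetical stand-ins for ActiveShardMapper and the planner entry points, not FiloDB's actual API:

// Hypothetical stand-in for ActiveShardMapper: tracks which shards are up.
case class ShardHealth(active: Seq[Boolean]) {
  def allShardsActive: Boolean = active.forall(identity)
}

// Mirrors the new branch in materialize(): shard-level failover is only
// worthwhile when BOTH the local and the buddy cluster are partially
// degraded; if either side is fully healthy, the legacy path suffices.
def choosePath(local: ShardHealth, remote: ShardHealth): String =
  if (!local.allShardsActive && !remote.allShardsActive) "materializeShardLevelFailover"
  else "materializeLegacy"

choosePath(ShardHealth(Seq(true, false)), ShardHealth(Seq(false, true)))  // shard-level failover
choosePath(ShardHealth(Seq(true, false)), ShardHealth(Seq(true, true)))   // legacy path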
@@ -1911,6 +1911,25 @@ object ProtoConverters {
}
}

// RepeatTransformer
implicit class RepeatTransformerToProtoConverter(rpt: RepeatTransformer) {
def toProto: GrpcMultiPartitionQueryService.RepeatTransformer = {
val builder = GrpcMultiPartitionQueryService.RepeatTransformer.newBuilder()
builder.setStartMs(rpt.startMs)
builder.setStepMs(rpt.stepMs)
builder.setEndMs(rpt.endMs)
builder.setExecPlan(rpt.execPlan)
builder.build()
}
}

implicit class RepeatTransformerFromProtoConverter(rpt: GrpcMultiPartitionQueryService.RepeatTransformer) {
def fromProto(): RepeatTransformer = {
RepeatTransformer(rpt.getStartMs, rpt.getStepMs, rpt.getEndMs, rpt.getExecPlan)
}
}


implicit class RangeVectorTransformerToProtoConverter(rangeVectorTransformer: RangeVectorTransformer) {
def toProto(): GrpcMultiPartitionQueryService.RangeVectorTransformerContainer = {
@@ -1931,6 +1950,7 @@
case vfm: VectorFunctionMapper => b.setVectorFunctionMapper(vfm.toProto).build()
case ap: AggregatePresenter => b.setAggregatePresenter(ap.toProto).build()
case afm: AbsentFunctionMapper => b.setAbsentFunctionMapper(afm.toProto).build()
case rpt: RepeatTransformer => b.setRepeatTransformer(rpt.toProto).build()
case _ => throw new IllegalArgumentException("Unexpected Range Vector Transformer")
}
}
@@ -1955,6 +1975,7 @@
case RangeVectorTransfomerCase.VECTORFUNCTIONMAPPER => rvtc.getVectorFunctionMapper().fromProto
case RangeVectorTransfomerCase.AGGREGATEPRESENTER => rvtc.getAggregatePresenter().fromProto
case RangeVectorTransfomerCase.ABSENTFUNCTIONMAPPER => rvtc.getAbsentFunctionMapper().fromProto
case RangeVectorTransfomerCase.REPEATTRANSFORMER => rvtc.getRepeatTransformer().fromProto
case RangeVectorTransfomerCase.RANGEVECTORTRANSFOMER_NOT_SET =>
throw new IllegalArgumentException("Unexpected Range Vector Transformer")
}
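As a usage note, the two implicit classes added above compose into a round trip. A hedged sketch, assuming RepeatTransformer is a case class; the import paths below are guesses and should be adjusted to the actual packages in this repo:

import filodb.grpc.GrpcMultiPartitionQueryService   // assumed path to the generated gRPC classes
import filodb.query.exec.RepeatTransformer          // assumed path to the transformer
import filodb.coordinator.ProtoConverters._         // assumed path to ProtoConverters

// Fields are positional, matching the converter: startMs, stepMs, endMs, execPlan.
val rpt = RepeatTransformer(1000L, 60000L, 121000L, "planId-0")  // illustrative values

val proto: GrpcMultiPartitionQueryService.RepeatTransformer = rpt.toProto
val decoded: RepeatTransformer = proto.fromProto()

// Assuming a case class, equality is field-by-field, so the round trip is lossless.
assert(decoded == rpt)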
27 changes: 13 additions & 14 deletions core/src/main/scala/filodb.core/query/RangeVector.scala
@@ -296,11 +296,11 @@ object RepeatValueVector extends StrictLogging {
val startNs = Utils.currentThreadCpuTimeNanos
try {
ChunkMap.validateNoSharedLocks(execPlan)
Using(rv.rows()){
Using.resource(rv.rows()){
rows =>
val nextRow = if (rows.hasNext) Some(rows.next()) else None
new RepeatValueVector(rv.key, startMs, stepMs, endMs, nextRow, schema)
}.get
}
} finally {
ChunkMap.releaseAllSharedLocks()
queryStats.getCpuNanosCounter(Nil).addAndGet(Utils.currentThreadCpuTimeNanos - startNs)
@@ -573,20 +573,19 @@ object SerializedRangeVector extends StrictLogging {
val startRecordNo = oldContainerOpt.map(_.numRecords).getOrElse(0)
try {
ChunkMap.validateNoSharedLocks(execPlan)
val rows = rv.rows
while (rows.hasNext) {
val nextRow = rows.next()
// Don't encode empty / NaN data over the wire
if (!canRemoveEmptyRows(rv.outputRange, schema) ||
schema.columns(1).colType == DoubleColumn && !java.lang.Double.isNaN(nextRow.getDouble(1)) ||
schema.columns(1).colType == HistogramColumn && !nextRow.getHistogram(1).isEmpty) {
numRows += 1
builder.addFromReader(nextRow, schema, 0)
}
Using.resource(rv.rows()) {
rows => while (rows.hasNext) {
val nextRow = rows.next()
// Don't encode empty / NaN data over the wire
if (!canRemoveEmptyRows(rv.outputRange, schema) ||
schema.columns(1).colType == DoubleColumn && !java.lang.Double.isNaN(nextRow.getDouble(1)) ||
schema.columns(1).colType == HistogramColumn && !nextRow.getHistogram(1).isEmpty) {
numRows += 1
builder.addFromReader(nextRow, schema, 0)
}
}
}
} finally {
rv.rows().close()
// clear exec plan
// When the query is done, clean up lingering shared locks caused by iterator limit.
ChunkMap.releaseAllSharedLocks()
}
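The recurring change in this file (and in the aggregators further down) replaces manual iteration over rv.rows() plus an explicit close() with scala.util.Using.resource, which closes the iterator even when the body throws; note also that Using(...) returns a Try (hence the old .get), while Using.resource returns the body's value directly. A minimal sketch of the pattern with a toy closeable iterator:

import scala.util.Using

// Toy stand-in for FiloDB's closeable row iterator.
class CloseableRows(n: Int) extends Iterator[Int] with AutoCloseable {
  private var i = 0
  var closed = false
  def hasNext: Boolean = i < n
  def next(): Int = { i += 1; i }
  def close(): Unit = closed = true
}

val rows = new CloseableRows(3)

// close() is guaranteed to run even if the body throws; the old pattern
// (iterate, then call close() afterwards) leaks the resource when an
// exception escapes the loop.
val sum = Using.resource(rows) { rs => rs.sum }

assert(sum == 6 && rows.closed)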
8 changes: 8 additions & 0 deletions grpc/src/main/protobuf/query_service.proto
@@ -776,6 +776,13 @@ message AbsentFunctionMapper {
string metricColumn = 3;
}

message RepeatTransformer {
int64 startMs = 1;
int64 stepMs = 2;
int64 endMs = 3;
string execPlan = 4;
}

message RangeVectorTransformerContainer {
oneof rangeVectorTransfomer {
StitchRvsMapper stitchRvsMapper = 1;
@@ -793,6 +800,7 @@ message RangeVectorTransformerContainer {
VectorFunctionMapper vectorFunctionMapper = 13;
AggregatePresenter aggregatePresenter = 14;
AbsentFunctionMapper AbsentFunctionMapper = 15;
RepeatTransformer repeatTransformer = 16;
}
}

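As a design note, a proto oneof is a closed set of alternatives, which is what lets the ProtoConverters match above be exhaustive and treat the NOT_SET case as an error. A self-contained Scala analogue with hypothetical variant names (not FiloDB types):

// A sealed trait models the same closed world as the oneof above.
sealed trait TransformerMsg
final case class RepeatMsg(startMs: Long, stepMs: Long, endMs: Long, execPlan: String) extends TransformerMsg
final case class AbsentMsg(metricColumn: String) extends TransformerMsg

// Exhaustive dispatch: adding a new variant (a new oneof field such as
// repeatTransformer = 16) forces every match like this one to be updated,
// which is how the toProto/fromProto containers stay in sync.
def fieldNumber(msg: TransformerMsg): Int = msg match {
  case _: RepeatMsg => 16
  case _: AbsentMsg => 15
}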
25 changes: 23 additions & 2 deletions query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala
@@ -74,9 +74,30 @@ final case class LocalPartitionReduceAggregateExec(queryContext: QueryContext,
override def reduceSchemas(r1: ResultSchema, r2: ResultSchema): ResultSchema = {
if (r1.isEmpty) r2
else if (r2.isEmpty) r1
else if (r1 == r2 && (!r1.hasSameColumnsAs(r2)) || r1.fixedVectorLen != r2.fixedVectorLen) {
else if (!r1.hasSameColumnsAs(r2)) {
throw SchemaMismatch(r1.toString, r2.toString, getClass.getSimpleName)
} else r1
} else if (r1.fixedVectorLen != r2.fixedVectorLen) {
// Without this change, the following multi-partition stitch case would cause a schema mismatch:

// -LocalPartitionReduceAggregateExec (mismatch because 7 != 1)
// --StitchRvsExec ===> schema is reduced to Schema(fixedVectorLen=1)
// ---raw (Schema=None)
// ---downsample (Schema(fixedVectorLen=1))
// --StitchRvsExec (remote partition) ===> schema is reduced to Schema(fixedVectorLen=7)
// ---raw (Schema(fixedVectorLen=7))
// ---downsample (Schema=None)

// The schema reduction proceeds as:
// Reduce(Reduce(Schema(fixedVectorLen=1), None), Reduce(Schema(fixedVectorLen=7), None))
// => Reduce(Schema(fixedVectorLen=1), Schema(fixedVectorLen=7))
// => Mismatch.

// r1 and r2 should have the same fixed length, but they can differ when points are
// missing on one side. Use the larger fixedVectorLen as the result fixedVectorLen;
// this handles the special cases arising from the reduction of stitch plans.
r1.copy(fixedVectorLen = Some(Math.max(r1.fixedVectorLen.getOrElse(0), r2.fixedVectorLen.getOrElse(0))))
} else r1
}
}

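A worked sketch of the new reduction rule, under the simplifying assumption that a schema carries only its fixedVectorLen (MiniSchema and reduce below are illustrative, not FiloDB types). It reproduces the stitch scenario from the comment: the two partitions reduce to lengths 1 and 7, and the final reduce keeps the max instead of throwing SchemaMismatch:

// Simplified model: a result schema reduced to just its fixedVectorLen.
case class MiniSchema(fixedVectorLen: Option[Int])

// Mirrors the fixedVectorLen branch of reduceSchemas above (columns are
// assumed identical, so only the vector lengths can disagree).
def reduce(r1: MiniSchema, r2: MiniSchema): MiniSchema =
  if (r1.fixedVectorLen == r2.fixedVectorLen) r1
  else MiniSchema(Some(math.max(r1.fixedVectorLen.getOrElse(0),
                                r2.fixedVectorLen.getOrElse(0))))

// Stitch scenario: local partition reduces to len=1, remote to len=7.
val local  = MiniSchema(Some(1))
val remote = MiniSchema(Some(7))
assert(reduce(local, remote) == MiniSchema(Some(7)))  // max wins, no mismatch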
@@ -1,6 +1,7 @@
package filodb.query.exec.aggregator

import scala.collection.mutable
import scala.util.Using

import filodb.core.Utils
import filodb.core.binaryrecord2.RecordBuilder
@@ -94,21 +95,22 @@ class CountValuesRowAggregator(label: String, limit: Int = 1000) extends RowAggr
FiloSchedulers.assertThreadName(QuerySchedName)
// aggRangeVector.rows.take below triggers the ChunkInfoIterator which requires lock/release
ChunkMap.validateNoSharedLocks(s"CountValues-$label")
aggRangeVector.rows.take(limit).foreach { row =>
val rowMap = CountValuesSerDeser.deserialize(row.getBlobBase(1),
row.getBlobNumBytes(1), row.getBlobOffset(1))
rowMap.foreach { (k, v) =>
val rvk = CustomRangeVectorKey(aggRangeVector.key.labelValues +
(label.utf8 -> k.toString.utf8))
val builder = resRvs.getOrElseUpdate(rvk, SerializedRangeVector.newBuilder())
builder.startNewRecord(recSchema)
builder.addLong(row.getLong(0))
builder.addDouble(v)
builder.endRecord()
Using.resource(aggRangeVector.rows()) {
rows => rows.take(limit).foreach { row =>
val rowMap = CountValuesSerDeser.deserialize(row.getBlobBase(1),
row.getBlobNumBytes(1), row.getBlobOffset(1))
rowMap.foreach { (k, v) =>
val rvk = CustomRangeVectorKey(aggRangeVector.key.labelValues +
(label.utf8 -> k.toString.utf8))
val builder = resRvs.getOrElseUpdate(rvk, SerializedRangeVector.newBuilder())
builder.startNewRecord(recSchema)
builder.addLong(row.getLong(0))
builder.addDouble(v)
builder.endRecord()
}
}
}
}
} finally {
aggRangeVector.rows.close()
ChunkMap.releaseAllSharedLocks()
}
resRvs.map { case (key, builder) =>
@@ -4,6 +4,7 @@ import java.util.concurrent.TimeUnit

import scala.collection.{mutable, Iterator}
import scala.collection.mutable.ListBuffer
import scala.util.Using

import com.typesafe.scalalogging.StrictLogging

@@ -125,33 +126,35 @@ class TopBottomKRowAggregator(k: Int, bottomK: Boolean) extends RowAggregator wi
FiloSchedulers.assertThreadName(QuerySchedName)
ChunkMap.validateNoSharedLocks(s"TopkQuery-$k-$bottomK")
// We limit the results wherever it is materialized first. So it is done here.
val rows = aggRangeVector.rows.take(limit)
val it = Iterator.from(0, rangeParams.stepSecs.toInt)
.takeWhile(_ <= (rangeParams.endSecs - rangeParams.startSecs)).map { t =>
val timestamp = t + rangeParams.startSecs
val rvkSeen = new ListBuffer[RangeVectorKey]
if (rows.hasNext) {
val row = rows.next()
var i = 1
while (row.notNull(i)) {
if (row.filoUTF8String(i) != CustomRangeVectorKey.emptyAsZcUtf8) {
val key = row.filoUTF8String(i)
val rvk = CustomRangeVectorKey.fromZcUtf8(key)
rvkSeen += rvk
val builder = resRvs.getOrElseUpdate(rvk, createBuilder(rangeParams, timestamp))
addRecordToBuilder(builder, TimeUnit.SECONDS.toMillis(timestamp), row.getDouble(i + 1))
Using.resource(aggRangeVector.rows()) {
rs => val rows = rs.take(limit)
val it = Iterator.from(0, rangeParams.stepSecs.toInt)
.takeWhile(_ <= (rangeParams.endSecs - rangeParams.startSecs)).map { t =>
val timestamp = t + rangeParams.startSecs
val rvkSeen = new ListBuffer[RangeVectorKey]
if (rows.hasNext) {
val row = rows.next()
var i = 1
while (row.notNull(i)) {
if (row.filoUTF8String(i) != CustomRangeVectorKey.emptyAsZcUtf8) {
val key = row.filoUTF8String(i)
val rvk = CustomRangeVectorKey.fromZcUtf8(key)
rvkSeen += rvk
val builder = resRvs.getOrElseUpdate(rvk, createBuilder(rangeParams, timestamp))
addRecordToBuilder(builder, TimeUnit.SECONDS.toMillis(timestamp), row.getDouble(i + 1))
}
i += 2
}
resRvs.keySet.foreach { rvs =>
if (!rvkSeen.contains(rvs)) addRecordToBuilder(resRvs(rvs), timestamp * 1000, Double.NaN)
}
i += 2
}
resRvs.keySet.foreach { rvs =>
if (!rvkSeen.contains(rvs)) addRecordToBuilder(resRvs(rvs), timestamp * 1000, Double.NaN)
}
}
// address step == 0 case
if (rangeParams.startSecs == rangeParams.endSecs || rangeParams.stepSecs == 0)
it.take(1).toList else it.toList
}
// address step == 0 case
if (rangeParams.startSecs == rangeParams.endSecs || rangeParams.stepSecs == 0) it.take(1).toList else it.toList
} finally {
aggRangeVector.rows().close()
ChunkMap.releaseAllSharedLocks()
}

@@ -9,7 +9,7 @@ import filodb.core.query._
import filodb.core.store.{AllChunkScan, ChunkSource, InMemoryMetaStore, NullColumnStore, TimeRangeChunkScan}
import filodb.core.{DatasetRef, GlobalConfig, TestData}
import filodb.memory.MemFactory
import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String}
import filodb.memory.format.{SeqRowReader, UnsafeUtils, ZeroCopyUTF8String}
import filodb.query._
import monix.eval.Task
import monix.execution.Scheduler
@@ -176,4 +176,30 @@ class LocalPartitionDistConcatExecSpec extends AnyFunSpec with Matchers with Sca
result.resultSchema.fixedVectorLen.nonEmpty shouldEqual true
result.resultSchema.fixedVectorLen.get shouldEqual 11
}

it("should reduce result schemas with different fixedVecLengths caused by Stitching without error") {

// a null (UnsafeUtils.ZeroPointer) child is passed below since a require in the code prevents empty children
val exec1 = StitchRvsExec(QueryContext(), InProcessPlanDispatcher(QueryConfig.unitTestingQueryConfig),
Some(RvRange(0, 10, 100)), Seq(UnsafeUtils.ZeroPointer.asInstanceOf[ExecPlan]))
val exec2 = StitchRvsExec(QueryContext(), InProcessPlanDispatcher(QueryConfig.unitTestingQueryConfig),
Some(RvRange(0, 10, 100)), Seq(UnsafeUtils.ZeroPointer.asInstanceOf[ExecPlan]))

val exec = LocalPartitionReduceAggregateExec(QueryContext(), dummyDispatcher,
Array[ExecPlan](exec1, exec2), AggregationOperator.Sum, Seq(0))

val rs1 = exec1.reduceSchemas(ResultSchema(List(ColumnInfo("timestamp",
TimestampColumn), ColumnInfo("value", DoubleColumn)), 1, Map(), Some(430), List(0, 1)),
ResultSchema.empty
)
val rs2 = exec1.reduceSchemas(ResultSchema.empty, ResultSchema(List(ColumnInfo("timestamp",
TimestampColumn), ColumnInfo("value", DoubleColumn)), 1, Map(), Some(147), List(0, 1)))

val reduced = exec.reduceSchemas(rs1, rs2)
reduced.columns shouldEqual rs1.columns
reduced.numRowKeyColumns shouldEqual rs1.numRowKeyColumns
reduced.brSchemas shouldEqual rs1.brSchemas
reduced.fixedVectorLen shouldEqual Some(430)
reduced.colIDs shouldEqual rs1.colIDs
}
}
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "0.9.29.1"
version in ThisBuild := "0.9.29.2"