feat(query): namespace cardinality publisher (#1294)
alextheimer authored Dec 7, 2021
1 parent a0558cc commit 95b91c4
Showing 5 changed files with 133 additions and 1 deletion.
10 changes: 10 additions & 0 deletions coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
@@ -37,6 +37,16 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
  private val _coordinators = new mutable.LinkedHashMap[Address, ActorRef]
  private val _errorShardReassignedAt = new mutable.HashMap[DatasetRef, mutable.HashMap[Int, Long]]

  private val _tenantIngestionMeteringOpt =
    if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
      val inst = TenantIngestionMetering(
        settings,
        () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator },
        () => { _coordinators.head._2 })
      inst.schedulePeriodicPublishJob()
      Some(inst)
    } else None

  val shardReassignmentMinInterval = settings.config.getDuration("shard-manager.reassignment-min-interval")

  /* These workloads were in an actor and exist now in an unprotected class.
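Both constructor arguments above are passed as functions rather than captured values, so every publish cycle re-reads the current dataset list and the current first coordinator instead of whatever existed when ShardManager was built. A minimal, self-contained sketch of that pattern (the names and types here are stand-ins for illustration, not part of the commit):

import scala.collection.mutable

object ProducerSketch extends App {
  // stand-in for ShardManager's Address -> ActorRef map
  val coordinators = new mutable.LinkedHashMap[String, Int]

  // a thunk: evaluated on every call, so it always sees the map's current head
  val coordProducer: () => Int = () => coordinators.head._2

  coordinators += ("node-a" -> 1)
  println(coordProducer())   // 1

  coordinators.clear()
  coordinators += ("node-b" -> 2)
  println(coordProducer())   // 2
}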
89 changes: 89 additions & 0 deletions coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
@@ -0,0 +1,89 @@
package filodb.coordinator

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}

import akka.actor.ActorRef
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
import kamon.tag.TagSet
import monix.execution.Scheduler.Implicits.{global => scheduler}

import filodb.coordinator.client.Client
import filodb.coordinator.client.QueryCommands.LogicalPlan2Query
import filodb.core.DatasetRef
import filodb.query.{QueryError, QueryResult, TsCardinalities}

/**
 * Periodically queries a node for all namespace cardinalities.
 * Kamon gauges are updated with the response data.
 *
 * The intent is to publish a low-cardinality metric such that namespace
 * cardinality queries can be efficiently answered.
 *
 * @param dsIterProducer produces an iterator to step through all datasets.
 * @param coordActorProducer produces a single actor to ask a query. Actors are
 *                           queried in the order they're returned from this function.
 */
case class TenantIngestionMetering(settings: FilodbSettings,
                                   dsIterProducer: () => Iterator[DatasetRef],
                                   coordActorProducer: () => ActorRef) extends StrictLogging {

  private val ASK_TIMEOUT = FiniteDuration(
    settings.config.getDuration("metering-query-interval").toSeconds,
    TimeUnit.SECONDS)
  private val SCHED_INIT_DELAY = ASK_TIMEOUT  // time until first job is scheduled
  private val SCHED_DELAY = ASK_TIMEOUT  // time between all jobs after the first

  private val CLUSTER_TYPE = settings.config.getString("cluster-type")

  private val METRIC_ACTIVE = "active_timeseries_by_tenant"
  private val METRIC_TOTAL = "total_timeseries_by_tenant"

  def schedulePeriodicPublishJob() : Unit = {
    // NOTE: the FiniteDuration overload of scheduleWithFixedDelay
    //   does not work. Unsure why, but that's why these FiniteDurations are
    //   awkwardly parsed into seconds.
    scheduler.scheduleWithFixedDelay(
      SCHED_INIT_DELAY.toSeconds,
      SCHED_DELAY.toSeconds,
      TimeUnit.SECONDS,
      () => queryAndSchedulePublish())
  }

  /**
   * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
   * Schedules a job to publish the Coordinator's response.
   */
  private def queryAndSchedulePublish() : Unit = {
    import filodb.query.exec.TsCardExec._
    val groupDepth = 1  // group cardinalities at the second level (i.e. ws & ns)
    val prefix = Nil  // query for cardinalities regardless of first-level name (i.e. ws name)
    dsIterProducer().foreach { dsRef =>
      val fut = Client.asyncAsk(
        coordActorProducer(),
        LogicalPlan2Query(dsRef, TsCardinalities(prefix, groupDepth)),
        ASK_TIMEOUT)
      fut.onComplete {
        case Success(QueryResult(_, _, rv, _, _, _)) =>
          rv.foreach(_.rows().foreach { rr =>
            // publish a cardinality metric for each namespace
            val data = RowData.fromRowReader(rr)
            val prefix = data.group.toString.split(PREFIX_DELIM)
            val tags = Map("metric_ws" -> prefix(0),
                           "metric_ns" -> prefix(1),
                           "dataset" -> dsRef.dataset,
                           "cluster_type" -> CLUSTER_TYPE)
            Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
            Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
          })
        case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage)
        case Failure(t) => logger.warn("Failure: " + t.getMessage)
        // required to compile
        case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
      }
    }
  }
}
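For a sense of what the publisher emits, here is a small self-contained sketch of how queryAndSchedulePublish() turns one result row's group string into gauge tags; the workspace, namespace, and dataset values are hypothetical, and the Kamon calls themselves are reduced to a comment:

object TagDerivationSketch extends App {
  val PREFIX_DELIM = ","            // the same delimiter TsCardExec now defines
  val group = "demo_ws,demo_ns"     // a group string as it would arrive in a result row

  val parts = group.split(PREFIX_DELIM)
  val tags = Map(
    "metric_ws"    -> parts(0),     // "demo_ws"
    "metric_ns"    -> parts(1),     // "demo_ns"
    "dataset"      -> "prometheus", // hypothetical dataset name
    "cluster_type" -> "raw")        // the default cluster-type in filodb-defaults.conf

  // queryAndSchedulePublish() would attach these tags (via TagSet.from) to the
  // "active_timeseries_by_tenant" and "total_timeseries_by_tenant" gauges
  println(tags)
}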
2 changes: 2 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
@@ -161,6 +161,8 @@ filodb {
  spread-assignment = []

  shard-key-level-ingestion-metrics-enabled = true
  # Time between each TenantIngestionMetering query/publish job
  metering-query-interval = 15 minutes
  # info config used for metric breakdown
  cluster-type = "raw" // possible values: downsample, raw, recRule, aggregates etc.
  # Name of the deployment partition that this FiloDB cluster belongs to
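The interval above is the single knob driving TenantIngestionMetering: it becomes the ask timeout, the initial delay, and the fixed delay between publish jobs. A minimal sketch of how the HOCON duration is parsed into the seconds handed to scheduleWithFixedDelay (standalone config string for illustration, not FiloDB's own settings loader):

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

import com.typesafe.config.ConfigFactory

object MeteringIntervalSketch extends App {
  val config = ConfigFactory.parseString("""metering-query-interval = 15 minutes""")

  // mirrors ASK_TIMEOUT / SCHED_INIT_DELAY / SCHED_DELAY in TenantIngestionMetering
  val interval = FiniteDuration(
    config.getDuration("metering-query-interval").toSeconds,
    TimeUnit.SECONDS)

  println(interval.toSeconds)   // 900 -- the value handed to scheduleWithFixedDelay
}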
@@ -0,0 +1,29 @@
package filodb.core.memstore.ratelimit

import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

import filodb.core.memstore.ratelimit.CardinalityStore._
import filodb.core.MetricsTestData

class RocksDbCardinalityStoreSpec extends AnyFunSpec with Matchers {
  it ("should correctly return overflow record") {
    val dset = MetricsTestData.timeseriesDatasetMultipleShardKeys
    val shard = 0
    val numOverflow = 123

    val db = new RocksDbCardinalityStore(dset.ref, shard)

    (0 until MAX_RESULT_SIZE + numOverflow).foreach { i =>
      val prefix = Seq("ws", "ns", s"metric-$i")
      db.store(CardinalityRecord(shard, prefix, 1, 1, 1, 1))
    }

    Seq(Nil, Seq("ws"), Seq("ws", "ns")).foreach { prefix =>
      val res = db.scanChildren(prefix, 3)
      res.size shouldEqual MAX_RESULT_SIZE + 1  // one extra for the overflow CardinalityRecord
      res.contains(CardinalityRecord(shard, OVERFLOW_PREFIX,
        numOverflow, numOverflow, numOverflow, numOverflow)) shouldEqual true
    }
  }
}
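The assertions above spell out the overflow contract the new publisher depends on: scanChildren returns at most MAX_RESULT_SIZE individual records, and everything beyond that is folded into a single OVERFLOW_PREFIX record whose counts aggregate the remainder. Restating the test's arithmetic (with a hypothetical MAX_RESULT_SIZE, since the real constant lives in CardinalityStore):

object OverflowContractSketch extends App {
  val maxResultSize = 4096                         // stand-in; the real bound is CardinalityStore.MAX_RESULT_SIZE
  val numOverflow   = 123                          // extra records stored, each with counts of 1
  val stored        = maxResultSize + numOverflow  // what the test writes
  val returned      = maxResultSize + 1            // individual records plus the single overflow record
  println(s"stored=$stored returned=$returned overflowCounts=$numOverflow")
}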
@@ -361,12 +361,14 @@ final case object TsCardExec {
  // row name assigned to overflow counts
  val OVERFLOW_GROUP = prefixToGroup(CardinalityStore.OVERFLOW_PREFIX)

  val PREFIX_DELIM = ","

  /**
   * Convert a shard key prefix to a row's group name.
   */
  def prefixToGroup(prefix: Seq[String]): ZeroCopyUTF8String = {
    // just concat the prefix together with a single char delimiter
-   prefix.mkString(",").utf8
+   prefix.mkString(PREFIX_DELIM).utf8
  }

  case class CardCounts(active: Int, total: Int) {
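Pulling the delimiter into PREFIX_DELIM means prefixToGroup and TenantIngestionMetering's split now share one constant instead of a repeated string literal. A tiny sketch of the group strings it produces (the ZeroCopyUTF8String wrapping is omitted and the prefixes are hypothetical):

object PrefixToGroupSketch extends App {
  val PREFIX_DELIM = ","

  // mirrors prefixToGroup, minus the .utf8 wrapping
  def prefixToGroup(prefix: Seq[String]): String = prefix.mkString(PREFIX_DELIM)

  println(prefixToGroup(Seq("demo_ws", "demo_ns")))   // demo_ws,demo_ns -- what the publisher splits back apart
  println(prefixToGroup(Seq("demo_ws")))              // demo_ws
}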
