Skip to content

Commit

Permalink
feat(query): Arithmetic Operators between Scalar & Vector
Browse files Browse the repository at this point in the history
feat(query): Arithmetic Operators between Scalar & Vector

implement query support for Binary Operators between a scalar and a vector
  • Loading branch information
sherali42 authored and Evan Chan committed Oct 8, 2018
1 parent e198df4 commit 7400035
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 7 deletions.
5 changes: 4 additions & 1 deletion cli/src/main/scala/filodb.cli/CliMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,10 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
case None =>
try {
client.logicalPlan2Query(ref, plan, qOpts) match {
case QueryResult2(_, schema, result) => result.foreach(rv => println(rv.prettyPrint(schema)))
case QueryResult2(_, schema, result) => {
println(s"Number of Range Vectors: ${result.size}")
result.foreach(rv => println(rv.prettyPrint(schema)))
}
case QueryError2(_,ex) => println(s"ERROR: ${ex.getMessage}")
}
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ trait Expressions extends Aggregates with Functions {
val scalar = expression.toScalar
val seriesPlan = rhs.asInstanceOf[PeriodicSeries].toPeriodicSeriesPlan(queryParams)
ScalarVectorBinaryOperation(operator.getPlanOperator, scalar, seriesPlan, scalarIsLhs = true)
case series: PeriodicSeries if lhs.isInstanceOf[ScalarExpression] =>
val scalar = lhs.asInstanceOf[ScalarExpression].toScalar
case series: PeriodicSeries if rhs.isInstanceOf[ScalarExpression] =>
val scalar = rhs.asInstanceOf[ScalarExpression].toScalar
val seriesPlan = series.toPeriodicSeriesPlan(queryParams)
ScalarVectorBinaryOperation(operator.getPlanOperator, scalar, seriesPlan, scalarIsLhs = false)
case series: PeriodicSeries if rhs.isInstanceOf[PeriodicSeries] =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ trait Expression extends Aggregates with Selector with Numeric with Join {


lazy val binaryExpression: PackratParser[BinaryExpression] =
"(".? ~ expression ~ binaryOp ~ vectorMatch.? ~ expression ~ ")".? ^^ {
case p1 ~ lhs ~ op ~ vm ~ rhs ~ p2 => BinaryExpression(lhs, op, vm, rhs)
expression ~ binaryOp ~ vectorMatch.? ~ expression ^^ {
case lhs ~ op ~ vm ~ rhs => BinaryExpression(lhs, op, vm, rhs)
}


Expand All @@ -296,7 +296,7 @@ trait Expression extends Aggregates with Selector with Numeric with Join {

lazy val expression: PackratParser[Expression] =
binaryExpression | aggregateExpression |
function | unaryExpression | vector | numericalExpression | simpleSeries
function | unaryExpression | vector | numericalExpression | simpleSeries | "(" ~> expression <~ ")"

}

Expand Down
19 changes: 19 additions & 0 deletions prometheus/src/test/scala/filodb/prometheus/parse/ParserSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class ParserSpec extends FunSpec with Matchers {
parseSuccessfully("1 + 2/(3*1)")
parseSuccessfully("-some_metric")
parseSuccessfully("+some_metric")
parseSuccessfully("(1 + heap_size{a=\"b\"})")
parseSuccessfully("(1 + heap_size{a=\"b\"}) + 5")
parseSuccessfully("(1 + heap_size{a=\"b\"}) + 5 * (3 - cpu_load{c=\"d\"})")
parseSuccessfully("((1 + heap_size{a=\"b\"}) + 5) * (3 - cpu_load{c=\"d\"})")

parseError("")
parseError("# just a comment\n\n")
Expand All @@ -48,6 +52,11 @@ class ParserSpec extends FunSpec with Matchers {
parseError("*1")
parseError("(1))")
parseError("((1)")
parseError("((1 + heap_size{a=\"b\"})")
parseError("(1 + heap_size{a=\"b\"}))")
parseError("(1 + heap_size{a=\"b\"}) + (5")
parseError("(1 + heap_size{a=\"b\"}) + 5 * (3 - cpu_load{c=\"d\"}")

parseError("(")
parseError("1 and 1")
parseError("1 == 1")
Expand Down Expand Up @@ -247,6 +256,16 @@ class ParserSpec extends FunSpec with Matchers {
"ApplyInstantFunction(Aggregate(Sum,PeriodicSeriesWithWindowing(RawSeries(IntervalSelector(List(1524855388000),List(1524855988000)),List(ColumnFilter(__name__,Equals(http_request_duration_seconds_bucket))),List()),1524855988000,1000,1524855988000,600000,Rate,List()),List(),List(job, le),List()),HistogramQuantile,List(0.9))",
"http_requests_total" ->
"PeriodicSeries(RawSeries(IntervalSelector(List(1524855688000),List(1524855988000)),List(ColumnFilter(__name__,Equals(http_requests_total))),List()),1524855988000,1000,1524855988000)",
"http_requests_total ^ 5" ->
"ScalarVectorBinaryOperation(POW,5.0,PeriodicSeries(RawSeries(IntervalSelector(List(1524855688000),List(1524855988000)),List(ColumnFilter(__name__,Equals(http_requests_total))),List()),1524855988000,1000,1524855988000),false)",

//FIXME Operator precedence is not implemented
"10 + http_requests_total * 5" ->
"ScalarVectorBinaryOperation(ADD,10.0,ScalarVectorBinaryOperation(MUL,5.0,PeriodicSeries(RawSeries(IntervalSelector(List(1524855688000),List(1524855988000)),List(ColumnFilter(__name__,Equals(http_requests_total))),List()),1524855988000,1000,1524855988000),false),true)",
"10 + (http_requests_total * 5)" ->
"ScalarVectorBinaryOperation(ADD,10.0,ScalarVectorBinaryOperation(MUL,5.0,PeriodicSeries(RawSeries(IntervalSelector(List(1524855688000),List(1524855988000)),List(ColumnFilter(__name__,Equals(http_requests_total))),List()),1524855988000,1000,1524855988000),false),true)",
"(10 + http_requests_total) * 5" ->
"ScalarVectorBinaryOperation(MUL,5.0,ScalarVectorBinaryOperation(ADD,10.0,PeriodicSeries(RawSeries(IntervalSelector(List(1524855688000),List(1524855988000)),List(ColumnFilter(__name__,Equals(http_requests_total))),List()),1524855988000,1000,1524855988000),true),false)",
"topk(5, http_requests_total)" ->
"Aggregate(TopK,PeriodicSeries(RawSeries(IntervalSelector(List(1524855688000),List(1524855988000)),List(ColumnFilter(__name__,Equals(http_requests_total))),List()),1524855988000,1000,1524855988000),List(5.0),List(),List())",
"irate(http_requests_total{job=\"api-server\"}[5m])" ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import filodb.core.metadata.Dataset
import filodb.core.query._
import filodb.memory.format.RowReader
import filodb.query.{BinaryOperator, InstantFunctionId, QueryConfig}
import filodb.query.exec.binaryOp.BinaryOperatorFunction
import filodb.query.exec.rangefn.InstantFunction

/**
Expand Down Expand Up @@ -93,10 +94,33 @@ final case class ScalarOperationMapper(operator: BinaryOperator,
protected[exec] def args: String =
s"operator=$operator, scalar=$scalar"

val operatorFunction = BinaryOperatorFunction.factoryMethod(operator)

def apply(source: Observable[RangeVector],
queryConfig: QueryConfig,
limit: Int,
sourceSchema: ResultSchema): Observable[RangeVector] = ???
sourceSchema: ResultSchema): Observable[RangeVector] = {
source.map { rv =>
val resultIterator: Iterator[RowReader] = new Iterator[RowReader]() {

private val rows = rv.rows
private val result = new TransientRow()
private val sclrVal = scalar.asInstanceOf[Double]

override def hasNext: Boolean = rows.hasNext

override def next(): RowReader = {
val next = rows.next()
val nextVal = next.getDouble(1)
val newValue = if (scalarOnLhs) operatorFunction.calculate(sclrVal, nextVal)
else operatorFunction.calculate(nextVal, sclrVal)
result.setValues(next.getLong(0), newValue)
result
}
}
IteratorBackedRangeVector(rv.key, resultIterator)
}
}

// TODO all operation defs go here and get invoked from mapRangeVector
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package filodb.query.exec.binaryOp

import filodb.query.BinaryOperator
import filodb.query.BinaryOperator._

/**
  * A binary arithmetic operation over two double operands.
  *
  * Operand order matters for the non-commutative operations, so implementations
  * must treat `lhs` as the left operand and `rhs` as the right operand.
  */
trait ScalarFunction {

  /**
    * Applies the operation.
    *
    * @param lhs the left-hand operand
    * @param rhs the right-hand operand
    * @return the result of combining `lhs` and `rhs`
    */
  def calculate(lhs: Double, rhs: Double): Double
}

object BinaryOperatorFunction {

  /**
    * Resolves a binary operator to the arithmetic it performs on two doubles.
    *
    * A new [[ScalarFunction]] is created on every call. Only ADD, SUB, MUL, DIV,
    * MOD and POW are handled here.
    *
    * @param function the operator to resolve
    * @return a [[ScalarFunction]] that evaluates the operator as `lhs <op> rhs`
    * @throws UnsupportedOperationException for any other operator
    */
  def factoryMethod(function: BinaryOperator): ScalarFunction = {
    // Resolve the operator eagerly so that an unsupported one fails here,
    // not later when the returned function is first applied.
    val arithmetic: (Double, Double) => Double = function match {
      case ADD => (lhs, rhs) => lhs + rhs
      case SUB => (lhs, rhs) => lhs - rhs
      case MUL => (lhs, rhs) => lhs * rhs
      case DIV => (lhs, rhs) => lhs / rhs
      case MOD => (lhs, rhs) => lhs % rhs
      case POW => (lhs, rhs) => math.pow(lhs, rhs)
      case _   => throw new UnsupportedOperationException(s"$function not supported.")
    }

    new ScalarFunction {
      override def calculate(lhs: Double, rhs: Double): Double = arithmetic(lhs, rhs)
    }
  }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package filodb.query.exec.rangefn

import scala.util.Random

import com.typesafe.config.{Config, ConfigFactory}
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import org.scalatest.{FunSpec, Matchers}
import org.scalatest.concurrent.ScalaFutures

import filodb.core.MetricsTestData
import filodb.core.query.{CustomRangeVectorKey, RangeVector, RangeVectorKey, ResultSchema}
import filodb.memory.format.{RowReader, ZeroCopyUTF8String}
import filodb.query._
import filodb.query.exec.TransientRow

/**
  * Exercises `exec.ScalarOperationMapper` for every arithmetic operator, with the
  * scalar on either side of the vector, over random, NaN, zero, negative and
  * infinite sample values.
  */
class BinaryOperatorSpec extends FunSpec with Matchers with ScalaFutures {

  val config: Config = ConfigFactory.load("application_test.conf").getConfig("filodb")
  val resultSchema = ResultSchema(MetricsTestData.timeseriesDataset.infosFromIDs(0 to 1), 1)
  // The range vector key is never inspected by these tests; one shared placeholder is enough.
  val ignoreKey = CustomRangeVectorKey(
    Map(ZeroCopyUTF8String("ignore") -> ZeroCopyUTF8String("ignore")))
  val sampleBase: Array[RangeVector] = Array(
    new RangeVector {
      override def key: RangeVectorKey = ignoreKey
      override def rows: Iterator[RowReader] = Seq(
        new TransientRow(1L, 3.3d),
        new TransientRow(2L, 5.1d)).iterator
    },
    new RangeVector {
      override def key: RangeVectorKey = ignoreKey
      override def rows: Iterator[RowReader] = Seq(
        new TransientRow(3L, 3239.3423d),
        new TransientRow(4L, 94935.1523d)).iterator
    })
  val queryConfig = new QueryConfig(config.getConfig("query"))
  val rand = new Random()
  val error = 0.00000001d
  val scalar = 5.0

  it("should work with Binary Operator mapper") {
    val samples: Array[RangeVector] = Array.fill(100)(new RangeVector {
      // Stream memoizes its elements, so every call to `rows` replays the same random values.
      val data: Stream[TransientRow] = Stream.from(0).map { n =>
        new TransientRow(n.toLong, rand.nextDouble())
      }.take(20)

      override def key: RangeVectorKey = ignoreKey

      override def rows: Iterator[RowReader] = data.iterator
    })
    fireBinaryOperatorTests(samples)
  }

  it ("should handle NaN") {
    val samples: Array[RangeVector] = Array(
      new RangeVector {
        override def key: RangeVectorKey = ignoreKey
        override def rows: Iterator[RowReader] = Seq(
          new TransientRow(1L, Double.NaN),
          new TransientRow(2L, 5.6d)).iterator
      },
      new RangeVector {
        override def key: RangeVectorKey = ignoreKey
        override def rows: Iterator[RowReader] = Seq(
          new TransientRow(1L, 4.6d),
          new TransientRow(2L, 4.4d)).iterator
      },
      new RangeVector {
        override def key: RangeVectorKey = ignoreKey
        override def rows: Iterator[RowReader] = Seq(
          new TransientRow(1L, 0d),
          new TransientRow(2L, 5.4d)).iterator
      }
    )
    fireBinaryOperatorTests(samples)
  }

  it ("should handle special cases") {
    val samples: Array[RangeVector] = Array(
      new RangeVector {
        override def key: RangeVectorKey = ignoreKey

        override def rows: Iterator[RowReader] = Seq(
          new TransientRow(1L, 2.0d/0d),
          new TransientRow(2L, 4.5d),
          new TransientRow(2L, 0d),
          new TransientRow(2L, -2.1d),
          new TransientRow(2L, 5.9d),
          new TransientRow(2L, Double.NaN),
          new TransientRow(2L, 3.3d)).iterator
      }
    )
    fireBinaryOperatorTests(samples)
  }

  it ("should handle unknown functions") {
    // Comparison operators are not arithmetic, so building the mapper must be rejected.
    the[UnsupportedOperationException] thrownBy {
      val binaryOpMapper = exec.ScalarOperationMapper(BinaryOperator.EQL, 10, true)
      binaryOpMapper(Observable.fromIterable(sampleBase), queryConfig, 1000, resultSchema)
    } should have message "EQL not supported."

    the[UnsupportedOperationException] thrownBy {
      val binaryOpMapper = exec.ScalarOperationMapper(BinaryOperator.GTE, 10, false)
      binaryOpMapper(Observable.fromIterable(sampleBase), queryConfig, 1000, resultSchema)
    } should have message "GTE not supported."
  }

  it ("should fail with wrong calculation") {
    // Deliberately wrong expectation (floor of the sample): ADD must never reproduce it.
    val expectedVal = sampleBase.map(_.rows.map(v => scala.math.floor(v.getDouble(1))).toList).toList
    val binaryOpMapper = exec.ScalarOperationMapper(BinaryOperator.ADD, scalar, true)
    val resultObs = binaryOpMapper(Observable.fromIterable(sampleBase), queryConfig, 1000, resultSchema)
    val result = resultObs.toListL.runAsync.futureValue.map(rv => rv.rows.map(_.getDouble(1)).toList)

    // Guard against a vacuous pass: zip would silently skip missing vectors or rows.
    result.size shouldEqual expectedVal.size
    expectedVal.zip(result).foreach { case (wrongRows, actualRows) =>
      actualRows.size shouldEqual wrongRows.size
      wrongRows.zip(actualRows).foreach { case (wrong, actual) =>
        wrong should not equal actual
      }
    }
  }

  /**
    * Runs every supported arithmetic operator against `samples`, once with the
    * scalar as the left operand and once with it as the right operand.
    *
    * @param samples the range vectors fed to the mapper
    */
  private def fireBinaryOperatorTests(samples: Array[RangeVector]): Unit = {
    // Reference implementation for each operator, written as (lhs, rhs) => result.
    val operations: Seq[(BinaryOperator, (Double, Double) => Double)] = Seq(
      (BinaryOperator.SUB, (lhs: Double, rhs: Double) => lhs - rhs),
      (BinaryOperator.ADD, (lhs: Double, rhs: Double) => lhs + rhs),
      (BinaryOperator.MUL, (lhs: Double, rhs: Double) => lhs * rhs),
      (BinaryOperator.MOD, (lhs: Double, rhs: Double) => lhs % rhs),
      (BinaryOperator.DIV, (lhs: Double, rhs: Double) => lhs / rhs),
      (BinaryOperator.POW, (lhs: Double, rhs: Double) => math.pow(lhs, rhs))
    )

    operations.foreach { case (operator, compute) =>
      // Prefix form: scalar <op> sample
      val expectedPrefix = samples.map(_.rows.map(v => compute(scalar, v.getDouble(1))))
      applyBinaryOperationAndAssertResult(samples, expectedPrefix, operator, scalar, scalarOnLhs = true)

      // Suffix form: sample <op> scalar
      val expectedSuffix = samples.map(_.rows.map(v => compute(v.getDouble(1), scalar)))
      applyBinaryOperationAndAssertResult(samples, expectedSuffix, operator, scalar, scalarOnLhs = false)
    }
  }

  /**
    * Applies a `ScalarOperationMapper` to `samples` and checks the produced values
    * against `expectedVal`, vector by vector and row by row.
    *
    * @param samples     the input range vectors
    * @param expectedVal the expected values, one iterator per input range vector
    * @param binOp       the operator under test
    * @param scalar      the scalar operand
    * @param scalarOnLhs true when the scalar is the left operand
    */
  private def applyBinaryOperationAndAssertResult(samples: Array[RangeVector], expectedVal: Array[Iterator[Double]],
                                                  binOp: BinaryOperator, scalar: Double, scalarOnLhs: Boolean): Unit = {
    val scalarOpMapper = exec.ScalarOperationMapper(binOp, scalar, scalarOnLhs)
    val resultObs = scalarOpMapper(Observable.fromIterable(samples), queryConfig, 1000, resultSchema)
    // Materialize both sides so their sizes can be compared; zip alone would hide dropped rows.
    val result = resultObs.toListL.runAsync.futureValue.map(rv => rv.rows.map(_.getDouble(1)).toList)
    val expected = expectedVal.map(_.toList).toList

    withClue(s"$binOp with scalarOnLhs=$scalarOnLhs: ") {
      result.size shouldEqual expected.size
      expected.zip(result).foreach { case (expectedRows, actualRows) =>
        actualRows.size shouldEqual expectedRows.size
        expectedRows.zip(actualRows).foreach { case (expectedValue, actualValue) =>
          // NaN never equals itself, so it needs its own check; plain equality
          // covers finite values and infinities, including their sign.
          if (expectedValue.isNaN) actualValue.isNaN shouldEqual true
          else actualValue shouldEqual expectedValue
        }
      }
    }
  }

}

0 comments on commit 7400035

Please sign in to comment.