Skip to content

Commit

Permalink
Spark 3.2 (#9)
Browse files Browse the repository at this point in the history
* Bump spark version to 3.2.0
* Bump alchemy version for dependency change
* Code updates for changes to Spark APIs
  • Loading branch information
pidge authored Dec 17, 2021
1 parent bf58504 commit b921dc1
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 11 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.0
1.2.0
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus.v
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, ExpressionDescription, Literal, UnaryExpression}
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
Expand All @@ -27,7 +28,7 @@ object HyperLogLogBase {

def resolveImplementation(exp: String): Implementation = exp match {
case null => resolveImplementation
case s => nameToImpl(s.toString)
case s => nameToImpl(s)
}

def resolveImplementation(implicit impl: Implementation = null): Implementation =
Expand Down Expand Up @@ -60,7 +61,7 @@ object HyperLogLogBase {
}
}

trait HyperLogLogInit extends Expression with HyperLogLogBase {
trait HyperLogLogInit extends Expression with UnaryLike[Expression] with HyperLogLogBase {
def relativeSD: Double

// This formula for `p` came from org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus:93
Expand Down Expand Up @@ -141,7 +142,7 @@ trait HyperLogLogInitAgg extends NullableSketchAggregation with HyperLogLogInit
}
}

trait NullableSketchAggregation extends TypedImperativeAggregate[Option[Instance]] with HyperLogLogBase {
trait NullableSketchAggregation extends TypedImperativeAggregate[Option[Instance]] with HyperLogLogBase with UnaryLike[Expression] {

override def createAggregationBuffer(): Option[Instance] = None

Expand All @@ -159,8 +160,6 @@ trait NullableSketchAggregation extends TypedImperativeAggregate[Option[Instance

def child: Expression

override def children: Seq[Expression] = Seq(child)

override def nullable: Boolean = child.nullable

override def serialize(hll: Option[Instance]): Array[Byte] =
Expand Down Expand Up @@ -214,6 +213,8 @@ case class HyperLogLogInitSimple(
}

override def prettyName: String = "hll_init"

override protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild)
}


Expand Down Expand Up @@ -267,6 +268,8 @@ case class HyperLogLogInitSimpleAgg(
copy(inputAggBufferOffset = newOffset)

override def prettyName: String = "hll_init_agg"

override protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild)
}

/**
Expand Down Expand Up @@ -313,6 +316,8 @@ case class HyperLogLogInitCollection(


override def prettyName: String = "hll_init_collection"

override protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild)
}


Expand Down Expand Up @@ -367,6 +372,8 @@ case class HyperLogLogInitCollectionAgg(
copy(inputAggBufferOffset = newOffset)

override def prettyName: String = "hll_init_collection_agg"

override protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild)
}


Expand Down Expand Up @@ -427,6 +434,8 @@ case class HyperLogLogMerge(
copy(inputAggBufferOffset = newOffset)

override def prettyName: String = "hll_merge"

override protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild)
}

/**
Expand Down Expand Up @@ -455,7 +464,7 @@ case class HyperLogLogRowMerge(
assert(children.nonEmpty, s"function requires at least one argument")
children
}.last match {
case Literal(s: Any, StringType) => children.init
case Literal(_: Any, StringType) => children.init
case _ => children
},
children.last match {
Expand Down Expand Up @@ -490,6 +499,9 @@ case class HyperLogLogRowMerge(
}

override def prettyName: String = "hll_row_merge"

override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
copy(children = newChildren)
}

/**
Expand Down Expand Up @@ -527,6 +539,8 @@ case class HyperLogLogCardinality(
}

override def prettyName: String = "hll_cardinality"

override protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild)
}

/**
Expand Down Expand Up @@ -598,6 +612,9 @@ case class HyperLogLogIntersectionCardinality(
}

override def prettyName: String = "hll_intersect_cardinality"

override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression =
copy(left = newLeft, right = newRight)
}


Expand Down Expand Up @@ -648,6 +665,8 @@ case class HyperLogLogConvert(
}

override def prettyName: String = "hll_convert"

override protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild)
}

object functions extends HLLFunctions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait SQLHelper {
}
}
(keys, values).zipped.foreach { (k, v) =>
if (SQLConf.staticConfKeys.contains(k)) {
if (SQLConf.isStaticConfigKey(k)) {
throw new AnalysisException(s"Cannot modify the value of a static config: $k")
}
conf.setConfString(k, v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ trait SQLTestUtilsBase
// Blocking uncache table for tests
protected def uncacheTable(tableName: String): Unit = {
val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
val cascade = !spark.sessionState.catalog.isTemporaryTable(tableIdent)
val cascade = !spark.sessionState.catalog.isTempView(tableIdent)
spark.sharedState.cacheManager.uncacheQuery(
spark,
spark.table(tableName).logicalPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object TestSQLContext {
private[sql] class TestSQLSessionStateBuilder(
session: SparkSession,
state: Option[SessionState])
extends SessionStateBuilder(session, state, Map.empty[String, String]) with WithTestConf {
extends SessionStateBuilder(session, state) with WithTestConf {
override def overrideConfs: Map[String, String] = TestSQLContext.overrideConfs
override def newBuilder: NewBuilder = new TestSQLSessionStateBuilder(_, _)
}
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ThisBuild / crossScalaVersions := Seq("2.12.11")

ThisBuild / javacOptions ++= Seq("-source", "1.8", "-target", "1.8")

val sparkVersion = "3.1.2"
val sparkVersion = "3.2.0"

lazy val scalaSettings = Seq(
scalaVersion := "2.12.11",
Expand Down

0 comments on commit b921dc1

Please sign in to comment.