Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark on k8s: changes for using namespaces with Kubernetes client APIs. #462

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<id>shade</id>
Expand Down
15 changes: 12 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<spark.scala-2.11.version>2.4.5</spark.scala-2.11.version>
<spark.scala-2.12.version>2.4.5</spark.scala-2.12.version>
<spark.version>${spark.scala-2.11.version}</spark.version>
<kubernetes.client.version>5.6.0</kubernetes.client.version>
<kubernetes.client.version>6.8.1</kubernetes.client.version>
<hive.version>3.0.0</hive.version>
<commons-codec.version>1.9</commons-codec.version>
<httpclient.version>4.5.13</httpclient.version>
Expand Down Expand Up @@ -1145,6 +1145,13 @@
</reporting>

<profiles>
<profile>
<id>hadoop3</id>
<properties>
<hadoop.major-minor.version>3</hadoop.major-minor.version>
<hadoop.version>3.4.0</hadoop.version>
</properties>
</profile>
<profile>
<id>hadoop2</id>
<properties>
Expand Down Expand Up @@ -1192,11 +1199,13 @@
<profile>
<id>spark3</id>
<properties>
<spark.version>3.2.3</spark.version>
<spark.version>3.5.0</spark.version>
<java.version>1.8</java.version>
<py4j.version>0.10.9.7</py4j.version>
<json4s.version>3.7.0-M11</json4s.version>
<netty.version>4.1.92.Final</netty.version>
<netty.version>4.1.108.Final</netty.version>
<jackson.version>2.15.2</jackson.version>
<jackson-databind.version>2.15.2</jackson-databind.version>
<spark.bin.name>spark-${spark.version}-bin-hadoop${hadoop.major-minor.version}</spark.bin.name>
<spark.bin.download.url>
https://archive.apache.org/dist/spark/spark-${spark.version}/${spark.bin.name}.tgz
Expand Down
1 change: 1 addition & 0 deletions repl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<id>shade</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class SQLInterpreterSpec extends BaseInterpreterSpec {
assert(resp.isInstanceOf[Interpreter.ExecuteError])
val error = resp.asInstanceOf[Interpreter.ExecuteError]
error.ename should be ("Error")
assert(error.evalue.contains("not found"))
assert(error.evalue.contains("TABLE_OR_VIEW_NOT_FOUND"))
}

it should "fail if submitting multiple queries" in withInterpreter { interpreter =>
Expand Down
2 changes: 2 additions & 0 deletions rsc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
Expand Down Expand Up @@ -114,6 +115,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
6 changes: 6 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>

<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-test-lib</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@ package org.apache.livy.server.batch

import java.lang.ProcessBuilder.Redirect
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.Random

import com.fasterxml.jackson.annotation.JsonIgnoreProperties

import org.apache.livy.{LivyConf, Logging, Utils}
import org.apache.livy.server.AccessManager
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.{FinishedSessionState, Session, SessionState}
import org.apache.livy.sessions.Session._
import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}


@JsonIgnoreProperties(ignoreUnknown = true)
case class BatchRecoveryMetadata(
id: Int,
Expand All @@ -40,6 +38,7 @@ case class BatchRecoveryMetadata(
appTag: String,
owner: String,
proxyUser: Option[String],
namespace: String,
version: Int = 1)
extends RecoveryMetadata

Expand All @@ -64,7 +63,7 @@ object BatchSession extends Logging {
mockApp: Option[SparkApp] = None): BatchSession = {
val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)

val namespace = SparkApp.getNamespace(request.conf, livyConf)
def createSparkApp(s: BatchSession): SparkApp = {
val conf = SparkApp.prepareSparkConf(
appTag,
Expand Down Expand Up @@ -106,7 +105,8 @@ object BatchSession extends Logging {
childProcesses.decrementAndGet()
}
}
SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s))
val extrasMap: Map[String, String] = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace)
SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s), extrasMap)
}

info(s"Creating batch session $id: [owner: $owner, request: $request]")
Expand All @@ -120,6 +120,7 @@ object BatchSession extends Logging {
owner,
impersonatedUser,
sessionStore,
namespace,
mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
}

Expand All @@ -137,8 +138,9 @@ object BatchSession extends Logging {
m.owner,
m.proxyUser,
sessionStore,
m.namespace,
mockApp.map { m => (_: BatchSession) => m }.getOrElse { s =>
SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s))
SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s), Map[String, String]())
})
}
}
Expand All @@ -152,6 +154,7 @@ class BatchSession(
owner: String,
override val proxyUser: Option[String],
sessionStore: SessionStore,
namespace: String,
sparkApp: BatchSession => SparkApp)
extends Session(id, name, owner, livyConf) with SparkAppListener {
import BatchSession._
Expand Down Expand Up @@ -204,5 +207,5 @@ class BatchSession(
override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }

override def recoveryMetadata: RecoveryMetadata =
BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser)
BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser, namespace)
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ case class InteractiveRecoveryMetadata(
// proxyUser is deprecated. It is available here only for backward compatibility
proxyUser: Option[String],
rscDriverUri: Option[URI],
namespace: String,
version: Int = 1)
extends RecoveryMetadata

Expand All @@ -93,6 +94,7 @@ object InteractiveSession extends Logging {
mockClient: Option[RSCClient] = None): InteractiveSession = {
val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)
val namespace = SparkApp.getNamespace(request.conf, livyConf)

val client = mockClient.orElse {
val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(
Expand Down Expand Up @@ -153,6 +155,7 @@ object InteractiveSession extends Logging {
request.numExecutors,
request.pyFiles,
request.queue,
namespace,
mockApp)
}

Expand Down Expand Up @@ -193,6 +196,7 @@ object InteractiveSession extends Logging {
metadata.numExecutors,
metadata.pyFiles,
metadata.queue,
metadata.namespace,
mockApp)
}

Expand Down Expand Up @@ -433,6 +437,7 @@ class InteractiveSession(
val numExecutors: Option[Int],
val pyFiles: List[String],
val queue: Option[String],
val namespace: String,
mockApp: Option[SparkApp]) // For unit test.
extends Session(id, name, owner, ttl, idleTimeout, livyConf)
with SessionHeartbeat
Expand Down Expand Up @@ -462,11 +467,13 @@ class InteractiveSession(
app = mockApp.orElse {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
if (!livyConf.isRunningOnKubernetes()) {
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
val namespace = SparkApp.getNamespace(conf, livyConf)
val extrasMap: Map[String, String] = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace)
if (!livyConf.isRunningOnKubernetes()) {
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap))
} else {
// Create SparkApp for Kubernetes anyway
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap))
}
}

Expand Down Expand Up @@ -535,7 +542,7 @@ class InteractiveSession(
heartbeatTimeout.toSeconds.toInt, owner, ttl, idleTimeout,
driverMemory, driverCores, executorMemory, executorCores, conf,
archives, files, jars, numExecutors, pyFiles, queue,
proxyUser, rscDriverUri)
proxyUser, rscDriverUri, namespace)

override def state: SessionState = {
if (serverSideState == SessionState.Running) {
Expand Down
21 changes: 18 additions & 3 deletions server/src/main/scala/org/apache/livy/utils/SparkApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import scala.collection.JavaConverters._

import org.apache.livy.LivyConf

import java.io.{File, FileInputStream}
import java.util.Properties

object AppInfo {
val DRIVER_LOG_URL_NAME = "driverLogUrl"
val SPARK_UI_URL_NAME = "sparkUiUrl"
Expand Down Expand Up @@ -56,12 +59,23 @@ trait SparkAppListener {
*/
object SparkApp {
private val SPARK_YARN_TAG_KEY = "spark.yarn.tags"

val SPARK_KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
object State extends Enumeration {
val STARTING, RUNNING, FINISHED, FAILED, KILLED = Value
}
type State = State.Value

/**
 * Resolve the Kubernetes namespace for a session.
 *
 * Resolution order:
 *   1. The session's own Spark conf (`spark.kubernetes.namespace`), if set.
 *   2. `spark-defaults.conf` under SPARK_HOME, falling back to "default"
 *      when the property is absent there too.
 *
 * @param conf     the session's Spark configuration map
 * @param livyConf Livy server configuration (used to locate SPARK_HOME)
 * @return the resolved namespace (never empty)
 */
def getNamespace(conf: Map[String, String], livyConf: LivyConf): String = {
  val configured = conf.getOrElse(SPARK_KUBERNETES_NAMESPACE_KEY, "")
  if (configured.nonEmpty) {
    configured
  } else {
    // SPARK_HOME is mandatory for Livy, so .get is expected to succeed here;
    // it throws NoSuchElementException otherwise (same behavior as before).
    val sparkHome = livyConf.sparkHome().get
    val sparkDefaultsPath =
      sparkHome + File.separator + "conf" + File.separator + "spark-defaults.conf"
    val properties = new Properties()
    // Close the stream even if load() throws — the previous version leaked
    // the FileInputStream on every call that reached this branch.
    val in = new FileInputStream(sparkDefaultsPath)
    try {
      properties.load(in)
    } finally {
      in.close()
    }
    properties.getProperty(SPARK_KUBERNETES_NAMESPACE_KEY, "default")
  }
}
/**
* Return cluster manager dependent SparkConf.
*
Expand Down Expand Up @@ -102,11 +116,12 @@ object SparkApp {
appId: Option[String],
process: Option[LineBufferedProcess],
livyConf: LivyConf,
listener: Option[SparkAppListener]): SparkApp = {
listener: Option[SparkAppListener],
extrasMap: Map[String, String]): SparkApp = {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
} else if (livyConf.isRunningOnKubernetes()) {
new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf)
new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf, extrasMap)
} else {
require(process.isDefined, "process must not be None when Livy master is not YARN or" +
"Kubernetes.")
Expand Down
Loading