diff --git a/client-http/pom.xml b/client-http/pom.xml
index 875f8cf66..2a72f8722 100644
--- a/client-http/pom.xml
+++ b/client-http/pom.xml
@@ -112,6 +112,7 @@
       org.apache.maven.plugins
       maven-shade-plugin
+      3.4.1
       shade
diff --git a/pom.xml b/pom.xml
index 8c5e10503..ecf99e252 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,7 @@
     2.4.5
     2.4.5
     ${spark.scala-2.11.version}
-    5.6.0
+    6.8.1
     3.0.0
     1.9
     4.5.13
@@ -1145,6 +1145,13 @@
+
+      hadoop3
+
+      3
+      3.4.0
+
+
     hadoop2
@@ -1192,11 +1199,13 @@
     spark3
-      3.2.3
+      3.5.0
     1.8
     0.10.9.7
     3.7.0-M11
-      4.1.92.Final
+      4.1.108.Final
+      2.15.2
+      2.15.2
     spark-${spark.version}-bin-hadoop${hadoop.major-minor.version}
     https://archive.apache.org/dist/spark/spark-${spark.version}/${spark.bin.name}.tgz
diff --git a/repl/pom.xml b/repl/pom.xml
index 6a37cb515..6123c94c0 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -176,6 +176,7 @@
       org.apache.maven.plugins
       maven-shade-plugin
+      3.4.1
       shade
diff --git a/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala
index 3d9d4aca3..6f333a1f5 100644
--- a/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala
@@ -193,7 +193,7 @@ class SQLInterpreterSpec extends BaseInterpreterSpec {
     assert(resp.isInstanceOf[Interpreter.ExecuteError])
     val error = resp.asInstanceOf[Interpreter.ExecuteError]
     error.ename should be ("Error")
-    assert(error.evalue.contains("not found"))
+    assert(error.evalue.contains("TABLE_OR_VIEW_NOT_FOUND"))
   }
   it should "fail if submitting multiple queries" in withInterpreter { interpreter =>
diff --git a/rsc/pom.xml b/rsc/pom.xml
index df5e92355..74221d751 100644
--- a/rsc/pom.xml
+++ b/rsc/pom.xml
@@ -64,6 +64,7 @@
       io.netty
       netty-all
+      ${netty.version}
       org.apache.spark
@@ -114,6 +115,7 @@
       org.apache.hadoop
       hadoop-common
+      ${hadoop.version}
       provided
diff --git a/server/pom.xml b/server/pom.xml
index f9c296e51..38a9d4240 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -52,6 +52,12 @@
       ${project.version}
+
+      org.apache.commons
+      commons-lang3
+      3.12.0
+
+
       org.apache.livy
       livy-test-lib
diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index 8b64a0398..094e4466f 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -19,12 +19,9 @@ package org.apache.livy.server.batch
 import java.lang.ProcessBuilder.Redirect
 import java.util.concurrent.atomic.AtomicInteger
-
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
 import scala.util.Random
-
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-
 import org.apache.livy.{LivyConf, Logging, Utils}
 import org.apache.livy.server.AccessManager
 import org.apache.livy.server.recovery.SessionStore
@@ -32,6 +29,7 @@ import org.apache.livy.sessions.{FinishedSessionState, Session, SessionState}
 import org.apache.livy.sessions.Session._
 import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}
+
 @JsonIgnoreProperties(ignoreUnknown = true)
 case class BatchRecoveryMetadata(
     id: Int,
@@ -40,6 +38,7 @@ case class BatchRecoveryMetadata(
     appTag: String,
     owner: String,
     proxyUser: Option[String],
+    namespace: String,
     version: Int = 1) extends RecoveryMetadata
@@ -64,7 +63,7 @@ object BatchSession extends Logging {
       mockApp: Option[SparkApp] = None): BatchSession = {
appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase() val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner) - + val namespace = SparkApp.getNamespace(request.conf, livyConf) def createSparkApp(s: BatchSession): SparkApp = { val conf = SparkApp.prepareSparkConf( appTag, @@ -106,7 +105,8 @@ object BatchSession extends Logging { childProcesses.decrementAndGet() } } - SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s)) + val extrasMap: Map[String, String] = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace) + SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s), extrasMap) } info(s"Creating batch session $id: [owner: $owner, request: $request]") @@ -120,6 +120,7 @@ object BatchSession extends Logging { owner, impersonatedUser, sessionStore, + namespace, mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp)) } @@ -137,8 +138,9 @@ object BatchSession extends Logging { m.owner, m.proxyUser, sessionStore, + m.namespace, mockApp.map { m => (_: BatchSession) => m }.getOrElse { s => - SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s)) + SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s), Map[String, String]()) }) } } @@ -152,6 +154,7 @@ class BatchSession( owner: String, override val proxyUser: Option[String], sessionStore: SessionStore, + namespace: String, sparkApp: BatchSession => SparkApp) extends Session(id, name, owner, livyConf) with SparkAppListener { import BatchSession._ @@ -204,5 +207,5 @@ class BatchSession( override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo } override def recoveryMetadata: RecoveryMetadata = - BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser) + BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser, namespace) } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 4250794dc..b8959896f 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -70,6 +70,7 @@ case class InteractiveRecoveryMetadata( // proxyUser is deprecated. It is available here only for backward compatibility proxyUser: Option[String], rscDriverUri: Option[URI], + namespace: String, version: Int = 1) extends RecoveryMetadata @@ -93,6 +94,7 @@ object InteractiveSession extends Logging { mockClient: Option[RSCClient] = None): InteractiveSession = { val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase() val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner) + val namespace = SparkApp.getNamespace(request.conf, livyConf) val client = mockClient.orElse { val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf( @@ -153,6 +155,7 @@ object InteractiveSession extends Logging { request.numExecutors, request.pyFiles, request.queue, + namespace, mockApp) } @@ -193,6 +196,7 @@ object InteractiveSession extends Logging { metadata.numExecutors, metadata.pyFiles, metadata.queue, + metadata.namespace, mockApp) } @@ -433,6 +437,7 @@ class InteractiveSession( val numExecutors: Option[Int], val pyFiles: List[String], val queue: Option[String], + val namespace: String, mockApp: Option[SparkApp]) // For unit test. 
   extends Session(id, name, owner, ttl, idleTimeout, livyConf) with SessionHeartbeat
@@ -462,11 +467,13 @@ class InteractiveSession(
     app = mockApp.orElse {
       val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
         .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
-      if (!livyConf.isRunningOnKubernetes()) {
-        driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
+      val namespace = SparkApp.getNamespace(conf, livyConf)
+      val extrasMap: Map[String, String] = Map(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY -> namespace)
+      if (!livyConf.isRunningOnKubernetes()) {
+        driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap))
       } else { // Create SparkApp for Kubernetes anyway
-        Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
+        Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this), extrasMap))
       }
     }
@@ -535,7 +542,7 @@ class InteractiveSession(
       heartbeatTimeout.toSeconds.toInt, owner, ttl, idleTimeout,
       driverMemory, driverCores, executorMemory, executorCores, conf,
       archives, files, jars, numExecutors, pyFiles, queue,
-      proxyUser, rscDriverUri)
+      proxyUser, rscDriverUri, namespace)
   override def state: SessionState = {
     if (serverSideState == SessionState.Running) {
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
index e424f80fc..fbc987210 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkApp.scala
@@ -21,6 +21,9 @@ import scala.collection.JavaConverters._
 import org.apache.livy.LivyConf
+import java.io.{File, FileInputStream}
+import java.util.Properties
+
 object AppInfo {
   val DRIVER_LOG_URL_NAME = "driverLogUrl"
   val SPARK_UI_URL_NAME = "sparkUiUrl"
@@ -56,12 +59,23 @@ trait SparkAppListener {
  */
 object SparkApp {
   private val SPARK_YARN_TAG_KEY = "spark.yarn.tags"
-
+  val SPARK_KUBERNETES_NAMESPACE_KEY = "spark.kubernetes.namespace"
   object State extends Enumeration {
     val STARTING, RUNNING, FINISHED, FAILED, KILLED = Value
   }
   type State = State.Value
+  def getNamespace(conf: Map[String, String], livyConf: LivyConf): String = {
+    var namespace:String = conf.getOrElse(SPARK_KUBERNETES_NAMESPACE_KEY, "")
+    if(namespace == "") {
+      val sparkHome = livyConf.sparkHome().get //SPARK_HOME is mandatory for Livy
+      val sparkDefaultsPath = sparkHome + File.separator + "conf" + File.separator + "spark-defaults.conf"
+      val properties = new Properties()
+      properties.load(new FileInputStream(sparkDefaultsPath))
+      namespace = properties.getProperty(SPARK_KUBERNETES_NAMESPACE_KEY,"default")
+    }
+    namespace
+  }
   /**
    * Return cluster manager dependent SparkConf.
   *
@@ -102,11 +116,12 @@ object SparkApp {
       appId: Option[String],
       process: Option[LineBufferedProcess],
       livyConf: LivyConf,
-      listener: Option[SparkAppListener]): SparkApp = {
+      listener: Option[SparkAppListener],
+      extrasMap: Map[String, String]): SparkApp = {
     if (livyConf.isRunningOnYarn()) {
       new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
     } else if (livyConf.isRunningOnKubernetes()) {
-      new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf)
+      new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf, extrasMap)
     } else {
       require(process.isDefined, "process must not be None when Livy master is not YARN or" +
         "Kubernetes.")
diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
index c4574deef..32e96273b 100644
--- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
+++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
@@ -19,7 +19,6 @@ package org.apache.livy.utils
 import java.net.URLEncoder
 import java.util.Collections
 import java.util.concurrent._
-
 import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent._
@@ -27,14 +26,15 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
-
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressBuilder}
 import io.fabric8.kubernetes.client.{Config, ConfigBuilder, _}
-import org.apache.commons.lang.StringUtils
-
+import org.apache.commons.lang3.StringUtils
 import org.apache.livy.{LivyConf, Logging}
+import java.util
+import scala.collection.mutable
+
 object SparkKubernetesApp extends Logging {
   private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
@@ -52,14 +52,14 @@ object SparkKubernetesApp extends Logging {
         val iter = leakedAppTags.entrySet().iterator()
         var isRemoved = false
         val now = System.currentTimeMillis()
-        val apps = withRetry(kubernetesClient.getApplications())
+        val apps = appNamespaces.flatMap(namespace => withRetry(kubernetesClient.inNamespace(namespace).getApplications()))
         while (iter.hasNext) {
           val entry = iter.next()
           apps.find(_.getApplicationTag.contains(entry.getKey))
             .foreach({ app =>
               info(s"Kill leaked app ${app.getApplicationId}")
-              withRetry(kubernetesClient.killApplication(app))
+              withRetry(kubernetesClient.inNamespace(app.getApplicationNamespace).killApplication(app))
               iter.remove()
               isRemoved = true
             })
@@ -138,6 +138,7 @@ object SparkKubernetesApp extends Logging {
   private var sessionLeakageCheckInterval: Long = _
   var kubernetesClient: DefaultKubernetesClient = _
+  var appNamespaces: mutable.Set[String] = mutable.Set("default")
   private var appLookupThreadPoolSize: Long = _
   private var appLookupMaxFailedTimes: Long = _
@@ -146,8 +147,7 @@ object SparkKubernetesApp extends Logging {
     this.livyConf = livyConf
     // KubernetesClient is thread safe. Create once, share it across threads.
-    kubernetesClient =
-      KubernetesClientFactory.createKubernetesClient(livyConf)
+    kubernetesClient = KubernetesClientFactory.createKubernetesClient(livyConf)
     cacheLogSize = livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)
     appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
@@ -245,7 +245,8 @@ class SparkKubernetesApp private[utils] (
     process: Option[LineBufferedProcess],
     listener: Option[SparkAppListener],
     livyConf: LivyConf,
-    kubernetesClient: => KubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit test.
+    extrasMap: Map[String, String],
+    kubernetesClient: => DefaultKubernetesClient = SparkKubernetesApp.kubernetesClient) // For unit test.
   extends SparkApp
   with Logging {
@@ -262,6 +263,8 @@ class SparkKubernetesApp private[utils] (
   private var kubernetesTagToAppIdFailedTimes: Int = _
   private var kubernetesAppMonitorFailedTimes: Int = _
+  private var namespace: String = extrasMap(SparkApp.SPARK_KUBERNETES_NAMESPACE_KEY)
+  appNamespaces.add(namespace)
   private def failToMonitor(): Unit = {
     changeState(SparkApp.State.FAILED)
     process.foreach(_.destroy())
@@ -292,10 +295,11 @@ class SparkKubernetesApp private[utils] (
       }
       // Get KubernetesApplication by appTag.
       val appOption: Option[KubernetesApplication] = try {
-        getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
+        getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow, namespace)
       } catch {
         case e: Exception =>
           failToGetAppId()
+          error(s"Exception getting app from tag $appTag in namespace $namespace with message: ", e)
           appPromise.failure(e)
           return
       }
@@ -311,7 +315,7 @@ class SparkKubernetesApp private[utils] (
       listener.foreach(_.appIdKnown(appId))
       if (livyConf.getBoolean(LivyConf.KUBERNETES_INGRESS_CREATE)) {
-        withRetry(kubernetesClient.createSparkUIIngress(app, livyConf))
+        withRetry(kubernetesClient.inNamespace(namespace).createSparkUIIngress(app, livyConf))
       }
       var appInfo = AppInfo()
@@ -326,7 +330,7 @@ class SparkKubernetesApp private[utils] (
           debug(s"getApplicationReport, applicationId: ${app.getApplicationId}, " +
             s"namespace: ${app.getApplicationNamespace} " +
             s"applicationTag: ${app.getApplicationTag}")
-          val report = kubernetesClient.getApplicationReport(livyConf, app,
+          val report = kubernetesClient.inNamespace(namespace).getApplicationReport(livyConf, app,
             cacheLogSize = cacheLogSize)
           report
         }
@@ -399,7 +403,7 @@ class SparkKubernetesApp private[utils] (
     def kubernetesApplication: KubernetesApplication = applicationDetails.get.get
     if (kubernetesApplication != null && kubernetesApplication.getApplicationId != null) {
       try {
-        withRetry(kubernetesClient.killApplication(
+        withRetry(kubernetesClient.inNamespace(namespace).killApplication(
           Await.result(appPromise.future, appLookupTimeout)))
       } catch {
         // We cannot kill the Kubernetes app without the appTag.
@@ -440,10 +444,9 @@ class SparkKubernetesApp private[utils] (
   private def getAppFromTag(
       appTag: String,
       pollInterval: duration.Duration,
-      deadline: Deadline): Option[KubernetesApplication] = {
+      deadline: Deadline, namespace: String): Option[KubernetesApplication] = {
     import KubernetesExtensions._
-
-    withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag)))
+    withRetry(kubernetesClient.inNamespace(namespace).getApplications().find(_.getApplicationTag.contains(appTag)))
     match {
       case Some(app) => Some(app)
       case None =>
@@ -686,8 +689,7 @@ private[utils] object KubernetesExtensions {
       appTagLabel: String = SPARK_APP_TAG_LABEL,
       appIdLabel: String = SPARK_APP_ID_LABEL
     ): Seq[KubernetesApplication] = {
-      client.pods.inAnyNamespace
-        .withLabels(labels.asJava)
+      client.pods.withLabels(labels.asJava)
         .withLabel(appTagLabel)
         .withLabel(appIdLabel)
         .list.getItems.asScala.map(new KubernetesApplication(_))
@@ -831,6 +833,7 @@ private[utils] object KubernetesClientFactory {
     def toOption: Option[String] = if (string == null || string.isEmpty) None else Option(string)
   }
+
   def createKubernetesClient(livyConf: LivyConf): DefaultKubernetesClient = {
     val masterUrl = sparkMasterToKubernetesApi(livyConf.sparkMaster())
diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
index 401a8beb1..481fcae3d 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
@@ -143,7 +143,7 @@ class BatchSessionSpec
       val req = new CreateBatchRequest()
       val name = Some("Test Batch Session")
       val mockApp = mock[SparkApp]
-      val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None)
+      val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None, "")
       val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp))
       batch.state shouldBe (SessionState.Recovering)
diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index e7d651f89..097da65fa 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -279,7 +279,7 @@ class InteractiveSessionSpec extends FunSpec
       val m = InteractiveRecoveryMetadata(
         78, Some("Test session"), None, "appTag", Spark, 0, null, None, None, None, None, None, None,
         Map.empty[String, String], List.empty[String], List.empty[String],
-        List.empty[String], None, List.empty[String], None, None, Some(URI.create("")))
+        List.empty[String], None, List.empty[String], None, None, Some(URI.create("")), "")
       val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
       s.start()
@@ -298,7 +298,7 @@
       val m = InteractiveRecoveryMetadata(
         78, None, None, "appTag", Spark, 0, null, None, None, None, None, None, None,
         Map.empty[String, String], List.empty[String], List.empty[String],
-        List.empty[String], None, List.empty[String], None, None, Some(URI.create("")))
+        List.empty[String], None, List.empty[String], None, None, Some(URI.create("")),"")
       val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
       s.start()
@@ -315,7 +315,7 @@
       val m = InteractiveRecoveryMetadata(
None, Some("appId"), "appTag", Spark, 0, null, None, None, None, None, None, None, Map.empty[String, String], List.empty[String], List.empty[String], - List.empty[String], None, List.empty[String], None, None, None) + List.empty[String], None, List.empty[String], None, None, None,"") val s = InteractiveSession.recover(m, conf, sessionStore, None) s.start() s.state shouldBe a[SessionState.Dead] diff --git a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala index 363b01f89..62b35b6e0 100644 --- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala +++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala @@ -215,7 +215,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit implicit def executor: ExecutionContext = ExecutionContext.global def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = { - BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None) + BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None, "") } def mockSession(id: Int): BatchSession = {