Skip to content

Commit

Permalink
[LIVY-991][SERVER] Facing issues with the Livy UI Driver link (apache…
Browse files Browse the repository at this point in the history
…#437)

Added conditional check on finished state to set driverlogUrl to null

Co-authored-by: Rajshekhar Muchandi <[email protected]>
  • Loading branch information
2 people authored and jimenefe committed Oct 14, 2024
1 parent c23235b commit c897238
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,12 @@ class SparkYarnApp private[utils] (
val latestAppInfo = {
val attempt =
yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId)
val driverLogUrl =
val driverLogUrl = if (state == SparkApp.State.FINISHED) {
None
} else {
Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl)
.toOption
}
AppInfo(driverLogUrl, Option(appReport.getTrackingUrl))
}

Expand Down
118 changes: 118 additions & 0 deletions server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,124 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
}
}

it("should expose driver log url and Spark UI url in KILLED state") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
val driverLogUrl = "DRIVER LOG URL"
val sparkUiUrl = "SPARK UI URL"

val mockApplicationAttemptId = mock[ApplicationAttemptId]
val mockAppReport = mock[ApplicationReport]
when(mockAppReport.getApplicationId).thenReturn(appId)
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
when(mockAppReport.getTrackingUrl).thenReturn(sparkUiUrl)
when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
var done = false
when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
override def answer(invocation: InvocationOnMock): YarnApplicationState = {
KILLED
}
})
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)

val mockAttemptReport = mock[ApplicationAttemptReport]
val mockContainerId = mock[ContainerId]
when(mockAttemptReport.getAMContainerId).thenReturn(mockContainerId)
when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId))
.thenReturn(mockAttemptReport)

val mockContainerReport = mock[ContainerReport]
when(mockYarnClient.getContainerReport(mockContainerId)).thenReturn(mockContainerReport)

// Block test until getLogUrl is called 10 times.
val getLogUrlCountDown = new CountDownLatch(10)
when(mockContainerReport.getLogUrl).thenAnswer(new Answer[String] {
override def answer(invocation: InvocationOnMock): String = {
getLogUrlCountDown.countDown()
driverLogUrl
}
})

val mockListener = mock[SparkAppListener]

val app = new SparkYarnApp(
appTag, appIdOption, None, Some(mockListener), livyConf, mockYarnClient)
cleanupThread(app.yarnAppMonitorThread) {
getLogUrlCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
done = true

app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
assert(!app.yarnAppMonitorThread.isAlive,
"YarnAppMonitorThread should terminate after YARN app is finished.")

verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
verify(mockAppReport, atLeast(1)).getTrackingUrl()
verify(mockContainerReport, atLeast(1)).getLogUrl()
verify(mockListener).appIdKnown(appId.toString)
verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl)))
}
}
}

it("should not expose driver log url and Spark UI url in FINISHED state") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
val driverLogUrl = "DRIVER LOG URL"
val sparkUiUrl = "SPARK UI URL"

val mockApplicationAttemptId = mock[ApplicationAttemptId]
val mockAppReport = mock[ApplicationReport]
when(mockAppReport.getApplicationId).thenReturn(appId)
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
when(mockAppReport.getTrackingUrl).thenReturn(sparkUiUrl)
when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
var done = false
when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
override def answer(invocation: InvocationOnMock): YarnApplicationState = {
FINISHED
}
})
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)

val mockAttemptReport = mock[ApplicationAttemptReport]
val mockContainerId = mock[ContainerId]
when(mockAttemptReport.getAMContainerId).thenReturn(mockContainerId)
when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId))
.thenReturn(mockAttemptReport)

val mockContainerReport = mock[ContainerReport]
when(mockYarnClient.getContainerReport(mockContainerId)).thenReturn(mockContainerReport)

// Block test until getLogUrl is called 10 times.
val getLogUrlCountDown = new CountDownLatch(10)
when(mockContainerReport.getLogUrl).thenAnswer(new Answer[String] {
override def answer(invocation: InvocationOnMock): String = {
getLogUrlCountDown.countDown()
driverLogUrl
}
})

val mockListener = mock[SparkAppListener]

val app = new SparkYarnApp(
appTag, appIdOption, None, Some(mockListener), livyConf, mockYarnClient)
cleanupThread(app.yarnAppMonitorThread) {
getLogUrlCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
done = true

app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
assert(!app.yarnAppMonitorThread.isAlive,
"YarnAppMonitorThread should terminate after YARN app is finished.")

verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
verify(mockAppReport, atLeast(1)).getTrackingUrl()
verify(mockContainerReport, never()).getLogUrl()
verify(mockListener).appIdKnown(appId.toString)
verify(mockListener, never()).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl)))
}
}
}

it("should not die on YARN-4411") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
Expand Down

0 comments on commit c897238

Please sign in to comment.