Updates and Fixes to Make spark-memory work on opensource spark #4

Open · wants to merge 6 commits into base: master
37 changes: 37 additions & 0 deletions Config
@@ -0,0 +1,37 @@
# -*-perl-*-
Owner commented:
can you remove this file from the pr please?


package.Spark-memory = {
    interfaces = (1.0);

    deploy = {
        generic = true;
    };

    build-environment = {
        chroot = basic;
        network-access = blocked;
    };

    # Use NoOpBuild. See https://w.amazon.com/index.php/BrazilBuildSystem/NoOpBuild
    build-system = no-op;
    build-tools = {
        1.0 = {
            NoOpBuild = 1.0;
        };
    };

    # Use runtime-dependencies for when you want to bring in additional
    # packages when deploying.
    # Use dependencies instead if you intend for these dependencies to
    # be exported to other packages that build against you.
    dependencies = {
        1.0 = {
        };
    };

    runtime-dependencies = {
        1.0 = {
        };
    };

};
14 changes: 13 additions & 1 deletion README.md
@@ -1,11 +1,23 @@
Spark Memory Monitor
========================
Intro
----------
Memory management is a complicated system in Spark that often leads to confusing and difficult-to-diagnose problems. Typically the user gets an error message showing that YARN has killed an executor for exceeding physical memory limits, but with no insight into why. Furthermore, the typical tools for investigating Java memory use are ineffective due to Spark's extensive use of off-heap memory, which leaves a portion of the process memory “unaccounted for”. spark-memory is a tool that provides insight into the parts of memory use that are not typically visible, including the maximum memory use.

Usage
> ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.6 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.

Example YARN error message


Building spark-memory
-----------

Build with `mvn package`, `sbt`, etc.


Modifying spark-submit to use spark-memory
-----------

Include that jar in your spark application. You could bundle it directly, or just include it with `--jars`.
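
On open source Spark 2.4+ (this PR bumps the pom to Spark 2.4.3), an executor plugin can be registered through the `spark.executor.plugins` configuration. The snippet below is a minimal sketch rather than code from this repository: the application name is made up, and the monitor's own system properties (described below) still need to be set separately.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: register the memory monitor's executor extension on
// open source Spark 2.4+ via the spark.executor.plugins config.
// The spark-memory jar still has to be shipped to the executors,
// e.g. with --jars on spark-submit.
object MemoryMonitorExample {
  def main(cmdArgs: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("memory-monitor-example") // illustrative app name
      .set("spark.executor.plugins",
        "com.cloudera.spark.MemoryMonitorExecutorExtension")
    val sc = new SparkContext(conf)
    // ... run the job as usual; the extension installs the monitor on each executor ...
    sc.stop()
  }
}
```

Setting the property on the `SparkConf` as above or passing it with `--conf` on `spark-submit` is equivalent; the point is that the extension class has to be named in `spark.executor.plugins` for the executors to load it.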

The monitoring is configured via Java system properties:
53 changes: 0 additions & 53 deletions core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
@@ -327,60 +327,7 @@ object MemoryMonitor {
}
}

class MemoryMonitorExecutorExtension extends ExecutorPlugin {
  // the "extension class" api just lets you invoke a constructor. We really just want to
  // call this static method, so that's good enough.
  MemoryMonitor.installIfSysProps()
  val args = MemoryMonitorArgs.sysPropsArgs

  val monitoredTaskCount = new AtomicInteger(0)

  val scheduler = if (args.stagesToPoll != null && args.stagesToPoll.nonEmpty) {
    // TODO share polling executors?
    new ScheduledThreadPoolExecutor(1, new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "thread-dump poll thread")
        t.setDaemon(true)
        t
      }
    })
  } else {
    null
  }
  val pollingTask = new AtomicReference[ScheduledFuture[_]]()

  override def taskStart(taskContext: TaskContext): Unit = {
    if (args.stagesToPoll.contains(taskContext.stageId())) {
      if (monitoredTaskCount.getAndIncrement() == 0) {
        // TODO schedule thread polling
        val task = scheduler.scheduleWithFixedDelay(new Runnable {
          override def run(): Unit = {
            val d = MemoryMonitor.dateFormat.format(System.currentTimeMillis())
            println(s"Polled thread dump @ $d")
            MemoryMonitor.showThreadDump(MemoryMonitor.getThreadInfo)
          }
        }, 0, args.threadDumpFreqMillis, TimeUnit.MILLISECONDS)
        pollingTask.set(task)
      }
    }
  }

  override def onTaskFailure(context: TaskContext, error: Throwable): Unit = {
    removeActiveTask(context)
  }

  override def onTaskCompletion(context: TaskContext): Unit = {
    removeActiveTask(context)
  }

  private def removeActiveTask(context: TaskContext): Unit = {
    if (args.stagesToPoll.contains(context.stageId())) {
      if (monitoredTaskCount.decrementAndGet() == 0) {
        pollingTask.get().cancel(false)
      }
    }
  }
}

class MemoryMonitorArgs extends FieldArgs {
var enabled = false
@@ -0,0 +1,62 @@
package com.cloudera.spark

import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}

import org.apache.spark.TaskContext
import org.apache.spark.executor.ExecutorPlugin

class MemoryMonitorExecutorExtension extends ExecutorPlugin with org.apache.spark.ExecutorPlugin {
  // the "extension class" api just lets you invoke a constructor. We really just want to
  // call this static method, so that's good enough.
  MemoryMonitor.installIfSysProps()
  val args = MemoryMonitorArgs.sysPropsArgs

  val monitoredTaskCount = new AtomicInteger(0)

  val scheduler = if (args.stagesToPoll != null && args.stagesToPoll.nonEmpty) {
    // TODO share polling executors?
    new ScheduledThreadPoolExecutor(1, new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "thread-dump poll thread")
        t.setDaemon(true)
        t
      }
    })
  } else {
    null
  }
  val pollingTask = new AtomicReference[ScheduledFuture[_]]()

  override def taskStart(taskContext: TaskContext): Unit = {
    if (args.stagesToPoll.contains(taskContext.stageId())) {
      if (monitoredTaskCount.getAndIncrement() == 0) {
        // TODO schedule thread polling
        val task = scheduler.scheduleWithFixedDelay(new Runnable {
          override def run(): Unit = {
            val d = MemoryMonitor.dateFormat.format(System.currentTimeMillis())
            println(s"Polled thread dump @ $d")
            MemoryMonitor.showThreadDump(MemoryMonitor.getThreadInfo)
          }
        }, 0, args.threadDumpFreqMillis, TimeUnit.MILLISECONDS)
        pollingTask.set(task)
      }
    }
  }

  override def onTaskFailure(context: TaskContext, error: Throwable): Unit = {
    removeActiveTask(context)
  }

  override def onTaskCompletion(context: TaskContext): Unit = {
    removeActiveTask(context)
  }

  private def removeActiveTask(context: TaskContext): Unit = {
    if (args.stagesToPoll.contains(context.stageId())) {
      if (monitoredTaskCount.decrementAndGet() == 0) {
        pollingTask.get().cancel(false)
      }
    }
  }
}
@@ -1,7 +1,6 @@
package org.apache.spark.memory

import com.cloudera.spark.{Reflector, IncrementBytes, MemoryGetter}
import org.apache.spark.util.{Utils, ThreadStackTrace}
import org.apache.spark.{SparkContext, SparkEnv}

class SparkMemoryManagerHandle(
@@ -0,0 +1,10 @@
package com.cloudera.spark

import org.scalatest.FunSuite

class MemoryMonitorExecutorExtensionSuite extends FunSuite {
  test("MemoryMonitorExecutorExtension should extend the correct class of ExecutorPlugin") {
    assert(classOf[MemoryMonitorExecutorExtension].getInterfaces.contains(classOf[org.apache.spark.executor.ExecutorPlugin]))
    assert(classOf[MemoryMonitorExecutorExtension].getInterfaces.contains(classOf[org.apache.spark.ExecutorPlugin]))
  }
}
4 changes: 1 addition & 3 deletions pom.xml
@@ -36,7 +36,6 @@
      <artifactId>sumac_2.11</artifactId>
      <version>0.3.0</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
@@ -46,10 +45,9 @@
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.0</version>
      <version>2.4.3</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_2.11</artifactId>