
Commit 2a89adc

SPARK-1712: TaskDescription instance is too big causes Spark to hang
1 parent 3eb53bd commit 2a89adc

File tree: 3 files changed (+22, -7 lines)

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
6 additions, 3 deletions

```diff
@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
@@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend(
       logError("Slave registration failed: " + message)
       System.exit(1)
 
-    case LaunchTask(taskDesc) =>
-      logInfo("Got assigned task " + taskDesc.taskId)
+    case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
         System.exit(1)
       } else {
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val taskDesc = ser.deserialize[TaskDescription](data.value)
+        logInfo("Got assigned task " + taskDesc.taskId)
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
 
```
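The executor-side change above is the consuming half of the fix: the actor now receives opaque bytes and rebuilds the TaskDescription itself. Below is a minimal sketch of that round trip, using Spark's JavaSerializer (the default closure serializer) directly; FakeTask is a hypothetical stand-in for TaskDescription, and the 1.0-era serializer signatures are assumptions here, not part of the commit:

```scala
import java.nio.ByteBuffer

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Hypothetical stand-in for org.apache.spark.scheduler.TaskDescription;
// case classes are serializable, which is all the round trip needs.
case class FakeTask(taskId: Long, serializedTask: Array[Byte])

object LaunchTaskRoundTrip {
  def main(args: Array[String]): Unit = {
    val ser = new JavaSerializer(new SparkConf(false)).newInstance()
    val task = FakeTask(42L, Array.fill(16)(1.toByte))

    // Driver side: flatten the task description into a ByteBuffer.
    val data: ByteBuffer = ser.serialize(task)

    // Executor side: rebuild it, as in `case LaunchTask(data)` above.
    val taskDesc = ser.deserialize[FakeTask](data)
    println("Got assigned task " + taskDesc.taskId)
  }
}
```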
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
1 addition, 1 deletion

```diff
@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 private[spark] object CoarseGrainedClusterMessages {
 
   // Driver to executors
-  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
```
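LaunchTask now carries a SerializableBuffer instead of the TaskDescription itself. The wrapper matters because java.nio.ByteBuffer does not implement java.io.Serializable, so a raw buffer could not ride through Akka's Java serialization on its own. Here is a simplified sketch of such a wrapper; the real one is org.apache.spark.util.SerializableBuffer, and this version is illustrative, not the commit's code:

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Minimal SerializableBuffer-style wrapper: copies the buffer's bytes
// through the object streams by hand, since ByteBuffer itself is not
// java.io.Serializable.
class MiniSerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
  def value: ByteBuffer = buffer

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeInt(buffer.remaining())
    val bytes = new Array[Byte](buffer.remaining())
    buffer.duplicate().get(bytes) // duplicate() keeps the original position intact
    out.write(bytes)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    buffer = ByteBuffer.wrap(bytes)
  }
}
```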
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
15 additions, 3 deletions

```diff
@@ -27,10 +27,10 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{Logging, SparkException, TaskState}
+import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -141,7 +142,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
         freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-        executorActor(task.executorId) ! LaunchTask(task)
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val taskBytes = ser.serialize(task).array()
+        val serializedTask = ser.serialize(taskBytes)
+        if (serializedTask.limit >= akkaFrameSize - 1024) {
+          var msg = "Serialized task %s:%d were %d bytes which " +
+            "exceeds spark.akka.frameSize (%d bytes)."
+          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+          val exception = new SparkException(msg)
+          logError(msg, exception)
+          throw exception
+        }
+        executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
       }
     }
 
```
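This guard is the core of the fix: instead of letting Akka silently drop a message larger than the frame size (which left the executor waiting forever, hence the hang), the driver now measures the serialized task and fails fast with a SparkException naming spark.akka.frameSize. A rough sketch of the arithmetic, assuming the 1.0-era behavior where AkkaUtils.maxFrameSizeBytes reads spark.akka.frameSize in MB with a default of 10 (both details are assumptions, not shown in this diff):

```scala
import org.apache.spark.SparkConf

object FrameSizeGuard {
  // Assumed to mirror AkkaUtils.maxFrameSizeBytes: the frame size is
  // configured in MB and converted to bytes here.
  def maxFrameSizeBytes(conf: SparkConf): Int =
    conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024

  // True when a serialized task of `taskBytes` bytes fits in a frame,
  // keeping 1024 bytes of headroom for the message envelope, as above.
  def fitsInFrame(taskBytes: Int, conf: SparkConf): Boolean =
    taskBytes < maxFrameSizeBytes(conf) - 1024

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set("spark.akka.frameSize", "10")
    println(fitsInFrame(4 * 1024 * 1024, conf))  // true: well under 10 MB
    println(fitsInFrame(12 * 1024 * 1024, conf)) // false: would hang pre-patch
  }
}
```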