@@ -21,11 +21,11 @@ import java.io.File
 import java.util.{Collections, List => JList}
 import java.util.concurrent.locks.ReentrantLock
 
-import com.google.common.collect.HashBiMap
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
+import com.google.common.collect.HashBiMap
+
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, _}
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -99,7 +99,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
   }
 
-  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+  def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
     val executorSparkHome = conf.getOption("spark.mesos.executor.home")
       .orElse(sc.getSparkHome())
       .getOrElse {
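The context lines above resolve the executor's Spark home with a two-step fallback: the `spark.mesos.executor.home` conf key wins, then the driver's own Spark home. A minimal standalone sketch of that chain, assuming only public `SparkConf`/`SparkContext` accessors; the method name `resolveExecutorSparkHome` is hypothetical and not part of this patch:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Hypothetical helper mirroring the fallback chain in the hunk above.
def resolveExecutorSparkHome(conf: SparkConf, sc: SparkContext): String = {
  conf.getOption("spark.mesos.executor.home")  // Mesos-specific override wins
    .orElse(sc.getSparkHome())                 // else fall back to the driver's Spark home
    .getOrElse {
      // Neither source is set; the real method's error branch is elided in this hunk.
      throw new SparkException("Executor Spark home is not set")
    }
}
```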
@@ -155,7 +155,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         s"cd $basename*; $prefixEnv " +
         "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
         s" --driver-url $driverURL" +
-        s" --executor-id ${offer.getSlaveId.getValue}" +
+        s" --executor-id ${sparkExecutorId(offer.getSlaveId.getValue, taskId.toString)}" +
         s" --hostname ${offer.getHostname}" +
         s" --cores $numCores" +
         s" --app-id $appId")
@@ -213,7 +213,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         val task = MesosTaskInfo.newBuilder()
           .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
           .setSlaveId(offer.getSlaveId)
-          .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
+          .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
           .setName("Task " + taskId)
           .addResources(createResource("cpus", cpusToUse))
           .addResources(createResource("mem",
@@ -345,14 +345,12 @@ private[spark] class CoarseMesosSchedulerBackend(
       }
     }
 
-    assert(pendingRemovedSlaveIds.size <= taskIdToSlaveId.size)
-
     // We cannot simply decrement from the existing executor limit as we may not able to
     // launch as much executors as the limit. But we assume if we are notified to kill
     // executors, that means the scheduler wants to set the limit that is less than
     // the amount of the executors that has been launched. Therefore, we take the existing
     // amount of executors launched and deduct the executors killed as the new limit.
-    executorLimitOption = Option(taskIdToSlaveId.size - pendingRemovedSlaveIds.size)
+    executorLimitOption = Option(Math.max(0, taskIdToSlaveId.size - pendingRemovedSlaveIds.size))
     true
   }
 }
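The removed assertion enforced `pendingRemovedSlaveIds.size <= taskIdToSlaveId.size`; with it gone, the difference can go negative, which the `Math.max(0, ...)` clamp absorbs instead of producing a negative limit. A small self-contained illustration of the arithmetic, using hypothetical counts:

```scala
// Illustration only: why the clamp matters. Without it, Option(-1) would
// become a nonsensical negative executor limit.
object ExecutorLimitExample extends App {
  val launchedTasks = 2     // hypothetical: entries in taskIdToSlaveId
  val pendingRemovals = 3   // hypothetical: entries in pendingRemovedSlaveIds
  val newLimit = Math.max(0, launchedTasks - pendingRemovals)
  assert(newLimit == 0)     // clamped to zero rather than -1
  println(s"new executor limit: $newLimit")
}
```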