
Commit 47414eb

Merge pull request #74 from markhamstra/csd-1.4
SKIPME merge Apache branch-1.4
2 parents: b546cbf + 8b86fde

File tree

13 files changed (+319, -62 lines)

R/pkg/R/client.R

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, pack
     jars <- paste("--jars", jars)
   }
 
-  if (packages != "") {
+  if (!identical(packages, "")) {
     packages <- paste("--packages", packages)
   }
 

R/pkg/inst/tests/test_client.R

Lines changed: 4 additions & 0 deletions
@@ -30,3 +30,7 @@ test_that("no package specified doesn't add packages flag", {
   expect_equal(gsub("[[:space:]]", "", args),
                "")
 })
+
+test_that("multiple packages don't produce a warning", {
+  expect_that(generateSparkSubmitArgs("", "", "", "", c("A", "B")), not(gives_warning()))
+})

build/sbt-launch-lib.bash

Lines changed: 6 additions & 2 deletions
@@ -51,9 +51,13 @@ acquire_sbt_jar () {
     printf "Attempting to fetch sbt\n"
     JAR_DL="${JAR}.part"
     if [ $(command -v curl) ]; then
-      (curl --silent ${URL1} > "${JAR_DL}" || curl --silent ${URL2} > "${JAR_DL}") && mv "${JAR_DL}" "${JAR}"
+      (curl --fail --location --silent ${URL1} > "${JAR_DL}" ||\
+        (rm -f "${JAR_DL}" && curl --fail --location --silent ${URL2} > "${JAR_DL}")) &&\
+        mv "${JAR_DL}" "${JAR}"
     elif [ $(command -v wget) ]; then
-      (wget --quiet ${URL1} -O "${JAR_DL}" || wget --quiet ${URL2} -O "${JAR_DL}") && mv "${JAR_DL}" "${JAR}"
+      (wget --quiet ${URL1} -O "${JAR_DL}" ||\
+        (rm -f "${JAR_DL}" && wget --quiet ${URL2} -O "${JAR_DL}")) &&\
+        mv "${JAR_DL}" "${JAR}"
     else
       printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
       exit -1

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ object Partitioner {
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
-    for (r <- bySize if r.partitioner.isDefined) {
+    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
       return r.partitioner.get
     }
     if (rdd.context.conf.contains("spark.default.parallelism")) {
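
The new numPartitions > 0 guard matters when one input of a shuffle operation carries a partitioner with zero partitions, which an aggregation over an empty RDD can produce. A rough spark-shell style sketch of the situation (illustrative only, not code from this patch; sc is an existing SparkContext):

// An aggregated empty RDD can keep a HashPartitioner whose numPartitions is 0,
// because defaultPartitioner saw 0 input partitions when that RDD was built.
val records = sc.parallelize(Seq(1 -> "a", 2 -> "b"), 4)
val emptyCounts = sc.emptyRDD[(Int, Int)].reduceByKey(_ + _)

// Before this change, defaultPartitioner(records, emptyCounts) could return that
// zero-partition partitioner, so the join below would end up with 0 partitions and
// silently drop every record. With the guard it falls back to a positive partition count.
val joined = records.leftOuterJoin(emptyCounts)
println(joined.partitions.length)   // expected to be > 0 with this fix applied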

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 96 additions & 49 deletions
@@ -532,70 +532,47 @@ private[master] class Master(
   }
 
   /**
-   * Schedule executors to be launched on the workers.
-   *
-   * There are two modes of launching executors. The first attempts to spread out an application's
-   * executors on as many workers as possible, while the second does the opposite (i.e. launch them
-   * on as few workers as possible). The former is usually better for data locality purposes and is
-   * the default.
-   *
-   * The number of cores assigned to each executor is configurable. When this is explicitly set,
-   * multiple executors from the same application may be launched on the same worker if the worker
-   * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
-   * worker by default, in which case only one executor may be launched on each worker.
+   * Schedule and launch executors on workers
    */
   private def startExecutorsOnWorkers(): Unit = {
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
-    if (spreadOutApps) {
-      // Try to spread out each app among all the workers, until it has all its cores
-      for (app <- waitingApps if app.coresLeft > 0) {
-        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
-            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
-          .sortBy(_.coresFree).reverse
-        val numUsable = usableWorkers.length
-        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
-        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
-        var pos = 0
-        while (toAssign > 0) {
-          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
-            toAssign -= 1
-            assigned(pos) += 1
-          }
-          pos = (pos + 1) % numUsable
-        }
-        // Now that we've decided how many cores to give on each node, let's actually give them
-        for (pos <- 0 until numUsable if assigned(pos) > 0) {
-          allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
-        }
-      }
-    } else {
-      // Pack each app into as few workers as possible until we've assigned all its cores
-      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
-        for (app <- waitingApps if app.coresLeft > 0) {
-          allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
-        }
+    for (app <- waitingApps if app.coresLeft > 0) {
+      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
+      // Filter out workers that don't have enough resources to launch an executor
+      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+          worker.coresFree >= coresPerExecutor.getOrElse(1))
+        .sortBy(_.coresFree).reverse
+      val assignedCores = Master.scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
+
+      // Now that we've decided how many cores to allocate on each worker, let's allocate them
+      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
+        allocateWorkerResourceToExecutors(
+          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
       }
     }
   }
 
   /**
    * Allocate a worker's resources to one or more executors.
    * @param app the info of the application which the executors belong to
-   * @param coresToAllocate cores on this worker to be allocated to this application
+   * @param assignedCores number of cores on this worker for this application
+   * @param coresPerExecutor number of cores per executor
    * @param worker the worker info
    */
   private def allocateWorkerResourceToExecutors(
       app: ApplicationInfo,
-      coresToAllocate: Int,
+      assignedCores: Int,
+      coresPerExecutor: Option[Int],
       worker: WorkerInfo): Unit = {
-    val memoryPerExecutor = app.desc.memoryPerExecutorMB
-    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
-    var coresLeft = coresToAllocate
-    while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
-      val exec = app.addExecutor(worker, coresPerExecutor)
-      coresLeft -= coresPerExecutor
+    // If the number of cores per executor is specified, we divide the cores assigned
+    // to this worker evenly among the executors with no remainder.
+    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
+    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
+    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
+    for (i <- 1 to numExecutors) {
+      val exec = app.addExecutor(worker, coresToAssign)
       launchExecutor(worker, exec)
       app.state = ApplicationState.RUNNING
     }

@@ -892,7 +869,7 @@ private[master] class Master(
 
 private[deploy] object Master extends Logging {
   val systemName = "sparkMaster"
-  private val actorName = "Master"
+  val actorName = "Master"
 
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)

@@ -944,4 +921,74 @@ private[deploy] object Master extends Logging {
     val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
     (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
   }
+
+
+  /**
+   * Schedule executors to be launched on the workers.
+   * Returns an array containing number of cores assigned to each worker.
+   *
+   * There are two modes of launching executors. The first attempts to spread out an application's
+   * executors on as many workers as possible, while the second does the opposite (i.e. launch them
+   * on as few workers as possible). The former is usually better for data locality purposes and is
+   * the default.
+   *
+   * The number of cores assigned to each executor is configurable. When this is explicitly set,
+   * multiple executors from the same application may be launched on the same worker if the worker
+   * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
+   * worker by default, in which case only one executor may be launched on each worker.
+   *
+   * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
+   * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
+   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
+   * allocated at a time, 12 cores from each worker would be assigned to each executor.
+   * Since 12 < 16, no executors would launch [SPARK-8881].
+   *
+   * Unfortunately, this must be moved out here into the Master object because Akka allows
+   * neither creating actors outside of Props nor accessing the Master after setting up the
+   * actor system. Otherwise, there is no way to test it.
+   */
+  def scheduleExecutorsOnWorkers(
+      app: ApplicationInfo,
+      usableWorkers: Array[WorkerInfo],
+      spreadOutApps: Boolean): Array[Int] = {
+    // If the number of cores per executor is not specified, then we can just schedule
+    // 1 core at a time since we expect a single executor to be launched on each worker
+    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+    val memoryPerExecutor = app.desc.memoryPerExecutorMB
+    val numUsable = usableWorkers.length
+    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
+    val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
+    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
+    var freeWorkers = (0 until numUsable).toIndexedSeq
+
+    def canLaunchExecutor(pos: Int): Boolean = {
+      usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
+      usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
+    }
+
+    while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
+      freeWorkers = freeWorkers.filter(canLaunchExecutor)
+      freeWorkers.foreach { pos =>
+        var keepScheduling = true
+        while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
+          coresToAssign -= coresPerExecutor
+          assignedCores(pos) += coresPerExecutor
+          // If cores per executor is not set, we are assigning 1 core at a time
+          // without actually meaning to launch 1 executor for each core assigned
+          if (app.desc.coresPerExecutor.isDefined) {
+            assignedMemory(pos) += memoryPerExecutor
+          }
+
+          // Spreading out an application means spreading out its executors across as
+          // many workers as possible. If we are not spreading out, then we should keep
+          // scheduling executors on this worker until we use all of its resources.
+          // Otherwise, just move on to the next worker.
+          if (spreadOutApps) {
+            keepScheduling = false
+          }
+        }
+      }
+    }
+    assignedCores
+  }
 }
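
The SPARK-8881 scenario described in the new scaladoc is easy to sanity-check outside of Spark. Below is a small self-contained sketch (illustrative only, not code from this patch; worker state is reduced to plain integer arrays) that replays the "4 workers with 16 cores each, spark.cores.max = 48, spark.executor.cores = 16" example under both allocation strategies:

object Spark8881Demo extends App {
  val workerCores      = Array(16, 16, 16, 16)   // free cores per worker
  val coresPerExecutor = 16                      // spark.executor.cores
  val coresMax         = 48                      // spark.cores.max

  // Old behaviour: hand out one core at a time, round-robin across workers.
  val oneAtATime = Array.fill(workerCores.length)(0)
  var remaining = math.min(coresMax, workerCores.sum)
  var pos = 0
  while (remaining > 0) {
    if (workerCores(pos) - oneAtATime(pos) > 0) { oneAtATime(pos) += 1; remaining -= 1 }
    pos = (pos + 1) % workerCores.length
  }
  // Every worker ends up with 12 assigned cores, and 12 / 16 == 0 executors fit.
  println(s"one core at a time:         ${oneAtATime.mkString(", ")} -> " +
    s"${oneAtATime.map(_ / coresPerExecutor).sum} executors")

  // New behaviour: hand out coresPerExecutor cores at a time.
  val chunked = Array.fill(workerCores.length)(0)
  remaining = math.min(coresMax, workerCores.sum)
  pos = 0
  while (remaining >= coresPerExecutor) {
    if (workerCores(pos) - chunked(pos) >= coresPerExecutor) {
      chunked(pos) += coresPerExecutor
      remaining -= coresPerExecutor
    }
    pos = (pos + 1) % workerCores.length
  }
  println(s"coresPerExecutor at a time: ${chunked.mkString(", ")} -> " +
    s"${chunked.map(_ / coresPerExecutor).sum} executors")
}

Run as-is, the first strategy yields 0 executors and the second yields 3, which is exactly why scheduleExecutorsOnWorkers now assigns cores in coresPerExecutor-sized chunks.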

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 3 additions & 2 deletions
@@ -430,18 +430,19 @@ private[spark] object Utils extends Logging {
       val lockFileName = s"${url.hashCode}${timestamp}_lock"
       val localDir = new File(getLocalDir(conf))
       val lockFile = new File(localDir, lockFileName)
-      val raf = new RandomAccessFile(lockFile, "rw")
+      val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
       // Only one executor entry.
       // The FileLock is only used to control synchronization for executors download file,
       // it's always safe regardless of lock type (mandatory or advisory).
-      val lock = raf.getChannel().lock()
+      val lock = lockFileChannel.lock()
       val cachedFile = new File(localDir, cachedFileName)
       try {
         if (!cachedFile.exists()) {
           doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
         }
       } finally {
         lock.release()
+        lockFileChannel.close()
       }
       copyFile(
         url,
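
The point of keeping the channel in lockFileChannel is that it can now be closed once the lock is released; previously the RandomAccessFile opened for the lock file was never closed, leaking a file descriptor on every cached fetch. A minimal standalone sketch of the same acquire/release/close pattern (the file path and the body of the critical section are illustrative, not Spark code):

import java.io.RandomAccessFile

object LockFileDemo extends App {
  // Opening the file in "rw" mode creates it if it does not exist yet.
  val lockFileChannel = new RandomAccessFile("/tmp/demo_lock", "rw").getChannel()
  val lock = lockFileChannel.lock()   // blocks until the exclusive lock is held
  try {
    // ... work that must not run concurrently, e.g. downloading a cached file ...
    println("holding lock: " + lock.isValid)
  } finally {
    lock.release()                    // let other processes proceed
    lockFileChannel.close()           // also closes the underlying RandomAccessFile
  }
}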
