Commit 47ce99d

Merge branch 'master' into openstack

2 parents: 99f095d + 237b96b

60 files changed: 599 additions, 307 deletions (only part of the diff is reproduced below)

core/pom.xml

Lines changed: 1 addition & 0 deletions

@@ -70,6 +70,7 @@
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
+      <version>3.3</version>
       <scope>test</scope>
     </dependency>
     <dependency>
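
For context, commons-math3 is now pinned at 3.3 and stays test-scoped, so it is visible on the test classpath only and is not bundled into the spark-core artifact. A hypothetical test-side usage sketch (the distribution and values are illustrative, not part of this commit):

    import org.apache.commons.math3.distribution.PoissonDistribution

    // Resolves because commons-math3 3.3 is on the test classpath; it would not
    // be available to main/ code since the dependency is <scope>test</scope>.
    val poisson = new PoissonDistribution(5.0)
    val sampled: Int = poisson.sample()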

core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala

Lines changed: 2 additions & 2 deletions

@@ -19,9 +19,9 @@ package org.apache.spark.deploy
 
 private[spark] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }
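
The new EXITED value describes an executor that terminated on its own, for example after a clean shutdown requested by the driver (see the ExecutorRunner change below), as opposed to one that was killed, failed, or lost. Since isFinished now includes it, EXITED is terminal without implying failure:

    // Illustrative checks against the enum above
    ExecutorState.isFinished(ExecutorState.EXITED)   // true: terminal, not necessarily a failure
    ExecutorState.isFinished(ExecutorState.RUNNING)  // false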

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 2 deletions

@@ -303,10 +303,11 @@ private[spark] class Master(
           appInfo.removeExecutor(exec)
           exec.worker.removeExecutor(exec)
 
+          val normalExit = exitStatus.exists(_ == 0)
           // Only retry certain number of times so we don't go into an infinite loop.
-          if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+          if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
             schedule()
-          } else {
+          } else if (!normalExit) {
             logError("Application %s with ID %s failed %d times, removing it".format(
               appInfo.desc.name, appInfo.id, appInfo.retryCount))
             removeApplication(appInfo, ApplicationState.FAILED)
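
The effect is that an executor exiting with status 0 neither consumes one of the application's retries nor causes the application to be removed as FAILED. A minimal sketch of the decision with simplified names (not the actual Master method):

    def shouldReschedule(exitStatus: Option[Int], retryCount: Int): Boolean = {
      val normalExit = exitStatus.exists(_ == 0)  // clean, expected shutdown
      // Only abnormal exits count toward MAX_NUM_RETRY; normal exits are ignored here.
      !normalExit && retryCount < ApplicationState.MAX_NUM_RETRY
    }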

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 3 additions & 4 deletions

@@ -154,11 +154,10 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
-      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
-      // long-lived processes only. However, in the future, we might restart the executor a few
-      // times on the same machine.
+      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
+      // or with nonzero exit code
      val exitCode = process.waitFor()
-      state = ExecutorState.FAILED
+      state = ExecutorState.EXITED
       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 7 additions & 4 deletions

@@ -139,10 +139,13 @@ class HadoopRDD[K, V](
       // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
       // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
       // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      val newJobConf = new JobConf(broadcastedConf.value.value)
-      initLocalJobConfFuncOpt.map(f => f(newJobConf))
-      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-      newJobConf
+      // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456)
+      broadcastedConf.synchronized {
+        val newJobConf = new JobConf(broadcastedConf.value.value)
+        initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+        newJobConf
+      }
     }
   }
 
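Rationale: the JobConf copy constructor iterates over the broadcast Hadoop Configuration, and that iteration is not thread-safe, so two tasks cloning the conf concurrently in the same JVM can throw ConcurrentModificationException. Serializing the clone on the broadcast wrapper avoids this. A stripped-down sketch of the pattern (names shortened; not the full getJobConf method):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.SerializableWritable
    import org.apache.spark.broadcast.Broadcast

    def cloneConf(broadcastedConf: Broadcast[SerializableWritable[Configuration]]): JobConf =
      broadcastedConf.synchronized {
        // Only one thread per JVM copies the shared Configuration at a time.
        new JobConf(broadcastedConf.value.value)
      }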

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 2 additions & 4 deletions

@@ -787,8 +787,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
-        jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
       jobFormat.checkOutputSpecs(job)
     }
@@ -854,8 +853,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
-        outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
       val ignoredFs = FileSystem.get(conf)
       conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
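
With the isInstanceOf guard gone, checkOutputSpecs now runs for whatever OutputFormat the job configures, not only FileOutputFormat subclasses. Jobs that deliberately write to existing output can still opt out through the same configuration key used in the guard above; a hypothetical sketch (app name, master, and path are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("output-specs-demo")
      .set("spark.hadoop.validateOutputSpecs", "false")  // skip checkOutputSpecs on save
    val sc = new SparkContext(conf)
    sc.parallelize(Seq(("k", 1))).saveAsTextFile("/tmp/existing-output-dir")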

core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala

Lines changed: 19 additions & 12 deletions

@@ -117,6 +117,15 @@ private object ParallelCollectionRDD {
     if (numSlices < 1) {
       throw new IllegalArgumentException("Positive number of slices required")
     }
+    // Sequences need to be sliced at the same set of index positions for operations
+    // like RDD.zip() to behave as expected
+    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
+      (0 until numSlices).iterator.map(i => {
+        val start = ((i * length) / numSlices).toInt
+        val end = (((i + 1) * length) / numSlices).toInt
+        (start, end)
+      })
+    }
     seq match {
       case r: Range.Inclusive => {
         val sign = if (r.step < 0) {
@@ -128,30 +137,28 @@
           r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
       }
       case r: Range => {
-        (0 until numSlices).map(i => {
-          val start = ((i * r.length.toLong) / numSlices).toInt
-          val end = (((i + 1) * r.length.toLong) / numSlices).toInt
-          new Range(r.start + start * r.step, r.start + end * r.step, r.step)
-        }).asInstanceOf[Seq[Seq[T]]]
+        positions(r.length, numSlices).map({
+          case (start, end) =>
+            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+        }).toSeq.asInstanceOf[Seq[Seq[T]]]
       }
       case nr: NumericRange[_] => {
         // For ranges of Long, Double, BigInteger, etc
        val slices = new ArrayBuffer[Seq[T]](numSlices)
-        val sliceSize = (nr.size + numSlices - 1) / numSlices // Round up to catch everything
         var r = nr
-        for (i <- 0 until numSlices) {
+        for ((start, end) <- positions(nr.length, numSlices)) {
+          val sliceSize = end - start
          slices += r.take(sliceSize).asInstanceOf[Seq[T]]
          r = r.drop(sliceSize)
        }
        slices
       }
       case _ => {
         val array = seq.toArray // To prevent O(n^2) operations for List etc
-        (0 until numSlices).map(i => {
-          val start = ((i * array.length.toLong) / numSlices).toInt
-          val end = (((i + 1) * array.length.toLong) / numSlices).toInt
-          array.slice(start, end).toSeq
-        })
+        positions(array.length, numSlices).map({
+          case (start, end) =>
+            array.slice(start, end).toSeq
+        }).toSeq
       }
     }
   }
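
Worked example of the shared positions() helper: every collection type is now cut at the boundaries floor(i * length / numSlices), so equally sized collections produce equally sized slices and RDD.zip() pairs partitions one-to-one. A REPL-style check (the helper is reproduced here only because it is private to the object above):

    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
      (0 until numSlices).iterator.map { i =>
        (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
      }

    positions(7, 4).toList  // List((0,1), (1,3), (3,5), (5,7)) -> slice sizes 1, 2, 2, 2
    positions(2, 4).toList  // List((0,0), (0,1), (1,1), (1,2)) -> slice sizes 0, 1, 0, 1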

core/src/main/scala/org/apache/spark/storage/RDDInfo.scala

Lines changed: 3 additions & 3 deletions

@@ -26,7 +26,7 @@ class RDDInfo(
     val id: Int,
     val name: String,
     val numPartitions: Int,
-    val storageLevel: StorageLevel)
+    var storageLevel: StorageLevel)
   extends Ordered[RDDInfo] {
 
   var numCachedPartitions = 0
@@ -36,8 +36,8 @@ class RDDInfo(
 
   override def toString = {
     import Utils.bytesToString
-    ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
-      "TachyonSize: %s; DiskSize: %s").format(
+    ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
+      "MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
       name, id, storageLevel.toString, numCachedPartitions, numPartitions,
       bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
   }

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 3 additions & 0 deletions

@@ -89,10 +89,13 @@ private[spark] object StorageUtils {
       // Add up memory, disk and Tachyon sizes
       val persistedBlocks =
         blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
+      val _storageLevel =
+        if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
       val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
       val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
       val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
       rddInfoMap.get(rddId).map { rddInfo =>
+        rddInfo.storageLevel = _storageLevel
         rddInfo.numCachedPartitions = persistedBlocks.length
         rddInfo.memSize = memSize
         rddInfo.diskSize = diskSize
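
Combined with the var storageLevel change in RDDInfo above, this overwrites the reported level with the one observed on the RDD's persisted blocks, falling back to StorageLevel.NONE when nothing is cached. The new lines are equivalent to this condensed form (persistedBlocks refers to the filtered list above):

    rddInfo.storageLevel =
      persistedBlocks.headOption.map(_.storageLevel).getOrElse(StorageLevel.NONE)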

core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala

Lines changed: 18 additions & 0 deletions

@@ -111,6 +111,24 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     assert(slices.forall(_.isInstanceOf[Range]))
   }
 
+  test("identical slice sizes between Range and NumericRange") {
+    val r = ParallelCollectionRDD.slice(1 to 7, 4)
+    val nr = ParallelCollectionRDD.slice(1L to 7L, 4)
+    assert(r.size === 4)
+    for (i <- 0 until r.size) {
+      assert(r(i).size === nr(i).size)
+    }
+  }
+
+  test("identical slice sizes between List and NumericRange") {
+    val r = ParallelCollectionRDD.slice(List(1, 2), 4)
+    val nr = ParallelCollectionRDD.slice(1L to 2L, 4)
+    assert(r.size === 4)
+    for (i <- 0 until r.size) {
+      assert(r(i).size === nr(i).size)
+    }
+  }
+
   test("large ranges don't overflow") {
     val N = 100 * 1000 * 1000
     val data = 0 until N
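
For reference, the expected slices in the first test, worked out from the positions() boundaries (not printed by the test itself):

    // slice(1 to 7, 4)   -> [1], [2, 3], [4, 5], [6, 7]   (sizes 1, 2, 2, 2)
    // slice(1L to 7L, 4) -> [1], [2, 3], [4, 5], [6, 7]   (same sizes, so r(i).size === nr(i).size holds)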
