
Commit f61e9d2

Merge remote-tracking branch 'upstream/master' into dt-timing
2 parents bcf874a + 9a54de1 commit f61e9d2

21 files changed (+541 / -212 lines)

bin/spark-sql

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
 function usage {
-  echo "Usage: ./sbin/spark-sql [options] [cli option]"
+  echo "Usage: ./bin/spark-sql [options] [cli option]"
   pattern="usage"
   pattern+="\|Spark assembly has been built with Hive"
  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 18 additions & 9 deletions
@@ -156,11 +156,9 @@ object SparkEnv extends Logging {
       conf.set("spark.driver.port", boundPort.toString)
     }
 
-    // Create an instance of the class named by the given Java system property, or by
-    // defaultClassName if the property is not set, and return it as a T
-    def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
-      val name = conf.get(propertyName, defaultClassName)
-      val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader)
+    // Create an instance of the class with the given name, possibly initializing it with our conf
+    def instantiateClass[T](className: String): T = {
+      val cls = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
       // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
       // SparkConf, then one taking no arguments
       try {
@@ -178,11 +176,17 @@ object SparkEnv extends Logging {
       }
     }
 
-    val serializer = instantiateClass[Serializer](
+    // Create an instance of the class named by the given SparkConf property, or defaultClassName
+    // if the property is not set, possibly initializing it with our conf
+    def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
+      instantiateClass[T](conf.get(propertyName, defaultClassName))
+    }
+
+    val serializer = instantiateClassFromConf[Serializer](
       "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
     logDebug(s"Using serializer: ${serializer.getClass}")
 
-    val closureSerializer = instantiateClass[Serializer](
+    val closureSerializer = instantiateClassFromConf[Serializer](
       "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
 
     def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
@@ -246,8 +250,13 @@ object SparkEnv extends Logging {
       "."
     }
 
-    val shuffleManager = instantiateClass[ShuffleManager](
-      "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
+    // Let the user specify short names for shuffle managers
+    val shortShuffleMgrNames = Map(
+      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
+      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
+    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
+    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
 
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)

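With the short-name lookup, spark.shuffle.manager now accepts "hash", "sort", or a fully qualified ShuffleManager class name, with "hash" remaining the default. A minimal sketch of how an application would opt into the sort-based shuffle; the app name and master URL below are illustrative, not part of this commit:

import org.apache.spark.{SparkConf, SparkContext}

// "sort" is resolved through shortShuffleMgrNames in SparkEnv; a fully
// qualified class name would be passed to instantiateClass unchanged.
val conf = new SparkConf()
  .setAppName("shuffle-manager-demo")   // illustrative
  .setMaster("local[2]")                // illustrative
  .set("spark.shuffle.manager", "sort")

val sc = new SparkContext(conf)
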
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 0 additions & 1 deletion
@@ -219,7 +219,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 
   /** Fill in values by parsing user options. */
   private def parseOpts(opts: Seq[String]): Unit = {
-    var inSparkOpts = true
     val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r
 
     // Delineates parsing of Spark options from parsing of user options.

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 0 deletions
@@ -374,6 +374,7 @@ private[spark] class Executor(
         for (taskRunner <- runningTasks.values()) {
           if (!taskRunner.attemptedTask.isEmpty) {
             Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
+              metrics.updateShuffleReadMetrics
               tasksMetrics += ((taskRunner.taskId, metrics))
             }
           }

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 42 additions & 13 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.executor
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.storage.{BlockId, BlockStatus}
 
@@ -81,12 +83,27 @@ class TaskMetrics extends Serializable {
   var inputMetrics: Option[InputMetrics] = None
 
   /**
-   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
+   * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
+   * This includes read metrics aggregated over all the task's shuffle dependencies.
    */
   private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
 
   def shuffleReadMetrics = _shuffleReadMetrics
 
+  /**
+   * This should only be used when recreating TaskMetrics, not when updating read metrics in
+   * executors.
+   */
+  private[spark] def setShuffleReadMetrics(shuffleReadMetrics: Option[ShuffleReadMetrics]) {
+    _shuffleReadMetrics = shuffleReadMetrics
+  }
+
+  /**
+   * ShuffleReadMetrics per dependency for collecting independently while task is in progress.
+   */
+  @transient private lazy val depsShuffleReadMetrics: ArrayBuffer[ShuffleReadMetrics] =
+    new ArrayBuffer[ShuffleReadMetrics]()
+
   /**
    * If this task writes to shuffle output, metrics on the written shuffle data will be collected
    * here
@@ -98,19 +115,31 @@ class TaskMetrics extends Serializable {
    */
   var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
 
-  /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
-  def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
-    _shuffleReadMetrics match {
-      case Some(existingMetrics) =>
-        existingMetrics.shuffleFinishTime = math.max(
-          existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
-        existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
-        existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
-        existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
-        existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
-      case None =>
-        _shuffleReadMetrics = Some(newMetrics)
+  /**
+   * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
+   * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
+   * dependency, and merge these metrics before reporting them to the driver. This method returns
+   * a ShuffleReadMetrics for a dependency and registers it for merging later.
+   */
+  private [spark] def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
+    val readMetrics = new ShuffleReadMetrics()
+    depsShuffleReadMetrics += readMetrics
+    readMetrics
+  }
+
+  /**
+   * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
+   */
+  private[spark] def updateShuffleReadMetrics() = synchronized {
+    val merged = new ShuffleReadMetrics()
+    for (depMetrics <- depsShuffleReadMetrics) {
+      merged.fetchWaitTime += depMetrics.fetchWaitTime
+      merged.localBlocksFetched += depMetrics.localBlocksFetched
+      merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
+      merged.remoteBytesRead += depMetrics.remoteBytesRead
+      merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
     }
+    _shuffleReadMetrics = Some(merged)
   }
 }

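Taken together, the new methods form a small protocol: each shuffle reader obtains its own ShuffleReadMetrics via createShuffleReadMetricsForDependency(), updates it without coordinating with readers for other dependencies, and updateShuffleReadMetrics() folds everything into the single aggregated value before task metrics are reported (the call added in Executor.scala above). A condensed sketch of that flow; both methods are private[spark], so assume this runs from code inside the org.apache.spark packages, and the counter values are made up for illustration:

import org.apache.spark.executor.TaskMetrics

val metrics = new TaskMetrics()

// One private accumulator per shuffle dependency; no locking while fetching.
val dep1 = metrics.createShuffleReadMetricsForDependency()
val dep2 = metrics.createShuffleReadMetricsForDependency()
dep1.remoteBytesRead += 1024     // updated by one reader
dep2.localBlocksFetched += 3     // updated by another, possibly concurrently

// Merge the per-dependency metrics before reporting to the driver.
metrics.updateShuffleReadMetrics()
assert(metrics.shuffleReadMetrics.get.remoteBytesRead == 1024)
assert(metrics.shuffleReadMetrics.get.localBlocksFetched == 3)
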
core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 0 deletions
@@ -1233,6 +1233,11 @@ abstract class RDD[T: ClassTag](
     dependencies.head.rdd.asInstanceOf[RDD[U]]
   }
 
+  /** Returns the jth parent RDD: e.g. rdd.parent[T](0) is equivalent to rdd.firstParent[T] */
+  protected[spark] def parent[U: ClassTag](j: Int) = {
+    dependencies(j).rdd.asInstanceOf[RDD[U]]
+  }
+
   /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
   def context = sc

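parent[U](j) is a small convenience next to firstParent[U]: it returns the jth dependency's RDD already cast to the expected element type. A hedged sketch of how an RDD subclass might use it; the class below is illustrative only, and since parent is protected[spark], assume the file lives in a package under org.apache.spark:

import scala.reflect.ClassTag
import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Illustrative pass-through RDD: parent[T](0) is the same RDD as firstParent[T].
class PassThroughRDD[T: ClassTag](prev: RDD[T])
  extends RDD[T](prev.context, Seq(new OneToOneDependency(prev))) {

  override protected def getPartitions: Array[Partition] = parent[T](0).partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    parent[T](0).iterator(split, context)
}
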
core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

Lines changed: 4 additions & 9 deletions
@@ -32,7 +32,8 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       shuffleId: Int,
       reduceId: Int,
       context: TaskContext,
-      serializer: Serializer)
+      serializer: Serializer,
+      shuffleMetrics: ShuffleReadMetrics)
     : Iterator[T] =
   {
     logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
@@ -73,17 +74,11 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       }
     }
 
-    val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
+    val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer, shuffleMetrics)
     val itr = blockFetcherItr.flatMap(unpackBlock)
 
     val completionIter = CompletionIterator[T, Iterator[T]](itr, {
-      val shuffleMetrics = new ShuffleReadMetrics
-      shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
-      shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
-      shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
-      shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
-      shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
-      context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)
+      context.taskMetrics.updateShuffleReadMetrics()
     })
 
     new InterruptibleIterator[T](context, completionIter)

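The merge still happens through a completion callback: updateShuffleReadMetrics() runs only once the fetch iterator has been fully consumed, so the per-dependency counters are final at that point. For readers unfamiliar with the pattern, a generic stand-in for the idea (an illustrative helper, not Spark's CompletionIterator):

// Wrap an iterator so `onComplete` runs once, when the wrapper first
// observes that the underlying iterator is exhausted.
def withCompletion[A](it: Iterator[A])(onComplete: => Unit): Iterator[A] = new Iterator[A] {
  private var fired = false
  def hasNext: Boolean = {
    val more = it.hasNext
    if (!more && !fired) { fired = true; onComplete }
    more
  }
  def next(): A = it.next()
}

// e.g. withCompletion(fetchedRecords) { taskMetrics.updateShuffleReadMetrics() }
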
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala

Lines changed: 4 additions & 2 deletions
@@ -36,8 +36,10 @@ private[spark] class HashShuffleReader[K, C](
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+    val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
     val ser = Serializer.getSerializer(dep.serializer)
-    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
+    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser,
+      readMetrics)
 
     val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
       if (dep.mapSideCombine) {
@@ -58,7 +60,7 @@ private[spark] class HashShuffleReader[K, C](
       // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
       // the ExternalSorter won't spill to disk.
       val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
-      sorter.write(aggregatedIter)
+      sorter.insertAll(aggregatedIter)
      context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
      context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
      sorter.iterator

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 21 additions & 59 deletions
@@ -44,6 +44,7 @@ private[spark] class SortShuffleWriter[K, V, C](
 
   private var sorter: ExternalSorter[K, V, _] = null
   private var outputFile: File = null
+  private var indexFile: File = null
 
   // Are we in the process of stopping? Because map tasks can call stop() with success = true
   // and then call stop() with success = false if they get an exception, we want to make sure
@@ -57,78 +58,36 @@ private[spark] class SortShuffleWriter[K, V, C](
 
   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
-    // Get an iterator with the elements for each partition ID
-    val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
-      if (dep.mapSideCombine) {
-        if (!dep.aggregator.isDefined) {
-          throw new IllegalStateException("Aggregator is empty for map-side combine")
-        }
-        sorter = new ExternalSorter[K, V, C](
-          dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
-        sorter.write(records)
-        sorter.partitionedIterator
-      } else {
-        // In this case we pass neither an aggregator nor an ordering to the sorter, because we
-        // don't care whether the keys get sorted in each partition; that will be done on the
-        // reduce side if the operation being run is sortByKey.
-        sorter = new ExternalSorter[K, V, V](
-          None, Some(dep.partitioner), None, dep.serializer)
-        sorter.write(records)
-        sorter.partitionedIterator
+    if (dep.mapSideCombine) {
+      if (!dep.aggregator.isDefined) {
+        throw new IllegalStateException("Aggregator is empty for map-side combine")
       }
+      sorter = new ExternalSorter[K, V, C](
+        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
+      sorter.insertAll(records)
+    } else {
+      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
+      // care whether the keys get sorted in each partition; that will be done on the reduce side
+      // if the operation being run is sortByKey.
+      sorter = new ExternalSorter[K, V, V](
+        None, Some(dep.partitioner), None, dep.serializer)
+      sorter.insertAll(records)
     }
 
     // Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later
     // serve different ranges of this file using an index file that we create at the end.
     val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)
-    outputFile = blockManager.diskBlockManager.getFile(blockId)
-
-    // Track location of each range in the output file
-    val offsets = new Array[Long](numPartitions + 1)
-    val lengths = new Array[Long](numPartitions)
-
-    for ((id, elements) <- partitions) {
-      if (elements.hasNext) {
-        val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize,
-          writeMetrics)
-        for (elem <- elements) {
-          writer.write(elem)
-        }
-        writer.commitAndClose()
-        val segment = writer.fileSegment()
-        offsets(id + 1) = segment.offset + segment.length
-        lengths(id) = segment.length
-      } else {
-        // The partition is empty; don't create a new writer to avoid writing headers, etc
-        offsets(id + 1) = offsets(id)
-      }
-    }
-
-    context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
-    context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
 
-    // Write an index file with the offsets of each block, plus a final offset at the end for the
-    // end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure
-    // out where each block begins and ends.
+    outputFile = blockManager.diskBlockManager.getFile(blockId)
+    indexFile = blockManager.diskBlockManager.getFile(blockId.name + ".index")
 
-    val diskBlockManager = blockManager.diskBlockManager
-    val indexFile = diskBlockManager.getFile(blockId.name + ".index")
-    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
-    try {
-      var i = 0
-      while (i < numPartitions + 1) {
-        out.writeLong(offsets(i))
-        i += 1
-      }
-    } finally {
-      out.close()
-    }
+    val partitionLengths = sorter.writePartitionedFile(blockId, context)
 
     // Register our map output with the ShuffleBlockManager, which handles cleaning it over time
     blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, numPartitions)
 
     mapStatus = new MapStatus(blockManager.blockManagerId,
-      lengths.map(MapOutputTracker.compressSize))
+      partitionLengths.map(MapOutputTracker.compressSize))
   }
 
   /** Close this writer, passing along whether the map completed */
@@ -145,6 +104,9 @@ private[spark] class SortShuffleWriter[K, V, C](
       if (outputFile != null) {
         outputFile.delete()
       }
+      if (indexFile != null) {
+        indexFile.delete()
+      }
       return None
     }
   } finally {

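The bookkeeping that used to live here (the offsets array, per-partition disk writers, and the index-file loop) now sits behind sorter.writePartitionedFile, which returns the per-partition lengths used to build the MapStatus. Per the removed comments, the on-disk contract is a ".index" file of numPartitions + 1 longs, where offsets(i) and offsets(i + 1) bound partition i in the single data file. A hedged sketch of reading that format back; this is an illustrative helper, not Spark's actual getBlockLocation code:

import java.io.{File, RandomAccessFile}

// Returns (byte offset, length) of reduce partition `reduceId` in the data file,
// by reading two consecutive longs from the ".index" file.
def segmentFor(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new RandomAccessFile(indexFile, "r")
  try {
    in.seek(reduceId * 8L)   // each recorded offset is one 8-byte long
    val start = in.readLong()
    val end = in.readLong()
    (start, end - start)
  } finally {
    in.close()
  }
}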