From 62c96e1cdc472356dfbfb24cf9650a8f36017224 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 18 Jan 2016 11:50:04 -0800 Subject: [PATCH 1/9] Add register* methods (get or create) --- .../org/apache/spark/executor/Executor.scala | 2 +- .../apache/spark/executor/TaskMetrics.scala | 150 +++++++++++++----- .../shuffle/BlockStoreShuffleReader.scala | 4 +- .../storage/ShuffleBlockFetcherIterator.scala | 2 +- .../spark/executor/TaskMetricsSuite.scala | 2 +- 5 files changed, 112 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 9b14184364246..75d7e34d60eb2 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -425,7 +425,7 @@ private[spark] class Executor( for (taskRunner <- runningTasks.values().asScala) { if (taskRunner.task != null) { taskRunner.task.metrics.foreach { metrics => - metrics.updateShuffleReadMetrics() + metrics.mergeShuffleReadMetrics() metrics.updateInputMetrics() metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) metrics.updateAccumulators() diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 42207a9553592..c0e0df3938f46 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -101,14 +101,37 @@ class TaskMetrics extends Serializable { private[spark] def incDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled += value private[spark] def decDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled -= value - /** - * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read - * are stored here. - */ private var _inputMetrics: Option[InputMetrics] = None + /** + * Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted + * data, defined only in tasks with input. + */ def inputMetrics: Option[InputMetrics] = _inputMetrics + /** + * Get or create a new [[InputMetrics]] associated with this task. + */ + private[spark] def registerInputMetrics(readMethod: DataReadMethod.Value): InputMetrics = { + synchronized { + val metrics = _inputMetrics.getOrElse { + val metrics = new InputMetrics(readMethod) + _inputMetrics = Some(metrics) + metrics + } + // If there already exists an InputMetric with the same read method, we can just return + // that one. Otherwise, if the read method is different from the one previously seen by + // this task, we return a new dummy one to avoid clobbering the values of the old metrics. + // In the future we should try to store input metrics from all different read methods at + // the same time (SPARK-5225). + if (metrics.readMethod == readMethod) { + metrics + } else { + new InputMetrics(readMethod) + } + } + } + /** * This should only be used when recreating TaskMetrics, not when updating input metrics in * executors @@ -117,18 +140,37 @@ class TaskMetrics extends Serializable { _inputMetrics = inputMetrics } + private var _outputMetrics: Option[OutputMetrics] = None + /** - * If this task writes data externally (e.g. to a distributed filesystem), metrics on how much - * data was written are stored here. + * Metrics related to writing data externally (e.g. to a distributed filesystem), + * defined only in tasks with output. 
*/ - var outputMetrics: Option[OutputMetrics] = None + def outputMetrics: Option[OutputMetrics] = _outputMetrics + + @deprecated("setting OutputMetrics is for internal use only", "2.0.0") + def outputMetrics_=(om: Option[OutputMetrics]): Unit = { + _outputMetrics = om + } /** - * If this task reads from shuffle output, metrics on getting shuffle data will be collected here. - * This includes read metrics aggregated over all the task's shuffle dependencies. + * Get or create a new [[OutputMetrics]] associated with this task. */ + private[spark] def registerOutputMetrics( + writeMethod: DataWriteMethod.Value): OutputMetrics = synchronized { + _outputMetrics.getOrElse { + val metrics = new OutputMetrics(writeMethod) + _outputMetrics = Some(metrics) + metrics + } + } + private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None + /** + * Metrics related to shuffle read aggregated across all shuffle dependencies. + * This is defined only if there are shuffle dependencies in this task. + */ def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics /** @@ -140,34 +182,74 @@ class TaskMetrics extends Serializable { } /** - * ShuffleReadMetrics per dependency for collecting independently while task is in progress. + * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency. + * + * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization + * issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for + * each dependency and merge these metrics before reporting them to the driver. + */ + @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics] + + /** + * Create a temporary [[ShuffleReadMetrics]] for a particular shuffle dependency. + * + * All usages are expected to be followed by a call to [[mergeShuffleReadMetrics]], which + * merges the temporary values synchronously. Otherwise, all temporary data collected will + * be lost. */ - @transient private lazy val depsShuffleReadMetrics: ArrayBuffer[ShuffleReadMetrics] = - new ArrayBuffer[ShuffleReadMetrics]() + private[spark] def registerTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized { + val readMetrics = new ShuffleReadMetrics + tempShuffleReadMetrics += readMetrics + readMetrics + } /** - * If this task writes to shuffle output, metrics on the written shuffle data will be collected - * here + * Merge values across all temporary [[ShuffleReadMetrics]] into `_shuffleReadMetrics`. + * This is expected to be called on executor heartbeat and at the end of a task. */ - var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None + private[spark] def mergeShuffleReadMetrics(): Unit = synchronized { + if (tempShuffleReadMetrics.nonEmpty) { + val merged = new ShuffleReadMetrics + for (depMetrics <- tempShuffleReadMetrics) { + merged.incFetchWaitTime(depMetrics.fetchWaitTime) + merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) + merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) + merged.incRemoteBytesRead(depMetrics.remoteBytesRead) + merged.incLocalBytesRead(depMetrics.localBytesRead) + merged.incRecordsRead(depMetrics.recordsRead) + } + _shuffleReadMetrics = Some(merged) + } + } + + private var _shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None /** - * Storage statuses of any blocks that have been updated as a result of this task. + * Metrics related to shuffle write, defined only in shuffle map stages. 
*/ - var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None + def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics + + @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0") + def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = { + _shuffleWriteMetrics = swm + } /** - * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization - * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each - * dependency, and merge these metrics before reporting them to the driver. This method returns - * a ShuffleReadMetrics for a dependency and registers it for merging later. + * Get or create a new [[ShuffleWriteMetrics]] associated with this task. */ - private [spark] def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized { - val readMetrics = new ShuffleReadMetrics() - depsShuffleReadMetrics += readMetrics - readMetrics + private[spark] def registerShuffleWriteMetrics(): ShuffleWriteMetrics = synchronized { + _shuffleWriteMetrics.getOrElse { + val metrics = new ShuffleWriteMetrics + _shuffleWriteMetrics = Some(metrics) + metrics + } } + /** + * Storage statuses of any blocks that have been updated as a result of this task. + */ + var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None + /** * Returns the input metrics object that the task should use. Currently, if * there exists an input metric with the same readMethod, we return that one @@ -193,24 +275,6 @@ class TaskMetrics extends Serializable { } } - /** - * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics. - */ - private[spark] def updateShuffleReadMetrics(): Unit = synchronized { - if (!depsShuffleReadMetrics.isEmpty) { - val merged = new ShuffleReadMetrics() - for (depMetrics <- depsShuffleReadMetrics) { - merged.incFetchWaitTime(depMetrics.fetchWaitTime) - merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) - merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) - merged.incRemoteBytesRead(depMetrics.remoteBytesRead) - merged.incLocalBytesRead(depMetrics.localBytesRead) - merged.incRecordsRead(depMetrics.recordsRead) - } - _shuffleReadMetrics = Some(merged) - } - } - private[spark] def updateInputMetrics(): Unit = synchronized { inputMetrics.foreach(_.updateBytesRead()) } diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index b0abda4a81b8d..a57e5b0bfb865 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -65,13 +65,13 @@ private[spark] class BlockStoreShuffleReader[K, C]( } // Update the context task metrics for each record read. 
- val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency() + val readMetrics = context.taskMetrics.registerTempShuffleReadMetrics() val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( recordIter.map(record => { readMetrics.incRecordsRead(1) record }), - context.taskMetrics().updateShuffleReadMetrics()) + context.taskMetrics().mergeShuffleReadMetrics()) // An interruptible iterator must be used here in order to support task cancellation val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 037bec1d9c33b..c6065df64ae03 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -101,7 +101,7 @@ final class ShuffleBlockFetcherIterator( /** Current bytes in flight from our requests */ private[this] var bytesInFlight = 0L - private[this] val shuffleMetrics = context.taskMetrics().createShuffleReadMetricsForDependency() + private[this] val shuffleMetrics = context.taskMetrics().registerTempShuffleReadMetrics() /** * Whether the iterator is still active. If isZombie is true, the callback interface will no diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index 8275fd87764cd..e5ec2aa1be355 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.SparkFunSuite class TaskMetricsSuite extends SparkFunSuite { test("[SPARK-5701] updateShuffleReadMetrics: ShuffleReadMetrics not added when no shuffle deps") { val taskMetrics = new TaskMetrics() - taskMetrics.updateShuffleReadMetrics() + taskMetrics.mergeShuffleReadMetrics() assert(taskMetrics.shuffleReadMetrics.isEmpty) } } From b9d7fbf37cc410d44e462d9d08650a20decc8fc9 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 18 Jan 2016 12:10:17 -0800 Subject: [PATCH 2/9] Clean up places where we set OutputMetrics Note: there's one remaining place, which is JsonProtocol. 
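The call sites below switch from constructing an OutputMetrics and assigning it onto TaskMetrics to asking for one via registerOutputMetrics. The register* methods introduced in the previous patch are a synchronized get-or-create; a minimal, self-contained sketch of that pattern is below (SimpleOutputMetrics and MetricsHolder are illustrative stand-ins, not Spark's actual classes):

  // Illustrative stand-ins for TaskMetrics / OutputMetrics, not Spark's real types.
  class SimpleOutputMetrics(val writeMethod: String) {
    private var _bytesWritten = 0L
    def setBytesWritten(v: Long): Unit = { _bytesWritten = v }
    def bytesWritten: Long = _bytesWritten
  }

  class MetricsHolder {
    private var _outputMetrics: Option[SimpleOutputMetrics] = None

    // Get-or-create: repeated calls return the same instance, so callers never
    // construct metrics themselves or assign them back onto the holder.
    def registerOutputMetrics(writeMethod: String): SimpleOutputMetrics = synchronized {
      _outputMetrics.getOrElse {
        val m = new SimpleOutputMetrics(writeMethod)
        _outputMetrics = Some(m)
        m
      }
    }

    def outputMetrics: Option[SimpleOutputMetrics] = _outputMetrics
  }

  object RegisterPatternDemo extends App {
    val holder = new MetricsHolder
    val om1 = holder.registerOutputMetrics("Hadoop")
    val om2 = holder.registerOutputMetrics("Hadoop")
    assert(om1 eq om2)               // second registration returns the first instance
    om1.setBytesWritten(128L)
    println(holder.outputMetrics.map(_.bytesWritten)) // Some(128)
  }

Because repeated registration returns the same instance, writers in the same task accumulate into one metrics object without the caller ever touching the underlying Option.
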
--- .../apache/spark/rdd/PairRDDFunctions.scala | 42 ++++++++++++------- .../ui/jobs/JobProgressListenerSuite.scala | 3 +- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 16a856f594e97..c098ed2354eaf 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1110,8 +1110,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.close(hadoopContext) } committer.commitTask(hadoopContext) - bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } - outputMetrics.setRecordsWritten(recordsWritten) + outputMetrics.foreach { om => + bytesWrittenCallback.foreach { fn => om.setBytesWritten(fn()) } + om.setRecordsWritten(recordsWritten) + } 1 } : Int @@ -1196,28 +1198,40 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.close() } writer.commit() - bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } - outputMetrics.setRecordsWritten(recordsWritten) + outputMetrics.foreach { om => + bytesWrittenCallback.foreach { fn => om.setBytesWritten(fn()) } + om.setRecordsWritten(recordsWritten) + } } self.context.runJob(self, writeToFile) writer.commitJob() } - private def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, Option[() => Long]) = { + // TODO: these don't seem like the right abstractions. + // We should abstract the duplicate code in a less awkward way. + + private def initHadoopOutputMetrics( + context: TaskContext): (Option[OutputMetrics], Option[() => Long]) = { val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback() - val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) - if (bytesWrittenCallback.isDefined) { - context.taskMetrics.outputMetrics = Some(outputMetrics) - } + val outputMetrics = + if (bytesWrittenCallback.isDefined) { + Some(context.taskMetrics().registerOutputMetrics(DataWriteMethod.Hadoop)) + } else { + None + } (outputMetrics, bytesWrittenCallback) } - private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long], - outputMetrics: OutputMetrics, recordsWritten: Long): Unit = { - if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { - bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } - outputMetrics.setRecordsWritten(recordsWritten) + private def maybeUpdateOutputMetrics( + bytesWrittenCallback: Option[() => Long], + outputMetrics: Option[OutputMetrics], + recordsWritten: Long): Unit = { + outputMetrics.foreach { om => + if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { + bytesWrittenCallback.foreach { fn => om.setBytesWritten(fn()) } + om.setRecordsWritten(recordsWritten) + } } } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index e02f5a1b20fe3..98731e9d52b01 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -284,8 +284,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) taskMetrics.setInputMetrics(Some(inputMetrics)) inputMetrics.incBytesRead(base + 7) - val outputMetrics = new 
OutputMetrics(DataWriteMethod.Hadoop) - taskMetrics.outputMetrics = Some(outputMetrics) + val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop) outputMetrics.setBytesWritten(base + 8) taskMetrics } From 078598409225224f0532a45f34dae533695b25df Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 18 Jan 2016 12:20:28 -0800 Subject: [PATCH 3/9] Replace set with register JsonProtocol remains the only place where we still call set on each of the *Metrics classes. --- .../sort/BypassMergeSortShuffleWriter.java | 3 +-- .../shuffle/sort/UnsafeShuffleWriter.java | 3 +-- .../shuffle/hash/HashShuffleWriter.scala | 3 +-- .../shuffle/sort/SortShuffleWriter.scala | 6 ++---- .../util/collection/ExternalSorter.scala | 10 ++++++---- .../ui/jobs/JobProgressListenerSuite.scala | 20 +++++++++---------- .../apache/spark/util/JsonProtocolSuite.scala | 10 ++++------ .../execution/UnsafeRowSerializerSuite.scala | 1 - 8 files changed, 24 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 56cdc22f36261..a789c59c9c5b4 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -114,8 +114,7 @@ public BypassMergeSortShuffleWriter( this.shuffleId = dep.shuffleId(); this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); - this.writeMetrics = new ShuffleWriteMetrics(); - taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics)); + this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics(); this.serializer = Serializer.getSerializer(dep.serializer()); this.shuffleBlockResolver = shuffleBlockResolver; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 744c3008ca50e..a677a058b46d0 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -119,8 +119,7 @@ public UnsafeShuffleWriter( this.shuffleId = dep.shuffleId(); this.serializer = Serializer.getSerializer(dep.serializer()).newInstance(); this.partitioner = dep.partitioner(); - this.writeMetrics = new ShuffleWriteMetrics(); - taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics)); + this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics(); this.taskContext = taskContext; this.sparkConf = sparkConf; this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true); diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 412bf70000da7..28bcced901a70 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -42,8 +42,7 @@ private[spark] class HashShuffleWriter[K, V]( // we don't try deleting files, etc twice. 
private var stopping = false - private val writeMetrics = new ShuffleWriteMetrics() - metrics.shuffleWriteMetrics = Some(writeMetrics) + private val writeMetrics = metrics.registerShuffleWriteMetrics() private val blockManager = SparkEnv.get.blockManager private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null)) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index f83cf8859e581..4ec52b550dbb8 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -45,8 +45,7 @@ private[spark] class SortShuffleWriter[K, V, C]( private var mapStatus: MapStatus = null - private val writeMetrics = new ShuffleWriteMetrics() - context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics) + private val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics() /** Write a bunch of records to this task's output */ override def write(records: Iterator[Product2[K, V]]): Unit = { @@ -93,8 +92,7 @@ private[spark] class SortShuffleWriter[K, V, C]( if (sorter != null) { val startTime = System.nanoTime() sorter.stop() - context.taskMetrics.shuffleWriteMetrics.foreach( - _.incShuffleWriteTime(System.nanoTime - startTime)) + writeMetrics.incShuffleWriteTime(System.nanoTime - startTime) sorter = null } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 63ba954a7fa7e..b5e4d9d67d3b0 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -644,6 +644,8 @@ private[spark] class ExternalSorter[K, V, C]( blockId: BlockId, outputFile: File): Array[Long] = { + val writeMetrics = context.taskMetrics().registerShuffleWriteMetrics() + // Track location of each range in the output file val lengths = new Array[Long](numPartitions) @@ -652,8 +654,8 @@ private[spark] class ExternalSorter[K, V, C]( val collection = if (aggregator.isDefined) map else buffer val it = collection.destructiveSortedWritablePartitionedIterator(comparator) while (it.hasNext) { - val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, - context.taskMetrics.shuffleWriteMetrics.get) + val writer = blockManager.getDiskWriter( + blockId, outputFile, serInstance, fileBufferSize, writeMetrics) val partitionId = it.nextPartition() while (it.hasNext && it.nextPartition() == partitionId) { it.writeNext(writer) @@ -666,8 +668,8 @@ private[spark] class ExternalSorter[K, V, C]( // We must perform merge-sort; get an iterator by partition and write everything directly. 
for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { - val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, - context.taskMetrics.shuffleWriteMetrics.get) + val writer = blockManager.getDiskWriter( + blockId, outputFile, serInstance, fileBufferSize, writeMetrics) for (elem <- elements) { writer.write(elem._1, elem._2) } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 98731e9d52b01..762f8c4d58732 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -184,12 +184,12 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with val conf = new SparkConf() val listener = new JobProgressListener(conf) val taskMetrics = new TaskMetrics() - val shuffleReadMetrics = new ShuffleReadMetrics() + val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics() assert(listener.stageIdToData.size === 0) // finish this task, should get updated shuffleRead shuffleReadMetrics.incRemoteBytesRead(1000) - taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics)) + taskMetrics.mergeShuffleReadMetrics() var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 var task = new ShuffleMapTask(0) @@ -270,19 +270,17 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with def makeTaskMetrics(base: Int): TaskMetrics = { val taskMetrics = new TaskMetrics() - val shuffleReadMetrics = new ShuffleReadMetrics() - val shuffleWriteMetrics = new ShuffleWriteMetrics() - taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics)) - taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics) + taskMetrics.setExecutorRunTime(base + 4) + taskMetrics.incDiskBytesSpilled(base + 5) + taskMetrics.incMemoryBytesSpilled(base + 6) + val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics() shuffleReadMetrics.incRemoteBytesRead(base + 1) shuffleReadMetrics.incLocalBytesRead(base + 9) shuffleReadMetrics.incRemoteBlocksFetched(base + 2) + taskMetrics.mergeShuffleReadMetrics() + val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics() shuffleWriteMetrics.incShuffleBytesWritten(base + 3) - taskMetrics.setExecutorRunTime(base + 4) - taskMetrics.incDiskBytesSpilled(base + 5) - taskMetrics.incMemoryBytesSpilled(base + 6) - val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - taskMetrics.setInputMetrics(Some(inputMetrics)) + val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop) inputMetrics.incBytesRead(base + 7) val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop) outputMetrics.setBytesWritten(base + 8) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 068e8397c89bb..b828648fcd1c4 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -773,19 +773,18 @@ class JsonProtocolSuite extends SparkFunSuite { t.incMemoryBytesSpilled(a + c) if (hasHadoopInput) { - val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop) inputMetrics.incBytesRead(d + e + f) inputMetrics.incRecordsRead(if (hasRecords) 
(d + e + f) / 100 else -1) - t.setInputMetrics(Some(inputMetrics)) } else { - val sr = new ShuffleReadMetrics + val sr = t.registerTempShuffleReadMetrics() sr.incRemoteBytesRead(b + d) sr.incLocalBlocksFetched(e) sr.incFetchWaitTime(a + d) sr.incRemoteBlocksFetched(f) sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1) sr.incLocalBytesRead(a + f) - t.setShuffleReadMetrics(Some(sr)) + t.mergeShuffleReadMetrics() } if (hasOutput) { val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) @@ -793,11 +792,10 @@ class JsonProtocolSuite extends SparkFunSuite { outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1) t.outputMetrics = Some(outputMetrics) } else { - val sw = new ShuffleWriteMetrics + val sw = t.registerShuffleWriteMetrics() sw.incShuffleBytesWritten(a + b + c) sw.incShuffleWriteTime(b + c + d) sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1) - t.shuffleWriteMetrics = Some(sw) } // Make at most 6 blocks t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index 9f09eb4429c12..7438e11ef7176 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -127,7 +127,6 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext { assert(sorter.numSpills > 0) // Merging spilled files should not throw assertion error - taskContext.taskMetrics.shuffleWriteMetrics = Some(new ShuffleWriteMetrics) sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), outputFile) } { // Clean up From ad094f071472b9cf7b9f9bdb7cd00d88c402995d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 18 Jan 2016 12:30:59 -0800 Subject: [PATCH 4/9] Clean up JsonProtocol This commit collapsed 10 methods into 2. The 8 that were inlined were only used in 1 place each, and the body of each was quite small. The additional level of abstraction did not add much value and made the code verbose. 
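The inlined bodies lean on two json4s idioms that the removed helpers had been hiding: an absent sub-object becomes JNothing, which json4s drops when rendering, and optional fields are read back with extractOpt plus a default so older logs stay readable. A minimal, self-contained sketch of both, assuming json4s on the classpath as JsonProtocol already does (class and field names here are illustrative, not the real event-log schema):

  import org.json4s._
  import org.json4s.JsonDSL._
  import org.json4s.jackson.JsonMethods._

  object OptionalMetricsJsonDemo extends App {
    implicit val formats: Formats = DefaultFormats

    case class ReadCounts(remoteBytesRead: Long, recordsRead: Long)

    def toJson(rm: Option[ReadCounts]): JValue = {
      // Absent metrics become JNothing; json4s omits JNothing fields on output.
      val shuffleReadMetrics: JValue = rm.map { m =>
        ("Remote Bytes Read" -> m.remoteBytesRead) ~
          ("Total Records Read" -> m.recordsRead)
      }.getOrElse(JNothing)
      ("Host Name" -> "localhost") ~
        ("Shuffle Read Metrics" -> shuffleReadMetrics)
    }

    def fromJson(json: JValue): Option[ReadCounts] = {
      // Same role Utils.jsonOption plays above: treat a missing block as None.
      json \ "Shuffle Read Metrics" match {
        case JNothing => None
        case readJson =>
          Some(ReadCounts(
            (readJson \ "Remote Bytes Read").extract[Long],
            // extractOpt + default keeps logs without this field readable
            (readJson \ "Total Records Read").extractOpt[Long].getOrElse(0L)))
      }
    }

    val json = toJson(Some(ReadCounts(1000L, 10L)))
    println(compact(render(json)))
    println(fromJson(json))          // Some(ReadCounts(1000,10))
    println(fromJson(toJson(None)))  // None
  }
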
--- .../org/apache/spark/util/JsonProtocol.scala | 148 ++++++++---------- 1 file changed, 68 insertions(+), 80 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index a6460bc8b8202..9941ae5640470 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -292,19 +292,38 @@ private[spark] object JsonProtocol { } def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = { - val shuffleReadMetrics = - taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing) - val shuffleWriteMetrics = - taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing) - val inputMetrics = - taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing) - val outputMetrics = - taskMetrics.outputMetrics.map(outputMetricsToJson).getOrElse(JNothing) - val updatedBlocks = + val shuffleReadMetrics: JValue = + taskMetrics.shuffleReadMetrics.map { rm => + ("Remote Blocks Fetched" -> rm.remoteBlocksFetched) ~ + ("Local Blocks Fetched" -> rm.localBlocksFetched) ~ + ("Fetch Wait Time" -> rm.fetchWaitTime) ~ + ("Remote Bytes Read" -> rm.remoteBytesRead) ~ + ("Local Bytes Read" -> rm.localBytesRead) ~ + ("Total Records Read" -> rm.recordsRead) + }.getOrElse(JNothing) + val shuffleWriteMetrics: JValue = + taskMetrics.shuffleWriteMetrics.map { wm => + ("Shuffle Bytes Written" -> wm.shuffleBytesWritten) ~ + ("Shuffle Write Time" -> wm.shuffleWriteTime) ~ + ("Shuffle Records Written" -> wm.shuffleRecordsWritten) + }.getOrElse(JNothing) + val inputMetrics: JValue = + taskMetrics.inputMetrics.map { im => + ("Data Read Method" -> im.readMethod.toString) ~ + ("Bytes Read" -> im.bytesRead) ~ + ("Records Read" -> im.recordsRead) + }.getOrElse(JNothing) + val outputMetrics: JValue = + taskMetrics.outputMetrics.map { om => + ("Data Write Method" -> om.writeMethod.toString) ~ + ("Bytes Written" -> om.bytesWritten) ~ + ("Records Written" -> om.recordsWritten) + }.getOrElse(JNothing) + val updatedBlocks: JValue = taskMetrics.updatedBlocks.map { blocks => JArray(blocks.toList.map { case (id, status) => ("Block ID" -> id.toString) ~ - ("Status" -> blockStatusToJson(status)) + ("Status" -> blockStatusToJson(status)) }) }.getOrElse(JNothing) ("Host Name" -> taskMetrics.hostname) ~ @@ -322,33 +341,6 @@ private[spark] object JsonProtocol { ("Updated Blocks" -> updatedBlocks) } - def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = { - ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~ - ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~ - ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~ - ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~ - ("Local Bytes Read" -> shuffleReadMetrics.localBytesRead) ~ - ("Total Records Read" -> shuffleReadMetrics.recordsRead) - } - - def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = { - ("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~ - ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~ - ("Shuffle Records Written" -> shuffleWriteMetrics.shuffleRecordsWritten) - } - - def inputMetricsToJson(inputMetrics: InputMetrics): JValue = { - ("Data Read Method" -> inputMetrics.readMethod.toString) ~ - ("Bytes Read" -> inputMetrics.bytesRead) ~ - ("Records Read" -> inputMetrics.recordsRead) - } - - def outputMetricsToJson(outputMetrics: 
OutputMetrics): JValue = { - ("Data Write Method" -> outputMetrics.writeMethod.toString) ~ - ("Bytes Written" -> outputMetrics.bytesWritten) ~ - ("Records Written" -> outputMetrics.recordsWritten) - } - def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = { val reason = Utils.getFormattedClassName(taskEndReason) val json: JObject = taskEndReason match { @@ -720,14 +712,45 @@ private[spark] object JsonProtocol { metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long]) metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long]) metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long]) - metrics.setShuffleReadMetrics( - Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)) - metrics.shuffleWriteMetrics = - Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson) - metrics.setInputMetrics( - Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)) - metrics.outputMetrics = - Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson) + + // Shuffle read metrics + Utils.jsonOption(json \ "Shuffle Read Metrics").foreach { readJson => + val readMetrics = metrics.registerTempShuffleReadMetrics() + readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int]) + readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int]) + readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long]) + readMetrics.incLocalBytesRead((readJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L)) + readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long]) + readMetrics.incRecordsRead((readJson \ "Total Records Read").extractOpt[Long].getOrElse(0L)) + metrics.mergeShuffleReadMetrics() + } + + // Shuffle write metrics + // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes. 
+ Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson => + val writeMetrics = metrics.registerShuffleWriteMetrics() + writeMetrics.incShuffleBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long]) + writeMetrics.incShuffleRecordsWritten((writeJson \ "Shuffle Records Written") + .extractOpt[Long].getOrElse(0L)) + writeMetrics.incShuffleWriteTime((writeJson \ "Shuffle Write Time").extract[Long]) + } + + // Output metrics + Utils.jsonOption(json \ "Output Metrics").foreach { outJson => + val writeMethod = DataWriteMethod.withName((outJson \ "Data Write Method").extract[String]) + val outputMetrics = metrics.registerOutputMetrics(writeMethod) + outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long]) + outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L)) + } + + // Input metrics + Utils.jsonOption(json \ "Input Metrics").foreach { inJson => + val readMethod = DataReadMethod.withName((inJson \ "Data Read Method").extract[String]) + val inputMetrics = metrics.registerInputMetrics(readMethod) + inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long]) + inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L)) + } + metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value => value.extract[List[JValue]].map { block => @@ -736,42 +759,7 @@ private[spark] object JsonProtocol { (id, status) } } - metrics - } - - def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = { - val metrics = new ShuffleReadMetrics - metrics.incRemoteBlocksFetched((json \ "Remote Blocks Fetched").extract[Int]) - metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int]) - metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long]) - metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long]) - metrics.incLocalBytesRead((json \ "Local Bytes Read").extractOpt[Long].getOrElse(0)) - metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0)) - metrics - } - - def shuffleWriteMetricsFromJson(json: JValue): ShuffleWriteMetrics = { - val metrics = new ShuffleWriteMetrics - metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) - metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long]) - metrics.setShuffleRecordsWritten((json \ "Shuffle Records Written") - .extractOpt[Long].getOrElse(0)) - metrics - } - - def inputMetricsFromJson(json: JValue): InputMetrics = { - val metrics = new InputMetrics( - DataReadMethod.withName((json \ "Data Read Method").extract[String])) - metrics.incBytesRead((json \ "Bytes Read").extract[Long]) - metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(0)) - metrics - } - def outputMetricsFromJson(json: JValue): OutputMetrics = { - val metrics = new OutputMetrics( - DataWriteMethod.withName((json \ "Data Write Method").extract[String])) - metrics.setBytesWritten((json \ "Bytes Written").extract[Long]) - metrics.setRecordsWritten((json \ "Records Written").extractOpt[Long].getOrElse(0)) metrics } From 34c7ce5bf724c781a37c352277f7c5cd86d33c9a Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 18 Jan 2016 12:46:42 -0800 Subject: [PATCH 5/9] Hide updatedBlocks --- .../scala/org/apache/spark/CacheManager.scala | 5 +--- .../apache/spark/executor/TaskMetrics.scala | 23 ++++++++++++++- .../spark/memory/StorageMemoryPool.scala | 6 +--- .../spark/storage/StorageStatusListener.scala | 2 +- .../apache/spark/ui/storage/StorageTab.scala | 4 +-- 
.../org/apache/spark/util/JsonProtocol.scala | 28 +++++++++---------- .../org/apache/spark/CacheManagerSuite.scala | 2 +- .../storage/StorageStatusListenerSuite.scala | 12 ++++---- .../spark/ui/storage/StorageTabSuite.scala | 8 +++--- .../apache/spark/util/JsonProtocolSuite.scala | 7 ++--- 10 files changed, 54 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 36b536e89c3a4..b89972836fb05 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -71,11 +71,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { // Otherwise, cache the values and keep track of any updates in block statuses val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) - val metrics = context.taskMetrics - val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) - metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq) + context.taskMetrics().incUpdatedBlockStatuses(updatedBlocks) new InterruptibleIterator(context, cachedValues) - } finally { loading.synchronized { loading.remove(key) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index c0e0df3938f46..e67159155a761 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -245,10 +245,31 @@ class TaskMetrics extends Serializable { } } + private var _updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = + Seq.empty[(BlockId, BlockStatus)] + /** * Storage statuses of any blocks that have been updated as a result of this task. */ - var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None + def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses + + private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = { + _updatedBlockStatuses ++= v + } + + private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = { + _updatedBlockStatuses = v + } + + @deprecated("use updatedBlockStatuses instead", "2.0.0") + def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = { + if (_updatedBlockStatuses.nonEmpty) Some(_updatedBlockStatuses) else None + } + + @deprecated("setting updated blocks is for internal use only", "2.0.0") + def updatedBlocks_=(ub: Option[Seq[(BlockId, BlockStatus)]]): Unit = { + _updatedBlockStatuses = ub.getOrElse(Seq.empty[(BlockId, BlockStatus)]) + } /** * Returns the input metrics object that the task should use. 
Currently, if diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 4036484aada23..79411e7eb6f10 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -88,11 +88,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w if (numBytesToFree > 0) { memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks) // Register evicted blocks, if any, with the active task metrics - Option(TaskContext.get()).foreach { tc => - val metrics = tc.taskMetrics() - val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) - metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq) - } + Option(TaskContext.get()).foreach(_.taskMetrics().incUpdatedBlockStatuses(evictedBlocks)) } // NOTE: If the memory store evicts blocks, then those evictions will synchronously call // back into this StorageMemoryPool in order to free memory. Therefore, these variables diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index ec711480ebf30..d98aae8ff0c68 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -63,7 +63,7 @@ class StorageStatusListener extends SparkListener { val info = taskEnd.taskInfo val metrics = taskEnd.taskMetrics if (info != null && metrics != null) { - val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + val updatedBlocks = metrics.updatedBlockStatuses if (updatedBlocks.length > 0) { updateStorageStatus(info.executorId, updatedBlocks) } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index 2d9b885c684b2..f1e28b4e1e9c2 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -63,8 +63,8 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc */ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { val metrics = taskEnd.taskMetrics - if (metrics != null && metrics.updatedBlocks.isDefined) { - updateRDDInfo(metrics.updatedBlocks.get) + if (metrics != null && metrics.updatedBlockStatuses.nonEmpty) { + updateRDDInfo(metrics.updatedBlockStatuses) } } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 9941ae5640470..f10dd5b9c6502 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -319,13 +319,11 @@ private[spark] object JsonProtocol { ("Bytes Written" -> om.bytesWritten) ~ ("Records Written" -> om.recordsWritten) }.getOrElse(JNothing) - val updatedBlocks: JValue = - taskMetrics.updatedBlocks.map { blocks => - JArray(blocks.toList.map { case (id, status) => - ("Block ID" -> id.toString) ~ - ("Status" -> blockStatusToJson(status)) - }) - }.getOrElse(JNothing) + val updatedBlocks = + JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) => + ("Block ID" -> id.toString) ~ + ("Status" -> blockStatusToJson(status)) + }) ("Host Name" -> 
taskMetrics.hostname) ~ ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~ ("Executor Run Time" -> taskMetrics.executorRunTime) ~ @@ -751,14 +749,14 @@ private[spark] object JsonProtocol { inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L)) } - metrics.updatedBlocks = - Utils.jsonOption(json \ "Updated Blocks").map { value => - value.extract[List[JValue]].map { block => - val id = BlockId((block \ "Block ID").extract[String]) - val status = blockStatusFromJson(block \ "Status") - (id, status) - } - } + // Updated blocks + Utils.jsonOption(json \ "Updated Blocks").foreach { blocksJson => + metrics.setUpdatedBlockStatuses(blocksJson.extract[List[JValue]].map { blockJson => + val id = BlockId((blockJson \ "Block ID").extract[String]) + val status = blockStatusFromJson(blockJson \ "Status") + (id, status) + }) + } metrics } diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 30aa94c8a5971..7c8529f687d4c 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -86,6 +86,6 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before cacheManager = sc.env.cacheManager val context = TaskContext.empty() cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY) - assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2) + assert(context.taskMetrics.updatedBlockStatuses.size === 2) } } diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala index 355d80d06898b..9de434166bba3 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -85,8 +85,8 @@ class StorageStatusListenerSuite extends SparkFunSuite { val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L)) val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L)) val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L)) - taskMetrics1.updatedBlocks = Some(Seq(block1, block2)) - taskMetrics2.updatedBlocks = Some(Seq(block3)) + taskMetrics1.setUpdatedBlockStatuses(Seq(block1, block2)) + taskMetrics2.setUpdatedBlockStatuses(Seq(block3)) // Task end with new blocks assert(listener.executorIdToStorageStatus("big").numBlocks === 0) @@ -108,8 +108,8 @@ class StorageStatusListenerSuite extends SparkFunSuite { val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L)) val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L)) val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L)) - taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3)) - taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3)) + taskMetrics1.setUpdatedBlockStatuses(Seq(droppedBlock1, droppedBlock3)) + taskMetrics2.setUpdatedBlockStatuses(Seq(droppedBlock2, droppedBlock3)) listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1)) assert(listener.executorIdToStorageStatus("big").numBlocks === 1) @@ -133,8 +133,8 @@ class StorageStatusListenerSuite extends SparkFunSuite { val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L)) val block2 = (RDDBlockId(1, 2), 
BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L)) val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L)) - taskMetrics1.updatedBlocks = Some(Seq(block1, block2)) - taskMetrics2.updatedBlocks = Some(Seq(block3)) + taskMetrics1.setUpdatedBlockStatuses(Seq(block1, block2)) + taskMetrics2.setUpdatedBlockStatuses(Seq(block3)) listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1)) listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics2)) assert(listener.executorIdToStorageStatus("big").numBlocks === 3) diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 5ac922c2172ce..d1dbf7c1558b2 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -127,7 +127,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { // Task end with a few new persisted blocks, some from the same RDD val metrics1 = new TaskMetrics - metrics1.updatedBlocks = Some(Seq( + metrics1.setUpdatedBlockStatuses(Seq( (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L)), (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L)), (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L)) @@ -146,7 +146,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { // Task end with a few dropped blocks val metrics2 = new TaskMetrics - metrics2.updatedBlocks = Some(Seq( + metrics2.setUpdatedBlockStatuses(Seq( (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L)), (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L)), (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L)), // doesn't actually exist @@ -173,8 +173,8 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { val taskMetrics1 = new TaskMetrics val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L)) val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L)) - taskMetrics0.updatedBlocks = Some(Seq(block0)) - taskMetrics1.updatedBlocks = Some(Seq(block1)) + taskMetrics0.setUpdatedBlockStatuses(Seq(block0)) + taskMetrics1.setUpdatedBlockStatuses(Seq(block1)) bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener.rddInfoList.size === 0) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index b828648fcd1c4..9b2dc20c3e686 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -557,7 +557,7 @@ class JsonProtocolSuite extends SparkFunSuite { metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals) assertOptionEquals( metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals) - assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals) + assertBlocksEquals(metrics1.updatedBlockStatuses, metrics2.updatedBlockStatuses) } private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) { @@ -787,10 +787,9 @@ class JsonProtocolSuite extends SparkFunSuite { t.mergeShuffleReadMetrics() } if (hasOutput) { - val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) + val outputMetrics = t.registerOutputMetrics(DataWriteMethod.Hadoop) outputMetrics.setBytesWritten(a + b + c) 
outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1) - t.outputMetrics = Some(outputMetrics) } else { val sw = t.registerShuffleWriteMetrics() sw.incShuffleBytesWritten(a + b + c) @@ -798,7 +797,7 @@ class JsonProtocolSuite extends SparkFunSuite { sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1) } // Make at most 6 blocks - t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => + t.setUpdatedBlockStatuses((1 to (e % 5 + 1)).map { i => (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i)) }.toSeq) t From d2e4e23be82a0afb2f39d629ee7413591bc08c8d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 18 Jan 2016 14:42:19 -0800 Subject: [PATCH 6/9] One more --- .../util/collection/unsafe/sort/UnsafeExternalSorter.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 68dc0c6d415f6..a6edc1ad3f665 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -122,9 +122,7 @@ private UnsafeExternalSorter( // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.fileBufferSizeBytes = 32 * 1024; - // TODO: metrics tracking + integration with shuffle write metrics - // need to connect the write metrics to task metrics so we count the spill IO somewhere. - this.writeMetrics = new ShuffleWriteMetrics(); + this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics(); if (existingInMemorySorter == null) { this.inMemSorter = new UnsafeInMemorySorter( From c04b5df944e32d6854ab5ed4a282b77df889d481 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 18 Jan 2016 16:13:08 -0800 Subject: [PATCH 7/9] Review comments --- .../apache/spark/executor/TaskMetrics.scala | 15 ------- .../apache/spark/rdd/PairRDDFunctions.scala | 40 +++++++++---------- 2 files changed, 19 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 2202e33090e71..7afba6bdf8613 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -149,11 +149,6 @@ class TaskMetrics extends Serializable { */ def outputMetrics: Option[OutputMetrics] = _outputMetrics - @deprecated("setting OutputMetrics is for internal use only", "2.0.0") - def outputMetrics_=(om: Option[OutputMetrics]): Unit = { - _outputMetrics = om - } - /** * Get or create a new [[OutputMetrics]] associated with this task. */ @@ -230,11 +225,6 @@ class TaskMetrics extends Serializable { */ def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics - @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0") - def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = { - _shuffleWriteMetrics = swm - } - /** * Get or create a new [[ShuffleWriteMetrics]] associated with this task. 
    */
@@ -267,11 +257,6 @@ class TaskMetrics extends Serializable {
     if (_updatedBlockStatuses.nonEmpty) Some(_updatedBlockStatuses) else None
   }
 
-  @deprecated("setting updated blocks is for internal use only", "2.0.0")
-  def updatedBlocks_=(ub: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
-    _updatedBlockStatuses = ub.getOrElse(Seq.empty[(BlockId, BlockStatus)])
-  }
-
   /**
    * Returns the input metrics object that the task should use. Currently, if
    * there exists an input metric with the same readMethod, we return that one
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index c098ed2354eaf..33f2f0b44f773 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1092,7 +1092,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
 
-      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
+      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
+        initHadoopOutputMetrics(context)
 
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
       require(writer != null, "Unable to obtain RecordWriter")
@@ -1103,15 +1104,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           writer.write(pair._1, pair._2)
 
           // Update bytes written metric every few records
-          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+          maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
           recordsWritten += 1
         }
       } {
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
-      outputMetrics.foreach { om =>
-        bytesWrittenCallback.foreach { fn => om.setBytesWritten(fn()) }
+      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
+        om.setBytesWritten(callback())
         om.setRecordsWritten(recordsWritten)
       }
       1
@@ -1179,7 +1180,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
 
-      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)
+      val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
+        initHadoopOutputMetrics(context)
 
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
@@ -1191,15 +1193,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
 
           // Update bytes written metric every few records
-          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+          maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
           recordsWritten += 1
         }
       } {
         writer.close()
       }
       writer.commit()
-      outputMetrics.foreach { om =>
-        bytesWrittenCallback.foreach { fn => om.setBytesWritten(fn()) }
+      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
+        om.setBytesWritten(callback())
         om.setRecordsWritten(recordsWritten)
       }
     }
@@ -1211,25 +1213,21 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
   // TODO: these don't seem like the right abstractions.
   // We should abstract the duplicate code in a less awkward way.
+  // return type: (output metrics, bytes written callback), defined only if the latter is defined
   private def initHadoopOutputMetrics(
-      context: TaskContext): (Option[OutputMetrics], Option[() => Long]) = {
+      context: TaskContext): Option[(OutputMetrics, () => Long)] = {
     val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
-    val outputMetrics =
-      if (bytesWrittenCallback.isDefined) {
-        Some(context.taskMetrics().registerOutputMetrics(DataWriteMethod.Hadoop))
-      } else {
-        None
-      }
-    (outputMetrics, bytesWrittenCallback)
+    bytesWrittenCallback.map { b =>
+      (context.taskMetrics().registerOutputMetrics(DataWriteMethod.Hadoop), b)
+    }
   }
 
   private def maybeUpdateOutputMetrics(
-      bytesWrittenCallback: Option[() => Long],
-      outputMetrics: Option[OutputMetrics],
+      outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
       recordsWritten: Long): Unit = {
-    outputMetrics.foreach { om =>
-      if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
-        bytesWrittenCallback.foreach { fn => om.setBytesWritten(fn()) }
+    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
+      outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
+        om.setBytesWritten(callback())
         om.setRecordsWritten(recordsWritten)
       }
     }

From 269031f162cbce031efd4cdce55908f46569a8c8 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 18 Jan 2016 16:33:12 -0800
Subject: [PATCH 8/9] Remove unused method

---
 .../scala/org/apache/spark/CacheManager.scala |  3 +--
 .../apache/spark/executor/TaskMetrics.scala   | 25 -------------------
 .../org/apache/spark/rdd/HadoopRDD.scala      |  2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala   |  3 +--
 .../datasources/SqlNewHadoopRDD.scala         |  3 +--
 5 files changed, 4 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 32ab9b6801bb1..fa8e2b953835b 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -43,8 +43,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        val existingMetrics = context.taskMetrics
-          .getInputMetricsForReadMethod(blockResult.readMethod)
+        val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
         existingMetrics.incBytesRead(blockResult.bytes)
 
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 7afba6bdf8613..36bcb13aa4659 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -257,31 +257,6 @@ class TaskMetrics extends Serializable {
     if (_updatedBlockStatuses.nonEmpty) Some(_updatedBlockStatuses) else None
   }
 
-  /**
-   * Returns the input metrics object that the task should use. Currently, if
-   * there exists an input metric with the same readMethod, we return that one
-   * so the caller can accumulate bytes read. If the readMethod is different
-   * than previously seen by this task, we return a new InputMetric but don't
-   * record it.
-   *
-   * Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
-   * we can store all the different inputMetrics (one per readMethod).
-   */
-  private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod): InputMetrics = {
-    synchronized {
-      _inputMetrics match {
-        case None =>
-          val metrics = new InputMetrics(readMethod)
-          _inputMetrics = Some(metrics)
-          metrics
-        case Some(metrics @ InputMetrics(method)) if method == readMethod =>
-          metrics
-        case Some(InputMetrics(method)) =>
-          new InputMetrics(readMethod)
-      }
-    }
-  }
-
   private[spark] def updateInputMetrics(): Unit = synchronized {
     inputMetrics.foreach(_.updateBytesRead())
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index a7a6e0b8a94f6..a79ab86d49227 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -212,7 +212,7 @@ class HadoopRDD[K, V](
       logInfo("Input split: " + split.inputSplit)
       val jobConf = getJobConf()
 
-      val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+      val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
 
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 7a1197830443f..5cc9c81cc6749 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -129,8 +129,7 @@ class NewHadoopRDD[K, V](
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = getConf
 
-      val inputMetrics = context.taskMetrics
-        .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+      val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
      // creating RecordReader, because RecordReader's constructor might read some bytes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index d45d2db62f3a9..8222b84d33e3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -126,8 +126,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = getConf(isDriverSide = false)
 
-      val inputMetrics = context.taskMetrics
-        .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+      val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
 
       // Sets the thread local variable for the file's name
       split.serializableHadoopSplit.value match {

From 12bd943340eb887711f15665e7c3805e3a76558c Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 18 Jan 2016 19:10:05 -0800
Subject: [PATCH 9/9] Add back deprecated things

---
 .../org/apache/spark/executor/TaskMetrics.scala | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 36bcb13aa4659..32ef5a9b5606f 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -149,6 +149,11 @@ class TaskMetrics extends Serializable {
    */
   def outputMetrics: Option[OutputMetrics] = _outputMetrics
 
+  @deprecated("setting OutputMetrics is for internal use only", "2.0.0")
+  def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
+    _outputMetrics = om
+  }
+
   /**
    * Get or create a new [[OutputMetrics]] associated with this task.
    */
@@ -225,6 +230,11 @@ class TaskMetrics extends Serializable {
    */
   def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics
 
+  @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
+  def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
+    _shuffleWriteMetrics = swm
+  }
+
   /**
    * Get or create a new [[ShuffleWriteMetrics]] associated with this task.
    */
@@ -244,6 +254,11 @@ class TaskMetrics extends Serializable {
    */
   def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses
 
+  @deprecated("setting updated blocks is for internal use only", "2.0.0")
+  def updatedBlocks_=(ub: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
+    _updatedBlockStatuses = ub.getOrElse(Seq.empty[(BlockId, BlockStatus)])
+  }
+
   private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = {
     _updatedBlockStatuses ++= v
   }
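
Reviewer note, not part of the patch series: the sketch below only illustrates the call pattern of the
new get-or-create accessors (registerInputMetrics, registerOutputMetrics, registerTempShuffleReadMetrics,
mergeShuffleReadMetrics). The wrapper object and method names are hypothetical, and the code assumes it is
compiled inside the org.apache.spark package, since these methods are private[spark].

package org.apache.spark

import org.apache.spark.executor.{DataReadMethod, DataWriteMethod}

// Hypothetical helper, for illustration only.
private[spark] object TaskMetricsUsageSketch {

  def recordSomeIo(context: TaskContext, bytesRead: Long, bytesWritten: Long, records: Long): Unit = {
    // Get or create the InputMetrics for this read method; repeated calls with the same
    // read method return the same instance, so byte counts accumulate in one place.
    val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
    inputMetrics.incBytesRead(bytesRead)

    // Get or create the task's OutputMetrics and record the totals written so far.
    val outputMetrics = context.taskMetrics().registerOutputMetrics(DataWriteMethod.Hadoop)
    outputMetrics.setBytesWritten(bytesWritten)
    outputMetrics.setRecordsWritten(records)

    // Each shuffle reader registers its own temporary ShuffleReadMetrics; the task merges
    // them into the reported value on executor heartbeats and at the end of the task.
    val shuffleRead = context.taskMetrics().registerTempShuffleReadMetrics()
    shuffleRead.incRecordsRead(records)
    context.taskMetrics().mergeShuffleReadMetrics()
  }
}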