
Commit a2a36d4

Author: Kostas Sakellis
Commit message: CR feedback
1 parent: 5a0c770

File tree: 3 files changed (+13 −12 lines)


core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         // Partition is already materialized, so just return its values
         val existingMetrics = context.taskMetrics.inputMetrics
         val prevBytesRead = existingMetrics
-          .filter( _.readMethod == blockResult.inputMetrics.readMethod)
+          .filter(_.readMethod == blockResult.inputMetrics.readMethod)
           .map(_.bytesRead)
           .getOrElse(0L)
 

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 2 additions & 1 deletion
@@ -185,7 +185,8 @@ class NewHadoopRDD[K, V](
           // If we can't get the bytes read from the FS stats, fall back to the split size,
           // which may be inaccurate.
           try {
-            inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength + bytesReadAtStart
+            inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength +
+              bytesReadAtStart
             context.taskMetrics.inputMetrics = Some(inputMetrics)
           } catch {
             case e: java.io.IOException =>

core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala

Lines changed: 10 additions & 10 deletions
@@ -17,20 +17,18 @@
 
 package org.apache.spark.metrics
 
-import java.io.{FileWriter, PrintWriter, File}
-import org.apache.hadoop.io.{Text, LongWritable}
-import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
+import java.io.{File, FileWriter, PrintWriter}
 
-import org.apache.spark.util.Utils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
 import org.apache.spark.SharedSparkContext
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
-
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.util.Utils
 import org.scalatest.FunSuite
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
-
 import scala.collection.mutable.ArrayBuffer
 
 class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
@@ -69,6 +67,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
     val bytesRead2 = runAndReturnBytesRead {
       sc.textFile(tmpFilePath, 4).coalesce(2).count()
     }
+    assert(bytesRead != 0)
     assert(bytesRead2 == bytesRead)
     assert(bytesRead2 >= tmpFile.length())
   }
@@ -86,7 +85,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
     }
 
     // for count and coelesce, the same bytes should be read.
-    assert(bytesRead2 >= bytesRead2)
+    assert(bytesRead2 >= bytesRead)
   }
 
   test("input metrics for new Hadoop API with coalesce") {
@@ -98,6 +97,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
       sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
         classOf[Text]).coalesce(5).count()
     }
+    assert(bytesRead != 0)
    assert(bytesRead2 == bytesRead)
     assert(bytesRead >= tmpFile.length())
   }
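
The two new assert(bytesRead != 0) checks guard against the helper silently returning zero. That helper, runAndReturnBytesRead, is not part of this diff; below is a minimal sketch, under the assumption that it aggregates per-task input metrics through the Spark 1.x SparkListener API (the suite's imports of SparkListener and SparkListenerTaskEnd suggest this). The signature and the drain note are assumptions, not the suite's actual code.

    // Sketch only: collect bytesRead from every finished task via a listener.
    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    import scala.collection.mutable.ArrayBuffer

    def runAndReturnBytesRead(sc: SparkContext)(job: => Unit): Long = {
      // inputMetrics is an Option[InputMetrics] in Spark 1.x, so tasks without
      // input metrics count as 0 bytes read.
      val taskBytesRead = new ArrayBuffer[Long]()
      sc.addSparkListener(new SparkListener() {
        override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
          taskBytesRead += taskEnd.taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
        }
      })
      job  // run the action under test, e.g. sc.textFile(path).count()
      // In practice the suite must also wait for the listener bus to drain
      // before summing, or the last tasks' metrics may be missed.
      taskBytesRead.sum
    }

With a helper of this shape, runAndReturnBytesRead(sc) { sc.textFile(tmpFilePath, 4).count() } returns the total bytes the tasks reported reading, which is what the assertions above compare.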
