Skip to content

Commit ed072fc

Browse files
committed
Address review comments from @tdas
1 parent 32d0418 commit ed072fc

File tree

3 files changed

+17
-23
lines changed

3 files changed

+17
-23
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
8181
}
8282

8383
object SQLMetrics {
84-
val SUM_METRIC = "sum"
85-
val SIZE_METRIC = "size"
86-
val TIMING_METRIC = "timing"
87-
val AVERAGE_METRIC = "average"
84+
private val SUM_METRIC = "sum"
85+
private val SIZE_METRIC = "size"
86+
private val TIMING_METRIC = "timing"
87+
private val AVERAGE_METRIC = "average"
8888

8989
private val baseForAvgMetric: Int = 10
9090

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,11 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
9090
* the driver after this SparkPlan has been executed and metrics have been updated.
9191
*/
9292
def getProgress(): StateOperatorProgress = {
93-
// average metric is a bit tricky, so hard to aggregate: just exclude them to simplify issue
94-
val avgExcludedCustomMetrics = stateStoreCustomMetrics
95-
.filterNot(_._2.metricType == SQLMetrics.AVERAGE_METRIC)
93+
val customMetrics = stateStoreCustomMetrics
9694
.map(entry => entry._1 -> longMetric(entry._1).value)
9795

9896
val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
99-
new java.util.HashMap(avgExcludedCustomMetrics.mapValues(long2Long).asJava)
97+
new java.util.HashMap(customMetrics.mapValues(long2Long).asJava)
10098

10199
new StateOperatorProgress(
102100
numRowsTotal = longMetric("numTotalStateRows").value,

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,17 @@ class StateOperatorProgress private[sql](
5252
new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics)
5353

5454
private[sql] def jsonValue: JValue = {
55-
def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
56-
if (map.isEmpty) return JNothing
57-
val keys = map.keySet.asScala.toSeq.sorted
58-
keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
59-
}
60-
61-
val jsonVal = ("numRowsTotal" -> JInt(numRowsTotal)) ~
62-
("numRowsUpdated" -> JInt(numRowsUpdated)) ~
63-
("memoryUsedBytes" -> JInt(memoryUsedBytes))
64-
65-
if (!customMetrics.isEmpty) {
66-
jsonVal ~ ("customMetrics" -> safeMapToJValue[JLong](customMetrics, v => JInt(v.toLong)))
67-
} else {
68-
jsonVal
69-
}
55+
("numRowsTotal" -> JInt(numRowsTotal)) ~
56+
("numRowsUpdated" -> JInt(numRowsUpdated)) ~
57+
("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
58+
("customMetrics" -> {
59+
if (!customMetrics.isEmpty) {
60+
val keys = customMetrics.keySet.asScala.toSeq.sorted
61+
keys.map { k => k -> JInt(customMetrics.get(k).toLong) : JObject }.reduce(_ ~ _)
62+
} else {
63+
JNothing
64+
}
65+
})
7066
}
7167

7268
override def toString: String = prettyJson

0 commit comments

Comments
 (0)