
Commit 42035a4

HeartSaVioR authored and tdas committed
[SPARK-24441][SS] Expose total estimated size of states in HDFSBackedStateStoreProvider
## What changes were proposed in this pull request? This patch exposes the estimation of size of cache (loadedMaps) in HDFSBackedStateStoreProvider as a custom metric of StateStore. The rationalize of the patch is that state backed by HDFSBackedStateStoreProvider will consume more memory than the number what we can get from query status due to caching multiple versions of states. The memory footprint to be much larger than query status reports in situations where the state store is getting a lot of updates: while shallow-copying map incurs additional small memory usages due to the size of map entities and references, but row objects will still be shared across the versions. If there're lots of updates between batches, less row objects will be shared and more row objects will exist in memory consuming much memory then what we expect. While HDFSBackedStateStore refers loadedMaps in HDFSBackedStateStoreProvider directly, there would be only one `StateStoreWriter` which refers a StateStoreProvider, so the value is not exposed as well as being aggregated multiple times. Current state metrics are safe to aggregate for the same reason. ## How was this patch tested? Tested manually. Below is the snapshot of UI page which is reflected by the patch: <img width="601" alt="screen shot 2018-06-05 at 10 16 16 pm" src="https://user-images.githubusercontent.com/1317309/40978481-b46ad324-690e-11e8-9b0f-e80528612a62.png"> Please refer "estimated size of states cache in provider total" as well as "count of versions in state cache in provider". Closes #21469 from HeartSaVioR/SPARK-24441. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Tathagata Das <[email protected]>
1 parent ac0174e · commit 42035a4
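Once this change is in, the new metrics surface through the public progress API. Below is a minimal sketch of reading them from a running query; `query` is assumed to be an active `StreamingQuery`, and only the metric names come from this patch:

```scala
// Sketch, not part of this commit: reading the custom metrics added by this
// patch from a running StreamingQuery named `query` (assumed to exist).
val progress = query.lastProgress
progress.stateOperators.foreach { op =>
  println(s"memoryUsedBytes = ${op.memoryUsedBytes}")
  // customMetrics is a java.util.Map[String, java.lang.Long] after this patch
  println(s"stateOnCurrentVersionSizeBytes = " +
    s"${op.customMetrics.get("stateOnCurrentVersionSizeBytes")}")
  println(s"loadedMapCacheHitCount = ${op.customMetrics.get("loadedMapCacheHitCount")}")
  println(s"loadedMapCacheMissCount = ${op.customMetrics.get("loadedMapCacheMissCount")}")
}
```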

File tree: 8 files changed (+176, -9 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 37 additions & 2 deletions
```diff
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.util
 import java.util.Locale
+import java.util.concurrent.atomic.LongAdder
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -165,7 +166,16 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   }
 
   override def metrics: StateStoreMetrics = {
-    StateStoreMetrics(mapToUpdate.size(), SizeEstimator.estimate(mapToUpdate), Map.empty)
+    // NOTE: we provide estimation of cache size as "memoryUsedBytes", and size of state for
+    // current version as "stateOnCurrentVersionSizeBytes"
+    val metricsFromProvider: Map[String, Long] = getMetricsForProvider()
+
+    val customMetrics = metricsFromProvider.flatMap { case (name, value) =>
+      // just allow searching from list cause the list is small enough
+      supportedCustomMetrics.find(_.name == name).map(_ -> value)
+    } + (metricStateOnCurrentVersionSizeBytes -> SizeEstimator.estimate(mapToUpdate))
+
+    StateStoreMetrics(mapToUpdate.size(), metricsFromProvider("memoryUsedBytes"), customMetrics)
   }
 
   /**
@@ -180,6 +190,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     }
   }
 
+  def getMetricsForProvider(): Map[String, Long] = synchronized {
+    Map("memoryUsedBytes" -> SizeEstimator.estimate(loadedMaps),
+      metricLoadedMapCacheHit.name -> loadedMapCacheHitCount.sum(),
+      metricLoadedMapCacheMiss.name -> loadedMapCacheMissCount.sum())
+  }
+
   /** Get the state store for making updates to create a new `version` of the store. */
   override def getStore(version: Long): StateStore = synchronized {
     require(version >= 0, "Version cannot be less than 0")
@@ -226,7 +242,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   }
 
   override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = {
-    Nil
+    metricStateOnCurrentVersionSizeBytes :: metricLoadedMapCacheHit :: metricLoadedMapCacheMiss ::
+      Nil
   }
 
   override def toString(): String = {
@@ -248,6 +265,21 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf)
   private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
 
+  private val loadedMapCacheHitCount: LongAdder = new LongAdder
+  private val loadedMapCacheMissCount: LongAdder = new LongAdder
+
+  private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric =
+    StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes",
+      "estimated size of state only on current version")
+
+  private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
+    StateStoreCustomSumMetric("loadedMapCacheHitCount",
+      "count of cache hit on states cache in provider")
+
+  private lazy val metricLoadedMapCacheMiss: StateStoreCustomMetric =
+    StateStoreCustomSumMetric("loadedMapCacheMissCount",
+      "count of cache miss on states cache in provider")
+
   private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)
 
   private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = {
@@ -311,13 +343,16 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     // Shortcut if the map for this version is already there to avoid a redundant put.
     val loadedCurrentVersionMap = synchronized { Option(loadedMaps.get(version)) }
     if (loadedCurrentVersionMap.isDefined) {
+      loadedMapCacheHitCount.increment()
       return loadedCurrentVersionMap.get
     }
 
     logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
      "Reading snapshot file and delta files if needed..." +
      "Note that this is normal for the first batch of starting query.")
 
+    loadedMapCacheMissCount.increment()
+
     val (result, elapsedMs) = Utils.timeTakenMs {
       val snapshotCurrentVersionMap = readSnapshotFile(version)
       if (snapshotCurrentVersionMap.isDefined) {
```
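The hit/miss counters above rely on `java.util.concurrent.atomic.LongAdder`, which makes concurrent increments cheap at the cost of slightly slower reads, a good fit for counters bumped on every state load but read only when metrics are reported. A standalone sketch of the pattern (the `CacheStats` class is illustrative, not from this commit):

```scala
import java.util.concurrent.atomic.LongAdder

// Illustrative sketch (not from this commit) of the counter pattern above:
// LongAdder spreads contended increments across internal cells and only
// folds them into a total when sum() is called, so increments stay cheap
// even when many task threads hit the cache concurrently.
class CacheStats {
  private val hits = new LongAdder
  private val misses = new LongAdder

  def recordHit(): Unit = hits.increment()
  def recordMiss(): Unit = misses.increment()

  // Reads are comparatively rare (once per metrics report), so paying the
  // cost of summing the cells here is the right trade-off.
  def snapshot(): Map[String, Long] =
    Map("hitCount" -> hits.sum(), "missCount" -> misses.sum())
}
```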

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -138,6 +138,8 @@ trait StateStoreCustomMetric {
   def name: String
   def desc: String
 }
+
+case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
 case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
```
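With `StateStoreCustomSumMetric` in place, a provider declares its metrics once and pairs reported values back to the declarations by name, as HDFSBackedStateStoreProvider does above. A minimal sketch under the assumption that the case classes above are in scope; `flushCount` is a made-up metric name, not part of this commit:

```scala
// Illustrative only: how a provider pairs declared metrics with reported
// values by name, mirroring HDFSBackedStateStoreProvider.metrics above.
// "flushCount" is a hypothetical metric for this example.
val metricFlushCount: StateStoreCustomMetric =
  StateStoreCustomSumMetric("flushCount", "count of flushes to backing storage")

val supported: Seq[StateStoreCustomMetric] = metricFlushCount :: Nil

// Raw values as a provider would collect them internally...
val raw = Map("flushCount" -> 42L)

// ...then matched back to their declarations for StateStoreMetrics.
val custom: Map[StateStoreCustomMetric, Long] = raw.flatMap { case (name, value) =>
  supported.find(_.name == name).map(_ -> value)
}
```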

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -269,6 +269,8 @@ class SymmetricHashJoinStateManager(
       keyWithIndexToValueMetrics.numKeys,       // represent each buffered row only once
       keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
       keyWithIndexToValueMetrics.customMetrics.map {
+        case (s @ StateStoreCustomSumMetric(_, desc), value) =>
+          s.copy(desc = newDesc(desc)) -> value
         case (s @ StateStoreCustomSizeMetric(_, desc), value) =>
           s.copy(desc = newDesc(desc)) -> value
         case (s @ StateStoreCustomTimingMetric(_, desc), value) =>
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala

Lines changed: 11 additions & 1 deletion
```diff
@@ -88,10 +88,18 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
    * the driver after this SparkPlan has been executed and metrics have been updated.
    */
   def getProgress(): StateOperatorProgress = {
+    val customMetrics = stateStoreCustomMetrics
+      .map(entry => entry._1 -> longMetric(entry._1).value)
+
+    val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
+      new java.util.HashMap(customMetrics.mapValues(long2Long).asJava)
+
     new StateOperatorProgress(
       numRowsTotal = longMetric("numTotalStateRows").value,
       numRowsUpdated = longMetric("numUpdatedStateRows").value,
-      memoryUsedBytes = longMetric("stateMemory").value)
+      memoryUsedBytes = longMetric("stateMemory").value,
+      javaConvertedCustomMetrics
+    )
   }
 
   /** Records the duration of running `body` for the next query progress update. */
@@ -113,6 +121,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
     provider.supportedCustomMetrics.map {
+      case StateStoreCustomSumMetric(name, desc) =>
+        name -> SQLMetrics.createMetric(sparkContext, desc)
       case StateStoreCustomSizeMetric(name, desc) =>
         name -> SQLMetrics.createSizeMetric(sparkContext, desc)
       case StateStoreCustomTimingMetric(name, desc) =>
```

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

Lines changed: 12 additions & 3 deletions
```diff
@@ -38,7 +38,8 @@ import org.apache.spark.annotation.InterfaceStability
 class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
-    val memoryUsedBytes: Long
+    val memoryUsedBytes: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
   ) extends Serializable {
 
   /** The compact JSON representation of this progress. */
@@ -48,12 +49,20 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics)
 
   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-    ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+    ("customMetrics" -> {
+      if (!customMetrics.isEmpty) {
+        val keys = customMetrics.keySet.asScala.toSeq.sorted
+        keys.map { k => k -> JInt(customMetrics.get(k).toLong) : JObject }.reduce(_ ~ _)
+      } else {
+        JNothing
+      }
+    })
   }
 
   override def toString: String = prettyJson
```

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 100 additions & 0 deletions
```diff
@@ -317,6 +317,22 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(store.metrics.memoryUsedBytes > noDataMemoryUsed)
   }
 
+  test("reports memory usage on current version") {
+    def getSizeOfStateForCurrentVersion(metrics: StateStoreMetrics): Long = {
+      val metricPair = metrics.customMetrics.find(_._1.name == "stateOnCurrentVersionSizeBytes")
+      assert(metricPair.isDefined)
+      metricPair.get._2
+    }
+
+    val provider = newStoreProvider()
+    val store = provider.getStore(0)
+    val noDataMemoryUsed = getSizeOfStateForCurrentVersion(store.metrics)
+
+    put(store, "a", 1)
+    store.commit()
+    assert(getSizeOfStateForCurrentVersion(store.metrics) > noDataMemoryUsed)
+  }
+
   test("StateStore.get") {
     quietly {
       val dir = newDir()
@@ -631,6 +647,90 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     assert(CreateAtomicTestManager.cancelCalledInCreateAtomic)
   }
 
+  test("expose metrics with custom metrics to StateStoreMetrics") {
+    def getCustomMetric(metrics: StateStoreMetrics, name: String): Long = {
+      val metricPair = metrics.customMetrics.find(_._1.name == name)
+      assert(metricPair.isDefined)
+      metricPair.get._2
+    }
+
+    def getLoadedMapSizeMetric(metrics: StateStoreMetrics): Long = {
+      metrics.memoryUsedBytes
+    }
+
+    def assertCacheHitAndMiss(
+        metrics: StateStoreMetrics,
+        expectedCacheHitCount: Long,
+        expectedCacheMissCount: Long): Unit = {
+      val cacheHitCount = getCustomMetric(metrics, "loadedMapCacheHitCount")
+      val cacheMissCount = getCustomMetric(metrics, "loadedMapCacheMissCount")
+      assert(cacheHitCount === expectedCacheHitCount)
+      assert(cacheMissCount === expectedCacheMissCount)
+    }
+
+    val provider = newStoreProvider()
+
+    // Verify state before starting a new set of updates
+    assert(getLatestData(provider).isEmpty)
+
+    val store = provider.getStore(0)
+    assert(!store.hasCommitted)
+
+    assert(store.metrics.numKeys === 0)
+
+    val initialLoadedMapSize = getLoadedMapSizeMetric(store.metrics)
+    assert(initialLoadedMapSize >= 0)
+    assertCacheHitAndMiss(store.metrics, expectedCacheHitCount = 0, expectedCacheMissCount = 0)
+
+    put(store, "a", 1)
+    assert(store.metrics.numKeys === 1)
+
+    put(store, "b", 2)
+    put(store, "aa", 3)
+    assert(store.metrics.numKeys === 3)
+    remove(store, _.startsWith("a"))
+    assert(store.metrics.numKeys === 1)
+    assert(store.commit() === 1)
+
+    assert(store.hasCommitted)
+
+    val loadedMapSizeForVersion1 = getLoadedMapSizeMetric(store.metrics)
+    assert(loadedMapSizeForVersion1 > initialLoadedMapSize)
+    assertCacheHitAndMiss(store.metrics, expectedCacheHitCount = 0, expectedCacheMissCount = 0)
+
+    val storeV2 = provider.getStore(1)
+    assert(!storeV2.hasCommitted)
+    assert(storeV2.metrics.numKeys === 1)
+
+    put(storeV2, "cc", 4)
+    assert(storeV2.metrics.numKeys === 2)
+    assert(storeV2.commit() === 2)
+
+    assert(storeV2.hasCommitted)
+
+    val loadedMapSizeForVersion1And2 = getLoadedMapSizeMetric(storeV2.metrics)
+    assert(loadedMapSizeForVersion1And2 > loadedMapSizeForVersion1)
+    assertCacheHitAndMiss(storeV2.metrics, expectedCacheHitCount = 1, expectedCacheMissCount = 0)
+
+    val reloadedProvider = newStoreProvider(store.id)
+    // intended to load version 2 instead of 1
+    // version 2 will not be loaded to the cache in provider
+    val reloadedStore = reloadedProvider.getStore(1)
+    assert(reloadedStore.metrics.numKeys === 1)
+
+    assert(getLoadedMapSizeMetric(reloadedStore.metrics) === loadedMapSizeForVersion1)
+    assertCacheHitAndMiss(reloadedStore.metrics, expectedCacheHitCount = 0,
+      expectedCacheMissCount = 1)
+
+    // now we are loading version 2
+    val reloadedStoreV2 = reloadedProvider.getStore(2)
+    assert(reloadedStoreV2.metrics.numKeys === 2)
+
+    assert(getLoadedMapSizeMetric(reloadedStoreV2.metrics) > loadedMapSizeForVersion1)
+    assertCacheHitAndMiss(reloadedStoreV2.metrics, expectedCacheHitCount = 0,
+      expectedCacheMissCount = 2)
+  }
+
   override def newStoreProvider(): HDFSBackedStateStoreProvider = {
     newStoreProvider(opId = Random.nextInt(), partition = 0)
   }
```

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -231,7 +231,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   test("event ordering") {
     val listener = new EventCollector
     withListenerAdded(listener) {
-      for (i <- 1 to 100) {
+      for (i <- 1 to 50) {
         listener.reset()
         require(listener.startEvent === null)
         testStream(MemoryStream[Int].toDS)(
```

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala

Lines changed: 11 additions & 2 deletions
```diff
@@ -58,7 +58,12 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |  "stateOperators" : [ {
       |    "numRowsTotal" : 0,
       |    "numRowsUpdated" : 1,
-      |    "memoryUsedBytes" : 2
+      |    "memoryUsedBytes" : 3,
+      |    "customMetrics" : {
+      |      "loadedMapCacheHitCount" : 1,
+      |      "loadedMapCacheMissCount" : 0,
+      |      "stateOnCurrentVersionSizeBytes" : 2
+      |    }
       |  } ],
       |  "sources" : [ {
       |    "description" : "source",
@@ -230,7 +235,11 @@ object StreamingQueryStatusAndProgressSuite {
         "avg" -> "2016-12-05T20:54:20.827Z",
         "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3,
+      customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
+        "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
+        .mapValues(long2Long).asJava)
+    )),
     sources = Array(
       new SourceProgress(
         description = "source",
```
