[SPARK-24441][SS] Expose total estimated size of states in HDFSBackedStateStoreProvider #21469


Closed
HeartSaVioR wants to merge 9 commits into master from HeartSaVioR:SPARK-24441

Conversation

HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented May 31, 2018

What changes were proposed in this pull request?

This patch exposes the estimated size of the cache (loadedMaps) in HDFSBackedStateStoreProvider as a custom metric of StateStore.

The rationale for the patch is that state backed by HDFSBackedStateStoreProvider consumes more memory than the number we can get from query status, because multiple versions of the state are cached. The memory footprint can be much larger than what query status reports when the state store receives a lot of updates: shallow-copying the map adds only a small amount of memory for map entries and references, since row objects are still shared across versions, but if there are lots of updates between batches, fewer row objects are shared and more row objects stay in memory, consuming much more memory than we expect.

While HDFSBackedStateStore refers to loadedMaps in HDFSBackedStateStoreProvider directly, there is only one StateStoreWriter referring to a given StateStoreProvider, so the value is neither exposed nor aggregated multiple times. The current state metrics are safe to aggregate for the same reason.
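
In sketch form, the change amounts to something like the following fragment inside HDFSBackedStateStoreProvider (hedged: the description string and the supportedCustomMetrics wiring are illustrative, not the exact diff; the metric name follows the providerLoadedMapSize key reported in customMetrics):

import org.apache.spark.sql.execution.streaming.state.{StateStoreCustomMetric, StateStoreCustomSizeMetric}
import org.apache.spark.util.SizeEstimator

// A size-typed custom metric describing the whole loadedMaps cache.
lazy val metricProviderLoaderMapSize: StateStoreCustomSizeMetric =
  StateStoreCustomSizeMetric("providerLoadedMapSize",
    "estimated size of states cache in provider")

// Advertise the metric so it shows up in the SQL UI and query progress ...
override def supportedCustomMetrics: Seq[StateStoreCustomMetric] =
  Seq(metricProviderLoaderMapSize)

// ... and report its current value by estimating the size of all cached versions.
def getCustomMetricsForProvider(): Map[StateStoreCustomMetric, Long] = synchronized {
  Map(metricProviderLoaderMapSize -> SizeEstimator.estimate(loadedMaps))
}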

How was this patch tested?

Tested manually. Below is a snapshot of the UI page reflecting the patch:

[screenshot: screen shot 2018-06-05 at 10 16 16 pm]

Please refer to "estimated size of states cache in provider total" as well as "count of versions in state cache in provider".

@SparkQA

SparkQA commented May 31, 2018

Test build #91347 has finished for PR 21469 at commit dc11338.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 1, 2018

Test build #91375 has finished for PR 21469 at commit 6c1c30b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

@@ -181,6 +182,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
    }
  }

  def getCustomMetricsForProvider(): Map[StateStoreCustomMetric, Long] = {
Member


tiny nit:

def getCustomMetricsForProvider(): Map[StateStoreCustomMetric, Long] = synchronized {
  Map(metricProviderLoaderMapSize -> SizeEstimator.estimate(loadedMaps))
}

@HyukjinKwon
Member

Shall we make the PR title complete? Looks truncated.

@HeartSaVioR HeartSaVioR changed the title [SPARK-24441][SS] Expose total size of states in HDFSBackedStateStore… [SPARK-24441][SS] Expose total size of states in HDFSBackedStateStoreProvider Jun 1, 2018
@HeartSaVioR
Contributor Author

Thanks @HyukjinKwon for reviewing. Addressed the PR title as well as the nit.

@SparkQA

SparkQA commented Jun 1, 2018

Test build #91382 has finished for PR 21469 at commit 933fb2e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres
Contributor

LGTM. To clarify the description, we expect the memory footprint to be much larger than query status reports in situations where the state store is getting a lot of updates?

@HeartSaVioR
Contributor Author

HeartSaVioR commented Jun 3, 2018

@jose-torres
Ah yes, I forgot that shallow copying occurs, so while the new map holds the size of its map entries and references, the row objects themselves are still shared across versions. Thanks for pointing it out. Will update the description.

@HeartSaVioR HeartSaVioR changed the title [SPARK-24441][SS] Expose total size of states in HDFSBackedStateStoreProvider [SPARK-24441][SS] Expose total estimated size of states in HDFSBackedStateStoreProvider Jun 3, 2018
@arunmahadevan
Contributor

arunmahadevan commented Jun 4, 2018

@HeartSaVioR, maybe this should then be reported in "memoryUsedBytes" in StateOperatorProgress (the value reported in StreamingQueryProgress), or better as a separate custom metric, because the usage currently reported does not reflect the memory used for the cache.

Question: in the screenshot, "Estimated size of states cache in provider total" is 3.3 MB whereas "memory used by state total" is 20.6 KB with "total number of state rows" = 2. Is this 150x difference expected with just 2 rows in the state? Were there 100 versions of the map in the sample output you posted?

@HeartSaVioR
Contributor Author

@arunmahadevan
I didn't add the metric to StateOperatorProgress because this behavior is specific to HDFSBackedStateStoreProvider (though it is the only implementation available in Apache Spark), so I'm not sure this metric can be treated as a general one. (@tdas what do you think about this?)

Btw, the cache is only cleaned up while the maintenance operation runs, so there could be more than 100 versions in the map. Not sure why it shows 150x, but I couldn't find a missing spot in the patch. Maybe the issue is in SizeEstimator.estimate()?

One thing we need to check is how SizeEstimator.estimate() calculates memory usage when UnsafeRow objects are shared across versions. If SizeEstimator adds the size of an object every time it is referenced, it will report much higher memory usage than the actual footprint.

@HeartSaVioR
Contributor Author

Looks like the size is added only once per object identity in SizeEstimator.estimate(), so it is working correctly in this case. There might be other edge cases, but I'm not sure.
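
A quick way to sanity-check that identity-based behavior in a spark-shell (purely illustrative data; only org.apache.spark.util.SizeEstimator is assumed):

import org.apache.spark.util.SizeEstimator

// Two "versions" that share the exact same underlying objects (stand-ins for rows).
val sharedRows = Array.fill(1000)(new Array[Byte](64))
val version1 = sharedRows.toVector
val version2 = sharedRows.toVector

val singleVersion = SizeEstimator.estimate(version1)
val bothVersions = SizeEstimator.estimate(Seq(version1, version2))
// Since objects with the same identity are counted only once, bothVersions ends up close to
// singleVersion plus the overhead of the second Vector and its references, not double.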

@HeartSaVioR
Contributor Author

Also added a custom metric for the count of versions stored in loadedMaps.

This is a new screenshot:
[screenshot: screen shot 2018-06-05 at 10 16 16 pm]

@arunmahadevan
Contributor

I didn't add the metric to StateOperatorProgress because this behavior is specific to HDFSBackedStateStoreProvider

Maybe this can be reported as a custom metric and kept optional; that way it's not tied to any specific implementation.

@SparkQA

SparkQA commented Jun 5, 2018

Test build #91486 has finished for PR 21469 at commit 345397d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class StateStoreCustomAverageMetric(name: String, desc: String) extends StateStoreCustomMetric

@HyukjinKwon
Member

@jose-torres is it good to go?

@HeartSaVioR
Contributor Author

@arunmahadevan
Added custom metrics of the state store to the streaming query status as well. You can see providerLoadedMapSize added to stateOperators.customMetrics in the output below.

I had to exclude providerLoadedMapCountOfVersions from the list, since the average metric is implemented in a somewhat tricky way and doesn't look easy to aggregate for streaming query status.
We may want to reimplement SQLMetric and its subclasses so that everything works correctly without any tricky approach, but that doesn't look trivial to address and I think it is out of scope for this PR.

18/06/06 22:51:23 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "7564a0b7-e3b2-4d53-b246-b774ab04e586",
  "runId" : "8dd34784-080c-4f86-afaf-ac089902252d",
  "name" : null,
  "timestamp" : "2018-06-06T13:51:15.467Z",
  "batchId" : 4,
  "numInputRows" : 547,
  "inputRowsPerSecond" : 67.15776550030694,
  "processedRowsPerSecond" : 65.94333936106088,
  "durationMs" : {
    "addBatch" : 7944,
    "getBatch" : 1,
    "getEndOffset" : 0,
    "queryPlanning" : 61,
    "setOffsetRange" : 5,
    "triggerExecution" : 8295,
    "walCommit" : 158
  },
  "eventTime" : {
    "avg" : "2018-06-06T13:51:10.313Z",
    "max" : "2018-06-06T13:51:14.250Z",
    "min" : "2018-06-06T13:51:07.098Z",
    "watermark" : "2018-06-06T13:50:36.676Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 20,
    "numRowsUpdated" : 16,
    "memoryUsedBytes" : 26679,
    "customMetrics" : {
      "providerLoadedMapSize" : 181911
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[apachelogs-v2]]",
    "startOffset" : {
      "apachelogs-v2" : {
        "2" : 489056,
        "4" : 489053,
        "1" : 489055,
        "3" : 489051,
        "0" : 489053
      }
    },
    "endOffset" : {
      "apachelogs-v2" : {
        "2" : 489056,
        "4" : 489053,
        "1" : 489055,
        "3" : 489051,
        "0" : 489053
      }
    },
    "numInputRows" : 547,
    "inputRowsPerSecond" : 67.15776550030694,
    "processedRowsPerSecond" : 65.94333936106088
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@60999714"
  }
}
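
For reference, a small sketch of reading the new custom metric programmatically instead of from the log (this assumes an existing StreamingQuery handle named query; the metric key matches the output above):

val progress = query.lastProgress
val providerCacheSizeBytes: Option[Long] =
  progress.stateOperators.headOption
    .flatMap(op => Option(op.customMetrics.get("providerLoadedMapSize")))
    .map(_.longValue())
// For the progress shown above this would be Some(181911L).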

@arunmahadevan
Contributor

Nice, LGTM.

@SparkQA

SparkQA commented Jun 6, 2018

Test build #91503 has finished for PR 21469 at commit af57f26.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 7, 2018

Test build #91509 has finished for PR 21469 at commit 7ec3242.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -231,7 +231,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   test("event ordering") {
     val listener = new EventCollector
     withListenerAdded(listener) {
-      for (i <- 1 to 100) {
+      for (i <- 1 to 50) {
Contributor Author

@HeartSaVioR HeartSaVioR Jun 7, 2018


After the patch this test starts failing: it just means more time is needed to run this loop 100 times. It doesn't mean the logic is broken. Decreasing the number works for me.

Contributor


Makes sense, and I agree with the implicit claim that this slowdown isn't too worrying.

@SparkQA

SparkQA commented Jun 7, 2018

Test build #91523 has finished for PR 21469 at commit 3c80cad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Jun 7, 2018

Test build #91526 has finished for PR 21469 at commit 3c80cad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this please

@SparkQA

SparkQA commented Jun 8, 2018

Test build #91535 has finished for PR 21469 at commit 3c80cad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

ok to test

@HeartSaVioR
Contributor Author

@tdas Thanks for the review! Addressed review comments.

@SparkQA

SparkQA commented Aug 1, 2018

Test build #93869 has finished for PR 21469 at commit ed072fc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

Retest this, please

@SparkQA

SparkQA commented Aug 2, 2018

Test build #93906 has finished for PR 21469 at commit ed072fc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Aug 2, 2018

Test build #93927 has finished for PR 21469 at commit ed072fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

@tdas Kind reminder.

@tdas
Contributor

tdas commented Aug 8, 2018

I am having second thoughts about this. Exposing the entire memory usage of all the loaded maps as another custom metric just adds more confusion. Rather, the point of the main state metric memoryUsedBytes is to capture how much memory is occupied by one partition of the state, and that should implicitly cover all the loaded versions of that state partition. So I strongly feel that instead of adding a custom metric, we should change the existing memoryUsedBytes to capture all the memory.

I am fine with adding the custom metrics for hit and miss counts. No questions about that.

What do you think?

@HeartSaVioR
Contributor Author

HeartSaVioR commented Aug 8, 2018

My series of patches is made possible by two metrics: the size of the latest version and the total size of all loaded versions. SPARK-24717 (#21700) made it possible to tune the overall state memory usage in the executor, and with only one of the two metrics end users couldn't tune it.

IMHO, I'm not 100% sure how much confusion this patch causes for end users, but if the intent of memoryUsedBytes is to measure the overall state partition, what about redefining memoryUsedBytes as the total memory usage of all loaded versions, while also exposing the memory usage of the latest version as a custom metric?

@HeartSaVioR
Contributor Author

@tdas Kind reminder.
@zsxwing Could you take a quick look at this and share your thoughts? I think the patch is ready to merge, but it is blocked by a slight difference of views, so more voices would be better.

@tdas
Contributor

tdas commented Aug 21, 2018

@HeartSaVioR I think I agree with the second approach that you suggested. So:
memoryUsedBytes => size of total memory usage of loaded versions, and
customMetric => size of memory usage of the latest version.
Please make the necessary changes.

customMetric.stateOnCurrentVersionSizeBytes to size for memory usage of current version
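
Roughly, the agreed split could look like the fragment below inside the state store implementation (a hedged sketch: mapToUpdate, loadedMaps, and metricStateOnCurrentVersionSizeBytes are assumed names, and this is not the merged diff):

override def metrics: StateStoreMetrics = {
  // Size of just the map backing the current (latest) version.
  val currentVersionSize = SizeEstimator.estimate(mapToUpdate)
  // Size of every version still cached by the provider.
  val allVersionsSize = SizeEstimator.estimate(loadedMaps)
  StateStoreMetrics(
    mapToUpdate.size(),
    allVersionsSize,
    Map(metricStateOnCurrentVersionSizeBytes -> currentVersionSize))
}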
@HeartSaVioR
Contributor Author

@tdas Thanks for the feedback! Updated the PR.

@SparkQA

SparkQA commented Aug 21, 2018

Test build #95012 has finished for PR 21469 at commit 545081e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Aug 21, 2018

LGTM.

@tdas
Contributor

tdas commented Aug 21, 2018

Merged to master.

@asfgit asfgit closed this in 42035a4 Aug 21, 2018
@HeartSaVioR
Contributor Author

Thanks all for reviewing and thanks @tdas for merging this in!

@HeartSaVioR HeartSaVioR deleted the SPARK-24441 branch August 21, 2018 22:42
@tdas
Contributor

tdas commented Aug 21, 2018

Unfortunately this PR broke the master build. It looks like some import was probably removed in the other PR I merged, which didn't create any direct conflict.

@HeartSaVioR
Contributor Author

@tdas Yeah, I can check against the master branch if you would like me to handle it; please go ahead if you would like to handle it yourself.

@HeartSaVioR
Contributor Author

@tdas In case you are not working on the patch, I'm working on the fix and will provide a minor PR.

@tdas
Contributor

tdas commented Aug 22, 2018

I did. Fixed the import.

vatsalmevada pushed a commit to TIBCOSoftware/snappy-spark that referenced this pull request Nov 8, 2019
[SPARK-24717][SS] Split out max retain version of state for memory in HDFSBackedStateStoreProvider

This patch proposes breaking down the configuration of how many state batches to retain into two pieces: files and in-memory (cache). While the patch reuses the existing configuration for files, it introduces a new configuration, "spark.sql.streaming.maxBatchesToRetainInMemory", to configure the maximum number of batches to retain in memory.

Applied this patch on top of SPARK-24441 (apache#21469), and manually tested with various workloads to ensure the overall size of states in memory is around 2x or less of the size of the latest version of state, while it was 10x ~ 80x before applying the patch.

Author: Jungtaek Lim <[email protected]>

Closes apache#21700 from HeartSaVioR/SPARK-24717.
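
For context, a hedged sketch of tuning the in-memory retention introduced by SPARK-24717 when building a session (the app name and the value 2 here are just illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("state-cache-tuning")
  // Cap how many versions of state HDFSBackedStateStoreProvider keeps cached in memory.
  .config("spark.sql.streaming.maxBatchesToRetainInMemory", "2")
  .getOrCreate()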