[SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider #41099
@@ -2320,6 +2320,11 @@ Here are the configs regarding the RocksDB instance of the state store provider:
<td>Whether we perform a range compaction of RocksDB instance for commit operation</td>
<td>False</td>
</tr>
<tr>
  <td>spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled</td>
  <td>Whether to upload changelog instead of snapshot during RocksDB StateStore commit</td>
  <td>False</td>
</tr>
<tr>
<td>spark.sql.streaming.stateStore.rocksdb.blockSizeKB</td>
<td>Approximate size in KB of user data packed per block for a RocksDB BlockBasedTable, which is RocksDB's default SST file format.</td>
@@ -2389,6 +2394,19 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo
You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node.
Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings.
##### RocksDB State Store Changelog Checkpointing

> **Reviewer:** Can we provide a higher-level description of how this works? We don't even explain what "changelog" means. I understand we have no explanation of changelog checkpointing for the HDFS-backed state store provider, which is unfortunate, but for the RocksDB state store provider, users have to decide whether to use the old mechanism (incremental checkpointing) or the new one, which requires them to understand the characteristics of the two options before choosing one.
> **Author:** Drafted a newer version with ChatGPT, PTAL.
> **Reviewer:** Looks OK :)
In newer versions of Spark, changelog checkpointing is introduced for the RocksDB state store. The traditional checkpointing mechanism for the RocksDB State Store is incremental snapshot checkpointing, where the manifest files and newly generated SST files of the RocksDB instances are uploaded to durable storage.
Instead of uploading the data files of RocksDB instances, changelog checkpointing uploads the changes made to the state since the last checkpoint for durability.
Snapshots are still persisted periodically in the background for predictable failure recovery and changelog trimming.
Changelog checkpointing avoids the cost of capturing and uploading snapshots of RocksDB instances and significantly reduces streaming query latency.
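The mechanism described above can be illustrated with a toy model (plain Python with invented names; this is a sketch of the idea, not Spark's actual RocksDB code): each commit uploads only the delta since the last commit, a full snapshot is persisted every few versions, and recovery loads the nearest snapshot at or below the target version and replays the remaining changelogs.

```python
class ChangelogStore:
    """Toy state store: commits upload a changelog (delta) instead of a
    full snapshot; snapshots are taken periodically to bound replay."""

    def __init__(self, snapshot_interval=3):
        self.state = {}
        self.snapshots = {}       # version -> full copy of state
        self.changelogs = {}      # version -> list of (key, value) changes
        self.pending = []         # changes since the last commit
        self.version = 0
        self.snapshot_interval = snapshot_interval

    def put(self, key, value):
        self.state[key] = value
        self.pending.append((key, value))

    def commit(self):
        self.version += 1
        # Cheap commit: persist only the delta (the "changelog").
        self.changelogs[self.version] = list(self.pending)
        self.pending = []
        # Periodically persist a full snapshot so recovery never has to
        # replay an unbounded changelog (and old changelogs can be trimmed).
        if self.version % self.snapshot_interval == 0:
            self.snapshots[self.version] = dict(self.state)

    def recover(self, version):
        # Start from the latest snapshot at or below the target version...
        base = max((v for v in self.snapshots if v <= version), default=0)
        state = dict(self.snapshots.get(base, {}))
        # ...then replay changelogs to reach the exact version.
        for v in range(base + 1, version + 1):
            for key, value in self.changelogs[v]:
                state[key] = value
        return state
```

The trade-off the real feature makes is visible here: `commit` touches only the delta, while the snapshot cost is paid periodically and off the commit path.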
Changelog checkpointing is disabled by default. You can enable RocksDB State Store changelog checkpointing by setting the `spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled` config to `true`.
Changelog checkpointing is designed to be backward compatible with the traditional checkpointing mechanism.
The RocksDB state store provider offers seamless support for transitioning between the two checkpointing mechanisms in both directions. This allows you to leverage the performance benefits of changelog checkpointing without discarding the old state checkpoint.
In a version of Spark that supports changelog checkpointing, you can migrate streaming queries from older versions of Spark to changelog checkpointing by enabling it in the Spark session.
Vice versa, you can safely disable changelog checkpointing in a newer version of Spark; any query that has been running with changelog checkpointing will then switch back to traditional checkpointing.
You need to restart your streaming queries for the change in checkpointing mechanism to take effect, but you won't observe any performance degradation in the process.
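As a sketch of enabling the flag at submission time (`my_streaming_app.py` and the deployment style are assumptions; verify the provider class name against your Spark version):

```shell
# Enable the RocksDB state store provider with changelog checkpointing.
# Restart the streaming query for the change to take effect.
spark-submit \
  --conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider \
  --conf spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled=true \
  my_streaming_app.py
```

The same config can equally be set on the session builder or in `spark-defaults.conf`.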
##### Performance-aspect considerations

1. You may want to disable tracking of the total number of rows to achieve better performance on the RocksDB state store.
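For example (the config name follows the Spark Structured Streaming guide, but verify it against your Spark version; `my_streaming_app.py` is a placeholder), row tracking can be turned off at submission time:

```shell
# Disable maintenance of the total-rows count in the RocksDB state store;
# this trades that state metric for lower per-operation overhead.
spark-submit \
  --conf spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows=false \
  my_streaming_app.py
```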