feat(optimize): guarantee global sorting in OPTIMIZE output #3539
Conversation
ACTION NEEDED: delta-rs follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.
Fine by me to add this, I can see the value. Just left a couple comments
python/src/lib.rs
Outdated
@@ -572,6 +574,8 @@ impl RawDeltaTable {
        writer_properties: Option<PyWriterProperties>,
        commit_properties: Option<PyCommitProperties>,
        post_commithook_properties: Option<PyPostCommitHookProperties>,
        sort_enabled: bool,
This flag is unnecessary: passing cols should be enough to determine whether it's enabled or not, and the same applies to the Rust builder. Matching on `Some(...)` is enough to trigger a different match arm.
I agree, I think this flag is unnecessary.
agreed, I made that change
python/tests/conftest.py
Outdated
try:
    from arro3.core import Array, DataType, Field, Schema, Table
except ImportError:
    pytest.skip("arro3 not installed; skipping Python deltarlake tests", allow_module_level=True)
Arro3 is required, so this try/except should exist
nit: also, deltalake is spelled incorrectly
@ion-elgreco could you clarify your comment concerning the try/except? Do you want me to revert it, or are you OK with it the way I have it (besides the spelling)?
I'm not entirely convinced this PR is necessary. Why is doing a Z-order not appropriate with the optimize? I don't really agree it should be "just done" for the user; even Databricks doesn't automatically order the output, it requires a ZORDER clause in a SQL statement.
        futures::future::ready(Ok(batch_stream)),
    ));
    util::flatten_join_error(rewrite_result)
} else {
This else branch looks like it's a complicated way to do a z-order as below. I might be misunderstanding, could you explain what is happening here?
That else block under `if !self.sort_enabled` is not doing Z-ordering at all; it is a new global sort path I added for compaction:
- We spin up a DataFusion runtime (with a spillable memory pool).
- Register the Delta object store under `delta-rs://`.
- Build a DataFrame over exactly the same files in this merge bin via `DeltaTableProvider`.
- Call `df.sort(sort_columns…)` on the user-supplied keys.
- Turn the sorted plan back into a stream.
- Hand that stream into our existing `rewrite_files()` writer, so all output files are globally ordered.
The actual Z-order logic lives separately in the `OptimizeOperations::ZOrder` branch (around line 917), where we invoke a custom `ZOrderUDF`. I've expanded the comment above the else block in optimize.rs to spell this out. Hope that clears it up; please let me know if you'd like any further tweaks!
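Stripped of the DataFusion machinery, the steps above amount to: scan every file in the merge bin, sort the combined rows on the user-supplied keys, and rewrite into new files. A toy Python sketch (the rows and the two-column `(objectId, dateTime)` key are invented for illustration, not taken from the PR):

```python
# Toy model of the global-sort compaction path: two unsorted files in one
# merge bin are read, globally sorted, and rewritten as ordered files.
bin_files = [
    [("b", "2024-01-02"), ("a", "2024-01-09")],  # file 1 in the merge bin
    [("a", "2024-01-01"), ("b", "2024-01-05")],  # file 2 in the merge bin
]

# 1. Read every file in this merge bin into one batch.
rows = [row for f in bin_files for row in f]

# 2. Globally sort on the user-supplied keys (the df.sort(...) analogue).
rows.sort(key=lambda r: (r[0], r[1]))

# 3. Rewrite into target-size output files; because the whole bin was sorted
#    first, ordering is monotonic across the output files, not just within each.
out_files = [rows[:2], rows[2:]]
print(out_files)
```

The point of step 3 is the difference from plain compaction: each output file is sorted internally *and* every row in file N sorts before every row in file N+1.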
Codecov Report: All modified and coverable lines are covered by tests ✅

@@           Coverage Diff            @@
##             main    #3539       +/-  ##
===========================================
- Coverage   74.23%   21.03%   -53.21%
===========================================
  Files         150       76       -74
  Lines       44739    13119    -31620
  Branches    44739    13119    -31620
===========================================
- Hits        33213     2759    -30454
- Misses       9384    10076      +692
+ Partials     2142      284     -1858

View full report in Codecov by Sentry.
// user-specified `sort_columns`, and then rewrite pages in that
// sorted order. This guarantees monotonic ordering across all
// output files. Note: this is a standard DataFusion `sort(...)`
// operation, not Z-ordering (see the ZOrder branch below).
So judging by the comment which I might be misunderstanding, this rewrites the entire table?
Thanks for flagging that; your reading is correct that the original wording made it sound like we were rewriting the entire table. I've updated the comment to make clear that we only build a DataFusion plan and rewrite the files in the current compaction batch in sorted order, not the full table.
// Asynchronous DataFrame-based sorted reader
let read_sorted = async move {
    let memory_pool = FairSpillPool::new(max_spill_size);
    let runtime = RuntimeEnvBuilder::new()
You shouldn't need this machinery; shouldn't the session ctx and proper object stores already be registered?
You're correct. The DeltaTableProvider already carries the correct object store, so we don't need to spin up a custom RuntimeEnv or manually register anything.
use tempfile::TempDir;

#[tokio::test]
async fn test_optimize_global_sort_enabled() {
I don't know if this test does what we need here. This shows a single partition is sorted, which is fine, but that's the same as a Z-order. I think you'd need to partition the table and add a bit more data in order to validate it sorts correctly across partitions.
Ok, thanks for the feedback. I'll put something together.
Thanks for catching that. I've updated test_optimize_global_sort_enabled so it creates the table partitioned on objectId; now when we call .optimize().with_sort_columns(&["objectId","dateTime"]), it will scan multiple partitions, globally sort across them, and rewrite each partition in order. This ensures we're truly testing cross-partition sorting.
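A minimal model of why the partitioned test matters: sorting each partition's files in isolation only yields a globally sorted read because the partition key (objectId) is the leading sort key and partitions are read back in key order. A sketch with invented data, using integers as stand-in dateTime values:

```python
# Two partitions keyed by objectId; rows are (objectId, dateTime) tuples.
# All values are made up for the demonstration.
partitions = {
    "obj1": [("obj1", 3), ("obj1", 1)],
    "obj2": [("obj2", 2), ("obj2", 0)],
}

# Per-partition rewrite: sort each partition's rows by dateTime.
for rows in partitions.values():
    rows.sort(key=lambda r: r[1])

# Reading partitions back in objectId order concatenates them.
table = []
for key in sorted(partitions):
    table.extend(partitions[key])

# Because objectId leads the (objectId, dateTime) sort key, the
# concatenation is globally sorted across partitions.
assert table == sorted(table)
```

If the leading sort column were not the partition key (say, sorting by dateTime alone), per-partition sorting would not give a globally ordered read, which is what the cross-partition test has to catch.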
.map_err(|e| DeltaTableError::Generic(e.to_string()))?;
let sort_exprs = sort_cols
    .iter()
    .map(|c| col(c).sort(true, true))
What if I don't want the order to be ascending and nulls first?
Ok, I've updated the code so you can decide whether you want ascending order or not, and how nulls are handled.
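The four flavors behind an `ascending`/`nulls_first` pair can be shown in plain Python. The helper below is a hypothetical stand-in for a `col.sort(ascending, nulls_first)`-style API, written just to illustrate the semantics being discussed:

```python
# Hypothetical helper mirroring the four sort flavors of a
# sort(ascending, nulls_first)-style expression, using plain Python.
def sort_with_nulls(values, ascending=True, nulls_first=True):
    non_null = sorted(v for v in values if v is not None)
    if not ascending:
        non_null.reverse()
    nulls = [None] * sum(v is None for v in values)
    return nulls + non_null if nulls_first else non_null + nulls

data = [3, None, 1, 2]
print(sort_with_nulls(data))                                      # [None, 1, 2, 3]
print(sort_with_nulls(data, ascending=False, nulls_first=False))  # [3, 2, 1, None]
```

The original hardcoded `sort(true, true)` corresponds to the first call (ascending, nulls first); the reviewer's objection is that users may want any of the other three combinations.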
Circling back on this, sorry, but it's not clear to me; could you explain what problem you are trying to solve here in more detail? I'm not convinced this PR is necessary, and there seem to be a few places where this could produce unintended results and take lots of extra compute time. If you have a specific use case or problem, it'd be helpful to know. I ask because your PR seems very tailored towards your data, and there may be a way to accomplish what you are looking for without this PR.
@hntd187 Put another way, everything you'd otherwise have to script yourself gets pulled into the existing Optimize API: no extra scans, no schema, and no replace-where gymnastics. It could be I am overlooking something. I'm open to suggestions!
Force-pushed from 1b07837 to 2041740 (Compare)
…Time)
- Introduce and flags in (default: true)
- Add builder methods and to toggle sorting
- Update signature to accept new sort flags
- Propagate sort flags through, wiring in a DataFusion global sort path
- Add unit tests and integration tests verifying file ordering
- Expose for tests
Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
…le initialization Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
…t import in conftest Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
…as previously intended Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
Align Python binding for OptimizeBuilder so that now defaults to , matching the Rust default. Update to expect the unsorted output by default and explicitly opt in to sorting in the tests. Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
Default sort_columns to empty Vec so columns aren't hardcoded; users must opt-in via with_sort_columns(). Update tests accordingly. Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
…ht, restoring original behavior Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
…he current compaction batch’s files are globally sorted and rewritten, not the entire table. Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
… true cross-partition sorting test Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
The global-sort path no longer needs a custom RuntimeEnv or manual register_object_store calls. Instead, rely on a plain SessionContext and the DeltaTableProvider (which already carries the correct object store) for all file access and sorting. Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
The high-level Python API was missing the new sort_enabled and sort_columns arguments on TableOptimizer.compact(), causing a TypeError when calling compact(sort_enabled=True). This change adds those parameters (with defaults matching the Rust binding) and forwards them through to compact_optimize. Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
DataFusion’s `col(…)` function lowercases unquoted identifiers, causing CamelCase columns (e.g. `objectId`) to be transformed to "objectid" and break schema resolution during global compaction sorting. This change swaps out `col(c)` for `ident(c)` so identifiers are passed through verbatim: * Replace `use datafusion::logical_expr::col;` with `use datafusion::logical_expr::ident;` * Change `.map(|c| col(c).sort(true, true))` to `.map(|c| ident(c).sort(true, true))` in optimize.rs With `ident`, case-sensitive column names resolve correctly and the “No field named objectid” error is eliminated. Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
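The failure mode of that `col` vs `ident` fix can be modeled in a few lines of Python. The resolver functions and schema below are invented for the demonstration; only the lowercasing behavior and the "No field named objectid" error text come from the commit message:

```python
# Model of the bug: a resolver that lowercases unquoted identifiers
# (like DataFusion's `col`) fails on a CamelCase field, while a
# verbatim resolver (like `ident`) matches it exactly.
schema = {"objectId", "dateTime"}

def resolve_lowercased(name):
    """col-like: normalize the requested name to lowercase."""
    key = name.lower()
    if key not in schema:
        raise KeyError(f"No field named {key}")
    return key

def resolve_verbatim(name):
    """ident-like: pass the requested name through unchanged."""
    if name not in schema:
        raise KeyError(f"No field named {name}")
    return name

try:
    resolve_lowercased("objectId")      # "objectId" becomes "objectid": no match
except KeyError as e:
    print(e)                            # 'No field named objectid'

print(resolve_verbatim("objectId"))     # objectId
```

The schema's field names are stored with their original casing, so lowercasing only the lookup key guarantees a mismatch for any CamelCase column.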
- When sort_enabled is true, collect all active files per partition into a single MergeBin instead of size-based bins
- Introduce build_global_sort_plan to assemble these per-partition bins
- Wire global sort plan into create_merge_plan for compact operations when sorting is enabled
- Retain singleton partitions in the sort path (no longer pruning single-file bins)
- Update metrics to reflect total files and partitions processed
This ensures that each partition's output is rewritten in a single sorted pass without altering the overall compaction workflow.
Signed-off-by: tolleybot <> Signed-off-by: Don Tolley <[email protected]>
Force-pushed from 2041740 to f31be4d (Compare)
…irst flags
- Remove the boolean; sorting is now driven by non-empty
- Add and to and
- Propagate those flags into DataFusion's calls instead of hardcoded
- Update to infer sort from
- Expose , , in Python's and
- Revise Python build instructions in to include venv setup and updated signature check
- Add Rust unit tests for the new flags, and update Python tests to drive sorting via
- Cleans up dead code and adjusts builder tests accordingly
Signed-off-by: Don Tolley <[email protected]>
Hi, I work with @tolleybot and can hopefully provide some more context on why we want this feature. Our use case is that we use Delta Lake to store time-series data, and have consumers that want to read time series and store them in memory with a specific order (by object id then timestamp). We would like to minimise the amount of work these consumers need to do to reorder data. We can't guarantee a whole table or partition will always be globally ordered like this, as data can be split across multiple Parquet files with no inherent order, and any append operations for recent data will break the ordering. But if batches of data that are read from Delta Lake are locally ordered, it greatly reduces the work our consumers need to do. Currently we always write ordered data, but the compaction operation arbitrarily reorders data. For what it's worth, Iceberg allows setting a table order that's used by compaction (https://www.tabular.io/apache-iceberg-cookbook/data-engineering-table-write-order/), so I don't think this requirement is unique to us.
FWIW I found this issue after reading https://blog.cloudflare.com/logexplorer-ga/ which specifically called out the need for this feature in upstream delta-rs, quoting the relevant section
Bringing it up as another data point for something like this feature being useful.
I appreciate the feedback, but it's still not clear to me what this is trying to solve that Z-order does not. I noticed in the tests you are using dates, but they are string-typed and not date types; are you sure that those types with Z-order don't already solve this? I created several test tables with Z-order to try, and they appear to also do what you are trying to do.
Add to demonstrate that Z-order does not guarantee strict lexicographic ordering on timestamp columns, whereas does. Signed-off-by: Don Tolley <[email protected]>
…amp test Z-order on small timestamp datasets may coincide with lexicographic order. Focus test on verifying that global sort via always matches strict lex ordering. Signed-off-by: Don Tolley <[email protected]>
Signed-off-by: Don Tolley <[email protected]>
@hntd187 To make the difference clearer, I added a Python unit test, "test_python_zorder_vs_global_sort_on_ints", that writes a tiny DataFrame with two integer keys, runs both, and then compares the outputs.
I checked your test out and slightly modified it to write 2 separate tables. They are physically the same, and checksums show the methods produce byte-for-byte identical output. I think whatever you are doing to re-read the tables back in is altering the ordering of the rows somehow, but on a physical file level they are identical. Is there something I'm missing here?
I'm very interested in this, but as far as I can tell, z ordering should achieve the same?
Can I get a copy of your test code? It would be helpful to try myself.
It's the same as your code, I just write 2 tables out instead of using the same table |
What this PR adds

Users who don't pass sort_columns keep today's behavior and performance.

Why Z-order ≠ global sort

Concrete example: for two tiny columns (objectId, seq) the layouts diverge:

Row | objectId (x) | seq (y) | Z-order bits → Z | Resulting order
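The divergence is easy to reproduce: interleaving the bits of (x, y) into a Morton/Z-value produces a different row order than a lexicographic sort on the same pairs. The points below are invented for illustration:

```python
def morton(x, y, bits=4):
    """Interleave the bits of x and y into a single Z-value."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # x bits land at even positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # y bits land at odd positions
    return z

points = [(0, 0), (0, 3), (1, 0), (1, 1), (2, 2), (3, 1)]
lex_order = sorted(points)                        # global sort on (objectId, seq)
z_order = sorted(points, key=lambda p: morton(*p))

print(lex_order)  # [(0, 0), (0, 3), (1, 0), (1, 1), (2, 2), (3, 1)]
print(z_order)    # [(0, 0), (1, 0), (1, 1), (3, 1), (0, 3), (2, 2)]
```

For instance, (0, 3) sorts before (1, 0) lexicographically, but its Z-value is 10 versus 1, so Z-order places it much later: Z-order clusters points that are close in both dimensions rather than producing a strict lexicographic sequence.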
You can view both Z-order and global sort outputs side by side in our Python gist, making it easy to compare the differences at a glance.

Why we want global sort for our time-series tables

Z-order remains ideal when you need multi-column clustering (e.g., geo or IoT cubes). For our time-series use case, a deterministic, lexicographic sequence on (objectId, timestamp) is both faster to read and simpler to maintain.
The example above does what you suggest, which makes sense, so I went back and looked at my original test that made identical output, and I realized that the types of the columns involved change the sorting output. If you change the first column to be a string, then the output is identical. So sometimes Z-order happens to match up with a global sort and sometimes it doesn't. With this in mind, I'd say that this is very niche and specific to your use case. There is nothing stopping you from employing this strategy yourself, but I think adding this will confuse users about what they need, and anyone who doesn't use delta-rs might also be confused by why this is here or might expect it to exist in other places. Additionally, this assumes you use delta-rs and only delta-rs (likely Databricks's own optimize or Z-order will wipe this all out), so while I appreciate your effort to contribute this, I think it would be best to decline this PR.
Closing this, as declined. |
feat(optimize): guarantee global sorting in OPTIMIZE output

After running OPTIMIZE on a Delta table, files were not guaranteed to be globally sorted by (objectId, dateTime), forcing downstream consumers to perform an extra sort. This change adds a global sort phase in the compaction pipeline and exposes new flags to toggle or customize sorting.

Closes #3538