Skip to content

Commit 1b07837

Browse files
author
tolleybot
committed
optimize: support global ordering for sort-enabled compactions
- When `sort_enabled` is true, collect all active files per partition into a single `MergeBin` instead of size-based bins.
- Introduce `build_global_sort_plan` to assemble these per-partition bins.
- Wire the global sort plan into `create_merge_plan` for compact operations when sorting is enabled.
- Retain singleton partitions in the sort path (single-file bins are no longer pruned).
- Update metrics to reflect the total files and partitions processed.

This ensures that each partition’s output is rewritten in a single sorted pass without altering the overall compaction workflow.
1 parent 508fe0d commit 1b07837

File tree

1 file changed

+41
-1
lines changed

1 file changed

+41
-1
lines changed

crates/core/src/operations/optimize.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,39 @@ pub struct Metrics {
8686
/// The order of records from source files is preserved
8787
pub preserve_insertion_order: bool,
8888
}
89+
/// Build a plan to globally sort all files per partition (for sort-enabled compaction)
90+
fn build_global_sort_plan(
91+
snapshot: &DeltaTableState,
92+
filters: &[PartitionFilter],
93+
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
94+
let mut metrics = Metrics::default();
95+
// Collect all active add actions by partition
96+
let mut partition_map: HashMap<String, (IndexMap<String, Scalar>, MergeBin)> = HashMap::new();
97+
for add_res in snapshot.get_active_add_actions_by_partitions(filters)? {
98+
let add = add_res?;
99+
metrics.total_considered_files += 1;
100+
// Extract partition values and path
101+
let part_vals = add.partition_values()?;
102+
let partition_values = part_vals
103+
.clone()
104+
.into_iter()
105+
.map(|(k, v)| (k.to_string(), v))
106+
.collect::<IndexMap<_, _>>();
107+
let part_path = part_vals.hive_partition_path();
108+
// Insert or update the merge bin for this partition
109+
let entry = partition_map
110+
.entry(part_path)
111+
.or_insert_with(|| (partition_values, MergeBin::new()));
112+
entry.1.add(add.add_action());
113+
}
114+
// Build operations: one MergeBin per partition with all its files
115+
let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
116+
for (path, (partition_values, bin)) in partition_map {
117+
operations.insert(path, (partition_values, vec![bin]));
118+
}
119+
metrics.partitions_optimized = operations.len() as u64;
120+
Ok((OptimizeOperations::Compact(operations), metrics))
121+
}
89122

90123
// Unit tests for sorting flags in OptimizeBuilder and create_merge_plan
91124
#[cfg(test)]
@@ -1071,7 +1104,14 @@ pub fn create_merge_plan(
10711104
let partitions_keys = &snapshot.metadata().partition_columns;
10721105

10731106
let (operations, metrics) = match optimize_type {
1074-
OptimizeType::Compact => build_compaction_plan(snapshot, filters, target_size)?,
1107+
OptimizeType::Compact => {
1108+
if sort_enabled {
1109+
// Sorts all files per partition, regardless of bin size
1110+
build_global_sort_plan(snapshot, filters)?
1111+
} else {
1112+
build_compaction_plan(snapshot, filters, target_size)?
1113+
}
1114+
}
10751115
OptimizeType::ZOrder(zorder_columns) => {
10761116
build_zorder_plan(zorder_columns, snapshot, partitions_keys, filters)?
10771117
}

0 commit comments

Comments
 (0)