Skip to content

Commit fd79507

Browse files
committed
fix: avoid unnecessary reload of file data by file_views()
The file_views function in EagerSnapshot now just passes thrwough to Snapshot like it should, rather than attempting to load redundant state from the log store. I believe this was likely just left over code from the yucky time between 0.26.x and 0.30.0 when we were busy smashing APIs together to get log replay with delta kernel to work. There are number of other test failures that I had to fix because the underlying storage was disappearing in temp directories before the file_views() function was invoked causing FileNotFound errors. See #3953 Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
1 parent 055093c commit fd79507

File tree

3 files changed

+31
-42
lines changed

3 files changed

+31
-42
lines changed

crates/core/src/delta_datafusion/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,10 +1810,9 @@ mod tests {
18101810
assert_eq!("a", small.iter().next().unwrap().unwrap());
18111811

18121812
let expected = vec![
1813+
ObjectStoreOperation::Get(LocationType::Commit),
18131814
ObjectStoreOperation::GetRange(LocationType::Data, 957..965),
18141815
ObjectStoreOperation::GetRange(LocationType::Data, 326..957),
1815-
#[expect(clippy::single_range_in_vec_init)]
1816-
ObjectStoreOperation::GetRanges(LocationType::Data, vec![4..46]),
18171816
];
18181817
let mut actual = Vec::new();
18191818
operations.recv_many(&mut actual, 3).await;

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -665,24 +665,7 @@ impl EagerSnapshot {
665665
log_store: &dyn LogStore,
666666
predicate: Option<PredicateRef>,
667667
) -> BoxStream<'_, DeltaResult<LogicalFileView>> {
668-
if !self.snapshot.load_config().require_files {
669-
return self.snapshot.file_views(log_store, predicate);
670-
}
671-
self.snapshot
672-
.files_from(
673-
log_store,
674-
predicate,
675-
self.version() as u64,
676-
Box::new(self.files.clone().into_iter()),
677-
None,
678-
)
679-
.map_ok(|batch| {
680-
futures::stream::iter(0..batch.num_rows()).map(move |idx| {
681-
Ok::<_, DeltaTableError>(LogicalFileView::new(batch.clone(), idx))
682-
})
683-
})
684-
.try_flatten()
685-
.boxed()
668+
self.snapshot.file_views(log_store, predicate)
686669
}
687670

688671
#[deprecated(since = "0.29.0", note = "Use `files` with kernel predicate instead.")]

crates/core/src/operations/convert_to_delta.rs

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -570,9 +570,9 @@ mod tests {
570570
partition_schema: Vec<StructField>,
571571
// Whether testing on object store or path
572572
from_path: bool,
573-
) -> DeltaTable {
574-
let temp_dir = tempdir().expect("Failed to create a temp directory");
575-
let temp_dir = temp_dir
573+
) -> (tempfile::TempDir, DeltaTable) {
574+
let root = tempdir().expect("Failed to create a temp directory");
575+
let temp_dir = root
576576
.path()
577577
.to_str()
578578
.expect("Failed to convert Path to string slice");
@@ -585,20 +585,24 @@ mod tests {
585585
} else {
586586
ConvertToDeltaBuilder::new().with_log_store(log_store(temp_dir))
587587
};
588-
builder
589-
.with_partition_schema(partition_schema)
590-
.await
591-
.unwrap_or_else(|e| {
592-
panic!("Failed to convert to Delta table. Location: {path}. Error: {e}")
593-
})
588+
(
589+
root,
590+
builder
591+
.with_partition_schema(partition_schema)
592+
.await
593+
.unwrap_or_else(|e| {
594+
panic!("Failed to convert to Delta table. Location: {path}. Error: {e}")
595+
}),
596+
)
594597
}
595598

596599
async fn open_created_delta_table(
597600
path: &str,
598601
partition_schema: Vec<StructField>,
599-
) -> DeltaTable {
600-
let temp_dir = tempdir().expect("Failed to create a temp directory");
601-
let temp_dir = temp_dir
602+
) -> (tempfile::TempDir, DeltaTable) {
603+
// The [TempDir] has to be returned so that it stays in scoipe until the function completes
604+
let root = tempdir().expect("Failed to create a temp directory");
605+
let temp_dir = root
602606
.path()
603607
.to_str()
604608
.expect("Failed to convert to string slice");
@@ -612,7 +616,10 @@ mod tests {
612616
panic!("Failed to convert to Delta table. Location: {path}. Error: {e}")
613617
});
614618
let table_uri = url::Url::from_directory_path(std::path::Path::new(temp_dir)).unwrap();
615-
open_table(table_uri).await.expect("Failed to open table")
619+
(
620+
root,
621+
open_table(table_uri).await.expect("Failed to open table"),
622+
)
616623
}
617624

618625
async fn assert_delta_table(
@@ -690,7 +697,7 @@ mod tests {
690697
#[tokio::test]
691698
async fn test_convert_to_delta() {
692699
let path = "../test/tests/data/delta-0.8.0-date";
693-
let table = create_delta_table(path, Vec::new(), false).await;
700+
let (_tmp, table) = create_delta_table(path, Vec::new(), false).await;
694701
let action = table
695702
.get_active_add_actions_by_partitions(&[])
696703
.next()
@@ -726,7 +733,7 @@ mod tests {
726733
.await;
727734

728735
let path = "../test/tests/data/delta-0.8.0-null-partition";
729-
let table = create_delta_table(
736+
let (_tmp2, table) = create_delta_table(
730737
path,
731738
vec![schema_field("k", PrimitiveType::String, true)],
732739
false,
@@ -751,7 +758,7 @@ mod tests {
751758
).await;
752759

753760
let path = "../test/tests/data/delta-0.8.0-special-partition";
754-
let table = create_delta_table(
761+
let (_tmp3, table) = create_delta_table(
755762
path,
756763
vec![schema_field("x", PrimitiveType::String, true)],
757764
false,
@@ -779,7 +786,7 @@ mod tests {
779786
.await;
780787

781788
let path = "../test/tests/data/delta-0.8.0-partitioned";
782-
let table = create_delta_table(
789+
let (_tmp4, table) = create_delta_table(
783790
path,
784791
vec![
785792
schema_field("day", PrimitiveType::String, true),
@@ -834,7 +841,7 @@ mod tests {
834841
#[tokio::test]
835842
async fn test_open_created_delta_table() {
836843
let path = "../test/tests/data/delta-0.2.0";
837-
let table = open_created_delta_table(path, Vec::new()).await;
844+
let (_tmp, table) = open_created_delta_table(path, Vec::new()).await;
838845
assert_delta_table(
839846
table,
840847
path,
@@ -854,7 +861,7 @@ mod tests {
854861
.await;
855862

856863
let path = "../test/tests/data/delta-0.8-empty";
857-
let table = open_created_delta_table(path, Vec::new()).await;
864+
let (_tmp2, table) = open_created_delta_table(path, Vec::new()).await;
858865
assert_delta_table(
859866
table,
860867
path,
@@ -869,7 +876,7 @@ mod tests {
869876
.await;
870877

871878
let path = "../test/tests/data/delta-0.8.0";
872-
let table = open_created_delta_table(path, Vec::new()).await;
879+
let (_tmp3, table) = open_created_delta_table(path, Vec::new()).await;
873880
assert_delta_table(
874881
table,
875882
path,
@@ -889,7 +896,7 @@ mod tests {
889896
#[tokio::test]
890897
async fn test_convert_to_delta_from_path() {
891898
let path = "../test/tests/data/delta-2.2.0-partitioned-types";
892-
let table = create_delta_table(
899+
let (_tmp, table) = create_delta_table(
893900
path,
894901
vec![
895902
schema_field("c1", PrimitiveType::Integer, true),
@@ -927,7 +934,7 @@ mod tests {
927934
.await;
928935

929936
let path = "../test/tests/data/delta-0.8.0-numeric-partition";
930-
let table = create_delta_table(
937+
let (_tmp2, table) = create_delta_table(
931938
path,
932939
vec![
933940
schema_field("x", PrimitiveType::Long, true),

0 commit comments

Comments
 (0)