Skip to content

Commit f08816f

Browse files
committed
WIP
1 parent 839acdd commit f08816f

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/PaginatedAddFilesIteratorImpl.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,18 @@ public class PaginatedAddFilesIteratorImpl implements PaginatedAddFilesIterator
1414
private final long pageSize; // max num of files to return in this page
1515

1616
private long numAddFilesReturned = 0;
17-
private String lastLogFileName = null; // when reading first page, lastLogFileName is absent
18-
private long rowIdxInLastFile = 0;
17+
private String currentLogFileName = null; // when reading first page, lastLogFileName is absent
18+
private long currentRowIdxInLastFile = 0;
1919
private FilteredColumnarBatch nextBatch = null;
20+
private String startingLogFileName;
21+
private long startingRowIdxInLastFile;
2022

2123
public PaginatedAddFilesIteratorImpl(
2224
Iterator<FilteredColumnarBatch> originalIterator, PaginationContext paginationContext) {
2325
this.originalIterator = originalIterator;
2426
this.pageSize = paginationContext.pageSize;
25-
this.lastLogFileName = paginationContext.lastReadLogFileName;
26-
this.rowIdxInLastFile = paginationContext.lastReadRowIdxInFile;
27+
this.startingLogFileName = paginationContext.lastReadLogFileName;
28+
this.startingRowIdxInLastFile = paginationContext.lastReadRowIdxInFile;
2729
}
2830

2931
@Override
@@ -34,25 +36,29 @@ public boolean hasNext() {
3436
if (numAddFilesReturned >= pageSize) {
3537
return false;
3638
}
37-
if (originalIterator.hasNext()) {
38-
FilteredColumnarBatch batch = originalIterator.next();
39-
String fileName = batch.getFileName(); // TODO: get parquet reader PR merged first
40-
if (!fileName.equals(lastLogFileName)) {
41-
lastLogFileName = fileName;
39+
while (originalIterator.hasNext()) {
40+
nextBatch = originalIterator.next();
41+
String fileName = nextBatch.getFileName(); // TODO: get parquet reader PR merged first
42+
if (!fileName.equals(currentLogFileName)) {
43+
currentLogFileName = fileName;
4244
System.out.println("fileName " + fileName);
43-
rowIdxInLastFile = 0; // row idx starts from 1
45+
currentRowIdxInLastFile = 0;// row idx starts from 1
4446
}
45-
long numActiveAddFiles = batch.getNumOfTrueRows();
46-
long rowNum =
47-
batch.getData().getSize(); // number of rows, if 5 AddFile and 7 RemoveFile -> this is 12.
47+
long numActiveAddFiles = nextBatch.getNumOfTrueRows();
48+
long rowNum = nextBatch.getData().getSize();
49+
currentRowIdxInLastFile += rowNum;
4850

4951
System.out.println("numActiveAddFiles: " + numActiveAddFiles);
50-
System.out.println("numTotalAddFiles: " + batch.getData().getColumnVector(0).getSize());
52+
System.out.println("numTotalAddFiles: " + nextBatch.getData().getColumnVector(0).getSize());
5153
System.out.println("numOfRows: " + rowNum);
5254

53-
nextBatch = batch;
55+
if(currentLogFileName.compareTo(startingLogFileName) < 0 ||
56+
(currentLogFileName.equals(startingLogFileName) && currentRowIdxInLastFile < startingRowIdxInLastFile)) {
57+
//skip this batch
58+
nextBatch = originalIterator.next();
59+
continue;
60+
}
5461
numAddFilesReturned += numActiveAddFiles;
55-
rowIdxInLastFile += rowNum;
5662
System.out.println("numAddFilesReturned: " + numAddFilesReturned);
5763
return true;
5864
}
@@ -78,6 +84,6 @@ public void close() throws IOException {
7884
}
7985

8086
public Row getCurrentPageToken() {
81-
return new PageToken(lastLogFileName, rowIdxInLastFile).getRow();
87+
return new PageToken(currentLogFileName, currentRowIdxInLastFile).getRow();
8288
}
8389
}

0 commit comments

Comments
 (0)