Skip to content

[to dev/1.3] Add more checkpoints in series scan #15914

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.appendAggregationResult;
Expand Down Expand Up @@ -140,7 +141,11 @@ public TsBlock next() throws Exception {

// calculate aggregation result on current time window
// Keep curTimeRange if the calculation of this timeRange is not done
if (calculateAggregationResultForCurrentTimeRange()) {
Optional<Boolean> b = calculateAggregationResultForCurrentTimeRange();
if (!b.isPresent()) {
continue;
}
if (b.get()) {
curTimeRange = null;
}
}
Expand All @@ -164,41 +169,58 @@ public boolean isFinished() throws Exception {

@SuppressWarnings("squid:S112")
/** Return true if we have the result of this timeRange. */
protected boolean calculateAggregationResultForCurrentTimeRange() {
protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() {
try {
if (calcFromCachedData()) {
updateResultTsBlock();
return true;
return Optional.of(true);
}

if (readAndCalcFromPage()) {
updateResultTsBlock();
return true;
return Optional.of(true);
}

// only when all the page data has been consumed, we need to read the chunk data
if (!seriesScanUtil.hasNextPage() && readAndCalcFromChunk()) {
updateResultTsBlock();
return true;
return Optional.of(true);
}

// only when all the page and chunk data has been consumed, we need to read the file data
if (!seriesScanUtil.hasNextPage()
&& !seriesScanUtil.hasNextChunk()
&& readAndCalcFromFile()) {
updateResultTsBlock();
return true;
Optional<Boolean> b;
if (!seriesScanUtil.hasNextPage()) {
b = seriesScanUtil.hasNextChunk();
if (!b.isPresent()) {
return b;
}
if (!b.get() && readAndCalcFromFile()) {
updateResultTsBlock();
return Optional.of(true);
}
}

// If the TimeRange is (Long.MIN_VALUE, Long.MAX_VALUE), for Aggregators like countAggregator,
// we have to consume all the data before we finish the aggregation calculation.
if (seriesScanUtil.hasNextPage()
|| seriesScanUtil.hasNextChunk()
|| seriesScanUtil.hasNextFile()) {
return false;
if (seriesScanUtil.hasNextPage()) {
return Optional.of(false);
}
b = seriesScanUtil.hasNextChunk();
if (!b.isPresent()) {
return b;
}
if (b.get()) {
return Optional.of(false);
}
b = seriesScanUtil.hasNextFile();
if (!b.isPresent()) {
return b;
}
if (b.get()) {
return Optional.of(false);
}
updateResultTsBlock();
return true;
return Optional.of(true);
} catch (IOException e) {
throw new RuntimeException("Error while scanning the file", e);
}
Expand Down Expand Up @@ -241,7 +263,14 @@ protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueS
protected boolean readAndCalcFromFile() throws IOException {
// start stopwatch
long start = System.nanoTime();
while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextFile()) {
while (System.nanoTime() - start < leftRuntimeOfOneNextCall) {
Optional<Boolean> b = seriesScanUtil.hasNextFile();
if (!b.isPresent()) {
continue;
}
if (!b.get()) {
break;
}
if (canUseStatistics && seriesScanUtil.canUseCurrentFileStatistics()) {
Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics();
if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) {
Expand Down Expand Up @@ -282,7 +311,14 @@ protected boolean readAndCalcFromFile() throws IOException {
protected boolean readAndCalcFromChunk() throws IOException {
// start stopwatch
long start = System.nanoTime();
while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextChunk()) {
while (System.nanoTime() - start < leftRuntimeOfOneNextCall) {
Optional<Boolean> b = seriesScanUtil.hasNextChunk();
if (!b.isPresent()) {
continue;
}
if (!b.get()) {
break;
}
if (canUseStatistics && seriesScanUtil.canUseCurrentChunkStatistics()) {
Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics();
if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public abstract class AbstractSeriesScanOperator extends AbstractDataSourceOperator {
Expand Down Expand Up @@ -70,10 +71,19 @@ public boolean hasNext() throws Exception {
* 2. consume chunk data secondly
* 3. consume next file finally
*/
if (!readPageData() && !readChunkData() && !readFileData()) {
noMoreData = true;
break;
if (readPageData()) {
continue;
}
Optional<Boolean> b = readChunkData();
if (!b.isPresent() || b.get()) {
continue;
}
b = readFileData();
if (!b.isPresent() || b.get()) {
continue;
}
noMoreData = true;
break;

} while (System.nanoTime() - start < maxRuntime
&& !resultTsBlockBuilder.isFull()
Expand All @@ -87,22 +97,28 @@ public boolean hasNext() throws Exception {
}
}

private boolean readFileData() throws IOException {
while (seriesScanUtil.hasNextFile()) {
if (readChunkData()) {
return true;
}
protected Optional<Boolean> readFileData() throws IOException {
  // Ask the scan util for another file. Optional.empty() means the probe ran out of
  // its time slice mid-way; Optional.of(false) means there are no more files. Either
  // way the answer is propagated unchanged to the caller's checkpoint loop.
  final Optional<Boolean> nextFile = seriesScanUtil.hasNextFile();
  if (!nextFile.isPresent() || !nextFile.get()) {
    return nextFile;
  }
  // A file is available — try to produce data from its chunks. A present-true result
  // (data produced) or an empty result (time-slice checkpoint) is passed straight up.
  final Optional<Boolean> chunkResult = readChunkData();
  if (!chunkResult.isPresent() || chunkResult.get()) {
    return chunkResult;
  }
  // The current file yielded no data; report "not finished" so the caller re-enters
  // this method after its time-slice check instead of treating the scan as done.
  return Optional.empty();
}

private boolean readChunkData() throws IOException {
while (seriesScanUtil.hasNextChunk()) {
if (readPageData()) {
return true;
}
protected Optional<Boolean> readChunkData() throws IOException {
  // Probe for another chunk. Both the "interrupted by time slice" (empty) and the
  // "no more chunks" (present false) outcomes are forwarded to the caller as-is.
  final Optional<Boolean> nextChunk = seriesScanUtil.hasNextChunk();
  if (!nextChunk.isPresent() || !nextChunk.get()) {
    return nextChunk;
  }
  // A chunk exists: a successful page read means data was produced for this round.
  if (readPageData()) {
    return Optional.of(true);
  }
  // Chunk present but no page data yet — signal the caller to come back through its
  // time-slice checkpoint rather than concluding the scan.
  return Optional.empty();
}

private boolean readPageData() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,14 @@ protected DescPriorityMergeReader getDescPriorityMergeReader() {
// file level methods
/////////////////////////////////////////////////////////////////////////////////////////////////

public boolean hasNextFile() throws IOException {
// When Optional.empty() is returned, it means that the current hasNextFile call has not been
// fully executed. To keep the execution time of this method from exceeding the allocated time
// slice, it returns early in this way. When the upper-level method encounters
// Optional.empty(), it needs to return directly to the checkpoint method that checks the
// operator execution time slice.
public Optional<Boolean> hasNextFile() throws IOException {
if (!paginationController.hasCurLimit()) {
return false;
return Optional.of(false);
}

if (!unSeqPageReaders.isEmpty()
Expand All @@ -220,21 +225,25 @@ public boolean hasNextFile() throws IOException {
}

if (firstTimeSeriesMetadata != null) {
return true;
return Optional.of(true);
}

while (firstTimeSeriesMetadata == null
&& (orderUtils.hasNextSeqResource()
|| orderUtils.hasNextUnseqResource()
|| !seqTimeSeriesMetadata.isEmpty()
|| !unSeqTimeSeriesMetadata.isEmpty())) {
boolean checked = false;
if (orderUtils.hasNextSeqResource()
|| orderUtils.hasNextUnseqResource()
|| !seqTimeSeriesMetadata.isEmpty()
|| !unSeqTimeSeriesMetadata.isEmpty()) {
// init first time series metadata whose startTime is minimum
tryToUnpackAllOverlappedFilesToTimeSeriesMetadata();
// filter file based on push-down conditions
filterFirstTimeSeriesMetadata();
checked = true;
}

return firstTimeSeriesMetadata != null;
if (checked && firstTimeSeriesMetadata == null) {
return Optional.empty();
}
return Optional.of(firstTimeSeriesMetadata != null);
}

private boolean currentFileOverlapped() {
Expand Down Expand Up @@ -278,11 +287,16 @@ public void skipCurrentFile() {
* This method should be called after hasNextFile() until no next chunk, make sure that all
* overlapped chunks are consumed.
*
* @return Optional<Boolean> When Optional.empty() is returned, it means that the current
*     call has not been fully executed. To keep the execution time of this
*     method from exceeding the allocated time slice, it returns early in this way. When the
*     upper-level method encounters Optional.empty(), it needs to return directly to the
*     checkpoint method that checks the operator execution time slice.
* @throws IllegalStateException illegal state
*/
public boolean hasNextChunk() throws IOException {
public Optional<Boolean> hasNextChunk() throws IOException {
if (!paginationController.hasCurLimit()) {
return false;
return Optional.of(false);
}

if (!unSeqPageReaders.isEmpty()
Expand All @@ -298,18 +312,28 @@ public boolean hasNextChunk() throws IOException {
}

if (firstChunkMetadata != null) {
return true;
return Optional.of(true);
// hasNextFile() has not been invoked
} else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) {
return false;
return Optional.of(false);
}

while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
Optional<Boolean> hasNextFileReturnValue = null;
while (firstChunkMetadata == null) {
if (cachedChunkMetadata.isEmpty()) {
if (hasNextFileReturnValue != null) {
return Optional.empty();
}
hasNextFileReturnValue = hasNextFile();
if (!hasNextFileReturnValue.isPresent() || !hasNextFileReturnValue.get()) {
return hasNextFileReturnValue;
}
}
initFirstChunkMetadata();
// filter chunk based on push-down conditions
filterFirstChunkMetadata();
}
return firstChunkMetadata != null;
return Optional.of(firstChunkMetadata != null);
}

private void filterFirstChunkMetadata() {
Expand Down Expand Up @@ -1051,15 +1075,21 @@ private void tryToUnpackAllOverlappedFilesToTimeSeriesMetadata() throws IOExcept
/*
* Fill sequence TimeSeriesMetadata List until it is not empty
*/
while (seqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextSeqResource()) {
unpackSeqTsFileResource();
if (seqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextSeqResource()) {
// Avoid exceeding the time slice when a series cannot be found
if (!unpackSeqTsFileResource().isPresent()) {
return;
}
}

/*
* Fill unSequence TimeSeriesMetadata Priority Queue until it is not empty
*/
while (unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) {
unpackUnseqTsFileResource();
if (unSeqTimeSeriesMetadata.isEmpty() && orderUtils.hasNextUnseqResource()) {
// Avoid exceeding the time slice when a series cannot be found
if (!unpackUnseqTsFileResource().isPresent()) {
return;
}
}

/*
Expand Down Expand Up @@ -1166,13 +1196,16 @@ private Optional<ITimeSeriesMetadata> unpackSeqTsFileResource() throws IOExcepti
}
}

private void unpackUnseqTsFileResource() throws IOException {
private Optional<ITimeSeriesMetadata> unpackUnseqTsFileResource() throws IOException {
  // Load the time-series metadata of the next unsequence file in scan order.
  final ITimeSeriesMetadata metadata =
      loadTimeSeriesMetadata(orderUtils.getNextUnseqFileResource(true), false);
  // Skip this resource when nothing was loaded or the data type does not match —
  // a mismatch can be left behind by a delete.
  if (metadata == null || !metadata.typeMatch(getTsDataTypeList())) {
    return Optional.empty();
  }
  metadata.setSeq(false);
  unSeqTimeSeriesMetadata.add(metadata);
  return Optional.of(metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class SeriesDataBlockReader implements IDataBlockReader {
Expand Down Expand Up @@ -133,7 +134,14 @@ public boolean hasNextBatch() throws IOException {
/*
* consume next file finally
*/
while (seriesScanUtil.hasNextFile()) {
while (true) {
Optional<Boolean> b = seriesScanUtil.hasNextFile();
if (!b.isPresent()) {
continue;
}
if (!b.get()) {
break;
}
if (readChunkData()) {
hasCachedBatchData = true;
return true;
Expand All @@ -157,7 +165,15 @@ public void close() throws IOException {
}

private boolean readChunkData() throws IOException {
while (seriesScanUtil.hasNextChunk()) {
while (true) {
Optional<Boolean> b = seriesScanUtil.hasNextChunk();
if (!b.isPresent()) {
// This reader is used for compaction, just keep traversing
continue;
}
if (!b.get()) {
break;
}
if (readPageData()) {
return true;
}
Expand Down
Loading
Loading