Skip to content

Commit 9eb9ecb

Browse files
xiaoxmengJoe-Abraham
authored andcommitted
Table scan cleanup (facebookincubator#9156)
Summary: Pull Request resolved: facebookincubator#9156 Reviewed By: kevinwilfong Differential Revision: D55086413 Pulled By: xiaoxmeng fbshipit-source-id: 60c557f7635e7eb498e17e46f1a15534d862b8bf
1 parent 67ace3d commit 9eb9ecb

File tree

6 files changed

+56
-59
lines changed

6 files changed

+56
-59
lines changed

velox/connectors/Connector.h

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -207,18 +207,17 @@ class DataSource {
207207

208208
virtual std::unordered_map<std::string, RuntimeCounter> runtimeStats() = 0;
209209

210-
// Returns true if 'this' has initiated all the prefetch this will
211-
// initiate. This means that the caller should schedule next splits
212-
// to prefetch in the background. false if the source does not
213-
// prefetch.
210+
/// Returns true if 'this' has initiated all the prefetch this will initiate.
211+
/// This means that the caller should schedule next splits to prefetch in the
212+
/// background. false if the source does not prefetch.
214213
virtual bool allPrefetchIssued() const {
215214
return false;
216215
}
217216

218-
// Initializes this from 'source'. 'source' is effectively moved
219-
// into 'this' Adaptation like dynamic filters stay in effect but
220-
// the parts dealing with open files, prefetched data etc. are moved. 'source'
221-
// is freed after the move.
217+
/// Initializes this from 'source'. 'source' is effectively moved into 'this'
218+
/// Adaptation like dynamic filters stay in effect but the parts dealing with
219+
/// open files, prefetched data etc. are moved. 'source' is freed after the
220+
/// move.
222221
virtual void setFromDataSource(std::unique_ptr<DataSource> /*source*/) {
223222
VELOX_UNSUPPORTED("setFromDataSource");
224223
}
@@ -392,7 +391,7 @@ class Connector {
392391
const std::string& scanId,
393392
int32_t loadQuantum);
394393

395-
virtual folly::Executor* FOLLY_NULLABLE executor() const {
394+
virtual folly::Executor* executor() const {
396395
return nullptr;
397396
}
398397

@@ -422,7 +421,7 @@ class ConnectorFactory {
422421
virtual std::shared_ptr<Connector> newConnector(
423422
const std::string& id,
424423
std::shared_ptr<const Config> config,
425-
folly::Executor* FOLLY_NULLABLE executor = nullptr) = 0;
424+
folly::Executor* executor = nullptr) = 0;
426425

427426
private:
428427
const std::string name_;

velox/connectors/hive/HiveDataSource.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,11 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
178178
}
179179

180180
void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
181-
VELOX_CHECK(
182-
split_ == nullptr,
181+
VELOX_CHECK_NULL(
182+
split_,
183183
"Previous split has not been processed yet. Call next to process the split.");
184184
split_ = std::dynamic_pointer_cast<HiveConnectorSplit>(split);
185-
VELOX_CHECK(split_, "Wrong type of split");
185+
VELOX_CHECK_NOT_NULL(split_, "Wrong type of split");
186186

187187
VLOG(1) << "Adding split " << split_->toString();
188188

velox/exec/TableScan.cpp

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ using facebook::velox::common::testutil::TestValue;
2323

2424
namespace facebook::velox::exec {
2525

26-
std::atomic<uint64_t> TableScan::ioWaitNanos_;
27-
2826
TableScan::TableScan(
2927
int32_t operatorId,
3028
DriverCtx* driverCtx,
@@ -74,14 +72,14 @@ RowVectorPtr TableScan::getOutput() {
7472
// w/o producing a result. In this case we return with the Yield blocking
7573
// reason and an already fulfilled future.
7674
curStatus_ = "getOutput: task->shouldStop";
77-
if (this->driverCtx_->task->shouldStop() != StopReason::kNone or
78-
(getOutputTimeLimitMs_ != 0 and
75+
if ((driverCtx_->task->shouldStop() != StopReason::kNone) ||
76+
((getOutputTimeLimitMs_ != 0) &&
7977
(getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) {
8078
blockingReason_ = BlockingReason::kYield;
8179
blockingFuture_ = ContinueFuture{folly::Unit{}};
8280
// A point for test code injection.
8381
TestValue::adjust(
84-
"facebook::velox::exec::TableScan::getOutput::bail", this);
82+
"facebook::velox::exec::TableScan::getOutput::yield", this);
8583
return nullptr;
8684
}
8785

@@ -103,19 +101,19 @@ RowVectorPtr TableScan::getOutput() {
103101

104102
if (!split.hasConnectorSplit()) {
105103
noMoreSplits_ = true;
106-
pendingDynamicFilters_.clear();
104+
dynamicFilters_.clear();
107105
if (dataSource_) {
108106
curStatus_ = "getOutput: noMoreSplits_=1, updating stats_";
109-
auto connectorStats = dataSource_->runtimeStats();
107+
const auto connectorStats = dataSource_->runtimeStats();
110108
auto lockedStats = stats_.wlock();
111109
for (const auto& [name, counter] : connectorStats) {
112110
if (name == "ioWaitNanos") {
113111
ioWaitNanos_ += counter.value - lastIoWaitNanos_;
114112
lastIoWaitNanos_ = counter.value;
115113
}
116-
if (UNLIKELY(lockedStats->runtimeStats.count(name) == 0)) {
117-
lockedStats->runtimeStats.insert(
118-
std::make_pair(name, RuntimeMetric(counter.unit)));
114+
if (FOLLY_UNLIKELY(lockedStats->runtimeStats.count(name) == 0)) {
115+
lockedStats->runtimeStats.emplace(
116+
name, RuntimeMetric(counter.unit));
119117
} else {
120118
VELOX_CHECK_EQ(
121119
lockedStats->runtimeStats.at(name).unit, counter.unit);
@@ -148,7 +146,7 @@ RowVectorPtr TableScan::getOutput() {
148146
tableHandle_,
149147
columnHandles_,
150148
connectorQueryCtx_.get());
151-
for (const auto& entry : pendingDynamicFilters_) {
149+
for (const auto& entry : dynamicFilters_) {
152150
dataSource_->addDynamicFilter(entry.first, entry.second);
153151
}
154152
}
@@ -167,8 +165,8 @@ RowVectorPtr TableScan::getOutput() {
167165
if (connectorSplit->dataSource) {
168166
curStatus_ = "getOutput: preloaded split";
169167
++numPreloadedSplits_;
170-
// The AsyncSource returns a unique_ptr to a shared_ptr. The
171-
// unique_ptr will be nullptr if there was a cancellation.
168+
// The AsyncSource returns a unique_ptr to a shared_ptr. The unique_ptr
169+
// will be nullptr if there was a cancellation.
172170
numReadyPreloadedSplits_ += connectorSplit->dataSource->hasValue();
173171
auto preparedDataSource = connectorSplit->dataSource->move();
174172
stats_.wlock()->getOutputTiming.add(
@@ -187,7 +185,7 @@ RowVectorPtr TableScan::getOutput() {
187185
++stats_.wlock()->numSplits;
188186

189187
curStatus_ = "getOutput: dataSource_->estimatedRowSize";
190-
auto estimatedRowSize = dataSource_->estimatedRowSize();
188+
const auto estimatedRowSize = dataSource_->estimatedRowSize();
191189
readBatchSize_ =
192190
estimatedRowSize == connector::DataSource::kUnknownRowSize
193191
? outputBatchRows()
@@ -201,6 +199,7 @@ RowVectorPtr TableScan::getOutput() {
201199
if (operatorCtx_->task()->isCancelled()) {
202200
return nullptr;
203201
}
202+
204203
ExceptionContextSetter exceptionContext(
205204
{[](VeloxException::Type /*exceptionType*/, auto* debugString) {
206205
return *static_cast<std::string*>(debugString);
@@ -235,8 +234,8 @@ RowVectorPtr TableScan::getOutput() {
235234
curStatus_ = "getOutput: updating stats_.rawInput";
236235
lockedStats->rawInputPositions = dataSource_->getCompletedRows();
237236
lockedStats->rawInputBytes = dataSource_->getCompletedBytes();
238-
auto data = dataOptional.value();
239-
if (data) {
237+
RowVectorPtr data = dataOptional.value();
238+
if (data != nullptr) {
240239
if (data->size() > 0) {
241240
lockedStats->addInputVector(data->estimateFlatSize(), data->size());
242241
constexpr int kMaxSelectiveBatchSizeMultiplier = 4;
@@ -285,7 +284,7 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) {
285284
ctx = operatorCtx_->createConnectorQueryCtx(
286285
split->connectorId, planNodeId(), connectorPool_),
287286
task = operatorCtx_->task(),
288-
pendingDynamicFilters = pendingDynamicFilters_,
287+
dynamicFilters = dynamicFilters_,
289288
split]() -> std::unique_ptr<connector::DataSource> {
290289
if (task->isCancelled()) {
291290
return nullptr;
@@ -298,20 +297,21 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) {
298297
},
299298
&debugString});
300299

301-
auto ptr = connector->createDataSource(type, table, columns, ctx.get());
300+
auto dataSource =
301+
connector->createDataSource(type, table, columns, ctx.get());
302302
if (task->isCancelled()) {
303303
return nullptr;
304304
}
305-
for (const auto& entry : pendingDynamicFilters) {
306-
ptr->addDynamicFilter(entry.first, entry.second);
305+
for (const auto& entry : dynamicFilters) {
306+
dataSource->addDynamicFilter(entry.first, entry.second);
307307
}
308-
ptr->addSplit(split);
309-
return ptr;
308+
dataSource->addSplit(split);
309+
return dataSource;
310310
});
311311
}
312312

313313
void TableScan::checkPreload() {
314-
auto executor = connector_->executor();
314+
auto* executor = connector_->executor();
315315
if (maxSplitPreloadPerDriver_ == 0 || !executor ||
316316
!connector_->supportsSplitPreload()) {
317317
return;
@@ -345,7 +345,7 @@ void TableScan::addDynamicFilter(
345345
if (dataSource_) {
346346
dataSource_->addDynamicFilter(outputChannel, filter);
347347
}
348-
pendingDynamicFilters_.emplace(outputChannel, filter);
348+
dynamicFilters_.emplace(outputChannel, filter);
349349
}
350350

351351
} // namespace facebook::velox::exec

velox/exec/TableScan.h

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,19 @@ class TableScan : public SourceOperator {
5757
}
5858

5959
private:
60-
// Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching
61-
// splits is appropriate. The preloader will be applied to the
62-
// 'first 'maxPreloadSplits' of the Tasks's split queue for 'this'
63-
// when getting splits.
60+
// Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching splits is
61+
// appropriate. The preloader will be applied to the 'first 'maxPreloadSplits'
62+
// of the Task's split queue for 'this' when getting splits.
6463
void checkPreload();
6564

66-
// Sets 'split->dataSource' to be a Asyncsource that makes a
67-
// DataSource to read 'split'. This source will be prepared in the
68-
// background on the executor of the connector. If the DataSource is
69-
// needed before prepare is done, it will be made when needed.
65+
// Sets 'split->dataSource' to be an AsyncSource that makes a DataSource to
66+
// read 'split'. This source will be prepared in the background on the
67+
// executor of the connector. If the DataSource is needed before prepare is
68+
// done, it will be made when needed.
7069
void preload(std::shared_ptr<connector::ConnectorSplit> split);
7170

7271
// Process-wide IO wait time.
73-
static std::atomic<uint64_t> ioWaitNanos_;
72+
inline static std::atomic<uint64_t> ioWaitNanos_;
7473

7574
const std::shared_ptr<connector::ConnectorTableHandle> tableHandle_;
7675
const std::
@@ -88,17 +87,16 @@ class TableScan : public SourceOperator {
8887
bool noMoreSplits_ = false;
8988
// Dynamic filters to add to the data source when it gets created.
9089
std::unordered_map<column_index_t, std::shared_ptr<common::Filter>>
91-
pendingDynamicFilters_;
90+
dynamicFilters_;
9291

9392
int32_t maxPreloadedSplits_{0};
9493

9594
const int32_t maxSplitPreloadPerDriver_{0};
9695

97-
// Callback passed to getSplitOrFuture() for triggering async
98-
// preload. The callback's lifetime is the lifetime of 'this'. This
99-
// callback can schedule preloads on an executor. These preloads may
100-
// outlive the Task and therefore need to capture a shared_ptr to
101-
// it.
96+
// Callback passed to getSplitOrFuture() for triggering async preload. The
97+
// callback's lifetime is the lifetime of 'this'. This callback can schedule
98+
// preloads on an executor. These preloads may outlive the Task and therefore
99+
// need to capture a shared_ptr to it.
102100
std::function<void(const std::shared_ptr<connector::ConnectorSplit>&)>
103101
splitPreloader_{nullptr};
104102

@@ -111,8 +109,8 @@ class TableScan : public SourceOperator {
111109
int32_t readBatchSize_;
112110
int32_t maxReadBatchSize_;
113111

114-
// Exits getOutput() method after this many milliseconds.
115-
// Zero means 'no limit'.
112+
// Exits getOutput() method after this many milliseconds. Zero means 'no
113+
// limit'.
116114
size_t getOutputTimeLimitMs_{0};
117115

118116
double maxFilteringRatio_{0};

velox/exec/Task.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1438,23 +1438,23 @@ exec::Split Task::getSplitLocked(
14381438
int32_t maxPreloadSplits,
14391439
const ConnectorSplitPreloadFunc& preload) {
14401440
int32_t readySplitIndex = -1;
1441-
if (maxPreloadSplits) {
1441+
if (maxPreloadSplits > 0) {
14421442
for (auto i = 0; i < splitsStore.splits.size() && i < maxPreloadSplits;
14431443
++i) {
14441444
auto& connectorSplit = splitsStore.splits[i].connectorSplit;
14451445
if (!connectorSplit->dataSource) {
1446-
// Initializes split->dataSource
1446+
// Initializes split->dataSource.
14471447
preload(connectorSplit);
14481448
} else if (
1449-
readySplitIndex == -1 && connectorSplit->dataSource->hasValue()) {
1449+
(readySplitIndex == -1) && (connectorSplit->dataSource->hasValue())) {
14501450
readySplitIndex = i;
14511451
}
14521452
}
14531453
}
14541454
if (readySplitIndex == -1) {
14551455
readySplitIndex = 0;
14561456
}
1457-
assert(!splitsStore.splits.empty());
1457+
VELOX_CHECK(!splitsStore.splits.empty());
14581458
auto split = std::move(splitsStore.splits[readySplitIndex]);
14591459
splitsStore.splits.erase(splitsStore.splits.begin() + readySplitIndex);
14601460

velox/exec/tests/TableScanTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ DEBUG_ONLY_TEST_F(TableScanTest, timeLimitInGetOutput) {
491491
// Count how many times we bailed from getOutput.
492492
size_t numBailed{0};
493493
SCOPED_TESTVALUE_SET(
494-
"facebook::velox::exec::TableScan::getOutput::bail",
494+
"facebook::velox::exec::TableScan::getOutput::yield",
495495
std::function<void(const TableScan*)>(
496496
([&](const TableScan* /*tableScan*/) { ++numBailed; })));
497497

0 commit comments

Comments
 (0)