@@ -23,8 +23,6 @@ using facebook::velox::common::testutil::TestValue;
2323
2424namespace facebook ::velox::exec {
2525
26- std::atomic<uint64_t > TableScan::ioWaitNanos_;
27-
2826TableScan::TableScan (
2927 int32_t operatorId,
3028 DriverCtx* driverCtx,
@@ -74,14 +72,14 @@ RowVectorPtr TableScan::getOutput() {
7472 // w/o producing a result. In this case we return with the Yield blocking
7573 // reason and an already fulfilled future.
7674 curStatus_ = " getOutput: task->shouldStop" ;
77- if (this -> driverCtx_ ->task ->shouldStop () != StopReason::kNone or
78- (getOutputTimeLimitMs_ != 0 and
75+ if (( driverCtx_->task ->shouldStop () != StopReason::kNone ) ||
76+ (( getOutputTimeLimitMs_ != 0 ) &&
7977 (getCurrentTimeMs () - startTimeMs) >= getOutputTimeLimitMs_)) {
8078 blockingReason_ = BlockingReason::kYield ;
8179 blockingFuture_ = ContinueFuture{folly::Unit{}};
8280 // A point for test code injection.
8381 TestValue::adjust (
84- " facebook::velox::exec::TableScan::getOutput::bail " , this );
82+ " facebook::velox::exec::TableScan::getOutput::yield " , this );
8583 return nullptr ;
8684 }
8785
@@ -103,19 +101,19 @@ RowVectorPtr TableScan::getOutput() {
103101
104102 if (!split.hasConnectorSplit ()) {
105103 noMoreSplits_ = true ;
106- pendingDynamicFilters_ .clear ();
104+ dynamicFilters_ .clear ();
107105 if (dataSource_) {
108106 curStatus_ = " getOutput: noMoreSplits_=1, updating stats_" ;
109- auto connectorStats = dataSource_->runtimeStats ();
107+ const auto connectorStats = dataSource_->runtimeStats ();
110108 auto lockedStats = stats_.wlock ();
111109 for (const auto & [name, counter] : connectorStats) {
112110 if (name == " ioWaitNanos" ) {
113111 ioWaitNanos_ += counter.value - lastIoWaitNanos_;
114112 lastIoWaitNanos_ = counter.value ;
115113 }
116- if (UNLIKELY (lockedStats->runtimeStats .count (name) == 0 )) {
117- lockedStats->runtimeStats .insert (
118- std::make_pair ( name, RuntimeMetric (counter.unit ) ));
114+ if (FOLLY_UNLIKELY (lockedStats->runtimeStats .count (name) == 0 )) {
115+ lockedStats->runtimeStats .emplace (
116+ name, RuntimeMetric (counter.unit ));
119117 } else {
120118 VELOX_CHECK_EQ (
121119 lockedStats->runtimeStats .at (name).unit , counter.unit );
@@ -148,7 +146,7 @@ RowVectorPtr TableScan::getOutput() {
148146 tableHandle_,
149147 columnHandles_,
150148 connectorQueryCtx_.get ());
151- for (const auto & entry : pendingDynamicFilters_ ) {
149+ for (const auto & entry : dynamicFilters_ ) {
152150 dataSource_->addDynamicFilter (entry.first , entry.second );
153151 }
154152 }
@@ -167,8 +165,8 @@ RowVectorPtr TableScan::getOutput() {
167165 if (connectorSplit->dataSource ) {
168166 curStatus_ = " getOutput: preloaded split" ;
169167 ++numPreloadedSplits_;
170- // The AsyncSource returns a unique_ptr to a shared_ptr. The
171- // unique_ptr will be nullptr if there was a cancellation.
168+ // The AsyncSource returns a unique_ptr to a shared_ptr. The unique_ptr
169+ // will be nullptr if there was a cancellation.
172170 numReadyPreloadedSplits_ += connectorSplit->dataSource ->hasValue ();
173171 auto preparedDataSource = connectorSplit->dataSource ->move ();
174172 stats_.wlock ()->getOutputTiming .add (
@@ -187,7 +185,7 @@ RowVectorPtr TableScan::getOutput() {
187185 ++stats_.wlock ()->numSplits ;
188186
189187 curStatus_ = " getOutput: dataSource_->estimatedRowSize" ;
190- auto estimatedRowSize = dataSource_->estimatedRowSize ();
188+ const auto estimatedRowSize = dataSource_->estimatedRowSize ();
191189 readBatchSize_ =
192190 estimatedRowSize == connector::DataSource::kUnknownRowSize
193191 ? outputBatchRows ()
@@ -201,6 +199,7 @@ RowVectorPtr TableScan::getOutput() {
201199 if (operatorCtx_->task ()->isCancelled ()) {
202200 return nullptr ;
203201 }
202+
204203 ExceptionContextSetter exceptionContext (
205204 {[](VeloxException::Type /* exceptionType*/ , auto * debugString) {
206205 return *static_cast <std::string*>(debugString);
@@ -235,11 +234,11 @@ RowVectorPtr TableScan::getOutput() {
235234 curStatus_ = " getOutput: updating stats_.rawInput" ;
236235 lockedStats->rawInputPositions = dataSource_->getCompletedRows ();
237236 lockedStats->rawInputBytes = dataSource_->getCompletedBytes ();
238- auto data = dataOptional.value ();
239- if (data) {
237+ RowVectorPtr data = dataOptional.value ();
238+ if (data != nullptr ) {
240239 if (data->size () > 0 ) {
241240 lockedStats->addInputVector (data->estimateFlatSize (), data->size ());
242- constexpr int kMaxSelectiveBatchSizeMultiplier = 4 ;
241+ static constexpr int kMaxSelectiveBatchSizeMultiplier = 4 ;
243242 maxFilteringRatio_ = std::max (
244243 {maxFilteringRatio_,
245244 1.0 * data->size () / readBatchSize,
@@ -285,7 +284,7 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) {
285284 ctx = operatorCtx_->createConnectorQueryCtx (
286285 split->connectorId , planNodeId (), connectorPool_),
287286 task = operatorCtx_->task (),
288- pendingDynamicFilters = pendingDynamicFilters_ ,
287+ dynamicFilters = dynamicFilters_ ,
289288 split]() -> std::unique_ptr<connector::DataSource> {
290289 if (task->isCancelled ()) {
291290 return nullptr ;
@@ -298,20 +297,21 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) {
298297 },
299298 &debugString});
300299
301- auto ptr = connector->createDataSource (type, table, columns, ctx.get ());
300+ auto dataSource =
301+ connector->createDataSource (type, table, columns, ctx.get ());
302302 if (task->isCancelled ()) {
303303 return nullptr ;
304304 }
305- for (const auto & entry : pendingDynamicFilters ) {
306- ptr ->addDynamicFilter (entry.first , entry.second );
305+ for (const auto & entry : dynamicFilters ) {
306+ dataSource ->addDynamicFilter (entry.first , entry.second );
307307 }
308- ptr ->addSplit (split);
309- return ptr ;
308+ dataSource ->addSplit (split);
309+ return dataSource ;
310310 });
311311}
312312
313313void TableScan::checkPreload () {
314- auto executor = connector_->executor ();
314+ auto * executor = connector_->executor ();
315315 if (maxSplitPreloadPerDriver_ == 0 || !executor ||
316316 !connector_->supportsSplitPreload ()) {
317317 return ;
@@ -345,7 +345,7 @@ void TableScan::addDynamicFilter(
345345 if (dataSource_) {
346346 dataSource_->addDynamicFilter (outputChannel, filter);
347347 }
348- pendingDynamicFilters_ .emplace (outputChannel, filter);
348+ dynamicFilters_ .emplace (outputChannel, filter);
349349}
350350
351351} // namespace facebook::velox::exec
0 commit comments