Skip to content

Commit 54f4247

Browse files
arhimondrfacebook-github-bot
authored andcommitted
feat: Avoid small batches in Exchange (facebookincubator#12010)
Summary: Prevent exchange client from unblocking to early. Unblocking to early impedes effectiveness of page merging. When the cost of creating a vector is high (for example for data sets with high number of columns) creating small pages can make queries significantly less efficient. For example it was observed that when network is congested and Exchange buffers are not filled up as fast query may experience CPU efficiency drop up to 4x: T211034421 Differential Revision: D67615570
1 parent b13e209 commit 54f4247

File tree

8 files changed

+308
-20
lines changed

8 files changed

+308
-20
lines changed

velox/core/QueryConfig.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,15 @@ class QueryConfig {
113113
static constexpr const char* kMaxMergeExchangeBufferSize =
114114
"merge_exchange.max_buffer_size";
115115

116+
/// The minimum number of bytes to accumulate in the ExchangeQueue
117+
/// before unblocking a consumer. This is used to avoid creating tiny
118+
/// batches which may have a negative impact on performance when the
119+
/// cost of creating vectors is high (for example, when there are many
120+
/// columns). To avoid latency degradation, the exchange client unblocks a
121+
/// consumer when 1% of the data size observed so far is accumulated.
122+
static constexpr const char* kMinExchangeOutputBatchBytes =
123+
"min_exchange_output_batch_bytes";
124+
116125
static constexpr const char* kMaxPartialAggregationMemory =
117126
"max_partial_aggregation_memory";
118127

@@ -594,6 +603,11 @@ class QueryConfig {
594603
return get<uint64_t>(kMaxMergeExchangeBufferSize, kDefault);
595604
}
596605

606+
uint64_t minExchangeOutputBatchBytes() const {
607+
static constexpr uint64_t kDefault = 2UL << 20;
608+
return get<uint64_t>(kMinExchangeOutputBatchBytes, kDefault);
609+
}
610+
597611
uint64_t preferredOutputBatchBytes() const {
598612
static constexpr uint64_t kDefault = 10UL << 20;
599613
return get<uint64_t>(kPreferredOutputBatchBytes, kDefault);

velox/exec/ExchangeClient.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ namespace facebook::velox::exec {
2525
class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
2626
public:
2727
static constexpr int32_t kDefaultMaxQueuedBytes = 32 << 20; // 32 MB.
28+
static constexpr int32_t kDefaultMinExchangeOutputBatchBytes{
29+
2 << 20}; // 2 MB.
2830
static constexpr std::chrono::seconds kRequestDataSizesMaxWait{10};
2931
static constexpr std::chrono::milliseconds kRequestDataMaxWait{100};
3032
static inline const std::string kBackgroundCpuTimeMs = "backgroundCpuTimeMs";
@@ -33,14 +35,15 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
3335
std::string taskId,
3436
int destination,
3537
int64_t maxQueuedBytes,
38+
uint64_t minOutputBatchBytes,
3639
memory::MemoryPool* pool,
3740
folly::Executor* executor)
3841
: taskId_{std::move(taskId)},
3942
destination_(destination),
4043
maxQueuedBytes_{maxQueuedBytes},
4144
pool_(pool),
4245
executor_(executor),
43-
queue_(std::make_shared<ExchangeQueue>()) {
46+
queue_(std::make_shared<ExchangeQueue>(minOutputBatchBytes)) {
4447
VELOX_CHECK_NOT_NULL(pool_);
4548
VELOX_CHECK_NOT_NULL(executor_);
4649
// NOTE: the executor is used to run async response callback from the

velox/exec/ExchangeQueue.cpp

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616
#include "velox/exec/ExchangeQueue.h"
17+
#include <algorithm>
1718

1819
namespace facebook::velox::exec {
1920

@@ -64,6 +65,15 @@ void ExchangeQueue::close() {
6465
clearPromises(promises);
6566
}
6667

68+
int64_t ExchangeQueue::getMinOutputBatchBytesLocked() const {
69+
// always allow to unblock when at end
70+
if (atEnd_) {
71+
return 0;
72+
}
73+
// At most 1% of received bytes so far to minimize latency for small exchanges
74+
return std::min<int64_t>(minOutputBatchBytes_, receivedBytes_ / 100);
75+
}
76+
6777
void ExchangeQueue::enqueueLocked(
6878
std::unique_ptr<SerializedPage>&& page,
6979
std::vector<ContinuePromise>& promises) {
@@ -86,10 +96,13 @@ void ExchangeQueue::enqueueLocked(
8696
receivedBytes_ += page->size();
8797

8898
queue_.push_back(std::move(page));
89-
if (!promises_.empty()) {
99+
const auto minBatchSize = getMinOutputBatchBytesLocked();
100+
while (!promises_.empty() &&
101+
(totalBytes_ - (inflightConsumers_ * minBatchSize)) >= minBatchSize) {
90102
// Resume one of the waiting drivers.
91103
promises.push_back(std::move(promises_.back()));
92104
promises_.pop_back();
105+
inflightConsumers_++;
93106
}
94107
}
95108

@@ -105,6 +118,18 @@ std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked(
105118

106119
*atEnd = false;
107120

121+
if (inflightConsumers_ > 0) {
122+
inflightConsumers_--;
123+
}
124+
125+
// If we don't have enough bytes to return, we wait for more data to be
126+
// available
127+
if (totalBytes_ < getMinOutputBatchBytesLocked()) {
128+
promises_.emplace_back("ExchangeQueue::dequeue");
129+
*future = promises_.back().getSemiFuture();
130+
return {};
131+
}
132+
108133
std::vector<std::unique_ptr<SerializedPage>> pages;
109134
uint32_t pageBytes = 0;
110135
for (;;) {

velox/exec/ExchangeQueue.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ class SerializedPage {
8181
/// for input.
8282
class ExchangeQueue {
8383
public:
84+
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
85+
explicit ExchangeQueue() : ExchangeQueue(1) {}
86+
#endif
87+
88+
explicit ExchangeQueue(uint64_t minOutputBatchBytes)
89+
: minOutputBatchBytes_{minOutputBatchBytes} {}
90+
8491
~ExchangeQueue() {
8592
clearAllPromises();
8693
}
@@ -185,6 +192,10 @@ class ExchangeQueue {
185192
}
186193
}
187194

195+
int64_t getMinOutputBatchBytesLocked() const;
196+
197+
const uint64_t minOutputBatchBytes_;
198+
188199
int numCompleted_{0};
189200
int numSources_{0};
190201
bool noMoreSources_{false};
@@ -205,5 +216,7 @@ class ExchangeQueue {
205216
int64_t receivedBytes_{0};
206217
// Maximum value of totalBytes_.
207218
int64_t peakBytes_{0};
219+
// Number of unblocked consumers expected to consume data shortly
220+
int64_t inflightConsumers_{0};
208221
};
209222
} // namespace facebook::velox::exec

velox/exec/MergeSource.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class MergeExchangeSource : public MergeSource {
128128
mergeExchange->taskId(),
129129
destination,
130130
maxQueuedBytes,
131+
1,
131132
pool,
132133
executor)) {
133134
client_->addRemoteTaskId(taskId);

velox/exec/Task.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2997,6 +2997,7 @@ void Task::createExchangeClientLocked(
29972997
taskId_,
29982998
destination_,
29992999
queryCtx()->queryConfig().maxExchangeBufferSize(),
3000+
queryCtx()->queryConfig().minExchangeOutputBatchBytes(),
30003001
addExchangeClientPool(planNodeId, pipelineId),
30013002
queryCtx()->executor());
30023003
exchangeClientByPlanNode_.emplace(planNodeId, exchangeClients_[pipelineId]);

0 commit comments

Comments
 (0)