feat: Avoid small batches in Exchange #12010
Conversation
This pull request was exported from Phabricator. Differential Revision: D67615570
Summary: X-link: facebookincubator/velox#12010

Prevent the exchange client from unblocking too early. Unblocking too early impedes the effectiveness of page merging. When the cost of creating a vector is high (for example, for data sets with a high number of columns), creating small pages can make queries significantly less efficient. For example, it was observed that when the network is congested and Exchange buffers are not filled up as fast, a query may experience a CPU efficiency drop of up to 4x: T211034421

Differential Revision: D67615570
xiaoxmeng left a comment:

@arhimondr thanks for the change % comments.
xiaoxmeng left a comment:

@arhimondr LGTM. Thanks for the update!
xiaoxmeng left a comment:

@arhimondr thanks for the iterations % nits.
```cpp
          operatorCtx_->driverCtx()->queryConfig(),
          serdeKind_)},
      processSplits_{operatorCtx_->driverCtx()->driverId == 0},
      driverId_{driverCtx->driverId},
```

nit: we can just fetch from operatorCtx_->driverCtx()->driverId; it is not necessary to save a copy of it.

Just wanted to save a couple of extra dereferences :-)
velox/exec/ExchangeQueue.h (outdated):

```cpp
      uint32_t maxBytes,
      bool* atEnd,
      ContinueFuture* future,
      std::vector<ContinuePromise>& promises);
```

s/promises/staledPromises/
Maybe put a comment? And at the caller, we expect it to have at most one promise?

Actually yeah, let me use a pointer instead of a list. It should never be more than one.
Reviewed By: xiaoxmeng

This pull request has been merged in 121b230.
```cpp
      queue_.push_back(std::move(page));
      if (!promises_.empty()) {
        const auto minBatchSize = minOutputBatchBytesLocked();
        while (!promises_.empty()) {
```

@arhimondr I have a question to confirm: there is actually no need for a loop here, because only one page is added at a time, so at most one consumer should be awakened, right?
Looking forward to your guidance.
cc @xiaoxmeng
@lingbin I think you are right. I don't think a single page can be consumed by more than a single consumer today (even if it is large). The loop does not seem to be necessary.
@arhimondr Thanks for your quick reply.
Meanwhile, I think it can be simplified to the following code, because the resumed driver will try to read as much data as possible (as long as it does not exceed 'maxBytes').
What do you think? If you think it's OK, I can create a PR to make this change.
Before:

```cpp
while (!promises_.empty()) {
  VELOX_CHECK_LE(promises_.size(), numberOfConsumers_);
  const int32_t unblockedConsumers = numberOfConsumers_ - promises_.size();
  const int64_t unasignedBytes =
      totalBytes_ - unblockedConsumers * minBatchSize;
  if (unasignedBytes < minBatchSize) {
    break;
  }
  // Resume one of the waiting drivers.
  auto it = promises_.begin();
  promises.push_back(std::move(it->second));
  promises_.erase(it);
}
```

After:

```cpp
if (!promises_.empty() && totalBytes_ >= minBatchSize) {
  // Resume one of the waiting drivers.
  auto it = promises_.begin();
  promises.push_back(std::move(it->second));
  promises_.erase(it);
}
```
@lingbin you need to take into account how many consumers are in flight to avoid unblocking too many.
Consider receiving minBatchSize and unblocking one consumer. Then receiving minBatchSize / 2 worth of data would increase totalBytes_ to 1.5 * minBatchSize and unblock one more consumer (which is not desired).
@arhimondr Thank you for your explanation. Now I understand the purpose of unasignedBytes.
However, I noticed that under the current strategy, unblocking is only performed when totalBytes_ >= (unblockedConsumers + 1) * minBatchSize. Will this lead to too few unblocks?
Consider numberOfConsumers_ = 10 and promises_.size() = 2: according to the current strategy, it is necessary to receive 9 pages (totalBytes_ == 9 * minBatchSize) before unblocking one consumer.
> Consider numberOfConsumers_ = 10 and promises_.size() = 2: according to the current strategy, it is necessary to receive 9 pages (totalBytes_ == 9 * minBatchSize) before unblocking one consumer.

Yes, this is correct.
When there are 8 consumers unblocked, to unblock the next one you need to receive (8 * minBatchSize) + minBatchSize, so that every consumer has minBatchSize to process (on average).
> When there are 8 consumers unblocked, to unblock the next one you need to receive (8 * minBatchSize) + minBatchSize, so that every consumer has minBatchSize to process (on average).

@arhimondr Do you mean that you originally wanted to unblock 8 consumers (so that each consumes minBatchSize of data)? But in fact, only one will be unblocked here, and it will then consume 9 * minBatchSize of data at a time. (The default preferred_output_batch_bytes size is 10MB.)
@arhimondr There seem to be two separate concerns here: "the number of unblocks" and "whether a small vector is generated".
These two issues do not seem to be causally related: as long as every consumer is guaranteed to consume at least minBatchSize of data each time, we can guarantee that no small vector is generated.
This is already guaranteed in dequeueLocked(), right? I see:

velox/velox/exec/ExchangeQueue.cpp, lines 146 to 151 in d8cac2f:

```cpp
  // If we don't have enough bytes to return, we wait for more data to be
  // available
  if (totalBytes_ < minOutputBatchBytesLocked()) {
    addPromiseLocked(consumerId, future, stalePromise);
    return {};
  }
```
For your example:

t1: add one page: totalBytes_ = minBatchSize, unblock consumer-1
t2: add one page: totalBytes_ = 1.5 * minBatchSize, unblock consumer-2
t3: consumer-1 consumes data of size 1.5 * minBatchSize at one time and generates a RowVector
t4: consumer-2 finds that there is no data to consume and is blocked again; no new RowVector is generated.

That is, although consumers are unblocked twice, only one RowVector is generated in the end.
> This is already guaranteed in dequeueLocked(), right?

Yes, there is a second check in dequeueLocked. The idea is not to unblock too many consumers. For example, when you have totalBytes_ = 10 * minBatchSize, unblocking more than 10 consumers does not make sense, as at least one of them will get blocked again in dequeueLocked.