@lingbin lingbin commented Dec 19, 2025

Replace the 'while' loop with a simple 'if' condition in
'ExchangeQueue::enqueueLocked()' to avoid unnecessary iterations when
checking for waiting consumers. Only one page is added per call, so at
most one consumer should be awakened. Any additional awakened consumers
would find no pages available and block again immediately, which wastes
resources.

After this patch, we wake up only one waiting consumer at a time, once the
accumulated data meets the minimum batch size requirement (see
'minOutputBatchBytesLocked()'). This reduces the number of awakened
consumers and avoids the wasted wake-ups described above.

See [1] for discussion.

[1] #12010 (comment)
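
To make the difference concrete, here is a minimal sketch of the two wake-up policies. It is not the Velox code: `ToyQueue`, `enqueueWithLoop`, and `enqueueWithIf` are hypothetical names, waiting consumers are modeled as plain integer ids instead of promises, and the per-iteration subtraction in the loop variant is an assumption made for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical, simplified model of a queue with blocked consumers.
struct ToyQueue {
  uint64_t minBatchBytes{0};  // bytes required before anyone is woken
  uint64_t totalBytes{0};     // bytes currently queued
  std::deque<int> waiters;    // ids of blocked consumers (stand-in for promises)

  // Loop variant: keeps waking waiters while enough unassigned bytes remain.
  // Subtracting minBatchBytes per woken waiter is an assumption of this sketch.
  std::vector<int> enqueueWithLoop(uint64_t pageBytes) {
    totalBytes += pageBytes;
    std::vector<int> woken;
    uint64_t unassigned = totalBytes;
    while (!waiters.empty()) {
      if (unassigned < minBatchBytes) {
        break;
      }
      woken.push_back(waiters.front());
      waiters.pop_front();
      unassigned -= minBatchBytes;
    }
    return woken;
  }

  // If variant: one page arrived, so wake at most one waiter.
  std::vector<int> enqueueWithIf(uint64_t pageBytes) {
    totalBytes += pageBytes;
    std::vector<int> woken;
    if (!waiters.empty() && totalBytes >= minBatchBytes) {
      woken.push_back(waiters.front());
      waiters.pop_front();
    }
    return woken;
  }
};
```

With `minBatchBytes = 2`, one byte already queued, three waiters, and a 4-byte page arriving, the loop variant wakes two waiters while the if variant wakes one and leaves the other two blocked.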

@meta-cla meta-cla bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Dec 19, 2025
@mbasmanova mbasmanova requested a review from kagamiori December 19, 2025 12:58
@mbasmanova
Contributor

CC: @arhimondr

if (unasignedBytes < minBatchSize) {
break;
}
if (!promises_.empty() && totalBytes_ >= minBatchSize) {
Contributor

Isn't it possible that there are > 1 pages in the queue_?

Contributor Author

@mbasmanova Thank you for your review. Yes, there can be more than one page in the queue.

Since a consumer takes multiple pages at a time (dequeueLocked() returns a std::vector per call), I think the key here isn't the number of pages remaining in the queue_, but rather their total size.
Because only one page was added here, waking up one consumer is sufficient: the awakened consumer will consume all the data in the queue_.

Contributor

> the key here isn't the number of remaining pages in the queue_, but rather the total size of all the remaining pages in the queue_.

This is my understanding as well.

> Because only one page was added here, waking up one consumer is sufficient, the awakened consumer will consume all the data in the queue_.

I'm not sure I understand why "the awakened consumer will consume all the data in the queue_". I believe it is possible that there is more data in the queue than can be consumed in a single dequeueLocked() call. Am I missing something?

  /// ...If multiple pages are available,
  /// returns as many pages as fit within 'maxBytes', but no fewer than one...

  std::vector<std::unique_ptr<SerializedPageBase>> dequeueLocked(
      int consumerId,
      uint32_t maxBytes,
      bool* atEnd,
      ContinueFuture* future,
      ContinuePromise* stalePromise);
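
For readers following along, the documented behavior ("as many pages as fit within 'maxBytes', but no fewer than one") can be sketched as below. This is an illustration only, not the actual dequeueLocked() body: `dequeueUpTo` is a hypothetical helper, and each page is represented just by its size in bytes.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical helper: removes pages from the front of 'pages' until the
// next one would push the total past 'maxBytes'. The first page is always
// taken, even if it alone exceeds 'maxBytes'.
std::vector<uint64_t> dequeueUpTo(
    std::deque<uint64_t>& pages,
    uint64_t maxBytes) {
  std::vector<uint64_t> out;
  uint64_t taken = 0;
  while (!pages.empty()) {
    const uint64_t next = pages.front();
    if (!out.empty() && taken + next > maxBytes) {
      break;
    }
    out.push_back(next);
    taken += next;
    pages.pop_front();
  }
  return out;
}
```

With pages of 4, 4, and 4 bytes and `maxBytes = 10`, a single call returns two pages and leaves one behind, which is the situation the question above is about.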

Contributor Author

@mbasmanova Thanks for your reply.

If there is data in the queue_ but the consumer is still blocked, the queued data must be smaller than minExchangeOutputBatchBytes (value: 2MB). Each consumer (the Exchange operator) dequeues with maxBytes = preferredOutputBatchBytes (value: 10MB, larger than 2MB), so it will consume all the data.

currentPages_ = exchangeClient_->next(
    driverId_, preferredOutputBatchBytes_, &atEnd_, &dataFuture);
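
The size argument above can be checked with a little arithmetic. The sketch below uses the two values quoted in this comment (2MB and 10MB); the constant and function names are hypothetical, and it assumes a single dequeue takes everything whenever the queued total fits within maxBytes.

```cpp
#include <cassert>
#include <cstdint>

constexpr uint64_t kMB = 1ULL << 20;
// Values as quoted in the comment above; names are illustrative only.
constexpr uint64_t kMinBatchBytes = 2 * kMB;
constexpr uint64_t kPreferredBatchBytes = 10 * kMB;

// True when one dequeue bounded by 'maxBytes' can take everything queued.
constexpr bool drainsInOneCall(uint64_t queuedBytes, uint64_t maxBytes) {
  return queuedBytes <= maxBytes;
}

// While a consumer is blocked, the queue holds less than kMinBatchBytes,
// so the total after one more page is bounded by that plus the page size.
constexpr uint64_t maxTotalAfterEnqueue(uint64_t pageBytes) {
  return (kMinBatchBytes - 1) + pageBytes;
}

// A 1MB page on top of an almost-2MB backlog still fits in one 10MB dequeue.
static_assert(
    drainsInOneCall(maxTotalAfterEnqueue(1 * kMB), kPreferredBatchBytes),
    "small page: one consumer drains the queue");
```

Under these assumptions the claim holds as long as the newly added page keeps the total within 10MB; a single page large enough to push the total past that bound would be a case this sketch does not cover.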
