1414 * limitations under the License.
1515 */
1616#include " velox/exec/ExchangeQueue.h"
17+ #include < algorithm>
1718
1819namespace facebook ::velox::exec {
1920
@@ -64,6 +65,15 @@ void ExchangeQueue::close() {
6465 clearPromises (promises);
6566}
6667
68+ int64_t ExchangeQueue::minOutputBatchBytesLocked () const {
69+ // always allow to unblock when at end
70+ if (atEnd_) {
71+ return 0 ;
72+ }
73+ // At most 1% of received bytes so far to minimize latency for small exchanges
74+ return std::min<int64_t >(minOutputBatchBytes_, receivedBytes_ / 100 );
75+ }
76+
6777void ExchangeQueue::enqueueLocked (
6878 std::unique_ptr<SerializedPage>&& page,
6979 std::vector<ContinuePromise>& promises) {
@@ -86,17 +96,45 @@ void ExchangeQueue::enqueueLocked(
8696 receivedBytes_ += page->size ();
8797
8898 queue_.push_back (std::move (page));
89- if (!promises_.empty ()) {
99+ const auto minBatchSize = minOutputBatchBytesLocked ();
100+ while (!promises_.empty ()) {
101+ VELOX_CHECK_LE (promises_.size (), numberOfConsumers_);
102+ const int32_t unblockedConsumers = numberOfConsumers_ - promises_.size ();
103+ const int64_t unasignedBytes =
104+ totalBytes_ - unblockedConsumers * minBatchSize;
105+ if (unasignedBytes < minBatchSize) {
106+ break ;
107+ }
90108 // Resume one of the waiting drivers.
91- promises.push_back (std::move (promises_.back ()));
92- promises_.pop_back ();
109+ auto it = promises_.begin ();
110+ promises.push_back (std::move (it->second ));
111+ promises_.erase (it);
93112 }
94113}
95114
115+ void ExchangeQueue::addPromiseLocked (
116+ int consumerId,
117+ ContinueFuture* future,
118+ std::vector<ContinuePromise>& promises) {
119+ ContinuePromise promise{" ExchangeQueue::dequeue" };
120+ *future = promise.getSemiFuture ();
121+ auto it = promises_.find (consumerId);
122+ if (it != promises_.end ()) {
123+ // resolve stale promises outside the lock to avoid broken promises
124+ promises.push_back (std::move (it->second ));
125+ it->second = std::move (promise);
126+ } else {
127+ promises_[consumerId] = std::move (promise);
128+ }
129+ VELOX_CHECK_LE (promises_.size (), numberOfConsumers_);
130+ }
131+
96132std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked (
133+ int consumerId,
97134 uint32_t maxBytes,
98135 bool * atEnd,
99- ContinueFuture* future) {
136+ ContinueFuture* future,
137+ std::vector<ContinuePromise>& promises) {
100138 VELOX_CHECK_NOT_NULL (future);
101139 if (!error_.empty ()) {
102140 *atEnd = true ;
@@ -105,15 +143,21 @@ std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked(
105143
106144 *atEnd = false ;
107145
146+ // If we don't have enough bytes to return, we wait for more data to be
147+ // available
148+ if (totalBytes_ < minOutputBatchBytesLocked ()) {
149+ addPromiseLocked (consumerId, future, promises);
150+ return {};
151+ }
152+
108153 std::vector<std::unique_ptr<SerializedPage>> pages;
109154 uint32_t pageBytes = 0 ;
110155 for (;;) {
111156 if (queue_.empty ()) {
112157 if (atEnd_) {
113158 *atEnd = true ;
114159 } else if (pages.empty ()) {
115- promises_.emplace_back (" ExchangeQueue::dequeue" );
116- *future = promises_.back ().getSemiFuture ();
160+ addPromiseLocked (consumerId, future, promises);
117161 }
118162 return pages;
119163 }
0 commit comments