@@ -76,9 +76,34 @@ struct Queue {
7676
7777impl Queue {
7878 /// Whether the queue is exhausted.
79- const fn is_exhasted ( & self ) -> bool {
79+ const fn is_exhausted ( & self ) -> bool {
8080 self . remaining == 0
8181 }
82+
83+ /// Whether the queue is stopped.
84+ const fn is_stopped ( & self ) -> bool {
85+ !self . in_flight && ( !self . is_exhausted ( ) || self . reset . is_none ( ) )
86+ }
87+
88+ /// Take pending permit requests with the provided endpoint.
89+ fn take ( & mut self , endpoint : & Endpoint ) -> VecDeque < Message > {
90+ let ( taken, retained) = mem:: take ( & mut self . pending )
91+ . into_iter ( )
92+ . filter ( |req| !req. notifier . is_closed ( ) )
93+ . partition ( |req| req. endpoint == * endpoint) ;
94+ self . pending = retained;
95+
96+ taken
97+ }
98+
99+ /// Convert the queue into a bucket.
100+ fn to_bucket ( & self , f : impl FnOnce ( Key ) -> Instant ) -> Option < crate :: Bucket > {
101+ self . reset . map ( |key| crate :: Bucket {
102+ limit : self . limit ,
103+ remaining : self . remaining ,
104+ reset_at : f ( key) . into ( ) ,
105+ } )
106+ }
82107}
83108
84109impl From < VecDeque < Message > > for Queue {
@@ -136,12 +161,9 @@ pub async fn runner(
136161 macro_rules! try_pop {
137162 ( $queue: ident) => {
138163 let ( mut tx, rx) = oneshot:: channel( ) ;
139- while let Some ( req) = $queue
140- . pending
141- . front( )
142- . is_some_and( |req| global_remaining != 0 || req. endpoint. is_interaction( ) )
143- . then( || $queue. pending. pop_front( ) )
144- . flatten( )
164+ while let Some ( req) = $queue. pending. front( )
165+ && ( global_remaining != 0 || req. endpoint. is_interaction( ) )
166+ && let Some ( req) = $queue. pending. pop_front( )
145167 {
146168 match req. notifier. send( tx) {
147169 Ok ( ( ) ) => {
@@ -181,10 +203,7 @@ pub async fn runner(
181203 }
182204 ( ) = & mut global_timer, if global_remaining != global_limit => {
183205 global_remaining = global_limit;
184- // Try resume all stopped queues.
185- for ( _, queue) in queues. iter_mut( ) . filter( |( _, queue) | {
186- !queue. in_flight && ( !queue. is_exhasted( ) || queue. reset. is_none( ) )
187- } ) {
206+ for ( _, queue) in queues. iter_mut( ) . filter( |( _, queue) | queue. is_stopped( ) ) {
188207 try_pop!( queue) ;
189208 }
190209 }
@@ -195,17 +214,17 @@ pub async fn runner(
195214 debug_assert!( !queue. in_flight) ;
196215 queue. reset = None ;
197216 // Note that non-exhausted queues are not stopped.
198- if queue. is_exhasted ( ) {
217+ if queue. is_exhausted ( ) {
199218 try_pop!( queue) ;
200219 }
201220 }
202221 Some ( Ok ( ( endpoint, headers) ) ) = in_flight. join_next( ) => {
203222 let _span = tracing:: info_span!( "resp" , ?endpoint) . entered( ) ;
204- if let Ok ( Some ( headers) ) = headers {
223+ if let Ok ( Some ( mut headers) ) = headers {
205224 tracing:: trace!( ?headers) ;
206225
207226 let hash = hasher. bucket( & headers. bucket, & endpoint) ;
208- let queue = match buckets. entry( endpoint. clone ( ) ) {
227+ let queue = match buckets. entry( endpoint) {
209228 MapEntry :: Occupied ( entry) if * entry. get( ) == headers. bucket => {
210229 & mut queues. find_mut( hash, |& ( key, _) | key == hash) . unwrap( ) . 1
211230 }
@@ -218,20 +237,16 @@ pub async fn runner(
218237
219238 // Retrieve this endpoint's requests.
220239 let ( _, old_queue) = queues. find_mut( old_hash, |& ( key, _) | key == old_hash) . unwrap( ) ;
240+ let pending = old_queue. take( old_entry. key( ) ) ;
221241 old_queue. in_flight = false ;
222- let ( pending, old_pending) = mem:: take( & mut old_queue. pending)
223- . into_iter( )
224- . filter( |req| !req. notifier. is_closed( ) )
225- . partition:: <VecDeque <_>, _>( |req| req. endpoint == * old_entry. key( ) ) ;
226- old_queue. pending = old_pending;
227242 try_pop!( old_queue) ;
228243
229244 match old_entry {
230245 MapEntry :: Occupied ( mut entry) => {
231- entry. insert( headers. bucket) ;
246+ entry. insert( mem :: take ( & mut headers. bucket) ) ;
232247 }
233248 MapEntry :: Vacant ( entry) => {
234- entry. insert( headers. bucket) ;
249+ entry. insert( mem :: take ( & mut headers. bucket) ) ;
235250 }
236251 }
237252 // And move them into the new queue.
@@ -251,23 +266,19 @@ pub async fn runner(
251266 }
252267 } ;
253268
254- queue. in_flight = false ;
255269 queue. limit = headers. limit;
256270 queue. remaining = headers. remaining;
257- if let Some ( key) = & queue. reset {
258- reset. reset_at( key, headers. reset_at. into( ) ) ;
259- } else {
260- queue. reset = Some ( reset. insert_at( hash, headers. reset_at. into( ) ) ) ;
261- }
262- if queue. is_exhasted( ) {
263- tracing:: info!(
264- reset_after = ?headers. reset_at. saturating_duration_since( Instant :: now( ) . into( ) ) ,
265- "exhausted"
266- ) ;
267- continue ;
271+ match & queue. reset {
272+ Some ( key) => reset. reset_at( key, headers. reset_at. into( ) ) ,
273+ None => queue. reset = Some ( reset. insert_at( hash, headers. reset_at. into( ) ) ) ,
268274 }
269275
270- try_pop!( queue) ;
276+ queue. in_flight = false ;
277+ if queue. is_exhausted( ) {
278+ tracing:: info!( reset_after = ?headers. reset_after( ) , "exhausted" ) ;
279+ } else {
280+ try_pop!( queue) ;
281+ }
271282 } else {
272283 if headers. is_err( ) {
273284 tracing:: debug!( "cancelled" ) ;
@@ -306,16 +317,9 @@ pub async fn runner(
306317 }
307318 } ;
308319
309- let is_cancelled = pred. is_some_and( |p| !p( queue. reset. map( |key| crate :: Bucket {
310- limit: queue. limit,
311- remaining: queue. remaining,
312- reset_at: reset. deadline( & key) . into( ) ,
313- } ) ) ) ;
314-
315- let queue_active = queue. in_flight || ( queue. is_exhasted( ) && queue. reset. is_some( ) ) ;
316- if is_cancelled {
320+ if pred. is_some_and( |p| !p( queue. to_bucket( |key| reset. deadline( & key) ) ) ) {
317321 drop( msg) ;
318- } else if queue_active || ( global_remaining == 0 && !msg. endpoint. is_interaction( ) ) {
322+ } else if !queue . is_stopped ( ) || ( global_remaining == 0 && !msg. endpoint. is_interaction( ) ) {
319323 queue. pending. push_back( msg) ;
320324 } else {
321325 let ( tx, rx) = oneshot:: channel( ) ;
0 commit comments