@@ -198,12 +198,7 @@ func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb
198198}
199199
200200func (s ctxRespSender ) send (r * storepb.SeriesResponse ) {
201- select {
202- case <- s .ctx .Done ():
203- return
204- case s .ch <- r :
205- return
206- }
201+ s .ch <- r
207202}
208203
209204// Series returns all series for a requested time range and label matcher. Requested series are taken from other
@@ -348,6 +343,21 @@ type streamSeriesSet struct {
348343 closeSeries context.CancelFunc
349344}
350345
346+ type recvResponse struct {
347+ r * storepb.SeriesResponse
348+ err error
349+ }
350+
351+ func startFrameCtx (responseTimeout time.Duration ) (context.Context , context.CancelFunc ) {
352+ frameTimeoutCtx := context .Background ()
353+ var cancel context.CancelFunc
354+ if responseTimeout != 0 {
355+ frameTimeoutCtx , cancel = context .WithTimeout (frameTimeoutCtx , responseTimeout )
356+ return frameTimeoutCtx , cancel
357+ }
358+ return frameTimeoutCtx , nil
359+ }
360+
351361func startStreamSeriesSet (
352362 ctx context.Context ,
353363 logger log.Logger ,
@@ -384,14 +394,34 @@ func startStreamSeriesSet(
384394 }
385395 }()
386396 for {
387- r , err := s .stream .Recv ()
397+ frameTimeoutCtx , cancel := startFrameCtx (s .responseTimeout )
398+ if cancel != nil {
399+ defer cancel ()
400+ }
401+ rCh := make (chan * recvResponse , 1 )
402+ var rr * recvResponse
403+ go func () {
404+ r , err := s .stream .Recv ()
405+ rCh <- & recvResponse {r : r , err : err }
406+ }()
388407
389- if err == io .EOF {
408+ select {
409+ case <- ctx .Done ():
410+ s .timeoutHandling (true , ctx )
411+ return
412+ case <- frameTimeoutCtx .Done ():
413+ s .timeoutHandling (false , frameTimeoutCtx )
390414 return
415+ case rr = <- rCh :
391416 }
417+ close (rCh )
392418
393- if err != nil {
394- wrapErr := errors .Wrapf (err , "receive series from %s" , s .name )
419+ if rr .err == io .EOF {
420+ return
421+ }
422+
423+ if rr .err != nil {
424+ wrapErr := errors .Wrapf (rr .err , "receive series from %s" , s .name )
395425 if partialResponse {
396426 s .warnCh .send (storepb .NewWarnSeriesResponse (wrapErr ))
397427 return
@@ -402,59 +432,40 @@ func startStreamSeriesSet(
402432 s .errMtx .Unlock ()
403433 return
404434 }
405-
406435 numResponses ++
407436
408- if w := r .GetWarning (); w != "" {
437+ if w := rr . r .GetWarning (); w != "" {
409438 s .warnCh .send (storepb .NewWarnSeriesResponse (errors .New (w )))
410439 continue
411440 }
412-
413- select {
414- case s .recvCh <- r .GetSeries ():
415- continue
416- case <- ctx .Done ():
417- return
418- }
419-
441+ s .recvCh <- rr .r .GetSeries ()
420442 }
421443 }()
422444 return s
423445}
424446
425- // Next blocks until new message is received or stream is closed or operation is timed out.
426- func (s * streamSeriesSet ) Next () (ok bool ) {
427- ctx := s .ctx
428- timeoutMsg := fmt .Sprintf ("failed to receive any data from %s" , s .name )
429-
430- if s .responseTimeout != 0 {
431- timeoutMsg = fmt .Sprintf ("failed to receive any data in %s from %s" , s .responseTimeout .String (), s .name )
432-
433- timeoutCtx , done := context .WithTimeout (s .ctx , s .responseTimeout )
434- defer done ()
435- ctx = timeoutCtx
447+ func (s * streamSeriesSet ) timeoutHandling (isQueryTimeout bool , ctx context.Context ) {
448+ var err error
449+ if isQueryTimeout {
450+ err = errors .Wrap (ctx .Err (), fmt .Sprintf ("failed to receive any data from %s" , s .name ))
451+ } else {
452+ err = errors .Wrap (ctx .Err (), fmt .Sprintf ("failed to receive any data in %s from %s" , s .responseTimeout .String (), s .name ))
436453 }
437-
438- select {
439- case s .currSeries , ok = <- s .recvCh :
440- return ok
441- case <- ctx .Done ():
442- // closeSeries to shutdown a goroutine in startStreamSeriesSet.
443- s .closeSeries ()
444-
445- err := errors .Wrap (ctx .Err (), timeoutMsg )
446- if s .partialResponse {
447- level .Warn (s .logger ).Log ("err" , err , "msg" , "returning partial response" )
448- s .warnCh .send (storepb .NewWarnSeriesResponse (err ))
449- return false
450- }
451- s .errMtx .Lock ()
452- s .err = err
453- s .errMtx .Unlock ()
454-
455- level .Warn (s .logger ).Log ("err" , err , "msg" , "partial response disabled; aborting request" )
456- return false
454+ s .closeSeries ()
455+ if s .partialResponse {
456+ level .Warn (s .logger ).Log ("err" , err , "msg" , "returning partial response" )
457+ s .warnCh .send (storepb .NewWarnSeriesResponse (err ))
458+ return
457459 }
460+ s .errMtx .Lock ()
461+ s .err = err
462+ s .errMtx .Unlock ()
463+ }
464+
465+ // Next blocks until new message is received or stream is closed or operation is timed out.
466+ func (s * streamSeriesSet ) Next () (ok bool ) {
467+ s .currSeries , ok = <- s .recvCh
468+ return ok
458469}
459470
460471func (s * streamSeriesSet ) At () ([]storepb.Label , []storepb.AggrChunk ) {
0 commit comments