Skip to content

Commit f4c264c

Browse files
author
Aleksey Sin
committed
Refactoring for PR.
Signed-off-by: Aleksey Sin <asin@ozon.ru>
1 parent c71cc72 commit f4c264c

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

pkg/store/proxy.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,9 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
302302
}
303303

304304
var storeTypeStr string
305-
if st.StoreType() != nil {
306-
storeTypeStr = st.StoreType().String()
305+
storeType := st.StoreType()
306+
if storeType != nil {
307+
storeTypeStr = storeType.String()
307308
}
308309
metrics := &storeRequestMetrics{
309310
withPayload: s.metrics.timeToFirstByte.WithLabelValues(st.LabelSetsString(), storeTypeStr, withPayloadLabel),
@@ -312,10 +313,10 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
312313
queryTimeoutCount: s.metrics.queryTimeoutCount.WithLabelValues(st.LabelSetsString(), storeTypeStr),
313314
}
314315

315-
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
316-
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, metrics))
317316
// Schedule streamSeriesSet that translates gRPC streamed response
318317
// into seriesSet (if series) or respCh if warnings.
318+
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
319+
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, metrics))
319320
}
320321

321322
level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
@@ -380,6 +381,16 @@ type recvResponse struct {
380381
err error
381382
}
382383

384+
func startFrameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) {
385+
frameTimeoutCtx := context.Background()
386+
var cancel context.CancelFunc
387+
if responseTimeout != 0 {
388+
frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, responseTimeout)
389+
return frameTimeoutCtx, cancel
390+
}
391+
return frameTimeoutCtx, nil
392+
}
393+
383394
func startStreamSeriesSet(
384395
ctx context.Context,
385396
logger log.Logger,
@@ -410,10 +421,8 @@ func startStreamSeriesSet(
410421
defer wg.Done()
411422
defer close(s.recvCh)
412423
for {
413-
frameTimeoutCtx := context.Background()
414-
var cancel context.CancelFunc
415-
if s.responseTimeout != 0 {
416-
frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, s.responseTimeout)
424+
frameTimeoutCtx, cancel := startFrameCtx(s.responseTimeout)
425+
if cancel != nil {
417426
defer cancel()
418427
}
419428
rCh := make(chan *recvResponse, 1)

0 commit comments

Comments
 (0)