@@ -13,7 +13,6 @@ import (
1313 "github.com/prometheus/client_golang/prometheus"
1414 "github.com/prometheus/prometheus/pkg/labels"
1515 "github.com/prometheus/prometheus/storage"
16- "github.com/prometheus/prometheus/tsdb/chunkenc"
1716 "google.golang.org/grpc/codes"
1817 "google.golang.org/grpc/status"
1918
@@ -24,18 +23,19 @@ import (
2423)
2524
2625type TSDBReader interface {
27- storage.Queryable
26+ storage.ChunkQueryable
2827 StartTime () (int64 , error )
2928}
3029
3130// TSDBStore implements the store API against a local TSDB instance.
3231// It attaches the provided external labels to all results. It only responds with raw data
3332// and does not support downsampling.
3433type TSDBStore struct {
35- logger log.Logger
36- db TSDBReader
37- component component.StoreAPI
38- externalLabels labels.Labels
34+ logger log.Logger
35+ db TSDBReader
36+ component component.StoreAPI
37+ externalLabels labels.Labels
38+ maxBytesPerFrame int
3939}
4040
4141// ReadWriteTSDBStore is a TSDBStore that can also be written to.
@@ -50,10 +50,11 @@ func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, com
5050 logger = log .NewNopLogger ()
5151 }
5252 return & TSDBStore {
53- logger : logger ,
54- db : db ,
55- component : component ,
56- externalLabels : externalLabels ,
53+ logger : logger ,
54+ db : db ,
55+ component : component ,
56+ externalLabels : externalLabels ,
57+ maxBytesPerFrame : 1024 * 1024 , // 1MB as recommended by gRPC.
5758 }
5859}
5960
@@ -109,7 +110,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
109110 return status .Error (codes .InvalidArgument , err .Error ())
110111 }
111112
112- q , err := s .db .Querier (context .Background (), r .MinTime , r .MaxTime )
113+ q , err := s .db .ChunkQuerier (context .Background (), r .MinTime , r .MaxTime )
113114 if err != nil {
114115 return status .Error (codes .Internal , err .Error ())
115116 }
@@ -119,72 +120,67 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
119120 set = q .Select (false , nil , matchers ... )
120121 respSeries storepb.Series
121122 )
123+
124+ // Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
122125 for set .Next () {
123126 series := set .At ()
124-
125127 respSeries .Labels = s .translateAndExtendLabels (series .Labels (), s .externalLabels )
126-
127- if ! r .SkipChunks {
128- // TODO(fabxc): An improvement over this trivial approach would be to directly
129- // use the chunks provided by TSDB in the response.
130- c , err := s .encodeChunks (series .Iterator (), MaxSamplesPerChunk )
131- if err != nil {
132- return status .Errorf (codes .Internal , "encode chunk: %s" , err )
128+ respSeries .Chunks = respSeries .Chunks [:0 ]
129+ if r .SkipChunks {
130+ if err := srv .Send (storepb .NewSeriesResponse (& respSeries )); err != nil {
131+ return status .Error (codes .Aborted , err .Error ())
133132 }
134-
135- respSeries .Chunks = append (respSeries .Chunks [:0 ], c ... )
133+ continue
136134 }
137135
138- if err := srv .Send (storepb .NewSeriesResponse (& respSeries )); err != nil {
139- return status .Error (codes .Aborted , err .Error ())
136+ frameBytesLeft := s .maxBytesPerFrame
137+ for _ , lbl := range respSeries .Labels {
138+ frameBytesLeft -= lbl .Size ()
140139 }
141- }
142- if err := set .Err (); err != nil {
143- return status .Error (codes .Internal , err .Error ())
144- }
145- return nil
146- }
147-
148- func (s * TSDBStore ) encodeChunks (it chunkenc.Iterator , maxSamplesPerChunk int ) (chks []storepb.AggrChunk , err error ) {
149- var (
150- chkMint int64
151- chk * chunkenc.XORChunk
152- app chunkenc.Appender
153- isNext = it .Next ()
154- )
155140
156- for isNext {
157- if chk == nil {
158- chk = chunkenc . NewXORChunk ()
159- app , err = chk . Appender ()
160- if err ! = nil {
161- return nil , err
141+ chIter := series . Iterator ()
142+ isNext := chIter . Next ()
143+ for isNext {
144+ chk := chIter . At ()
145+ if chk . Chunk = = nil {
146+ return status . Errorf ( codes . Internal , "TSDBStore: found not populated chunk returned by SeriesSet at ref: %v" , chk . Ref )
162147 }
163- chkMint , _ = it .At ()
164- }
165148
166- app .Append (it .At ())
167- chkMaxt , _ := it .At ()
149+ respSeries .Chunks = append (respSeries .Chunks , storepb.AggrChunk {
150+ MinTime : chk .MinTime ,
151+ MaxTime : chk .MaxTime ,
152+ Raw : & storepb.Chunk {
153+ Type : storepb .Chunk_Encoding (chk .Chunk .Encoding () - 1 ), // proto chunk encoding is one off to TSDB one.
154+ Data : chk .Chunk .Bytes (),
155+ },
156+ })
157+ frameBytesLeft -= respSeries .Chunks [len (respSeries .Chunks )- 1 ].Size ()
158+
159+ // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
160+ isNext = chIter .Next ()
161+ if frameBytesLeft > 0 && isNext {
162+ continue
163+ }
168164
169- isNext = it .Next ()
170- if isNext && chk .NumSamples () < maxSamplesPerChunk {
171- continue
165+ if err := srv .Send (storepb .NewSeriesResponse (& respSeries )); err != nil {
166+ return status .Error (codes .Aborted , err .Error ())
167+ }
168+ respSeries .Chunks = respSeries .Chunks [:0 ]
169+ }
170+ if err := chIter .Err (); err != nil {
171+ return status .Error (codes .Internal , errors .Wrap (err , "chunk iter" ).Error ())
172172 }
173173
174- // Cut the chunk.
175- chks = append (chks , storepb.AggrChunk {
176- MinTime : chkMint ,
177- MaxTime : chkMaxt ,
178- Raw : & storepb.Chunk {Type : storepb .Chunk_XOR , Data : chk .Bytes ()},
179- })
180- chk = nil
181174 }
182- if it .Err () != nil {
183- return nil , errors .Wrap (it .Err (), "read TSDB series" )
175+ if err := set .Err (); err != nil {
176+ return status .Error (codes .Internal , err .Error ())
177+ }
178+ for _ , w := range set .Warnings () {
179+ if err := srv .Send (storepb .NewWarnSeriesResponse (w )); err != nil {
180+ return status .Error (codes .Aborted , err .Error ())
181+ }
184182 }
185-
186- return chks , nil
187-
183+ return nil
188184}
189185
190186// translateAndExtendLabels transforms a metrics into a protobuf label set. It additionally
@@ -217,7 +213,7 @@ func (s *TSDBStore) translateAndExtendLabels(m, extend labels.Labels) []storepb.
217213func (s * TSDBStore ) LabelNames (ctx context.Context , _ * storepb.LabelNamesRequest ) (
218214 * storepb.LabelNamesResponse , error ,
219215) {
220- q , err := s .db .Querier (ctx , math .MinInt64 , math .MaxInt64 )
216+ q , err := s .db .ChunkQuerier (ctx , math .MinInt64 , math .MaxInt64 )
221217 if err != nil {
222218 return nil , status .Error (codes .Internal , err .Error ())
223219 }
@@ -234,7 +230,7 @@ func (s *TSDBStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest
234230func (s * TSDBStore ) LabelValues (ctx context.Context , r * storepb.LabelValuesRequest ) (
235231 * storepb.LabelValuesResponse , error ,
236232) {
237- q , err := s .db .Querier (ctx , math .MinInt64 , math .MaxInt64 )
233+ q , err := s .db .ChunkQuerier (ctx , math .MinInt64 , math .MaxInt64 )
238234 if err != nil {
239235 return nil , status .Error (codes .Internal , err .Error ())
240236 }
0 commit comments