Skip to content

Commit fb84117

Browse files
committed
receive/rule: Fixed Segfault issue; Added tests & benchmarks for TSDBStore, fixed multitsdb benchmarks.
Fixed #3013 Also: * Fixed other quite big issue with reusing chunk slice. * Fixed framing - previously it was wrongly sending single-chunk frames, taking huge amount of time. Before upgrading go.mod with latest TSDB: TestTSDBStore_SeriesChunkBytesCopied/flush_WAL_and_access_results: tsdb_test.go:487: tsdb_test.go:487: unexpected error: invoked function panicked or caused segmentation fault: runtime error: invalid memory address or nil pointer dereference After all ok. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
1 parent ae629b2 commit fb84117

File tree

7 files changed

+351
-92
lines changed

7 files changed

+351
-92
lines changed

pkg/store/bucket_test.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,7 +1178,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
11781178
var (
11791179
logger = log.NewNopLogger()
11801180
blocks []*bucketBlock
1181-
series []storepb.Series
1181+
series []*storepb.Series
11821182
random = rand.New(rand.NewSource(120))
11831183
)
11841184

@@ -1213,7 +1213,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
12131213
// This allows to pick time range that will correspond to number of series picked 1:1.
12141214
for bi := 0; bi < numOfBlocks; bi++ {
12151215
head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{
1216-
Dir: tmpDir,
1216+
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", bi)),
12171217
SamplesPerSeries: samplesPerSeriesPerBlock,
12181218
Series: seriesPerBlock,
12191219
PrependLabels: extLset,
@@ -1536,17 +1536,22 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
15361536
}
15371537

15381538
// Create TSDB blocks.
1539-
opts := storetestutil.HeadGenOptions{
1540-
Dir: tmpDir,
1539+
head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{
1540+
TSDBDir: filepath.Join(tmpDir, "0"),
15411541
SamplesPerSeries: 1,
15421542
Series: 2,
15431543
PrependLabels: extLset,
15441544
Random: random,
1545-
}
1546-
head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, opts)
1545+
})
15471546
block1 := createBlockFromHead(t, bktDir, head)
15481547
testutil.Ok(t, head.Close())
1549-
head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, opts)
1548+
head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{
1549+
TSDBDir: filepath.Join(tmpDir, "1"),
1550+
SamplesPerSeries: 1,
1551+
Series: 2,
1552+
PrependLabels: extLset,
1553+
Random: random,
1554+
})
15501555
block2 := createBlockFromHead(t, bktDir, head2)
15511556
testutil.Ok(t, head2.Close())
15521557

@@ -1610,7 +1615,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
16101615
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
16111616
},
16121617
},
1613-
ExpectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...),
1618+
ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
16141619
ExpectedHints: []hintspb.SeriesResponseHints{
16151620
{
16161621
QueriedBlocks: []hintspb.Block{

pkg/store/multitsdb_test.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,29 +92,24 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
9292
}
9393
}()
9494
for j := range dbs {
95+
tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j))
96+
9597
head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{
96-
Dir: tmpDir,
98+
TSDBDir: tsdbDir,
9799
SamplesPerSeries: samplesPerSeriesPerTSDB,
98100
Series: seriesPerTSDB,
99-
WithWAL: true,
101+
WithWAL: !flushToBlocks,
100102
Random: random,
101103
SkipChunks: t.IsBenchmark(),
102104
})
103-
testutil.Ok(t, head.Close())
104-
105-
tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j))
106-
107105
for i := 0; i < len(created); i++ {
108-
resps[j] = append(resps[j], storepb.NewSeriesResponse(&created[i]))
106+
resps[j] = append(resps[j], storepb.NewSeriesResponse(created[i]))
109107
}
110108

111109
if flushToBlocks {
112-
db, err := tsdb.OpenDBReadOnly(tsdbDir, logger)
113-
testutil.Ok(t, err)
114-
115-
testutil.Ok(t, db.FlushWAL(tmpDir))
116-
testutil.Ok(t, db.Close())
110+
_ = createBlockFromHead(t, tsdbDir, head)
117111
}
112+
testutil.Ok(t, head.Close())
118113

119114
db, err := tsdb.OpenDBReadOnly(tsdbDir, logger)
120115
testutil.Ok(t, err)
@@ -129,7 +124,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
129124

130125
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })
131126

132-
var expected []storepb.Series
127+
var expected []*storepb.Series
133128
lastLabels := storepb.Series{}
134129
for _, resp := range resps {
135130
for _, r := range resp {
@@ -141,7 +136,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
141136
continue
142137
}
143138
lastLabels = x
144-
expected = append(expected, *r.GetSeries())
139+
expected = append(expected, r.GetSeries())
145140
}
146141
}
147142

pkg/store/proxy_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"math"
1212
"math/rand"
1313
"os"
14+
"path/filepath"
1415
"sort"
1516
"testing"
1617
"time"
@@ -1616,17 +1617,16 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
16161617
var resps []*storepb.SeriesResponse
16171618

16181619
head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{
1619-
Dir: tmpDir,
1620+
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", j)),
16201621
SamplesPerSeries: samplesPerSeriesPerClient,
16211622
Series: seriesPerClient,
1622-
MaxFrameBytes: storetestutil.RemoteReadFrameLimit,
16231623
Random: random,
16241624
SkipChunks: t.IsBenchmark(),
16251625
})
16261626
testutil.Ok(t, head.Close())
16271627

16281628
for i := 0; i < len(created); i++ {
1629-
resps = append(resps, storepb.NewSeriesResponse(&created[i]))
1629+
resps = append(resps, storepb.NewSeriesResponse(created[i]))
16301630
}
16311631

16321632
clients[j] = &testClient{
@@ -1647,23 +1647,22 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
16471647
}
16481648

16491649
var allResps []*storepb.SeriesResponse
1650-
var expected []storepb.Series
1650+
var expected []*storepb.Series
16511651
lastLabels := storepb.Series{}
16521652
for _, c := range clients {
16531653
m := c.(*testClient).StoreClient.(*mockedStoreAPI)
16541654

1655+
// NOTE: Proxy will merge all series with same labels without any frame limit (https://github.com/thanos-io/thanos/issues/2332).
16551656
for _, r := range m.RespSeries {
16561657
allResps = append(allResps, r)
16571658

1658-
// Proxy will merge all series with same labels without limit (https://github.com/thanos-io/thanos/issues/2332).
1659-
// Let's do this here as well.
16601659
x := storepb.Series{Labels: r.GetSeries().Labels}
16611660
if x.String() == lastLabels.String() {
16621661
expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, r.GetSeries().Chunks...)
16631662
continue
16641663
}
16651664
lastLabels = x
1666-
expected = append(expected, *r.GetSeries())
1665+
expected = append(expected, r.GetSeries())
16671666
}
16681667

16691668
}
@@ -1700,7 +1699,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
17001699
// In this we expect exactly the same response as input.
17011700
expected = expected[:0]
17021701
for _, r := range allResps {
1703-
expected = append(expected, *r.GetSeries())
1702+
expected = append(expected, r.GetSeries())
17041703
}
17051704
storetestutil.TestServerSeries(t, store,
17061705
&storetestutil.SeriesCase{

pkg/store/storepb/testutil/series.go

Lines changed: 28 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"math"
1010
"math/rand"
11+
"os"
1112
"path/filepath"
1213
"runtime"
1314
"sort"
@@ -39,13 +40,12 @@ func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings {
3940
const RemoteReadFrameLimit = 1048576
4041

4142
type HeadGenOptions struct {
42-
Dir string
43+
TSDBDir string
4344
SamplesPerSeries, Series int
4445

45-
MaxFrameBytes int // No limit by default.
4646
WithWAL bool
4747
PrependLabels labels.Labels
48-
SkipChunks bool
48+
SkipChunks bool // Skips chunks in returned slice (not in generated head!).
4949

5050
Random *rand.Rand
5151
}
@@ -54,22 +54,23 @@ type HeadGenOptions struct {
5454
// Returned series list has "ext1"="1" prepended. Each series looks as follows:
5555
// {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd} <random value> where number indicate sample number from 0.
5656
// Returned series are frame in same way as remote read would frame them.
57-
func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, []storepb.Series) {
57+
func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, []*storepb.Series) {
5858
if opts.SamplesPerSeries < 1 || opts.Series < 1 {
5959
t.Fatal("samples and series has to be 1 or more")
6060
}
6161

62-
tsdbDir := filepath.Join(opts.Dir, fmt.Sprintf("%d", j))
63-
fmt.Printf("Creating %d %d-sample series in %s\n", opts.Series, opts.SamplesPerSeries, tsdbDir)
62+
fmt.Printf("Creating %d %d-sample series in %s\n", opts.Series, opts.SamplesPerSeries, opts.TSDBDir)
6463

6564
var w *wal.WAL
6665
var err error
6766
if opts.WithWAL {
68-
w, err = wal.New(nil, nil, filepath.Join(tsdbDir, "wal"), true)
67+
w, err = wal.New(nil, nil, filepath.Join(opts.TSDBDir, "wal"), true)
6968
testutil.Ok(t, err)
69+
} else {
70+
testutil.Ok(t, os.MkdirAll(filepath.Join(opts.TSDBDir, "wal"), os.ModePerm))
7071
}
7172

72-
h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, tsdbDir, nil, tsdb.DefaultStripeSize, nil)
73+
h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, opts.TSDBDir, nil, tsdb.DefaultStripeSize, nil)
7374
testutil.Ok(t, err)
7475

7576
app := h.Appender(context.Background())
@@ -96,31 +97,20 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,
9697
var (
9798
lset labels.Labels
9899
chunkMetas []chunks.Meta
99-
expected = make([]storepb.Series, 0, opts.Series)
100-
sBytes int
100+
expected = make([]*storepb.Series, 0, opts.Series)
101101
)
102102

103103
all := allPostings(t, ir)
104104
for all.Next() {
105105
testutil.Ok(t, ir.Series(all.At(), &lset, &chunkMetas))
106-
i := 0
107106
sLset := storepb.PromLabelsToLabels(lset)
108-
expected = append(expected, storepb.Series{Labels: append(storepb.PromLabelsToLabels(opts.PrependLabels), sLset...)})
107+
expected = append(expected, &storepb.Series{Labels: append(storepb.PromLabelsToLabels(opts.PrependLabels), sLset...)})
109108

110109
if opts.SkipChunks {
111110
continue
112111
}
113112

114-
lBytes := 0
115-
for _, l := range sLset {
116-
lBytes += l.Size()
117-
}
118-
sBytes = lBytes
119-
120-
for {
121-
c := chunkMetas[i]
122-
i++
123-
113+
for _, c := range chunkMetas {
124114
chEnc, err := chks.Chunk(c.Ref)
125115
testutil.Ok(t, err)
126116

@@ -129,22 +119,11 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,
129119
c.MaxTime = c.MinTime + int64(chEnc.NumSamples()) - 1
130120
}
131121

132-
sBytes += len(chEnc.Bytes())
133-
134122
expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, storepb.AggrChunk{
135123
MinTime: c.MinTime,
136124
MaxTime: c.MaxTime,
137125
Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chEnc.Bytes()},
138126
})
139-
if i >= len(chunkMetas) {
140-
break
141-
}
142-
143-
// Compose many frames as remote read (so sidecar StoreAPI) would do if requested by maxFrameBytes.
144-
if opts.MaxFrameBytes > 0 && sBytes >= opts.MaxFrameBytes {
145-
expected = append(expected, storepb.Series{Labels: sLset})
146-
sBytes = lBytes
147-
}
148127
}
149128
}
150129
testutil.Ok(t, all.Err())
@@ -158,7 +137,7 @@ type SeriesServer struct {
158137

159138
ctx context.Context
160139

161-
SeriesSet []storepb.Series
140+
SeriesSet []*storepb.Series
162141
Warnings []string
163142
HintsSet []*types.Any
164143

@@ -178,7 +157,7 @@ func (s *SeriesServer) Send(r *storepb.SeriesResponse) error {
178157
}
179158

180159
if r.GetSeries() != nil {
181-
s.SeriesSet = append(s.SeriesSet, *r.GetSeries())
160+
s.SeriesSet = append(s.SeriesSet, r.GetSeries())
182161
return nil
183162
}
184163

@@ -227,7 +206,7 @@ type SeriesCase struct {
227206
Req *storepb.SeriesRequest
228207

229208
// Exact expectations are checked only for tests. For benchmarks only length is assured.
230-
ExpectedSeries []storepb.Series
209+
ExpectedSeries []*storepb.Series
231210
ExpectedWarnings []string
232211
ExpectedHints []hintspb.SeriesResponseHints
233212
}
@@ -241,7 +220,7 @@ func TestServerSeries(t testutil.TB, store storepb.StoreServer, cases ...*Series
241220
srv := NewSeriesServer(context.Background())
242221
testutil.Ok(t, store.Series(c.Req, srv))
243222
testutil.Equals(t, len(c.ExpectedWarnings), len(srv.Warnings), "%v", srv.Warnings)
244-
testutil.Equals(t, len(c.ExpectedSeries), len(srv.SeriesSet))
223+
//testutil.Equals(t, len(c.ExpectedSeries), len(srv.SeriesSet))
245224
testutil.Equals(t, len(c.ExpectedHints), len(srv.HintsSet))
246225

247226
if !t.IsBenchmark() {
@@ -252,10 +231,18 @@ func TestServerSeries(t testutil.TB, store storepb.StoreServer, cases ...*Series
252231
})
253232
}
254233

255-
testutil.Equals(t, c.ExpectedSeries[0].Chunks[0], srv.SeriesSet[0].Chunks[0])
256-
257-
// This might give unreadable output for millions of series on fail..
258-
testutil.Equals(t, c.ExpectedSeries, srv.SeriesSet)
234+
// Huge responses can produce unreadable diffs - make it more human readable.
235+
if len(c.ExpectedSeries) > 4 {
236+
for j := range c.ExpectedSeries {
237+
testutil.Equals(t, c.ExpectedSeries[j].Labels, srv.SeriesSet[j].Labels, "%v series chunks mismatch", j)
238+
if len(c.ExpectedSeries[j].Chunks) > 20 {
239+
testutil.Equals(t, len(c.ExpectedSeries[j].Chunks), len(srv.SeriesSet[j].Chunks), "%v series chunks number mismatch", j)
240+
}
241+
testutil.Equals(t, c.ExpectedSeries[j].Chunks, srv.SeriesSet[j].Chunks, "%v series chunks mismatch", j)
242+
}
243+
} else {
244+
testutil.Equals(t, c.ExpectedSeries, srv.SeriesSet)
245+
}
259246

260247
var actualHints []hintspb.SeriesResponseHints
261248
for _, anyHints := range srv.HintsSet {

0 commit comments

Comments
 (0)