Skip to content

Commit 109b21e

Browse files
authored
bump parquet converter marker version to 2 for parquet files with hash column (#7154)
Signed-off-by: yeya24 <[email protected]>
1 parent bb7ba54 commit 109b21e

File tree

3 files changed

+82
-2
lines changed

3 files changed

+82
-2
lines changed

pkg/parquetconverter/converter.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
386386
continue
387387
}
388388

389-
if marker.Version == cortex_parquet.CurrentVersion {
389+
// We don't convert blocks again if they already have a valid converter mark.
390+
if cortex_parquet.ValidConverterMarkVersion(marker.Version) {
390391
continue
391392
}
392393

pkg/parquetconverter/converter_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package parquetconverter
22

33
import (
4+
"bytes"
45
"context"
6+
"encoding/json"
57
"errors"
68
"fmt"
79
"io"
@@ -414,3 +416,71 @@ func (m *mockTenantLimits) ByUserID(userID string) *validation.Limits {
414416
func (m *mockTenantLimits) AllByUserID() map[string]*validation.Limits {
415417
return m.limits
416418
}
419+
420+
func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) {
421+
cfg := prepareConfig()
422+
user := "user"
423+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
424+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
425+
dir := t.TempDir()
426+
427+
cfg.Ring.InstanceID = "parquet-converter-1"
428+
cfg.Ring.InstanceAddr = "1.2.3.4"
429+
cfg.Ring.KVStore.Mock = ringStore
430+
bucketClient, err := filesystem.NewBucket(t.TempDir())
431+
require.NoError(t, err)
432+
userBucket := bucket.NewPrefixedBucketClient(bucketClient, user)
433+
limits := &validation.Limits{}
434+
flagext.DefaultValues(limits)
435+
limits.ParquetConverterEnabled = true
436+
437+
c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil)
438+
439+
ctx := context.Background()
440+
441+
lbls := labels.FromStrings("__name__", "test")
442+
443+
// Create a block
444+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
445+
blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, 2*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10)
446+
require.NoError(t, err)
447+
448+
// Upload the block to the bucket
449+
blockDir := fmt.Sprintf("%s/%s", dir, blockID.String())
450+
b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
451+
require.NoError(t, err)
452+
err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
453+
require.NoError(t, err)
454+
455+
// Write a converter mark with version 1 to simulate an already converted block
456+
markerV1 := parquet.ConverterMark{
457+
Version: parquet.ParquetConverterMarkVersion1,
458+
}
459+
markerBytes, err := json.Marshal(markerV1)
460+
require.NoError(t, err)
461+
markerPath := path.Join(blockID.String(), parquet.ConverterMarkerFileName)
462+
err = userBucket.Upload(ctx, markerPath, bytes.NewReader(markerBytes))
463+
require.NoError(t, err)
464+
465+
// Verify the marker exists with version 1
466+
marker, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger)
467+
require.NoError(t, err)
468+
require.Equal(t, parquet.ParquetConverterMarkVersion1, marker.Version)
469+
470+
// Start the converter
471+
err = services.StartAndAwaitRunning(context.Background(), c)
472+
require.NoError(t, err)
473+
defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck
474+
475+
// Wait a bit for the converter to process blocks
476+
time.Sleep(5 * time.Second)
477+
478+
// Verify the marker version is still 1 (i.e., the block was not converted again)
479+
markerAfter, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger)
480+
require.NoError(t, err)
481+
require.Equal(t, parquet.ParquetConverterMarkVersion1, markerAfter.Version, "block with existing marker version 1 should not be converted again")
482+
483+
// Verify that no conversion happened by checking the convertedBlocks metric
484+
// It should be 0 since the block was already converted
485+
assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)))
486+
}

pkg/storage/parquet/converter_marker.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@ import (
1919
const (
2020
ConverterMarkerPrefix = "parquet-markers"
2121
ConverterMarkerFileName = "parquet-converter-mark.json"
22-
CurrentVersion = 1
22+
23+
CurrentVersion = ParquetConverterMarkVersion2
24+
ParquetConverterMarkVersion1 = 1
25+
// ParquetConverterMarkVersion2 has an additional series hash
26+
// column which is used for projection pushdown.
27+
ParquetConverterMarkVersion2 = 2
2328
)
2429

2530
type ConverterMark struct {
@@ -64,3 +69,7 @@ func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Buck
6469
type ConverterMarkMeta struct {
6570
Version int `json:"version"`
6671
}
72+
73+
func ValidConverterMarkVersion(version int) bool {
74+
return version == ParquetConverterMarkVersion1 || version == ParquetConverterMarkVersion2
75+
}

0 commit comments

Comments
 (0)