diff --git a/CHANGELOG.md b/CHANGELOG.md index be7532c0298..14876a67996 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added +- [#8366](https://github.com/thanos-io/thanos/pull/8366) Store: optionally ignore Parquet migrated blocks + ### Changed ### Removed diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index ca86d269209..f7b3e58874e 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -393,14 +393,16 @@ func runStore( return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy) } ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency) - metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), - []block.MetadataFilter{ - block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime), - block.NewLabelShardedMetaFilter(relabelConfig), - block.NewConsistencyDelayMetaFilter(logger, time.Duration(conf.consistencyDelay), extprom.WrapRegistererWithPrefix("thanos_", reg)), - ignoreDeletionMarkFilter, - block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency), - }) + filters := []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime), + block.NewLabelShardedMetaFilter(relabelConfig), + block.NewConsistencyDelayMetaFilter(logger, time.Duration(conf.consistencyDelay), extprom.WrapRegistererWithPrefix("thanos_", reg)), + ignoreDeletionMarkFilter, + block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency), + block.NewParquetMigratedMetaFilter(logger), + } + + metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), filters) if err != nil { return errors.Wrap(err, "meta fetcher") } diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 9d15980fad5..4a387db59ca 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -92,6 +92,9 @@ const ( // MarkedForNoDownsampleMeta is label for blocks which are loaded but also marked for no downsample. This label is also counted in `loaded` label metric. MarkedForNoDownsampleMeta = "marked-for-no-downsample" + // ParquetMigratedMeta is label for blocks which are marked as migrated to parquet format. + ParquetMigratedMeta = "parquet-migrated" + // Modified label values. replicaRemovedMeta = "replica-label-removed" ) @@ -162,6 +165,7 @@ func DefaultSyncedStateLabelValues() [][]string { {duplicateMeta}, {MarkedForDeletionMeta}, {MarkedForNoCompactionMeta}, + {ParquetMigratedMeta}, } } @@ -1086,3 +1090,46 @@ func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action] return relabelConfig, nil } + +var _ MetadataFilter = &ParquetMigratedMetaFilter{} + +// ParquetMigratedMetaFilter is a metadata filter that filters out blocks that have been +// migrated to parquet format. The filter checks for the presence of the parquet_migrated +// extension key with a value of true. +// Not go-routine safe. +type ParquetMigratedMetaFilter struct { + logger log.Logger +} + +// NewParquetMigratedMetaFilter creates a new ParquetMigratedMetaFilter. +func NewParquetMigratedMetaFilter(logger log.Logger) *ParquetMigratedMetaFilter { + return &ParquetMigratedMetaFilter{ + logger: logger, + } +} + +// Filter filters out blocks that have been marked as migrated to parquet format. +func (f *ParquetMigratedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { + for id, meta := range metas { + if meta.Thanos.Extensions == nil { + continue + } + + extensionsMap, ok := meta.Thanos.Extensions.(map[string]interface{}) + if !ok { + continue + } + + parquetMigrated, exists := extensionsMap[metadata.ParquetMigratedExtensionKey] + if !exists { + continue + } + + if migratedBool, ok := parquetMigrated.(bool); ok && migratedBool { + level.Debug(f.logger).Log("msg", "filtering out parquet migrated block", "block", id) + synced.WithLabelValues(ParquetMigratedMeta).Inc() + delete(metas, id) + } + } + return nil +} diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 2d23c94d484..944bbd85e1c 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" @@ -1212,3 +1213,157 @@ func Test_ParseRelabelConfig(t *testing.T) { testutil.NotOk(t, err) testutil.Equals(t, "unsupported relabel action: labelmap", err.Error()) } + +func TestParquetMigratedMetaFilter_Filter(t *testing.T) { + logger := log.NewNopLogger() + filter := NewParquetMigratedMetaFilter(logger) + + // Simulate what might happen when extensions are loaded from JSON + extensions := struct { + ParquetMigrated bool `json:"parquet_migrated"` + }{ + ParquetMigrated: true, + } + + for _, c := range []struct { + name string + metas map[ulid.ULID]*metadata.Meta + check func(t *testing.T, metas map[ulid.ULID]*metadata.Meta, err error) + }{ + { + name: "block with other extensions", + metas: map[ulid.ULID]*metadata.Meta{ + ulid.MustNew(2, nil): { + Thanos: metadata.Thanos{ + Extensions: map[string]interface{}{ + "other_key": "other_value", + }, + }, + }, + }, + check: func(t *testing.T, metas map[ulid.ULID]*metadata.Meta, err error) { + testutil.Ok(t, err) + testutil.Equals(t, 1, len(metas)) + }, + }, + { + name: "no extensions", + metas: map[ulid.ULID]*metadata.Meta{ + ulid.MustNew(1, nil): { + Thanos: metadata.Thanos{ + Extensions: nil, + }, + }, + }, + check: func(t *testing.T, metas map[ulid.ULID]*metadata.Meta, err error) { + testutil.Equals(t, 1, len(metas)) + testutil.Ok(t, err) + }, + }, + { + name: "block with parquet_migrated=false", + metas: map[ulid.ULID]*metadata.Meta{ + ulid.MustNew(3, nil): { + Thanos: metadata.Thanos{ + Extensions: map[string]interface{}{ + metadata.ParquetMigratedExtensionKey: false, + }, + }, + }, + }, + check: func(t *testing.T, metas map[ulid.ULID]*metadata.Meta, err error) { + testutil.Equals(t, 1, len(metas)) + testutil.Ok(t, err) + }, + }, + { + name: "block with parquet_migrated=true", + metas: map[ulid.ULID]*metadata.Meta{ + ulid.MustNew(4, nil): { + Thanos: metadata.Thanos{ + Extensions: map[string]interface{}{ + metadata.ParquetMigratedExtensionKey: true, + }, + }, + }, + }, + check: func(t *testing.T, metas map[ulid.ULID]*metadata.Meta, err error) { + testutil.Equals(t, 0, len(metas)) + testutil.Ok(t, err) + }, + }, + { + name: "mixed blocks with parquet_migrated", + metas: map[ulid.ULID]*metadata.Meta{ + ulid.MustNew(5, nil): { + Thanos: metadata.Thanos{ + Extensions: map[string]interface{}{ + metadata.ParquetMigratedExtensionKey: true, + }, + }, + }, + ulid.MustNew(6, nil): { + Thanos: metadata.Thanos{ + Extensions: map[string]interface{}{ + metadata.ParquetMigratedExtensionKey: false, + }, + }, + }, + ulid.MustNew(7, nil): { + Thanos: metadata.Thanos{ + Extensions: nil, + }, + }, + }, + check: func(t *testing.T, metas map[ulid.ULID]*metadata.Meta, err error) { + testutil.Equals(t, 2, len(metas)) + testutil.Ok(t, err) + testutil.Assert(t, metas[ulid.MustNew(6, nil)] != nil, "Expected block with parquet_migrated=false to remain") + testutil.Assert(t, metas[ulid.MustNew(7, nil)] != nil, "Expected block without extensions to remain") + }, + }, + { + name: "block with serialized extensions", + metas: map[ulid.ULID]*metadata.Meta{ + ulid.MustNew(8, nil): { + Thanos: metadata.Thanos{ + Extensions: extensions, + }, + }, + }, + check: func(t *testing.T, metas map[ulid.ULID]*metadata.Meta, err error) { + testutil.Equals(t, 0, len(metas)) + testutil.Ok(t, err) + }, + }, + } { + t.Run(c.name, func(t *testing.T) { + r := prometheus.NewRegistry() + + synced := promauto.With(r).NewGaugeVec( + prometheus.GaugeOpts{ + Name: "test_synced", + Help: "Test synced metric", + }, + []string{"state"}, + ) + modified := promauto.With(r).NewGaugeVec( + prometheus.GaugeOpts{ + Name: "test_modified", + Help: "Test modified metric", + }, + []string{"state"}, + ) + ctx := context.Background() + + m, err := json.Marshal(c.metas) + testutil.Ok(t, err) + + var outmetas map[ulid.ULID]*metadata.Meta + testutil.Ok(t, json.Unmarshal(m, &outmetas)) + + err = filter.Filter(ctx, outmetas, synced, modified) + c.check(t, outmetas, err) + }) + } +} diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 1f1aae5db57..a0706743f86 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -53,6 +53,11 @@ const ( TSDBVersion1 = 1 // ThanosVersion1 is a enumeration of Thanos section of TSDB meta supported by Thanos. ThanosVersion1 = 1 + + // ParquetMigratedExtensionKey is the key used in block extensions to indicate + // that the block has been migrated to parquet format and can be safely ignored + // by store gateways. + ParquetMigratedExtensionKey = "parquet_migrated" ) // Meta describes the a block's meta. It wraps the known TSDB meta structure and