Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#8366](https://github.com/thanos-io/thanos/pull/8366) Store: optionally ignore Parquet migrated blocks

### Changed

### Removed
Expand Down
24 changes: 16 additions & 8 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type storeConfig struct {

matcherCacheSize int
disableAdminOperations bool
ignoreParquetMigratedBlocks bool
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -232,6 +233,8 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("disable-admin-operations", "Disable UI/API admin operations like marking blocks for deletion and no compaction.").Default("false").BoolVar(&sc.disableAdminOperations)

cmd.Flag("ignore-parquet-migrated-blocks", "If true, store gateway will ignore blocks that have been migrated to parquet format. This allows for safe migration from TSDB to parquet blocks.").Default("false").BoolVar(&sc.ignoreParquetMigratedBlocks)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, let's always enable this, it doesn't make sense to introduce yet another flag. We already have a million of them


sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
}

Expand Down Expand Up @@ -393,14 +396,19 @@ func runStore(
return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
}
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, time.Duration(conf.consistencyDelay), extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency),
})
filters := []block.MetadataFilter{
block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, time.Duration(conf.consistencyDelay), extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency),
}

if conf.ignoreParquetMigratedBlocks {
filters = append(filters, block.NewParquetMigratedMetaFilter(logger))
}

metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), filters)
if err != nil {
return errors.Wrap(err, "meta fetcher")
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const (
// MarkedForNoDownsampleMeta is label for blocks which are loaded but also marked for no downsample. This label is also counted in `loaded` label metric.
MarkedForNoDownsampleMeta = "marked-for-no-downsample"

// ParquetMigratedMeta is label for blocks which are marked as migrated to parquet format.
ParquetMigratedMeta = "parquet-migrated"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
)
Expand Down Expand Up @@ -162,6 +165,7 @@ func DefaultSyncedStateLabelValues() [][]string {
{duplicateMeta},
{MarkedForDeletionMeta},
{MarkedForNoCompactionMeta},
{ParquetMigratedMeta},
}
}

Expand Down Expand Up @@ -1086,3 +1090,57 @@ func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action]

return relabelConfig, nil
}

var _ MetadataFilter = &ParquetMigratedMetaFilter{}

// ParquetMigratedMetaFilter is a metadata filter that filters out blocks that have been
// migrated to parquet format. The filter checks for the presence of the parquet_migrated
// extension key with a value of true.
// Not go-routine safe.
type ParquetMigratedMetaFilter struct {
logger log.Logger
}

// NewParquetMigratedMetaFilter creates a new ParquetMigratedMetaFilter.
func NewParquetMigratedMetaFilter(logger log.Logger) *ParquetMigratedMetaFilter {
return &ParquetMigratedMetaFilter{
logger: logger,
}
}

// Filter filters out blocks that have been marked as migrated to parquet format.
func (f *ParquetMigratedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
for id, meta := range metas {
if meta.Thanos.Extensions == nil {
continue
}

// Try to extract the parquet_migrated extension value
extensionsMap, ok := meta.Thanos.Extensions.(map[string]interface{})
if !ok {
// If extensions is not a map, try to convert it
extensionsBytes, err := json.Marshal(meta.Thanos.Extensions)
if err != nil {
level.Warn(f.logger).Log("msg", "failed to marshal extensions for parquet migration check", "block", id, "err", err)
continue
}

var parsedExtensions map[string]interface{}
if err := json.Unmarshal(extensionsBytes, &parsedExtensions); err != nil {
level.Warn(f.logger).Log("msg", "failed to unmarshal extensions for parquet migration check", "block", id, "err", err)
continue
}
extensionsMap = parsedExtensions
}

// Check if the parquet_migrated key exists and is set to true
if parquetMigrated, exists := extensionsMap[metadata.ParquetMigratedExtensionKey]; exists {
if migratedBool, ok := parquetMigrated.(bool); ok && migratedBool {
level.Debug(f.logger).Log("msg", "filtering out parquet migrated block", "block", id)
synced.WithLabelValues(ParquetMigratedMeta).Inc()
delete(metas, id)
}
}
}
return nil
}
156 changes: 156 additions & 0 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,3 +1212,159 @@ func Test_ParseRelabelConfig(t *testing.T) {
testutil.NotOk(t, err)
testutil.Equals(t, "unsupported relabel action: labelmap", err.Error())
}

func TestParquetMigratedMetaFilter_Filter(t *testing.T) {
logger := log.NewNopLogger()
filter := NewParquetMigratedMetaFilter(logger)

synced := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "test_synced",
Help: "Test synced metric",
},
[]string{"state"},
)
modified := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "test_modified",
Help: "Test modified metric",
},
[]string{"state"},
)

ctx := context.Background()

// Test case 1: Block without extensions should not be filtered
t.Run("block without extensions", func(t *testing.T) {
metas := map[ulid.ULID]*metadata.Meta{
ulid.MustNew(1, nil): {
Thanos: metadata.Thanos{
Extensions: nil,
},
},
}

err := filter.Filter(ctx, metas, synced, modified)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(metas))
})

// Test case 2: Block with extensions but no parquet_migrated key should not be filtered
t.Run("block with other extensions", func(t *testing.T) {
metas := map[ulid.ULID]*metadata.Meta{
ulid.MustNew(2, nil): {
Thanos: metadata.Thanos{
Extensions: map[string]interface{}{
"other_key": "other_value",
},
},
},
}

err := filter.Filter(ctx, metas, synced, modified)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(metas))
})

// Test case 3: Block with parquet_migrated=false should not be filtered
t.Run("block with parquet_migrated=false", func(t *testing.T) {
metas := map[ulid.ULID]*metadata.Meta{
ulid.MustNew(3, nil): {
Thanos: metadata.Thanos{
Extensions: map[string]interface{}{
metadata.ParquetMigratedExtensionKey: false,
},
},
},
}

err := filter.Filter(ctx, metas, synced, modified)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(metas))
})

// Test case 4: Block with parquet_migrated=true should be filtered
t.Run("block with parquet_migrated=true", func(t *testing.T) {
blockID := ulid.MustNew(4, nil)
metas := map[ulid.ULID]*metadata.Meta{
blockID: {
Thanos: metadata.Thanos{
Extensions: map[string]interface{}{
metadata.ParquetMigratedExtensionKey: true,
},
},
},
}

err := filter.Filter(ctx, metas, synced, modified)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(metas))
})

// Test case 5: Mixed blocks - only parquet migrated ones should be filtered
t.Run("mixed blocks", func(t *testing.T) {
blockID1 := ulid.MustNew(5, nil)
blockID2 := ulid.MustNew(6, nil)
blockID3 := ulid.MustNew(7, nil)

metas := map[ulid.ULID]*metadata.Meta{
blockID1: {
Thanos: metadata.Thanos{
Extensions: map[string]interface{}{
metadata.ParquetMigratedExtensionKey: true,
},
},
},
blockID2: {
Thanos: metadata.Thanos{
Extensions: map[string]interface{}{
metadata.ParquetMigratedExtensionKey: false,
},
},
},
blockID3: {
Thanos: metadata.Thanos{
Extensions: nil,
},
},
}

err := filter.Filter(ctx, metas, synced, modified)
testutil.Ok(t, err)
testutil.Equals(t, 2, len(metas))

// blockID1 should be filtered out
_, exists := metas[blockID1]
testutil.Equals(t, false, exists)

// blockID2 and blockID3 should remain
_, exists = metas[blockID2]
testutil.Equals(t, true, exists)
_, exists = metas[blockID3]
testutil.Equals(t, true, exists)
})

// Test case 6: Test with JSON serialized extensions (as might come from real metadata)
t.Run("block with serialized extensions", func(t *testing.T) {
blockID := ulid.MustNew(8, nil)

// Simulate what might happen when extensions are loaded from JSON
extensions := struct {
ParquetMigrated bool `json:"parquet_migrated"`
}{
ParquetMigrated: true,
}

metas := map[ulid.ULID]*metadata.Meta{
blockID: {
Thanos: metadata.Thanos{
Extensions: extensions,
},
},
}

err := filter.Filter(ctx, metas, synced, modified)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(metas))
})
}
5 changes: 5 additions & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ const (
TSDBVersion1 = 1
// ThanosVersion1 is a enumeration of Thanos section of TSDB meta supported by Thanos.
ThanosVersion1 = 1

// ParquetMigratedExtensionKey is the key used in block extensions to indicate
// that the block has been migrated to parquet format and can be safely ignored
// by store gateways.
ParquetMigratedExtensionKey = "parquet_migrated"
)

// Meta describes the a block's meta. It wraps the known TSDB meta structure and
Expand Down
Loading