Skip to content

Commit 8db1895

Browse files
committed
Split compactor cleaner metrics
Signed-off-by: Daniel Deluiggi <[email protected]>
1 parent 8e3674d commit 8db1895

File tree

3 files changed

+258
-60
lines changed

3 files changed

+258
-60
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 104 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,15 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
243243
go func() {
244244
c.runDeleteUserCleanup(ctx, deleteChan)
245245
}()
246+
var metricsChan chan *cleanerJob
247+
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle &&
248+
c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
249+
metricsChan = make(chan *cleanerJob)
250+
defer close(metricsChan)
251+
go func() {
252+
c.runEmitMetricsWorker(ctx, metricsChan)
253+
}()
254+
}
246255

247256
for {
248257
select {
@@ -276,6 +285,17 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
276285
c.enqueueJobFailed.WithLabelValues(deletedStatus).Inc()
277286
}
278287

288+
if metricsChan != nil {
289+
select {
290+
case metricsChan <- &cleanerJob{
291+
users: activeUsers,
292+
timestamp: cleanJobTimestamp,
293+
}:
294+
default:
295+
level.Warn(c.logger).Log("msg", "unable to push metrics job to metricsChan")
296+
}
297+
}
298+
279299
case <-ctx.Done():
280300
return nil
281301
}
@@ -295,10 +315,25 @@ func (c *BlocksCleaner) checkRunError(runType string, err error) {
295315
}
296316
}
297317

298-
func (c *BlocksCleaner) runActiveUserCleanup(ctx context.Context, jobChan chan *cleanerJob) {
318+
func (c *BlocksCleaner) runEmitMetricsWorker(ctx context.Context, jobChan <-chan *cleanerJob) {
319+
for job := range jobChan {
320+
err := concurrency.ForEachUser(ctx, job.users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
321+
userLogger := util_log.WithUserID(userID, c.logger)
322+
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
323+
c.emitUserMetrics(ctx, userLogger, userBucket, userID)
324+
return nil
325+
})
326+
327+
if err != nil {
328+
level.Error(c.logger).Log("msg", "emit metrics failed", "err", err.Error())
329+
}
330+
}
331+
}
332+
333+
func (c *BlocksCleaner) runActiveUserCleanup(ctx context.Context, jobChan <-chan *cleanerJob) {
299334
for job := range jobChan {
300335
if job.timestamp < time.Now().Add(-c.cfg.CleanupInterval).Unix() {
301-
level.Warn(c.logger).Log("Active user cleaner job too old. Ignoring to get recent data")
336+
level.Warn(c.logger).Log("msg", "Active user cleaner job too old. Ignoring to get recent data")
302337
continue
303338
}
304339
err := c.cleanUpActiveUsers(ctx, job.users, false)
@@ -746,59 +781,14 @@ func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool,
746781
}
747782

748783
func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) {
749-
existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
750-
path string
751-
status PartitionedGroupStatus
752-
})
753-
err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error {
754-
if strings.Contains(file, PartitionVisitMarkerDirectory) {
755-
return nil
756-
}
757-
partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file)
758-
if err != nil {
759-
level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file)
760-
return nil
761-
}
762-
763-
status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
764-
level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
765-
existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
766-
path string
767-
status PartitionedGroupStatus
768-
}{
769-
path: file,
770-
status: status,
771-
}
772-
return nil
773-
})
774-
784+
err, existentPartitionedGroupInfo := c.iterPartitionGroups(ctx, userBucket, userLogger)
775785
if err != nil {
776786
level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err)
777787
}
778788

779-
remainingCompactions := 0
780-
inProgressCompactions := 0
781-
var oldestPartitionGroup *PartitionedGroupInfo
782-
defer func() {
783-
c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
784-
c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
785-
if c.oldestPartitionGroupOffset != nil {
786-
if oldestPartitionGroup != nil {
787-
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
788-
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
789-
} else {
790-
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
791-
}
792-
}
793-
}()
794789
for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
795790
partitionedGroupInfoFile := extraInfo.path
796791

797-
remainingCompactions += extraInfo.status.PendingPartitions
798-
inProgressCompactions += extraInfo.status.InProgressPartitions
799-
if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime {
800-
oldestPartitionGroup = partitionedGroupInfo
801-
}
802792
if extraInfo.status.CanDelete {
803793
if extraInfo.status.IsCompleted {
804794
// Try to remove all blocks included in partitioned group info
@@ -829,6 +819,72 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
829819
}
830820
}
831821

822+
func (c *BlocksCleaner) emitUserMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
823+
err, existentPartitionedGroupInfo := c.iterPartitionGroups(ctx, userBucket, userLogger)
824+
if err != nil {
825+
level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err)
826+
}
827+
828+
remainingCompactions := 0
829+
inProgressCompactions := 0
830+
completedCompaction := 0
831+
var oldestPartitionGroup *PartitionedGroupInfo
832+
defer func() {
833+
c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
834+
c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
835+
if c.oldestPartitionGroupOffset != nil {
836+
if oldestPartitionGroup != nil {
837+
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
838+
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
839+
} else {
840+
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
841+
}
842+
}
843+
}()
844+
for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
845+
remainingCompactions += extraInfo.status.PendingPartitions
846+
inProgressCompactions += extraInfo.status.InProgressPartitions
847+
if oldestPartitionGroup == nil || partitionedGroupInfo.CreationTime < oldestPartitionGroup.CreationTime {
848+
oldestPartitionGroup = partitionedGroupInfo
849+
}
850+
if extraInfo.status.IsCompleted {
851+
completedCompaction += len(partitionedGroupInfo.Partitions)
852+
}
853+
}
854+
}
855+
856+
// iterPartitionGroups walks the user's PartitionedGroupDirectory and returns
// a map from each readable PartitionedGroupInfo to its object-store path and
// computed PartitionedGroupStatus. Visit-marker files are skipped, and files
// that fail to parse are logged and ignored, so the returned map may be a
// partial view even when the returned error is nil.
//
// The returned error is the error from Bucket.Iter itself; the map is always
// non-nil and usable alongside a non-nil error.
//
// NOTE(review): the return order (error, map) violates Go convention
// (staticcheck ST1008 — error should be the last return value). Both callers
// (cleanPartitionedGroupInfo and emitUserMetrics) already depend on this
// order, so swapping it must be done together with its call sites.
func (c *BlocksCleaner) iterPartitionGroups(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger) (error, map[*PartitionedGroupInfo]struct {
	path   string
	status PartitionedGroupStatus
}) {
	existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
		path   string
		status PartitionedGroupStatus
	})
	err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error {
		// Skip partition visit marker files; only group info files are wanted.
		if strings.Contains(file, PartitionVisitMarkerDirectory) {
			return nil
		}
		partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file)
		if err != nil {
			// Unreadable group info is logged and skipped rather than
			// aborting the whole iteration.
			level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file)
			return nil
		}

		status := partitionedGroupInfo.getPartitionedGroupStatus(ctx, userBucket, c.compactionVisitMarkerTimeout, userLogger)
		level.Debug(userLogger).Log("msg", "got partitioned group status", "partitioned_group_status", status.String())
		existentPartitionedGroupInfo[partitionedGroupInfo] = struct {
			path   string
			status PartitionedGroupStatus
		}{
			path:   file,
			status: status,
		}
		return nil
	})
	return err, existentPartitionedGroupInfo
}
887+
832888
// cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
833889
// and index are updated accordingly.
834890
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, userID string, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {

pkg/compactor/blocks_cleaner_test.go

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cortexproject/cortex/pkg/util"
2929
util_log "github.com/cortexproject/cortex/pkg/util/log"
3030
"github.com/cortexproject/cortex/pkg/util/services"
31+
"github.com/prometheus/client_golang/prometheus/testutil"
3132
)
3233

3334
type testBlocksCleanerOptions struct {
@@ -969,7 +970,6 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
969970
block2DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block2.String(), metadata.DeletionMarkFilename))
970971
require.NoError(t, err)
971972
require.False(t, block2DeletionMarkerExists)
972-
973973
}
974974

975975
func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
@@ -1127,6 +1127,130 @@ func TestBlocksCleaner_ParquetMetrics(t *testing.T) {
11271127
`)))
11281128
}
11291129

1130+
// TestBlocksCleaner_EmitUserMetrics exercises emitUserMetrics directly:
// it creates one partitioned group with 5 partitions for a single user,
// places visit markers so the partitions land in distinct states, then
// checks the emitted gauges. Expected tallies: partitions 2 and 3 are
// pending and partition 1's in-progress marker has expired (3 remaining),
// partition 0 is in progress with a fresh marker (1 in progress), and
// partition 4 is completed.
func TestBlocksCleaner_EmitUserMetrics(t *testing.T) {
	bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
	bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

	// Shuffle sharding + partitioning compaction is the only configuration
	// under which the metrics worker is started.
	cfg := BlocksCleanerConfig{
		DeletionDelay:      time.Hour,
		CleanupInterval:    time.Minute,
		CleanupConcurrency: 1,
		ShardingStrategy:   util.ShardingStrategyShuffle,
		CompactionStrategy: util.CompactionStrategyPartitioning,
	}

	ctx := context.Background()
	logger := log.NewNopLogger()
	registry := prometheus.NewPedanticRegistry()
	scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
		Strategy: tsdb.UserScanStrategyList,
	}, bucketClient, logger, registry)
	require.NoError(t, err)
	cfgProvider := newMockConfigProvider()
	dummyCounterVec := prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"test"})
	remainingPlannedCompactions := promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_compactor_remaining_planned_compactions",
		Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy",
	}, commonLabels)

	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 15*time.Minute, cfgProvider, logger, "test-cleaner", registry, time.Minute, 30*time.Second, dummyCounterVec, remainingPlannedCompactions)

	// ts returns a millisecond timestamp the given number of hours from now.
	ts := func(hours int) int64 {
		return time.Now().Add(time.Duration(hours)*time.Hour).Unix() * 1000
	}

	userID := "user-1"
	partitionedGroupID := uint32(123)
	partitionCount := 5
	startTime := ts(-10)
	endTime := ts(-8)
	userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)
	// One group, created an hour ago, so the oldest-partition-offset gauge
	// should read ~3600 seconds.
	partitionedGroupInfo := PartitionedGroupInfo{
		PartitionedGroupID: partitionedGroupID,
		PartitionCount:     partitionCount,
		Partitions: []Partition{
			{
				PartitionID: 0,
			},
			{
				PartitionID: 1,
			},
			{
				PartitionID: 2,
			},
			{
				PartitionID: 3,
			},
			{
				PartitionID: 4,
			},
		},
		RangeStart:   startTime,
		RangeEnd:     endTime,
		CreationTime: time.Now().Add(-1 * time.Hour).Unix(),
		Version:      PartitionedGroupInfoVersion1,
	}
	_, err = UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
	require.NoError(t, err)

	// InProgress with valid VisitTime (within the 15-minute marker timeout).
	v0 := &partitionVisitMarker{
		PartitionedGroupID: partitionedGroupID,
		PartitionID:        0,
		Status:             InProgress,
		VisitTime:          time.Now().Add(-2 * time.Minute).Unix(),
	}
	v0Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v0)
	err = v0Manager.updateVisitMarker(ctx)
	require.NoError(t, err)

	// InProgress with expired VisitTime — counts as pending, not in progress.
	v1 := &partitionVisitMarker{
		PartitionedGroupID: partitionedGroupID,
		PartitionID:        1,
		Status:             InProgress,
		VisitTime:          time.Now().Add(-30 * time.Minute).Unix(),
	}
	v1Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v1)
	err = v1Manager.updateVisitMarker(ctx)
	require.NoError(t, err)

	// V2 and V3 have no markers and are therefore pending.
	// V4 is completed.
	v4 := &partitionVisitMarker{
		PartitionedGroupID: partitionedGroupID,
		PartitionID:        4,
		Status:             Completed,
		VisitTime:          time.Now().Add(-20 * time.Minute).Unix(),
	}
	v4Manager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", v4)
	err = v4Manager.updateVisitMarker(ctx)
	require.NoError(t, err)

	cleaner.emitUserMetrics(ctx, logger, userBucket, userID)

	metricNames := []string{
		"cortex_compactor_remaining_planned_compactions",
		"cortex_compactor_in_progress_compactions",
		"cortex_compactor_oldest_partition_offset",
	}

	// Check tracked Prometheus metrics
	expectedMetrics := `
		# HELP cortex_compactor_in_progress_compactions Total number of in progress compactions. Only available with shuffle-sharding strategy and partitioning compaction strategy
		# TYPE cortex_compactor_in_progress_compactions gauge
		cortex_compactor_in_progress_compactions{user="user-1"} 1
		# HELP cortex_compactor_oldest_partition_offset Time in seconds between now and the oldest created partition group not completed. Only available with shuffle-sharding strategy and partitioning compaction strategy
		# TYPE cortex_compactor_oldest_partition_offset gauge
		cortex_compactor_oldest_partition_offset{user="user-1"} 3600
		# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy
		# TYPE cortex_compactor_remaining_planned_compactions gauge
		cortex_compactor_remaining_planned_compactions{user="user-1"} 3
		`

	assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
}
1253+
11301254
type mockConfigProvider struct {
11311255
userRetentionPeriods map[string]time.Duration
11321256
parquetConverterEnabled map[string]bool

0 commit comments

Comments
 (0)