From 656fdebcf78c568dce17ed815048e55799972136 Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Mon, 18 Dec 2023 10:44:47 +0800 Subject: [PATCH 1/2] Count the number of rate-limited samples in distributor_samples_in_total Signed-off-by: Xiaochao Dong (@damnever) --- pkg/distributor/distributor.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 30e4d799d5e..49d9446d68b 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -583,21 +583,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co inflight := d.inflightPushRequests.Inc() defer d.inflightPushRequests.Dec() - if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) { - return nil, errTooManyInflightPushRequests - } - - if d.cfg.InstanceLimits.MaxIngestionRate > 0 { - if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate { - return nil, errMaxSamplesPushRateLimitReached - } - } - now := time.Now() d.activeUsers.UpdateUserTimestamp(userID, now) - removeReplica := false - numSamples := 0 numExemplars := 0 for _, ts := range req.Timeseries { @@ -610,6 +598,17 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co // Count the total number of metadata in. d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata))) + if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) { + return nil, errTooManyInflightPushRequests + } + + if d.cfg.InstanceLimits.MaxIngestionRate > 0 { + if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate { + return nil, errMaxSamplesPushRateLimitReached + } + } + + removeReplica := false // Cache user limit with overrides so we spend less CPU doing locking. See issue #4904 limits := d.limits.GetOverridesForUser(userID) From f2b43dc01f6a3a2d385b548cce47283f6491c96e Mon Sep 17 00:00:00 2001 From: "Xiaochao Dong (@damnever)" Date: Mon, 29 Jan 2024 14:38:12 +0800 Subject: [PATCH 2/2] Update changelog Signed-off-by: Xiaochao Dong (@damnever) --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cd7776c557..64fa3fd5645 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [CHANGE] Store Gateway: Add a new fastcache based inmemory index cache. #5619 * [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661 * [CHANGE] Ingester: Disable uploading compacted blocks and overlapping compaction in ingester. #5735 +* [CHANGE] Distributor: Count the number of rate-limited samples in `distributor_samples_in_total`. #5714 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477 * [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605 * [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731