diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index 1ef2d54b078..eb91c8b79ec 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +* Update `AggregatorStore` to reclaim unused MetricPoints for Delta aggregation + temporality. + ([#4486](https://github.com/open-telemetry/opentelemetry-dotnet/issues/4486)) + ## 1.6.0 Released 2023-Sep-05 diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index 7f20113b9a4..69343fc952f 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -16,6 +16,7 @@ using System.Collections.Concurrent; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using OpenTelemetry.Internal; @@ -23,6 +24,9 @@ namespace OpenTelemetry.Metrics; internal sealed class AggregatorStore { + internal readonly bool OutputDelta; + internal long DroppedMeasurements = 0; + private static readonly string MetricPointCapHitFixMessage = "Consider opting in for the experimental SDK feature to emit all the throttled metrics under the overflow attribute by setting env variable OTEL_DOTNET_EXPERIMENTAL_METRICS_EMIT_OVERFLOW_ATTRIBUTE = true. You could also modify instrumentation to reduce the number of unique key/value pair combinations. Or use Views to drop unwanted tags. Or use MeterProviderBuilder.SetMaxMetricPointsPerMetricStream to set higher limit."; private static readonly Comparison> DimensionComparisonDelegate = (x, y) => x.Key.CompareTo(y.Key); private static readonly ExemplarFilter DefaultExemplarFilter = new AlwaysOffExemplarFilter(); @@ -32,12 +36,23 @@ internal sealed class AggregatorStore private readonly HashSet? tagKeysInteresting; private readonly int tagsKeysInterestingCount; + // This only applies to Delta AggregationTemporality. + // This decides when to change the behavior to start reclaiming MetricPoints. + // It is set to maxMetricPoints * 3 / 4, which means that Snapshot method would start to reclaim MetricPoints + // only after 75% of the MetricPoints have been used. Once the AggregatorStore starts to reclaim MetricPoints, + // it will continue to do so on every Snapshot and it won't go back to its default behavior. + private readonly int metricPointReclamationThreshold; + + // This holds the reclaimed MetricPoints that are available for reuse. + private readonly Queue? availableMetricPoints; + private readonly ConcurrentDictionary tagsToMetricPointIndexDictionary = new(); + private readonly ConcurrentDictionary? tagsToMetricPointIndexDictionaryDelta; + private readonly string name; private readonly string metricPointCapHitMessage; - private readonly bool outputDelta; private readonly MetricPoint[] metricPoints; private readonly int[] currentMetricPointBatch; private readonly AggregationType aggType; @@ -49,6 +64,7 @@ internal sealed class AggregatorStore private readonly int maxMetricPoints; private readonly bool emitOverflowAttribute; private readonly ExemplarFilter exemplarFilter; + private readonly Func[], int, int> lookupAggregatorStore; private int metricPointIndex = 0; private int batchSize = 0; @@ -56,6 +72,9 @@ internal sealed class AggregatorStore private bool zeroTagMetricPointInitialized; private bool overflowTagMetricPointInitialized; + // When set to true, the behavior changes to reuse MetricPoints + private bool reclaimMetricPoints = false; + internal AggregatorStore( MetricStreamIdentity metricStreamIdentity, AggregationType aggType, @@ -66,11 +85,12 @@ internal AggregatorStore( { this.name = metricStreamIdentity.InstrumentName; this.maxMetricPoints = maxMetricPoints; + this.metricPointCapHitMessage = $"Maximum MetricPoints limit reached for this Metric stream. Configured limit: {this.maxMetricPoints}"; this.metricPoints = new MetricPoint[maxMetricPoints]; this.currentMetricPointBatch = new int[maxMetricPoints]; this.aggType = aggType; - this.outputDelta = temporality == AggregationTemporality.Delta; + this.OutputDelta = temporality == AggregationTemporality.Delta; this.histogramBounds = metricStreamIdentity.HistogramBucketBounds ?? FindDefaultHistogramBounds(in metricStreamIdentity); this.exponentialHistogramMaxSize = metricStreamIdentity.ExponentialHistogramMaxSize; this.exponentialHistogramMaxScale = metricStreamIdentity.ExponentialHistogramMaxScale; @@ -92,11 +112,40 @@ internal AggregatorStore( this.emitOverflowAttribute = emitOverflowAttribute; + var reservedMetricPointsCount = 1; + if (emitOverflowAttribute) { // Setting metricPointIndex to 1 as we would reserve the metricPoints[1] for overflow attribute. // Newer attributes should be added starting at the index: 2 this.metricPointIndex = 1; + reservedMetricPointsCount++; + } + + if (this.OutputDelta) + { + this.availableMetricPoints = new Queue(maxMetricPoints - reservedMetricPointsCount); + + // There is no overload which only takes capacity as the parameter + // Using the DefaultConcurrencyLevel defined in the ConcurrentDictionary class: https://github.com/dotnet/runtime/blob/v7.0.5/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentDictionary.cs#L2020 + // We expect at the most (maxMetricPoints - reservedMetricPointsCount) * 2 entries- one for sorted and one for unsorted input + this.tagsToMetricPointIndexDictionaryDelta = + new ConcurrentDictionary(concurrencyLevel: Environment.ProcessorCount, capacity: (maxMetricPoints - reservedMetricPointsCount) * 2); + + this.metricPointReclamationThreshold = maxMetricPoints * 3 / 4; + + // Add a certain number of MetricPoint indices to the queue so that threads have readily available + // access to these MetricPoints for their use. + for (int i = this.metricPointReclamationThreshold + 1; i < this.maxMetricPoints; i++) + { + this.availableMetricPoints.Enqueue(i); + } + + this.lookupAggregatorStore = this.LookupAggregatorStoreForDeltaWithReclaim; + } + else + { + this.lookupAggregatorStore = this.LookupAggregatorStore; } } @@ -130,13 +179,21 @@ internal void Update(double value, ReadOnlySpan> t internal int Snapshot() { this.batchSize = 0; - var indexSnapshot = Math.Min(this.metricPointIndex, this.maxMetricPoints - 1); - if (this.outputDelta) + if (this.OutputDelta) { - this.SnapshotDelta(indexSnapshot); + if (this.reclaimMetricPoints) + { + this.SnapshotDeltaWithMetricPointReclaim(); + } + else + { + var indexSnapshot = Math.Min(this.metricPointIndex, this.maxMetricPoints - 1); + this.SnapshotDelta(indexSnapshot); + } } else { + var indexSnapshot = Math.Min(this.metricPointIndex, this.maxMetricPoints - 1); this.SnapshotCumulative(indexSnapshot); } @@ -173,6 +230,119 @@ internal void SnapshotDelta(int indexSnapshot) } } + internal void SnapshotDeltaWithMetricPointReclaim() + { + // Index = 0 is reserved for the case where no dimensions are provided. + ref var metricPointWithNoTags = ref this.metricPoints[0]; + if (metricPointWithNoTags.MetricPointStatus != MetricPointStatus.NoCollectPending) + { + if (this.IsExemplarEnabled()) + { + metricPointWithNoTags.TakeSnapshotWithExemplar(outputDelta: true); + } + else + { + metricPointWithNoTags.TakeSnapshot(outputDelta: true); + } + + this.currentMetricPointBatch[this.batchSize] = 0; + this.batchSize++; + } + + int startIndexForReclaimableMetricPoints = 1; + + if (this.emitOverflowAttribute) + { + startIndexForReclaimableMetricPoints = 2; // Index 0 and 1 are reserved for no tags and overflow + + // TakeSnapshot for the MetricPoint for overflow + ref var metricPointForOverflow = ref this.metricPoints[1]; + if (metricPointForOverflow.MetricPointStatus != MetricPointStatus.NoCollectPending) + { + if (this.IsExemplarEnabled()) + { + metricPointForOverflow.TakeSnapshotWithExemplar(outputDelta: true); + } + else + { + metricPointForOverflow.TakeSnapshot(outputDelta: true); + } + + this.currentMetricPointBatch[this.batchSize] = 1; + this.batchSize++; + } + } + + for (int i = startIndexForReclaimableMetricPoints; i < this.maxMetricPoints; i++) + { + ref var metricPoint = ref this.metricPoints[i]; + + if (metricPoint.MetricPointStatus == MetricPointStatus.NoCollectPending) + { + // If metricPoint.LookupData is `null` then the MetricPoint is already reclaimed and in the queue. + // If the Collect thread is successfully able to compare and swap the reference count from zero to int.MinValue, it means that + // the MetricPoint can be reused for other tags. + if (metricPoint.LookupData != null && Interlocked.CompareExchange(ref metricPoint.ReferenceCount, int.MinValue, 0) == 0) + { + var lookupData = metricPoint.LookupData; + + // Setting `LookupData` to `null` to denote that this MetricPoint is reclaimed. + // Snapshot method can use this to skip trying to reclaim indices which have already been reclaimed and added to the queue. + metricPoint.LookupData = null; + + Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null"); + + lock (this.tagsToMetricPointIndexDictionaryDelta!) + { + LookupData? dictionaryValue; + if (lookupData.SortedTags != Tags.EmptyTags) + { + // Check if no other thread added a new entry for the same Tags. + // If no, then remove the existing entries. + if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.SortedTags, out dictionaryValue) && + dictionaryValue == lookupData) + { + this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.SortedTags, out var _); + this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out var _); + } + } + else + { + if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.GivenTags, out dictionaryValue) && + dictionaryValue == lookupData) + { + this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out var _); + } + } + + Debug.Assert(this.availableMetricPoints != null, "this.availableMetricPoints was null"); + + this.availableMetricPoints!.Enqueue(i); + } + } + + continue; + } + + if (this.IsExemplarEnabled()) + { + metricPoint.TakeSnapshotWithExemplar(outputDelta: true); + } + else + { + metricPoint.TakeSnapshot(outputDelta: true); + } + + this.currentMetricPointBatch[this.batchSize] = i; + this.batchSize++; + } + + if (this.EndTimeInclusive != default) + { + this.StartTimeExclusive = this.EndTimeInclusive; + } + } + internal void SnapshotCumulative(int indexSnapshot) { for (int i = 0; i <= indexSnapshot; i++) @@ -220,7 +390,16 @@ private void InitializeZeroTagPointIfNotInitialized() { if (!this.zeroTagMetricPointInitialized) { - this.metricPoints[0] = new MetricPoint(this, this.aggType, null, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale); + if (this.OutputDelta) + { + var lookupData = new LookupData(0, Tags.EmptyTags, Tags.EmptyTags); + this.metricPoints[0] = new MetricPoint(this, this.aggType, null, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale, lookupData); + } + else + { + this.metricPoints[0] = new MetricPoint(this, this.aggType, null, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale); + } + this.zeroTagMetricPointInitialized = true; } } @@ -236,7 +415,19 @@ private void InitializeOverflowTagPointIfNotInitialized() { if (!this.overflowTagMetricPointInitialized) { - this.metricPoints[1] = new MetricPoint(this, this.aggType, new KeyValuePair[] { new("otel.metric.overflow", true) }, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale); + var keyValuePairs = new KeyValuePair[] { new("otel.metric.overflow", true) }; + var tags = new Tags(keyValuePairs); + + if (this.OutputDelta) + { + var lookupData = new LookupData(1, tags, tags); + this.metricPoints[1] = new MetricPoint(this, this.aggType, keyValuePairs, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale, lookupData); + } + else + { + this.metricPoints[1] = new MetricPoint(this, this.aggType, keyValuePairs, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale); + } + this.overflowTagMetricPointInitialized = true; } } @@ -368,6 +559,404 @@ private int LookupAggregatorStore(KeyValuePair[] tagKeysAndValu return aggregatorIndex; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair[] tagKeysAndValues, int length) + { + int index; + var givenTags = new Tags(tagKeysAndValues); + + Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null"); + + bool newMetricPointCreated = false; + + if (!this.tagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out var lookupData)) + { + if (length > 1) + { + // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. + // Create or obtain new arrays to temporarily hold the sorted tag Keys and Values + var storage = ThreadStaticStorage.GetStorage(); + storage.CloneKeysAndValues(tagKeysAndValues, length, out var tempSortedTagKeysAndValues); + + Array.Sort(tempSortedTagKeysAndValues, DimensionComparisonDelegate); + + var sortedTags = new Tags(tempSortedTagKeysAndValues); + + if (!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData)) + { + // Note: We are using storage from ThreadStatic (for up to MaxTagCacheSize tags) for both the input order of tags and the sorted order of tags, + // so we need to make a deep copy for Dictionary storage. + if (length <= ThreadStaticStorage.MaxTagCacheSize) + { + var givenTagKeysAndValues = new KeyValuePair[length]; + tagKeysAndValues.CopyTo(givenTagKeysAndValues.AsSpan()); + + var sortedTagKeysAndValues = new KeyValuePair[length]; + tempSortedTagKeysAndValues.CopyTo(sortedTagKeysAndValues.AsSpan()); + + givenTags = new Tags(givenTagKeysAndValues); + sortedTags = new Tags(sortedTagKeysAndValues); + } + + Debug.Assert(this.availableMetricPoints != null, "this.availableMetricPoints was null"); + + lock (this.tagsToMetricPointIndexDictionaryDelta) + { + // check again after acquiring lock. + if (!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData)) + { + if (this.reclaimMetricPoints) + { + // Check for an available MetricPoint + if (this.availableMetricPoints!.Count > 0) + { + index = this.availableMetricPoints.Dequeue(); + } + else + { + // No MetricPoint is available for reuse + return -1; + } + } + else + { + index = ++this.metricPointIndex; + if (index == this.metricPointReclamationThreshold) + { + this.reclaimMetricPoints = true; + } + } + + lookupData = new LookupData(index, sortedTags, givenTags); + + ref var metricPoint = ref this.metricPoints[index]; + metricPoint = new MetricPoint(this, this.aggType, sortedTags.KeyValuePairs, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale, lookupData); + newMetricPointCreated = true; + + // Add to dictionary *after* initializing MetricPoint + // as other threads can start writing to the + // MetricPoint, if dictionary entry found. + + // Add the sorted order along with the given order of tags + this.tagsToMetricPointIndexDictionaryDelta.TryAdd(sortedTags, lookupData); + this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData); + } + } + } + } + else + { + // This else block is for tag length = 1 + + // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage. + var givenTagKeysAndValues = new KeyValuePair[length]; + + tagKeysAndValues.CopyTo(givenTagKeysAndValues.AsSpan()); + + givenTags = new Tags(givenTagKeysAndValues); + + Debug.Assert(this.availableMetricPoints != null, "this.availableMetricPoints was null"); + + lock (this.tagsToMetricPointIndexDictionaryDelta) + { + // check again after acquiring lock. + if (!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(givenTags, out lookupData)) + { + if (this.reclaimMetricPoints) + { + // Check for an available MetricPoint + if (this.availableMetricPoints!.Count > 0) + { + index = this.availableMetricPoints.Dequeue(); + } + else + { + // No MetricPoint is available for reuse + return -1; + } + } + else + { + index = ++this.metricPointIndex; + if (index == this.metricPointReclamationThreshold) + { + this.reclaimMetricPoints = true; + } + } + + lookupData = new LookupData(index, Tags.EmptyTags, givenTags); + + ref var metricPoint = ref this.metricPoints[index]; + metricPoint = new MetricPoint(this, this.aggType, givenTags.KeyValuePairs, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale, lookupData); + newMetricPointCreated = true; + + // Add to dictionary *after* initializing MetricPoint + // as other threads can start writing to the + // MetricPoint, if dictionary entry found. + + // givenTags will always be sorted when tags length == 1 + this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData); + } + } + } + } + + // Found the MetricPoint + index = lookupData.Index; + + // If the running thread created a new MetricPoint, then the Snapshot method cannot reclaim that MetricPoint because MetricPoint is initialized with a ReferenceCount of 1. + // It can simply return the index. + + if (!newMetricPointCreated) + { + // If the running thread did not create the MetricPoint, it could be working on an index that has been reclaimed by Snapshot method. + // This could happen if the thread get switched out by CPU after it retrieves the index but the Snapshot method reclaims it before the thread wakes up again. + + ref var metricPointAtIndex = ref this.metricPoints[index]; + var referenceCount = Interlocked.Increment(ref metricPointAtIndex.ReferenceCount); + + if (referenceCount < 0) + { + // Rare case: Snapshot method had already marked the MetricPoint available for reuse as it has not been updated in last collect cycle. + + // Example scenario: + // Thread T1 wants to record a measurement for (k1,v1). + // Thread T1 creates a new MetricPoint at index 100 and adds an entry for (k1,v1) in the dictionary with the relevant LookupData value; ReferenceCount of the MetricPoint is 1 at this point. + // Thread T1 completes the update and decrements the ReferenceCount to 0. + // Later, another update thread (could be T1 as well) wants to record a measurement for (k1,v1) + // It looks up the dictionary and retrieves the index as 100. ReferenceCount for the MetricPoint is 0 at this point. + // This update thread gets switched out by the CPU. + // With the reclaim behavior, Snapshot method reclaims the index 100 as the MetricPoint for the index has NoCollectPending and has a ReferenceCount of 0. + // Snapshot thread sets the ReferenceCount to int.MinValue. + // The update thread wakes up and increments the ReferenceCount but finds the value to be negative. + + // Retry attempt to get a MetricPoint. + index = this.RemoveStaleEntriesAndGetAvailableMetricPointRare(lookupData, length); + } + else if (metricPointAtIndex.LookupData != lookupData) + { + // Rare case: Another thread with different input tags could have reclaimed this MetricPoint if it was freed up by Snapshot method. + + // Example scenario: + // Thread T1 wants to record a measurement for (k1,v1). + // Thread T1 creates a new MetricPoint at index 100 and adds an entry for (k1,v1) in the dictionary with the relevant LookupData value; ReferenceCount of the MetricPoint is 1 at this point. + // Thread T1 completes the update and decrements the ReferenceCount to 0. + // Later, another update thread T2 (could be T1 as well) wants to record a measurement for (k1,v1) + // It looks up the dictionary and retrieves the index as 100. ReferenceCount for the MetricPoint is 0 at this point. + // This update thread T2 gets switched out by the CPU. + // With the reclaim behavior, Snapshot method reclaims the index 100 as the MetricPoint for the index has NoCollectPending and has a ReferenceCount of 0. + // Snapshot thread sets the ReferenceCount to int.MinValue. + // An update thread T3 wants to record a measurement for (k2,v2). + // Thread T3 looks for an available index from the queue and finds index 100. + // Thread T3 creates a new MetricPoint at index 100 and adds an entry for (k2,v2) in the dictionary with the LookupData value for (k2,v2). ReferenceCount of the MetricPoint is 1 at this point. + // The update thread T2 wakes up and increments the ReferenceCount and finds the value to be positive but the LookupData value does not match the one for (k1,v1). + + // Remove reference since its not the right MetricPoint. + Interlocked.Decrement(ref metricPointAtIndex.ReferenceCount); + + // Retry attempt to get a MetricPoint. + index = this.RemoveStaleEntriesAndGetAvailableMetricPointRare(lookupData, length); + } + } + + return index; + } + + // This method is always called under `lock(this.tagsToMetricPointIndexDictionaryDelta)` so it's safe with other code that adds or removes + // entries from `this.tagsToMetricPointIndexDictionaryDelta` + private bool TryGetAvailableMetricPointRare( + Tags givenTags, + Tags sortedTags, + int length, + [NotNullWhen(true)] + out LookupData? lookupData, + out bool newMetricPointCreated) + { + Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null"); + Debug.Assert(this.availableMetricPoints != null, "this.availableMetricPoints was null"); + + int index; + newMetricPointCreated = false; + + if (length > 1) + { + // check again after acquiring lock. + if (!this.tagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out lookupData) && + !this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData)) + { + // Check for an available MetricPoint + if (this.availableMetricPoints!.Count > 0) + { + index = this.availableMetricPoints.Dequeue(); + } + else + { + // No MetricPoint is available for reuse + return false; + } + + lookupData = new LookupData(index, sortedTags, givenTags); + + ref var metricPoint = ref this.metricPoints[index]; + metricPoint = new MetricPoint(this, this.aggType, sortedTags.KeyValuePairs, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale, lookupData); + newMetricPointCreated = true; + + // Add to dictionary *after* initializing MetricPoint + // as other threads can start writing to the + // MetricPoint, if dictionary entry found. + + // Add the sorted order along with the given order of tags + this.tagsToMetricPointIndexDictionaryDelta.TryAdd(sortedTags, lookupData); + this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData); + } + } + else + { + // check again after acquiring lock. + if (!this.tagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out lookupData)) + { + // Check for an available MetricPoint + if (this.availableMetricPoints!.Count > 0) + { + index = this.availableMetricPoints.Dequeue(); + } + else + { + // No MetricPoint is available for reuse + return false; + } + + lookupData = new LookupData(index, Tags.EmptyTags, givenTags); + + ref var metricPoint = ref this.metricPoints[index]; + metricPoint = new MetricPoint(this, this.aggType, givenTags.KeyValuePairs, this.histogramBounds, this.exponentialHistogramMaxSize, this.exponentialHistogramMaxScale, lookupData); + newMetricPointCreated = true; + + // Add to dictionary *after* initializing MetricPoint + // as other threads can start writing to the + // MetricPoint, if dictionary entry found. + + // givenTags will always be sorted when tags length == 1 + this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData); + } + } + + return true; + } + + // This method is essentially a retry attempt for when `LookupAggregatorStoreForDeltaWithReclaim` cannot find a MetricPoint. + // If we still fail to get a MetricPoint in this method, we don't retry any further and simply drop the measurement. + // This method acquires `lock (this.tagsToMetricPointIndexDictionaryDelta)` + private int RemoveStaleEntriesAndGetAvailableMetricPointRare(LookupData lookupData, int length) + { + bool foundMetricPoint = false; + bool newMetricPointCreated = false; + var sortedTags = lookupData.SortedTags; + var inputTags = lookupData.GivenTags; + + // Acquire lock + // Try to remove stale entries from dictionary + // Get the index for a new MetricPoint (it could be self-claimed or from another thread that added a fresh entry) + // If self-claimed, then add a fresh entry to the dictionary + // If an available MetricPoint is found, then only increment the ReferenceCount + + Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null"); + + // Delete the entry for these Tags and get another MetricPoint. + lock (this.tagsToMetricPointIndexDictionaryDelta!) + { + LookupData? dictionaryValue; + if (lookupData.SortedTags != Tags.EmptyTags) + { + // Check if no other thread added a new entry for the same Tags in the meantime. + // If no, then remove the existing entries. + if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.SortedTags, out dictionaryValue)) + { + if (dictionaryValue == lookupData) + { + // No other thread added a new entry for the same Tags. + this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.SortedTags, out _); + this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out _); + } + else + { + // Some other thread added a new entry for these Tags. Use the new MetricPoint + lookupData = dictionaryValue; + foundMetricPoint = true; + } + } + } + else + { + if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.GivenTags, out dictionaryValue)) + { + if (dictionaryValue == lookupData) + { + // No other thread added a new entry for the same Tags. + this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out _); + } + else + { + // Some other thread added a new entry for these Tags. Use the new MetricPoint + lookupData = dictionaryValue; + foundMetricPoint = true; + } + } + } + + if (!foundMetricPoint + && this.TryGetAvailableMetricPointRare(inputTags, sortedTags, length, out var tempLookupData, out newMetricPointCreated)) + { + foundMetricPoint = true; + lookupData = tempLookupData; + } + } + + if (foundMetricPoint) + { + var index = lookupData.Index; + + // If the running thread created a new MetricPoint, then the Snapshot method cannot reclaim that MetricPoint because MetricPoint is initialized with a ReferenceCount of 1. + // It can simply return the index. + + if (!newMetricPointCreated) + { + // If the running thread did not create the MetricPoint, it could be working on an index that has been reclaimed by Snapshot method. + // This could happen if the thread get switched out by CPU after it retrieves the index but the Snapshot method reclaims it before the thread wakes up again. + + ref var metricPointAtIndex = ref this.metricPoints[index]; + var referenceCount = Interlocked.Increment(ref metricPointAtIndex.ReferenceCount); + + if (referenceCount < 0) + { + // Super rare case: Snapshot method had already marked the MetricPoint available for reuse as it has not been updated in last collect cycle even in the retry attempt. + // Example scenario mentioned in `LookupAggregatorStoreForDeltaWithReclaim` method. + + // Don't retry again and drop the measurement. + return -1; + } + else if (metricPointAtIndex.LookupData != lookupData) + { + // Rare case: Another thread with different input tags could have reclaimed this MetricPoint if it was freed up by Snapshot method even in the retry attempt. + // Example scenario mentioned in `LookupAggregatorStoreForDeltaWithReclaim` method. + + // Remove reference since its not the right MetricPoint. + Interlocked.Decrement(ref metricPointAtIndex.ReferenceCount); + + // Don't retry again and drop the measurement. + return -1; + } + } + + return index; + } + else + { + // No MetricPoint is available for reuse + return -1; + } + } + private void UpdateLong(long value, ReadOnlySpan> tags) { try @@ -375,6 +964,8 @@ private void UpdateLong(long value, ReadOnlySpan> var index = this.FindMetricAggregatorsDefault(tags); if (index < 0) { + Interlocked.Increment(ref this.DroppedMeasurements); + if (this.emitOverflowAttribute) { this.InitializeOverflowTagPointIfNotInitialized(); @@ -405,6 +996,7 @@ private void UpdateLong(long value, ReadOnlySpan> } catch (Exception) { + Interlocked.Increment(ref this.DroppedMeasurements); OpenTelemetrySdkEventSource.Log.MeasurementDropped(this.name, "SDK internal error occurred.", "Contact SDK owners."); } } @@ -416,6 +1008,8 @@ private void UpdateLongCustomTags(long value, ReadOnlySpan> tags) @@ -574,6 +1177,6 @@ private int FindMetricAggregatorsCustomTag(ReadOnlySpan +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +namespace OpenTelemetry.Metrics; + +internal sealed class LookupData +{ + public int Index; + public Tags SortedTags; + public Tags GivenTags; + + public LookupData(int index, in Tags sortedTags, in Tags givenTags) + { + this.Index = index; + this.SortedTags = sortedTags; + this.GivenTags = givenTags; + } +} diff --git a/src/OpenTelemetry/Metrics/MetricPoint.cs b/src/OpenTelemetry/Metrics/MetricPoint.cs index 0d788dac0f2..48f726f4754 100644 --- a/src/OpenTelemetry/Metrics/MetricPoint.cs +++ b/src/OpenTelemetry/Metrics/MetricPoint.cs @@ -24,6 +24,18 @@ namespace OpenTelemetry.Metrics; /// public struct MetricPoint { + // Represents the number of update threads using this MetricPoint at any given point of time. + // If the value is equal to int.MinValue which is -2147483648, it means that this MetricPoint is available for reuse. + // We never increment the ReferenceCount for MetricPoint with no tags (index == 0) and the MetricPoint for overflow attribute, + // but we always decrement it (in the Update methods). This should be fine. + // ReferenceCount doesn't matter for MetricPoint with no tags and overflow attribute as they are never reclaimed. + internal int ReferenceCount; + + // When the AggregatorStore is reclaiming MetricPoints, this serves the purpose of validating the a given thread is using the right + // MetricPoint for update by checking it against what as added in the Dictionary. Also, when a thread finds out that the MetricPoint + // that its using is already reclaimed, this helps avoid sorting of the tags for adding a new Dictionary entry. + internal LookupData? LookupData; + private const int DefaultSimpleReservoirPoolSize = 1; private readonly AggregatorStore aggregatorStore; @@ -46,17 +58,25 @@ internal MetricPoint( KeyValuePair[]? tagKeysAndValues, double[] histogramExplicitBounds, int exponentialHistogramMaxSize, - int exponentialHistogramMaxScale) + int exponentialHistogramMaxScale, + LookupData? lookupData = null) { Debug.Assert(aggregatorStore != null, "AggregatorStore was null."); Debug.Assert(histogramExplicitBounds != null, "Histogram explicit Bounds was null."); + if (aggregatorStore!.OutputDelta) + { + Debug.Assert(lookupData != null, "LookupData was null."); + } + this.aggType = aggType; this.Tags = new ReadOnlyTagCollection(tagKeysAndValues); this.runningValue = default; this.snapshotValue = default; this.deltaLastValue = default; this.MetricPointStatus = MetricPointStatus.NoCollectPending; + this.ReferenceCount = 1; + this.LookupData = lookupData; ExemplarReservoir? reservoir = null; if (this.aggType == AggregationType.HistogramWithBuckets || @@ -437,6 +457,11 @@ internal void Update(long number) // TODO: For Delta, this can be mitigated // by ignoring Zero points this.MetricPointStatus = MetricPointStatus.CollectPending; + + if (this.aggregatorStore.OutputDelta) + { + Interlocked.Decrement(ref this.ReferenceCount); + } } internal void UpdateWithExemplar(long number, ReadOnlySpan> tags, bool isSampled) @@ -551,6 +576,11 @@ internal void UpdateWithExemplar(long number, ReadOnlySpan> tags, bool isSampled) @@ -762,6 +797,11 @@ internal void UpdateWithExemplar(double number, ReadOnlySpan { + public static readonly Tags EmptyTags = new(Array.Empty>()); + private readonly int hashCode; public Tags(KeyValuePair[] keyValuePairs) diff --git a/test/Benchmarks/Metrics/MetricsBenchmarks.cs b/test/Benchmarks/Metrics/MetricsBenchmarks.cs index aaf245c5d75..3e769c461c3 100644 --- a/test/Benchmarks/Metrics/MetricsBenchmarks.cs +++ b/test/Benchmarks/Metrics/MetricsBenchmarks.cs @@ -22,27 +22,27 @@ using OpenTelemetry.Tests; /* -BenchmarkDotNet=v0.13.5, OS=Windows 11 (10.0.23424.1000) +BenchmarkDotNet v0.13.6, Windows 11 (10.0.23424.1000) Intel Core i7-9700 CPU 3.00GHz, 1 CPU, 8 logical and 8 physical cores -.NET SDK=7.0.203 - [Host] : .NET 7.0.5 (7.0.523.17405), X64 RyuJIT AVX2 - DefaultJob : .NET 7.0.5 (7.0.523.17405), X64 RyuJIT AVX2 +.NET SDK 7.0.400 + [Host] : .NET 7.0.10 (7.0.1023.36312), X64 RyuJIT AVX2 + DefaultJob : .NET 7.0.10 (7.0.1023.36312), X64 RyuJIT AVX2 | Method | AggregationTemporality | Mean | Error | StdDev | Allocated | |-------------------------- |----------------------- |----------:|---------:|---------:|----------:| -| CounterHotPath | Cumulative | 17.06 ns | 0.113 ns | 0.094 ns | - | -| CounterWith1LabelsHotPath | Cumulative | 71.47 ns | 1.464 ns | 2.100 ns | - | -| CounterWith3LabelsHotPath | Cumulative | 162.04 ns | 2.469 ns | 2.188 ns | - | -| CounterWith5LabelsHotPath | Cumulative | 237.30 ns | 2.884 ns | 2.698 ns | - | -| CounterWith6LabelsHotPath | Cumulative | 269.41 ns | 4.087 ns | 3.623 ns | - | -| CounterWith7LabelsHotPath | Cumulative | 303.01 ns | 5.313 ns | 4.970 ns | - | -| CounterHotPath | Delta | 17.30 ns | 0.350 ns | 0.310 ns | - | -| CounterWith1LabelsHotPath | Delta | 70.96 ns | 0.608 ns | 0.539 ns | - | -| CounterWith3LabelsHotPath | Delta | 156.55 ns | 3.139 ns | 3.358 ns | - | -| CounterWith5LabelsHotPath | Delta | 247.14 ns | 4.703 ns | 5.598 ns | - | -| CounterWith6LabelsHotPath | Delta | 271.30 ns | 5.310 ns | 5.215 ns | - | -| CounterWith7LabelsHotPath | Delta | 309.02 ns | 5.934 ns | 5.828 ns | - | +| CounterHotPath | Cumulative | 21.61 ns | 0.084 ns | 0.078 ns | - | +| CounterWith1LabelsHotPath | Cumulative | 69.08 ns | 0.261 ns | 0.244 ns | - | +| CounterWith3LabelsHotPath | Cumulative | 149.77 ns | 0.549 ns | 0.486 ns | - | +| CounterWith5LabelsHotPath | Cumulative | 236.47 ns | 1.684 ns | 1.493 ns | - | +| CounterWith6LabelsHotPath | Cumulative | 276.48 ns | 1.442 ns | 1.349 ns | - | +| CounterWith7LabelsHotPath | Cumulative | 294.09 ns | 2.354 ns | 2.202 ns | - | +| CounterHotPath | Delta | 27.32 ns | 0.380 ns | 0.355 ns | - | +| CounterWith1LabelsHotPath | Delta | 80.83 ns | 0.219 ns | 0.183 ns | - | +| CounterWith3LabelsHotPath | Delta | 162.48 ns | 1.053 ns | 0.985 ns | - | +| CounterWith5LabelsHotPath | Delta | 255.48 ns | 1.807 ns | 1.602 ns | - | +| CounterWith6LabelsHotPath | Delta | 281.75 ns | 2.761 ns | 2.583 ns | - | +| CounterWith7LabelsHotPath | Delta | 310.29 ns | 1.817 ns | 1.611 ns | - | */ namespace Benchmarks.Metrics; diff --git a/test/OpenTelemetry.Tests/Metrics/MetricOverflowAttributeTests.cs b/test/OpenTelemetry.Tests/Metrics/MetricOverflowAttributeTests.cs index 98f7fa05392..e6b1dde373f 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricOverflowAttributeTests.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricOverflowAttributeTests.cs @@ -175,6 +175,8 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForCounter(MetricReaderTem // 1. For zero tags // 2. For metric overflow attribute when user opts-in for this feature + counter.Add(10); // Record measurement for zero tags + // Max number for MetricPoints available for use when emitted with tags int maxMetricPointsForUse = MeterProviderBuilderSdk.MaxMetricPointsPerMetricDefault - 2; @@ -199,12 +201,12 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForCounter(MetricReaderTem MetricPoint overflowMetricPoint; // We still have not exceeded the max MetricPoint limit - Assert.DoesNotContain(metricPoints, mp => mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); + Assert.DoesNotContain(metricPoints, mp => mp.Tags.Count != 0 && mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); exportedItems.Clear(); metricPoints.Clear(); - counter.Add(5, new KeyValuePair("Key", 9999)); // Emit a metric to exceed the max MetricPoint limit + counter.Add(5, new KeyValuePair("Key", 1998)); // Emit a metric to exceed the max MetricPoint limit meterProvider.ForceFlush(); metric = exportedItems[0]; @@ -213,7 +215,16 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForCounter(MetricReaderTem metricPoints.Add(mp); } - overflowMetricPoint = metricPoints.Single(mp => mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); + MetricPoint zeroTagsMetricPoint; + if (temporalityPreference == MetricReaderTemporalityPreference.Cumulative) + { + // Check metric point for zero tags + zeroTagsMetricPoint = metricPoints.Single(mp => mp.Tags.Count == 0); + Assert.Equal(10, zeroTagsMetricPoint.GetSumLong()); + } + + // Check metric point for overflow + overflowMetricPoint = metricPoints.Single(mp => mp.Tags.Count != 0 && mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); Assert.Equal(true, overflowMetricPoint.Tags.KeyAndValues[0].Value); Assert.Equal(1, overflowMetricPoint.Tags.Count); Assert.Equal(5, overflowMetricPoint.GetSumLong()); @@ -221,8 +232,10 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForCounter(MetricReaderTem exportedItems.Clear(); metricPoints.Clear(); - // Emit 50 more newer MetricPoints with distinct dimension combinations - for (int i = 10000; i < 10050; i++) + counter.Add(15); // Record another measurement for zero tags + + // Emit 2500 more newer MetricPoints with distinct dimension combinations + for (int i = 2000; i < 4500; i++) { counter.Add(5, new KeyValuePair("Key", i)); } @@ -234,21 +247,28 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForCounter(MetricReaderTem metricPoints.Add(mp); } - overflowMetricPoint = metricPoints.Single(mp => mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); + zeroTagsMetricPoint = metricPoints.Single(mp => mp.Tags.Count == 0); + overflowMetricPoint = metricPoints.Single(mp => mp.Tags.Count != 0 && mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); + if (temporalityPreference == MetricReaderTemporalityPreference.Delta) { - Assert.Equal(250, overflowMetricPoint.GetSumLong()); // 50 * 5 + Assert.Equal(15, zeroTagsMetricPoint.GetSumLong()); + + // Number of metric points that were available before the 2500 measurements were made = 2000 (max MetricPoints) - 2 (reserved for zero tags and overflow) = 1998 + // Number of metric points dropped = 2500 - 1998 = 502 + Assert.Equal(2510, overflowMetricPoint.GetSumLong()); // 502 * 5 } else { - Assert.Equal(255, overflowMetricPoint.GetSumLong()); // 5 + (50 * 5) + Assert.Equal(25, zeroTagsMetricPoint.GetSumLong()); + Assert.Equal(12505, overflowMetricPoint.GetSumLong()); // 5 + (2500 * 5) } exportedItems.Clear(); metricPoints.Clear(); // Test that the SDK continues to correctly aggregate the previously registered measurements even after overflow has occurred - counter.Add(15, new KeyValuePair("Key", 0)); + counter.Add(25); meterProvider.ForceFlush(); metric = exportedItems[0]; @@ -257,18 +277,18 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForCounter(MetricReaderTem metricPoints.Add(mp); } - var metricPoint = metricPoints.Single(mp => mp.Tags.KeyAndValues[0].Key == "Key" && (int)mp.Tags.KeyAndValues[0].Value == 0); + zeroTagsMetricPoint = metricPoints.Single(mp => mp.Tags.Count == 0); if (temporalityPreference == MetricReaderTemporalityPreference.Delta) { - Assert.Equal(15, metricPoint.GetSumLong()); + Assert.Equal(25, zeroTagsMetricPoint.GetSumLong()); } else { - overflowMetricPoint = metricPoints.Single(mp => mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); + overflowMetricPoint = metricPoints.Single(mp => mp.Tags.Count != 0 && mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); - Assert.Equal(25, metricPoint.GetSumLong()); // 10 + 15 - Assert.Equal(255, overflowMetricPoint.GetSumLong()); + Assert.Equal(50, zeroTagsMetricPoint.GetSumLong()); + Assert.Equal(12505, overflowMetricPoint.GetSumLong()); } } finally @@ -300,6 +320,8 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForHistogram(MetricReaderT // 1. For zero tags // 2. For metric overflow attribute when user opts-in for this feature + histogram.Record(10); // Record measurement for zero tags + // Max number for MetricPoints available for use when emitted with tags int maxMetricPointsForUse = MeterProviderBuilderSdk.MaxMetricPointsPerMetricDefault - 2; @@ -324,12 +346,12 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForHistogram(MetricReaderT MetricPoint overflowMetricPoint; // We still have not exceeded the max MetricPoint limit - Assert.DoesNotContain(metricPoints, mp => mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); + Assert.DoesNotContain(metricPoints, mp => mp.Tags.Count != 0 && mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); exportedItems.Clear(); metricPoints.Clear(); - histogram.Record(5, new KeyValuePair("Key", 9999)); // Emit a metric to exceed the max MetricPoint limit + histogram.Record(5, new KeyValuePair("Key", 1998)); // Emit a metric to exceed the max MetricPoint limit meterProvider.ForceFlush(); metric = exportedItems[0]; @@ -338,16 +360,27 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForHistogram(MetricReaderT metricPoints.Add(mp); } - overflowMetricPoint = metricPoints.Single(mp => mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); + MetricPoint zeroTagsMetricPoint; + if (temporalityPreference == MetricReaderTemporalityPreference.Cumulative) + { + // Check metric point for zero tags + zeroTagsMetricPoint = metricPoints.Single(mp => mp.Tags.Count == 0); + Assert.Equal(10, zeroTagsMetricPoint.GetHistogramSum()); + } + + // Check metric point for overflow + overflowMetricPoint = metricPoints.Single(mp => mp.Tags.Count != 0 && mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); Assert.Equal(true, overflowMetricPoint.Tags.KeyAndValues[0].Value); - Assert.Equal(1, overflowMetricPoint.GetHistogramCount()); + Assert.Equal(1, overflowMetricPoint.Tags.Count); Assert.Equal(5, overflowMetricPoint.GetHistogramSum()); exportedItems.Clear(); metricPoints.Clear(); - // Emit 50 more newer MetricPoints with distinct dimension combinations - for (int i = 10000; i < 10050; i++) + histogram.Record(15); // Record another measurement for zero tags + + // Emit 2500 more newer MetricPoints with distinct dimension combinations + for (int i = 2000; i < 4500; i++) { histogram.Record(5, new KeyValuePair("Key", i)); } @@ -359,23 +392,31 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForHistogram(MetricReaderT metricPoints.Add(mp); } - overflowMetricPoint = metricPoints.Single(mp => mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); + zeroTagsMetricPoint = metricPoints.Single(mp => mp.Tags.Count == 0); + overflowMetricPoint = metricPoints.Single(mp => mp.Tags.Count != 0 && mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); + if (temporalityPreference == MetricReaderTemporalityPreference.Delta) { - Assert.Equal(50, overflowMetricPoint.GetHistogramCount()); - Assert.Equal(250, overflowMetricPoint.GetHistogramSum()); // 50 * 5 + Assert.Equal(15, zeroTagsMetricPoint.GetHistogramSum()); + + // Number of metric points that were available before the 2500 measurements were made = 2000 (max MetricPoints) - 2 (reserved for zero tags and overflow) = 1998 + // Number of metric points dropped = 2500 - 1998 = 502 + Assert.Equal(502, overflowMetricPoint.GetHistogramCount()); + Assert.Equal(2510, overflowMetricPoint.GetHistogramSum()); // 502 * 5 } else { - Assert.Equal(51, overflowMetricPoint.GetHistogramCount()); - Assert.Equal(255, overflowMetricPoint.GetHistogramSum()); // 5 + (50 * 5) + Assert.Equal(25, zeroTagsMetricPoint.GetHistogramSum()); + + Assert.Equal(2501, overflowMetricPoint.GetHistogramCount()); + Assert.Equal(12505, overflowMetricPoint.GetHistogramSum()); // 5 + (2500 * 5) } exportedItems.Clear(); metricPoints.Clear(); // Test that the SDK continues to correctly aggregate the previously registered measurements even after overflow has occurred - histogram.Record(15, new KeyValuePair("Key", 0)); + histogram.Record(25); meterProvider.ForceFlush(); metric = exportedItems[0]; @@ -384,21 +425,18 @@ public void MetricOverflowAttributeIsRecordedCorrectlyForHistogram(MetricReaderT metricPoints.Add(mp); } - var metricPoint = metricPoints.Single(mp => mp.Tags.KeyAndValues[0].Key == "Key" && (int)mp.Tags.KeyAndValues[0].Value == 0); + zeroTagsMetricPoint = metricPoints.Single(mp => mp.Tags.Count == 0); if (temporalityPreference == MetricReaderTemporalityPreference.Delta) { - Assert.Equal(1, metricPoint.GetHistogramCount()); - Assert.Equal(15, metricPoint.GetHistogramSum()); + Assert.Equal(25, zeroTagsMetricPoint.GetHistogramSum()); } else { - overflowMetricPoint = metricPoints.Single(mp => mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); - - Assert.Equal(2, metricPoint.GetHistogramCount()); - Assert.Equal(25, metricPoint.GetHistogramSum()); // 10 + 15 + overflowMetricPoint = metricPoints.Single(mp => mp.Tags.Count != 0 && mp.Tags.KeyAndValues[0].Key == "otel.metric.overflow"); - Assert.Equal(255, overflowMetricPoint.GetHistogramSum()); + Assert.Equal(50, zeroTagsMetricPoint.GetHistogramSum()); + Assert.Equal(12505, overflowMetricPoint.GetHistogramSum()); } } finally diff --git a/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs b/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs new file mode 100644 index 00000000000..b5ef0b65d02 --- /dev/null +++ b/test/OpenTelemetry.Tests/Metrics/MetricPointReclaimTests.cs @@ -0,0 +1,255 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System.Collections.Concurrent; +using System.Diagnostics.Metrics; +using System.Reflection; +using OpenTelemetry.Tests; +using Xunit; + +namespace OpenTelemetry.Metrics.Tests; + +public class MetricPointReclaimTests +{ + [Theory] + [InlineData(false)] + [InlineData(true)] + public void MeasurementsAreNotDropped(bool emitMetricWithNoDimensions) + { + var meter = new Meter(Utils.GetCurrentMethodName()); + var counter = meter.CreateCounter("MyFruitCounter"); + + int numberOfUpdateThreads = 25; + int maxNumberofDistinctMetricPoints = 4000; // Default max MetricPoints * 2 + + using var exporter = new CustomExporter(); + using var metricReader = new PeriodicExportingMetricReader(exporter, exportIntervalMilliseconds: 10) + { + TemporalityPreference = MetricReaderTemporalityPreference.Delta, + }; + + using var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(Utils.GetCurrentMethodName()) + .AddReader(metricReader) + .Build(); + + void EmitMetric(object obj) + { + var threadArguments = obj as ThreadArguments; + var random = new Random(); + while (true) + { + int i = Interlocked.Increment(ref threadArguments!.Counter); + if (i <= maxNumberofDistinctMetricPoints) + { + // Check for cases where a metric with no dimension is also emitted + if (emitMetricWithNoDimensions) + { + counter.Add(25); + } + + // There are separate code paths for single dimension vs multiple dimensions + if (random.Next(2) == 0) + { + counter.Add(100, new KeyValuePair("key", $"value{i}")); + } + else + { + counter.Add(100, new KeyValuePair("key", $"value{i}"), new KeyValuePair("dimensionKey", "dimensionValue")); + } + + Thread.Sleep(25); + } + else + { + break; + } + } + } + + var threads = new Thread[numberOfUpdateThreads]; + var threadArgs = new ThreadArguments(); + + for (int i = 0; i < threads.Length; i++) + { + threads[i] = new Thread(EmitMetric!); + threads[i].Start(threadArgs); + } + + for (int i = 0; i < threads.Length; i++) + { + threads[i].Join(); + } + + meterProvider.ForceFlush(); + + long expectedSum; + + if (emitMetricWithNoDimensions) + { + expectedSum = maxNumberofDistinctMetricPoints * (25 + 100); + } + else + { + expectedSum = maxNumberofDistinctMetricPoints * 100; + } + + Assert.Equal(expectedSum, exporter.Sum); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public void MeasurementsAreAggregatedAfterMetricPointReclaim(bool emitMetricWithNoDimension) + { + var meter = new Meter(Utils.GetCurrentMethodName()); + var counter = meter.CreateCounter("MyFruitCounter"); + + long sum = 0; + var measurementValues = new long[] { 10, 20 }; + + int numberOfUpdateThreads = 4; + int numberOfMeasurementsPerThread = 10; + + using var exporter = new CustomExporter(); + using var metricReader = new PeriodicExportingMetricReader(exporter, exportIntervalMilliseconds: 10) + { + TemporalityPreference = MetricReaderTemporalityPreference.Delta, + }; + + using var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(Utils.GetCurrentMethodName()) + .SetMaxMetricPointsPerMetricStream(10) // Set max MetricPoints limit to 5 + .AddReader(metricReader) + .Build(); + + // Add nine distinct combinations of dimensions to switch AggregatorStore Snapshot behavior + // to start reclaiming Metric Points. (One MetricPoint is reserved for metric point with no dimensions) + for (int i = 1; i < 10; i++) + { + counter.Add(100, new KeyValuePair("key", Guid.NewGuid())); + } + + meterProvider.ForceFlush(); + meterProvider.ForceFlush(); + + exporter.Sum = 0; + + void EmitMetric() + { + int numberOfMeasurements = 0; + var random = new Random(); + while (emitMetricWithNoDimension) + { + if (numberOfMeasurements < numberOfMeasurementsPerThread) + { + // Check for cases where a metric with no dimension is also emitted + if (true) + { + counter.Add(25); + Interlocked.Add(ref sum, 25); + } + + var index = random.Next(measurementValues.Length); + var measurement = measurementValues[index]; + counter.Add(measurement, new KeyValuePair("key", $"value{index}")); + Interlocked.Add(ref sum, measurement); + + numberOfMeasurements++; + + Thread.Sleep(25); + } + else + { + break; + } + } + } + + var threads = new Thread[numberOfUpdateThreads]; + + for (int i = 0; i < threads.Length; i++) + { + threads[i] = new Thread(EmitMetric!); + threads[i].Start(); + } + + for (int i = 0; i < threads.Length; i++) + { + threads[i].Join(); + } + + meterProvider.ForceFlush(); + Assert.Equal(sum, exporter.Sum); + } + + private class ThreadArguments + { + public int Counter; + } + + private class CustomExporter : BaseExporter + { + public long Sum = 0; + + private readonly FieldInfo aggStoreFieldInfo; + + private readonly FieldInfo metricPointLookupDictionaryFieldInfo; + + public CustomExporter() + { + var metricFields = typeof(Metric).GetFields(BindingFlags.NonPublic | BindingFlags.Instance); + this.aggStoreFieldInfo = metricFields!.FirstOrDefault(field => field.Name == "aggStore"); + + var aggregatorStoreFields = typeof(AggregatorStore).GetFields(BindingFlags.NonPublic | BindingFlags.Instance); + this.metricPointLookupDictionaryFieldInfo = aggregatorStoreFields!.FirstOrDefault(field => field.Name == "tagsToMetricPointIndexDictionaryDelta"); + } + + public override ExportResult Export(in Batch batch) + { + foreach (var metric in batch) + { + var aggStore = this.aggStoreFieldInfo.GetValue(metric) as AggregatorStore; + var metricPointLookupDictionary = this.metricPointLookupDictionaryFieldInfo.GetValue(aggStore) as ConcurrentDictionary; + + var droppedMeasurements = aggStore.DroppedMeasurements; + + Assert.Equal(0, droppedMeasurements); + + // This is to ensure that the lookup dictionary does not have unbounded growth + Assert.True(metricPointLookupDictionary.Count <= (MeterProviderBuilderSdk.MaxMetricPointsPerMetricDefault * 2)); + + foreach (ref readonly var metricPoint in metric.GetMetricPoints()) + { + // Access the tags to ensure that this does not throw any exception due to + // any erroneous thread interactions. + foreach (var tag in metricPoint.Tags) + { + _ = tag.Key; + _ = tag.Value; + } + + if (metric.MetricType.IsSum()) + { + this.Sum += metricPoint.GetSumLong(); + } + } + } + + return ExportResult.Success; + } + } +} diff --git a/test/OpenTelemetry.Tests/Metrics/MetricTestsBase.cs b/test/OpenTelemetry.Tests/Metrics/MetricTestsBase.cs index 85c1b2c0817..6cb5395ed29 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricTestsBase.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricTestsBase.cs @@ -22,6 +22,8 @@ public class MetricTestsBase { public const string EmitOverFlowAttributeConfigKey = "OTEL_DOTNET_EXPERIMENTAL_METRICS_EMIT_OVERFLOW_ATTRIBUTE"; + // This method relies on the assumption that MetricPoints are exported in the order in which they are emitted. + // For Delta AggregationTemporality, this holds true only until the AggregatorStore has not begun recaliming the MetricPoints. public static void ValidateMetricPointTags(List> expectedTags, ReadOnlyTagCollection actualTags) { int tagIndex = 0; @@ -104,6 +106,8 @@ public static int GetNumberOfMetricPoints(List metrics) return null; } + // This method relies on the assumption that MetricPoints are exported in the order in which they are emitted. + // For Delta AggregationTemporality, this holds true only until the AggregatorStore has not begun recaliming the MetricPoints. // Provide tags input sorted by Key public static void CheckTagsForNthMetricPoint(List metrics, List> tags, int n) {