Skip to content

Commit a97f113

Browse files
authored
Merge branch 'main' into optimize_lastvalue
2 parents a5b0006 + 19a5a68 commit a97f113

File tree

10 files changed

+176
-64
lines changed

10 files changed

+176
-64
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1414
This prevents scrape failures when the Prometheus exporter is misconfigured to get data from the Prometheus bridge. (#7688)
1515
- Improve performance of concurrent histogram measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7474)
1616
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/stdout/stdoutmetric`. (#7492)
17+
- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 4x. (#7443)
1718
- Improve performance of concurrent synchronous gauge measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7478)
1819

1920
<!-- Released section -->

internal/tools/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ require (
235235
golang.org/x/net v0.48.0 // indirect
236236
golang.org/x/sync v0.19.0 // indirect
237237
golang.org/x/sys v0.39.0 // indirect
238-
golang.org/x/telemetry v0.0.0-20251208220230-2638a1023523 // indirect
238+
golang.org/x/telemetry v0.0.0-20251215142616-e75fd47794af // indirect
239239
golang.org/x/text v0.32.0 // indirect
240240
google.golang.org/protobuf v1.36.11 // indirect
241241
gopkg.in/warnings.v0 v0.1.2 // indirect

internal/tools/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,8 +620,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
620620
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
621621
golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk=
622622
golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
623-
golang.org/x/telemetry v0.0.0-20251208220230-2638a1023523 h1:H52Mhyrc44wBgLTGzq6+0cmuVuF3LURCSXsLMOqfFos=
624-
golang.org/x/telemetry v0.0.0-20251208220230-2638a1023523/go.mod h1:ArQvPJS723nJQietgilmZA+shuB3CZxH1n2iXq9VSfs=
623+
golang.org/x/telemetry v0.0.0-20251215142616-e75fd47794af h1:JLNgZmN0uDGV+zlgKknvmvX9+atzn9b7S6M1L6J5tQs=
624+
golang.org/x/telemetry v0.0.0-20251215142616-e75fd47794af/go.mod h1:ArQvPJS723nJQietgilmZA+shuB3CZxH1n2iXq9VSfs=
625625
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
626626
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
627627
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=

sdk/metric/benchmark_test.go

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -81,117 +81,127 @@ func benchSyncViews(sc trace.SpanContext, views ...View) func(*testing.B) {
8181
expRdr := NewManualReader(WithAggregationSelector(exponentialAggregationSelector))
8282
expProvider := NewMeterProvider(WithReader(expRdr), WithView(views...))
8383
expMeter := expProvider.Meter("benchSyncViews")
84+
// Precompute histogram values so they are distributed equally to buckets.
85+
histogramBuckets := DefaultAggregationSelector(InstrumentKindHistogram).(AggregationExplicitBucketHistogram).Boundaries
86+
histogramObservations := make([]float64, len(histogramBuckets))
87+
for i, bucket := range histogramBuckets {
88+
histogramObservations[i] = bucket + 1
89+
}
8490
return func(b *testing.B) {
8591
ctx := trace.ContextWithSpanContext(b.Context(), sc)
8692
iCtr, err := meter.Int64Counter("int64-counter")
8793
assert.NoError(b, err)
8894
b.Run("Int64Counter", benchMeasAttrs(func() measF {
89-
return func(s attribute.Set) func() {
95+
return func(s attribute.Set) func(int) {
9096
o := []metric.AddOption{metric.WithAttributeSet(s)}
91-
return func() { iCtr.Add(ctx, 1, o...) }
97+
return func(int) { iCtr.Add(ctx, 1, o...) }
9298
}
9399
}()))
94100

95101
fCtr, err := meter.Float64Counter("float64-counter")
96102
assert.NoError(b, err)
97103
b.Run("Float64Counter", benchMeasAttrs(func() measF {
98-
return func(s attribute.Set) func() {
104+
return func(s attribute.Set) func(int) {
99105
o := []metric.AddOption{metric.WithAttributeSet(s)}
100-
return func() { fCtr.Add(ctx, 1, o...) }
106+
return func(int) { fCtr.Add(ctx, 1, o...) }
101107
}
102108
}()))
103109

104110
iUDCtr, err := meter.Int64UpDownCounter("int64-up-down-counter")
105111
assert.NoError(b, err)
106112
b.Run("Int64UpDownCounter", benchMeasAttrs(func() measF {
107-
return func(s attribute.Set) func() {
113+
return func(s attribute.Set) func(int) {
108114
o := []metric.AddOption{metric.WithAttributeSet(s)}
109-
return func() { iUDCtr.Add(ctx, 1, o...) }
115+
return func(int) { iUDCtr.Add(ctx, 1, o...) }
110116
}
111117
}()))
112118

113119
fUDCtr, err := meter.Float64UpDownCounter("float64-up-down-counter")
114120
assert.NoError(b, err)
115121
b.Run("Float64UpDownCounter", benchMeasAttrs(func() measF {
116-
return func(s attribute.Set) func() {
122+
return func(s attribute.Set) func(int) {
117123
o := []metric.AddOption{metric.WithAttributeSet(s)}
118-
return func() { fUDCtr.Add(ctx, 1, o...) }
124+
return func(int) { fUDCtr.Add(ctx, 1, o...) }
119125
}
120126
}()))
121127

122128
iGauge, err := meter.Int64Gauge("int64-gauge")
123129
assert.NoError(b, err)
124130
b.Run("Int64Gauge", benchMeasAttrs(func() measF {
125-
return func(s attribute.Set) func() {
131+
return func(s attribute.Set) func(int) {
126132
o := []metric.RecordOption{metric.WithAttributeSet(s)}
127-
return func() { iGauge.Record(ctx, 1, o...) }
133+
return func(int) { iGauge.Record(ctx, 1, o...) }
128134
}
129135
}()))
130136

131137
fGauge, err := meter.Float64Gauge("float64-gauge")
132138
assert.NoError(b, err)
133139
b.Run("Float64Gauge", benchMeasAttrs(func() measF {
134-
return func(s attribute.Set) func() {
140+
return func(s attribute.Set) func(int) {
135141
o := []metric.RecordOption{metric.WithAttributeSet(s)}
136-
return func() { fGauge.Record(ctx, 1, o...) }
142+
return func(int) { fGauge.Record(ctx, 1, o...) }
137143
}
138144
}()))
139145

140146
iHist, err := meter.Int64Histogram("int64-histogram")
141147
assert.NoError(b, err)
142148
b.Run("Int64Histogram", benchMeasAttrs(func() measF {
143-
return func(s attribute.Set) func() {
149+
return func(s attribute.Set) func(int) {
144150
o := []metric.RecordOption{metric.WithAttributeSet(s)}
145-
return func() { iHist.Record(ctx, 1, o...) }
151+
return func(i int) { iHist.Record(ctx, int64(histogramObservations[i%len(histogramObservations)]), o...) }
146152
}
147153
}()))
148154

149155
fHist, err := meter.Float64Histogram("float64-histogram")
150156
assert.NoError(b, err)
151157
b.Run("Float64Histogram", benchMeasAttrs(func() measF {
152-
return func(s attribute.Set) func() {
158+
return func(s attribute.Set) func(i int) {
153159
o := []metric.RecordOption{metric.WithAttributeSet(s)}
154-
return func() { fHist.Record(ctx, 1, o...) }
160+
return func(i int) { fHist.Record(ctx, histogramObservations[i%len(histogramObservations)], o...) }
155161
}
156162
}()))
157163

158164
expIHist, err := expMeter.Int64Histogram("exponential-int64-histogram")
159165
assert.NoError(b, err)
160166
b.Run("ExponentialInt64Histogram", benchMeasAttrs(func() measF {
161-
return func(s attribute.Set) func() {
167+
return func(s attribute.Set) func(int) {
162168
o := []metric.RecordOption{metric.WithAttributeSet(s)}
163-
return func() { expIHist.Record(ctx, 1, o...) }
169+
return func(int) { expIHist.Record(ctx, 1, o...) }
164170
}
165171
}()))
166172

167173
expFHist, err := expMeter.Float64Histogram("exponential-float64-histogram")
168174
assert.NoError(b, err)
169175
b.Run("ExponentialFloat64Histogram", benchMeasAttrs(func() measF {
170-
return func(s attribute.Set) func() {
176+
return func(s attribute.Set) func(int) {
171177
o := []metric.RecordOption{metric.WithAttributeSet(s)}
172-
return func() { expFHist.Record(ctx, 1, o...) }
178+
return func(int) { expFHist.Record(ctx, 1, o...) }
173179
}
174180
}()))
175181
}
176182
}
177183

178-
type measF func(s attribute.Set) func()
184+
type measF func(s attribute.Set) func(i int)
179185

180186
func benchMeasAttrs(meas measF) func(*testing.B) {
181187
return func(b *testing.B) {
182188
b.Run("Attributes/0", func(b *testing.B) {
183189
f := meas(*attribute.EmptySet())
184190
b.RunParallel(func(pb *testing.PB) {
191+
i := 0
185192
for pb.Next() {
186-
f()
193+
f(i)
194+
i++
187195
}
188196
})
189197
})
190198
b.Run("Attributes/1", func(b *testing.B) {
191199
f := meas(attribute.NewSet(attribute.Bool("K", true)))
192200
b.RunParallel(func(pb *testing.PB) {
201+
i := 0
193202
for pb.Next() {
194-
f()
203+
f(i)
204+
i++
195205
}
196206
})
197207
})
@@ -204,8 +214,10 @@ func benchMeasAttrs(meas measF) func(*testing.B) {
204214
}
205215
f := meas(attribute.NewSet(attrs...))
206216
b.RunParallel(func(pb *testing.PB) {
217+
i := 0
207218
for pb.Next() {
208-
f()
219+
f(i)
220+
i++
209221
}
210222
})
211223
})

sdk/metric/exemplar/fixed_size_reservoir.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,11 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a
130130
r.mu.Lock()
131131
defer r.mu.Unlock()
132132
if int(r.count) < cap(r.measurements) {
133-
r.store(int(r.count), newMeasurement(ctx, t, n, a))
133+
r.store(ctx, int(r.count), t, n, a)
134134
} else if r.count == r.next {
135135
// Overwrite a random existing measurement with the one offered.
136136
idx := int(rand.Int64N(int64(cap(r.measurements))))
137-
r.store(idx, newMeasurement(ctx, t, n, a))
137+
r.store(ctx, idx, t, n, a)
138138
r.advance()
139139
}
140140
r.count++

sdk/metric/exemplar/fixed_size_reservoir_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,21 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
4545
}
4646

4747
var sum float64
48-
for _, m := range r.measurements {
49-
sum += m.Value.Float64()
48+
for i := range r.measurements {
49+
sum += r.measurements[i].Value.Float64()
5050
}
5151
mean := sum / float64(sampleSize)
5252

5353
// Check the intensity/rate of the sampled distribution is preserved
5454
// ensuring no bias in our random sampling algorithm.
5555
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
5656
}
57+
58+
func TestFixedSizeReservoirConcurrentSafe(t *testing.T) {
59+
t.Run("Int64", reservoirConcurrentSafeTest[int64](func(n int) (ReservoirProvider, int) {
60+
return FixedSizeReservoirProvider(n), n
61+
}))
62+
t.Run("Float64", reservoirConcurrentSafeTest[float64](func(n int) (ReservoirProvider, int) {
63+
return FixedSizeReservoirProvider(n), n
64+
}))
65+
}

sdk/metric/exemplar/histogram_reservoir.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"context"
88
"slices"
99
"sort"
10-
"sync"
1110
"time"
1211

1312
"go.opentelemetry.io/otel/attribute"
@@ -43,7 +42,6 @@ var _ Reservoir = &HistogramReservoir{}
4342
type HistogramReservoir struct {
4443
reservoir.ConcurrentSafe
4544
*storage
46-
mu sync.Mutex
4745

4846
// bounds are bucket bounds in ascending order.
4947
bounds []float64
@@ -72,18 +70,13 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
7270
}
7371

7472
idx := sort.SearchFloat64s(r.bounds, n)
75-
m := newMeasurement(ctx, t, v, a)
7673

77-
r.mu.Lock()
78-
defer r.mu.Unlock()
79-
r.store(idx, m)
74+
r.store(ctx, idx, t, v, a)
8075
}
8176

8277
// Collect returns all the held exemplars.
8378
//
8479
// The Reservoir state is preserved after this call.
8580
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
86-
r.mu.Lock()
87-
defer r.mu.Unlock()
8881
r.storage.Collect(dest)
8982
}

sdk/metric/exemplar/histogram_reservoir_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,13 @@ func TestHist(t *testing.T) {
1515
return HistogramReservoirProvider(bounds), len(bounds)
1616
}))
1717
}
18+
19+
func TestHistogramReservoirConcurrentSafe(t *testing.T) {
20+
bounds := []float64{0, 100}
21+
t.Run("Int64", reservoirConcurrentSafeTest[int64](func(int) (ReservoirProvider, int) {
22+
return HistogramReservoirProvider(bounds), len(bounds)
23+
}))
24+
t.Run("Float64", reservoirConcurrentSafeTest[float64](func(int) (ReservoirProvider, int) {
25+
return HistogramReservoirProvider(bounds), len(bounds)
26+
}))
27+
}

sdk/metric/exemplar/reservoir_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package exemplar
55

66
import (
7+
"context"
8+
"sync"
79
"testing"
810
"time"
911

@@ -144,3 +146,85 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
144146
})
145147
}
146148
}
149+
150+
func reservoirConcurrentSafeTest[N int64 | float64](f factory) func(*testing.T) {
151+
return func(t *testing.T) {
152+
t.Helper()
153+
rp, n := f(1)
154+
if n < 1 {
155+
t.Skip("skipping, reservoir capacity less than 1:", n)
156+
}
157+
r := rp(*attribute.EmptySet())
158+
159+
var wg sync.WaitGroup
160+
161+
const goroutines = 2
162+
163+
// Call Offer concurrently with another Offer, and with Collect.
164+
for i := range goroutines {
165+
wg.Add(1)
166+
go func(iteration int) {
167+
ctx, ts, val, attrs := generateOfferInputs[N](iteration + 1)
168+
r.Offer(ctx, ts, val, attrs)
169+
wg.Done()
170+
}(i)
171+
}
172+
173+
// Also test concurrent Collect calls
174+
wg.Add(1)
175+
go func() {
176+
var dest []Exemplar
177+
r.Collect(&dest)
178+
wg.Done()
179+
}()
180+
181+
wg.Wait()
182+
183+
// Final collect to validate state
184+
var dest []Exemplar
185+
r.Collect(&dest)
186+
assert.NotEmpty(t, dest)
187+
for _, e := range dest {
188+
validateExemplar[N](t, e)
189+
}
190+
}
191+
}
192+
193+
func generateOfferInputs[N int64 | float64](
194+
i int,
195+
) (context.Context, time.Time, Value, []attribute.KeyValue) {
196+
sc := trace.NewSpanContext(trace.SpanContextConfig{
197+
TraceID: trace.TraceID([16]byte{byte(i)}),
198+
SpanID: trace.SpanID([8]byte{byte(i)}),
199+
TraceFlags: trace.FlagsSampled,
200+
})
201+
ctx := trace.ContextWithSpanContext(context.Background(), sc)
202+
ts := time.Unix(int64(i), int64(i))
203+
val := NewValue(N(i))
204+
attrs := []attribute.KeyValue{attribute.Int("i", i)}
205+
return ctx, ts, val, attrs
206+
}
207+
208+
func validateExemplar[N int64 | float64](t *testing.T, e Exemplar) {
209+
t.Helper()
210+
i := 0
211+
switch e.Value.Type() {
212+
case Int64ValueType:
213+
i = int(e.Value.Int64())
214+
case Float64ValueType:
215+
i = int(e.Value.Float64())
216+
default:
217+
t.Fatalf("unexpected value type: %v", e.Value.Type())
218+
}
219+
if i == 0 {
220+
t.Fatal("empty exemplar")
221+
}
222+
ctx, ts, _, attrs := generateOfferInputs[N](i)
223+
sc := trace.SpanContextFromContext(ctx)
224+
tID := sc.TraceID()
225+
sID := sc.SpanID()
226+
assert.Equal(t, tID[:], e.TraceID)
227+
assert.Equal(t, sID[:], e.SpanID)
228+
assert.Equal(t, ts, e.Time)
229+
assert.Equal(t, attrs, e.FilteredAttributes)
230+
}

0 commit comments

Comments
 (0)