
Commit f57bf14

dashpole and bwplotka authored
Use sync.Map and atomics for fixed bucket histograms (#7474)
Implement a lockless histogram using atomics, and use a sync.Map for attribute access. This improves performance by ~2x. The design is very similar to #7427, but with one additional change to make the histogram data point itself atomic:

* For cumulative histograms, which do not use a hot/cold limitedSyncMap, we use a hot/cold data point. This way, we maintain the keys in the sync map, but still ensure that collection gets a consistent view of measure() calls.

Parallel benchmarks:

```
                                                                         │  main.txt   │             hist.txt              │
                                                                         │   sec/op    │   sec/op     vs base              │
SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/10-24       274.5n ± 2%   125.2n ± 5%  -54.42% (p=0.002 n=6)
SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/10-24     274.1n ± 2%   132.5n ± 2%  -51.65% (p=0.002 n=6)
geomean                                                                    274.3n        128.8n       -53.05%
```

Measure() performs zero memory allocations both before and after this change; those results are omitted for brevity.

Benchmarks for collect:

```
                                                      │   main.txt    │              hist.txt               │
                                                      │    sec/op     │    sec/op     vs base               │
Collect/NoView/Int64Histogram/1/Attributes/0-24         1.799µ ±  60%    1.702µ ± 6%         ~ (p=1.000 n=6)
Collect/NoView/Int64Histogram/1/Attributes/1-24         973.7n ±  28%   1720.0n ± 5%   +76.65% (p=0.002 n=6)
Collect/NoView/Int64Histogram/1/Attributes/10-24        881.0n ±  17%   1710.0n ± 5%   +94.09% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/0-24        996.1n ±  14%   1781.5n ± 4%   +78.85% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/1-24        1.029µ ±  67%    1.733µ ± 3%   +68.42% (p=0.009 n=6)
Collect/NoView/Int64Histogram/10/Attributes/10-24       1.533µ ±  18%    1.708µ ± 4%         ~ (p=0.240 n=6)
Collect/NoView/Float64Histogram/1/Attributes/0-24       1.222µ ± 120%    1.733µ ± 4%         ~ (p=0.065 n=6)
Collect/NoView/Float64Histogram/1/Attributes/1-24       893.3n ±   8%   1733.0n ± 4%   +94.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/10-24      860.7n ±   2%   1732.0n ± 5%  +101.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/0-24      852.5n ±   4%   1758.0n ± 3%  +106.22% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/1-24      853.8n ±   3%   1725.0n ± 3%  +102.04% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/10-24     843.4n ±   2%   1755.0n ± 4%  +108.10% (p=0.002 n=6)
geomean                                                 1.028µ           1.732µ        +68.46%

                                                      │  main.txt  │             hist.txt              │
                                                      │    B/op    │     B/op      vs base             │
Collect/NoView/Int64Histogram/1/Attributes/0-24         336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Int64Histogram/1/Attributes/1-24         336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Int64Histogram/1/Attributes/10-24        336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/0-24        336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/1-24        336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/10-24       336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/0-24       336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/1-24       336.0 ± 0%    2130.5 ± 0%  +534.08% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/10-24      336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/0-24      336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/1-24      336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/10-24     336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
geomean                                                 336.0         2.081Ki      +534.21%

                                                      │  main.txt  │            hist.txt            │
                                                      │ allocs/op  │ allocs/op   vs base            │
Collect/NoView/Int64Histogram/1/Attributes/0-24         5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Int64Histogram/1/Attributes/1-24         5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Int64Histogram/1/Attributes/10-24        5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/0-24        5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/1-24        5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/10-24       5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/0-24       5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/1-24       5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/10-24      5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/0-24      5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/1-24      5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/10-24     5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
geomean                                                 5.000        6.000       +20.00%
```

Collect does get substantially worse, but Measure is expected to be called significantly more often than Collect.

---------

Co-authored-by: Bartlomiej Plotka <[email protected]>
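The hot/cold data point described above can be sketched as follows. This is a minimal illustration of the flip pattern, not the SDK's actual histogram code: `hotColdPoint` and its fields are hypothetical names, and the real implementation pairs the flip with the `hotColdWaitGroup` from `atomic.go` (see the diff below) so the collector also waits for `measure()` calls still in flight.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// hotColdPoint is a toy two-copy data point. Writers record into the copy
// selected by hotIdx; the collector flips hotIdx and reads the other copy,
// so collection never races with newly started measurements.
type hotColdPoint struct {
	hotIdx atomic.Uint32 // 0 or 1: which copy writers should use
	counts [2]atomic.Int64
}

// measure is lockless: it records into whichever copy is currently hot.
func (p *hotColdPoint) measure() {
	p.counts[p.hotIdx.Load()].Add(1)
}

// collect makes the cold copy hot, then drains the copy that was hot.
// A real implementation must also wait for writers that loaded the old
// index just before the flip; the SDK uses hotColdWaitGroup for that.
func (p *hotColdPoint) collect() int64 {
	cold := p.hotIdx.Load()
	p.hotIdx.Store(1 - cold)
	return p.counts[cold].Swap(0)
}

func main() {
	var p hotColdPoint
	var wg sync.WaitGroup
	for range 4 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range 1000 {
				p.measure()
			}
		}()
	}
	wg.Wait()
	fmt.Println(p.collect() + p.collect()) // 4000 once all writers finish
}
```

The payoff is that measure() stays lockless while collect() still observes a consistent snapshot.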
1 parent b4578c8 commit f57bf14

File tree

7 files changed: +560 additions, -201 deletions


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 - `Exporter` in `go.opentelemetry.io/otel/exporter/prometheus` ignores metrics with the scope `go.opentelemetry.io/contrib/bridges/prometheus`.
   This prevents scrape failures when the Prometheus exporter is misconfigured to get data from the Prometheus bridge. (#7688)
+- Improve performance of concurrent histogram measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7474)
 
 <!-- Released section -->
 <!-- Don't change this section unless doing release -->
```

sdk/metric/internal/aggregate/aggregate.go

Lines changed: 4 additions & 3 deletions
```diff
@@ -126,12 +126,13 @@ func (b Builder[N]) ExplicitBucketHistogram(
 	boundaries []float64,
 	noMinMax, noSum bool,
 ) (Measure[N], ComputeAggregation) {
-	h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
-		return b.filter(h.measure), h.delta
+		h := newDeltaHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
+		return b.filter(h.measure), h.collect
 	default:
-		return b.filter(h.measure), h.cumulative
+		h := newCumulativeHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
+		return b.filter(h.measure), h.collect
 	}
 }
```

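For orientation, the delta vs. cumulative branch above is driven by the reader's temporality selector. Below is a hedged sketch of wiring a reader that selects delta temporality using the public SDK API; the selector literal is illustrative (readers default to cumulative temporality, which takes the `newCumulativeHistogram` path).

```go
package main

import (
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	// With delta temporality, explicit-bucket histograms take the
	// newDeltaHistogram path above; the cumulative default takes
	// newCumulativeHistogram.
	reader := sdkmetric.NewManualReader(
		sdkmetric.WithTemporalitySelector(
			func(sdkmetric.InstrumentKind) metricdata.Temporality {
				return metricdata.DeltaTemporality
			},
		),
	)
	_ = sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
}
```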
sdk/metric/internal/aggregate/atomic.go

Lines changed: 91 additions & 0 deletions
```diff
@@ -51,6 +51,97 @@ func (n *atomicCounter[N]) add(value N) {
 	}
 }
 
+// reset resets the internal state, and is not safe to call concurrently.
+func (n *atomicCounter[N]) reset() {
+	n.nFloatBits.Store(0)
+	n.nInt.Store(0)
+}
+
+// atomicN is a generic atomic number value.
+type atomicN[N int64 | float64] struct {
+	val atomic.Uint64
+}
+
+func (a *atomicN[N]) Load() (value N) {
+	v := a.val.Load()
+	switch any(value).(type) {
+	case int64:
+		value = N(v)
+	case float64:
+		value = N(math.Float64frombits(v))
+	default:
+		panic("unsupported type")
+	}
+	return value
+}
+
+func (a *atomicN[N]) Store(v N) {
+	var val uint64
+	switch any(v).(type) {
+	case int64:
+		val = uint64(v)
+	case float64:
+		val = math.Float64bits(float64(v))
+	default:
+		panic("unsupported type")
+	}
+	a.val.Store(val)
+}
+
+func (a *atomicN[N]) CompareAndSwap(oldN, newN N) bool {
+	var o, n uint64
+	switch any(oldN).(type) {
+	case int64:
+		o, n = uint64(oldN), uint64(newN)
+	case float64:
+		o, n = math.Float64bits(float64(oldN)), math.Float64bits(float64(newN))
+	default:
+		panic("unsupported type")
+	}
+	return a.val.CompareAndSwap(o, n)
+}
+
+type atomicMinMax[N int64 | float64] struct {
+	minimum, maximum atomicN[N]
+	set              atomic.Bool
+	mu               sync.Mutex
+}
+
+// init returns true if the value was used to initialize min and max.
+func (s *atomicMinMax[N]) init(val N) bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if !s.set.Load() {
+		defer s.set.Store(true)
+		s.minimum.Store(val)
+		s.maximum.Store(val)
+		return true
+	}
+	return false
+}
+
+func (s *atomicMinMax[N]) Update(val N) {
+	if !s.set.Load() && s.init(val) {
+		return
+	}
+
+	old := s.minimum.Load()
+	for val < old {
+		if s.minimum.CompareAndSwap(old, val) {
+			return
+		}
+		old = s.minimum.Load()
+	}
+
+	old = s.maximum.Load()
+	for old < val {
+		if s.maximum.CompareAndSwap(old, val) {
+			return
+		}
+		old = s.maximum.Load()
+	}
+}
+
 // hotColdWaitGroup is a synchronization primitive which enables lockless
 // writes for concurrent writers and enables a reader to acquire exclusive
 // access to a snapshot of state including only completed operations.
```

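The `atomicN` type added above works for both `int64` and `float64` by storing the value's IEEE 754 bit pattern in an `atomic.Uint64`. Here is a standalone sketch of that round-trip; it is illustrative only and not part of the diff.

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

func main() {
	var bits atomic.Uint64 // holds a float64 as its IEEE 754 bit pattern

	// Store: convert the float to its raw bits, then store them atomically.
	bits.Store(math.Float64bits(3.75))

	// Load: read the bits atomically, then reinterpret them as a float64.
	fmt.Println(math.Float64frombits(bits.Load())) // 3.75

	// CompareAndSwap operates on the bit patterns.
	ok := bits.CompareAndSwap(math.Float64bits(3.75), math.Float64bits(-1.5))
	fmt.Println(ok, math.Float64frombits(bits.Load())) // true -1.5
}
```

Because `CompareAndSwap` compares bit patterns, the CAS retry loops in `atomicMinMax.Update` work identically for both numeric types.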
sdk/metric/internal/aggregate/atomic_test.go

Lines changed: 174 additions & 0 deletions
```diff
@@ -52,6 +52,33 @@ func TestAtomicSumAddIntConcurrentSafe(t *testing.T) {
 	assert.Equal(t, int64(15), aSum.load())
 }
 
+func BenchmarkAtomicCounter(b *testing.B) {
+	b.Run("Int64", benchmarkAtomicCounter[int64])
+	b.Run("Float64", benchmarkAtomicCounter[float64])
+}
+
+func benchmarkAtomicCounter[N int64 | float64](b *testing.B) {
+	b.Run("add", func(b *testing.B) {
+		var a atomicCounter[N]
+		b.RunParallel(func(pb *testing.PB) {
+			for pb.Next() {
+				a.add(2)
+			}
+		})
+	})
+	b.Run("load", func(b *testing.B) {
+		var a atomicCounter[N]
+		a.add(2)
+		var v N
+		b.RunParallel(func(pb *testing.PB) {
+			for pb.Next() {
+				v = a.load()
+			}
+		})
+		assert.Equal(b, N(2), v)
+	})
+}
+
 func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
 	var wg sync.WaitGroup
 	hcwg := &hotColdWaitGroup{}
@@ -76,3 +103,150 @@ func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
 	}
 	wg.Wait()
 }
+
+func TestAtomicN(t *testing.T) {
+	t.Run("Int64", testAtomicN[int64])
+	t.Run("Float64", testAtomicN[float64])
+}
+
+func testAtomicN[N int64 | float64](t *testing.T) {
+	var v atomicN[N]
+	assert.Equal(t, N(0), v.Load())
+	assert.True(t, v.CompareAndSwap(0, 6))
+	assert.Equal(t, N(6), v.Load())
+	assert.False(t, v.CompareAndSwap(0, 6))
+	v.Store(22)
+	assert.Equal(t, N(22), v.Load())
+}
+
+func TestAtomicNConcurrentSafe(t *testing.T) {
+	t.Run("Int64", testAtomicNConcurrentSafe[int64])
+	t.Run("Float64", testAtomicNConcurrentSafe[float64])
+}
+
+func testAtomicNConcurrentSafe[N int64 | float64](t *testing.T) {
+	var wg sync.WaitGroup
+	var v atomicN[N]
+
+	for range 2 {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			got := v.Load()
+			assert.Equal(t, int64(0), int64(got)%6)
+		}()
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			v.Store(12)
+		}()
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			v.CompareAndSwap(0, 6)
+		}()
+	}
+	wg.Wait()
+}
+
+func BenchmarkAtomicN(b *testing.B) {
+	b.Run("Int64", benchmarkAtomicN[int64])
+	b.Run("Float64", benchmarkAtomicN[float64])
+}
+
+func benchmarkAtomicN[N int64 | float64](b *testing.B) {
+	b.Run("Load", func(b *testing.B) {
+		var a atomicN[N]
+		a.Store(2)
+		var v N
+		b.RunParallel(func(pb *testing.PB) {
+			for pb.Next() {
+				v = a.Load()
+			}
+		})
+		assert.Equal(b, N(2), v)
+	})
+	b.Run("Store", func(b *testing.B) {
+		var a atomicN[N]
+		b.RunParallel(func(pb *testing.PB) {
+			for pb.Next() {
+				a.Store(3)
+			}
+		})
+	})
+	b.Run("CompareAndSwap", func(b *testing.B) {
+		var a atomicN[N]
+		b.RunParallel(func(pb *testing.PB) {
+			i := 0
+			for pb.Next() {
+				// Make sure we swap back and forth, in case that matters.
+				if i%2 == 0 {
+					a.CompareAndSwap(0, 1)
+				} else {
+					a.CompareAndSwap(1, 0)
+				}
+				i++
+			}
+		})
+	})
+}
+
+func TestAtomicMinMaxConcurrentSafe(t *testing.T) {
+	t.Run("Int64", testAtomicMinMaxConcurrentSafe[int64])
+	t.Run("Float64", testAtomicMinMaxConcurrentSafe[float64])
+}
+
+func testAtomicMinMaxConcurrentSafe[N int64 | float64](t *testing.T) {
+	var wg sync.WaitGroup
+	var minMax atomicMinMax[N]
+
+	assert.False(t, minMax.set.Load())
+	for _, i := range []float64{2, 4, 6, 8, -3, 0, 8, 0} {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			minMax.Update(N(i))
+		}()
+	}
+	wg.Wait()
+
+	assert.True(t, minMax.set.Load())
+	assert.Equal(t, N(-3), minMax.minimum.Load())
+	assert.Equal(t, N(8), minMax.maximum.Load())
+}
+
+func BenchmarkAtomicMinMax(b *testing.B) {
+	b.Run("Int64", benchmarkAtomicMinMax[int64])
+	b.Run("Float64", benchmarkAtomicMinMax[float64])
+}
+
+func benchmarkAtomicMinMax[N int64 | float64](b *testing.B) {
+	b.Run("UpdateIncreasing", func(b *testing.B) {
+		var a atomicMinMax[N]
+		b.RunParallel(func(pb *testing.PB) {
+			i := 0
+			for pb.Next() {
+				a.Update(N(i))
+				i++
+			}
+		})
+	})
+	b.Run("UpdateDecreasing", func(b *testing.B) {
+		var a atomicMinMax[N]
+		b.RunParallel(func(pb *testing.PB) {
+			i := 0
+			for pb.Next() {
+				a.Update(N(i))
+				i--
+			}
+		})
+	})
+	b.Run("UpdateConstant", func(b *testing.B) {
+		var a atomicMinMax[N]
+		b.RunParallel(func(pb *testing.PB) {
+			for pb.Next() {
+				a.Update(N(5))
+			}
+		})
+	})
+}
```
