Skip to content

Commit a493539

Browse files
committed
use sync Map and atomics for last value aggregation
1 parent f57bf14 commit a493539

File tree

3 files changed

+127
-102
lines changed

3 files changed

+127
-102
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
1313
- `Exporter` in `go.opentelemetry.io/otel/exporters/prometheus` ignores metrics with the scope `go.opentelemetry.io/contrib/bridges/prometheus`.
1414
This prevents scrape failures when the Prometheus exporter is misconfigured to get data from the Prometheus bridge. (#7688)
1515
- Improve performance of concurrent histogram measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7474)
16+
- Improve performance of concurrent synchronous gauge measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7478)
1617

1718
<!-- Released section -->
1819
<!-- Don't change this section unless doing release -->

sdk/metric/internal/aggregate/aggregate.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,13 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
7474

7575
// LastValue returns a last-value aggregate function input and output.
7676
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
77-
lv := newLastValue[N](b.AggregationLimit, b.resFunc())
7877
switch b.Temporality {
7978
case metricdata.DeltaTemporality:
80-
return b.filter(lv.measure), lv.delta
79+
lv := newDeltaLastValue[N](b.AggregationLimit, b.resFunc())
80+
return b.filter(lv.measure), lv.collect
8181
default:
82-
return b.filter(lv.measure), lv.cumulative
82+
lv := newCumulativeLastValue[N](b.AggregationLimit, b.resFunc())
83+
return b.filter(lv.measure), lv.collect
8384
}
8485
}
8586

sdk/metric/internal/aggregate/lastvalue.go

Lines changed: 122 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -5,117 +5,168 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
55

66
import (
77
"context"
8-
"sync"
98
"time"
109

1110
"go.opentelemetry.io/otel/attribute"
1211
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1312
)
1413

15-
// datapoint is timestamped measurement data.
16-
type datapoint[N int64 | float64] struct {
14+
// lastValuePoint is timestamped measurement data.
15+
type lastValuePoint[N int64 | float64] struct {
1716
attrs attribute.Set
18-
value N
17+
value atomicN[N]
1918
res FilteredExemplarReservoir[N]
2019
}
2120

22-
func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
23-
return &lastValue[N]{
21+
// lastValue summarizes a set of measurements as the last one made.
22+
type lastValueMap[N int64 | float64] struct {
23+
newRes func(attribute.Set) FilteredExemplarReservoir[N]
24+
values limitedSyncMap
25+
}
26+
27+
func (s *lastValueMap[N]) measure(
28+
ctx context.Context,
29+
value N,
30+
fltrAttr attribute.Set,
31+
droppedAttr []attribute.KeyValue,
32+
) {
33+
lv := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any {
34+
return &lastValuePoint[N]{
35+
res: s.newRes(attr),
36+
attrs: attr,
37+
}
38+
}).(*lastValuePoint[N])
39+
40+
lv.value.Store(value)
41+
lv.res.Offer(ctx, value, droppedAttr)
42+
}
43+
44+
func newDeltaLastValue[N int64 | float64](
45+
limit int,
46+
r func(attribute.Set) FilteredExemplarReservoir[N],
47+
) *deltaLastValue[N] {
48+
return &deltaLastValue[N]{
2449
newRes: r,
25-
limit: newLimiter[datapoint[N]](limit),
26-
values: make(map[attribute.Distinct]*datapoint[N]),
2750
start: now(),
51+
hotColdValMap: [2]lastValueMap[N]{
52+
{
53+
values: limitedSyncMap{aggLimit: limit},
54+
newRes: r,
55+
},
56+
{
57+
values: limitedSyncMap{aggLimit: limit},
58+
newRes: r,
59+
},
60+
},
2861
}
2962
}
3063

31-
// lastValue summarizes a set of measurements as the last one made.
32-
type lastValue[N int64 | float64] struct {
33-
sync.Mutex
34-
64+
// deltaLastValue summarizes a set of measurements as the last one made.
65+
type deltaLastValue[N int64 | float64] struct {
3566
newRes func(attribute.Set) FilteredExemplarReservoir[N]
36-
limit limiter[datapoint[N]]
37-
values map[attribute.Distinct]*datapoint[N]
3867
start time.Time
39-
}
40-
41-
func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
42-
s.Lock()
43-
defer s.Unlock()
4468

45-
d, ok := s.values[fltrAttr.Equivalent()]
46-
if !ok {
47-
fltrAttr = s.limit.Attributes(fltrAttr, s.values)
48-
d = &datapoint[N]{
49-
res: s.newRes(fltrAttr),
50-
attrs: fltrAttr,
51-
}
52-
}
53-
54-
d.value = value
55-
d.res.Offer(ctx, value, droppedAttr)
69+
hcwg hotColdWaitGroup
70+
hotColdValMap [2]lastValueMap[N]
71+
}
5672

57-
s.values[fltrAttr.Equivalent()] = d
73+
func (s *deltaLastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
74+
hotIdx := s.hcwg.start()
75+
defer s.hcwg.done(hotIdx)
76+
s.hotColdValMap[hotIdx].measure(ctx, value, fltrAttr, droppedAttr)
5877
}
5978

60-
func (s *lastValue[N]) delta(
79+
func (s *deltaLastValue[N]) collect(
6180
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
6281
) int {
6382
t := now()
83+
n := s.copyAndClearDpts(dest, t)
84+
// Update start time for delta temporality.
85+
s.start = t
86+
return n
87+
}
88+
89+
// copyAndClearDpts copies the lastValuePoints held by s into dest. The number of lastValuePoints
90+
// copied is returned.
91+
func (s *deltaLastValue[N]) copyAndClearDpts(dest *metricdata.Aggregation, t time.Time) int { //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
6492
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
65-
// the DataPoints is missed (better luck next time).
93+
// the lastValuePoints is missed (better luck next time).
6694
gData, _ := (*dest).(metricdata.Gauge[N])
95+
// delta always clears values on collection
96+
readIdx := s.hcwg.swapHotAndWait()
97+
// The len will not change while we iterate over values, since we waited
98+
// for all writes to finish to the cold values and len.
99+
n := s.hotColdValMap[readIdx].values.Len()
100+
dPts := reset(gData.DataPoints, n, n)
67101

68-
s.Lock()
69-
defer s.Unlock()
70-
71-
n := s.copyDpts(&gData.DataPoints, t)
102+
var i int
103+
s.hotColdValMap[readIdx].values.Range(func(_, value any) bool {
104+
v := value.(*lastValuePoint[N])
105+
dPts[i].Attributes = v.attrs
106+
dPts[i].StartTime = s.start
107+
dPts[i].Time = t
108+
dPts[i].Value = v.value.Load()
109+
collectExemplars[N](&dPts[i].Exemplars, v.res.Collect)
110+
i++
111+
return true
112+
})
113+
gData.DataPoints = dPts
72114
// Do not report stale values.
73-
clear(s.values)
74-
// Update start time for delta temporality.
75-
s.start = t
76-
115+
s.hotColdValMap[readIdx].values.Clear()
77116
*dest = gData
78-
79117
return n
80118
}
81119

82-
func (s *lastValue[N]) cumulative(
120+
// cumulativeLastValue summarizes a set of measurements as the last one made.
121+
type cumulativeLastValue[N int64 | float64] struct {
122+
lastValueMap[N]
123+
start time.Time
124+
}
125+
126+
func newCumulativeLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *cumulativeLastValue[N] {
127+
return &cumulativeLastValue[N]{
128+
lastValueMap: lastValueMap[N]{
129+
values: limitedSyncMap{aggLimit: limit},
130+
newRes: r,
131+
},
132+
start: now(),
133+
}
134+
}
135+
136+
func (s *cumulativeLastValue[N]) collect(
83137
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
84138
) int {
85139
t := now()
86140
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
87-
// the DataPoints is missed (better luck next time).
141+
// the lastValuePoints is missed (better luck next time).
88142
gData, _ := (*dest).(metricdata.Gauge[N])
89143

90-
s.Lock()
91-
defer s.Unlock()
144+
// Values are being concurrently written while we iterate, so only use the
145+
// current length for capacity.
146+
dPts := reset(gData.DataPoints, 0, s.values.Len())
92147

93-
n := s.copyDpts(&gData.DataPoints, t)
148+
var i int
149+
s.values.Range(func(_, value any) bool {
150+
v := value.(*lastValuePoint[N])
151+
newPt := metricdata.DataPoint[N]{
152+
Attributes: v.attrs,
153+
StartTime: s.start,
154+
Time: t,
155+
Value: v.value.Load(),
156+
}
157+
collectExemplars[N](&newPt.Exemplars, v.res.Collect)
158+
dPts = append(dPts, newPt)
159+
i++
160+
return true
161+
})
162+
gData.DataPoints = dPts
94163
// TODO (#3006): This will use an unbounded amount of memory if there
95164
// are unbounded number of attribute sets being aggregated. Attribute
96165
// sets that become "stale" need to be forgotten so this will not
97166
// overload the system.
98167
*dest = gData
99168

100-
return n
101-
}
102-
103-
// copyDpts copies the datapoints held by s into dest. The number of datapoints
104-
// copied is returned.
105-
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int {
106-
n := len(s.values)
107-
*dest = reset(*dest, n, n)
108-
109-
var i int
110-
for _, v := range s.values {
111-
(*dest)[i].Attributes = v.attrs
112-
(*dest)[i].StartTime = s.start
113-
(*dest)[i].Time = t
114-
(*dest)[i].Value = v.value
115-
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
116-
i++
117-
}
118-
return n
169+
return i
119170
}
120171

121172
// newPrecomputedLastValue returns an aggregator that summarizes a set of
@@ -124,51 +175,23 @@ func newPrecomputedLastValue[N int64 | float64](
124175
limit int,
125176
r func(attribute.Set) FilteredExemplarReservoir[N],
126177
) *precomputedLastValue[N] {
127-
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
178+
return &precomputedLastValue[N]{deltaLastValue: newDeltaLastValue[N](limit, r)}
128179
}
129180

130181
// precomputedLastValue summarizes a set of observations as the last one made.
131182
type precomputedLastValue[N int64 | float64] struct {
132-
*lastValue[N]
183+
*deltaLastValue[N]
133184
}
134185

135186
func (s *precomputedLastValue[N]) delta(
136187
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
137188
) int {
138-
t := now()
139-
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
140-
// the DataPoints is missed (better luck next time).
141-
gData, _ := (*dest).(metricdata.Gauge[N])
142-
143-
s.Lock()
144-
defer s.Unlock()
145-
146-
n := s.copyDpts(&gData.DataPoints, t)
147-
// Do not report stale values.
148-
clear(s.values)
149-
// Update start time for delta temporality.
150-
s.start = t
151-
152-
*dest = gData
153-
154-
return n
189+
return s.collect(dest)
155190
}
156191

157192
func (s *precomputedLastValue[N]) cumulative(
158193
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
159194
) int {
160-
t := now()
161-
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
162-
// the DataPoints is missed (better luck next time).
163-
gData, _ := (*dest).(metricdata.Gauge[N])
164-
165-
s.Lock()
166-
defer s.Unlock()
167-
168-
n := s.copyDpts(&gData.DataPoints, t)
169-
// Do not report stale values.
170-
clear(s.values)
171-
*dest = gData
172-
173-
return n
195+
// Do not reset the start time.
196+
return s.copyAndClearDpts(dest, now())
174197
}

0 commit comments

Comments (0)