Skip to content

Commit 754602e

Browse files
committed
expoBuckets uses a circular buffer to avoid shifts
1 parent f57bf14 commit 754602e

File tree

2 files changed

+229
-248
lines changed

2 files changed

+229
-248
lines changed

sdk/metric/internal/aggregate/exponential_histogram.go

Lines changed: 95 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ const (
1919
expoMaxScale = 20
2020
expoMinScale = -10
2121

22-
smallestNonZeroNormalFloat64 = 0x1p-1022
23-
2422
// These redefine the Math constants with a type, so the compiler won't coerce
2523
// them into an int on 32 bit platforms.
2624
maxInt64 int64 = math.MaxInt64
@@ -62,13 +60,15 @@ func newExpoHistogramDataPoint[N int64 | float64](
6260
mi = N(minInt64)
6361
}
6462
return &expoHistogramDataPoint[N]{
65-
attrs: attrs,
66-
min: ma,
67-
max: mi,
68-
maxSize: maxSize,
69-
noMinMax: noMinMax,
70-
noSum: noSum,
71-
scale: maxScale,
63+
attrs: attrs,
64+
min: ma,
65+
max: mi,
66+
maxSize: maxSize,
67+
noMinMax: noMinMax,
68+
noSum: noSum,
69+
scale: maxScale,
70+
posBuckets: newExpoBuckets(maxSize),
71+
negBuckets: newExpoBuckets(maxSize),
7272
}
7373
}
7474

@@ -104,7 +104,7 @@ func (p *expoHistogramDataPoint[N]) record(v N) {
104104

105105
// If the new bin would make the counts larger than maxScale, we need to
106106
// downscale current measurements.
107-
if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 {
107+
if scaleDelta := p.scaleChange(bin, bucket.startBin, bucket.endBin); scaleDelta > 0 {
108108
if p.scale-scaleDelta < expoMinScale {
109109
// With a scale of -10 there is only two buckets for the whole range of float64 values.
110110
// This can only happen if there is a max size of 1.
@@ -168,8 +168,8 @@ var scaleFactors = [21]float64{
168168

169169
// scaleChange returns the magnitude of the scale change needed to fit bin in
170170
// the bucket. If no scale change is needed 0 is returned.
171-
func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int) int32 {
172-
if length == 0 {
171+
func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, endBin int32) int32 {
172+
if startBin == endBin {
173173
// No need to rescale if there are no buckets.
174174
return 0
175175
}
@@ -178,7 +178,7 @@ func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int)
178178
high := int(bin)
179179
if startBin >= bin {
180180
low = int(bin)
181-
high = int(startBin) + length - 1
181+
high = int(endBin) - 1
182182
}
183183

184184
var count int32
@@ -193,61 +193,62 @@ func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int)
193193
return count
194194
}
195195

196+
// newExpoBuckets returns an expoBuckets backed by a fixed-size circular
// slice of maxSize counts. The buffer is allocated once up front; bins are
// mapped into it modulo maxSize, so recording never shifts elements.
func newExpoBuckets(maxSize int) expoBuckets {
	return expoBuckets{
		counts: make([]uint64, maxSize),
	}
}

// expoBuckets is a set of buckets in an exponential histogram.
type expoBuckets struct {
	// startBin is the inclusive start of the range.
	startBin int32
	// endBin is the exclusive end of the range.
	endBin int32
	// counts is a circular slice of size maxSize, where bin i is stored at
	// position i % maxSize.
	counts []uint64
}

// getIdx maps bin onto the circular counts slice, returning an index in
// [0, len(e.counts)). Go's % operator can yield a negative remainder for a
// negative bin, so the result is normalized with a single conditional add
// rather than a second modulo.
func (e *expoBuckets) getIdx(bin int32) int {
	idx := int(bin) % len(e.counts)
	if idx < 0 {
		idx += len(e.counts)
	}
	return idx
}
218+
219+
// loadCountsAndOffset returns the buckets counts, the count, and the offset.
220+
// It is not safe to call concurrently.
221+
func (e *expoBuckets) loadCountsAndOffset(into *[]uint64) int32 {
222+
// TODO (#3047): Making copies for bounds and counts incurs a large
223+
// memory allocation footprint. Alternatives should be explored.
224+
length := int(e.endBin - e.startBin)
225+
counts := reset(*into, length, length)
226+
count := uint64(0)
227+
eIdx := e.startBin
228+
for i := range length {
229+
val := e.counts[e.getIdx(eIdx)]
230+
counts[i] = val
231+
count += val
232+
eIdx++
233+
}
234+
*into = counts
235+
return e.startBin
200236
}
201237

202238
// record increments the count for the given bin, and expands the buckets if needed.
// Size changes must be done before calling this function.
func (b *expoBuckets) record(bin int32) {
	// we already downscaled, so we are guaranteed that we can fit within the
	// counts.
	b.counts[b.getIdx(bin)]++
	switch {
	case b.startBin == b.endBin:
		// The range was empty; this is the first recorded bin, so the
		// populated range becomes [bin, bin+1).
		b.endBin = bin + 1
		b.startBin = bin
	case bin < b.startBin:
		// Extend the populated range downward to include bin.
		b.startBin = bin
	case bin >= b.endBin:
		// Extend the populated range upward; endBin is exclusive.
		b.endBin = bin + 1
	}
}
253254

@@ -263,26 +264,42 @@ func (b *expoBuckets) downscale(delta int32) {
263264
// new Offset: -2
264265
// new Counts: [4, 14, 30, 10]
265266

266-
if len(b.counts) <= 1 || delta < 1 {
267+
oldStartBin := b.startBin
268+
oldEndBin := b.endBin
269+
oldLength := b.endBin - b.startBin
270+
if oldLength <= 1 || delta < 1 {
267271
b.startBin >>= delta
272+
b.endBin += b.startBin - oldStartBin
273+
// Shift all elements left by the change in start position
274+
startShift := b.getIdx(oldStartBin - b.startBin)
275+
b.counts = append(b.counts[startShift:], b.counts[:startShift]...)
268276
return
269277
}
270278

271279
steps := int32(1) << delta
272-
offset := b.startBin % steps
280+
281+
offset := oldStartBin % steps
273282
offset = (offset + steps) % steps // to make offset positive
274-
for i := 1; i < len(b.counts); i++ {
275-
idx := i + int(offset)
276-
if idx%int(steps) == 0 {
277-
b.counts[idx/int(steps)] = b.counts[i]
283+
newLen := (oldLength-1+offset)/steps + 1
284+
b.startBin >>= delta
285+
b.endBin = b.startBin + newLen
286+
startShift := b.getIdx(oldStartBin - b.startBin)
287+
288+
for i := oldStartBin + 1; i < oldEndBin; i++ {
289+
newIdx := b.getIdx(int32(math.Floor(float64(i)/float64(steps))) + int32(startShift))
290+
if i%steps == 0 {
291+
b.counts[newIdx] = b.counts[b.getIdx(i)]
278292
continue
279293
}
280-
b.counts[idx/int(steps)] += b.counts[i]
294+
b.counts[newIdx] += b.counts[b.getIdx(i)]
281295
}
296+
// Shift all elements left by the change in start position
297+
b.counts = append(b.counts[startShift:], b.counts[:startShift]...)
282298

283-
lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps)
284-
b.counts = b.counts[:lastIdx+1]
285-
b.startBin >>= delta
299+
// Clear all elements that are outside of our start to end range
300+
for i := b.endBin; i < b.startBin+int32(len(b.counts)); i++ {
301+
b.counts[b.getIdx(i)] = 0
302+
}
286303
}
287304

288305
// newExponentialHistogram returns an Aggregator that summarizes a set of
@@ -381,21 +398,11 @@ func (e *expoHistogram[N]) delta(
381398
hDPts[i].ZeroCount = val.zeroCount
382399
hDPts[i].ZeroThreshold = 0.0
383400

384-
hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
385-
hDPts[i].PositiveBucket.Counts = reset(
386-
hDPts[i].PositiveBucket.Counts,
387-
len(val.posBuckets.counts),
388-
len(val.posBuckets.counts),
389-
)
390-
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
391-
392-
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
393-
hDPts[i].NegativeBucket.Counts = reset(
394-
hDPts[i].NegativeBucket.Counts,
395-
len(val.negBuckets.counts),
396-
len(val.negBuckets.counts),
397-
)
398-
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
401+
offset := val.posBuckets.loadCountsAndOffset(&hDPts[i].PositiveBucket.Counts)
402+
hDPts[i].PositiveBucket.Offset = offset
403+
404+
offset = val.negBuckets.loadCountsAndOffset(&hDPts[i].NegativeBucket.Counts)
405+
hDPts[i].NegativeBucket.Offset = offset
399406

400407
if !e.noSum {
401408
hDPts[i].Sum = val.sum
@@ -444,21 +451,11 @@ func (e *expoHistogram[N]) cumulative(
444451
hDPts[i].ZeroCount = val.zeroCount
445452
hDPts[i].ZeroThreshold = 0.0
446453

447-
hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
448-
hDPts[i].PositiveBucket.Counts = reset(
449-
hDPts[i].PositiveBucket.Counts,
450-
len(val.posBuckets.counts),
451-
len(val.posBuckets.counts),
452-
)
453-
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
454-
455-
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
456-
hDPts[i].NegativeBucket.Counts = reset(
457-
hDPts[i].NegativeBucket.Counts,
458-
len(val.negBuckets.counts),
459-
len(val.negBuckets.counts),
460-
)
461-
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
454+
offset := val.posBuckets.loadCountsAndOffset(&hDPts[i].PositiveBucket.Counts)
455+
hDPts[i].PositiveBucket.Offset = offset
456+
457+
offset = val.negBuckets.loadCountsAndOffset(&hDPts[i].NegativeBucket.Counts)
458+
hDPts[i].NegativeBucket.Offset = offset
462459

463460
if !e.noSum {
464461
hDPts[i].Sum = val.sum

0 commit comments

Comments
 (0)