Skip to content

Commit aa9c8ba

Browse files
committed
Refactor distributions into shared interface
1 parent b056fef commit aa9c8ba

File tree

15 files changed

+152
-189
lines changed

15 files changed

+152
-189
lines changed

metric/distribution/distribution.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,24 @@ type Distribution interface {
3838
// weight is 1/samplingRate
3939
AddEntry(value float64, weight float64) error
4040

41-
AddDistribution(distribution Distribution)
42-
43-
AddDistributionWithWeight(distribution Distribution, weight float64)
41+
AddDistribution(Distribution)
42+
Resize(int) []Distribution
43+
}
4444

45+
type ClassicDistribution interface {
46+
Distribution
4547
ConvertToOtel(dp pmetric.HistogramDataPoint)
4648

4749
ConvertFromOtel(dp pmetric.HistogramDataPoint, unit string)
50+
}
51+
52+
type ExponentialDistribution interface {
53+
Distribution
4854

49-
Resize(listMaxSize int) []Distribution
55+
ConvertFromOtel(dp pmetric.ExponentialHistogramDataPoint, unit string)
5056
}
5157

52-
var NewDistribution func() Distribution
58+
var NewClassicDistribution func() ClassicDistribution
5359

5460
// IsSupportedValue checks to see if the metric is between the min value and 2^360 and not a NaN.
5561
// This matches the accepted range described in the MetricDatum documentation

metric/distribution/exph/exph.go

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,28 @@ import (
1111
"slices"
1212

1313
"go.opentelemetry.io/collector/pdata/pmetric"
14+
15+
"github.com/aws/amazon-cloudwatch-agent/metric/distribution"
1416
)
1517

1618
type ExpHistogramDistribution struct {
1719
max float64
1820
min float64
1921
sampleCount float64
2022
sum float64
21-
scale int32
23+
scale int
2224
positiveBuckets map[int]uint64 // map of bucket index to count
2325
negativeBuckets map[int]uint64 // map of bucket index to count
2426
zeroThreshold float64
2527
zeroCount uint64
2628
unit string
2729
}
2830

29-
func NewExpHistogramDistribution() *ExpHistogramDistribution {
31+
func NewExponentialDistribution() distribution.ExponentialDistribution {
32+
return newExpHistogramDistribution()
33+
}
34+
35+
func newExpHistogramDistribution() *ExpHistogramDistribution {
3036
return &ExpHistogramDistribution{
3137
max: -math.MaxFloat64,
3238
min: math.MaxFloat64,
@@ -81,8 +87,8 @@ func (d *ExpHistogramDistribution) ValuesAndCounts() ([]float64, []float64) {
8187
})
8288
for _, offsetIndex := range posOffsetIndicies {
8389
counter := d.positiveBuckets[offsetIndex]
84-
bucketBegin := LowerBoundary(offsetIndex, int(d.scale))
85-
bucketEnd := LowerBoundary(offsetIndex+1, int(d.scale))
90+
bucketBegin := LowerBoundary(offsetIndex, d.scale)
91+
bucketEnd := LowerBoundary(offsetIndex+1, d.scale)
8692
value := (bucketBegin + bucketEnd) / 2.0
8793
values = append(values, value)
8894
counts = append(counts, float64(counter))
@@ -97,8 +103,8 @@ func (d *ExpHistogramDistribution) ValuesAndCounts() ([]float64, []float64) {
97103
negOffsetIndicies := slices.Sorted(maps.Keys(d.negativeBuckets))
98104
for _, offsetIndex := range negOffsetIndicies {
99105
counter := d.negativeBuckets[offsetIndex]
100-
bucketBegin := LowerBoundary(offsetIndex, int(d.scale))
101-
bucketEnd := LowerBoundary(offsetIndex+1, int(d.scale))
106+
bucketBegin := LowerBoundary(offsetIndex, d.scale)
107+
bucketEnd := LowerBoundary(offsetIndex+1, d.scale)
102108
value := -(bucketBegin + bucketEnd) / 2.0
103109
values = append(values, value)
104110
counts = append(counts, float64(counter))
@@ -107,43 +113,60 @@ func (d *ExpHistogramDistribution) ValuesAndCounts() ([]float64, []float64) {
107113
return values, counts
108114
}
109115

110-
func (d *ExpHistogramDistribution) AddDistribution(from *ExpHistogramDistribution) {
116+
// weight is 1/samplingRate
117+
func (d *ExpHistogramDistribution) AddEntry(value float64, weight float64) error {
118+
return d.AddEntryWithUnit(value, weight, "")
119+
}
120+
121+
// weight is 1/samplingRate
122+
func (d *ExpHistogramDistribution) AddEntryWithUnit(value float64, weight float64, unit string) error {
123+
return nil
124+
}
125+
126+
func (d *ExpHistogramDistribution) AddDistribution(from distribution.Distribution) {
127+
128+
expFrom, ok := from.(*ExpHistogramDistribution)
129+
if !ok {
130+
log.Printf("E! The from distribution is not an exponential histogram distribution. Cannot add distributions: %v", from)
131+
return
132+
}
133+
111134
if from.SampleCount() <= 0 {
112135
log.Printf("D! SampleCount should be larger than 0: %v", from.SampleCount())
113136
return
114137
}
115138

116139
// some scales are compatible due to perfect subsetting (buckets of an exponential histogram map exactly into
117140
// buckets with a lesser scale). for simplicity, deny adding distributions if the scales dont match
118-
if from.scale != d.scale {
119-
log.Printf("E! The from distribution scale is not compatible with the to distribution scale: from distribution scale %v, to distribution scale %v", from.scale, d.scale)
141+
if expFrom.scale != d.scale {
142+
log.Printf("E! The from distribution scale is not compatible with the to distribution scale: from distribution scale %v, to distribution scale %v", expFrom.scale, d.scale)
120143
return
121144
}
122145

123-
if from.zeroThreshold != d.zeroThreshold {
124-
log.Printf("E! The from distribution zeroThreshold is not compatible with the to distribution zeroThreshold: from distribution zeroThreshold %v, to distribution zeroThreshold %v", from.zeroThreshold, d.zeroThreshold)
146+
if expFrom.zeroThreshold != d.zeroThreshold {
147+
log.Printf("E! The from distribution zeroThreshold is not compatible with the to distribution zeroThreshold: from distribution zeroThreshold %v, to distribution zeroThreshold %v", expFrom.zeroThreshold, d.zeroThreshold)
125148
return
126149
}
127150

128-
d.max = max(d.max, from.Maximum())
129-
d.min = min(d.min, from.Minimum())
130-
d.sampleCount += from.SampleCount()
131-
d.sum += from.Sum()
151+
d.max = max(d.max, expFrom.Maximum())
152+
d.min = min(d.min, expFrom.Minimum())
153+
d.sampleCount += expFrom.SampleCount()
154+
d.sum += expFrom.Sum()
132155

133-
for i := range from.positiveBuckets {
134-
d.positiveBuckets[i] += from.positiveBuckets[i]
156+
for i := range expFrom.positiveBuckets {
157+
d.positiveBuckets[i] += expFrom.positiveBuckets[i]
135158
}
136159

137-
d.zeroCount += from.zeroCount
160+
d.zeroCount += expFrom.zeroCount
138161

139-
for i := range from.negativeBuckets {
140-
d.negativeBuckets[i] += from.negativeBuckets[i]
162+
for i := range expFrom.negativeBuckets {
163+
d.negativeBuckets[i] += expFrom.negativeBuckets[i]
141164
}
142165

143166
if d.unit == "" {
144-
d.unit = from.Unit()
145-
} else if d.unit != from.Unit() && from.Unit() != "" {
146-
log.Printf("D! Multiple units are detected: %s, %s", d.unit, from.Unit())
167+
d.unit = expFrom.Unit()
168+
} else if d.unit != expFrom.Unit() && expFrom.Unit() != "" {
169+
log.Printf("D! Multiple units are detected: %s, %s", d.unit, expFrom.Unit())
147170
}
148171

149172
}
@@ -152,7 +175,7 @@ func (d *ExpHistogramDistribution) ConvertFromOtel(dp pmetric.ExponentialHistogr
152175
positiveBuckets := dp.Positive()
153176
negativeBuckets := dp.Negative()
154177

155-
d.scale = dp.Scale()
178+
d.scale = int(dp.Scale())
156179
d.unit = unit
157180

158181
d.max = dp.Max()
@@ -181,7 +204,7 @@ func (d *ExpHistogramDistribution) ConvertFromOtel(dp pmetric.ExponentialHistogr
181204
}
182205
}
183206

184-
func (d *ExpHistogramDistribution) Resize(_ int) []*ExpHistogramDistribution {
207+
func (d *ExpHistogramDistribution) Resize(_ int) []distribution.Distribution {
185208
// TODO: split data points into separate PMD requests if the number of buckets exceeds the API limit
186-
return []*ExpHistogramDistribution{d}
209+
return []distribution.Distribution{d}
187210
}

0 commit comments

Comments
 (0)