Skip to content

Commit 51dbbb8

Browse files
fix: use AggregationDefault for spanner grpc metrics
This prevents the OTel SDK from dropping the configured AttributeFilter when processing asynchronous gauge metrics (such as grpc.subchannel.open_connections). Forcing AggregationSum on gauge instruments caused the View to be rejected, exporting distinct metric points with full cardinality. The exporter then blindly stripped the labels, resulting in identical TimeSeries collisions and metric drops in Cloud Monitoring due to duplicate timestamps. By using AggregationDefault, the OTel SDK correctly interprets the AttributeFilter for all metrics and natively aggregates gauges together before export.
1 parent 370e0ed commit 51dbbb8

2 files changed

Lines changed: 99 additions & 1 deletion

File tree

spanner/metric_monitoring_exporter_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ import (
2929
"google.golang.org/grpc/keepalive"
3030
"google.golang.org/grpc/metadata"
3131
"google.golang.org/protobuf/types/known/emptypb"
32+
33+
"cloud.google.com/go/spanner/internal"
34+
"go.opentelemetry.io/otel/attribute"
35+
"go.opentelemetry.io/otel/metric"
36+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
37+
"google.golang.org/api/option"
38+
"google.golang.org/grpc/credentials/insecure"
3239
)
3340

3441
type MetricsTestServer struct {
@@ -124,3 +131,94 @@ func NewMetricTestServer() (*MetricsTestServer, error) {
124131

125132
return testServer, nil
126133
}
134+
135+
func TestBuiltinMetrics_GaugeAggregation(t *testing.T) {
136+
ctx := context.Background()
137+
138+
// Setup mock monitoring server
139+
monitoringServer, err := NewMetricTestServer()
140+
if err != nil {
141+
t.Fatalf("Error setting up metrics test server")
142+
}
143+
defer monitoringServer.Shutdown()
144+
go monitoringServer.Serve()
145+
146+
// Override exporter options to use the mock
147+
origCreateExporterOptions := createExporterOptions
148+
createExporterOptions = func(opts ...option.ClientOption) []option.ClientOption {
149+
return []option.ClientOption{
150+
option.WithEndpoint(monitoringServer.Endpoint), // Connect to mock
151+
option.WithoutAuthentication(),
152+
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
153+
}
154+
}
155+
defer func() {
156+
createExporterOptions = origCreateExporterOptions
157+
}()
158+
159+
// Build the meter provider using the built-in configs
160+
clientAttributes := []attribute.KeyValue{
161+
attribute.String("client_uid", "test-uid"),
162+
}
163+
164+
mpOptions, exporter, err := builtInMeterProviderOptions("test-project", "", clientAttributes)
165+
if err != nil {
166+
t.Fatalf("builtInMeterProviderOptions failed: %v", err)
167+
}
168+
169+
mp := sdkmetric.NewMeterProvider(mpOptions...)
170+
defer mp.Shutdown(ctx)
171+
172+
// Obtain the specific meter bound to the grpc metric meter name configured in views
173+
meter := mp.Meter("grpc-go", metric.WithInstrumentationVersion(internal.Version))
174+
175+
// Create the gauge (UpDownCounter in grpc-go)
176+
openConns, err := meter.Int64UpDownCounter("grpc.subchannel.open_connections")
177+
if err != nil {
178+
t.Fatalf("Failed to create counter: %v", err)
179+
}
180+
181+
// Record values with DIFFERENT target attributes (which should be DROPPED by the AttributeFilter)
182+
openConns.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.target", "target-1")))
183+
openConns.Add(ctx, 2, metric.WithAttributes(attribute.String("grpc.target", "target-2")))
184+
185+
// Force flush to the mock server
186+
err = mp.ForceFlush(ctx)
187+
if err != nil {
188+
t.Fatalf("ForceFlush failed: %v", err)
189+
}
190+
// Give the mock server a moment to receive the RPC
191+
time.Sleep(100 * time.Millisecond)
192+
193+
reqs := monitoringServer.CreateServiceTimeSeriesRequests()
194+
if len(reqs) == 0 {
195+
t.Fatalf("No CreateTimeSeriesRequests received")
196+
}
197+
198+
// Check if all exported metrics only have one data point for grpc.subchannel.open_connections
199+
var gaugePoints int64
200+
var foundTimeSeries int
201+
202+
for _, req := range reqs {
203+
for _, ts := range req.TimeSeries {
204+
if ts.Metric.Type == "spanner.googleapis.com/internal/client/grpc/subchannel/open_connections" {
205+
foundTimeSeries++
206+
if len(ts.Points) > 0 {
207+
gaugePoints += ts.Points[0].Value.GetInt64Value()
208+
}
209+
}
210+
}
211+
}
212+
213+
if foundTimeSeries != 1 {
214+
t.Errorf("Expected exactly ONE TimeSeries for open_connections due to attribute filtering making them identical, got %d", foundTimeSeries)
215+
}
216+
217+
// Since both Adds are recorded and their distinct attributes dropped, they are aggregated into 3.
218+
if gaugePoints != 3 {
219+
t.Errorf("Expected sum of open connections to be 3, got %d", gaugePoints)
220+
}
221+
222+
// Ensure that after flush we cleanly close exporter
223+
exporter.stop()
224+
}

spanner/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ func builtInMeterProviderOptions(project, compression string, clientAttributes [
362362
Name: m,
363363
},
364364
sdkmetric.Stream{
365-
Aggregation: sdkmetric.AggregationSum{},
365+
Aggregation: sdkmetric.AggregationDefault{},
366366
AttributeFilter: func(kv attribute.KeyValue) bool {
367367
if _, ok := allowedMetricLabels[string(kv.Key)]; ok {
368368
return true

0 commit comments

Comments
 (0)