From de970238cd218c4711d088dcc59cc140db43e22c Mon Sep 17 00:00:00 2001 From: Madhav Bissa Date: Fri, 13 Mar 2026 07:11:12 +0000 Subject: [PATCH 1/6] disconnect reason for subchannel metrics (A94) --- balancer/pickfirst/metrics_test.go | 123 +++++++++++++++++ clientconn.go | 43 +++++- internal/transport/http2_client.go | 26 ++-- internal/transport/transport_test.go | 20 +-- stats/opentelemetry/e2e_test.go | 189 +++++++++++++++++++++++++++ 5 files changed, 379 insertions(+), 22 deletions(-) diff --git a/balancer/pickfirst/metrics_test.go b/balancer/pickfirst/metrics_test.go index 3257cbe5d454..a89f2947b254 100644 --- a/balancer/pickfirst/metrics_test.go +++ b/balancer/pickfirst/metrics_test.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "testing" + "time" "google.golang.org/grpc" "google.golang.org/grpc/balancer/pickfirst" @@ -291,3 +292,125 @@ func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map } return gotMetrics } + +func (s) TestDisconnectLabel(t *testing.T) { + // 1. Valid GOAWAY + // Server GracefulStop sends GOAWAY with active streams = 0. + // This usually sends NoError(0) code. + t.Run("GoAway", func(t *testing.T) { + runDisconnectLabelTest(t, "GOAWAY NO_ERROR", func(ss *stubserver.StubServer) { + ss.S.GracefulStop() + // GracefulStop waits for connections to close, which happens after + // GOAWAY is sent. + }) + }) + + // 2. IO Error + // Server Stop closes the listener and active connections immediately. + // This often results in "connection reset" or "EOF" (unknown) depending on timing/OS. + // Let's check for "unknown" or "connection reset" or "subchannel shutdown" strictly. + // In this test env, it often results in io.EOF which we mapped to "unknown". + t.Run("IO_Error", func(t *testing.T) { + runDisconnectLabelTest(t, "unknown", func(ss *stubserver.StubServer) { + ss.Stop() + }) + }) + + // Scenario 3: Unknown (Client closes - voluntary? actually client close might be UNKNOWN or not recorded as split) + // If client closes, we might not record "disconnections" metric from ClientConn perspective? + // disconnections metric is "Number of times the selected subchannel becomes disconnected". + // If we close 'cc', we tear down subchannels. + // But let's try to trigger a case where we just disconnect without server side action? + // Or maybe "unknown" is what we get for "Idle" timeout? + // Let's stick to IO and GoAway first which are explicit in A94. +} + +func runDisconnectLabelTest(t *testing.T, wantLabel string, triggerFunc func(*stubserver.StubServer)) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + ss := &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + } + ss.StartServer() + defer ss.Stop() // Cleanup in case triggerFunc didn't fully stop or strict cleanup needed + + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig) + r := manual.NewBuilderWithScheme("whatever") + r.InitialState(resolver.State{ + ServiceConfig: sc, + Addresses: []resolver.Address{{Addr: ss.Address}}, + }) + + grpcTarget := r.Scheme() + ":///" + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + mo := opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics().Add("grpc.subchannel.disconnections"), + OptionalLabels: []string{"grpc.disconnect_error"}, + } + + cc, err := grpc.NewClient(grpcTarget, opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("NewClient() failed: %v", err) + } + defer cc.Close() + + tsc := testgrpc.NewTestServiceClient(cc) + // Ensure connected + if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + + // Trigger disconnection + triggerFunc(ss) + + // Wait for Idle state (disconnection happened) + testutils.AwaitState(ctx, t, cc, connectivity.Idle) + + // Verify metrics + + gotMetrics := metricsDataFromReader(ctx, reader) + val, ok := gotMetrics["grpc.subchannel.disconnections"] + if !ok { + t.Fatalf("Metric grpc.subchannel.disconnections not found") + } + + // We used AssertEqual in the other test, let's use it here too if available. + // But checking attributes manually might be safer if AssertEqual is strict on other optional fields. + // Let's iterate datapoints. + start := time.Now() + for { + points := val.Data.(metricdata.Sum[int64]).DataPoints + if len(points) == 0 { + t.Fatalf("No data points for disconnections") + } + dp := points[0] + // Check attributes + seenLabel := false + var foundAttrs []string + for _, kv := range dp.Attributes.ToSlice() { + foundAttrs = append(foundAttrs, fmt.Sprintf("%s=%s", kv.Key, kv.Value.AsString())) + if kv.Key == "grpc.disconnect_error" { + seenLabel = true + if kv.Value.AsString() != wantLabel { + t.Errorf("Want label %q, got %q", wantLabel, kv.Value.AsString()) + } + } + } + if !seenLabel && wantLabel != "" { + if time.Since(start) > time.Second { + t.Errorf("Label grpc.disconnect_error missing. Found attributes: %v", foundAttrs) + } + } + if seenLabel || time.Since(start) > time.Second { + break + } + time.Sleep(10 * time.Millisecond) + gotMetrics = metricsDataFromReader(ctx, reader) + val = gotMetrics["grpc.subchannel.disconnections"] + } +} diff --git a/clientconn.go b/clientconn.go index 5dec2dacc0ba..8a0aabbb977c 100644 --- a/clientconn.go +++ b/clientconn.go @@ -24,12 +24,16 @@ import ( "fmt" "math" "net/url" + "os" "slices" "strings" "sync" "sync/atomic" + "syscall" "time" + "golang.org/x/net/http2" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/balancer/pickfirst" @@ -1270,6 +1274,7 @@ type addrConn struct { localityLabel string backendServiceLabel string + disconnectError string } // Note: this requires a lock on ac.mu. @@ -1286,9 +1291,14 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) // TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second // part of the if condition below once the issue is fixed. if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) { - disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, "unknown") + disconnectError := ac.disconnectError + if disconnectError == "" { + disconnectError = "unknown" + } + disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, disconnectError) openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel) } + ac.disconnectError = "" // Reset for next time ac.state = s ac.channelz.ChannelMetrics.State.Store(&s) if lastErr == nil { @@ -1483,7 +1493,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, addr.ServerName = ac.cc.getServerName(addr) hctx, hcancel := context.WithCancel(ctx) - onClose := func(r transport.GoAwayReason) { + onClose := func(r transport.GoAwayReason, goAwayCode http2.ErrCode, err error) { ac.mu.Lock() defer ac.mu.Unlock() // adjust params based on GoAwayReason @@ -1504,6 +1514,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, return } ac.transport = nil + ac.disconnectError = disconnectErrorString(r, goAwayCode, err) // Refresh the name resolver on any connection loss. ac.cc.resolveNow(resolver.ResolveNowOptions{}) // Always go idle and wait for the LB policy to initiate a new @@ -1560,6 +1571,31 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, return nil } +func disconnectErrorString(r transport.GoAwayReason, goAwayCode http2.ErrCode, err error) string { + if r != transport.GoAwayInvalid { + return fmt.Sprintf("GOAWAY %s", goAwayCode.String()) + } + if err == nil { + return "unknown" + } + if errors.Is(err, context.Canceled) { + return "subchannel shutdown" + } + if errors.Is(err, syscall.ECONNRESET) { + return "connection reset" + } + if errors.Is(err, syscall.ETIMEDOUT) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, os.ErrDeadlineExceeded) { + return "connection timed out" + } + if errors.Is(err, syscall.ECONNABORTED) { + return "connection aborted" + } + if errors.Is(err, syscall.ECONNREFUSED) { + return "socket error" + } + return "unknown" +} + // startHealthCheck starts the health checking stream (RPC) to watch the health // stats of this connection if health checking is requested and configured. // @@ -1665,6 +1701,9 @@ func (ac *addrConn) tearDown(err error) { ac.transport = nil // We have to set the state to Shutdown before anything else to prevent races // between setting the state and logic that waits on context cancellation / etc. + if ac.disconnectError == "" { + ac.disconnectError = "subchannel shutdown" + } ac.updateConnectivityState(connectivity.Shutdown, nil) ac.cancel() ac.curAddr = resolver.Address{} diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index c943503f3590..e1cd21b44c7f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -134,6 +134,8 @@ type http2Client struct { // goAwayDebugMessage contains a detailed human readable string about a // GoAway frame, useful for error messages. goAwayDebugMessage string + // goAwayCode records the http2.ErrCode received with the GoAway frame. + goAwayCode http2.ErrCode // A condition variable used to signal when the keepalive goroutine should // go dormant. The condition for dormancy is based on the number of active // streams and the `PermitWithoutStream` keepalive client parameter. And @@ -147,7 +149,7 @@ type http2Client struct { channelz *channelz.Socket - onClose func(GoAwayReason) + onClose func(GoAwayReason, http2.ErrCode, error) bufferPool mem.BufferPool @@ -204,7 +206,7 @@ func isTemporary(err error) bool { // NewHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. -func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ ClientTransport, err error) { +func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason, http2.ErrCode, error)) (_ ClientTransport, err error) { scheme := "http" ctx, cancel := context.WithCancel(ctx) defer func() { @@ -1015,7 +1017,7 @@ func (t *http2Client) Close(err error) { // Call t.onClose ASAP to prevent the client from attempting to create new // streams. if t.state != draining { - t.onClose(GoAwayInvalid) + t.onClose(GoAwayInvalid, http2.ErrCodeNo, err) } t.state = closing streams := t.activeStreams @@ -1086,7 +1088,7 @@ func (t *http2Client) GracefulClose() { if t.logger.V(logLevel) { t.logger.Infof("GracefulClose called") } - t.onClose(GoAwayInvalid) + t.onClose(GoAwayInvalid, http2.ErrCodeNo, nil) t.state = draining active := len(t.activeStreams) t.mu.Unlock() @@ -1372,7 +1374,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error { // draining, to allow the client to stop attempting to create streams // before disallowing new streams on this connection. if t.state != draining { - t.onClose(t.goAwayReason) + t.onClose(t.goAwayReason, t.goAwayCode, nil) t.state = draining } } @@ -1406,22 +1408,26 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error { return nil } +// setGoAwayReason sets the value of t.goAwayReason based +// on the GoAway frame received. +// It expects a lock on transport's mutex to be held by +// the caller. // setGoAwayReason sets the value of t.goAwayReason based // on the GoAway frame received. // It expects a lock on transport's mutex to be held by // the caller. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { t.goAwayReason = GoAwayNoReason + t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode) + if len(f.DebugData()) > 0 { + t.goAwayDebugMessage += fmt.Sprintf(", debug data: %q", string(f.DebugData())) + } if f.ErrCode == http2.ErrCodeEnhanceYourCalm { if string(f.DebugData()) == "too_many_pings" { t.goAwayReason = GoAwayTooManyPings } } - if len(f.DebugData()) == 0 { - t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode) - } else { - t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData())) - } + t.goAwayCode = f.ErrCode } func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) { diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 2d32216ba6fa..48528408fe8b 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -530,7 +530,7 @@ func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) t.Cleanup(cancel) connectCtx, cCancel := context.WithTimeout(context.Background(), 2*time.Second) - ct, connErr := NewHTTP2Client(connectCtx, ctx, addr, copts, func(GoAwayReason) {}) + ct, connErr := NewHTTP2Client(connectCtx, ctx, addr, copts, func(GoAwayReason, http2.ErrCode, error) {}) if connErr != nil { cCancel() // Do not cancel in success path. t.Fatalf("failed to create transport: %v", connErr) @@ -601,7 +601,7 @@ func setUpControllablePingServer(t *testing.T, copts ConnectOptions, connCh chan ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) t.Cleanup(cancel) connectCtx, cCancel := context.WithTimeout(context.Background(), 2*time.Second) - tr, err := NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {}) + tr, err := NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason, http2.ErrCode, error) {}) if err != nil { cCancel() // Do not cancel in success path. // Server clean-up. @@ -1484,7 +1484,7 @@ func (s) TestClientHonorsConnectContext(t *testing.T) { } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - _, err = NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {}) + _, err = NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason, http2.ErrCode, error) {}) if err == nil { t.Fatalf("NewHTTP2Client() returned successfully; wanted error") } @@ -1496,7 +1496,7 @@ func (s) TestClientHonorsConnectContext(t *testing.T) { // Test context deadline. connectCtx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - _, err = NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {}) + _, err = NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason, http2.ErrCode, error) {}) if err == nil { t.Fatalf("NewHTTP2Client() returned successfully; wanted error") } @@ -1581,7 +1581,7 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) { } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - ct, err := NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {}) + ct, err := NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason, http2.ErrCode, error) {}) if err != nil { t.Fatalf("Error while creating client transport: %v", err) } @@ -2602,7 +2602,7 @@ func (s) TestClientHandshakeInfo(t *testing.T) { ChannelzParent: channelzSubChannel(t), BufferPool: mem.DefaultBufferPool(), } - tr, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason) {}) + tr, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason, http2.ErrCode, error) {}) if err != nil { t.Fatalf("NewHTTP2Client(): %v", err) } @@ -2644,7 +2644,7 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) { ChannelzParent: channelzSubChannel(t), BufferPool: mem.DefaultBufferPool(), } - tr, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason) {}) + tr, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason, http2.ErrCode, error) {}) if err != nil { t.Fatalf("NewHTTP2Client(): %v", err) } @@ -3001,7 +3001,7 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { cOpts := ConnectOptions{ BufferPool: mem.DefaultBufferPool(), } - ct, err := NewHTTP2Client(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, cOpts, func(GoAwayReason) {}) + ct, err := NewHTTP2Client(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, cOpts, func(GoAwayReason, http2.ErrCode, error) {}) if err != nil { t.Fatalf("Error while creating client transport: %v", err) } @@ -3070,7 +3070,7 @@ func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) { // Create a client transport with a custom dialer that hangs the Read() // after Close(). - ct, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason) {}) + ct, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason, http2.ErrCode, error) {}) if err != nil { t.Fatalf("Failed to create transport: %v", err) } @@ -3162,7 +3162,7 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() // Create client transport with custom dialer - ct, connErr := NewHTTP2Client(connectCtx, ctx, addr, copts, func(GoAwayReason) {}) + ct, connErr := NewHTTP2Client(connectCtx, ctx, addr, copts, func(GoAwayReason, http2.ErrCode, error) {}) if connErr != nil { t.Fatalf("failed to create transport: %v", connErr) } diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 8d05e0e5037e..b211b3f8ca33 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -20,8 +20,11 @@ import ( "context" "fmt" "io" + "net" "slices" "strconv" + "sync" + "syscall" "testing" "time" @@ -1996,3 +1999,189 @@ func (s) TestHealthStreamNoOtelErrorLog(t *testing.T) { t.Fatalf("EmptyCall failed: %v", err) } } + +type errorConn struct { + net.Conn + mu sync.Mutex + err error +} + +func (c *errorConn) Read(b []byte) (int, error) { + n, err := c.Conn.Read(b) + c.mu.Lock() + defer c.mu.Unlock() + if c.err != nil { + return 0, c.err + } + return n, err +} + +// TestSubChannelDisconnectionLabels verifies granular disconnect error labels. +func (s) TestSubChannelDisconnectionLabels(t *testing.T) { + // Wrapper to run valid scenarios + + runScenario := func(name, expectedLabel string, action func(backend *stubserver.StubServer, cc *grpc.ClientConn, injectErr func(error), updateResource func(*e2e.UpdateOptions))) { + t.Run(name, func(t *testing.T) { + // Start a single backend server. + backend := stubserver.StartTestService(t, &stubserver.StubServer{ + EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + }) + port := itestutils.ParsePort(t, backend.Address) + defer backend.Stop() + + serviceName := "service-" + name + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) + routeConfigName := "route-" + serviceName + clusterName := "cluster-" + serviceName + endpointsName := "endpoints-" + serviceName + + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, e2e.SecurityLevelNone)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: endpointsName, + Host: "localhost", + Localities: []e2e.LocalityOptions{ + { + Backends: []e2e.BackendOptions{{Ports: []uint32{port}}}, + Weight: 1, + }, + }, + })}, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Setup Telemetry. + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + mo := opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics().Add( + "grpc.subchannel.disconnections", + ), + OptionalLabels: []string{ + "grpc.lb.locality", + "grpc.lb.backend_service", + "grpc.disconnect_error", + }, + } + + var activeConn *errorConn + dialer := func(ctx context.Context, addr string) (net.Conn, error) { + conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr) + if err != nil { + return nil, err + } + activeConn = &errorConn{Conn: conn} + return activeConn, nil + } + + target := fmt.Sprintf("xds:///%s", serviceName) + cc, err := grpc.NewClient(target, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(xdsResolver), + grpc.WithContextDialer(dialer), + opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), + ) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + defer cc.Close() + + // Wait for connection to be READY + client := testpb.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall failed: %v", err) + } + + injectErr := func(err error) { + if activeConn != nil { + activeConn.mu.Lock() + activeConn.err = err + activeConn.mu.Unlock() + activeConn.Conn.Close() + } + } + + // Perform the action to trigger disconnect + action(backend, cc, injectErr, func(opts *e2e.UpdateOptions) { + opts.NodeID = nodeID // Ensure NodeID matches + if err := managementServer.Update(ctx, *opts); err != nil { + t.Fatal(err) + } + }) + + // Verify Metric + wantMetrics := []metricdata.Metrics{ + { + Name: "grpc.subchannel.disconnections", + Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.", + Unit: "{disconnection}", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("grpc.target", target), + attribute.String("grpc.lb.backend_service", clusterName), + attribute.String("grpc.lb.locality", `{region="region-1", zone="zone-1", sub_zone="subzone-1"}`), + attribute.String("grpc.disconnect_error", expectedLabel), + ), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + } + + if err := pollForWantMetrics(ctx, t, reader, wantMetrics); err != nil { + t.Fatalf("Failed to verify metric for case %s: %v", name, err) + } + }) + } + + // 1. GOAWAY + runScenario("GoAway", "GOAWAY NO_ERROR", func(backend *stubserver.StubServer, _ *grpc.ClientConn, _ func(error), _ func(*e2e.UpdateOptions)) { + backend.S.GracefulStop() + }) + + // 2. IO Error (Generic unknown) + runScenario("IO_Error", "unknown", func(_ *stubserver.StubServer, _ *grpc.ClientConn, injectErr func(error), _ func(*e2e.UpdateOptions)) { + injectErr(io.EOF) + }) + + // 3. Subchannel Shutdown + runScenario("SubchannelShutdown", "subchannel shutdown", func(_ *stubserver.StubServer, cc *grpc.ClientConn, _ func(error), _ func(*e2e.UpdateOptions)) { + cc.Close() + }) + + // 4. Connection Reset + runScenario("ConnectionReset", "connection reset", func(_ *stubserver.StubServer, _ *grpc.ClientConn, injectErr func(error), _ func(*e2e.UpdateOptions)) { + injectErr(syscall.ECONNRESET) + }) + + // 5. Connection Timed Out + runScenario("ConnectionTimeout", "connection timed out", func(_ *stubserver.StubServer, _ *grpc.ClientConn, injectErr func(error), _ func(*e2e.UpdateOptions)) { + injectErr(context.DeadlineExceeded) + }) + + // 6. Socket Error + runScenario("SocketError", "socket error", func(_ *stubserver.StubServer, _ *grpc.ClientConn, injectErr func(error), _ func(*e2e.UpdateOptions)) { + injectErr(syscall.ECONNREFUSED) + }) + + // 7. Connection Aborted + runScenario("ConnectionAborted", "connection aborted", func(_ *stubserver.StubServer, _ *grpc.ClientConn, injectErr func(error), _ func(*e2e.UpdateOptions)) { + injectErr(syscall.ECONNABORTED) + }) +} From 4bdc057c48b1ede246160743b984a68697792078 Mon Sep 17 00:00:00 2001 From: Madhav Bissa Date: Fri, 13 Mar 2026 10:40:05 +0000 Subject: [PATCH 2/6] revie comment --- internal/transport/http2_client.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index e1cd21b44c7f..c23809e71893 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1408,25 +1408,21 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error { return nil } -// setGoAwayReason sets the value of t.goAwayReason based -// on the GoAway frame received. -// It expects a lock on transport's mutex to be held by -// the caller. // setGoAwayReason sets the value of t.goAwayReason based // on the GoAway frame received. // It expects a lock on transport's mutex to be held by // the caller. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { t.goAwayReason = GoAwayNoReason - t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode) - if len(f.DebugData()) > 0 { - t.goAwayDebugMessage += fmt.Sprintf(", debug data: %q", string(f.DebugData())) - } if f.ErrCode == http2.ErrCodeEnhanceYourCalm { if string(f.DebugData()) == "too_many_pings" { t.goAwayReason = GoAwayTooManyPings } } + t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode) + if len(f.DebugData()) > 0 { + t.goAwayDebugMessage += fmt.Sprintf(", debug data: %q", string(f.DebugData())) + } t.goAwayCode = f.ErrCode } From dcaf507f4a06a67dbf568cc1f9758e0334d0e94d Mon Sep 17 00:00:00 2001 From: Madhav Bissa Date: Tue, 17 Mar 2026 06:24:41 +0000 Subject: [PATCH 3/6] review comments --- clientconn.go | 40 ++++++++++++++---------------- internal/transport/http2_client.go | 7 +++--- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/clientconn.go b/clientconn.go index 8a0aabbb977c..a4f8aad97365 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1272,9 +1272,9 @@ type addrConn struct { channelz *channelz.SubChannel - localityLabel string - backendServiceLabel string - disconnectError string + localityLabel string + backendServiceLabel string + disconnectErrorLabel string } // Note: this requires a lock on ac.mu. @@ -1291,14 +1291,14 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) // TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second // part of the if condition below once the issue is fixed. if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) { - disconnectError := ac.disconnectError + disconnectError := ac.disconnectErrorLabel if disconnectError == "" { disconnectError = "unknown" } disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, disconnectError) openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel) } - ac.disconnectError = "" // Reset for next time + ac.disconnectErrorLabel = "" // Reset for next time ac.state = s ac.channelz.ChannelMetrics.State.Store(&s) if lastErr == nil { @@ -1514,7 +1514,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, return } ac.transport = nil - ac.disconnectError = disconnectErrorString(r, goAwayCode, err) + ac.disconnectErrorLabel = disconnectErrorString(r, goAwayCode, err) // Refresh the name resolver on any connection loss. ac.cc.resolveNow(resolver.ResolveNowOptions{}) // Always go idle and wait for the LB policy to initiate a new @@ -1572,28 +1572,24 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, } func disconnectErrorString(r transport.GoAwayReason, goAwayCode http2.ErrCode, err error) string { - if r != transport.GoAwayInvalid { + switch { + case r != transport.GoAwayInvalid: return fmt.Sprintf("GOAWAY %s", goAwayCode.String()) - } - if err == nil { + case err == nil: return "unknown" - } - if errors.Is(err, context.Canceled) { + case errors.Is(err, context.Canceled): return "subchannel shutdown" - } - if errors.Is(err, syscall.ECONNRESET) { + case errors.Is(err, syscall.ECONNRESET): return "connection reset" - } - if errors.Is(err, syscall.ETIMEDOUT) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, os.ErrDeadlineExceeded) { + case errors.Is(err, syscall.ETIMEDOUT), errors.Is(err, context.DeadlineExceeded), errors.Is(err, os.ErrDeadlineExceeded): return "connection timed out" - } - if errors.Is(err, syscall.ECONNABORTED) { + case errors.Is(err, syscall.ECONNABORTED): return "connection aborted" - } - if errors.Is(err, syscall.ECONNREFUSED) { + case errors.Is(err, syscall.ECONNREFUSED): return "socket error" + default: + return "unknown" } - return "unknown" } // startHealthCheck starts the health checking stream (RPC) to watch the health @@ -1701,8 +1697,8 @@ func (ac *addrConn) tearDown(err error) { ac.transport = nil // We have to set the state to Shutdown before anything else to prevent races // between setting the state and logic that waits on context cancellation / etc. - if ac.disconnectError == "" { - ac.disconnectError = "subchannel shutdown" + if ac.disconnectErrorLabel == "" { + ac.disconnectErrorLabel = "subchannel shutdown" } ac.updateConnectivityState(connectivity.Shutdown, nil) ac.cancel() diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index c23809e71893..615ff1a1467b 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1419,9 +1419,10 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { t.goAwayReason = GoAwayTooManyPings } } - t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode) - if len(f.DebugData()) > 0 { - t.goAwayDebugMessage += fmt.Sprintf(", debug data: %q", string(f.DebugData())) + if len(f.DebugData()) == 0 { + t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode) + } else { + t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData())) } t.goAwayCode = f.ErrCode } From 8eab968f86c3a16406e41e8bd825ba9ca348ffe8 Mon Sep 17 00:00:00 2001 From: Madhav Bissa Date: Tue, 17 Mar 2026 06:49:53 +0000 Subject: [PATCH 4/6] fixing merge failures --- internal/transport/transport_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index c2fc507b27a4..3933754b2c0d 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -3551,7 +3551,7 @@ func setupRSTStreamOnEOSTest(ctx context.Context, t *testing.T, sendServerFrames // Set up a client. copts := ConnectOptions{BufferPool: mem.DefaultBufferPool()} - ct, err := NewHTTP2Client(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {}) + ct, err := NewHTTP2Client(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason, http2.ErrCode, error) {}) if err != nil { t.Fatalf("NewHTTP2Client failed: %v", err) } From 88cd619333860c8513be97f56cbde05108d32a2f Mon Sep 17 00:00:00 2001 From: Madhav Bissa Date: Tue, 17 Mar 2026 07:18:09 +0000 Subject: [PATCH 5/6] make the test deterministic by ensuring correct listener behaviour --- balancer/pickfirst/metrics_test.go | 78 ++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 21 deletions(-) diff --git a/balancer/pickfirst/metrics_test.go b/balancer/pickfirst/metrics_test.go index a89f2947b254..53045120f34c 100644 --- a/balancer/pickfirst/metrics_test.go +++ b/balancer/pickfirst/metrics_test.go @@ -21,6 +21,10 @@ package pickfirst_test import ( "context" "fmt" + "io" + "net" + "sync" + "syscall" "testing" "time" @@ -298,34 +302,48 @@ func (s) TestDisconnectLabel(t *testing.T) { // Server GracefulStop sends GOAWAY with active streams = 0. // This usually sends NoError(0) code. t.Run("GoAway", func(t *testing.T) { - runDisconnectLabelTest(t, "GOAWAY NO_ERROR", func(ss *stubserver.StubServer) { + runDisconnectLabelTest(t, "GOAWAY NO_ERROR", func(ss *stubserver.StubServer, _ *controllableConn) { ss.S.GracefulStop() - // GracefulStop waits for connections to close, which happens after - // GOAWAY is sent. }) }) - // 2. IO Error - // Server Stop closes the listener and active connections immediately. - // This often results in "connection reset" or "EOF" (unknown) depending on timing/OS. - // Let's check for "unknown" or "connection reset" or "subchannel shutdown" strictly. - // In this test env, it often results in io.EOF which we mapped to "unknown". - t.Run("IO_Error", func(t *testing.T) { - runDisconnectLabelTest(t, "unknown", func(ss *stubserver.StubServer) { - ss.Stop() + t.Run("ConnectionReset", func(t *testing.T) { + runDisconnectLabelTest(t, "connection reset", func(_ *stubserver.StubServer, cc *controllableConn) { + cc.breakWith(syscall.ECONNRESET) }) }) - // Scenario 3: Unknown (Client closes - voluntary? actually client close might be UNKNOWN or not recorded as split) - // If client closes, we might not record "disconnections" metric from ClientConn perspective? - // disconnections metric is "Number of times the selected subchannel becomes disconnected". - // If we close 'cc', we tear down subchannels. - // But let's try to trigger a case where we just disconnect without server side action? - // Or maybe "unknown" is what we get for "Idle" timeout? - // Let's stick to IO and GoAway first which are explicit in A94. + t.Run("EOF", func(t *testing.T) { + runDisconnectLabelTest(t, "unknown", func(_ *stubserver.StubServer, cc *controllableConn) { + cc.breakWith(io.EOF) + }) + }) +} + +type controllableConn struct { + net.Conn + mu sync.Mutex + readErr error +} + +func (c *controllableConn) Read(b []byte) (int, error) { + n, err := c.Conn.Read(b) + c.mu.Lock() + defer c.mu.Unlock() + if c.readErr != nil { + return 0, c.readErr + } + return n, err } -func runDisconnectLabelTest(t *testing.T, wantLabel string, triggerFunc func(*stubserver.StubServer)) { +func (c *controllableConn) breakWith(err error) { + c.mu.Lock() + c.readErr = err + c.mu.Unlock() + c.Conn.Close() +} + +func runDisconnectLabelTest(t *testing.T, wantLabel string, triggerFunc func(*stubserver.StubServer, *controllableConn)) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -353,7 +371,21 @@ func runDisconnectLabelTest(t *testing.T, wantLabel string, triggerFunc func(*st OptionalLabels: []string{"grpc.disconnect_error"}, } - cc, err := grpc.NewClient(grpcTarget, opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + var mu sync.Mutex + var lastConn *controllableConn + dialer := func(ctx context.Context, addr string) (net.Conn, error) { + conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr) + if err != nil { + return nil, err + } + cc := &controllableConn{Conn: conn} + mu.Lock() + lastConn = cc + mu.Unlock() + return cc, nil + } + + cc, err := grpc.NewClient(grpcTarget, opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r), grpc.WithContextDialer(dialer)) if err != nil { t.Fatalf("NewClient() failed: %v", err) } @@ -365,8 +397,12 @@ func runDisconnectLabelTest(t *testing.T, wantLabel string, triggerFunc func(*st t.Fatalf("EmptyCall() failed: %v", err) } + mu.Lock() + lc := lastConn + mu.Unlock() + // Trigger disconnection - triggerFunc(ss) + triggerFunc(ss, lc) // Wait for Idle state (disconnection happened) testutils.AwaitState(ctx, t, cc, connectivity.Idle) From 8907a8e5a41d34f7558c84768e387c6e0f87ab4c Mon Sep 17 00:00:00 2001 From: Madhav Bissa Date: Thu, 19 Mar 2026 05:24:22 +0000 Subject: [PATCH 6/6] changing existing tests to adapt for new changes in race --- stats/opentelemetry/e2e_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index b211b3f8ca33..21b2db1e9f09 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -1875,7 +1875,7 @@ func (s) TestSubChannelMetrics(t *testing.T) { targetAttr := attribute.String("grpc.target", target) localityAttr := attribute.String("grpc.lb.locality", `{region="region-1", zone="zone-1", sub_zone="subzone-1"}`) backendServiceAttr := attribute.String("grpc.lb.backend_service", clusterName) - disconnectionReasonAttr := attribute.String("grpc.disconnect_error", "unknown") + disconnectionReasonAttr := attribute.String("grpc.disconnect_error", "GOAWAY NO_ERROR") securityLevelAttr := attribute.String("grpc.security_level", "NoSecurity") // Verify Connect Metrics. @@ -1915,8 +1915,8 @@ func (s) TestSubChannelMetrics(t *testing.T) { t.Fatal(err) } - // Stop backend to trigger Disconnect Metrics. - backend.Stop() + // Stop backend gracefully to trigger Disconnect Metrics with GOAWAY NO_ERROR. + backend.S.GracefulStop() disconnectionWantMetrics := []metricdata.Metrics{ {