Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions balancer/pickfirst/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer/pickfirst"
Expand Down Expand Up @@ -291,3 +292,125 @@ func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map
}
return gotMetrics
}

// TestDisconnectLabel checks the value of the grpc.disconnect_error label
// reported on the grpc.subchannel.disconnections metric for different ways in
// which the server side can terminate an established connection.
func (s) TestDisconnectLabel(t *testing.T) {
	scenarios := []struct {
		name      string
		wantLabel string
		trigger   func(*stubserver.StubServer)
	}{
		{
			// A graceful server stop is expected to send a GOAWAY frame,
			// usually carrying the NO_ERROR (0) code. GracefulStop waits for
			// connections to close, which happens after the GOAWAY is sent.
			name:      "GoAway",
			wantLabel: "GOAWAY NO_ERROR",
			trigger: func(ss *stubserver.StubServer) {
				ss.S.GracefulStop()
			},
		},
		{
			// A hard stop closes the listener and the active connections
			// immediately. Depending on timing and OS the client may see a
			// connection reset or an EOF; in this test environment it is
			// typically io.EOF, which is reported as "unknown".
			name:      "IO_Error",
			wantLabel: "unknown",
			trigger: func(ss *stubserver.StubServer) {
				ss.Stop()
			},
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			runDisconnectLabelTest(t, sc.wantLabel, sc.trigger)
		})
	}

	// TODO: client-initiated shutdown is not covered here. It is unclear
	// whether closing the ClientConn (which tears down its subchannels)
	// records the disconnections metric at all, so only the GOAWAY and IO
	// error scenarios, which are explicit in A94, are exercised for now.
}

// runDisconnectLabelTest starts a stub server, connects a client to it with
// OpenTelemetry metrics enabled, invokes triggerFunc to make the server drop
// the connection, and then verifies that the grpc.subchannel.disconnections
// metric carries a grpc.disconnect_error attribute equal to wantLabel.
//
// If wantLabel is empty, a missing attribute is not treated as a failure.
func runDisconnectLabelTest(t *testing.T, wantLabel string, triggerFunc func(*stubserver.StubServer)) {
	// Attribute failures to the calling subtest rather than to this helper.
	t.Helper()

	const (
		metricName = "grpc.subchannel.disconnections"
		labelKey   = "grpc.disconnect_error"
		// Upper bound on how long to keep re-reading the metric while waiting
		// for the label to show up, and the pause between reads.
		pollTimeout  = time.Second
		pollInterval = 10 * time.Millisecond
	)

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	ss := &stubserver.StubServer{
		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
			return &testpb.Empty{}, nil
		},
	}
	ss.StartServer()
	// Cleanup in case triggerFunc did not fully stop the server.
	defer ss.Stop()

	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(pfConfig)
	r := manual.NewBuilderWithScheme("whatever")
	r.InitialState(resolver.State{
		ServiceConfig: sc,
		Addresses:     []resolver.Address{{Addr: ss.Address}},
	})

	grpcTarget := r.Scheme() + ":///"
	reader := metric.NewManualReader()
	provider := metric.NewMeterProvider(metric.WithReader(reader))
	mo := opentelemetry.MetricsOptions{
		MeterProvider:  provider,
		Metrics:        opentelemetry.DefaultMetrics().Add(metricName),
		OptionalLabels: []string{labelKey},
	}

	cc, err := grpc.NewClient(grpcTarget, opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
	if err != nil {
		t.Fatalf("NewClient() failed: %v", err)
	}
	defer cc.Close()

	// Make one successful RPC so that a connection is known to be established
	// before the disconnection is triggered.
	tsc := testgrpc.NewTestServiceClient(cc)
	if _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}

	triggerFunc(ss)

	// The channel going Idle indicates that the disconnection happened.
	testutils.AwaitState(ctx, t, cc, connectivity.Idle)

	// Re-read the metric until the label shows up or the poll window elapses.
	// Every read is validated so that a missing metric or an unexpected data
	// type fails the test instead of panicking.
	deadline := time.Now().Add(pollTimeout)
	for {
		val, ok := metricsDataFromReader(ctx, reader)[metricName]
		if !ok {
			t.Fatalf("Metric %s not found", metricName)
		}
		sum, ok := val.Data.(metricdata.Sum[int64])
		if !ok {
			t.Fatalf("Metric %s has data of type %T, want metricdata.Sum[int64]", metricName, val.Data)
		}
		if len(sum.DataPoints) == 0 {
			t.Fatalf("No data points for %s", metricName)
		}

		// Look at every data point, not just the first one, since the label
		// may be attached to any of them.
		var foundAttrs []string
		for _, dp := range sum.DataPoints {
			for _, kv := range dp.Attributes.ToSlice() {
				foundAttrs = append(foundAttrs, fmt.Sprintf("%s=%s", kv.Key, kv.Value.AsString()))
				if kv.Key != labelKey {
					continue
				}
				if got := kv.Value.AsString(); got != wantLabel {
					t.Errorf("Want label %q, got %q", wantLabel, got)
				}
				return
			}
		}

		if time.Now().After(deadline) {
			if wantLabel != "" {
				t.Errorf("Label %s missing. Found attributes: %v", labelKey, foundAttrs)
			}
			return
		}

		// Pause between reads, but give up right away if the test context
		// expires.
		select {
		case <-ctx.Done():
			t.Fatalf("Context done while waiting for label %s: %v", labelKey, ctx.Err())
		case <-time.After(pollInterval):
		}
	}
}
43 changes: 41 additions & 2 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ import (
"fmt"
"math"
"net/url"
"os"
"slices"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"golang.org/x/net/http2"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/pickfirst"
Expand Down Expand Up @@ -1270,6 +1274,7 @@ type addrConn struct {

localityLabel string
backendServiceLabel string
disconnectError string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can we add a `Label` suffix to stay consistent with the existing field names?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}

// Note: this requires a lock on ac.mu.
Expand All @@ -1286,9 +1291,14 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
// part of the if condition below once the issue is fixed.
if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) {
disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, "unknown")
disconnectError := ac.disconnectError
if disconnectError == "" {
disconnectError = "unknown"
}
disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, disconnectError)
openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
}
ac.disconnectError = "" // Reset for next time
ac.state = s
ac.channelz.ChannelMetrics.State.Store(&s)
if lastErr == nil {
Expand Down Expand Up @@ -1483,7 +1493,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
addr.ServerName = ac.cc.getServerName(addr)
hctx, hcancel := context.WithCancel(ctx)

onClose := func(r transport.GoAwayReason) {
onClose := func(r transport.GoAwayReason, goAwayCode http2.ErrCode, err error) {
ac.mu.Lock()
defer ac.mu.Unlock()
// adjust params based on GoAwayReason
Expand All @@ -1504,6 +1514,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
return
}
ac.transport = nil
ac.disconnectError = disconnectErrorString(r, goAwayCode, err)
// Refresh the name resolver on any connection loss.
ac.cc.resolveNow(resolver.ResolveNowOptions{})
// Always go idle and wait for the LB policy to initiate a new
Expand Down Expand Up @@ -1560,6 +1571,31 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
return nil
}

func disconnectErrorString(r transport.GoAwayReason, goAwayCode http2.ErrCode, err error) string {
if r != transport.GoAwayInvalid {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a switch statement here. See: https://go.dev/doc/effective_go#switch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

return fmt.Sprintf("GOAWAY %s", goAwayCode.String())
}
if err == nil {
return "unknown"
}
if errors.Is(err, context.Canceled) {
return "subchannel shutdown"
}
if errors.Is(err, syscall.ECONNRESET) {
return "connection reset"
}
if errors.Is(err, syscall.ETIMEDOUT) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, os.ErrDeadlineExceeded) {
return "connection timed out"
}
if errors.Is(err, syscall.ECONNABORTED) {
return "connection aborted"
}
if errors.Is(err, syscall.ECONNREFUSED) {
return "socket error"
}
return "unknown"
}

// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
Expand Down Expand Up @@ -1665,6 +1701,9 @@ func (ac *addrConn) tearDown(err error) {
ac.transport = nil
// We have to set the state to Shutdown before anything else to prevent races
// between setting the state and logic that waits on context cancellation / etc.
if ac.disconnectError == "" {
ac.disconnectError = "subchannel shutdown"
}
ac.updateConnectivityState(connectivity.Shutdown, nil)
ac.cancel()
ac.curAddr = resolver.Address{}
Expand Down
20 changes: 11 additions & 9 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ type http2Client struct {
// goAwayDebugMessage contains a detailed human readable string about a
// GoAway frame, useful for error messages.
goAwayDebugMessage string
// goAwayCode records the http2.ErrCode received with the GoAway frame.
goAwayCode http2.ErrCode
// A condition variable used to signal when the keepalive goroutine should
// go dormant. The condition for dormancy is based on the number of active
// streams and the `PermitWithoutStream` keepalive client parameter. And
Expand All @@ -147,7 +149,7 @@ type http2Client struct {

channelz *channelz.Socket

onClose func(GoAwayReason)
onClose func(GoAwayReason, http2.ErrCode, error)

bufferPool mem.BufferPool

Expand Down Expand Up @@ -204,7 +206,7 @@ func isTemporary(err error) bool {
// NewHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ ClientTransport, err error) {
func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason, http2.ErrCode, error)) (_ ClientTransport, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
Expand Down Expand Up @@ -1015,7 +1017,7 @@ func (t *http2Client) Close(err error) {
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
if t.state != draining {
t.onClose(GoAwayInvalid)
t.onClose(GoAwayInvalid, http2.ErrCodeNo, err)
}
t.state = closing
streams := t.activeStreams
Expand Down Expand Up @@ -1086,7 +1088,7 @@ func (t *http2Client) GracefulClose() {
if t.logger.V(logLevel) {
t.logger.Infof("GracefulClose called")
}
t.onClose(GoAwayInvalid)
t.onClose(GoAwayInvalid, http2.ErrCodeNo, nil)
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
Expand Down Expand Up @@ -1372,7 +1374,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
// draining, to allow the client to stop attempting to create streams
// before disallowing new streams on this connection.
if t.state != draining {
t.onClose(t.goAwayReason)
t.onClose(t.goAwayReason, t.goAwayCode, nil)
t.state = draining
}
}
Expand Down Expand Up @@ -1417,11 +1419,11 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayTooManyPings
}
}
if len(f.DebugData()) == 0 {
t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
} else {
t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
if len(f.DebugData()) > 0 {
t.goAwayDebugMessage += fmt.Sprintf(", debug data: %q", string(f.DebugData()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed; reverted this part.

}
t.goAwayCode = f.ErrCode
}

func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
Expand Down
20 changes: 10 additions & 10 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
t.Cleanup(cancel)
connectCtx, cCancel := context.WithTimeout(context.Background(), 2*time.Second)
ct, connErr := NewHTTP2Client(connectCtx, ctx, addr, copts, func(GoAwayReason) {})
ct, connErr := NewHTTP2Client(connectCtx, ctx, addr, copts, func(GoAwayReason, http2.ErrCode, error) {})
if connErr != nil {
cCancel() // Do not cancel in success path.
t.Fatalf("failed to create transport: %v", connErr)
Expand Down Expand Up @@ -601,7 +601,7 @@ func setUpControllablePingServer(t *testing.T, copts ConnectOptions, connCh chan
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
t.Cleanup(cancel)
connectCtx, cCancel := context.WithTimeout(context.Background(), 2*time.Second)
tr, err := NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
tr, err := NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason, http2.ErrCode, error) {})
if err != nil {
cCancel() // Do not cancel in success path.
// Server clean-up.
Expand Down Expand Up @@ -1484,7 +1484,7 @@ func (s) TestClientHonorsConnectContext(t *testing.T) {
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
_, err = NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
_, err = NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason, http2.ErrCode, error) {})
if err == nil {
t.Fatalf("NewHTTP2Client() returned successfully; wanted error")
}
Expand All @@ -1496,7 +1496,7 @@ func (s) TestClientHonorsConnectContext(t *testing.T) {
// Test context deadline.
connectCtx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err = NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
_, err = NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason, http2.ErrCode, error) {})
if err == nil {
t.Fatalf("NewHTTP2Client() returned successfully; wanted error")
}
Expand Down Expand Up @@ -1581,7 +1581,7 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) {
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
ct, err := NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
ct, err := NewHTTP2Client(connectCtx, ctx, resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason, http2.ErrCode, error) {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
Expand Down Expand Up @@ -2602,7 +2602,7 @@ func (s) TestClientHandshakeInfo(t *testing.T) {
ChannelzParent: channelzSubChannel(t),
BufferPool: mem.DefaultBufferPool(),
}
tr, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason) {})
tr, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason, http2.ErrCode, error) {})
if err != nil {
t.Fatalf("NewHTTP2Client(): %v", err)
}
Expand Down Expand Up @@ -2644,7 +2644,7 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) {
ChannelzParent: channelzSubChannel(t),
BufferPool: mem.DefaultBufferPool(),
}
tr, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason) {})
tr, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason, http2.ErrCode, error) {})
if err != nil {
t.Fatalf("NewHTTP2Client(): %v", err)
}
Expand Down Expand Up @@ -3001,7 +3001,7 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) {
cOpts := ConnectOptions{
BufferPool: mem.DefaultBufferPool(),
}
ct, err := NewHTTP2Client(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, cOpts, func(GoAwayReason) {})
ct, err := NewHTTP2Client(ctx, ctx, resolver.Address{Addr: lis.Addr().String()}, cOpts, func(GoAwayReason, http2.ErrCode, error) {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
Expand Down Expand Up @@ -3070,7 +3070,7 @@ func (s) TestClientCloseReturnsAfterReaderCompletes(t *testing.T) {

// Create a client transport with a custom dialer that hangs the Read()
// after Close().
ct, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason) {})
ct, err := NewHTTP2Client(ctx, ctx, addr, copts, func(GoAwayReason, http2.ErrCode, error) {})
if err != nil {
t.Fatalf("Failed to create transport: %v", err)
}
Expand Down Expand Up @@ -3162,7 +3162,7 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Create client transport with custom dialer
ct, connErr := NewHTTP2Client(connectCtx, ctx, addr, copts, func(GoAwayReason) {})
ct, connErr := NewHTTP2Client(connectCtx, ctx, addr, copts, func(GoAwayReason, http2.ErrCode, error) {})
if connErr != nil {
t.Fatalf("failed to create transport: %v", connErr)
}
Expand Down
Loading
Loading