Skip to content

Commit 1303098

Browse files
committed
client: fix potential panic during RPC retries (#5323)
1 parent beb2eaf commit 1303098

File tree

6 files changed

+228
-151
lines changed

6 files changed

+228
-151
lines changed

clientconn.go

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -907,14 +907,10 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
907907
}
908908

909909
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
910-
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
910+
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
911911
Ctx: ctx,
912912
FullMethodName: method,
913913
})
914-
if err != nil {
915-
return nil, nil, toRPCErr(err)
916-
}
917-
return t, done, nil
918914
}
919915

920916
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {

internal/transport/http2_client.go

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -631,17 +631,16 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
631631
// the wire. However, there are two notable exceptions:
632632
//
633633
// 1. If the stream headers violate the max header list size allowed by the
634-
// server. In this case there is no reason to retry at all, as it is
635-
// assumed the RPC would continue to fail on subsequent attempts.
634+
// server. It's possible this could succeed on another transport, even if
635+
// it's unlikely, but do not transparently retry.
636636
// 2. If the credentials errored when requesting their headers. In this case,
637637
// it's possible a retry can fix the problem, but indefinitely transparently
638638
// retrying is not appropriate as it is likely the credentials, if they can
639639
// eventually succeed, would need I/O to do so.
640640
type NewStreamError struct {
641641
Err error
642642

643-
DoNotRetry bool
644-
DoNotTransparentRetry bool
643+
AllowTransparentRetry bool
645644
}
646645

647646
func (e NewStreamError) Error() string {
@@ -650,11 +649,11 @@ func (e NewStreamError) Error() string {
650649

651650
// NewStream creates a stream and registers it with the transport as an
652651
// "active" stream. All non-nil errors returned will be *NewStreamError.
653-
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
652+
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
654653
ctx = peer.NewContext(ctx, t.getPeer())
655654
headerFields, err := t.createHeaderFields(ctx, callHdr)
656655
if err != nil {
657-
return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true}
656+
return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
658657
}
659658
s := t.newStream(ctx, callHdr)
660659
cleanup := func(err error) {
@@ -754,23 +753,24 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
754753
return true
755754
}, hdr)
756755
if err != nil {
757-
return nil, &NewStreamError{Err: err}
756+
// Connection closed.
757+
return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
758758
}
759759
if success {
760760
break
761761
}
762762
if hdrListSizeErr != nil {
763-
return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true}
763+
return nil, &NewStreamError{Err: hdrListSizeErr}
764764
}
765765
firstTry = false
766766
select {
767767
case <-ch:
768768
case <-ctx.Done():
769769
return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
770770
case <-t.goAway:
771-
return nil, &NewStreamError{Err: errStreamDrain}
771+
return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
772772
case <-t.ctx.Done():
773-
return nil, &NewStreamError{Err: ErrConnClosing}
773+
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
774774
}
775775
}
776776
if t.statsHandler != nil {

picker_wrapper.go

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -131,7 +131,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
131131
}
132132
if _, ok := status.FromError(err); ok {
133133
// Status error: end the RPC unconditionally with this status.
134-
return nil, nil, err
134+
return nil, nil, dropError{error: err}
135135
}
136136
// For all other errors, wait-for-ready RPCs should block and other
137137
// RPCs should fail with unavailable.
@@ -175,3 +175,9 @@ func (pw *pickerWrapper) close() {
175175
pw.done = true
176176
close(pw.blockingCh)
177177
}
178+
179+
// dropError is a wrapper error that indicates the LB policy wishes to drop the
180+
// RPC and not retry it.
181+
type dropError struct {
182+
error
183+
}

0 commit comments

Comments
 (0)