Skip to content

Commit e2519d1

Browse files
committed
util/tracing: fix crash in StreamClientInterceptor
Before this patch, our client-side tracing interceptor for streaming rpc calls was exposed to gRPC bugs being currently fixed in github.com/grpc/grpc-go/pull/5323. This had to do with calls to clientStream.Context() panicking with an NPE under certain races with RPCs failing. We've recently gotten two crashes seemingly because of this. It's unclear why this hasn't shown up before, as nothing seems new (either on our side or on the grpc side). In 22.2 we do use more streaming RPCs than before (for example for span configs), so maybe that does it. This patch eliminates the problem by eliminating the problematic call into ClientStream.Context(). The background is that our interceptors needs to watch for ctx cancelation and consider the RPC done at that point. But, there was no reason for that call; we can more simply use the RPC caller's ctx for the purposes of figuring out if the caller cancels the RPC. In fact, calling ClientStream.Context() is bad for other reasons, beyond exposing us to the bug: 1) ClientStream.Context() pins the RPC attempt to a lower-level connection, and inhibits gRPC's ability to sometimes transparently retry failed calls. In fact, there's a comment on ClientStream.Context() that tells you not to call it before using the stream through other methods like Recv(), which imply that the RPC is already "pinned" and transparent retries are no longer possible anyway. We were breaking this. 2) One of the grpc-go maintainers suggested that, due to the bugs reference above, this call could actually sometimes give us "the wrong context", although how wrong exactly is not clear to me (i.e. could we have gotten a ctx that doesn't inherit from the caller's ctx? Or a ctx that's canceled independently from the caller?) Release note: A rare crash indicating a nil-pointer deference in google.golang.org/grpc/internal/transport.(*Stream).Context(...) was fixed.
1 parent 120fbbb commit e2519d1

File tree

2 files changed

+77
-14
lines changed

2 files changed

+77
-14
lines changed

pkg/util/tracing/grpc_interceptor.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -356,12 +356,12 @@ func StreamClientInterceptor(tracer *Tracer, init func(*Span)) grpc.StreamClient
356356
clientSpan.Finish()
357357
return cs, err
358358
}
359-
return newTracingClientStream(cs, method, desc, clientSpan), nil
359+
return newTracingClientStream(ctx, cs, desc, clientSpan), nil
360360
}
361361
}
362362

363363
func newTracingClientStream(
364-
cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan *Span,
364+
ctx context.Context, cs grpc.ClientStream, desc *grpc.StreamDesc, clientSpan *Span,
365365
) grpc.ClientStream {
366366
finishChan := make(chan struct{})
367367

@@ -386,8 +386,14 @@ func newTracingClientStream(
386386
case <-finishChan:
387387
// The client span is being finished by another code path; hence, no
388388
// action is necessary.
389-
case <-cs.Context().Done():
390-
finishFunc(nil)
389+
case <-ctx.Done():
390+
// A streaming RPC can be finished by the caller cancelling the ctx. If
391+
// the ctx is cancelled, the caller doesn't necessarily need to interact
392+
// with the stream anymore (see [1]), so finishChan might never be
393+
// signaled). Thus, we listen for ctx cancellation and finish the span.
394+
//
395+
// [1] https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
396+
finishFunc(nil /* err */)
391397
}
392398
}()
393399
otcs := &tracingClientStream{

pkg/util/tracing/grpc_interceptor_test.go

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ package tracing_test
1313
import (
1414
"context"
1515
"fmt"
16+
"io"
1617
"net"
17-
"runtime"
1818
"testing"
1919

2020
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -141,7 +141,10 @@ func TestGRPCInterceptors(t *testing.T) {
141141

142142
for _, tc := range []struct {
143143
name string
144-
do func(context.Context) (*types.Any, error)
144+
// expSpanName is the expected name of the RPC spans (client-side and
145+
// server-side). If not specified, the test's name is used.
146+
expSpanName string
147+
do func(context.Context) (*types.Any, error)
145148
}{
146149
{
147150
name: "UnaryUnary",
@@ -156,11 +159,45 @@ func TestGRPCInterceptors(t *testing.T) {
156159
if err != nil {
157160
return nil, err
158161
}
159-
any, err := sc.Recv()
162+
if err := sc.CloseSend(); err != nil {
163+
return nil, err
164+
}
165+
var firstResponse *types.Any
166+
// Consume the stream fully, as mandated by the gRPC API.
167+
for {
168+
any, err := sc.Recv()
169+
if err == io.EOF {
170+
break
171+
}
172+
if err != nil {
173+
return nil, err
174+
}
175+
if firstResponse == nil {
176+
firstResponse = any
177+
}
178+
}
179+
return firstResponse, nil
180+
},
181+
},
182+
{
183+
// Test that cancelling the client's ctx finishes the client span. The
184+
// client span is usually finished either when Recv() receives an error
185+
// (e.g. when receiving an io.EOF after exhausting the stream). But the
186+
// client is allowed to not read from the stream any more if it cancels
187+
// the ctx.
188+
name: "UnaryStream_ContextCancel",
189+
expSpanName: "UnaryStream",
190+
do: func(ctx context.Context) (*types.Any, error) {
191+
ctx, cancel := context.WithCancel(ctx)
192+
defer cancel()
193+
sc, err := c.UnaryStream(ctx, unusedAny)
160194
if err != nil {
161195
return nil, err
162196
}
163-
return any, sc.CloseSend()
197+
if err := sc.CloseSend(); err != nil {
198+
return nil, err
199+
}
200+
return sc.Recv()
164201
},
165202
},
166203
{
@@ -186,7 +223,24 @@ func TestGRPCInterceptors(t *testing.T) {
186223
if err := sc.Send(unusedAny); err != nil {
187224
return nil, err
188225
}
189-
return sc.Recv()
226+
if err := sc.CloseSend(); err != nil {
227+
return nil, err
228+
}
229+
var firstResponse *types.Any
230+
// Consume the stream fully, as mandated by the gRPC API.
231+
for {
232+
any, err := sc.Recv()
233+
if err == io.EOF {
234+
break
235+
}
236+
if err != nil {
237+
return nil, err
238+
}
239+
if firstResponse == nil {
240+
firstResponse = any
241+
}
242+
}
243+
return firstResponse, nil
190244
},
191245
},
192246
} {
@@ -214,20 +268,23 @@ func TestGRPCInterceptors(t *testing.T) {
214268
}
215269
require.Equal(t, 1, n)
216270

271+
expSpanName := tc.expSpanName
272+
if expSpanName == "" {
273+
expSpanName = tc.name
274+
}
217275
exp := fmt.Sprintf(`
218276
span: root
219277
span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s
220278
tags: span.kind=client
221279
span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s
222280
tags: span.kind=server
223-
event: structured=magic-value`, tc.name)
281+
event: structured=magic-value`, expSpanName)
224282
require.NoError(t, tracing.CheckRecordedSpans(finalRecs, exp))
225283
})
226284
}
227-
// Force a GC so that the finalizer for the stream client span runs and closes
228-
// the span. Nothing else closes that span in this test. See
229-
// newTracingClientStream().
230-
runtime.GC()
285+
// Check that all the RPC spans (client-side and server-side) have been
286+
// closed. SucceedsSoon because the closing of the span is async (although
287+
// immediate) in the ctx cancellation subtest.
231288
testutils.SucceedsSoon(t, func() error {
232289
return tr.VisitSpans(func(sp tracing.RegistrySpan) error {
233290
rec := sp.GetFullRecording(tracing.RecordingVerbose)[0]

0 commit comments

Comments
 (0)