Skip to content

Commit 3cc0fec

Browse files
committed
Backport vitessio#17858
1 parent 79dc8e3 commit 3cc0fec

2 files changed

Lines changed: 131 additions & 18 deletions

File tree

go/vt/vtgate/vstream_manager.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package vtgate
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"io"
2324
"regexp"
@@ -51,8 +52,11 @@ type vstreamManager struct {
5152
toposerv srvtopo.Server
5253
cell string
5354

54-
vstreamsCreated *stats.CountersWithMultiLabels
55-
vstreamsLag *stats.GaugesWithMultiLabels
55+
vstreamsCreated *stats.CountersWithMultiLabels
56+
vstreamsLag *stats.GaugesWithMultiLabels
57+
vstreamsCount *stats.CountersWithMultiLabels
58+
vstreamsEventsStreamed *stats.CountersWithMultiLabels
59+
vstreamsEndedWithErrors *stats.CountersWithMultiLabels
5660
}
5761

5862
// maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
@@ -143,6 +147,7 @@ type journalEvent struct {
143147

144148
func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
145149
exporter := servenv.NewExporter(cell, "VStreamManager")
150+
labels := []string{"Keyspace", "ShardName", "TabletType"}
146151

147152
return &vstreamManager{
148153
resolver: resolver,
@@ -151,11 +156,23 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str
151156
vstreamsCreated: exporter.NewCountersWithMultiLabels(
152157
"VStreamsCreated",
153158
"Number of vstreams created",
154-
[]string{"Keyspace", "ShardName", "TabletType"}),
159+
labels),
155160
vstreamsLag: exporter.NewGaugesWithMultiLabels(
156161
"VStreamsLag",
157162
"Difference between event current time and the binlog event timestamp",
158-
[]string{"Keyspace", "ShardName", "TabletType"}),
163+
labels),
164+
vstreamsCount: exporter.NewCountersWithMultiLabels(
165+
"VStreamsCount",
166+
"Number of active vstreams",
167+
labels),
168+
vstreamsEventsStreamed: exporter.NewCountersWithMultiLabels(
169+
"VStreamsEventsStreamed",
170+
"Number of events sent across all vstreams",
171+
labels),
172+
vstreamsEndedWithErrors: exporter.NewCountersWithMultiLabels(
173+
"VStreamsEndedWithErrors",
174+
"Number of vstreams that ended with errors",
175+
labels),
159176
}
160177
}
161178

@@ -378,11 +395,26 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard
378395
vs.wg.Add(1)
379396
go func() {
380397
defer vs.wg.Done()
398+
399+
labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()}
400+
// Initialize vstreamsEndedWithErrors metric to zero.
401+
vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 0)
402+
vs.vsm.vstreamsCreated.Add(labelValues, 1)
403+
vs.vsm.vstreamsCount.Add(labelValues, 1)
404+
381405
err := vs.streamFromTablet(ctx, sgtid)
382406

383407
// Set the error on exit. First one wins.
384408
if err != nil {
385409
log.Errorf("Error in vstream for %+v: %s", sgtid, err)
410+
// Get the original/base error.
411+
uerr := vterrors.UnwrapAll(err)
412+
if !errors.Is(uerr, context.Canceled) && !errors.Is(uerr, context.DeadlineExceeded) {
413+
// The client did not intentionally end the stream so this was an error in the
414+
// vstream itself.
415+
vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 1)
416+
}
417+
vs.vsm.vstreamsCount.Add(labelValues, -1)
386418
vs.once.Do(func() {
387419
vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid))
388420
vs.cancel()
@@ -503,6 +535,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
503535
// It will be closed when all journal events converge.
504536
var journalDone chan struct{}
505537
ignoreTablets := make([]*topodatapb.TabletAlias, 0)
538+
labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()}
506539

507540
errCount := 0
508541
for {
@@ -613,18 +646,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
613646
TableLastPKs: sgtid.TablePKs,
614647
Options: options,
615648
}
616-
var vstreamCreatedOnce sync.Once
617649
log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req)
618650
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
619651
// We received a valid event. Reset error count.
620652
errCount = 0
621653

622-
labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()}
623-
624-
vstreamCreatedOnce.Do(func() {
625-
vs.vsm.vstreamsCreated.Add(labels, 1)
626-
})
627-
628654
select {
629655
case <-ctx.Done():
630656
return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s",
@@ -755,7 +781,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
755781
sendevents = append(sendevents, event)
756782
}
757783
lag := event.CurrentTime/1e9 - event.Timestamp
758-
vs.vsm.vstreamsLag.Set(labels, lag)
784+
vs.vsm.vstreamsLag.Set(labelValues, lag)
759785
}
760786
if len(sendevents) != 0 {
761787
eventss = append(eventss, sendevents)
@@ -793,6 +819,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
793819
}
794820
log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err)
795821
}
822+
796823
}
797824

798825
// shouldRetry determines whether we should exit immediately or retry the vstream.
@@ -838,6 +865,7 @@ func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) {
838865
func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error {
839866
vs.mu.Lock()
840867
defer vs.mu.Unlock()
868+
labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()}
841869

842870
// Send all chunks while holding the lock.
843871
for _, events := range eventss {
@@ -890,6 +918,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
890918
case <-ctx.Done():
891919
return nil
892920
case vs.eventCh <- events:
921+
vs.vsm.vstreamsEventsStreamed.Add(labelValues, int64(len(events)))
893922
}
894923
}
895924
return nil

go/vt/vtgate/vstream_manager_test.go

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func TestVStreamMulti(t *testing.T) {
335335
}
336336
}
337337

338-
func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
338+
func TestVStreamsMetrics(t *testing.T) {
339339
ctx, cancel := context.WithCancel(context.Background())
340340
defer cancel()
341341
cell := "aa"
@@ -346,9 +346,12 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
346346
vsm := newTestVStreamManager(ctx, hc, st, cell)
347347
vsm.vstreamsCreated.ResetAll()
348348
vsm.vstreamsLag.ResetAll()
349+
vsm.vstreamsCount.ResetAll()
350+
vsm.vstreamsEventsStreamed.ResetAll()
351+
vsm.vstreamsEndedWithErrors.ResetAll()
349352
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
350353
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
351-
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
354+
sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
352355
addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet())
353356

354357
send0 := []*binlogdatapb.VEvent{
@@ -377,15 +380,96 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
377380
ch := startVStream(ctx, t, vsm, vgtid, nil)
378381
<-ch
379382
<-ch
383+
expectedLabels1 := "TestVStream.-20.PRIMARY"
384+
expectedLabels2 := "TestVStream.20-40.PRIMARY"
380385
wantVStreamsCreated := make(map[string]int64)
381-
wantVStreamsCreated["TestVStream.-20.PRIMARY"] = 1
382-
wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1
386+
wantVStreamsCreated[expectedLabels1] = 1
387+
wantVStreamsCreated[expectedLabels2] = 1
383388
assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches")
384389

385390
wantVStreamsLag := make(map[string]int64)
386-
wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5
387-
wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7
391+
wantVStreamsLag[expectedLabels1] = 5
392+
wantVStreamsLag[expectedLabels2] = 7
388393
assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")
394+
395+
wantVStreamsCount := make(map[string]int64)
396+
wantVStreamsCount[expectedLabels1] = 1
397+
wantVStreamsCount[expectedLabels2] = 1
398+
assert.Equal(t, wantVStreamsCount, vsm.vstreamsCount.Counts(), "vstreamsCount matches")
399+
400+
wantVStreamsEventsStreamed := make(map[string]int64)
401+
wantVStreamsEventsStreamed[expectedLabels1] = 2
402+
wantVStreamsEventsStreamed[expectedLabels2] = 2
403+
assert.Equal(t, wantVStreamsEventsStreamed, vsm.vstreamsEventsStreamed.Counts(), "vstreamsEventsStreamed matches")
404+
405+
wantVStreamsEndedWithErrors := make(map[string]int64)
406+
wantVStreamsEndedWithErrors[expectedLabels1] = 0
407+
wantVStreamsEndedWithErrors[expectedLabels2] = 0
408+
assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches")
409+
}
410+
411+
func TestVStreamsMetricsErrors(t *testing.T) {
412+
ctx, cancel := context.WithCancel(context.Background())
413+
defer cancel()
414+
cell := "aa"
415+
ks := "TestVStream"
416+
_ = createSandbox(ks)
417+
hc := discovery.NewFakeHealthCheck(nil)
418+
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
419+
vsm := newTestVStreamManager(ctx, hc, st, cell)
420+
vsm.vstreamsCreated.ResetAll()
421+
vsm.vstreamsLag.ResetAll()
422+
vsm.vstreamsCount.ResetAll()
423+
vsm.vstreamsEventsStreamed.ResetAll()
424+
vsm.vstreamsEndedWithErrors.ResetAll()
425+
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
426+
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
427+
sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
428+
addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet())
429+
430+
const wantErr = "Invalid arg message"
431+
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr))
432+
433+
send1 := []*binlogdatapb.VEvent{
434+
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
435+
{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
436+
}
437+
sbc1.AddVStreamEvents(send1, nil)
438+
439+
vgtid := &binlogdatapb.VGtid{
440+
ShardGtids: []*binlogdatapb.ShardGtid{{
441+
Keyspace: ks,
442+
Shard: "-20",
443+
Gtid: "pos",
444+
}, {
445+
Keyspace: ks,
446+
Shard: "20-40",
447+
Gtid: "pos",
448+
}},
449+
}
450+
ch := make(chan *binlogdatapb.VStreamResponse)
451+
done := make(chan struct{})
452+
go func() {
453+
err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
454+
ch <- &binlogdatapb.VStreamResponse{Events: events}
455+
return nil
456+
})
457+
458+
if err == nil || !strings.Contains(err.Error(), wantErr) {
459+
require.ErrorContains(t, err, wantErr)
460+
}
461+
close(done)
462+
}()
463+
<-ch
464+
<-done
465+
466+
expectedLabels1 := "TestVStream.-20.PRIMARY"
467+
expectedLabels2 := "TestVStream.20-40.PRIMARY"
468+
469+
wantVStreamsEndedWithErrors := make(map[string]int64)
470+
wantVStreamsEndedWithErrors[expectedLabels1] = 1
471+
wantVStreamsEndedWithErrors[expectedLabels2] = 0
472+
assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches")
389473
}
390474

391475
func TestVStreamRetriableErrors(t *testing.T) {

0 commit comments

Comments
 (0)