Skip to content

Commit f6f37d0

Browse files
committed
Add more vstream metrics for vstream manager
Signed-off-by: twthorn <thomaswilliamthornton@gmail.com>
1 parent 81ce29c commit f6f37d0

2 files changed

Lines changed: 121 additions & 17 deletions

File tree

go/vt/vtgate/vstream_manager.go

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,11 @@ type vstreamManager struct {
5151
toposerv srvtopo.Server
5252
cell string
5353

54-
vstreamsCreated *stats.CountersWithMultiLabels
55-
vstreamsLag *stats.GaugesWithMultiLabels
54+
vstreamsCreated *stats.CountersWithMultiLabels
55+
vstreamsLag *stats.GaugesWithMultiLabels
56+
vstreamsCount *stats.CountersWithMultiLabels
57+
vstreamsEventsStreamed *stats.CountersWithMultiLabels
58+
vstreamsEndedWithErrors *stats.CountersWithMultiLabels
5659
}
5760

5861
// maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
@@ -151,10 +154,22 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str
151154
vstreamsCreated: exporter.NewCountersWithMultiLabels(
152155
"VStreamsCreated",
153156
"Number of vstreams created",
154-
[]string{"Keyspace", "ShardName", "TabletType"}),
157+
[]string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}),
155158
vstreamsLag: exporter.NewGaugesWithMultiLabels(
156159
"VStreamsLag",
157160
"Difference between event current time and the binlog event timestamp",
161+
[]string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}),
162+
vstreamsCount: exporter.NewCountersWithMultiLabels(
163+
"VStreamsCount",
164+
"Number of active vstreams",
165+
[]string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}),
166+
vstreamsEventsStreamed: exporter.NewCountersWithMultiLabels(
167+
"VStreamsEventsStreamed",
168+
"Number of vstreams events sent",
169+
[]string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}),
170+
vstreamsEndedWithErrors: exporter.NewCountersWithMultiLabels(
171+
"VStreamsEndedWithErrors",
172+
"Number of vstreams ended with errors",
158173
[]string{"Keyspace", "ShardName", "TabletType"}),
159174
}
160175
}
@@ -378,11 +393,17 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard
378393
vs.wg.Add(1)
379394
go func() {
380395
defer vs.wg.Done()
396+
397+
labels := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()}
398+
vs.vsm.vstreamsEndedWithErrors.Add(labels, 0)
399+
381400
err := vs.streamFromTablet(ctx, sgtid)
382401

383402
// Set the error on exit. First one wins.
384403
if err != nil {
385404
log.Errorf("Error in vstream for %+v: %s", sgtid, err)
405+
406+
vs.vsm.vstreamsEndedWithErrors.Add(labels, 1)
386407
vs.once.Do(func() {
387408
vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid))
388409
vs.cancel()
@@ -613,18 +634,16 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
613634
TableLastPKs: sgtid.TablePKs,
614635
Options: options,
615636
}
616-
var vstreamCreatedOnce sync.Once
637+
638+
labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String(), tablet.Hostname}
639+
vs.vsm.vstreamsCreated.Add(labels, 1)
640+
vs.vsm.vstreamsCount.Add(labels, 1)
641+
617642
log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req)
618643
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
619644
// We received a valid event. Reset error count.
620645
errCount = 0
621646

622-
labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()}
623-
624-
vstreamCreatedOnce.Do(func() {
625-
vs.vsm.vstreamsCreated.Add(labels, 1)
626-
})
627-
628647
select {
629648
case <-ctx.Done():
630649
return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s",
@@ -646,6 +665,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
646665

647666
sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
648667
for i, event := range events {
668+
vs.vsm.vstreamsEventsStreamed.Add(labels, 1)
649669
switch event.Type {
650670
case binlogdatapb.VEventType_FIELD:
651671
// Update table names and send.
@@ -762,6 +782,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
762782
}
763783
return nil
764784
})
785+
vs.vsm.vstreamsCount.Add(labels, -1)
765786
// If stream was ended (by a journal event), return nil without checking for error.
766787
select {
767788
case <-journalDone:

go/vt/vtgate/vstream_manager_test.go

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func TestVStreamMulti(t *testing.T) {
335335
}
336336
}
337337

338-
func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
338+
func TestVStreamsMetrics(t *testing.T) {
339339
ctx, cancel := context.WithCancel(context.Background())
340340
defer cancel()
341341
cell := "aa"
@@ -346,9 +346,11 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
346346
vsm := newTestVStreamManager(ctx, hc, st, cell)
347347
vsm.vstreamsCreated.ResetAll()
348348
vsm.vstreamsLag.ResetAll()
349-
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
349+
hostname1 := "host1"
350+
hostname2 := "host2"
351+
sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
350352
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
351-
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
353+
sbc1 := hc.AddTestTablet(cell, hostname2, 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
352354
addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet())
353355

354356
send0 := []*binlogdatapb.VEvent{
@@ -377,15 +379,96 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
377379
ch := startVStream(ctx, t, vsm, vgtid, nil)
378380
<-ch
379381
<-ch
382+
expectedLabels1Prefix := "TestVStream.-20.PRIMARY"
383+
expectedLabels2Prefix := "TestVStream.20-40.PRIMARY"
384+
expectedLabels1 := expectedLabels1Prefix + "." + hostname1
385+
expectedLabels2 := expectedLabels2Prefix + "." + hostname2
380386
wantVStreamsCreated := make(map[string]int64)
381-
wantVStreamsCreated["TestVStream.-20.PRIMARY"] = 1
382-
wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1
387+
wantVStreamsCreated[expectedLabels1] = 1
388+
wantVStreamsCreated[expectedLabels2] = 1
383389
assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches")
384390

385391
wantVStreamsLag := make(map[string]int64)
386-
wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5
387-
wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7
392+
wantVStreamsLag[expectedLabels1] = 5
393+
wantVStreamsLag[expectedLabels2] = 7
388394
assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")
395+
396+
wantVStreamsCount := make(map[string]int64)
397+
wantVStreamsCount[expectedLabels1] = 1
398+
wantVStreamsCount[expectedLabels2] = 1
399+
assert.Equal(t, wantVStreamsCount, vsm.vstreamsCount.Counts(), "vstreamsCount matches")
400+
401+
wantVStreamsEventsStreamed := make(map[string]int64)
402+
wantVStreamsEventsStreamed[expectedLabels1] = 2
403+
wantVStreamsEventsStreamed[expectedLabels2] = 2
404+
assert.Equal(t, wantVStreamsEventsStreamed, vsm.vstreamsEventsStreamed.Counts(), "vstreamsEventsStreamed matches")
405+
406+
wantVStreamsEndedWithErrors := make(map[string]int64)
407+
wantVStreamsEndedWithErrors[expectedLabels1Prefix] = 0
408+
wantVStreamsEndedWithErrors[expectedLabels2Prefix] = 0
409+
assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches")
410+
}
411+
412+
func TestVStreamsMetricsErrors(t *testing.T) {
413+
ctx, cancel := context.WithCancel(context.Background())
414+
defer cancel()
415+
cell := "aa"
416+
ks := "TestVStream"
417+
_ = createSandbox(ks)
418+
hc := discovery.NewFakeHealthCheck(nil)
419+
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
420+
vsm := newTestVStreamManager(ctx, hc, st, cell)
421+
vsm.vstreamsCreated.ResetAll()
422+
vsm.vstreamsLag.ResetAll()
423+
hostname1 := "host1"
424+
hostname2 := "host2"
425+
sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
426+
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
427+
sbc1 := hc.AddTestTablet(cell, hostname2, 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
428+
addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet())
429+
430+
const wantErr = "Invalid arg message"
431+
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr))
432+
433+
send1 := []*binlogdatapb.VEvent{
434+
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
435+
{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
436+
}
437+
sbc1.AddVStreamEvents(send1, nil)
438+
439+
vgtid := &binlogdatapb.VGtid{
440+
ShardGtids: []*binlogdatapb.ShardGtid{{
441+
Keyspace: ks,
442+
Shard: "-20",
443+
Gtid: "pos",
444+
}, {
445+
Keyspace: ks,
446+
Shard: "20-40",
447+
Gtid: "pos",
448+
}},
449+
}
450+
ch := make(chan *binlogdatapb.VStreamResponse)
451+
done := make(chan struct{})
452+
go func() {
453+
err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
454+
ch <- &binlogdatapb.VStreamResponse{Events: events}
455+
return nil
456+
})
457+
458+
if err == nil || !strings.Contains(err.Error(), wantErr) {
459+
t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr)
460+
}
461+
close(done)
462+
}()
463+
<-done
464+
465+
expectedLabels1 := "TestVStream.-20.PRIMARY"
466+
expectedLabels2 := "TestVStream.20-40.PRIMARY"
467+
468+
wantVStreamsEndedWithErrors := make(map[string]int64)
469+
wantVStreamsEndedWithErrors[expectedLabels1] = 1
470+
wantVStreamsEndedWithErrors[expectedLabels2] = 1
471+
assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches")
389472
}
390473

391474
func TestVStreamRetriableErrors(t *testing.T) {

0 commit comments

Comments
 (0)