Skip to content

Commit 89cebd0

Browse files
vitess-bot[bot]vitess-bot
authored andcommitted
[Bugfix] Broken Heartbeat system in Row Streamer (#18390)
Signed-off-by: siddharth16396 <siddharth16396@gmail.com>
1 parent a4a6ed3 commit 89cebd0

2 files changed

Lines changed: 80 additions & 5 deletions

File tree

go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -362,11 +362,13 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse)
362362
heartbeatTicker := time.NewTicker(rowStreamertHeartbeatInterval)
363363
defer heartbeatTicker.Stop()
364364
go func() {
365-
select {
366-
case <-rs.ctx.Done():
367-
return
368-
case <-heartbeatTicker.C:
369-
safeSend(&binlogdatapb.VStreamRowsResponse{Heartbeat: true})
365+
for {
366+
select {
367+
case <-rs.ctx.Done():
368+
return
369+
case <-heartbeatTicker.C:
370+
safeSend(&binlogdatapb.VStreamRowsResponse{Heartbeat: true})
371+
}
370372
}
371373
}()
372374

go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"regexp"
2323
"testing"
24+
"time"
2425

2526
"github.com/stretchr/testify/require"
2627

@@ -432,6 +433,78 @@ func TestStreamRowsCancel(t *testing.T) {
432433
}
433434
}
434435

436+
func TestStreamRowsHeartbeat(t *testing.T) {
437+
if testing.Short() {
438+
t.Skip()
439+
}
440+
441+
// Save original heartbeat interval and restore it after test
442+
originalInterval := rowStreamertHeartbeatInterval
443+
defer func() {
444+
rowStreamertHeartbeatInterval = originalInterval
445+
}()
446+
447+
// Set a very short heartbeat interval for testing (100ms)
448+
rowStreamertHeartbeatInterval = 10 * time.Millisecond
449+
450+
execStatements(t, []string{
451+
"create table t1(id int, val varchar(128), primary key(id))",
452+
"insert into t1 values (1, 'test1')",
453+
"insert into t1 values (2, 'test2')",
454+
"insert into t1 values (3, 'test3')",
455+
"insert into t1 values (4, 'test4')",
456+
"insert into t1 values (5, 'test5')",
457+
})
458+
459+
defer execStatements(t, []string{
460+
"drop table t1",
461+
})
462+
463+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
464+
defer cancel()
465+
466+
heartbeatCount := 0
467+
dataReceived := false
468+
469+
var options binlogdatapb.VStreamOptions
470+
options.ConfigOverrides = make(map[string]string)
471+
options.ConfigOverrides["vstream_dynamic_packet_size"] = "false"
472+
options.ConfigOverrides["vstream_packet_size"] = "10"
473+
474+
err := engine.StreamRows(ctx, "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error {
475+
if rows.Heartbeat {
476+
heartbeatCount++
477+
// After receiving at least 3 heartbeats, we can be confident the fix is working
478+
if heartbeatCount >= 3 {
479+
cancel()
480+
return nil
481+
}
482+
} else if len(rows.Rows) > 0 {
483+
dataReceived = true
484+
}
485+
// Add a small delay to allow heartbeats to be sent
486+
time.Sleep(50 * time.Millisecond)
487+
return nil
488+
}, &options)
489+
490+
// We expect context canceled error since we cancel after receiving heartbeats
491+
if err != nil && err.Error() != "stream ended: context canceled" {
492+
t.Errorf("unexpected error: %v", err)
493+
}
494+
495+
// Verify we received data
496+
if !dataReceived {
497+
t.Error("expected to receive data rows")
498+
}
499+
500+
// This is the critical test: we should receive multiple heartbeats
501+
// Without the fix (missing for loop), we would only get 1 heartbeat
502+
// With the fix, we should get at least 3 heartbeats
503+
if heartbeatCount < 3 {
504+
t.Errorf("expected at least 3 heartbeats, got %d. This indicates the heartbeat goroutine is not running continuously", heartbeatCount)
505+
}
506+
}
507+
435508
func checkStream(t *testing.T, query string, lastpk []sqltypes.Value, wantQuery string, wantStream []string) {
436509
t.Helper()
437510

0 commit comments

Comments
 (0)