Skip to content

Commit 9166ea9

Browse files
committed
Bp pr 17558 pr 17858.slack19.0 (#615)
* VReplication: Improve error handling in VTGate VStreams (vitessio#17558) Signed-off-by: Tom Thornton <thomaswilliamthornton@gmail.com> * Backport vitessio#17858 --------- Signed-off-by: Tom Thornton <thomaswilliamthornton@gmail.com>
1 parent 9a59a67 commit 9166ea9

4 files changed

Lines changed: 223 additions & 83 deletions

File tree

go/test/endtoend/vreplication/vstream_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"strings"
2424
"sync"
25+
"sync/atomic"
2526
"testing"
2627
"time"
2728

@@ -77,11 +78,13 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
7778
}},
7879
}
7980
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600}
80-
done := false
81+
done := atomic.Bool{}
82+
done.Store(false)
8183

8284
// don't insert while PRS is going on
8385
var insertMu sync.Mutex
84-
stopInserting := false
86+
stopInserting := atomic.Bool{}
87+
stopInserting.Store(false)
8588
id := 0
8689

8790
vtgateConn := vc.GetVTGateConn(t)
@@ -90,7 +93,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
9093
// first goroutine that keeps inserting rows into table being streamed until some time elapses after second PRS
9194
go func() {
9295
for {
93-
if stopInserting {
96+
if stopInserting.Load() {
9497
return
9598
}
9699
insertMu.Lock()
@@ -122,7 +125,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
122125
log.Infof("%s:: remote error: %v", time.Now(), err)
123126
}
124127

125-
if done {
128+
if done.Load() {
126129
return
127130
}
128131
}
@@ -154,12 +157,12 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
154157
require.NoError(t, err)
155158
}
156159
time.Sleep(100 * time.Millisecond)
157-
stopInserting = true
158-
time.Sleep(2 * time.Second)
159-
done = true
160+
stopInserting.Store(true)
161+
time.Sleep(10 * time.Second) // Give the vstream plenty of time to catchup
162+
done.Store(true)
160163
}
161164

162-
if done {
165+
if done.Load() {
163166
break
164167
}
165168
}

0 commit comments

Comments
 (0)