Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,69 @@ func TestHealthCheck(t *testing.T) {
testChecksum(t, 0, hc.stateChecksum())
}

// TestHealthCheckConcurrentReadDuringUpdate exercises GetTabletHealthByAlias
// (which copies the tablet's health fields under connMu via SimpleCopy) while
// the tablet's checkConn goroutine processes streaming health responses. The
// field writes in processResponse must hold connMu, otherwise this reports a
// data race under -race.
func TestHealthCheckConcurrentReadDuringUpdate(t *testing.T) {
ctx := utils.LeakCheckContext(t)
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer(ctx, "cell")
defer ts.Close()
hc := createTestHc(ctx, ts)
defer hc.Close()
tablet := createTestTablet(0, "cell", "a")
tablet.Type = topodatapb.TabletType_REPLICA
input := make(chan *querypb.StreamHealthResponse)
_ = createFakeConn(tablet, input)
hc.AddTablet(tablet)

var wg sync.WaitGroup
wg.Add(2)
stop := make(chan struct{})

// Writer: drive processResponse repeatedly through the checkConn goroutine.
go func() {
defer wg.Done()
for i := 0; ; i++ {
shr := &querypb.StreamHealthResponse{
TabletAlias: tablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: i%2 == 0,
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: uint32(i % 5)},
}
select {
case input <- shr:
case <-stop:
return
}
}
}()

// Reader: copy the same fields under connMu via SimpleCopy.
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
_, _ = hc.GetTabletHealthByAlias(tablet.Alias)
}
}
}()

time.Sleep(200 * time.Millisecond)
close(stop)
// Unblock the writer if it is parked on a send.
select {
case <-input:
default:
}
wg.Wait()
}

func TestHealthCheckStreamError(t *testing.T) {
ctx := utils.LeakCheckContext(t)

Expand Down
28 changes: 20 additions & 8 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (thc *tabletHealthCheck) SimpleCopy() *TabletHealth {
// from the health check connection are logged the first time,
// but don't continue to log if the connection stays down.
//
// thc.mu must be locked before calling this function
// thc.connMu must be locked before calling this function.
func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
if !thc.loggedServingState || (serving != thc.Serving) {
// Emit the log from a separate goroutine to avoid holding
Expand Down Expand Up @@ -185,6 +185,7 @@ func (thc *tabletHealthCheck) processResponse(hc *HealthCheckImpl, shr *query.St
return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, fmt.Sprintf("health stats mismatch, tablet %+v alias does not match response alias %v", thc.Tablet, shr.TabletAlias))
}

thc.connMu.Lock()
prevTarget := thc.Target
// check whether this is a trivial update so as to update healthy map
trivialUpdate := thc.LastError == nil && thc.Serving && shr.RealtimeStats.HealthError == "" && shr.Serving &&
Expand All @@ -199,9 +200,11 @@ func (thc *tabletHealthCheck) processResponse(hc *HealthCheckImpl, shr *query.St
reason = "healthCheck update error: " + healthErr.Error()
}
thc.setServingState(serving, reason)
serving = thc.Serving
thc.connMu.Unlock()

// notify downstream for primary change
hc.updateHealth(thc.SimpleCopy(), prevTarget, trivialUpdate, thc.Serving)
hc.updateHealth(thc.SimpleCopy(), prevTarget, trivialUpdate, serving)
return nil
}

Expand Down Expand Up @@ -308,12 +311,15 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
// This will ensure that this update prevails over any previous message that
// stream could have sent.
if timedout.Load() {
thc.connMu.Lock()
thc.LastError = fmt.Errorf("healthcheck timed out (latest %v)", thc.lastResponseTimestamp)
thc.setServingState(false, thc.LastError.Error())
hcErrorCounters.Add([]string{thc.Target.Keyspace, thc.Target.Shard, topoproto.TabletTypeLString(thc.Target.TabletType)}, 1)
target := thc.Target
thc.connMu.Unlock()
hcErrorCounters.Add([]string{target.Keyspace, target.Shard, topoproto.TabletTypeLString(target.TabletType)}, 1)
// trivialUpdate = false because this is an error
// up = false because we did not get a healthy response within the timeout
hc.updateHealth(thc.SimpleCopy(), thc.Target, false, false)
hc.updateHealth(thc.SimpleCopy(), target, false, false)
}

// Streaming RPC failed e.g. because vttablet was restarted or took too long.
Expand All @@ -334,25 +340,31 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {

func (thc *tabletHealthCheck) closeConnection(ctx context.Context, err error) {
thc.logger.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err)
thc.connMu.Lock()
thc.setServingState(false, err.Error())
thc.LastError = err
_ = thc.Conn.Close(ctx)
conn := thc.Conn
thc.Conn = nil
thc.connMu.Unlock()
_ = conn.Close(ctx)
}

// finalizeConn closes the health checking connection.
// To be called only on exit from checkConn().
func (thc *tabletHealthCheck) finalizeConn() {
thc.connMu.Lock()
thc.setServingState(false, "finalizeConn closing connection")
// Note: checkConn() exits only when thc.ctx.Done() is closed. Thus it's
// safe to simply get Err() value here and assign to LastError.
thc.LastError = thc.ctx.Err()
if thc.Conn != nil {
conn := thc.Conn
thc.Conn = nil
thc.connMu.Unlock()
if conn != nil {
// Don't use thc.ctx because it's already closed.
// Use a separate context, and add a timeout to prevent unbounded waits.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = thc.Conn.Close(ctx)
thc.Conn = nil
_ = conn.Close(ctx)
}
}
Loading