Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/patrickmn/go-cache"
Expand Down Expand Up @@ -54,31 +53,37 @@ var (
instanceWriteSem = semaphore.NewWeighted(config.GetBackendWriteConcurrency())
)

var forgetAliases *cache.Cache
var (
forgetAliases *cache.Cache
forgetAliasesOnce sync.Once
)

var (
readTopologyInstanceCounter = stats.NewCounter("InstanceReadTopology", "Number of times an instance was read from the topology")
readInstanceCounter = stats.NewCounter("InstanceRead", "Number of times an instance was read")
currentErrantGTIDCount = stats.NewGaugesWithSingleLabel("CurrentErrantGTIDCount", "Number of errant GTIDs a vttablet currently has", "TabletAlias")
)

var (
emptyQuotesRegexp = regexp.MustCompile(`^""$`)
cacheInitializationCompleted atomic.Bool
)
var emptyQuotesRegexp = regexp.MustCompile(`^""$`)

func init() {
go initializeInstanceDao()
}

func initializeInstanceDao() {
config.WaitForConfigurationToBeLoaded()
InitializeForgetAliasesCache()
initForgetAliasesCache()
}

func initForgetAliasesCache() {
forgetAliasesOnce.Do(func() {
forgetAliases = cache.New(config.GetInstancePollTime()*3, time.Second)
})
}

// InitializeForgetAliasesCache ensures the forgetAliases cache is initialized.
func InitializeForgetAliasesCache() {
forgetAliases = cache.New(config.GetInstancePollTime()*3, time.Second)
cacheInitializationCompleted.Store(true)
initForgetAliasesCache()
}

// ExecDBWriteFunc chooses how to execute a write onto the database: whether synchronously or not
Expand Down Expand Up @@ -1105,6 +1110,7 @@ func UpdateInstanceLastAttemptedCheck(tabletAlias *topodatapb.TabletAlias) error

// InstanceIsForgotten returns true if an instance was forgotten.
func InstanceIsForgotten(tabletAlias *topodatapb.TabletAlias) bool {
initForgetAliasesCache()
tabletAliasString := topoproto.TabletAliasString(tabletAlias)
_, found := forgetAliases.Get(tabletAliasString)
return found
Expand All @@ -1118,6 +1124,7 @@ func ForgetInstance(tabletAlias *topodatapb.TabletAlias) error {
log.Error(errMsg)
return errors.New(errMsg)
}
initForgetAliasesCache()
tabletAliasString := topoproto.TabletAliasString(tabletAlias)
forgetAliases.Set(tabletAliasString, true, cache.DefaultExpiration)
log.Info(fmt.Sprintf("Forgetting: %v", tabletAliasString))
Expand Down
19 changes: 4 additions & 15 deletions go/vt/vtorc/inst/instance_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,8 @@ func TestReadOutdatedInstances(t *testing.T) {
},
}

// wait for the forgetAliases cache to be initialized to prevent data race.
waitForCacheInitialization()
// Ensure the forgetAliases cache is initialized before overriding it.
InitializeForgetAliasesCache()

// We are setting InstancePollSeconds to 59 minutes, just for the test.
oldVal := config.GetInstancePollTime()
Expand Down Expand Up @@ -747,8 +747,8 @@ func TestForgetInstanceAndInstanceIsForgotten(t *testing.T) {
},
}

// wait for the forgetAliases cache to be initialized to prevent data race.
waitForCacheInitialization()
// Ensure the forgetAliases cache is initialized before overriding it.
InitializeForgetAliasesCache()

oldCache := forgetAliases
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
Expand Down Expand Up @@ -810,17 +810,6 @@ func TestSnapshotTopologies(t *testing.T) {
require.Equal(t, []string{"zone1-0000000100", "zone1-0000000101", "zone1-0000000112", "zone2-0000000200"}, tabletAliases)
}

// waitForCacheInitialization waits for the cache to be initialized to prevent data race in tests
// that alter the cache or depend on its behaviour.
func waitForCacheInitialization() {
for {
if cacheInitializationCompleted.Load() {
return
}
time.Sleep(100 * time.Millisecond)
}
}

func TestGetDatabaseState(t *testing.T) {
// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
defer func() {
Expand Down
Loading