Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go/vt/vtorc/inst/shard_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ func TestSaveReadAndDeleteShard(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string{tt.shardName}, shardNames)

// ReadAllShardNames
allShardNames, err := ReadAllShardNames()
require.NoError(t, err)
ksShards, found := allShardNames[tt.keyspaceName]
require.True(t, found)
require.Equal(t, []string{tt.shardName}, ksShards)

// DeleteShard
require.NoError(t, DeleteShard(tt.keyspaceName, tt.shardName))
_, _, err = ReadShardPrimaryInformation(tt.keyspaceName, tt.shardName)
Expand Down
32 changes: 28 additions & 4 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"golang.org/x/exp/maps"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
Expand Down Expand Up @@ -108,6 +109,26 @@ func RefreshKeyspaceAndShard(keyspaceName string, shardName string) error {
return refreshShard(keyspaceName, shardName)
}

// shouldWatchShard returns true if a shard is within the shardsToWatch
// ranges for it's keyspace.
func shouldWatchShard(shard *topo.ShardInfo) bool {
if len(shardsToWatch) == 0 {
return true
}

watchRanges, found := shardsToWatch[shard.Keyspace()]
if !found {
return false
}

for _, keyRange := range watchRanges {
if key.KeyRangeContainsKeyRange(keyRange, shard.GetKeyRange()) {
return true
}
}
return false
}

// refreshKeyspace refreshes the keyspace's information for the given keyspace from the topo
func refreshKeyspace(keyspaceName string) error {
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
Expand Down Expand Up @@ -149,10 +170,14 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error {
log.Error(err)
return err
}

// save shards that should be watched.
savedShards := make(map[string]bool, len(shardInfos))
for _, shardInfo := range shardInfos {
err = inst.SaveShard(shardInfo)
if err != nil {
if !shouldWatchShard(shardInfo) {
continue
}
if err = inst.SaveShard(shardInfo); err != nil {
log.Error(err)
return err
}
Expand All @@ -171,8 +196,7 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error {
}
shardName := topoproto.KeyspaceShardString(keyspaceName, shard)
log.Infof("Forgetting shard: %s", shardName)
err = inst.DeleteShard(keyspaceName, shard)
if err != nil {
if err = inst.DeleteShard(keyspaceName, shard); err != nil {
log.Errorf("Failed to delete shard %s: %+v", shardName, err)
return err
}
Expand Down
41 changes: 41 additions & 0 deletions go/vt/vtorc/logic/keyspace_shard_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,44 @@ func verifyPrimaryAlias(t *testing.T, keyspaceName, shardName string, primaryAli
require.NoError(t, err)
require.Equal(t, primaryAliasWanted, primaryAlias)
}

func TestRefreshAllShards(t *testing.T) {
// Store the old flags and restore on test completion
oldClustersToWatch := clustersToWatch
oldTs := ts
defer func() {
clustersToWatch = oldClustersToWatch
ts = oldTs
db.ClearVTOrcDatabase()
}()

ctx := context.Background()
ts = memorytopo.NewServer(ctx, "zone1")
require.NoError(t, initializeShardsToWatch())
require.NoError(t, ts.CreateKeyspace(ctx, "ks1", keyspaceDurabilityNone))
shards := []string{"-40", "40-80", "80-c0", "c0-"}
for _, shard := range shards {
require.NoError(t, ts.CreateShard(ctx, "ks1", shard))
}

// test shard refresh
require.NoError(t, refreshAllShards(ctx, "ks1"))
shardNames, err := inst.ReadShardNames("ks1")
require.NoError(t, err)
require.Equal(t, []string{"-40", "40-80", "80-c0", "c0-"}, shardNames)

// test topo shard delete propagates
require.NoError(t, ts.DeleteShard(ctx, "ks1", "c0-"))
require.NoError(t, refreshAllShards(ctx, "ks1"))
shardNames, err = inst.ReadShardNames("ks1")
require.NoError(t, err)
require.Equal(t, []string{"-40", "40-80", "80-c0"}, shardNames)

// test clustersToWatch filters what shards are saved
clustersToWatch = []string{"ks1/-80"}
require.NoError(t, initializeShardsToWatch())
require.NoError(t, refreshAllShards(ctx, "ks1"))
shardNames, err = inst.ReadShardNames("ks1")
require.NoError(t, err)
require.Equal(t, []string{"-40", "40-80"}, shardNames)
}