Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (t *noopVCursor) CloneForReplicaWarming(ctx context.Context) VCursor {
panic("implement me")
}

func (t *noopVCursor) WarmingReadsContext(ctx context.Context) (context.Context, context.CancelFunc) {
panic("implement me")
}

func (t *noopVCursor) CloneForMirroring(ctx context.Context) VCursor {
panic("implement me")
}
Expand Down Expand Up @@ -470,10 +474,11 @@ type loggingVCursor struct {

parser *sqlparser.Parser

onMirrorClonesFn func(context.Context) VCursor
onExecuteMultiShardFn func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool)
onStreamExecuteMultiFn func(context.Context, Primitive, string, []*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, bool, bool, func(*sqltypes.Result) error)
onRecordMirrorStatsFn func(time.Duration, time.Duration, error)
onMirrorClonesFn func(context.Context) VCursor
onExecuteMultiShardFn func(context.Context, Primitive, []*srvtopo.ResolvedShard, []*querypb.BoundQuery, bool, bool)
onStreamExecuteMultiFn func(context.Context, Primitive, string, []*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, bool, bool, func(*sqltypes.Result) error)
onRecordMirrorStatsFn func(time.Duration, time.Duration, error)
onResolveDestinationsFn func(context.Context)

metrics *Metrics
}
Expand Down Expand Up @@ -599,6 +604,10 @@ func (f *loggingVCursor) CloneForReplicaWarming(ctx context.Context) VCursor {
return f
}

func (f *loggingVCursor) WarmingReadsContext(ctx context.Context) (context.Context, context.CancelFunc) {
return ctx, func() {}
}

func (f *loggingVCursor) CloneForMirroring(ctx context.Context) VCursor {
if f.onMirrorClonesFn != nil {
return f.onMirrorClonesFn(ctx)
Expand Down Expand Up @@ -662,6 +671,9 @@ func (f *loggingVCursor) StreamExecuteMulti(ctx context.Context, primitive Primi
}

func (f *loggingVCursor) ResolveDestinations(ctx context.Context, keyspace string, ids []*querypb.Value, destinations []key.ShardDestination) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) {
if f.onResolveDestinationsFn != nil {
f.onResolveDestinationsFn(ctx)
}
f.log = append(f.log, fmt.Sprintf("ResolveDestinations %v %v %v", keyspace, ids, key.DestinationsString(destinations)))
if f.shardErr != nil {
return nil, nil, f.shardErr
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,16 @@ type (
// GetWarmingReadsChannel returns the channel for executing warming reads against replicas
GetWarmingReadsChannel() chan bool

// CloneForReplicaWarming clones the VCursor for re-use in warming queries to replicas
// CloneForReplicaWarming clones the VCursor for re-use in warming queries to replicas.
// The clone must be created before launching the warming goroutine to avoid
// concurrent access to the original VCursor. Use WarmingReadsContext on the
// clone to obtain a timeout-bounded context for the warming query.
CloneForReplicaWarming(ctx context.Context) VCursor

// WarmingReadsContext returns a timeout-bounded context for a warming query
// and a cancel function that must be called when the warming query completes.
WarmingReadsContext(ctx context.Context) (context.Context, context.CancelFunc)

// CloneForMirroring clones the VCursor for re-use in mirroring queries to other keyspaces
CloneForMirroring(ctx context.Context) VCursor
//
Expand Down
23 changes: 11 additions & 12 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ import (

var _ Primitive = (*Route)(nil)

var (
replicaWarmingReadsMirrored = stats.NewCountersWithMultiLabels(
"ReplicaWarmingReadsMirrored",
"Number of reads mirrored to replicas to warm their bufferpools",
[]string{"Keyspace"})
)
var replicaWarmingReadsMirrored = stats.NewCountersWithMultiLabels(
"ReplicaWarmingReadsMirrored",
"Number of reads mirrored to replicas to warm their bufferpools",
[]string{"Keyspace"})

// Route represents the instructions to route a read query to
// one or many vttablets.
Expand Down Expand Up @@ -103,9 +101,7 @@ func NewRoute(opcode Opcode, keyspace *vindexes.Keyspace, query, fieldQuery stri
}
}

var (
partialSuccessScatterQueries = stats.NewCounter("PartialSuccessScatterQueries", "Count of partially successful scatter queries")
)
var partialSuccessScatterQueries = stats.NewCounter("PartialSuccessScatterQueries", "Count of partially successful scatter queries")

// TryExecute performs a non-streaming exec.
func (route *Route) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
Expand Down Expand Up @@ -512,22 +508,25 @@ func (route *Route) executeWarmingReplicaRead(ctx context.Context, vcursor VCurs
return
}

replicaVCursor := vcursor.CloneForReplicaWarming(ctx)
warmingReadsChannel := vcursor.GetWarmingReadsChannel()

select {
// if there's no more room in the channel, drop the warming read
case warmingReadsChannel <- true:
replicaVCursor := vcursor.CloneForReplicaWarming(ctx)
go func(replicaVCursor VCursor) {
warmingCtx, cancel := replicaVCursor.WarmingReadsContext(ctx)
// Defers run LIFO: channel slot is released first, then context is canceled.
defer cancel()
defer func() {
<-warmingReadsChannel
}()
rss, _, err := route.findRoute(ctx, replicaVCursor, bindVars)
rss, _, err := route.findRoute(warmingCtx, replicaVCursor, bindVars)
if err != nil {
return
}

_, errs := replicaVCursor.ExecuteMultiShard(ctx, route, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, route.FetchLastInsertID)
_, errs := replicaVCursor.ExecuteMultiShard(warmingCtx, route, rss, queries, false /*rollbackOnError*/, false /*canAutocommit*/, route.FetchLastInsertID)
if len(errs) > 0 {
log.Warningf("Failed to execute warming replica read: %v", errs)
} else {
Expand Down
Loading
Loading