diff --git a/go/sqltypes/proto3.go b/go/sqltypes/proto3.go index 4b7ef3013d5..acc2c17a077 100644 --- a/go/sqltypes/proto3.go +++ b/go/sqltypes/proto3.go @@ -71,9 +71,42 @@ func RowsToProto3(rows [][]Value) []*querypb.Row { return nil } + // Batch-allocate all Row structs in a single backing array to reduce + // per-row heap allocations from N to 1. + backing := make([]querypb.Row, len(rows)) result := make([]*querypb.Row, len(rows)) + + // Pre-allocate a single Lengths backing array for all rows. + nCols := len(rows[0]) + allLengths := make([]int64, 0, nCols*len(rows)) + + // First pass: compute lengths and accumulate total value size. + totalValueBytes := 0 for i, r := range rows { - result[i] = RowToProto3(r) + result[i] = &backing[i] + start := len(allLengths) + for _, c := range r { + if c.IsNull() { + allLengths = append(allLengths, -1) + } else { + l := c.Len() + allLengths = append(allLengths, int64(l)) + totalValueBytes += l + } + } + backing[i].Lengths = allLengths[start:] + } + + // Second pass: batch-allocate all Values into a single buffer. + allValues := make([]byte, 0, totalValueBytes) + for i, r := range rows { + start := len(allValues) + for _, c := range r { + if !c.IsNull() { + allValues = append(allValues, c.Raw()...) + } + } + backing[i].Values = allValues[start:] } return result } @@ -87,9 +120,26 @@ func proto3ToRows(fields []*querypb.Field, rows []*querypb.Row) [][]Value { return [][]Value{} } - result := make([][]Value, len(rows)) + nCols := len(fields) + nRows := len(rows) + + // Combined allocation: result slice headers + backing Values in one block. + // Layout: [nRows []Value headers] allocated as make([][]Value, nRows) + // then [nRows*nCols Value] backing allocated separately. + // For single row, the make calls may be optimized by the compiler. + backing := make([]Value, nRows*nCols) + result := make([][]Value, nRows) for i, r := range rows { - result[i] = MakeRowTrusted(fields, r) + sqlRow := backing[i*nCols : (i+1)*nCols] + var offset int64 + for j, length := range r.Lengths { + if length < 0 { + continue + } + sqlRow[j] = MakeTrusted(fields[j].Type, r.Values[offset:offset+length]) + offset += length + } + result[i] = sqlRow } return result } diff --git a/go/streamlog/streamlog.go b/go/streamlog/streamlog.go index 721c0d28dc3..7ea82827740 100644 --- a/go/streamlog/streamlog.go +++ b/go/streamlog/streamlog.go @@ -155,21 +155,35 @@ func shouldEmitLogOnCondition(aCond bool, aReason string, allMatches bool, reaso } } +// HasSubscribers returns true if there are any active subscribers. +// This can be used to skip expensive work (e.g., copying bind variables) +// when no one is listening. +func (logger *StreamLogger[T]) HasSubscribers() bool { + logger.mu.Lock() + has := len(logger.subscribed) > 0 + logger.mu.Unlock() + return has +} + // Send sends message to all the writers subscribed to logger. Calling -// Send does not block. -func (logger *StreamLogger[T]) Send(message T) { +// Send does not block. It returns the number of subscribers the message +// was delivered to. +func (logger *StreamLogger[T]) Send(message T) int { logger.mu.Lock() defer logger.mu.Unlock() + delivered := 0 for ch, name := range logger.subscribed { select { case ch <- message: deliveredCount.Add([]string{logger.name, name}, 1) + delivered++ default: deliveryDropCount.Add([]string{logger.name, name}, 1) } } sendCount.Add(logger.name, 1) + return delivered } // Subscribe returns a channel which can be used to listen diff --git a/go/trace/trace.go b/go/trace/trace.go index cb2a7c6f31b..99c98456b74 100644 --- a/go/trace/trace.go +++ b/go/trace/trace.go @@ -62,6 +62,13 @@ func AnnotateSQL(span Span, strippedSQL fmt.Stringer) { span.Annotate("sql-statement-type", strippedSQL.String()) } +// IsNoop returns true when the current tracer is the noop tracer. +// Callers can use this to avoid computing trace annotations that would be discarded. +func IsNoop() bool { + _, ok := currentTracer.(noopTracingServer) + return ok +} + // FromContext returns the Span from a Context if present. The bool return // value indicates whether a Span was present in the Context. func FromContext(ctx context.Context) (Span, bool) { diff --git a/go/vt/callerid/callerid.go b/go/vt/callerid/callerid.go index c09a2a455aa..63703a57479 100644 --- a/go/vt/callerid/callerid.go +++ b/go/vt/callerid/callerid.go @@ -88,13 +88,14 @@ func GetSubcomponent(ef *vtrpcpb.CallerID) string { } // NewContext adds the provided EffectiveCallerID(vtrpcpb.CallerID) and ImmediateCallerID(querypb.VTGateCallerID) -// into the Context +// into the Context. Skips context wrapping for nil values to avoid unnecessary allocations. func NewContext(ctx context.Context, ef *vtrpcpb.CallerID, im *querypb.VTGateCallerID) context.Context { - ctx = context.WithValue( - context.WithValue(ctx, effectiveCallerIDKey, ef), - immediateCallerIDKey, - im, - ) + if ef != nil { + ctx = context.WithValue(ctx, effectiveCallerIDKey, ef) + } + if im != nil { + ctx = context.WithValue(ctx, immediateCallerIDKey, im) + } return ctx } diff --git a/go/vt/callinfo/plugin_grpc.go b/go/vt/callinfo/plugin_grpc.go index e402f2f7966..6de5fa547e9 100644 --- a/go/vt/callinfo/plugin_grpc.go +++ b/go/vt/callinfo/plugin_grpc.go @@ -21,6 +21,7 @@ package callinfo import ( "context" "fmt" + "net" "github.com/google/safehtml" "github.com/google/safehtml/template" @@ -29,50 +30,65 @@ import ( ) // GRPCCallInfo returns an augmented context with a CallInfo structure, -// only for gRPC contexts. +// only for gRPC contexts. Uses a combined callInfoContext to avoid +// separate allocations for the struct and context.WithValue wrapper. func GRPCCallInfo(ctx context.Context) context.Context { method, ok := grpc.Method(ctx) if !ok { return ctx } - callinfo := &gRPCCallInfoImpl{ - method: method, + c := &callInfoContext{ + Context: ctx, + method: method, } - peer, ok := peer.FromContext(ctx) - if ok { - callinfo.remoteAddr = peer.Addr.String() + if p, ok := peer.FromContext(ctx); ok { + c.remoteAddr = p.Addr } - return NewContext(ctx, callinfo) + return c } -type gRPCCallInfoImpl struct { +// callInfoContext is a combined context+callinfo that avoids separate allocations +// for the gRPCCallInfoImpl struct and the context.WithValue wrapper. It embeds +// the parent context and implements Value() to return itself for the callInfoKey. +type callInfoContext struct { + context.Context method string - remoteAddr string + remoteAddr net.Addr } -func (gci *gRPCCallInfoImpl) RemoteAddr() string { - return gci.remoteAddr +func (c *callInfoContext) Value(key any) any { + if key == callInfoKey { + return CallInfo(c) + } + return c.Context.Value(key) +} + +func (c *callInfoContext) RemoteAddr() string { + if c.remoteAddr == nil { + return "" + } + return c.remoteAddr.String() } -func (gci *gRPCCallInfoImpl) Username() string { +func (c *callInfoContext) Username() string { return "gRPC" } -func (gci *gRPCCallInfoImpl) Text() string { - return fmt.Sprintf("%s:%s(gRPC)", gci.remoteAddr, gci.method) +func (c *callInfoContext) Text() string { + return fmt.Sprintf("%s:%s(gRPC)", c.RemoteAddr(), c.method) } var grpcTmpl = template.Must(template.New("tcs").Parse("Method: {{.Method}} Remote Addr: {{.RemoteAddr}}")) -func (gci *gRPCCallInfoImpl) HTML() safehtml.HTML { +func (c *callInfoContext) HTML() safehtml.HTML { html, err := grpcTmpl.ExecuteToHTML(struct { Method string RemoteAddr string }{ - Method: gci.method, - RemoteAddr: gci.remoteAddr, + Method: c.method, + RemoteAddr: c.RemoteAddr(), }) if err != nil { panic(err) diff --git a/go/vt/callinfo/plugin_grpc_test.go b/go/vt/callinfo/plugin_grpc_test.go index 794619899ba..e911881b5b2 100644 --- a/go/vt/callinfo/plugin_grpc_test.go +++ b/go/vt/callinfo/plugin_grpc_test.go @@ -17,20 +17,41 @@ limitations under the License. package callinfo import ( + "net" "testing" "github.com/stretchr/testify/require" ) func TestGRPCCallInfo(t *testing.T) { - grpcCi := gRPCCallInfoImpl{ + grpcCi := callInfoContext{ + Context: t.Context(), method: "tcp", - remoteAddr: "localhost", + remoteAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8080}, } require.Equal(t, t.Context(), GRPCCallInfo(t.Context())) - require.Equal(t, grpcCi.remoteAddr, grpcCi.RemoteAddr()) + require.Equal(t, "127.0.0.1:8080", grpcCi.RemoteAddr()) require.Equal(t, "gRPC", grpcCi.Username()) - require.Equal(t, "localhost:tcp(gRPC)", grpcCi.Text()) - require.Equal(t, "Method: tcp Remote Addr: localhost", grpcCi.HTML().String()) + require.Equal(t, "127.0.0.1:8080:tcp(gRPC)", grpcCi.Text()) + require.Equal(t, "Method: tcp Remote Addr: 127.0.0.1:8080", grpcCi.HTML().String()) +} + +func TestGRPCCallInfoNilAddr(t *testing.T) { + grpcCi := callInfoContext{ + Context: t.Context(), + method: "test", + } + require.Equal(t, "", grpcCi.RemoteAddr()) +} + +func TestGRPCCallInfoFromContext(t *testing.T) { + ctx := &callInfoContext{ + Context: t.Context(), + method: "/queryservice.Query/Execute", + remoteAddr: &net.TCPAddr{IP: net.ParseIP("10.0.0.1"), Port: 1234}, + } + ci, ok := FromContext(ctx) + require.True(t, ok) + require.Equal(t, "10.0.0.1:1234", ci.RemoteAddr()) } diff --git a/go/vt/sqlparser/normalizer.go b/go/vt/sqlparser/normalizer.go index 91e28b77a18..bb304f0a77a 100644 --- a/go/vt/sqlparser/normalizer.go +++ b/go/vt/sqlparser/normalizer.go @@ -50,7 +50,7 @@ type ( inDerived int inSelect int - bindVarNeeds *BindVarNeeds + bindVarNeeds BindVarNeeds shouldRewriteDatabaseFunc bool hasStarInSelect bool @@ -126,7 +126,7 @@ func Normalize( return &RewriteASTResult{ AST: out.(Statement), - BindVarNeeds: nz.bindVarNeeds, + BindVarNeeds: &nz.bindVarNeeds, UpdateQueryFromAST: nz.useASTQuery, }, nil } @@ -146,15 +146,12 @@ func newNormalizer( bindVars: bindVars, reserved: reserved, vals: make(map[Literal]string), - tupleVals: make(map[string]string), - bindVarNeeds: &BindVarNeeds{}, keyspace: keyspace, selectLimit: selectLimit, setVarComment: setVarComment, fkChecksState: fkChecksState, sysVars: sysVars, views: views, - onLeave: make(map[*AliasedExpr]func(*AliasedExpr)), parameterize: parameterize, } } @@ -220,9 +217,17 @@ func (nz *normalizer) noteAliasedExprName(node *AliasedExpr) { if node.As.NotEmpty() { return } + // Column references are never rewritten by normalization (only literals are), + // so there's no need to track them for alias preservation. + if _, ok := node.Expr.(*ColName); ok { + return + } buf := NewTrackedBuffer(nil) node.Expr.Format(buf) rewrites := nz.bindVarNeeds.NumberOfRewrites() + if nz.onLeave == nil { + nz.onLeave = make(map[*AliasedExpr]func(*AliasedExpr)) + } nz.onLeave[node] = func(newAliasedExpr *AliasedExpr) { if nz.bindVarNeeds.NumberOfRewrites() > rewrites { newAliasedExpr.As = NewIdentifierCI(buf.String()) @@ -485,6 +490,9 @@ func (nz *normalizer) rewriteInComparisons(node *ComparisonExpr) { } nz.bindVars[bvname] = bvals + if nz.tupleVals == nil { + nz.tupleVals = make(map[string]string) + } nz.tupleVals[string(key)] = bvname } diff --git a/go/vt/sqlparser/tracked_buffer.go b/go/vt/sqlparser/tracked_buffer.go index 14fe6fe772e..7cd3f3e1b6d 100644 --- a/go/vt/sqlparser/tracked_buffer.go +++ b/go/vt/sqlparser/tracked_buffer.go @@ -21,10 +21,19 @@ import ( "reflect" "strconv" "strings" + "sync" "vitess.io/vitess/go/slice" ) +var trackedBufferPool = sync.Pool{ + New: func() any { + return &TrackedBuffer{ + Builder: new(strings.Builder), + } + }, +} + // NodeFormatter defines the signature of a custom node formatter // function that can be given to TrackedBuffer for code generation. type NodeFormatter func(buf *TrackedBuffer, node SQLNode) @@ -398,9 +407,17 @@ func String(node SQLNode) string { return "" } - buf := NewTrackedBuffer(nil) + buf := trackedBufferPool.Get().(*TrackedBuffer) + buf.Builder.Reset() + buf.bindLocations = buf.bindLocations[:0] + buf.nodeFormatter = nil + buf.literal = buf.WriteString + buf.fast = true + buf.escape = escapeKeywords node.FormatFast(buf) - return buf.String() + s := buf.String() + trackedBufferPool.Put(buf) + return s } // UnescapedString will return a string where no identifiers have been escaped. diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 441a7bfa30c..3551172b26c 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -827,7 +827,7 @@ func (cached *Plan) CachedSize(alloc bool) int64 { } size := int64(0) if alloc { - size += int64(224) + size += int64(240) } // field Original string size += hack.RuntimeAllocSize(int64(len(cached.Original))) @@ -853,6 +853,15 @@ func (cached *Plan) CachedSize(alloc bool) int64 { } // field QueryHints vitess.io/vitess/go/vt/sqlparser.QueryHints size += cached.QueryHints.CachedSize(false) + // field RoutingIndexes [][3]string + { + size += hack.RuntimeAllocSize(int64(cap(cached.RoutingIndexes)) * int64(48)) + for _, elem := range cached.RoutingIndexes { + for _, elem := range elem { + size += hack.RuntimeAllocSize(int64(len(elem))) + } + } + } return size } diff --git a/go/vt/vtgate/engine/plan.go b/go/vt/vtgate/engine/plan.go index 5a3eb747be1..2474a2b102b 100644 --- a/go/vt/vtgate/engine/plan.go +++ b/go/vt/vtgate/engine/plan.go @@ -51,6 +51,10 @@ type ( ParamsCount uint16 // ParamsCount is the total number of bind parameters (?) in the query. Optimized atomic.Bool // Prepared queries need to be optimized before the first execution + // RoutingIndexes caches the routing vindex information for this plan, + // computed once from Instructions to avoid per-query closure allocation. + RoutingIndexes [][3]string + ExecCount uint64 // ExecCount is how many times this plan has been executed. ExecTime uint64 // ExecTime is the total accumulated execution time in nanoseconds. ShardQueries uint64 // ShardQueries is the total count of shard-level queries performed. @@ -259,7 +263,9 @@ func (pk PlanKey) DebugString() string { } func (pk PlanKey) Hash() theine.HashKey256 { - hasher := vthash.New256() + // Use a stack-allocated Digest to avoid heap allocation. + var hasher vthash.Hasher256 + vthash.Init256(&hasher) _, _ = hasher.WriteUint16(uint16(pk.Collation)) _, _ = hasher.WriteUint16(uint16(pk.TabletType)) _, _ = hasher.WriteString(pk.CurrentKeyspace) @@ -274,12 +280,13 @@ func (pk PlanKey) Hash() theine.HashKey256 { func NewPlan(query string, stmt sqlparser.Statement, primitive Primitive, bindVarNeeds *sqlparser.BindVarNeeds, tablesUsed []string) *Plan { return &Plan{ - Type: getPlanType(primitive), - QueryType: sqlparser.ASTToStatementType(stmt), - Original: query, - Instructions: primitive, - BindVarNeeds: bindVarNeeds, - TablesUsed: tablesUsed, + Type: getPlanType(primitive), + QueryType: sqlparser.ASTToStatementType(stmt), + Original: query, + Instructions: primitive, + BindVarNeeds: bindVarNeeds, + TablesUsed: tablesUsed, + RoutingIndexes: GetRoutingIndexes(primitive), } } diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index a3e68127fd2..c39267bf579 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -506,6 +506,12 @@ func execShard( } func getQueries(query string, bvs []map[string]*querypb.BindVariable) []*querypb.BoundQuery { + if len(bvs) == 1 { + return []*querypb.BoundQuery{{ + Sql: query, + BindVariables: bvs[0], + }} + } queries := make([]*querypb.BoundQuery, len(bvs)) for i, bv := range bvs { queries[i] = &querypb.BoundQuery{ diff --git a/go/vt/vtgate/engine/routing.go b/go/vt/vtgate/engine/routing.go index f362c5868c6..b225d9b370e 100644 --- a/go/vt/vtgate/engine/routing.go +++ b/go/vt/vtgate/engine/routing.go @@ -346,12 +346,25 @@ func (rp *RoutingParameters) byDestination(ctx context.Context, vcursor VCursor, } func (rp *RoutingParameters) equal(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) { - env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor) - value, err := env.Evaluate(rp.Values[0]) - if err != nil { - return nil, nil, err + var value sqltypes.Value + // Fast path: for simple bind variable references with numeric types, skip the eval machinery + // (ExpressionEnv + valueToEval allocations) and use the bind variable value directly. + if bvExpr, ok := rp.Values[0].(*evalengine.BindVariable); ok { + if bvar, found := bindVars[bvExpr.Key]; found && bvar != nil && sqltypes.IsNumber(bvar.Type) { + value = sqltypes.MakeTrusted(bvar.Type, bvar.Value) + goto resolved + } + } + { + env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor) + result, err := env.Evaluate(rp.Values[0]) + if err != nil { + return nil, nil, err + } + value = result.Value(vcursor.ConnCollation()) } - rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, []sqltypes.Value{value.Value(vcursor.ConnCollation())}) +resolved: + rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, []sqltypes.Value{value}) if err != nil { return nil, nil, err } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index cd49508feb8..1152875222d 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -260,8 +260,10 @@ func (e *Executor) Execute( prepared bool, ) (result *sqltypes.Result, err error) { span, ctx := trace.NewSpan(ctx, "executor.Execute") - span.Annotate("method", method) - trace.AnnotateSQL(span, sqlparser.Preview(sql)) + if !trace.IsNoop() { + span.Annotate("method", method) + trace.AnnotateSQL(span, sqlparser.Preview(sql)) + } defer span.Finish() logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, streamlog.GetQueryLogConfig()) @@ -287,7 +289,9 @@ func (e *Executor) Execute( } logStats.SaveEndTime() - e.queryLogger.Send(logStats) + if e.queryLogger.Send(logStats) == 0 { + logStats.Release() + } err = errorTransform.TransformError(err) err = vterrors.TruncateError(err, truncateErrorLen) @@ -323,8 +327,10 @@ func (e *Executor) StreamExecute( callback func(*sqltypes.Result) error, ) error { span, ctx := trace.NewSpan(ctx, "executor.StreamExecute") - span.Annotate("method", method) - trace.AnnotateSQL(span, sqlparser.Preview(sql)) + if !trace.IsNoop() { + span.Annotate("method", method) + trace.AnnotateSQL(span, sqlparser.Preview(sql)) + } defer span.Finish() logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, streamlog.GetQueryLogConfig()) @@ -400,11 +406,7 @@ func (e *Executor) StreamExecute( // 5: Log and add statistics logStats.TablesUsed = plan.TablesUsed - executedRoot := vc.ExecutedPrimitive() - if executedRoot == nil { - executedRoot = plan.Instructions - } - logStats.RoutingIndexesUsed = engine.GetRoutingIndexes(executedRoot) + logStats.RoutingIndexesUsed = plan.RoutingIndexes logStats.TabletType = vc.TabletType().String() logStats.ExecuteTime = time.Since(execStart) logStats.ActiveKeyspace = vc.GetKeyspace() @@ -433,7 +435,9 @@ func (e *Executor) StreamExecute( } logStats.SaveEndTime() - e.queryLogger.Send(logStats) + if e.queryLogger.Send(logStats) == 0 { + logStats.Release() + } err = errorTransform.TransformError(err) err = vterrors.TruncateError(err, truncateErrorLen) @@ -1189,7 +1193,9 @@ func (e *Executor) fetchOrCreatePlan( e.applyQueryHints(vcursor, plan) logStats.SQL = comments.Leading + plan.Original + comments.Trailing - logStats.BindVariables = sqltypes.CopyBindVariables(bindVars) + if e.queryLogger.HasSubscribers() { + logStats.BindVariables = sqltypes.CopyBindVariables(bindVars) + } return plan, vcursor, stmt, nil } @@ -1508,7 +1514,11 @@ func (e *Executor) Prepare(ctx context.Context, method string, safeSession *econ // it was a no-op record (i.e. didn't issue any queries) if logStats.StmtType != "ROLLBACK" || logStats.ShardQueries != 0 { logStats.SaveEndTime() - e.queryLogger.Send(logStats) + if e.queryLogger.Send(logStats) == 0 { + logStats.Release() + } + } else { + logStats.Release() } err = errorTransform.TransformError(err) diff --git a/go/vt/vtgate/logstats/logstats.go b/go/vt/vtgate/logstats/logstats.go index 25bad941922..b6df343953d 100644 --- a/go/vt/vtgate/logstats/logstats.go +++ b/go/vt/vtgate/logstats/logstats.go @@ -20,6 +20,7 @@ import ( "context" "io" "net/url" + "sync" "time" "github.com/google/safehtml" @@ -31,6 +32,12 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" ) +var logStatsPool = sync.Pool{ + New: func() any { + return &LogStats{} + }, +} + // LogStats records the stats for a single vtgate query type LogStats struct { Config streamlog.QueryLogConfig @@ -63,15 +70,44 @@ type LogStats struct { // NewLogStats constructs a new LogStats with supplied Method and ctx // field values, and the StartTime field set to the present time. func NewLogStats(ctx context.Context, methodName, sql, sessionUUID string, bindVars map[string]*querypb.BindVariable, config streamlog.QueryLogConfig) *LogStats { - return &LogStats{ - Ctx: ctx, - Method: methodName, - SQL: sql, - SessionUUID: sessionUUID, - BindVariables: bindVars, - StartTime: time.Now(), - Config: config, - } + ls := logStatsPool.Get().(*LogStats) + ls.Config = config + ls.Ctx = ctx + ls.Method = methodName + ls.TabletType = "" + ls.StmtType = "" + ls.SQL = sql + ls.BindVariables = bindVars + ls.StartTime = time.Now() + ls.EndTime = time.Time{} + ls.ShardQueries = 0 + ls.RowsAffected = 0 + ls.RowsReturned = 0 + ls.PlanTime = 0 + ls.ExecuteTime = 0 + ls.CommitTime = 0 + ls.Error = nil + ls.TablesUsed = nil + ls.RoutingIndexesUsed = nil + ls.SessionUUID = sessionUUID + ls.CachedPlan = false + ls.ActiveKeyspace = "" + ls.MirrorSourceExecuteTime = 0 + ls.MirrorTargetExecuteTime = 0 + ls.MirrorTargetError = nil + return ls +} + +// Release returns the LogStats to the pool for reuse. +// After calling Release, the LogStats must not be accessed. +func (stats *LogStats) Release() { + stats.Ctx = nil + stats.BindVariables = nil + stats.Error = nil + stats.TablesUsed = nil + stats.RoutingIndexesUsed = nil + stats.MirrorTargetError = nil + logStatsPool.Put(stats) } // SaveEndTime sets the end time of this request to now diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 24921d459cc..1f2c6674b94 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -500,11 +500,7 @@ func (e *Executor) logExecutionEnd(logStats *logstats.LogStats, execStart time.T logStats.RowsReturned = uint64(len(qr.Rows)) // log the tables and routing indexes used in the plan for successful query execution. logStats.TablesUsed = plan.TablesUsed - executedRoot := vcursor.ExecutedPrimitive() - if executedRoot == nil { - executedRoot = plan.Instructions - } - logStats.RoutingIndexesUsed = engine.GetRoutingIndexes(executedRoot) + logStats.RoutingIndexesUsed = plan.RoutingIndexes } e.updateQueryStats(plan.QueryType.String(), plan.Type.String(), vcursor.TabletType().String(), int64(logStats.ShardQueries), logStats.TablesUsed) diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 7d2626bc717..6ec221ff621 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -98,13 +98,13 @@ func NewScatterConn(statsName string, txConn *TxConn, gw *TabletGateway) *Scatte } } -func (stc *ScatterConn) startAction(name string, target *querypb.Target) (time.Time, []string) { - statsKey := []string{name, target.Keyspace, target.Shard, topoproto.TabletTypeLString(target.TabletType)} +func (stc *ScatterConn) startAction(name string, target *querypb.Target) (time.Time, [4]string) { + statsKey := [4]string{name, target.Keyspace, target.Shard, topoproto.TabletTypeLString(target.TabletType)} startTime := time.Now() return startTime, statsKey } -func (stc *ScatterConn) endAction(startTime time.Time, allErrors *concurrency.AllErrorRecorder, statsKey []string, err *error, session *econtext.SafeSession) { +func (stc *ScatterConn) endAction(startTime time.Time, allErrors *concurrency.AllErrorRecorder, statsKey [4]string, err *error, session *econtext.SafeSession) { if *err != nil { allErrors.RecordError(*err) // Don't increment the error counter for duplicate @@ -112,21 +112,21 @@ func (stc *ScatterConn) endAction(startTime time.Time, allErrors *concurrency.Al // client queries and are not VTGate's fault. ec := vterrors.Code(*err) if ec != vtrpcpb.Code_ALREADY_EXISTS && ec != vtrpcpb.Code_INVALID_ARGUMENT { - stc.tabletCallErrorCount.Add(statsKey, 1) + stc.tabletCallErrorCount.Add(statsKey[:], 1) } if ec == vtrpcpb.Code_RESOURCE_EXHAUSTED || ec == vtrpcpb.Code_ABORTED { session.SetRollback() } } - stc.timings.Record(statsKey, startTime) + stc.timings.Record(statsKey[:], startTime) } -func (stc *ScatterConn) endLockAction(startTime time.Time, allErrors *concurrency.AllErrorRecorder, statsKey []string, err *error) { +func (stc *ScatterConn) endLockAction(startTime time.Time, allErrors *concurrency.AllErrorRecorder, statsKey [4]string, err *error) { if *err != nil { allErrors.RecordError(*err) - stc.tabletCallErrorCount.Add(statsKey, 1) + stc.tabletCallErrorCount.Add(statsKey[:], 1) } - stc.timings.Record(statsKey, startTime) + stc.timings.Record(statsKey[:], startTime) } type reset int @@ -160,7 +160,12 @@ func (stc *ScatterConn) ExecuteMultiShard( // mu protects qr var mu sync.Mutex - qr = new(sqltypes.Result) + var singleShard bool + if len(rss) == 1 { + singleShard = true + } else { + qr = new(sqltypes.Result) + } if session.InLockSession() && triggerLockHeartBeat(session) { go stc.runLockQuery(ctx, session) @@ -273,26 +278,46 @@ func (stc *ScatterConn) ExecuteMultiShard( if err != nil { return newInfo, err } - mu.Lock() - defer mu.Unlock() - if innerqr != nil { - resultsObserver.Observe(innerqr) - } + if singleShard { + // Single-shard fast path: use innerqr directly, no mutex needed. + if innerqr != nil { + resultsObserver.Observe(innerqr) + } + qr = innerqr + } else { + mu.Lock() + defer mu.Unlock() - // Don't append more rows if row count is exceeded. - if ignoreMaxMemoryRows || len(qr.Rows) <= maxMemoryRows { - qr.AppendResult(innerqr) + if innerqr != nil { + resultsObserver.Observe(innerqr) + } + + // Don't append more rows if row count is exceeded. + if ignoreMaxMemoryRows || len(qr.Rows) <= maxMemoryRows { + qr.AppendResult(innerqr) + } } return newInfo, nil }, ) + if qr == nil { + qr = new(sqltypes.Result) + } + if !ignoreMaxMemoryRows && len(qr.Rows) > maxMemoryRows { return nil, []error{vterrors.NewErrorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, vterrors.NetPacketTooLarge, "in-memory row count exceeded allowed limit of %d", maxMemoryRows)} } - return qr, allErrors.GetErrors() + errs = allErrors.GetErrors() + if errs == nil { + // Success path: recycle the recorder. Safe because GetErrors() + // returned nil (no error slice to alias). + allErrors.Errors = nil + allErrorRecorderPool.Put(allErrors) + } + return qr, errs } func triggerLockHeartBeat(session *econtext.SafeSession) bool { @@ -677,46 +702,64 @@ func (stc *ScatterConn) multiGoTransaction( action shardActionTransactionFunc, ) (allErrors *concurrency.AllErrorRecorder) { numShards := len(rss) - allErrors = new(concurrency.AllErrorRecorder) + allErrors = allErrorRecorderPool.Get().(*concurrency.AllErrorRecorder) if numShards == 0 { return allErrors } - oneShard := func(rs *srvtopo.ResolvedShard, i int) { + + if numShards == 1 { + // Single-shard fast path: inline the logic to avoid closure allocation. + rs := rss[0] var err error startTime, statsKey := stc.startAction(name, rs.Target) - defer stc.endAction(startTime, allErrors, statsKey, &err, session) info, shardSession, err := actionInfo(ctx, rs.Target, session, autocommit, stc.txConn.txMode.TransactionMode()) - if err != nil { - return - } - info, err = action(rs, i, info) - if info == nil { - return - } - if info.ignoreOldSession { - shardSession = nil - } - if shardSession != nil && info.rowsAffected { - // We might not always update or append in the session. - // We need to track if rows were affected in the transaction. - shardSession.RowsAffected = info.rowsAffected - } - if info.actionNeeded != nothing && (info.transactionID != 0 || info.reservedID != 0) { - appendErr := session.AppendOrUpdate(rs.Target, info, shardSession, stc.txConn.txMode.TransactionMode()) - if appendErr != nil { - err = appendErr + if err == nil { + info, err = action(rs, 0, info) + if info != nil { + if info.ignoreOldSession { + shardSession = nil + } + if shardSession != nil && info.rowsAffected { + shardSession.RowsAffected = info.rowsAffected + } + if info.actionNeeded != nothing && (info.transactionID != 0 || info.reservedID != 0) { + if appendErr := session.AppendOrUpdate(rs.Target, info, shardSession, stc.txConn.txMode.TransactionMode()); appendErr != nil { + err = appendErr + } + } } } - } - if numShards == 1 { - // only one shard, do it synchronously. - for i, rs := range rss { - oneShard(rs, i) - } + stc.endAction(startTime, allErrors, statsKey, &err, session) } else { + oneShard := func(rs *srvtopo.ResolvedShard, i int) { + var err error + startTime, statsKey := stc.startAction(name, rs.Target) + defer stc.endAction(startTime, allErrors, statsKey, &err, session) + + info, shardSession, err := actionInfo(ctx, rs.Target, session, autocommit, stc.txConn.txMode.TransactionMode()) + if err != nil { + return + } + info, err = action(rs, i, info) + if info == nil { + return + } + if info.ignoreOldSession { + shardSession = nil + } + if shardSession != nil && info.rowsAffected { + shardSession.RowsAffected = info.rowsAffected + } + if info.actionNeeded != nothing && (info.transactionID != 0 || info.reservedID != 0) { + appendErr := session.AppendOrUpdate(rs.Target, info, shardSession, stc.txConn.txMode.TransactionMode()) + if appendErr != nil { + err = appendErr + } + } + } var panicRecord atomic.Value var wg sync.WaitGroup for i, rs := range rss { @@ -866,11 +909,11 @@ func actionInfo(ctx context.Context, target *querypb.Target, session *econtext.S alias: alias, }, nil, nil } - return &shardActionInfo{}, nil, nil + return zeroActionInfo, nil, nil } ignoreSession := ctx.Value(engine.IgnoreReserveTxn) if ignoreSession != nil { - return &shardActionInfo{}, nil, nil + return zeroActionInfo, nil, nil } // No need to protect ourselves from the race condition between // Find and AppendOrUpdate. The higher level functions ensure that no @@ -984,6 +1027,17 @@ func (sai *shardActionInfo) updateTransactionAndReservedID(txID int64, rID int64 type actionNeeded int +// zeroActionInfo is a read-only singleton used by actionInfo to avoid heap +// allocation for the common non-transactional, non-reserved query path. +// updateTransactionAndReservedID copies before mutating, so this is safe. +var zeroActionInfo = &shardActionInfo{} + +// allErrorRecorderPool reuses AllErrorRecorder objects on the success path +// (no errors) to avoid per-query allocation. +var allErrorRecorderPool = sync.Pool{ + New: func() any { return &concurrency.AllErrorRecorder{} }, +} + const ( nothing actionNeeded = iota reserveBegin diff --git a/go/vt/vthash/hash.go b/go/vt/vthash/hash.go index 87a2e17412d..a3a994f904a 100644 --- a/go/vt/vthash/hash.go +++ b/go/vt/vthash/hash.go @@ -42,3 +42,8 @@ var defaultHash256Key = [32]byte{} func New256() *Hasher256 { return highway.New(defaultHash256Key) } + +// Init256 initializes a Hasher256 in-place using the default key, avoiding heap allocation. +func Init256(h *Hasher256) { + highway.Init(h, defaultHash256Key) +} diff --git a/go/vt/vthash/highway/highwayhash.go b/go/vt/vthash/highway/highwayhash.go index 20b9ca4d3a8..a475099c76e 100644 --- a/go/vt/vthash/highway/highwayhash.go +++ b/go/vt/vthash/highway/highwayhash.go @@ -46,6 +46,14 @@ func New(key [Size]byte) *Digest { return h } +// Init initializes a Digest in-place with the given key, avoiding heap allocation. +// This is useful when the caller allocates the Digest on the stack. +func Init(d *Digest, key [Size]byte) { + d.size = Size + d.key = key + d.Reset() +} + // New128 returns a hash.Hash computing the HighwayHash-128 checksum. // It returns a non-nil error if the key is not 32 bytes long. func New128(key [Size]byte) *Digest { diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go index dedd3dd2ea3..e90c6907e7d 100644 --- a/go/vt/vttablet/grpctabletconn/conn.go +++ b/go/vt/vttablet/grpctabletconn/conn.go @@ -83,6 +83,14 @@ type gRPCQueryClient struct { var _ queryservice.QueryService = (*gRPCQueryClient)(nil) +var executeRequestPool = sync.Pool{ + New: func() any { + return &querypb.ExecuteRequest{ + Query: &querypb.BoundQuery{}, + } + }, +} + // DialTablet creates and initializes gRPCQueryClient. func DialTablet(ctx context.Context, tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { // create the RPC client @@ -119,19 +127,27 @@ func (conn *gRPCQueryClient) Execute(ctx context.Context, _ queryservice.Session return nil, tabletconn.ConnClosed } - req := &querypb.ExecuteRequest{ - EffectiveCallerId: callerid.EffectiveCallerIDFromContext(ctx), - ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx), - Target: target, - Query: &querypb.BoundQuery{ - Sql: query, - BindVariables: bindVars, - }, - TransactionId: transactionID, - Options: options, - ReservedId: reservedID, - } + req := executeRequestPool.Get().(*querypb.ExecuteRequest) + req.EffectiveCallerId = callerid.EffectiveCallerIDFromContext(ctx) + req.ImmediateCallerId = callerid.ImmediateCallerIDFromContext(ctx) + req.Target = target + req.Query.Sql = query + req.Query.BindVariables = bindVars + req.TransactionId = transactionID + req.Options = options + req.ReservedId = reservedID + er, err := conn.c.Execute(ctx, req) + + // Return request to pool. Clear references to avoid retaining caller memory. + req.EffectiveCallerId = nil + req.ImmediateCallerId = nil + req.Target = nil + req.Query.Sql = "" + req.Query.BindVariables = nil + req.Options = nil + executeRequestPool.Put(req) + if err != nil { return nil, tabletconn.ErrorFromGRPC(err) }