Skip to content

Commit f1e8923

Browse files
ejortegauClaude
andauthored
feat: add --consolidator-reject-on-cap flag to reject queries when waiter cap exceeded (#19836)
Signed-off-by: Eduardo Ortega <5791035+ejortegau@users.noreply.github.com> Co-authored-by: Claude <svc-devxp-claude@slack-corp.com>
1 parent da94cc8 commit f1e8923

6 files changed

Lines changed: 114 additions & 0 deletions

File tree

changelog/25.0/25.0.0/summary.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
- [Default data protection for `_reverse` workflow cancel/complete](#vreplication-reverse-workflow-data-protection)
1313
- **[VTGate](#minor-changes-vtgate)**
1414
- [New controls for cross-keyspace reads](#vtgate-cross-keyspace-reads)
15+
- **[VTTablet](#minor-changes-vttablet)**
16+
- [Consolidator Reject on Waiter Cap](#vttablet-consolidator-reject-on-cap)
1517
- **[VTTablet](#minor-changes-vttablet)**
1618
- [Schema engine table-count limit is now configurable](#vttablet-schema-max-table-count)
1719

@@ -67,6 +69,16 @@ The VTGate flag prevents cross-keyspace reads globally, regardless of per-keyspa
6769

6870
### <a id="minor-changes-vttablet"/>VTTablet</a>
6971

72+
#### <a id="vttablet-consolidator-reject-on-cap"/>Consolidator Reject on Waiter Cap</a>
73+
74+
A new `--consolidator-reject-on-cap` flag (default `false`) has been added to VTTablet. When enabled alongside a non-zero `--consolidator-query-waiter-cap`, queries that would join a consolidated result but exceed the **global** consolidator waiter cap are rejected with a `RESOURCE_EXHAUSTED` error instead of silently falling back to independent MySQL execution.
75+
76+
**Important:** The cap is enforced against the consolidator's global `totalWaiterCount` across all queries, not a per-query waiter count. This means a duplicate for query B can be rejected because query A has already consumed most of the global waiter budget. This provides backpressure when the consolidator as a whole is saturated, rather than when any single query has too many waiters.
77+
78+
See [#19836](https://github.com/vitessio/vitess/pull/19836) for details.
79+
80+
### <a id="minor-changes-vttablet"/>VTTablet</a>
81+
7082
#### <a id="vttablet-schema-max-table-count"/>Schema engine table-count limit is now configurable</a>
7183

7284
Previously the schema engine had a hardcoded cap of 10,000 tables: a vttablet whose underlying MySQL had more than 10,000 tables would fail to load its schema and could not serve queries. This made recovery from `EmergencyReparentShard` impossible without dropping tables directly on MySQL.

go/flags/endtoend/vtcombo.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ Flags:
4949
--config-type string Config file type (omit to infer config type from file extension).
5050
--consolidator-cache-proto3-rows If true, the consolidation leader pre-caches proto3-encoded rows so that waiters avoid redundant encoding work.
5151
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
52+
--consolidator-reject-on-cap If true, reject queries with a RESOURCE_EXHAUSTED error when the global consolidator waiter cap is exceeded, instead of falling back to independent execution. The cap is enforced globally across all consolidated queries, not per-query.
5253
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
5354
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
5455
--consul-auth-static-file string JSON File to read the topos/tokens from.

go/flags/endtoend/vttablet.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ Flags:
8282
--config-type string Config file type (omit to infer config type from file extension).
8383
--consolidator-cache-proto3-rows If true, the consolidation leader pre-caches proto3-encoded rows so that waiters avoid redundant encoding work.
8484
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
85+
--consolidator-reject-on-cap If true, reject queries with a RESOURCE_EXHAUSTED error when the global consolidator waiter cap is exceeded, instead of falling back to independent execution. The cap is enforced globally across all consolidated queries, not per-query.
8586
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
8687
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
8788
--consul-auth-static-file string JSON File to read the topos/tokens from.

go/vt/vttablet/tabletserver/query_executor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,9 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
775775
startTime := time.Now()
776776
q.Wait()
777777
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
778+
} else if qre.tsv.config.ConsolidatorRejectOnCap {
779+
q.AddWaiterCounter(-1)
780+
return nil, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "consolidator waiter cap exceeded")
778781
} else {
779782
// Waiter cap exceeded, fall back to independent query execution
780783
waiterCapExceeded = true

go/vt/vttablet/tabletserver/query_executor_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1697,6 +1697,101 @@ func TestQueryExecutorConsolidatorWaiterCapFallback(t *testing.T) {
16971697
db.VerifyAllExecutedOrFail()
16981698
}
16991699

1700+
func TestQueryExecutorConsolidatorRejectOnCap(t *testing.T) {
1701+
tests := []struct {
1702+
name string
1703+
rejectOnCap bool
1704+
wantErr bool
1705+
wantErrCode vtrpcpb.Code
1706+
wantErrMsg string
1707+
wantDBQueries int
1708+
wantWaitCalls int
1709+
wantCounterArgs []int64
1710+
}{
1711+
{
1712+
name: "reject enabled",
1713+
rejectOnCap: true,
1714+
wantErr: true,
1715+
wantErrCode: vtrpcpb.Code_RESOURCE_EXHAUSTED,
1716+
wantErrMsg: "consolidator waiter cap exceeded",
1717+
wantDBQueries: 0,
1718+
wantWaitCalls: 0,
1719+
wantCounterArgs: []int64{-1},
1720+
},
1721+
{
1722+
name: "reject disabled falls back to independent execution",
1723+
rejectOnCap: false,
1724+
wantErr: false,
1725+
wantDBQueries: 1,
1726+
wantWaitCalls: 0,
1727+
wantCounterArgs: []int64{-1},
1728+
},
1729+
}
1730+
1731+
for _, tc := range tests {
1732+
t.Run(tc.name, func(t *testing.T) {
1733+
db := setUpQueryExecutorTest(t)
1734+
defer db.Close()
1735+
1736+
ctx := context.Background()
1737+
tsv := newTestTabletServer(ctx, enableConsolidator, db)
1738+
defer tsv.StopService()
1739+
1740+
tsv.config.ConsolidatorQueryWaiterCap = 1
1741+
tsv.config.ConsolidatorRejectOnCap = tc.rejectOnCap
1742+
1743+
fakeConsolidator := sync2.NewFakeConsolidator()
1744+
tsv.qe.consolidator = fakeConsolidator
1745+
1746+
input := "select * from t limit 10001"
1747+
result := &sqltypes.Result{
1748+
Fields: getTestTableFields(),
1749+
Rows: [][]sqltypes.Value{{
1750+
sqltypes.NewInt32(1),
1751+
sqltypes.NewInt32(100),
1752+
sqltypes.NewInt32(200),
1753+
}},
1754+
}
1755+
1756+
fakePendingResult := &sync2.FakePendingResult{Consolidator: fakeConsolidator}
1757+
fakePendingResult.SetResult(result)
1758+
fakePendingResult.WaiterCount = 2
1759+
fakeConsolidator.SetTotalWaiterCount(2)
1760+
1761+
fakeConsolidator.CreateReturn = &sync2.FakeConsolidatorCreateReturn{
1762+
Created: false,
1763+
PendingResult: fakePendingResult,
1764+
}
1765+
1766+
db.AddQuery(input, result)
1767+
1768+
qre := newTestQueryExecutor(context.Background(), tsv, input, 0)
1769+
qre.options = &querypb.ExecuteOptions{Consolidator: querypb.ExecuteOptions_CONSOLIDATOR_ENABLED}
1770+
1771+
got, err := qre.Execute()
1772+
if tc.wantErr {
1773+
require.Error(t, err)
1774+
assert.Equal(t, tc.wantErrCode, vterrors.Code(err))
1775+
assert.ErrorContains(t, err, tc.wantErrMsg)
1776+
} else {
1777+
require.NoError(t, err)
1778+
require.NotNil(t, got)
1779+
}
1780+
1781+
require.Len(t, fakeConsolidator.CreateCalls, 1)
1782+
require.Equal(t, tc.wantWaitCalls, fakePendingResult.WaitCalls)
1783+
require.Equal(t, 0, fakePendingResult.BroadcastCalls)
1784+
1785+
require.Len(t, fakePendingResult.AddWaiterCounterCalls, len(tc.wantCounterArgs))
1786+
for i, wantArg := range tc.wantCounterArgs {
1787+
require.Equal(t, wantArg, fakePendingResult.AddWaiterCounterCalls[i])
1788+
}
1789+
1790+
require.Equal(t, tc.wantDBQueries, db.GetQueryCalledNum(input))
1791+
})
1792+
}
1793+
}
1794+
17001795
func TestGetConnectionLogStats(t *testing.T) {
17011796
db := setUpQueryExecutorTest(t)
17021797
defer db.Close()

go/vt/vttablet/tabletserver/tabletenv/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
207207

208208
fs.Int64Var(&currentConfig.ConsolidatorQueryWaiterCap, "consolidator-query-waiter-cap", 0, "Configure the maximum number of clients allowed to wait on the consolidator.")
209209
fs.BoolVar(&currentConfig.ConsolidatorCacheProto3Rows, "consolidator-cache-proto3-rows", defaultConfig.ConsolidatorCacheProto3Rows, "If true, the consolidation leader pre-caches proto3-encoded rows so that waiters avoid redundant encoding work.")
210+
fs.BoolVar(&currentConfig.ConsolidatorRejectOnCap, "consolidator-reject-on-cap", defaultConfig.ConsolidatorRejectOnCap, "If true, reject queries with a RESOURCE_EXHAUSTED error when the global consolidator waiter cap is exceeded, instead of falling back to independent execution. The cap is enforced globally across all consolidated queries, not per-query.")
210211
utils.SetFlagDurationVar(fs, &healthCheckInterval, "health-check-interval", defaultConfig.Healthcheck.Interval, "Interval between health checks")
211212
utils.SetFlagDurationVar(fs, &degradedThreshold, "degraded-threshold", defaultConfig.Healthcheck.DegradedThreshold, "replication lag after which a replica is considered degraded")
212213
utils.SetFlagDurationVar(fs, &unhealthyThreshold, "unhealthy-threshold", defaultConfig.Healthcheck.UnhealthyThreshold, "replication lag after which a replica is considered unhealthy")
@@ -342,6 +343,7 @@ type TabletConfig struct {
342343
ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"`
343344
ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"`
344345
ConsolidatorQueryWaiterCap int64 `json:"consolidatorMaxQueryWait,omitempty"`
346+
ConsolidatorRejectOnCap bool `json:"consolidatorRejectOnCap,omitempty"`
345347
ConsolidatorCacheProto3Rows bool `json:"consolidatorCacheProto3Rows,omitempty"`
346348
QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"`
347349
QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"`

0 commit comments

Comments
 (0)