Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT

if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) {
return nil, errTxThrottled
}

Expand All @@ -238,7 +238,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}

func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) {
return nil, errTxThrottled
}
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,6 @@ func (m mockTxThrottler) Open() (err error) {
func (m mockTxThrottler) Close() {
}

func (m mockTxThrottler) Throttle(priority int) (result bool) {
func (m mockTxThrottler) Throttle(priority int, workload string) (result bool) {
return m.throttle
}
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) {
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options), options.GetWorkloadName()) {
return errTxThrottled
}
var connSetting *pools.Setting
Expand Down
16 changes: 8 additions & 8 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type TxThrottler interface {
InitDBConfig(target *querypb.Target)
Open() (err error)
Close()
Throttle(priority int) (result bool)
Throttle(priority int, workload string) (result bool)
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
Expand Down Expand Up @@ -146,8 +146,8 @@ type txThrottler struct {
topoWatchers *stats.GaugesWithSingleLabel
healthChecksReadTotal *stats.CountersWithMultiLabels
healthChecksRecordedTotal *stats.CountersWithMultiLabels
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
requestsTotal *stats.CountersWithSingleLabel
requestsThrottled *stats.CountersWithSingleLabel
}

// txThrottlerConfig holds the parameters that need to be
Expand Down Expand Up @@ -219,8 +219,8 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
[]string{"cell", "DbType"}),
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded",
[]string{"cell", "DbType"}),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"),
}
}

Expand Down Expand Up @@ -263,7 +263,7 @@ func (t *txThrottler) Close() {
// It returns true if the transaction should not proceed (the caller
// should back off). Throttle requires that Open() was previously called
// successfully.
func (t *txThrottler) Throttle(priority int) (result bool) {
func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
if !t.config.enabled {
return false
}
Expand All @@ -275,9 +275,9 @@ func (t *txThrottler) Throttle(priority int) (result bool) {
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority

t.requestsTotal.Add(1)
t.requestsTotal.Add(workload, 1)
if result {
t.requestsThrottled.Add(1)
t.requestsThrottled.Add(workload, 1)
}

return result
Expand Down
20 changes: 10 additions & 10 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestDisabledThrottler(t *testing.T) {
Shard: "shard",
})
assert.Nil(t, throttler.Open())
assert.False(t, throttler.Throttle(0))
assert.False(t, throttler.Throttle(0, "some_workload"))
throttlerImpl, _ := throttler.(*txThrottler)
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
throttler.Close()
Expand Down Expand Up @@ -129,9 +129,9 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts())

assert.False(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get())
assert.Zero(t, throttlerImpl.requestsThrottled.Get())
assert.False(t, throttlerImpl.Throttle(100, "some_workload"))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some_workload"])
assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some_workload"])

throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts())
Expand All @@ -148,14 +148,14 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts())

// The second throttle call should reject.
assert.True(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get())
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())
assert.True(t, throttlerImpl.Throttle(100, "some_workload"))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some_workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"])

// This call should not throttle due to priority. Check that's the case and counters agree.
assert.False(t, throttlerImpl.Throttle(0))
assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Get())
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())
assert.False(t, throttlerImpl.Throttle(0, "some_workload"))
assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some_workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"])
throttlerImpl.Close()
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts())
Expand Down