@@ -141,10 +141,7 @@ func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *query
141141// be executing a method. The only exception is the 'Throttle' method where multiple goroutines are
142142// allowed to execute it concurrently.
143143type txThrottler struct {
144- // config stores the transaction throttler's configuration.
145- // It is populated in NewTxThrottler and is not modified
146- // since.
147- config * txThrottlerConfig
144+ config * tabletenv.TabletConfig
148145
149146 // state holds an open transaction throttler state. It is nil
150147 // if the TransactionThrottler is closed.
@@ -162,30 +159,6 @@ type txThrottler struct {
162159 requestsThrottled * stats.CountersWithSingleLabel
163160}
164161
165- // txThrottlerConfig holds the parameters that need to be
166- // passed when constructing a TxThrottler object.
167- type txThrottlerConfig struct {
168- // enabled is true if the transaction throttler is enabled. All methods
169- // of a disabled transaction throttler do nothing and Throttle() always
170- // returns false.
171- enabled bool
172-
173- // if dryRun is true, the txThrottler will run only on monitoring mode, meaning that it will increase counters for
174- // total and actually throttled requests, but it will not actually return that a transaction should be throttled.
175- dryRun bool
176-
177- throttlerConfig * throttlerdatapb.Configuration
178- // healthCheckCells stores the cell names in which running vttablets will be monitored for
179- // replication lag.
180- healthCheckCells []string
181-
182- // tabletTypes stores the tablet types for throttling
183- tabletTypes map [topodatapb.TabletType ]bool
184-
185- // rate to refresh topo for cells
186- topoRefreshInterval time.Duration
187- }
188-
189162type txThrottlerState interface {
190163 deallocateResources ()
191164 StatsUpdate (tabletStats * discovery.TabletHealth )
@@ -194,7 +167,7 @@ type txThrottlerState interface {
194167
195168// txThrottlerStateImpl holds the state of an open TxThrottler object.
196169type txThrottlerStateImpl struct {
197- config * txThrottlerConfig
170+ config * tabletenv. TabletConfig
198171 txThrottler * txThrottler
199172
200173 // throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
@@ -208,49 +181,38 @@ type txThrottlerStateImpl struct {
208181 healthCheckChan chan * discovery.TabletHealth
209182 healthCheckCells []string
210183 cellsFromTopo bool
184+
185+ // tabletTypes stores the tablet types for throttling
186+ tabletTypes map [topodatapb.TabletType ]bool
211187}
212188
213- // NewTxThrottler tries to construct a txThrottler from the
214- // relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
215- // any error occurs.
216- // This function calls tryCreateTxThrottler that does the actual creation work
217- // and returns an error if one occurred.
189+ // NewTxThrottler tries to construct a txThrottler from the relevant
190+ // fields in the tabletenv.Env and topo.Server objects.
218191func NewTxThrottler (env tabletenv.Env , topoServer * topo.Server ) TxThrottler {
219- throttlerConfig := & txThrottlerConfig { enabled : false }
220-
221- if env . Config (). EnableTxThrottler {
222- // Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
223- // is immutable.
224- healthCheckCells := env . Config (). TxThrottlerHealthCheckCells
225-
226- tabletTypes := make ( map [topodatapb. TabletType ] bool , len ( * env . Config (). TxThrottlerTabletTypes ))
227- for _ , tabletType := range * env . Config (). TxThrottlerTabletTypes {
228- tabletTypes [ tabletType ] = true
192+ config := env . Config ()
193+ if config . EnableTxThrottler {
194+ if len ( config . TxThrottlerHealthCheckCells ) == 0 {
195+ defer log . Infof ( "Initialized transaction throttler using tabletTypes: %+v, cellsFromTopo: true, topoRefreshInterval: %s, throttlerConfig: %q" ,
196+ config . TxThrottlerTabletTypes , config . TxThrottlerTopoRefreshInterval , config . TxThrottlerConfig . Get (),
197+ )
198+ } else {
199+ defer log . Infof ( "Initialized transaction throttler using tabletTypes: %+v, healthCheckCells: %+v, throttlerConfig: %q" ,
200+ config . TxThrottlerTabletTypes , config . TxThrottlerHealthCheckCells , config . TxThrottlerConfig . Get (),
201+ )
229202 }
230-
231- throttlerConfig = & txThrottlerConfig {
232- enabled : true ,
233- healthCheckCells : healthCheckCells ,
234- dryRun : env .Config ().TxThrottlerDryRun ,
235- tabletTypes : tabletTypes ,
236- throttlerConfig : env .Config ().TxThrottlerConfig .Get (),
237- topoRefreshInterval : env .Config ().TxThrottlerTopoRefreshInterval ,
238- }
239-
240- defer log .Infof ("Initialized transaction throttler with config: %+v" , throttlerConfig )
241203 }
242204
243205 return & txThrottler {
244- config : throttlerConfig ,
206+ config : config ,
245207 topoServer : topoServer ,
246- throttlerRunning : env .Exporter ().NewGauge ("TransactionThrottlerRunning " , "transaction throttler running state" ),
247- topoWatchers : env .Exporter ().NewGaugesWithSingleLabel ("TransactionThrottlerTopoWatchers " , "transaction throttler topology watchers" , "cell" ),
248- healthChecksReadTotal : env .Exporter ().NewCountersWithMultiLabels ("TransactionThrottlerHealthchecksRead " , "transaction throttler healthchecks read" ,
208+ throttlerRunning : env .Exporter ().NewGauge (TxThrottlerName + "Running " , "transaction throttler running state" ),
209+ topoWatchers : env .Exporter ().NewGaugesWithSingleLabel (TxThrottlerName + "TopoWatchers " , "transaction throttler topology watchers" , "cell" ),
210+ healthChecksReadTotal : env .Exporter ().NewCountersWithMultiLabels (TxThrottlerName + "HealthchecksRead " , "transaction throttler healthchecks read" ,
249211 []string {"cell" , "DbType" }),
250- healthChecksRecordedTotal : env .Exporter ().NewCountersWithMultiLabels ("TransactionThrottlerHealthchecksRecorded " , "transaction throttler healthchecks recorded" ,
212+ healthChecksRecordedTotal : env .Exporter ().NewCountersWithMultiLabels (TxThrottlerName + "HealthchecksRecorded " , "transaction throttler healthchecks recorded" ,
251213 []string {"cell" , "DbType" }),
252- requestsTotal : env .Exporter ().NewCountersWithSingleLabel ("TransactionThrottlerRequests " , "transaction throttler requests" , "workload" ),
253- requestsThrottled : env .Exporter ().NewCountersWithSingleLabel ("TransactionThrottlerThrottled " , "transaction throttler requests throttled" , "workload" ),
214+ requestsTotal : env .Exporter ().NewCountersWithSingleLabel (TxThrottlerName + "Requests " , "transaction throttler requests" , "workload" ),
215+ requestsThrottled : env .Exporter ().NewCountersWithSingleLabel (TxThrottlerName + "Throttled " , "transaction throttler requests throttled" , "workload" ),
254216 }
255217}
256218
@@ -261,7 +223,7 @@ func (t *txThrottler) InitDBConfig(target *querypb.Target) {
261223
262224// Open opens the transaction throttler. It must be called prior to 'Throttle'.
263225func (t * txThrottler ) Open () (err error ) {
264- if ! t .config .enabled {
226+ if ! t .config .EnableTxThrottler {
265227 return nil
266228 }
267229 if t .state != nil {
@@ -277,7 +239,7 @@ func (t *txThrottler) Open() (err error) {
277239// It should be called after the throttler is no longer needed.
278240// It's ok to call this method on a closed throttler--in which case the method does nothing.
279241func (t * txThrottler ) Close () {
280- if ! t .config .enabled {
242+ if ! t .config .EnableTxThrottler {
281243 return
282244 }
283245 if t .state == nil {
@@ -294,7 +256,7 @@ func (t *txThrottler) Close() {
294256// should back off). Throttle requires that Open() was previously called
295257// successfully.
296258func (t * txThrottler ) Throttle (priority int , workload string ) (result bool ) {
297- if ! t .config .enabled {
259+ if ! t .config .EnableTxThrottler {
298260 return false
299261 }
300262 if t .state == nil {
@@ -310,11 +272,11 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
310272 t .requestsThrottled .Add (workload , 1 )
311273 }
312274
313- return result && ! t .config .dryRun
275+ return result && ! t .config .TxThrottlerDryRun
314276}
315277
316- func newTxThrottlerState (txThrottler * txThrottler , config * txThrottlerConfig , target * querypb.Target ) (txThrottlerState , error ) {
317- maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig {Configuration : config .throttlerConfig }
278+ func newTxThrottlerState (txThrottler * txThrottler , config * tabletenv. TabletConfig , target * querypb.Target ) (txThrottlerState , error ) {
279+ maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig {Configuration : config .TxThrottlerConfig . Get () }
318280
319281 t , err := throttlerFactory (
320282 TxThrottlerName ,
@@ -326,13 +288,20 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
326288 if err != nil {
327289 return nil , err
328290 }
329- if err := t .UpdateConfiguration (config .throttlerConfig , true /* copyZeroValues */ ); err != nil {
291+ if err := t .UpdateConfiguration (config .TxThrottlerConfig . Get () , true /* copyZeroValues */ ); err != nil {
330292 t .Close ()
331293 return nil , err
332294 }
295+
296+ tabletTypes := make (map [topodatapb.TabletType ]bool , len (* config .TxThrottlerTabletTypes ))
297+ for _ , tabletType := range * config .TxThrottlerTabletTypes {
298+ tabletTypes [tabletType ] = true
299+ }
300+
333301 state := & txThrottlerStateImpl {
334302 config : config ,
335- healthCheckCells : config .healthCheckCells ,
303+ healthCheckCells : config .TxThrottlerHealthCheckCells ,
304+ tabletTypes : tabletTypes ,
336305 throttler : t ,
337306 txThrottler : txThrottler ,
338307 }
@@ -402,7 +371,7 @@ func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topo
402371func (ts * txThrottlerStateImpl ) healthChecksProcessor (ctx context.Context , topoServer * topo.Server , target * querypb.Target ) {
403372 var cellsUpdateTicks <- chan time.Time
404373 if ts .cellsFromTopo {
405- ticker := time .NewTicker (ts .config .topoRefreshInterval )
374+ ticker := time .NewTicker (ts .config .TxThrottlerTopoRefreshInterval )
406375 cellsUpdateTicks = ticker .C
407376 defer ticker .Stop ()
408377 }
@@ -420,7 +389,7 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoS
420389
421390func (ts * txThrottlerStateImpl ) throttle () bool {
422391 if ts .throttler == nil {
423- log .Error ("throttle called after deallocateResources was called" )
392+ log .Error ("txThrottler: throttle called after deallocateResources was called" )
424393 return false
425394 }
426395 // Serialize calls to ts.throttle.Throttle()
@@ -442,7 +411,7 @@ func (ts *txThrottlerStateImpl) deallocateResources() {
442411
443412// StatsUpdate updates the health of a tablet with the given healthcheck.
444413func (ts * txThrottlerStateImpl ) StatsUpdate (tabletStats * discovery.TabletHealth ) {
445- if ts .config . tabletTypes == nil {
414+ if len ( ts .tabletTypes ) == 0 {
446415 return
447416 }
448417
@@ -451,8 +420,8 @@ func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth)
451420 ts .txThrottler .healthChecksReadTotal .Add (metricLabels , 1 )
452421
453422 // Monitor tablets for replication lag if they have a tablet
454- // type specified by the --tx_throttler_tablet_types flag.
455- if ts .config . tabletTypes [tabletType ] {
423+ // type specified by the --tx-throttler-tablet-types flag.
424+ if ts .tabletTypes [tabletType ] {
456425 ts .throttler .RecordReplicationLag (time .Now (), tabletStats )
457426 ts .txThrottler .healthChecksRecordedTotal .Add (metricLabels , 1 )
458427 }
0 commit comments