Skip to content

Commit 01af7d7

Browse files
committed
Merge pull request #1612 from aaijazi/aaijazi_reject_tx_buffering
Return an error when tx buffer is full
2 parents b011e59 + a7cf5f3 commit 01af7d7

4 files changed

Lines changed: 39 additions & 15 deletions

File tree

go/vt/vtgate/discoverygateway.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ func (dg *discoveryGateway) Begin(ctx context.Context, keyspace string, shard st
143143
err = dg.withRetry(ctx, keyspace, shard, tabletType, func(conn tabletconn.TabletConn) error {
144144
var innerErr error
145145
// Potentially buffer this transaction.
146-
txbuffer.FakeBuffer(keyspace, shard, attemptNumber)
146+
if bufferErr := txbuffer.FakeBuffer(keyspace, shard, attemptNumber); bufferErr != nil {
147+
return bufferErr
148+
}
147149
transactionID, innerErr = conn.Begin(ctx)
148150
attemptNumber++
149151
return innerErr

go/vt/vtgate/shard_conn.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ func (sdc *ShardConn) Begin(ctx context.Context) (transactionID int64, err error
170170
err = sdc.withRetry(ctx, func(conn tabletconn.TabletConn) error {
171171
var innerErr error
172172
// Potentially buffer this transaction.
173-
txbuffer.FakeBuffer(sdc.keyspace, sdc.shard, attemptNumber)
173+
if bufferErr := txbuffer.FakeBuffer(sdc.keyspace, sdc.shard, attemptNumber); bufferErr != nil {
174+
return bufferErr
175+
}
174176
transactionID, innerErr = conn.Begin(ctx)
175177
attemptNumber++
176178
return innerErr

go/vt/vtgate/txbuffer/tx_buffer.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@ but will not return transient errors during the buffering time.
1515
package txbuffer
1616

1717
import (
18+
"errors"
1819
"flag"
1920
"sync"
2021
"time"
2122

2223
"github.com/youtube/vitess/go/stats"
24+
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
25+
"github.com/youtube/vitess/go/vt/vterrors"
2326
)
2427

2528
var (
@@ -39,26 +42,32 @@ var (
3942
// timeSleep can be mocked out in unit tests
4043
var timeSleep = time.Sleep
4144

45+
// errBufferFull is the error returned a buffer request is rejected because the buffer is full.
46+
var errBufferFull = vterrors.FromError(
47+
vtrpcpb.ErrorCode_TRANSIENT_ERROR,
48+
errors.New("transaction buffer full, rejecting request"),
49+
)
50+
4251
// FakeBuffer will pretend to buffer new transactions in VTGate.
4352
// Transactions *will NOT actually be buffered*, they will just have a delayed start time.
4453
// This can be useful to understand what the impact of trasaction buffering will be
4554
// on upstream callers. Once the impact is measured, it can be used to tweak parameter values
4655
// for the best behavior.
4756
// FakeBuffer should be called before the VtTablet Begin, otherwise it will increase transaction times.
48-
func FakeBuffer(keyspace, shard string, attemptNumber int) {
57+
func FakeBuffer(keyspace, shard string, attemptNumber int) error {
4958
// Only buffer on the first Begin attempt, not on possible retries.
5059
if !*enableFakeTxBuffer || attemptNumber != 0 {
51-
return
60+
return nil
5261
}
5362
if keyspace != *bufferKeyspace || shard != *bufferShard {
54-
return
63+
return nil
5564
}
5665
bufferedTransactionsAttempted.Add(1)
5766

5867
bufferMu.Lock()
5968
if int(bufferedTransactions.Get()) >= *maxBufferSize {
6069
bufferMu.Unlock()
61-
return
70+
return errBufferFull
6271
}
6372
bufferedTransactions.Add(1)
6473
bufferMu.Unlock()
@@ -67,4 +76,5 @@ func FakeBuffer(keyspace, shard string, attemptNumber int) {
6776
timeSleep(*fakeBufferDelay)
6877
// Don't need to lock for this, as there's no race when decrementing the count
6978
bufferedTransactions.Add(-1)
79+
return nil
7080
}

go/vt/vtgate/txbuffer/tx_buffer_test.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func TestFakeBuffer(t *testing.T) {
5858
wantCalled bool
5959
// expected value of BufferedTransactionAttempts
6060
wantAttempted int
61+
wantErr error
6162
}{
6263
{
6364
desc: "enableFakeBuffer=False",
@@ -88,6 +89,7 @@ func TestFakeBuffer(t *testing.T) {
8889
bufferedTransactions: *maxBufferSize,
8990
// When the buffer is full, bufferedTransactionsAttempted should still be incremented
9091
wantAttempted: 1,
92+
wantErr: errBufferFull,
9193
},
9294
{
9395
desc: "buffered successful",
@@ -107,7 +109,11 @@ func TestFakeBuffer(t *testing.T) {
107109

108110
*enableFakeTxBuffer = test.enableFakeBuffer
109111

110-
FakeBuffer(test.keyspace, test.shard, test.attemptNumber)
112+
gotErr := FakeBuffer(test.keyspace, test.shard, test.attemptNumber)
113+
114+
if gotErr != test.wantErr {
115+
t.Errorf("With %v, FakeBuffer() => %v; want: %v", test.desc, gotErr, test.wantErr)
116+
}
111117

112118
if controller.called != test.wantCalled {
113119
t.Errorf("With %v, FakeBuffer() => timeSleep.called: %v; want: %v",
@@ -162,23 +168,27 @@ func TestParallelFakeBuffer(t *testing.T) {
162168

163169
wg.Add(1)
164170
finished := make(chan struct{})
171+
var gotErr error
165172
go func() {
166173
defer wg.Done()
167-
FakeBuffer(*bufferKeyspace, *bufferShard, 0)
174+
gotErr = FakeBuffer(*bufferKeyspace, *bufferShard, 0)
168175
close(finished)
169176
}()
170177

171-
if wantFakeSleepCalled {
172-
// the first maxBufferSize calls to FakeBuffer
173-
// should call sleep, wait until they do
174-
<-controller.blocked
175-
} else {
176-
// the rest should not block, wait until they're done
177-
<-finished
178+
// wait until either the gorouotine is blocked (because it's buffering) or until
179+
// it's finished (because it shouldn't be buffered).
180+
select {
181+
case <-controller.blocked:
182+
case <-finished:
178183
}
179184

180185
if controller.called {
181186
controllers = append(controllers, controller)
187+
} else {
188+
// if we didn't call fakeSleep, the buffer is full and should return an error saying so.
189+
if gotErr != errBufferFull {
190+
t.Errorf("On iteration %v, FakeBuffer() => %v; want: %v", i, gotErr, errBufferFull)
191+
}
182192
}
183193

184194
if controller.called != wantFakeSleepCalled {

0 commit comments

Comments
 (0)