Skip to content

Commit 49e6020

Browse files
committed
txnkv: fix SetCommitWaitUntilTSOTimeout does not work in causal consistency (tikv#1847)
fix tikv#1846 Signed-off-by: Chao Wang <cclcwangchao@hotmail.com>
1 parent a7b7bb4 commit 49e6020

File tree

4 files changed

+32
-7
lines changed

4 files changed

+32
-7
lines changed

integration_tests/option_test.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,6 @@ func (s *testOptionSuite) GetHistogramMetricSampleCount(m any) uint64 {
119119
}
120120

121121
func (s *testOptionSuite) TestSetCommitWaitUntilTSO() {
122-
if *withTiKV {
123-
s.T().Skip("TestSetCommitWaitUntilTSO only runs on local mock storage")
124-
}
125-
126122
s.Nil(failpoint.Enable("tikvclient/InjectStartTSForCommit", "return(true)"))
127123
defer func() {
128124
s.Nil(failpoint.Disable("tikvclient/InjectStartTSForCommit"))
@@ -151,6 +147,7 @@ func (s *testOptionSuite) TestSetCommitWaitUntilTSO() {
151147
// }
152148
mockCommitTSO []uint64
153149
setWaitTimeout time.Duration
150+
extraPrepare func(transaction.TxnProbe)
154151
err bool
155152
}{
156153
{
@@ -181,11 +178,38 @@ func (s *testOptionSuite) TestSetCommitWaitUntilTSO() {
181178
setWaitTimeout: time.Millisecond,
182179
err: true,
183180
},
181+
{
182+
name: "should also check for 1pc",
183+
commitWaitTSO: oracle.ComposeTS(10000, 0),
184+
mockCommitTSO: []uint64{100},
185+
extraPrepare: func(txn transaction.TxnProbe) {
186+
txn.SetEnableAsyncCommit(true)
187+
txn.SetEnable1PC(true)
188+
},
189+
err: true,
190+
},
191+
{
192+
name: "should also check for causal consistency",
193+
commitWaitTSO: oracle.ComposeTS(10000, 0),
194+
mockCommitTSO: []uint64{100},
195+
extraPrepare: func(txn transaction.TxnProbe) {
196+
txn.SetEnableAsyncCommit(true)
197+
txn.SetEnable1PC(true)
198+
txn.SetCausalConsistency(true)
199+
},
200+
err: true,
201+
},
184202
} {
185203
s.Run(tc.name, func() {
186204
txn, err := s.store.Begin()
187205
s.NoError(err)
188206
s.NoError(txn.Set([]byte("somekey:"+uuid.NewString()), []byte("somevalue")))
207+
// by default, 1pc and async commit are disabled, but it can be overridden in extraPrepare
208+
txn.SetEnableAsyncCommit(false)
209+
txn.SetEnable1PC(false)
210+
if tc.extraPrepare != nil {
211+
tc.extraPrepare(txn)
212+
}
189213

190214
txn.SetCommitWaitUntilTSO(tc.commitWaitTSO + txn.StartTS())
191215
if tc.setWaitTimeout > 0 {

internal/unionstore/memdb_art.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,6 @@ func (db *artDBWithContext) SnapshotIterReverse(upper, lower []byte) Iterator {
166166
}
167167

168168
// SnapshotGetter returns a Getter for a snapshot of MemBuffer.
169-
func (db *artDBWithContext) SnapshotGetter() Getter {
169+
func (db *artDBWithContext) SnapshotGetter() kv.Getter {
170170
return db.ART.SnapshotGetter()
171171
}

internal/unionstore/memdb_rbt.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,6 @@ func (db *rbtDBWithContext) SnapshotIterReverse(upper, lower []byte) Iterator {
176176
}
177177

178178
// SnapshotGetter returns a Getter for a snapshot of MemBuffer.
179-
func (db *rbtDBWithContext) SnapshotGetter() Getter {
179+
func (db *rbtDBWithContext) SnapshotGetter() kv.Getter {
180180
return db.RBT.SnapshotGetter()
181181
}

txnkv/transaction/2pc.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1747,7 +1747,8 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
17471747
// all nodes, we have to make sure the commit TS of this transaction is greater
17481748
// than the snapshot TS of all existent readers. So we get a new timestamp
17491749
// from PD and plus one as our MinCommitTS.
1750-
if commitTSMayBeCalculated && c.needLinearizability() {
1750+
// If `commitWaitUntilTSO > 0`, we also need to get a new timestamp from TSO to ensure the constraint satisfied.
1751+
if commitTSMayBeCalculated && (c.needLinearizability() || c.txn.commitWaitUntilTSO > 0) {
17511752
util.EvalFailpoint("getMinCommitTSFromTSO")
17521753
start := time.Now()
17531754
latestTS, err := c.txn.GetTimestampForCommit(bo, c.txn.GetScope())

0 commit comments

Comments
 (0)