Skip to content

Remove operations pull completed tokens from txn-queue #301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: development
Choose a base branch
from
32 changes: 28 additions & 4 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package txn
import (
"fmt"

mgo "github.com/globalsign/mgo"
"github.com/globalsign/mgo"

"github.com/globalsign/mgo/bson"
)
Expand Down Expand Up @@ -857,12 +857,13 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err
updated := false
if !hasToken(stash.Queue, tt) {
var set, unset bson.D
cleanedQueue := filterQueue(info.Queue, pull, tt)
if revno == 0 {
// Missing revno in stash means -1.
set = bson.D{{Name: "txn-queue", Value: info.Queue}}
set = bson.D{{Name: "txn-queue", Value: cleanedQueue}}
unset = bson.D{{Name: "n", Value: 1}, {Name: "txn-revno", Value: 1}}
} else {
set = bson.D{{Name: "txn-queue", Value: info.Queue}, {Name: "txn-revno", Value: newRevno}}
set = bson.D{{Name: "txn-queue", Value: cleanedQueue}, {Name: "txn-revno", Value: newRevno}}
unset = bson.D{{Name: "n", Value: 1}}
}
qdoc := bson.D{{Name: "_id", Value: dkey}, {Name: "n", Value: nonce}}
Expand Down Expand Up @@ -898,7 +899,12 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err
var info txnInfo
if _, err = f.sc.Find(qdoc).Apply(change, &info); err == nil {
f.debugf("Stash for document %v has revno %d and queue: %v", dkey, info.Revno, info.Queue)
d = setInDoc(d, bson.D{{Name: "_id", Value: op.Id}, {Name: "txn-revno", Value: newRevno}, {Name: "txn-queue", Value: info.Queue}})
cleanedQueue := filterQueue(info.Queue, pull, tt)
d = setInDoc(d, bson.D{
{Name: "_id", Value: op.Id},
{Name: "txn-revno", Value: newRevno},
{Name: "txn-queue", Value: cleanedQueue},
})
// Unlikely yet unfortunate race in here if this gets seriously
// delayed. If someone inserts+removes meanwhile, this will
// reinsert, and there's no way to avoid that while keeping the
Expand Down Expand Up @@ -998,6 +1004,24 @@ func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dont
return result
}

// filterQueue takes an existing queue and removes all the items in pull from it. The returned slice will only contain
// items that aren't in 'pull'.
func filterQueue(queue []token, pull map[bson.ObjectId]*transaction, dontPull token) ([]token) {
cleaned := make([]token, 0, len(queue))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When filtering like this, a nice trick I like is:
cleaned := queue[:0]
Because the slice is created with the same underlying array, there's no re-allocation up to the original length.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I almost went this way, but ended up avoiding it because it modifies the original object. We do use it for debug logging later on, so I didn't want to confuse the situation, especially since it doesn't rewrite info.Queue's length, it means an array that could end up with the tail doubled.

for _, tt := range queue {
if tt == dontPull {
cleaned = append(cleaned, tt)
continue
}
txnId := tt.id()
if _, ok := pull[txnId]; !ok {
// shouldn't be pulled, so add it to the cleaned queue
cleaned = append(cleaned, tt)
}
}
return cleaned
}

func objToDoc(obj interface{}) (d bson.D, err error) {
data, err := bson.Marshal(obj)
if err != nil {
Expand Down
34 changes: 34 additions & 0 deletions txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func TestAll(t *testing.T) {
TestingT(t)
}

var fast = flag.Bool("fast", false, "Skip slow tests")

type S struct {
server dbtest.DBServer
session *mgo.Session
Expand Down Expand Up @@ -578,6 +580,9 @@ func (s *S) TestPurgeMissing(c *C) {
}

func (s *S) TestTxnQueueStashStressTest(c *C) {
if *fast {
c.Skip("-fast was supplied and this test is slow")
}
txn.SetChaos(txn.Chaos{
SlowdownChance: 0.3,
Slowdown: 50 * time.Millisecond,
Expand Down Expand Up @@ -1065,3 +1070,32 @@ func (s *S) TestTxnQueuePreparing(c *C) {
}
c.Check(len(qdoc.Queue), Equals, expectedCount)
}

func (s *S) TestTxnQueueAddAndRemove(c *C) {
opts := txn.DefaultRunnerOptions()
opts.MaxTxnQueueLength = 10
s.runner.SetOptions(opts)
opInsert := []txn.Op{{
C: "accounts",
Id: 0,
Insert: M{"balance": 0},
}}
opRemove := []txn.Op{{
C: "accounts",
Id: 0,
Remove: true,
}}
err := s.runner.Run(opInsert, "", nil)
c.Assert(err, IsNil)
for n := 0; n < 10; n++ {
err = s.runner.Run(opRemove, "", nil)
c.Assert(err, IsNil)
err = s.runner.Run(opInsert, "", nil)
c.Assert(err, IsNil)
}
var qdoc txnQueue
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
// Both Remove and Insert should prune all the completed transactions
c.Check(len(qdoc.Queue), Equals, 1)
}