Skip to content

Commit af3b035

Browse files
committed
Take snapshots less frequently (#3367)
Taking snapshots frequently causes a straggler follower to have a hard time getting a new snapshot streamed. If the latter takes more time than the former, then the straggler would never catch up. So, instead of taking snapshots frequently, this PR adds a mechanism to track the progress of Raft in the p directory. This way, a restarted Alpha does not need to replay all the Raft logs, only the ones which it hasn't applied yet. This PR adds two new flags: 1. The duration after which we'd abort a txn. 2. The number of Raft logs after which a snapshot would be taken. This PR also removes a strange lastCommitTs logic, which was only spitting out a log error, without doing anything. Changes: * Keep track of Raft progress in the p directory, so it can skip over the already applied dataset. Allow the user to specify how often to take snapshots. * Improve how we set Raft progress. * Make x.Parse parse the new Raft key * Handle key not found error. * Fix up the debug wal to spit out the last entry as well.
1 parent eecc4e1 commit af3b035

5 files changed

Lines changed: 127 additions & 25 deletions

File tree

dgraph/cmd/alpha/run.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,15 @@ they form a Raft group and provide synchronous replication.
9999
"[mmap, disk] Specifies how Badger Value log is stored."+
100100
" mmap consumes more RAM, but provides better performance.")
101101

102+
// Snapshot and Transactions.
103+
flag.Int("snapshot_after", 10000,
104+
"Create a new Raft snapshot after this many number of Raft entries. The"+
105+
" lower this number, the more frequent snapshot creation would be."+
106+
" Also determines how often Rollups would happen.")
107+
flag.String("abort_older_than", "5m",
108+
"Abort any pending transactions older than this duration. The liveness of a"+
109+
" transaction is determined by its last mutation.")
110+
102111
// OpenCensus flags.
103112
flag.Float64("trace", 1.0, "The ratio of queries to trace.")
104113
flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.")
@@ -436,6 +445,10 @@ func run() {
436445

437446
ips, err := parseIPsFromString(Alpha.Conf.GetString("whitelist"))
438447
x.Check(err)
448+
449+
abortDur, err := time.ParseDuration(Alpha.Conf.GetString("abort_older_than"))
450+
x.Check(err)
451+
439452
worker.Config = worker.Options{
440453
ExportPath: Alpha.Conf.GetString("export"),
441454
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
@@ -447,6 +460,8 @@ func run() {
447460
WhiteListedIPRanges: ips,
448461
MaxRetries: Alpha.Conf.GetInt("max_retries"),
449462
StrictMutations: opts.MutationsMode == edgraph.StrictMutations,
463+
SnapshotAfter: Alpha.Conf.GetInt("snapshot_after"),
464+
AbortOlderThan: abortDur,
450465
}
451466

452467
x.LoadTLSConfig(&tlsConf, Alpha.Conf, tlsNodeCert, tlsNodeKey)

dgraph/cmd/debug/run.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,9 @@ func printKeys(db *badger.DB) {
471471

472472
// Don't use a switch case here. Because multiple of these can be true. In particular,
473473
// IsSchema can be true alongside IsData.
474+
if pk.IsRaft() {
475+
buf.WriteString("{r}")
476+
}
474477
if pk.IsData() {
475478
buf.WriteString("{d}")
476479
}

worker/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
package worker
1818

19-
import "net"
19+
import (
20+
"net"
21+
"time"
22+
)
2023

2124
type IPRange struct {
2225
Lower, Upper net.IP
@@ -34,6 +37,8 @@ type Options struct {
3437
WhiteListedIPRanges []IPRange
3538
MaxRetries int
3639
StrictMutations bool
40+
SnapshotAfter int
41+
AbortOlderThan time.Duration
3742
}
3843

3944
var Config Options

worker/draft.go

Lines changed: 86 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ type node struct {
6060
gid uint32
6161
closer *y.Closer
6262

63-
lastCommitTs uint64 // Only used to ensure that our commit Ts is monotonically increasing.
64-
6563
streaming int32 // Used to avoid calculating snapshot
6664

6765
canCampaign bool
@@ -488,12 +486,7 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
488486
}
489487

490488
for _, status := range delta.Txns {
491-
if status.CommitTs > 0 && status.CommitTs < n.lastCommitTs {
492-
glog.Errorf("Lastcommit %d > current %d. This would cause some commits to be lost.",
493-
n.lastCommitTs, status.CommitTs)
494-
}
495489
toDisk(status.StartTs, status.CommitTs)
496-
n.lastCommitTs = status.CommitTs
497490
}
498491
if err := writer.Flush(); err != nil {
499492
return x.Errorf("Error while flushing to disk: %v", err)
@@ -623,6 +616,66 @@ func (n *node) rampMeter() {
623616
time.Sleep(3 * time.Millisecond)
624617
}
625618
}
619+
620+
func (n *node) findRaftProgress() (uint64, error) {
621+
var applied uint64
622+
err := pstore.View(func(txn *badger.Txn) error {
623+
item, err := txn.Get(x.RaftKey())
624+
if err == badger.ErrKeyNotFound {
625+
return nil
626+
}
627+
if err != nil {
628+
return err
629+
}
630+
return item.Value(func(val []byte) error {
631+
var snap pb.Snapshot
632+
if err := snap.Unmarshal(val); err != nil {
633+
return err
634+
}
635+
applied = snap.Index
636+
return nil
637+
})
638+
})
639+
return applied, err
640+
}
641+
642+
func (n *node) updateRaftProgress() error {
643+
// Both leader and followers can independently update their Raft progress. We don't store
644+
// this in Raft WAL. Instead, this is used to just skip over log records that this Alpha
645+
// has already applied, to speed up things on a restart.
646+
snap, err := n.calculateSnapshot(10) // 10 is a randomly chosen small number.
647+
if err != nil {
648+
return err
649+
}
650+
if snap == nil {
651+
return nil
652+
}
653+
654+
// Let's check what we already have. And only update if the new snap.Index is ahead of the last
655+
// stored applied.
656+
applied, err := n.findRaftProgress()
657+
if err != nil {
658+
return err
659+
}
660+
if snap.Index <= applied {
661+
return nil
662+
}
663+
664+
data, err := snap.Marshal()
665+
x.Check(err)
666+
txn := pstore.NewTransactionAt(math.MaxUint64, true)
667+
defer txn.Discard()
668+
669+
if err := txn.Set(x.RaftKey(), data); err != nil {
670+
return err
671+
}
672+
if err := txn.CommitAt(1, nil); err != nil {
673+
return err
674+
}
675+
glog.V(1).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
676+
return nil
677+
}
678+
626679
func (n *node) Run() {
627680
defer n.closer.Done() // CLOSER:1
628681

@@ -647,7 +700,13 @@ func (n *node) Run() {
647700
close(done)
648701
}()
649702

650-
var snapshotLoops uint64
703+
applied, err := n.findRaftProgress()
704+
if err != nil {
705+
glog.Errorf("While trying to find raft progress: %v", err)
706+
} else {
707+
glog.Infof("Found Raft progress in p directory: %d", applied)
708+
}
709+
651710
for {
652711
select {
653712
case <-done:
@@ -660,23 +719,23 @@ func (n *node) Run() {
660719

661720
case <-slowTicker.C:
662721
n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
722+
if err := n.updateRaftProgress(); err != nil {
723+
glog.Errorf("While updating Raft progress: %v", err)
724+
}
725+
663726
if leader {
664-
// We try to take a snapshot every slow tick duration, with a 1000 discard entries.
665-
// But, once a while, we take a snapshot with 10 discard entries. This avoids the
666-
// scenario where after bringing up an Alpha, and doing a hundred schema updates, we
667-
// don't take any snapshots because there are not enough updates (discardN=10),
668-
// which then really slows down restarts. At the same time, by checking more
669-
// frequently, we can quickly take a snapshot if a lot of mutations are coming in
670-
// fast (discardN=1000).
671-
discardN := 1000
672-
if snapshotLoops%5 == 0 {
673-
discardN = 10
674-
}
675-
snapshotLoops++
727+
// We keep track of the applied index in the p directory. Even if we don't take
728+
// snapshot for a while and let the Raft logs grow and restart, we would not have to
729+
// run all the log entries, because we can tell Raft.Config to set Applied to that
730+
// index.
731+
// This applied index tracking also covers the case when we have a big index
732+
// rebuild. The rebuild would be tracked just like others and would not need to be
733+
// replayed after a restart, because the Applied config would let us skip right
734+
// through it.
676735
// We use disk based storage for Raft. So, we're not too concerned about
677736
// snapshotting. We just need to do enough, so that we don't have a huge backlog of
678737
// entries to process on a restart.
679-
if err := n.proposeSnapshot(discardN); err != nil {
738+
if err := n.proposeSnapshot(Config.SnapshotAfter); err != nil {
680739
x.Errorf("While calculating and proposing snapshot: %v", err)
681740
}
682741
go n.abortOldTransactions()
@@ -782,6 +841,10 @@ func (n *node) Run() {
782841
n.elog.Printf("Found empty data at index: %d", entry.Index)
783842
n.Applied.Done(entry.Index)
784843

844+
} else if entry.Index < applied {
845+
n.elog.Printf("Skipping over already applied entry: %d", entry.Index)
846+
n.Applied.Done(entry.Index)
847+
785848
} else {
786849
proposal := &pb.Proposal{}
787850
if err := proposal.Unmarshal(entry.Data); err != nil {
@@ -971,7 +1034,6 @@ func (n *node) blockingAbort(req *pb.TxnTimestamps) error {
9711034

9721035
// Let's propose the txn updates received from Zero. This is important because there are edge
9731036
// cases where a txn status might have been missed by the group.
974-
glog.Infof("TryAbort returned with delta: %+v\n", delta)
9751037
aborted := &pb.OracleDelta{}
9761038
for _, txn := range delta.Txns {
9771039
// Only pick the aborts. DO NOT propose the commits. They must come in the right order via
@@ -1000,14 +1062,14 @@ func (n *node) blockingAbort(req *pb.TxnTimestamps) error {
10001062
// abort. Note that only the leader runs this function.
10011063
func (n *node) abortOldTransactions() {
10021064
// Aborts if not already committed.
1003-
starts := posting.Oracle().TxnOlderThan(5 * time.Minute)
1065+
starts := posting.Oracle().TxnOlderThan(Config.AbortOlderThan)
10041066
if len(starts) == 0 {
10051067
return
10061068
}
10071069
glog.Infof("Found %d old transactions. Acting to abort them.\n", len(starts))
10081070
req := &pb.TxnTimestamps{Ts: starts}
10091071
err := n.blockingAbort(req)
1010-
glog.Infof("abortOldTransactions for %d txns. Error: %+v\n", len(req.Ts), err)
1072+
glog.Infof("Done abortOldTransactions for %d txns. Error: %+v\n", len(req.Ts), err)
10111073
}
10121074

10131075
// calculateSnapshot would calculate a snapshot index, considering these factors:

x/keys.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ const (
3434
// keys of same attributes are located together
3535
defaultPrefix = byte(0x00)
3636
byteSchema = byte(0x01)
37+
byteType = byte(0x02)
38+
byteRaft = byte(0xff)
3739
)
3840

3941
func writeAttr(buf []byte, attr string) []byte {
@@ -46,6 +48,13 @@ func writeAttr(buf []byte, attr string) []byte {
4648
return rest[len(attr):]
4749
}
4850

51+
func RaftKey() []byte {
52+
buf := make([]byte, 5)
53+
buf[0] = byteRaft
54+
AssertTrue(4 == copy(buf[1:5], []byte("raft")))
55+
return buf
56+
}
57+
4958
// SchemaKey returns schema key for given attribute,
5059
// schema keys are stored separately with unique prefix,
5160
// since we need to iterate over all schema keys
@@ -123,6 +132,10 @@ type ParsedKey struct {
123132
bytePrefix byte
124133
}
125134

135+
func (p ParsedKey) IsRaft() bool {
136+
return p.bytePrefix == byteRaft
137+
}
138+
126139
func (p ParsedKey) IsData() bool {
127140
return p.byteType == ByteData
128141
}
@@ -256,6 +269,10 @@ func Parse(key []byte) *ParsedKey {
256269
p := &ParsedKey{}
257270

258271
p.bytePrefix = key[0]
272+
if p.bytePrefix == byteRaft {
273+
return p
274+
}
275+
259276
sz := int(binary.BigEndian.Uint16(key[1:3]))
260277
k := key[3:]
261278

0 commit comments

Comments
 (0)