Skip to content

Commit e1f88a4

Browse files
Fix snapshot calculation in ludicrous mode (#5636)
In ludicrous mode snapshot calculation is not happening correctly. This is resulting in no snapshot being taken ever and hence w directory size is increasing with time. For snapshot creation we need last applied raft index(such that all raft index entries before this index are also applied) and max timestamp out of all committed transactions. Committed transactions are sent by Zero to all alpha leaders via subscription. In ludicrous mode we don't send transaction to Zero while committing. Hence Zero doesn't have anything to send as part of Oracle updates. Because of this, possible snapshot timestamp(maxCommitTs) in calculateSnapshot() is always 0. Hence no snapshot is ever taken. This PR addresses two issues: Correction while marking raft index done. Now any raft index will only be marked as done once we have applied mutations for proposal(raft entry). For ludicrous mode, use StartTs of any transaction for calculating maxCommitTs snapshot in calculateSnapshot()
1 parent f77aaaf commit e1f88a4

2 files changed

Lines changed: 18 additions & 14 deletions

File tree

worker/draft.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
215215
ops: make(map[op]*y.Closer),
216216
}
217217
if x.WorkerConfig.LudicrousMode {
218-
n.ex = newExecutor()
218+
n.ex = newExecutor(&m.Applied)
219219
}
220220
return n
221221
}
@@ -420,7 +420,7 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
420420
})
421421

422422
if x.WorkerConfig.LudicrousMode {
423-
n.ex.addEdges(ctx, m.StartTs, m.Edges)
423+
n.ex.addEdges(ctx, proposal.Index, m.StartTs, m.Edges)
424424
return nil
425425
}
426426

@@ -489,14 +489,7 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {
489489
span.Annotatef(nil, "While applying mutations: %v", err)
490490
return err
491491
}
492-
if x.WorkerConfig.LudicrousMode {
493-
ts := proposal.Mutations.StartTs
494-
return n.commitOrAbort(proposal.Key, &pb.OracleDelta{
495-
Txns: []*pb.TxnStatus{
496-
{StartTs: ts, CommitTs: ts},
497-
},
498-
})
499-
}
492+
500493
span.Annotate(nil, "Done")
501494
return nil
502495
}
@@ -1430,13 +1423,18 @@ func (n *node) calculateSnapshot(startIdx uint64, discardN int) (*pb.Snapshot, e
14301423
span.Annotatef(nil, "Error: %v", err)
14311424
return nil, err
14321425
}
1426+
1427+
var start uint64
14331428
if proposal.Mutations != nil {
1434-
start := proposal.Mutations.StartTs
1429+
start = proposal.Mutations.StartTs
14351430
if start >= minPendingStart && snapshotIdx == 0 {
14361431
snapshotIdx = entry.Index - 1
14371432
}
14381433
}
1439-
if proposal.Delta != nil {
1434+
// In ludicrous mode commitTs for any transaction is same as startTs.
1435+
if x.WorkerConfig.LudicrousMode {
1436+
maxCommitTs = x.Max(maxCommitTs, start)
1437+
} else if proposal.Delta != nil {
14401438
for _, txn := range proposal.Delta.GetTxns() {
14411439
maxCommitTs = x.Max(maxCommitTs, txn.CommitTs)
14421440
}

worker/executor.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type subMutation struct {
3333
edges []*pb.DirectedEdge
3434
ctx context.Context
3535
startTs uint64
36+
index uint64
3637
}
3738

3839
type executor struct {
@@ -41,12 +42,14 @@ type executor struct {
4142
sync.RWMutex
4243
predChan map[string]chan *subMutation
4344
closer *y.Closer
45+
applied *y.WaterMark
4446
}
4547

46-
func newExecutor() *executor {
48+
func newExecutor(applied *y.WaterMark) *executor {
4749
ex := &executor{
4850
predChan: make(map[string]chan *subMutation),
4951
closer: y.NewCloser(0),
52+
applied: applied,
5053
}
5154
go ex.shutdown()
5255
return ex
@@ -80,6 +83,7 @@ func (e *executor) processMutationCh(ch chan *subMutation) {
8083
glog.Errorf("Error while waiting for writes: %v", err)
8184
}
8285

86+
e.applied.Done(payload.index)
8387
atomic.AddInt64(&e.pendingSize, -esize)
8488
}
8589
}
@@ -111,7 +115,7 @@ const (
111115
executorAddEdges = "executor.addEdges"
112116
)
113117

114-
func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.DirectedEdge) {
118+
func (e *executor) addEdges(ctx context.Context, index, startTs uint64, edges []*pb.DirectedEdge) {
115119
rampMeter(&e.pendingSize, maxPendingEdgesSize, executorAddEdges)
116120

117121
payloadMap := make(map[string]*subMutation)
@@ -122,6 +126,7 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir
122126
payloadMap[edge.Attr] = &subMutation{
123127
ctx: ctx,
124128
startTs: startTs,
129+
index: index,
125130
}
126131
payload = payloadMap[edge.Attr]
127132
}
@@ -138,6 +143,7 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir
138143
default:
139144
// Closer is not closed. And we have the Lock, so sending on channel should be safe.
140145
for attr, payload := range payloadMap {
146+
e.applied.Begin(index)
141147
e.getChannelUnderLock(attr) <- payload
142148
}
143149
}

0 commit comments

Comments
 (0)