Skip to content

Commit 073f90d

Browse files
authored
Merge pull request #149 from joshuazh-x/msgapp-index
raft: next index shall be larger than match index.
2 parents 23c936a + 6439482 commit 073f90d

2 files changed

Lines changed: 55 additions & 1 deletion

File tree

raft_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4824,6 +4824,58 @@ func votedWithConfig(configFunc func(*Config), vote, term uint64) *raft {
48244824
return sm
48254825
}
48264826

4827+
func TestLogReplicationWithReorderedMessage(t *testing.T) {
4828+
r1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
4829+
r1.becomeCandidate()
4830+
r1.becomeLeader()
4831+
r1.readMessages()
4832+
r1.trk.Progress[2].BecomeReplicate()
4833+
4834+
r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
4835+
4836+
// r1 sends 2 MsgApp messages to r2.
4837+
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
4838+
r1.sendAppend(2)
4839+
req1 := expectOneMessage(t, r1)
4840+
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
4841+
r1.sendAppend(2)
4842+
req2 := expectOneMessage(t, r1)
4843+
4844+
// r2 receives the second MsgApp first due to reordering.
4845+
r2.Step(req2)
4846+
resp2 := expectOneMessage(t, r2)
4847+
// r2 rejects req2
4848+
require.True(t, resp2.Reject)
4849+
require.Equal(t, uint64(0), resp2.RejectHint)
4850+
require.Equal(t, uint64(2), resp2.Index)
4851+
4852+
// r2 handles the first MsgApp and responses to r1.
4853+
// And r1 updates match index accordingly.
4854+
r2.Step(req1)
4855+
m := expectOneMessage(t, r2)
4856+
require.False(t, m.Reject)
4857+
require.Equal(t, uint64(2), m.Index)
4858+
r1.Step(m)
4859+
m = expectOneMessage(t, r1)
4860+
require.Equal(t, uint64(2), r1.trk.Progress[2].Match)
4861+
4862+
// r1 observes a transient network issue to r2, hence transits to probe state.
4863+
r1.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
4864+
require.Equal(t, tracker.StateProbe, r1.trk.Progress[2].State)
4865+
4866+
// now r1 receives the delayed resp2.
4867+
r1.Step(resp2)
4868+
m = expectOneMessage(t, r1)
4869+
// r1 shall re-send MsgApp from match index even if resp2's reject hint is less than matching index.
4870+
require.Equal(t, r1.trk.Progress[2].Match, m.Index)
4871+
}
4872+
4873+
func expectOneMessage(t *testing.T, r *raft) pb.Message {
4874+
msgs := r.readMessages()
4875+
require.Len(t, msgs, 1, "expect one message")
4876+
return msgs[0]
4877+
}
4878+
48274879
type network struct {
48284880
t *testing.T // optional
48294881

tracker/progress.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,11 +203,13 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
203203

204204
// The rejection must be stale if "rejected" does not match next - 1. This
205205
// is because non-replicating followers are probed one entry at a time.
206+
// The check is a best effort assuming message reordering is rare.
206207
if pr.Next-1 != rejected {
207208
return false
208209
}
209210

210-
pr.Next = max(min(rejected, matchHint+1), 1)
211+
// Next index shall always be larger than match index.
212+
pr.Next = max(min(rejected, matchHint+1), pr.Match+1)
211213
pr.MsgAppFlowPaused = false
212214
return true
213215
}

0 commit comments

Comments
 (0)