Skip to content

Commit 35ecd48

Browse files
authored
fix: assign sequence when flushing retry buffers (#3362)
As noted in #3354, messages buffered due to the highWatermark would inadvertently skip the usual sequence number assignment, manifesting as OutOfOrderSequenceException on the broker. Wrote a functional test to trigger this behaviour and then used the same logic as found in partitionProducer.dispatch() to prevent the problem occurring. Signed-off-by: Dominic Evans <[email protected]>
1 parent 0347c60 commit 35ecd48

File tree

3 files changed

+152
-0
lines changed

3 files changed

+152
-0
lines changed

async_producer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,10 @@ func (pp *partitionProducer) flushRetryBuffers() {
751751
}
752752

753753
for _, msg := range pp.retryState[pp.highWatermark].buf {
754+
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 && !msg.hasSequence {
755+
msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
756+
msg.hasSequence = true
757+
}
754758
pp.brokerProducer.input <- msg
755759
}
756760

async_producer_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,49 @@ func expectResults(t *testing.T, p AsyncProducer, successCount, errorCount int)
6161
expectResultsWithTimeout(t, p, successCount, errorCount, 5*time.Minute)
6262
}
6363

64+
func TestPartitionProducerFlushRetryBuffersAssignsSequence(t *testing.T) {
65+
cfg := NewTestConfig()
66+
cfg.Producer.Idempotent = true
67+
68+
txnmgr := &transactionManager{
69+
producerID: 1,
70+
producerEpoch: 0,
71+
sequenceNumbers: map[string]int32{"topic-0": 1},
72+
}
73+
74+
parent := &asyncProducer{
75+
conf: cfg,
76+
txnmgr: txnmgr,
77+
}
78+
79+
bp := &brokerProducer{
80+
input: make(chan *ProducerMessage, 1),
81+
}
82+
83+
pp := &partitionProducer{
84+
parent: parent,
85+
topic: "topic",
86+
partition: 0,
87+
brokerProducer: bp,
88+
retryState: make([]partitionRetryState, 1),
89+
highWatermark: 1,
90+
}
91+
92+
msg := &ProducerMessage{Topic: "topic", Partition: 0}
93+
pp.retryState[0].buf = []*ProducerMessage{msg}
94+
95+
pp.flushRetryBuffers()
96+
97+
select {
98+
case flushed := <-bp.input:
99+
require.True(t, flushed.hasSequence, "message should have a sequence assigned")
100+
require.Equal(t, int32(1), flushed.sequenceNumber, "sequence number should have increased")
101+
require.Equal(t, txnmgr.producerEpoch, flushed.producerEpoch, "producer epoch should be the same")
102+
default:
103+
t.Fatal("expected buffered message to flush")
104+
}
105+
}
106+
64107
type testPartitioner chan *int32
65108

66109
func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {

functional_producer_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"os"
10+
"os/exec"
1011
"strconv"
1112
"strings"
1213
"sync"
@@ -726,6 +727,110 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
726727
}
727728
}
728729

730+
func TestFuncIdempotentBufferedSequence(t *testing.T) {
731+
checkKafkaVersion(t, "0.11.0.0")
732+
setupFunctionalTest(t)
733+
defer teardownFunctionalTest(t)
734+
735+
const (
736+
topic = "test.1"
737+
partition int32 = 0
738+
)
739+
740+
cfg := NewFunctionalTestConfig()
741+
cfg.Net.MaxOpenRequests = 1
742+
cfg.Producer.Idempotent = true
743+
cfg.Producer.RequiredAcks = WaitForAll
744+
cfg.Producer.Return.Successes = true
745+
cfg.Producer.Return.Errors = true
746+
cfg.Producer.Retry.Max = 64
747+
cfg.Producer.Retry.Backoff = 250 * time.Millisecond
748+
749+
start := time.Now()
750+
751+
producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, cfg)
752+
require.NoError(t, err)
753+
defer producer.Close()
754+
755+
asyncProd, ok := producer.(*asyncProducer)
756+
require.True(t, ok)
757+
758+
waitForMessages := func(count int) {
759+
timeout := time.After(2 * time.Minute)
760+
for count > 0 {
761+
select {
762+
case <-timeout:
763+
t.Fatalf("timed out waiting for %d messages", count)
764+
case perr := <-producer.Errors():
765+
if perr != nil {
766+
t.Logf("producer error: %v", perr.Err)
767+
}
768+
count--
769+
case <-producer.Successes():
770+
count--
771+
}
772+
}
773+
}
774+
775+
for i := 0; i < 5; i++ {
776+
producer.Input() <- &ProducerMessage{
777+
Topic: topic,
778+
Partition: partition,
779+
Value: StringEncoder(fmt.Sprintf("warmup-%d", i)),
780+
}
781+
}
782+
waitForMessages(5)
783+
784+
leader, err := asyncProd.client.Leader(topic, partition)
785+
require.NoError(t, err)
786+
787+
bp := asyncProd.getBrokerProducer(leader)
788+
defer asyncProd.unrefBrokerProducer(leader, bp)
789+
790+
asyncProd.inFlight.Add(1)
791+
pp := &partitionProducer{
792+
parent: asyncProd,
793+
topic: topic,
794+
partition: partition,
795+
brokerProducer: bp,
796+
leader: leader,
797+
retryState: make([]partitionRetryState, asyncProd.conf.Producer.Retry.Max+1),
798+
highWatermark: 1,
799+
}
800+
pp.retryState[0].buf = []*ProducerMessage{{
801+
Topic: topic,
802+
Partition: partition,
803+
Value: StringEncoder("buffered"),
804+
}}
805+
pp.flushRetryBuffers()
806+
807+
waitForMessages(1)
808+
809+
producer.Input() <- &ProducerMessage{
810+
Topic: topic,
811+
Partition: partition,
812+
Value: StringEncoder("post-buffer"),
813+
}
814+
waitForMessages(1)
815+
816+
logSince := start.UTC().Format(time.RFC3339)
817+
cmd := exec.Command(
818+
"docker",
819+
"compose",
820+
"logs",
821+
"--since",
822+
logSince,
823+
fmt.Sprintf("kafka-%d", leader.ID()),
824+
)
825+
cmd.Env = os.Environ()
826+
out, err := cmd.CombinedOutput()
827+
require.NoErrorf(t, err, "failed to read broker logs: %s", out)
828+
829+
logs := string(out)
830+
t.Logf("kafka-%d logs since %s:\n%s", leader.ID(), logSince, logs)
831+
require.NotContains(t, logs, "OutOfOrderSequenceException", "leader logs contained out-of-order sequence errors:\n%s", logs)
832+
}
833+
729834
func TestInterceptors(t *testing.T) {
730835
config := NewFunctionalTestConfig()
731836
setupFunctionalTest(t)

0 commit comments

Comments
 (0)