Skip to content

Commit d23c68f

Browse files
committed
fix: make sure FIFO order for write() when notifyChannelActive(), also make sure channel access thread-safe and avoid potential NPE
1 parent 761d602 commit d23c68f

File tree

2 files changed

+64
-42
lines changed

2 files changed

+64
-42
lines changed

src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ public class DefaultEndpoint implements RedisChannelWriter, Endpoint, PushHandle
6363

6464
private static final AtomicLong ENDPOINT_COUNTER = new AtomicLong();
6565

66-
private static final AtomicIntegerFieldUpdater<DefaultEndpoint> QUEUE_SIZE = AtomicIntegerFieldUpdater
67-
.newUpdater(DefaultEndpoint.class, "queueSize");
66+
private static final AtomicIntegerFieldUpdater<DefaultEndpoint> QUEUE_SIZE = AtomicIntegerFieldUpdater.newUpdater(
67+
DefaultEndpoint.class, "queueSize");
6868

69-
private static final AtomicIntegerFieldUpdater<DefaultEndpoint> STATUS = AtomicIntegerFieldUpdater
70-
.newUpdater(DefaultEndpoint.class, "status");
69+
private static final AtomicIntegerFieldUpdater<DefaultEndpoint> STATUS = AtomicIntegerFieldUpdater.newUpdater(
70+
DefaultEndpoint.class, "status");
7171

7272
private static final int ST_OPEN = 0;
7373

@@ -191,9 +191,9 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
191191
}
192192

193193
if (autoFlushCommands) {
194-
195-
if (isConnected()) {
196-
writeToChannelAndFlush(command);
194+
Channel channel = this.channel;
195+
if (isConnected(channel)) {
196+
writeToChannelAndFlush(channel, command);
197197
} else {
198198
writeToDisconnectedBuffer(command);
199199
}
@@ -232,9 +232,9 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
232232
}
233233

234234
if (autoFlushCommands) {
235-
236-
if (isConnected()) {
237-
writeToChannelAndFlush(commands);
235+
Channel channel = this.channel;
236+
if (isConnected(channel)) {
237+
writeToChannelAndFlush(channel, commands);
238238
} else {
239239
writeToDisconnectedBuffer(commands);
240240
}
@@ -284,10 +284,9 @@ private RedisException validateWrite(int commands) {
284284
return new RedisException("Connection is closed");
285285
}
286286

287+
final boolean connected = isConnected(this.channel);
287288
if (usesBoundedQueues()) {
288289

289-
boolean connected = isConnected();
290-
291290
if (QUEUE_SIZE.get(this) + commands > clientOptions.getRequestQueueSize()) {
292291
return new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize()
293292
+ ". Commands are not accepted until the queue size drops.");
@@ -304,7 +303,7 @@ private RedisException validateWrite(int commands) {
304303
}
305304
}
306305

307-
if (!isConnected() && rejectCommandsWhileDisconnected) {
306+
if (!connected && rejectCommandsWhileDisconnected) {
308307
return new RedisException("Currently not connected. Commands are rejected.");
309308
}
310309

@@ -366,11 +365,11 @@ private void writeToDisconnectedBuffer(RedisCommand<?, ?, ?> command) {
366365
commandBuffer.add(command);
367366
}
368367

369-
private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
368+
private void writeToChannelAndFlush(Channel channel, RedisCommand<?, ?, ?> command) {
370369

371370
QUEUE_SIZE.incrementAndGet(this);
372371

373-
ChannelFuture channelFuture = channelWriteAndFlush(command);
372+
ChannelFuture channelFuture = channelWriteAndFlush(channel, command);
374373

375374
if (reliability == Reliability.AT_MOST_ONCE) {
376375
// cancel on exceptions and remove from queue, because there is no housekeeping
@@ -383,30 +382,30 @@ private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
383382
}
384383
}
385384

386-
private void writeToChannelAndFlush(Collection<? extends RedisCommand<?, ?, ?>> commands) {
385+
private void writeToChannelAndFlush(Channel channel, Collection<? extends RedisCommand<?, ?, ?>> commands) {
387386

388387
QUEUE_SIZE.addAndGet(this, commands.size());
389388

390389
if (reliability == Reliability.AT_MOST_ONCE) {
391390

392391
// cancel on exceptions and remove from queue, because there is no housekeeping
393392
for (RedisCommand<?, ?, ?> command : commands) {
394-
channelWrite(command).addListener(AtMostOnceWriteListener.newInstance(this, command));
393+
channelWrite(channel, command).addListener(AtMostOnceWriteListener.newInstance(this, command));
395394
}
396395
}
397396

398397
if (reliability == Reliability.AT_LEAST_ONCE) {
399398

400399
// commands are ok to stay within the queue, reconnect will retrigger them
401400
for (RedisCommand<?, ?, ?> command : commands) {
402-
channelWrite(command).addListener(RetryListener.newInstance(this, command));
401+
channelWrite(channel, command).addListener(RetryListener.newInstance(this, command));
403402
}
404403
}
405404

406-
channelFlush();
405+
channelFlush(channel);
407406
}
408407

409-
private void channelFlush() {
408+
private void channelFlush(Channel channel) {
410409

411410
if (debugEnabled) {
412411
logger.debug("{} write() channelFlush", logPrefix());
@@ -415,7 +414,7 @@ private void channelFlush() {
415414
channel.flush();
416415
}
417416

418-
private ChannelFuture channelWrite(RedisCommand<?, ?, ?> command) {
417+
private ChannelFuture channelWrite(Channel channel, RedisCommand<?, ?, ?> command) {
419418

420419
if (debugEnabled) {
421420
logger.debug("{} write() channelWrite command {}", logPrefix(), command);
@@ -424,7 +423,7 @@ private ChannelFuture channelWrite(RedisCommand<?, ?, ?> command) {
424423
return channel.write(command);
425424
}
426425

427-
private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {
426+
private ChannelFuture channelWriteAndFlush(Channel channel, RedisCommand<?, ?, ?> command) {
428427

429428
if (debugEnabled) {
430429
logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
@@ -437,7 +436,6 @@ private ChannelFuture channelWriteAndFlush(RedisCommand<?, ?, ?> command) {
437436
public void notifyChannelActive(Channel channel) {
438437

439438
this.logPrefix = null;
440-
this.channel = channel;
441439
this.connectionError = null;
442440

443441
if (isClosed()) {
@@ -452,6 +450,7 @@ public void notifyChannelActive(Channel channel) {
452450
}
453451

454452
sharedLock.doExclusive(() -> {
453+
this.channel = channel;
455454

456455
try {
457456
// Move queued commands to buffer before issuing any commands because of connection activation.
@@ -474,7 +473,7 @@ public void notifyChannelActive(Channel channel) {
474473
inActivation = false;
475474
}
476475

477-
flushCommands(disconnectedBuffer);
476+
flushCommands(channel, disconnectedBuffer);
478477
} catch (Exception e) {
479478

480479
if (debugEnabled) {
@@ -527,7 +526,7 @@ public void notifyException(Throwable t) {
527526
doExclusive(this::drainCommands).forEach(cmd -> cmd.completeExceptionally(t));
528527
}
529528

530-
if (!isConnected()) {
529+
if (!isConnected(this.channel)) {
531530
connectionError = t;
532531
}
533532
}
@@ -540,16 +539,16 @@ public void registerConnectionWatchdog(ConnectionWatchdog connectionWatchdog) {
540539
@Override
541540
@SuppressWarnings({ "rawtypes", "unchecked" })
542541
public void flushCommands() {
543-
flushCommands(commandBuffer);
542+
flushCommands(this.channel, commandBuffer);
544543
}
545544

546-
private void flushCommands(Queue<RedisCommand<?, ?, ?>> queue) {
545+
private void flushCommands(Channel channel, Queue<RedisCommand<?, ?, ?>> queue) {
547546

548547
if (debugEnabled) {
549548
logger.debug("{} flushCommands()", logPrefix());
550549
}
551550

552-
if (isConnected()) {
551+
if (isConnected(channel)) {
553552

554553
List<RedisCommand<?, ?, ?>> commands = sharedLock.doExclusive(() -> {
555554

@@ -565,7 +564,7 @@ private void flushCommands(Queue<RedisCommand<?, ?, ?>> queue) {
565564
}
566565

567566
if (!commands.isEmpty()) {
568-
writeToChannelAndFlush(commands);
567+
writeToChannelAndFlush(channel, commands);
569568
}
570569
}
571570
}
@@ -628,10 +627,10 @@ public void disconnect() {
628627

629628
private Channel getOpenChannel() {
630629

631-
Channel currentChannel = this.channel;
630+
Channel channel = this.channel;
632631

633-
if (currentChannel != null) {
634-
return currentChannel;
632+
if (channel != null /* && channel.isOpen() is this deliberately omitted? */) {
633+
return channel;
635634
}
636635

637636
return null;
@@ -648,6 +647,7 @@ public void reset() {
648647
logger.debug("{} reset()", logPrefix());
649648
}
650649

650+
Channel channel = this.channel;
651651
if (channel != null) {
652652
channel.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset());
653653
}
@@ -720,9 +720,7 @@ public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) {
720720
}
721721
}
722722

723-
if (isConnected()) {
724-
flushCommands(disconnectedBuffer);
725-
}
723+
flushCommands(this.channel, disconnectedBuffer);
726724
});
727725
}
728726

@@ -787,9 +785,7 @@ private void cancelCommands(String message, Iterable<? extends RedisCommand<?, ?
787785
}
788786
}
789787

790-
private boolean isConnected() {
791-
792-
Channel channel = this.channel;
788+
private boolean isConnected(Channel channel) {
793789
return channel != null && channel.isActive();
794790
}
795791

src/test/java/io/lettuce/core/protocol/DefaultEndpointUnitTests.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,34 @@ void before() {
136136
sut.setConnectionFacade(connectionFacade);
137137
}
138138

139+
@Test
140+
void writeShouldGuaranteeFIFOOrder() {
141+
sut.write(Collections.singletonList(new Command<>(CommandType.SELECT, new StatusOutput<>(StringCodec.UTF8))));
142+
143+
sut.registerConnectionWatchdog(connectionWatchdog);
144+
doAnswer(i -> sut.write(new Command<>(CommandType.AUTH, new StatusOutput<>(StringCodec.UTF8)))).when(connectionWatchdog)
145+
.arm();
146+
when(channel.isActive()).thenReturn(true);
147+
148+
sut.notifyChannelActive(channel);
149+
150+
DefaultChannelPromise promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
151+
152+
when(channel.writeAndFlush(any())).thenAnswer(invocation -> {
153+
if (invocation.getArguments()[0] instanceof RedisCommand) {
154+
queue.add((RedisCommand) invocation.getArguments()[0]);
155+
}
156+
157+
if (invocation.getArguments()[0] instanceof Collection) {
158+
queue.addAll((Collection) invocation.getArguments()[0]);
159+
}
160+
return promise;
161+
});
162+
163+
assertThat(queue).hasSize(2).first().hasFieldOrPropertyWithValue("type", CommandType.SELECT);
164+
assertThat(queue).hasSize(2).last().hasFieldOrPropertyWithValue("type", CommandType.AUTH);
165+
}
166+
139167
@Test
140168
void writeConnectedShouldWriteCommandToChannel() {
141169

@@ -396,11 +424,9 @@ void shouldNotReplayActivationCommands() {
396424

397425
when(channel.isActive()).thenReturn(true);
398426
ConnectionTestUtil.getDisconnectedBuffer(sut)
399-
.add(new ActivationCommand<>(
400-
new Command<>(CommandType.SELECT, new StatusOutput<>(StringCodec.UTF8))));
427+
.add(new ActivationCommand<>(new Command<>(CommandType.SELECT, new StatusOutput<>(StringCodec.UTF8))));
401428
ConnectionTestUtil.getDisconnectedBuffer(sut).add(new LatencyMeteredCommand<>(
402-
new ActivationCommand<>(
403-
new Command<>(CommandType.SUBSCRIBE, new StatusOutput<>(StringCodec.UTF8)))));
429+
new ActivationCommand<>(new Command<>(CommandType.SUBSCRIBE, new StatusOutput<>(StringCodec.UTF8)))));
404430

405431
doAnswer(i -> {
406432

0 commit comments

Comments
 (0)