Write race condition while migrating/importing a slot #1218

@phyok

Description

Bug Report

Current Behavior

While a slot containing a lot of data is being migrated from one master (M1) to another (M2), the slot stays in MIGRATING state on M1 and IMPORTING state on M2 for some time because of the large amount of data being copied.

During this window, when a command C1 is sent to M1 and M1 replies with ASK pointing to M2, Lettuce might not send ASKING immediately followed by C1 to M2. In that case the client is bounced back and forth between M1 and M2 until ASKING + C1 happen to be sent consecutively to M2, or the maxRedirects count is reached. In the traces below, the client can be seen sending other commands between ASKING and C1.

The probability of hitting this scenario increases with the amount of data in the slot and the QPS the client is handling. The symptom can be alleviated by increasing maxRedirects, since that gives Lettuce more retries to get ASKING + C1 sent consecutively.
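For reference, the knob mentioned above is ClusterClientOptions.maxRedirects. A minimal sketch of raising it is shown here (the value 10 is arbitrary; this only masks the race, it does not fix it):

import io.lettuce.core.cluster.ClusterClientOptions;

// Sketch only: a larger redirect budget gives Lettuce more attempts to get
// ASKING + C1 onto the wire back-to-back before the error surfaces.
ClusterClientOptions options = ClusterClientOptions.builder()
    .maxRedirects(10) // arbitrary; the benchmark client below uses 5
    .build();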

Traces

Offending key: rct--287151481

Slot being moved: 441

tcpflow trace on M1

$ grep 'rct--287151481' -B 10 -A 10 src.txt

rct-228385639
$26
xxxxxxxxxxxxxxxxxxxxxxxxxx

[MASTER1_IP].25620-[CLIENT_IP].33616: +OK

[CLIENT_IP].33616-[MASTER1_IP].25620: *3
$3
SET
$14
rct--287151481
$26
xxxxxxxxxxxxxxxxxxxxxxxxxx

[MASTER1_IP].25620-[CLIENT_IP].33616: -ASK 441 [MASTER2_IP]:25649

[CLIENT_IP].33616-[MASTER1_IP].25620: *3
$3
SET
$14
rct--896187229

tcpflow trace on M2

$ grep 'rct--287151481' -B 30 -A 10 dst.txt
[MASTER2_IP].25649-[CLIENT_IP].48852: +OK

[CLIENT_IP].48852-[MASTER2_IP].25649: *1
$6
ASKING

[MASTER2_IP].25649-[CLIENT_IP].48852: +OK

[CLIENT_IP].48852-[MASTER2_IP].25649: *3
$3
SET
$13
rct-926126166
$26
xxxxxxxxxxxxxxxxxxxxxxxxxx

[MASTER2_IP].25649-[REPLICA2_IP].38381: *3
$3
SET
$13
rct-926126166
$26
xxxxxxxxxxxxxxxxxxxxxxxxxx

[MASTER2_IP].25649-[CLIENT_IP].48852: +OK

[CLIENT_IP].48852-[MASTER2_IP].25649: *3
$3
SET
$14
rct--287151481
$26
xxxxxxxxxxxxxxxxxxxxxxxxxx

[MASTER2_IP].25649-[CLIENT_IP].48852: -MOVED 441 [MASTER1_IP]:25620

[CLIENT_IP].48852-[MASTER2_IP].25649: *3
$3
SET
$13
rct-672494303

Client-side exception seen by the benchmark:

io.lettuce.core.RedisCommandExecutionException: MOVED 441 [MASTER1_IP]:25620
        at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135)
        at io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:122)
        at io.lettuce.core.cluster.ClusterFutureSyncInvocationHandler.handleInvocation(ClusterFutureSyncInvocationHandler.java:123)
        at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
        at com.sun.proxy.$Proxy7.set(Unknown Source)
        at redis.benchmark.client.Lettuce.setValue(Lettuce.java:41)
        at redis.benchmark.Benchmark.lambda$runBenchmark$1(Benchmark.java:89)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: io.lettuce.core.RedisCommandExecutionException: MOVED 441 [MASTER1_IP]:25620
        at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135)
        at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108)
        at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120)
        at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111)
        at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:59)
        at io.lettuce.core.cluster.ClusterCommand.complete(ClusterCommand.java:63)
        at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:652)
        at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:612)
        at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:563)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more

Input Code
package redis.benchmark.client;

import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import java.time.Duration;

public class LettuceClient implements ClusterClient {

  private final RedisClusterClient client;
  private final StatefulRedisClusterConnection<String, String> conn;

  LettuceClient(String host, int port) {

    ClusterTopologyRefreshOptions refreshOptions =
        ClusterTopologyRefreshOptions.builder().enableAllAdaptiveRefreshTriggers().build();

    ClusterClientOptions opts =
        ClusterClientOptions.builder()
            .topologyRefreshOptions(refreshOptions)
            .maxRedirects(5)
            .build();

    this.client =
        RedisClusterClient.create(
            RedisURI.builder()
                .withHost(host)
                .withPort(port)
                .withTimeout(Duration.ofSeconds(10L))
                .build());
    this.client.setOptions(opts);
    this.conn = client.connect();
  }

  @Override
  public void setValue(String key, String value) {
    RedisClusterCommands<String, String> commands = conn.sync();
    commands.set(key, value);
  }

  @Override
  public void close() {
    conn.close();
  }
}

Benchmark.java

 public void runBenchmark() throws InterruptedException {
    Runtime rt = Runtime.getRuntime();
    long bytesBefore = rt.totalMemory();
    long startTime = System.nanoTime();

    ticker.scheduleAtFixedRate(
        () -> opsTrend.offer(opsSoFar.intValue()),
        TICKER_INTIAL_DELAY,
        TICKER_PERIOD,
        TimeUnit.SECONDS);

    for (int i = 0; i < threadCount; i++) {
      executor.execute(
          () -> {
            while (opsSoFar.getAndIncrement() < totalOps) {
              String key = KEY_PREFIX + ThreadLocalRandom.current().nextInt();
              long t0 = System.nanoTime();
              try {
                redisClient.setValue(key, DATA); //redisClient is an instance of LettuceClient
              } catch (Exception e) {
                e.printStackTrace();
                errorCount.incrementAndGet();
              }
              long elapsedTime = System.nanoTime() - t0;
              latencies.offer(elapsedTime);
            }
            latch.countDown();
          });
    }

    latch.await();
    totalTime = System.nanoTime() - startTime;
    executor.shutdown();

    // submit the last batch of ops before shutting down the ticker.
    // Subtract threadCount since each thread incremented an extra time
    ticker.execute(() -> opsTrend.offer(opsSoFar.intValue() - threadCount));
    ticker.shutdown();
    redisClient.close();
    bytesAlloc = rt.totalMemory() - bytesBefore;
  }

Expected behavior/code

The command C1 above should immediately follow the ASKING request; since ASKING only applies to the next command on the connection, any other command sent in between consumes it. See the "ASK redirection" section of the Redis Cluster specification for details.
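For illustration, below is a minimal sketch of that sequence using Lettuce's async API, assuming a direct connection to the importing node obtained via StatefulRedisClusterConnection.getConnection(host, port). The helper name and parameters are hypothetical; it only shows the wire order the spec expects, it is not Lettuce's internal redirect handling, and a connection shared with other threads can still have commands interleaved on it, which is exactly the race described above.

import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;

class AskRedirectSketch {

  // Hypothetical helper: follow an ASK redirect by writing ASKING and the redirected
  // command back-to-back on the connection to the importing node (M2).
  static void followAskRedirect(StatefulRedisClusterConnection<String, String> cluster,
      String askHost, int askPort, String key, String value) {
    StatefulRedisConnection<String, String> node = cluster.getConnection(askHost, askPort);
    RedisAsyncCommands<String, String> cmds = node.async();

    node.setAutoFlushCommands(false);                      // queue commands without writing yet
    RedisFuture<String> asking = cmds.asking();            // ASKING
    RedisFuture<String> redirected = cmds.set(key, value); // C1, immediately after ASKING
    node.setAutoFlushCommands(true);
    node.flushCommands();                                  // both commands hit the socket together
  }
}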

Environment

  • Lettuce version(s): 5.2.0.RELEASE
  • Redis version: [e.g. 4.0.9]

Possible Solution

Additional context
