Skip to content

Conversation

@guojialiang92
Copy link
Contributor

@guojialiang92 guojialiang92 commented Dec 16, 2025

Description

The purpose of this PR is to introduce the translog concurrent recovery mechanism mentioned in #20131, which is used to accelerate the primary promotion of segment replication.

Default concurrency strategy

  • Translog concurrent recovery is restricted to use in the primary promotion scenario of segment replication.
  • Introduce a dedicated thread pool translog_recovery with a size equal to the number of cores and an unbounded queue.
  • Introduce cluster-level dynamic configuration indices.translog_concurrent_recovery.enable, which is used to enable translog concurrent recovery, with a default value of false.
  • Introduce cluster-level dynamic configuration indices.translog_concurrent_recovery.batch_size, which is used to represent the number of translog operations processed by a single thread, with a default value of 500,000.
  • When the number of translog recovery operations is less than indices.translog_concurrent_recovery.batch_size, concurrency is not enabled, remaining consistent with the current execution logic.
  • When the number of translog recovery operations exceeds indices.translog_concurrent_recovery.batch_size.
    • If translog_recovery is not busy, enable concurrency, with each thread responsible for executing the number of operations specified by indices.translog_concurrent_recovery.batch_size.
    • If translog_recovery is busy, do not enable concurrency, keeping it consistent with the current execution logic.

Related Issues

Resolves #[20131]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Summary by CodeRabbit

  • New Features

    • Added dynamic cluster settings to enable concurrent translog recovery and to configure batch size.
    • Introduced a dedicated translog-recovery thread pool and batched parallel translog recovery to speed shard recovery when conditions allow, with automatic fallback to the sequential path.
  • Tests

    • Added end-to-end tests validating concurrent translog recovery, primary promotion, replication, and translog/Lucene state consistency.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 16, 2025

Walkthrough

Adds dynamic settings to enable batched concurrent translog recovery, registers them as cluster settings, introduces a dedicated TRANSLOG_RECOVERY thread pool, implements conditional parallel per-batch translog recovery in IndexShard (with sequential fallback), and updates tests and test-framework to exercise the new path.

Changes

Cohort / File(s) Summary
Settings & Cluster registration
server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java, server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
Add INDICES_TRANSLOG_CONCURRENT_RECOVERY_ENABLE (boolean) and INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE (long): fields, accessors, mutators, update consumers, and include them in built-in cluster settings.
Thread pool
server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Add Names.TRANSLOG_RECOVERY, map it to ThreadPoolType.FIXED, and register a FixedExecutorBuilder for TRANSLOG_RECOVERY (size = allocated processors).
Parallel recovery implementation
server/src/main/java/org/opensearch/index/shard/IndexShard.java
Implement conditional parallel translog recovery: check settings and pool availability, split translog into batches, create per-batch snapshots, submit per-batch recovery tasks to TRANSLOG_RECOVERY collecting Future<Integer>, aggregate counts/exceptions, ensure snapshot cleanup; fall back to sequential recovery when not applicable.
Test framework
test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
Add addReplica(Path remotePath, RecoverySettings recoverySettings) overload and use it in ReplicationGroup to create replicas with specific RecoverySettings and merged segment publisher.
Integration test
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java
Add testPrimaryPromotionWithConcurrentTranslogRecovery() and a shared helper doPrimaryPromotion(...) to exercise primary/replica promotion with concurrent translog recovery and merged segment publisher.

Sequence Diagram

sequenceDiagram
    participant Initiator as Recovery initiator
    participant IndexShard
    participant RecoverySettings
    participant TranslogMgr as TranslogManager
    participant ThreadPool as TRANSLOG_RECOVERY
    participant Engine

    Initiator->>IndexShard: start translog recovery
    IndexShard->>RecoverySettings: check concurrent enabled & batchSize
    alt concurrent enabled & pool available & ops > batchSize
        IndexShard->>TranslogMgr: obtain full translog snapshot
        IndexShard->>IndexShard: compute batch boundaries
        loop per batch
            IndexShard->>ThreadPool: submit batch recovery task (snapshot slice)
            ThreadPool->>Engine: apply batch operations
            Engine-->>ThreadPool: result / exception
            ThreadPool-->>IndexShard: Future completes
            IndexShard->>TranslogMgr: close per-batch snapshot
        end
        alt any failure
            IndexShard->>Initiator: throw aggregated IOException
        else
            IndexShard->>Initiator: return total recovered ops
        end
    else fallback
        IndexShard->>TranslogMgr: single-snapshot sequential recovery
        TranslogMgr->>Engine: apply ops serially
        Engine-->>IndexShard: recovery complete
        IndexShard->>Initiator: return recovered ops
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Review concurrency, Future aggregation, and exception consolidation in IndexShard.java.
  • Verify per-batch translog snapshot lifecycle and finally-block cleanup.
  • Confirm correct thread pool registration, sizing, and ThreadPoolType mapping in ThreadPool.java.
  • Validate dynamic settings registration, update consumers, and inclusion in ClusterSettings.java.
  • Check new tests and test-framework overload for correctness and flakiness.

Suggested labels

enhancement, Indexing:Replication

Suggested reviewers

  • msfroh
  • dbwiddis
  • cwperks
  • mch2
  • reta
  • sachinpkale
  • anasalkouz
  • gbbafna
  • shwetathareja
  • kotwanikunal
  • saratvemulapalli
  • jed326

Poem

🐰 I split the logs in batches bright,
Threads hopped in parallel light,
Snapshots closed with tidy care,
Counts summed up, exceptions snared,
A rabbit cheers — recovery's right!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 17.65% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely describes the main change: introducing concurrent translog recovery to accelerate segment replication primary promotion, which aligns with the core purpose of the PR.
Description check ✅ Passed The description includes all required sections: Description (explaining the purpose and default concurrency strategy in detail), Related Issues (resolving #20131), and completed checklist items indicating testing, API changes, and documentation were addressed.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (1)

718-802: Well-structured test for concurrent translog recovery.

The test adequately exercises the concurrent translog recovery path with 200-300 documents and a batch size of 10, creating 20-30 concurrent batches. The assertions comprehensively verify engine types, document counts, translog statistics, and history consistency.

One observation: the batch size of 10 is significantly smaller than the production default of 50,000. While this is intentional for testing, consider adding a brief comment explaining this choice to help future maintainers understand the test design.

+    // Use a small batch size to ensure multiple concurrent batches are created during recovery
     @TestLogging(reason = "Getting trace logs from IndexShard", value = "org.opensearch.index.shard.IndexShard:TRACE")
     public void testPrimaryPromotionWithConcurrentTranslogRecovery() throws Exception {
         final RecoverySettings recoverySettings = new RecoverySettings(
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e798353 and ac424e3.

📒 Files selected for processing (6)
  • server/src/main/java/org/opensearch/common/settings/ClusterSettings.java (1 hunks)
  • server/src/main/java/org/opensearch/index/shard/IndexShard.java (4 hunks)
  • server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (5 hunks)
  • server/src/main/java/org/opensearch/threadpool/ThreadPool.java (3 hunks)
  • server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2 hunks)
  • test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: detect-breaking-change
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: Analyze (java)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
🔇 Additional comments (11)
test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java (2)

314-316: LGTM - Constructor correctly uses new overload with RecoverySettings.

The constructor now properly passes recoverySettings to replicas, enabling concurrent translog recovery testing scenarios.


472-487: LGTM - New overload properly supports RecoverySettings injection.

The new addReplica(Path, RecoverySettings) overload correctly constructs an IndexShard with the provided RecoverySettings, enabling tests to exercise concurrent translog recovery paths.

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java (1)

349-350: LGTM - New cluster settings properly registered.

The concurrent translog recovery settings are correctly added to BUILT_IN_CLUSTER_SETTINGS, enabling dynamic configuration updates.

server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (1)

38-40: LGTM - Necessary imports added for test infrastructure.

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (4)

269-270: LGTM - Thread-safe volatile fields for concurrent access.


306-307: LGTM - Settings properly initialized from constructor.


347-351: LGTM - Dynamic setting update consumers properly registered.


521-535: LGTM - Standard accessor pattern with private setters.

The public getters and private setters follow the established pattern for cluster settings in this class.

server/src/main/java/org/opensearch/threadpool/ThreadPool.java (3)

102-102: LGTM - New thread pool name constant added.


183-183: LGTM - Thread pool type correctly mapped to FIXED.

A FIXED pool type is appropriate for translog recovery to provide bounded parallelism.


263-263: Verify unbounded queue is intentional for TRANSLOG_RECOVERY pool.

The queue size of -1 creates an unbounded queue. During large translog recoveries, this could lead to memory pressure if many recovery tasks are queued. Consider whether a bounded queue with a reasonable limit would be more appropriate, similar to other fixed pools like WRITE (queue size 10000) or TRANSLOG_SYNC (queue size 10000).

If unbounded is intentional (e.g., because concurrent recovery is already gated by settings), please document this design choice.

@github-actions
Copy link
Contributor

❌ Gradle check result for ac424e3: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: guojialiang <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)

5281-5336: Concurrent translog recovery runner looks solid; consider loosening InternalTranslogManager coupling and documenting assumptions

The new TranslogRecoveryRunner implementation in resetEngineToGlobalCheckpoint() generally looks correct and addresses earlier issues:

  • Snapshots created for each batch are now always closed in finally via IOUtils.closeWhileHandlingException, so there is no longer a leak if any task fails.
  • Batch loop bounds use a standard ceil‑division pattern, avoiding the extra empty batch when totalOperations is an exact multiple of batchSize.
  • Exceptions from individual futures are aggregated into a single IOException, preserving all suppressed causes while still returning the total recovered op count on success.
  • Concurrency is conservatively gated on:
    • recoverySettings.isTranslogConcurrentRecoveryEnable(),
    • indexSettings.isSegRepEnabledOrRemoteNode(),
    • totalOperations > batchSize, and
    • the TRANSLOG_RECOVERY executor queue being empty,
      so you fall back to the well‑tested sequential path when conditions aren’t favorable.

A few follow‑ups worth considering:

  1. Avoid hard dependency on InternalTranslogManager where possible.
    The assert translogManager instanceof InternalTranslogManager; and the downcast to call getTranslog().newSnapshot(start, end) tie this path to a specific implementation. If you expect other TranslogManager implementations (plugins, future remote variants), it might be cleaner to:

    • either expose a newSnapshot(long fromSeqNo, long toSeqNo) style API on TranslogManager, or
    • guard the concurrent path with an instanceof and fall back to sequential recovery when the manager is not InternalTranslogManager.
      Right now the assert only fires with assertions enabled; in production we’d get a ClassCastException instead.
  2. Clarify assumptions around partitioning by seq_no.
    The batching logic derives [start, end] ranges from engine.getProcessedLocalCheckpoint() and batchSize, and uses snapshot.totalOperations() purely to compute the number of batches. This relies on translog invariants (strictly increasing seq_nos, no concurrent appends because operations are blocked during resetEngineToGlobalCheckpoint) so that the union of your per‑batch snapshots matches the original recovery range. A short comment here explaining that assumption would help future maintainers understand why it’s safe to ignore the passed snapshot other than for totalOperations().

  3. TRANSLOG_RECOVERY executor assumption.
    The cast to OpenSearchThreadPoolExecutor and queue emptiness check are fine with today’s FixedExecutorBuilder setup for ThreadPool.Names.TRANSLOG_RECOVERY, but they implicitly assume that name will always map to an OpenSearchThreadPoolExecutor. If that ever changes, a runtime ClassCastException will be thrown. A brief comment noting the expectation (or an instanceof guard that falls back to sequential recovery if the cast fails) would make this more robust.

If you’re comfortable with the current constraints (internal manager only and fixed executor type), this can merge as‑is; otherwise the above refactors are low‑risk and can be done now or in a follow‑up.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ac424e3 and ad8564b.

📒 Files selected for processing (2)
  • server/src/main/java/org/opensearch/index/shard/IndexShard.java (4 hunks)
  • server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (3)
server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java (1)
  • InternalTranslogManager (38-480)
server/src/main/java/org/opensearch/threadpool/ThreadPool.java (1)
  • Names (99-130)
libs/common/src/main/java/org/opensearch/common/util/io/IOUtils.java (1)
  • IOUtils (58-317)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (19)
  • GitHub Check: gradle-check
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: Analyze (java)
  • GitHub Check: detect-breaking-change
🔇 Additional comments (2)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)

106-107: New imports align with concurrent translog recovery usage

The added imports (OpenSearchThreadPoolExecutor, InternalTranslogManager, TranslogManager, Future) are consistent with the new concurrent recovery runner and TRANSLOG_RECOVERY thread pool usage; no issues here.

Also applies to: 187-195, 246-247

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (1)

233-246: Concurrent translog recovery settings wiring is correct; clarify intended default batch size

The settings, backing fields, constructor initialization, and dynamic update consumers are all consistent and thread‑safe; min 1 on INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE prevents division-by-zero.

The same batch_size setting serves dual purposes in IndexShard:

  • Threshold for switching from single‑threaded to concurrent recovery (totalOperations > batchSize, line 5290)
  • Per-thread batch size, calculated as (totalOperations + batchSize - 1) / batchSize (line 5296)

This dual-use is correct but worth documenting: tuning the batch size affects both when concurrency activates and how work is divided per thread. If separate tuning of "concurrency threshold" vs "operations per thread" is needed later, consider splitting into two settings.

Please confirm the intended default of 50,000 operations for this batch size.

Signed-off-by: guojialiang <[email protected]>
Signed-off-by: guojialiang <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)

5279-5332: Make concurrent translog recovery robust to snapshot/submit failures and tighten batch logic

The concurrent TranslogRecoveryRunner is close, but there are two important issues:

  1. Snapshot leak on failures in the creation loop

    If either:

    • engine.translogManager().newChangesSnapshot(start, end, false) throws, or
    • threadPool.executor(...).submit(...) throws (e.g., RejectedExecutionException during shutdown),

    the method exits before reaching the second loop that closes snapshots. Any snapshots created in prior iterations of the loop are never closed, leaking file descriptors until shard/engine shutdown.

    To fix this, wrap the batch creation + execution phase in a try/finally that always closes every Translog.Snapshot you created, and explicitly close the snapshot if submit() fails before it’s added to the list.

  2. Cleaner last-batch detection

    The last-batch check i == totalOperations / batchSize relies on the same arithmetic as the loop bound but is harder to read and easy to get wrong during future changes. Precomputing the batch count and checking against batches - 1 is clearer and matches the ceil formula used in the loop bound.

A minimally invasive refactor that addresses both points:

-        final TranslogRecoveryRunner translogRunner = (snapshot) -> {
-            Engine engine = newEngineReference.get();
-            assert null != engine;
-            int totalOperations = snapshot.totalOperations();
-            long batchSize = recoverySettings.getTranslogConcurrentRecoveryBatchSize();
-            boolean threadPoolNotBusy = ((OpenSearchThreadPoolExecutor) threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY)).getQueue()
-                .isEmpty();
-            if (recoverySettings.isTranslogConcurrentRecoveryEnable()
-                && indexSettings.isSegRepEnabledOrRemoteNode()
-                && totalOperations > batchSize
-                && threadPoolNotBusy) {
-                long localCheckpoint = engine.getProcessedLocalCheckpoint();
-                List<Tuple<Future<Integer>, Translog.Snapshot>> translogSnapshotsFutureList = new ArrayList<>();
-                for (int i = 0; i < (totalOperations + batchSize - 1) / batchSize; i++) {
-                    long start = localCheckpoint + 1 + (long) i * batchSize;
-                    long end = (i == totalOperations / batchSize) ? Long.MAX_VALUE : start + batchSize - 1;
-                    Translog.Snapshot translogSnapshot = engine.translogManager().newChangesSnapshot(start, end, false);
-                    translogSnapshotsFutureList.add(
-                        new Tuple<>(
-                            threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY)
-                                .submit(() -> runTranslogRecovery(engine, translogSnapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
-                                    // TODO: add a dedicate recovery stats for the reset translog
-                                })),
-                            translogSnapshot
-                        )
-                    );
-                }
-                Exception exception = null;
-                int totalRecovered = 0;
-                for (Tuple<Future<Integer>, Translog.Snapshot> translogSnapshotFuture : translogSnapshotsFutureList) {
-                    try {
-                        int recoveredOps = translogSnapshotFuture.v1().get();
-                        totalRecovered += recoveredOps;
-                    } catch (Exception e) {
-                        if (exception == null) {
-                            exception = e;
-                        } else {
-                            exception.addSuppressed(e);
-                        }
-                    } finally {
-                        Translog.Snapshot translogSnapshot = translogSnapshotFuture.v2();
-                        IOUtils.closeWhileHandlingException(translogSnapshot);
-                    }
-                }
-                if (exception != null) {
-                    throw new IOException("generate exception when concurrent translog recovery", exception);
-                }
-                return totalRecovered;
-            } else {
-                return runTranslogRecovery(newEngineReference.get(), snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
-                    // TODO: add a dedicate recovery stats for the reset translog
-                });
-            }
-        };
+        final TranslogRecoveryRunner translogRunner = (snapshot) -> {
+            Engine engine = newEngineReference.get();
+            assert engine != null;
+            final int totalOperations = snapshot.totalOperations();
+            final long batchSize = recoverySettings.getTranslogConcurrentRecoveryBatchSize();
+            final boolean threadPoolNotBusy = ((OpenSearchThreadPoolExecutor) threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY))
+                .getQueue()
+                .isEmpty();
+
+            if (recoverySettings.isTranslogConcurrentRecoveryEnable()
+                && indexSettings.isSegRepEnabledOrRemoteNode()
+                && totalOperations > batchSize
+                && threadPoolNotBusy) {
+                final long localCheckpoint = engine.getProcessedLocalCheckpoint();
+                final long batches = (totalOperations + batchSize - 1) / batchSize;
+                final List<Tuple<Future<Integer>, Translog.Snapshot>> translogSnapshotsFutureList = new ArrayList<>();
+                Exception exception = null;
+                int totalRecovered = 0;
+
+                try {
+                    for (long i = 0; i < batches; i++) {
+                        final long start = localCheckpoint + 1 + i * batchSize;
+                        final long end = (i == batches - 1) ? Long.MAX_VALUE : start + batchSize - 1;
+                        final Translog.Snapshot translogSnapshot = engine.translogManager().newChangesSnapshot(start, end, false);
+                        try {
+                            Future<Integer> future = threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY)
+                                .submit(
+                                    () -> runTranslogRecovery(
+                                        engine,
+                                        translogSnapshot,
+                                        Engine.Operation.Origin.LOCAL_RESET,
+                                        () -> {
+                                            // TODO: add a dedicate recovery stats for the reset translog
+                                        }
+                                    )
+                                );
+                            translogSnapshotsFutureList.add(new Tuple<>(future, translogSnapshot));
+                        } catch (RuntimeException e) {
+                            IOUtils.closeWhileHandlingException(translogSnapshot);
+                            throw e;
+                        }
+                    }
+
+                    for (Tuple<Future<Integer>, Translog.Snapshot> translogSnapshotFuture : translogSnapshotsFutureList) {
+                        try {
+                            int recoveredOps = translogSnapshotFuture.v1().get();
+                            totalRecovered += recoveredOps;
+                        } catch (Exception e) {
+                            if (exception == null) {
+                                exception = e;
+                            } else {
+                                exception.addSuppressed(e);
+                            }
+                        }
+                    }
+
+                    if (exception != null) {
+                        throw new IOException("exception during concurrent translog recovery", exception);
+                    }
+                    return totalRecovered;
+                } finally {
+                    for (Tuple<Future<Integer>, Translog.Snapshot> translogSnapshotFuture : translogSnapshotsFutureList) {
+                        IOUtils.closeWhileHandlingException(translogSnapshotFuture.v2());
+                    }
+                }
+            } else {
+                return runTranslogRecovery(newEngineReference.get(), snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
+                    // TODO: add a dedicate recovery stats for the reset translog
+                });
+            }
+        };

This keeps the overall behavior and conditions intact, but ensures snapshots are always closed and makes the batch structure easier to reason about.

🧹 Nitpick comments (2)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2)

777-781: Consider verifying that concurrent recovery was actually used.

While the test correctly configures concurrent translog recovery and verifies the end result, it doesn't explicitly verify that the concurrent code path was exercised. Consider adding assertions or logging to confirm that concurrent recovery actually occurred (e.g., checking thread pool activity or recovery metrics).


717-800: Consider extracting common logic to reduce duplication.

This test method shares significant logic with testNRTReplicaPromotedAsPrimary (lines 533-599). While the duplication is reasonable for testing different scenarios, consider extracting the common test flow into a helper method that accepts RecoverySettings and MergedSegmentPublisher as parameters. This would improve maintainability if the test logic needs to change in the future.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eff2f31 and cd7419c.

📒 Files selected for processing (2)
  • server/src/main/java/org/opensearch/index/shard/IndexShard.java (3 hunks)
  • server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (2)
server/src/main/java/org/opensearch/threadpool/ThreadPool.java (1)
  • Names (99-130)
server/src/main/java/org/opensearch/index/engine/Engine.java (1)
  • Operation (1503-1603)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: detect-breaking-change
  • GitHub Check: Analyze (java)
🔇 Additional comments (7)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (6)

38-38: LGTM!

The import is correctly added to support the new test method's usage of MergedSegmentPublisher.EMPTY.


718-724: LGTM!

The RecoverySettings configuration correctly enables concurrent translog recovery with a small batch size (10) to ensure the concurrent code path is exercised when larger document counts are indexed.


725-738: LGTM!

The replication group setup correctly passes the custom RecoverySettings and MergedSegmentPublisher.EMPTY to enable concurrent translog recovery testing.


740-752: LGTM!

The initial document indexing, replication, and assertions correctly verify the starting state before testing concurrent translog recovery.


773-775: LGTM!

The primary promotion logic correctly syncs the global checkpoint before promoting the replica.


783-798: LGTM!

The final assertions comprehensively verify engine types, document counts, translog state, and consistency across all shards after concurrent translog recovery.

server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)

105-107: Imports for concurrent translog recovery look correct

OpenSearchThreadPoolExecutor and Future are appropriately imported and used in the new concurrent translog recovery logic; no issues here.

Also applies to: 243-245

@github-actions
Copy link
Contributor

❌ Gradle check result for cd7419c: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: guojialiang <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cd7419c and a41f8e9.

📒 Files selected for processing (2)
  • server/src/main/java/org/opensearch/index/shard/IndexShard.java (3 hunks)
  • server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: Analyze (java)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: detect-breaking-change
🔇 Additional comments (6)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (4)

38-38: LGTM - Import is used in the new test.

The import for MergedSegmentPublisher is properly used in the new test method at line 732.


718-734: Well-structured test setup for concurrent translog recovery.

The test correctly configures concurrent recovery with a small batch size (10) to ensure the concurrent path is triggered with 200-300 additional documents. The use of MergedSegmentPublisher.EMPTY and custom RecoverySettings aligns with the test framework enhancements for this feature.

Optional enhancement: Consider adding explicit verification that concurrent recovery was actually used (e.g., checking thread pool metrics or adding test-specific instrumentation). While the current end-to-end validation is valuable for an integration test, explicit path verification would strengthen confidence that the concurrent execution path is exercised and not falling back to single-threaded mode.


757-770: Test correctly triggers concurrent recovery threshold.

The test uses randomIntBetween(200, 300) additional documents, which far exceeds the batch size of 10, ensuring the concurrent recovery path is triggered during promotion.

Minor note: The variable name additonalDocs (line 757, 759) has a typo (should be "additional"), but this is consistent with the existing test pattern in this file (see line 557). No action needed unless there's a broader effort to fix this across the file.


772-797: LGTM - Promotion and verification logic is correct.

The test properly:

  • Promotes the replica to primary (triggering translog recovery)
  • Demotes and recovers the old primary as a replica
  • Verifies engine types, document counts, translog state, and consistency across all shards

The verification approach matches the established pattern from testNRTReplicaPromotedAsPrimary and appropriately validates the correctness of promotion with concurrent translog recovery enabled.

server/src/main/java/org/opensearch/index/shard/IndexShard.java (2)

106-106: New executor import looks appropriate

OpenSearchThreadPoolExecutor is used only for queue introspection on the TRANSLOG_RECOVERY pool; no issues with the import itself.


244-244: Use of Future for concurrent recovery results is fine

java.util.concurrent.Future is used in the new concurrent translog recovery to aggregate per-batch results; import and usage are appropriate.

@github-actions
Copy link
Contributor

❌ Gradle check result for a41f8e9: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: guojialiang <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2)

539-559: Good test setup; consider adding verification that concurrent recovery occurred.

The test correctly configures concurrent translog recovery (batch size 10, operations 200-300) to trigger the concurrent path. However, the test only verifies the final state is correct, not that concurrent recovery actually executed.

Consider enhancing the test to explicitly verify concurrent recovery was triggered, for example by:

  • Checking thread pool metrics or task counts
  • Adding test instrumentation to track which recovery path was taken
  • Using a custom ThreadPool that records whether TRANSLOG_RECOVERY threads were used

This would catch regressions where concurrent recovery silently falls back to sequential execution.


539-559: Consider adding test coverage for the thread pool busy fallback scenario.

According to the PR objectives, concurrent translog recovery falls back to single-threaded execution when the thread pool is busy. Currently, there's no test that explicitly exercises this fallback path.

Consider adding a test that:

  1. Saturates the TRANSLOG_RECOVERY thread pool with long-running tasks
  2. Initiates a promotion that should trigger concurrent recovery
  3. Verifies that recovery completes successfully using the sequential fallback
  4. Confirms the final state is correct

This would improve confidence in the resilience of the feature under load.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c6e1f1e and 3a60804.

📒 Files selected for processing (1)
  • server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: detect-breaking-change
  • GitHub Check: Analyze (java)
🔇 Additional comments (2)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2)

38-38: LGTM - necessary import for new test.

The import supports the new concurrent translog recovery test by providing MergedSegmentPublisher.EMPTY.


561-624: Well-structured helper method that consolidates promotion workflow.

The doPrimaryPromotion method effectively extracts common test logic into a reusable helper. The two-batch approach is well-designed:

  • First batch verifies committed and replicated operations
  • Second batch exercises translog recovery during promotion (ops in translog but not yet replicated)

The method properly verifies state at each stage: doc counts, translog operations, engine types, and final consistency across all shards.

@github-actions
Copy link
Contributor

❌ Gradle check result for 3a60804: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: guojialiang <[email protected]>
@guojialiang92 guojialiang92 force-pushed the dev/support_concurrent_translog_recovery branch from 3a60804 to 2d351d5 Compare December 17, 2025 05:58
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (1)

533-537: Previous review comment about zero doc counts remains unaddressed.

The refactoring to doPrimaryPromotion improves maintainability, but the prior concern about randomInt(10) potentially returning 0 still applies. Zero documents weaken test coverage of the promotion workflow.

Apply this diff to ensure at least one document is indexed:

-            doPrimaryPromotion(shards, randomInt(10), randomInt(10));
+            doPrimaryPromotion(shards, randomIntBetween(1, 10), randomIntBetween(1, 10));
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3a60804 and 2d351d5.

📒 Files selected for processing (1)
  • server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: Analyze (java)
  • GitHub Check: detect-breaking-change
🔇 Additional comments (2)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2)

38-38: LGTM!

The import is necessary for the new test that creates a replication group with MergedSegmentPublisher.EMPTY.


561-624: LGTM! Well-structured helper method.

The doPrimaryPromotion helper effectively centralizes the promotion workflow for reuse across multiple tests. The logic is correct, with clear sections for document ingestion, replication, promotion, and verification. The assertions properly validate translog stats, engine types, and document consistency across shards.

@github-actions
Copy link
Contributor

✅ Gradle check result for 2d351d5: SUCCESS

@codecov
Copy link

codecov bot commented Dec 17, 2025

Codecov Report

❌ Patch coverage is 77.35849% with 12 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.15%. Comparing base (930ae74) to head (2d351d5).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
...in/java/org/opensearch/index/shard/IndexShard.java 79.48% 5 Missing and 3 partials ⚠️
.../opensearch/indices/recovery/RecoverySettings.java 66.66% 4 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #20251      +/-   ##
============================================
- Coverage     73.20%   73.15%   -0.05%     
+ Complexity    71745    71712      -33     
============================================
  Files          5795     5795              
  Lines        328304   328354      +50     
  Branches      47283    47291       +8     
============================================
- Hits         240334   240218     -116     
- Misses        68663    68904     +241     
+ Partials      19307    19232      -75     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant