From 0997d467c0445891107d7258d3ce8af58f5061ce Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 11 Jul 2025 16:38:47 -0700 Subject: [PATCH] Fix testStopQueryLocal (#131130) By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch, there are no workers left to execute the final driver or fetch pages from remote clusters. This can prevent remote clusters from being marked as successful on the coordinator, even if they have completed. To avoid this, we reserve at least one worker for the final driver and page fetching. A single worker is enough, as these two tasks can be paused and yielded. Closes #121672 --- muted-tests.yml | 3 --- .../action/CrossClusterAsyncQueryStopIT.java | 19 +++++++++++++++---- .../xpack/esql/plugin/QueryPragmas.java | 2 +- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index a762977c892b6..82e1256ee018f 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -395,9 +395,6 @@ tests: - class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithPartialResultsIT method: testOneRemoteClusterPartial issue: https://github.com/elastic/elasticsearch/issues/124055 - - class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT - method: testStopQueryLocal - issue: https://github.com/elastic/elasticsearch/issues/121672 - class: org.elasticsearch.gradle.internal.InternalDistributionBwcSetupPluginFuncTest method: "builds distribution from branches via archives extractedAssemble [bwcDistVersion: 8.3.0, bwcProject: staged, expectedAssembleTaskName: extractedAssemble, #1]" diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 37e6b0bb48404..9667b559ea6b0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -13,7 +13,10 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.async.AsyncStopRequest; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.util.Iterator; import java.util.List; @@ -132,13 +135,21 @@ public void testStopQueryLocal() throws Exception { Tuple includeCCSMetadata = randomIncludeCCSMetadata(); boolean responseExpectMeta = includeCCSMetadata.v2(); - - final String asyncExecutionId = startAsyncQuery( + // By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. + // If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch, + // there are no workers left to execute the final driver or fetch pages from remote clusters. + // This can prevent remote clusters from being marked as successful on the coordinator, even if they + // have completed. To avoid this, we reserve at least one worker for the final driver and page fetching. + // A single worker is enough, as these two tasks can be paused and yielded. + var threadpool = cluster(LOCAL_CLUSTER).getInstance(TransportService.class).getThreadPool(); + int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax(); + LOGGER.info("--> Launching async query"); + final String asyncExecutionId = startAsyncQueryWithPragmas( client(), "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1", - includeCCSMetadata.v1() + includeCCSMetadata.v1(), + Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1)) ); - try { // wait until we know that the local query against 'blocking' has started assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index 6e7bae2dc5eff..7d23cd3d69385 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -34,7 +34,7 @@ public final class QueryPragmas implements Writeable { public static final Setting EXCHANGE_CONCURRENT_CLIENTS = Setting.intSetting("exchange_concurrent_clients", 2); public static final Setting ENRICH_MAX_WORKERS = Setting.intSetting("enrich_max_workers", 1); - private static final Setting TASK_CONCURRENCY = Setting.intSetting( + public static final Setting TASK_CONCURRENCY = Setting.intSetting( "task_concurrency", ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY)) );