diff --git a/muted-tests.yml b/muted-tests.yml index 28930ca34dd2f..4c2dc63ada195 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -374,9 +374,6 @@ tests: - class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithPartialResultsIT method: testOneRemoteClusterPartial issue: https://github.com/elastic/elasticsearch/issues/124055 - - class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT - method: testStopQueryLocal - issue: https://github.com/elastic/elasticsearch/issues/121672 - class: org.elasticsearch.gradle.internal.InternalDistributionBwcSetupPluginFuncTest method: "builds distribution from branches via archives extractedAssemble [bwcDistVersion: 8.3.0, bwcProject: staged, expectedAssembleTaskName: extractedAssemble, #1]" diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 37e6b0bb48404..9667b559ea6b0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -13,7 +13,10 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.async.AsyncStopRequest; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.util.Iterator; import java.util.List; @@ -132,13 +135,21 @@ public void testStopQueryLocal() throws Exception { Tuple includeCCSMetadata = randomIncludeCCSMetadata(); boolean responseExpectMeta = includeCCSMetadata.v2(); - - final String asyncExecutionId = startAsyncQuery( + // By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. + // If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch, + // there are no workers left to execute the final driver or fetch pages from remote clusters. + // This can prevent remote clusters from being marked as successful on the coordinator, even if they + // have completed. To avoid this, we reserve at least one worker for the final driver and page fetching. + // A single worker is enough, as these two tasks can be paused and yielded. + var threadpool = cluster(LOCAL_CLUSTER).getInstance(TransportService.class).getThreadPool(); + int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax(); + LOGGER.info("--> Launching async query"); + final String asyncExecutionId = startAsyncQueryWithPragmas( client(), "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1", - includeCCSMetadata.v1() + includeCCSMetadata.v1(), + Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1)) ); - try { // wait until we know that the local query against 'blocking' has started assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index 6e7bae2dc5eff..7d23cd3d69385 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -34,7 +34,7 @@ public final class QueryPragmas implements Writeable { public static final Setting EXCHANGE_CONCURRENT_CLIENTS = Setting.intSetting("exchange_concurrent_clients", 2); public static final Setting ENRICH_MAX_WORKERS = Setting.intSetting("enrich_max_workers", 1); - private static final Setting TASK_CONCURRENCY = Setting.intSetting( + public static final Setting TASK_CONCURRENCY = Setting.intSetting( "task_concurrency", ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY)) );