From 35d209ef04cbcf3e4c890cd397e72db9e53f1200 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 11 Jul 2025 14:20:06 -0700 Subject: [PATCH] Fix testStopQueryLocal --- muted-tests.yml | 3 --- .../action/CrossClusterAsyncQueryStopIT.java | 18 ++++++++++++++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 7da37e24f1230..ae4c34caa5c3b 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -531,9 +531,6 @@ tests: - class: org.elasticsearch.indices.stats.IndexStatsIT method: testFilterCacheStats issue: https://github.com/elastic/elasticsearch/issues/124447 -- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT - method: testStopQueryLocal - issue: https://github.com/elastic/elasticsearch/issues/121672 - class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item} issue: https://github.com/elastic/elasticsearch/issues/122414 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 222ffb5c05b0d..049d7fc4cf94b 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -13,7 +13,10 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.async.AsyncStopRequest; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.util.Iterator; import java.util.List; @@ -132,14 +135,21 @@ public void testStopQueryLocal() throws Exception { Tuple includeCCSMetadata = randomIncludeCCSMetadata(); boolean responseExpectMeta = includeCCSMetadata.v2(); - + // By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. + // If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch, + // there are no workers left to execute the final driver or fetch pages from remote clusters. + // This can prevent remote clusters from being marked as successful on the coordinator, even if they + // have completed. To avoid this, we reserve at least one worker for the final driver and page fetching. + // A single worker is enough, as these two tasks can be paused and yielded. + var threadpool = cluster(LOCAL_CLUSTER).getInstance(TransportService.class).getThreadPool(); + int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax(); LOGGER.info("--> Launching async query"); - final String asyncExecutionId = startAsyncQuery( + final String asyncExecutionId = startAsyncQueryWithPragmas( client(), "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1", - includeCCSMetadata.v1() + includeCCSMetadata.v1(), + Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1)) ); - try { // wait until we know that the local query against 'blocking' has started LOGGER.info("--> Waiting for {} to start", asyncExecutionId);