Skip to content

Commit fc0f139

Browse files
authored
Fix testStopQueryLocal (#131130) (#131145)
By default, ES|QL uses all workers in the esql_worker thread pool to execute drivers on data nodes. If a node is both a data node and the coordinator, and all drivers are blocked by the allowEmitting latch, there are no workers left to execute the final driver or fetch pages from remote clusters. This can prevent remote clusters from being marked as successful on the coordinator, even if they have completed. To avoid this, the test reserves at least one worker for the final driver and page fetching by setting the task_concurrency pragma to a value below the maximum size of the esql_worker thread pool. A single worker is enough, as these two tasks can be paused and yielded.
Closes #121672
1 parent 82aaa70 commit fc0f139

File tree

3 files changed

+16
-8
lines changed

3 files changed

+16
-8
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -374,9 +374,6 @@ tests:
374374
- class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithPartialResultsIT
375375
method: testOneRemoteClusterPartial
376376
issue: https://github.com/elastic/elasticsearch/issues/124055
377-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT
378-
method: testStopQueryLocal
379-
issue: https://github.com/elastic/elasticsearch/issues/121672
380377
- class: org.elasticsearch.gradle.internal.InternalDistributionBwcSetupPluginFuncTest
381378
method: "builds distribution from branches via archives extractedAssemble [bwcDistVersion: 8.3.0, bwcProject: staged, expectedAssembleTaskName:
382379
extractedAssemble, #1]"

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
import org.elasticsearch.logging.LogManager;
1414
import org.elasticsearch.logging.Logger;
1515
import org.elasticsearch.tasks.TaskInfo;
16+
import org.elasticsearch.transport.TransportService;
1617
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
18+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
19+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
1720

1821
import java.util.Iterator;
1922
import java.util.List;
@@ -132,13 +135,21 @@ public void testStopQueryLocal() throws Exception {
132135

133136
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
134137
boolean responseExpectMeta = includeCCSMetadata.v2();
135-
136-
final String asyncExecutionId = startAsyncQuery(
138+
// By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes.
139+
// If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch,
140+
// there are no workers left to execute the final driver or fetch pages from remote clusters.
141+
// This can prevent remote clusters from being marked as successful on the coordinator, even if they
142+
// have completed. To avoid this, we reserve at least one worker for the final driver and page fetching.
143+
// A single worker is enough, as these two tasks can be paused and yielded.
144+
var threadpool = cluster(LOCAL_CLUSTER).getInstance(TransportService.class).getThreadPool();
145+
int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax();
146+
LOGGER.info("--> Launching async query");
147+
final String asyncExecutionId = startAsyncQueryWithPragmas(
137148
client(),
138149
"FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1",
139-
includeCCSMetadata.v1()
150+
includeCCSMetadata.v1(),
151+
Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1))
140152
);
141-
142153
try {
143154
// wait until we know that the local query against 'blocking' has started
144155
assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public final class QueryPragmas implements Writeable {
3434
public static final Setting<Integer> EXCHANGE_CONCURRENT_CLIENTS = Setting.intSetting("exchange_concurrent_clients", 2);
3535
public static final Setting<Integer> ENRICH_MAX_WORKERS = Setting.intSetting("enrich_max_workers", 1);
3636

37-
private static final Setting<Integer> TASK_CONCURRENCY = Setting.intSetting(
37+
public static final Setting<Integer> TASK_CONCURRENCY = Setting.intSetting(
3838
"task_concurrency",
3939
ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY))
4040
);

0 commit comments

Comments
 (0)