Skip to content

Commit 936f338

Browse files
authored
ESQL: Change queries ID to be the same as the async (#127472)
This PR changes the list and query API for ESQL, such that the ID now follows the same format as async query IDs. This is saved as part of the task status. For async queries, this is easy, but for sync queries, this is slightly more complicated, since when creating them, we don't have access to a node ID. So instead, the status itself is just the doc ID portion of the async execution ID, which is used for salting, since this part needs to be consistent, so that when we list the queries, we can compute the async execution ID correctly. Also, I've removed the individual ID, node, and data node tags, as mentioned in the ticket. In addition, I've changed the accept and content-type to be JSON for lists. Resolves #127187
1 parent 13a5dde commit 936f338

File tree

22 files changed

+244
-82
lines changed

22 files changed

+244
-82
lines changed

docs/changelog/127472.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 127472
2+
summary: Change queries ID to be the same as the async
3+
area: ES|QL
4+
type: feature
5+
issues:
6+
- 127187

rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@
77
"stability": "experimental",
88
"visibility": "public",
99
"headers": {
10-
"accept": [],
11-
"content_type": [
12-
"application/json"
13-
]
10+
"accept": ["application/json"],
11+
"content_type": ["application/json"]
1412
},
1513
"url": {
1614
"paths": [

server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.tasks;
1111

12+
import org.elasticsearch.core.Nullable;
13+
1214
import java.util.Map;
1315

1416
/**
@@ -52,6 +54,23 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId
5254
return new Task(id, type, action, getDescription(), parentTaskId, headers);
5355
}
5456

57+
/**
58+
* Returns the task object that should be used to keep track of the processing of the request, with an extra local node ID.
59+
*/
60+
// TODO remove the above overload, use only this one.
61+
default Task createTask(
62+
// TODO this is only nullable in tests, where the MockNode does not guarantee the localNodeId is set before calling this method. We
63+
// We should fix the tests, and replace this and id with TaskId instead.
64+
@Nullable String localNodeId,
65+
long id,
66+
String type,
67+
String action,
68+
TaskId parentTaskId,
69+
Map<String, String> headers
70+
) {
71+
return createTask(id, type, action, parentTaskId, headers);
72+
}
73+
5574
/**
5675
* Returns optional description of the request to be displayed by the task manager
5776
*/

server/src/main/java/org/elasticsearch/tasks/TaskManager.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,14 @@ public Task register(String type, String action, TaskAwareRequest request, boole
141141
headers.put(key, httpHeader);
142142
}
143143
}
144-
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
144+
Task task = request.createTask(
145+
lastDiscoveryNodes.getLocalNodeId(),
146+
taskIdGenerator.incrementAndGet(),
147+
type,
148+
action,
149+
request.getParentTask(),
150+
headers
151+
);
145152
Objects.requireNonNull(task);
146153
assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId";
147154
if (logger.isTraceEnabled()) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
1111
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1212
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.common.io.stream.Writeable;
1315
import org.elasticsearch.tasks.TaskId;
1416

1517
import java.io.IOException;
@@ -20,7 +22,7 @@
2022
/**
2123
* A class that contains all information related to a submitted async execution.
2224
*/
23-
public final class AsyncExecutionId {
25+
public final class AsyncExecutionId implements Writeable {
2426
public static final String ASYNC_EXECUTION_ID_HEADER = "X-Elasticsearch-Async-Id";
2527
public static final String ASYNC_EXECUTION_IS_RUNNING_HEADER = "X-Elasticsearch-Async-Is-Running";
2628

@@ -115,4 +117,13 @@ public static AsyncExecutionId decode(String id) {
115117
}
116118
return new AsyncExecutionId(docId, new TaskId(taskId), id);
117119
}
120+
121+
@Override
122+
public void writeTo(StreamOutput out) throws IOException {
123+
out.writeString(getEncoded());
124+
}
125+
126+
public static AsyncExecutionId readFrom(StreamInput input) throws IOException {
127+
return decode(input.readString());
128+
}
118129
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.async;
9+
10+
import org.elasticsearch.common.io.stream.Writeable;
11+
import org.elasticsearch.tasks.TaskId;
12+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
13+
14+
import java.io.IOException;
15+
16+
public class AsyncExecutionIdWireTests extends AbstractWireSerializingTestCase<AsyncExecutionId> {
17+
@Override
18+
protected Writeable.Reader<AsyncExecutionId> instanceReader() {
19+
return AsyncExecutionId::readFrom;
20+
}
21+
22+
@Override
23+
protected AsyncExecutionId createTestInstance() {
24+
return new AsyncExecutionId(randomAlphaOfLength(15), new TaskId(randomAlphaOfLength(10), randomLong()));
25+
}
26+
27+
@Override
28+
protected AsyncExecutionId mutateInstance(AsyncExecutionId instance) throws IOException {
29+
return new AsyncExecutionId(
30+
instance.getDocId(),
31+
new TaskId(instance.getTaskId().getNodeId(), instance.getTaskId().getId() * 12345)
32+
);
33+
}
34+
}

x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -916,19 +916,23 @@ public void testListQueryForbidden() throws Exception {
916916

917917
public void testGetQueryAllowed() throws Exception {
918918
// This is a bit tricky, since there is no such running query. We just make sure it didn't fail on forbidden privileges.
919-
Request request = new Request("GET", "_query/queries/foo:1234");
920-
var resp = expectThrows(ResponseException.class, () -> client().performRequest(request));
921-
assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(404)));
919+
setUser(GET_QUERY_REQUEST, "user_with_monitor_privileges");
920+
var resp = expectThrows(ResponseException.class, () -> client().performRequest(GET_QUERY_REQUEST));
921+
assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(403)));
922922
}
923923

924924
public void testGetQueryForbidden() throws Exception {
925-
Request request = new Request("GET", "_query/queries/foo:1234");
926-
setUser(request, "user_without_monitor_privileges");
927-
var resp = expectThrows(ResponseException.class, () -> client().performRequest(request));
925+
setUser(GET_QUERY_REQUEST, "user_without_monitor_privileges");
926+
var resp = expectThrows(ResponseException.class, () -> client().performRequest(GET_QUERY_REQUEST));
928927
assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(403));
929928
assertThat(resp.getMessage(), containsString("this action is granted by the cluster privileges [monitor_esql,monitor,manage,all]"));
930929
}
931930

931+
private static final Request GET_QUERY_REQUEST = new Request(
932+
"GET",
933+
"_query/queries/FmJKWHpFRi1OU0l5SU1YcnpuWWhoUWcZWDFuYUJBeW1TY0dKM3otWUs2bDJudzo1Mg=="
934+
);
935+
932936
private void createEnrichPolicy() throws Exception {
933937
createIndex("songs", Settings.EMPTY, """
934938
"properties":{"song_id": {"type": "keyword"}, "title": {"type": "keyword"}, "artist": {"type": "keyword"} }

x-pack/plugin/esql/qa/testFixtures/src/main/resources/query_task.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
"id" : 5326,
44
"type" : "transport",
55
"action" : "indices:data/read/esql",
6+
"status" : {
7+
"request_id" : "Ks5ApyqMTtWj5LrKigmCjQ"
8+
},
69
"description" : "FROM test | STATS MAX(d) by a, b", <1>
710
"start_time" : "2023-07-31T15:46:32.328Z",
811
"start_time_in_millis" : 1690818392328,

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase {
3131

3232
protected static final Semaphore scriptPermits = new Semaphore(0);
33+
// Incremented onWait. Can be used to check if the onWait process has been reached.
34+
protected static final Semaphore scriptWaits = new Semaphore(0);
3335

3436
protected int pageSize = -1;
3537

@@ -98,6 +100,7 @@ public void setupIndex() throws IOException {
98100
public static class PausableFieldPlugin extends AbstractPauseFieldPlugin {
99101
@Override
100102
protected boolean onWait() throws InterruptedException {
103+
scriptWaits.release();
101104
return scriptPermits.tryAcquire(1, TimeUnit.MINUTES);
102105
}
103106
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,21 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10+
import org.elasticsearch.action.ActionFuture;
1011
import org.elasticsearch.client.Request;
1112
import org.elasticsearch.client.Response;
12-
import org.elasticsearch.tasks.TaskId;
1313
import org.elasticsearch.test.IntOrLongMatcher;
1414
import org.elasticsearch.test.MapMatcher;
1515
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
1616
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1717

18-
import java.util.List;
18+
import java.io.IOException;
1919
import java.util.Map;
2020
import java.util.concurrent.TimeUnit;
2121

2222
import static org.elasticsearch.core.TimeValue.timeValueSeconds;
2323
import static org.elasticsearch.xpack.esql.EsqlTestUtils.jsonEntityToMap;
24-
import static org.hamcrest.Matchers.allOf;
25-
import static org.hamcrest.Matchers.everyItem;
2624
import static org.hamcrest.Matchers.is;
27-
import static org.hamcrest.Matchers.isA;
2825

2926
public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase {
3027
private static final String QUERY = "from test | stats sum(pause_me)";
@@ -45,31 +42,10 @@ public void testRunningQueries() throws Exception {
4542
try (var initialResponse = sendAsyncQuery()) {
4643
id = initialResponse.asyncExecutionId().get();
4744

45+
assertRunningQueries();
4846
var getResultsRequest = new GetAsyncResultRequest(id);
4947
getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(1));
5048
client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close();
51-
Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries"));
52-
@SuppressWarnings("unchecked")
53-
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(
54-
jsonEntityToMap(listResponse.getEntity()).values()
55-
);
56-
var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet()));
57-
MapMatcher basicMatcher = MapMatcher.matchesMap()
58-
.entry("node", is(taskId.getNodeId()))
59-
.entry("id", IntOrLongMatcher.matches(taskId.getId()))
60-
.entry("query", is(QUERY))
61-
.entry("start_time_millis", IntOrLongMatcher.isIntOrLong())
62-
.entry("running_time_nanos", IntOrLongMatcher.isIntOrLong());
63-
MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher);
64-
65-
Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + taskId));
66-
MapMatcher.assertMap(
67-
jsonEntityToMap(getQueryResponse.getEntity()),
68-
basicMatcher.entry("coordinating_node", isA(String.class))
69-
.entry("data_nodes", allOf(isA(List.class), everyItem(isA(String.class))))
70-
.entry("documents_found", IntOrLongMatcher.isIntOrLong())
71-
.entry("values_loaded", IntOrLongMatcher.isIntOrLong())
72-
);
7349
} finally {
7450
if (id != null) {
7551
// Finish the query.
@@ -82,9 +58,44 @@ public void testRunningQueries() throws Exception {
8258
}
8359
}
8460

61+
public void testRunningQueriesSync() throws Exception {
62+
var future = sendSyncQueryAsyncly();
63+
try {
64+
scriptWaits.acquire();
65+
assertRunningQueries();
66+
} finally {
67+
scriptPermits.release(numberOfDocs());
68+
future.actionGet(timeValueSeconds(60)).close();
69+
}
70+
}
71+
72+
private static void assertRunningQueries() throws IOException {
73+
Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries"));
74+
@SuppressWarnings("unchecked")
75+
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(jsonEntityToMap(listResponse.getEntity()).values());
76+
String queryId = EsqlTestUtils.singleValue(listResult.keySet());
77+
MapMatcher basicMatcher = MapMatcher.matchesMap()
78+
.entry("query", is(QUERY))
79+
.entry("start_time_millis", IntOrLongMatcher.isIntOrLong())
80+
.entry("running_time_nanos", IntOrLongMatcher.isIntOrLong());
81+
MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher);
82+
83+
Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + queryId));
84+
MapMatcher.assertMap(
85+
jsonEntityToMap(getQueryResponse.getEntity()),
86+
basicMatcher.entry("documents_found", IntOrLongMatcher.isIntOrLong()).entry("values_loaded", IntOrLongMatcher.isIntOrLong())
87+
);
88+
}
89+
8590
private EsqlQueryResponse sendAsyncQuery() {
8691
scriptPermits.drainPermits();
8792
scriptPermits.release(between(1, 5));
8893
return EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client()).query(QUERY).execute().actionGet(60, TimeUnit.SECONDS);
8994
}
95+
96+
private ActionFuture<EsqlQueryResponse> sendSyncQueryAsyncly() {
97+
scriptPermits.drainPermits();
98+
scriptPermits.release(between(1, 5));
99+
return EsqlQueryRequestBuilder.newSyncEsqlQueryRequestBuilder(client()).query(QUERY).execute();
100+
}
90101
}

0 commit comments

Comments
 (0)