Skip to content

Commit d4dd907

Browse files
committed
ESQL: Added Sample operator NamedWritable to plugin (elastic#131541)
`SampleOperator.Status` wasn't declared as a NamedWritable by the plugin, leading to serialization errors when `SAMPLE` is used with `profile: true`. It leads to an `IllegalArgumentException: Unknown NamedWriteable [org.elasticsearch.compute.operator.Operator$Status][sample]` Profiles will be tested in this PR: elastic#131474, that's currently failing because of this bug
1 parent 13938c6 commit d4dd907

File tree

5 files changed

+101
-9
lines changed

5 files changed

+101
-9
lines changed

docs/changelog/131541.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 131541
2+
summary: Added Sample operator `NamedWritable` to plugin
3+
area: ES|QL
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ static TransportVersion def(int id) {
330330
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
331331
public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_9_1 = def(9_112_0_01);
332332
public static final TransportVersion ESQL_FIXED_INDEX_LIKE_9_1 = def(9_112_0_02);
333+
public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS_9_1 = def(9_112_0_03);
333334

334335
/*
335336
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SampleOperator.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,11 @@ public String describe() {
6666
private final RandomSamplingQuery.RandomSamplingIterator randomSamplingIterator;
6767
private boolean finished;
6868

69-
private int pagesProcessed = 0;
70-
private int rowsReceived = 0;
71-
private int rowsEmitted = 0;
7269
private long collectNanos;
7370
private long emitNanos;
71+
private int pagesProcessed = 0;
72+
private long rowsReceived = 0;
73+
private long rowsEmitted = 0;
7474

7575
private SampleOperator(double probability, int seed) {
7676
finished = false;
@@ -109,7 +109,7 @@ private void createOutputPage(Page page) {
109109
final int[] sampledPositions = new int[page.getPositionCount()];
110110
int sampledIdx = 0;
111111
for (int i = randomSamplingIterator.docID(); i - rowsReceived < page.getPositionCount(); i = randomSamplingIterator.nextDoc()) {
112-
sampledPositions[sampledIdx++] = i - rowsReceived;
112+
sampledPositions[sampledIdx++] = Math.toIntExact(i - rowsReceived);
113113
}
114114
if (sampledIdx > 0) {
115115
outputPages.add(page.filter(Arrays.copyOf(sampledPositions, sampledIdx)));
@@ -167,7 +167,7 @@ public Operator.Status status() {
167167
return new Status(collectNanos, emitNanos, pagesProcessed, rowsReceived, rowsEmitted);
168168
}
169169

170-
private record Status(long collectNanos, long emitNanos, int pagesProcessed, int rowsReceived, int rowsEmitted)
170+
public record Status(long collectNanos, long emitNanos, int pagesProcessed, long rowsReceived, long rowsEmitted)
171171
implements
172172
Operator.Status {
173173

@@ -178,16 +178,22 @@ private record Status(long collectNanos, long emitNanos, int pagesProcessed, int
178178
);
179179

180180
Status(StreamInput streamInput) throws IOException {
181-
this(streamInput.readVLong(), streamInput.readVLong(), streamInput.readVInt(), streamInput.readVInt(), streamInput.readVInt());
181+
this(
182+
streamInput.readVLong(),
183+
streamInput.readVLong(),
184+
streamInput.readVInt(),
185+
streamInput.readVLong(),
186+
streamInput.readVLong()
187+
);
182188
}
183189

184190
@Override
185191
public void writeTo(StreamOutput out) throws IOException {
186192
out.writeVLong(collectNanos);
187193
out.writeVLong(emitNanos);
188194
out.writeVInt(pagesProcessed);
189-
out.writeVInt(rowsReceived);
190-
out.writeVInt(rowsEmitted);
195+
out.writeVLong(rowsReceived);
196+
out.writeVLong(rowsEmitted);
191197
}
192198

193199
@Override
@@ -236,7 +242,13 @@ public String toString() {
236242

237243
@Override
238244
public TransportVersion getMinimalSupportedVersion() {
239-
return TransportVersions.ZERO;
245+
assert false : "must not be called when overriding supportsVersion";
246+
throw new UnsupportedOperationException("must not be called when overriding supportsVersion");
247+
}
248+
249+
@Override
250+
public boolean supportsVersion(TransportVersion version) {
251+
return version.onOrAfter(TransportVersions.ESQL_SAMPLE_OPERATOR_STATUS_9_1);
240252
}
241253
}
242254
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.common.Strings;
11+
import org.elasticsearch.common.io.stream.Writeable;
12+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
13+
import org.elasticsearch.test.ESTestCase;
14+
15+
import static org.hamcrest.Matchers.equalTo;
16+
17+
public class SampleOperatorStatusTests extends AbstractWireSerializingTestCase<SampleOperator.Status> {
18+
public static SampleOperator.Status simple() {
19+
return new SampleOperator.Status(500012, 200012, 123, 111, 222);
20+
}
21+
22+
public static String simpleToJson() {
23+
return """
24+
{
25+
"collect_nanos" : 500012,
26+
"collect_time" : "500micros",
27+
"emit_nanos" : 200012,
28+
"emit_time" : "200micros",
29+
"pages_processed" : 123,
30+
"rows_received" : 111,
31+
"rows_emitted" : 222
32+
}""";
33+
}
34+
35+
public void testToXContent() {
36+
assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
37+
}
38+
39+
@Override
40+
protected Writeable.Reader<SampleOperator.Status> instanceReader() {
41+
return SampleOperator.Status::new;
42+
}
43+
44+
@Override
45+
public SampleOperator.Status createTestInstance() {
46+
return new SampleOperator.Status(
47+
randomNonNegativeLong(),
48+
randomNonNegativeLong(),
49+
randomNonNegativeInt(),
50+
randomNonNegativeLong(),
51+
randomNonNegativeLong()
52+
);
53+
}
54+
55+
@Override
56+
protected SampleOperator.Status mutateInstance(SampleOperator.Status instance) {
57+
long collectNanos = instance.collectNanos();
58+
long emitNanos = instance.emitNanos();
59+
int pagesProcessed = instance.pagesProcessed();
60+
long rowsReceived = instance.rowsReceived();
61+
long rowsEmitted = instance.rowsEmitted();
62+
switch (between(0, 4)) {
63+
case 0 -> collectNanos = randomValueOtherThan(collectNanos, ESTestCase::randomNonNegativeLong);
64+
case 1 -> emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
65+
case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
66+
case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
67+
case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
68+
default -> throw new UnsupportedOperationException();
69+
}
70+
return new SampleOperator.Status(collectNanos, emitNanos, pagesProcessed, rowsReceived, rowsEmitted);
71+
}
72+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.compute.operator.HashAggregationOperator;
3434
import org.elasticsearch.compute.operator.LimitOperator;
3535
import org.elasticsearch.compute.operator.MvExpandOperator;
36+
import org.elasticsearch.compute.operator.SampleOperator;
3637
import org.elasticsearch.compute.operator.exchange.ExchangeService;
3738
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
3839
import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator;
@@ -328,6 +329,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
328329
entries.add(AsyncOperator.Status.ENTRY);
329330
entries.add(EnrichLookupOperator.Status.ENTRY);
330331
entries.add(LookupFromIndexOperator.Status.ENTRY);
332+
entries.add(SampleOperator.Status.ENTRY);
331333
entries.add(ExpressionQueryBuilder.ENTRY);
332334
entries.add(PlanStreamWrapperQueryBuilder.ENTRY);
333335

0 commit comments

Comments
 (0)