Skip to content

Commit 2125ef9

Browse files
committed
Allow disabling client segment acknowledgment
This allows storage-wide expiration policies to be implemented that don't require an explicit API call to remove an expired segment.
1 parent da9d3ea commit 2125ef9

File tree

11 files changed

+73
-29
lines changed

11 files changed

+73
-29
lines changed

client/trino-client/src/main/java/io/trino/client/spooling/Segment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public static Segment inlined(byte[] data, DataAttributes attributes)
8787
return new InlineSegment(data, attributes);
8888
}
8989

90-
public static Segment spooled(URI retrieveUri, URI ackUri, DataAttributes attributes, Map<String, List<String>> headers)
90+
public static Segment spooled(URI retrieveUri, Optional<URI> ackUri, DataAttributes attributes, Map<String, List<String>> headers)
9191
{
9292
return new SpooledSegment(retrieveUri, ackUri, attributes, headers);
9393
}

client/trino-client/src/main/java/io/trino/client/spooling/SegmentLoader.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.net.URI;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.Optional;
2930
import java.util.logging.Level;
3031
import java.util.logging.Logger;
3132

@@ -55,7 +56,7 @@ public InputStream load(SpooledSegment segment)
5556
return loadFromURI(segment.getDataUri(), segment.getAckUri(), segment.getHeaders());
5657
}
5758

58-
public InputStream loadFromURI(URI segmentUri, URI ackUri, Map<String, List<String>> headers)
59+
public InputStream loadFromURI(URI segmentUri, Optional<URI> ackUri, Map<String, List<String>> headers)
5960
throws IOException
6061
{
6162
Headers requestHeaders = toHeaders(headers);
@@ -99,7 +100,7 @@ public void onResponse(Call call, Response response)
99100
});
100101
}
101102

102-
private InputStream delegatingInputStream(Response response, InputStream delegate, URI ackUri, Headers headers)
103+
private InputStream delegatingInputStream(Response response, InputStream delegate, Optional<URI> ackUri, Headers headers)
103104
{
104105
return new FilterInputStream(delegate)
105106
{
@@ -108,7 +109,7 @@ public void close()
108109
throws IOException
109110
{
110111
try (Response ignored = response; InputStream ignored2 = delegate) {
111-
acknowledge(ackUri, headers);
112+
ackUri.ifPresent(uri -> acknowledge(uri, headers));
112113
}
113114
}
114115
};

client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegment.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.net.URI;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.Optional;
2425

2526
import static com.google.common.base.MoreObjects.firstNonNull;
2627
import static java.lang.String.format;
@@ -30,20 +31,20 @@ public final class SpooledSegment
3031
extends Segment
3132
{
3233
private final URI dataUri;
34+
private final Optional<URI> ackUri;
3335
private final Map<String, List<String>> headers;
34-
private final URI ackUri;
3536

3637
@JsonCreator
3738
public SpooledSegment(
3839
@JsonProperty("uri") URI dataUri,
39-
@JsonProperty("ackUri") URI ackUri,
40+
@JsonProperty("ackUri") Optional<URI> ackUri,
4041
@JsonProperty("metadata") Map<String, Object> metadata,
4142
@JsonProperty("headers") Map<String, List<String>> headers)
4243
{
4344
this(dataUri, ackUri, new DataAttributes(metadata), headers);
4445
}
4546

46-
SpooledSegment(URI dataUri, URI ackUri, DataAttributes metadata, Map<String, List<String>> headers)
47+
SpooledSegment(URI dataUri, Optional<URI> ackUri, DataAttributes metadata, Map<String, List<String>> headers)
4748
{
4849
super(metadata);
4950
this.dataUri = requireNonNull(dataUri, "dataUri is null");
@@ -58,7 +59,7 @@ public URI getDataUri()
5859
}
5960

6061
@JsonProperty("ackUri")
61-
public URI getAckUri()
62+
public Optional<URI> getAckUri()
6263
{
6364
return ackUri;
6465
}
@@ -73,6 +74,6 @@ public Map<String, List<String>> getHeaders()
7374
@Override
7475
public String toString()
7576
{
76-
return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet());
77+
return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s, ack=%b}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet(), ackUri.isPresent());
7778
}
7879
}

core/trino-main/src/main/java/io/trino/operator/OutputSpoolingOperatorFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ static class OutputSpoolingOperator
135135
implements Operator
136136
{
137137
private final OutputSpoolingController controller;
138+
private final boolean explicitAck;
138139

139140
enum State
140141
{
@@ -165,6 +166,7 @@ public OutputSpoolingOperator(OperatorContext operatorContext, QueryDataEncoder
165166
spoolingConfig.getMaximumSegmentSize().toBytes(),
166167
spoolingConfig.getInitialSegmentSize().toBytes(),
167168
spoolingConfig.getMaximumSegmentSize().toBytes());
169+
this.explicitAck = spoolingConfig.isExplicitAck();
168170
this.userMemoryContext = operatorContext.newLocalUserMemoryContext(OutputSpoolingOperator.class.getSimpleName());
169171
this.queryDataEncoder = requireNonNull(queryDataEncoder, "queryDataEncoder is null");
170172
this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null");
@@ -278,7 +280,7 @@ private Page spool(List<Page> pages, boolean finished)
278280
controller.recordEncoded(attributes.get(SEGMENT_SIZE, Integer.class));
279281

280282
// This page is small (hundreds of bytes) so there is no point in tracking its memory usage
281-
return emptySingleRowPage(SpooledBlock.forLocation(spoolingManager.location(segmentHandle), attributes).serialize());
283+
return emptySingleRowPage(SpooledBlock.forLocation(spoolingManager.location(segmentHandle), attributes, explicitAck).serialize());
282284
}
283285
catch (IOException e) {
284286
throw new UncheckedIOException(e);

core/trino-main/src/main/java/io/trino/server/protocol/spooling/CoordinatorSegmentResource.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,14 @@ public class CoordinatorSegmentResource
5353
private final SpoolingManager spoolingManager;
5454
private final SegmentRetrievalMode retrievalMode;
5555
private final InternalNodeManager nodeManager;
56+
private final boolean explicitAck;
5657

5758
@Inject
5859
public CoordinatorSegmentResource(SpoolingManager spoolingManager, SpoolingConfig config, InternalNodeManager nodeManager)
5960
{
6061
this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null");
6162
this.retrievalMode = requireNonNull(config, "config is null").getRetrievalMode();
63+
this.explicitAck = config.isExplicitAck();
6264
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
6365
}
6466

@@ -97,6 +99,12 @@ public Response download(@Context UriInfo uriInfo, @PathParam("identifier") Stri
9799
public Response acknowledge(@PathParam("identifier") String identifier, @Context HttpHeaders headers)
98100
throws IOException
99101
{
102+
if (!explicitAck) {
103+
return Response.status(Response.Status.NOT_ACCEPTABLE)
104+
.entity("Explicit segment acknowledgment is disabled")
105+
.build();
106+
}
107+
100108
try {
101109
spoolingManager.acknowledge(handle(identifier, headers));
102110
return Response.ok().build();

core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpooledBlock.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@
3636
import static io.airlift.json.JsonCodec.listJsonCodec;
3737
import static io.airlift.json.JsonCodec.mapJsonCodec;
3838
import static io.airlift.slice.Slices.utf8Slice;
39+
import static io.trino.spi.type.BooleanType.BOOLEAN;
3940
import static io.trino.spi.type.VarcharType.VARCHAR;
4041

41-
public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers, DataAttributes attributes)
42+
public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers, DataAttributes attributes, boolean explicitAck)
4243
{
4344
private static final JsonCodec<Map<String, List<String>>> HEADERS_CODEC = mapJsonCodec(String.class, listJsonCodec(String.class));
4445
private static final JsonCodec<DataAttributes> ATTRIBUTES_CODEC = JsonCodec.jsonCodec(DataAttributes.class);
@@ -47,7 +48,8 @@ public record SpooledBlock(Slice identifier, Optional<URI> directUri, Map<String
4748
new RowType.Field(Optional.of("identifier"), VARCHAR),
4849
new RowType.Field(Optional.of("directLocation"), VARCHAR),
4950
new RowType.Field(Optional.of("headers"), VARCHAR),
50-
new RowType.Field(Optional.of("metadata"), VARCHAR)));
51+
new RowType.Field(Optional.of("metadata"), VARCHAR),
52+
new RowType.Field(Optional.of("explicitAck"), BOOLEAN)));
5153

5254
public static final String SPOOLING_METADATA_COLUMN_NAME = "$spooling:metadata$";
5355
public static final Symbol SPOOLING_METADATA_SYMBOL = new Symbol(SPOOLING_METADATA_TYPE, SPOOLING_METADATA_COLUMN_NAME);
@@ -63,29 +65,33 @@ public static SpooledBlock deserialize(Page page)
6365
VARCHAR.getSlice(row.getRawFieldBlock(0), 0),
6466
Optional.empty(), // Not a direct location
6567
HEADERS_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(2), 0).toStringUtf8()),
66-
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()));
68+
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()),
69+
BOOLEAN.getBoolean(row.getRawFieldBlock(4), 0));
6770
}
6871

6972
return new SpooledBlock(
7073
VARCHAR.getSlice(row.getRawFieldBlock(0), 0),
7174
Optional.of(URI.create(VARCHAR.getSlice(row.getRawFieldBlock(1), 0).toStringUtf8())),
7275
HEADERS_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(2), 0).toStringUtf8()),
73-
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()));
76+
ATTRIBUTES_CODEC.fromJson(VARCHAR.getSlice(row.getRawFieldBlock(3), 0).toStringUtf8()),
77+
BOOLEAN.getBoolean(row.getRawFieldBlock(4), 0));
7478
}
7579

76-
public static SpooledBlock forLocation(SpooledLocation location, DataAttributes attributes)
80+
public static SpooledBlock forLocation(SpooledLocation location, DataAttributes attributes, boolean explicitAck)
7781
{
7882
return switch (location) {
7983
case DirectLocation directLocation -> new SpooledBlock(
8084
directLocation.identifier(),
8185
Optional.of(directLocation.directUri()),
8286
directLocation.headers(),
83-
attributes);
87+
attributes,
88+
explicitAck);
8489
case CoordinatorLocation coordinatorLocation -> new SpooledBlock(
8590
coordinatorLocation.identifier(),
8691
Optional.empty(),
8792
coordinatorLocation.headers(),
88-
attributes);
93+
attributes,
94+
explicitAck);
8995
};
9096
}
9197

@@ -109,6 +115,7 @@ void serialize(RowBlockBuilder rowBlockBuilder)
109115
}
110116
VARCHAR.writeSlice(rowEntryBuilder.get(2), utf8Slice(HEADERS_CODEC.toJson(headers)));
111117
VARCHAR.writeSlice(rowEntryBuilder.get(3), utf8Slice(ATTRIBUTES_CODEC.toJson(attributes)));
118+
BOOLEAN.writeBoolean(rowEntryBuilder.get(4), explicitAck);
112119
});
113120
}
114121

core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpooledQueryDataProducer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.IOException;
3030
import java.net.URI;
3131
import java.util.List;
32+
import java.util.Optional;
3233
import java.util.function.Consumer;
3334

3435
import static io.trino.client.spooling.DataAttribute.ROWS_COUNT;
@@ -76,8 +77,9 @@ public QueryData produce(ExternalUriInfo uriInfo, Session session, QueryResultRo
7677
.set(ROW_OFFSET, currentOffset)
7778
.build();
7879
builder.withSegment(spooled(
79-
metadata.directUri().orElseGet(() -> buildSegmentDownloadURI(uriBuilder, metadata.identifier())),
80-
buildSegmentAckURI(uriBuilder, metadata.identifier()),
80+
metadata.directUri()
81+
.orElseGet(() -> buildSegmentDownloadURI(uriBuilder, metadata.identifier())),
82+
metadata.explicitAck() ? Optional.of(buildSegmentAckURI(uriBuilder, metadata.identifier())) : Optional.empty(),
8183
attributes,
8284
metadata.headers()));
8385
currentOffset += attributes.get(ROWS_COUNT, Long.class);

core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class SpoolingConfig
3838
private Optional<Duration> storageRedirectTtl = Optional.empty();
3939

4040
private boolean allowInlining = true;
41+
private boolean explicitAck = true;
4142
private long maximumInlinedRows = 1000;
4243
private DataSize maximumInlinedSize = DataSize.of(128, KILOBYTE);
4344
private DataSize initialSegmentSize = DataSize.of(8, MEGABYTE);
@@ -123,6 +124,19 @@ public SpoolingConfig setAllowInlining(boolean allowInlining)
123124
return this;
124125
}
125126

127+
public boolean isExplicitAck()
128+
{
129+
return explicitAck;
130+
}
131+
132+
@ConfigDescription("Allow client to acknowledge segment retrieval and its eager removal")
133+
@Config("protocol.spooling.explicit-ack.enabled")
134+
public SpoolingConfig setExplicitAck(boolean explicitAck)
135+
{
136+
this.explicitAck = explicitAck;
137+
return this;
138+
}
139+
126140
public long getMaximumInlinedRows()
127141
{
128142
return maximumInlinedRows;

core/trino-main/src/test/java/io/trino/server/protocol/TestQueryDataSerialization.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.net.URI;
3535
import java.util.List;
3636
import java.util.Map;
37+
import java.util.Optional;
3738
import java.util.OptionalDouble;
3839
import java.util.Set;
3940

@@ -138,11 +139,11 @@ public void testSpooledQueryDataSerialization()
138139
inlined("super".getBytes(UTF_8), dataAttributes(0, 100, 5)),
139140
spooled(
140141
URI.create("http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/1"),
141-
URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/1"),
142+
Optional.of(URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/1")),
142143
dataAttributes(100, 100, 1024), Map.of("x-amz-server-side-encryption", List.of("AES256"))),
143144
spooled(
144145
URI.create("http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2"),
145-
URI.create("http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/2"),
146+
Optional.empty(),
146147
dataAttributes(200, 100, 1024), Map.of("x-amz-server-side-encryption", List.of("AES256")))))
147148
.withAttributes(DataAttributes.builder()
148149
.set(SCHEMA, "serializedSchema")
@@ -179,7 +180,6 @@ public void testSpooledQueryDataSerialization()
179180
{
180181
"type": "spooled",
181182
"uri": "http://localhost:8080/v1/download/20160128_214710_00012_rk68b/segments/2",
182-
"ackUri": "http://localhost:8080/v1/ack/20160128_214710_00012_rk68b/segments/2",
183183
"metadata": {
184184
"rowOffset": 200,
185185
"rowsCount": 100,
@@ -199,9 +199,15 @@ public void testEncodedQueryDataToString()
199199

200200
EncodedQueryData spooledQueryData = new EncodedQueryData("json+zstd", ImmutableMap.of("decryption_key", "secret"), ImmutableList.of(spooled(
201201
URI.create("http://coordinator:8080/v1/segments/uuid"),
202+
Optional.of(URI.create("http://coordinator:8080/v1/segments/uuid")),
203+
dataAttributes(10, 2, 1256), headers())));
204+
assertThat(spooledQueryData.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption], ack=true}], metadata=[decryption_key]}");
205+
206+
EncodedQueryData spooledQueryDataWithoutAck = new EncodedQueryData("json+zstd", ImmutableMap.of("decryption_key", "secret"), ImmutableList.of(spooled(
202207
URI.create("http://coordinator:8080/v1/segments/uuid"),
208+
Optional.empty(),
203209
dataAttributes(10, 2, 1256), headers())));
204-
assertThat(spooledQueryData.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption]}], metadata=[decryption_key]}");
210+
assertThat(spooledQueryDataWithoutAck.toString()).isEqualTo("EncodedQueryData{encoding=json+zstd, segments=[SpooledSegment{offset=10, rows=2, size=1256, headers=[x-amz-server-side-encryption], ack=false}], metadata=[decryption_key]}");
205211
}
206212

207213
private void testRoundTrip(QueryData queryData, String expectedDataRepresentation)

core/trino-main/src/test/java/io/trino/server/protocol/spooling/TestSpooledBlock.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,31 +55,31 @@ public void verifySerialization(Slice identifier, Optional<URI> directUri, Map<S
5555

5656
public void verifySerializationRoundTrip(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
5757
{
58-
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1200));
58+
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1200), true);
5959
Page page = new Page(metadata.serialize());
6060
SpooledBlock retrieved = SpooledBlock.deserialize(page);
6161
assertThat(metadata).isEqualTo(retrieved);
6262
}
6363

6464
private void verifySerializationRoundTripWithNonEmptyPage(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
6565
{
66-
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1100));
66+
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(10, 1100), false);
6767
Page page = new Page(blockWithPositions(1, true), metadata.serialize());
6868
SpooledBlock retrieved = SpooledBlock.deserialize(page);
6969
assertThat(metadata).isEqualTo(retrieved);
7070
}
7171

7272
private void verifyThrowsErrorOnNonNullPositions(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
7373
{
74-
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(20, 1200));
74+
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(20, 1200), true);
7575

7676
assertThatThrownBy(() -> SpooledBlock.deserialize(new Page(blockWithPositions(1, false), metadata.serialize())))
7777
.hasMessage("Spooling metadata block must have all but last channels null");
7878
}
7979

8080
private void verifyThrowsErrorOnMultiplePositions(Slice identifier, Optional<URI> directUri, Map<String, List<String>> headers)
8181
{
82-
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(30, 1300));
82+
SpooledBlock metadata = new SpooledBlock(identifier, directUri, headers, createDataAttributes(30, 1300), false);
8383
RowBlockBuilder rowBlockBuilder = SPOOLING_METADATA_TYPE.createBlockBuilder(null, 2);
8484
metadata.serialize(rowBlockBuilder);
8585
metadata.serialize(rowBlockBuilder);

0 commit comments

Comments
 (0)