feat(native): Add native_min_shuffle_compression_page_size_bytes session property (#27683)#27683
Conversation
Reviewer's GuideAdds a new native session property to control the minimum shuffle compression page size, wires it through Java and C++ session/property plumbing, and ensures BroadcastWrite uses the configured threshold while keeping the default behavior disabled (0). Sequence diagram for native_min_shuffle_compression_page_size_bytes propagationsequenceDiagram
actor User
participant PrestoSession as PrestoSession
participant NativeWorkerSessionPropertyProvider as NativeWorkerSessionPropertyProvider
participant NativeEngine as NativeEngine
participant QueryConfig as QueryConfig
participant BroadcastWriteOperator as BroadcastWriteOperator
participant VectorSerde as VectorSerdeOptions
User->>PrestoSession: Set native_min_shuffle_compression_page_size_bytes
PrestoSession->>NativeWorkerSessionPropertyProvider: Request native session properties
NativeWorkerSessionPropertyProvider-->>PrestoSession: integerProperty NATIVE_MIN_SHUFFLE_COMPRESSION_PAGE_SIZE_BYTES
PrestoSession->>NativeEngine: Start query with session properties
NativeEngine->>QueryConfig: Initialize from session
QueryConfig-->>NativeEngine: minShuffleCompressionPageSizeBytes
NativeEngine->>BroadcastWriteOperator: Create operator with OperatorCtx
BroadcastWriteOperator->>QueryConfig: minShuffleCompressionPageSizeBytes()
QueryConfig-->>BroadcastWriteOperator: thresholdBytes
BroadcastWriteOperator->>VectorSerde: getVectorSerdeOptions(compressionKind, Presto, nullopt, thresholdBytes)
VectorSerde-->>BroadcastWriteOperator: SerdeOptions with minPageSizeBytes
Class diagram for new shuffle compression page size session propertyclassDiagram
class NativeWorkerSessionPropertyProvider {
<<class>>
+String NATIVE_MAX_PAGE_PARTITIONING_BUFFER_SIZE
+String NATIVE_PARTITIONED_OUTPUT_EAGER_FLUSH
+String NATIVE_MAX_OUTPUT_BUFFER_SIZE
+String NATIVE_MIN_SHUFFLE_COMPRESSION_PAGE_SIZE_BYTES
+String NATIVE_QUERY_TRACE_ENABLED
+String NATIVE_QUERY_TRACE_DIR
+String NATIVE_QUERY_TRACE_NODE_ID
+NativeWorkerSessionPropertyProvider(FeaturesConfig featuresConfig)
-SessionProperty integerProperty(String name, String description, int defaultValue, boolean hidden)
}
class SessionProperties {
<<class>>
+static const char* kShuffleCompressionCodec
+static const char* kMinShuffleCompressionPageSizeBytes
+SessionProperties()
-void addSessionProperty(const char* name, const char* description, Type type, bool hidden, const char* queryConfigKey, std::string defaultValue)
}
class QueryConfig {
<<class>>
+std::string shuffleCompressionKind()
+uint64_t minShuffleCompressionPageSizeBytes()
}
class BroadcastWriteOperator {
<<class>>
-OperatorCtx* operatorCtx_
+BroadcastWriteOperator(OperatorCtx* operatorCtx, BroadcastFactory* broadcastFactory, RowTypePtr rowType)
-void createSerdeOptions()
}
class VectorSerdeOptions {
<<class>>
}
class getVectorSerdeOptions {
<<function>>
+VectorSerdeOptions getVectorSerdeOptions(CompressionKind kind, std::string driverType, std::optional~uint32_t~ level, uint64_t minPageSizeBytes)
}
SessionProperties --> QueryConfig : uses QueryConfig kMinShuffleCompressionPageSizeBytes
NativeWorkerSessionPropertyProvider --> SessionProperties : maps NATIVE_MIN_SHUFFLE_COMPRESSION_PAGE_SIZE_BYTES
BroadcastWriteOperator --> QueryConfig : calls minShuffleCompressionPageSizeBytes()
BroadcastWriteOperator --> getVectorSerdeOptions : passes minShuffleCompressionPageSizeBytes
getVectorSerdeOptions --> VectorSerdeOptions : returns
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- For
native_min_shuffle_compression_page_size_bytes, consider using aDataSize/longProperty(or equivalent wider type) instead ofintegerProperty, since this is a size-in-bytes threshold and may reasonably exceed 32-bit limits in some deployments.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- For `native_min_shuffle_compression_page_size_bytes`, consider using a `DataSize`/`longProperty` (or equivalent wider type) instead of `integerProperty`, since this is a size-in-bytes threshold and may reasonably exceed 32-bit limits in some deployments.
## Individual Comments
### Comment 1
<location path="presto-native-execution/presto_cpp/main/properties/session/SessionProperties.cpp" line_range="379-382" />
<code_context>
+ kMinShuffleCompressionPageSizeBytes,
+ "Native Execution only. Minimum serialized page size in bytes to attempt "
+ "shuffle compression.",
+ INTEGER(),
+ false,
+ QueryConfig::kMinShuffleCompressionPageSizeBytes,
+ std::to_string(c.minShuffleCompressionPageSizeBytes()));
+
// If `legacy_timestamp` is true, the coordinator expects timestamp
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider enforcing a non-negative (or minimum) constraint for the page size session property
This is currently a plain `INTEGER()` with no lower bound. If a user sets a negative or unrealistically small value, it may break assumptions about "bytes" (e.g., requiring `>= 0` or `>= 1`) and cause confusing shuffle compression behavior. Please either switch to a constrained numeric type or add config-layer validation so invalid values are rejected during session parsing.
Suggested implementation:
```cpp
addSessionProperty(
kMinShuffleCompressionPageSizeBytes,
"Native Execution only. Minimum serialized page size in bytes to attempt "
"shuffle compression.",
NON_NEGATIVE_INTEGER(),
false,
QueryConfig::kMinShuffleCompressionPageSizeBytes,
std::to_string(c.minShuffleCompressionPageSizeBytes()));
```
1. Ensure that a `NON_NEGATIVE_INTEGER()` type helper exists and is wired to enforce `value >= 0` at session parsing time. This is typically defined alongside the other `INTEGER()`/numeric type helpers used for session properties. If such a helper does not exist:
- Introduce it in the appropriate type helper header/source (where `INTEGER()` is defined).
- Make it wrap the same base integer type as `INTEGER()` but with a minimum value of 0.
2. If your codebase uses a different naming convention (e.g., `NONNEGATIVE_INTEGER()` or `UNSIGNED_INTEGER(min, max)`), adjust the replacement accordingly to match the existing constrained numeric type used for other session properties.
3. Optionally, add a corresponding validation or unit test that attempts to set this session property to negative and very small values to confirm they are rejected or normalized as intended.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| INTEGER(), | ||
| false, | ||
| QueryConfig::kMinShuffleCompressionPageSizeBytes, | ||
| std::to_string(c.minShuffleCompressionPageSizeBytes())); |
There was a problem hiding this comment.
suggestion (bug_risk): Consider enforcing a non-negative (or minimum) constraint for the page size session property
This is currently a plain INTEGER() with no lower bound. If a user sets a negative or unrealistically small value, it may break assumptions about "bytes" (e.g., requiring >= 0 or >= 1) and cause confusing shuffle compression behavior. Please either switch to a constrained numeric type or add config-layer validation so invalid values are rejected during session parsing.
Suggested implementation:
addSessionProperty(
kMinShuffleCompressionPageSizeBytes,
"Native Execution only. Minimum serialized page size in bytes to attempt "
"shuffle compression.",
NON_NEGATIVE_INTEGER(),
false,
QueryConfig::kMinShuffleCompressionPageSizeBytes,
std::to_string(c.minShuffleCompressionPageSizeBytes()));
- Ensure that a
NON_NEGATIVE_INTEGER()type helper exists and is wired to enforcevalue >= 0at session parsing time. This is typically defined alongside the otherINTEGER()/numeric type helpers used for session properties. If such a helper does not exist:- Introduce it in the appropriate type helper header/source (where
INTEGER()is defined). - Make it wrap the same base integer type as
INTEGER()but with a minimum value of 0.
- Introduce it in the appropriate type helper header/source (where
- If your codebase uses a different naming convention (e.g.,
NONNEGATIVE_INTEGER()orUNSIGNED_INTEGER(min, max)), adjust the replacement accordingly to match the existing constrained numeric type used for other session properties. - Optionally, add a corresponding validation or unit test that attempts to set this session property to negative and very small values to confirm they are rejected or normalized as intended.
72b5f9f to
09cf663
Compare
…ion property (prestodb#27683) Summary: Wires the new Velox `min_shuffle_compression_page_size_bytes` property through to a Presto-native session property and the BroadcastWrite operator so users can tune the small-page shuffle-compression skip threshold per query. Adds: - `native_min_shuffle_compression_page_size_bytes` Java session property in NativeWorkerSessionPropertyProvider, grouped with the other PartitionedOutput / output-buffer properties. - Matching constant and addSessionProperty() registration in the Prestissimo SessionProperties (placed near kPartitionedOutputEagerFlush, alongside the other shuffle/output-related session properties). - Mapping entry in SessionPropertiesTest::validateMapping to keep the Java↔Velox name correspondence asserted. - BroadcastWrite operator: passes the new threshold through to getVectorSerdeOptions so broadcast shuffle pages also honor the skip behavior. Default value is 0 (disabled), preserving existing behavior unless the user opts in. Differential Revision: D100420473
…ion property (prestodb#27683) Summary: Wires the new Velox `min_shuffle_compression_page_size_bytes` property through to a Presto-native session property and the BroadcastWrite operator so users can tune the small-page shuffle-compression skip threshold per query. Adds: - `native_min_shuffle_compression_page_size_bytes` Java session property in NativeWorkerSessionPropertyProvider, grouped with the other PartitionedOutput / output-buffer properties. - Matching constant and addSessionProperty() registration in the Prestissimo SessionProperties (placed near kPartitionedOutputEagerFlush, alongside the other shuffle/output-related session properties). - Mapping entry in SessionPropertiesTest::validateMapping to keep the Java↔Velox name correspondence asserted. - BroadcastWrite operator: passes the new threshold through to getVectorSerdeOptions so broadcast shuffle pages also honor the skip behavior. Default value is 0 (disabled), preserving existing behavior unless the user opts in. Differential Revision: D100420473
09cf663 to
f24ca59
Compare
…ion property (prestodb#27683) Summary: Wires the new Velox `min_shuffle_compression_page_size_bytes` property through to a Presto-native session property and the BroadcastWrite operator so users can tune the small-page shuffle-compression skip threshold per query. Adds: - `native_min_shuffle_compression_page_size_bytes` Java session property in NativeWorkerSessionPropertyProvider, grouped with the other PartitionedOutput / output-buffer properties. - Matching constant and addSessionProperty() registration in the Prestissimo SessionProperties (placed near kPartitionedOutputEagerFlush, alongside the other shuffle/output-related session properties). - Mapping entry in SessionPropertiesTest::validateMapping to keep the Java↔Velox name correspondence asserted. - BroadcastWrite operator: passes the new threshold through to getVectorSerdeOptions so broadcast shuffle pages also honor the skip behavior. Default value is 0 (disabled), preserving existing behavior unless the user opts in. Differential Revision: D100420473
f24ca59 to
60cecba
Compare
Summary:
Wires the new Velox
min_shuffle_compression_page_size_bytesproperty through to a Presto-native session property and the BroadcastWrite operator so users can tune the small-page shuffle-compression skip threshold per query.Adds:
native_min_shuffle_compression_page_size_bytesJava session property in NativeWorkerSessionPropertyProvider, grouped with the other PartitionedOutput / output-buffer properties.Default value is 0 (disabled), preserving existing behavior unless the user opts in.
Differential Revision: D100420473