Skip to content

Commit 9a4a97b

Browse files
committed
add local exchange partition buffer size system config
1 parent 279e557 commit 9a4a97b

File tree

5 files changed

+33
-2
lines changed

5 files changed

+33
-2
lines changed

presto-docs/src/main/sphinx/presto_cpp/properties.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,14 @@ avoid exceeding memory limits for the query.
218218
When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for order by to
219219
avoid exceeding memory limits for the query.
220220

221+
``local-exchange.max-partition-buffer-size``
222+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
223+
224+
* **Type:** ``integer``
225+
* **Default value:** ``65536`` (64KB)
226+
227+
Specifies the maximum size in bytes to accumulate for a single partition of a local exchange before flushing.
228+
221229

222230
``shared-arbitrator.reserved-capacity``
223231
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

presto-native-execution/presto_cpp/main/QueryContextManager.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ void updateFromSystemConfigs(
5757
{core::QueryConfig::kAggregationSpillEnabled,
5858
std::string(SystemConfig::kAggregationSpillEnabled)},
5959
{core::QueryConfig::kRequestDataSizesMaxWaitSec,
60-
std::string(SystemConfig::kRequestDataSizesMaxWaitSec)}};
60+
std::string(SystemConfig::kRequestDataSizesMaxWaitSec)},
61+
{core::QueryConfig::kMaxLocalExchangePartitionBufferSize,
62+
std::string(SystemConfig::kMaxLocalExchangePartitionBufferSize)}};
6163
for (const auto& configNameEntry : sessionSystemConfigMapping) {
6264
const auto& sessionName = configNameEntry.first;
6365
const auto& systemConfigName = configNameEntry.second;

presto-native-execution/presto_cpp/main/common/Configs.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ SystemConfig::SystemConfig() {
263263
STR_PROP(kPluginDir, ""),
264264
NUM_PROP(kExchangeIoEvbViolationThresholdMs, 1000),
265265
NUM_PROP(kHttpSrvIoEvbViolationThresholdMs, 1000),
266+
NUM_PROP(kMaxLocalExchangePartitionBufferSize, 65536),
266267
};
267268
}
268269

@@ -909,6 +910,10 @@ int32_t SystemConfig::httpSrvIoEvbViolationThresholdMs() const {
909910
.value();
910911
}
911912

913+
uint64_t SystemConfig::maxLocalExchangePartitionBufferSize() const {
914+
return optionalProperty<uint64_t>(kMaxLocalExchangePartitionBufferSize).value();
915+
}
916+
912917
NodeConfig::NodeConfig() {
913918
registeredProps_ =
914919
std::unordered_map<std::string, folly::Optional<std::string>>{

presto-native-execution/presto_cpp/main/common/Configs.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,9 @@ class SystemConfig : public ConfigBase {
755755
static constexpr std::string_view kHttpSrvIoEvbViolationThresholdMs{
756756
"http-server.io-evb-violation-threshold-ms"};
757757

758+
static constexpr std::string_view kMaxLocalExchangePartitionBufferSize{
759+
"local-exchange.max-partition-buffer-size"};
760+
758761
SystemConfig();
759762

760763
virtual ~SystemConfig() = default;
@@ -1035,6 +1038,8 @@ class SystemConfig : public ConfigBase {
10351038
int32_t exchangeIoEvbViolationThresholdMs() const;
10361039

10371040
int32_t httpSrvIoEvbViolationThresholdMs() const;
1041+
1042+
uint64_t maxLocalExchangePartitionBufferSize() const;
10381043
};
10391044

10401045
/// Provides access to node properties defined in node.properties file.

presto-native-execution/presto_cpp/main/tests/QueryContextManagerTest.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ TEST_F(QueryContextManagerTest, defaultSessionProperties) {
127127
queryConfig.spillWriteBufferSize(), defaultQC->spillWriteBufferSize());
128128
EXPECT_EQ(
129129
queryConfig.requestDataSizesMaxWaitSec(), defaultQC->requestDataSizesMaxWaitSec());
130+
EXPECT_EQ(
131+
queryConfig.maxLocalExchangePartitionBufferSize(), defaultQC->maxLocalExchangePartitionBufferSize());
130132
}
131133

132134
TEST_F(QueryContextManagerTest, overridingSessionProperties) {
@@ -162,6 +164,9 @@ TEST_F(QueryContextManagerTest, overridingSessionProperties) {
162164
EXPECT_EQ(
163165
queryCtx->queryConfig().requestDataSizesMaxWaitSec(),
164166
systemConfig->requestDataSizesMaxWaitSec());
167+
EXPECT_EQ(
168+
queryCtx->queryConfig().maxLocalExchangePartitionBufferSize(),
169+
systemConfig->maxLocalExchangePartitionBufferSize());
165170
}
166171
{
167172
protocol::SessionRepresentation session{
@@ -171,7 +176,8 @@ TEST_F(QueryContextManagerTest, overridingSessionProperties) {
171176
{"spill_enabled", "true"},
172177
{"aggregation_spill_enabled", "false"},
173178
{"join_spill_enabled", "true"},
174-
{"request_data_sizes_max_wait_sec", "12"}}};
179+
{"request_data_sizes_max_wait_sec", "12"},
180+
{"local-exchange.max-partition-buffer-size", "1024"}}};
175181
protocol::TaskUpdateRequest updateRequest;
176182
updateRequest.session = session;
177183
auto queryCtx =
@@ -194,6 +200,11 @@ TEST_F(QueryContextManagerTest, overridingSessionProperties) {
194200
EXPECT_NE(
195201
queryCtx->queryConfig().aggregationSpillEnabled(),
196202
systemConfig->aggregationSpillEnabled());
203+
// Override with different value
204+
EXPECT_EQ(queryCtx->queryConfig().maxLocalExchangePartitionBufferSize(), 1024);
205+
EXPECT_NE(
206+
queryCtx->queryConfig().maxLocalExchangePartitionBufferSize(),
207+
systemConfig->maxLocalExchangePartitionBufferSize());
197208
// Override with same value
198209
EXPECT_EQ(
199210
queryCtx->queryConfig().joinSpillEnabled(), true);

0 commit comments

Comments
 (0)