Skip to content

Commit 1757392

Browse files
pedroerpfacebook-github-bot
authored andcommitted
refactor(query-ctx): Add builder pattern and documentation
Summary: Adding a builder pattern for constructing QueeryCtx objects. QueryCtx is part of the external API, and adding members to it over time has cluttered the create() and constructor methods. It was pretty inconvenient to add and pass new members to it. Also adding more documentation and removing unnecessary includes and forward declarations. No logic changes. Differential Revision: D89744645
1 parent fd021d0 commit 1757392

File tree

6 files changed

+242
-79
lines changed

6 files changed

+242
-79
lines changed

velox/core/QueryCtx.cpp

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,30 @@ std::shared_ptr<QueryCtx> QueryCtx::create(
3030
cache::AsyncDataCache* cache,
3131
std::shared_ptr<memory::MemoryPool> pool,
3232
folly::Executor* spillExecutor,
33-
const std::string& queryId,
33+
std::string queryId,
3434
std::shared_ptr<filesystems::TokenProvider> tokenProvider) {
35+
return QueryCtx::Builder()
36+
.executor(executor)
37+
.queryConfig(std::move(queryConfig))
38+
.connectorConfigs(std::move(connectorConfigs))
39+
.asyncDataCache(cache)
40+
.pool(std::move(pool))
41+
.spillExecutor(spillExecutor)
42+
.queryId(std::move(queryId))
43+
.tokenProvider(std::move(tokenProvider))
44+
.build();
45+
}
46+
47+
std::shared_ptr<QueryCtx> QueryCtx::Builder::build() {
3548
std::shared_ptr<QueryCtx> queryCtx(new QueryCtx(
36-
executor,
37-
std::move(queryConfig),
38-
std::move(connectorConfigs),
39-
cache,
40-
std::move(pool),
41-
spillExecutor,
42-
queryId,
43-
std::move(tokenProvider)));
49+
executor_,
50+
std::move(queryConfig_),
51+
std::move(connectorConfigs_),
52+
cache_,
53+
std::move(pool_),
54+
spillExecutor_,
55+
std::move(queryId_),
56+
std::move(tokenProvider_)));
4457
queryCtx->maybeSetReclaimer();
4558
return queryCtx;
4659
}
@@ -70,7 +83,7 @@ QueryCtx::QueryCtx(
7083
// We attach a monotonically increasing sequence number to ensure the pool
7184
// name is unique.
7285
static std::atomic<int64_t> seqNum{0};
73-
return fmt::format("query.{}.{}", queryId.c_str(), seqNum++);
86+
return fmt::format("query.{}.{}", queryId, seqNum++);
7487
}
7588

7689
void QueryCtx::maybeSetReclaimer() {

velox/core/QueryCtx.h

Lines changed: 151 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,76 @@
2424
#include "velox/vector/DecodedVector.h"
2525
#include "velox/vector/VectorPool.h"
2626

27-
namespace facebook::velox {
28-
class Config;
29-
}
30-
3127
namespace facebook::velox::core {
3228

29+
/// Query execution context that manages resources and configuration for a
30+
/// query.
31+
///
32+
/// QueryCtx encapsulates query-level state and resources including:
33+
///
34+
/// - Memory pool management and memory arbitration.
35+
/// - Query and connector-specific configuration.
36+
/// - Executor for parallel task execution.
37+
/// - Async data cache for IO operations.
38+
/// - Spill executor for disk-based operations.
39+
/// - Query tracing and metrics tracking.
40+
///
41+
/// Usage Contexts:
42+
///
43+
/// - Multi-threaded execution: Used with Task::start() where an executor must
44+
/// be provided and its lifetime must outlive all tasks using this context.
45+
/// - Single-threaded execution: Used with ExecCtx or Task::next() for
46+
/// expression evaluation where no executor is required.
47+
///
48+
/// Construction:
49+
///
50+
/// To construct a QueryCtx, prefer to use the builder pattern:
51+
///
52+
/// @code
53+
/// auto queryCtx = QueryCtx::Builder()
54+
/// .executor(myExecutor)
55+
/// .queryConfig(configMap)
56+
/// .queryId("query-123")
57+
/// .pool(myMemoryPool)
58+
/// .build();
59+
/// @endcode
60+
///
61+
/// Memory Management:
62+
///
63+
/// - Automatically creates a root memory pool if not provided
64+
/// - Supports memory arbitration and reclamation under memory pressure
65+
/// - Tracks spilled bytes with configurable limits
66+
/// - Thread-safe memory pool operations
67+
///
68+
/// Thread-safety: QueryCtx is thread-safe for concurrent access across
69+
/// multiple tasks and operators within a query execution.
3370
class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
3471
public:
3572
~QueryCtx() {
3673
VELOX_CHECK(!underArbitration_);
3774
}
3875

39-
/// QueryCtx is used in different places. When used with `Task::start()`, it's
40-
/// required that the caller supplies the executor and ensure its lifetime
41-
/// outlives the tasks that use it. In contrast, when used in expression
42-
/// evaluation through `ExecCtx` or 'Task::next()' for single thread execution
43-
/// mode, executor is not needed. Hence, we don't require executor to always
44-
/// be passed in here, but instead, ensure that executor exists when actually
45-
/// being used.
76+
/// Creates a new QueryCtx instance with the specified configuration.
77+
///
78+
/// This factory method constructs a QueryCtx with all necessary resources
79+
/// and automatically sets up memory reclamation if not already configured.
80+
///
81+
/// @param executor Optional executor for parallel task execution. Required
82+
/// when used with Task::start(), but not needed for
83+
/// expression evaluation or single-threaded execution.
84+
/// @param queryConfig Query-level configuration settings.
85+
/// @param connectorConfigs Connector-specific configuration mappings.
86+
/// @param cache Async data cache for IO operations (defaults to global
87+
/// instance).
88+
/// @param pool Memory pool for query execution (auto-created if nullptr).
89+
/// @param spillExecutor Optional executor for spilling operations.
90+
/// @param queryId Unique identifier for this query.
91+
/// @param tokenProvider Optional filesystem token provider for
92+
/// authentication.
93+
/// @return Shared pointer to the newly created QueryCtx.
94+
///
95+
/// Note: The caller must ensure the executor's lifetime outlives all tasks
96+
/// using this QueryCtx when executor is provided.
4697
static std::shared_ptr<QueryCtx> create(
4798
folly::Executor* executor = nullptr,
4899
QueryConfig&& queryConfig = QueryConfig{{}},
@@ -51,9 +102,95 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
51102
cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(),
52103
std::shared_ptr<memory::MemoryPool> pool = nullptr,
53104
folly::Executor* spillExecutor = nullptr,
54-
const std::string& queryId = "",
105+
std::string queryId = "",
55106
std::shared_ptr<filesystems::TokenProvider> tokenProvider = {});
56107

108+
/// Builder pattern for constructing QueryCtx instances.
109+
///
110+
/// Provides a fluent interface for creating QueryCtx with optional
111+
/// parameters. This is the recommended approach for improved readability,
112+
/// especially when only setting a subset of configuration options.
113+
///
114+
/// Example:
115+
/// @code
116+
/// auto ctx = QueryCtx::Builder()
117+
/// .queryId("my-query")
118+
/// .executor(myExecutor)
119+
/// .queryConfig(QueryConfig{mySettings})
120+
/// .build();
121+
/// @endcode
122+
class Builder {
123+
public:
124+
Builder& executor(folly::Executor* executor) {
125+
executor_ = executor;
126+
return *this;
127+
}
128+
129+
Builder& queryConfig(QueryConfig queryConfig) {
130+
queryConfig_ = std::move(queryConfig);
131+
return *this;
132+
}
133+
134+
Builder& connectorConfigs(
135+
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
136+
connectorConfigs) {
137+
connectorConfigs_ = std::move(connectorConfigs);
138+
return *this;
139+
}
140+
141+
Builder& asyncDataCache(cache::AsyncDataCache* cache) {
142+
cache_ = cache;
143+
return *this;
144+
}
145+
146+
Builder& pool(std::shared_ptr<memory::MemoryPool> pool) {
147+
pool_ = std::move(pool);
148+
return *this;
149+
}
150+
151+
Builder& spillExecutor(folly::Executor* spillExecutor) {
152+
spillExecutor_ = spillExecutor;
153+
return *this;
154+
}
155+
156+
Builder& queryId(std::string queryId) {
157+
queryId_ = std::move(queryId);
158+
return *this;
159+
}
160+
161+
Builder& tokenProvider(
162+
std::shared_ptr<filesystems::TokenProvider> tokenProvider) {
163+
tokenProvider_ = std::move(tokenProvider);
164+
return *this;
165+
}
166+
167+
/// Constructs and returns a QueryCtx with the configured parameters.
168+
///
169+
/// @return Shared pointer to the newly created QueryCtx instance
170+
std::shared_ptr<QueryCtx> build();
171+
172+
private:
173+
folly::Executor* executor_{nullptr};
174+
QueryConfig queryConfig_{QueryConfig{{}}};
175+
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
176+
connectorConfigs_;
177+
cache::AsyncDataCache* cache_{cache::AsyncDataCache::getInstance()};
178+
std::shared_ptr<memory::MemoryPool> pool_;
179+
folly::Executor* spillExecutor_{nullptr};
180+
std::string queryId_;
181+
std::shared_ptr<filesystems::TokenProvider> tokenProvider_;
182+
};
183+
184+
/// Generates a unique memory pool name for a query.
185+
///
186+
/// Creates a pool name by combining the provided query ID with a
187+
/// monotonically increasing sequence number to ensure uniqueness across
188+
/// multiple pool creations, even for the same query ID.
189+
///
190+
/// @param queryId The query identifier to incorporate into the pool name
191+
/// @return A unique pool name in the format "query.{queryId}.{seqNum}"
192+
///
193+
/// Thread-safe: Uses atomic operations for sequence number generation.
57194
static std::string generatePoolName(const std::string& queryId);
58195

59196
memory::MemoryPool* pool() const {
@@ -214,6 +351,7 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
214351

215352
// Invoked to start memory arbitration on this query.
216353
void startArbitration();
354+
217355
// Invoked to stop memory arbitration on this query.
218356
void finishArbitration();
219357

@@ -230,6 +368,7 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
230368
std::atomic<uint64_t> numTracedBytes_{0};
231369

232370
mutable std::mutex mutex_;
371+
233372
// Indicates if this query is under memory arbitration or not.
234373
std::atomic_bool underArbitration_{false};
235374
std::vector<ContinuePromise> arbitrationPromises_;

velox/exec/tests/TableWriterTest.cpp

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2815,8 +2815,10 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) {
28152815
const int numPrevArbitrationFailures = arbitrator->stats().numFailures;
28162816
const int numPrevNonReclaimableAttempts =
28172817
arbitrator->stats().numNonReclaimableAttempts;
2818-
auto queryCtx = core::QueryCtx::create(
2819-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool));
2818+
auto queryCtx = core::QueryCtx::Builder()
2819+
.executor(executor_.get())
2820+
.pool(std::move(queryPool))
2821+
.build();
28202822
ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity);
28212823

28222824
std::atomic_int numInputs{0};
@@ -2934,8 +2936,10 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) {
29342936
const int numPrevArbitrationFailures = arbitrator->stats().numFailures;
29352937
const int numPrevNonReclaimableAttempts =
29362938
arbitrator->stats().numNonReclaimableAttempts;
2937-
auto queryCtx = core::QueryCtx::create(
2938-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool));
2939+
auto queryCtx = core::QueryCtx::Builder()
2940+
.executor(executor_.get())
2941+
.pool(std::move(queryPool))
2942+
.build();
29392943
ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity);
29402944

29412945
const auto spillStats = common::globalSpillStats();
@@ -3049,8 +3053,11 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) {
30493053
const int numPrevArbitrationFailures = arbitrator->stats().numFailures;
30503054
const int numPrevNonReclaimableAttempts =
30513055
arbitrator->stats().numNonReclaimableAttempts;
3052-
auto queryCtx = core::QueryCtx::create(
3053-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool));
3056+
auto queryCtx = core::QueryCtx::Builder()
3057+
.executor(executor_.get())
3058+
.pool(std::move(queryPool))
3059+
.build();
3060+
30543061
ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity);
30553062

30563063
memory::MemoryPool* compressionPool{nullptr};
@@ -3157,8 +3164,11 @@ DEBUG_ONLY_TEST_F(
31573164
const int numPrevArbitrationFailures = arbitrator->stats().numFailures;
31583165
const int numPrevNonReclaimableAttempts =
31593166
arbitrator->stats().numNonReclaimableAttempts;
3160-
auto queryCtx = core::QueryCtx::create(
3161-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool));
3167+
auto queryCtx = core::QueryCtx::Builder()
3168+
.executor(executor_.get())
3169+
.pool(std::move(queryPool))
3170+
.build();
3171+
31623172
ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity);
31633173

31643174
std::atomic<bool> injectFakeAllocationOnce{true};
@@ -3240,8 +3250,10 @@ DEBUG_ONLY_TEST_F(
32403250
const int numPrevNonReclaimableAttempts =
32413251
arbitrator->stats().numNonReclaimableAttempts;
32423252
const int numPrevReclaimedBytes = arbitrator->stats().reclaimedUsedBytes;
3243-
auto queryCtx = core::QueryCtx::create(
3244-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool));
3253+
auto queryCtx = core::QueryCtx::Builder()
3254+
.executor(executor_.get())
3255+
.pool(std::move(queryPool))
3256+
.build();
32453257
ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity);
32463258

32473259
std::atomic<bool> writerNoMoreInput{false};
@@ -3340,8 +3352,10 @@ DEBUG_ONLY_TEST_F(
33403352
const int numPrevArbitrationFailures = arbitrator->stats().numFailures;
33413353
const int numPrevNonReclaimableAttempts =
33423354
arbitrator->stats().numNonReclaimableAttempts;
3343-
auto queryCtx = core::QueryCtx::create(
3344-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool));
3355+
auto queryCtx = core::QueryCtx::Builder()
3356+
.executor(executor_.get())
3357+
.pool(std::move(queryPool))
3358+
.build();
33453359
ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity);
33463360

33473361
std::atomic<bool> injectFakeAllocationOnce{true};
@@ -3432,8 +3446,11 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableFileWriteError) {
34323446

34333447
auto queryPool = memory::memoryManager()->addRootPool(
34343448
"tableFileWriteError", kQueryMemoryCapacity);
3435-
auto queryCtx = core::QueryCtx::create(
3436-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool));
3449+
auto queryCtx = core::QueryCtx::Builder()
3450+
.executor(executor_.get())
3451+
.pool(std::move(queryPool))
3452+
.build();
3453+
34373454
ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity);
34383455

34393456
std::atomic_bool injectWriterErrorOnce{true};
@@ -3500,8 +3517,10 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteSpillUseMoreMemory) {
35003517

35013518
auto queryPool = memory::memoryManager()->addRootPool(
35023519
"tableWriteSpillUseMoreMemory", kQueryMemoryCapacity / 4);
3503-
auto queryCtx = core::QueryCtx::create(
3504-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool));
3520+
auto queryCtx = core::QueryCtx::Builder()
3521+
.executor(executor_.get())
3522+
.pool(std::move(queryPool))
3523+
.build();
35053524
ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity / 4);
35063525

35073526
auto fakeLeafPool = queryCtx->pool()->addLeafChild(
@@ -3587,14 +3606,18 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteReclaimOnClose) {
35873606

35883607
auto queryPool = memory::memoryManager()->addRootPool(
35893608
"tableWriteSpillUseMoreMemory", kQueryMemoryCapacity);
3590-
auto queryCtx = core::QueryCtx::create(
3591-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool));
3609+
auto queryCtx = core::QueryCtx::Builder()
3610+
.executor(executor_.get())
3611+
.pool(std::move(queryPool))
3612+
.build();
35923613
ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity);
35933614

35943615
auto fakeQueryPool =
35953616
memory::memoryManager()->addRootPool("fake", kQueryMemoryCapacity);
3596-
auto fakeQueryCtx = core::QueryCtx::create(
3597-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(fakeQueryPool));
3617+
auto fakeQueryCtx = core::QueryCtx::Builder()
3618+
.executor(executor_.get())
3619+
.pool(std::move(fakeQueryPool))
3620+
.build();
35983621
ASSERT_EQ(fakeQueryCtx->pool()->capacity(), kQueryMemoryCapacity);
35993622

36003623
auto fakeLeafPool = fakeQueryCtx->pool()->addLeafChild(
@@ -3680,8 +3703,10 @@ DEBUG_ONLY_TEST_F(
36803703
.data;
36813704
auto queryPool = memory::memoryManager()->addRootPool(
36823705
"tableWriteSpillUseMoreMemory", kQueryMemoryCapacity);
3683-
auto queryCtx = core::QueryCtx::create(
3684-
executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool));
3706+
auto queryCtx = core::QueryCtx::Builder()
3707+
.executor(executor_.get())
3708+
.pool(std::move(queryPool))
3709+
.build();
36853710
ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity);
36863711

36873712
std::atomic_bool writerCloseWaitFlag{true};

0 commit comments

Comments
 (0)