File tree Expand file tree Collapse file tree 3 files changed +48
-2
lines changed
Expand file tree Collapse file tree 3 files changed +48
-2
lines changed Original file line number Diff line number Diff line change 2121
2222namespace facebook ::velox::core {
2323
24+ QueryCtx::~QueryCtx () {
25+ for (auto & cb : releaseCallbacks_) {
26+ cb ();
27+ }
28+ VELOX_CHECK (!underArbitration_);
29+ }
30+
2431// static
2532std::shared_ptr<QueryCtx> QueryCtx::create (
2633 folly::Executor* executor,
Original file line number Diff line number Diff line change 1818
1919#include < folly/Executor.h>
2020#include < folly/executors/CPUThreadPoolExecutor.h>
21+ #include < deque>
22+ #include < functional>
2123#include " velox/common/caching/AsyncDataCache.h"
2224#include " velox/common/memory/Memory.h"
2325#include " velox/core/QueryConfig.h"
@@ -32,8 +34,11 @@ namespace facebook::velox::core {
3234
3335class QueryCtx : public std ::enable_shared_from_this<QueryCtx> {
3436 public:
35- ~QueryCtx () {
36- VELOX_CHECK (!underArbitration_);
37+ using ReleaseCallback = std::function<void ()>;
38+
39+ ~QueryCtx ();
40+ void addReleaseCallback (ReleaseCallback callback) {
41+ releaseCallbacks_.push_back (std::move (callback));
3742 }
3843
3944 // / QueryCtx is used in different places. When used with `Task::start()`, it's
@@ -234,6 +239,8 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
234239 std::atomic_bool underArbitration_{false };
235240 std::vector<ContinuePromise> arbitrationPromises_;
236241 std::shared_ptr<filesystems::TokenProvider> fsTokenProvider_;
242+ // Callbacks invoked before destruction to clean up external resources.
243+ std::deque<ReleaseCallback> releaseCallbacks_;
237244};
238245
239246// Represents the state of one thread of query execution.
Original file line number Diff line number Diff line change @@ -51,4 +51,36 @@ TEST_F(QueryCtxTest, withSysRootPool) {
5151 ASSERT_FALSE (
5252 queryPool->reclaimer ()->reclaimableBytes (*queryPool, reclaimableBytes));
5353}
54+
55+ TEST_F (QueryCtxTest, releaseCallbacks) {
56+ int callbackCount = 0 ;
57+ std::string capturedQueryId;
58+
59+ {
60+ auto queryCtx = QueryCtx::create (
61+ nullptr ,
62+ QueryConfig{{}},
63+ std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>{},
64+ nullptr ,
65+ nullptr ,
66+ nullptr ,
67+ " test_query_id" );
68+
69+ // Add multiple callbacks.
70+ queryCtx->addReleaseCallback ([&callbackCount]() { ++callbackCount; });
71+
72+ queryCtx->addReleaseCallback (
73+ [&callbackCount, &capturedQueryId, id = queryCtx->queryId ()]() {
74+ ++callbackCount;
75+ capturedQueryId = id;
76+ });
77+
78+ // Callbacks should not be invoked yet.
79+ ASSERT_EQ (callbackCount, 0 );
80+ }
81+
82+ // After QueryCtx destruction, all callbacks should have been invoked.
83+ ASSERT_EQ (callbackCount, 2 );
84+ ASSERT_EQ (capturedQueryId, " test_query_id" );
85+ }
5486} // namespace facebook::velox::core::test
You can’t perform that action at this time.
0 commit comments