Skip to content

Commit 748a530

Browse files
addaleaxMylesBorins
authored andcommitted
src: add a threadsafe variant of SetImmediate()
Add a variant of `SetImmediate()` that can be called from any thread. This allows removing the `AsyncRequest` abstraction and replaces it with a more generic mechanism. Backport-PR-URL: #32301 PR-URL: #31386 Refs: openjs-foundation/summit#240 Reviewed-By: Gireesh Punathil <[email protected]> Reviewed-By: James M Snell <[email protected]> Reviewed-By: Colin Ihrig <[email protected]> Reviewed-By: Rich Trott <[email protected]>
1 parent aafb224 commit 748a530

File tree

3 files changed

+43
-7
lines changed

3 files changed

+43
-7
lines changed

src/env-inl.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -732,13 +732,15 @@ Environment::NativeImmediateQueue::Shift() {
732732
if (!head_)
733733
tail_ = nullptr; // The queue is now empty.
734734
}
735+
size_--;
735736
return ret;
736737
}
737738

738739
void Environment::NativeImmediateQueue::Push(
739740
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
740741
NativeImmediateCallback* prev_tail = tail_;
741742

743+
size_++;
742744
tail_ = cb.get();
743745
if (prev_tail != nullptr)
744746
prev_tail->set_next(std::move(cb));
@@ -758,6 +760,10 @@ void Environment::NativeImmediateQueue::ConcatMove(
758760
other.size_ = 0;
759761
}
760762

763+
size_t Environment::NativeImmediateQueue::size() const {
764+
return size_.load();
765+
}
766+
761767
template <typename Fn>
762768
void Environment::CreateImmediate(Fn&& cb, bool ref) {
763769
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
@@ -779,6 +785,17 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
779785
CreateImmediate(std::move(cb), false);
780786
}
781787

788+
template <typename Fn>
789+
void Environment::SetImmediateThreadsafe(Fn&& cb) {
790+
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
791+
std::move(cb), false);
792+
{
793+
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
794+
native_immediates_threadsafe_.Push(std::move(callback));
795+
}
796+
uv_async_send(&task_queues_async_);
797+
}
798+
782799
Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
783800
: refed_(refed) {}
784801

@@ -1138,7 +1155,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) {
11381155
inline void Environment::RegisterFinalizationGroupForCleanup(
11391156
v8::Local<v8::FinalizationGroup> group) {
11401157
cleanup_finalization_groups_.emplace_back(isolate(), group);
1141-
uv_async_send(&cleanup_finalization_groups_async_);
1158+
uv_async_send(&task_queues_async_);
11421159
}
11431160

11441161
size_t CleanupHookCallback::Hash::operator()(

src/env.cc

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -462,15 +462,16 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
462462
uv_check_init(event_loop(), &idle_check_handle_);
463463
uv_async_init(
464464
event_loop(),
465-
&cleanup_finalization_groups_async_,
465+
&task_queues_async_,
466466
[](uv_async_t* async) {
467467
Environment* env = ContainerOf(
468-
&Environment::cleanup_finalization_groups_async_, async);
468+
&Environment::task_queues_async_, async);
469469
env->CleanupFinalizationGroups();
470+
env->RunAndClearNativeImmediates();
470471
});
471472
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
472473
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
473-
uv_unref(reinterpret_cast<uv_handle_t*>(&cleanup_finalization_groups_async_));
474+
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
474475

475476
thread_stopper()->Install(
476477
this, static_cast<void*>(this), [](uv_async_t* handle) {
@@ -534,7 +535,7 @@ void Environment::RegisterHandleCleanups() {
534535
close_and_finish,
535536
nullptr);
536537
RegisterHandleCleanup(
537-
reinterpret_cast<uv_handle_t*>(&cleanup_finalization_groups_async_),
538+
reinterpret_cast<uv_handle_t*>(&task_queues_async_),
538539
close_and_finish,
539540
nullptr);
540541
}
@@ -665,6 +666,15 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
665666
"RunAndClearNativeImmediates", this);
666667
size_t ref_count = 0;
667668

669+
// It is safe to check .size() first, because there is a causal relationship
670+
// between pushes to the threadsafe and this function being called.
671+
// For the common case, it's worth checking the size first before establishing
672+
// a mutex lock.
673+
if (native_immediates_threadsafe_.size() > 0) {
674+
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
675+
native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_));
676+
}
677+
668678
NativeImmediateQueue queue;
669679
queue.ConcatMove(std::move(native_immediates_));
670680

@@ -1078,7 +1088,7 @@ void Environment::CleanupFinalizationGroups() {
10781088
if (try_catch.HasCaught() && !try_catch.HasTerminated())
10791089
errors::TriggerUncaughtException(isolate(), try_catch);
10801090
// Re-schedule the execution of the remainder of the queue.
1081-
uv_async_send(&cleanup_finalization_groups_async_);
1091+
uv_async_send(&task_queues_async_);
10821092
return;
10831093
}
10841094
}

src/env.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1192,6 +1192,9 @@ class Environment : public MemoryRetainer {
11921192
inline void SetImmediate(Fn&& cb);
11931193
template <typename Fn>
11941194
inline void SetUnrefImmediate(Fn&& cb);
1195+
template <typename Fn>
1196+
// This behaves like SetImmediate() but can be called from any thread.
1197+
inline void SetImmediateThreadsafe(Fn&& cb);
11951198
// This needs to be available for the JS-land setImmediate().
11961199
void ToggleImmediateRef(bool ref);
11971200

@@ -1281,7 +1284,7 @@ class Environment : public MemoryRetainer {
12811284
uv_idle_t immediate_idle_handle_;
12821285
uv_prepare_t idle_prepare_handle_;
12831286
uv_check_t idle_check_handle_;
1284-
uv_async_t cleanup_finalization_groups_async_;
1287+
uv_async_t task_queues_async_;
12851288
bool profiler_idle_notifier_started_ = false;
12861289

12871290
AsyncHooks async_hooks_;
@@ -1431,12 +1434,18 @@ class Environment : public MemoryRetainer {
14311434
// 'other' afterwards.
14321435
inline void ConcatMove(NativeImmediateQueue&& other);
14331436

1437+
// size() is atomic and may be called from any thread.
1438+
inline size_t size() const;
1439+
14341440
private:
1441+
std::atomic<size_t> size_ {0};
14351442
std::unique_ptr<NativeImmediateCallback> head_;
14361443
NativeImmediateCallback* tail_ = nullptr;
14371444
};
14381445

14391446
NativeImmediateQueue native_immediates_;
1447+
Mutex native_immediates_threadsafe_mutex_;
1448+
NativeImmediateQueue native_immediates_threadsafe_;
14401449

14411450
void RunAndClearNativeImmediates(bool only_refed = false);
14421451
static void CheckImmediate(uv_check_t* handle);

0 commit comments

Comments
 (0)