Skip to content

Commit b0f0a20

Browse files
committed
Fix race condition in events-executor
The initial implementation of the events-executor contained a bug where the executor would end up in an inconsistent state and stop processing interrupt/shutdown notifications. Manually adding a node to the executor results in a) producing a notify waitable event and b) refreshing the executor collections. The inconsistent state would happen if the event was processed before the collections were finished to be refreshed: the executor would pick up the event but be unable to process it. This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional notify waitable events to be pushed. The behavior is observable only under heavy load. Signed-off-by: Alberto Soragna <[email protected]>
1 parent 77ede02 commit b0f0a20

File tree

3 files changed

+94
-15
lines changed

3 files changed

+94
-15
lines changed

rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,10 @@ class EventsExecutor : public rclcpp::Executor
243243
std::function<void(size_t, int)>
244244
create_waitable_callback(const rclcpp::Waitable * waitable_id);
245245

246+
/// Utility to add the notify waitable to an entities collection
247+
void
248+
add_notify_waitable_to_collection(ExecutorEntitiesCollection::WaitableCollection & collection);
249+
246250
/// Searches for the provided entity_id in the collection and returns the entity if valid
247251
template<typename CollectionType>
248252
typename CollectionType::EntitySharedPtr

rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ EventsExecutor::EventsExecutor(
5050
timers_manager_ =
5151
std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);
5252

53+
this->current_entities_collection_ =
54+
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
55+
5356
notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
5457
[this]() {
5558
// This callback is invoked when:
@@ -61,6 +64,10 @@ EventsExecutor::EventsExecutor(
6164
this->refresh_current_collection_from_callback_groups();
6265
});
6366

67+
// Make sure that the notify waitable is immediately added to the collection
68+
// to avoid missing events
69+
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
70+
6471
notify_waitable_->add_guard_condition(interrupt_guard_condition_);
6572
notify_waitable_->add_guard_condition(shutdown_guard_condition_);
6673

@@ -87,9 +94,6 @@ EventsExecutor::EventsExecutor(
8794

8895
this->entities_collector_ =
8996
std::make_shared<rclcpp::executors::ExecutorEntitiesCollector>(notify_waitable_);
90-
91-
this->current_entities_collection_ =
92-
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
9397
}
9498

9599
EventsExecutor::~EventsExecutor()
@@ -395,18 +399,8 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
395399
// retrieved in the "standard" way.
396400
// To do it, we need to add the notify waitable as an entry in both the new and
397401
// current collections such that it's neither added or removed.
398-
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
399-
new_collection.waitables.insert(
400-
{
401-
this->notify_waitable_.get(),
402-
{this->notify_waitable_, weak_group_ptr}
403-
});
404-
405-
this->current_entities_collection_->waitables.insert(
406-
{
407-
this->notify_waitable_.get(),
408-
{this->notify_waitable_, weak_group_ptr}
409-
});
402+
this->add_notify_waitable_to_collection(new_collection.waitables);
403+
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
410404

411405
this->refresh_current_collection(new_collection);
412406
}
@@ -486,3 +480,16 @@ EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
486480
};
487481
return callback;
488482
}
483+
484+
void
485+
EventsExecutor::add_notify_waitable_to_collection(
486+
ExecutorEntitiesCollection::WaitableCollection & collection)
487+
{
488+
// The notify waitable is not associated to any group, so use an invalid one
489+
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
490+
collection.insert(
491+
{
492+
this->notify_waitable_.get(),
493+
{this->notify_waitable_, weak_group_ptr}
494+
});
495+
}

rclcpp/test/rclcpp/executors/test_events_executor.cpp

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,74 @@ TEST_F(TestEventsExecutor, destroy_entities)
479479
spinner.join();
480480
}
481481

482+
// This test verifies that the add_node operation is robust wrt race conditions.
483+
// The initial implementation of the events-executor contained a bug where the executor
484+
// would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
485+
// Manually adding a node to the executor results in a) producing a notify waitable event
486+
// and b) refreshing the executor collections.
487+
// The inconsistent state would happen if the event was processed before the collections were
488+
// finished to be refreshed: the executor would pick up the event but be unable to process it.
489+
// This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional
490+
// notify waitable events to be pushed.
491+
// The behavior is observable only under heavy load, so this test spawns several worker
492+
// threads. Due to the nature of the bug, this test may still succeed even if the
493+
// bug is present. However repeated runs will show its flakiness nature and indicate
494+
// an eventual regression.
495+
TEST_F(TestEventsExecutor, testRaceConditionAddNode)
496+
{
497+
// rmw_connextdds doesn't support events-executor
498+
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
499+
GTEST_SKIP();
500+
}
501+
502+
// Spawn some threads to do some heavy work
503+
std::atomic<bool> should_cancel = false;
504+
std::vector<std::thread> stress_threads;
505+
for (size_t i = 0; i < 5 * std::thread::hardware_concurrency(); i++) {
506+
stress_threads.emplace_back([&should_cancel, i]() {
507+
// This is just some arbitrary heavy work
508+
size_t total = 0;
509+
for (size_t k = 0; k < 549528914167; k++) {
510+
if (should_cancel) {
511+
break;
512+
}
513+
total += k * (i + 42);
514+
}
515+
// Do something with the total to avoid the thread's work being optimized away
516+
std::cout<<"The dummy total is: "<< total<<std::endl;
517+
});
518+
}
519+
520+
// Create an executor
521+
auto executor = std::make_shared<EventsExecutor>();
522+
// Start spinning
523+
auto executor_thread = std::thread([executor]() {
524+
executor->spin();
525+
});
526+
// Add a node to the executor
527+
auto node_options = ros2_test::unit_test_node_options();
528+
auto node = std::make_shared<rclcpp::Node>("my_node", node_options);
529+
executor->add_node(node->get_node_base_interface());
530+
531+
// Cancel the executor (make sure that it's already spinning first)
532+
while (!executor->is_spinning() && rclcpp::ok())
533+
{
534+
continue;
535+
}
536+
executor->cancel();
537+
538+
// Try to join the thread after cancelling the executor
539+
// This is the "test". We want to make sure that we can still cancel the executor
540+
// regardless of the presence of race conditions
541+
executor_thread.join();
542+
543+
// The test is now completed: we can join the stress threads
544+
should_cancel = true;
545+
for (auto & t : stress_threads) {
546+
t.join();
547+
}
548+
}
549+
482550
// Testing construction of a subscriptions with QoS event callback functions.
483551
std::string * g_pub_log_msg;
484552
std::string * g_sub_log_msg;

0 commit comments

Comments
 (0)