Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions realtime_tools/include/realtime_tools/realtime_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ class RealtimePublisher
: publisher_(publisher), is_running_(false), keep_running_(true), turn_(State::LOOP_NOT_STARTED)
{
thread_ = std::thread(&RealtimePublisher::publishingLoop, this);

// Wait for the thread to be ready before proceeding
// This is important to ensure that the thread is properly initialized and ready to handle
// messages before any other operations are performed on the RealtimePublisher instance.
while (!thread_.joinable() ||
turn_.load(std::memory_order_acquire) == State::LOOP_NOT_STARTED) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}

[[deprecated(
Expand Down Expand Up @@ -116,14 +124,14 @@ class RealtimePublisher
/**
* \brief Try to acquire the data lock for non-realtime message publishing
*
* It first checks if the current state allows non-realtime message publishing (turn_ == NON_REALTIME)
* It first checks if the current state allows non-realtime message publishing (turn_ == REALTIME)
* and then attempts to lock
*
* \return true if the lock was successfully acquired, false otherwise
*/
bool trylock()
{
if (turn_.load(std::memory_order_acquire) == State::NON_REALTIME && msg_mutex_.try_lock()) {
if (turn_.load(std::memory_order_acquire) == State::REALTIME && msg_mutex_.try_lock()) {
return true;
} else {
return false;
Expand Down Expand Up @@ -159,7 +167,7 @@ class RealtimePublisher
*/
void unlockAndPublish()
{
turn_.store(State::REALTIME, std::memory_order_release);
turn_.store(State::NON_REALTIME, std::memory_order_release);
unlock();
}

Expand Down Expand Up @@ -196,33 +204,32 @@ class RealtimePublisher
* \brief Publishing loop (runs in separate thread)
*
* This is the main loop for the non-realtime publishing thread. It:
* 1. Waits for new messages (State::REALTIME)
* 1. Waits for new messages (State::NON_REALTIME)
* 2. Copies the message data
* 3. Publishes the message through the ROS publisher
* 4. Returns to State::NON_REALTIME to allow realtime updates
* 4. Returns to State::REALTIME to allow realtime updates
*
* The loop continues until keep_running_ is set to false.
*/
void publishingLoop()
{
is_running_ = true;
turn_.store(State::NON_REALTIME, std::memory_order_release);

while (keep_running_) {
MessageT outgoing;

{
turn_.store(State::REALTIME, std::memory_order_release);
// Locks msg_ and copies it to outgoing
std::unique_lock<std::mutex> lock_(msg_mutex_);
updated_cond_.wait(lock_, [&] { return turn_ == State::REALTIME || !keep_running_; });
updated_cond_.wait(lock_, [&] { return turn_ == State::NON_REALTIME || !keep_running_; });
outgoing = msg_;
}

// Sends the outgoing message
if (keep_running_) {
publisher_->publish(outgoing);
}
turn_.store(State::NON_REALTIME, std::memory_order_release);
}
is_running_ = false;
}
Expand Down