@@ -156,6 +156,29 @@ class Subscription : public SubscriptionBase
156
156
" ' is not allowed with 0 depth qos policy" );
157
157
}
158
158
159
+ // Use std::weak_ptr owner_before trick to determine if user
160
+ // has assigned a subscription options_.callback_lifetime weak_ptr.
161
+ // https://stackoverflow.com/a/45507610
162
+ std::weak_ptr<void > empty;
163
+ if (!options_.callback_lifetime .owner_before (empty) &&
164
+ !empty.owner_before (options_.callback_lifetime )) {
165
+ // options_.callback_lifetime was not user assigned,
166
+ // So use options_.callback_group if user assigned,
167
+ // falling back to node's default_callback_group
168
+ std::shared_ptr<void > vsp = options_.callback_group == nullptr ?
169
+ node_base->get_default_callback_group () :
170
+ options_.callback_group ;
171
+ std::weak_ptr<void > vwp = vsp;
172
+ options_.callback_lifetime = vwp;
173
+ }
174
+
175
+ if (options_.callback_lifetime .expired ())
176
+ {
177
+ throw std::invalid_argument (
178
+ " callback_lifetime weak_ptr for topic '" + topic_name +
179
+ " ' has already expired" );
180
+ }
181
+
159
182
using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
160
183
MessageT,
161
184
SubscribedType,
@@ -172,7 +195,8 @@ class Subscription : public SubscriptionBase
172
195
context,
173
196
this ->get_topic_name (), // important to get like this, as it has the fully-qualified name
174
197
qos_profile,
175
- resolve_intra_process_buffer_type (options_.intra_process_buffer_type , callback));
198
+ resolve_intra_process_buffer_type (options_.intra_process_buffer_type , callback),
199
+ options_.callback_lifetime );
176
200
TRACETOOLS_TRACEPOINT (
177
201
rclcpp_subscription_init,
178
202
static_cast <const void *>(get_subscription_handle ().get ()),
@@ -300,12 +324,15 @@ class Subscription : public SubscriptionBase
300
324
now = std::chrono::system_clock::now ();
301
325
}
302
326
303
- any_callback_.dispatch (typed_message, message_info);
327
+ if (!options_.callback_lifetime .expired ())
328
+ {
329
+ any_callback_.dispatch (typed_message, message_info);
304
330
305
- if (subscription_topic_statistics_) {
306
- const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
307
- const auto time = rclcpp::Time (nanos.time_since_epoch ().count ());
308
- subscription_topic_statistics_->handle_message (message_info.get_rmw_message_info (), time);
331
+ if (subscription_topic_statistics_) {
332
+ const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
333
+ const auto time = rclcpp::Time (nanos.time_since_epoch ().count ());
334
+ subscription_topic_statistics_->handle_message (message_info.get_rmw_message_info (), time);
335
+ }
309
336
}
310
337
}
311
338
@@ -321,12 +348,15 @@ class Subscription : public SubscriptionBase
321
348
now = std::chrono::system_clock::now ();
322
349
}
323
350
324
- any_callback_.dispatch (serialized_message, message_info);
351
+ if (!options_.callback_lifetime .expired ())
352
+ {
353
+ any_callback_.dispatch (serialized_message, message_info);
325
354
326
- if (subscription_topic_statistics_) {
327
- const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
328
- const auto time = rclcpp::Time (nanos.time_since_epoch ().count ());
329
- subscription_topic_statistics_->handle_message (message_info.get_rmw_message_info (), time);
355
+ if (subscription_topic_statistics_) {
356
+ const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
357
+ const auto time = rclcpp::Time (nanos.time_since_epoch ().count ());
358
+ subscription_topic_statistics_->handle_message (message_info.get_rmw_message_info (), time);
359
+ }
330
360
}
331
361
}
332
362
@@ -353,12 +383,15 @@ class Subscription : public SubscriptionBase
353
383
now = std::chrono::system_clock::now ();
354
384
}
355
385
356
- any_callback_.dispatch (sptr, message_info);
386
+ if (!options_.callback_lifetime .expired ())
387
+ {
388
+ any_callback_.dispatch (sptr, message_info);
357
389
358
- if (subscription_topic_statistics_) {
359
- const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
360
- const auto time = rclcpp::Time (nanos.time_since_epoch ().count ());
361
- subscription_topic_statistics_->handle_message (message_info.get_rmw_message_info (), time);
390
+ if (subscription_topic_statistics_) {
391
+ const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
392
+ const auto time = rclcpp::Time (nanos.time_since_epoch ().count ());
393
+ subscription_topic_statistics_->handle_message (message_info.get_rmw_message_info (), time);
394
+ }
362
395
}
363
396
}
364
397
@@ -449,7 +482,9 @@ class Subscription : public SubscriptionBase
449
482
* It is important to save a copy of this so that the rmw payload which it
450
483
* may contain is kept alive for the duration of the subscription.
451
484
*/
452
- const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> options_;
485
+ // NOTE: Had to drop const in order to set default options_.callback_lifetime
486
+ // if not set in user code.
487
+ rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> options_;
453
488
typename message_memory_strategy::MessageMemoryStrategy<ROSMessageType, AllocatorT>::SharedPtr
454
489
message_memory_strategy_;
455
490
0 commit comments