asioexec::completion_token & ::use_sender #1503


Merged (2 commits, May 10, 2025)

Conversation

RobertLeahy
Contributor

See commit message for description of functionality.

Addresses defects discussed in comment on #1501.


copy-pr-bot bot commented Mar 27, 2025

This pull request requires additional validation before any workflows can run on NVIDIA's runners.


@ericniebler
Collaborator

/ok to test

@AnonymousPC

AnonymousPC commented Mar 27, 2025

This PR successfully resolves the issues mentioned in #1501. However, the use_sender token still runs on legacy boost::asio contexts.

std::cout << "main: " << std::this_thread::get_id() << std::endl;

auto context = boost::asio::io_context();
auto runner = std::thread([&] {
    auto guard = boost::asio::make_work_guard(context);
    context.run();
});

auto timer = boost::asio::steady_timer(context, std::chrono::seconds(1));
auto task = timer.async_wait(asioexec::use_sender)
            | stdexec::then([] (auto&&...) { std::cout << "then: " << std::this_thread::get_id() << std::endl; });
stdexec::sync_wait(std::move(task));

context.stop();
runner.join();

The output (on my laptop, may vary across different platforms) appears to be:

main: 0x2090d4840
then: 0x16d5d3000

As far as I can tell, one of the core concepts in std::execution is that operations are independent of schedulers, which means that a function like std::execution::sender auto async_operation() should always execute within the caller-provided execution context (unless it explicitly uses schedule, starts_on, or continues_on in the function body).

@villevoutilainen
Contributor

One of the core concepts in std::execution is that operations are independent of schedulers, which means that a function like std::execution::sender auto async_operation() should always execute within the caller-provided execution context (unless it explicitly does schedule(particular_sched) in the function body).

That's a strong preference and recommendation, but not a hard requirement. Some async work might not be able to be moved to another context, even if you can move some parts of its setup with starts_on and its whole continuations with continues_on. And then there are things that can't really move the setup either, such as this little beast:
https://git.qt.io/vivoutil/libunifex-with-qt/-/blob/main/thread_example/threadrunner.cpp?ref_type=heads#L89-96

The QTimer can't be started or stopped on any other thread than the one that owns it. So, its users will have to do that transfer (continues_on) dance before invoking such operations. That doesn't mean that QTimers cannot be used with the rest of the framework, they just happen to require such a QTimer-specific dance.
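
A minimal sketch of that dance in stdexec terms (qt_scheduler, caller_scheduler, and async_wait_timer are hypothetical stand-ins for a scheduler pinned to the timer's owning thread, the caller's scheduler, and a sender factory for the pinned operation):

auto work =
    stdexec::schedule(qt_scheduler)                        // hop to the owning thread
  | stdexec::let_value([] { return async_wait_timer(); })  // start the pinned operation there
  | stdexec::continues_on(caller_scheduler);               // transfer completions back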

I can easily imagine that there are some existing async facilities that are similarly thread-pinned to a particular i/o thread and can't be completely moved to another scheduler, even if you can move parts of operating them, like doing a starts_on; but that can't be assumed to mean that every part of it moves. It's named "starts_on", not "runs_completely_on".

@ericniebler
Collaborator

However, the use_sender token still runs on legacy boost::asio contexts.

playing the n00b card: what is the non-legacy way to do this? what are the advantages?

@AnonymousPC

AnonymousPC commented Mar 27, 2025

However, the use_sender token still runs on legacy boost::asio contexts.

playing the n00b card: what is the non-legacy way to do this? what are the advantages?

In this case:

boost::asio::io_context external_context;

auto function() {
  auto work = boost::asio::ssl_stream(external_context).async_connect(website)
            | std::execution::let_value([] { return async_ssl_handshake(); /*here*/ })
            | std::execution::let_value([] { return async_send_request(); /*and here*/ });
  std::execution::sync_wait(work);
}

I'm wondering which thread will execute the "here" scope:

  • The function caller's thread? (stdexec/examples use this one)
  • The external_context's thread? ("legacy" boost::asio use this one)
  • The main thread?
  • A system context thread?
  • Or could it be any of the above?

A reasonable assumption might be the caller's thread, because:

  1. std::execution only has access to the caller's thread in this scenario.
  2. If the "here" scope were to run on the external_context's thread, the function caller's thread would still need to busy-wait for it (potentially causing ~2x CPU usage due to contention).

Would appreciate any insights or corrections on this understanding!
(The above was translated by GPT, as English is not my mother language, and I've tried hard not to make the grammar seem offensive. Thank you!)

@RobertLeahy
Contributor Author

However, the use_sender token still runs on legacy boost::asio contexts.

playing the n00b card: what is the non-legacy way to do this? what are the advantages?

I think the boost::asio::io_context (in which work is running) is what's being called "legacy," rather than what's actually being done.

std::this_thread::sync_wait must generate a receiver to connect to the provided sender. This receiver has an associated scheduler and delegation scheduler obtained from a std::execution::run_loop execution context.

Asio asynchronous operations have one or two executors associated with them: the "I/O executor," which is associated with the "I/O object" upon which the operation is being performed, and the "associated executor," which is associated with the completion handler. When obtaining the executor which will actually be used, operations are expected to consult the completion handler's association, provide the I/O executor as a fallback, and then perform all work (except that contained within the initiation) thereupon.
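
For concreteness, a minimal sketch of that resolution step as a typical Asio operation might perform it (my_io_object and handler are hypothetical placeholders):

auto io_ex = my_io_object.get_executor();                        // the "I/O executor"
auto ex = boost::asio::get_associated_executor(handler, io_ex);  // the association wins; the I/O executor is only the fallback
// All non-initiation work is then funneled through ex, e.g.:
boost::asio::post(ex, [h = std::move(handler)]() mutable { /* invoke h with the results */ });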

Note that modern versions of Asio actually have two hooks for providing an associated executor: one to be used in the case of deferred execution, and one to be used in the case of immediate execution. This distinction is important because, in the absence of an immediate executor association, Asio asynchronous operations which complete "inline" (i.e. from within the initiation) are required to complete non-reentrantly. This is possible because in the Asio executor model whether or not work is allowed to complete inline (i.e. from within the execute member function of the executor) can be controlled via the "blocking" property.
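
A sketch of those two knobs (ctx is a placeholder io_context; both facilities exist in recent Boost.Asio):

// 1) Forbid inline execution via the "blocking" property:
auto never_inline = boost::asio::require(
    ctx.get_executor(), boost::asio::execution::blocking.never);

// 2) Opt in to inline completion via the immediate-executor association
//    (here wrapping the deferred token, with the system executor as the
//    immediate executor):
auto token = boost::asio::bind_immediate_executor(
    boost::asio::system_executor(), boost::asio::deferred);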

It's important to note that S&R schedulers do not support this functionality. You cannot ask an S&R scheduler whether or not its schedule operation is permitted to complete inline within start, and you cannot ask an S&R scheduler to provide you with an updated version of itself which provides (or doesn't provide) a guarantee one way or another.

The above means that S&R schedulers lack the ability to express a fundamental Asio basis operation: A request to execute work non-reentrantly. Since asynchronous operations in the Asio model do not complete inline (unless the user has explicitly opted into this via the immediate executor association) many Asio asynchronous operations will assume this property about their intermediate asynchronous operations and will only be correct (i.e. not liable to overflow the stack) if this is satisfied.
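
To make the hazard concrete, a sketch (Stream, Buffer, and done are hypothetical placeholders) of a composed operation that re-initiates from its own completion handler:

template <class Stream, class Buffer>
void read_all(Stream& s, Buffer& b) {
  s.async_read_some(b, [&s, &b](auto ec, std::size_t n) {
    // Safe only if this handler is invoked non-reentrantly: a long run of
    // ready completions invoked inline would recurse and could overflow
    // the stack, whereas posted completions unwind between iterations.
    if (!ec && !done(b, n))
      read_all(s, b);
  });
}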

Which gives us the background to come back to the question at hand. The issue here is that asioexec::completion_token (which asioexec::use_sender is built on top of) does not obtain a scheduler from the receiver's environment and pass it through as the associated executor of the completion handler it synthesizes to interoperate with Asio asynchronous operations. The fundamental reason is that, as described above, S&R schedulers cannot provide all the same guarantees as Asio executors, and doing so would therefore be unsafe and undesirable.

The argument could be made that an executor could be synthesized which wraps both a scheduler and the I/O executor, and which obtains non-reentrant execution by requesting such execution from the executor and then scheduling back to the scheduler, but this would be inefficient and would still require additional care and attention to satisfy Asio's guarantee that no move constructors of the completion handler are called except from the associated executor's execution context.

Ultimately, at least as of now, the S&R ecosystem does not provide guarantees anywhere near as strict around schedulers and execution contexts as Asio does. Therefore, while there is a scheduler available in the receiver's environment, operations within the S&R model are free to ignore it, and that's what this integration does (at least as of now).
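
That said, a caller who wants completions on a particular scheduler can always do the transfer explicitly. A minimal sketch, assuming sched is whatever scheduler the continuation should run on:

auto task = timer.async_wait(asioexec::use_sender)
          | stdexec::continues_on(sched)
          | stdexec::then([] (auto&&...) { /* now runs on sched */ });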

@RobertLeahy
Contributor Author

In this case:

boost::asio::io_context external_context;

auto function() {
  auto work = boost::asio::ssl_stream(external_context).async_connect(website)
            | std::execution::let_value([] { return async_ssl_handshake(); /*here*/ })
            | std::execution::let_value([] { return async_send_request(); /*and here*/ });
  std::execution::sync_wait(work);
}

I'm wondering which thread will execute the "here" scope [...]

Assuming async_connect is parameterized with asioexec::use_sender from this PR, the first "here" will run within external_context. If we assume that async_ssl_handshake sources a sender from an I/O object also associated with external_context, the same will also be true for "and here".

@ericniebler
Collaborator

This is possible because in the Asio executor model whether or not work is allowed to complete inline (i.e. from within the execute member function of the executor) can be controlled via the "blocking" property.

we've long discussed the possibility of a "blocking" sender attribute. it needs a design and a champion. i would love to close this gap.
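
for a sense of the shape such an attribute might take, a purely hypothetical sketch (nothing like this exists in stdexec today):

// Hypothetical: a query over a sender's attributes reporting whether the
// sender's operation may complete inline from within start().
struct get_blocking_t {
  template <class Sender>
  auto operator()(const Sender& sndr) const {
    return stdexec::get_env(sndr).query(*this); // hypothetical protocol
  }
};
inline constexpr get_blocking_t get_blocking{};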

@ericniebler left a comment (Collaborator)


love this, it's looking good. needs tests and a clang-format. i can help with the cmakery needed for the tests, if you like.

::stdexec::stop_callback_for_t<
  ::stdexec::stop_token_of_t<
    ::stdexec::env_of_t<Receiver>>,
  stop_callback>> callback_;
Collaborator

i prefer to define all the members in one place -- top or bottom of the class, i don't care which -- but pick one. and unless there's a good reason, they should all have the same accessibility.

finally, unless you expect this hierarchy to be openly extensible, i prefer private/friend over protected.

Contributor Author

The only reason this base class exists at all is to be SCARY.

Given that it's buried in a detail namespace I don't expect the hierarchy to be openly extensible, but I also don't expect anyone to look at or care about any of the members, so I was just using the accessibility that got me what I needed with the least amount of typing.

@ericniebler
Collaborator

/ok to test

2 similar comments
@ericniebler
Collaborator

/ok to test

@ericniebler
Collaborator

/ok to test

@RobertLeahy
Contributor Author

I repushed a large change but it doesn't address the warnings @ericniebler and I have discussed (regarding mismatch between declared signatures and actual value categories sent on the Asio side). I'm working on a solution for that.

@RobertLeahy
Contributor Author

The above-mentioned issue should be addressed by this latest push. I no longer get warnings that the declared signatures and actually sent values have cv-/ref-qualification mismatches.

@ericniebler
Collaborator

/ok to test 8fd12c0

@ericniebler
Collaborator

/ok to test 7b525c9

@@ -67,6 +67,8 @@ jobs:
-DCMAKE_BUILD_TYPE=${{ matrix.build }} \
-DCMAKE_CXX_FLAGS="${{ matrix.cxxflags }}" \
-DSTDEXEC_ENABLE_TBB:BOOL=${{ !contains(matrix.cxxflags, '-fsanitize') }} \
-DSTDEXEC_ENABLE_ASIO:BOOL=TRUE \
-DSTDEXEC_ASIO_IMPLEMENTATION:STRING=boost \
Contributor Author

Why only test in CI with Boost.Asio?

@ericniebler
Collaborator

/ok to test 895a80c

RobertLeahy and others added 2 commits May 10, 2025 09:05
Adds two completion tokens for interop with Asio (either Boost.Asio or
standalone Asio).

asioexec::completion_token performs the most basic transformations
necessary to transform an Asio initiating function into a sender
factory:

- The initiating function returns a sender
- Initiation is deferred until the above-mentioned sender is connected
  and the resulting operation state is started
- The completion handler provided to the initiation (see asio::
  async_initiate) has the following properties:
  - Invocation results in the arguments thereto being sent via a value
    completion signal (this means that errors transmitted via a leading
    error_code parameter (i.e. in Asio style) are delivered via the
    value channel, see below)
  - Abandonment thereof (i.e. allowing the lifetime of the completion
    handler, and all objects transitively derived by moving therefrom,
    to end without invoking any of them) results in a stopped completion
    signal
  - Any exception thrown from any intermediate completion handler, or
    the final completion handler, is sent via an error completion signal
    with a std::exception_ptr representing that exception (this is
    accomplished by wrapping the associated executor)
  - The cancellation slot is connected to a cancellation signal which
    is sent when a stop request is received via the receiver's
    associated stop token

The fact that invocations of the completion handler are passed to the
value channel untouched reflects the design intent that the above-
described completion token perform only "the most basic transformations
necessary." This means that the full context of partial success must be
made available, and since the error channel is unary, this must be
transmitted in the value channel.

For a more ergonomic experience than that described above asioexec::
use_sender is also provided. This uses asioexec::completion_token to
adapt an Asio initiating function into a sender factory and wraps the
returned sender with an additional layer which performs the following
transformations to value completion signals with a leading error_code
parameter (note that when configured for standalone Asio std::error_code
is matched whereas when configured for Boost.Asio both boost::system::
error_code and std::error_code are matched):

- If that argument compares equal to errc::operation_cancelled
  transforms the value completion signal into a stopped completion
  signal, otherwise
- If that argument is truthy transforms the value completion signal into
  an error completion signal with an appropriate std::exception_ptr
  (i.e. one which points to a std::system_error for std::error_code,
  boost::system::system_error for boost::system::error_code), otherwise
- Sends the remainder of the arguments (i.e. all but the error_code) as
  a value completion signal
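
To make the difference concrete, a hedged usage sketch (timer is a placeholder asio::steady_timer; the shapes follow the description above):

// completion_token: the leading error_code arrives in the value channel,
// preserving the full partial-success context:
auto raw = timer.async_wait(asioexec::completion_token)
         | stdexec::then([] (boost::system::error_code ec) { /* inspect ec */ });

// use_sender: the leading error_code is consumed; cancellation becomes a
// stopped signal, other errors become error completions (std::exception_ptr),
// and success sends the remaining arguments (for async_wait, none):
auto nice = timer.async_wait(asioexec::use_sender)
          | stdexec::then([] { /* success only */ });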
@ericniebler
Collaborator

/ok to test eeb430a

@ericniebler ericniebler merged commit 35a3e31 into NVIDIA:main May 10, 2025
18 checks passed
@oleksandrkozlov

Hi @RobertLeahy and @ericniebler,

Thanks for the addition of asio support in stdexec. I'm currently experimenting with it in a single-threaded app that uses an asio::io_context event loop. In particular, I'm trying to switch from asio::awaitable to exec::task by using asioexec::use_sender instead of asio::use_awaitable.

With asio::use_awaitable, I can write and spawn a coroutine like this:

auto ctx = asio::io_context{};

auto task = [&]() -> asio::awaitable<void> {
    auto timer = asio::steady_timer(ctx, std::chrono::seconds{1});
    co_await timer.async_wait(asio::use_awaitable);
}();

asio::co_spawn(ctx, std::move(task), asio::detached);

ctx.run();

Now, with asioexec::use_sender, I can create a similar task:

auto ctx = asio::io_context{};

auto task = [&]() -> exec::task<void> {
    auto timer = asio::steady_timer(ctx, std::chrono::seconds{1});
    co_await timer.async_wait(asioexec::use_sender);
}();

// How do I spawn this task?

ctx.run();

However, I couldn't figure out how to spawn such an exec::task when using asio::io_context as the event loop in a single-threaded application. Any clarification would be greatly appreciated. Thanks in advance!
