Skip to content

Support timeout in Channel.receive_first and Channel.send_first #16094

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

lachlan
Copy link
Contributor

@lachlan lachlan commented Aug 16, 2025

Support timeout in Channel.receive_first and Channel.send_first, implementing requested feature #16075.

Adds an optional named input parameter timeout for specifying the maximum time to wait for the send/receive action to complete.

If the timeout is exceeded a Channel::TimeoutError exception is then raised.

The timeout parameter defaults to nil for backwards compatibility.

…rystal-lang#16075)

Adds an optional named input parameter `timeout` for specifying the
maximum time to wait for the send/receive action to complete.

If the timeout is exceeded a `Channel::TimeoutError` exception is then
raised.

The `timeout` parameter defaults to `nil` for backwards compatibility.
src/channel.cr Outdated
def initialize(msg = "Channel is closed")
super(msg)
end
end

class TimeoutError < Error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: I'm wondering if a dedicated Channel::TimeoutError makes much sense. Perhaps a generic timeout error would be better? It could be reused for other kinds of timeouts. I suppose it usually shouldn't matter much whether a timeout occurred in a channel or somewhere else...
There's already IO::Timeout, but it probably makes sense to be a separate type because it should be in the IO::Error hierarchy. I don't think Channel::Error is equivalently relevant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started out just using IO::TimeoutError but I convinced myself it wasn't appropriate to use an IO related error for a Channel timeout.

If adding a Channel::Error superclass to the existing Channel::ClosedError and new Channel::TimeoutError has little utility, I'll go ahead and remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed Channel::Error, but left Channel::TimeoutError for now pending whether or not there should be a generic and reusable TimeoutError in the stdlib.

src/channel.cr Outdated
# :ditto:
def self.receive_first(channels : Enumerable(Channel), *, timeout : Time::Span? = nil)
actions = channels.map(&.receive_select_action)
actions = actions.to_a + [TimeoutAction.new(timeout)] unless timeout.nil?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: ary + [x] should at least be rewritten as ary << x to avoid the intermediate array.
But perhaps we could also avoid unnecessary heap allocations entirely?

actions.to_a would always allocate an array, even when channels is a tuple, and currently would not allocate at all.
For reference, Channel.select_impl goes extra lengths with custom implementations for different collection types in order to avoid heap allocations if at all possible.

This is only for the case when using a timeout, so it doesn't look like this introduces a performance regression for existing code. But it could still mean receive_first with timeout is significantly less performant than without. I haven't benchmarked this, just based on intuition (which may be wrong).

Suggested change
actions = actions.to_a + [TimeoutAction.new(timeout)] unless timeout.nil?
actions = actions.to_a << TimeoutAction.new(timeout) unless timeout.nil?

ditto for .send_first

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I try actions = actions.to_a << TimeoutAction.new(timeout) unless timeout.nil? the compiler gives this error:

In src\channel.cr:330:45

 330 | actions = actions.to_a << TimeoutAction.new(timeout) unless timeout.nil?
                                               ^--
Error: expected argument #1 to 'Array(Channel::StrictReceiveAction(Int32))#<<' to be Channel::StrictReceiveAction(Int32), not Channel::TimeoutAction

Overloads are:
 - Array(T)#<<(value : T)

So I ended up with the actions = actions.to_a + [TimeoutAction.new(timeout)] approach as it was the only way I could figure out how to make the compiler happy with the types.

I would love if there was a better approach than concatenating these two temporary arrays to make the compiler happy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of array concatenation, I tried splatting method arguments into the call to self.select and that works and seems like a better approach? And I assume splatting method arguments avoids heap allocation, but @straight-shoota please confirm?

index, value = self.select(*actions, TimeoutAction.new(timeout))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Splatting only works for Tuple. So this code wouldn't compile if actions was, for example, Array or StaticArray.
Seems like we're missing a test case for that. 🙈

We can use splatting as an optimization for Tuple, but we would need other branches for other enumerables as well, just like in .select_impl.

For the array append to work, we must type the array as Array(SelectAction) (which is fine because .select expects that item type). An easy way to do that is to explicitly cast the output type of the map block: channels.map(&.receive_select_action.as(SelectAction)). This should result in an appropriate collection type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added some additional tests to cover Array and StaticArray, and yep as you said compilation failed:

In src\channel.cr:360:34

 360 | index, value = self.select(*actions, TimeoutAction.new(timeout))
                                  ^
Error: argument to splat must be a tuple, not Array(Channel::StrictReceiveAction(Int32))

I'm sorry (for being such a noob) but I cannot for the life of me figure out how to make the compiler happy trying to use << to append a TimeoutAction. I tried the following:

In src\channel.cr:364:41

 364 | actions = actions.to_a.as(Array(SelectAction)) << timeout_action
                                       ^-----------
Error: can't use Channel::SelectAction(S) as generic type argument yet, use a more specific type

...

In src\channel.cr:364:55

 364 | actions = actions.to_a(&.as(SelectAction)) << timeout_action
                                                     ^-------------
Error: expected argument #1 to 'Array(Channel::StrictReceiveAction(Int32))#<<' to be Channel::StrictReceiveAction(Int32), not Channel::TimeoutAction

...

In src\channel.cr:365:59

 365 | actions = actions.to_a.map(&.as(SelectAction)) << timeout_action
                                                         ^-------------
Error: expected argument #1 to 'Array(Channel::StrictReceiveAction(Int32))#<<' to be Channel::StrictReceiveAction(Int32), not Channel::TimeoutAction

But have ended up back at the actions.to_a + [TimeoutAction.new(timeout)] as the only way I could get it to compile.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants