-
-
Notifications
You must be signed in to change notification settings - Fork 32.3k
gh-109709: Fix asyncio test_stdin_broken_pipe() #109710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Replace harcoded sleep of 500 ms with synchronization using a pipe. Fix also Process._feed_stdin(): catch also BrokenPipeError on stdin.write(input), not only on stdin.drain().
561d588
to
24b415d
Compare
On FreeBSD, without this fix, I reproduce the bug in less than 30 seconds with the command:
With this fix, I can no longer reproduce the issue. I interrupted the test after 6 min 36 sec:
Well, while running this stress test I found a second issue, a a real bug in Process._feed_stdin(): it doesn't catch BrokenPipeError as promised when the error occurs on stdin.write(). I already updated my PR to include a fix. |
from subprocess import STARTUPINFO | ||
startupinfo = STARTUPINFO() | ||
startupinfo.lpAttributeList = {"handle_list": [handle]} | ||
kwargs = dict(startupinfo=startupinfo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On Windows, passing pipes as stdin, stdout and stderr is well supported. Passing an additional pipe is not supported by msvcrt (CreateProcess). Passing a handle is possible, but it requires to convert FD to handle and then handle to FD.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could the process be synchronized in a way that requires less boilerplate for Windows? Is just calling terminate() not enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like using a pipe as a sync primitive, but I'm unhappy by the quantity of code needed for that :-(
In practice, what is needed is that the child process hangs until the parent decides to terminate it in a clean fashion.
Thanks @vstinner for the PR 🌮🎉.. I'm working now to backport this PR to: 3.11, 3.12. |
Sorry, @vstinner, I could not cleanly backport this to |
Replace harcoded sleep of 500 ms with synchronization using a pipe. Fix also Process._feed_stdin(): catch also BrokenPipeError on stdin.write(input), not only on stdin.drain(). (cherry picked from commit cbbdf2c) Co-authored-by: Victor Stinner <[email protected]>
GH-109731 is a backport of this pull request to the 3.12 branch. |
GH-109735 is a backport of this pull request to the 3.11 branch. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yay for less magic sleeps and more actual synchronization!
I added a review comment on a bit that worries me (is transport.write() actually supposed to raise on broken pipe?).
if input is not None: | ||
self.stdin.write(input) | ||
if debug: | ||
logger.debug( | ||
'%r communicate: feed stdin (%s bytes)', self, len(input)) | ||
|
||
await self.stdin.drain() | ||
except (BrokenPipeError, ConnectionResetError) as exc: | ||
# communicate() ignores BrokenPipeError and ConnectionResetError | ||
# communicate() ignores BrokenPipeError and ConnectionResetError. | ||
# write() and drain() can raise these exceptions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, if I understand correctly this change is here because write() could raise BrokenPipeError when registering a writer. Is this expected behavior? _UnixWritePipeTransport.write() tries hard not to raise if the os.write() call fails.
Could it be that this was not noticed on Linux because the error is raised only by the kqueue selector?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I didn't keep the traceback. I got a BrokenPipeError on FreeBSD, yeah, it was somewhere in the kqueue selector. You can revert my change, and stress-test the test as I described in the issue / PR, to easily trigger the bug.
Could it be that this was not noticed on Linux because the error is raised only by the kqueue selector?
Maybe on Windows and Linux, write() cannot trigger these exceptions, but write() does on FreeBSD.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simple snippet to trigger the exception:
import os, selectors
sel = selectors.DefaultSelector()
rfd, wfd = os.pipe()
os.close(rfd)
sel.register(wfd, selectors.EVENT_WRITE)
Tested on modern releases of Linux, macOS, FreeBSD, and I only see it raise on FreeBSD. See discussion here for some details: tokio-rs/mio#582 (older versions of macOS used to fail in the same case, and NetBSD/OpenBSD report different errors).
I think this should be considered a bug. _UnixWritePipeTransport.write() should not be expected to raise, otherwise exception handling code would have to be sprinkled all over user code.
In the linked issues (both Tokio and libevent) it's solved at an abstraction level similar to the selectors module in Python. If we agree it could be considered a bug, it's either a selectors bug, or an asyncio bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait, registering a closed FD is bad: don't do that. It wasn't the issue that I got (I hope).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be considered a bug. _UnixWritePipeTransport.write() should not be expected to raise, otherwise exception handling code would have to be sprinkled all over user code.
If you think that there is a bug, please open a new issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a closed FD! Only the other end of the pipe is closed. The write fd is still valid. On Linux/Mac you will always get an event, so you get the error when you try to write. On other BSDs you get the error preemptively, when registering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry i was confused between rfd and wfd.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I opened a new issue here #109757. I'd be glad to work on it.
from subprocess import STARTUPINFO | ||
startupinfo = STARTUPINFO() | ||
startupinfo.lpAttributeList = {"handle_list": [handle]} | ||
kwargs = dict(startupinfo=startupinfo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could the process be synchronized in a way that requires less boilerplate for Windows? Is just calling terminate() not enough?
Replace harcoded sleep of 500 ms with synchronization using a pipe. Fix also Process._feed_stdin(): catch also BrokenPipeError on stdin.write(input), not only on stdin.drain().
…109731) gh-109709: Fix asyncio test_stdin_broken_pipe() (GH-109710) Replace harcoded sleep of 500 ms with synchronization using a pipe. Fix also Process._feed_stdin(): catch also BrokenPipeError on stdin.write(input), not only on stdin.drain(). (cherry picked from commit cbbdf2c) Co-authored-by: Victor Stinner <[email protected]>
Replace harcoded sleep of 500 ms with synchronization using a pipe. Fix also Process._feed_stdin(): catch also BrokenPipeError on stdin.write(input), not only on stdin.drain().
Replace harcoded sleep of 500 ms with synchronization using a pipe.