Skip to content

Commit 72f3843

Browse files
committed
improve some error handling and add a test.
1 parent d82afc8 commit 72f3843

File tree

3 files changed

+70
-15
lines changed

3 files changed

+70
-15
lines changed

Lib/multiprocessing/forkserver.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import threading
99
import warnings
1010

11+
from . import AuthenticationError
1112
from . import connection
1213
from . import process
1314
from .context import reduction
@@ -290,15 +291,22 @@ def sigchld_handler(*_unused):
290291
if listener in rfds:
291292
# Incoming fork request
292293
with listener.accept()[0] as s:
293-
if authkey:
294-
wrapped_s = connection.Connection(s.fileno())
295-
try:
296-
connection.deliver_challenge(wrapped_s, authkey)
297-
connection.answer_challenge(wrapped_s, authkey)
298-
finally:
299-
wrapped_s._detach()
300-
# Receive fds from client
301-
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
294+
try:
295+
if authkey:
296+
wrapped_s = connection.Connection(s.fileno())
297+
try:
298+
connection.deliver_challenge(
299+
wrapped_s, authkey)
300+
connection.answer_challenge(
301+
wrapped_s, authkey)
302+
finally:
303+
wrapped_s._detach()
304+
# Receive fds from client
305+
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
306+
except (EOFError, OSError, AuthenticationError):
307+
# broken pipe or failed authentication
308+
s.close()
309+
continue
302310
if len(fds) > MAXFDS_TO_SEND:
303311
raise RuntimeError(
304312
"Too many ({0:n}) fds to send".format(

Lib/multiprocessing/reduction.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,15 +139,12 @@ def detach(self):
139139
__all__ += ['DupFd', 'sendfds', 'recvfds']
140140
import array
141141

142-
# On MacOSX we should acknowledge receipt of fds -- see Issue14669
143-
ACKNOWLEDGE = sys.platform == 'darwin'
144-
145142
def sendfds(sock, fds):
146143
'''Send an array of fds over an AF_UNIX socket.'''
147144
fds = array.array('i', fds)
148145
msg = bytes([len(fds) % 256])
149146
sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
150-
if ACKNOWLEDGE and sock.recv(1) != b'A':
147+
if sock.recv(1) != b'A':
151148
raise RuntimeError('did not receive acknowledgement of fd')
152149

153150
def recvfds(sock, size):
@@ -158,8 +155,7 @@ def recvfds(sock, size):
158155
if not msg and not ancdata:
159156
raise EOFError
160157
try:
161-
if ACKNOWLEDGE:
162-
sock.send(b'A')
158+
sock.send(b'A') # Acknowledge
163159
if len(ancdata) != 1:
164160
raise RuntimeError('received %d items of ancdata' %
165161
len(ancdata))

Lib/test/test_multiprocessing_forkserver.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import unittest
2+
from unittest import mock
23
import test._test_multiprocessing
34

5+
import os
46
import sys
57
from test import support
68

@@ -10,7 +12,56 @@
1012
if sys.platform == "win32":
1113
raise unittest.SkipTest("forkserver is not available on Windows")
1214

15+
import multiprocessing
16+
import multiprocessing.connection
17+
import multiprocessing.forkserver
18+
1319
test._test_multiprocessing.install_tests_in_module_dict(globals(), 'forkserver')
1420

21+
22+
class TestForkserverControlAuthentication(unittest.TestCase):
23+
def setUp(self):
24+
super().setUp()
25+
self.context = multiprocessing.get_context("forkserver")
26+
self.pool = self.context.Pool(processes=1, maxtasksperchild=4)
27+
self.assertEqual(self.pool.apply(eval, ("2+2",)), 4)
28+
self.forkserver = multiprocessing.forkserver._forkserver
29+
self.addr = self.forkserver._forkserver_address
30+
self.assertTrue(self.addr)
31+
self.authkey = self.forkserver._forkserver_authkey
32+
self.assertGreater(len(self.authkey), 15)
33+
self.assertTrue(self.forkserver._forkserver_pid)
34+
35+
def tearDown(self):
36+
self.pool.terminate()
37+
self.pool.join()
38+
super().tearDown()
39+
40+
def test_auth_works(self):
41+
"""FYI: An 'EOFError: ran out of input' from a worker is normal."""
42+
# First, demonstrate that a raw auth handshake as Client makes
43+
# does not raise.
44+
client = multiprocessing.connection.Client(
45+
self.addr, authkey=self.authkey)
46+
client.close()
47+
48+
# Now use forkserver code to do the same thing and more.
49+
status_r, data_w = self.forkserver.connect_to_new_process([])
50+
# It is normal for this to trigger an EOFError on stderr from the
51+
# process... it is expecting us to send over a pickle of a Process
52+
# instance to tell it what to do.
53+
# If the authentication handshake and subsequent file descriptor
54+
# sending dance had failed, an exception would've been raised.
55+
os.close(data_w)
56+
os.close(status_r)
57+
58+
def test_no_auth_fails(self):
59+
with mock.patch.object(self.forkserver, '_forkserver_authkey', None):
60+
# With no authkey set, the connection this makes will fail to
61+
# do the file descriptor transfer over the pipe.
62+
with self.assertRaisesRegex(RuntimeError, 'not receive ack'):
63+
status_r, data_w = self.forkserver.connect_to_new_process([])
64+
65+
1566
if __name__ == '__main__':
1667
unittest.main()

0 commit comments

Comments
 (0)