Skip to content

Commit 6972e94

Browse files
pitroumrocklin
authored andcommitted
Use forkserver on Unix and Python 3 (#687)
* Use forkserver on Unix and Python 3 * Improve test suite speed * Install lz4 and paramiko * Skip forking HDFS tests on py2
1 parent 563a3ee commit 6972e94

File tree

10 files changed

+131
-111
lines changed

10 files changed

+131
-111
lines changed

.travis.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ install:
4848
# Install dependencies
4949
- conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION
5050
- source activate test-environment
51-
- conda install -q pytest pytest-timeout coverage tornado toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client h5py netcdf4
51+
- conda install -q pytest pytest-timeout coverage tornado toolz dill futures dask ipywidgets psutil bokeh requests joblib mock ipykernel jupyter_client h5py netcdf4 lz4 paramiko
5252
- |
5353
if [[ $HDFS == true ]]; then
5454
conda install -q libxml2 krb5 boost
@@ -63,9 +63,10 @@ install:
6363
- python setup.py install
6464

6565
script:
66+
- export PYTEST_OPTIONS="--verbose -r s --timeout-method=thread --timeout=300"
6667
- |
6768
if [[ $HDFS == true ]]; then
68-
py.test distributed/tests/test_hdfs.py --verbose -r s --timeout-method=thread --timeout=30
69+
py.test distributed/tests/test_hdfs.py $PYTEST_OPTIONS
6970
if [ $? -ne 0 ]; then
7071
# Diagnose test error
7172
echo "--"
@@ -75,9 +76,9 @@ script:
7576
(exit 1)
7677
fi
7778
elif [[ $COVERAGE == true ]]; then
78-
coverage run $(which py.test) distributed -m "not avoid_travis" --verbose;
79+
coverage run $(which py.test) distributed -m "not avoid_travis" $PYTEST_OPTIONS;
7980
else
80-
py.test -m "not avoid_travis" distributed --verbose;
81+
py.test -m "not avoid_travis" distributed $PYTEST_OPTIONS;
8182
fi;
8283
8384
after_success:

distributed/bokeh/application.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import atexit
44
import json
55
import logging
6-
import multiprocessing
76
import os
87
import socket
98
import sys

distributed/cli/dask_scheduler.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import atexit
44
import json
55
import logging
6-
import multiprocessing
76
import os
87
import socket
98
import subprocess

distributed/nanny.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from datetime import datetime, timedelta
44
import json
55
import logging
6-
from multiprocessing import Process, Queue, queues
6+
from multiprocessing.queues import Empty
77
import os
88
import shutil
99
import subprocess
@@ -19,7 +19,7 @@
1919
from .core import Server, rpc, write, RPCClosed
2020
from .metrics import disk_io_counters, net_io_counters
2121
from .protocol import to_serialize
22-
from .utils import get_ip, ignoring, log_errors, tmpfile
22+
from .utils import get_ip, ignoring, log_errors, mp_context, tmpfile
2323
from .worker import _ncores, Worker, run, TOTAL_MEMORY
2424

2525
nanny_environment = os.path.dirname(sys.executable)
@@ -191,13 +191,14 @@ def instantiate(self, stream=None, environment=None):
191191
except JSONDecodeError:
192192
yield gen.sleep(0.01)
193193
else:
194-
q = Queue()
195-
self.process = Process(target=run_worker_fork,
196-
args=(q, self.ip, self.scheduler.ip,
197-
self.scheduler.port, self.ncores,
198-
self.port, self._given_worker_port,
199-
self.local_dir, self.services, self.name,
200-
self.memory_limit, self.reconnect))
194+
q = mp_context.Queue()
195+
self.process = mp_context.Process(
196+
target=run_worker_fork,
197+
args=(q, self.ip, self.scheduler.ip,
198+
self.scheduler.port, self.ncores,
199+
self.port, self._given_worker_port,
200+
self.local_dir, self.services, self.name,
201+
self.memory_limit, self.reconnect))
201202
self.process.daemon = True
202203
self.process.start()
203204
while True:
@@ -209,7 +210,7 @@ def instantiate(self, stream=None, environment=None):
209210
self.worker_dir = msg['dir']
210211
assert self.worker_port
211212
break
212-
except queues.Empty:
213+
except Empty:
213214
yield gen.sleep(0.1)
214215

215216
logger.info("Nanny %s:%d starts worker process %s:%d",

distributed/tests/test_client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from concurrent.futures import CancelledError
77
from datetime import timedelta
88
import itertools
9-
from multiprocessing import Process
109
import os
1110
import pickle
1211
from random import random, choice
@@ -34,7 +33,7 @@
3433
temp_default_client, get_restrictions)
3534
from distributed.scheduler import Scheduler, KilledWorker
3635
from distributed.sizeof import sizeof
37-
from distributed.utils import sync, tmp_text, ignoring, tokey, All
36+
from distributed.utils import sync, tmp_text, ignoring, tokey, All, mp_context
3837
from distributed.utils_test import (cluster, slow, slowinc, slowadd, randominc,
3938
loop, inc, dec, div, throws, gen_cluster, gen_test, double, deep)
4039

@@ -1614,7 +1613,7 @@ def long_running_client_connection(ip, port):
16141613

16151614
@gen_cluster()
16161615
def test_cleanup_after_broken_client_connection(s, a, b):
1617-
proc = Process(target=long_running_client_connection, args=(s.ip, s.port))
1616+
proc = mp_context.Process(target=long_running_client_connection, args=(s.ip, s.port))
16181617
proc.daemon = True
16191618
proc.start()
16201619

distributed/tests/test_core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from __future__ import print_function, division, absolute_import
22

33
from functools import partial
4-
from multiprocessing import Process
54
import socket
65
from time import time
76

@@ -13,6 +12,7 @@
1312
coerce_to_rpc, send_recv, coerce_to_address, ConnectionPool)
1413
from distributed.utils_test import slow, loop, gen_test
1514

15+
1616
def test_server(loop):
1717
@gen.coroutine
1818
def f():

0 commit comments

Comments
 (0)