diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index c75019cffff..0c693bd7f72 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1,3 +1,3 @@ - [ ] Closes #xxxx - [ ] Tests added / passed -- [ ] Passes `black distributed` / `flake8 distributed` +- [ ] Passes `black distributed` / `flake8 distributed` / `isort distributed` diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ca1c26a7d87..fb4d7de629c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,9 @@ repos: + - repo: https://github.com/pycqa/isort + rev: 5.7.0 + hooks: + - id: isort + language_version: python3 - repo: https://github.com/psf/black rev: 20.8b1 hooks: diff --git a/conftest.py b/conftest.py index 07adc4982f6..e8a159fa4a3 100644 --- a/conftest.py +++ b/conftest.py @@ -1,7 +1,6 @@ # https://pytest.org/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option import pytest - # Uncomment to enable more logging and checks # (https://docs.python.org/3/library/asyncio-dev.html) # Note this makes things slower and might consume much memory. diff --git a/distributed/__init__.py b/distributed/__init__.py index 475288a3b32..70bde950a0e 100644 --- a/distributed/__init__.py +++ b/distributed/__init__.py @@ -1,42 +1,42 @@ -from . import config +from . import config # isort:skip import dask from dask.config import config + +from ._version import get_versions from .actor import Actor, ActorFuture -from .core import connect, rpc, Status -from .deploy import LocalCluster, Adaptive, SpecCluster, SSHCluster -from .diagnostics.progressbar import progress -from .diagnostics.plugin import WorkerPlugin, SchedulerPlugin, PipInstall from .client import ( Client, - Executor, CompatibleExecutor, - wait, + Executor, + Future, as_completed, default_client, fire_and_forget, - Future, futures_of, + get_task_metadata, get_task_stream, performance_report, - get_task_metadata, + wait, ) +from .core import Status, connect, rpc +from .deploy import Adaptive, LocalCluster, SpecCluster, SSHCluster +from .diagnostics.plugin import PipInstall, SchedulerPlugin, WorkerPlugin +from .diagnostics.progressbar import progress +from .event import Event from .lock import Lock from .multi_lock import MultiLock from .nanny import Nanny from .pubsub import Pub, Sub from .queues import Queue +from .scheduler import Scheduler from .security import Security from .semaphore import Semaphore -from .event import Event -from .scheduler import Scheduler from .threadpoolexecutor import rejoin -from .utils import sync, TimeoutError, CancelledError +from .utils import CancelledError, TimeoutError, sync from .variable import Variable -from .worker import Worker, get_worker, get_client, secede, Reschedule +from .worker import Reschedule, Worker, get_client, get_worker, secede from .worker_client import local_client, worker_client -from ._version import get_versions - versions = get_versions() __version__ = versions["version"] __git_revision__ = versions["full-revisionid"] diff --git a/distributed/_concurrent_futures_thread.py b/distributed/_concurrent_futures_thread.py index b26da12cb7a..1b6f328991f 100644 --- a/distributed/_concurrent_futures_thread.py +++ b/distributed/_concurrent_futures_thread.py @@ -8,16 +8,17 @@ __author__ = "Brian Quinlan (brian@sweetapp.com)" import atexit -from concurrent.futures import _base import itertools +from concurrent.futures import _base try: import queue except ImportError: 
import Queue as queue + +import os import threading import weakref -import os # Workers are created as daemon threads. This is done to allow the interpreter # to exit when there are still idle threads in a ThreadPoolExecutor's thread diff --git a/distributed/_ipython_utils.py b/distributed/_ipython_utils.py index 8aa1fe7ad7b..8e83a78ff4f 100644 --- a/distributed/_ipython_utils.py +++ b/distributed/_ipython_utils.py @@ -12,19 +12,17 @@ except ImportError: # Python 2 import Queue as queue -from subprocess import Popen + import sys -from threading import Thread +from subprocess import Popen +from threading import Event, Thread from uuid import uuid4 -from tornado.gen import TimeoutError -from tornado.ioloop import IOLoop -from threading import Event - from IPython import get_ipython from jupyter_client import BlockingKernelClient, write_connection_file from jupyter_core.paths import jupyter_runtime_dir - +from tornado.gen import TimeoutError +from tornado.ioloop import IOLoop OUTPUT_TIMEOUT = 10 diff --git a/distributed/actor.py b/distributed/actor.py index b9149fec7f4..0facdda4cb8 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -5,7 +5,7 @@ from .client import Future, default_client from .protocol import to_serialize -from .utils import iscoroutinefunction, thread_state, sync +from .utils import iscoroutinefunction, sync, thread_state from .utils_comm import WrappedKey from .worker import get_worker diff --git a/distributed/batched.py b/distributed/batched.py index 7b2523fc1e0..dc20de6f220 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -1,5 +1,5 @@ -from collections import deque import logging +from collections import deque import dask from tornado import gen, locks @@ -8,7 +8,6 @@ from .core import CommClosedError from .utils import parse_timedelta - logger = logging.getLogger(__name__) diff --git a/distributed/cfexecutor.py b/distributed/cfexecutor.py index e11c96c2821..c86c989abac 100644 --- a/distributed/cfexecutor.py +++ b/distributed/cfexecutor.py @@ -1,12 +1,11 @@ -import concurrent.futures as cf import weakref +from concurrent import futures as cf from tlz import merge - from tornado import gen from .metrics import time -from .utils import sync, TimeoutError +from .utils import TimeoutError, sync @gen.coroutine diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index 335fcd63b5c..702772667d2 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -1,6 +1,6 @@ import atexit -import logging import gc +import logging import os import re import sys @@ -8,17 +8,16 @@ import click import dask - from tornado.ioloop import IOLoop from distributed import Scheduler -from distributed.preloading import validate_preload_argv from distributed.cli.utils import check_python_3, install_signal_handlers -from distributed.utils import deserialize_for_cli +from distributed.preloading import validate_preload_argv from distributed.proctitle import ( enable_proctitle_on_children, enable_proctitle_on_current, ) +from distributed.utils import deserialize_for_cli logger = logging.getLogger("distributed.scheduler") diff --git a/distributed/cli/dask_spec.py b/distributed/cli/dask_spec.py index 299878a3a46..9d4b4e4ca45 100644 --- a/distributed/cli/dask_spec.py +++ b/distributed/cli/dask_spec.py @@ -1,11 +1,12 @@ import asyncio -import click import json import os import sys -import yaml +import click import dask.config +import yaml + from distributed.deploy.spec import run_spec from distributed.utils import 
deserialize_for_cli diff --git a/distributed/cli/dask_ssh.py b/distributed/cli/dask_ssh.py index f592c572ac5..f81cd73d495 100755 --- a/distributed/cli/dask_ssh.py +++ b/distributed/cli/dask_ssh.py @@ -1,7 +1,7 @@ -from distributed.deploy.old_ssh import SSHCluster import click from distributed.cli.utils import check_python_3 +from distributed.deploy.old_ssh import SSHCluster @click.command( diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 3d60ce35603..dcd60f2e540 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -1,16 +1,19 @@ import asyncio import atexit -from contextlib import suppress -import logging import gc +import logging import os import signal import sys import warnings +from contextlib import suppress import click import dask from dask.system import CPU_COUNT +from tlz import valmap +from tornado.ioloop import IOLoop, TimeoutError + from distributed import Nanny from distributed.cli.utils import check_python_3, install_signal_handlers from distributed.comm import get_address_host_port @@ -22,9 +25,6 @@ ) from distributed.utils import deserialize_for_cli, import_term -from tlz import valmap -from tornado.ioloop import IOLoop, TimeoutError - logger = logging.getLogger("distributed.dask_worker") diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 6be7f3c365d..2e938db558f 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -3,26 +3,26 @@ pytest.importorskip("requests") import os -import requests -import socket import shutil +import socket import sys import tempfile from time import sleep +import requests from click.testing import CliRunner import distributed -from distributed import Scheduler, Client +import distributed.cli.dask_scheduler +from distributed import Client, Scheduler +from distributed.metrics import time from distributed.utils import get_ip, get_ip_interface, tmpfile +from distributed.utils_test import loop # noqa: F401 from distributed.utils_test import ( - popen, assert_can_connect_from_everywhere_4_6, assert_can_connect_locally_4, + popen, ) -from distributed.utils_test import loop # noqa: F401 -from distributed.metrics import time -import distributed.cli.dask_scheduler def test_defaults(loop): diff --git a/distributed/cli/tests/test_dask_spec.py b/distributed/cli/tests/test_dask_spec.py index a18b9fb383a..0a5f64fc484 100644 --- a/distributed/cli/tests/test_dask_spec.py +++ b/distributed/cli/tests/test_dask_spec.py @@ -1,10 +1,11 @@ -import pytest import sys + +import pytest import yaml from distributed import Client -from distributed.utils_test import popen from distributed.utils_test import cleanup # noqa: F401 +from distributed.utils_test import popen @pytest.mark.asyncio diff --git a/distributed/cli/tests/test_dask_ssh.py b/distributed/cli/tests/test_dask_ssh.py index 9be8cb06f62..b73cd66d914 100644 --- a/distributed/cli/tests/test_dask_ssh.py +++ b/distributed/cli/tests/test_dask_ssh.py @@ -1,4 +1,5 @@ from click.testing import CliRunner + from distributed.cli.dask_ssh import main diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 853b9964128..98724c6754b 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -1,22 +1,29 @@ import asyncio + import pytest from click.testing import CliRunner pytest.importorskip("requests") -import requests -import sys import os -from time import 
sleep +import sys from multiprocessing import cpu_count +from time import sleep + +import requests import distributed.cli.dask_worker from distributed import Client, Scheduler from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time -from distributed.utils import sync, tmpfile, parse_ports -from distributed.utils_test import popen, terminate_process, wait_for_port -from distributed.utils_test import loop, cleanup # noqa: F401 +from distributed.utils import parse_ports, sync, tmpfile +from distributed.utils_test import ( # noqa: F401 + cleanup, + loop, + popen, + terminate_process, + wait_for_port, +) def test_nanny_worker_ports(loop): diff --git a/distributed/cli/tests/test_tls_cli.py b/distributed/cli/tests/test_tls_cli.py index def31bc244d..9301b47ab3e 100644 --- a/distributed/cli/tests/test_tls_cli.py +++ b/distributed/cli/tests/test_tls_cli.py @@ -1,16 +1,15 @@ from time import sleep from distributed import Client +from distributed.metrics import time +from distributed.utils_test import loop # noqa: F401 from distributed.utils_test import ( - popen, get_cert, new_config_file, - tls_security, + popen, tls_only_config, + tls_security, ) -from distributed.utils_test import loop # noqa: F401 -from distributed.metrics import time - ca_file = get_cert("tls-ca-cert.pem") cert = get_cert("tls-cert.pem") diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py index c1bff051534..b2515faff11 100644 --- a/distributed/cli/utils.py +++ b/distributed/cli/utils.py @@ -1,6 +1,5 @@ from tornado.ioloop import IOLoop - py3_err_msg = """ Warning: Your terminal does not set locales. diff --git a/distributed/client.py b/distributed/client.py index 2669a25473b..229a49c0689 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -1,37 +1,36 @@ import asyncio import atexit -from collections import defaultdict -from collections.abc import Iterator -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures._base import DoneAndNotDoneFutures -from contextlib import contextmanager, suppress -from contextvars import ContextVar import copy import errno -from functools import partial import html import inspect import json import logging -from numbers import Number import os +import socket import sys -import uuid import threading -import socket -from queue import Queue as pyQueue +import uuid import warnings import weakref +from collections import defaultdict +from collections.abc import Iterator +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures._base import DoneAndNotDoneFutures +from contextlib import contextmanager, suppress +from contextvars import ContextVar +from functools import partial +from numbers import Number +from queue import Queue as pyQueue import dask -from dask.base import tokenize, normalize_token, collections_to_dsk +from dask.base import collections_to_dsk, normalize_token, tokenize +from dask.compatibility import apply from dask.core import flatten +from dask.highlevelgraph import HighLevelGraph from dask.optimization import SubgraphCallable -from dask.compatibility import apply from dask.utils import ensure_dict, format_bytes, funcname, stringify -from dask.highlevelgraph import HighLevelGraph - -from tlz import first, groupby, merge, valmap, keymap, partition_all +from tlz import first, groupby, keymap, merge, partition_all, valmap try: from dask.delayed import single_key @@ -40,24 +39,18 @@ from tornado import gen from tornado.ioloop import IOLoop, PeriodicCallback +from . 
import versions as version_module from .batched import BatchedSend -from .utils_comm import ( - WrappedKey, - unpack_remotedata, - pack_data, - scatter_to_workers, - gather_from_workers, - retry_operation, -) from .cfexecutor import ClientExecutor from .core import ( - connect, - rpc, - clean_exception, CommClosedError, - PooledRPCCall, ConnectionPool, + PooledRPCCall, + clean_exception, + connect, + rpc, ) +from .diagnostics.plugin import UploadFile, WorkerPlugin from .metrics import time from .protocol import to_serialize from .protocol.pickle import dumps, loads @@ -66,26 +59,31 @@ from .security import Security from .sizeof import sizeof from .threadpoolexecutor import rejoin -from .worker import get_client, get_worker, secede -from .diagnostics.plugin import UploadFile, WorkerPlugin from .utils import ( All, - sync, - log_errors, + Any, + CancelledError, + LoopRunner, + TimeoutError, + format_dashboard_link, + has_keyword, key_split, - thread_state, + log_errors, no_default, - LoopRunner, parse_timedelta, shutting_down, - Any, - has_keyword, - format_dashboard_link, - TimeoutError, - CancelledError, + sync, + thread_state, ) -from . import versions as version_module - +from .utils_comm import ( + WrappedKey, + gather_from_workers, + pack_data, + retry_operation, + scatter_to_workers, + unpack_remotedata, +) +from .worker import get_client, get_worker, secede logger = logging.getLogger(__name__) @@ -3941,7 +3939,7 @@ async def _get_task_stream( source, figure = task_stream_figure(sizing_mode="stretch_both") source.data.update(rects) if plot == "save": - from bokeh.plotting import save, output_file + from bokeh.plotting import output_file, save output_file(filename=filename, title="Dask Task Stream") save(figure, filename=filename, resources=bokeh_resources) diff --git a/distributed/comm/__init__.py b/distributed/comm/__init__.py index 2ff679ada3d..af6d30812d3 100644 --- a/distributed/comm/__init__.py +++ b/distributed/comm/__init__.py @@ -1,21 +1,20 @@ from .addressing import ( - parse_address, - unparse_address, + get_address_host, + get_address_host_port, + get_local_address_for, normalize_address, + parse_address, parse_host_port, - unparse_host_port, resolve_address, - get_address_host_port, - get_address_host, - get_local_address_for, + unparse_address, + unparse_host_port, ) -from .core import connect, listen, Comm, CommClosedError +from .core import Comm, CommClosedError, connect, listen from .utils import get_tcp_server_address def _register_transports(): - from . import inproc - from . import tcp + from . import inproc, tcp try: from . import ucx diff --git a/distributed/comm/addressing.py b/distributed/comm/addressing.py index 537ecd4ef23..949fa31bcd1 100644 --- a/distributed/comm/addressing.py +++ b/distributed/comm/addressing.py @@ -1,9 +1,9 @@ import itertools + import dask -from . import registry from ..utils import get_ip_interface - +from . 
import registry DEFAULT_SCHEME = dask.config.get("distributed.comm.default-scheme") diff --git a/distributed/comm/core.py b/distributed/comm/core.py index 2a01ec3d728..6ee79723736 100644 --- a/distributed/comm/core.py +++ b/distributed/comm/core.py @@ -1,21 +1,20 @@ -from abc import ABC, abstractmethod, abstractproperty import asyncio -from contextlib import suppress import inspect import logging import random import sys import weakref +from abc import ABC, abstractmethod, abstractproperty +from contextlib import suppress import dask from ..metrics import time -from ..utils import parse_timedelta, TimeoutError +from ..protocol import pickle +from ..protocol.compression import get_default_compression +from ..utils import TimeoutError, parse_timedelta from . import registry from .addressing import parse_address -from ..protocol.compression import get_default_compression -from ..protocol import pickle - logger = logging.getLogger(__name__) diff --git a/distributed/comm/inproc.py b/distributed/comm/inproc.py index d93377975ba..7374fba188d 100644 --- a/distributed/comm/inproc.py +++ b/distributed/comm/inproc.py @@ -1,21 +1,19 @@ import asyncio -from collections import deque, namedtuple import itertools import logging import os import threading -import weakref import warnings +import weakref +from collections import deque, namedtuple from tornado.concurrent import Future from tornado.ioloop import IOLoop from ..protocol import nested_deserialize from ..utils import get_ip - +from .core import Comm, CommClosedError, Connector, Listener from .registry import Backend, backends -from .core import Comm, Connector, Listener, CommClosedError - logger = logging.getLogger(__name__) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index b79a5105a0d..3ba49b7f869 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -2,11 +2,12 @@ import functools import logging import socket -from ssl import SSLError import struct import sys -from tornado import gen import weakref +from ssl import SSLError + +from tornado import gen try: import ssl @@ -19,16 +20,14 @@ from tornado.tcpclient import TCPClient from tornado.tcpserver import TCPServer +from ..protocol.utils import pack_frames_prelude, unpack_frames from ..system import MEMORY_LIMIT from ..threadpoolexecutor import ThreadPoolExecutor from ..utils import ensure_ip, get_ip, get_ipv6, nbytes, parse_timedelta, shutting_down - -from .registry import Backend, backends from .addressing import parse_host_port, unparse_host_port -from .core import Comm, Connector, Listener, CommClosedError, FatalCommClosedError -from .utils import to_frames, from_frames, get_tcp_server_address, ensure_concrete_host -from ..protocol.utils import pack_frames_prelude, unpack_frames - +from .core import Comm, CommClosedError, Connector, FatalCommClosedError, Listener +from .registry import Backend, backends +from .utils import ensure_concrete_host, from_frames, get_tcp_server_address, to_frames logger = logging.getLogger(__name__) diff --git a/distributed/comm/tests/test_comms.py b/distributed/comm/tests/test_comms.py index 56f3c7a1b11..0b323669223 100644 --- a/distributed/comm/tests/test_comms.py +++ b/distributed/comm/tests/test_comms.py @@ -7,10 +7,12 @@ from functools import partial import dask - -import distributed import pkg_resources import pytest +from tornado import ioloop +from tornado.concurrent import Future + +import distributed from distributed.comm import ( CommClosedError, connect, @@ -38,8 +40,6 @@ has_ipv6, requires_ipv6, ) -from tornado import 
ioloop -from tornado.concurrent import Future EXTERNAL_IP4 = get_ip() if has_ipv6(): diff --git a/distributed/comm/tests/test_ucx.py b/distributed/comm/tests/test_ucx.py index aa5095e2f3c..2a388904475 100644 --- a/distributed/comm/tests/test_ucx.py +++ b/distributed/comm/tests/test_ucx.py @@ -1,16 +1,15 @@ import asyncio + import pytest ucp = pytest.importorskip("ucp") -from distributed import Client, Worker, Scheduler, wait -from distributed.comm import ucx, listen, connect +from distributed import Client, Scheduler, Worker, wait +from distributed.comm import connect, listen, parse_address, ucx from distributed.comm.registry import backends, get_backend -from distributed.comm import ucx, parse_address -from distributed.protocol import to_serialize from distributed.deploy.local import LocalCluster -from distributed.utils_test import gen_test, loop, inc, cleanup, popen # noqa: 401 - +from distributed.protocol import to_serialize +from distributed.utils_test import cleanup, gen_test, inc, loop, popen # noqa: 401 try: HOST = ucp.get_address() diff --git a/distributed/comm/tests/test_ucx_config.py b/distributed/comm/tests/test_ucx_config.py index 43eb45acb23..c266bb75148 100644 --- a/distributed/comm/tests/test_ucx_config.py +++ b/distributed/comm/tests/test_ucx_config.py @@ -1,12 +1,13 @@ -import pytest from time import sleep import dask +import pytest from dask.utils import format_bytes + from distributed import Client -from distributed.utils_test import gen_test, loop, inc, cleanup, popen # noqa: 401 -from distributed.utils import get_ip from distributed.comm.ucx import _scrub_ucx_config +from distributed.utils import get_ip +from distributed.utils_test import cleanup, gen_test, inc, loop, popen # noqa: 401 try: HOST = get_ip() diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 0d91b404ee2..7bd06776894 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -11,19 +11,19 @@ import dask -from .addressing import parse_host_port, unparse_host_port -from .core import Comm, Connector, Listener, CommClosedError -from .registry import Backend, backends -from .utils import ensure_concrete_host, to_frames, from_frames from ..utils import ( + CancelledError, ensure_ip, get_ip, get_ipv6, - nbytes, log_errors, - CancelledError, + nbytes, parse_bytes, ) +from .addressing import parse_host_port, unparse_host_port +from .core import Comm, CommClosedError, Connector, Listener +from .registry import Backend, backends +from .utils import ensure_concrete_host, from_frames, to_frames logger = logging.getLogger(__name__) diff --git a/distributed/comm/utils.py b/distributed/comm/utils.py index b3ac85feed8..15b9244329a 100644 --- a/distributed/comm/utils.py +++ b/distributed/comm/utils.py @@ -9,7 +9,6 @@ from .. 
import protocol from ..utils import get_ip, get_ipv6, nbytes, offload - logger = logging.getLogger(__name__) diff --git a/distributed/core.py b/distributed/core.py index bc2930a4f78..3e099ec1d3d 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -1,15 +1,15 @@ import asyncio -from collections import defaultdict -from contextlib import suppress -from enum import Enum -from functools import partial import inspect import logging import threading import traceback import uuid -import weakref import warnings +import weakref +from collections import defaultdict +from contextlib import suppress +from enum import Enum +from functools import partial import dask import tblib @@ -17,28 +17,27 @@ from tornado import gen from tornado.ioloop import IOLoop, PeriodicCallback +from . import profile, protocol from .comm import ( + CommClosedError, connect, + get_address_host_port, listen, - CommClosedError, normalize_address, unparse_host_port, - get_address_host_port, ) from .metrics import time -from . import profile from .system_monitor import SystemMonitor from .utils import ( - is_coroutine_function, - get_traceback, - truncate_exception, - shutting_down, - parse_timedelta, - has_keyword, CancelledError, TimeoutError, + get_traceback, + has_keyword, + is_coroutine_function, + parse_timedelta, + shutting_down, + truncate_exception, ) -from . import protocol class Status(Enum): diff --git a/distributed/counter.py b/distributed/counter.py index feffb69ce8c..d6c1dad9ecc 100644 --- a/distributed/counter.py +++ b/distributed/counter.py @@ -2,7 +2,6 @@ from tornado.ioloop import IOLoop, PeriodicCallback - try: from crick import TDigest except ImportError: diff --git a/distributed/dashboard/components/__init__.py b/distributed/dashboard/components/__init__.py index 78d60108c8e..8f8fd09ee5b 100644 --- a/distributed/dashboard/components/__init__.py +++ b/distributed/dashboard/components/__init__.py @@ -1,34 +1,34 @@ import asyncio +import weakref from bisect import bisect from operator import add from time import time -import weakref -from bokeh.layouts import row, column +import dask +from bokeh.layouts import column, row from bokeh.models import ( + BoxZoomTool, + Button, ColumnDataSource, - Plot, DataRange1d, - LinearAxis, HoverTool, - BoxZoomTool, - ResetTool, + LinearAxis, + OpenURL, PanTool, - WheelZoomTool, - Range1d, + Plot, Quad, - TapTool, - OpenURL, - Button, + Range1d, + ResetTool, Select, + TapTool, + WheelZoomTool, ) from bokeh.palettes import Spectral9 from bokeh.plotting import figure -import dask from tornado import gen -from distributed.dashboard.utils import without_property_validation, BOKEH_VERSION from distributed import profile +from distributed.dashboard.utils import BOKEH_VERSION, without_property_validation from distributed.utils import log_errors, parse_timedelta if dask.config.get("distributed.dashboard.export-tool"): diff --git a/distributed/dashboard/components/nvml.py b/distributed/dashboard/components/nvml.py index 34cce3c4bc7..9a77f22eee8 100644 --- a/distributed/dashboard/components/nvml.py +++ b/distributed/dashboard/components/nvml.py @@ -1,22 +1,21 @@ import math -from distributed.dashboard.components import DashboardComponent, add_periodic_callback - -from bokeh.plotting import figure from bokeh.models import ( - ColumnDataSource, BasicTicker, + ColumnDataSource, + HoverTool, NumeralTickFormatter, - TapTool, OpenURL, - HoverTool, + TapTool, ) -from tornado import escape +from bokeh.plotting import figure from dask.utils import format_bytes -from distributed.utils 
import log_errors -from distributed.dashboard.components.scheduler import BOKEH_THEME, TICKS_1024 -from distributed.dashboard.utils import without_property_validation, update +from tornado import escape +from distributed.dashboard.components import DashboardComponent, add_periodic_callback +from distributed.dashboard.components.scheduler import BOKEH_THEME, TICKS_1024 +from distributed.dashboard.utils import update, without_property_validation +from distributed.utils import log_errors try: import pynvml diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index 49032665150..f13d421b349 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -1,46 +1,46 @@ -from collections import defaultdict import logging import math -from numbers import Number import operator import os +from collections import defaultdict +from numbers import Number +import dask +from bokeh.io import curdoc from bokeh.layouts import column, row from bokeh.models import ( - ColumnDataSource, + AdaptiveTicker, + BasicTicker, + BoxSelectTool, + BoxZoomTool, + CDSView, ColorBar, + ColumnDataSource, DataRange1d, + GroupFilter, HoverTool, - ResetTool, - PanTool, - WheelZoomTool, - TapTool, + NumberFormatter, + NumeralTickFormatter, OpenURL, + Panel, + PanTool, Range1d, - value, - NumeralTickFormatter, - BoxZoomTool, - AdaptiveTicker, - BasicTicker, - NumberFormatter, - BoxSelectTool, - GroupFilter, - CDSView, + ResetTool, Tabs, - Panel, + TapTool, Title, + WheelZoomTool, + value, ) from bokeh.models.widgets import DataTable, TableColumn -from bokeh.plotting import figure from bokeh.palettes import Viridis11 +from bokeh.plotting import figure from bokeh.themes import Theme -from bokeh.transform import factor_cmap, linear_cmap, cumsum -from bokeh.io import curdoc -import dask +from bokeh.transform import cumsum, factor_cmap, linear_cmap from dask import config from dask.utils import format_bytes, key_split from tlz import pipe -from tlz.curried import map, concat, groupby +from tlz.curried import concat, groupby, map from tornado import escape try: @@ -51,24 +51,24 @@ from distributed.dashboard.components import add_periodic_callback from distributed.dashboard.components.shared import ( DashboardComponent, - ProfileTimePlot, ProfileServer, + ProfileTimePlot, SystemMonitor, ) from distributed.dashboard.utils import ( - transpose, BOKEH_VERSION, PROFILING, - without_property_validation, + transpose, update, + without_property_validation, ) -from distributed.metrics import time -from distributed.utils import log_errors, format_time, parse_timedelta -from distributed.diagnostics.progress_stream import color_of, progress_quads from distributed.diagnostics.graph_layout import GraphLayout +from distributed.diagnostics.progress_stream import color_of, progress_quads from distributed.diagnostics.task_stream import TaskStreamPlugin from distributed.diagnostics.task_stream import color_of as ts_color_of from distributed.diagnostics.task_stream import colors as ts_color_lookup +from distributed.metrics import time +from distributed.utils import format_time, log_errors, parse_timedelta if dask.config.get("distributed.dashboard.export-tool"): from distributed.dashboard.export_tool import ExportTool diff --git a/distributed/dashboard/components/shared.py b/distributed/dashboard/components/shared.py index 037b256b188..5a20c0724d0 100644 --- a/distributed/dashboard/components/shared.py +++ b/distributed/dashboard/components/shared.py 
@@ -1,31 +1,31 @@ import asyncio import weakref -from bokeh.layouts import row, column +import dask +import tlz as toolz +from bokeh.layouts import column, row from bokeh.models import ( + Button, ColumnDataSource, DataRange1d, HoverTool, + NumeralTickFormatter, Range1d, - Button, Select, - NumeralTickFormatter, ) from bokeh.palettes import Spectral9 from bokeh.plotting import figure -import dask from tornado import gen -import tlz as toolz +from distributed import profile +from distributed.compatibility import WINDOWS from distributed.dashboard.components import DashboardComponent from distributed.dashboard.utils import ( - without_property_validation, BOKEH_VERSION, update, + without_property_validation, ) -from distributed import profile from distributed.utils import log_errors, parse_timedelta -from distributed.compatibility import WINDOWS if dask.config.get("distributed.dashboard.export-tool"): from distributed.dashboard.export_tool import ExportTool diff --git a/distributed/dashboard/components/worker.py b/distributed/dashboard/components/worker.py index ee9ad65d2e1..93dcfd71d79 100644 --- a/distributed/dashboard/components/worker.py +++ b/distributed/dashboard/components/worker.py @@ -2,22 +2,21 @@ import math import os -from bokeh.layouts import row, column +from bokeh.layouts import column, row from bokeh.models import ( + BoxZoomTool, ColumnDataSource, DataRange1d, HoverTool, - BoxZoomTool, - ResetTool, - PanTool, - WheelZoomTool, NumeralTickFormatter, + PanTool, + ResetTool, Select, + WheelZoomTool, ) - from bokeh.models.widgets import DataTable, TableColumn -from bokeh.plotting import figure from bokeh.palettes import RdBu +from bokeh.plotting import figure from bokeh.themes import Theme from dask.utils import format_bytes from tlz import merge, partition_all @@ -25,15 +24,14 @@ from distributed.dashboard.components import add_periodic_callback from distributed.dashboard.components.shared import ( DashboardComponent, - ProfileTimePlot, ProfileServer, + ProfileTimePlot, SystemMonitor, ) -from distributed.dashboard.utils import transpose, without_property_validation, update +from distributed.dashboard.utils import transpose, update, without_property_validation from distributed.diagnostics.progress_stream import color_of from distributed.metrics import time -from distributed.utils import log_errors, key_split, format_time - +from distributed.utils import format_time, key_split, log_errors logger = logging.getLogger(__name__) diff --git a/distributed/dashboard/core.py b/distributed/dashboard/core.py index 916504f4d04..75f1ff9c1b7 100644 --- a/distributed/dashboard/core.py +++ b/distributed/dashboard/core.py @@ -1,6 +1,6 @@ -from distutils.version import LooseVersion import functools import warnings +from distutils.version import LooseVersion import bokeh from bokeh.server.server import BokehTornado @@ -9,11 +9,11 @@ from bokeh.server.util import create_hosts_allowlist except ImportError: from bokeh.server.util import create_hosts_whitelist as create_hosts_allowlist -from bokeh.application.handlers.function import FunctionHandler -from bokeh.application import Application + import dask import toolz - +from bokeh.application import Application +from bokeh.application.handlers.function import FunctionHandler if LooseVersion(bokeh.__version__) < LooseVersion("0.13.0"): warnings.warn( diff --git a/distributed/dashboard/export_tool.py b/distributed/dashboard/export_tool.py index d93d21b881b..a9be2d7a4bf 100644 --- a/distributed/dashboard/export_tool.py +++ 
b/distributed/dashboard/export_tool.py @@ -6,7 +6,6 @@ from bokeh.resources import CDN from bokeh.util.compiler import JavaScript - fn = __file__ fn = os.path.join(os.path.dirname(fn), "export_tool.js") with open(fn) as f: diff --git a/distributed/dashboard/scheduler.py b/distributed/dashboard/scheduler.py index 09a4339b50a..565e46a5e30 100644 --- a/distributed/dashboard/scheduler.py +++ b/distributed/dashboard/scheduler.py @@ -1,44 +1,43 @@ from urllib.parse import urljoin -from tornado.ioloop import IOLoop from tornado import web +from tornado.ioloop import IOLoop try: import numpy as np except ImportError: np = False -from .core import BokehApplication -from .components.worker import counters_doc +from .components.nvml import gpu_memory_doc, gpu_utilization_doc # noqa: 1708 from .components.scheduler import ( - systemmonitor_doc, - stealing_doc, - workers_doc, events_doc, - tasks_doc, - status_doc, - profile_doc, - profile_server_doc, graph_doc, - individual_task_stream_doc, - individual_progress_doc, - individual_graph_doc, - individual_profile_doc, - individual_profile_server_doc, - individual_nbytes_doc, - individual_cpu_doc, - individual_nprocessing_doc, - individual_workers_doc, + individual_aggregate_time_per_action_doc, individual_bandwidth_types_doc, individual_bandwidth_workers_doc, - individual_memory_by_key_doc, individual_compute_time_per_key_doc, - individual_aggregate_time_per_action_doc, + individual_cpu_doc, + individual_graph_doc, + individual_memory_by_key_doc, + individual_nbytes_doc, + individual_nprocessing_doc, + individual_profile_doc, + individual_profile_server_doc, + individual_progress_doc, individual_systemmonitor_doc, + individual_task_stream_doc, + individual_workers_doc, + profile_doc, + profile_server_doc, + status_doc, + stealing_doc, + systemmonitor_doc, + tasks_doc, + workers_doc, ) +from .components.worker import counters_doc +from .core import BokehApplication from .worker import counters_doc -from .components.nvml import gpu_memory_doc, gpu_utilization_doc # noqa: 1708 - template_variables = { "pages": ["status", "workers", "tasks", "system", "profile", "graph", "info"] diff --git a/distributed/dashboard/tests/test_components.py b/distributed/dashboard/tests/test_components.py index a3e444e17e6..bc9f6c74849 100644 --- a/distributed/dashboard/tests/test_components.py +++ b/distributed/dashboard/tests/test_components.py @@ -6,12 +6,12 @@ from bokeh.models import ColumnDataSource, Model -from distributed.utils_test import slowinc, gen_cluster from distributed.dashboard.components.shared import ( Processing, ProfilePlot, ProfileTimePlot, ) +from distributed.utils_test import gen_cluster, slowinc @pytest.mark.parametrize("Component", [Processing]) diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 9c50ae50434..9e21c003e00 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -8,39 +8,39 @@ import pytest pytest.importorskip("bokeh") +import dask from bokeh.server.server import BokehTornado +from dask.core import flatten +from dask.utils import stringify from tlz import first from tornado.httpclient import AsyncHTTPClient, HTTPRequest -import dask -from dask.core import flatten -from dask.utils import stringify from distributed.client import wait from distributed.compatibility import MACOS -from distributed.metrics import time -from distributed.utils import format_dashboard_link -from distributed.utils_test 
import gen_cluster, inc, dec, slowinc, div, get_cert -from distributed.dashboard.components.worker import Counters -from distributed.dashboard.scheduler import applications +from distributed.dashboard import scheduler from distributed.dashboard.components.scheduler import ( - SystemMonitor, + AggregateAction, + ComputePerKey, + CurrentLoad, + Events, + MemoryByKey, + NBytesHistogram, Occupancy, - StealingTimeSeries, + ProcessingHistogram, + ProfileServer, StealingEvents, - Events, - TaskStream, + StealingTimeSeries, + SystemMonitor, + TaskGraph, TaskProgress, - CurrentLoad, - ProcessingHistogram, - NBytesHistogram, + TaskStream, WorkerTable, - TaskGraph, - ProfileServer, - MemoryByKey, - AggregateAction, - ComputePerKey, ) -from distributed.dashboard import scheduler +from distributed.dashboard.components.worker import Counters +from distributed.dashboard.scheduler import applications +from distributed.metrics import time +from distributed.utils import format_dashboard_link +from distributed.utils_test import dec, div, gen_cluster, get_cert, inc, slowinc scheduler.PROFILING = False diff --git a/distributed/dashboard/tests/test_worker_bokeh.py b/distributed/dashboard/tests/test_worker_bokeh.py index 6143e837529..b6f5476f618 100644 --- a/distributed/dashboard/tests/test_worker_bokeh.py +++ b/distributed/dashboard/tests/test_worker_bokeh.py @@ -10,17 +10,17 @@ from tornado.httpclient import AsyncHTTPClient from distributed.client import wait -from distributed.metrics import time -from distributed.utils_test import gen_cluster, inc, dec from distributed.dashboard.components.worker import ( - StateTable, - CrossFilter, CommunicatingStream, - ExecutingTimeSeries, CommunicatingTimeSeries, - SystemMonitor, Counters, + CrossFilter, + ExecutingTimeSeries, + StateTable, + SystemMonitor, ) +from distributed.metrics import time +from distributed.utils_test import dec, gen_cluster, inc @gen_cluster( diff --git a/distributed/dashboard/worker.py b/distributed/dashboard/worker.py index ff9ae3b2f7d..1e65cfcc765 100644 --- a/distributed/dashboard/worker.py +++ b/distributed/dashboard/worker.py @@ -1,14 +1,14 @@ +from tornado.ioloop import IOLoop + from .components.worker import ( - status_doc, - crossfilter_doc, - systemmonitor_doc, counters_doc, + crossfilter_doc, profile_doc, profile_server_doc, + status_doc, + systemmonitor_doc, ) from .core import BokehApplication -from tornado.ioloop import IOLoop - template_variables = { "pages": ["status", "system", "profile", "crossfilter", "profile-server"] diff --git a/distributed/deploy/__init__.py b/distributed/deploy/__init__.py index 0148328cd4c..1518942dc4c 100644 --- a/distributed/deploy/__init__.py +++ b/distributed/deploy/__init__.py @@ -1,10 +1,10 @@ from contextlib import suppress +from .adaptive import Adaptive from .cluster import Cluster from .local import LocalCluster +from .spec import ProcessInterface, SpecCluster from .ssh import SSHCluster -from .spec import SpecCluster, ProcessInterface -from .adaptive import Adaptive with suppress(ImportError): from .ssh import SSHCluster diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index d3e9ddb1fbf..ff73015a40a 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -1,10 +1,11 @@ -from inspect import isawaitable import logging +from inspect import isawaitable + import dask.config -from .adaptive_core import AdaptiveCore -from ..utils import log_errors, parse_timedelta from ..protocol import pickle +from ..utils import log_errors, parse_timedelta 
+from .adaptive_core import AdaptiveCore logger = logging.getLogger(__name__) diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py index b74e013c947..7c95096956a 100644 --- a/distributed/deploy/adaptive_core.py +++ b/distributed/deploy/adaptive_core.py @@ -2,13 +2,12 @@ import logging import math -from tornado.ioloop import IOLoop, PeriodicCallback import tlz as toolz +from tornado.ioloop import IOLoop, PeriodicCallback from ..metrics import time from ..utils import parse_timedelta - logger = logging.getLogger(__name__) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 93a40cfa0f3..c22d21b4376 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -1,28 +1,26 @@ import asyncio import datetime -from contextlib import suppress import logging import threading -import warnings import uuid -from tornado.ioloop import PeriodicCallback +import warnings +from contextlib import suppress import dask.config from dask.utils import format_bytes - -from .adaptive import Adaptive +from tornado.ioloop import PeriodicCallback from ..core import Status from ..utils import ( - log_errors, - sync, Log, Logs, - thread_state, format_dashboard_link, + log_errors, parse_timedelta, + sync, + thread_state, ) - +from .adaptive import Adaptive logger = logging.getLogger(__name__) @@ -310,7 +308,7 @@ def _widget(self): pass try: - from ipywidgets import Layout, VBox, HBox, IntText, Button, HTML, Accordion + from ipywidgets import HTML, Accordion, Button, HBox, IntText, Layout, VBox except ImportError: self._cached_widget = None return None diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index a30a6e410f5..0282e2b507f 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -4,15 +4,15 @@ import warnings import weakref -from dask.system import CPU_COUNT import toolz +from dask.system import CPU_COUNT -from .spec import SpecCluster -from .utils import nprocesses_nthreads from ..nanny import Nanny from ..scheduler import Scheduler from ..security import Security from ..worker import Worker, parse_memory_limit +from .spec import SpecCluster +from .utils import nprocesses_nthreads logger = logging.getLogger(__name__) diff --git a/distributed/deploy/old_ssh.py b/distributed/deploy/old_ssh.py index 6d62e0cfd8b..77b01e2388f 100644 --- a/distributed/deploy/old_ssh.py +++ b/distributed/deploy/old_ssh.py @@ -1,6 +1,6 @@ import logging -import socket import os +import socket import sys import time import traceback @@ -13,10 +13,8 @@ from threading import Thread from tlz import merge - from tornado import gen - logger = logging.getLogger(__name__) @@ -36,7 +34,7 @@ class bcolors: def async_ssh(cmd_dict): import paramiko from paramiko.buffered_pipe import PipeTimeout - from paramiko.ssh_exception import SSHException, PasswordRequiredException + from paramiko.ssh_exception import PasswordRequiredException, SSHException ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index ef1d8a4c6fa..3d816582032 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -1,29 +1,28 @@ import asyncio import atexit -from contextlib import suppress import copy import logging import math -import weakref import warnings +import weakref +from contextlib import suppress import dask from tornado import gen -from .adaptive import Adaptive -from .cluster import Cluster -from ..core import rpc, 
CommClosedError, Status +from ..core import CommClosedError, Status, rpc +from ..scheduler import Scheduler +from ..security import Security from ..utils import ( LoopRunner, - silence_logging, + TimeoutError, + import_term, parse_bytes, parse_timedelta, - import_term, - TimeoutError, + silence_logging, ) -from ..scheduler import Scheduler -from ..security import Security - +from .adaptive import Adaptive +from .cluster import Cluster logger = logging.getLogger(__name__) diff --git a/distributed/deploy/ssh.py b/distributed/deploy/ssh.py index 66003a49572..8945396475f 100644 --- a/distributed/deploy/ssh.py +++ b/distributed/deploy/ssh.py @@ -1,17 +1,16 @@ import logging import sys -from typing import List, Union import warnings import weakref +from typing import List, Union import dask -from .spec import SpecCluster, ProcessInterface from ..core import Status -from ..utils import cli_keywords from ..scheduler import Scheduler as _Scheduler +from ..utils import cli_keywords, serialize_for_cli from ..worker import Worker as _Worker -from ..utils import serialize_for_cli +from .spec import ProcessInterface, SpecCluster logger = logging.getLogger(__name__) diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index e747cf95a20..4e44baeaf08 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -6,10 +6,16 @@ import dask import pytest -from distributed import Client, wait, Adaptive, LocalCluster, SpecCluster, Worker -from distributed.utils_test import gen_test, slowinc, clean -from distributed.utils_test import loop, nodebug, cleanup # noqa: F401 +from distributed import Adaptive, Client, LocalCluster, SpecCluster, Worker, wait from distributed.metrics import time +from distributed.utils_test import ( # noqa: F401 + clean, + cleanup, + gen_test, + loop, + nodebug, + slowinc, +) @pytest.mark.asyncio diff --git a/distributed/deploy/tests/test_adaptive_core.py b/distributed/deploy/tests/test_adaptive_core.py index a073314223d..b4fc5768f82 100644 --- a/distributed/deploy/tests/test_adaptive_core.py +++ b/distributed/deploy/tests/test_adaptive_core.py @@ -1,4 +1,5 @@ import asyncio + import pytest from distributed.deploy.adaptive_core import AdaptiveCore diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 8eaea2ba83f..82d7922f95f 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -1,42 +1,41 @@ import asyncio -from functools import partial import gc import subprocess import sys -from time import sleep -from threading import Lock import unittest import weakref from distutils.version import LooseVersion +from functools import partial +from threading import Lock +from time import sleep -from tornado.ioloop import IOLoop +import pytest import tornado +from dask.system import CPU_COUNT from tornado.httpclient import AsyncHTTPClient -import pytest +from tornado.ioloop import IOLoop -from dask.system import CPU_COUNT -from distributed import Client, Worker, Nanny, get_client +from distributed import Client, Nanny, Worker, get_client from distributed.core import Status from distributed.deploy.local import LocalCluster +from distributed.deploy.utils_test import ClusterTest from distributed.metrics import time from distributed.system import MEMORY_LIMIT +from distributed.utils import TimeoutError, sync from distributed.utils_test import ( # noqa: F401 + assert_can_connect_from_everywhere_4, + 
assert_can_connect_from_everywhere_4_6, + assert_can_connect_locally_4, + assert_cannot_connect, + captured_logger, clean, cleanup, - inc, gen_test, + inc, + loop, slowinc, - assert_cannot_connect, - assert_can_connect_locally_4, - assert_can_connect_from_everywhere_4, - assert_can_connect_from_everywhere_4_6, - captured_logger, tls_only_security, ) -from distributed.utils_test import loop # noqa: F401 -from distributed.utils import sync, TimeoutError - -from distributed.deploy.utils_test import ClusterTest def test_simple(loop): diff --git a/distributed/deploy/tests/test_slow_adaptive.py b/distributed/deploy/tests/test_slow_adaptive.py index e7021fc854a..bf89098ea0d 100644 --- a/distributed/deploy/tests/test_slow_adaptive.py +++ b/distributed/deploy/tests/test_slow_adaptive.py @@ -1,9 +1,10 @@ import asyncio + import pytest +from dask.distributed import Client, Scheduler, SpecCluster, Worker -from dask.distributed import Worker, Scheduler, SpecCluster, Client -from distributed.utils_test import slowinc, cleanup # noqa: F401 from distributed.metrics import time +from distributed.utils_test import cleanup, slowinc # noqa: F401 class SlowWorker: diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py index afdc10cbe04..30ffc50114b 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -1,18 +1,19 @@ import asyncio import re -from time import sleep import warnings +from time import sleep import dask -from dask.distributed import SpecCluster, Worker, Client, Scheduler, Nanny -from distributed.core import Status +import pytest +import tlz as toolz +from dask.distributed import Client, Nanny, Scheduler, SpecCluster, Worker + from distributed.compatibility import WINDOWS -from distributed.deploy.spec import close_clusters, ProcessInterface, run_spec +from distributed.core import Status +from distributed.deploy.spec import ProcessInterface, close_clusters, run_spec from distributed.metrics import time -from distributed.utils_test import loop, cleanup # noqa: F401 from distributed.utils import is_valid_xml -import tlz as toolz -import pytest +from distributed.utils_test import cleanup, loop # noqa: F401 class MyWorker(Worker): diff --git a/distributed/deploy/tests/test_ssh.py b/distributed/deploy/tests/test_ssh.py index 0bea6f7dc75..6c330383d7d 100644 --- a/distributed/deploy/tests/test_ssh.py +++ b/distributed/deploy/tests/test_ssh.py @@ -3,7 +3,9 @@ pytest.importorskip("asyncssh") import sys + import dask + from distributed import Client from distributed.compatibility import MACOS, WINDOWS from distributed.deploy.ssh import SSHCluster diff --git a/distributed/deploy/utils_test.py b/distributed/deploy/utils_test.py index fd6ba03aae9..543020ec21b 100644 --- a/distributed/deploy/utils_test.py +++ b/distributed/deploy/utils_test.py @@ -1,7 +1,7 @@ -from ..client import Client - import pytest +from ..client import Client + class ClusterTest: Cluster = None diff --git a/distributed/diagnostics/eventstream.py b/distributed/diagnostics/eventstream.py index f1f70f458af..1f98e47c64b 100644 --- a/distributed/diagnostics/eventstream.py +++ b/distributed/diagnostics/eventstream.py @@ -1,10 +1,8 @@ import logging -from .plugin import SchedulerPlugin - -from ..core import connect, coerce_to_address +from ..core import coerce_to_address, connect from ..worker import dumps_function - +from .plugin import SchedulerPlugin logger = logging.getLogger(__name__) diff --git a/distributed/diagnostics/nvml.py 
b/distributed/diagnostics/nvml.py
index c1bbb4161a8..cd39f9e04c5 100644
--- a/distributed/diagnostics/nvml.py
+++ b/distributed/diagnostics/nvml.py
@@ -1,4 +1,5 @@
 import os
+
 import pynvml
 
 nvmlInit = None
diff --git a/distributed/diagnostics/progress.py b/distributed/diagnostics/progress.py
index eeb3c8a2817..de88cd60c05 100644
--- a/distributed/diagnostics/progress.py
+++ b/distributed/diagnostics/progress.py
@@ -1,14 +1,13 @@
 import asyncio
-from collections import defaultdict
 import logging
+from collections import defaultdict
 from timeit import default_timer
 
+from dask.utils import stringify
 from tlz import groupby, valmap
 
-from dask.utils import stringify
-
-from .plugin import SchedulerPlugin
 from ..utils import key_split, key_split_group, log_errors
-
+from .plugin import SchedulerPlugin
 
 logger = logging.getLogger(__name__)
diff --git a/distributed/diagnostics/progress_stream.py b/distributed/diagnostics/progress_stream.py
index b17c38fc1b6..2ee2b1c5000 100644
--- a/distributed/diagnostics/progress_stream.py
+++ b/distributed/diagnostics/progress_stream.py
@@ -1,14 +1,12 @@
 import logging
 
-from tlz import valmap, merge
+from tlz import merge, valmap
 
-from .progress import AllProgress
-
-from ..core import connect, coerce_to_address
+from ..core import coerce_to_address, connect
 from ..scheduler import Scheduler
-from ..utils import key_split, color_of
+from ..utils import color_of, key_split
 from ..worker import dumps_function
-
+from .progress import AllProgress
 
 logger = logging.getLogger(__name__)
diff --git a/distributed/diagnostics/progressbar.py b/distributed/diagnostics/progressbar.py
index 45884bd5b1c..851910e45d4 100644
--- a/distributed/diagnostics/progressbar.py
+++ b/distributed/diagnostics/progressbar.py
@@ -1,20 +1,18 @@
-from contextlib import suppress
-import logging
 import html
-from timeit import default_timer
+import logging
 import sys
 import weakref
+from contextlib import suppress
+from timeit import default_timer
 
 from tlz import valmap
 from tornado.ioloop import IOLoop
 
-from .progress import format_time, Progress, MultiProgress
-
-from ..core import connect, coerce_to_address, CommClosedError
 from ..client import default_client, futures_of
+from ..core import CommClosedError, coerce_to_address, connect
 from ..protocol.pickle import dumps
-from ..utils import key_split, is_kernel, LoopRunner, parse_timedelta
-
+from ..utils import LoopRunner, is_kernel, key_split, parse_timedelta
+from .progress import MultiProgress, Progress, format_time
 
 logger = logging.getLogger(__name__)
@@ -160,7 +158,7 @@ def __init__(
     ):
         super().__init__(keys, scheduler, interval, complete)
 
-        from ipywidgets import FloatProgress, HBox, VBox, HTML
+        from ipywidgets import HTML, FloatProgress, HBox, VBox
 
         self.elapsed_time = HTML("")
         self.bar = FloatProgress(min=0, max=1, description="")
@@ -319,7 +317,7 @@ def __init__(
         self.widget = VBox([])
 
     def make_widget(self, all):
-        from ipywidgets import FloatProgress, HBox, VBox, HTML
+        from ipywidgets import HTML, FloatProgress, HBox, VBox
 
         self.elapsed_time = HTML("")
         self.bars = {key: FloatProgress(min=0, max=1, description="") for key in all}
diff --git a/distributed/diagnostics/task_stream.py b/distributed/diagnostics/task_stream.py
index c319ca73d69..e276c477776 100644
--- a/distributed/diagnostics/task_stream.py
+++ b/distributed/diagnostics/task_stream.py
@@ -1,12 +1,12 @@
-from collections import deque
 import logging
+from collections import deque
 
 import dask
 
-from .progress_stream import color_of
-from .plugin import SchedulerPlugin
-from ..utils import key_split, format_time, parse_timedelta
-from ..metrics import time
+from ..metrics import time
+from ..utils import format_time, key_split, parse_timedelta
+from .plugin import SchedulerPlugin
+from .progress_stream import color_of
 
 logger = logging.getLogger(__name__)
diff --git a/distributed/diagnostics/tests/test_graph_layout.py b/distributed/diagnostics/tests/test_graph_layout.py
index b63311f8432..b714b261a4c 100644
--- a/distributed/diagnostics/tests/test_graph_layout.py
+++ b/distributed/diagnostics/tests/test_graph_layout.py
@@ -1,9 +1,9 @@
 import asyncio
 import operator
 
-from distributed.utils_test import gen_cluster, inc
-from distributed.diagnostics import GraphLayout
 from distributed import wait
+from distributed.diagnostics import GraphLayout
+from distributed.utils_test import gen_cluster, inc
 
 
 @gen_cluster(client=True)
diff --git a/distributed/diagnostics/tests/test_nvml.py b/distributed/diagnostics/tests/test_nvml.py
index 6182049fe40..6938a86850f 100644
--- a/distributed/diagnostics/tests/test_nvml.py
+++ b/distributed/diagnostics/tests/test_nvml.py
@@ -1,6 +1,7 @@
-import pytest
 import os
 
+import pytest
+
 pynvml = pytest.importorskip("pynvml")
 
 from distributed.diagnostics import nvml
diff --git a/distributed/diagnostics/tests/test_progress.py b/distributed/diagnostics/tests/test_progress.py
index 871dcb0c5a5..fa74bfa3b46 100644
--- a/distributed/diagnostics/tests/test_progress.py
+++ b/distributed/diagnostics/tests/test_progress.py
@@ -4,15 +4,15 @@
 
 from distributed import Nanny
 from distributed.client import wait
-from distributed.metrics import time
-from distributed.utils_test import gen_cluster, inc, dec, div, nodebug
 from distributed.diagnostics.progress import (
-    Progress,
-    SchedulerPlugin,
     AllProgress,
     GroupProgress,
     MultiProgress,
+    Progress,
+    SchedulerPlugin,
 )
+from distributed.metrics import time
+from distributed.utils_test import dec, div, gen_cluster, inc, nodebug
 
 
 def f(*args):
diff --git a/distributed/diagnostics/tests/test_progress_stream.py b/distributed/diagnostics/tests/test_progress_stream.py
index 8f506b7a7bb..d65d953e6b5 100644
--- a/distributed/diagnostics/tests/test_progress_stream.py
+++ b/distributed/diagnostics/tests/test_progress_stream.py
@@ -3,6 +3,7 @@
 pytest.importorskip("bokeh")
 
 from dask import delayed
+
 from distributed.client import wait
 from distributed.diagnostics.progress_stream import progress_quads, progress_stream
 from distributed.utils_test import div, gen_cluster, inc
diff --git a/distributed/diagnostics/tests/test_progressbar.py b/distributed/diagnostics/tests/test_progressbar.py
index f19dbd2df26..36db0d7b45b 100644
--- a/distributed/diagnostics/tests/test_progressbar.py
+++ b/distributed/diagnostics/tests/test_progressbar.py
@@ -5,8 +5,14 @@
 from distributed import Scheduler, Worker
 from distributed.diagnostics.progressbar import TextProgressBar, progress
 from distributed.metrics import time
-from distributed.utils_test import inc, div, gen_cluster
-from distributed.utils_test import client, loop, cluster_fixture  # noqa: F401
+from distributed.utils_test import (  # noqa: F401
+    client,
+    cluster_fixture,
+    div,
+    gen_cluster,
+    inc,
+    loop,
+)
 
 
 def test_text_progressbar(capsys, client):
diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py
index 3f1e54f6ed7..465b674971f 100644
--- a/distributed/diagnostics/tests/test_scheduler_plugin.py
+++ b/distributed/diagnostics/tests/test_scheduler_plugin.py
@@ -1,6 +1,7 @@
 import pytest
-from distributed import Scheduler, Worker, SchedulerPlugin
-from distributed.utils_test import inc, gen_cluster, cleanup  # noqa: F401
+
+from distributed import Scheduler, SchedulerPlugin, Worker
+from distributed.utils_test import cleanup, gen_cluster, inc  # noqa: F401
 
 
 @gen_cluster(client=True)
diff --git a/distributed/diagnostics/tests/test_task_stream.py b/distributed/diagnostics/tests/test_task_stream.py
index 4b57d18ee7a..642277e81dc 100644
--- a/distributed/diagnostics/tests/test_task_stream.py
+++ b/distributed/diagnostics/tests/test_task_stream.py
@@ -5,11 +5,18 @@
 from tlz import frequencies
 
 from distributed import get_task_stream
-from distributed.utils_test import gen_cluster, div, inc, slowinc
-from distributed.utils_test import client, loop, cluster_fixture  # noqa: F401
 from distributed.client import wait
 from distributed.diagnostics.task_stream import TaskStreamPlugin
 from distributed.metrics import time
+from distributed.utils_test import (  # noqa: F401
+    client,
+    cluster_fixture,
+    div,
+    gen_cluster,
+    inc,
+    loop,
+    slowinc,
+)
 
 
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
diff --git a/distributed/diagnostics/tests/test_widgets.py b/distributed/diagnostics/tests/test_widgets.py
index 2505f986406..bc86e436b24 100644
--- a/distributed/diagnostics/tests/test_widgets.py
+++ b/distributed/diagnostics/tests/test_widgets.py
@@ -2,10 +2,11 @@
 
 pytest.importorskip("ipywidgets")
 
-from distributed.compatibility import WINDOWS
 from ipykernel.comm import Comm
 from ipywidgets import Widget
 
+from distributed.compatibility import WINDOWS
+
 #################
 # Utility stuff #
 #################
@@ -72,20 +73,28 @@ def record_display(*args):
 # Distributed stuff #
 #####################
 
-from operator import add
 import re
+from operator import add
 
 from tlz import valmap
 
 from distributed.client import wait
-from distributed.worker import dumps_task
-from distributed.utils_test import inc, dec, throws, gen_cluster, gen_tls_cluster
-from distributed.utils_test import client, loop, cluster_fixture  # noqa: F401
 from distributed.diagnostics.progressbar import (
-    ProgressWidget,
     MultiProgressWidget,
+    ProgressWidget,
     progress,
 )
+from distributed.utils_test import (  # noqa: F401
+    client,
+    cluster_fixture,
+    dec,
+    gen_cluster,
+    gen_tls_cluster,
+    inc,
+    loop,
+    throws,
+)
+from distributed.worker import dumps_task
 
 
 @gen_cluster(client=True)
diff --git a/distributed/diagnostics/websocket.py b/distributed/diagnostics/websocket.py
index e34961bfeff..51282c1e621 100644
--- a/distributed/diagnostics/websocket.py
+++ b/distributed/diagnostics/websocket.py
@@ -1,5 +1,5 @@
-from .plugin import SchedulerPlugin
 from ..utils import key_split
+from .plugin import SchedulerPlugin
 from .task_stream import colors
 
 
diff --git a/distributed/diskutils.py b/distributed/diskutils.py
index e9dbb7b25bf..49d0a26222c 100644
--- a/distributed/diskutils.py
+++ b/distributed/diskutils.py
@@ -11,7 +11,6 @@
 from . import locket
 
-
 logger = logging.getLogger(__name__)
 
 DIR_LOCK_EXT = ".dirlock"
diff --git a/distributed/event.py b/distributed/event.py
index 0136d35ef26..a3e2a1b7eeb 100644
--- a/distributed/event.py
+++ b/distributed/event.py
@@ -1,13 +1,12 @@
 import asyncio
-from collections import defaultdict
-from contextlib import suppress
 import logging
 import uuid
+from collections import defaultdict
+from contextlib import suppress
 
 from .client import Client
-from .utils import log_errors, TimeoutError
+from .utils import TimeoutError, log_errors, parse_timedelta
 from .worker import get_worker
-from .utils import parse_timedelta
 
 logger = logging.getLogger(__name__)
diff --git a/distributed/http/routing.py b/distributed/http/routing.py
index 8a1d90d5490..7de870faa72 100644
--- a/distributed/http/routing.py
+++ b/distributed/http/routing.py
@@ -1,7 +1,8 @@
 import os
-from tornado import web
+
 import tornado.httputil
 import tornado.routing
+from tornado import web
 
 
 def _descend_routes(router, routers=set(), out=set()):
diff --git a/distributed/http/scheduler/info.py b/distributed/http/scheduler/info.py
index 96199faba38..01446531285 100644
--- a/distributed/http/scheduler/info.py
+++ b/distributed/http/scheduler/info.py
@@ -1,19 +1,18 @@
-from datetime import datetime
 import json
 import logging
 import os
 import os.path
+from datetime import datetime
 
 from dask.utils import format_bytes
-
+from tlz import first, merge
 from tornado import escape
 from tornado.websocket import WebSocketHandler
-from tlz import first, merge
 
-from ..utils import RequestHandler, redirect
 from ...diagnostics.websocket import WebsocketPlugin
 from ...metrics import time
-from ...utils import log_errors, format_time
+from ...utils import format_time, log_errors
+from ..utils import RequestHandler, redirect
 
 ns = {
     func.__name__: func
diff --git a/distributed/http/scheduler/json.py b/distributed/http/scheduler/json.py
index 5dc09b4b6fe..61801a00d34 100644
--- a/distributed/http/scheduler/json.py
+++ b/distributed/http/scheduler/json.py
@@ -1,5 +1,5 @@
-from ..utils import RequestHandler
 from ...utils import log_errors
+from ..utils import RequestHandler
 
 
 class CountsJSON(RequestHandler):
diff --git a/distributed/http/scheduler/missing_bokeh.py b/distributed/http/scheduler/missing_bokeh.py
index 3eb68960d53..917e79f610b 100644
--- a/distributed/http/scheduler/missing_bokeh.py
+++ b/distributed/http/scheduler/missing_bokeh.py
@@ -1,5 +1,5 @@
-from ..utils import RequestHandler, redirect
 from ...utils import log_errors
+from ..utils import RequestHandler, redirect
 
 
 class MissingBokeh(RequestHandler):
diff --git a/distributed/http/scheduler/prometheus/__init__.py b/distributed/http/scheduler/prometheus/__init__.py
index 2016ca32512..120a01dab58 100644
--- a/distributed/http/scheduler/prometheus/__init__.py
+++ b/distributed/http/scheduler/prometheus/__init__.py
@@ -2,6 +2,7 @@
 
 from distributed.http.utils import RequestHandler
 from distributed.scheduler import ALL_TASK_STATES
+
 from .semaphore import SemaphoreMetricExtension
 
 
@@ -10,7 +11,7 @@ def __init__(self, dask_server):
         self.server = dask_server
 
     def collect(self):
-        from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily
+        from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily
 
         yield GaugeMetricFamily(
             "dask_scheduler_clients",
diff --git a/distributed/http/scheduler/prometheus/semaphore.py b/distributed/http/scheduler/prometheus/semaphore.py
index f1df7434019..aac467b66cc 100644
--- a/distributed/http/scheduler/prometheus/semaphore.py
+++ b/distributed/http/scheduler/prometheus/semaphore.py
@@ -3,7 +3,7 @@ def __init__(self, dask_server):
         self.server = dask_server
 
     def collect(self):
-        from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily
+        from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily
 
         sem_ext = self.server.extensions["semaphores"]
diff --git a/distributed/http/scheduler/tests/test_scheduler_http.py b/distributed/http/scheduler/tests/test_scheduler_http.py
index 2aa4f58c4ea..65225520ed9 100644
--- a/distributed/http/scheduler/tests/test_scheduler_http.py
+++ b/distributed/http/scheduler/tests/test_scheduler_http.py
@@ -5,12 +5,12 @@
 
 pytest.importorskip("bokeh")
 
+from dask.sizeof import sizeof
 from tornado.escape import url_escape
 from tornado.httpclient import AsyncHTTPClient, HTTPClientError
 
-from dask.sizeof import sizeof
 from distributed.utils import is_valid_xml
-from distributed.utils_test import gen_cluster, slowinc, inc
+from distributed.utils_test import gen_cluster, inc, slowinc
 
 
 @gen_cluster(client=True)
diff --git a/distributed/http/scheduler/tests/test_semaphore_http.py b/distributed/http/scheduler/tests/test_semaphore_http.py
index 4c66165b985..21996cb35f4 100644
--- a/distributed/http/scheduler/tests/test_semaphore_http.py
+++ b/distributed/http/scheduler/tests/test_semaphore_http.py
@@ -1,9 +1,8 @@
 import pytest
-
 from tornado.httpclient import AsyncHTTPClient
 
-from distributed.utils_test import gen_cluster
 from distributed import Semaphore
+from distributed.utils_test import gen_cluster
 
 
 @gen_cluster(client=True, clean_kwargs={"threads": False})
diff --git a/distributed/http/statics.py b/distributed/http/statics.py
index e1c7a98e9a2..f838ac8522e 100644
--- a/distributed/http/statics.py
+++ b/distributed/http/statics.py
@@ -1,6 +1,7 @@
-from tornado import web
 import os
 
+from tornado import web
+
 routes = [
     (
         r"/statics/(.*)",
diff --git a/distributed/http/tests/test_core.py b/distributed/http/tests/test_core.py
index c1bffedb72e..61cb713fcf2 100644
--- a/distributed/http/tests/test_core.py
+++ b/distributed/http/tests/test_core.py
@@ -1,6 +1,7 @@
-from distributed.utils_test import gen_cluster
 from tornado.httpclient import AsyncHTTPClient
 
+from distributed.utils_test import gen_cluster
+
 
 @gen_cluster(client=True)
 async def test_scheduler(c, s, a, b):
diff --git a/distributed/http/tests/test_routing.py b/distributed/http/tests/test_routing.py
index ca7d071d256..764475ce4ed 100644
--- a/distributed/http/tests/test_routing.py
+++ b/distributed/http/tests/test_routing.py
@@ -1,6 +1,6 @@
+import pytest
 from tornado import web
 from tornado.httpclient import AsyncHTTPClient, HTTPClientError
-import pytest
 
 from distributed.http.routing import RoutingApplication
 
diff --git a/distributed/http/utils.py b/distributed/http/utils.py
index 5977ccd5bad..c0de926c99b 100644
--- a/distributed/http/utils.py
+++ b/distributed/http/utils.py
@@ -2,12 +2,11 @@
 import os
 from typing import List
 
-from tornado import web
 import toolz
+from tornado import web
 
 from ..utils import has_keyword
 
-
 dirname = os.path.dirname(__file__)
diff --git a/distributed/http/worker/prometheus.py b/distributed/http/worker/prometheus.py
index b354cad3ea9..4d0c0a55e60 100644
--- a/distributed/http/worker/prometheus.py
+++ b/distributed/http/worker/prometheus.py
@@ -1,7 +1,7 @@
-from ..utils import RequestHandler
-
 import logging
 
+from ..utils import RequestHandler
+
 
 class _PrometheusCollector:
     def __init__(self, server):
diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py
index 5bc0a5debc9..e464c484ebe 100644
--- a/distributed/http/worker/tests/test_worker_http.py
+++ b/distributed/http/worker/tests/test_worker_http.py
@@ -1,6 +1,8 @@
-import pytest
 import json
+
+import pytest
 from tornado.httpclient import AsyncHTTPClient
+
 from distributed.utils_test import gen_cluster
 
 
diff --git a/distributed/lock.py b/distributed/lock.py
index 7d1c1a4af57..7ffe8c8daf9 100644
--- a/distributed/lock.py
+++ b/distributed/lock.py
@@ -1,12 +1,11 @@
 import asyncio
-from collections import defaultdict, deque
 import logging
 import uuid
+from collections import defaultdict, deque
 
 from .client import Client
-from .utils import log_errors, TimeoutError
+from .utils import TimeoutError, log_errors, parse_timedelta
 from .worker import get_worker
-from .utils import parse_timedelta
 
 logger = logging.getLogger(__name__)
diff --git a/distributed/locket.py b/distributed/locket.py
index 65a10f195f7..906938e6085 100644
--- a/distributed/locket.py
+++ b/distributed/locket.py
@@ -3,9 +3,9 @@
 
 # flake8: noqa
 
-import time
 import errno
 import threading
+import time
 import weakref
 
 __all__ = ["lock_file"]
diff --git a/distributed/metrics.py b/distributed/metrics.py
index 163a982b792..c52b40c5afd 100755
--- a/distributed/metrics.py
+++ b/distributed/metrics.py
@@ -1,8 +1,7 @@
 import collections
-from functools import wraps
 import sys
 import time as timemod
-
+from functools import wraps
 
 _empty_namedtuple = collections.namedtuple("_empty_namedtuple", ())
diff --git a/distributed/multi_lock.py b/distributed/multi_lock.py
index 6d1df68c2f4..aaa21999a19 100644
--- a/distributed/multi_lock.py
+++ b/distributed/multi_lock.py
@@ -1,13 +1,12 @@
 import asyncio
-from collections import defaultdict
 import logging
-from typing import Hashable, List
 import uuid
+from collections import defaultdict
+from typing import Hashable, List
 
 from .client import Client
-from .utils import log_errors, TimeoutError
+from .utils import TimeoutError, log_errors, parse_timedelta
 from .worker import get_worker
-from .utils import parse_timedelta
 
 logger = logging.getLogger(__name__)
diff --git a/distributed/nanny.py b/distributed/nanny.py
index f93f18522ca..a62833aa32a 100644
--- a/distributed/nanny.py
+++ b/distributed/nanny.py
@@ -1,41 +1,40 @@
 import asyncio
-from contextlib import suppress
 import errno
 import logging
-from multiprocessing.queues import Empty
 import os
-import psutil
 import shutil
 import threading
 import uuid
 import warnings
 import weakref
+from contextlib import suppress
+from multiprocessing.queues import Empty
 
 import dask
+import psutil
 from dask.system import CPU_COUNT
-from tornado.ioloop import IOLoop, PeriodicCallback
 from tornado import gen
+from tornado.ioloop import IOLoop, PeriodicCallback
 
+from . import preloading
 from .comm import get_address_host, unparse_host_port
 from .comm.addressing import address_from_user_args
-from .core import RPCClosed, CommClosedError, coerce_to_address, Status
+from .core import CommClosedError, RPCClosed, Status, coerce_to_address
 from .metrics import time
 from .node import ServerNode
-from . import preloading
 from .process import AsyncProcess
 from .proctitle import enable_proctitle_on_children
 from .security import Security
 from .utils import (
+    TimeoutError,
     get_ip,
-    mp_context,
-    silence_logging,
     json_load_robust,
-    parse_timedelta,
+    mp_context,
     parse_ports,
-    TimeoutError,
+    parse_timedelta,
+    silence_logging,
 )
-from .worker import run, parse_memory_limit, Worker
-
+from .worker import Worker, parse_memory_limit, run
 
 logger = logging.getLogger(__name__)
diff --git a/distributed/node.py b/distributed/node.py
index 997e6a1a988..247a8d85cc2 100644
--- a/distributed/node.py
+++ b/distributed/node.py
@@ -1,18 +1,17 @@
-from contextlib import suppress
 import logging
 import warnings
 import weakref
+from contextlib import suppress
 
-from tornado.httpserver import HTTPServer
-import tlz
 import dask
+import tlz
+from tornado.httpserver import HTTPServer
 
-from .comm import get_tcp_server_address
-from .comm import get_address_host
+from .comm import get_address_host, get_tcp_server_address
 from .core import Server
 from .http.routing import RoutingApplication
-from .versions import get_versions
 from .utils import DequeHandler, clean_dashboard_address
+from .versions import get_versions
 
 
 class ServerNode(Server):
diff --git a/distributed/preloading.py b/distributed/preloading.py
index dbbe42d57d0..bae94a149d8 100644
--- a/distributed/preloading.py
+++ b/distributed/preloading.py
@@ -1,17 +1,16 @@
+import filecmp
 import inspect
 import logging
 import os
 import shutil
 import sys
-from typing import List
-from types import ModuleType
-import filecmp
 from importlib import import_module
+from types import ModuleType
+from typing import List
 
 import click
-from tornado.httpclient import AsyncHTTPClient
-
 from dask.utils import tmpfile
+from tornado.httpclient import AsyncHTTPClient
 
 from .utils import import_file
diff --git a/distributed/process.py b/distributed/process.py
index 1c11dd2e3d7..f46527b5337 100644
--- a/distributed/process.py
+++ b/distributed/process.py
@@ -1,18 +1,17 @@
+import asyncio
 import logging
 import os
-from queue import Queue as PyQueue
 import re
 import threading
 import weakref
-import asyncio
-
-import dask
-
-from .utils import mp_context, TimeoutError
+from queue import Queue as PyQueue
 
+import dask
 from tornado import gen
 from tornado.concurrent import Future
 from tornado.ioloop import IOLoop
 
+from .utils import TimeoutError, mp_context
 
 logger = logging.getLogger(__name__)
diff --git a/distributed/profile.py b/distributed/profile.py
index 78e1765d041..fbaf6ea6599 100644
--- a/distributed/profile.py
+++ b/distributed/profile.py
@@ -25,16 +25,16 @@
 }
 """
 import bisect
-from collections import defaultdict, deque
 import linecache
 import sys
 import threading
+from collections import defaultdict, deque
 from time import sleep
 
 import tlz as toolz
 
 from .metrics import time
-from .utils import format_time, color_of, parse_timedelta
+from .utils import color_of, format_time, parse_timedelta
 
 
 def identifier(frame):
@@ -375,8 +375,8 @@ def plot_figure(data, **kwargs):
     --------
     plot_data
     """
-    from bokeh.plotting import ColumnDataSource, figure
     from bokeh.models import HoverTool
+    from bokeh.plotting import ColumnDataSource, figure
 
     if "states" in data:
         data = toolz.dissoc(data, "states")
diff --git a/distributed/protocol/__init__.py b/distributed/protocol/__init__.py
index be1c498c35c..36aa3f42c12 100644
--- a/distributed/protocol/__init__.py
+++ b/distributed/protocol/__init__.py
@@ -1,25 +1,25 @@
 from contextlib import suppress
-from functools import partial
 from distutils.version import LooseVersion
+from functools import partial
 
 from .compression import compressions, default_compression
-from .core import dumps, loads, maybe_compress, decompress, msgpack
-from .cuda import cuda_serialize, cuda_deserialize
+from .core import decompress, dumps, loads, maybe_compress, msgpack
+from .cuda import cuda_deserialize, cuda_serialize
 from .serialize import (
-    serialize,
-    deserialize,
-    nested_deserialize,
     Serialize,
     Serialized,
-    to_serialize,
-    register_serialization,
-    dask_serialize,
     dask_deserialize,
-    serialize_bytes,
+    dask_serialize,
+    deserialize,
     deserialize_bytes,
-    serialize_bytelist,
-    register_serialization_family,
+    nested_deserialize,
     register_generic,
+    register_serialization,
+    register_serialization_family,
+    serialize,
+    serialize_bytelist,
+    serialize_bytes,
+    to_serialize,
 )
diff --git a/distributed/protocol/arrow.py b/distributed/protocol/arrow.py
index 1f2b4e83e9a..2850c47466e 100644
--- a/distributed/protocol/arrow.py
+++ b/distributed/protocol/arrow.py
@@ -1,7 +1,7 @@
-from .serialize import dask_serialize, dask_deserialize
-
 import pyarrow
 
+from .serialize import dask_deserialize, dask_serialize
+
 if pyarrow.__version__ < "0.10":
     raise ImportError(
         "Need pyarrow >= 0.10 . "
diff --git a/distributed/protocol/compression.py b/distributed/protocol/compression.py
index b067ae0e526..5131a4f53f3 100644
--- a/distributed/protocol/compression.py
+++ b/distributed/protocol/compression.py
@@ -3,10 +3,10 @@
 Includes utilities for determining whether or not to compress
 """
 
-from contextlib import suppress
-from functools import partial
 import logging
 import random
+from contextlib import suppress
+from functools import partial
 
 import dask
 from tlz import identity
@@ -22,7 +22,6 @@
 
 from ..utils import ensure_bytes
 
-
 compressions = {None: {"compress": identity, "decompress": identity}}
 
 compressions[False] = compressions[None]  # alias
diff --git a/distributed/protocol/core.py b/distributed/protocol/core.py
index 4235dca9f9c..05a804d3b52 100644
--- a/distributed/protocol/core.py
+++ b/distributed/protocol/core.py
@@ -1,18 +1,18 @@
 import logging
+
 import msgpack
 
-from .compression import compressions, maybe_compress, decompress
+from .compression import compressions, decompress, maybe_compress
 from .serialize import (
     Serialize,
     Serialized,
+    merge_and_deserialize,
     msgpack_decode_default,
     msgpack_encode_default,
-    merge_and_deserialize,
     serialize_and_split,
 )
 from .utils import msgpack_opts
 
-
 logger = logging.getLogger(__name__)
diff --git a/distributed/protocol/cuda.py b/distributed/protocol/cuda.py
index 44ed6a033df..572b63547f5 100644
--- a/distributed/protocol/cuda.py
+++ b/distributed/protocol/cuda.py
@@ -1,8 +1,8 @@
 import dask
+from dask.utils import typename
 
 from . import pickle
 from .serialize import ObjectDictSerializer, register_serialization_family
-from dask.utils import typename
 
 cuda_serialize = dask.utils.Dispatch("cuda_serialize")
 cuda_deserialize = dask.utils.Dispatch("cuda_deserialize")
diff --git a/distributed/protocol/h5py.py b/distributed/protocol/h5py.py
index e129c166683..8a47c7abdc6 100644
--- a/distributed/protocol/h5py.py
+++ b/distributed/protocol/h5py.py
@@ -1,7 +1,7 @@
-from .serialize import dask_serialize, dask_deserialize
-
 import h5py
 
+from .serialize import dask_deserialize, dask_serialize
+
 
 @dask_serialize.register(h5py.File)
 def serialize_h5py_file(f):
diff --git a/distributed/protocol/keras.py b/distributed/protocol/keras.py
index 121aa0c4700..c2c24e3992e 100644
--- a/distributed/protocol/keras.py
+++ b/distributed/protocol/keras.py
@@ -1,7 +1,7 @@
-from .serialize import dask_serialize, dask_deserialize, serialize, deserialize
-
 import keras
 
+from .serialize import dask_deserialize, dask_serialize, deserialize, serialize
+
 
 @dask_serialize.register(keras.Model)
 def serialize_keras_model(model):
diff --git a/distributed/protocol/netcdf4.py b/distributed/protocol/netcdf4.py
index eb83461eddc..d3d0b1e2c0c 100644
--- a/distributed/protocol/netcdf4.py
+++ b/distributed/protocol/netcdf4.py
@@ -1,7 +1,7 @@
-from .serialize import dask_serialize, dask_deserialize, serialize, deserialize
-
 import netCDF4
 
+from .serialize import dask_deserialize, dask_serialize, deserialize, serialize
+
 
 @dask_serialize.register(netCDF4.Dataset)
 def serialize_netcdf4_dataset(ds):
diff --git a/distributed/protocol/numpy.py b/distributed/protocol/numpy.py
index 65f7e2f4076..a0a7d544064 100644
--- a/distributed/protocol/numpy.py
+++ b/distributed/protocol/numpy.py
@@ -1,10 +1,10 @@
 import math
-import numpy as np
 
-from .serialize import dask_serialize, dask_deserialize
-from . import pickle
+import numpy as np
 
 from ..utils import log_errors
+from . import pickle
+from .serialize import dask_deserialize, dask_serialize
 
 
 def itemsize(dt):
diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py
index 228f02d78ca..971e1d3821a 100644
--- a/distributed/protocol/serialize.py
+++ b/distributed/protocol/serialize.py
@@ -1,24 +1,17 @@
-from array import array
-from functools import partial
-import traceback
 import importlib
+import traceback
+from array import array
 from enum import Enum
+from functools import partial
 
 import dask
-from dask.base import normalize_token
-
 import msgpack
+from dask.base import normalize_token
 
+from ..utils import ensure_bytes, has_keyword, typename
 from . import pickle
-from ..utils import has_keyword, typename, ensure_bytes
-from .compression import maybe_compress, decompress
-from .utils import (
-    unpack_frames,
-    pack_frames_prelude,
-    frame_split_size,
-    msgpack_opts,
-)
-
+from .compression import decompress, maybe_compress
+from .utils import frame_split_size, msgpack_opts, pack_frames_prelude, unpack_frames
 
 lazy_registrations = {}
diff --git a/distributed/protocol/sparse.py b/distributed/protocol/sparse.py
index a22d661f849..42d625b0df2 100644
--- a/distributed/protocol/sparse.py
+++ b/distributed/protocol/sparse.py
@@ -1,7 +1,7 @@
-from .serialize import dask_serialize, dask_deserialize, serialize, deserialize
-
 import sparse
 
+from .serialize import dask_deserialize, dask_serialize, deserialize, serialize
+
 
 @dask_serialize.register(sparse.COO)
 def serialize_sparse(x):
diff --git a/distributed/protocol/tests/test_arrow.py b/distributed/protocol/tests/test_arrow.py
index e86bfa6f827..35d26177f03 100644
--- a/distributed/protocol/tests/test_arrow.py
+++ b/distributed/protocol/tests/test_arrow.py
@@ -4,8 +4,8 @@
 pa = pytest.importorskip("pyarrow")
 
 import distributed
-from distributed.utils_test import gen_cluster
 from distributed.protocol import deserialize, serialize, to_serialize
+from distributed.utils_test import gen_cluster
 
 df = pd.DataFrame({"A": list("abc"), "B": [1, 2, 3]})
 tbl = pa.Table.from_pandas(df, preserve_index=False)
diff --git a/distributed/protocol/tests/test_collection.py b/distributed/protocol/tests/test_collection.py
index fd112b6c792..f2064fda858 100644
--- a/distributed/protocol/tests/test_collection.py
+++ b/distributed/protocol/tests/test_collection.py
@@ -1,7 +1,8 @@
-import pytest
-from distributed.protocol import serialize, deserialize
-import pandas as pd
 import numpy as np
+import pandas as pd
+import pytest
+
+from distributed.protocol import deserialize, serialize
 
 
 @pytest.mark.parametrize("collection", [tuple, dict, list])
diff --git a/distributed/protocol/tests/test_collection_cuda.py b/distributed/protocol/tests/test_collection_cuda.py
index e2602795782..4f3242525cd 100644
--- a/distributed/protocol/tests/test_collection_cuda.py
+++ b/distributed/protocol/tests/test_collection_cuda.py
@@ -1,8 +1,8 @@
+import pandas as pd
 import pytest
-
-from distributed.protocol import serialize, deserialize
 from dask.dataframe.utils import assert_eq
-import pandas as pd
+
+from distributed.protocol import deserialize, serialize
 
 
 @pytest.mark.parametrize("collection", [tuple, dict])
diff --git a/distributed/protocol/tests/test_cupy.py b/distributed/protocol/tests/test_cupy.py
index 520693fb5c1..5c684e46d62 100644
--- a/distributed/protocol/tests/test_cupy.py
+++ b/distributed/protocol/tests/test_cupy.py
@@ -1,6 +1,7 @@
 import pickle
 
 import pytest
+
 from distributed.protocol import deserialize, serialize
 
 cupy = pytest.importorskip("cupy")
diff --git a/distributed/protocol/tests/test_h5py.py b/distributed/protocol/tests/test_h5py.py
index 80eeb2c05f5..5189fc7f499 100644
--- a/distributed/protocol/tests/test_h5py.py
+++ b/distributed/protocol/tests/test_h5py.py
@@ -6,7 +6,6 @@
 h5py = pytest.importorskip("h5py")
 
 from distributed.protocol import deserialize, serialize
-
 from distributed.utils import tmpfile
 
 
@@ -82,10 +81,9 @@ def test_raise_error_on_serialize_write_permissions():
         deserialize(*serialize(f))
 
 
-from distributed.utils_test import gen_cluster
-
+from dask import array as da
 
-import dask.array as da
+from distributed.utils_test import gen_cluster
 
 
 @silence_h5py_issue775
diff --git a/distributed/protocol/tests/test_highlevelgraph.py b/distributed/protocol/tests/test_highlevelgraph.py
index 1fcb339f721..ea0c752494b 100644
--- a/distributed/protocol/tests/test_highlevelgraph.py
+++ b/distributed/protocol/tests/test_highlevelgraph.py
@@ -1,14 +1,12 @@
 import ast
 
 import dask
+import pytest
+from dask import array as da
+from dask import dataframe as dd
-import dask.array as da
-import dask.dataframe as dd
-
-from distributed.utils_test import gen_cluster
 from distributed.diagnostics import SchedulerPlugin
-
-import pytest
+from distributed.utils_test import gen_cluster
 
 np = pytest.importorskip("numpy")
 pd = pytest.importorskip("pandas")
diff --git a/distributed/protocol/tests/test_keras.py b/distributed/protocol/tests/test_keras.py
index da8cdf6374a..d84127d5df1 100644
--- a/distributed/protocol/tests/test_keras.py
+++ b/distributed/protocol/tests/test_keras.py
@@ -1,10 +1,10 @@
 import numpy as np
-from numpy.testing import assert_allclose
 import pytest
+from numpy.testing import assert_allclose
 
 keras = pytest.importorskip("keras")
 
-from distributed.protocol import serialize, deserialize, dumps, loads, to_serialize
+from distributed.protocol import deserialize, dumps, loads, serialize, to_serialize
 
 
 def test_serialize_deserialize_model():
diff --git a/distributed/protocol/tests/test_netcdf4.py b/distributed/protocol/tests/test_netcdf4.py
index 1ed78508156..4b198381328 100644
--- a/distributed/protocol/tests/test_netcdf4.py
+++ b/distributed/protocol/tests/test_netcdf4.py
@@ -4,7 +4,6 @@
 np = pytest.importorskip("numpy")
 
 from distributed.protocol import deserialize, serialize
-
 from distributed.utils import tmpfile
 
 
@@ -75,10 +74,9 @@ def test_serialize_deserialize_group():
         assert (x[:] == y[:]).all()
 
 
-from distributed.utils_test import gen_cluster
-
+from dask import array as da
 
-import dask.array as da
+from distributed.utils_test import gen_cluster
 
 
 @gen_cluster(client=True)
diff --git a/distributed/protocol/tests/test_numba.py b/distributed/protocol/tests/test_numba.py
index 61213640715..b34d4be25ab 100644
--- a/distributed/protocol/tests/test_numba.py
+++ b/distributed/protocol/tests/test_numba.py
@@ -1,7 +1,9 @@
-from distributed.protocol import serialize, deserialize
 import pickle
+
 import pytest
 
+from distributed.protocol import deserialize, serialize
+
 cuda = pytest.importorskip("numba.cuda")
 np = pytest.importorskip("numpy")
diff --git a/distributed/protocol/tests/test_numpy.py b/distributed/protocol/tests/test_numpy.py
index 9096748b5d1..9a4269e3b9c 100644
--- a/distributed/protocol/tests/test_numpy.py
+++ b/distributed/protocol/tests/test_numpy.py
@@ -4,20 +4,20 @@
 import pytest
 
 from distributed.protocol import (
-    serialize,
-    deserialize,
     decompress,
+    deserialize,
     dumps,
     loads,
-    to_serialize,
     msgpack,
+    serialize,
+    to_serialize,
 )
-from distributed.protocol.utils import BIG_BYTES_SHARD_SIZE
+from distributed.protocol.compression import maybe_compress
 from distributed.protocol.numpy import itemsize
 from distributed.protocol.pickle import HIGHEST_PROTOCOL
-from distributed.protocol.compression import maybe_compress
+from distributed.protocol.utils import BIG_BYTES_SHARD_SIZE
 from distributed.system import MEMORY_LIMIT
-from distributed.utils import ensure_bytes, tmpfile, nbytes
+from distributed.utils import ensure_bytes, nbytes, tmpfile
 from distributed.utils_test import gen_cluster
diff --git a/distributed/protocol/tests/test_pandas.py b/distributed/protocol/tests/test_pandas.py
index a8134d7e3d0..e2037962884 100644
--- a/distributed/protocol/tests/test_pandas.py
+++ b/distributed/protocol/tests/test_pandas.py
@@ -1,20 +1,18 @@
 import numpy as np
 import pandas as pd
 import pytest
-
 from dask.dataframe.utils import assert_eq
 
 from distributed.protocol import (
-    serialize,
-    deserialize,
     decompress,
+    deserialize,
     dumps,
     loads,
+    serialize,
     to_serialize,
 )
 from distributed.utils import ensure_bytes
 
-
 dfs = [
     pd.DataFrame({}),
     pd.DataFrame({"x": [1, 2, 3]}),
diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py
index d7a2ad6d1ad..86424f8c1e1 100644
--- a/distributed/protocol/tests/test_pickle.py
+++ b/distributed/protocol/tests/test_pickle.py
@@ -1,8 +1,8 @@
-from functools import partial
 import gc
-from operator import add
-import weakref
 import sys
+import weakref
+from functools import partial
+from operator import add
 
 import pytest
diff --git a/distributed/protocol/tests/test_protocol.py b/distributed/protocol/tests/test_protocol.py
index e088f146bfb..411fcc0c4e4 100644
--- a/distributed/protocol/tests/test_protocol.py
+++ b/distributed/protocol/tests/test_protocol.py
@@ -1,8 +1,8 @@
 import pytest
 
-from distributed.protocol import loads, dumps, msgpack, maybe_compress, to_serialize
+from distributed.protocol import dumps, loads, maybe_compress, msgpack, to_serialize
 from distributed.protocol.compression import compressions
-from distributed.protocol.serialize import Serialize, Serialized, serialize, deserialize
+from distributed.protocol.serialize import Serialize, Serialized, deserialize, serialize
 from distributed.system import MEMORY_LIMIT
 from distributed.utils import nbytes
diff --git a/distributed/protocol/tests/test_rmm.py b/distributed/protocol/tests/test_rmm.py
index 8b176afd877..bd9d7f4cab9 100644
--- a/distributed/protocol/tests/test_rmm.py
+++ b/distributed/protocol/tests/test_rmm.py
@@ -1,6 +1,7 @@
-from distributed.protocol import serialize, deserialize
 import pytest
 
+from distributed.protocol import deserialize, serialize
+
 numpy = pytest.importorskip("numpy")
 cuda = pytest.importorskip("numba.cuda")
 rmm = pytest.importorskip("rmm")
diff --git a/distributed/protocol/tests/test_scipy.py b/distributed/protocol/tests/test_scipy.py
index 4e5eb8423cf..0904f92002a 100644
--- a/distributed/protocol/tests/test_scipy.py
+++ b/distributed/protocol/tests/test_scipy.py
@@ -1,4 +1,5 @@
 import pytest
+
 from distributed.protocol import deserialize, serialize
 
 numpy = pytest.importorskip("numpy")
diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py
index 735dffb5c19..d1e15cb6d44 100644
--- a/distributed/protocol/tests/test_serialize.py
+++ b/distributed/protocol/tests/test_serialize.py
@@ -1,35 +1,34 @@
-from array import array
 import copy
 import pickle
+from array import array
 
 import msgpack
 import numpy as np
 import pytest
-from tlz import identity
-
 from dask.utils_test import inc
+from tlz import identity
 
 from distributed import wait
+from distributed.comm.utils import from_frames, to_frames
 from distributed.protocol import (
-    register_serialization,
-    serialize,
-    deserialize,
-    nested_deserialize,
     Serialize,
     Serialized,
-    to_serialize,
-    serialize_bytes,
-    deserialize_bytes,
-    serialize_bytelist,
-    register_serialization_family,
     dask_serialize,
+    deserialize,
+    deserialize_bytes,
     dumps,
     loads,
+    nested_deserialize,
+    register_serialization,
+    register_serialization_family,
+    serialize,
+    serialize_bytelist,
+    serialize_bytes,
+    to_serialize,
 )
 from distributed.protocol.serialize import check_dask_serializable
 from distributed.utils import nbytes
-from distributed.utils_test import inc, gen_test
-from distributed.comm.utils import to_frames, from_frames
+from distributed.utils_test import gen_test, inc
 
 
 class MyObj:
@@ -141,9 +140,10 @@ def test_nested_deserialize():
     assert x == x_orig  # x wasn't mutated
 
 
-from distributed.utils_test import gen_cluster
 from dask import delayed
 
+from distributed.utils_test import gen_cluster
+
 
 @gen_cluster(client=True)
 async def test_object_in_graph(c, s, a, b):
diff --git a/distributed/protocol/tests/test_sparse.py b/distributed/protocol/tests/test_sparse.py
index 89f9da09bc2..5d971c9b1f4 100644
--- a/distributed/protocol/tests/test_sparse.py
+++ b/distributed/protocol/tests/test_sparse.py
@@ -1,6 +1,6 @@
 import numpy as np
-from numpy.testing import assert_allclose
 import pytest
+from numpy.testing import assert_allclose
 
 sparse = pytest.importorskip("sparse")
diff --git a/distributed/protocol/tests/test_torch.py b/distributed/protocol/tests/test_torch.py
index efb5fa6610a..3e212c0e858 100644
--- a/distributed/protocol/tests/test_torch.py
+++ b/distributed/protocol/tests/test_torch.py
@@ -1,6 +1,7 @@
-from distributed.protocol import serialize, deserialize
 import pytest
 
+from distributed.protocol import deserialize, serialize
+
 np = pytest.importorskip("numpy")
 torch = pytest.importorskip("torch")
diff --git a/distributed/protocol/torch.py b/distributed/protocol/torch.py
index 3b4c6d19c8d..f8b6acb13b0 100644
--- a/distributed/protocol/torch.py
+++ b/distributed/protocol/torch.py
@@ -1,7 +1,7 @@
-from .serialize import serialize, dask_serialize, dask_deserialize, register_generic
-
-import torch
 import numpy as np
+import torch
+
+from .serialize import dask_deserialize, dask_serialize, register_generic, serialize
 
 
 @dask_serialize.register(torch.Tensor)
diff --git a/distributed/pubsub.py b/distributed/pubsub.py
index 91200be06eb..5dba5b679c6 100644
--- a/distributed/pubsub.py
+++ b/distributed/pubsub.py
@@ -1,13 +1,13 @@
 import asyncio
-from collections import defaultdict, deque
 import logging
 import threading
 import weakref
+from collections import defaultdict, deque
 
 from .core import CommClosedError
 from .metrics import time
-from .utils import sync, TimeoutError, parse_timedelta
 from .protocol.serialize import to_serialize
+from .utils import TimeoutError, parse_timedelta, sync
 
 logger = logging.getLogger(__name__)
@@ -283,7 +283,7 @@ class Pub:
 
     def __init__(self, name, worker=None, client=None):
         if worker is None and client is None:
-            from distributed import get_worker, get_client
+            from distributed import get_client, get_worker
 
             try:
                 worker = get_worker()
@@ -363,7 +363,7 @@ class Sub:
 
     def __init__(self, name, worker=None, client=None):
         if worker is None and client is None:
-            from distributed.worker import get_worker, get_client
+            from distributed.worker import get_client, get_worker
 
             try:
                 worker = get_worker()
diff --git a/distributed/pytest_resourceleaks.py b/distributed/pytest_resourceleaks.py
index 348472892d6..68246005275 100644
--- a/distributed/pytest_resourceleaks.py
+++ b/distributed/pytest_resourceleaks.py
@@ -4,10 +4,10 @@
 """
 import collections
 import gc
-import time
 import os
 import sys
 import threading
+import time
 
 import pytest
diff --git a/distributed/queues.py b/distributed/queues.py
index e368d329d03..cadaf358f0a 100644
--- a/distributed/queues.py
+++ b/distributed/queues.py
@@ -1,14 +1,13 @@
 import asyncio
-from collections import defaultdict
 import logging
 import uuid
+from collections import defaultdict
 
 from dask.utils import stringify
 
-from .client import Future, Client
-from .utils import sync, thread_state
+from .client import Client, Future
+from .utils import parse_timedelta, sync, thread_state
 from .worker import get_client, get_worker
-from .utils import parse_timedelta
 
 logger = logging.getLogger(__name__)
diff --git a/distributed/recreate_exceptions.py b/distributed/recreate_exceptions.py
index 7e966f270a0..6b498113b5e 100644
--- a/distributed/recreate_exceptions.py
+++ b/distributed/recreate_exceptions.py
@@ -1,5 +1,7 @@
 import logging
+
 from dask.utils import stringify
+
 from .client import futures_of, wait
 from .utils import sync
 from .utils_comm import pack_data
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index d4e0b20dcb8..8627ee56d53 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -1,90 +1,85 @@
 import asyncio
-from collections import defaultdict, deque
-
-from collections.abc import Mapping, Set
-from contextlib import suppress
-from datetime import timedelta
-from functools import partial
 import inspect
 import itertools
 import json
 import logging
 import math
-from numbers import Number
 import operator
 import os
-import sys
 import random
+import sys
 import warnings
 import weakref
+from collections import defaultdict, deque
+from collections.abc import Mapping, Set
+from contextlib import suppress
+from datetime import timedelta
+from functools import partial
+from numbers import Number
+
+import dask
 import psutil
 import sortedcontainers
-
+from dask.highlevelgraph import HighLevelGraph
 from tlz import (
+    compose,
+    concat,
+    first,
+    groupby,
     merge,
-    pluck,
     merge_sorted,
-    first,
     merge_with,
-    valmap,
+    pluck,
     second,
-    compose,
-    groupby,
-    concat,
+    valmap,
 )
 from tornado.ioloop import IOLoop, PeriodicCallback
 
-import dask
-from dask.highlevelgraph import HighLevelGraph
-
-from . import profile
+from . import preloading, profile
+from . import versions as version_module
 from .batched import BatchedSend
 from .comm import (
+    get_address_host,
     normalize_address,
     resolve_address,
-    get_address_host,
     unparse_host_port,
 )
 from .comm.addressing import addresses_from_user_args
-from .core import rpc, send_recv, clean_exception, CommClosedError, Status
+from .core import CommClosedError, Status, clean_exception, rpc, send_recv
 from .diagnostics.plugin import SchedulerPlugin
-
+from .event import EventExtension
 from .http import get_handlers
+from .lock import LockExtension
 from .metrics import time
+from .multi_lock import MultiLockExtension
 from .node import ServerNode
-from . import preloading
 from .proctitle import setproctitle
+from .publish import PublishExtension
+from .pubsub import PubSubSchedulerExtension
+from .queues import QueueExtension
+from .recreate_exceptions import ReplayExceptionScheduler
 from .security import Security
+from .semaphore import SemaphoreExtension
+from .stealing import WorkStealing
 from .utils import (
     All,
+    TimeoutError,
+    empty_context,
+    format_bytes,
+    format_time,
     get_fileno_limit,
-    log_errors,
     key_split,
-    validate_key,
+    key_split_group,
+    log_errors,
     no_default,
-    parse_timedelta,
     parse_bytes,
+    parse_timedelta,
     shutting_down,
-    key_split_group,
-    empty_context,
     tmpfile,
-    format_bytes,
-    format_time,
-    TimeoutError,
+    validate_key,
 )
-from .utils_comm import scatter_to_workers, gather_from_workers, retry_operation
-from .utils_perf import enable_gc_diagnosis, disable_gc_diagnosis
-from . import versions as version_module
-
-from .publish import PublishExtension
-from .queues import QueueExtension
-from .semaphore import SemaphoreExtension
-from .recreate_exceptions import ReplayExceptionScheduler
-from .lock import LockExtension
-from .multi_lock import MultiLockExtension
-from .event import EventExtension
-from .pubsub import PubSubSchedulerExtension
-from .stealing import WorkStealing
+from .utils_comm import gather_from_workers, retry_operation, scatter_to_workers
+from .utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
 from .variable import VariableExtension
 
 try:
@@ -94,6 +89,8 @@
 
 if compiled:
     from cython import (
+        Py_hash_t,
+        Py_ssize_t,
         bint,
         cast,
         ccall,
@@ -105,15 +102,11 @@
         final,
         inline,
         nogil,
-        Py_hash_t,
-        Py_ssize_t,
     )
 else:
-    from ctypes import (
-        c_double as double,
-        c_ssize_t as Py_hash_t,
-        c_ssize_t as Py_ssize_t,
-    )
+    from ctypes import c_double as double
+    from ctypes import c_ssize_t as Py_hash_t
+    from ctypes import c_ssize_t as Py_ssize_t
 
     bint = bool
@@ -6387,16 +6380,16 @@ def profile_to_figure(state):
 
             for k in sorted(timespent.keys()):
                 tasks_timings += f"