Skip to content

Expose Scheduler Status #161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
*.pyc
*.py~
docs/build
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ install:
# Install dependencies
- conda create -n test-environment python=$TRAVIS_PYTHON_VERSION
- source activate test-environment
- conda install pytest coverage tornado toolz dill futures dask ipywidgets psutil bokeh
- conda install pytest coverage tornado toolz dill futures dask ipywidgets psutil bokeh requests
- pip install git+https://github.com/dask/dask.git --upgrade

# Install distributed
Expand Down
6 changes: 4 additions & 2 deletions distributed/cli/dscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ def handle_signal(sig, frame):
@click.command()
@click.argument('center', type=str, default='')
@click.option('--port', type=int, default=8786, help="Serving port")
@click.option('--http-port', type=int, default=9786, help="HTTP port")
@click.option('--host', type=str, default=ip,
help="Serving host defaults to %s" % ip)
def main(center, host, port):
def main(center, host, port, http_port):
ip = socket.gethostbyname(host)
loop = IOLoop.current()
scheduler = Scheduler(center, services={'http': HTTPScheduler}, ip=ip)
scheduler = Scheduler(center, ip=ip,
services={('http', http_port): HTTPScheduler})
if center:
loop.run_sync(scheduler.sync_center)
scheduler.start(port)
Expand Down
6 changes: 4 additions & 2 deletions distributed/cli/dworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
@click.argument('center', type=str)
@click.option('--port', type=int, default=0,
help="Serving port, defaults to randomly assigned")
@click.option('--http-port', type=int, default=0,
help="Serving http port, defaults to randomly assigned")
@click.option('--host', type=str, default=None,
help="Serving host. Defaults to an ip address that can hopefully"
" be visible from the center network.")
Expand All @@ -26,7 +28,7 @@
@click.option('--nprocs', type=int, default=1,
help="Number of worker processes. Defaults to one.")
@click.option('--no-nanny', is_flag=True)
def main(center, host, port, nthreads, nprocs, no_nanny):
def main(center, host, port, http_port, nthreads, nprocs, no_nanny):
try:
center_host, center_port = center.split(':')
center_ip = socket.gethostbyname(center_host)
Expand All @@ -41,7 +43,7 @@ def main(center, host, port, nthreads, nprocs, no_nanny):
if not nthreads:
nthreads = _ncores // nprocs

services = {'http': HTTPWorker}
services = {('http', http_port): HTTPWorker}

loop = IOLoop.current()
t = Worker if no_nanny else Nanny
Expand Down
16 changes: 16 additions & 0 deletions distributed/cli/tests/test_dscheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from subprocess import Popen, PIPE
import requests

from distributed import Scheduler, Executor

def test_defaults():
    """Run ``dscheduler`` with default options and check its HTTP route.

    Starts the ``dscheduler`` command as a subprocess, connects an
    ``Executor`` to ``127.0.0.1`` on ``Scheduler.default_port`` and asserts
    that ``/info.json`` on port 9786 reports a ``'running'`` status.
    """
    # Start the process before entering ``try`` so that the cleanup below
    # never refers to a name that was not bound.
    proc = Popen(['dscheduler'], stdout=PIPE, stderr=PIPE)
    try:
        e = Executor('127.0.0.1:%d' % Scheduler.default_port)
        try:
            response = requests.get('http://127.0.0.1:9786/info.json')
            assert response.ok
            assert response.json()['status'] == 'running'
        finally:
            e.shutdown()
    finally:
        # Runs even when the executor failed to connect or to shut down
        proc.kill()
        # Reap the killed child so it does not linger as a zombie
        proc.wait()
6 changes: 5 additions & 1 deletion distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class Server(TCPServer):
* ``{'op': 'ping'}``
* ``{'op': 'add': 'x': 10, 'y': 20}``
"""
default_port = 0

def __init__(self, handlers, max_buffer_size=MAX_BUFFER_SIZE, **kwargs):
self.handlers = assoc(handlers, 'identity', self.identity)
self.id = str(uuid.uuid1())
Expand All @@ -110,7 +112,9 @@ def port(self):
def identity(self, stream):
return {'type': type(self).__name__, 'id': self.id}

def listen(self, port):
def listen(self, port=None):
if port is None:
port = self.default_port
while True:
try:
super(Server, self).listen(port)
Expand Down
2 changes: 2 additions & 0 deletions distributed/diagnostics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
from .progressbar import progress
with ignoring(ImportError):
from .resource_monitor import Occupancy
with ignoring(ImportError):
from .scheduler_widgets import scheduler_status
114 changes: 114 additions & 0 deletions distributed/diagnostics/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import os
import pandas as pd


def scheduler_status_str(d):
    """ Render scheduler status as a string

    Consumes data from status.json route

    Parameters
    ----------
    d: dict
        Decoded status payload.  ``d['address']`` is read here; the whole
        mapping is passed to ``scheduler_progress_df`` and
        ``worker_status_df``.

    Returns
    -------
    str
        The scheduler address, the progress table and the worker table,
        separated by blank lines, with trailing whitespace removed from
        every line.

    Examples
    --------
    >>> d = {"address": "SCHEDULER_ADDRESS:9999",
    ...      "ready": 5,
    ...      "ncores": {"192.168.1.107:44544": 4,
    ...                 "192.168.1.107:36441": 4},
    ...      "in-memory": 30, "waiting": 20,
    ...      "processing": {"192.168.1.107:44544": {'inc': 3, 'add': 1},
    ...                     "192.168.1.107:36441": {'inc': 2}},
    ...      "tasks": 70,
    ...      "failed": 9,
    ...      "bytes": {"192.168.1.107:44544": 1000,
    ...                "192.168.1.107:36441": 2000}}

    >>> print(scheduler_status_str(d))  # doctest: +NORMALIZE_WHITESPACE
    Scheduler: SCHEDULER_ADDRESS:9999
    <BLANKLINE>
                 Count                                  Progress
    Tasks
    waiting         20  +++++++++++
    ready            5  ++
    failed           9  +++++
    in-progress      6  +++
    in-memory       30  +++++++++++++++++
    total           70  ++++++++++++++++++++++++++++++++++++++++
    <BLANKLINE>
                         Ncores  Bytes  Processing
    Workers
    192.168.1.107:36441       4   2000       [inc]
    192.168.1.107:44544       4   1000  [add, inc]
    """
    sched = scheduler_progress_df(d)
    workers = worker_status_df(d)
    s = "Scheduler: %s\n\n%s\n\n%s" % (d['address'], sched, workers)
    # The text above is assembled with '\n', so split and join on '\n' rather
    # than os.linesep; otherwise the per-line strip is a no-op on platforms
    # whose native separator is not '\n'.
    return '\n'.join(line.rstrip() for line in s.split('\n'))


def scheduler_progress_df(d):
    """ Convert status response to DataFrame of total progress

    Consumes dictionary from status.json route

    Parameters
    ----------
    d: dict
        Must hold the integer counts ``'waiting'``, ``'ready'``, ``'failed'``,
        ``'in-memory'`` and ``'tasks'``, plus ``'processing'``, a mapping of
        worker -> {task name: count}.  Other keys are ignored and the input
        is not modified.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``'Tasks'`` with a ``'Count'`` column and a ``'Progress'``
        column holding a 40-character, left-aligned bar of ``'+'`` signs
        proportional to ``Count / total``.  When there are no tasks at all
        every bar is blank.

    Examples
    --------
    >>> d = {"ready": 5, "in-memory": 30, "waiting": 20,
    ...      "tasks": 70, "failed": 9,
    ...      "processing": {"192.168.1.107:44544": {'inc': 3, 'add': 1},
    ...                     "192.168.1.107:36441": {'inc': 2}},
    ...      "other-keys-are-fine-too": ''}

    >>> scheduler_progress_df(d)  # doctest: +SKIP
                 Count                                  Progress
    Tasks
    waiting         20  +++++++++++
    ready            5  ++
    failed           9  +++++
    in-progress      6  +++
    in-memory       30  +++++++++++++++++
    total           70  ++++++++++++++++++++++++++++++++++++++++
    """
    d = d.copy()  # shallow copy: the caller's dict keeps its 'tasks' key
    d['in-progress'] = sum(count
                           for tasks in d['processing'].values()
                           for count in tasks.values())
    d['total'] = d.pop('tasks')
    names = ['waiting', 'ready', 'failed', 'in-progress', 'in-memory', 'total']
    df = pd.DataFrame(pd.Series({k: d[k] for k in names},
                                index=names, name='Count'))

    total = d['total']
    if total:
        barlength = (40 * df.Count / total).astype(int)
    else:
        # No tasks yet: avoid dividing by zero and draw empty bars, so the
        # column holds strings in both branches.
        barlength = pd.Series(0, index=df.index)
    # Pad to a fixed width so the bars line up on the left when printed
    df['Progress'] = barlength.apply(lambda n: '%-40s' % ('+' * int(n)))

    df.index.name = 'Tasks'

    return df


def worker_status_df(d):
    """ Status of workers as a Pandas DataFrame

    Consumes data from status.json route.

    Parameters
    ----------
    d: dict
        Must hold ``'ncores'``, ``'bytes'`` and ``'processing'``, each a
        mapping keyed by worker address.  Other keys are ignored.

    Returns
    -------
    pandas.DataFrame
        One row per worker, sorted by address, with columns ``'Ncores'``,
        ``'Bytes'`` and ``'Processing'``.  ``'Processing'`` holds the sorted
        task names for the worker, or an empty list for a worker that has no
        entry under ``d['processing']``.

    Examples
    --------
    >>> d = {"other-keys-are-fine-too": '',
    ...      "ncores": {"192.168.1.107:44544": 4,
    ...                 "192.168.1.107:36441": 4},
    ...      "processing": {"192.168.1.107:44544": {'inc': 3, 'add': 1},
    ...                     "192.168.1.107:36441": {'inc': 2}},
    ...      "bytes": {"192.168.1.107:44544": 1000,
    ...                "192.168.1.107:36441": 2000}}

    >>> worker_status_df(d)  # doctest: +SKIP
                         Ncores  Bytes  Processing
    Workers
    192.168.1.107:36441       4   2000       [inc]
    192.168.1.107:44544       4   1000  [add, inc]
    """
    names = ['ncores', 'bytes', 'processing']
    df = pd.DataFrame({k: d[k] for k in names}, columns=names)

    def task_names(tasks):
        # A worker absent from d['processing'] shows up as a float NaN after
        # the three mappings are aligned; report it as processing nothing.
        if isinstance(tasks, float):
            return []
        return sorted(tasks)

    df['processing'] = df['processing'].apply(task_names)
    df.columns = [name.title() for name in names]
    df.index.name = 'Workers'
    df = df.sort_index()
    return df
53 changes: 53 additions & 0 deletions distributed/diagnostics/scheduler_widgets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import json
import logging

from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.httpclient import AsyncHTTPClient
from tornado import gen

from .scheduler import scheduler_progress_df, worker_status_df

logger = logging.getLogger(__name__)


def scheduler_status_widget(d, widget=None):
    """ IPython widget to display scheduler status

    Parameters
    ----------
    d: dict
        Status payload; ``d['address']`` is shown in the header and the whole
        mapping feeds ``scheduler_progress_df`` and ``worker_status_df``.
    widget: ipywidgets container, optional
        Existing widget whose first three children are updated in place.
        A new ``VBox`` of three ``HTML`` children is created when omitted.

    Returns
    -------
    The updated (or newly created) widget.

    See also:
        scheduler_status_str
    """
    from ipywidgets import HTML, VBox

    def as_table(frame):
        # Render a DataFrame as an HTML table carrying the "table" CSS class
        return frame.style.set_table_attributes('class="table"').render()

    if widget is None:
        widget = VBox([HTML(''), HTML(''), HTML('')])

    title = '<h3>Scheduler: %s</h3>' % d['address']
    frames = [scheduler_progress_df(d), worker_status_df(d)]

    widget.children[0].value = title
    for position, frame in enumerate(frames, start=1):
        widget.children[position].value = as_table(frame)

    logger.debug("Update scheduler status widget")
    return widget


@gen.coroutine
def update_status_widget(widget, ip, port):
    """ Fetch ``status.json`` over HTTP and refresh ``widget`` with it

    Requests ``http://<ip>:<port>/status.json``, decodes the JSON body and
    passes it to ``scheduler_status_widget``.  Any exception raised along the
    way is logged with its traceback and then re-raised.
    """
    http_client = AsyncHTTPClient()
    try:
        reply = yield http_client.fetch(
            'http://%s:%d/status.json' % (ip, port))
        status = json.loads(reply.body.decode())
        scheduler_status_widget(status, widget)
    except Exception as exc:
        logger.exception(exc)
        raise


def scheduler_status(e, interval=200, port=9786, loop=None):
    """ Create a scheduler status widget that refreshes itself periodically

    Parameters
    ----------
    e:
        Object exposing ``e.scheduler.ip``; that address is polled over HTTP.
    interval: int
        Passed to ``PeriodicCallback`` as its callback time (milliseconds in
        Tornado).
    port: int
        HTTP port serving ``status.json``.
    loop: IOLoop, optional
        Event loop on which the updates run; ``IOLoop.current()`` when not
        given.

    Returns
    -------
    A ``VBox`` of three ``HTML`` children, updated by
    ``update_status_widget``.
    """
    from ipywidgets import HTML, VBox
    if not loop:
        loop = IOLoop.current()
    widget = VBox([HTML(''), HTML(''), HTML('')])

    def refresh():
        # The scheduler address is looked up again on every refresh
        return update_status_widget(widget, e.scheduler.ip, port)

    # One immediate refresh, then one every ``interval``
    loop.add_callback(refresh)
    periodic = PeriodicCallback(refresh, interval, io_loop=loop)
    periodic.start()

    return widget
112 changes: 112 additions & 0 deletions distributed/diagnostics/tests/test_scheduler_diagnostic_widgets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from ipykernel.comm import Comm
import ipywidgets as widgets
from ipywidgets import Widget

#################
# Utility stuff #
#################

# Taken from ipywidgets/widgets/tests/test_interaction.py
# https://github.com/ipython/ipywidgets
# Licensed under Modified BSD. Copyright IPython Development Team. See:
# https://github.com/ipython/ipywidgets/blob/master/COPYING.md


class DummyComm(Comm):
    """``Comm`` stand-in whose ``open``, ``send`` and ``close`` do nothing."""

    comm_id = 'a-b-c-d'

    def open(self, *args, **kwargs):
        """Accept any arguments and do nothing."""
        return None

    def send(self, *args, **kwargs):
        """Accept any arguments and do nothing."""
        return None

    def close(self, *args, **kwargs):
        """Accept any arguments and do nothing."""
        return None

# Original ``Widget`` attributes saved by setup() so teardown() can restore them
_widget_attrs = {}
# Objects collected by record_display(); rebound to a new list by clear_display()
displayed = []
# Sentinel recorded by setup() when ``Widget`` had no such attribute beforehand
undefined = object()

def setup():
    """Patch ``Widget`` to use ``DummyComm`` and to refuse being displayed.

    The previous ``_comm_default`` (or the ``undefined`` sentinel when there
    was none) and ``_ipython_display_`` are stored in ``_widget_attrs`` for
    ``teardown``.
    """
    def dummy_comm_default(self):
        return DummyComm()

    def raise_not_implemented(*args, **kwargs):
        raise NotImplementedError()

    _widget_attrs['_comm_default'] = getattr(Widget, '_comm_default',
                                             undefined)
    Widget._comm_default = dummy_comm_default

    _widget_attrs['_ipython_display_'] = Widget._ipython_display_
    Widget._ipython_display_ = raise_not_implemented

def teardown():
    """Restore every ``Widget`` attribute recorded in ``_widget_attrs``.

    Attributes recorded as the ``undefined`` sentinel are deleted from
    ``Widget``; all others are set back to their saved value.
    """
    for name in _widget_attrs:
        saved = _widget_attrs[name]
        if saved is not undefined:
            setattr(Widget, name, saved)
        else:
            delattr(Widget, name)

def f(**kwargs):
    """Accept arbitrary keyword arguments and do nothing."""
    return None

def clear_display():
    """Rebind the module-level ``displayed`` name to a new empty list."""
    global displayed
    displayed = list()

def record_display(*args):
    """Append every positional argument to the module-level ``displayed``."""
    for shown in args:
        displayed.append(shown)
# End code taken from ipywidgets


# Sample scheduler status payload used as input by the widget tests
d = {
    "address": "SCHEDULER_ADDRESS:9999",
    "tasks": 70,
    "waiting": 20,
    "ready": 5,
    "failed": 9,
    "in-memory": 30,
    "ncores": {
        "192.168.1.107:44544": 4,
        "192.168.1.107:36441": 4,
    },
    "bytes": {
        "192.168.1.107:44544": 1000,
        "192.168.1.107:36441": 2000,
    },
    "processing": {
        "192.168.1.107:44544": {'inc': 3, 'add': 1},
        "192.168.1.107:36441": {'inc': 2},
    },
}

from distributed.diagnostics.scheduler_widgets import (scheduler_status_widget,
scheduler_status)
from distributed.http.scheduler import HTTPScheduler
from time import time

from distributed.utils_test import gen_cluster, inc

from tornado import gen
from tornado.httpclient import AsyncHTTPClient


def test_scheduler_status_widget():
    """The first child of the widget built from ``d`` shows the address."""
    header = scheduler_status_widget(d).children[0]
    assert d['address'] in header.value


@gen_cluster(executor=True)
def test_scheduler_status(e, s, a, b):
    """The widget from ``scheduler_status`` follows a live HTTP scheduler.

    Serves scheduler ``s`` through ``HTTPScheduler`` on a free port, then
    checks that the widget header comes to contain ``s.address`` and that,
    after 11 ``inc`` tasks are gathered, the progress table contains ``'11'``.
    """
    ss = HTTPScheduler(s)
    ss.listen(0)
    try:
        client = AsyncHTTPClient()

        # Request the status route once before the widget starts polling;
        # the response itself is not inspected.
        yield client.fetch('http://localhost:%d/status.json' % ss.port)

        widget = scheduler_status(e, 50, port=ss.port, loop=e.loop)

        yield gen.sleep(1)

        start = time()
        while s.address not in widget.children[0].value:
            yield gen.sleep(0.01)
            assert time() < start + 1

        futures = e.map(inc, range(11))
        yield e._gather(futures)

        start = time()
        while '11' not in widget.children[1].value:
            yield gen.sleep(0.01)
            assert time() < start + 1
    finally:
        # Stop the HTTP server even when one of the assertions above fails
        ss.stop()
Loading