Skip to content

Lazy evaluation #582

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 39 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
92d8bc5
Factor out a _send method in mpi_engine.
bgrant Aug 25, 2014
e27d079
Another small refactoring for mpi_engine.
bgrant Aug 25, 2014
7c0ae22
WIP: Lazy evaluation for MPIContext.
bgrant Aug 25, 2014
ad5fb4d
Predict output dtype for ufuncs...
bgrant Aug 25, 2014
9354bf3
Add a context manager for the laziness.
bgrant Aug 25, 2014
19fee1f
Remove trailing whitespace.
bgrant Aug 25, 2014
8c9a6e3
Remove the unsupported case in the lazy test.
bgrant Aug 26, 2014
c468d0c
Add more passing tests and one failing one (skipped)
bgrant Aug 26, 2014
c307e75
Remove a print statement.
bgrant Aug 26, 2014
fb019bd
Test unary ufuncs.
bgrant Aug 26, 2014
900efd3
Skip an unsupported test.
bgrant Aug 26, 2014
6003ce6
Fix dependent values (and leftover lazy proxies).
bgrant Aug 27, 2014
6bc2dc3
Whitespace.
bgrant Aug 27, 2014
28bb22a
Add a complicated expression test.
bgrant Aug 27, 2014
78c3d00
Add one more check.
bgrant Aug 27, 2014
82b809a
Update the module docstring.
bgrant Aug 28, 2014
b5fb986
Add some docstrings and comments.
bgrant Aug 28, 2014
6f98892
Remove extraneous statements.
bgrant Aug 28, 2014
9816462
Allow DistArray creation inside lazy eval.
bgrant Aug 29, 2014
c2f35ad
Whitespace.
bgrant Aug 29, 2014
f0f45e5
Add a more complex test.
bgrant Aug 29, 2014
d39b916
Merge branch 'master' into feature/lazy-evaluation
bgrant Aug 29, 2014
8311c57
Add a test for a user-defined function.
bgrant Aug 29, 2014
341a2f9
Add a test for multiple return values.
bgrant Sep 3, 2014
82a31f8
Fix multiple return values.
bgrant Sep 3, 2014
d9a708e
Improve an error message.
bgrant Sep 3, 2014
1f6e99d
Add a test without the context manager.
bgrant Sep 3, 2014
1298a37
Improve a test.
bgrant Sep 3, 2014
d8b75ce
Test a lazy loop.
bgrant Sep 3, 2014
30ea479
Add a simple benchmark.
bgrant Sep 3, 2014
6e43255
Improve benchmark.
bgrant Sep 3, 2014
be72582
Merge branch 'master' into feature/lazy-evaluation
bgrant Sep 3, 2014
483eab6
Fix a spelling error.
bgrant Sep 4, 2014
b5d0f24
Improve variable names for readability.
bgrant Sep 4, 2014
fe3cd02
Print better status, dump to json.
bgrant Sep 4, 2014
81bedc6
Better filesnames.
bgrant Sep 4, 2014
a34b002
Make into functions.
bgrant Sep 4, 2014
4ca2c90
Add another test.
bgrant Sep 4, 2014
13401ce
Remove pprints in newest test.
bgrant Sep 4, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 96 additions & 13 deletions distarray/globalapi/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from abc import ABCMeta, abstractmethod

from functools import wraps
from contextlib import contextmanager

import numpy

Expand All @@ -28,7 +29,7 @@

from distarray.globalapi.ipython_utils import IPythonClient
from distarray.utils import uid, nonce, has_exactly_one
from distarray.localapi.proxyize import Proxy
from distarray.localapi.proxyize import Proxy, lazy_proxyize, lazy_name

# mpi context
from distarray.mpionly_utils import (make_targets_comm,
Expand Down Expand Up @@ -71,7 +72,8 @@ def make_subcomm(self, new_targets):
pass

@abstractmethod
def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False):
def apply(self, func, args=None, kwargs=None, targets=None, nresults=None,
autoproxyize=False):
pass

@abstractmethod
Expand Down Expand Up @@ -206,7 +208,7 @@ def local_allclose(la, lb, rtol, atol):
from numpy import allclose
return allclose(la.ndarray, lb.ndarray, rtol, atol)

local_results = self.apply(local_allclose,
local_results = self.apply(local_allclose,
(a.key, b.key, rtol, atol),
targets=a.targets)
return all(local_results)
Expand Down Expand Up @@ -580,7 +582,7 @@ def is_NoneType(pxy):
return pxy.type_str == str(type(None))

def is_LocalArray(pxy):
return (isinstance(pxy, Proxy) and
return (isinstance(pxy, Proxy) and
pxy.type_str == "<class 'distarray.localapi.localarray.LocalArray'>")

if all(is_LocalArray(r) for r in results):
Expand Down Expand Up @@ -743,7 +745,8 @@ def _execute(self, lines, targets):
def _push(self, d, targets):
return self.view.push(d, targets=targets, block=True)

def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False):
def apply(self, func, args=None, kwargs=None, targets=None,
autoproxyize=False, nresults=1):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't time to implement it, but it feels like nresults should be an attribute on the local function somehow, either with a decorator or a function annotation for the return value or something. Perhaps we can write it up in an issue...

"""
Analogous to IPython.parallel.view.apply_sync

Expand All @@ -758,6 +761,8 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False):
engines func is to be run on.
autoproxyize: bool, default False
If True, implicitly return a Proxy object from the function.
nresults: int, default 1
Number of return values. Only implemented for MPIContext.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything preventing us from allowing nresults on IPythonContext as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well nresults is only for lazy evaluation, and lazy evaluation is only implemented for MPIContext.


Returns
-------
Expand Down Expand Up @@ -860,7 +865,7 @@ def delete_key(self, key, targets=None):
if MPIContext.INTERCOMM:
self._send_msg(msg, targets=targets)

def __init__(self, targets=None):
def __init__(self, targets=None, lazy=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So does this allow you to create a context that is always lazy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, or at least lazy from the get-go. It really just sets <context>.lazy = True right off the bat. You can still set it to False later if you want.


if MPIContext.INTERCOMM is None:
MPIContext.INTERCOMM = initial_comm_setup()
Expand All @@ -870,6 +875,16 @@ def __init__(self, targets=None):
self.all_targets = list(range(self.nengines))
self.targets = self.all_targets if targets is None else sorted(targets)

self.lazy = lazy # is the context in lazy-communication mode?

# message queues used for lazy mode
# mapping: target -> queue of messages for that target

# _sendq: batches up messages to send upon sync()
self._sendq = dict([(t, []) for t in self.targets])
# _recvq: stores proxy objects for expected return values
self._recvq = dict([(t, []) for t in self.targets])

# make/get comms
# this is the object we want to use with push, pull, etc.
self._comm_from_targets = {}
Expand Down Expand Up @@ -912,16 +927,45 @@ def close(self):

# End of key management routines.

@contextmanager
def lazy_eval(self):
    """Context manager that enables lazy evaluation.

    Sets `self.lazy` to True for the duration of the ``with`` block and
    yields this context.  When the block finishes normally, `sync()` is
    called to send the queued messages and fill in the lazy results.

    `self.lazy` is set back to False on exit in every case, including
    when the ``with`` body or `sync()` raises, so that a failure cannot
    leave the context stuck in lazy mode.  If the body raises, `sync()`
    is not called and the exception propagates unchanged.
    """
    self.lazy = True
    try:
        yield self
        # Only reached when the `with` body completed without raising.
        self.sync()
    finally:
        self.lazy = False

def _send_msg(self, msg, targets=None):
targets = self.targets if targets is None else targets
for t in targets:
MPIContext.INTERCOMM.send(msg, dest=t)
if self.lazy and msg[0] != 'process_message_queue':
for t in targets:
self._sendq[t].append(msg)
else:
for t in targets:
MPIContext.INTERCOMM.send(msg, dest=t)

def _recv_msg(self, targets=None):
def _recv_msg(self, targets=None, nresults=1, sync=False):
res = []
targets = self.targets if targets is None else targets
for t in targets:
res.append(MPIContext.INTERCOMM.recv(source=t))
if self.lazy and not sync:
result_names = [lazy_name() for n in range(nresults)]
for t in targets:
if nresults == 0:
res.append(None)
elif nresults == 1:
res.append(lazy_proxyize(name=result_names[0]))
else:
target_results = []
for name in result_names:
target_results.append(lazy_proxyize(name))
res.append(target_results)
self._recvq[t].append(res[-1])
else:
for t in targets:
res.append(MPIContext.INTERCOMM.recv(source=t))
return res

def make_subcomm(self, targets):
Expand Down Expand Up @@ -952,7 +996,8 @@ def _push(self, d, targets=None):
msg = ('push', d)
return self._send_msg(msg, targets=targets)

def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False):
def apply(self, func, args=None, kwargs=None, targets=None,
autoproxyize=False, nresults=1):
"""
Analogous to IPython.parallel.view.apply_sync

Expand All @@ -967,6 +1012,8 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False):
engines func is to be run on.
autoproxyize: bool, default False
If True, implicitly return a Proxy object from the function.
nresults: int, default 1
Number of return values. Only needed for lazy evaluation.

Returns
-------
Expand Down Expand Up @@ -1000,11 +1047,47 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False):
msg = ('builtin_call', func, args, kwargs, autoproxyize)

self._send_msg(msg, targets=targets)
return self._recv_msg(targets=targets)
return self._recv_msg(targets=targets, nresults=nresults)

def push_function(self, key, func, targets=None):
push_function(self, key, func, targets=targets)

def sync(self, targets=None):
    """Send queued messages, fill in expected result values.

    For each target, the queued lazy result placeholders (`_recvq`) and
    the queued messages (`_sendq`) are sent in one
    'process_message_queue' message.  The results received back are then
    paired with the placeholders in order, and each placeholder takes
    over the `__dict__` of the corresponding received `Proxy`.

    Parameters
    ----------
    targets : sequence of int, optional
        Targets to synchronize.  Defaults to `self.targets`.

    Raises
    ------
    TypeError
        If a received result is not a `Proxy` (or a sequence of `Proxy`
        objects), or if a multi-valued result doesn't have the same
        length as its reserved placeholder.
    """
    # `collections.Sequence` was removed in Python 3.10; the ABCs live in
    # `collections.abc` on Python 3 and in `collections` on Python 2.
    try:
        from collections.abc import Sequence
    except ImportError:
        from collections import Sequence

    targets = self.targets if targets is None else targets
    for t in targets:
        msg = ('process_message_queue', self._recvq[t], self._sendq[t])
        self._send_msg(msg, targets=[t])
        self._sendq[t] = []  # empty the send queue

    for t in targets:
        results = self._recv_msg(targets=[t], sync=True)[0]
        lresults = self._recvq[t]
        for lres, res in zip(lresults, results):
            if lres is None:
                # A call queued with nresults=0 reserves `None` rather
                # than a lazy proxy, so there is nothing to fill in.
                continue
            # multiple return values
            if isinstance(res, Sequence):
                if len(lres) != len(res):
                    msg = ("Reserved lazy result object isn't the same"
                           " size as the actual result object: {} != {}")
                    raise TypeError(msg.format(len(lres), len(res)))
                for sublres, subres in zip(lres, res):
                    if isinstance(subres, Proxy):
                        sublres.__dict__ = subres.__dict__
                    else:
                        msg = ("Only DistArray return values are "
                               "supported in lazy mode. Type is: {}"
                               "".format(type(subres)))
                        raise TypeError(msg)
            # single return value
            else:
                if isinstance(res, Proxy):
                    lres.__dict__ = res.__dict__
                else:
                    msg = ("Only DistArray return values are "
                           "currently supported in lazy mode.")
                    raise TypeError(msg)
        self._recvq[t] = []  # empty the recv queue


class ContextCreationError(RuntimeError):
pass
Expand Down
1 change: 0 additions & 1 deletion distarray/globalapi/distarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ def from_localarrays(cls, key, context=None, targets=None, distribution=None,

If `dtype` is not provided, it will be fetched from the engines.
"""

def get_dim_datas_and_dtype(arr):
return (arr.dim_data, arr.dtype)

Expand Down
56 changes: 49 additions & 7 deletions distarray/globalapi/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@
__all__.append(func_name)


def unary_output_dtype(ufunc, val):
    """Determine the output dtype of a unary ufunc with input `val`.

    Use the ufunc.types attribute and the input dtype.  An exact match on
    the input type character is preferred; otherwise the first signature
    the input dtype can be cast to is used.

    Parameters
    ----------
    ufunc : numpy.ufunc
        Ufunc whose `types` attribute lists signatures like ``'f->f'``.
    val : scalar or array-like
        Input value; its dtype is found with `numpy.result_type`.

    Returns
    -------
    numpy.dtype
        The dtype of the ufunc's output for this input.

    Raises
    ------
    TypeError
        If no signature with exactly one input and one output accepts
        the input dtype (this includes multi-output ufuncs).
    """
    input_dtype = numpy.result_type(val)  # find out dtype of scalars
    # Split each signature on '->' rather than unpacking it character by
    # character, so signatures of a different length (e.g. 'f->ff') are
    # skipped instead of raising an unrelated ValueError.
    signatures = []
    for signature in ufunc.types:
        input_types, _, output_types = signature.partition('->')
        if len(input_types) == 1 and len(output_types) == 1:
            signatures.append((input_types, output_types))
    # Look at the built-in implementations to find an output type.
    for input_type, output_type in signatures:
        if input_dtype.char == input_type:
            return numpy.dtype(output_type)
    # Nothing found.  Try coercion.
    for input_type, output_type in signatures:
        if numpy.can_cast(input_dtype, input_type):
            return numpy.dtype(output_type)
    # Can't even coerce to a known input type.  Give up.
    msg = "Unary ufunc doesn't have a mapping for this type: {}."
    raise TypeError(msg.format(input_dtype))


def unary_proxy(name):
def proxy_func(a, *args, **kwargs):
context = determine_context(a)
Expand All @@ -44,18 +63,41 @@ def func_call(func_name, arr_name, args, kwargs):
dotted_name = 'distarray.localapi.%s' % (func_name,)
func = get_from_dotted_name(dotted_name)
res = func(arr_name, *args, **kwargs)
return proxyize(res), res.dtype # noqa
return proxyize(res)

res = context.apply(func_call, args=(name, a.key, args, kwargs),
targets=a.targets)
new_key = res[0][0]
dtype = res[0][1]
new_key = res[0]
dtype = unary_output_dtype(getattr(numpy, name), a)
return DistArray.from_localarrays(new_key,
distribution=a.distribution,
dtype=dtype)
return proxy_func


def binary_output_dtype(ufunc, val0, val1):
    """Determine the output dtype of a binary ufunc, given input values.

    Use the ufunc.types attribute and the input dtypes.  An exact match
    on both input type characters is preferred; otherwise the first
    signature both input dtypes can be cast to is used.

    Parameters
    ----------
    ufunc : numpy.ufunc
        Ufunc whose `types` attribute lists signatures like ``'ff->f'``.
    val0, val1 : scalar or array-like
        Input values; their dtypes are found with `numpy.result_type`.

    Returns
    -------
    numpy.dtype
        The dtype of the ufunc's output for these inputs.

    Raises
    ------
    TypeError
        If no signature with exactly two inputs and one output accepts
        the input dtypes (this includes multi-output ufuncs).
    """
    # find out dtype of scalars
    input_dtype_0, input_dtype_1 = map(numpy.result_type, (val0, val1))
    # Split each signature on '->' rather than unpacking it character by
    # character, so signatures of a different length (e.g. 'ff->ff') are
    # skipped instead of raising an unrelated ValueError.
    signatures = []
    for signature in ufunc.types:
        input_types, _, output_types = signature.partition('->')
        if len(input_types) == 2 and len(output_types) == 1:
            signatures.append((input_types[0], input_types[1], output_types))
    # Look at the built-in implementations to find an output type.
    for input_type_0, input_type_1, output_type in signatures:
        if ((input_dtype_0.char == input_type_0) and
                (input_dtype_1.char == input_type_1)):
            return numpy.dtype(output_type)
    # Nothing found.  Try coercion.
    for input_type_0, input_type_1, output_type in signatures:
        if (numpy.can_cast(input_dtype_0, input_type_0) and
                numpy.can_cast(input_dtype_1, input_type_1)):
            return numpy.dtype(output_type)
    # Can't even coerce to a known input type.  Give up.
    msg = ("Binary ufunc doesn't have a mapping for these input types: "
           "{}, {}")
    raise TypeError(msg.format(input_dtype_0, input_dtype_1))


def binary_proxy(name):
def proxy_func(a, b, *args, **kwargs):
context = determine_context(a, b)
Expand Down Expand Up @@ -83,12 +125,12 @@ def func_call(func_name, a, b, args, kwargs):
dotted_name = 'distarray.localapi.%s' % (func_name,)
func = get_from_dotted_name(dotted_name)
res = func(a, b, *args, **kwargs)
return proxyize(res), res.dtype # noqa
return proxyize(res)

res = context.apply(func_call, args=(name, a_key, b_key, args, kwargs),
targets=distribution.targets)
new_key = res[0][0]
dtype = res[0][1]
targets=distribution.targets, nresults=1)
new_key = res[0]
dtype = binary_output_dtype(getattr(numpy, name), a, b)
return DistArray.from_localarrays(new_key,
distribution=distribution,
dtype=dtype)
Expand Down
2 changes: 1 addition & 1 deletion distarray/globalapi/maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def view(self, new_dimsize):

def is_compatible(self, other):
return (isinstance(other, (NoDistMap, BlockMap, BlockCyclicMap)) and
other.grid_size == self.grid_size and
other.grid_size == self.grid_size and
other.size == self.size)


Expand Down
Loading