diff --git a/distarray/globalapi/context.py b/distarray/globalapi/context.py index c73aad7c..17d63f1a 100644 --- a/distarray/globalapi/context.py +++ b/distarray/globalapi/context.py @@ -17,6 +17,7 @@ from abc import ABCMeta, abstractmethod from functools import wraps +from contextlib import contextmanager import numpy @@ -28,7 +29,7 @@ from distarray.globalapi.ipython_utils import IPythonClient from distarray.utils import uid, nonce, has_exactly_one -from distarray.localapi.proxyize import Proxy +from distarray.localapi.proxyize import Proxy, lazy_proxyize, lazy_name # mpi context from distarray.mpionly_utils import (make_targets_comm, @@ -71,7 +72,8 @@ def make_subcomm(self, new_targets): pass @abstractmethod - def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): + def apply(self, func, args=None, kwargs=None, targets=None, nresults=None, + autoproxyize=False): pass @abstractmethod @@ -206,7 +208,7 @@ def local_allclose(la, lb, rtol, atol): from numpy import allclose return allclose(la.ndarray, lb.ndarray, rtol, atol) - local_results = self.apply(local_allclose, + local_results = self.apply(local_allclose, (a.key, b.key, rtol, atol), targets=a.targets) return all(local_results) @@ -580,7 +582,7 @@ def is_NoneType(pxy): return pxy.type_str == str(type(None)) def is_LocalArray(pxy): - return (isinstance(pxy, Proxy) and + return (isinstance(pxy, Proxy) and pxy.type_str == "") if all(is_LocalArray(r) for r in results): @@ -743,7 +745,8 @@ def _execute(self, lines, targets): def _push(self, d, targets): return self.view.push(d, targets=targets, block=True) - def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): + def apply(self, func, args=None, kwargs=None, targets=None, + autoproxyize=False, nresults=1): """ Analogous to IPython.parallel.view.apply_sync @@ -758,6 +761,8 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): engines func is to be run on. autoproxyize: bool, default False If True, implicitly return a Proxy object from the function. + nresults: int, default 1 + Number of return values. Only implemented for MPIContext. Returns ------- @@ -860,7 +865,7 @@ def delete_key(self, key, targets=None): if MPIContext.INTERCOMM: self._send_msg(msg, targets=targets) - def __init__(self, targets=None): + def __init__(self, targets=None, lazy=False): if MPIContext.INTERCOMM is None: MPIContext.INTERCOMM = initial_comm_setup() @@ -870,6 +875,16 @@ def __init__(self, targets=None): self.all_targets = list(range(self.nengines)) self.targets = self.all_targets if targets is None else sorted(targets) + self.lazy = lazy # is the context in lazy-communication mode? + + # message queues used for lazy mode + # mapping: target -> queue of messages for that target + + # _sendq: batches up messages to send upon sync() + self._sendq = dict([(t, []) for t in self.targets]) + # _recvq: stores proxy objects for expected return values + self._recvq = dict([(t, []) for t in self.targets]) + # make/get comms # this is the object we want to use with push, pull, etc' self._comm_from_targets = {} @@ -912,16 +927,45 @@ def close(self): # End of key management routines. + @contextmanager + def lazy_eval(self): + """Context manager that enables lazy evaluation. + + On exit, call `sync()` and set `self.lazy` to False + """ + self.lazy = True + yield self + self.sync() + self.lazy = False + def _send_msg(self, msg, targets=None): targets = self.targets if targets is None else targets - for t in targets: - MPIContext.INTERCOMM.send(msg, dest=t) + if self.lazy and msg[0] != 'process_message_queue': + for t in targets: + self._sendq[t].append(msg) + else: + for t in targets: + MPIContext.INTERCOMM.send(msg, dest=t) - def _recv_msg(self, targets=None): + def _recv_msg(self, targets=None, nresults=1, sync=False): res = [] targets = self.targets if targets is None else targets - for t in targets: - res.append(MPIContext.INTERCOMM.recv(source=t)) + if self.lazy and not sync: + result_names = [lazy_name() for n in range(nresults)] + for t in targets: + if nresults == 0: + res.append(None) + elif nresults == 1: + res.append(lazy_proxyize(name=result_names[0])) + else: + target_results = [] + for name in result_names: + target_results.append(lazy_proxyize(name)) + res.append(target_results) + self._recvq[t].append(res[-1]) + else: + for t in targets: + res.append(MPIContext.INTERCOMM.recv(source=t)) return res def make_subcomm(self, targets): @@ -952,7 +996,8 @@ def _push(self, d, targets=None): msg = ('push', d) return self._send_msg(msg, targets=targets) - def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): + def apply(self, func, args=None, kwargs=None, targets=None, + autoproxyize=False, nresults=1): """ Analogous to IPython.parallel.view.apply_sync @@ -967,6 +1012,8 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): engines func is to be run on. autoproxyize: bool, default False If True, implicitly return a Proxy object from the function. + nresults: int, default 1 + Number of return values. Only needed for lazy evaluation. Returns ------- @@ -1000,11 +1047,47 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): msg = ('builtin_call', func, args, kwargs, autoproxyize) self._send_msg(msg, targets=targets) - return self._recv_msg(targets=targets) + return self._recv_msg(targets=targets, nresults=nresults) def push_function(self, key, func, targets=None): push_function(self, key, func, targets=targets) + def sync(self, targets=None): + """Send queued messages, fill in expected result values.""" + targets = self.targets if targets is None else targets + for t in targets: + msg = ('process_message_queue', self._recvq[t], self._sendq[t]) + self._send_msg(msg, targets=[t]) + self._sendq[t] = [] # empty the send queue + + for t in targets: + results = self._recv_msg(targets=[t], sync=True)[0] + lresults = self._recvq[t] + for lres, res in zip(lresults, results): + # multiple return values + if isinstance(res, collections.Sequence): + if len(lres) != len(res): + msg = ("Reserved lazy result object isn't the same" + " size as the actual result object: {} != {}") + raise TypeError(msg.format(len(lres), len(res))) + for sublres, subres in zip(lres, res): + if isinstance(subres, Proxy): + sublres.__dict__ = subres.__dict__ + else: + msg = ("Only DistArray return values are " + "supported in lazy mode. Type is: {}" + "".format(type(subres))) + raise TypeError(msg) + # single return value + else: + if isinstance(res, Proxy): + lres.__dict__ = res.__dict__ + else: + msg = ("Only DistArray return values are " + "currently supported in lazy mode.") + raise TypeError(msg) + self._recvq[t] = [] # empty the recv queue + class ContextCreationError(RuntimeError): pass diff --git a/distarray/globalapi/distarray.py b/distarray/globalapi/distarray.py index 517ca133..69b97c47 100644 --- a/distarray/globalapi/distarray.py +++ b/distarray/globalapi/distarray.py @@ -77,7 +77,6 @@ def from_localarrays(cls, key, context=None, targets=None, distribution=None, If `dtype` is not provided, it will be fetched from the engines. """ - def get_dim_datas_and_dtype(arr): return (arr.dim_data, arr.dtype) diff --git a/distarray/globalapi/functions.py b/distarray/globalapi/functions.py index 24a058dc..567a24ed 100644 --- a/distarray/globalapi/functions.py +++ b/distarray/globalapi/functions.py @@ -35,6 +35,25 @@ __all__.append(func_name) +def unary_output_dtype(ufunc, val): + """Determine the output dtype of a unary ufunc with input `val`. + + Use the ufunc.types attribute and the input dtype. + """ + input_dtype = numpy.result_type(val) # find out dtype of scalars + # Look at the built-in implementations to find an output type. + for input_type, _, _, output_type in ufunc.types: + if input_dtype.char == input_type: + return numpy.dtype(output_type) + # Nothing found. Try coercion. + for input_type, _, _, output_type in ufunc.types: + if numpy.can_cast(input_dtype, input_type): + return numpy.dtype(output_type) + else: # Can't even coerce to a known input type. Give up. + msg = "Unary ufunc doesn't have a mapping for this type: {}." + raise TypeError(msg.format(input_dtype)) + + def unary_proxy(name): def proxy_func(a, *args, **kwargs): context = determine_context(a) @@ -44,18 +63,41 @@ def func_call(func_name, arr_name, args, kwargs): dotted_name = 'distarray.localapi.%s' % (func_name,) func = get_from_dotted_name(dotted_name) res = func(arr_name, *args, **kwargs) - return proxyize(res), res.dtype # noqa + return proxyize(res) res = context.apply(func_call, args=(name, a.key, args, kwargs), targets=a.targets) - new_key = res[0][0] - dtype = res[0][1] + new_key = res[0] + dtype = unary_output_dtype(getattr(numpy, name), a) return DistArray.from_localarrays(new_key, distribution=a.distribution, dtype=dtype) return proxy_func +def binary_output_dtype(ufunc, val0, val1): + """Determine the output dtype of a binary ufunc, given input values. + + Use the ufunc.types attribute and the input dtypes. + """ + # find out dtype of scalars + input_dtype_0, input_dtype_1 = map(numpy.result_type, (val0, val1)) + # Look at the built-in implementations to find an output type. + for input_type_0, input_type_1, _, _, output_type in ufunc.types: + if ((input_dtype_0.char == input_type_0) and + (input_dtype_1.char == input_type_1)): + return numpy.dtype(output_type) + # Nothing found. Try coercion. + for input_type_0, input_type_1, _, _, output_type in ufunc.types: + if (numpy.can_cast(input_dtype_0, input_type_0) and + numpy.can_cast(input_dtype_1, input_type_1)): + return numpy.dtype(output_type) + else: # Can't even coerce to a known input type. Give up. + msg = ("Binary ufunc doesn't have a mapping for these input types: " + "{}, {}") + raise TypeError(msg.format(input_dtype_0, input_dtype_1)) + + def binary_proxy(name): def proxy_func(a, b, *args, **kwargs): context = determine_context(a, b) @@ -83,12 +125,12 @@ def func_call(func_name, a, b, args, kwargs): dotted_name = 'distarray.localapi.%s' % (func_name,) func = get_from_dotted_name(dotted_name) res = func(a, b, *args, **kwargs) - return proxyize(res), res.dtype # noqa + return proxyize(res) res = context.apply(func_call, args=(name, a_key, b_key, args, kwargs), - targets=distribution.targets) - new_key = res[0][0] - dtype = res[0][1] + targets=distribution.targets, nresults=1) + new_key = res[0] + dtype = binary_output_dtype(getattr(numpy, name), a, b) return DistArray.from_localarrays(new_key, distribution=distribution, dtype=dtype) diff --git a/distarray/globalapi/maps.py b/distarray/globalapi/maps.py index 69f644b3..a0961ddc 100644 --- a/distarray/globalapi/maps.py +++ b/distarray/globalapi/maps.py @@ -271,7 +271,7 @@ def view(self, new_dimsize): def is_compatible(self, other): return (isinstance(other, (NoDistMap, BlockMap, BlockCyclicMap)) and - other.grid_size == self.grid_size and + other.grid_size == self.grid_size and other.size == self.size) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index ea1eca1b..5535ecde 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -19,11 +19,193 @@ from numpy.testing import assert_allclose, assert_array_equal -from distarray.testing import DefaultContextTestCase, IPythonContextTestCase, check_targets -from distarray.globalapi.context import Context +from distarray.testing import (DefaultContextTestCase, IPythonContextTestCase, + MPIContextTestCase, check_targets) +import distarray.globalapi as gapi +from distarray.globalapi.context import DistArray, Context from distarray.globalapi.maps import Distribution from distarray.mpionly_utils import is_solo_mpi_process from distarray.localapi import LocalArray +from distarray.localapi.proxyize import LazyPlaceholder + + +@unittest.skipIf(is_solo_mpi_process(), # not in MPI mode + "Cannot test MPIContext in IPython mode") +class TestLazyEval(MPIContextTestCase): + + ntargets = 'any' + + def test_single_add(self): + a = self.context.zeros((53, 63)) + b = self.context.ones((53, 63)) + self.context.lazy = True + c = a + b + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + self.context.sync() + self.context.lazy = False + assert_array_equal(c.toarray(), a.toarray() + b.toarray()) + + def test_single_add_with_context_manager(self): + a = self.context.zeros((53, 63)) + b = self.context.ones((53, 63)) + with self.context.lazy_eval(): + c = a + b + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), a.toarray() + b.toarray()) + + def test_single_mult(self): + a = self.context.zeros((54, 64)) + b = self.context.ones((54, 64)) + with self.context.lazy_eval(): + c = a * b + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), a.toarray() * b.toarray()) + + def test_constant_mult(self): + a = self.context.zeros((55, 65)) + with self.context.lazy_eval(): + c = a * 2 + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), a.toarray() * 2) + + def test_two_identical_add_expr(self): + a = self.context.zeros((56, 66)) + b = self.context.ones((56, 66)) + with self.context.lazy_eval(): + c = a + b + d = a + b + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), a.toarray() + b.toarray()) + assert_array_equal(d.toarray(), a.toarray() + b.toarray()) + + def test_different_adds(self): + a = self.context.zeros((58, 68)) + b = self.context.ones((58, 68)) + c = self.context.ones((58, 68)) + 1 + d = self.context.ones((58, 68)) + 2 + with self.context.lazy_eval(): + e = a + b + f = c + d + self.assertTrue(isinstance(e.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(f.key.dereference(), LazyPlaceholder)) + assert_array_equal(e.toarray(), a.toarray() + b.toarray()) + assert_array_equal(f.toarray(), c.toarray() + d.toarray()) + + def test_more_different_adds(self): + a = self.context.zeros((59, 69)) + b = self.context.ones((59, 69)) + c = self.context.ones((59, 69)) + 1 + with self.context.lazy_eval(): + e = a + b + f = b + c + self.assertTrue(isinstance(e.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(f.key.dereference(), LazyPlaceholder)) + assert_array_equal(e.toarray(), a.toarray() + b.toarray()) + assert_array_equal(f.toarray(), b.toarray() + c.toarray()) + + def test_unary_ufuncs(self): + a = self.context.ones((60, 70)) + b = -1 * self.context.ones((60, 70)) + with self.context.lazy_eval(): + c = -a + d = gapi.absolute(b) + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), -a.toarray()) + assert_array_equal(d.toarray(), numpy.absolute(b.toarray())) + + def test_dependent_add(self): + a = self.context.zeros((61, 71)) + b = self.context.ones((61, 71)) + c = self.context.ones((61, 71)) + 1 + with self.context.lazy_eval(): + t0 = a + b + d = t0 + c + self.assertTrue(isinstance(t0.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray()) + + def test_temporary_add(self): + a = self.context.zeros((61, 71)) + b = self.context.ones((61, 71)) + c = self.context.ones((61, 71)) + 1 + with self.context.lazy_eval(): + d = a + b + c + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray()) + + def test_complex_expressions(self): + a = self.context.zeros((52, 62)) + b = self.context.ones((52, 62)) + c = self.context.ones((52, 62)) + 1 + with self.context.lazy_eval(): + d = (2*a + (3*b + 4*c)) / 2 + e = gapi.negative(d * d) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(e.key.dereference(), LazyPlaceholder)) + d_expected = (2*a.toarray() + (3*b.toarray() + 4*c.toarray())) / 2 + e_expected = numpy.negative(d * d) + assert_array_equal(d.toarray(), d_expected) + assert_array_equal(e.toarray(), e_expected) + + def test_lazy_creation(self): + with self.context.lazy_eval(): + a = self.context.zeros((50, 60)) + b = self.context.zeros((50, 60)) + assert_array_equal(b.toarray(), numpy.zeros((50, 60))) + assert_array_equal(a.toarray(), numpy.zeros((50, 60))) + + def test_creation_and_expressions(self): + with self.context.lazy_eval(): + a = self.context.zeros((52, 62)) + b = self.context.ones((52, 62)) + c = self.context.ones((52, 62)) + 1 + d = (2*a + (3*b + 4*c)) / 2 + e = gapi.negative(d * d) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(e.key.dereference(), LazyPlaceholder)) + d_expected = (2*a.toarray() + (3*b.toarray() + 4*c.toarray())) / 2 + e_expected = numpy.negative(d * d) + assert_array_equal(d.toarray(), d_expected) + assert_array_equal(e.toarray(), e_expected) + + def test_user_function(self): + with self.context.lazy_eval(): + def local_square(la): + return la * la + da = self.context.ones((30, 40)) * 2 + new_key = self.context.apply(local_square, (da.key,), autoproxyize=True)[0] + new_da = DistArray.from_localarrays(key=new_key, + distribution=da.distribution, + dtype=int) + assert_array_equal(new_da.toarray(), (numpy.ones((30, 40)) * 2) ** 2) + + def test_multiple_return_values(self): + with self.context.lazy_eval(): + da = self.context.ones((30, 40)) * 2 + def local_powers(la): + return proxyize(la * la), proxyize(la * la * la) + key0, key1 = self.context.apply(local_powers, (da.key,), nresults=2)[0] + da0 = DistArray.from_localarrays(key=key0, + distribution=da.distribution, + dtype=int) + self.assertTrue(isinstance(da0.key.dereference(), LazyPlaceholder)) + da1 = DistArray.from_localarrays(key=key1, + distribution=da.distribution, + dtype=int) + self.assertTrue(isinstance(da1.key.dereference(), LazyPlaceholder)) + assert_array_equal(da0.toarray(), (numpy.ones((30, 40)) * 2) ** 2) + assert_array_equal(da1.toarray(), (numpy.ones((30, 40)) * 2) ** 3) + + def test_lazy_loop(self): + rvalues = [] + with self.context.lazy_eval(): + for i in range(10): + rvalues.append(i * self.context.ones((10, 3))) + + for i, value in enumerate(rvalues): + assert_array_equal(value.toarray(), i * numpy.ones((10, 3))) class TestRegister(DefaultContextTestCase): @@ -53,7 +235,7 @@ def test_local_sin(self): def local_sin(da): return numpy.sin(da) self.context.register(local_sin) - + db = self.context.local_sin(self.da) assert_allclose(0, db.tondarray(), atol=1e-14) @@ -146,7 +328,7 @@ def local_none(da): self.assertTrue(dp is None) def test_parameterless(self): - + def parameterless(): """This is a parameterless function.""" return None @@ -414,6 +596,7 @@ def local_label(la): self.context.apply(local_label, kwargs={'la': da}) assert_array_equal(da.tondarray(), range(len(self.context.targets))) + class TestGetBaseComm(DefaultContextTestCase): ntargets = 'any' diff --git a/distarray/localapi/proxyize.py b/distarray/localapi/proxyize.py index 97bc32b5..893a673c 100644 --- a/distarray/localapi/proxyize.py +++ b/distarray/localapi/proxyize.py @@ -6,28 +6,47 @@ from importlib import import_module -from distarray.utils import DISTARRAY_BASE_NAME +from distarray.utils import DISTARRAY_BASE_NAME, nonce + + +class LazyPlaceholder(object): + pass + class Proxy(object): - def __init__(self, name, obj, module_name): + def __init__(self, name, obj, module_name, lazy=False): self.name = name self.module_name = module_name + self.lazy = lazy self.type_str = str(type(obj)) namespace = import_module(self.module_name) setattr(namespace, self.name, obj) def dereference(self): - """ Callable only on the engines. """ + """Callable only on the engines.""" namespace = import_module(self.module_name) return getattr(namespace, self.name) def cleanup(self): namespace = import_module(self.module_name) - delattr(namespace, self.name) + if 'lazy' not in self.name: + delattr(namespace, self.name) self.name = self.module_name = self.type_str = None +def lazy_name(): + return DISTARRAY_BASE_NAME + "lazy_" + nonce() + + +def lazy_proxyize(name=None): + """Return a Proxy object for a delayed ("lazy") value.""" + if name is None: + name = lazy_name() + return Proxy(name=name, obj=LazyPlaceholder(), + module_name='__main__', lazy=True) + + class Proxyize(object): """Callable that, given an object, returns a Proxy object. diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index 1bba45c5..42da9cfc 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -4,13 +4,15 @@ # Distributed under the terms of the BSD License. See COPYING.rst. # --------------------------------------------------------------------------- """ -The engine_loop function and utilities necessary for it. +The MPI-based `Engine` class. """ +from collections import OrderedDict, Iterable from functools import reduce from importlib import import_module import types +from distarray.externals.six import next from distarray.localapi import LocalArray from distarray.localapi.proxyize import Proxy @@ -21,9 +23,12 @@ class Engine(object): + """MPI-based worker class.""" + INTERCOMM = None def __init__(self): + """Setup and execute the main ``recv`` loop.""" self.world = get_comm_world() self.world_ranks = list(range(self.world.size)) @@ -34,7 +39,10 @@ def __init__(self): # make engines intracomm (Context._base_comm): Engine.INTERCOMM = initial_comm_setup() - assert self.world.rank != self.client_rank + assert self.is_engine() + self.lazy = False + self._client_sendq = [] + self._value_from_name = OrderedDict() while True: msg = Engine.INTERCOMM.recv(source=self.client_rank) val = self.parse_msg(msg) @@ -43,12 +51,16 @@ def __init__(self): Engine.INTERCOMM.Free() def arg_kwarg_proxy_converter(self, args, kwargs): + """Dereference proxy arguments and update computed lazy proxies.""" module = import_module('__main__') # convert args args = list(args) for i, a in enumerate(args): if isinstance(a, module.Proxy): - args[i] = a.dereference() + if a.lazy: + args[i] = self._value_from_name[a.name].dereference() + else: + args[i] = a.dereference() args = tuple(args) # convert kwargs @@ -60,12 +72,14 @@ def arg_kwarg_proxy_converter(self, args, kwargs): return args, kwargs def is_engine(self): + """Is this an engine (as opposed to the client)?""" if self.world.rank != self.client_rank: return True else: return False def parse_msg(self, msg): + """Given a message, execute the proper function.""" to_do = msg[0] what = {'func_call': self.func_call, 'execute': self.execute, @@ -75,12 +89,17 @@ def parse_msg(self, msg): 'free_comm': self.free_comm, 'delete': self.delete, 'make_targets_comm': self.engine_make_targets_comm, - 'builtin_call': self.builtin_call} + 'builtin_call': self.builtin_call, + 'process_message_queue': self.process_message_queue} func = what[to_do] ret = func(msg) return ret def delete(self, msg): + """Process the 'delete' message. + + Cleans up the namespace. + """ obj = msg[1] if isinstance(obj, Proxy): obj.cleanup() @@ -93,7 +112,7 @@ def delete(self, msg): pass def func_call(self, msg): - + """Process the 'func_call' message.""" func_data = msg[1] args = msg[2] kwargs = msg[3] @@ -115,14 +134,16 @@ def func_call(self, msg): res = new_func(*args, **kwargs) if autoproxyize and isinstance(res, LocalArray): res = module.proxyize(res) - Engine.INTERCOMM.send(res, dest=self.client_rank) + self._client_send(res) def execute(self, msg): + """Process the 'execute' message.""" main = import_module('__main__') code = msg[1] exec(code, main.__dict__) def push(self, msg): + """Process the 'push' message.""" d = msg[1] module = import_module('__main__') for k, v in d.items(): @@ -131,24 +152,31 @@ def push(self, msg): setattr(place, pieces[-1], v) def pull(self, msg): + """Process the 'pull' message.""" name = msg[1] module = import_module('__main__') res = reduce(getattr, [module] + name.split('.')) - Engine.INTERCOMM.send(res, dest=self.client_rank) + self._client_send(res) def free_comm(self, msg): + """Call `Free` on a communicator.""" comm = msg[1].dereference() comm.Free() def kill(self, msg): - """Break out of the engine loop.""" + """Process the 'kill' message. + + Breaks out of the engine loop. + """ return 'kill' def engine_make_targets_comm(self, msg): + """Process the 'make_targets_comm' message.""" targets = msg[1] make_targets_comm(targets) def builtin_call(self, msg): + """Process the 'builtin_call' message.""" func = msg[1] args = msg[2] kwargs = msg[3] @@ -156,4 +184,49 @@ def builtin_call(self, msg): args, kwargs = self.arg_kwarg_proxy_converter(args, kwargs) res = func(*args, **kwargs) - Engine.INTERCOMM.send(res, dest=self.client_rank) + self._client_send(res) + + def process_message_queue(self, msg): + """Process the 'process_message_queue' message. + + This temporarily puts the engine in ``lazy`` mode, queueing up all the + replies (`_client_send`) to be sent at once. + """ + # we need the recvq (msg[1]) to see which values are returned from + # which expression + lazy_proxies = msg[1] + # set up mapping from lazy_proxy names to their computed values + # values to be filled in as the queue is processed + self._value_from_name = [] + for val in lazy_proxies: + if isinstance(val, Proxy): + self._value_from_name.append((val.name, None)) + elif isinstance(val, Iterable): + self._value_from_name.extend([(lp.name, None) for lp in val]) + else: + msg = "recvq contains an unrecognized type." + raise TypeError(msg) + self._value_from_name = OrderedDict(self._value_from_name) + + self._current_rval = iter(self._value_from_name) + msgq = msg[2] + self.lazy = True # 'process_message_queue' only received in lazy mode + for submsg in msgq: + val = self.parse_msg(submsg) + if val == 'kill': + return val + self.lazy = False + self._client_send(self._client_sendq) + self._client_sendq = [] + self._value_from_name = OrderedDict() + + def _client_send(self, msg): + """Send a message to the client. + + If in lazy mode, just queue up the message. + """ + if self.lazy: + self._value_from_name[next(self._current_rval)] = msg + self._client_sendq.append(msg) + else: + Engine.INTERCOMM.send(msg, dest=self.client_rank) diff --git a/examples/lazy_eval/__init__.py b/examples/lazy_eval/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/lazy_eval/benchmark_lazy_eval.py b/examples/lazy_eval/benchmark_lazy_eval.py new file mode 100644 index 00000000..fede8352 --- /dev/null +++ b/examples/lazy_eval/benchmark_lazy_eval.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# encoding: utf-8 +# --------------------------------------------------------------------------- +# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc. +# Distributed under the terms of the BSD License. See COPYING.rst. +# --------------------------------------------------------------------------- + +""" +See if lazy evaluation (promise pipelining) can be more efficient than eager +evaluation. +""" + +from __future__ import print_function, division + +import json +import datetime +from sys import stderr +from timeit import default_timer as time + +import numpy +from distarray.globalapi import Context, tanh + + +def benchmark(nops_list, arr_shape): + context = Context() + + data = {"nops": list(nops_list), + "Numpy": [], + "Eager": [], + "Lazy": [], + } + + total_tests = len(nops_list) * 3 + test_num = 1 + for nops in nops_list: + + # bench Numpy + arr = numpy.ones(arr_shape) + start = time() + for _ in range(nops): + arr = numpy.tanh(arr) + result = time() - start + data['Numpy'].append(result) + print('({}/{}), Numpy, {} ops, {:0.3f} s'.format(test_num, total_tests, nops, result), + file=stderr, flush=True) + test_num += 1 + + # bench DistArray eager eval + arr = context.ones(arr_shape) + start = time() + for _ in range(nops): + arr = tanh(arr) + result = time() - start + data['Eager'].append(result) + print('({}/{}), Eager, {} ops, {:0.3f} s'.format(test_num, total_tests, nops, result), + file=stderr, flush=True) + test_num += 1 + + # bench DistArray lazy eval + arr = context.ones(arr_shape) + start = time() + with context.lazy_eval(): + for i in range(nops): + arr = tanh(arr) + result = time() - start + data['Lazy'].append(result) + print('({}/{}), Lazy, {} ops, {:0.3f} s'.format(test_num, total_tests, nops, result), + file=stderr, flush=True) + test_num += 1 + + return data + + +def save_data(data, note=''): + now = datetime.datetime.now() + filename = '_'.join((now.strftime("%Y-%m-%dT%H-%M-%S"), note)) + ".json" + with open(filename, 'w') as fp: + json.dump(data, fp, indent=4) + + +def main(nops_list=None, arr_shape=None, note=''): + nops_list = range(1, 20002, 1000) if nops_list is None else nops_list + arr_shape = (10, 10) if arr_shape is None else arr_shape + data = benchmark(nops_list, arr_shape) + save_data(data, note) + + return data + +if __name__ == '__main__': + main()