WIP: Add decorators to modify map function behaviour #8

Open · wants to merge 4 commits into master
23 changes: 10 additions & 13 deletions pasha/__init__.py
@@ -10,10 +10,12 @@

from functools import wraps

from .context import (
MapContext, SerialContext, ThreadContext, ProcessContext) # noqa
from .context import ( # noqa
MapContext, SerialContext, ThreadContext, ProcessContext)
from .functor import ( # noqa
SequenceFunctor, NdarrayFunctor, DataArrayFunctor, ExtraDataFunctor)
from .util import ( # noqa
with_init, with_finalize, with_local_copies, with_reduction)


_default_context = None
@@ -72,19 +74,14 @@ def set_default_context(ctx_or_method, *args, **kwargs):
_default_context = ctx


@wraps(MapContext.array)
def array(*args, **kwargs):
return get_default_context().array(*args, **kwargs)
@wraps(MapContext.alloc)
def alloc(*args, **kwargs):
return get_default_context().alloc(*args, **kwargs)


@wraps(MapContext.array_like)
def array_like(*args, **kwargs):
return get_default_context().array_like(*args, **kwargs)


@wraps(MapContext.array_per_worker)
def array_per_worker(*args, **kwargs):
return get_default_context().array_per_worker(*args, **kwargs)
@wraps(MapContext.alloc_per_worker)
def alloc_per_worker(*args, **kwargs):
return get_default_context().alloc_per_worker(*args, **kwargs)


@wraps(MapContext.map)
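As a quick usage sketch of the renamed module-level helpers on this branch (alloc and alloc_per_worker replacing array and array_per_worker), something like the following should work; the (worker_id, index, value) kernel signature follows pasha's existing convention, but the snippet is illustrative and untested:

import numpy as np
import pasha as psh

# Illustrative only: assumes this branch's renamed alloc() API.
psh.set_default_context('threads', num_workers=4)

data = np.random.rand(100, 512)
result = psh.alloc(shape=(100,))  # shared with all workers

def kernel(worker_id, index, row):
    result[index] = row.sum()

psh.map(kernel, data)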
187 changes: 152 additions & 35 deletions pasha/context.py
@@ -39,56 +39,166 @@ def __init__(self, num_workers):

self.num_workers = num_workers

def array(self, shape, dtype=np.float64):
"""Allocate an array shared with all workers.
def _run_hook(self, hook_type, function, *args):
try:
hooks = function._pasha_hooks_
except AttributeError:
# No _pasha_hooks_ attribute.
pass
else:
for hook in hooks:
function = getattr(hook, hook_type)(self, function, *args)

The implementation may decide how to back this memory, but it
is required that all workers of this context may read AND write
to this memory. The base implementation allocates directly
on the heap.
return function
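For illustration, a hypothetical hook object (not part of this PR) satisfying the protocol _run_hook expects: each entry in a function's _pasha_hooks_ list exposes one method per hook type, receives the context, the current function and any extra arguments, and returns the (possibly wrapped) function.

import time

class TimingHook:
    """Hypothetical hook timing an entire map call."""

    def pre_map(self, context, function):
        self._start = time.perf_counter()
        return function  # may also return a wrapped function

    def post_map(self, context, function):
        print(f'map took {time.perf_counter() - self._start:.3f}s')
        return function

    def pre_worker(self, context, function, worker_id):
        return function  # no per-worker wrapping here

    def post_worker(self, context, function, worker_id):
        return function

def kernel(worker_id, index, value):
    pass

# Normally attached by a decorator such as those added in pasha.util.
kernel._pasha_hooks_ = [TimingHook()]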

@staticmethod
def _resolve_alloc(shape, dtype, order, like):
"""Resolve allocation arguments.

This method resolves the arguments passed to
:meth:`MapContext.alloc` and fills any omitted values.

Returns:
(tuple, DTypeLike, str): Resolved shape, data-type and
memory order.
"""
if like is not None:
shape = shape if shape is not None else like.shape
dtype = dtype if dtype is not None else like.dtype

try:
if order is None:
# Check for C contiguity first, as one-dimensional
# arrays may be both C and F-style at the same time.
# If neither is the case, make the result C-style
# anyway.
if like.flags.c_contiguous:
order = 'C'
elif like.flags.f_contiguous:
order = 'F'
else:
order = 'C'
except AttributeError:
# Existence of the flags property may not be considered
# required for an ArrayLike, so fall back to C-style if
# it's missing.
order = 'C'

elif shape is None:
raise ValueError('array shape must be specified')

else:
dtype = dtype if dtype is not None else np.float64
order = order if order is not None else 'C'

if isinstance(shape, int):
shape = (shape,)
elif not isinstance(shape, tuple):
shape = tuple(shape)

return shape, dtype, order
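For example, assuming the resolution rules above, the following results would be expected:

import numpy as np

MapContext._resolve_alloc((4, 2), None, None, None)
# -> ((4, 2), numpy.float64, 'C')

ref = np.zeros((3, 5), dtype=np.int16, order='F')
MapContext._resolve_alloc(None, None, None, ref)
# -> ((3, 5), dtype('int16'), 'F'), inferred from ref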

@staticmethod
def _alloc(shape, dtype, order):
"""Low-level allocation.

In most instances, it is recommended to use the high-level
method :meth:`MapContext.alloc` instead. Context
implementations can override this method to provide
alternative backing for the memory shared with all workers,
e.g. via shared memory segments across processes.

Note that none of the arguments of this method are optional.

The default implementation allocates directly on the heap.

Args:
shape (int or tuple of ints): Shape of the array.
dtype (data-type): Desired data-type for the array.
shape (tuple): Shape of the array.
dtype (DTypeLike): Desired data-type for the array.
order ('C' or 'F'): C-style or Fortran-style memory order.

Returns:
(numpy.ndarray) Created array object.
"""

return np.zeros(shape, dtype=dtype)
return np.empty(shape, dtype, order)

def array_like(self, other):
"""Allocate an array with the same shape and dtype as another.
def alloc(self, shape=None, dtype=None, order=None, fill=None, like=None):
"""Allocate an array guaranteed to be shared with all workers.

The implementation may decide how to back this memory, but it
is required that all workers of this context may read AND write
to this memory. By default, it allocates directly on the heap.

Only the shape argument is required for allocation to succeed;
it may be omitted, however, if an existing ArrayLike object is
passed via the like argument, whose shape is then used.

If omitted, the dtype and order arguments default to np.float64
and 'C', respectively, or are taken from the ArrayLike object
passed to like. Only C-style and Fortran-style order can be
assumed to be supported by all context implementations.

If fill is omitted, the resulting array is uninitialized.

Any specified argument always takes precedence over values
inferred from an ArrayLike object passed to like.

Args:
other (ArrayLike): Other array.
shape (int or tuple of ints, optional): Shape of the array.
dtype (DTypeLike, optional): Desired data-type for the array.
order ('C' or 'F', optional): Whether to store multiple
dimensions in row-major (C-style, default) or
column-major (Fortran-style).
fill (Scalar or ArrayLike, optional): Value to initialize
the array to; it is left uninitialized if omitted.
like (ArrayLike, optional): Array to copy shape, dtype and
order from.

Returns:
(numpy.ndarray) Created array object.
"""

return self.array(other.shape, other.dtype)
array = self._alloc(*self._resolve_alloc(shape, dtype, order, like))

if fill is not None:
array[:] = fill

def array_per_worker(self, shape, dtype=np.float64):
return array
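To illustrate the semantics described above (a sketch, assuming a SerialContext):

import numpy as np

ctx = SerialContext()

a = ctx.alloc((2, 3))                          # float64, C-order, uninitialized
b = ctx.alloc((2, 3), dtype=np.int32, fill=0)  # zero-initialized
c = ctx.alloc(like=b, fill=1)                  # shape/dtype/order copied from b
d = ctx.alloc(dtype=np.float32, like=b)        # explicit dtype takes precedence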

def alloc_per_worker(self, shape=None, dtype=None, order=None, fill=None,
like=None):
"""Allocate a shared array for each worker.

The returned array will contain an additional prepended axis
with its shape corresponding to the number of workers in this
context, i.e. with one dimension more than specified by the
shape parameter. These are useful for parallelized reduction
operations, where each worker may work with its own accumulator.
The returned array contains an additional axis whose length
corresponds to the number of workers in this context, i.e. it
has one dimension more than specified by the shape parameter.
In row-major (C-style) order this axis is prepended, in
column-major (Fortran-style) order it is appended.

Such arrays are useful for parallelized reduction operations,
where each worker works with its own accumulator; the
accumulators are all reduced after mapping (e.g. via sum).

Args:
Same as array()
Same as alloc()

Returns:
(numpy.ndarray) Created array object.
"""

if isinstance(shape, int):
return self.array((self.num_workers, shape), dtype)
else:
return self.array((self.num_workers,) + tuple(shape), dtype)
shape, dtype, order = self._resolve_alloc(shape, dtype, order, like)

if order == 'C':
shape = (self.num_workers,) + shape
elif order == 'F':
shape = shape + (self.num_workers,)

array = self._alloc(shape, dtype, order)

if fill is not None:
array[:] = fill

return array
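The per-worker accumulator pattern mentioned above might look like this; a hedged sketch, assuming ThreadContext accepts num_workers and using histogramming as an arbitrary example kernel:

import numpy as np

ctx = ThreadContext(num_workers=4)
data = np.random.rand(1000, 128)
hist = ctx.alloc_per_worker((100,), dtype=np.int64, fill=0)

def kernel(worker_id, index, row):
    # Each worker only touches its own row of the accumulator.
    hist[worker_id] += np.histogram(row, bins=100, range=(0, 1))[0]

ctx.map(kernel, data)
total = hist.sum(axis=0)  # reduce over the worker axis (first in C-order)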

def map(self, function, functor):
"""Apply a function to a functor.
@@ -109,8 +219,7 @@ def map(self, function, functor):

raise NotImplementedError('map')

@staticmethod
def run_worker(function, functor, share, worker_id):
def run_worker(self, function, functor, share, worker_id):
"""Main worker loop.

This method contains the actual inner loop for a worker,
@@ -130,9 +239,13 @@ def run_worker(function, functor, share, worker_id):
None
"""

function = self._run_hook('pre_worker', function, worker_id)

for entry in functor.iterate(share):
function(worker_id, *entry)

self._run_hook('post_worker', function, worker_id)


class SerialContext(MapContext):
"""Serial map context.
@@ -146,7 +259,10 @@ def __init__(self, *args, **kwargs):

def map(self, function, functor):
functor = Functor.try_wrap(functor)

function = self._run_hook('pre_map', function)
self.run_worker(function, functor, next(iter(functor.split(1))), 0)
self._run_hook('post_map', function)


class PoolContext(MapContext):
@@ -187,8 +303,10 @@ def map(self, function, functor, pool_cls):
for worker_id in range(self.num_workers):
self.id_queue.put(worker_id)

function = self._run_hook('pre_map', function)
with pool_cls(self.num_workers, self.init_worker, (functor,)) as p:
p.map(self.run_worker, functor.split(self.num_workers))
self._run_hook('post_map', function)


class ThreadContext(PoolContext):
@@ -240,13 +358,12 @@ def __init__(self, *args, **kwargs):

self.id_queue = self.mp_ctx.Queue()

def array(self, shape, dtype=np.float64):
if isinstance(shape, int):
n_elements = shape
else:
n_elements = 1
for _s in shape:
n_elements *= _s
def _alloc(self, shape, dtype, order):
# Allocate shared memory via mmap.

n_elements = 1
for _s in shape:
n_elements *= _s

import mmap
n_bytes = n_elements * np.dtype(dtype).itemsize
@@ -256,8 +373,8 @@ def array(self, shape, dtype=np.float64):
flags=mmap.MAP_SHARED | mmap.MAP_ANONYMOUS,
prot=mmap.PROT_READ | mmap.PROT_WRITE)

return np.frombuffer(memoryview(buf)[:n_bytes],
dtype=dtype).reshape(shape)
return np.frombuffer(memoryview(buf)[:n_bytes], dtype=dtype) \
.reshape(shape, order=order)

def map(self, function, functor):
super().map(function, functor, self.mp_ctx.Pool)
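Since the mapping is created with MAP_SHARED | MAP_ANONYMOUS, it is inherited by the forked worker processes, so writes from any worker are visible in the parent. A minimal sketch of that behaviour, assuming a POSIX fork start method and that ProcessContext forwards num_workers (illustrative only):

import numpy as np

ctx = ProcessContext(num_workers=2)
out = ctx.alloc((10,), fill=0)

def kernel(worker_id, index, value):
    out[index] = value * 2  # written in a worker process

ctx.map(kernel, np.arange(10))
assert (out == np.arange(10) * 2).all()  # visible in the parent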