Skip to content

Commit

Permalink
Merge pull request #765 from pyiron/executor_submodule
Browse files Browse the repository at this point in the history
  • Loading branch information
liamhuber authored Jul 11, 2023
2 parents a90a1eb + b8aed7e commit 4a35eb2
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pyiron_contrib/executors/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Executors

This sub-module holds custom children of `concurrent.futures.Executor` for use in other parts of pyiron (e.g. `pyiron_contrib.workflow`).
Empty file.
29 changes: 29 additions & 0 deletions pyiron_contrib/executors/executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from concurrent.futures import ProcessPoolExecutor

import cloudpickle


def _apply_cloudpickle(fn, /, *args, **kwargs):
fn = cloudpickle.loads(fn)
return fn(*args, **kwargs)


class CloudpickleProcessPoolExecutor(ProcessPoolExecutor):
"""
This executor behaves like `concurrent.futures.ProcessPoolExecutor`, except that
non-pickleable callables may also be submit (e.g. dynamically defined functions).
This is accomplished by replacing the `pickle` backend of the
`concurrent.futures.ProcessPoolExecutor` with a backend from `cloudpickle` when
serializing the callable.
This solution comes from u/mostsquares @ stackoverflow:
https://stackoverflow.com/questions/62830970/submit-dynamically-loaded-functions-to-the-processpoolexecutor
Note: Arguments and return values must still be regularly pickleable.
"""

def submit(self, fn, /, *args, **kwargs):
return super().submit(
_apply_cloudpickle, cloudpickle.dumps(fn), *args, **kwargs
)
Empty file added tests/unit/executor/__init__.py
Empty file.
105 changes: 105 additions & 0 deletions tests/unit/executor/test_executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from functools import partialmethod
from pickle import PickleError
from time import sleep
import unittest

from pyiron_contrib.executors.executors import CloudpickleProcessPoolExecutor


class Foo:
"""
A base class to be dynamically modified for testing CloudpickleProcessPoolExecutor.
"""
def __init__(self, fnc: callable):
self.fnc = fnc
self.result = None

@property
def run(self):
return self.fnc

def process_result(self, future):
self.result = future.result()


def dynamic_foo():
"""
A decorator for dynamically modifying the Foo class to test
CloudpickleProcessPoolExecutor.
Overrides the `fnc` input of `Foo` with the decorated function.
"""
def as_dynamic_foo(fnc: callable):
return type(
"DynamicFoo",
(Foo,), # Define parentage
{
"__init__": partialmethod(
Foo.__init__,
fnc
)
},
)

return as_dynamic_foo


class TestCloudpickleProcessPoolExecutor(unittest.TestCase):
def test_unpickleable_callable(self):
"""
We should be able to use an unpickleable callable -- in this case, a method of
a dynamically defined class.
"""
fortytwo = 42 # No magic numbers; we use it in a couple places so give it a var

@dynamic_foo()
def slowly_returns_42():
sleep(0.1)
return fortytwo

dynamic_42 = slowly_returns_42() # Instantiate the dynamically defined class
self.assertIsInstance(
dynamic_42,
Foo,
msg="Just a sanity check that the test is set up right"
)
self.assertIsNone(
dynamic_42.result,
msg="Just a sanity check that the test is set up right"
)
executor = CloudpickleProcessPoolExecutor()
fs = executor.submit(dynamic_42.run)
fs.add_done_callback(dynamic_42.process_result)
self.assertFalse(fs.done(), msg="Should be running on the executor")
self.assertEqual(fortytwo, fs.result(), msg="Future must complete")
self.assertEqual(fortytwo, dynamic_42.result, msg="Callback must get called")

def test_unpickleable_return(self):
"""
We should _not_ be able to use an unpickleable return value -- in this case, a
method of a dynamically defined class.
"""

@dynamic_foo()
def does_nothing():
return

@dynamic_foo()
def slowly_returns_unpickleable():
"""
Returns a complex, dynamically defined variable
"""
sleep(0.1)
inside_variable = does_nothing()
inside_variable.result = "it was an inside job!"
return inside_variable

dynamic_dynamic = slowly_returns_unpickleable()
executor = CloudpickleProcessPoolExecutor()
fs = executor.submit(dynamic_dynamic.run)
with self.assertRaises(PickleError):
print(fs.result()) # Can't (un)pickle the result


if __name__ == '__main__':
unittest.main()

0 comments on commit 4a35eb2

Please sign in to comment.