Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lift [] and . operators into AppFutures #2904

Merged
merged 27 commits into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
dfd4d85
rough implementation
benclifford Oct 9, 2023
2c9d8f9
add functionality
benclifford Oct 9, 2023
6efedb5
Fix whitespace
benclifford Oct 9, 2023
9036a7e
Execute on _parsl_internal, and add a TODO about choosing which DFK t…
benclifford Oct 9, 2023
d68664e
Add some discussion on DFK choice
benclifford Oct 9, 2023
575b0ba
Merge remote-tracking branch 'origin/master' into benc-lifted-dict
benclifford Oct 12, 2023
e6ff04b
Add some more tests for lifted getitem
benclifford Oct 12, 2023
8bbc054
Rename test case as it tests more than dicts now
benclifford Oct 12, 2023
b72216e
Store DFK into task record, for use by follow on tasks that should be…
benclifford Oct 12, 2023
2ec3116
Bind to same DFK as parent task
benclifford Oct 12, 2023
e52c3f3
Add brief documentation on lifted []
benclifford Oct 12, 2023
3af9963
Add more notes to doc
benclifford Oct 12, 2023
90c0c17
Add `__getattr__` support to PR 2904 (#2906)
Andrew-S-Rosen Oct 13, 2023
fdcf927
Merge remote-tracking branch 'refs/remotes/origin/benc-lifted-dict' i…
benclifford Oct 13, 2023
c04ff9e
Merge branch 'master' into benc-lifted-dict
benclifford Oct 13, 2023
fcde3ea
Skip test that conflicts with wq/taskvine return handling
benclifford Oct 14, 2023
fe4d351
More docs: comments from Andrew Rosen and document his . implementation
benclifford Oct 14, 2023
ab9fcfc
Bring my recent [] changes to Andrew's . implementation
benclifford Oct 14, 2023
de56ce9
Merge tests into one file
benclifford Oct 14, 2023
892759a
Add class instance .-operator test, which is less extreme than broken…
benclifford Oct 14, 2023
f57d197
Avoid lifting attribute references on _underscored attributes
benclifford Oct 14, 2023
47f0b3f
Fix up headings
benclifford Oct 14, 2023
b526da1
Fix quotation style around []
benclifford Oct 14, 2023
09865b4
Remove in-function import and use package level import like in __geti…
benclifford Oct 14, 2023
1f3e820
Fix typo
benclifford Oct 14, 2023
e2dc1d8
Merge remote-tracking branch 'origin/master' into benc-lifted-dict
benclifford Oct 21, 2023
6188e21
Use renamed task_def=>task_record in this PR
benclifford Oct 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/userguide/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ User guide
monitoring
workflow
modularizing
lifted_ops
joins
usage_tracking
plugins
Expand Down
34 changes: 34 additions & 0 deletions docs/userguide/lifted_ops.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
.. _label-liftedops:

Lifted [] operator
==================

When an app returns a complex structure such as a ``dict`` or a ``list``,
it is sometimes useful to pass an element of that structure to a subsequent
task, without waiting for that subsequent task to complete.

To help with this, Parsl allows the ``[]`` operator to be used on and
benclifford marked this conversation as resolved.
Show resolved Hide resolved
`AppFuture`. This operator will return return another `AppFuture` that will
benclifford marked this conversation as resolved.
Show resolved Hide resolved
complete after the initial future, with the result of `[]` on the value
of the initial future.

The end result is that this assertion will hold:

.. code-block:: python

fut = my_app()
assert fut['x'].result() == fut.result()[x]

but more concurrency will be available, as execution of the main workflow
code will not stop to wait for ``result()`` to complete on the initial
future.

`AppFuture` does not implement other methods commonly associated with
dicts and lists, such as ``len``, because those methods should return a
specific type of result immediately, and that is not possible when the
results are not available until the future.

If a key does not exist in the returned result, then the exception will
appear in the Future returned by ``[]``, rather than at the point that
the ``[]`` operator is applied. This is because the valid values that can
be used are not known until the underlying result is available.
1 change: 1 addition & 0 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,7 @@ def submit(self,

task_def: TaskRecord
task_def = {'depends': [],
'dfk': self,
'executor': executor,
'func_name': func.__name__,
'memoize': cache,
Expand Down
43 changes: 42 additions & 1 deletion parsl/dataflow/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
2. AppFutures which represent the futures on App/Leaf tasks.

"""
from __future__ import annotations

from concurrent.futures import Future
import logging
import threading
from typing import Optional, Sequence
from typing import Any, Optional, Sequence

import parsl.app.app as app

from parsl.app.futures import DataFuture
from parsl.dataflow.taskrecord import TaskRecord
Expand Down Expand Up @@ -118,3 +121,41 @@ def task_status(self) -> str:
@property
def outputs(self) -> Sequence[DataFuture]:
return self._outputs

def __getitem__(self, key: Any) -> AppFuture:
# This is decorated on each invocation because the getitem task
# should be bound to the same DFK as the task associated with this
# Future.
deferred_getitem_app = app.python_app(deferred_getitem, executors=['_parsl_internal'], data_flow_kernel=self.task_def['dfk'])

return deferred_getitem_app(self, key)

def __getattr__(self, name: str) -> AppFuture:
# hack around circular imports for python_app
from parsl.app.app import python_app

# TODO: it would be nice to avoid redecorating this each time,
# which was done to avoid import loops here -- but the DFK
# is not defined at import time, and so this decoration needs
# to happen at least once per DFK. So perhaps for implementation
# simplicity, this redecoration should always happen, as happens
# for example with the globus data provider.

# TODO: this should be run on the same DFK as is executing the
# task that is associated with this future. That value isn't
# easily available here (although probably the right thing to
# do is add it to self.task_def)
deferred_getattr_app = python_app(deferred_getattr, executors=['_parsl_internal'])

return deferred_getattr_app(self, name)


def deferred_getitem(o: Any, k: Any) -> Any:
return o[k]


# this needs python_app to be importable, but three's an import loop
# if so... so hack around it for prototyping.
# @python_app
def deferred_getattr(o: Any, name: str) -> Any:
return getattr(o, name)
7 changes: 7 additions & 0 deletions parsl/dataflow/taskrecord.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,25 @@
from typing_extensions import TypedDict
from concurrent.futures import Future


# only for type checking:
from typing import Any, Callable, Dict, Optional, List, Sequence, TYPE_CHECKING, Union

if TYPE_CHECKING:
from parsl.dataflow.futures import AppFuture

import parsl.dataflow.dflow as dflow

from parsl.dataflow.states import States


class TaskRecord(TypedDict, total=False):
"""This stores most information about a Parsl task"""

dfk: dflow.DataFlowKernel
"""The DataFlowKernel which is managing this task.
"""

func_name: str

status: States
Expand Down
75 changes: 75 additions & 0 deletions parsl/tests/test_python_apps/test_lifted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from parsl import python_app


@python_app
def returns_a_dict():
return {"a": "X", "b": "Y"}


@python_app
def returns_a_list():
return ["X", "Y"]


@python_app
def returns_a_tuple():
return ("X", "Y")


@python_app
def returns_a_class():
from dataclasses import dataclass

@dataclass
class MyClass:
a: str = "X"
b: str = "Y"

return MyClass


def test_returns_a_dict():

# precondition that returns_a_dict behaves
# correctly
assert returns_a_dict().result()["a"] == "X"

# check that the deferred __getitem__ functionality works,
# allowing [] to be used on an AppFuture
assert returns_a_dict()["a"].result() == "X"


def test_returns_a_list():

# precondition that returns_a_list behaves
# correctly
assert returns_a_list().result()[0] == "X"

# check that the deferred __getitem__ functionality works,
# allowing [] to be used on an AppFuture
assert returns_a_list()[0].result() == "X"


def test_returns_a_tuple():

# precondition that returns_a_tuple behaves
# correctly
assert returns_a_tuple().result()[0] == "X"

# check that the deferred __getitem__ functionality works,
# allowing [] to be used on an AppFuture
assert returns_a_tuple()[0].result() == "X"


def test_returns_a_class():

# precondition that returns_a_class behaves
# correctly
assert returns_a_class().result().a == "X"

# check that the deferred __getattr__ functionality works,
# allowing [] to be used on an AppFuture
assert returns_a_class().a.result() == "X"

# when the result is not indexable, a sensible error should
# appear in the appropriate future
55 changes: 55 additions & 0 deletions parsl/tests/test_python_apps/test_lifted_getitem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from parsl import python_app
from concurrent.futures import Future


@python_app
def returns_a_dict():
return {"a": "X", "b": "Y"}


@python_app
def returns_a_list():
return [5, 6, 7, 8, 9]


@python_app
def passthrough(v):
return v


def test_lifted_getitem_on_dict():
# check that the deferred __getitem__ functionality works,
# allowing [] to be used on an AppFuture
assert returns_a_dict()["a"].result() == "X"


def test_lifted_getitem_on_dict_bad_key():
assert isinstance(returns_a_dict()["invalid"].exception(), KeyError)


def test_lifted_getitem_on_list():
assert returns_a_list()[2].result() == 7


def test_lifted_getitem_ordering():
# this should test that lifting getitem has the correct execution
# order: that it does not defer the execution of following code

f_prereq = Future()

f_top = passthrough(f_prereq)

f_a = f_top['a']

# lifted ['a'] should not defer execution here (so it should not
# implicitly call result() on f_top). If it does, this test will
# hang at this point, waiting for f_top to get a value, which
# will not happen until f_prereq gets a value..
# which doesn't happen until:

f_prereq.set_result({"a": "X"})

# now at this point it should be safe to wait for f_a to get a result
# while passthrough and lifted getitem run...

assert f_a.result() == "X"