Skip to content

Commit

Permalink
Unit test GlobusComputeExecutor
Browse files Browse the repository at this point in the history
Use `mock.Mock(spec=)` to spec out that we're at least calling the GC Executor
correctly.
  • Loading branch information
khk-globus committed Jan 14, 2025
1 parent 9b552fa commit 21fdcd1
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 12 deletions.
27 changes: 15 additions & 12 deletions parsl/executors/globus_compute.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from __future__ import annotations

import copy
import uuid
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional, Union
from typing import Any, Callable, Dict, Optional

import typeguard

Expand All @@ -17,8 +16,6 @@
except ImportError:
_globus_compute_enabled = False

UUID_LIKE_T = Union[uuid.UUID, str]


class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
""" GlobusComputeExecutor enables remote execution on Globus Compute endpoints
Expand All @@ -40,11 +37,11 @@ class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):

@typeguard.typechecked
def __init__(
self,
executor: Executor,
label: str = 'GlobusComputeExecutor',
resource_specification: Optional[dict] = None,
user_endpoint_config: Optional[dict] = None,
self,
executor: Executor,
label: str = 'GlobusComputeExecutor',
resource_specification: Optional[dict] = None,
user_endpoint_config: Optional[dict] = None,
):
"""
Parameters
Expand Down Expand Up @@ -119,9 +116,15 @@ def submit(self, func: Callable, resource_specification: Dict[str, Any], *args:
else:
user_endpoint_config = self.user_endpoint_config

self.executor.resource_specification = res_spec
self.executor.user_endpoint_config = user_endpoint_config
return self.executor.submit(func, *args, **kwargs)
orig_spec = self.executor.resource_specification
orig_uep_cof = self.executor.user_endpoint_config
try:
self.executor.resource_specification = res_spec
self.executor.user_endpoint_config = user_endpoint_config
return self.executor.submit(func, *args, **kwargs)
finally:
self.executor.resource_specification = orig_spec
self.executor.user_endpoint_config = orig_uep_cof

def shutdown(self):
"""Clean-up the resources associated with the Executor.
Expand Down
102 changes: 102 additions & 0 deletions parsl/tests/unit/test_globus_compute_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import random
from unittest import mock

import pytest

from parsl.executors import GlobusComputeExecutor
from globus_compute_sdk import Executor


@pytest.fixture
def mock_ex():
# Not Parsl's job to test GC's Executor
yield mock.Mock(spec=Executor)


@pytest.mark.local
def test_gc_executor_mock_spec(mock_ex):
# a test of tests -- make sure we're using spec= in the mock
with pytest.raises(AttributeError):
mock_ex.aasdf()


@pytest.mark.local
def test_gc_executor_label_default(mock_ex):
gce = GlobusComputeExecutor(mock_ex)
assert gce.label == type(gce).__name__, "Expect reasonable default label"


@pytest.mark.local
def test_gc_executor_label(mock_ex, randomstring):
exp_label = randomstring()
gce = GlobusComputeExecutor(mock_ex, label=exp_label)
assert gce.label == exp_label


@pytest.mark.local
def test_gc_executor_resets_spec_after_submit(mock_ex, randomstring):
submit_res = {randomstring(): "some submit res"}
res = {"some": randomstring(), "spec": randomstring()}
gce = GlobusComputeExecutor(mock_ex, resource_specification=res)

fn = mock.Mock()
orig_res = mock_ex.resource_specification
orig_uep = mock_ex.user_endpoint_config

def mock_submit(*a, **k):
assert mock_ex.resource_specification == submit_res, "Expect set for submission"
assert mock_ex.user_endpoint_config is None
mock_ex.submit.side_effect = mock_submit

gce.submit(fn, resource_specification=submit_res)

assert mock_ex.resource_specification == orig_res
assert mock_ex.user_endpoint_config is orig_uep


@pytest.mark.local
def test_gc_executor_resets_uep_after_submit(mock_ex, randomstring):
uep_conf = randomstring()
res = {"some": randomstring()}
gce = GlobusComputeExecutor(mock_ex)

fn = mock.Mock()
orig_res = mock_ex.resource_specification
orig_uep = mock_ex.user_endpoint_config

def mock_submit(*a, **k):

assert mock_ex.resource_specification == res, "Expect set for submission"
assert mock_ex.user_endpoint_config == uep_conf, "Expect set for submission"
mock_ex.submit.side_effect = mock_submit

gce.submit(fn, resource_specification={"user_endpoint_config": uep_conf, **res})

assert mock_ex.resource_specification == orig_res
assert mock_ex.user_endpoint_config is orig_uep


@pytest.mark.local
def test_gc_executor_happy_path(mock_ex, randomstring):
mock_fn = mock.Mock()
args = tuple(randomstring() for _ in range(random.randint(0, 3)))
kwargs = {randomstring(): randomstring() for _ in range(random.randint(0, 3))}

gce = GlobusComputeExecutor(mock_ex)
gce.submit(mock_fn, {}, *args, **kwargs)

assert mock_ex.submit.called, "Expect proxying of args to underlying executor"
found_a, found_k = mock_ex.submit.call_args
assert found_a[0] is mock_fn
assert found_a[1:] == args
assert found_k == kwargs


@pytest.mark.local
def test_gc_executor_shuts_down_asynchronously(mock_ex):
gce = GlobusComputeExecutor(mock_ex)
gce.shutdown()
assert mock_ex.shutdown.called
a, k = mock_ex.shutdown.call_args
assert k["wait"] is False
assert k["cancel_futures"] is True

0 comments on commit 21fdcd1

Please sign in to comment.