diff --git a/.github/workflows/gce_test.yaml b/.github/workflows/gce_test.yaml new file mode 100644 index 0000000000..fb9e1e4cbf --- /dev/null +++ b/.github/workflows/gce_test.yaml @@ -0,0 +1,108 @@ +name: GlobusComputeExecutor tests + +on: + pull_request: + types: + - opened + - synchronize + +env: + PYTHON_VERSION: 3.11 + +jobs: + main-test-suite: + runs-on: ubuntu-20.04 + timeout-minutes: 60 + + steps: + - uses: actions/checkout@master + + - name: Set up Python Environment + uses: actions/setup-python@v4 + with: + python-version: ${{ env.PYTHON_VERSION }} + + - name: Collect Job Information + id: job-info + run: | + echo "Python Version: ${{ env.PYTHON_VERSION }} " >> ci_job_info.txt + echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt + echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt + echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt + echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt + echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt + as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-Za-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")" + echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT + + - name: setup virtual env + run: | + make virtualenv + source .venv/bin/activate + + - name: Non-requirements based install + run: | + # mpich: required by mpi4py which is in test-requirements for radical-pilot + sudo apt-get update -q + sudo apt-get install -qy mpich + + - name: make deps clean_coverage + run: | + source .venv/bin/activate + make deps + make clean_coverage + + # Temporary fix until fixes make it to a release + git clone -b main https://github.com/globus/globus-compute.git + pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint + + - name: start globus_compute_endpoint + env: + GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }} + GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} + run: | + source 
/home/runner/work/parsl/parsl/.venv/bin/activate + globus-compute-endpoint configure default + which globus-compute-endpoint + python3 -c "import globus_compute_sdk; print(globus_compute_sdk.__version__)" + python3 -c "import globus_compute_endpoint; print(globus_compute_endpoint.__version__)" + cat << EOF > /home/runner/.globus_compute/default/config.yaml + engine: + type: ThreadPoolEngine + max_workers: 4 + EOF + cat /home/runner/.globus_compute/default/config.yaml + mkdir ~/.globus_compute/default/tasks_working_dir + globus-compute-endpoint start default + globus-compute-endpoint list + - name: make test + env: + GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }} + GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} + run: | + source .venv/bin/activate + export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38) + echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT" + + export PARSL_TEST_PRESERVE_NUM_RUNS=7 + + make gce_test + ln -s pytest-parsl/parsltest-current test_runinfo + + - name: stop globus_compute_endpoint + env: + GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }} + GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }} + run: | + source /home/runner/work/parsl/parsl/.venv/bin/activate + globus-compute-endpoint stop default + + - name: Archive runinfo logs + if: ${{ always() }} + uses: actions/upload-artifact@v4 + with: + name: runinfo-${{ env.PYTHON_VERSION }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }} + path: | + runinfo/ + pytest-parsl/ + ci_job_info.txt + compression-level: 9 \ No newline at end of file diff --git a/Makefile b/Makefile index 4d2f37f715..ad127f2c23 100644 --- a/Makefile +++ b/Makefile @@ -51,6 +51,10 @@ clean_coverage: mypy: ## run mypy checks MYPYPATH=$(CWD)/mypy-stubs mypy parsl/ +.PHONY: gce_test +gce_test: ## Run tests with GlobusComputeExecutor + pytest -v -k "not shared_fs and not issue_3620 and not 
staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10 + .PHONY: local_thread_test local_thread_test: ## run all tests with local_thread config pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10 diff --git a/docs/reference.rst b/docs/reference.rst index 45f83ad36f..5933f7841b 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -77,6 +77,7 @@ Executors parsl.executors.taskvine.TaskVineExecutor parsl.executors.FluxExecutor parsl.executors.radical.RadicalPilotExecutor + parsl.executors.globus_compute.GlobusComputeExecutor Manager Selectors ================= diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst index 832985c164..848a3114e4 100644 --- a/docs/userguide/execution.rst +++ b/docs/userguide/execution.rst @@ -87,6 +87,9 @@ Parsl currently supports the following executors: 4. `parsl.executors.taskvine.TaskVineExecutor`: This executor uses `TaskVine `_ as the execution backend. TaskVine scales up to tens of thousands of cores and actively uses local storage on compute nodes to offer a diverse array of performance-oriented features, including: smart caching and sharing common large files between tasks and compute nodes, reliable execution of tasks, dynamic resource sizing, automatic Python environment detection and sharing. These executors cover a broad range of execution requirements. As with other Parsl components, there is a standard interface (ParslExecutor) that can be implemented to add support for other executors. +5. `parsl.executors.globus_compute.GlobusComputeExecutor`: This executor uses `Globus Compute `_ +as the execution backend to run tasks on remote systems. + .. note:: Refer to :ref:`configuration-section` for information on how to configure these executors. 
diff --git a/mypy.ini b/mypy.ini index 4b64a12de2..e46e11fd63 100644 --- a/mypy.ini +++ b/mypy.ini @@ -177,6 +177,9 @@ ignore_missing_imports = True #[mypy-multiprocessing.synchronization.*] #ignore_missing_imports = True +[mypy-globus_compute_sdk.*] +ignore_missing_imports = True + [mypy-pandas.*] ignore_missing_imports = True diff --git a/parsl/executors/__init__.py b/parsl/executors/__init__.py index bc29204502..81955aab76 100644 --- a/parsl/executors/__init__.py +++ b/parsl/executors/__init__.py @@ -1,4 +1,5 @@ from parsl.executors.flux.executor import FluxExecutor +from parsl.executors.globus_compute import GlobusComputeExecutor from parsl.executors.high_throughput.executor import HighThroughputExecutor from parsl.executors.high_throughput.mpi_executor import MPIExecutor from parsl.executors.threads import ThreadPoolExecutor @@ -8,4 +9,5 @@ 'HighThroughputExecutor', 'MPIExecutor', 'WorkQueueExecutor', - 'FluxExecutor'] + 'FluxExecutor', + 'GlobusComputeExecutor'] diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py new file mode 100644 index 0000000000..228a8c6664 --- /dev/null +++ b/parsl/executors/globus_compute.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +import copy +import uuid +from concurrent.futures import Future +from typing import Any, Callable, Dict, Optional, Union + +import typeguard + +from parsl.errors import OptionalModuleMissing +from parsl.executors.base import ParslExecutor +from parsl.utils import RepresentationMixin + +try: + from globus_compute_sdk import Executor + _globus_compute_enabled = True +except ImportError: + _globus_compute_enabled = False + +UUID_LIKE_T = Union[uuid.UUID, str] + + +class GlobusComputeExecutor(ParslExecutor, RepresentationMixin): + """ GlobusComputeExecutor enables remote execution on Globus Compute endpoints + + GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor + Refer to `globus-compute user documentation `_ + and `reference documentation `_ 
+ for more details. + + .. note:: + As a remote execution system, Globus Compute relies on serialization to ship + tasks and results between the Parsl client side and the remote Globus Compute + Endpoint side. Serialization is unreliable across python versions, and + wrappers used by Parsl assume identical Parsl versions on both sides. + We recommend using matching Python, Parsl and Globus Compute versions on both + the client side and the endpoint side for stable behavior. + + """ + + @typeguard.typechecked + def __init__( + self, + executor: Executor, + label: str = 'GlobusComputeExecutor', + resource_specification: Optional[dict] = None, + user_endpoint_config: Optional[dict] = None, + ): + """ + Parameters + ---------- + + executor: globus_compute_sdk.Executor + Pass a globus_compute_sdk Executor that will be used to execute + tasks on a globus_compute endpoint. Refer to `globus-compute docs + `_ + + label: + a label to name the executor + + resource_specification: + Specify resource requirements for individual task execution. + + user_endpoint_config: + User endpoint configuration values as described + and allowed by endpoint administrators. Must be a JSON-serializable dict + or None. Refer to docs from `globus-compute + `_ + for more info. 
+ + """ + if not _globus_compute_enabled: + raise OptionalModuleMissing( + ['globus-compute-sdk'], + "GlobusComputeExecutor requires globus-compute-sdk installed" + ) + + super().__init__() + self.executor: Executor = executor + self.resource_specification = resource_specification + self.user_endpoint_config = user_endpoint_config + self.label = label + + def start(self) -> None: + """ Start the Globus Compute Executor """ + pass + + def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: + """ Submit func to globus-compute + + + Parameters + ---------- + + func: Callable + Python function to execute remotely + + resource_specification: Dict[str, Any] + Resource specification can be used to specify MPI resources required by MPI applications on + Endpoints configured to use globus compute's MPIEngine. GCE also accepts *user_endpoint_config* + to configure endpoints when the endpoint is a `Multi-User Endpoint + `_ + + args: + Args to pass to the function + + kwargs: + kwargs to pass to the function + + Returns + ------- + + Future + """ + res_spec = copy.deepcopy(resource_specification or self.resource_specification) + # Pop user_endpoint_config since it is illegal in resource_spec for globus_compute + if res_spec: + user_endpoint_config = res_spec.pop('user_endpoint_config', self.user_endpoint_config) + else: + user_endpoint_config = self.user_endpoint_config + + self.executor.resource_specification = res_spec + self.executor.user_endpoint_config = user_endpoint_config + return self.executor.submit(func, *args, **kwargs) + + def shutdown(self): + """Clean-up the resources associated with the Executor. + + GCE.shutdown will cancel all futures that have not yet registered with + Globus Compute and will not wait for the launched futures to complete. 
+ """ + self.executor.shutdown(wait=False, cancel_futures=True) diff --git a/parsl/tests/configs/globus_compute.py b/parsl/tests/configs/globus_compute.py new file mode 100644 index 0000000000..19f90b8c20 --- /dev/null +++ b/parsl/tests/configs/globus_compute.py @@ -0,0 +1,20 @@ +import os + +from globus_compute_sdk import Executor + +from parsl.config import Config +from parsl.executors import GlobusComputeExecutor + + +def fresh_config(): + + endpoint_id = os.environ["GLOBUS_COMPUTE_ENDPOINT"] + + return Config( + executors=[ + GlobusComputeExecutor( + executor=Executor(endpoint_id=endpoint_id), + label="globus_compute", + ) + ] + ) diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index 4bcdde0b7a..4f1281025c 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -163,6 +163,10 @@ def pytest_configure(config): 'markers', 'shared_fs: Marks tests that require a shared_fs between the workers are the test client' ) + config.addinivalue_line( + 'markers', + 'issue_3620: Marks tests that do not work correctly on GlobusComputeExecutor (ref: issue 3620)' + ) @pytest.fixture(autouse=True, scope='session') diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py index 4616219be2..7def2b736c 100644 --- a/parsl/tests/test_error_handling/test_resource_spec.py +++ b/parsl/tests/test_error_handling/test_resource_spec.py @@ -1,3 +1,5 @@ +import pytest + import parsl from parsl.app.app import python_app from parsl.executors import WorkQueueExecutor @@ -11,6 +13,7 @@ def double(x, parsl_resource_specification={}): return x * 2 +@pytest.mark.issue_3620 def test_resource(n=2): executors = parsl.dfk().executors executor = None diff --git a/setup.py b/setup.py index cace8c0252..8336e4db7b 100755 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ 'flux': ['pyyaml', 'cffi', 'jsonschema'], 'proxystore': ['proxystore'], 'radical-pilot': ['radical.pilot==1.60', 'radical.utils==1.60'], + 
'globus_compute': ['globus_compute_sdk>=2.27.1'], # Disabling psi-j since github direct links are not allowed by pypi # 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl'] } diff --git a/test-requirements.txt b/test-requirements.txt index 82ec5172c2..5016c5c48d 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -10,6 +10,7 @@ types-mock types-python-dateutil types-requests mpi4py +globus-compute-sdk>=2.27.1 # sqlalchemy is needed for typechecking, so it's here # as well as at runtime for optional monitoring execution