Skip to content

Commit

Permalink
Support for GlobusComputeEngines (globus#1129)
Browse files Browse the repository at this point in the history
* Adding Engine infrastructure to replace executors

* Adding `ProcessPoolEngine` based on `concurrent.futures.ProcessPoolExecutor`
* Adding `ThreadPoolEngine` based on `concurrent.futures.ThreadPoolExecutor`
* Adding `GlobusComputeEngine` based on `parsl.executors.HighThroughputExecutor`
* Adding `execute_task(messagepacked_payload)-> messagepacked_result` wrapper
* Adding a new `GlobusComputeEngineBase` class

* Adding tests for `engines`
* Tests for the older `HighThroughputExecutor` (funcX's fork) are broken and will be fixed in a follow-up PR
* Marking failing HTEX tests to skip
* Adding changelog fragment
  • Loading branch information
yadudoc authored May 5, 2023
1 parent 8cbed37 commit 11e07e6
Show file tree
Hide file tree
Showing 13 changed files with 1,257 additions and 0 deletions.
43 changes: 43 additions & 0 deletions changelog.d/20230503_101434_yadudoc1729_unfork_htex_2.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
.. A new scriv changelog fragment.
..
.. Uncomment the header that is right (remove the leading dots).
..
New Functionality
^^^^^^^^^^^^^^^^^

- Support for 3 new execution ``Engines``, designed to replace the ``HighThroughputExecutor``

- ``GlobusComputeEngine``: Wraps Parsl's ``HighThroughputExecutor`` to match the current
default executor (globus-compute's fork of ``HighThroughputExecutor``)
- ``ProcessPoolEngine``: Wraps ``concurrent.futures.ProcessPoolExecutor`` for concurrent
local execution
- ``ThreadPoolEngine``: Wraps ``concurrent.futures.ThreadPoolExecutor`` for concurrent
local execution on macOS.

.. - A bullet item for the New Functionality category.
..
.. Bug Fixes
.. ^^^^^^^^^
..
.. - A bullet item for the Bug Fixes category.
..
.. Removed
.. ^^^^^^^
..
.. - A bullet item for the Removed category.
..
.. Deprecated
.. ^^^^^^^^^^
..
.. - A bullet item for the Deprecated category.
..
.. Changed
.. ^^^^^^^
..
.. - A bullet item for the Changed category.
..
.. Security
.. ^^^^^^^^
..
.. - A bullet item for the Security category.
..
9 changes: 9 additions & 0 deletions compute_endpoint/globus_compute_endpoint/engines/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from globus_compute_endpoint.engines.globus_compute import GlobusComputeEngine
from globus_compute_endpoint.engines.process_pool import ProcessPoolEngine
from globus_compute_endpoint.engines.thread_pool import ThreadPoolEngine

# Public API of the engines package: the three engine classes imported above.
__all__ = [
"GlobusComputeEngine",
"ProcessPoolEngine",
"ThreadPoolEngine",
]
176 changes: 176 additions & 0 deletions compute_endpoint/globus_compute_endpoint/engines/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import logging
import queue
import threading
import time
import typing as t
import uuid
from abc import ABC, abstractmethod
from concurrent.futures import Future

from globus_compute_common import messagepack
from globus_compute_common.messagepack.message_types import (
EPStatusReport,
Result,
TaskTransition,
)
from globus_compute_common.tasks import ActorName, TaskState
from globus_compute_endpoint.engines.helper import execute_task
from globus_compute_endpoint.exception_handling import (
get_error_string,
get_result_error_details,
)

logger = logging.getLogger(__name__)


class ReportingThread:
    """Run a callable periodically on a background thread until stopped.

    The callable is invoked once as soon as the thread starts and then again
    after every ``reporting_period`` seconds.  If the callable raises, the
    exception is stored on the ``status`` future and the loop exits.
    """

    def __init__(
        self, target: t.Callable, args: t.List, reporting_period: float = 30.0
    ):
        """
        Parameters
        ----------
        target: callable invoked on every iteration of the loop
        args: positional arguments passed to ``target``; any sequence
            (list or tuple) is accepted
        reporting_period: seconds to wait between two invocations of ``target``
        """
        # Holds the exception raised by ``target``, if any, so that the
        # parent thread can inspect it.
        self.status: Future = Future()
        self._shutdown_event = threading.Event()
        self.reporting_period = reporting_period
        self._thread = threading.Thread(
            target=self.run_in_loop,
            # Unpack instead of ``[target] + args`` so tuples work as well.
            args=(target, *args),
            name="GCReportingThread",
        )

    def start(self):
        """Start the background thread."""
        logger.info("Start called")
        self._thread.start()

    def run_in_loop(self, target: t.Callable, *args) -> None:
        """Call ``target(*args)`` repeatedly until stopped or until it raises.

        This is the body of the background thread; it is not meant to be
        called directly.
        """
        while True:
            try:
                target(*args)
            except Exception as e:
                # Log and update the future before exiting, if it is not
                # already set (set_exception raises on a completed future).
                logger.exception("ReportingThread target raised an exception")
                if not self.status.done():
                    self.status.set_exception(e)
                self._shutdown_event.set()
            # wait() doubles as the inter-report sleep and the stop check.
            if self._shutdown_event.wait(timeout=self.reporting_period):
                break

        logger.warning("ReportingThread exiting")

    def stop(self) -> None:
        """Signal the loop to exit and briefly wait for the thread.

        Safe to call before ``start`` or after the thread has already exited.
        """
        self._shutdown_event.set()
        # Joining a thread that was never started raises RuntimeError.
        if self._thread.is_alive():
            self._thread.join(timeout=0.1)


class GlobusComputeEngineBase(ABC):
    """Shared functionality and interfaces required by all GlobusCompute Engines.

    This is designed to plug in executors following the
    concurrent.futures.Executor interface as execution backends to
    GlobusCompute.  Subclasses implement ``start``, ``get_status_report`` and
    ``_submit``; packed results and status reports are placed on
    ``results_passthrough``.
    """

    def __init__(
        self,
        *args: object,
        heartbeat_period_s: float = 30.0,
        endpoint_id: t.Optional[uuid.UUID] = None,
        **kwargs: object,
    ):
        """
        Parameters
        ----------
        heartbeat_period_s: period, in seconds, used for status reporting
        endpoint_id: identifier of the endpoint this engine belongs to
        args, kwargs: accepted and ignored by the base class
        """
        self._shutdown_event = threading.Event()
        self._heartbeat_period_s = heartbeat_period_s
        self.endpoint_id = endpoint_id

        # TODO: remove these unused attributes; they are only here to keep
        # the endpoint interchange happy
        self.container_type: t.Optional[str] = None
        self.funcx_service_address: t.Optional[str] = None
        self.run_dir: t.Optional[str] = None
        # This attribute could be set by the subclasses in their
        # start method if another component insists on owning the queue.
        self.results_passthrough: queue.Queue = queue.Queue()

    @abstractmethod
    def start(
        self,
        *args,
        **kwargs,
    ) -> None:
        """Start the underlying execution backend."""
        raise NotImplementedError

    @abstractmethod
    def get_status_report(self) -> EPStatusReport:
        """Return the current status of the engine as an EPStatusReport."""
        raise NotImplementedError

    def report_status(self):
        """Pack the current status report and post it to the passthrough queue."""
        status_report = self.get_status_report()
        packed_status = messagepack.pack(status_report)
        self.results_passthrough.put(packed_status)

    def _status_report(
        self, shutdown_event: threading.Event, heartbeat_period_s: float
    ):
        """Post a status report every ``heartbeat_period_s`` seconds.

        Blocks until ``shutdown_event`` is set; the first report is posted
        only after one full period has elapsed.
        """
        while not shutdown_event.wait(timeout=heartbeat_period_s):
            self.report_status()

    def _future_done_callback(self, future: Future):
        """Callback to post the task result to the passthrough queue

        On success the future's result is forwarded as-is.  On failure
        (including cancellation) a packed ``Result`` carrying the error
        details is posted instead.

        Parameters
        ----------
        future: Future for which the callback is triggered; it must carry the
            ``task_id`` attribute attached by ``submit``
        """
        try:
            packed_result = future.result()
        except BaseException:
            # The error helpers below take no arguments, so they presumably
            # read the exception currently being handled (sys.exc_info) --
            # TODO confirm.  Re-raising via future.result() makes the task's
            # exception the active one here.  BaseException is caught on
            # purpose: the future may store any BaseException, and
            # future.exception() would itself raise on a cancelled future.
            code, user_message = get_result_error_details()
            error_string = get_error_string()
            exec_end = TaskTransition(
                timestamp=time.time_ns(),
                state=TaskState.EXEC_END,
                actor=ActorName.WORKER,
            )
            result_message = dict(
                task_id=future.task_id,  # type: ignore
                data=error_string,
                exception=error_string,
                error_details={"code": code, "user_message": user_message},
                task_statuses=[exec_end],  # We don't have any more info transitions
            )
            packed_result = messagepack.pack(Result(**result_message))

        self.results_passthrough.put(packed_result)

    @abstractmethod
    def _submit(
        self,
        func: t.Callable,
        *args: t.Any,
        **kwargs: t.Any,
    ) -> Future:
        """Subclass should use the internal execution system to implement this"""
        raise NotImplementedError()

    def submit(self, task_id: uuid.UUID, packed_task: bytes) -> Future:
        """GC Endpoints should submit tasks via this method so that tasks are
        tracked properly.

        Parameters
        ----------
        task_id: identifier of the task; attached to the returned future
        packed_task: messagepack bytes buffer

        Returns
        -------
        future
        """

        future: Future = self._submit(execute_task, packed_task)

        # Executors mark futures as failed in the event of faults.
        # We need to tie the task_id info into the future to identify
        # which tasks have failed.  It is set before the callback is added
        # because the callback runs immediately if the future is already done.
        future.task_id = task_id  # type: ignore
        future.add_done_callback(self._future_done_callback)
        return future
113 changes: 113 additions & 0 deletions compute_endpoint/globus_compute_endpoint/engines/globus_compute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import logging
import multiprocessing
import os
import typing as t
import uuid
from concurrent.futures import Future

from globus_compute_common.messagepack.message_types import (
EPStatusReport,
TaskTransition,
)
from globus_compute_endpoint.engines.base import (
GlobusComputeEngineBase,
ReportingThread,
)
from parsl.executors.high_throughput.executor import HighThroughputExecutor

logger = logging.getLogger(__name__)


class GlobusComputeEngine(GlobusComputeEngineBase):
    """Engine that runs tasks on Parsl's ``HighThroughputExecutor``."""

    def __init__(
        self,
        *args,
        label: str = "GlobusComputeEngine",
        address: t.Optional[str] = None,
        heartbeat_period_s: float = 30.0,
        **kwargs,
    ):
        """
        Parameters
        ----------
        label: label stored on the engine
        address: address passed through to ``HighThroughputExecutor``
        heartbeat_period_s: period, in seconds, between status reports
        args, kwargs: forwarded to both the base class and
            ``HighThroughputExecutor``
        """
        # The base initializer resets ``run_dir`` to None, so it must run
        # before the engine-specific attributes are assigned.
        super().__init__(*args, heartbeat_period_s=heartbeat_period_s, **kwargs)
        self.address = address
        self.run_dir = os.getcwd()
        self.label = label
        self._status_report_thread = ReportingThread(
            target=self.report_status, args=[], reporting_period=heartbeat_period_s
        )
        self.executor = HighThroughputExecutor(  # type: ignore
            *args, address=address, **kwargs
        )

    def start(
        self,
        *args,
        endpoint_id: t.Optional[uuid.UUID] = None,
        run_dir: t.Optional[str] = None,
        results_passthrough: t.Optional[multiprocessing.Queue] = None,
        **kwargs,
    ):
        """Start the executor and the periodic status reporting thread.

        Parameters
        ----------
        endpoint_id: required; identifier reported in status reports
        run_dir: required; directory (joined onto the current working
            directory) under which ``submit_scripts`` is created
        results_passthrough: optional queue that replaces the default
            results queue created by the base class
        """
        assert run_dir, "GCExecutor requires kwarg:run_dir at start"
        assert endpoint_id, "GCExecutor requires kwarg:endpoint_id at start"
        self.run_dir = os.path.join(os.getcwd(), run_dir)
        self.endpoint_id = endpoint_id
        self.executor.provider.script_dir = os.path.join(self.run_dir, "submit_scripts")
        os.makedirs(self.executor.provider.script_dir, exist_ok=True)
        # Compare against None rather than relying on truthiness, so a
        # queue-like object that happens to be falsy is still honoured.
        if results_passthrough is not None:
            # Only update the default queue in GCExecutorBase if
            # a queue is passed in
            self.results_passthrough = results_passthrough
        self.executor.start()
        self._status_report_thread.start()

    def _submit(
        self,
        func: t.Callable,
        *args: t.Any,
        **kwargs: t.Any,
    ) -> Future:
        """Submit ``func`` to the executor and return its future.

        The empty dict is passed as the executor's second positional
        argument (presumably the resource specification -- confirm against
        the Parsl API).
        """
        return self.executor.submit(func, {}, *args, **kwargs)

    def get_status_report(self) -> EPStatusReport:
        """Build the status report for this endpoint.

        All executor statistics are currently hard-coded placeholder values;
        only ``heartbeat_period`` and ``endpoint_id`` reflect engine state.

        Returns
        -------
        EPStatusReport with ``endpoint_id``, ``ep_status_report`` and an
        empty ``task_statuses`` mapping
        """
        executor_status: t.Dict[str, t.Any] = {
            "task_id": -2,
            "info": {
                "total_cores": 0,
                "total_mem": 0,
                "new_core_hrs": 0,
                "total_core_hrs": 0,
                "managers": 0,
                "active_managers": 0,
                "total_workers": 0,
                "idle_workers": 0,
                "pending_tasks": 0,
                "outstanding_tasks": 0,
                "worker_mode": 0,
                "scheduler_mode": 0,
                "scaling_enabled": False,
                "mem_per_worker": 0,
                "cores_per_worker": 0,
                "prefetch_capacity": 0,
                "max_blocks": 1,
                "min_blocks": 1,
                "max_workers_per_node": 0,
                "nodes_per_block": 1,
                "heartbeat_period": self._heartbeat_period_s,
            },
        }
        task_status_deltas: t.Dict[str, t.List[TaskTransition]] = {}
        return EPStatusReport(
            endpoint_id=self.endpoint_id,
            ep_status_report=executor_status,
            task_statuses=task_status_deltas,
        )

    def shutdown(self):
        """Stop status reporting, then shut down the executor."""
        self._status_report_thread.stop()
        return self.executor.shutdown()
Loading

0 comments on commit 11e07e6

Please sign in to comment.