From 857300b615c3a742e7c48caa5d32dcadc307f983 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 29 Mar 2024 13:08:40 +0000 Subject: [PATCH] Factor radio selection in monitoring worker-side code Co-authored-by: Sicheng Zhou <111474133+ClaudiaCumberbatch@users.noreply.github.com> --- parsl/monitoring/remote.py | 42 ++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 6cd7385e16..8d773b7157 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -96,6 +96,22 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: return (wrapped, args, new_kwargs) +def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadio: + radio: MonitoringRadio + if radio_mode == "udp": + radio = UDPRadio(monitoring_hub_url, + source_id=task_id) + elif radio_mode == "htex": + radio = HTEXRadio(monitoring_hub_url, + source_id=task_id) + elif radio_mode == "filesystem": + radio = FilesystemRadio(monitoring_url=monitoring_hub_url, + source_id=task_id, run_dir=run_dir) + else: + raise RuntimeError(f"Unknown radio mode: {radio_mode}") + return radio + + @wrap_with_logs def send_first_message(try_id: int, task_id: int, @@ -122,18 +138,7 @@ def send_first_last_message(try_id: int, import platform import os - radio: MonitoringRadio - if radio_mode == "udp": - radio = UDPRadio(monitoring_hub_url, - source_id=task_id) - elif radio_mode == "htex": - radio = HTEXRadio(monitoring_hub_url, - source_id=task_id) - elif radio_mode == "filesystem": - radio = FilesystemRadio(monitoring_url=monitoring_hub_url, - source_id=task_id, run_dir=run_dir) - else: - raise RuntimeError(f"Unknown radio mode: {radio_mode}") + radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir) msg = (MessageType.RESOURCE_INFO, {'run_id': run_id, @@ -178,18 +183,7 @@ def monitor(pid: int, setproctitle("parsl: task resource monitor") - radio: MonitoringRadio - if radio_mode == "udp": - radio = UDPRadio(monitoring_hub_url, - source_id=task_id) - elif radio_mode == "htex": - radio = HTEXRadio(monitoring_hub_url, - source_id=task_id) - elif radio_mode == "filesystem": - radio = FilesystemRadio(monitoring_url=monitoring_hub_url, - source_id=task_id, run_dir=run_dir) - else: - raise RuntimeError(f"Unknown radio mode: {radio_mode}") + radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir) logging.debug("start of monitor")