Skip to content

Commit

Permalink
This commit fixes:
Browse files Browse the repository at this point in the history
- stage in/out of files for parsl tasks
- setting stdout/err for parsl tasks
- setting exception for parsl tasks
- bash_apps exit codes
  • Loading branch information
AymenFJA committed Nov 6, 2023
1 parent 99c7d67 commit cfff365
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 24 deletions.
86 changes: 63 additions & 23 deletions parsl/executors/radical/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from functools import partial
from typing import Optional, Dict
from pathlib import Path, PosixPath
from concurrent.futures import Future

from .rpex_resources import ResourceConfig
Expand All @@ -26,8 +27,10 @@

RPEX = 'RPEX'
BASH = 'bash'
CWD = os.getcwd()
PYTHON = 'python'
os.environ["RADICAL_REPORT"] = "False"
PWD = os.path.abspath(os.path.dirname(__file__))

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -167,10 +170,15 @@ def task_state_cb(self, task, state):
elif state == rp.FAILED:
if task.description['mode'] in [rp.TASK_EXEC,
rp.TASK_EXECUTABLE]:
parsl_task.set_exception(BashExitFailure(task.name,
task.exit_code))
# for some reason RP sometimes report a
# task with exit code 0 as FAILED
if task.exit_code == 0:
parsl_task.set_result(int(task.exit_code))
else:
parsl_task.set_exception(BashExitFailure(task.name,
task.exit_code))
else:
parsl_task.set_exception(AppException(task.stderr))
parsl_task.set_exception(eval(task.exception))

def start(self):
"""Create the Pilot component and pass it.
Expand Down Expand Up @@ -212,25 +220,18 @@ def start(self):
pd = rp.PilotDescription(pd_init)

tds = list()
executor_path = os.path.abspath(os.path.dirname(__file__))
master_path = '{0}/rpex_master.py'.format(executor_path)
master_path = '{0}/rpex_master.py'.format(PWD)

for i in range(self.n_masters):
td = rp.TaskDescription(self.master)
td.mode = rp.RAPTOR_MASTER
td.uid = ru.generate_id('master.%(item_counter)06d', ru.ID_CUSTOM,
ns=self.session.uid)
td.arguments = [self.rpex_cfg, i]
td.ranks = 1
td.cores_per_rank = 1
td.input_staging = [{'source': master_path,
'target': 'rpex_master.py',
'action': rp.TRANSFER,
'flags': rp.DEFAULT_FLAGS},
{'source': self.rpex_cfg,
'target': os.path.basename(self.rpex_cfg),
'action': rp.TRANSFER,
'flags': rp.DEFAULT_FLAGS}]
td.arguments = [self.rpex_cfg, i]
td.input_staging = self._stage_files([File(master_path),
File(self.rpex_cfg)], mode='in')
tds.append(td)

self.pmgr = rp.PilotManager(session=self.session)
Expand Down Expand Up @@ -343,9 +344,14 @@ def task_translate(self, tid, func, args, kwargs):
raise Exception("failed to obtain bash app cmd") from e

task.mode = rp.TASK_EXECUTABLE
task.executable = '/bin/bash'
task.arguments = ['-c', bash_app]

# workaround for the difference between
# RP execution of executables (bashapps)
# and Parsl execution.
bashapp_file = self._map_bash_app_to_file(bash_app, tid)
task.executable = '/bin/bash'
task.arguments = [bashapp_file]
task.input_staging = [bashapp_file]
# specifying pre_exec is only for executables
task.pre_exec = kwargs.get('pre_exec', [])

Expand All @@ -366,18 +372,52 @@ def task_translate(self, tid, func, args, kwargs):
task.output_staging = self._stage_files(kwargs.get("outputs", []),
mode='out')

stderr_stdout = ['stdout', 'stderr']
for k in stderr_stdout:
k_val = kwargs.get(k, '')
if k_val:
out_err = k_val.split('/')
setattr(task, k, out_err[-1])
task.sandbox = '/'.join(out_err[:-1])
self._set_stdout_stderr(task, kwargs)

task.timeout = kwargs.get('walltime', 0.0)

return task

def _map_bash_app_to_file(self, bash_app, bash_app_id):
"""
This function writes the command of
a bash_app in a file and pass it to
RP as an executable to fix:
https://github.com/Parsl/parsl/pull/2923#issuecomment-1790895266
"""
file_path = f"{self.run_dir}/{bash_app_id}.sh"
shell_script = f"""
#/bin/bash
{bash_app}
"""
with open(file_path, 'w') as f:
f.write(shell_script)
return file_path

def _set_stdout_stderr(self, task, kwargs):
"""
set the stdout and stderr of a task
"""
for k in ['stdout', 'stderr']:
k_val = kwargs.get(k, '')
if k_val:
# check the type of the stderr/out
if isinstance(k_val, File):
k_val = k_val.filepath
elif isinstance(k_val, PosixPath):
k_val = k_val.__str__()

# if the stderr/out has no path
# then we consider it local and
# we just set the path to the cwd
if '/' not in k_val:
k_val = CWD + '/' + k_val

# finally set the stderr/out to
# the desired name by the user
setattr(task, k, k_val)
task.sandbox = Path(k_val).parent.__str__()

def _stage_files(self, files, mode):
"""
a function to stage list of input/output a
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/local_radical.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# this fork.
parsl_src = "pip install git+https://github.com/AymenFJA/parsl.git"
rpex_cfg = ResourceConfig()
rpex_cfg.pilot_env_setup.append(parsl_src)
rpex_cfg.pilot_env_setup.extend([parsl_src, "pytest", "pandas"])


def fresh_config():
Expand Down

0 comments on commit cfff365

Please sign in to comment.