Skip to content

Commit

Permalink
Merge pull request #1728 from fractal-analytics-platform/1727-move-re…
Browse files Browse the repository at this point in the history
…mote-folder-creation-to-the-fractalsshslurmexecutor

Move SSH remote-folder creation to `_process_workkflow` (ref #1727)
  • Loading branch information
tcompa authored Sep 4, 2024
2 parents 5b4be90 + b5593ba commit eb7a8d2
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

# 2.3.11 (Unreleased)

* SSH runner:
* Move remote-folder creation from `submit_workflow` to more specific `_process_workflow` (\#1728).
* Benchmarks:
* Add `GET /auth/token/login/` to tested endpoints (\#1720).
* Testing:
Expand Down
6 changes: 1 addition & 5 deletions fractal_server/app/runner/v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,11 @@ async def submit_workflow(
folder=str(WORKFLOW_DIR_REMOTE), user=slurm_user
)
elif FRACTAL_RUNNER_BACKEND == "slurm_ssh":
# Folder creation is deferred to _process_workflow
WORKFLOW_DIR_REMOTE = (
Path(settings.FRACTAL_SLURM_SSH_WORKING_BASE_DIR)
/ WORKFLOW_DIR_LOCAL.name
)
# FIXME SSH: move mkdir to executor, likely within handshake
fractal_ssh.mkdir(
folder=str(WORKFLOW_DIR_REMOTE),
)
logger.info(f"Created {str(WORKFLOW_DIR_REMOTE)} via SSH.")
else:
logger.error(
"Invalid FRACTAL_RUNNER_BACKEND="
Expand Down
17 changes: 17 additions & 0 deletions fractal_server/app/runner/v2/_slurm_ssh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@
from ....models.v2 import DatasetV2
from ....models.v2 import WorkflowV2
from ...async_wrap import async_wrap
from ...exceptions import JobExecutionError
from ...executors.slurm.ssh.executor import FractalSlurmSSHExecutor
from ...set_start_and_last_task_index import set_start_and_last_task_index
from ..runner import execute_tasks_v2
from ._submit_setup import _slurm_submit_setup
from fractal_server.logger import set_logger


logger = set_logger(__name__)


def _process_workflow(
Expand Down Expand Up @@ -60,6 +65,18 @@ def _process_workflow(
if isinstance(worker_init, str):
worker_init = worker_init.split("\n")

# Create main remote folder
try:
fractal_ssh.mkdir(folder=str(workflow_dir_remote))
logger.info(f"Created {str(workflow_dir_remote)} via SSH.")
except Exception as e:
error_msg = (
f"Could not create {str(workflow_dir_remote)} via SSH.\n"
f"Original error: {str(e)}."
)
logger.error(error_msg)
raise JobExecutionError(info=error_msg)

with FractalSlurmSSHExecutor(
fractal_ssh=fractal_ssh,
workflow_dir_local=workflow_dir_local,
Expand Down
16 changes: 15 additions & 1 deletion tests/v2/08_full_workflow/common_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,15 @@ async def workflow_with_non_python_task(
task_factory_v2,
tmp777_path: Path,
additional_user_kwargs=None,
):
this_should_fail: bool = False,
) -> str:
"""
Run a non-python-task Fractal job.
Returns:
String with job logs.
"""

user_kwargs = {"is_verified": True}
if additional_user_kwargs is not None:
user_kwargs.update(additional_user_kwargs)
Expand Down Expand Up @@ -571,6 +579,10 @@ async def workflow_with_non_python_task(
job_status_data = res.json()
debug(job_status_data)

if this_should_fail:
assert job_status_data["status"] == "failed"
return job_status_data["log"]

assert job_status_data["status"] == "done"
debug(job_status_data["end_timestamp"])
assert job_status_data["end_timestamp"]
Expand Down Expand Up @@ -599,3 +611,5 @@ async def workflow_with_non_python_task(
log = file.read().decode("utf-8")
assert "This goes to standard output" in log
assert "This goes to standard error" in log

return job_status_data["log"]
57 changes: 57 additions & 0 deletions tests/v2/08_full_workflow/test_full_workflow_slurm_ssh_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,60 @@ async def test_workflow_with_non_python_task_slurm_ssh(
task_factory_v2=task_factory_v2,
tmp777_path=tmp777_path,
)

app.state.fractal_ssh.close()


async def test_workflow_with_non_python_task_slurm_ssh_fail(
client,
app,
MockCurrentUser,
testdata_path,
tmp777_path,
project_factory_v2,
dataset_factory_v2,
workflow_factory_v2,
task_factory_v2,
ssh_alive,
slurmlogin_ip,
monkeypatch,
override_settings_factory,
current_py_version: str,
):
"""
Setup faulty SSH connection (with wrong path to key file) and observe
first failure point.
"""

override_settings_factory(
FRACTAL_RUNNER_BACKEND="slurm_ssh",
FRACTAL_SLURM_WORKER_PYTHON=f"/usr/bin/python{current_py_version}",
FRACTAL_SLURM_SSH_HOST=slurmlogin_ip,
FRACTAL_SLURM_SSH_USER=SLURM_USER,
FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH="/invalid/path",
FRACTAL_SLURM_SSH_WORKING_BASE_DIR=(
tmp777_path / "artifacts"
).as_posix(),
FRACTAL_SLURM_CONFIG_FILE=testdata_path / "slurm_config.json",
)

connection = get_ssh_connection()
app.state.fractal_ssh = FractalSSH(connection=connection)

monkeypatch.setattr("sys.stdin", io.StringIO(""))

job_logs = await workflow_with_non_python_task(
MockCurrentUser=MockCurrentUser,
client=client,
testdata_path=testdata_path,
project_factory_v2=project_factory_v2,
dataset_factory_v2=dataset_factory_v2,
workflow_factory_v2=workflow_factory_v2,
task_factory_v2=task_factory_v2,
tmp777_path=tmp777_path,
this_should_fail=True,
)
assert "Could not create" in job_logs
assert "via SSH" in job_logs

app.state.fractal_ssh.close()

0 comments on commit eb7a8d2

Please sign in to comment.