
Commit

update runner introducing db connections and removing local files for history, filters, images objects
mfranzon committed Jan 7, 2025
1 parent 815944d commit 30d82f7
Showing 5 changed files with 107 additions and 191 deletions.
21 changes: 11 additions & 10 deletions fractal_server/app/routes/api/v2/status.py
@@ -1,5 +1,3 @@
import json
from pathlib import Path
from typing import Optional

from fastapi import APIRouter
@@ -10,6 +8,7 @@
from .....logger import set_logger
from ....db import AsyncSession
from ....db import get_async_db
from ....models.v2 import DatasetV2
from ....models.v2 import JobV2
from ....schemas.v2.dataset import WorkflowTaskStatusTypeV2
from ....schemas.v2.status import StatusReadV2
@@ -18,7 +17,6 @@
from ._aux_functions import _get_workflow_check_owner
from fractal_server.app.models import UserOAuth
from fractal_server.app.routes.auth import current_active_user
from fractal_server.app.runner.filenames import HISTORY_FILENAME

router = APIRouter()

@@ -136,13 +134,16 @@ async def get_workflowtask_status(
# Highest priority: Read status updates coming from the running-job
# temporary file. Note: this file only contains information on
# WorkflowTask's that ran through successfully.
tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME
try:
with tmp_file.open("r") as f:
history = json.load(f)
except FileNotFoundError:
history = []
for history_item in history:
# tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME
# try:
# with tmp_file.open("r") as f:
# history = json.load(f)
# except FileNotFoundError:
# history = []
db_dataset = await db.get(
DatasetV2, dataset_id, populate_existing=True
)
for history_item in db_dataset.history:
wftask_id = history_item["workflowtask"]["id"]
wftask_status = history_item["status"]
workflow_tasks_status_dict[wftask_id] = wftask_status
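
The hunk above replaces the read of the temporary HISTORY_FILENAME file with a lookup of the dataset row itself, so the status endpoint now reads history straight from the database. Below is a minimal sketch of that pattern, assuming an AsyncSession `db` and a `dataset_id` as in the route; the helper name `collect_running_job_status` is illustrative and not part of the commit.

from fractal_server.app.models.v2 import DatasetV2


async def collect_running_job_status(db, dataset_id: int) -> dict[int, str]:
    # Re-read the dataset row so that history persisted by the running job
    # (now stored in the DB rather than in a temporary JSON file) is visible.
    db_dataset = await db.get(DatasetV2, dataset_id, populate_existing=True)
    workflow_tasks_status_dict: dict[int, str] = {}
    for history_item in db_dataset.history:
        wftask_id = history_item["workflowtask"]["id"]
        workflow_tasks_status_dict[wftask_id] = history_item["status"]
    return workflow_tasks_status_dict
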
80 changes: 40 additions & 40 deletions fractal_server/app/runner/v2/__init__.py
@@ -11,7 +11,6 @@
from typing import Optional

from sqlalchemy.orm import Session as DBSyncSession
from sqlalchemy.orm.attributes import flag_modified

from ....config import get_settings
from ....logger import get_logger
@@ -38,12 +37,13 @@
)
from ._slurm_ssh import process_workflow as slurm_ssh_process_workflow
from ._slurm_sudo import process_workflow as slurm_sudo_process_workflow
from .handle_failed_job import assemble_filters_failed_job
from .handle_failed_job import assemble_history_failed_job
from .handle_failed_job import assemble_images_failed_job
from fractal_server import __VERSION__
from fractal_server.app.models import UserSettings

# from .handle_failed_job import assemble_filters_failed_job
# from .handle_failed_job import assemble_images_failed_job

_backends = {}
_backends["local"] = local_process_workflow
_backends["slurm"] = slurm_sudo_process_workflow
@@ -115,7 +115,6 @@ async def submit_workflow(
logger = set_logger(logger_name=logger_name)

with next(DB.get_sync_db()) as db_sync:

try:
job: Optional[JobV2] = db_sync.get(JobV2, job_id)
dataset: Optional[DatasetV2] = db_sync.get(DatasetV2, dataset_id)
@@ -322,7 +321,8 @@ async def submit_workflow(
db_sync = next(DB.get_sync_db())
db_sync.close()

new_dataset_attributes = await process_workflow(
# new_dataset_attributes =
await process_workflow(
workflow=workflow,
dataset=dataset,
workflow_dir_local=WORKFLOW_DIR_LOCAL,
@@ -339,14 +339,14 @@
f"more logs at {str(log_file_path)}"
)
logger.debug(f'END workflow "{workflow.name}"')

# Update dataset attributes, in case of successful execution
dataset.history.extend(new_dataset_attributes["history"])
dataset.filters = new_dataset_attributes["filters"]
dataset.images = new_dataset_attributes["images"]
for attribute_name in ["filters", "history", "images"]:
flag_modified(dataset, attribute_name)
db_sync.merge(dataset)
#
# # Update dataset attributes, in case of successful execution
# dataset.history.extend(new_dataset_attributes["history"])
# dataset.filters = new_dataset_attributes["filters"]
# dataset.images = new_dataset_attributes["images"]
# for attribute_name in ["filters", "history", "images"]:
# flag_modified(dataset, attribute_name)
# db_sync.merge(dataset)

# Update job DB entry
job.status = JobStatusTypeV2.DONE
@@ -358,27 +358,27 @@
db_sync.commit()

except TaskExecutionError as e:

logger.debug(f'FAILED workflow "{workflow.name}", TaskExecutionError.')
logger.info(f'Workflow "{workflow.name}" failed (TaskExecutionError).')

# Read dataset attributes produced by the last successful task, and
# update the DB dataset accordingly
failed_wftask = db_sync.get(WorkflowTaskV2, e.workflow_task_id)
dataset.history = assemble_history_failed_job(
# dataset.history =
assemble_history_failed_job(
job,
dataset,
workflow,
logger_name=logger_name,
failed_wftask=failed_wftask,
)
latest_filters = assemble_filters_failed_job(job)
if latest_filters is not None:
dataset.filters = latest_filters
latest_images = assemble_images_failed_job(job)
if latest_images is not None:
dataset.images = latest_images
db_sync.merge(dataset)
# latest_filters = assemble_filters_failed_job(job)
# if latest_filters is not None:
# dataset.filters = latest_filters
# latest_images = assemble_images_failed_job(job)
# if latest_images is not None:
# dataset.images = latest_images
# db_sync.merge(dataset)

exception_args_string = "\n".join(e.args)
log_msg = (
@@ -390,25 +390,25 @@
fail_job(db=db_sync, job=job, log_msg=log_msg, logger_name=logger_name)

except JobExecutionError as e:

logger.debug(f'FAILED workflow "{workflow.name}", JobExecutionError.')
logger.info(f'Workflow "{workflow.name}" failed (JobExecutionError).')

# Read dataset attributes produced by the last successful task, and
# update the DB dataset accordingly
dataset.history = assemble_history_failed_job(
# dataset.history =
assemble_history_failed_job(
job,
dataset,
workflow,
logger_name=logger_name,
)
latest_filters = assemble_filters_failed_job(job)
if latest_filters is not None:
dataset.filters = latest_filters
latest_images = assemble_images_failed_job(job)
if latest_images is not None:
dataset.images = latest_images
db_sync.merge(dataset)
# latest_filters = assemble_filters_failed_job(job)
# if latest_filters is not None:
# dataset.filters = latest_filters
# latest_images = assemble_images_failed_job(job)
# if latest_images is not None:
# dataset.images = latest_images
# db_sync.merge(dataset)

fail_job(
db=db_sync,
@@ -421,27 +421,27 @@
)

except Exception:

logger.debug(f'FAILED workflow "{workflow.name}", unknown error.')
logger.info(f'Workflow "{workflow.name}" failed (unkwnon error).')

current_traceback = traceback.format_exc()

# Read dataset attributes produced by the last successful task, and
# update the DB dataset accordingly
dataset.history = assemble_history_failed_job(
# dataset.history =
assemble_history_failed_job(
job,
dataset,
workflow,
logger_name=logger_name,
)
latest_filters = assemble_filters_failed_job(job)
if latest_filters is not None:
dataset.filters = latest_filters
latest_images = assemble_images_failed_job(job)
if latest_images is not None:
dataset.images = latest_images
db_sync.merge(dataset)
# latest_filters = assemble_filters_failed_job(job)
# if latest_filters is not None:
# dataset.filters = latest_filters
# latest_images = assemble_images_failed_job(job)
# if latest_images is not None:
# dataset.images = latest_images
# db_sync.merge(dataset)
fail_job(
db=db_sync,
job=job,
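
With the post-run update of `history`, `filters`, and `images` commented out in submit_workflow(), the runner itself is expected to persist those attributes through a DB session. A sketch of what such an in-runner update could look like, reusing `get_sync_db` (imported elsewhere in this commit) and the same `flag_modified` helper the old code used; the helper name `persist_dataset_history` is hypothetical:

from sqlalchemy.orm.attributes import flag_modified

from fractal_server.app.db import get_sync_db
from fractal_server.app.models.v2 import DatasetV2


def persist_dataset_history(dataset_id: int, new_history_items: list[dict]) -> None:
    with next(get_sync_db()) as db:
        db_dataset = db.get(DatasetV2, dataset_id)
        db_dataset.history.extend(new_history_items)
        # JSON columns need an explicit "modified" flag when mutated in place.
        flag_modified(db_dataset, "history")
        db.merge(db_dataset)
        db.commit()
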
135 changes: 35 additions & 100 deletions fractal_server/app/runner/v2/handle_failed_job.py
@@ -12,20 +12,15 @@
"""
Helper functions to handle Dataset history.
"""
import json
import logging
from pathlib import Path
from typing import Any
from typing import Optional

from ...models.v2 import DatasetV2
from ...models.v2 import JobV2
from ...models.v2 import WorkflowTaskV2
from ...models.v2 import WorkflowV2
from ...schemas.v2 import WorkflowTaskStatusTypeV2
from ..filenames import FILTERS_FILENAME
from ..filenames import HISTORY_FILENAME
from ..filenames import IMAGES_FILENAME
from fractal_server.app.db import get_sync_db


def assemble_history_failed_job(
@@ -34,7 +29,7 @@ def assemble_history_failed_job(
workflow: WorkflowV2,
logger_name: Optional[str] = None,
failed_wftask: Optional[WorkflowTaskV2] = None,
) -> list[dict[str, Any]]:
) -> None:
"""
Assemble `history` after a workflow-execution job fails.
Expand Down Expand Up @@ -62,97 +57,37 @@ def assemble_history_failed_job(
# parts, coming from: the database, the temporary file, the failed-task
# information.

# Part 1: Read exising history from DB
new_history = dataset.history

# Part 2: Extend history based on temporary-file contents
tmp_history_file = Path(job.working_dir) / HISTORY_FILENAME
try:
with tmp_history_file.open("r") as f:
tmp_file_history = json.load(f)
new_history.extend(tmp_file_history)
except FileNotFoundError:
tmp_file_history = []

# Part 3/A: Identify failed task, if needed
if failed_wftask is None:
job_wftasks = workflow.task_list[
job.first_task_index : (job.last_task_index + 1) # noqa
]
tmp_file_wftasks = [
history_item["workflowtask"] for history_item in tmp_file_history
]
if len(job_wftasks) <= len(tmp_file_wftasks):
n_tasks_job = len(job_wftasks)
n_tasks_tmp = len(tmp_file_wftasks)
logger.error(
"Cannot identify the failed task based on job task list "
f"(length {n_tasks_job}) and temporary-file task list "
f"(length {n_tasks_tmp})."
with next(get_sync_db()) as db:
db_dataset = db.get(dataset)

# Part 1/A: Identify failed task, if needed
if failed_wftask is None:
job_wftasks = workflow.task_list[
job.first_task_index : (job.last_task_index + 1) # noqa
]
tmp_file_wftasks = [
history_item["workflowtask"]
for history_item in db_dataset.history
]
if len(job_wftasks) <= len(tmp_file_wftasks):
n_tasks_job = len(job_wftasks)
n_tasks_tmp = len(tmp_file_wftasks)
logger.error(
"Cannot identify the failed task based on job task list "
f"(length {n_tasks_job}) and temporary-file task list "
f"(length {n_tasks_tmp})."
)
logger.error("Failed task not appended to history.")
else:
failed_wftask = job_wftasks[len(tmp_file_wftasks)]

# Part 1/B: Append failed task to history
if failed_wftask is not None:
failed_wftask_dump = failed_wftask.model_dump(exclude={"task"})
failed_wftask_dump["task"] = failed_wftask.task.model_dump()
new_history_item = dict(
workflowtask=failed_wftask_dump,
status=WorkflowTaskStatusTypeV2.FAILED,
parallelization=dict(), # FIXME: re-include parallelization
)
logger.error("Failed task not appended to history.")
else:
failed_wftask = job_wftasks[len(tmp_file_wftasks)]

# Part 3/B: Append failed task to history
if failed_wftask is not None:
failed_wftask_dump = failed_wftask.model_dump(exclude={"task"})
failed_wftask_dump["task"] = failed_wftask.task.model_dump()
new_history_item = dict(
workflowtask=failed_wftask_dump,
status=WorkflowTaskStatusTypeV2.FAILED,
parallelization=dict(), # FIXME: re-include parallelization
)
new_history.append(new_history_item)

return new_history


def assemble_images_failed_job(job: JobV2) -> Optional[dict[str, Any]]:
"""
Assemble `DatasetV2.images` for a failed workflow-execution.
Assemble new value of `images` based on the last successful task, i.e.
based on the content of the temporary `IMAGES_FILENAME` file. If the file
is missing, return `None`.
Argumentss:
job:
The failed `JobV2` object.
Returns:
The new value of `dataset.images`, or `None` if `IMAGES_FILENAME`
is missing.
"""
tmp_file = Path(job.working_dir) / IMAGES_FILENAME
try:
with tmp_file.open("r") as f:
new_images = json.load(f)
return new_images
except FileNotFoundError:
return None


def assemble_filters_failed_job(job: JobV2) -> Optional[dict[str, Any]]:
"""
Assemble `DatasetV2.filters` for a failed workflow-execution.
Assemble new value of `filters` based on the last successful task, i.e.
based on the content of the temporary `FILTERS_FILENAME` file. If the file
is missing, return `None`.
Argumentss:
job:
The failed `JobV2` object.
Returns:
The new value of `dataset.filters`, or `None` if `FILTERS_FILENAME`
is missing.
"""
tmp_file = Path(job.working_dir) / FILTERS_FILENAME
try:
with tmp_file.open("r") as f:
new_filters = json.load(f)
return new_filters
except FileNotFoundError:
return None
db_dataset.history.extend(new_history_item)
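
In the rewritten assemble_history_failed_job, the failed task is appended to the DB-stored dataset history instead of being returned as a new list. The following is a condensed, illustrative sketch of that core step, not the verbatim function: it assumes the two-argument Session.get(Model, pk) form, and the helper name `append_failed_task_to_history` is hypothetical.

from fractal_server.app.db import get_sync_db
from fractal_server.app.models.v2 import DatasetV2
from fractal_server.app.models.v2 import WorkflowTaskV2
from fractal_server.app.schemas.v2 import WorkflowTaskStatusTypeV2


def append_failed_task_to_history(
    dataset_id: int, failed_wftask: WorkflowTaskV2
) -> None:
    # Build a history item for the failed WorkflowTask, mirroring the dump
    # logic used in assemble_history_failed_job.
    failed_wftask_dump = failed_wftask.model_dump(exclude={"task"})
    failed_wftask_dump["task"] = failed_wftask.task.model_dump()
    new_history_item = dict(
        workflowtask=failed_wftask_dump,
        status=WorkflowTaskStatusTypeV2.FAILED,
        parallelization=dict(),
    )
    # Append to the DB-stored history and persist it.
    with next(get_sync_db()) as db:
        db_dataset = db.get(DatasetV2, dataset_id)
        db_dataset.history.append(new_history_item)
        db.merge(db_dataset)
        db.commit()
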
