Replace temporary files with database access in highest level runner module #2169

Merged
57 commits
30d82f7
update runner introducing db connections and removing local files for…
mfranzon Jan 7, 2025
750bfac
Merge branch 'main' into 2135-replace-temporary-files-with-database-a…
mfranzon Jan 7, 2025
0bafd41
fix get call
mfranzon Jan 8, 2025
096aad9
remove populate_existing attribute
mfranzon Jan 8, 2025
ff664d3
get dataset from db in runner
mfranzon Jan 8, 2025
0074c2b
update tests, use db instead of python objects
mfranzon Jan 8, 2025
813a778
remove DatasetV2Mock, use DatasetV2 model
mfranzon Jan 8, 2025
a97c086
add database logic for history update
mfranzon Jan 9, 2025
9e360b6
add database logic for attrs update, move tmp_history, inside loop
mfranzon Jan 9, 2025
42b03c9
remove history,filters,image local files
mfranzon Jan 9, 2025
fae9350
Merge branch 'main' into 2135-replace-temporary-files-with-database-a…
tcompa Jan 9, 2025
d41516c
[skip ci] BROKEN - add submitted status in history, improve history u…
mfranzon Jan 9, 2025
177ada2
Merge branch '2135-replace-temporary-files-with-database-access-in-hi…
mfranzon Jan 9, 2025
de29ce9
add submitted status in history, improve history usage
mfranzon Jan 9, 2025
15d485e
add logic if failed_wftask is None, set last wftask to failed if it i…
mfranzon Jan 9, 2025
cd4351f
add condition if wf has no task
mfranzon Jan 9, 2025
69ac3ff
use db data for runner response, adapt tests
mfranzon Jan 9, 2025
64dd991
fix tests with new status endpoint logic
mfranzon Jan 10, 2025
81f7267
review status endpoint logic, add arg to flake8
mfranzon Jan 13, 2025
e2cbfdc
update test for coverage
mfranzon Jan 13, 2025
4da4d1c
Possible other way of finding SUBMITTED wftasks
tcompa Jan 13, 2025
3b0c1ac
Remove spurious `enumerate`
tcompa Jan 13, 2025
45f00a8
Style
tcompa Jan 13, 2025
e3c5460
Fix test
tcompa Jan 13, 2025
cf26693
Add log
tcompa Jan 13, 2025
8cb3d19
Remove obsolete variable
tcompa Jan 13, 2025
6a10497
style
tcompa Jan 13, 2025
9e3376e
Reintroduce test_workflowtask_status_modified_workflow
tcompa Jan 13, 2025
fe14031
info -> warning
tcompa Jan 13, 2025
daabe56
Remove comment
tcompa Jan 13, 2025
90546cf
[skip ci] remove/update comments
mfranzon Jan 13, 2025
47d01bf
Review filenames.py
tcompa Jan 14, 2025
4d640de
Remove comments [skip ci]
tcompa Jan 14, 2025
e826ac9
Do not return dataset attributes from execute_tasks_v2 (and process-w…
tcompa Jan 14, 2025
9201e7d
BROKEN - start updating tests/v2/04_runner/test_fractal_examples.py
tcompa Jan 14, 2025
ee978f8
refactor tests
mfranzon Jan 14, 2025
8e16efd
BROKEN - start updating tests/v2/04_runner/test_fractal_examples.py
tcompa Jan 14, 2025
e218298
Merge branch '2135-replace-temporary-files-with-database-access-in-hi…
mfranzon Jan 14, 2025
ac2e956
Fix tests/v2/04_runner/test_fractal_examples.py
tcompa Jan 14, 2025
6425917
Style/docstrings/comments
tcompa Jan 14, 2025
57fabf3
refactor test_dummy_example
mfranzon Jan 14, 2025
0b5974b
Merge branch '2135-replace-temporary-files-with-database-access-in-hi…
mfranzon Jan 14, 2025
ef76daf
Remove some output of execute_tasks_v2
tcompa Jan 14, 2025
60f4edd
refactor test_dummy_example
mfranzon Jan 14, 2025
99d8e85
Merge branch '2135-replace-temporary-files-with-database-access-in-hi…
mfranzon Jan 14, 2025
ce8b73f
Review how to set task status to failed
tcompa Jan 14, 2025
23e5151
Style [skip ci]
tcompa Jan 14, 2025
6de6757
Merge branch 'dev-2.11' into 2135-replace-temporary-files-with-databa…
tcompa Jan 14, 2025
1a6e86a
fix arg, update test
mfranzon Jan 14, 2025
b8099a8
Merge branch '2135-replace-temporary-files-with-database-access-in-hi…
mfranzon Jan 14, 2025
b7ef0ec
CHANGELOG [skip ci]
tcompa Jan 14, 2025
d2ebb5a
Trigger GHA workflows also for `dev-2.11` branch
tcompa Jan 14, 2025
95f6bf7
Handle empty-history case in mark_last_wftask_as_failed
tcompa Jan 14, 2025
30ab975
Fix comment
tcompa Jan 14, 2025
34879b7
Fix f string
tcompa Jan 14, 2025
3d56db2
Remove db.refresh
tcompa Jan 14, 2025
74dfe65
Add test_unit_mark_last_wftask_as_failed
tcompa Jan 14, 2025
19 changes: 9 additions & 10 deletions fractal_server/app/routes/api/v2/status.py
@@ -1,5 +1,3 @@
import json
from pathlib import Path
from typing import Optional

from fastapi import APIRouter
@@ -10,6 +8,7 @@
from .....logger import set_logger
from ....db import AsyncSession
from ....db import get_async_db
from ....models.v2 import DatasetV2
from ....models.v2 import JobV2
from ....schemas.v2.dataset import WorkflowTaskStatusTypeV2
from ....schemas.v2.status import StatusReadV2
@@ -18,7 +17,6 @@
from ._aux_functions import _get_workflow_check_owner
from fractal_server.app.models import UserOAuth
from fractal_server.app.routes.auth import current_active_user
from fractal_server.app.runner.filenames import HISTORY_FILENAME

router = APIRouter()

@@ -136,13 +134,14 @@ async def get_workflowtask_status(
# Highest priority: Read status updates coming from the running-job
# temporary file. Note: this file only contains information on
# WorkflowTask's that ran through successfully.
tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME
try:
with tmp_file.open("r") as f:
history = json.load(f)
except FileNotFoundError:
history = []
for history_item in history:
# tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME
# try:
# with tmp_file.open("r") as f:
# history = json.load(f)
# except FileNotFoundError:
# history = []
db_dataset = await db.get(DatasetV2, dataset_id)
for history_item in db_dataset.history:
wftask_id = history_item["workflowtask"]["id"]
wftask_status = history_item["status"]
workflow_tasks_status_dict[wftask_id] = wftask_status
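In short, the status endpoint now reads per-WorkflowTask statuses from the dataset's `history` column instead of the per-job `HISTORY_FILENAME` file. Below is a minimal, hypothetical helper that illustrates the new lookup; the name `_statuses_from_db` is invented for illustration, and the real endpoint inlines this logic inside `get_workflowtask_status` with additional checks.

from fractal_server.app.db import AsyncSession
from fractal_server.app.models.v2 import DatasetV2


async def _statuses_from_db(db: AsyncSession, dataset_id: int) -> dict[int, str]:
    # Read the dataset row and derive per-WorkflowTask statuses from its
    # JSON `history` column, replacing the old HISTORY_FILENAME read.
    db_dataset = await db.get(DatasetV2, dataset_id)
    statuses: dict[int, str] = {}
    for history_item in db_dataset.history:
        wftask_id = history_item["workflowtask"]["id"]
        statuses[wftask_id] = history_item["status"]
    return statuses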
80 changes: 40 additions & 40 deletions fractal_server/app/runner/v2/__init__.py
@@ -11,7 +11,6 @@
from typing import Optional

from sqlalchemy.orm import Session as DBSyncSession
from sqlalchemy.orm.attributes import flag_modified

from ....config import get_settings
from ....logger import get_logger
@@ -38,12 +37,13 @@
)
from ._slurm_ssh import process_workflow as slurm_ssh_process_workflow
from ._slurm_sudo import process_workflow as slurm_sudo_process_workflow
from .handle_failed_job import assemble_filters_failed_job
from .handle_failed_job import assemble_history_failed_job
from .handle_failed_job import assemble_images_failed_job
from fractal_server import __VERSION__
from fractal_server.app.models import UserSettings

# from .handle_failed_job import assemble_filters_failed_job
# from .handle_failed_job import assemble_images_failed_job

_backends = {}
_backends["local"] = local_process_workflow
_backends["slurm"] = slurm_sudo_process_workflow
@@ -115,7 +115,6 @@ async def submit_workflow(
logger = set_logger(logger_name=logger_name)

with next(DB.get_sync_db()) as db_sync:

try:
job: Optional[JobV2] = db_sync.get(JobV2, job_id)
dataset: Optional[DatasetV2] = db_sync.get(DatasetV2, dataset_id)
@@ -322,7 +321,8 @@ async def submit_workflow(
db_sync = next(DB.get_sync_db())
db_sync.close()

new_dataset_attributes = await process_workflow(
# new_dataset_attributes =
await process_workflow(
workflow=workflow,
dataset=dataset,
workflow_dir_local=WORKFLOW_DIR_LOCAL,
@@ -339,14 +339,14 @@
f"more logs at {str(log_file_path)}"
)
logger.debug(f'END workflow "{workflow.name}"')

# Update dataset attributes, in case of successful execution
dataset.history.extend(new_dataset_attributes["history"])
dataset.filters = new_dataset_attributes["filters"]
dataset.images = new_dataset_attributes["images"]
for attribute_name in ["filters", "history", "images"]:
flag_modified(dataset, attribute_name)
db_sync.merge(dataset)
#
# # Update dataset attributes, in case of successful execution
# dataset.history.extend(new_dataset_attributes["history"])
# dataset.filters = new_dataset_attributes["filters"]
# dataset.images = new_dataset_attributes["images"]
# for attribute_name in ["filters", "history", "images"]:
# flag_modified(dataset, attribute_name)
# db_sync.merge(dataset)

# Update job DB entry
job.status = JobStatusTypeV2.DONE
@@ -358,27 +358,27 @@
db_sync.commit()

except TaskExecutionError as e:

logger.debug(f'FAILED workflow "{workflow.name}", TaskExecutionError.')
logger.info(f'Workflow "{workflow.name}" failed (TaskExecutionError).')

# Read dataset attributes produced by the last successful task, and
# update the DB dataset accordingly
failed_wftask = db_sync.get(WorkflowTaskV2, e.workflow_task_id)
dataset.history = assemble_history_failed_job(
# dataset.history =
assemble_history_failed_job(
job,
dataset,
workflow,
logger_name=logger_name,
failed_wftask=failed_wftask,
)
latest_filters = assemble_filters_failed_job(job)
if latest_filters is not None:
dataset.filters = latest_filters
latest_images = assemble_images_failed_job(job)
if latest_images is not None:
dataset.images = latest_images
db_sync.merge(dataset)
# latest_filters = assemble_filters_failed_job(job)
# if latest_filters is not None:
# dataset.filters = latest_filters
# latest_images = assemble_images_failed_job(job)
# if latest_images is not None:
# dataset.images = latest_images
# db_sync.merge(dataset)

exception_args_string = "\n".join(e.args)
log_msg = (
@@ -390,25 +390,25 @@
fail_job(db=db_sync, job=job, log_msg=log_msg, logger_name=logger_name)

except JobExecutionError as e:

logger.debug(f'FAILED workflow "{workflow.name}", JobExecutionError.')
logger.info(f'Workflow "{workflow.name}" failed (JobExecutionError).')

# Read dataset attributes produced by the last successful task, and
# update the DB dataset accordingly
dataset.history = assemble_history_failed_job(
# dataset.history =
assemble_history_failed_job(
job,
dataset,
workflow,
logger_name=logger_name,
)
latest_filters = assemble_filters_failed_job(job)
if latest_filters is not None:
dataset.filters = latest_filters
latest_images = assemble_images_failed_job(job)
if latest_images is not None:
dataset.images = latest_images
db_sync.merge(dataset)
# latest_filters = assemble_filters_failed_job(job)
# if latest_filters is not None:
# dataset.filters = latest_filters
# latest_images = assemble_images_failed_job(job)
# if latest_images is not None:
# dataset.images = latest_images
# db_sync.merge(dataset)

fail_job(
db=db_sync,
@@ -421,27 +421,27 @@
)

except Exception:

logger.debug(f'FAILED workflow "{workflow.name}", unknown error.')
logger.info(f'Workflow "{workflow.name}" failed (unknown error).')

current_traceback = traceback.format_exc()

# Read dataset attributes produced by the last successful task, and
# update the DB dataset accordingly
dataset.history = assemble_history_failed_job(
# dataset.history =
assemble_history_failed_job(
job,
dataset,
workflow,
logger_name=logger_name,
)
latest_filters = assemble_filters_failed_job(job)
if latest_filters is not None:
dataset.filters = latest_filters
latest_images = assemble_images_failed_job(job)
if latest_images is not None:
dataset.images = latest_images
db_sync.merge(dataset)
# latest_filters = assemble_filters_failed_job(job)
# if latest_filters is not None:
# dataset.filters = latest_filters
# latest_images = assemble_images_failed_job(job)
# if latest_images is not None:
# dataset.images = latest_images
# db_sync.merge(dataset)
fail_job(
db=db_sync,
job=job,
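Net effect on the failure handlers in `__init__.py`: they no longer rebuild `dataset.history`/`filters`/`images` in memory or merge the dataset, since `assemble_history_failed_job` now writes to the database itself. A condensed, hedged sketch of the resulting shape follows; names come from the diff above, the enclosing `submit_workflow` body is assumed, and the exact `log_msg` formatting is simplified.

try:
    # process_workflow(...) runs the tasks; its return value is no longer
    # used here, because the runner updates the dataset in the DB directly.
    ...
except TaskExecutionError as e:
    logger.info(f'Workflow "{workflow.name}" failed (TaskExecutionError).')
    failed_wftask = db_sync.get(WorkflowTaskV2, e.workflow_task_id)
    # Persists the failed-task history entry itself (see the
    # handle_failed_job.py diff below) and returns None.
    assemble_history_failed_job(
        job,
        dataset,
        workflow,
        logger_name=logger_name,
        failed_wftask=failed_wftask,
    )
    # Mark the job as failed; the real handler builds a richer log_msg.
    fail_job(
        db=db_sync,
        job=job,
        log_msg="\n".join(e.args),
        logger_name=logger_name,
    )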
139 changes: 40 additions & 99 deletions fractal_server/app/runner/v2/handle_failed_job.py
@@ -12,20 +12,17 @@
"""
Helper functions to handle Dataset history.
"""
import json
import logging
from pathlib import Path
from typing import Any
from typing import Optional

from sqlalchemy.orm.attributes import flag_modified

from ...models.v2 import DatasetV2
from ...models.v2 import JobV2
from ...models.v2 import WorkflowTaskV2
from ...models.v2 import WorkflowV2
from ...schemas.v2 import WorkflowTaskStatusTypeV2
from ..filenames import FILTERS_FILENAME
from ..filenames import HISTORY_FILENAME
from ..filenames import IMAGES_FILENAME
from fractal_server.app.db import get_sync_db


def assemble_history_failed_job(
@@ -34,7 +31,7 @@
workflow: WorkflowV2,
logger_name: Optional[str] = None,
failed_wftask: Optional[WorkflowTaskV2] = None,
) -> list[dict[str, Any]]:
) -> None:
"""
Assemble `history` after a workflow-execution job fails.

@@ -62,97 +59,41 @@
# parts, coming from: the database, the temporary file, the failed-task
# information.

# Part 1: Read exising history from DB
new_history = dataset.history

# Part 2: Extend history based on temporary-file contents
tmp_history_file = Path(job.working_dir) / HISTORY_FILENAME
try:
with tmp_history_file.open("r") as f:
tmp_file_history = json.load(f)
new_history.extend(tmp_file_history)
except FileNotFoundError:
tmp_file_history = []

# Part 3/A: Identify failed task, if needed
if failed_wftask is None:
job_wftasks = workflow.task_list[
job.first_task_index : (job.last_task_index + 1) # noqa
]
tmp_file_wftasks = [
history_item["workflowtask"] for history_item in tmp_file_history
]
if len(job_wftasks) <= len(tmp_file_wftasks):
n_tasks_job = len(job_wftasks)
n_tasks_tmp = len(tmp_file_wftasks)
logger.error(
"Cannot identify the failed task based on job task list "
f"(length {n_tasks_job}) and temporary-file task list "
f"(length {n_tasks_tmp})."
with next(get_sync_db()) as db:
db_dataset = db.get(DatasetV2, dataset.id)

# Part 1/A: Identify failed task, if needed
if failed_wftask is None:
job_wftasks = workflow.task_list[
job.first_task_index : (job.last_task_index + 1) # noqa
]
tmp_file_wftasks = [
history_item["workflowtask"]
for history_item in db_dataset.history
]
if len(job_wftasks) <= len(tmp_file_wftasks):
n_tasks_job = len(job_wftasks)
n_tasks_tmp = len(tmp_file_wftasks)
logger.error(
"Cannot identify the failed task based on job task list "
f"(length {n_tasks_job}) and temporary-file task list "
f"(length {n_tasks_tmp})."
)
logger.error("Failed task not appended to history.")

else:
failed_wftask = job_wftasks[len(tmp_file_wftasks)]

# Part 1/B: Append failed task to history
if failed_wftask is not None:
failed_wftask_dump = failed_wftask.model_dump(exclude={"task"})
failed_wftask_dump["task"] = failed_wftask.task.model_dump()
new_history_item = dict(
workflowtask=failed_wftask_dump,
status=WorkflowTaskStatusTypeV2.FAILED,
parallelization=dict(), # FIXME: re-include parallelization
)
logger.error("Failed task not appended to history.")
else:
failed_wftask = job_wftasks[len(tmp_file_wftasks)]

# Part 3/B: Append failed task to history
if failed_wftask is not None:
failed_wftask_dump = failed_wftask.model_dump(exclude={"task"})
failed_wftask_dump["task"] = failed_wftask.task.model_dump()
new_history_item = dict(
workflowtask=failed_wftask_dump,
status=WorkflowTaskStatusTypeV2.FAILED,
parallelization=dict(), # FIXME: re-include parallelization
)
new_history.append(new_history_item)

return new_history


def assemble_images_failed_job(job: JobV2) -> Optional[dict[str, Any]]:
"""
Assemble `DatasetV2.images` for a failed workflow-execution.

Assemble new value of `images` based on the last successful task, i.e.
based on the content of the temporary `IMAGES_FILENAME` file. If the file
is missing, return `None`.

Arguments:
job:
The failed `JobV2` object.
db_dataset.history.append(new_history_item)

Returns:
The new value of `dataset.images`, or `None` if `IMAGES_FILENAME`
is missing.
"""
tmp_file = Path(job.working_dir) / IMAGES_FILENAME
try:
with tmp_file.open("r") as f:
new_images = json.load(f)
return new_images
except FileNotFoundError:
return None


def assemble_filters_failed_job(job: JobV2) -> Optional[dict[str, Any]]:
"""
Assemble `DatasetV2.filters` for a failed workflow-execution.

Assemble new value of `filters` based on the last successful task, i.e.
based on the content of the temporary `FILTERS_FILENAME` file. If the file
is missing, return `None`.

Arguments:
job:
The failed `JobV2` object.

Returns:
The new value of `dataset.filters`, or `None` if `FILTERS_FILENAME`
is missing.
"""
tmp_file = Path(job.working_dir) / FILTERS_FILENAME
try:
with tmp_file.open("r") as f:
new_filters = json.load(f)
return new_filters
except FileNotFoundError:
return None
flag_modified(db_dataset, "history")
db.merge(db_dataset)
db.commit()
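A note on the persistence pattern at the end of the new `assemble_history_failed_job`: `history` is a JSON column, and in-place mutations such as `append` are not picked up by SQLAlchemy's change tracking, hence the explicit `flag_modified` before `merge`/`commit`. A minimal sketch of the same pattern is shown below; the helper name `append_history_item` is invented for illustration, and it assumes `DatasetV2.history` is a JSON-typed column and `get_sync_db` yields a synchronous session, as in the diff above.

from sqlalchemy.orm.attributes import flag_modified

from fractal_server.app.db import get_sync_db
from fractal_server.app.models.v2 import DatasetV2


def append_history_item(dataset_id: int, new_history_item: dict) -> None:
    with next(get_sync_db()) as db:
        db_dataset = db.get(DatasetV2, dataset_id)
        # Appending to the JSON list does not mark the attribute as dirty,
        # so tell SQLAlchemy explicitly that `history` changed.
        db_dataset.history.append(new_history_item)
        flag_modified(db_dataset, "history")
        db.merge(db_dataset)
        db.commit()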