diff --git a/.github/workflows/benchmarks.yaml b/.github/workflows/benchmarks.yaml
index 7129b77c9f..9faa0e7c9b 100644
--- a/.github/workflows/benchmarks.yaml
+++ b/.github/workflows/benchmarks.yaml
@@ -2,9 +2,9 @@ name: Benchmarks

 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]

 jobs:
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3072893810..8045a489b7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -2,9 +2,9 @@ name: ci

 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]

 jobs:
diff --git a/.github/workflows/documentation.yaml b/.github/workflows/documentation.yaml
index ecb7e407d6..6df17afeba 100644
--- a/.github/workflows/documentation.yaml
+++ b/.github/workflows/documentation.yaml
@@ -2,9 +2,9 @@ name: docs

 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]

 jobs:
diff --git a/.github/workflows/migrations.yml b/.github/workflows/migrations.yml
index 5bfe363c1d..358939d0d4 100644
--- a/.github/workflows/migrations.yml
+++ b/.github/workflows/migrations.yml
@@ -9,9 +9,9 @@ name: migrations

 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]

 env:
diff --git a/.github/workflows/oauth.yaml b/.github/workflows/oauth.yaml
index 717c565694..4a24944947 100644
--- a/.github/workflows/oauth.yaml
+++ b/.github/workflows/oauth.yaml
@@ -2,9 +2,9 @@ name: OAuth2-OIDC

 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]

 jobs:
diff --git a/.github/workflows/pip_install.yml b/.github/workflows/pip_install.yml
index c88b303171..ab900bc4d5 100644
--- a/.github/workflows/pip_install.yml
+++ b/.github/workflows/pip_install.yml
@@ -2,9 +2,9 @@ name: pip-install

 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]

 jobs:
diff --git a/.github/workflows/poetry_build.yml b/.github/workflows/poetry_build.yml
index 7fa759648c..058697ad91 100644
--- a/.github/workflows/poetry_build.yml
+++ b/.github/workflows/poetry_build.yml
@@ -2,9 +2,9 @@ name: Build package

 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]

 jobs:
   build:
diff --git a/.github/workflows/precommit.yml b/.github/workflows/precommit.yml
index 1e2ce952c9..8ee60dc5e9 100644
--- a/.github/workflows/precommit.yml
+++ b/.github/workflows/precommit.yml
@@ -2,9 +2,9 @@ name: precommit

 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]

 jobs:
   precommit:
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 231b9203ca..7e02c5f835 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -18,7 +18,7 @@ repos:
     rev: 7.1.1
     hooks:
       - id: flake8
-        args: ["--exclude", "examples/*"]
+        args: ["--exclude", "examples/*", "--ignore=E203,W503"]
   - repo: https://github.com/PyCQA/bandit
     rev: '1.7.4'
     hooks:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8982c9db98..dffa68557a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
 **Note**: Numbers like (\#1234) point to closed Pull Requests on the fractal-server repository.

+# 2.11.0 (unreleased)
+
+* Runner:
+    * Integrate database write access in runner component (\#2169).
+* API:
+    * Update and simplify `/api/v2/project/{project_id}/status/` (\#2169).
+
 # 2.10.5

 * App:
diff --git a/fractal_server/app/routes/api/v1/dataset.py b/fractal_server/app/routes/api/v1/dataset.py
index c12f04b688..202985b522 100644
--- a/fractal_server/app/routes/api/v1/dataset.py
+++ b/fractal_server/app/routes/api/v1/dataset.py
@@ -17,7 +17,7 @@
 from ....models.v1 import Dataset
 from ....models.v1 import Project
 from ....models.v1 import Resource
-from ....runner.filenames import HISTORY_FILENAME
+from ....runner.filenames import HISTORY_FILENAME_V1
 from ....schemas.v1 import DatasetCreateV1
 from ....schemas.v1 import DatasetReadV1
 from ....schemas.v1 import DatasetStatusReadV1
@@ -511,7 +511,7 @@ async def get_workflowtask_status(
     # Highest priority: Read status updates coming from the running-job
     # temporary file. Note: this file only contains information on
     # WorkflowTask's that ran through successfully
-    tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME
+    tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME_V1
     try:
         with tmp_file.open("r") as f:
             history = json.load(f)
diff --git a/fractal_server/app/routes/api/v2/status.py b/fractal_server/app/routes/api/v2/status.py
index e31769dbf2..69a7e38522 100644
--- a/fractal_server/app/routes/api/v2/status.py
+++ b/fractal_server/app/routes/api/v2/status.py
@@ -1,5 +1,3 @@
-import json
-from pathlib import Path
 from typing import Optional

 from fastapi import APIRouter
@@ -18,7 +16,6 @@
 from ._aux_functions import _get_workflow_check_owner
 from fractal_server.app.models import UserOAuth
 from fractal_server.app.routes.auth import current_active_user
-from fractal_server.app.runner.filenames import HISTORY_FILENAME

 router = APIRouter()
@@ -98,8 +95,8 @@ async def get_workflowtask_status(
     if running_job is None:
         # If no job is running, the chronological-last history item is also the
         # positional-last workflow task to be included in the response.
-        if len(dataset.history) > 0:
-            last_valid_wftask_id = dataset.history[-1]["workflowtask"]["id"]
+        if len(history) > 0:
+            last_valid_wftask_id = history[-1]["workflowtask"]["id"]
         else:
             last_valid_wftask_id = None
     else:
@@ -109,7 +106,24 @@ async def get_workflowtask_status(
         # as "submitted"
         start = running_job.first_task_index
         end = running_job.last_task_index + 1
-        for wftask in workflow.task_list[start:end]:
+
+        running_job_wftasks = workflow.task_list[start:end]
+        running_job_statuses = [
+            workflow_tasks_status_dict.get(wft.id, None)
+            for wft in running_job_wftasks
+        ]
+        try:
+            first_submitted_index = running_job_statuses.index(
+                WorkflowTaskStatusTypeV2.SUBMITTED
+            )
+        except ValueError:
+            logger.warning(
+                f"Job {running_job.id} is submitted but its task list does "
+                f"not contain a {WorkflowTaskStatusTypeV2.SUBMITTED} task."
+            )
+            first_submitted_index = 0
+
+        for wftask in running_job_wftasks[first_submitted_index:]:
             workflow_tasks_status_dict[
                 wftask.id
             ] = WorkflowTaskStatusTypeV2.SUBMITTED
@@ -133,20 +147,6 @@ async def get_workflowtask_status(
             last_valid_wftask_id = None
             logger.warning(f"Now setting {last_valid_wftask_id=}.")

-    # Highest priority: Read status updates coming from the running-job
-    # temporary file. Note: this file only contains information on
-    # WorkflowTask's that ran through successfully.
-    tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME
-    try:
-        with tmp_file.open("r") as f:
-            history = json.load(f)
-    except FileNotFoundError:
-        history = []
-    for history_item in history:
-        wftask_id = history_item["workflowtask"]["id"]
-        wftask_status = history_item["status"]
-        workflow_tasks_status_dict[wftask_id] = wftask_status
-
     # Based on previously-gathered information, clean up the response body
     clean_workflow_tasks_status_dict = {}
     for wf_task in workflow.task_list:
diff --git a/fractal_server/app/runner/filenames.py b/fractal_server/app/runner/filenames.py
index eaa8a123c0..22cdcbfcad 100644
--- a/fractal_server/app/runner/filenames.py
+++ b/fractal_server/app/runner/filenames.py
@@ -1,6 +1,4 @@
-HISTORY_FILENAME = "history.json"
-FILTERS_FILENAME = "filters.json"
-IMAGES_FILENAME = "images.json"
-METADATA_FILENAME = "metadata.json"
+HISTORY_FILENAME_V1 = "history.json"
+METADATA_FILENAME_V1 = "metadata.json"
 SHUTDOWN_FILENAME = "shutdown"
 WORKFLOW_LOG_FILENAME = "workflow.log"
diff --git a/fractal_server/app/runner/v1/_common.py b/fractal_server/app/runner/v1/_common.py
index 5bcf1f0f40..46187e0187 100644
--- a/fractal_server/app/runner/v1/_common.py
+++ b/fractal_server/app/runner/v1/_common.py
@@ -28,8 +28,8 @@
 from ..exceptions import TaskExecutionError
 from .common import TaskParameters
 from .common import write_args_file
-from fractal_server.app.runner.filenames import HISTORY_FILENAME
-from fractal_server.app.runner.filenames import METADATA_FILENAME
+from fractal_server.app.runner.filenames import HISTORY_FILENAME_V1
+from fractal_server.app.runner.filenames import METADATA_FILENAME_V1
 from fractal_server.app.runner.task_files import get_task_file_paths
 from fractal_server.string_tools import validate_cmd

@@ -610,11 +610,11 @@ def execute_tasks(
         )

         # Write most recent metadata to METADATA_FILENAME
-        with open(workflow_dir_local / METADATA_FILENAME, "w") as f:
+        with open(workflow_dir_local / METADATA_FILENAME_V1, "w") as f:
             json.dump(current_task_pars.metadata, f, indent=2)

         # Write most recent metadata to HISTORY_FILENAME
-        with open(workflow_dir_local / HISTORY_FILENAME, "w") as f:
+        with open(workflow_dir_local / HISTORY_FILENAME_V1, "w") as f:
             json.dump(current_task_pars.history, f, indent=2)

     return current_task_pars
diff --git a/fractal_server/app/runner/v1/handle_failed_job.py b/fractal_server/app/runner/v1/handle_failed_job.py
index 92c910f882..658f31fdfc 100644
--- a/fractal_server/app/runner/v1/handle_failed_job.py
+++ b/fractal_server/app/runner/v1/handle_failed_job.py
@@ -24,8 +24,8 @@
 from ...models.v1 import Workflow
 from ...models.v1 import WorkflowTask
 from ...schemas.v1 import WorkflowTaskStatusTypeV1
-from ..filenames import HISTORY_FILENAME
-from ..filenames import METADATA_FILENAME
+from ..filenames import HISTORY_FILENAME_V1
+from ..filenames import METADATA_FILENAME_V1


 def assemble_history_failed_job(
@@ -64,7 +64,7 @@ def assemble_history_failed_job(
     new_history = output_dataset.history

     # Part 2: Extend history based on tmp_metadata_file
-    tmp_history_file = Path(job.working_dir) / HISTORY_FILENAME
+    tmp_history_file = Path(job.working_dir) / HISTORY_FILENAME_V1
     try:
         with tmp_history_file.open("r") as f:
             tmp_file_history = json.load(f)
@@ -129,7 +129,7 @@ def assemble_meta_failed_job(
     """
     new_meta = deepcopy(output_dataset.meta)

-    metadata_file = Path(job.working_dir) / METADATA_FILENAME
+    metadata_file = Path(job.working_dir) / METADATA_FILENAME_V1
     try:
         with metadata_file.open("r") as f:
             metadata_update = json.load(f)
diff --git a/fractal_server/app/runner/v2/__init__.py b/fractal_server/app/runner/v2/__init__.py
index dec83d8d5b..fee385c738 100644
--- a/fractal_server/app/runner/v2/__init__.py
+++ b/fractal_server/app/runner/v2/__init__.py
@@ -11,7 +11,6 @@
 from typing import Optional

 from sqlalchemy.orm import Session as DBSyncSession
-from sqlalchemy.orm.attributes import flag_modified

 from ....config import get_settings
 from ....logger import get_logger
@@ -24,7 +23,6 @@
 from ...db import DB
 from ...models.v2 import DatasetV2
 from ...models.v2 import JobV2
-from ...models.v2 import WorkflowTaskV2
 from ...models.v2 import WorkflowV2
 from ...schemas.v2 import JobStatusTypeV2
 from ..exceptions import JobExecutionError
@@ -38,12 +36,11 @@
 )
 from ._slurm_ssh import process_workflow as slurm_ssh_process_workflow
 from ._slurm_sudo import process_workflow as slurm_sudo_process_workflow
-from .handle_failed_job import assemble_filters_failed_job
-from .handle_failed_job import assemble_history_failed_job
-from .handle_failed_job import assemble_images_failed_job
+from .handle_failed_job import mark_last_wftask_as_failed
 from fractal_server import __VERSION__
 from fractal_server.app.models import UserSettings

+
 _backends = {}
 _backends["local"] = local_process_workflow
 _backends["slurm"] = slurm_sudo_process_workflow
@@ -115,7 +112,6 @@ async def submit_workflow(
     logger = set_logger(logger_name=logger_name)

     with next(DB.get_sync_db()) as db_sync:
-
         try:
             job: Optional[JobV2] = db_sync.get(JobV2, job_id)
             dataset: Optional[DatasetV2] = db_sync.get(DatasetV2, dataset_id)
@@ -322,7 +318,7 @@ async def submit_workflow(
         db_sync = next(DB.get_sync_db())
         db_sync.close()

-        new_dataset_attributes = await process_workflow(
+        await process_workflow(
             workflow=workflow,
             dataset=dataset,
             workflow_dir_local=WORKFLOW_DIR_LOCAL,
@@ -340,14 +336,6 @@ async def submit_workflow(
         )
         logger.debug(f'END workflow "{workflow.name}"')

-        # Update dataset attributes, in case of successful execution
-        dataset.history.extend(new_dataset_attributes["history"])
-        dataset.filters = new_dataset_attributes["filters"]
-        dataset.images = new_dataset_attributes["images"]
-        for attribute_name in ["filters", "history", "images"]:
-            flag_modified(dataset, attribute_name)
-        db_sync.merge(dataset)
-
         # Update job DB entry
         job.status = JobStatusTypeV2.DONE
         job.end_timestamp = get_timestamp()
@@ -358,28 +346,13 @@ async def submit_workflow(
         db_sync.commit()

     except TaskExecutionError as e:
-        logger.debug(f'FAILED workflow "{workflow.name}", TaskExecutionError.')
         logger.info(f'Workflow "{workflow.name}" failed (TaskExecutionError).')

-        # Read dataset attributes produced by the last successful task, and
-        # update the DB dataset accordingly
-        failed_wftask = db_sync.get(WorkflowTaskV2, e.workflow_task_id)
-        dataset.history = assemble_history_failed_job(
-            job,
-            dataset,
-            workflow,
+        mark_last_wftask_as_failed(
+            dataset_id=dataset_id,
             logger_name=logger_name,
-            failed_wftask=failed_wftask,
         )
-        latest_filters = assemble_filters_failed_job(job)
-        if latest_filters is not None:
-            dataset.filters = latest_filters
-        latest_images = assemble_images_failed_job(job)
-        if latest_images is not None:
-            dataset.images = latest_images
-        db_sync.merge(dataset)
-
         exception_args_string = "\n".join(e.args)
         log_msg = (
             f"TASK ERROR: "
@@ -390,26 +363,12 @@ async def submit_workflow(
         fail_job(db=db_sync, job=job, log_msg=log_msg, logger_name=logger_name)

     except JobExecutionError as e:
-        logger.debug(f'FAILED workflow "{workflow.name}", JobExecutionError.')
         logger.info(f'Workflow "{workflow.name}" failed (JobExecutionError).')
-
-        # Read dataset attributes produced by the last successful task, and
-        # update the DB dataset accordingly
-        dataset.history = assemble_history_failed_job(
-            job,
-            dataset,
-            workflow,
+        mark_last_wftask_as_failed(
+            dataset_id=dataset_id,
             logger_name=logger_name,
         )
-        latest_filters = assemble_filters_failed_job(job)
-        if latest_filters is not None:
-            dataset.filters = latest_filters
-        latest_images = assemble_images_failed_job(job)
-        if latest_images is not None:
-            dataset.images = latest_images
-        db_sync.merge(dataset)
-
         fail_job(
             db=db_sync,
             job=job,
@@ -421,27 +380,13 @@ async def submit_workflow(
         )

     except Exception:
-        logger.debug(f'FAILED workflow "{workflow.name}", unknown error.')
         logger.info(f'Workflow "{workflow.name}" failed (unknown error).')
-
-        current_traceback = traceback.format_exc()
-
-        # Read dataset attributes produced by the last successful task, and
-        # update the DB dataset accordingly
-        dataset.history = assemble_history_failed_job(
-            job,
-            dataset,
-            workflow,
+        mark_last_wftask_as_failed(
+            dataset_id=dataset_id,
             logger_name=logger_name,
         )
-        latest_filters = assemble_filters_failed_job(job)
-        if latest_filters is not None:
-            dataset.filters = latest_filters
-        latest_images = assemble_images_failed_job(job)
-        if latest_images is not None:
-            dataset.images = latest_images
-        db_sync.merge(dataset)
+        current_traceback = traceback.format_exc()
         fail_job(
             db=db_sync,
             job=job,
diff --git a/fractal_server/app/runner/v2/_local/__init__.py b/fractal_server/app/runner/v2/_local/__init__.py
index ac068368ac..74ca9fc78a 100644
--- a/fractal_server/app/runner/v2/_local/__init__.py
+++ b/fractal_server/app/runner/v2/_local/__init__.py
@@ -39,18 +39,15 @@ def _process_workflow(
     workflow_dir_local: Path,
     first_task_index: int,
     last_task_index: int,
-) -> dict:
+) -> None:
     """
-    Internal processing routine
-
-    Schedules the workflow using a `FractalThreadPoolExecutor`.
+    Run the workflow using a `FractalThreadPoolExecutor`.
     """
-
     with FractalThreadPoolExecutor() as executor:
-        new_dataset_attributes = execute_tasks_v2(
+        execute_tasks_v2(
             wf_task_list=workflow.task_list[
-                first_task_index : (last_task_index + 1)  # noqa
-            ],  # noqa
+                first_task_index : (last_task_index + 1)
+            ],
             dataset=dataset,
             executor=executor,
             workflow_dir_local=workflow_dir_local,
@@ -58,7 +55,6 @@ def _process_workflow(
             logger_name=logger_name,
             submit_setup_call=_local_submit_setup,
         )
-    return new_dataset_attributes


 async def process_workflow(
@@ -75,7 +71,7 @@ async def process_workflow(
     slurm_user: Optional[str] = None,
     slurm_account: Optional[str] = None,
     worker_init: Optional[str] = None,
-) -> dict:
+) -> None:
     """
     Run a workflow

@@ -127,11 +123,6 @@ async def process_workflow(
             (positive exit codes).
         JobExecutionError: wrapper for errors raised by the tasks' executors
             (negative exit codes).
-
-    Returns:
-        output_dataset_metadata:
-            The updated metadata for the dataset, as returned by the last task
-            of the workflow
     """

     if workflow_dir_remote and (workflow_dir_remote != workflow_dir_local):
@@ -148,7 +139,7 @@ async def process_workflow(
         last_task_index=last_task_index,
     )

-    new_dataset_attributes = await async_wrap(_process_workflow)(
+    await async_wrap(_process_workflow)(
         workflow=workflow,
         dataset=dataset,
         logger_name=logger_name,
@@ -156,4 +147,3 @@ async def process_workflow(
         first_task_index=first_task_index,
         last_task_index=last_task_index,
     )
-    return new_dataset_attributes
diff --git a/fractal_server/app/runner/v2/_local_experimental/__init__.py b/fractal_server/app/runner/v2/_local_experimental/__init__.py
index cb87a01a80..c298779ec2 100644
--- a/fractal_server/app/runner/v2/_local_experimental/__init__.py
+++ b/fractal_server/app/runner/v2/_local_experimental/__init__.py
@@ -21,23 +21,17 @@ def _process_workflow(
     workflow_dir_local: Path,
     first_task_index: int,
     last_task_index: int,
-) -> dict:
+) -> None:
     """
-    Internal processing routine
-
-    Schedules the workflow using a `FractalProcessPoolExecutor`.
-
-    Cf.
-    [process_workflow][fractal_server.app.runner.v2._local_experimental.process_workflow]
-    for the call signature.
+    Run the workflow using a `FractalProcessPoolExecutor`.
     """
     with FractalProcessPoolExecutor(
         shutdown_file=workflow_dir_local / SHUTDOWN_FILENAME
     ) as executor:
         try:
-            new_dataset_attributes = execute_tasks_v2(
+            execute_tasks_v2(
                 wf_task_list=workflow.task_list[
-                    first_task_index : (last_task_index + 1)  # noqa
+                    first_task_index : (last_task_index + 1)
                 ],
                 dataset=dataset,
                 executor=executor,
@@ -54,8 +48,6 @@ def _process_workflow(
                 )
             )

-    return new_dataset_attributes
-

 async def process_workflow(
     *,
@@ -71,7 +63,7 @@ async def process_workflow(
     slurm_user: Optional[str] = None,
     slurm_account: Optional[str] = None,
     worker_init: Optional[str] = None,
-) -> dict:
+) -> None:
     """
     Run a workflow

@@ -123,11 +115,6 @@ async def process_workflow(
             (positive exit codes).
         JobExecutionError: wrapper for errors raised by the tasks' executors
             (negative exit codes).
-
-    Returns:
-        output_dataset_metadata:
-            The updated metadata for the dataset, as returned by the last task
-            of the workflow
     """

     if workflow_dir_remote and (workflow_dir_remote != workflow_dir_local):
@@ -144,7 +131,7 @@ async def process_workflow(
         last_task_index=last_task_index,
     )

-    new_dataset_attributes = await async_wrap(_process_workflow)(
+    await async_wrap(_process_workflow)(
         workflow=workflow,
         dataset=dataset,
         logger_name=logger_name,
@@ -152,4 +139,3 @@ async def process_workflow(
         first_task_index=first_task_index,
         last_task_index=last_task_index,
     )
-    return new_dataset_attributes
diff --git a/fractal_server/app/runner/v2/_slurm_ssh/__init__.py b/fractal_server/app/runner/v2/_slurm_ssh/__init__.py
index 3816a779f0..def74eb560 100644
--- a/fractal_server/app/runner/v2/_slurm_ssh/__init__.py
+++ b/fractal_server/app/runner/v2/_slurm_ssh/__init__.py
@@ -17,7 +17,6 @@
 Executor objects.
 """
 from pathlib import Path
-from typing import Any
 from typing import Optional
 from typing import Union

@@ -47,16 +46,13 @@ def _process_workflow(
     last_task_index: int,
     fractal_ssh: FractalSSH,
     worker_init: Optional[Union[str, list[str]]] = None,
-) -> dict[str, Any]:
+) -> None:
     """
-    Internal processing routine for the SLURM backend
+    Run the workflow using a `FractalSlurmSSHExecutor`.

     This function initialises a FractalSlurmExecutor, setting logging,
     workflow working dir and user to impersonate. It then schedules the
     workflow tasks.
-
-    Returns:
-        new_dataset_attributes:
     """

     if isinstance(worker_init, str):
@@ -80,10 +76,10 @@ def _process_workflow(
         workflow_dir_remote=workflow_dir_remote,
         common_script_lines=worker_init,
     ) as executor:
-        new_dataset_attributes = execute_tasks_v2(
+        execute_tasks_v2(
             wf_task_list=workflow.task_list[
-                first_task_index : (last_task_index + 1)  # noqa
-            ],  # noqa
+                first_task_index : (last_task_index + 1)
+            ],
             dataset=dataset,
             executor=executor,
             workflow_dir_local=workflow_dir_local,
@@ -91,7 +87,6 @@ def _process_workflow(
             logger_name=logger_name,
             submit_setup_call=_slurm_submit_setup,
         )
-    return new_dataset_attributes


 async def process_workflow(
@@ -109,7 +104,7 @@ async def process_workflow(
     slurm_user: Optional[str] = None,
     slurm_account: Optional[str] = None,
     worker_init: Optional[str] = None,
-) -> dict:
+) -> None:
     """
     Process workflow (SLURM backend public interface)
     """
@@ -122,7 +117,7 @@ async def process_workflow(
         last_task_index=last_task_index,
     )

-    new_dataset_attributes = await async_wrap(_process_workflow)(
+    await async_wrap(_process_workflow)(
         workflow=workflow,
         dataset=dataset,
         logger_name=logger_name,
@@ -133,4 +128,3 @@ async def process_workflow(
         worker_init=worker_init,
         fractal_ssh=fractal_ssh,
     )
-    return new_dataset_attributes
diff --git a/fractal_server/app/runner/v2/_slurm_sudo/__init__.py b/fractal_server/app/runner/v2/_slurm_sudo/__init__.py
index b2ee5f1d71..2fafc67ef5 100644
--- a/fractal_server/app/runner/v2/_slurm_sudo/__init__.py
+++ b/fractal_server/app/runner/v2/_slurm_sudo/__init__.py
@@ -17,7 +17,6 @@
 Executor objects.
 """
 from pathlib import Path
-from typing import Any
 from typing import Optional
 from typing import Union

@@ -43,16 +42,13 @@ def _process_workflow(
     slurm_account: Optional[str] = None,
     user_cache_dir: str,
     worker_init: Optional[Union[str, list[str]]] = None,
-) -> dict[str, Any]:
+) -> None:
     """
-    Internal processing routine for the SLURM backend
+    Run the workflow using a `FractalSlurmExecutor`.

     This function initialises a FractalSlurmExecutor, setting logging,
     workflow working dir and user to impersonate. It then schedules the
     workflow tasks.
-
-    Returns:
-        new_dataset_attributes:
     """

     if not slurm_user:
@@ -73,10 +69,10 @@ def _process_workflow(
         common_script_lines=worker_init,
         slurm_account=slurm_account,
     ) as executor:
-        new_dataset_attributes = execute_tasks_v2(
+        execute_tasks_v2(
             wf_task_list=workflow.task_list[
-                first_task_index : (last_task_index + 1)  # noqa
-            ],  # noqa
+                first_task_index : (last_task_index + 1)
+            ],
             dataset=dataset,
             executor=executor,
             workflow_dir_local=workflow_dir_local,
@@ -84,7 +80,6 @@ def _process_workflow(
             logger_name=logger_name,
             submit_setup_call=_slurm_submit_setup,
         )
-    return new_dataset_attributes


 async def process_workflow(
@@ -101,7 +96,7 @@ async def process_workflow(
     slurm_user: Optional[str] = None,
     slurm_account: Optional[str] = None,
     worker_init: Optional[str] = None,
-) -> dict:
+) -> None:
     """
     Process workflow (SLURM backend public interface).
     """
@@ -113,8 +108,7 @@ async def process_workflow(
         first_task_index=first_task_index,
         last_task_index=last_task_index,
     )
-
-    new_dataset_attributes = await async_wrap(_process_workflow)(
+    await async_wrap(_process_workflow)(
         workflow=workflow,
         dataset=dataset,
         logger_name=logger_name,
@@ -127,4 +121,3 @@ async def process_workflow(
         slurm_account=slurm_account,
         worker_init=worker_init,
     )
-    return new_dataset_attributes
diff --git a/fractal_server/app/runner/v2/handle_failed_job.py b/fractal_server/app/runner/v2/handle_failed_job.py
index 0886743de8..023d04c11d 100644
--- a/fractal_server/app/runner/v2/handle_failed_job.py
+++ b/fractal_server/app/runner/v2/handle_failed_job.py
@@ -12,147 +12,48 @@
 """
 Helper functions to handle Dataset history.
 """
-import json
 import logging
-from pathlib import Path
-from typing import Any
-from typing import Optional
+
+from sqlalchemy.orm.attributes import flag_modified

 from ...models.v2 import DatasetV2
-from ...models.v2 import JobV2
-from ...models.v2 import WorkflowTaskV2
-from ...models.v2 import WorkflowV2
 from ...schemas.v2 import WorkflowTaskStatusTypeV2
-from ..filenames import FILTERS_FILENAME
-from ..filenames import HISTORY_FILENAME
-from ..filenames import IMAGES_FILENAME
+from fractal_server.app.db import get_sync_db


-def assemble_history_failed_job(
-    job: JobV2,
-    dataset: DatasetV2,
-    workflow: WorkflowV2,
-    logger_name: Optional[str] = None,
-    failed_wftask: Optional[WorkflowTaskV2] = None,
-) -> list[dict[str, Any]]:
+def mark_last_wftask_as_failed(
+    dataset_id: int,
+    logger_name: str,
+) -> None:
     """
-    Assemble `history` after a workflow-execution job fails.
+    Edit dataset history, marking its last item as failed.

     Args:
-        job:
-            The failed `JobV2` object.
-        dataset:
-            The `DatasetV2` object associated to `job`.
-        workflow:
-            The `WorkflowV2` object associated to `job`.
+        dataset_id: ID of the `DatasetV2` object.
         logger_name: A logger name.
-        failed_wftask:
-            If set, append it to `history` during step 3; if `None`, infer
-            it by comparing the job task list and the one in
-            `HISTORY_FILENAME`.
-
-    Returns:
-        The new value of `history`, to be merged into
-        `dataset.meta`.
     """
     logger = logging.getLogger(logger_name)
-
-    # The final value of the history attribute should include up to three
-    # parts, coming from: the database, the temporary file, the failed-task
-    # information.
-
-    # Part 1: Read exising history from DB
-    new_history = dataset.history
-
-    # Part 2: Extend history based on temporary-file contents
-    tmp_history_file = Path(job.working_dir) / HISTORY_FILENAME
-    try:
-        with tmp_history_file.open("r") as f:
-            tmp_file_history = json.load(f)
-            new_history.extend(tmp_file_history)
-    except FileNotFoundError:
-        tmp_file_history = []
-
-    # Part 3/A: Identify failed task, if needed
-    if failed_wftask is None:
-        job_wftasks = workflow.task_list[
-            job.first_task_index : (job.last_task_index + 1)  # noqa
-        ]
-        tmp_file_wftasks = [
-            history_item["workflowtask"] for history_item in tmp_file_history
-        ]
-        if len(job_wftasks) <= len(tmp_file_wftasks):
-            n_tasks_job = len(job_wftasks)
-            n_tasks_tmp = len(tmp_file_wftasks)
-            logger.error(
-                "Cannot identify the failed task based on job task list "
-                f"(length {n_tasks_job}) and temporary-file task list "
-                f"(length {n_tasks_tmp})."
+    with next(get_sync_db()) as db:
+        db_dataset = db.get(DatasetV2, dataset_id)
+        if len(db_dataset.history) == 0:
+            logger.warning(
+                f"History for {dataset_id=} is empty. Likely reason: the job "
+                "failed before its first task was marked as SUBMITTED. "
+                "Continue."
             )
-            logger.error("Failed task not appended to history.")
-        else:
-            failed_wftask = job_wftasks[len(tmp_file_wftasks)]
-
-    # Part 3/B: Append failed task to history
-    if failed_wftask is not None:
-        failed_wftask_dump = failed_wftask.model_dump(exclude={"task"})
-        failed_wftask_dump["task"] = failed_wftask.task.model_dump()
-        new_history_item = dict(
-            workflowtask=failed_wftask_dump,
-            status=WorkflowTaskStatusTypeV2.FAILED,
-            parallelization=dict(),  # FIXME: re-include parallelization
-        )
-        new_history.append(new_history_item)
-
-    return new_history
-
-
-def assemble_images_failed_job(job: JobV2) -> Optional[dict[str, Any]]:
-    """
-    Assemble `DatasetV2.images` for a failed workflow-execution.
-
-    Assemble new value of `images` based on the last successful task, i.e.
-    based on the content of the temporary `IMAGES_FILENAME` file. If the file
-    is missing, return `None`.
-
-    Argumentss:
-        job:
-            The failed `JobV2` object.
-
-    Returns:
-        The new value of `dataset.images`, or `None` if `IMAGES_FILENAME`
-        is missing.
-    """
-    tmp_file = Path(job.working_dir) / IMAGES_FILENAME
-    try:
-        with tmp_file.open("r") as f:
-            new_images = json.load(f)
-        return new_images
-    except FileNotFoundError:
-        return None
-
-
-def assemble_filters_failed_job(job: JobV2) -> Optional[dict[str, Any]]:
-    """
-    Assemble `DatasetV2.filters` for a failed workflow-execution.
-
-    Assemble new value of `filters` based on the last successful task, i.e.
-    based on the content of the temporary `FILTERS_FILENAME` file. If the file
-    is missing, return `None`.
-
-    Argumentss:
-        job:
-            The failed `JobV2` object.
-
-    Returns:
-        The new value of `dataset.filters`, or `None` if `FILTERS_FILENAME`
-        is missing.
-    """
-    tmp_file = Path(job.working_dir) / FILTERS_FILENAME
-    try:
-        with tmp_file.open("r") as f:
-            new_filters = json.load(f)
-        return new_filters
-    except FileNotFoundError:
-        return None
+            return
+        workflowtask_id = db_dataset.history[-1]["workflowtask"]["id"]
+        last_item_status = db_dataset.history[-1]["status"]
+        if last_item_status != WorkflowTaskStatusTypeV2.SUBMITTED:
+            logger.warning(
+                "Unexpected branch: "
+                f"Last history item, for {workflowtask_id=}, "
+                f"has status {last_item_status}. Skip."
+            )
+            return
+        logger.info(f"Setting history item for {workflowtask_id=} to failed.")
+        db_dataset.history[-1]["status"] = WorkflowTaskStatusTypeV2.FAILED
+        flag_modified(db_dataset, "history")
+        db.merge(db_dataset)
+        db.commit()
diff --git a/fractal_server/app/runner/v2/runner.py b/fractal_server/app/runner/v2/runner.py
index 5a87ac8a58..93c645d6c3 100644
--- a/fractal_server/app/runner/v2/runner.py
+++ b/fractal_server/app/runner/v2/runner.py
@@ -1,4 +1,3 @@
-import json
 import logging
 from concurrent.futures import ThreadPoolExecutor
 from copy import copy
@@ -7,20 +6,20 @@
 from typing import Callable
 from typing import Optional

+from sqlalchemy.orm.attributes import flag_modified
+
 from ....images import Filters
 from ....images import SingleImage
 from ....images.tools import filter_image_list
 from ....images.tools import find_image_by_zarr_url
 from ....images.tools import match_filter
 from ..exceptions import JobExecutionError
-from ..filenames import FILTERS_FILENAME
-from ..filenames import HISTORY_FILENAME
-from ..filenames import IMAGES_FILENAME
 from .runner_functions import no_op_submit_setup_call
 from .runner_functions import run_v2_task_compound
 from .runner_functions import run_v2_task_non_parallel
 from .runner_functions import run_v2_task_parallel
 from .task_interface import TaskOutput
+from fractal_server.app.db import get_sync_db
 from fractal_server.app.models.v2 import DatasetV2
 from fractal_server.app.models.v2 import WorkflowTaskV2
 from fractal_server.app.schemas.v2.dataset import _DatasetHistoryItemV2
@@ -35,20 +34,20 @@ def execute_tasks_v2(
     workflow_dir_remote: Optional[Path] = None,
     logger_name: Optional[str] = None,
     submit_setup_call: Callable = no_op_submit_setup_call,
-) -> DatasetV2:
-
+) -> None:
     logger = logging.getLogger(logger_name)

-    if (
-        not workflow_dir_local.exists()
-    ):  # FIXME: this should have already happened
+    if not workflow_dir_local.exists():
+        logger.warning(
+            f"Now creating {workflow_dir_local}, "
+            "but it should have already happened."
+        )
         workflow_dir_local.mkdir()

     # Initialize local dataset attributes
     zarr_dir = dataset.zarr_dir
     tmp_images = deepcopy(dataset.images)
     tmp_filters = deepcopy(dataset.filters)
-    tmp_history = []

     for wftask in wf_task_list:
         task = wftask.task
@@ -77,7 +76,18 @@ def execute_tasks_v2(
                     f'Image zarr_url: {image["zarr_url"]}\n'
                     f'Image types: {image["types"]}\n'
                 )
-
+        # First, set status SUBMITTED in dataset.history for each wftask
+        with next(get_sync_db()) as db:
+            db_dataset = db.get(DatasetV2, dataset.id)
+            new_history_item = _DatasetHistoryItemV2(
+                workflowtask=wftask,
+                status=WorkflowTaskStatusTypeV2.SUBMITTED,
+                parallelization=dict(),  # FIXME: re-include parallelization
+            ).dict()
+            db_dataset.history.append(new_history_item)
+            flag_modified(db_dataset, "history")
+            db.merge(db_dataset)
+            db.commit()
         # TASK EXECUTION (V2)
         if task.type == "non_parallel":
             current_task_output = run_v2_task_non_parallel(
@@ -282,36 +292,18 @@ def execute_tasks_v2(
             tmp_filters["types"].update(types_from_manifest)
             tmp_filters["types"].update(types_from_task)

-        # Update history (based on _DatasetHistoryItemV2)
-        history_item = _DatasetHistoryItemV2(
-            workflowtask=wftask,
-            status=WorkflowTaskStatusTypeV2.DONE,
-            parallelization=dict(
-                # task_type=wftask.task.type,  # FIXME: breaks for V1 tasks
-                # component_list=fil, #FIXME
-            ),
-        ).dict()
-        tmp_history.append(history_item)
-
-        # Write current dataset attributes (history, images, filters) into
-        # temporary files which can be used (1) to retrieve the latest state
+        # Write current dataset attributes (history, images, filters) into the
+        # database. They can be used (1) to retrieve the latest state
         # when the job fails, (2) from within endpoints that need up-to-date
         # information
-        with open(workflow_dir_local / HISTORY_FILENAME, "w") as f:
-            json.dump(tmp_history, f, indent=2)
-        with open(workflow_dir_local / FILTERS_FILENAME, "w") as f:
-            json.dump(tmp_filters, f, indent=2)
-        with open(workflow_dir_local / IMAGES_FILENAME, "w") as f:
-            json.dump(tmp_images, f, indent=2)
+        with next(get_sync_db()) as db:
+            db_dataset = db.get(DatasetV2, dataset.id)
+            db_dataset.history[-1]["status"] = WorkflowTaskStatusTypeV2.DONE
+            db_dataset.filters = tmp_filters
+            db_dataset.images = tmp_images
+            for attribute_name in ["filters", "history", "images"]:
+                flag_modified(db_dataset, attribute_name)
+            db.merge(db_dataset)
+            db.commit()

         logger.debug(f'END {wftask.order}-th task (name="{task_name}")')
-
-    # NOTE: tmp_history only contains the newly-added history items (to be
-    # appended to the original history), while tmp_filters and tmp_images
-    # represent the new attributes (to replace the original ones)
-    result = dict(
-        history=tmp_history,
-        filters=tmp_filters,
-        images=tmp_images,
-    )
-    return result
diff --git a/tests/v1/04_api/test_history_v1.py b/tests/v1/04_api/test_history_v1.py
index 8b11eb955e..f5aa7898c7 100644
--- a/tests/v1/04_api/test_history_v1.py
+++ b/tests/v1/04_api/test_history_v1.py
@@ -6,7 +6,7 @@
 from fractal_server.app.routes.api.v1._aux_functions import (
     _workflow_insert_task,
 )
-from fractal_server.app.runner.filenames import HISTORY_FILENAME
+from fractal_server.app.runner.filenames import HISTORY_FILENAME_V1
 from fractal_server.app.runner.v1.handle_failed_job import (
     assemble_history_failed_job,
 )  # noqa
@@ -46,9 +46,9 @@ async def test_get_workflowtask_status(
     working_dir = tmp_path / "working_dir"
     working_dir.mkdir()

-    with (working_dir / HISTORY_FILENAME).open("w") as f:
+    with (working_dir / HISTORY_FILENAME_V1).open("w") as f:
         json.dump(history, f)
-    debug(working_dir / HISTORY_FILENAME)
+    debug(working_dir / HISTORY_FILENAME_V1)

     async with MockCurrentUser() as user:
         project = await project_factory(user)
@@ -315,7 +315,7 @@ async def test_assemble_history_failed_job_fail(
     Path(job.working_dir).mkdir()

     tmp_history = [dict(workflowtask={"id": wftask.id})]
-    with (Path(job.working_dir) / HISTORY_FILENAME).open("w") as fp:
+    with (Path(job.working_dir) / HISTORY_FILENAME_V1).open("w") as fp:
         json.dump(tmp_history, fp)

     logger = logging.getLogger(None)
@@ -341,7 +341,7 @@ async def test_json_decode_error(
     history = "NOT A VALID JSON"
     working_dir = tmp_path / "working_dir"
     working_dir.mkdir()
-    with (working_dir / HISTORY_FILENAME).open("w") as f:
+    with (working_dir / HISTORY_FILENAME_V1).open("w") as f:
         f.write(history)

     async with MockCurrentUser() as user:
diff --git a/tests/v2/03_api/test_api_status.py b/tests/v2/03_api/test_api_status.py
index 62d5319248..5533b6c4c6 100644
--- a/tests/v2/03_api/test_api_status.py
+++ b/tests/v2/03_api/test_api_status.py
@@ -1,14 +1,12 @@
-import json
-
-from devtools import debug
+from sqlalchemy.orm.attributes import flag_modified

 from fractal_server.app.routes.api.v2._aux_functions import (
     _workflow_insert_task,
 )
-from fractal_server.app.runner.filenames import HISTORY_FILENAME
+from fractal_server.app.schemas.v2.dataset import WorkflowTaskStatusTypeV2


-async def test_workflowtask_status_no_history_no_job(
+async def test_status_no_history_no_running_job(
     db,
     MockCurrentUser,
     project_factory_v2,
     dataset_factory_v2,
     workflow_factory_v2,
     task_factory_v2,
     client,
 ):
     """
-    Test the status endpoint when there is information in the DB and no running
-    job associated to a given dataset/workflow pair.
+    GIVEN a database with no jobs and no history
+    THEN the status-endpoint response is empty
     """
     async with MockCurrentUser() as user:
         project = await project_factory_v2(user)
@@ -41,7 +39,7 @@
     assert res.json() == {"status": {}}


-async def test_workflowtask_status_history_no_job(
+async def test_status_yes_history_no_running_job(
     db,
     MockCurrentUser,
     project_factory_v2,
     dataset_factory_v2,
     workflow_factory_v2,
     task_factory_v2,
     client,
 ):
     """
-    Test the status endpoint when there is a non-empty history in the DB but
-    no running job associated to a given dataset/workflow pair.
+    Test the case of the database with non-empty dataset.history and no
+    running jobs.
     """
     async with MockCurrentUser() as user:
         project = await project_factory_v2(user)
@@ -137,7 +135,7 @@
     assert res.json() == {"status": {"3": "done"}}


-async def test_workflowtask_status_history_job(
+async def test_status_yes_history_yes_running_job(
     db,
     MockCurrentUser,
     tmp_path,
     project_factory_v2,
     dataset_factory_v2,
     workflow_factory_v2,
     task_factory_v2,
     job_factory_v2,
     client,
 ):
     """
     Test the status endpoint when there is a non-empty history in the DB and
     there is a running job associated to a given dataset/workflow pair.
""" working_dir = tmp_path / "working_dir" - history = [dict(workflowtask=dict(id=3), status="done")] async with MockCurrentUser() as user: project = await project_factory_v2(user) - dataset = await dataset_factory_v2( - project_id=project.id, history=history - ) + dataset = await dataset_factory_v2(project_id=project.id, history=[]) task = await task_factory_v2( user_id=user.id, name="task1", source="task1" ) @@ -177,7 +172,39 @@ async def test_workflowtask_status_history_job( last_task_index=1, ) - # CASE 1: the job has no temporary history file + # CASE 1: first submitted + dataset.history = [ + dict( + workflowtask=dict(id=workflow.task_list[0].id), + status=WorkflowTaskStatusTypeV2.SUBMITTED, + ), + ] + flag_modified(dataset, "history") + await db.merge(dataset) + await db.commit() + res = await client.get( + ( + f"api/v2/project/{project.id}/status/?" + f"dataset_id={dataset.id}&workflow_id={workflow.id}" + ) + ) + assert res.status_code == 200 + assert res.json()["status"] == {"1": "submitted", "2": "submitted"} + + # CASE 2: first done + dataset.history = [ + dict( + workflowtask=dict(id=workflow.task_list[0].id), + status=WorkflowTaskStatusTypeV2.DONE, + ), + dict( + workflowtask=dict(id=workflow.task_list[1].id), + status=WorkflowTaskStatusTypeV2.SUBMITTED, + ), + ] + flag_modified(dataset, "history") + await db.merge(dataset) + await db.commit() res = await client.get( ( f"api/v2/project/{project.id}/status/?" @@ -185,15 +212,22 @@ async def test_workflowtask_status_history_job( ) ) assert res.status_code == 200 - assert res.json() == {"status": {"1": "submitted", "2": "submitted"}} + assert res.json()["status"] == {"1": "done", "2": "submitted"} - # CASE 2: the job has a temporary history file - history = [ - dict(workflowtask=dict(id=workflow.task_list[0].id), status="done") + # CASE 3: no "SUBMITTED" in the task list + dataset.history = [ + dict( + workflowtask=dict(id=workflow.task_list[0].id), + status=WorkflowTaskStatusTypeV2.DONE, + ), + dict( + workflowtask=dict(id=workflow.task_list[1].id), + status=WorkflowTaskStatusTypeV2.FAILED, + ), ] - working_dir.mkdir() - with (working_dir / HISTORY_FILENAME).open("w") as f: - json.dump(history, f) + flag_modified(dataset, "history") + await db.merge(dataset) + await db.commit() res = await client.get( ( f"api/v2/project/{project.id}/status/?" @@ -201,7 +235,7 @@ async def test_workflowtask_status_history_job( ) ) assert res.status_code == 200 - assert res.json() == {"status": {"1": "done", "2": "submitted"}} + assert res.json()["status"] == {"1": "submitted", "2": "submitted"} async def test_workflowtask_status_two_jobs( @@ -217,7 +251,7 @@ async def test_workflowtask_status_two_jobs( ): """ If there are more than one jobs associated to a given dataset/workflow pair - (which should never happen), the endpoin responds with 422. + (which should never happen), the endpoint responds with 422. """ working_dir = tmp_path / "working_dir" async with MockCurrentUser() as user: @@ -237,14 +271,12 @@ async def test_workflowtask_status_two_jobs( dataset_id=dataset.id, working_dir=str(working_dir), ) - res = await client.get( ( f"api/v2/project/{project.id}/status/?" 
f"dataset_id={dataset.id}&workflow_id={workflow.id}" ) ) - debug(res.json()) assert res.status_code == 422 @@ -267,7 +299,6 @@ async def test_workflowtask_status_modified_workflow( working_dir = tmp_path / "working_dir" async with MockCurrentUser() as user: project = await project_factory_v2(user) - dataset = await dataset_factory_v2(project_id=project.id, history=[]) task = await task_factory_v2( user_id=user.id, name="task1", source="task1" ) @@ -276,6 +307,23 @@ async def test_workflowtask_status_modified_workflow( await _workflow_insert_task( workflow_id=workflow.id, task_id=task.id, db=db ) + dataset = await dataset_factory_v2( + project_id=project.id, + history=[ + dict( + workflowtask=dict(id=workflow.task_list[0].id), + status=WorkflowTaskStatusTypeV2.DONE, + ), + dict( + workflowtask=dict(id=workflow.task_list[1].id), + status=WorkflowTaskStatusTypeV2.DONE, + ), + dict( + workflowtask=dict(id=workflow.task_list[2].id), + status=WorkflowTaskStatusTypeV2.SUBMITTED, + ), + ], + ) await job_factory_v2( project_id=project.id, workflow_id=workflow.id, @@ -293,20 +341,43 @@ async def test_workflowtask_status_modified_workflow( wftask_list = res.json()["task_list"] for wftask in wftask_list[1:]: wftask_id = wftask["id"] - debug(f"Delete {wftask_id=}") res = await client.delete( f"api/v2/project/{project.id}/workflow/{workflow.id}/" f"wftask/{wftask_id}/" ) assert res.status_code == 204 - # The endpoint response is OK, even if the running_job points to - # non-existing WorkflowTask's. - res = await client.get( - ( - f"api/v2/project/{project.id}/status/?" - f"dataset_id={dataset.id}&workflow_id={workflow.id}" + # The endpoint response is OK, even if the running_job points to + # non-existing WorkflowTask's. + res = await client.get( + ( + f"api/v2/project/{project.id}/status/?" + f"dataset_id={dataset.id}&workflow_id={workflow.id}" + ) ) - ) - assert res.status_code == 200 - assert res.json() == {"status": {"1": "submitted"}} + assert res.status_code == 200 + assert res.json() == {"status": {"1": "submitted"}} + + # Delete last remaining task + res = await client.get( + f"api/v2/project/{project.id}/workflow/{workflow.id}/" + ) + assert res.status_code == 200 + for wftask in res.json()["task_list"]: + wftask_id = wftask["id"] + res = await client.delete( + f"api/v2/project/{project.id}/workflow/{workflow.id}/" + f"wftask/{wftask_id}/" + ) + assert res.status_code == 204 + + # The endpoint response is OK, even if the running_job points to + # non-existing WorkflowTask's. + res = await client.get( + ( + f"api/v2/project/{project.id}/status/?" 
+ f"dataset_id={dataset.id}&workflow_id={workflow.id}" + ) + ) + assert res.status_code == 200 + assert res.json() == {"status": {}} diff --git a/tests/v2/03_api/test_submission_job_list_v2.py b/tests/v2/03_api/test_submission_job_list_v2.py index 14e31ef1b7..4375ffe803 100644 --- a/tests/v2/03_api/test_submission_job_list_v2.py +++ b/tests/v2/03_api/test_submission_job_list_v2.py @@ -18,7 +18,6 @@ async def test_clean_app_job_list_v2( job_factory_v2, override_settings_factory, ): - # Check that app fixture starts in a clean state assert app.state.jobsV1 == [] assert app.state.jobsV2 == [] @@ -27,10 +26,9 @@ async def test_clean_app_job_list_v2( override_settings_factory(FRACTAL_API_MAX_JOB_LIST_LENGTH=0) async with MockCurrentUser(user_kwargs=dict(is_verified=True)) as user: - # Create DB objects task = await task_factory_v2( - user_id=user.id, name="task", command="echo" + user_id=user.id, name="task", command_non_parallel="echo" ) project = await project_factory_v2(user) workflow = await workflow_factory_v2(project_id=project.id) diff --git a/tests/v2/04_runner/aux_get_dataset_attrs.py b/tests/v2/04_runner/aux_get_dataset_attrs.py new file mode 100644 index 0000000000..6fed8b0185 --- /dev/null +++ b/tests/v2/04_runner/aux_get_dataset_attrs.py @@ -0,0 +1,12 @@ +from typing import Any + +from fractal_server.app.models import DatasetV2 + + +async def _get_dataset_attrs(db, dataset_id) -> dict[str, Any]: + await db.close() + db_dataset = await db.get(DatasetV2, dataset_id) + dataset_attrs = db_dataset.model_dump( + include={"filters", "history", "images"} + ) + return dataset_attrs diff --git a/tests/v2/04_runner/test_dummy_examples.py b/tests/v2/04_runner/test_dummy_examples.py index b9f493f283..4c535e5ae0 100644 --- a/tests/v2/04_runner/test_dummy_examples.py +++ b/tests/v2/04_runner/test_dummy_examples.py @@ -4,16 +4,16 @@ from pathlib import Path import pytest +from aux_get_dataset_attrs import _get_dataset_attrs from devtools import debug from fixtures_mocks import * # noqa: F401,F403 -from v2_mock_models import DatasetV2Mock from v2_mock_models import WorkflowTaskV2Mock from fractal_server.app.runner.exceptions import JobExecutionError from fractal_server.urls import normalize_url -def execute_tasks_v2(wf_task_list, workflow_dir_local, **kwargs): +def execute_tasks_v2(wf_task_list, workflow_dir_local, **kwargs) -> None: from fractal_server.app.runner.task_files import task_subfolder_name from fractal_server.app.runner.v2.runner import ( execute_tasks_v2 as raw_execute_tasks_v2, @@ -26,16 +26,21 @@ def execute_tasks_v2(wf_task_list, workflow_dir_local, **kwargs): logging.info(f"Now creating {subfolder.as_posix()}") subfolder.mkdir(parents=True) - out = raw_execute_tasks_v2( + raw_execute_tasks_v2( wf_task_list=wf_task_list, workflow_dir_local=workflow_dir_local, **kwargs, ) - return out -def test_dummy_insert_single_image( - tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db +async def test_dummy_insert_single_image( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, + fractal_tasks_mock_no_db, ): # Preliminary setup execute_tasks_v2_args = dict( @@ -44,44 +49,13 @@ def test_dummy_insert_single_image( workflow_dir_remote=tmp_path / "job_dir", ) zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") - - # Run successfully on an empty dataset - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["dummy_insert_single_image"], - task_id=fractal_tasks_mock_no_db[ - 
"dummy_insert_single_image" - ].id, - id=0, - order=0, - ) - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) - debug(dataset_attrs["images"]) - - # Run successfully even if the image already exists - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["dummy_insert_single_image"], - task_id=fractal_tasks_mock_no_db[ - "dummy_insert_single_image" - ].id, - id=1, - order=1, - ) - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) - debug(dataset_attrs["images"]) - - # Fail because new image is not relative to zarr_dir - IMAGES = [dict(zarr_url=Path(zarr_dir, "my-image").as_posix())] - with pytest.raises(JobExecutionError) as e: + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir + ) + # Run successfully on an empty dataset + debug(dataset) execute_tasks_v2( wf_task_list=[ WorkflowTaskV2Mock( @@ -89,63 +63,15 @@ def test_dummy_insert_single_image( task_id=fractal_tasks_mock_no_db[ "dummy_insert_single_image" ].id, - args_non_parallel={ - "full_new_image": dict( - zarr_url=IMAGES[0]["zarr_url"], origin="/somewhere" - ) - }, - id=2, - order=2, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, images=IMAGES - ), - **execute_tasks_v2_args, - ) - error_msg = str(e.value) - assert ( - "Cannot edit an image with zarr_url different from origin." - in error_msg - ) - - # Fail because types filters are set twice - execute_tasks_v2_args = dict( - executor=executor, - workflow_dir_local=tmp_path / "job_dir_2", - workflow_dir_remote=tmp_path / "job_dir_2", - ) - PATCHED_TASK = deepcopy( - fractal_tasks_mock_no_db["dummy_insert_single_image"] - ) - KEY = "something" - PATCHED_TASK.output_types = {KEY: True} - with pytest.raises(JobExecutionError) as e: - execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=PATCHED_TASK, - task_id=PATCHED_TASK.id, - args_non_parallel={"types": {KEY: True}}, - id=2, - order=2, + id=0, + order=0, ) ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, images=IMAGES - ), + dataset=dataset, **execute_tasks_v2_args, ) - error_msg = str(e.value) - assert "Some type filters are being set twice" in error_msg - # Fail because new image is not relative to zarr_dir - execute_tasks_v2_args = dict( - executor=executor, - workflow_dir_local=tmp_path / "job_dir_3", - workflow_dir_remote=tmp_path / "job_dir_3", - ) - with pytest.raises(JobExecutionError) as e: + # Run successfully even if the image already exists execute_tasks_v2( wf_task_list=[ WorkflowTaskV2Mock( @@ -153,41 +79,136 @@ def test_dummy_insert_single_image( task_id=fractal_tasks_mock_no_db[ "dummy_insert_single_image" ].id, - args_non_parallel={"fail": True}, - id=2, - order=2, + id=1, + order=1, ) ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), + dataset=dataset, **execute_tasks_v2_args, ) - error_msg = str(e.value) - assert "is not a parent directory" in error_msg - assert zarr_dir in error_msg + # Fail because new image is not relative to zarr_dir + IMAGES = [dict(zarr_url=Path(zarr_dir, "my-image").as_posix())] + dataset_images = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, images=IMAGES + ) + with pytest.raises(JobExecutionError) as e: + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "dummy_insert_single_image" + ], + 
task_id=fractal_tasks_mock_no_db[ + "dummy_insert_single_image" + ].id, + args_non_parallel={ + "full_new_image": dict( + zarr_url=IMAGES[0]["zarr_url"], + origin="/somewhere", + ) + }, + id=2, + order=2, + ) + ], + dataset=dataset_images, + **execute_tasks_v2_args, + ) + error_msg = str(e.value) + assert ( + "Cannot edit an image with zarr_url different from origin." + in error_msg + ) - # Fail because new image's zarr_url is equal to zarr_dir - with pytest.raises(JobExecutionError) as e: - execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["dummy_insert_single_image"], - task_id=fractal_tasks_mock_no_db[ - "dummy_insert_single_image" - ].id, - args_non_parallel={"fail_2": True}, - id=3, - order=3, - ) - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, + # Fail because types filters are set twice + execute_tasks_v2_args = dict( + executor=executor, + workflow_dir_local=tmp_path / "job_dir_2", + workflow_dir_remote=tmp_path / "job_dir_2", + ) + PATCHED_TASK = deepcopy( + fractal_tasks_mock_no_db["dummy_insert_single_image"] + ) + KEY = "something" + PATCHED_TASK.output_types = {KEY: True} + with pytest.raises(JobExecutionError) as e: + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=PATCHED_TASK, + task_id=PATCHED_TASK.id, + args_non_parallel={"types": {KEY: True}}, + id=2, + order=2, + ) + ], + dataset=dataset_images, + **execute_tasks_v2_args, + ) + error_msg = str(e.value) + assert "Some type filters are being set twice" in error_msg + + # Fail because new image is not relative to zarr_dir + execute_tasks_v2_args = dict( + executor=executor, + workflow_dir_local=tmp_path / "job_dir_3", + workflow_dir_remote=tmp_path / "job_dir_3", + ) + with pytest.raises(JobExecutionError) as e: + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "dummy_insert_single_image" + ], + task_id=fractal_tasks_mock_no_db[ + "dummy_insert_single_image" + ].id, + args_non_parallel={"fail": True}, + id=2, + order=2, + ) + ], + dataset=dataset, + **execute_tasks_v2_args, + ) + error_msg = str(e.value) + assert "is not a parent directory" in error_msg + assert zarr_dir in error_msg + + # Fail because new image's zarr_url is equal to zarr_dir + with pytest.raises(JobExecutionError) as e: + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "dummy_insert_single_image" + ], + task_id=fractal_tasks_mock_no_db[ + "dummy_insert_single_image" + ].id, + args_non_parallel={"fail_2": True}, + id=3, + order=3, + ) + ], + dataset=dataset, + **execute_tasks_v2_args, + ) + error_msg = str(e.value) + assert ( + "Cannot create image if zarr_url is equal to zarr_dir" in error_msg ) - error_msg = str(e.value) - assert "Cannot create image if zarr_url is equal to zarr_dir" in error_msg -def test_dummy_remove_images( - tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db +async def test_dummy_remove_images( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, + fractal_tasks_mock_no_db, ): # Preliminary setup execute_tasks_v2_args = dict( @@ -196,59 +217,67 @@ def test_dummy_remove_images( workflow_dir_remote=tmp_path / "job_dir", ) zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") - - # Run successfully on a dataset which includes the images to be removed - dataset_pre = DatasetV2Mock( - name="dataset", - zarr_dir=zarr_dir, - images=[ - dict(zarr_url=Path(zarr_dir, str(index)).as_posix()) 
-            for index in [0, 1, 2]
-        ],
-    )
-    dataset_attrs = execute_tasks_v2(
-        wf_task_list=[
-            WorkflowTaskV2Mock(
-                task=fractal_tasks_mock_no_db["dummy_remove_images"],
-                task_id=fractal_tasks_mock_no_db["dummy_remove_images"].id,
-                id=0,
-                order=0,
-            )
-        ],
-        dataset=dataset_pre,
-        **execute_tasks_v2_args,
-    )
-    debug(dataset_attrs)
-
-    # Fail when removing images that do not exist
-    dataset_pre = DatasetV2Mock(
-        name="dataset",
-        zarr_dir=zarr_dir,
-    )
-    with pytest.raises(JobExecutionError) as e:
+    async with MockCurrentUser() as user:
+        # Run successfully on a dataset which includes the images to be removed
+        project = await project_factory_v2(user)
+        dataset_pre = await dataset_factory_v2(
+            project_id=project.id,
+            zarr_dir=zarr_dir,
+            images=[
+                dict(zarr_url=Path(zarr_dir, str(index)).as_posix())
+                for index in [0, 1, 2]
+            ],
+        )
         execute_tasks_v2(
             wf_task_list=[
                 WorkflowTaskV2Mock(
                     task=fractal_tasks_mock_no_db["dummy_remove_images"],
                     task_id=fractal_tasks_mock_no_db["dummy_remove_images"].id,
-                id=1,
-                order=1,
-                args_non_parallel=dict(
-                    more_zarr_urls=[
-                        Path(zarr_dir, "missing-image").as_posix()
-                    ]
-                ),
+                    id=0,
+                    order=0,
                 )
             ],
             dataset=dataset_pre,
             **execute_tasks_v2_args,
         )
-    error_msg = str(e.value)
-    assert "Cannot remove missing image" in error_msg

+        # Fail when removing images that do not exist
+        dataset_pre_fail = await dataset_factory_v2(
+            project_id=project.id,
+            zarr_dir=zarr_dir,
+        )
+        with pytest.raises(JobExecutionError) as e:
+            execute_tasks_v2(
+                wf_task_list=[
+                    WorkflowTaskV2Mock(
+                        task=fractal_tasks_mock_no_db["dummy_remove_images"],
+                        task_id=fractal_tasks_mock_no_db[
+                            "dummy_remove_images"
+                        ].id,
+                        id=1,
+                        order=1,
+                        args_non_parallel=dict(
+                            more_zarr_urls=[
+                                Path(zarr_dir, "missing-image").as_posix()
+                            ]
+                        ),
+                    )
+                ],
+                dataset=dataset_pre_fail,
+                **execute_tasks_v2_args,
+            )
+        error_msg = str(e.value)
+        assert "Cannot remove missing image" in error_msg


-def test_dummy_unset_attribute(
-    tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db
+async def test_dummy_unset_attribute(
+    db,
+    MockCurrentUser,
+    project_factory_v2,
+    dataset_factory_v2,
+    tmp_path: Path,
+    executor: Executor,
+    fractal_tasks_mock_no_db,
 ):
     # Preliminary setup
     execute_tasks_v2_args = dict(
         executor=executor,
         workflow_dir_local=tmp_path / "job_dir",
         workflow_dir_remote=tmp_path / "job_dir",
     )
     zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/")
-
-    dataset_pre = DatasetV2Mock(
-        name="dataset",
-        zarr_dir=zarr_dir,
-        images=[
-            dict(
-                zarr_url=Path(zarr_dir, "my-image").as_posix(),
-                attributes={"key1": "value1", "key2": "value2"},
-                types={},
-            )
-        ],
-    )
+    async with MockCurrentUser() as user:
+        project = await project_factory_v2(user)
+        dataset_pre = await dataset_factory_v2(
+            project_id=project.id,
+            zarr_dir=zarr_dir,
+            images=[
+                dict(
+                    zarr_url=Path(zarr_dir, "my-image").as_posix(),
+                    attributes={"key1": "value1", "key2": "value2"},
+                    types={},
+                )
+            ],
+        )

         # Unset an existing attribute (starting from dataset_pre)
-        dataset_attrs = execute_tasks_v2(
+        execute_tasks_v2(
             wf_task_list=[
WorkflowTaskV2Mock( task=fractal_tasks_mock_no_db["dummy_unset_attribute"], @@ -301,15 +332,21 @@ def test_dummy_unset_attribute( dataset=dataset_pre, **execute_tasks_v2_args, ) - debug(dataset_attrs["images"]) + dataset_attrs = await _get_dataset_attrs(db, dataset_pre.id) assert dataset_attrs["images"][0]["attributes"] == { "key1": "value1", "key2": "value2", } -def test_dummy_insert_single_image_none_attribute( - tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db +async def test_dummy_insert_single_image_none_attribute( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, + fractal_tasks_mock_no_db, ): # Preliminary setup execute_tasks_v2_args = dict( @@ -318,31 +355,45 @@ def test_dummy_insert_single_image_none_attribute( workflow_dir_remote=tmp_path / "job_dir", ) zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") - - # Run successfully on an empty dataset - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["dummy_insert_single_image"], - task_id=fractal_tasks_mock_no_db[ - "dummy_insert_single_image" - ].id, - args_non_parallel=dict(attributes={"attribute-name": None}), - id=0, - order=0, - ) - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) - debug(dataset_attrs["images"]) - assert ( - "attribute-name" not in dataset_attrs["images"][0]["attributes"].keys() - ) + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir + ) + # Run successfully on an empty dataset + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["dummy_insert_single_image"], + task_id=fractal_tasks_mock_no_db[ + "dummy_insert_single_image" + ].id, + args_non_parallel=dict( + attributes={"attribute-name": None} + ), + id=0, + order=0, + ) + ], + dataset=dataset, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset.id) + debug(dataset_attrs["images"]) + assert ( + "attribute-name" + not in dataset_attrs["images"][0]["attributes"].keys() + ) -def test_dummy_insert_single_image_normalization( - tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db +async def test_dummy_insert_single_image_normalization( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, + fractal_tasks_mock_no_db, ): # Preliminary setup execute_tasks_v2_args = dict( @@ -351,30 +402,41 @@ def test_dummy_insert_single_image_normalization( workflow_dir_remote=tmp_path / "job_dir", ) zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") - - # Run successfully with trailing slashes - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["dummy_insert_single_image"], - task_id=fractal_tasks_mock_no_db[ - "dummy_insert_single_image" - ].id, - id=0, - order=0, - args_non_parallel={"trailing_slash": True}, - ) - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) - debug(dataset_attrs["images"]) - for image in dataset_attrs["images"]: - assert normalize_url(image["zarr_url"]) == image["zarr_url"] - - -def test_default_inclusion_of_images( - tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, 
zarr_dir=zarr_dir
+        )
+        # Run successfully with trailing slashes
+        execute_tasks_v2(
+            wf_task_list=[
+                WorkflowTaskV2Mock(
+                    task=fractal_tasks_mock_no_db["dummy_insert_single_image"],
+                    task_id=fractal_tasks_mock_no_db[
+                        "dummy_insert_single_image"
+                    ].id,
+                    id=0,
+                    order=0,
+                    args_non_parallel={"trailing_slash": True},
+                )
+            ],
+            dataset=dataset,
+            **execute_tasks_v2_args,
+        )
+        dataset_attrs = await _get_dataset_attrs(db, dataset.id)
+        debug(dataset_attrs["images"])
+        for image in dataset_attrs["images"]:
+            assert normalize_url(image["zarr_url"]) == image["zarr_url"]
+
+
+async def test_default_inclusion_of_images(
+    db,
+    MockCurrentUser,
+    project_factory_v2,
+    dataset_factory_v2,
+    tmp_path: Path,
+    executor: Executor,
+    fractal_tasks_mock_no_db,
 ):
     """
     Ref
@@ -389,26 +451,32 @@ def test_default_inclusion_of_images(
             types={},
         )
     ]
-    dataset_pre = DatasetV2Mock(
-        name="dataset", zarr_dir=zarr_dir, images=images
-    )
+    async with MockCurrentUser() as user:
+        project = await project_factory_v2(user)
+        dataset_pre = await dataset_factory_v2(
+            project_id=project.id, zarr_dir=zarr_dir, images=images
+        )
 
-    # Run
-    dataset_attrs = execute_tasks_v2(
-        wf_task_list=[
-            WorkflowTaskV2Mock(
-                task=fractal_tasks_mock_no_db["generic_task_parallel"],
-                task_id=fractal_tasks_mock_no_db["generic_task_parallel"].id,
-                id=0,
-                order=0,
-            )
-        ],
-        dataset=dataset_pre,
-        executor=executor,
-        workflow_dir_local=tmp_path / "job_dir",
-        workflow_dir_remote=tmp_path / "job_dir",
-    )
-    image = dataset_attrs["images"][0]
-    debug(dataset_attrs)
-    debug(image)
-    assert image["types"] == dict(my_type=True)
+        # Run
+        execute_tasks_v2(
+            wf_task_list=[
+                WorkflowTaskV2Mock(
+                    task=fractal_tasks_mock_no_db["generic_task_parallel"],
+                    task_id=fractal_tasks_mock_no_db[
+                        "generic_task_parallel"
+                    ].id,
+                    id=0,
+                    order=0,
+                )
+            ],
+            dataset=dataset_pre,
+            executor=executor,
+            workflow_dir_local=tmp_path / "job_dir",
+            workflow_dir_remote=tmp_path / "job_dir",
+        )
+        dataset_attrs = await _get_dataset_attrs(db, dataset_pre.id)
+        image = dataset_attrs["images"][0]
+        debug(dataset_attrs)
+        debug(image)
+        assert image["types"] == dict(my_type=True)
diff --git a/tests/v2/04_runner/test_fractal_examples.py b/tests/v2/04_runner/test_fractal_examples.py
index ac90971130..1882a87902 100644
--- a/tests/v2/04_runner/test_fractal_examples.py
+++ b/tests/v2/04_runner/test_fractal_examples.py
@@ -5,9 +5,9 @@ from typing import Any
 
 import pytest
+from aux_get_dataset_attrs import _get_dataset_attrs
 from devtools import debug
 from fixtures_mocks import *  # noqa: F401,F403
-from v2_mock_models import DatasetV2Mock
 from v2_mock_models import TaskV2Mock
 from v2_mock_models import WorkflowTaskV2Mock
@@ -29,12 +29,11 @@ def execute_tasks_v2(wf_task_list, workflow_dir_local, **kwargs):
         logging.info(f"Now creating {subfolder.as_posix()}")
         subfolder.mkdir(parents=True)
 
-    out = raw_execute_tasks_v2(
+    raw_execute_tasks_v2(
         wf_task_list=wf_task_list,
         workflow_dir_local=workflow_dir_local,
         **kwargs,
     )
-    return out
 
 
 def _assert_image_data_exist(image_list: list[dict]):
@@ -61,8 +60,14 @@ def image_data_exist_on_disk(image_list: list[SingleImage]):
     return all_images_have_data
 
 
-def test_fractal_demos_01(
-    tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db
+async def test_fractal_demos_01(
+    db,
+    MockCurrentUser,
+    project_factory_v2,
+    dataset_factory_v2,
+    tmp_path: Path,
+    executor: Executor,
+    fractal_tasks_mock_no_db,
 ):
     """
     Mock of fractal-demos/examples/01.
@@ -75,147 +80,172 @@ def test_fractal_demos_01( ) zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], - task_id=fractal_tasks_mock_no_db[ - "create_ome_zarr_compound" - ].id, - args_non_parallel=dict(image_dir="/tmp/input_images"), - args_parallel={}, - id=0, - order=0, - ) - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) - - assert _task_names_from_history(dataset_attrs["history"]) == [ - "create_ome_zarr_compound" - ] - assert dataset_attrs["filters"]["attributes"] == {} - assert dataset_attrs["filters"]["types"] == {} - _assert_image_data_exist(dataset_attrs["images"]) - assert len(dataset_attrs["images"]) == 2 - - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["illumination_correction"], - task_id=fractal_tasks_mock_no_db["illumination_correction"].id, - args_parallel=dict(overwrite_input=True), - id=1, - order=1, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) - assert _task_names_from_history(dataset_attrs["history"]) == [ - "illumination_correction", - ] - assert dataset_attrs["filters"]["attributes"] == {} - assert dataset_attrs["filters"]["types"] == { - "illumination_correction": True, - } - assert set(img["zarr_url"] for img in dataset_attrs["images"]) == { - f"{zarr_dir}/my_plate.zarr/A/01/0", - f"{zarr_dir}/my_plate.zarr/A/02/0", - } - - img = find_image_by_zarr_url( - zarr_url=f"{zarr_dir}/my_plate.zarr/A/01/0", - images=dataset_attrs["images"], - )["image"] - assert img == { - "zarr_url": f"{zarr_dir}/my_plate.zarr/A/01/0", - "attributes": { - "well": "A01", - "plate": "my_plate.zarr", - }, - "types": { - "illumination_correction": True, - "3D": True, - }, - "origin": None, - } - - _assert_image_data_exist(dataset_attrs["images"]) - - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["MIP_compound"], - task_id=fractal_tasks_mock_no_db["MIP_compound"].id, - args_non_parallel=dict(suffix="mip"), - args_parallel={}, - id=2, - order=2, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) - debug(dataset_attrs) - - assert _task_names_from_history(dataset_attrs["history"]) == [ - "MIP_compound", - ] - - assert dataset_attrs["filters"]["attributes"] == {} - assert dataset_attrs["filters"]["types"] == { - "illumination_correction": True, - "3D": False, - } - img = find_image_by_zarr_url( - zarr_url=f"{zarr_dir}/my_plate_mip.zarr/A/01/0", - images=dataset_attrs["images"], - )["image"] - assert img == { - "zarr_url": f"{zarr_dir}/my_plate_mip.zarr/A/01/0", - "origin": f"{zarr_dir}/my_plate.zarr/A/01/0", - "attributes": { - "well": "A01", - "plate": "my_plate_mip.zarr", - }, - "types": { - "3D": False, + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], + task_id=fractal_tasks_mock_no_db[ + "create_ome_zarr_compound" + ].id, + args_non_parallel=dict(image_dir="/tmp/input_images"), + args_parallel={}, + id=0, + order=0, + ) + ], + dataset=dataset, + **execute_tasks_v2_args, + ) + dataset_attrs = await 
_get_dataset_attrs(db, dataset.id) + debug(dataset_attrs["history"]) + assert _task_names_from_history(dataset_attrs["history"]) == [ + "create_ome_zarr_compound" + ] + assert dataset_attrs["filters"]["attributes"] == {} + assert dataset_attrs["filters"]["types"] == {} + _assert_image_data_exist(dataset_attrs["images"]) + assert len(dataset_attrs["images"]) == 2 + + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["illumination_correction"], + task_id=fractal_tasks_mock_no_db[ + "illumination_correction" + ].id, + args_parallel=dict(overwrite_input=True), + id=1, + order=1, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) + assert _task_names_from_history(dataset_attrs["history"]) == [ + "create_ome_zarr_compound", + "illumination_correction", + ] + assert dataset_attrs["filters"]["attributes"] == {} + assert dataset_attrs["filters"]["types"] == { "illumination_correction": True, - }, - } - _assert_image_data_exist(dataset_attrs["images"]) - - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["cellpose_segmentation"], - task_id=fractal_tasks_mock_no_db["cellpose_segmentation"].id, - args_parallel={}, - id=3, - order=3, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) - - debug(dataset_attrs) - - assert _task_names_from_history(dataset_attrs["history"]) == [ - "cellpose_segmentation", - ] + } + assert set(img["zarr_url"] for img in dataset_attrs["images"]) == { + f"{zarr_dir}/my_plate.zarr/A/01/0", + f"{zarr_dir}/my_plate.zarr/A/02/0", + } + + img = find_image_by_zarr_url( + zarr_url=f"{zarr_dir}/my_plate.zarr/A/01/0", + images=dataset_attrs["images"], + )["image"] + assert img == { + "zarr_url": f"{zarr_dir}/my_plate.zarr/A/01/0", + "attributes": { + "well": "A01", + "plate": "my_plate.zarr", + }, + "types": { + "illumination_correction": True, + "3D": True, + }, + "origin": None, + } + + _assert_image_data_exist(dataset_attrs["images"]) + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["MIP_compound"], + task_id=fractal_tasks_mock_no_db["MIP_compound"].id, + args_non_parallel=dict(suffix="mip"), + args_parallel={}, + id=2, + order=2, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) + debug(dataset_attrs) + assert _task_names_from_history(dataset_attrs["history"]) == [ + "create_ome_zarr_compound", + "illumination_correction", + "MIP_compound", + ] -def test_fractal_demos_01_no_overwrite( - tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db + assert dataset_attrs["filters"]["attributes"] == {} + assert dataset_attrs["filters"]["types"] == { + "illumination_correction": True, + "3D": False, + } + img = find_image_by_zarr_url( + zarr_url=f"{zarr_dir}/my_plate_mip.zarr/A/01/0", + images=dataset_attrs["images"], + )["image"] + assert img == { + "zarr_url": f"{zarr_dir}/my_plate_mip.zarr/A/01/0", + "origin": f"{zarr_dir}/my_plate.zarr/A/01/0", + "attributes": { + "well": "A01", + "plate": "my_plate_mip.zarr", + }, + "types": { + "3D": False, + "illumination_correction": True, + 
}, + } + _assert_image_data_exist(dataset_attrs["images"]) + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["cellpose_segmentation"], + task_id=fractal_tasks_mock_no_db[ + "cellpose_segmentation" + ].id, + args_parallel={}, + id=3, + order=3, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) + debug(dataset_attrs) + + assert _task_names_from_history(dataset_attrs["history"]) == [ + "create_ome_zarr_compound", + "illumination_correction", + "MIP_compound", + "cellpose_segmentation", + ] + + +async def test_fractal_demos_01_no_overwrite( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, + fractal_tasks_mock_no_db, ): """ Similar to fractal-demos/examples/01, but illumination @@ -229,194 +259,217 @@ def test_fractal_demos_01_no_overwrite( workflow_dir_local=tmp_path / "job_dir", workflow_dir_remote=tmp_path / "job_dir", ) - - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], - task_id=fractal_tasks_mock_no_db[ - "create_ome_zarr_compound" - ].id, - args_non_parallel=dict(image_dir="/tmp/input_images"), - id=0, - order=0, - ) - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) - assert [img["zarr_url"] for img in dataset_attrs["images"]] == [ - f"{zarr_dir}/my_plate.zarr/A/01/0", - f"{zarr_dir}/my_plate.zarr/A/02/0", - ] - - _assert_image_data_exist(dataset_attrs["images"]) - # Run illumination correction with overwrite_input=False - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["illumination_correction"], - task_id=fractal_tasks_mock_no_db["illumination_correction"].id, - args_parallel=dict(overwrite_input=False), - id=1, - order=1, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) - - assert _task_names_from_history(dataset_attrs["history"]) == [ - "illumination_correction", - ] - assert dataset_attrs["filters"]["attributes"] == {} - assert dataset_attrs["filters"]["types"] == { - "illumination_correction": True, - } - assert [img["zarr_url"] for img in dataset_attrs["images"]] == [ - f"{zarr_dir}/my_plate.zarr/A/01/0", - f"{zarr_dir}/my_plate.zarr/A/02/0", - f"{zarr_dir}/my_plate.zarr/A/01/0_corr", - f"{zarr_dir}/my_plate.zarr/A/02/0_corr", - ] - assert dataset_attrs["images"][0] == { - "zarr_url": f"{zarr_dir}/my_plate.zarr/A/01/0", - "origin": None, - "attributes": { - "well": "A01", - "plate": "my_plate.zarr", - }, - "types": { - "3D": True, - }, - } - assert dataset_attrs["images"][1] == { - "zarr_url": f"{zarr_dir}/my_plate.zarr/A/02/0", - "origin": None, - "attributes": { - "well": "A02", - "plate": "my_plate.zarr", - }, - "types": { - "3D": True, - }, - } - assert dataset_attrs["images"][2] == { - "zarr_url": f"{zarr_dir}/my_plate.zarr/A/01/0_corr", - "origin": f"{zarr_dir}/my_plate.zarr/A/01/0", - "attributes": { - "well": "A01", - "plate": "my_plate.zarr", - }, - "types": { - "illumination_correction": True, - "3D": True, - }, - } - assert dataset_attrs["images"][3] == { - "zarr_url": f"{zarr_dir}/my_plate.zarr/A/02/0_corr", - "origin": f"{zarr_dir}/my_plate.zarr/A/02/0", - "attributes": { - "well": "A02", - "plate": 
"my_plate.zarr", - }, - "types": { - "3D": True, + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], + task_id=fractal_tasks_mock_no_db[ + "create_ome_zarr_compound" + ].id, + args_non_parallel=dict(image_dir="/tmp/input_images"), + id=0, + order=0, + ) + ], + dataset=dataset, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset.id) + assert [img["zarr_url"] for img in dataset_attrs["images"]] == [ + f"{zarr_dir}/my_plate.zarr/A/01/0", + f"{zarr_dir}/my_plate.zarr/A/02/0", + ] + + _assert_image_data_exist(dataset_attrs["images"]) + # Run illumination correction with overwrite_input=False + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["illumination_correction"], + task_id=fractal_tasks_mock_no_db[ + "illumination_correction" + ].id, + args_parallel=dict(overwrite_input=False), + id=1, + order=1, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) + assert _task_names_from_history(dataset_attrs["history"]) == [ + "create_ome_zarr_compound", + "illumination_correction", + ] + assert dataset_attrs["filters"]["attributes"] == {} + assert dataset_attrs["filters"]["types"] == { "illumination_correction": True, - }, - } - _assert_image_data_exist(dataset_attrs["images"]) - - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["MIP_compound"], - task_id=fractal_tasks_mock_no_db["MIP_compound"].id, - args_non_parallel=dict(suffix="mip"), - id=2, - order=2, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) - - assert _task_names_from_history(dataset_attrs["history"]) == [ - "MIP_compound", - ] - assert dataset_attrs["filters"]["attributes"] == {} - assert dataset_attrs["filters"]["types"] == { - "3D": False, - "illumination_correction": True, - } - assert [img["zarr_url"] for img in dataset_attrs["images"]] == [ - f"{zarr_dir}/my_plate.zarr/A/01/0", - f"{zarr_dir}/my_plate.zarr/A/02/0", - f"{zarr_dir}/my_plate.zarr/A/01/0_corr", - f"{zarr_dir}/my_plate.zarr/A/02/0_corr", - f"{zarr_dir}/my_plate_mip.zarr/A/01/0_corr", - f"{zarr_dir}/my_plate_mip.zarr/A/02/0_corr", - ] - - assert dataset_attrs["images"][4] == { - "zarr_url": f"{zarr_dir}/my_plate_mip.zarr/A/01/0_corr", - "origin": f"{zarr_dir}/my_plate.zarr/A/01/0_corr", - "attributes": { - "well": "A01", - "plate": "my_plate_mip.zarr", - }, - "types": { + } + assert [img["zarr_url"] for img in dataset_attrs["images"]] == [ + f"{zarr_dir}/my_plate.zarr/A/01/0", + f"{zarr_dir}/my_plate.zarr/A/02/0", + f"{zarr_dir}/my_plate.zarr/A/01/0_corr", + f"{zarr_dir}/my_plate.zarr/A/02/0_corr", + ] + assert dataset_attrs["images"][0] == { + "zarr_url": f"{zarr_dir}/my_plate.zarr/A/01/0", + "origin": None, + "attributes": { + "well": "A01", + "plate": "my_plate.zarr", + }, + "types": { + "3D": True, + }, + } + assert dataset_attrs["images"][1] == { + "zarr_url": f"{zarr_dir}/my_plate.zarr/A/02/0", + "origin": None, + "attributes": { + "well": "A02", + "plate": "my_plate.zarr", + }, + "types": { + "3D": True, + }, + } + assert 
dataset_attrs["images"][2] == { + "zarr_url": f"{zarr_dir}/my_plate.zarr/A/01/0_corr", + "origin": f"{zarr_dir}/my_plate.zarr/A/01/0", + "attributes": { + "well": "A01", + "plate": "my_plate.zarr", + }, + "types": { + "illumination_correction": True, + "3D": True, + }, + } + assert dataset_attrs["images"][3] == { + "zarr_url": f"{zarr_dir}/my_plate.zarr/A/02/0_corr", + "origin": f"{zarr_dir}/my_plate.zarr/A/02/0", + "attributes": { + "well": "A02", + "plate": "my_plate.zarr", + }, + "types": { + "3D": True, + "illumination_correction": True, + }, + } + _assert_image_data_exist(dataset_attrs["images"]) + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["MIP_compound"], + task_id=fractal_tasks_mock_no_db["MIP_compound"].id, + args_non_parallel=dict(suffix="mip"), + id=2, + order=2, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) + + assert _task_names_from_history(dataset_attrs["history"]) == [ + "create_ome_zarr_compound", + "illumination_correction", + "MIP_compound", + ] + assert dataset_attrs["filters"]["attributes"] == {} + assert dataset_attrs["filters"]["types"] == { "3D": False, "illumination_correction": True, - }, - } - assert dataset_attrs["images"][5] == { - "zarr_url": f"{zarr_dir}/my_plate_mip.zarr/A/02/0_corr", - "origin": f"{zarr_dir}/my_plate.zarr/A/02/0_corr", - "attributes": { - "well": "A02", - "plate": "my_plate_mip.zarr", - }, - "types": { + } + assert [img["zarr_url"] for img in dataset_attrs["images"]] == [ + f"{zarr_dir}/my_plate.zarr/A/01/0", + f"{zarr_dir}/my_plate.zarr/A/02/0", + f"{zarr_dir}/my_plate.zarr/A/01/0_corr", + f"{zarr_dir}/my_plate.zarr/A/02/0_corr", + f"{zarr_dir}/my_plate_mip.zarr/A/01/0_corr", + f"{zarr_dir}/my_plate_mip.zarr/A/02/0_corr", + ] + + assert dataset_attrs["images"][4] == { + "zarr_url": f"{zarr_dir}/my_plate_mip.zarr/A/01/0_corr", + "origin": f"{zarr_dir}/my_plate.zarr/A/01/0_corr", + "attributes": { + "well": "A01", + "plate": "my_plate_mip.zarr", + }, + "types": { + "3D": False, + "illumination_correction": True, + }, + } + assert dataset_attrs["images"][5] == { + "zarr_url": f"{zarr_dir}/my_plate_mip.zarr/A/02/0_corr", + "origin": f"{zarr_dir}/my_plate.zarr/A/02/0_corr", + "attributes": { + "well": "A02", + "plate": "my_plate_mip.zarr", + }, + "types": { + "3D": False, + "illumination_correction": True, + }, + } + + assert dataset_attrs["filters"]["attributes"] == {} + assert dataset_attrs["filters"]["types"] == { "3D": False, "illumination_correction": True, - }, - } - - assert dataset_attrs["filters"]["attributes"] == {} - assert dataset_attrs["filters"]["types"] == { - "3D": False, - "illumination_correction": True, - } - - _assert_image_data_exist(dataset_attrs["images"]) - - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["cellpose_segmentation"], - task_id=fractal_tasks_mock_no_db["cellpose_segmentation"].id, - id=3, - order=3, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) - - assert _task_names_from_history(dataset_attrs["history"]) == [ - "cellpose_segmentation", - ] + } - -def test_registration_no_overwrite( - tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db + _assert_image_data_exist(dataset_attrs["images"]) + dataset_with_attrs = await 
dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["cellpose_segmentation"], + task_id=fractal_tasks_mock_no_db[ + "cellpose_segmentation" + ].id, + id=3, + order=3, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) + assert _task_names_from_history(dataset_attrs["history"]) == [ + "create_ome_zarr_compound", + "illumination_correction", + "MIP_compound", + "cellpose_segmentation", + ] + + +async def test_registration_no_overwrite( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, + fractal_tasks_mock_no_db, ): """ Test registration workflow, based on four tasks. @@ -428,104 +481,129 @@ def test_registration_no_overwrite( workflow_dir_remote=tmp_path / "job_dir", ) zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db[ - "create_ome_zarr_multiplex_compound" - ], - task_id=fractal_tasks_mock_no_db[ - "create_ome_zarr_multiplex_compound" - ].id, - args_non_parallel=dict(image_dir="/tmp/input_images"), - id=0, - order=0, - ), - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "create_ome_zarr_multiplex_compound" + ], + task_id=fractal_tasks_mock_no_db[ + "create_ome_zarr_multiplex_compound" + ].id, + args_non_parallel=dict(image_dir="/tmp/input_images"), + id=0, + order=0, + ), + ], + dataset=dataset, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset.id) + # Run init registration + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "calculate_registration_compound" + ], + task_id=fractal_tasks_mock_no_db[ + "calculate_registration_compound" + ].id, + args_non_parallel={"ref_acquisition": 0}, + id=1, + order=1, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) - # Run init registration - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db[ - "calculate_registration_compound" - ], - task_id=fractal_tasks_mock_no_db[ - "calculate_registration_compound" - ].id, - args_non_parallel={"ref_acquisition": 0}, - id=1, - order=1, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) + # In all non-reference-cycle images, a certain table was updated + for image in dataset_attrs["images"]: + if image["attributes"]["acquisition"] == 0: + assert not os.path.isfile( + f"{image['zarr_url']}/registration_table" + ) + else: + assert os.path.isfile( + f"{image['zarr_url']}/registration_table" + ) - # In all non-reference-cycle images, a certain table was updated - for image in dataset_attrs["images"]: - if image["attributes"]["acquisition"] == 0: - assert not os.path.isfile( - f"{image['zarr_url']}/registration_table" - ) - else: - assert 
os.path.isfile(f"{image['zarr_url']}/registration_table") - - # Run find_registration_consensus - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["find_registration_consensus"], - task_id=fractal_tasks_mock_no_db[ - "find_registration_consensus" - ].id, - id=2, - order=2, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) + # Run find_registration_consensus + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "find_registration_consensus" + ], + task_id=fractal_tasks_mock_no_db[ + "find_registration_consensus" + ].id, + id=2, + order=2, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) - # In all images, a certain (post-consensus) table was updated - for image in dataset_attrs["images"]: - assert os.path.isfile(f"{image['zarr_url']}/registration_table_final") - - # The image list still has the original six images only - assert len(dataset_attrs["images"]) == 6 - - # Run apply_registration_to_image - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["apply_registration_to_image"], - task_id=fractal_tasks_mock_no_db[ - "apply_registration_to_image" - ].id, - args_parallel={"overwrite_input": False}, - id=3, - order=3, + # In all images, a certain (post-consensus) table was updated + for image in dataset_attrs["images"]: + assert os.path.isfile( + f"{image['zarr_url']}/registration_table_final" ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) - # A new copy of each image was created - assert len(dataset_attrs["images"]) == 12 + # The image list still has the original six images only + assert len(dataset_attrs["images"]) == 6 + + # Run apply_registration_to_image + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "apply_registration_to_image" + ], + task_id=fractal_tasks_mock_no_db[ + "apply_registration_to_image" + ].id, + args_parallel={"overwrite_input": False}, + id=3, + order=3, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) + + # A new copy of each image was created + assert len(dataset_attrs["images"]) == 12 -def test_registration_overwrite( - tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db +async def test_registration_overwrite( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, + fractal_tasks_mock_no_db, ): """ Test registration workflow, based on four tasks. 
@@ -538,106 +616,132 @@ def test_registration_overwrite( ) zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db[ - "create_ome_zarr_multiplex_compound" - ], - task_id=fractal_tasks_mock_no_db[ - "create_ome_zarr_multiplex_compound" - ].id, - args_non_parallel=dict(image_dir="/tmp/input_images"), - id=0, - order=0, - ), - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "create_ome_zarr_multiplex_compound" + ], + task_id=fractal_tasks_mock_no_db[ + "create_ome_zarr_multiplex_compound" + ].id, + args_non_parallel=dict(image_dir="/tmp/input_images"), + id=0, + order=0, + ), + ], + dataset=dataset, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset.id) - # Run init registration - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db[ - "calculate_registration_compound" - ], - task_id=fractal_tasks_mock_no_db[ - "calculate_registration_compound" - ].id, - args_non_parallel={"ref_acquisition": 0}, - order=1, - id=1, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) + # Run init registration + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "calculate_registration_compound" + ], + task_id=fractal_tasks_mock_no_db[ + "calculate_registration_compound" + ].id, + args_non_parallel={"ref_acquisition": 0}, + order=1, + id=1, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) - # In all non-reference-cycle images, a certain table was updated - for image in dataset_attrs["images"]: - if image["attributes"]["acquisition"] == 0: - assert not os.path.isfile( - f"{image['zarr_url']}/registration_table" - ) - else: - assert os.path.isfile(f"{image['zarr_url']}/registration_table") - - # Run find_registration_consensus - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["find_registration_consensus"], - task_id=fractal_tasks_mock_no_db[ - "find_registration_consensus" - ].id, - id=2, - order=2, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) + # In all non-reference-cycle images, a certain table was updated + for image in dataset_attrs["images"]: + if image["attributes"]["acquisition"] == 0: + assert not os.path.isfile( + f"{image['zarr_url']}/registration_table" + ) + else: + assert os.path.isfile( + f"{image['zarr_url']}/registration_table" + ) + + # Run find_registration_consensus + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "find_registration_consensus" + ], + task_id=fractal_tasks_mock_no_db[ + "find_registration_consensus" + ].id, + id=2, + order=2, + ) + ], + dataset=dataset_with_attrs, + 
**execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) - # In all images, a certain (post-consensus) table was updated - for image in dataset_attrs["images"]: - assert os.path.isfile(f"{image['zarr_url']}/registration_table_final") - - # The image list still has the original six images only - assert len(dataset_attrs["images"]) == 6 - - # Run apply_registration_to_image - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["apply_registration_to_image"], - task_id=fractal_tasks_mock_no_db[ - "apply_registration_to_image" - ].id, - args_parallel={"overwrite_input": True}, - id=3, - order=3, + # In all images, a certain (post-consensus) table was updated + for image in dataset_attrs["images"]: + assert os.path.isfile( + f"{image['zarr_url']}/registration_table_final" ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) - # Images are still the same number, but they are marked as registered - assert len(dataset_attrs["images"]) == 6 - for image in dataset_attrs["images"]: - assert image["types"]["registration"] is True + # The image list still has the original six images only + assert len(dataset_attrs["images"]) == 6 + + # Run apply_registration_to_image + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "apply_registration_to_image" + ], + task_id=fractal_tasks_mock_no_db[ + "apply_registration_to_image" + ].id, + args_parallel={"overwrite_input": True}, + id=3, + order=3, + ) + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) + + # Images are still the same number, but they are marked as registered + assert len(dataset_attrs["images"]) == 6 + for image in dataset_attrs["images"]: + assert image["types"]["registration"] is True -def test_channel_parallelization_with_overwrite( - tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db +async def test_channel_parallelization_with_overwrite( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, + fractal_tasks_mock_no_db, ): zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") @@ -646,51 +750,65 @@ def test_channel_parallelization_with_overwrite( workflow_dir_local=tmp_path / "job_dir", workflow_dir_remote=tmp_path / "job_dir", ) - # Run create_ome_zarr+yokogawa_to_zarr - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], - task_id=fractal_tasks_mock_no_db[ - "create_ome_zarr_compound" - ].id, - args_non_parallel=dict(image_dir="/tmp/input_images"), - id=0, - order=0, - ), - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir + ) + # Run create_ome_zarr+yokogawa_to_zarr + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], + task_id=fractal_tasks_mock_no_db[ + "create_ome_zarr_compound" + ].id, + args_non_parallel=dict(image_dir="/tmp/input_images"), + id=0, + order=0, + ), + ], + dataset=dataset, + **execute_tasks_v2_args, + ) + 
dataset_attrs = await _get_dataset_attrs(db, dataset.id) - # Run illumination_correction_compound - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db[ - "illumination_correction_compound" - ], - task_id=fractal_tasks_mock_no_db[ - "illumination_correction_compound" - ].id, - args_non_parallel=dict(overwrite_input=True), - args_parallel=dict(another_argument="something"), - id=1, - order=1, - ), - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) + # Run illumination_correction_compound + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, **dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "illumination_correction_compound" + ], + task_id=fractal_tasks_mock_no_db[ + "illumination_correction_compound" + ].id, + args_non_parallel=dict(overwrite_input=True), + args_parallel=dict(another_argument="something"), + id=1, + order=1, + ), + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) - # Check that there are now 2 images - assert len(dataset_attrs["images"]) == 2 + # Check that there are now 2 images + assert len(dataset_attrs["images"]) == 2 -def test_channel_parallelization_no_overwrite( - tmp_path: Path, executor: Executor, fractal_tasks_mock_no_db +async def test_channel_parallelization_no_overwrite( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, + fractal_tasks_mock_no_db, ): zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") @@ -699,50 +817,62 @@ def test_channel_parallelization_no_overwrite( workflow_dir_local=tmp_path / "job_dir", workflow_dir_remote=tmp_path / "job_dir", ) - # Run create_ome_zarr+yokogawa_to_zarr - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], - task_id=fractal_tasks_mock_no_db[ - "create_ome_zarr_compound" - ].id, - args_non_parallel=dict(image_dir="/tmp/input_images"), - id=0, - order=0, - ), - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir + ) + # Run create_ome_zarr+yokogawa_to_zarr + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db["create_ome_zarr_compound"], + task_id=fractal_tasks_mock_no_db[ + "create_ome_zarr_compound" + ].id, + args_non_parallel=dict(image_dir="/tmp/input_images"), + id=0, + order=0, + ), + ], + dataset=dataset, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset.id) - # Run illumination_correction_compound - dataset_attrs = execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=fractal_tasks_mock_no_db[ - "illumination_correction_compound" - ], - task_id=fractal_tasks_mock_no_db[ - "illumination_correction_compound" - ].id, - args_non_parallel=dict(overwrite_input=False), - args_parallel=dict(another_argument="something"), - id=1, - order=1, - ), - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, **dataset_attrs - ), - **execute_tasks_v2_args, - ) + # Run illumination_correction_compound + dataset_with_attrs = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, 
**dataset_attrs + ) + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=fractal_tasks_mock_no_db[ + "illumination_correction_compound" + ], + task_id=fractal_tasks_mock_no_db[ + "illumination_correction_compound" + ].id, + args_non_parallel=dict(overwrite_input=False), + args_parallel=dict(another_argument="something"), + id=1, + order=1, + ), + ], + dataset=dataset_with_attrs, + **execute_tasks_v2_args, + ) + dataset_attrs = await _get_dataset_attrs(db, dataset_with_attrs.id) - # Check that there are now 4 images - assert len(dataset_attrs["images"]) == 4 + # Check that there are now 4 images + assert len(dataset_attrs["images"]) == 4 -def test_invalid_filtered_image_list( +async def test_invalid_filtered_image_list( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, tmp_path: Path, executor: Executor, ): @@ -759,27 +889,29 @@ def test_invalid_filtered_image_list( zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") zarr_url = Path(zarr_dir, "my_image").as_posix() image = SingleImage(zarr_url=zarr_url, attributes={}, types={}).dict() - - with pytest.raises(JobExecutionError) as e: - execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=TaskV2Mock( - id=0, - name="name", - source="source", - command_non_parallel="cmd", - type="non_parallel", - input_types=dict(invalid=True), - ), - task_id=0, - id=0, - order=0, - ) - ], - dataset=DatasetV2Mock( - name="dataset", zarr_dir=zarr_dir, images=[image] - ), - **execute_tasks_v2_args, + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir, images=[image] ) - assert "Invalid filtered image list" in str(e.value) + with pytest.raises(JobExecutionError) as e: + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=TaskV2Mock( + id=0, + name="name", + source="source", + command_non_parallel="cmd", + type="non_parallel", + input_types=dict(invalid=True), + ), + task_id=0, + id=0, + order=0, + ) + ], + dataset=dataset, + **execute_tasks_v2_args, + ) + assert "Invalid filtered image list" in str(e.value) diff --git a/tests/v2/04_runner/test_no_images_parallelization.py b/tests/v2/04_runner/test_no_images_parallelization.py index b744176cc6..3ae0653d98 100644 --- a/tests/v2/04_runner/test_no_images_parallelization.py +++ b/tests/v2/04_runner/test_no_images_parallelization.py @@ -3,7 +3,6 @@ from pathlib import Path from fixtures_mocks import * # noqa: F401,F403 -from v2_mock_models import DatasetV2Mock from v2_mock_models import TaskV2Mock from v2_mock_models import WorkflowTaskV2Mock @@ -21,15 +20,21 @@ def execute_tasks_v2(wf_task_list, workflow_dir_local, **kwargs): logging.info(f"Now creating {subfolder.as_posix()}") subfolder.mkdir(parents=True) - out = raw_execute_tasks_v2( + raw_execute_tasks_v2( wf_task_list=wf_task_list, workflow_dir_local=workflow_dir_local, **kwargs, ) - return out -def test_parallelize_on_no_images(tmp_path: Path, executor: Executor): +async def test_parallelize_on_no_images( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, +): """ Run a parallel task on a dataset with no images. 
""" @@ -40,29 +45,40 @@ def test_parallelize_on_no_images(tmp_path: Path, executor: Executor): workflow_dir_remote=tmp_path / "job_dir", ) zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") - - # Run successfully on an empty dataset - execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=TaskV2Mock( - name="name", - type="parallel", - command_parallel="echo", + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir + ) + # Run successfully on an empty dataset + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=TaskV2Mock( + name="name", + type="parallel", + command_parallel="echo", + id=0, + source="source", + ), + task_id=0, id=0, - source="source", - ), - task_id=0, - id=0, - order=0, - ) - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) + order=0, + ) + ], + dataset=dataset, + **execute_tasks_v2_args, + ) -def test_parallelize_on_no_images_compound(tmp_path: Path, executor: Executor): +async def test_parallelize_on_no_images_compound( + db, + MockCurrentUser, + project_factory_v2, + dataset_factory_v2, + tmp_path: Path, + executor: Executor, +): """ Run a compound task with an empty parallelization list. """ @@ -73,25 +89,29 @@ def test_parallelize_on_no_images_compound(tmp_path: Path, executor: Executor): workflow_dir_remote=tmp_path / "job_dir", ) zarr_dir = (tmp_path / "zarr_dir").as_posix().rstrip("/") - - # Run successfully on an empty dataset - execute_tasks_v2( - wf_task_list=[ - WorkflowTaskV2Mock( - task=TaskV2Mock( - name="name", - type="compound", - # this produces an empty parallelization list - command_non_parallel="echo", - command_parallel="echo", + async with MockCurrentUser() as user: + project = await project_factory_v2(user) + dataset = await dataset_factory_v2( + project_id=project.id, zarr_dir=zarr_dir + ) + # Run successfully on an empty dataset + execute_tasks_v2( + wf_task_list=[ + WorkflowTaskV2Mock( + task=TaskV2Mock( + name="name", + type="compound", + # this produces an empty parallelization list + command_non_parallel="echo", + command_parallel="echo", + id=0, + source="source", + ), + task_id=0, id=0, - source="source", - ), - task_id=0, - id=0, - order=0, - ) - ], - dataset=DatasetV2Mock(name="dataset", zarr_dir=zarr_dir), - **execute_tasks_v2_args, - ) + order=0, + ) + ], + dataset=dataset, + **execute_tasks_v2_args, + ) diff --git a/tests/v2/04_runner/test_unit_mark_last_wftask_as_failed.py b/tests/v2/04_runner/test_unit_mark_last_wftask_as_failed.py new file mode 100644 index 0000000000..1f1e446f32 --- /dev/null +++ b/tests/v2/04_runner/test_unit_mark_last_wftask_as_failed.py @@ -0,0 +1,43 @@ +from fractal_server.app.runner.v2.handle_failed_job import ( + mark_last_wftask_as_failed, +) + + +async def test_unit_mark_last_wftask_as_failed( + db, + dataset_factory_v2, + project_factory_v2, + MockCurrentUser, + caplog, +): + async with MockCurrentUser() as user: + project = await project_factory_v2(user=user) + dataset_no_history = await dataset_factory_v2( + project_id=project.id, + name="name", + history=[], + ) + + caplog.clear() + mark_last_wftask_as_failed( + dataset_id=dataset_no_history.id, logger_name="logger" + ) + print(caplog.text) + assert "is empty. 
Likely reason" in caplog.text + + dataset_wrong_history = await dataset_factory_v2( + name="name", + history=[ + { + "workflowtask": {"id": 123}, + "status": "done", + } + ], + ) + + caplog.clear() + mark_last_wftask_as_failed( + dataset_id=dataset_wrong_history.id, logger_name="logger" + ) + print(caplog.text) + assert "Unexpected branch: Last history item" in caplog.text diff --git a/tests/v2/04_runner/v2_mock_models.py b/tests/v2/04_runner/v2_mock_models.py index dd5b6e6f51..4321f2f9ec 100644 --- a/tests/v2/04_runner/v2_mock_models.py +++ b/tests/v2/04_runner/v2_mock_models.py @@ -1,5 +1,4 @@ from typing import Any -from typing import Literal from typing import Optional from pydantic import BaseModel @@ -8,27 +7,6 @@ from pydantic import validator -class DatasetV2Mock(BaseModel): - id: Optional[int] = None - name: str - zarr_dir: str - images: list[dict[str, Any]] = Field(default_factory=list) - filters: dict[Literal["types", "attributes"], dict[str, Any]] = Field( - default_factory=dict - ) - history: list = Field(default_factory=list) - - @property - def image_zarr_urls(self) -> list[str]: - return [image["zarr_urls"] for image in self.images] - - @validator("filters", always=True) - def _default_filters(cls, value): - if value == {}: - return {"types": {}, "attributes": {}} - return value - - class TaskV2Mock(BaseModel): id: int name: str diff --git a/tests/v2/08_full_workflow/common_functions.py b/tests/v2/08_full_workflow/common_functions.py index bf94580795..5cfc2932cb 100644 --- a/tests/v2/08_full_workflow/common_functions.py +++ b/tests/v2/08_full_workflow/common_functions.py @@ -7,9 +7,6 @@ from devtools import debug from fractal_server.app.models.v2 import TaskV2 -from fractal_server.app.runner.filenames import FILTERS_FILENAME -from fractal_server.app.runner.filenames import HISTORY_FILENAME -from fractal_server.app.runner.filenames import IMAGES_FILENAME from fractal_server.app.runner.filenames import WORKFLOW_LOG_FILENAME PREFIX = "/api/v2" @@ -130,7 +127,6 @@ async def full_workflow( ) assert res.status_code == 200 dataset = res.json() - debug(dataset) assert len(dataset["history"]) == 2 assert dataset["filters"]["types"] == {"3D": False} res = await client.post( @@ -169,9 +165,6 @@ async def full_workflow( with zipfile.ZipFile(f"{working_dir}.zip", "r") as zip_ref: actual_files = zip_ref.namelist() expected_files = [ - HISTORY_FILENAME, - FILTERS_FILENAME, - IMAGES_FILENAME, WORKFLOW_LOG_FILENAME, ] assert set(expected_files) < set(actual_files) @@ -204,7 +197,6 @@ async def full_workflow_TaskExecutionError( user_kwargs: Optional[dict] = None, user_settings_dict: Optional[dict] = None, ): - if user_kwargs is None: user_kwargs = {} @@ -596,9 +588,6 @@ async def workflow_with_non_python_task( must_exist = [ "0.log", "0.args.json", - IMAGES_FILENAME, - HISTORY_FILENAME, - FILTERS_FILENAME, WORKFLOW_LOG_FILENAME, ]