
Commit

Merge pull request #2169 from fractal-analytics-platform/2135-replace-temporary-files-with-database-access-in-highest-level-runner-module

Replace temporary files with database access in highest level runner module
tcompa authored Jan 14, 2025
2 parents d7dac21 + 74dfe65 commit 803c147
Showing 32 changed files with 1,455 additions and 1,338 deletions.
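
The gist of the change: the V2 runner used to hand task status to the API through temporary JSON files in the job's working directory, and now it writes to (and the API reads from) the database directly. A minimal sketch of the before/after follows; `DatasetStub` and the helper names are illustrative assumptions, not the actual fractal-server API (the real models live in `fractal_server.app.models`):

```python
import json
from dataclasses import dataclass, field
from pathlib import Path

HISTORY_FILENAME = "history.json"  # the old temporary-file name


def history_from_tmp_file(working_dir: Path) -> list:
    """Old approach (removed for V2 by this PR): parse a temporary JSON
    file that the runner kept updated in the job's working directory."""
    tmp_file = working_dir / HISTORY_FILENAME
    try:
        with tmp_file.open("r") as f:
            return json.load(f)
    except FileNotFoundError:
        # No history file was written (yet): report an empty history.
        return []


@dataclass
class DatasetStub:
    """Illustrative stand-in for the real ORM Dataset model."""

    history: list = field(default_factory=list)


def history_from_db(dataset: DatasetStub) -> list:
    """New approach: the runner commits history to the database as the
    job progresses, so the API reads it straight from the dataset row."""
    return dataset.history
```

Reading from the database removes the failure mode where the API and the runner disagree because a temporary file is missing, stale, or truncated mid-write.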
4 changes: 2 additions & 2 deletions .github/workflows/benchmarks.yaml
@@ -2,9 +2,9 @@ name: Benchmarks
 
 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
 
 jobs:
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -2,9 +2,9 @@ name: ci
 
 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
 
 jobs:
4 changes: 2 additions & 2 deletions .github/workflows/documentation.yaml
@@ -2,9 +2,9 @@ name: docs
 
 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
 
 jobs:
4 changes: 2 additions & 2 deletions .github/workflows/migrations.yml
@@ -9,9 +9,9 @@ name: migrations
 
 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
 
 
 env:
4 changes: 2 additions & 2 deletions .github/workflows/oauth.yaml
@@ -2,9 +2,9 @@ name: OAuth2-OIDC
 
 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
 
 jobs:
4 changes: 2 additions & 2 deletions .github/workflows/pip_install.yml
@@ -2,9 +2,9 @@ name: pip-install
 
 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
 
 jobs:
4 changes: 2 additions & 2 deletions .github/workflows/poetry_build.yml
@@ -2,9 +2,9 @@ name: Build package
 
 on:
   push:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main", "dev-2.11"]
 
 jobs:
   build:
4 changes: 2 additions & 2 deletions .github/workflows/precommit.yml
@@ -2,9 +2,9 @@ name: precommit
 
 on:
   push:
-    branches: ["main"]
+    branches: ["main","dev-2.11"]
   pull_request:
-    branches: ["main"]
+    branches: ["main","dev-2.11"]
 
 jobs:
   precommit:
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -18,7 +18,7 @@ repos:
     rev: 7.1.1
     hooks:
       - id: flake8
-        args: ["--exclude", "examples/*"]
+        args: ["--exclude", "examples/*", "--ignore=E203,W503"]
   - repo: https://github.com/PyCQA/bandit
     rev: '1.7.4'
     hooks:
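
(E203, whitespace before ':', and W503, line break before a binary operator, are the two flake8 checks that most commonly conflict with black-style formatting, which is presumably why they are ignored here.)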
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,12 @@
 **Note**: Numbers like (\#1234) point to closed Pull Requests on the fractal-server repository.
 
+# 2.11.0 (unreleased)
+
+* Runner
+    * Integrate database write access in runner component (\#2169).
+* API:
+    * Update and simplify `/api/v2/project/{project_id}/status/` (\#2169).
+
 # 2.10.5
 
 * App:
4 changes: 2 additions & 2 deletions fractal_server/app/routes/api/v1/dataset.py
@@ -17,7 +17,7 @@
 from ....models.v1 import Dataset
 from ....models.v1 import Project
 from ....models.v1 import Resource
-from ....runner.filenames import HISTORY_FILENAME
+from ....runner.filenames import HISTORY_FILENAME_V1
 from ....schemas.v1 import DatasetCreateV1
 from ....schemas.v1 import DatasetReadV1
 from ....schemas.v1 import DatasetStatusReadV1
@@ -511,7 +511,7 @@ async def get_workflowtask_status(
     # Highest priority: Read status updates coming from the running-job
     # temporary file. Note: this file only contains information on
    # WorkflowTask's that ran through successfully
-    tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME
+    tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME_V1
     try:
         with tmp_file.open("r") as f:
             history = json.load(f)
40 changes: 20 additions & 20 deletions fractal_server/app/routes/api/v2/status.py
@@ -1,5 +1,3 @@
-import json
-from pathlib import Path
 from typing import Optional
 
 from fastapi import APIRouter
@@ -18,7 +16,6 @@
 from ._aux_functions import _get_workflow_check_owner
 from fractal_server.app.models import UserOAuth
 from fractal_server.app.routes.auth import current_active_user
-from fractal_server.app.runner.filenames import HISTORY_FILENAME
 
 router = APIRouter()
 
@@ -98,8 +95,8 @@ async def get_workflowtask_status(
     if running_job is None:
         # If no job is running, the chronological-last history item is also the
         # positional-last workflow task to be included in the response.
-        if len(dataset.history) > 0:
-            last_valid_wftask_id = dataset.history[-1]["workflowtask"]["id"]
+        if len(history) > 0:
+            last_valid_wftask_id = history[-1]["workflowtask"]["id"]
         else:
             last_valid_wftask_id = None
     else:
@@ -109,7 +106,24 @@
         # as "submitted"
         start = running_job.first_task_index
         end = running_job.last_task_index + 1
-        for wftask in workflow.task_list[start:end]:
+
+        running_job_wftasks = workflow.task_list[start:end]
+        running_job_statuses = [
+            workflow_tasks_status_dict.get(wft.id, None)
+            for wft in running_job_wftasks
+        ]
+        try:
+            first_submitted_index = running_job_statuses.index(
+                WorkflowTaskStatusTypeV2.SUBMITTED
+            )
+        except ValueError:
+            logger.warning(
+                f"Job {running_job.id} is submitted but its task list does "
+                f"not contain a {WorkflowTaskStatusTypeV2.SUBMITTED} task."
+            )
+            first_submitted_index = 0
+
+        for wftask in running_job_wftasks[first_submitted_index:]:
             workflow_tasks_status_dict[
                 wftask.id
             ] = WorkflowTaskStatusTypeV2.SUBMITTED
@@ -133,20 +147,6 @@
             last_valid_wftask_id = None
             logger.warning(f"Now setting {last_valid_wftask_id=}.")
 
-    # Highest priority: Read status updates coming from the running-job
-    # temporary file. Note: this file only contains information on
-    # WorkflowTask's that ran through successfully.
-    tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME
-    try:
-        with tmp_file.open("r") as f:
-            history = json.load(f)
-    except FileNotFoundError:
-        history = []
-    for history_item in history:
-        wftask_id = history_item["workflowtask"]["id"]
-        wftask_status = history_item["status"]
-        workflow_tasks_status_dict[wftask_id] = wftask_status
-
     # Based on previously-gathered information, clean up the response body
     clean_workflow_tasks_status_dict = {}
     for wf_task in workflow.task_list:
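
The new block in status.py finds the first task in the running job's slice whose recorded status is still SUBMITTED and only overwrites statuses from that point onward, so already-completed tasks keep their done/failed states. A self-contained sketch of the pattern, with plain strings standing in for `WorkflowTaskStatusTypeV2`:

```python
SUBMITTED = "submitted"  # stand-in for WorkflowTaskStatusTypeV2.SUBMITTED


def first_submitted_index(statuses: list) -> int:
    """Index of the first 'submitted' status; fall back to 0 (mark the
    whole slice) when none is found, mirroring the PR's warning branch."""
    try:
        return statuses.index(SUBMITTED)
    except ValueError:
        return 0


# Tasks before the first submitted one keep their existing status;
# everything from there on is reported as "submitted".
statuses = ["done", "done", "submitted", None]
start = first_submitted_index(statuses)
updated = statuses[:start] + [SUBMITTED] * (len(statuses) - start)
print(updated)  # ['done', 'done', 'submitted', 'submitted']
```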
6 changes: 2 additions & 4 deletions fractal_server/app/runner/filenames.py
@@ -1,6 +1,4 @@
-HISTORY_FILENAME = "history.json"
-FILTERS_FILENAME = "filters.json"
-IMAGES_FILENAME = "images.json"
-METADATA_FILENAME = "metadata.json"
+HISTORY_FILENAME_V1 = "history.json"
+METADATA_FILENAME_V1 = "metadata.json"
 SHUTDOWN_FILENAME = "shutdown"
 WORKFLOW_LOG_FILENAME = "workflow.log"
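
(The history/metadata files survive only for the V1 runner, hence the `_V1` suffixes, while `FILTERS_FILENAME` and `IMAGES_FILENAME` are dropped outright, consistent with the move to database access.)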
8 changes: 4 additions & 4 deletions fractal_server/app/runner/v1/_common.py
@@ -28,8 +28,8 @@
 from ..exceptions import TaskExecutionError
 from .common import TaskParameters
 from .common import write_args_file
-from fractal_server.app.runner.filenames import HISTORY_FILENAME
-from fractal_server.app.runner.filenames import METADATA_FILENAME
+from fractal_server.app.runner.filenames import HISTORY_FILENAME_V1
+from fractal_server.app.runner.filenames import METADATA_FILENAME_V1
 from fractal_server.app.runner.task_files import get_task_file_paths
 from fractal_server.string_tools import validate_cmd
 
@@ -610,11 +610,11 @@ def execute_tasks(
     )
 
     # Write most recent metadata to METADATA_FILENAME
-    with open(workflow_dir_local / METADATA_FILENAME, "w") as f:
+    with open(workflow_dir_local / METADATA_FILENAME_V1, "w") as f:
         json.dump(current_task_pars.metadata, f, indent=2)
 
     # Write most recent metadata to HISTORY_FILENAME
-    with open(workflow_dir_local / HISTORY_FILENAME, "w") as f:
+    with open(workflow_dir_local / HISTORY_FILENAME_V1, "w") as f:
         json.dump(current_task_pars.history, f, indent=2)
 
     return current_task_pars
8 changes: 4 additions & 4 deletions fractal_server/app/runner/v1/handle_failed_job.py
@@ -24,8 +24,8 @@
 from ...models.v1 import Workflow
 from ...models.v1 import WorkflowTask
 from ...schemas.v1 import WorkflowTaskStatusTypeV1
-from ..filenames import HISTORY_FILENAME
-from ..filenames import METADATA_FILENAME
+from ..filenames import HISTORY_FILENAME_V1
+from ..filenames import METADATA_FILENAME_V1
 
 
 def assemble_history_failed_job(
@@ -64,7 +64,7 @@ def assemble_history_failed_job(
     new_history = output_dataset.history
 
     # Part 2: Extend history based on tmp_metadata_file
-    tmp_history_file = Path(job.working_dir) / HISTORY_FILENAME
+    tmp_history_file = Path(job.working_dir) / HISTORY_FILENAME_V1
     try:
         with tmp_history_file.open("r") as f:
             tmp_file_history = json.load(f)
@@ -129,7 +129,7 @@ def assemble_meta_failed_job(
     """
 
     new_meta = deepcopy(output_dataset.meta)
-    metadata_file = Path(job.working_dir) / METADATA_FILENAME
+    metadata_file = Path(job.working_dir) / METADATA_FILENAME_V1
    try:
         with metadata_file.open("r") as f:
             metadata_update = json.load(f)
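
V1's failure handling keeps the temp-file mechanism under the renamed constants. A simplified sketch of the recover-what-you-can pattern (plain lists and paths here; the real `assemble_history_failed_job` works on ORM objects and does additional status bookkeeping):

```python
import json
from pathlib import Path

HISTORY_FILENAME_V1 = "history.json"


def recover_partial_history(db_history: list, working_dir: Path) -> list:
    """Start from the history already stored in the database, then extend
    it with whatever the failed job managed to flush to its temporary
    history file before dying."""
    new_history = list(db_history)
    tmp_history_file = working_dir / HISTORY_FILENAME_V1
    try:
        with tmp_history_file.open("r") as f:
            tmp_file_history = json.load(f)
    except FileNotFoundError:
        # The job failed before writing any history at all.
        tmp_file_history = []
    new_history.extend(tmp_file_history)
    return new_history
```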
(Diffs for the remaining changed files were not loaded on this page.)
