
Replace temporary files with database access, in highest-level runner module #2135

Closed
tcompa opened this issue Dec 12, 2024 · 2 comments · Fixed by #2169
Labels: flexibility (Support more workflow-execution use cases), runner

Comments


tcompa commented Dec 12, 2024

Note:

  1. Fixing this issue will also address the fact that the image list is currently never updated during job execution: in the new version, the update will take place after each task completes.
  2. More broadly, this will be a slightly deeper integration of db operations within the runner component, which provides a useful example in view of potential upcoming flexibility-related changes.

Runner / internal

In v2/runner.py we have:

        # Write current dataset attributes (history, images, filters) into
        # temporary files which can be used (1) to retrieve the latest state
        # when the job fails, (2) from within endpoints that need up-to-date
        # information
        with open(workflow_dir_local / HISTORY_FILENAME, "w") as f:
            json.dump(tmp_history, f, indent=2)
        with open(workflow_dir_local / FILTERS_FILENAME, "w") as f:
            json.dump(tmp_filters, f, indent=2)
        with open(workflow_dir_local / IMAGES_FILENAME, "w") as f:
            json.dump(tmp_images, f, indent=2)

These should all be replaced by db operations.
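
Here is a rough sketch of what the replacement could look like, assuming the runner has (or is handed) a synchronous db session together with the Dataset ORM object; the helper name and its exact signature are placeholders, not an actual implementation:

    from sqlalchemy.orm.attributes import flag_modified


    def write_dataset_attributes_to_db(
        db_sync, dataset, tmp_history, tmp_filters, tmp_images
    ):
        """
        Persist the current dataset attributes (history, images, filters),
        so that they can be read back (1) when the job fails, (2) from
        endpoints that need up-to-date information.
        """
        dataset.history = tmp_history
        dataset.filters = tmp_filters
        dataset.images = tmp_images
        # Flag JSON columns as modified (same pattern as in the current
        # no-errors branch)
        for attribute_name in ("history", "filters", "images"):
            flag_modified(dataset, attribute_name)
        db_sync.merge(dataset)
        db_sync.commit()

Calling such a helper after each task completes is also what makes point 1 of the note above work: the image list gets updated during job execution, not only at the end.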

Runner / no errors

In v2/__init__.py (same folder), we have:

        new_dataset_attributes = await process_workflow(
            workflow=workflow,
            dataset=dataset,
            workflow_dir_local=WORKFLOW_DIR_LOCAL,
            workflow_dir_remote=WORKFLOW_DIR_REMOTE,
            logger_name=logger_name,
            worker_init=worker_init,
            first_task_index=job.first_task_index,
            last_task_index=job.last_task_index,
            **backend_specific_kwargs,
        )

        logger.info(
            f'End execution of workflow "{workflow.name}"; '
            f"more logs at {str(log_file_path)}"
        )
        logger.debug(f'END workflow "{workflow.name}"')

        # Update dataset attributes, in case of successful execution
        dataset.history.extend(new_dataset_attributes["history"])
        dataset.filters = new_dataset_attributes["filters"]
        dataset.images = new_dataset_attributes["images"]
        for attribute_name in ["filters", "history", "images"]:
            flag_modified(dataset, attribute_name)
        db_sync.merge(dataset)

In principle, process_workflow should then no longer return new_dataset_attributes at all, because the update will already have happened within process_workflow itself.
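
For instance, the call site could then reduce to something like the following sketch, under the assumption that process_workflow commits the updated attributes itself and returns nothing:

        # Sketch: process_workflow persists history/filters/images to the db
        # on its own, so there is no return value and no post-processing here
        await process_workflow(
            workflow=workflow,
            dataset=dataset,
            workflow_dir_local=WORKFLOW_DIR_LOCAL,
            workflow_dir_remote=WORKFLOW_DIR_REMOTE,
            logger_name=logger_name,
            worker_init=worker_init,
            first_task_index=job.first_task_index,
            last_task_index=job.last_task_index,
            **backend_specific_kwargs,
        )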

Runner / with errors

Relevant functions are assemble_history_failed_job, assemble_filters_failed_job and assemble_images_failed_job.

In the history function, the block below should be reduced to the db read of dataset.history (Part 1), since the temporary-file part (Part 2) goes away. The rest of the function remains the same, apart from the fact that it should also commit to the db at its end (rather than returning Python objects).

    # Part 1: Read existing history from DB
    new_history = dataset.history

    # Part 2: Extend history based on temporary-file contents
    tmp_history_file = Path(job.working_dir) / HISTORY_FILENAME
    try:
        with tmp_history_file.open("r") as f:
            tmp_file_history = json.load(f)
            new_history.extend(tmp_file_history)
    except FileNotFoundError:
        tmp_file_history = []

The other two functions (assemble_images_failed_job and assemble_filters_failed_job) in principle should just be removed.
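
For assemble_history_failed_job itself, a rough sketch of the revised structure could look as follows (the signature, and how the synchronous db session is obtained, are placeholders):

    from sqlalchemy.orm.attributes import flag_modified


    def assemble_history_failed_job(job, dataset, db_sync):
        # Part 1: Read existing history from the db (no temporary file)
        new_history = dataset.history

        # Rest of the current function body, unchanged (omitted here)
        ...

        # At the end: commit to the db instead of returning Python objects
        dataset.history = new_history
        flag_modified(dataset, "history")
        db_sync.merge(dataset)
        db_sync.commit()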

API

In the get_workflowtask_status endpoint we have:

    # TO KEEP:

    # Lowest priority: read status from DB, which corresponds to jobs that are
    # not running
    history = dataset.history
    for history_item in history:
        wftask_id = history_item["workflowtask"]["id"]
        wftask_status = history_item["status"]
        workflow_tasks_status_dict[wftask_id] = wftask_status

    ...

    if running_job is None:
        ...
    else:

        ...

        # TO REMOVE:

        # Highest priority: Read status updates coming from the running-job
        # temporary file. Note: this file only contains information on
        # WorkflowTask's that ran through successfully.
        tmp_file = Path(running_job.working_dir) / HISTORY_FILENAME
        try:
            with tmp_file.open("r") as f:
                history = json.load(f)
        except FileNotFoundError:
            history = []
        for history_item in history:
            wftask_id = history_item["workflowtask"]["id"]
            wftask_status = history_item["status"]
            workflow_tasks_status_dict[wftask_id] = wftask_status
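
After the change, the endpoint logic could reduce to a db-only read, roughly along these lines (a sketch, assuming the history items keep their current structure; whatever remains of the running-job branch stays as it is, minus the temporary-file read):

    # Single source of truth: dataset.history in the db, which is now
    # updated after each task completes (see "Runner / internal" above)
    workflow_tasks_status_dict = {}
    for history_item in dataset.history:
        wftask_id = history_item["workflowtask"]["id"]
        wftask_status = history_item["status"]
        workflow_tasks_status_dict[wftask_id] = wftask_status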

tcompa commented Jan 7, 2025

I expect that implementing the current issue will also impact (and likely mitigate) the unexpected behavior observed by @zonia3000 in #2167. Note that we will simplify the GET /v2/project/{project_id}/status?dataset_id={dataset_id}&workflow_id={workflow_id} endpoint logic, because it will have fewer branches and it will only read data from the db (rather than mixing temporary on-disk data with persistent db data).

In the past we were not able to reproduce the issue in a controlled way (#1773), but we should be aware of it anyway. @mfranzon, let's keep this in mind while working on the current issue.


tcompa commented Jan 28, 2025

Closed with #2169
