Skip to content

Commit

Permalink
Introduce jobscript exit code check (#3)
Browse files Browse the repository at this point in the history
* feat: introduce jobscript exit code check

* fix: incorporate feedback

* fix: Improve SubprocessError construction
  • Loading branch information
leahaeusel authored Dec 30, 2024
1 parent e7bb523 commit 9d1625f
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 105 deletions.
4 changes: 4 additions & 0 deletions queens/data_processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
DataProcessorEnsightInterfaceDiscrepancy,
)
from queens.data_processor.data_processor_numpy import DataProcessorNumpy
from queens.data_processor.data_processor_pvd import DataProcessorPvd
from queens.data_processor.data_processor_txt import DataProcessorTxt

# Registry mapping each data processor type name (string key) to the data
# processor class imported above.
VALID_TYPES = {
"csv": DataProcessorCsv,
"ensight": DataProcessorEnsight,
"ensight_interface_discrepancy": DataProcessorEnsightInterfaceDiscrepancy,
"numpy": DataProcessorNumpy,
"pvd": DataProcessorPvd,
"txt": DataProcessorTxt,
}
2 changes: 1 addition & 1 deletion queens/drivers/fourc_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(
files_to_copy=None,
data_processor=None,
gradient_data_processor=None,
post_processor=None,
post_processor="",
post_options="",
mpi_cmd="/usr/bin/mpirun --bind-to none",
):
Expand Down
153 changes: 102 additions & 51 deletions queens/drivers/jobscript_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
#
"""Driver to run a jobscript."""


import logging
from dataclasses import dataclass
from pathlib import Path

from queens.drivers.driver import Driver
from queens.utils.exceptions import SubprocessError
from queens.utils.injector import inject, inject_in_template
from queens.utils.io_utils import read_file
from queens.utils.logger_settings import log_init_args
Expand Down Expand Up @@ -50,7 +52,7 @@ def to_dict(self):
"""Create a job options dict.
Returns:
dict: dictionary with all the data
dict: Dict containing all the data.
"""
dictionary = self.__dict__.copy()
dictionary.update(dictionary.pop("input_files"))
Expand All @@ -60,10 +62,10 @@ def add_data_and_to_dict(self, additional_data):
"""Add additional options to the job options dict.
Args:
additional_data (dict): Additional data to combine with the job options
additional_data (dict): Additional data to combine with the job options.
Returns:
_type_: _description_
dict: Dict combining the job options and the additional data.
"""
return self.to_dict() | additional_data

Expand All @@ -72,12 +74,14 @@ class JobscriptDriver(Driver):
"""Driver to run an executable with a jobscript.
Attributes:
input_templates (Path): read in simulation input template as string
data_processor (obj): instance of data processor class
gradient_data_processor (obj): instance of data processor class for gradient data
jobscript_template (str): read in jobscript template as string
jobscript_options (dict): Dictionary containing jobscript options
jobscript_file_name (str): Jobscript file name (default: 'jobscript.sh')
input_templates (Path): Read in simulation input template as string.
data_processor (obj): Instance of data processor class.
gradient_data_processor (obj): Instance of data processor class for gradient data.
jobscript_template (str): Read-in jobscript template.
jobscript_options (dict): Dictionary containing jobscript options.
jobscript_file_name (str): Jobscript file name (default: 'jobscript.sh').
raise_error_on_jobscript_failure (bool): Whether to raise an error for a non-zero jobscript
exit code.
"""

@log_init_args
Expand All @@ -92,47 +96,48 @@ def __init__(
gradient_data_processor=None,
jobscript_file_name="jobscript.sh",
extra_options=None,
raise_error_on_jobscript_failure=True,
):
"""Initialize JobscriptDriver object.
Args:
parameters (Parameters): Parameters object
input_templates (str, Path, dict): path(s) to simulation input template
jobscript_template (str, Path): path to jobscript template or read in jobscript template
executable (str, Path): path to main executable of respective software
files_to_copy (list, opt): files or directories to copy to experiment_dir
data_processor (obj, opt): instance of data processor class
gradient_data_processor (obj, opt): instance of data processor class for gradient data
jobscript_file_name (str): Jobscript file name (default: 'jobscript.sh')
extra_options (dict): Extra options to inject into jobscript template
parameters (Parameters): Parameters object.
input_templates (str, Path, dict): Path(s) to simulation input template.
jobscript_template (str, Path): Path to jobscript template or read-in jobscript
template.
executable (str, Path): Path to main executable of respective software.
files_to_copy (list, opt): Files or directories to copy to experiment_dir.
data_processor (obj, opt): Instance of data processor class.
gradient_data_processor (obj, opt): Instance of data processor class for gradient data.
jobscript_file_name (str, opt): Jobscript file name (default: 'jobscript.sh').
extra_options (dict, opt): Extra options to inject into jobscript template.
raise_error_on_jobscript_failure (bool, opt): Whether to raise an error for a non-zero
jobscript exit code.
"""
super().__init__(parameters=parameters, files_to_copy=files_to_copy)
self.input_templates = self.create_input_templates_dict(input_templates)
self.jobscript_template = self.get_read_in_jobscript_template(jobscript_template)
self.files_to_copy.extend(self.input_templates.values())
self.data_processor = data_processor
self.gradient_data_processor = gradient_data_processor

if Path(jobscript_template).is_file():
self.jobscript_template = read_file(jobscript_template)
else:
self.jobscript_template = jobscript_template

if extra_options is None:
extra_options = {}

self.jobscript_options = extra_options
self.jobscript_options["executable"] = executable
self.jobscript_file_name = jobscript_file_name
self.raise_error_on_jobscript_failure = raise_error_on_jobscript_failure

@staticmethod
def create_input_templates_dict(input_templates):
"""Cast input templates into a dict.
Args:
input_templates (str, Path, dict): Input template(s)
input_templates (str, Path, dict): Input template(s).
Returns:
dict: containing input file names and template paths
dict: Dict containing input file names and template paths.
"""
if not isinstance(input_templates, dict):
input_templates = {"input_file": input_templates}
Expand All @@ -143,18 +148,57 @@ def create_input_templates_dict(input_templates):
}
return input_templates_dict

@staticmethod
def get_read_in_jobscript_template(jobscript_template):
    """Get the jobscript template contents.

    If the provided jobscript template is a Path or a string of a
    path and a valid file, the corresponding file is read. A string
    that does not point to an existing file is returned unchanged, as
    it is assumed to already hold the jobscript template contents.

    Args:
        jobscript_template (str, Path): Path to jobscript template or read-in jobscript
                                        template.

    Returns:
        str: Read-in jobscript template.

    Raises:
        FileNotFoundError: If a Path is provided that does not point to a file.
        TypeError: If the jobscript template is neither a string nor a Path.
    """
    if isinstance(jobscript_template, str):
        # Catch an exception due to a long string
        try:
            if Path(jobscript_template).is_file():
                jobscript_template = read_file(jobscript_template)
        except OSError:
            # Pass the string itself (not a set wrapping it) so the log shows the
            # template text rather than the repr of a one-element set.
            _logger.debug(
                "The provided jobscript template string is not a regular file so we assume "
                "that it holds the read-in jobscript template. The jobscript template reads:\n"
                "%s",
                jobscript_template,
            )

    elif isinstance(jobscript_template, Path):
        if jobscript_template.is_file():
            jobscript_template = read_file(jobscript_template)
        else:
            raise FileNotFoundError(
                f"The provided jobscript template path {jobscript_template} is not a file."
            )
    else:
        raise TypeError("The jobscript template needs to be a string or a Path.")

    return jobscript_template

def run(self, sample, job_id, num_procs, experiment_dir, experiment_name):
"""Run the driver.
Args:
sample (dict): Dict containing sample
job_id (int): Job ID
num_procs (int): number of processors
experiment_name (str): name of QUEENS experiment.
sample (dict): Dict containing sample.
job_id (int): Job ID.
num_procs (int): Number of processors.
experiment_dir (Path): Path to QUEENS experiment directory.
experiment_name (str): Name of QUEENS experiment.
Returns:
Result and potentially the gradient
Result and potentially the gradient.
"""
job_dir, output_dir, output_file, input_files, log_file, error_file = self._manage_paths(
job_id, experiment_dir, experiment_name
Expand Down Expand Up @@ -204,17 +248,17 @@ def _manage_paths(self, job_id, experiment_dir, experiment_name):
"""Manage paths for driver run.
Args:
job_id (int): Job id.
job_id (int): Job ID.
experiment_dir (Path): Path to QUEENS experiment directory.
experiment_name (str): name of QUEENS experiment.
experiment_name (str): Name of QUEENS experiment.
Returns:
job_dir (Path): Path to job directory
output_dir (Path): Path to output directory
output_file (Path): Path to output file(s)
input_files (dict): Dict with name and path of the input file(s)
log_file (Path): Path to log file
error_file (Path): Path to error file
job_dir (Path): Path to job directory.
output_dir (Path): Path to output directory.
output_file (Path): Path to output file(s).
input_files (dict): Dict with name and path of the input file(s).
log_file (Path): Path to log file.
error_file (Path): Path to error file.
"""
job_dir = experiment_dir / str(job_id)
output_dir = job_dir / "output"
Expand All @@ -235,18 +279,17 @@ def _manage_paths(self, job_id, experiment_dir, experiment_name):

return job_dir, output_dir, output_file, input_files, log_file, error_file

@staticmethod
def _run_executable(job_id, execute_cmd, log_file, error_file, verbose=False):
def _run_executable(self, job_id, execute_cmd, log_file, error_file, verbose=False):
"""Run executable.
Args:
job_id (int): Job id
execute_cmd (str): Executed command
log_file (Path): Path to log file
error_file (Path): Path to error file
verbose (bool, opt): flag for additional streaming to terminal
job_id (int): Job ID.
execute_cmd (str): Executed command.
log_file (Path): Path to log file.
error_file (Path): Path to error file.
verbose (bool, opt): Flag for additional streaming to terminal.
"""
run_subprocess_with_logging(
process_returncode, _, stdout, stderr = run_subprocess_with_logging(
execute_cmd,
terminate_expression="PROC.*ERROR",
logger_name=__name__ + f"_{job_id}",
Expand All @@ -255,16 +298,24 @@ def _run_executable(job_id, execute_cmd, log_file, error_file, verbose=False):
streaming=verbose,
raise_error_on_subprocess_failure=False,
)
if self.raise_error_on_jobscript_failure and process_returncode:
raise SubprocessError.construct_error_from_command(
command=execute_cmd,
command_output=stdout,
error_message=stderr,
additional_message=f"The jobscript with job ID {job_id} has failed with exit code "
f"{process_returncode}.",
)

def _get_results(self, output_dir):
"""Get results from driver run.
Args:
output_dir (Path): Path to output directory
output_dir (Path): Path to output directory.
Returns:
result (np.array): Result from the driver run
gradient (np.array, None): Gradient from the driver run (potentially None)
result (np.array): Result from the driver run.
gradient (np.array, None): Gradient from the driver run (potentially None).
"""
result = None
if self.data_processor:
Expand All @@ -281,9 +332,9 @@ def prepare_input_files(self, sample_dict, experiment_dir, input_files):
"""Prepare and parse data to input files.
Args:
sample_dict (dict): Dict containing sample
sample_dict (dict): Dict containing sample.
experiment_dir (Path): Path to QUEENS experiment directory.
input_files (dict): Dict with name and path of the input file(s)
input_files (dict): Dict with name and path of the input file(s).
"""
for input_template_name, input_template_path in self.input_templates.items():
inject(
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/fourc/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def fixture_setup_symbolic_links_fourc(fourc_link_paths, fourc_build_paths_for_g
"existing file! \n"
"You can create the necessary symbolic links on Linux via:\n"
"-------------------------------------------------------------------------\n"
"ln -s <path/to/fourc> <QUEENS_BaseDir>/config/fourc\n"
"ln -s <path/to/fourc> <QUEENS_BaseDir>/config/4C\n"
"ln -s <path/to/post_ensight> <QUEENS_BaseDir>/config/post_ensight\n"
"ln -s <path/to/post_processor> <QUEENS_BaseDir>/config/post_processor\n"
"-------------------------------------------------------------------------\n"
Expand Down
6 changes: 6 additions & 0 deletions tests/unit_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ def fixture_dummy_simulation_model():
return model


@pytest.fixture(name="files_to_copy")
def fixture_files_to_copy():
    """Return the list of file names used as files to copy."""
    file_names = ["fileA", "fileB"]
    return file_names


@pytest.fixture(name="get_patched_bmfia_iterator")
def fixture_get_patched_bmfia_iterator(global_settings):
"""Function that returns a dummy BMFIA iterator for testing."""
Expand Down
Loading

0 comments on commit 9d1625f

Please sign in to comment.