Draft: Improve subprocess logging #90

Open · wants to merge 4 commits into base: main
38 changes: 24 additions & 14 deletions queens/drivers/jobscript_driver.py
@@ -18,14 +18,15 @@
 import logging
 from dataclasses import dataclass
 from pathlib import Path
+from subprocess import TimeoutExpired
 
 from queens.drivers.driver import Driver
 from queens.utils.exceptions import SubprocessError
 from queens.utils.injector import inject, inject_in_template
 from queens.utils.io_utils import read_file
 from queens.utils.logger_settings import log_init_args
 from queens.utils.metadata import SimulationMetadata
-from queens.utils.run_subprocess import run_subprocess_with_logging
+from queens.utils.run_subprocess import run_subprocess
 
 _logger = logging.getLogger(__name__)
 
@@ -82,6 +83,8 @@ class JobscriptDriver(Driver):
         jobscript_file_name (str): Jobscript file name (default: 'jobscript.sh').
         raise_error_on_jobscript_failure (bool): Whether to raise an error for a non-zero jobscript
             exit code.
+        job_timeout (int | None): Timeout for jobs in seconds. Jobs will be terminated after
+            timeout seconds. Default is None, meaning no timeout is used.
     """
 
     @log_init_args
@@ -91,6 +94,7 @@ def __init__(
         input_templates,
         jobscript_template,
         executable,
+        job_timeout=None,
         files_to_copy=None,
         data_processor=None,
         gradient_data_processor=None,
@@ -106,6 +110,8 @@ def __init__(
             jobscript_template (str, Path): Path to jobscript template or read-in jobscript
                 template.
             executable (str, Path): Path to main executable of respective software.
+            job_timeout (int, opt): Timeout for jobs in seconds. Jobs will be terminated after
+                timeout seconds. Default is None, meaning no timeout is used.
             files_to_copy (list, opt): Files or directories to copy to experiment_dir.
             data_processor (obj, opt): Instance of data processor class.
             gradient_data_processor (obj, opt): Instance of data processor class for gradient data.
@@ -129,6 +135,8 @@ def __init__(
         self.jobscript_file_name = jobscript_file_name
         self.raise_error_on_jobscript_failure = raise_error_on_jobscript_failure
 
+        self.job_timeout = job_timeout
+
     @staticmethod
     def create_input_templates_dict(input_templates):
         """Cast input templates into a dict.
@@ -235,8 +243,18 @@ def run(self, sample, job_id, num_procs, experiment_dir, experiment_name):
             )
 
         with metadata.time_code("run_jobscript"):
-            execute_cmd = "bash " + str(jobscript_file)
-            self._run_executable(job_id, execute_cmd, log_file, error_file, verbose=False)
+            try:
+                execute_cmd = f"bash {jobscript_file} >{log_file} 2>{error_file}"
+                self._run_executable(execute_cmd)
+            except TimeoutExpired as timeout_expired_error:
+                _logger.warning("Job %d timed out:", job_id)
+                _logger.warning("%s", timeout_expired_error)
+                with error_file.open("a") as file:
+                    file.write(f"Job {job_id} timed out:\n")
+                    file.write(f"{timeout_expired_error}")
+                results = (None, None)
+                metadata.outputs = results
+                return results
 
         with metadata.time_code("data_processing"):
             results = self._get_results(output_dir)
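Reviewer note: the `except` branch above is the whole timeout story, since `communicate(timeout=...)` raises `subprocess.TimeoutExpired` rather than returning. A self-contained sketch of the control flow this hunk implements (the wrapper function, paths, and names are illustrative, not QUEENS code):

```python
import logging
import subprocess
from pathlib import Path

_logger = logging.getLogger(__name__)


def run_jobscript(jobscript_file: Path, log_file: Path, error_file: Path,
                  job_id: int, timeout: float | None = None):
    """Sketch of the hunk above: shell-side redirection plus a job timeout."""
    # The shell writes the log and error files, so Python holds no file handles.
    execute_cmd = f"bash {jobscript_file} >{log_file} 2>{error_file}"
    process = subprocess.Popen(execute_cmd, shell=True)
    try:
        process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired as timeout_error:
        _logger.warning("Job %d timed out:", job_id)
        _logger.warning("%s", timeout_error)
        with error_file.open("a") as file:
            file.write(f"Job {job_id} timed out:\n{timeout_error}")
        return None, None  # mirrors results = (None, None) in the driver
    return process.returncode, process.pid
```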
@@ -279,24 +297,16 @@ def _manage_paths(self, job_id, experiment_dir, experiment_name):
 
         return job_dir, output_dir, output_file, input_files, log_file, error_file
 
-    def _run_executable(self, job_id, execute_cmd, log_file, error_file, verbose=False):
+    def _run_executable(self, execute_cmd):
         """Run executable.
 
         Args:
-            job_id (int): Job ID.
             execute_cmd (str): Executed command.
-            log_file (Path): Path to log file.
-            error_file (Path): Path to error file.
-            verbose (bool, opt): Flag for additional streaming to terminal.
         """
-        process_returncode, _, stdout, stderr = run_subprocess_with_logging(
+        process_returncode, _, stdout, stderr = run_subprocess(
            execute_cmd,
-            terminate_expression="PROC.*ERROR",
-            logger_name=__name__ + f"_{job_id}",
-            log_file=str(log_file),
-            error_file=str(error_file),
-            streaming=verbose,
             raise_error_on_subprocess_failure=False,
+            timeout=self.job_timeout,
         )
         if self.raise_error_on_jobscript_failure and process_returncode:
             raise SubprocessError.construct_error_from_command(
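From the caller's side, the new option looks like this. A sketch showing only the constructor arguments visible in this diff (the real signature may require more, and all paths are invented):

```python
from queens.drivers.jobscript_driver import JobscriptDriver

# Hypothetical driver setup; only job_timeout is new in this PR.
driver = JobscriptDriver(
    input_templates="input.yaml.j2",    # invented template path
    jobscript_template="jobscript.sh",  # invented jobscript path
    executable="/path/to/solver",       # invented executable path
    job_timeout=3600,  # seconds; a job hitting this limit yields results (None, None)
)
```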
133 changes: 0 additions & 133 deletions queens/utils/logger_settings.py
@@ -208,139 +208,6 @@ def setup_cluster_logging():
     root_logger.addHandler(console_stderr)
 
 
-def get_job_logger(
-    logger_name, log_file, error_file, streaming, propagate=False, full_log_formatting=True
-):
-    """Setup job logging and get job logger.
-
-    Args:
-        logger_name (str): Logger name
-        log_file (path): Path to log file
-        error_file (path): Path to error file
-        streaming (bool): Flag for additional streaming to given stream
-        propagate (bool): Flag for propagation of stream (default: *False*)
-        full_log_formatting (bool): Flag to add logger metadata such as time
-    Returns:
-        job_logger (logging.logger): Job logger
-        lfh (logging.FileHandler): Logging file handler
-        efh (logging.FileHandler): Error logging file handler
-        stream_handler (logging.StreamHandler): Streaming handler, i.e. logging to console
-    """
-    # get job logger
-    job_logger = logging.getLogger(logger_name)
-
-    if full_log_formatting:
-        # define formatter
-        formatter = NewLineFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
-    else:
-        formatter = NewLineFormatter("%(message)s")
-
-    # set level
-    job_logger.setLevel(logging.INFO)
-
-    # set option to propagate (default: false)
-    job_logger.propagate = propagate
-
-    # add handlers for log and error file (remark: python code is run in parallel
-    # for cluster runs; thus, each processor logs his own file.)
-    lfh = logging.FileHandler(log_file, mode="w", delay=False)
-    lfh.setLevel(logging.INFO)
-    lfh.setFormatter(formatter)
-    job_logger.addHandler(lfh)
-    efh = logging.FileHandler(error_file, mode="w", delay=False)
-    efh.setLevel(logging.ERROR)
-    efh.setFormatter(formatter)
-    job_logger.addHandler(efh)
-
-    # add handler for additional streaming to given stream, if required
-    if streaming:
-        stream_handler = logging.StreamHandler(stream=sys.stdout)
-        stream_handler.setLevel(logging.INFO)
-        stream_handler.terminator = ""
-        stream_handler.setFormatter(fmt=None)
-        job_logger.addHandler(stream_handler)
-    else:
-        stream_handler = None
-
-    # return job logger and handlers
-    return job_logger, lfh, efh, stream_handler
-
-
-def job_logging(command_string, process, job_logger, terminate_expression):
-    """Actual logging of job.
-
-    Args:
-        command_string (str): Command string for the subprocess
-        process (obj): Subprocess object
-        job_logger (obj): Job logger object
-        terminate_expression (str): Expression on which to terminate
-
-    Returns:
-        stderr (str): Error messages
-    """
-    # initialize stderr to None
-    stderr = None
-
-    # start logging
-    job_logger.info("run_subprocess started with:")
-    job_logger.info(command_string)
-    for line in iter(process.stdout.readline, b""):  # b'\n'-separated lines
-        line = line.rstrip()  # remove any trailing whitespaces
-        exit_code = process.poll()
-        if line == "" and exit_code is not None:
-            job_logger.info("subprocess exited with code %s.", exit_code)
-            # This line waits for termination and puts together stdout not yet consumed from the
-            # stream by the logger and finally the stderr.
-            stdout, stderr = process.communicate()
-            # following line should never really do anything. We want to log all that was
-            # written to stdout even after program was terminated.
-            job_logger.info(stdout)
-            if stderr:
-                job_logger.error("error message (if provided) follows:")
-                for errline in io.StringIO(stderr):
-                    job_logger.error(errline)
-            break
-        if terminate_expression:
-            # two seconds in time.sleep(2) are arbitrary. Feel free to tune it to your needs.
-            if re.search(terminate_expression, line):
-                job_logger.warning("run_subprocess detected terminate expression:")
-                job_logger.error(line)
-                # give program the chance to terminate by itself, because terminate expression
-                # will be found also if program terminates itself properly
-                time.sleep(2)
-                if process.poll() is None:
-                    # log terminate command
-                    job_logger.warning("running job will be terminated by QUEENS.")
-                    process.terminate()
-                # wait before communicate call which gathers all the output
-                time.sleep(2)
-                continue
-        job_logger.info(line)
-
-    return stderr
-
-
-def finish_job_logger(job_logger, lfh, efh, stream_handler):
-    """Close and remove file handlers.
-
-    (to prevent OSError: [Errno 24] Too many open files)
-
-    Args:
-        job_logger (logging.logger): Job logger
-        lfh (logging.FileHandler): Logging file handler
-        efh (logging.FileHandler): Error logging file handler
-        stream_handler (logging.StreamHandler): Streaming handler, i.e. logging to console
-    """
-    # we need to close the FileHandlers to
-    lfh.close()
-    efh.close()
-    job_logger.removeHandler(lfh)
-    job_logger.removeHandler(efh)
-    if stream_handler is not None:
-        stream_handler.close()
-        job_logger.removeHandler(stream_handler)
-
-
 def reset_logging():
     """Reset loggers.
 
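Context for this deletion: these three helpers routed a child process's stdout through per-job `logging` handlers and then tore the handlers down again, which is what the `OSError: [Errno 24] Too many open files` remark was about. Since the jobscript now redirects stdout/stderr in the shell, none of this machinery is needed. A toy sketch (illustrative only, not QUEENS code) of the descriptor leak the removed `finish_job_logger` guarded against:

```python
import logging

for job_id in range(10_000):
    job_logger = logging.getLogger(f"job_{job_id}")
    handler = logging.FileHandler(f"/tmp/job_{job_id}.log")  # opens an fd immediately
    job_logger.addHandler(handler)
    job_logger.error("job %d failed", job_id)
    # Without the next two lines (what finish_job_logger did), each iteration
    # leaks one open file descriptor until the OS limit is hit:
    handler.close()
    job_logger.removeHandler(handler)
```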
84 changes: 3 additions & 81 deletions queens/utils/run_subprocess.py
@@ -18,7 +18,6 @@
 import subprocess
 
 from queens.utils.exceptions import SubprocessError
-from queens.utils.logger_settings import finish_job_logger, get_job_logger, job_logging
 
 _logger = logging.getLogger(__name__)
 
@@ -31,6 +30,7 @@ def run_subprocess(
     raise_error_on_subprocess_failure=True,
     additional_error_message=None,
     allowed_errors=None,
+    timeout=None,
 ):
     """Run a system command outside of the Python script.
 
@@ -40,6 +40,7 @@
         raise_error_on_subprocess_failure (bool, optional): Raise or warn error defaults to True
         additional_error_message (str, optional): Additional error message to be displayed
         allowed_errors (lst, optional): List of strings to be removed from the error message
+        timeout (int, optional): Terminate process after timeout seconds
     Returns:
         process_returncode (int): code for success of subprocess
         process_id (int): unique process id, the subprocess was assigned on computing machine
@@ -48,7 +49,7 @@
     """
     process = start_subprocess(command)
 
-    stdout, stderr = process.communicate()
+    stdout, stderr = process.communicate(timeout=timeout)
     process_id = process.pid
     process_returncode = process.returncode
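One thing worth flagging: per the Python documentation, `communicate()` raises `TimeoutExpired` on timeout but does not kill the child, so `run_subprocess` leaves the process running and the caller has to deal with it. The pattern the `subprocess` docs recommend, as a standalone sketch (command and timeout are arbitrary):

```python
import subprocess

process = subprocess.Popen("sleep 60", shell=True)
try:
    stdout, stderr = process.communicate(timeout=5)
except subprocess.TimeoutExpired:
    process.kill()                          # the docs' recommended follow-up
    stdout, stderr = process.communicate()  # reap the child, collect remaining output
```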

@@ -63,85 +64,6 @@
     return process_returncode, process_id, stdout, stderr
 
 
-def run_subprocess_with_logging(
-    command,
-    terminate_expression,
-    logger_name,
-    log_file,
-    error_file,
-    full_log_formatting=True,
-    streaming=False,
-    raise_error_on_subprocess_failure=True,
-    additional_error_message=None,
-    allowed_errors=None,
-):
-    """Run a system command outside of the Python script.
-
-    Log errors and stdout-return to initialized logger during runtime. Terminate subprocess if
-    regular expression pattern is found in stdout.
-
-    Args:
-        command (str): command, that will be run in subprocess
-        terminate_expression (str): regular expression to terminate subprocess
-        logger_name (str): logger name to write to. Should be configured previously
-        log_file (str): path to log file
-        error_file (str): path to error file
-        full_log_formatting (bool): Flag to add logger metadata in the simulation logs
-        streaming (bool, optional): Flag for additional streaming to stdout
-        raise_error_on_subprocess_failure (bool, optional): Raise or warn error defaults to True
-        additional_error_message (str, optional): Additional error message to be displayed
-        allowed_errors (lst, optional): List of strings to be removed from the error message
-    Returns:
-        process_returncode (int): code for success of subprocess
-        process_id (int): unique process id, the subprocess was assigned on computing machine
-        stdout (str): always None
-        stderr (str): standard error content
-    """
-    # setup job logging and get job logger as well as handlers
-    job_logger, log_file_handle, error_file_handler, stream_handler = get_job_logger(
-        logger_name=logger_name,
-        log_file=log_file,
-        error_file=error_file,
-        streaming=streaming,
-        full_log_formatting=full_log_formatting,
-    )
-
-    # run subprocess
-    process = start_subprocess(command)
-
-    # actual logging of job
-    stderr = job_logging(
-        command_string=command,
-        process=process,
-        job_logger=job_logger,
-        terminate_expression=terminate_expression,
-    )
-
-    stdout = ""
-
-    # get ID and returncode of subprocess
-    process_id = process.pid
-    process_returncode = process.returncode
-
-    # close and remove file handlers (to prevent OSError: [Errno 24] Too many open files)
-    finish_job_logger(
-        job_logger=job_logger,
-        lfh=log_file_handle,
-        efh=error_file_handler,
-        stream_handler=stream_handler,
-    )
-
-    _raise_or_warn_error(
-        command=command,
-        stdout=stdout,
-        stderr=stderr,
-        raise_error_on_subprocess_failure=raise_error_on_subprocess_failure,
-        additional_error_message=additional_error_message,
-        allowed_errors=allowed_errors,
-    )
-    return process_returncode, process_id, stdout, stderr
-
-
 def start_subprocess(command):
     """Start subprocess.
 
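Taken together, callers use the new parameter like this; a usage sketch assuming the return tuple stays as documented above (the command and timeout value are arbitrary examples):

```python
from subprocess import TimeoutExpired

from queens.utils.run_subprocess import run_subprocess

try:
    returncode, pid, stdout, stderr = run_subprocess(
        "bash heavy_simulation.sh",  # arbitrary example command
        raise_error_on_subprocess_failure=False,
        timeout=600,  # seconds; omit (None) to wait indefinitely
    )
except TimeoutExpired:
    # run_subprocess does not catch the timeout; callers decide what a
    # timed-out job means (JobscriptDriver records it and returns no results).
    returncode, pid, stdout, stderr = None, None, None, None
```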