diff --git a/sequence_processing_pipeline/ConvertJob.py b/sequence_processing_pipeline/ConvertJob.py index 3d8d2244..0a4ed289 100644 --- a/sequence_processing_pipeline/ConvertJob.py +++ b/sequence_processing_pipeline/ConvertJob.py @@ -1,9 +1,32 @@ -from os.path import join, exists from sequence_processing_pipeline.Job import Job from sequence_processing_pipeline.PipelineError import (PipelineError, JobFailedError) import logging import re +from jinja2 import BaseLoader, Environment, TemplateNotFound +import pathlib +from os.path import join, exists, getmtime + + +# taken from https://jinja.palletsprojects.com/en/3.0.x/api/#jinja2.BaseLoader +class KISSLoader(BaseLoader): + def __init__(self, path): + # pin the path for loader to the location sequence_processing_pipeline + # (the location of this file), along w/the relative path to the + # templates directory. + self.path = join(pathlib.Path(__file__).parent.resolve(), path) + + def get_source(self, environment, template): + path = join(self.path, template) + if not exists(path): + raise TemplateNotFound(template) + mtime = getmtime(path) + with open(path) as f: + source = f.read() + return source, path, lambda: mtime == getmtime(path) + + +logging.basicConfig(level=logging.DEBUG) class ConvertJob(Job): @@ -39,12 +62,23 @@ def __init__(self, run_dir, output_path, sample_sheet_path, queue_name, self.node_count = node_count self.nprocs = nprocs self.wall_time_limit = wall_time_limit + + # TODO: This value is currently a string e.g.: '1gb' or '10gb' read + # in from the configuration json file. However this param should be + # changed to process_mem_in_gb or similar and the string changed to + # a numerical value. self.pmem = pmem self.bcl_tool = bcl_tool_path self.qiita_job_id = qiita_job_id + # CHARLIE self.job_script_path = join(self.output_path, f"{self.job_name}.sh") self.suffix = 'fastq.gz' + # for projects that use sequence_processing_pipeline as a dependency, + # jinja_env must be set to sequence_processing_pipeline's root path, + # rather than the project's root path. + self.jinja_env = Environment(loader=KISSLoader('templates')) + tmp = False for executable_name in ['bcl2fastq', 'bcl-convert']: if executable_name in self.bcl_tool: @@ -63,76 +97,62 @@ def __init__(self, run_dir, output_path, sample_sheet_path, queue_name, self._generate_job_script() def _generate_job_script(self): - """ - Generate a Torque job script for processing supplied root_directory. - :return: The path to the newly-created job-script. - """ - lines = [] - - lines.append("#!/bin/bash") - lines.append(f"#SBATCH --job-name {self.qiita_job_id}_{self.job_name}") - lines.append(f"#SBATCH -p {self.queue_name}") - lines.append(f'#SBATCH -N {self.node_count}') - lines.append(f'#SBATCH -n {self.nprocs}') - lines.append("#SBATCH --time %d" % self.wall_time_limit) - - # send an email to the list of users defined below when a job starts, - # terminates, or aborts. This is used to confirm that the package's - # own reporting mechanism is reporting correctly. - lines.append("#SBATCH --mail-type=ALL") - - # list of users to be contacted independently of this package's - # notification system, when a job starts, terminates, or gets aborted. - lines.append("#SBATCH --mail-user qiita.help@gmail.com") - - lines.append(f"#SBATCH --mem-per-cpu {self.pmem}") - - lines.append("set -x") - lines.append('date') - lines.append('hostname') - lines.append(f'cd {self.root_dir}') - - if self.modules_to_load: - lines.append("module load " + ' '.join(self.modules_to_load)) - - # Assume that the bcl-convert tool is named 'bcl-convert' and choose - # accordingly. - if 'bcl-convert' in self.bcl_tool: - lines.append(('%s ' - '--sample-sheet "%s" ' - '--output-directory %s ' - '--bcl-input-directory . ' - '--bcl-num-decompression-threads 16 ' - '--bcl-num-conversion-threads 16 ' - '--bcl-num-compression-threads 16 ' - '--bcl-num-parallel-tiles 16 ' - '--bcl-sampleproject-subdirectories true ' - '--force') % (self.bcl_tool, - self.sample_sheet_path, - self.output_path)) - - # equivalent cp for bcl-conversion (see below) needed. - else: - lines.append(('%s ' - '--sample-sheet "%s" ' - '--minimum-trimmed-read-length 1 ' - '--mask-short-adapter-reads 1 ' - '-R . ' - '-o %s ' - '--loading-threads 16 ' - '--processing-threads 16 ' - '--writing-threads 16 ' - '--create-fastq-for-index-reads ' - '--ignore-missing-positions ') % - (self.bcl_tool, - self.sample_sheet_path, - self.output_path)) - - with open(self.job_script_path, 'w') as f: - for line in lines: - # remove long spaces in some lines. - line = re.sub(r'\s+', ' ', line) - f.write(f"{line}\n") + # bypass generating job script for a force-fail job, since it is + # not needed. + if self.force_job_fail: + return None + + template = self.jinja_env.get_template("convert_job.sh") + + job_name = f'{self.qiita_job_id}_{self.job_name}' + + with open(self.job_script_path, mode="w", encoding="utf-8") as f: + if 'bcl-convert' in self.bcl_tool: + cmd_line = (f'{self.bcl_tool} ' + f'--sample-sheet "{self.sample_sheet_path}" ' + f'--output-directory {self.output_path} ' + '--bcl-input-directory . ' + '--bcl-num-decompression-threads 16 ' + '--bcl-num-conversion-threads 16 ' + '--bcl-num-compression-threads 16 ' + '--bcl-num-parallel-tiles 16 ' + '--bcl-sampleproject-subdirectories true ' + '--force') + # equivalent cp for bcl-conversion (see below) needed. + else: + cmd_line = (f'{self.bcl_tool} ' + f'--sample-sheet "{self.sample_sheet_path}" ' + '--minimum-trimmed-read-length 1 ' + '--mask-short-adapter-reads 1 ' + '-R . ' + f'-o {self.output_path} ' + '--loading-threads 16 ' + '--processing-threads 16 ' + '--writing-threads 16 ' + '--create-fastq-for-index-reads ' + '--ignore-missing-positions ') + + params = {'job_name': job_name, + 'queue_name': self.queue_name, + 'node_count': self.node_count, + 'nprocs': self.nprocs, + 'wall_time_limit': self.wall_time_limit, + 'mem_per_cpu': self.pmem, + 'run_dir': self.root_dir, + 'sheet_path': self.sample_sheet_path, + 'cmd_line': cmd_line} + + # generate a string of linux system modules to load before + # processing begins. + if self.modules_to_load: + # if {{modules_to_load}} is defined, not empty and not false, + # then the line "module load " will be + # added to the template. + params['modules_to_load'] = ' '.join(self.modules_to_load) + + f.write(template.render(**params)) + + return self.job_script_path def run(self, callback=None): """ diff --git a/sequence_processing_pipeline/NuQCJob.py b/sequence_processing_pipeline/NuQCJob.py index b1c27900..fa431eef 100644 --- a/sequence_processing_pipeline/NuQCJob.py +++ b/sequence_processing_pipeline/NuQCJob.py @@ -1,4 +1,4 @@ -from jinja2 import BaseLoader, TemplateNotFound +from jinja2 import BaseLoader, Environment, TemplateNotFound from metapool import load_sample_sheet from os import stat, makedirs, rename from os.path import join, basename, dirname, exists, abspath, getmtime @@ -10,7 +10,6 @@ import logging from sequence_processing_pipeline.Commands import split_similar_size_bins from sequence_processing_pipeline.util import iter_paired_files -from jinja2 import Environment import glob import re from sys import executable diff --git a/sequence_processing_pipeline/templates/convert_job.sh b/sequence_processing_pipeline/templates/convert_job.sh new file mode 100644 index 00000000..5c2a2835 --- /dev/null +++ b/sequence_processing_pipeline/templates/convert_job.sh @@ -0,0 +1,17 @@ +#!/bin/bash +#SBATCH -J {{job_name}} +#SBATCH -p {{queue_name}} +#SBATCH -N {{node_count}} +#SBATCH -n {{nprocs}} +#SBATCH --time {{wall_time_limit}} +#SBATCH --mail-type=ALL +#SBATCH --mail-user qiita.help@gmail.com +#SBATCH --mem-per-cpu {{mem_per_cpu}} +set -x +date +hostname +cd {{run_dir}} +{% if modules_to_load %} + module load {{modules_to_load}} +{% endif %} +{{cmd_line}} \ No newline at end of file diff --git a/sequence_processing_pipeline/tests/test_ConvertJob.py b/sequence_processing_pipeline/tests/test_ConvertJob.py index df81fdcf..c2cf318b 100644 --- a/sequence_processing_pipeline/tests/test_ConvertJob.py +++ b/sequence_processing_pipeline/tests/test_ConvertJob.py @@ -910,6 +910,7 @@ def tearDown(self): rmtree(self.good_output_path) def test_creation(self): + self.maxDiff = None run_dir = self.base_path('211021_A00000_0000_SAMPLE') inv_input_directory = self.base_path('inv_input_directory') qiita_id = 'abcdabcdabcdabcdabcdabcdabcdabcd' @@ -934,13 +935,15 @@ def test_creation(self): 'ConvertJob.sh')) as f: obs = ''.join(f.readlines()) - # ssp should be just the value of the self.path() partial function by - # itself. For readability, SCRIPT_EXP addresses the '/' separator. - # Hence, the trailing '/' is redundant and should be removed here. - self.assertEqual(obs, - SCRIPT_EXP.format(ssp=self.base_path('').rstrip('/'), - gop=self.good_output_path, - run_dir=run_dir)) + # substitute variables in expected output with the run-time values + # that we expect. + exp = SCRIPT_EXP.replace("{run_dir}", run_dir).\ + replace("{gop}", self.good_output_path).\ + replace("{ssp}/", self.base_path('')) + + # remove trailing whitespace from the ends of each parameter, since + # it's not important. + self.assertEqual(obs.rstrip(), exp.rstrip()) def test_error_msg_from_logs(self): run_dir = self.base_path('211021_A00000_0000_SAMPLE') @@ -998,7 +1001,7 @@ def test_parse_sample_sheet(self): SCRIPT_EXP = ''.join([ '#!/bin/bash\n', - '#SBATCH --job-name abcdabcdabcdabcdabcdabcdabcdabcd_ConvertJob\n', + '#SBATCH -J abcdabcdabcdabcdabcdabcdabcdabcd_ConvertJob\n', '#SBATCH -p qiita\n', '#SBATCH -N 1\n', '#SBATCH -n 16\n', @@ -1009,7 +1012,7 @@ def test_parse_sample_sheet(self): 'set -x\n', 'date\n', 'hostname\n', - 'cd {run_dir}\n', + 'cd {run_dir}\n\n', 'tests/bin/bcl-convert --sample-sheet "{ssp}/good-sample-sheet.csv" ' '--output-directory {gop}/ConvertJob ' '--bcl-input-directory . '