diff --git a/CoVpipe2.nf b/CoVpipe2.nf
index d40ef44..9e34f38 100644
--- a/CoVpipe2.nf
+++ b/CoVpipe2.nf
@@ -6,7 +6,7 @@ nextflow.enable.dsl=2
 if (params.help) { exit 0, helpMSG() }
 
 // parameter sanity check
-Set valid_params = ['cores', 'max_cores', 'memory', 'help', 'profile', 'workdir', 'fastq', 'list', 'mode', 'run_id', 'reference', 'ref_genome', 'ref_annotation', 'adapter', 'fastp_additional_parameters', 'kraken', 'taxid', 'primer_bed', 'primer_bedpe', 'primer_version', 'vcount', 'frac', 'cov', 'vois', 'var_mqm', 'var_sap', 'var_qual', 'cns_min_cov', 'cns_gt_adjust', 'update', 'pangolin_docker_default', 'nextclade_docker_default', 'output', 'reference_dir', 'read_dir', 'mapping_dir', 'variant_calling_dir', 'consensus_dir', 'linage_dir', 'report_dir', 'rki_dir', 'runinfo_dir', 'singularity_cache_dir', 'conda_cache_dir', 'databases', 'publish_dir_mode', 'cloudProcess', 'cloud-process']
+Set valid_params = ['cores', 'max_cores', 'memory', 'help', 'profile', 'workdir', 'fastq', 'list', 'dir', 'mode', 'run_id', 'reference', 'ref_genome', 'ref_annotation', 'adapter', 'fastp_additional_parameters', 'kraken', 'taxid', 'primer_bed', 'primer_bedpe', 'primer_version', 'vcount', 'frac', 'cov', 'vois', 'var_mqm', 'var_sap', 'var_qual', 'cns_min_cov', 'cns_gt_adjust', 'update', 'pangolin_docker_default', 'nextclade_docker_default', 'output', 'reference_dir', 'read_dir', 'mapping_dir', 'variant_calling_dir', 'consensus_dir', 'linage_dir', 'report_dir', 'rki_dir', 'runinfo_dir', 'singularity_cache_dir', 'conda_cache_dir', 'databases', 'publish_dir_mode', 'cloudProcess', 'cloud-process']
 def parameter_diff = params.keySet() - valid_params
 if (parameter_diff.size() != 0){
     exit 1, "ERROR: Parameter(s) $parameter_diff is/are not valid in the pipeline!\n"
@@ -80,21 +80,23 @@ if ( params.reference ) {
 }
 
 // illumina reads input & --list support
-if (params.mode == 'paired') {
-    if (params.fastq && params.list) { fastqInputChannel = Channel
-        .fromPath( params.fastq, checkIfExists: true )
-        .splitCsv(header: true, sep: ',')
-        .map { row -> [row.sample, [file(row.fastq_1, checkIfExists: true), file(row.fastq_2, checkIfExists: true)]] }
-    } else if (params.fastq) { fastqInputChannel = Channel
-        .fromFilePairs( params.fastq, checkIfExists: true )}
-} else {
-    if (params.fastq && params.list) { fastqInputChannel = Channel
-        .fromPath( params.fastq, checkIfExists: true )
-        .splitCsv(header: true, sep: ',')
-        .map { row -> [row.sample, [file(row.fastq, checkIfExists: true)]] }}
-    else if (params.fastq) { fastqInputChannel = Channel
-        .fromPath( params.fastq, checkIfExists: true )
-        .map { file -> [file.simpleName, [file]]}}
+if (! params.dir) {
+    if (params.mode == 'paired') {
+        if (params.fastq && params.list) { fastqInputChannel = Channel
+            .fromPath( params.fastq, checkIfExists: true )
+            .splitCsv(header: true, sep: ',')
+            .map { row -> [row.sample, [file(row.fastq_1, checkIfExists: true), file(row.fastq_2, checkIfExists: true)]] }
+        } else if (params.fastq) { fastqInputChannel = Channel
+            .fromFilePairs( params.fastq, checkIfExists: true )}
+    } else {
+        if (params.fastq && params.list) { fastqInputChannel = Channel
+            .fromPath( params.fastq, checkIfExists: true )
+            .splitCsv(header: true, sep: ',')
+            .map { row -> [row.sample, [file(row.fastq, checkIfExists: true)]] }}
+        else if (params.fastq) { fastqInputChannel = Channel
+            .fromPath( params.fastq, checkIfExists: true )
+            .map { file -> [file.simpleName, [file]]}}
+    }
 }
 
 // load adapters [optional]
@@ -220,12 +222,23 @@ include { genome_quality } from './workflows/genome_quality_wf'
 include { summary_report } from './workflows/report_wf'
 include { rki_report_wf } from './workflows/rki_wf'
 
-include { bed2bedpe } from './modules/utils'
+include { bed2bedpe; make_sample_sheet } from './modules/utils'
 
 /**************************
 * MAIN WORKFLOW
 **************************/
 
 workflow {
+    if (params.dir) {
+        make_sample_sheet(params.fastq)
+
+        if (params.mode == 'paired') { fastqInputChannel = make_sample_sheet.out
+            .splitCsv(header: true, sep: ',')
+            .map { row -> [row.sample, [file(row.fastq_1, checkIfExists: true), file(row.fastq_2, checkIfExists: true)]] }
+        } else { fastqInputChannel = make_sample_sheet.out
+            .splitCsv(header: true, sep: ',')
+            .map { row -> [row.sample, [file(row.fastq, checkIfExists: true)]] }
+        }
+    }
     // 1: reference preprocessing
     reference_preprocessing(ref_genome_file)
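Note: with `--dir` set, the channel built in the workflow block above consumes the CSV emitted by the new `make_sample_sheet` process (added in modules/utils.nf below); the columns follow the header the script prints, `sample,fastq_1,fastq_2`. A minimal sketch of that mapping in plain Python — the sheet path and its contents are illustrative only, not part of the pipeline:

    import csv

    # Hypothetical sheet written by the make_sample_sheet process.
    with open("sample_sheet.csv", newline="") as handle:
        for row in csv.DictReader(handle):
            # Mirrors: .map { row -> [row.sample, [file(row.fastq_1), file(row.fastq_2)]] }
            print((row["sample"], [row["fastq_1"], row["fastq_2"]]))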
diff --git a/bin/make_sample_sheet.py b/bin/make_sample_sheet.py
new file mode 100755
index 0000000..59ee257
--- /dev/null
+++ b/bin/make_sample_sheet.py
@@ -0,0 +1,277 @@
+#!/usr/bin/env python3
+
+import pandas as pd
+import os
+import re
+import sys
+
+DEFAULT_FILENAME_PATTERNS = [
+    ("illumina_some_institute_1", {
+        "regex": r"(?P<date>\d+)_(?P<lab_id>\d{2}-\d{4,5}(-[^_]+)?)_(?P<sample_id>.+)"
+                 r"_(?P<snum>S\d+)_(?P<lane>L\d{3})_(?P<read>R[12])"
+                 r"_(?P<running>\d{3})",
+        "ambig": ["lane", "running"]
+    }),
+    ("illumina1", {
+        "regex": r"(?P<sample_id>.+)_(?P<snum>S[\d]+)_(?P<lane>L[\d]{3})_"
+                 r"(?P<read>R[12])_(?P<running>\d{3})",
+        "ambig": ["lane", "running"]}),
+    ("illumina2", {
+        "regex": r"(?P<sample_id>.+)_(?P<snum>S[\d]+)_"
+                 r"(?P<read>R[12])_(?P<running>\d{3})",
+        "ambig": ["running"]}),
+    ("illumina3", {
+        "regex": r"(?P<sample_id>.+)_(?P<snum>S[0-9]+)_(?P<lane>L[0-9]{3})_"
+                 r"(?P<read>R[12])",
+        "ambig": ["lane"]}),
+    ("illumina4", {
+        "regex": r"(?P<sample_id>.+)_S(?P<snum>[0-9]+)_"
+                 r"(?P<read>R[12])",
+        "ambig": []}),
+    ("illumina_fallback", {
+        "regex": r"(?P<sample_id>.+)_"
+                 r"(?P<read>R[12])(?P<residual>_.+)?",
+        "ambig": []}),
+    ("SRA", {
+        "regex": r"(?P<sample_id>SR.+)_(?P<read>[12])",
+        "ambig": []}),
+
+    ("fallback1", {
+        "regex": r"(?P<sample_id>.+)_(?P<read>[A-Za-z0-9]+)",
+        "ambig": []}),
+    ("fallback2", {
+        "regex": r"(?P<sample_id>.+)\.(?P<read>[A-Za-z0-9]+)",
+        "ambig": [],
+        "sep": "."})
+]
+
+PAIRED_READ_REF = {
+    "1": 1,
+    "2": 2,
+    "R1": 1,
+    "R2": 2,
+    "FORWARD": 1,
+    "REVERSE": 2,
+    "FWD": 1,
+    "REV": 2,
+    "F": 1,
+    "R": 2,
+    "P": 1,
+    "M": 2,
+    "PLUS": 1,
+    "MINUS": 2,
+    "SENSE": 1,
+    "ANTI": 2
+}
+
+def check_and_add_fastq(_files, res, pdir=None, sample_names=None, alt_ids=None):
+    single_mode = len(_files) == 1 and sample_names is not None
+    patterns = dict(DEFAULT_FILENAME_PATTERNS)
+    patterns_in_order = [key for key, _ in DEFAULT_FILENAME_PATTERNS]
+    end = r'.(fq|fnq|fastq)(.gz)?'
+    for _file in (fl for fl in _files if re.match(f".*{end}", fl)):
+        if pdir is None:
+            path = os.path.dirname(os.path.realpath(_file))
+        else:
+            path = os.path.realpath(pdir)
+        _file = os.path.basename(_file)
+        for pnum, pattern in enumerate(patterns_in_order):
+            regex_string = ('{patternregex}{end}'.format(
+                patternregex=patterns[pattern]["regex"],
+                end=end))
+            match = re.match(regex_string, _file)
+            if match is None:
+                if pnum + 1 == len(patterns):
+                    print(f"FastQ does not meet any known spec: file: {_file}")
+                continue
+            sep = default_if_not("sep", patterns[pattern], "_")
+            ambig_keys = default_if_not("ambig", patterns[pattern], [])
+            match_dict = match.groupdict()
+            ambig = sep.join(match_dict[a]
+                             for a in sorted(ambig_keys))
+            nonambig = sep.join(match_dict[na]
+                                for na in sorted(match_dict.keys())
+                                if na not in ambig_keys + ["read"]
+                                and match_dict[na] is not None)
+            sub_sample_id = sep.join(
+                [val if val is not None else ""
+                 for (_id, val) in list(match.groupdict().items())
+                 if _id not in ["read"]])
+            if pattern.startswith("illumina_some_institute"):
+                matchdict = match.groupdict()
+                sub_sample_id = "{sid}_{lid}".format(
+                    sid=matchdict["sample_id"],
+                    lid=matchdict["lab_id"])
+
+            read = "R1"
+            read_id = 1
+            try:
+                read = match_dict["read"]
+            except KeyError:
+                pass
+            try:
+                read_id = PAIRED_READ_REF[read.upper()]
+            except KeyError:
+                print("Warning: Read name \"", read, "\" not known")
+                print(f"    Using Regex-pattern: {pattern} {regex_string} File: {_file}")
+
+            key = (path, nonambig, ambig, read)
+            if key in res["sample_data"].index:
+                break
+            new_entry = pd.DataFrame(dict(zip(res["sample_data"].columns,
+                                              ([a]
+                                               for a
+                                               in [path, nonambig, ambig, read, read_id,
+                                                   "PLACE_HOLDER",
+                                                   sample_names[0] if single_mode
+                                                   and sample_names is not None
+                                                   else sub_sample_id,
+                                                   "PLACE_HOLDER",
+                                                   alt_ids[0] if single_mode
+                                                   and alt_ids is not None
+                                                   else "PLACE_HOLDER",
+                                                   _file, pattern,
+                                                   '{regexpattern}{end}'.format(
+                                                       regexpattern=patterns[pattern]["regex"], end=end),
+                                                   "new"]))))
+            new_entry.set_index(["path", "unambig_id", "ambig_id", "read"], inplace=True, drop=False)
+            res["sample_data"] = res["sample_data"].append(new_entry)
+            # print(read, nonambig, ambig, pattern, _file, path, sep='\t')
+            break
+    # eprint(res["sample_data"][["sub_sample_id", "alt_sub_sample_id"]])
+
+def default_if_not(key, _dict, default):
+    try:
+        return _dict[key]
+    except KeyError:
+        return default
+
+def resolve_sample_id_conflicts(sample_data):
+    generate_alternative_ids(sample_data)
+    prelim_groups = sample_data.groupby(["sub_sample_id"]).groups
+    for sub_id in prelim_groups:
+        paths_subgroup = sample_data[
+            sample_data.sub_sample_id == sub_id].groupby(level="path").groups
+        num_subids = len(paths_subgroup)
+        if num_subids == 1:
+            continue
+        print(f"Warning: Found multiple samples ({num_subids}) with sample id {sub_id}!"
+              " DUPLICATE_[Number]_ will be prepended to duplicates")
+        for (_id, path) in enumerate(paths_subgroup):
+            if _id == 0:  # Skip first occurrence
+                continue
+            sample_data.loc[((sample_data.sub_sample_id == sub_id) &
+                             (sample_data.path == path)),
+                            "sub_sample_id"] = "Duplicate_{_id}_{samp}".format(
+                                _id=_id, samp=sub_id)
+
+def generate_alternative_ids(sample_data):
+    if not ((sample_data["alt_sub_sample_id"] == "PLACE_HOLDER").any()):
+        # All Samples known and will just be passed on
+        return
+    old_samples = sample_data.loc[
+        sample_data.alt_sub_sample_id != "PLACE_HOLDER"].copy()
+    old_sample_groups = old_samples.groupby(
+        level=["path", "unambig_id"]).groups
+    num_old_samples = len(old_sample_groups)
+    new_samples = sample_data.loc[
+        sample_data.alt_sub_sample_id == "PLACE_HOLDER"].copy()
+    new_sample_groups = new_samples.groupby(
+        level=["path", "unambig_id"]).groups
+    for key in (key
+                for key in new_sample_groups if key in old_sample_groups):
+        new_sample_data = sample_data.loc[new_sample_groups[key]].copy()
+        old_sample_data = sample_data.loc[old_sample_groups[key]].copy()
+        old_sample_sub_groups = old_sample_data.groupby(
+            level=["ambig_id"]).groups
+        new_sample_sub_groups = new_sample_data.groupby(
+            level=["ambig_id"]).groups
+        for sub_key in (s_key
+                        for s_key in new_sample_sub_groups
+                        if s_key in old_sample_sub_groups):
+            old_alt_id = old_sample_data.loc[
+                old_sample_sub_groups[sub_key]].alt_sub_sample_id[0]
+            sample_data.loc[
+                new_sample_sub_groups[sub_key],
+                "alt_sub_sample_id"] = old_alt_id
+
+        for sub_key in (skey
+                        for skey in new_sample_sub_groups
+                        if skey not in old_sample_sub_groups):
+            raise RuntimeError(
+                "Use of given sample names and conflict resolution through"
+                " longest common prefix is not implemented yet!!!")
+
+    new_samples = sample_data.loc[
+        sample_data.alt_sub_sample_id == "PLACE_HOLDER"].copy()
+    new_sample_groups = new_samples.groupby(
+        level=["path", "unambig_id"]).groups
+    for (_id, (path, unambig)) in enumerate(new_sample_groups):
+        main_name = "Sample_{nid}".format(nid=_id + 1 + num_old_samples)
+        sample_data.loc[
+            new_sample_groups[(path, unambig)],
+            "alt_sample_id"] = main_name
+        sub_groups = sample_data[
+            sample_data.alt_sample_id == main_name].groupby(
+                level=["ambig_id"]).groups
+        if len(sub_groups) == 1:
+            # Easy Case when Sample not split between lanes or
+            # in multiple files with differing running number
+            sample_data.loc[sample_data.alt_sample_id == main_name,
+                            "alt_sub_sample_id"] = main_name
+        else:
+            # import pdb; pdb.set_trace()
+            for (_id, ambig) in enumerate(sub_groups):
+                sample_data.loc[((sample_data.alt_sample_id == main_name) &
+                                 (sample_data.ambig_id == ambig)),
+                                "alt_sub_sample_id"] = "{sid}.{nid}".format(
+                                    sid=main_name, nid=_id + 1)
+    if (sample_data["alt_sub_sample_id"] == "PLACE_HOLDER").any():
+        raise RuntimeError("Some unique ids could not be assigned")
+
+def generate_sample_config(sample_data):
+    samples = dict(
+        (key, dict(("read{read}".format(read=read_id), os.path.join(
+            sample_data[(
+                (sample_data.sub_sample_id == key) &
+                (sample_data.read_id == read_id))].path[0],
+            sample_data[(
+                (sample_data.sub_sample_id == key) &
+                (sample_data.read_id == read_id))].file[0]
+            ))
+            for read_id in sample_data[
+                sample_data.sub_sample_id == key].read_id))
+        for key in sample_data.sub_sample_id)
+    for key in samples:
+        samples[key]["alt_id"] = sample_data[
+            sample_data.sub_sample_id == key].alt_sub_sample_id[0]
+    return samples
+
+def main(fastq_dir):
+    # fastq_dir = '/scratch/Projekte/MF1_genome-reconstruction/covpipe_testdata/20211126_FS10000749_53_BPG61617-2127/sedaghatjoo/'
+    assert os.path.isdir(fastq_dir), f"{fastq_dir} does not exist"
+    fastq_all = [f for f in os.listdir(fastq_dir) if re.search(r'.(fq|fnq|fastq)(.gz)?', f)]
+
+    res = {"sample_data": pd.DataFrame(
+            columns=["path", "unambig_id", "ambig_id", "read", "read_id",
+                     "sample_id", "sub_sample_id",
+                     "alt_sample_id", "alt_sub_sample_id",
+                     "file", "match", "regex", "state"],
+            ),
+           "delta_files": []}
+    res['sample_data'].set_index(["path", "unambig_id", "ambig_id", "read"], inplace=True, drop=False)
+
+    check_and_add_fastq(fastq_all, res, fastq_dir)
+    samp_data = res["sample_data"]
+    resolve_sample_id_conflicts(samp_data)
+    foo = generate_sample_config(samp_data)
+
+    print("sample,fastq_1,fastq_2")
+    for sample in foo:
+        print(f"{sample},{foo[sample]['read1']},{foo[sample]['read2']}")
+
+if __name__ == "__main__":
+    fastq_dir = sys.argv[1]
+    main(fastq_dir)
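For orientation, the patterns in DEFAULT_FILENAME_PATTERNS are tried in order, each one anchored with the fastq extension suffix, and the first match wins. A small illustrative check of the "illumina1" pattern on an invented file name (group names as listed above):

    import re

    # Invented example file name; regex follows the "illumina1" entry plus the
    # extension suffix, combined the same way check_and_add_fastq() does.
    name = "SampleX_S12_L001_R1_001.fastq.gz"
    regex = (r"(?P<sample_id>.+)_(?P<snum>S[\d]+)_(?P<lane>L[\d]{3})_"
             r"(?P<read>R[12])_(?P<running>\d{3})"
             r".(fq|fnq|fastq)(.gz)?")
    print(re.match(regex, name).groupdict())
    # {'sample_id': 'SampleX', 'snum': 'S12', 'lane': 'L001', 'read': 'R1', 'running': '001'}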
diff --git a/modules/utils.nf b/modules/utils.nf
index 271dcb7..47cd5dc 100644
--- a/modules/utils.nf
+++ b/modules/utils.nf
@@ -1,3 +1,20 @@
+process make_sample_sheet {
+    label 'president'
+    cache false
+    publishDir "${params.output}/${params.runinfo_dir}", mode: params.publish_dir_mode
+
+    input:
+    val(read_dir)
+
+    output:
+    path('sample_sheet.csv')
+
+    script:
+    """
+    make_sample_sheet.py ${read_dir} > sample_sheet.csv
+    """
+}
+
 process compress_reads {
     label 'pigz'
 
@@ -20,7 +37,7 @@ process compress_reads {
 process bgzip_compress {
     label 'samtools'
 
-    publishDir "${params.publish_dir}/${name}", mode: params.publish_dir_mode
+    publishDir "${params.output}/${name}", mode: params.publish_dir_mode
 
     input:
     tuple val(name), path(file)
diff --git a/nextflow.config b/nextflow.config
index 9a43e85..94ddc0a 100644
--- a/nextflow.config
+++ b/nextflow.config
@@ -18,6 +18,7 @@ params {
     // reads
     fastq = ''
     list = false
+    dir = false
     mode = 'paired'
     run_id = ''
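Since `dir` defaults to `false`, existing `--fastq`/`--list` invocations are unaffected; with `--dir`, `params.fastq` is expected to point at a directory of fastq files that `make_sample_sheet.py` can list. A quick, purely illustrative pre-flight check of such a directory, using the same extension filter as the script's `main()` (the path below is a placeholder):

    import os
    import re
    import sys

    # Placeholder: pass the directory you would hand to --fastq together with --dir.
    fastq_dir = sys.argv[1] if len(sys.argv) > 1 else "/path/to/fastq_dir"

    # Same extension filter used by make_sample_sheet.py when listing the directory.
    fastqs = sorted(f for f in os.listdir(fastq_dir)
                    if re.search(r'.(fq|fnq|fastq)(.gz)?', f))
    print(f"{len(fastqs)} fastq file(s) found in {fastq_dir}")
    for f in fastqs:
        print(" ", f)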