diff --git a/main.nf b/main.nf
index 94756edf..11e888bf 100644
--- a/main.nf
+++ b/main.nf
@@ -9,6 +9,9 @@
 ----------------------------------------------------------------------------------------
 */
 
+nextflow.enable.dsl=2
+
+
 
 def helpMessage() {
@@ -364,7 +367,7 @@ if (params.subsample) {
         csv_pairs_ch, csv_singles_ch, read_pairs_ch, read_singles_ch, input_paths_ch)
         .dump ( tag: 'ch_read_files_trimming_unchecked__with_fastas' )
-        .into { ch_read_files_trimming_to_trim; ch_read_files_trimming_to_check_size }
+        .set { ch_read_files_trimming }
 } else {
     // No fasta files - combine everything and error out
     sra_ch.concat(
@@ -394,7 +397,6 @@ if (!protein_input) {
         reads_ch_unchecked
             .ifEmpty{ exit 1, "No reads provided! Check read input files" }
             .set { ch_reads_for_ribosomal_removal }
-        ch_read_files_trimming_to_check_size = Channel.empty()
     } else if (params.bam || params.tenx_tgz) {
         ch_non_bam_reads_unchecked
             // No need to check if empty since there is bam input
@@ -403,7 +405,7 @@ if (!protein_input) {
         // if no fastas, then definitely trimming the remaining reads
         ch_read_files_trimming_unchecked
             .ifEmpty{ exit 1, "No reads provided! Check read input files" }
-            .into { ch_read_files_trimming_to_trim; ch_read_files_trimming_to_check_size }
+            .set { ch_read_files_trimming }
     }
 } else {
     // Since there exists protein input, don't check if these are empty
@@ -414,10 +416,9 @@ if (!protein_input) {
     if (params.skip_trimming) {
         reads_ch_unchecked
             .set { ch_reads_for_ribosomal_removal }
-        ch_read_files_trimming_to_check_size = Channel.empty()
     } else if (!have_nucleotide_fasta_input) {
         ch_read_files_trimming_unchecked
-            .into { ch_read_files_trimming_to_trim; ch_read_files_trimming_to_check_size }
+            .set { ch_read_files_trimming }
     }
     if (params.bam) {
         ch_non_bam_reads_unchecked
@@ -450,7 +451,7 @@ save_translate_json = params.save_translate_json
 // --- Parse the Sourmash parameters ----
 ksizes = params.ksizes?.toString().tokenize(',')
 Channel.from(params.ksizes?.toString().tokenize(','))
-    .into { ch_ksizes_for_compare_peptide; ch_ksizes_for_compare_nucleotide }
+    .set { ch_ksizes_for_compare }
 
 molecules = params.molecules?.toString().tokenize(',')
 peptide_molecules = molecules.findAll { it != "dna" }
@@ -461,7 +462,7 @@ Channel.from( molecules )
     .set { ch_molecules }
 
 Channel.from( peptide_molecules )
-    .into { ch_peptide_molecules; ch_peptide_molecules_for_compare }
+    .set { ch_peptide_molecules }
 
 // Parse sketch value and style parameters
 sketch_num_hashes = params.sketch_num_hashes
@@ -546,7 +547,7 @@ if (workflow.profile.contains('awsbatch')) {
     // related: https://github.com/nextflow-io/nextflow/issues/813
     if (!params.outdir.startsWith('s3:')) exit 1, "Outdir not on S3 - specify S3 Bucket to run on AWSBatch!"
     // Prevent trace files to be stored on S3 since S3 does not support rolling files.
-    if (params.tracedir.startsWith('s3:')) exit 1, "Specify a local tracedir or run without trace! S3 cannot be used for tracefiles."
+    // if (params.tracedir.startsWith('s3:')) exit 1, "Specify a local tracedir or run without trace! S3 cannot be used for tracefiles."
 }
 
 // Stage config files
@@ -676,17 +677,14 @@ process get_software_versions {
     scrape_software_versions.py &> software_versions_mqc.yaml
     """
 }
-if ( !params.split_kmer && have_sketch_value ) {
-  // Only use this for sourmash sketches, not split k-mer sketches
-  /*
-   * Validate sketch sizes
-   */
+
+
 process validate_sketch_value {
-    publishDir "${params.outdir}/pipeline_info", mode: params.publish_dir_mode,
-        saveAs: {filename ->
-            if (filename.indexOf(".txt") > 0) filename
-            else null
-        }
+    // publishDir "${params.outdir}/pipeline_info", mode: params.publish_dir_mode,
+    //     saveAs: {filename ->
+    //         if (filename.indexOf(".txt") > 0) filename
+    //         else null
+    //     }
 
     input:
     val sketch_num_hashes
     val sketch_num_hashes_log2
@@ -694,12 +692,12 @@ if ( !params.split_kmer && have_sketch_value ) {
    val sketch_scaled_log2
 
     output:
-    file sketch_value into ch_sketch_value_unparsed
-    file sketch_style into ch_sketch_style_unparsed
+    path 'sketch_value.txt', emit: sketch_value
+    path "sketch_style.txt", emit: sketch_style
 
     script:
-    sketch_style = "sketch_style.txt"
-    sketch_value = 'sketch_value.txt'
+    def sketch_style = "sketch_style.txt"
+    def sketch_value = 'sketch_value.txt'
    """
    validate_sketch_value.py \\
        --sketch_num_hashes ${sketch_num_hashes} \\
@@ -711,33 +709,8 @@ if ( !params.split_kmer && have_sketch_value ) {
    """
 }
 
-// Parse sketch style into value
-sketch_style_parsed = ch_sketch_style_unparsed
-    .splitText()
-    .dump ( tag: 'ch_sketch_style' )
-    .map { it -> it.replaceAll('\\n', '' ) }
-    .first()
-    .dump ( tag: 'sketch_style_parsed' )
-    .collect ()
-    // get first item of returned array from .collect()
-    // sketch_style_parsed = sketch_style_parsed[0]
-    // .into { ch_sketch_style_for_nucleotides; ch_sketch_style_for_proteins }
-    // sketch_style = sketch_styles[0]
-    // println "sketch_style_parsed: ${sketch_style_parsed}"
-    // println "sketch_style: ${sketch_style}"
-
-// Parse file into values
-sketch_value_parsed = ch_sketch_value_unparsed
-    .splitText()
-    .map { it -> it.replaceAll('\\n', '')}
-    .first()
-    .dump ( tag : 'sketch_value_parsed' )
-    .collect()
-    // get first item of returned array from .collect()
-    // sketch_value_parsed = sketch_value_parsed[0]
-    // .into { ch_sketch_value_for_proteins; ch_sketch_value_for_dna }
-}
+
 
 // Combine sketch values with ksize and molecule types
@@ -965,104 +938,63 @@ if (params.tenx_tgz || params.bam) {
     ch_non_bam_reads
         .mix ( per_cell_fastqs_ch )
         .dump ( tag: 'ch_non_bam_reads__per_cell_fastqs_ch' )
-        .into{ ch_read_files_trimming_to_trim; ch_read_files_trimming_to_check_size }
+        .set{ ch_read_files_trimming }
   }
 }
-if ( have_nucleotide_input ) {
-  if (!params.skip_trimming && have_nucleotide_fastq_input){
-    process fastp {
-      label 'process_low'
-      tag "$name"
-      publishDir "${params.outdir}/fastp", mode: params.publish_dir_mode,
-        saveAs: {filename ->
-          if (filename.indexOf(".fastq.gz") == -1) "logs/$filename"
-          else if (reads[1] == null) "single_end/$filename"
-          else if (reads[1] != null) "paired_end/$filename"
-          else null
-        }
-
-      input:
-      set val(name), file(reads) from ch_read_files_trimming_to_trim
+process fastp {
+    label 'process_low'
+    tag "$name"
+    publishDir "${params.outdir}/fastp", mode: params.publish_dir_mode,
+        saveAs: {filename ->
+            if (filename.indexOf(".fastq.gz") == -1) "logs/$filename"
+            else if (reads[1] == null) "single_end/$filename"
+            else if (reads[1] != null) "paired_end/$filename"
+            else null
+        }
 
-      output:
-      set val(name), file("*trimmed.fastq.gz") into ch_reads_all_trimmed
-      file "*fastp.json" into ch_fastp_results
-      file "*fastp.html" into ch_fastp_html
+    input:
+    tuple val(name), path(reads)
 
-      script:
-      // One set of reads --> single end
-      if (reads[1] == null) {
-        """
-        fastp \\
-          --in1 ${reads} \\
-          --out1 ${name}_R1_trimmed.fastq.gz \\
-          --json ${name}_fastp.json \\
-          --html ${name}_fastp.html
-        """
-      } else if (reads[1] != null ){
-        // More than one set of reads --> paired end
-        """
-        fastp \\
-          --in1 ${reads[0]} \\
-          --in2 ${reads[1]} \\
-          --out1 ${name}_R1_trimmed.fastq.gz \\
-          --out2 ${name}_R2_trimmed.fastq.gz \\
-          --json ${name}_fastp.json \\
-          --html ${name}_fastp.html
-        """
-      } else {
-        """
-        echo name ${name}
-        echo reads: ${reads}
-        echo "Number of reads is not equal to 1 or 2 --> don't know how to trim non-paired-end and non-single-end reads"
-        """
-      }
-    }
+    output:
+    tuple val(name), path("*trimmed.fastq.gz"), emit: trimmed_reads
+    path "*fastp.json" , emit: json
+    path "*fastp.html" , emit: html
 
-    // Filtering out fastq.gz files less than 200 bytes (arbitary number)
-    // ~200 bytes is about the size of a file with a single read or less
-    // We can't use .size() > 0 because it's fastq.gz is gzipped content
-    ch_reads_all_trimmed
-      .dump ( tag: 'ch_reads_all_trimmed' )
-      .branch {
-        // Paired is a tuple of two reads
-        paired: it[1].size() == 2
-        single: true
+    script:
+    // One set of reads --> single end
+    if (reads[1] == null) {
+        """
+        fastp \\
+            --in1 ${reads} \\
+            --out1 ${name}_R1_trimmed.fastq.gz \\
+            --json ${name}_fastp.json \\
+            --html ${name}_fastp.html
+        """
+    } else if (reads[1] != null ){
+        // More than one set of reads --> paired end
+        """
+        fastp \\
+            --in1 ${reads[0]} \\
+            --in2 ${reads[1]} \\
+            --out1 ${name}_R1_trimmed.fastq.gz \\
+            --out2 ${name}_R2_trimmed.fastq.gz \\
+            --json ${name}_fastp.json \\
+            --html ${name}_fastp.html
+        """
+    } else {
+        """
+        echo name ${name}
+        echo reads: ${reads}
+        echo "Number of reads is not equal to 1 or 2 --> don't know how to trim non-paired-end and non-single-end reads"
+        """
+    }
-      }
-      .set { ch_reads_trimmed_branched }
-
-    ch_reads_trimmed_branched.paired
-      .filter{ it -> it[1][0].size() > 200 }
-      .dump ( tag: 'ch_reads_trimmed_paired' )
-      .set{ ch_reads_trimmed_paired }
-
-    ch_reads_trimmed_branched.single
-      .filter{ it -> it[1].size() > 200 }
-      .dump ( tag: 'ch_reads_trimmed_single' )
-      .set{ ch_reads_trimmed_single }
-
-    ch_reads_trimmed_single
-      .mix ( ch_reads_trimmed_paired )
-      .set { ch_reads_trimmed }
-
-    // Concatenate trimmed fastq files with fastas
-    if (params.subsample){
-      // Concatenate trimmed reads with fastas for subsequent subsampling
-      ch_reads_trimmed
-        .concat( fastas_ch )
-        .dump ( tag: 'trimmed_reads__concat_fastas' )
-        .set { subsample_ch_reads_for_ribosomal_removal }
-    } else {
-      // Concatenate trimmed reads with fastas for signature generation
-      ch_reads_for_ribosomal_removal = ch_reads_trimmed.mix(fastas_ch)
-    }
-} else {
-  ch_reads_for_ribosomal_removal = fastas_ch
-  ch_fastp_results = Channel.from(false)
 }
+
+
+
 if (params.subsample) {
     process subsample_input {
         tag "${id}_subsample"
@@ -1087,26 +1019,19 @@ if (params.subsample) {
     }
 }
 
-/*
- * STEP 2+ - SortMeRNA - remove rRNA sequences on request
- */
-if (!params.remove_ribo_rna) {
-    ch_reads_for_ribosomal_removal
-        .set { ch_reads_to_translate }
-    sortmerna_logs = Channel.empty()
-} else {
+
 process sortmerna_index {
     label 'mid_memory_long'
     label 'mid_cpu'
    tag "${fasta.baseName}"
 
    input:
-    file(fasta) from sortmerna_fasta
+    file(fasta)
 
    output:
-    val("${fasta.baseName}") into sortmerna_db_name
-    file("$fasta") into sortmerna_db_fasta
-    file("${fasta.baseName}*") into sortmerna_db
+    val("${fasta.baseName}") , emit: db_name
+    path("$fasta") , emit: db_fasta
+    path("${fasta.baseName}*"), emit: db
 
    script:
    """
@@ -1177,206 +1102,147 @@
    """
 }
 }
-    }
-
-
-
-    if (params.reference_proteome_fasta){
-        process translate {
-            tag "${sample_id}"
-            label "low_memory_long"
-            publishDir "${params.outdir}/translate/", mode: params.publish_dir_mode,
-                saveAs: {
-                    filename ->
-                        if (save_translate_csv && filename.indexOf(".csv") > 0) "$filename"
-                        else if (save_translate_json && filename.indexOf(".json") > 0) "$filename"
-                        else if (filename.indexOf("_noncoding_reads_nucleotides") > 0) "noncoding_nucleotides/${filename}"
-                        else if (filename.indexOf("_coding_reads_nucleotides") > 0) "coding_nucleotides/${filename}"
-                        else if (filename.indexOf("_coding_reads_peptides") > 0) "coding_peptides/${filename}"
-                        else "$filename"
-                }
-
-            input:
-            set bloom_id, molecule, file(bloom_filter) from ch_orpheum_bloom_filter.collect()
-            set sample_id, file(reads) from ch_reads_to_translate
-
-            output:
-            // TODO also extract nucleotide sequence of coding reads and do sourmash compute using only DNA on that?
-            set val(sample_id), file("${sample_id}__noncoding_reads_nucleotides.fasta") into ch_noncoding_nucleotides_potentially_empty
-            set val(sample_id), file("${sample_id}__coding_reads_peptides.fasta") into ch_translated_protein_seqs
-            set val(sample_id), file("${sample_id}__coding_reads_nucleotides.fasta") into ch_translatable_nucleotide_seqs
-            set val(sample_id), file(translate_csv) into ch_coding_scores_csv
-            set val(sample_id), file(translate_json) into ch_coding_scores_json
-
-            script:
-            translate_json = "${sample_id}__coding_summary.json"
-            translate_csv = "${sample_id}__coding_scores.csv"
-            csv_flag = save_translate_csv ? "--csv ${translate_csv}" : ''
-            json_flag = save_translate_json ? "--json-summary ${translate_json}" : ''
-
-            """
-            orpheum translate \\
-                --molecule ${molecule} \\
-                --coding-nucleotide-fasta ${sample_id}__coding_reads_nucleotides.fasta \\
-                --noncoding-nucleotide-fasta ${sample_id}__noncoding_reads_nucleotides.fasta \\
-                ${csv_flag} \\
-                ${json_flag} \\
-                --jaccard-threshold ${translate_jaccard_threshold} \\
-                --peptide-ksize ${translate_peptide_ksize} \\
-                --peptides-are-bloom-filter \\
-                ${bloom_filter} \\
-                ${reads} > ${sample_id}__coding_reads_peptides.fasta
-            touch ${translate_csv}
-            touch ${translate_json}
-            """
-        }
+
-        // Remove empty files
-        // it[0] = sample id
-        // it[1] = sequence fasta file
-        ch_translated_protein_seqs
-            .mix ( ch_protein_fastas )
-            .dump ( tag: 'ch_protein_seq_to_sketch_from_translate' )
-            .set { ch_protein_seq_to_sketch }
-        // Remove empty files
-        // it[0] = sample bloom id
-        // it[1] = sequence fasta file
-        ch_noncoding_nucleotides_nonempty = ch_noncoding_nucleotides_potentially_empty.filter{ it[1].size() > 0 }
-        ch_translatable_nucleotide_seqs
-            .dump( tag: 'ch_translatable_nucleotide_seqs' )
-            .filter{ it[1].size() > 0 }
-            .dump ( tag: 'ch_reads_to_sketch__from_translate' )
-            .set { ch_reads_to_sketch }
-    } else {
-        // Send reads directly into coding/noncoding
-        ch_reads_to_translate
-            .dump ( tag: 'ch_reads_to_sketch__no_translation' )
-            .set{ ch_reads_to_sketch }
-    }
-    if (params.split_kmer){
-        ///////////////////////////////////////////////////////////////////////////////
-        ///////////////////////////////////////////////////////////////////////////////
-        /* --                                                                     -- */
-        /* --                        CREATE SKA SKETCH                            -- */
-        /* --                                                                     -- */
-        ///////////////////////////////////////////////////////////////////////////////
-        ///////////////////////////////////////////////////////////////////////////////
+process translate {
+    tag "${sample_id}"
+    label "low_memory_long"
+    publishDir "${params.outdir}/translate/", mode: params.publish_dir_mode,
+        saveAs: {
+            filename ->
+                if (save_translate_csv && filename.indexOf(".csv") > 0) "$filename"
+                else if (save_translate_json && filename.indexOf(".json") > 0) "$filename"
+                else if (filename.indexOf("_noncoding_reads_nucleotides") > 0) "noncoding_nucleotides/${filename}"
+                else if (filename.indexOf("_coding_reads_nucleotides") > 0) "coding_nucleotides/${filename}"
+                else if (filename.indexOf("_coding_reads_peptides") > 0) "coding_peptides/${filename}"
+                else "$filename"
+        }
 
-        process ska_compute_sketch {
-            tag "${sketch_id}"
-            publishDir "${params.outdir}/ska/sketches/", mode: params.publish_dir_mode
-            errorStrategy 'retry'
-            maxRetries 3
+    input:
+    tuple val(bloom_id), val(molecule), path(bloom_filter)
+    tuple val(sample_id), path(reads)
+
+    output:
+    // TODO also extract nucleotide sequence of coding reads and do sourmash compute using only DNA on that?
+    // Named emits so the workflow block can reference translate.out.*
+    tuple val(sample_id), path("${sample_id}__noncoding_reads_nucleotides.fasta"), emit: noncoding_nucleotides
+    tuple val(sample_id), path("${sample_id}__coding_reads_peptides.fasta"), emit: coding_peptides
+    tuple val(sample_id), path("${sample_id}__coding_reads_nucleotides.fasta"), emit: coding_nucleotides
+    tuple val(sample_id), path(translate_csv), emit: coding_scores_csv
+    tuple val(sample_id), path(translate_json), emit: coding_scores_json
+
+    script:
+    translate_json = "${sample_id}__coding_summary.json"
+    translate_csv = "${sample_id}__coding_scores.csv"
+    csv_flag = save_translate_csv ? "--csv ${translate_csv}" : ''
+    json_flag = save_translate_json ? "--json-summary ${translate_json}" : ''
+
+    """
+    orpheum translate \\
+        --molecule ${molecule} \\
+        --coding-nucleotide-fasta ${sample_id}__coding_reads_nucleotides.fasta \\
+        --noncoding-nucleotide-fasta ${sample_id}__noncoding_reads_nucleotides.fasta \\
+        ${csv_flag} \\
+        ${json_flag} \\
+        --jaccard-threshold ${translate_jaccard_threshold} \\
+        --peptide-ksize ${translate_peptide_ksize} \\
+        --peptides-are-bloom-filter \\
+        ${bloom_filter} \\
+        ${reads} > ${sample_id}__coding_reads_peptides.fasta
+    touch ${translate_csv}
+    touch ${translate_json}
+    """
+}
 
-            input:
-            each ksize from ksizes
-            set id, file(reads) from ch_reads_to_sketch
 
-            output:
-            set val(ksize), file("${sketch_id}.skf") into ska_sketches
+process ska_compute_sketch {
+    tag "${sketch_id}"
+    publishDir "${params.outdir}/ska/sketches/", mode: params.publish_dir_mode
+    errorStrategy 'retry'
+    maxRetries 3
 
-            script:
-            sketch_id = "${id}_ksize_${ksize}"
+    input:
+    each ksize
+    tuple val(id), path(reads)
 
-            """
-            ska fastq \\
-                -k $ksize \\
-                -o ${sketch_id} \\
-                ${reads}
-            """
-        }
-    } else if (!params.skip_compute) {
+    output:
+    tuple val(ksize), path("${sketch_id}.skf"), emit: sketches
 
-        process sourmash_compute_sketch_fastx_nucleotide {
-            tag "${sig_id}"
-            label "low_memory"
-            publishDir "${params.outdir}/sketches_nucleotide/${sketch_id}", mode: "${params.publish_dir_mode}",
-                saveAs: {filename ->
-                    if (filename.indexOf(".csv") > 0) "description/$filename"
-                    else if (filename.indexOf(".sig") > 0) "sigs/$filename"
-                    else null
-                }
+    script:
+    sketch_id = "${id}_ksize_${ksize}"
 
-            input:
-            val track_abundance
-            val sketch_value_parsed
-            val sketch_style_parsed
-            set val(sample_id), file(reads) from ch_reads_to_sketch
+    """
+    ska fastq \\
+        -k $ksize \\
+        -o ${sketch_id} \\
+        ${reads}
+    """
 
-            output:
-            file(csv) into ch_sourmash_sig_describe_nucleotides
-            set val(sample_id), val(sketch_id), val("dna"), val(params.ksizes), file(sig) into sourmash_sketches_all_nucleotide
+}
 
-            script:
-            // Don't calculate DNA signature if this is protein, to minimize disk,
-            // memory and IO requirements in the future
-            sketch_id = make_sketch_id(
-                "dna",
-                params.ksizes,
-                sketch_value_parsed[0],
-                track_abundance,
-                sketch_style_parsed[0]
-            )
-            sketch_value_flag = make_sketch_value_flag(sketch_style_parsed[0], sketch_value_parsed[0])
-            track_abundance_flag = track_abundance ? '--track-abundance' : ''
-            sig_id = "${sample_id}__${sketch_id}"
-            sig = "${sig_id}.sig"
-            csv = "${sig_id}.csv"
-            """
-            sourmash compute \\
-                ${sketch_value_flag} \\
-                --ksizes ${params.ksizes} \\
-                --dna \\
-                $track_abundance_flag \\
-                --output ${sig} \\
-                --name '${sample_id}' \\
-                $reads
-            sourmash sig describe --csv ${csv} ${sig}
-            """
-        }
-        sourmash_sketches_all_nucleotide
-            .filter{ it[3].size() > 0 }
-            .dump ( tag: "sourmash_sketches_all_nucleotide" )
-            .set { sourmash_sketches_nucleotide }
-    } else {
-        sourmash_sketches_nucleotide = Channel.empty()
-        ch_protein_fastas
-            .set { ch_protein_seq_to_sketch }
-        ch_sourmash_sig_describe_nucleotides = Channel.empty()
-    }
 }
-} else {
-    sourmash_sketches_nucleotide = Channel.empty()
-    ch_fastp_results = Channel.from(false)
-    sortmerna_logs = Channel.from(false)
-
-    ch_protein_fastas
-        .set { ch_protein_seq_to_sketch }
-    ch_sourmash_sig_describe_nucleotides = Channel.empty()
-}
+process sourmash_compute_sketch_fastx_nucleotide {
+    tag "${sig_id}"
+    label "low_memory"
 
-if ((!have_nucleotide_input) || params.skip_trimming || have_nucleotide_fasta_input) {
-    // Only protein input or skip trimming, or fastas which can't be trimmed.
-    ch_fastp_results = Channel.from(false)
-    sortmerna_logs = Channel.from(false)
-
-}
+    memory { reads.size() * 4 * task.attempt }
+    publishDir "${params.outdir}/sketches_nucleotide/${sketch_id}", mode: "${params.publish_dir_mode}",
+        saveAs: {filename ->
+            if (filename.indexOf(".csv") > 0) "description/$filename"
+            else if (filename.indexOf(".sig") > 0) "sigs/$filename"
+            else null
+        }
 
-if (!have_nucleotide_input) {
-    // Only protein input, can't do sortMeRNA
-    sortmerna_logs = Channel.empty()
-    ch_fastp_results = Channel.from(false)
+    input:
+    each ksize
+    val track_abundance
+    val sketch_value_parsed
+    val sketch_style_parsed
+    tuple val(sample_id), path(reads)
+
+    output:
+    path(csv) , emit: describe_csv
+    tuple val(sample_id), val(sketch_id), val("dna"), val(ksize), path(sig), emit: sketches
+
+    script:
+    // Don't calculate DNA signature if this is protein, to minimize disk,
+    // memory and IO requirements in the future
+    sketch_id = make_sketch_id(
+        "dna",
+        ksize,
+        sketch_value_parsed[0],
+        track_abundance,
+        sketch_style_parsed[0]
+    )
+    sketch_value_flag = make_sketch_value_flag(sketch_style_parsed[0], sketch_value_parsed[0])
+    track_abundance_flag = track_abundance ? '--track-abundance' : ''
+    sig_id = "${sample_id}__${sketch_id}"
+    sig = "${sig_id}.sig"
+    csv = "${sig_id}.csv"
+    """
+    sourmash compute \\
+        ${sketch_value_flag} \\
+        --ksizes ${ksize} \\
+        --dna \\
+        $track_abundance_flag \\
+        --output ${sig} \\
+        --name '${sample_id}' \\
+        $reads
+
+    sourmash sig describe \\
+        --csv ${csv} \\
+        ${sig}
+    """
 }
+
-if (!params.skip_compute && (protein_input || params.reference_proteome_fasta)){
-
 process sourmash_compute_sketch_fastx_peptide {
     tag "${sig_id}"
     label "low_memory"
@@ -1388,19 +1254,20 @@ if (!params.skip_compute && (protein_input || params.reference_proteome_fasta)){
        }
 
    input:
+    each ksize
    val track_abundance
    val sketch_value_parsed
    val sketch_style_parsed
-    set val(sample_id), file(reads) from ch_protein_seq_to_sketch
+    tuple val(sample_id), path(protein_seqs)
 
    output:
    file(csv) into ch_sourmash_sig_describe_peptides
-    set val(sample_id), val(sketch_id), val(peptide_molecules_comma_separated), val(params.ksizes), file(sig) into sourmash_sketches_all_peptide
+    set val(sample_id), val(sketch_id), val(peptide_molecules_comma_separated), val(ksize), file(sig) into sourmash_sketches_all_peptide
 
    script:
    sketch_id = make_sketch_id(
        peptide_molecules_comma_separated,
-        params.ksizes,
+        ksize,
        sketch_value_parsed[0],
        track_abundance,
        sketch_style_parsed[0]
@@ -1412,81 +1279,23 @@ if (!params.skip_compute && (protein_input || params.reference_proteome_fasta)){
    sig = "${sig_id}.sig"
    csv = "${sig_id}.csv"
    """
-    sourmash compute \\
-        ${sketch_value_flag} \\
-        --ksizes ${params.ksizes} \\
-        --input-is-protein \\
-        ${peptide_molecule_flags} \\
-        --name '${sample_id}' \\
-        --no-dna \\
-        $track_abundance_flag \\
-        --output ${sig} \\
-        $reads
-    sourmash sig describe --csv ${csv} ${sig}
+    sourmash compute \\
+        ${sketch_value_flag} \\
+        --ksizes ${ksize} \\
+        --input-is-protein \\
+        ${peptide_molecule_flags} \\
+        --name '${sample_id}' \\
+        --no-dna \\
+        $track_abundance_flag \\
+        --output ${sig} \\
+        $protein_seqs
+
+    sourmash sig describe \\
+        --csv ${csv} \\
+        ${sig}
    """
 }
-    sourmash_sketches_peptide = sourmash_sketches_all_peptide.filter{ it[3].size() > 0 }
-} else {
-    sourmash_sketches_peptide = Channel.empty()
-    ch_sourmash_sig_describe_peptides = Channel.empty()
-}
-
-// -------------
-// Merge signatures from same sample id and sketch id
-// -------------
-if ((params.bam || params.tenx_tgz) && !params.skip_compute && !params.skip_sig_merge) {
-
-    sourmash_sketches_nucleotide
-        .mix ( sourmash_sketches_peptide )
-        .dump ( tag: 'ch_sourmash_sketches_mixed' )
-        .set { ch_sourmash_sketches_mixed }
-
-    ch_fastq_id_to_cell_id_is_aligned
-        .dump( tag: 'ch_fastq_id_to_cell_id_is_aligned' )
-        .combine ( ch_sourmash_sketches_mixed, by: 0 )
-        .unique()
-        .dump( tag: 'fastq_id_to_cells__combine__sketches' )
-        // [DUMP: fastq_id_to_cells__combine__sketches]
-        // ['mouse_brown_fat_ptprc_plus_unaligned__aligned__CTGAAGTCAATGGTCT',
-        //  mouse_brown_fat_ptprc_plus_unaligned__CTGAAGTCAATGGTCT,
-        //  'aligned',
-        //  molecule-dna__ksize-3__num_hashes-4__track_abundance-false,
-        //  'dna',
-        //  '3',
-        //  mouse_brown_fat_ptprc_plus_unaligned__aligned__CTGAAGTCAATGGTCT__molecule-dna__ksize-3__num_hashes-4__track_abundance-false.sig]
-        // it[0]: fastq_id, e.g. "mouse_brown_fat_ptprc_plus_unaligned__aligned__CTGAAGTCAATGGTCT" (contains aligned/unaligned)
-        // it[1]: cell_id, e.g. "mouse_brown_fat_ptprc_plus_unaligned__CTGAAGTCAATGGTCT"
-        // it[2]: is_aligned, e.g. "aligned"
-        // it[3]: sketch_id, e.g. molecule-dna__ksize-3__num_hashes-4__track_abundance-false
-        // it[4]: molecule, e.g. 'dna'
-        // it[5]: ksize, e.g. '3'
-        // it[6]: signature file, e.g. mouse_brown_fat_ptprc_plus_unaligned__aligned__CTGAAGTCAATGGTCT__molecule-dna__ksize-3__num_hashes-4__track_abundance-false.sig
-        .groupTuple( by: [1, 3, 4, 5] )
-        // [DUMP: fastq_id_to_cells__combine__sketches__grouptuple]
-        // [
-        //  ['mouse_brown_fat_ptprc_plus_unaligned__aligned__GCGCAGTCATGCCTTC'],
-        //  mouse_brown_fat_ptprc_plus_unaligned__GCGCAGTCATGCCTTC,
-        //  ['aligned'],
-        //  molecule-dna__ksize-3,9__scaled-2__track_abundance-false,
-        //  'dna',
-        //  '3,9',
-        //  [mouse_brown_fat_ptprc_plus_unaligned__aligned__GCGCAGTCATGCCTTC__molecule-dna__ksize-3,9__scaled-2__track_abundance-false.sig]
-        // ]
-        .dump( tag: 'fastq_id_to_cells__combine__sketches__grouptuple' )
-        .map { it -> [it[0].unique(), it[1], it[2].unique(), it[3], it[4], it[5], it[6]] }
-        .dump( tag: 'fastq_id_to_cells__combine__sketches__grouptuple__unique' )
-        .set { ch_sourmash_sketches_to_merge }
-
-    // ch_sourmash_sketches_branched
-    //     .to_merge
-    //     .dump ( tag: 'ch_sourmash_sketches_to_merge' )
-    //     .set { ch_sourmash_sketches_to_merge }
-    // ch_sourmash_sketches_branched
-    //     .single
-    //     .map { [it[0][0], it[1], it[2][0], it[3], it[4], it[5], file(it[6][0])] }
-    //     .dump ( tag: 'ch_sourmash_sketches_to_mix_with_merged' )
-    //     .set { ch_sourmash_sketches_to_mix_with_merged }
 
 process sourmash_sig_merge {
    tag "${sig_id}"
@@ -1499,11 +1308,11 @@ if ((params.bam || params.tenx_tgz) && !params.skip_compute && !params.skip_sig_
        }
 
    input:
-    set val(fasta_ids), val(cell_id), val(is_aligned), val(sketch_id), val(moltypes), val(ksizes), file(sigs) from ch_sourmash_sketches_to_merge
+    tuple val(fasta_ids), val(cell_id), val(is_aligned), val(sketch_id), val(moltypes), val(ksizes), path(sigs)
 
    output:
-    file(csv) into ch_sourmash_sig_describe_merged
-    set val(cell_id), val(sketch_id), val(moltypes), val(ksizes), file(output_sig) into ch_sourmash_sketches_merged, ch_sourmash_sketches_merged_to_view, ch_sourmash_sketches_merged_for_moltypes_ksizes
+    path(csv), emit: describe_csv
+    tuple val(cell_id), val(sketch_id), val(moltypes), val(ksizes), path(output_sig), emit: sketches
 
    script:
    // sketch_id = make_sketch_id(molecule, ksize, sketch_value, track_abundance, sketch_style)
@@ -1525,25 +1334,6 @@ if ((params.bam || params.tenx_tgz) && !params.skip_compute && !params.skip_sig_
 }
 
-    ch_sourmash_sketches_merged_to_view
-        .dump( tag: "ch_sourmash_sketches_to_view" )
-
-
-
-// } else if (!params.skip_compute) {
-//     sourmash_sketches_nucleotide
-//         .mix ( sourmash_sketches_peptide )
-//         .dump ( tag: 'skip_merge__ch_sourmash_sketches_to_comapre' )
-//         .set { ch_sourmash_sketches_to_comapre }
-//     ch_sourmash_sig_describe_merged = Channel.empty()
-} else {
-    // Use "mix" to aggregate the nucleotide and peptide sketches into one place
-    sourmash_sketches_nucleotide
-        .mix ( sourmash_sketches_peptide )
-        .dump ( tag: 'skip_merge__ch_sourmash_sketches_to_compare' )
-        .set { ch_sourmash_sketches_merged }
-    ch_sourmash_sig_describe_merged = Channel.empty()
-}
 
 if (params.split_kmer){
     process ska_compare_sketches {
@@ -1571,7 +1361,7 @@ if (!params.split_kmer && !params.skip_compare && !params.skip_compute) {
    //     .collect()
    //     // Set as a list so that combine does cartesian product of all signatures
    //     .map { it -> [it] }
-    //     .combine( ch_ksizes_for_compare_nucleotide )
+    //     .combine( ch_ksizes_for_compare )
    //     .dump( tag: 'sourmash_sketches_nucleotide__ksizes' )
    //     .map { x -> [x[0], x[1], 'dna'] }
    //     .dump( tag: 'sourmash_sketches_nucleotide__ksizes__molecules' )
@@ -1593,12 +1383,12 @@ if (!params.split_kmer && !params.skip_compare && !params.skip_compute) {
    // ch_sourmash_sketches_to_compare = Channel.empty()
 
-    ch_peptide_molecules_for_compare
-        .combine( ch_ksizes_for_compare_peptide )
+    ch_peptide_molecules
+        .combine( ch_ksizes_for_compare )
        .set { ch_sourmash_compare_params_peptide }
 
    Channel.from("dna")
-        .combine( ch_ksizes_for_compare_nucleotide )
+        .combine( ch_ksizes_for_compare )
        .mix ( ch_sourmash_compare_params_peptide )
        .set { ch_sourmash_compare_params_both }
@@ -1869,3 +1659,258 @@ def checkHostname() {
         }
     }
 }
+
+
+workflow {
+    if ( !params.split_kmer && have_sketch_value ) {
+        // Only use this for sourmash sketches, not split k-mer sketches
+        /*
+         * Validate sketch sizes
+         */
+        validate_sketch_value(sketch_num_hashes, sketch_num_hashes_log2, sketch_scaled, sketch_scaled_log2)
+
+
+        // Parse sketch style into value
+        sketch_style = validate_sketch_value.out.sketch_style
+        sketch_style_parsed = sketch_style
+            .splitText()
+            .dump ( tag: 'ch_sketch_style' )
+            .map { it -> it.replaceAll('\\n', '' ) }
+            .first()
+            .dump ( tag: 'sketch_style_parsed' )
+            .collect ()
+            // get first item of returned array from .collect()
+
+
+        // Parse file into values
+        sketch_value_parsed = validate_sketch_value.out.sketch_value
+            .splitText()
+            .map { it -> it.replaceAll('\\n', '')}
+            .first()
+            .dump ( tag : 'sketch_value_parsed' )
+            .collect()
+            // get first item of returned array from .collect()
+    }
+
+
+    if ( have_nucleotide_input ) {
+        if (!params.skip_trimming && have_nucleotide_fastq_input){
+            fastp(ch_read_files_trimming)
+            // Filtering out fastq.gz files less than 200 bytes (arbitrary number)
+            // ~200 bytes is about the size of a file with a single read or less
+            // We can't use .size() > 0 because the fastq.gz content is gzip-compressed
+            fastp.out.trimmed_reads
+                .dump ( tag: 'ch_reads_all_trimmed' )
+                .branch {
+                    // Paired is a tuple of two reads
+                    paired: it[1].size() == 2
+                    single: true
+                }
+                .set { ch_reads_trimmed_branched }
+
+            ch_reads_trimmed_branched.paired
+                .filter{ it -> it[1][0].size() > 200 }
+                .dump ( tag: 'ch_reads_trimmed_paired' )
+                .set{ ch_reads_trimmed_paired }
+
+            ch_reads_trimmed_branched.single
+                .filter{ it -> it[1].size() > 200 }
+                .dump ( tag: 'ch_reads_trimmed_single' )
+                .set{ ch_reads_trimmed_single }
+
+            ch_reads_trimmed_single
+                .mix ( ch_reads_trimmed_paired )
+                .set { ch_reads_trimmed }
+
+            // Concatenate trimmed fastq files with fastas
+            if (params.subsample){
+                // Concatenate trimmed reads with fastas for subsequent subsampling
+                ch_reads_trimmed
+                    .concat( fastas_ch )
+                    .dump ( tag: 'trimmed_reads__concat_fastas' )
+                    .set { subsample_ch_reads_for_ribosomal_removal }
+            } else {
+                // Concatenate trimmed reads with fastas for signature generation
+                ch_reads_for_ribosomal_removal = ch_reads_trimmed.mix(fastas_ch)
+            }
+        } else {
+            ch_reads_for_ribosomal_removal = fastas_ch
+            ch_fastp_results = Channel.from(false)
+        }
+
+        /*
+         * STEP 2+ - SortMeRNA - remove rRNA sequences on request
+         */
+        if (!params.remove_ribo_rna) {
+            ch_reads_for_ribosomal_removal
+                .set { ch_reads_to_translate }
+            sortmerna_logs = Channel.empty()
+        } else {
+            sortmerna_index(sortmerna_fasta)
+            sortmerna(
+                ch_reads_for_ribosomal_removal,
+                sortmerna_index.out.db_name,
+                sortmerna_index.out.db_fasta,
+                sortmerna_index.out.db
+            )
+        }
+
+        if (params.reference_proteome_fasta){
+            translate(ch_orpheum_bloom_filter.collect(), ch_reads_to_translate)
+
+            // Remove empty files
+            // it[0] = sample id
+            // it[1] = sequence fasta file
+            translate.out.coding_peptides
+                .mix ( ch_protein_fastas )
+                .dump ( tag: 'ch_protein_seq_to_sketch_from_translate' )
+                .set { ch_protein_seq_to_sketch }
+            // Remove empty files
+            // it[0] = sample bloom id
+            // it[1] = sequence fasta file
+            ch_noncoding_nucleotides_nonempty = translate.out.noncoding_nucleotides.filter{ it[1].size() > 0 }
+            translate.out.coding_nucleotides
+                .dump( tag: 'ch_translatable_nucleotide_seqs' )
+                .filter{ it[1].size() > 0 }
+                .dump ( tag: 'ch_reads_to_sketch__from_translate' )
+                .set { ch_reads_to_sketch }
+
+        } else {
+            // Send reads directly into coding/noncoding
+            ch_reads_to_translate
+                .dump ( tag: 'ch_reads_to_sketch__no_translation' )
+                .set{ ch_reads_to_sketch }
+        }
+
+        if (params.split_kmer){
+            ///////////////////////////////////////////////////////////////////////////////
+            ///////////////////////////////////////////////////////////////////////////////
+            /* --                                                                     -- */
+            /* --                        CREATE SKA SKETCH                            -- */
+            /* --                                                                     -- */
+            ///////////////////////////////////////////////////////////////////////////////
+            ///////////////////////////////////////////////////////////////////////////////
+
+            ska_compute_sketch(ksizes, ch_reads_to_sketch)
+        } else if (!params.skip_compute) {
+            ch_ksizes = Channel.from(params.ksizes.split(','))
+
+            sourmash_compute_sketch_fastx_nucleotide(ch_ksizes, track_abundance, sketch_value_parsed, sketch_style_parsed, ch_reads_to_sketch)
+
+            sourmash_compute_sketch_fastx_nucleotide.out.sketches
+                // 5th item (4 when 0-based) is the actual signature.
+                // Make sure it is non-empty
+                .filter{ it[4].size() > 0 }
+                .dump ( tag: "sourmash_sketches_all_nucleotide" )
+                .set { sourmash_sketches_nucleotide }
+        } else {
+            sourmash_sketches_nucleotide = Channel.empty()
+            ch_protein_fastas
+                .set { ch_protein_seq_to_sketch }
+            ch_sourmash_sig_describe_nucleotides = Channel.empty()
+        }
+    } else {
+        sourmash_sketches_nucleotide = Channel.empty()
+        ch_fastp_results = Channel.from(false)
+        sortmerna_logs = Channel.from(false)
+
+        ch_protein_fastas
+            .set { ch_protein_seq_to_sketch }
+        ch_sourmash_sig_describe_nucleotides = Channel.empty()
+    }
+
+
+    if ((!have_nucleotide_input) || params.skip_trimming || have_nucleotide_fasta_input) {
+        // Only protein input or skip trimming, or fastas which can't be trimmed.
+        ch_fastp_results = Channel.from(false)
+        sortmerna_logs = Channel.from(false)
+
+    }
+
+    if (!have_nucleotide_input) {
+        // Only protein input, can't do sortMeRNA
+        sortmerna_logs = Channel.empty()
+        ch_fastp_results = Channel.from(false)
+    }
+
+
+    if (!params.skip_compute && (protein_input || params.reference_proteome_fasta)){
+        sourmash_compute_sketch_fastx_peptide(ksizes, track_abundance, sketch_value_parsed, sketch_style_parsed, ch_protein_seq_to_sketch)
+
+        sourmash_sketches_peptide = sourmash_sketches_all_peptide.filter{ it[3].size() > 0 }
+    } else {
+        sourmash_sketches_peptide = Channel.empty()
+        ch_sourmash_sig_describe_peptides = Channel.empty()
+    }
+
+    // -------------
+    // Merge signatures from same sample id and sketch id
+    // -> for single-cell data where aligned/unaligned reads were separated
+    // -------------
+    if ((params.bam || params.tenx_tgz) && !params.skip_compute && !params.skip_sig_merge) {
+
+        sourmash_sketches_nucleotide
+            .mix ( sourmash_sketches_peptide )
+            .dump ( tag: 'ch_sourmash_sketches_mixed' )
+            .set { ch_sourmash_sketches_mixed }
+
+        ch_fastq_id_to_cell_id_is_aligned
+            .dump( tag: 'ch_fastq_id_to_cell_id_is_aligned' )
+            .combine ( ch_sourmash_sketches_mixed, by: 0 )
+            .unique()
+            .dump( tag: 'fastq_id_to_cells__combine__sketches' )
+            // [DUMP: fastq_id_to_cells__combine__sketches]
+            // ['mouse_brown_fat_ptprc_plus_unaligned__aligned__CTGAAGTCAATGGTCT',
+            //  mouse_brown_fat_ptprc_plus_unaligned__CTGAAGTCAATGGTCT,
+            //  'aligned',
+            //  molecule-dna__ksize-3__num_hashes-4__track_abundance-false,
+            //  'dna',
+            //  '3',
+            //  mouse_brown_fat_ptprc_plus_unaligned__aligned__CTGAAGTCAATGGTCT__molecule-dna__ksize-3__num_hashes-4__track_abundance-false.sig]
+            // it[0]: fastq_id, e.g. "mouse_brown_fat_ptprc_plus_unaligned__aligned__CTGAAGTCAATGGTCT" (contains aligned/unaligned)
+            // it[1]: cell_id, e.g. "mouse_brown_fat_ptprc_plus_unaligned__CTGAAGTCAATGGTCT"
+            // it[2]: is_aligned, e.g. "aligned"
+            // it[3]: sketch_id, e.g. molecule-dna__ksize-3__num_hashes-4__track_abundance-false
+            // it[4]: molecule, e.g. 'dna'
+            // it[5]: ksize, e.g. '3'
+            // it[6]: signature file, e.g. mouse_brown_fat_ptprc_plus_unaligned__aligned__CTGAAGTCAATGGTCT__molecule-dna__ksize-3__num_hashes-4__track_abundance-false.sig
+            .groupTuple( by: [1, 3, 4, 5] )
+            // [DUMP: fastq_id_to_cells__combine__sketches__grouptuple]
+            // [
+            //  ['mouse_brown_fat_ptprc_plus_unaligned__aligned__GCGCAGTCATGCCTTC'],
+            //  mouse_brown_fat_ptprc_plus_unaligned__GCGCAGTCATGCCTTC,
+            //  ['aligned'],
+            //  molecule-dna__ksize-3,9__scaled-2__track_abundance-false,
+            //  'dna',
+            //  '3,9',
+            //  [mouse_brown_fat_ptprc_plus_unaligned__aligned__GCGCAGTCATGCCTTC__molecule-dna__ksize-3,9__scaled-2__track_abundance-false.sig]
+            // ]
+            .dump( tag: 'fastq_id_to_cells__combine__sketches__grouptuple' )
+            .map { it -> [it[0].unique(), it[1], it[2].unique(), it[3], it[4], it[5], it[6]] }
+            .dump( tag: 'fastq_id_to_cells__combine__sketches__grouptuple__unique' )
+            .set { ch_sourmash_sketches_to_merge }
+
+        sourmash_sig_merge(ch_sourmash_sketches_to_merge)
+
+        sourmash_sig_merge.out.sketches
+            .dump( tag: "ch_sourmash_sketches_to_view" )
+
+        ch_sourmash_sketches_merged = sourmash_sig_merge.out.sketches
+        ch_sourmash_sig_describe_merged = sourmash_sig_merge.out.describe_csv
+
+// } else if (!params.skip_compute) {
+//     sourmash_sketches_nucleotide
+//         .mix ( sourmash_sketches_peptide )
+//         .dump ( tag: 'skip_merge__ch_sourmash_sketches_to_comapre' )
+//         .set { ch_sourmash_sketches_to_comapre }
+//     ch_sourmash_sig_describe_merged = Channel.empty()
+    } else {
+        // Use "mix" to aggregate the nucleotide and peptide sketches into one place
+        sourmash_sketches_nucleotide
+            .mix ( sourmash_sketches_peptide )
+            .dump ( tag: 'skip_merge__ch_sourmash_sketches_to_compare' )
+            .set { ch_sourmash_sketches_merged }
+        ch_sourmash_sig_describe_merged = Channel.empty()
+    }
+
+}
diff --git a/nextflow.config b/nextflow.config
index 589d186f..98c2e3cc 100644
--- a/nextflow.config
+++ b/nextflow.config
@@ -28,7 +28,7 @@ params {
 
     // Creating sketches
     molecules ='dna,protein,dayhoff'
-    ksizes = '21,30,51'
+    ksizes = '21,33,51'
     // Track abundance by default
    track_abundance = true
    // Number of hashes from each sample