diff --git a/bin/combine_tables.py b/bin/combine_tables.py
index 46da929b..2feed698 100755
--- a/bin/combine_tables.py
+++ b/bin/combine_tables.py
@@ -87,7 +87,7 @@ def main(args=None):
 
     if args.gtdbtk_summary:
         gtdbtk_results = pd.read_csv(args.gtdbtk_summary, sep="\t")
-        if not bins.equals(gtdbtk_results["user_genome"].sort_values().reset_index(drop=True)):
+        if len(set(gtdbtk_results["user_genome"].to_list()).difference(set(bins))) > 0:
             sys.exit("Bins in GTDB-Tk summary do not match bins in bin depths summary!")
         results = pd.merge(
             results, gtdbtk_results, left_on="bin", right_on="user_genome", how="outer"
diff --git a/modules/local/cat.nf b/modules/local/cat.nf
index bda355c6..90d44ae5 100644
--- a/modules/local/cat.nf
+++ b/modules/local/cat.nf
@@ -11,14 +11,14 @@ process CAT {
     tuple val(db_name), path("database/*"), path("taxonomy/*")
 
     output:
-    path("*.ORF2LCA.names.txt.gz")            , emit: orf2lca_classification
-    path("*.bin2classification.names.txt.gz") , emit: tax_classification_names
-    path("raw/*.ORF2LCA.txt.gz")              , emit: orf2lca
-    path("raw/*.predicted_proteins.faa.gz")   , emit: faa
-    path("raw/*.predicted_proteins.gff.gz")   , emit: gff
-    path("raw/*.log")                         , emit: log
-    path("raw/*.bin2classification.txt.gz")   , emit: tax_classification_taxids
-    path "versions.yml"                       , emit: versions
+    tuple val(meta), path("*.bin2classification.names.txt") , emit: tax_classification_names
+    path("*.ORF2LCA.names.txt.gz")                           , emit: orf2lca_classification
+    path("raw/*.ORF2LCA.txt.gz")                             , emit: orf2lca
+    path("raw/*.predicted_proteins.faa.gz")                  , emit: faa
+    path("raw/*.predicted_proteins.gff.gz")                  , emit: gff
+    path("raw/*.log")                                        , emit: log
+    path("raw/*.bin2classification.txt.gz")                  , emit: tax_classification_taxids
+    path "versions.yml"                                      , emit: versions
 
     script:
     def official_taxonomy = params.cat_official_taxonomy ? "--only_official" : ""
@@ -31,12 +31,13 @@ process CAT {
 
     mkdir raw
     mv *.ORF2LCA.txt *.predicted_proteins.faa *.predicted_proteins.gff *.log *.bin2classification.txt raw/
+    cp *.bin2classification.names.txt raw/
     gzip "raw/${prefix}.ORF2LCA.txt" \
         "raw/${prefix}.concatenated.predicted_proteins.faa" \
         "raw/${prefix}.concatenated.predicted_proteins.gff" \
         "raw/${prefix}.bin2classification.txt" \
         "${prefix}.ORF2LCA.names.txt" \
-        "${prefix}.bin2classification.names.txt"
+        "raw/${prefix}.bin2classification.names.txt"
 
     cat <<-END_VERSIONS > versions.yml
         "${task.process}":
diff --git a/modules/local/cat_summary.nf b/modules/local/cat_summary.nf
index 8bd2d815..5f8631fa 100644
--- a/modules/local/cat_summary.nf
+++ b/modules/local/cat_summary.nf
@@ -16,9 +16,6 @@ process CAT_SUMMARY {
     script:
     def prefix = task.ext.prefix ?: "cat_summary"
     """
-    # use find as sometimes these are empty and need to fail gracefully
-    find -L -type f -name "*bin2classification.names.txt.gz" -exec sh -c 'for f do gunzip -c \$f > \${f%.*}; done' find-sh {} +
-
    bioawk '(NR == 1) || (FNR > 1)' *.txt > ${prefix}.tsv
 
    cat <<-END_VERSIONS > versions.yml
diff --git a/modules/local/quast_bins.nf b/modules/local/quast_bins.nf
index e8ae58e7..30117404 100644
--- a/modules/local/quast_bins.nf
+++ b/modules/local/quast_bins.nf
@@ -11,7 +11,7 @@ process QUAST_BINS {
 
     output:
     path "QUAST/*", type: 'dir'     , emit: dir
-    path "QUAST/*-quast_summary.tsv", emit: quast_bin_summaries
+    tuple val(meta), path("QUAST/*-quast_summary.tsv"), emit: quast_bin_summaries
     path "versions.yml"             , emit: versions
 
     script:
diff --git a/subworkflows/local/binning_refinement.nf b/subworkflows/local/binning_refinement.nf
index 360bffaa..f92bf0cb 100644
--- a/subworkflows/local/binning_refinement.nf
+++ b/subworkflows/local/binning_refinement.nf
@@ -62,6 +62,7 @@ workflow BINNING_REFINEMENT {
 
     // Note: do not `failOnMismatch` on join here, in some cases e.g. MAXBIN2 will fail if no bins, so cannot join!
     // Only want to join for DAS_Tool on bins that 'exist'
+    ch_input_for_dastool = ch_contigs_for_dastool.join(ch_fastatocontig2bin_for_dastool, by: 0)
 
     ch_versions = ch_versions.mix(DASTOOL_FASTATOCONTIG2BIN_METABAT2.out.versions.first())
diff --git a/subworkflows/local/depths.nf b/subworkflows/local/depths.nf
index 012899ad..87fc21cc 100644
--- a/subworkflows/local/depths.nf
+++ b/subworkflows/local/depths.nf
@@ -19,9 +19,6 @@ workflow DEPTHS {
 
     main:
     ch_versions = Channel.empty()
-
-    depths.dump(tag: 'depths', pretty: true)
-
     // Compute bin depths for different samples (according to `binning_map_mode`)
     // Create a new meta combine key first, but copy meta so that
     // we retain the information about binners and domain classification
@@ -62,7 +59,15 @@ workflow DEPTHS {
     }
 
     MAG_DEPTHS_PLOT ( ch_mag_depths_plot, ch_sample_groups.collect() )
-    MAG_DEPTHS_SUMMARY ( MAG_DEPTHS.out.depths.map{it[1]}.collect() )
+
+    //Depth files that are coming from bins and failed binning refinement are concatenated per meta
+    ch_mag_depth_out = MAG_DEPTHS.out.depths
+        .collectFile(keepHeader: true) {
+            meta, depth ->
+            [meta.id, depth]
+        }
+
+    MAG_DEPTHS_SUMMARY ( ch_mag_depth_out.collect() )
 
     ch_versions = ch_versions.mix( MAG_DEPTHS_PLOT.out.versions )
     ch_versions = ch_versions.mix( MAG_DEPTHS_SUMMARY.out.versions )
diff --git a/subworkflows/local/gtdbtk.nf b/subworkflows/local/gtdbtk.nf
index 2f110a43..370f3c4f 100644
--- a/subworkflows/local/gtdbtk.nf
+++ b/subworkflows/local/gtdbtk.nf
@@ -25,7 +25,8 @@ workflow GTDBTK {
             def completeness = -1
             def contamination = -1
             def missing, duplicated
-            if (params.busco_db.getBaseName().contains('odb10')) {
+            def busco_db = file(params.busco_db)
+            if (busco_db.getBaseName().contains('odb10')) {
                 missing = row.'%Missing (specific)' // TODO or just take '%Complete'?
                 duplicated = row.'%Complete and duplicated (specific)'
             } else {
@@ -52,7 +53,7 @@ workflow GTDBTK {
     ch_filtered_bins = bins
         .transpose()
         .map { meta, bin -> [bin.getName(), bin, meta]}
-        .join(ch_bin_metrics, failOnDuplicate: true, failOnMismatch: true)
+        .join(ch_bin_metrics, failOnDuplicate: true)
         .map { bin_name, bin, meta, completeness, contamination -> [meta, bin, completeness, contamination] }
         .branch {
             passed: (it[2] != -1 && it[2] >= params.gtdbtk_min_completeness && it[3] != -1 && it[3] <= params.gtdbtk_max_contamination)
@@ -84,11 +85,9 @@ workflow GTDBTK {
 
     GTDBTK_SUMMARY (
         ch_filtered_bins.discarded.map{it[1]}.collect().ifEmpty([]),
-        GTDBTK_CLASSIFYWF.out.summary.collect().ifEmpty([]),
+        GTDBTK_CLASSIFYWF.out.summary.map{it[1]}.collect().ifEmpty([]),
         [],
-        // GTDBTK_CLASSIFYWF.out.filtered.collect().ifEmpty([]),
         []
-        // GTDBTK_CLASSIFYWF.out.failed.collect().ifEmpty([])
     )
 
     emit:
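Note (illustration, not part of the patch): removing `failOnMismatch: true` from the `join` in `subworkflows/local/gtdbtk.nf` above means bins that have no entry in `ch_bin_metrics` are now silently dropped from the joined channel instead of aborting the run. A small sketch of that behaviour with hypothetical bin names and metric values:

```nextflow
// Sketch only: hypothetical bins and metrics, not pipeline code.
workflow {
    // [ bin_name, bin_path, meta ], as produced upstream by bins.transpose().map{ ... }
    ch_bins = Channel.of(
        [ 'sampleA.bin1.fa', 'bins/sampleA.bin1.fa', [id: 'sampleA'] ],
        [ 'sampleA.bin2.fa', 'bins/sampleA.bin2.fa', [id: 'sampleA'] ]
    )
    // [ bin_name, completeness, contamination ]; note there is no row for sampleA.bin2.fa
    ch_bin_metrics = Channel.of( [ 'sampleA.bin1.fa', 95.2, 1.3 ] )

    ch_bins
        .join( ch_bin_metrics, failOnDuplicate: true )   // unmatched keys are dropped, not an error
        .map { bin_name, bin, meta, completeness, contamination ->
            [ meta, bin, completeness, contamination ]
        }
        .view()   // emits only the tuple for sampleA.bin1.fa
}
```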
diff --git a/workflows/mag.nf b/workflows/mag.nf
index 160928d2..090e5232 100644
--- a/workflows/mag.nf
+++ b/workflows/mag.nf
@@ -455,10 +455,23 @@ workflow MAG {
 
     if ( !ch_centrifuge_db_file.isEmpty() ) {
         if ( ch_centrifuge_db_file.extension in ['gz', 'tgz'] ) { // Expects to be tar.gz!
-            ch_db_for_centrifuge = CENTRIFUGE_DB_PREPARATION ( ch_centrifuge_db_file ).db
+            ch_db_for_centrifuge = CENTRIFUGE_DB_PREPARATION ( ch_centrifuge_db_file )
+                .db
+                .collect()
+                .map{
+                    db ->
+                    def db_name = db[0].getBaseName().split('\\.')[0]
+                    [ db_name, db ]
+                }
         } else if ( ch_centrifuge_db_file.isDirectory() ) {
             ch_db_for_centrifuge = Channel
                 .fromPath( "${ch_centrifuge_db_file}/*.cf" )
+                .collect()
+                .map{
+                    db ->
+                    def db_name = db[0].getBaseName().split('\\.')[0]
+                    [ db_name, db ]
+                }
         } else {
             ch_db_for_centrifuge = Channel.empty()
         }
@@ -466,16 +479,6 @@ workflow MAG {
         ch_db_for_centrifuge = Channel.empty()
     }
 
-    // Centrifuge val(db_name) has to be the basename of any of the
-    // index files up to but not including the final .1.cf
-    ch_db_for_centrifuge = ch_db_for_centrifuge
-        .collect()
-        .map{
-            db ->
-            def db_name = db[0].getBaseName().split('\\.')[0]
-            [ db_name, db ]
-        }
-
     CENTRIFUGE (
         ch_short_reads,
         ch_db_for_centrifuge
@@ -578,6 +581,7 @@ workflow MAG {
     }
 
     ch_assemblies = Channel.empty()
+
     if (!params.skip_megahit){
         MEGAHIT ( ch_short_reads_grouped )
         ch_megahit_assemblies = MEGAHIT.out.assembly
@@ -791,8 +795,6 @@ workflow MAG {
             [meta_new, bins]
         }
 
-
-
     // If any two of the binners are both skipped at once, do not run because DAS_Tool needs at least one
     if ( params.refine_bins_dastool ) {
         ch_prokarya_bins_dastool = ch_binning_results_bins
@@ -829,8 +831,6 @@ workflow MAG {
     } else if ( params.postbinning_input == 'refined_bins_only' ) {
         ch_input_for_postbinning_bins        = ch_refined_bins
         ch_input_for_postbinning_bins_unbins = ch_refined_bins.mix(ch_refined_unbins)
-        // TODO REACTIVATE ONCE PR #489 IS READY!
-        // TODO RE-ADD BOTH TO SCHEMA ONCE RE-ADDING
     } else if ( params.postbinning_input == 'both' ) {
         ch_all_bins = ch_binning_results_bins.mix(ch_refined_bins)
         ch_input_for_postbinning_bins        = ch_all_bins
@@ -913,7 +913,12 @@ workflow MAG {
 
         QUAST_BINS ( ch_input_for_quast_bins )
         ch_versions = ch_versions.mix(QUAST_BINS.out.versions.first())
-        QUAST_BINS_SUMMARY ( QUAST_BINS.out.quast_bin_summaries.collect() )
+        ch_quast_bin_summary = QUAST_BINS.out.quast_bin_summaries
+            .collectFile(keepHeader: true) {
+                meta, summary ->
+                ["${meta.id}.tsv", summary]
+            }
+        QUAST_BINS_SUMMARY ( ch_quast_bin_summary.collect() )
         ch_quast_bins_summary = QUAST_BINS_SUMMARY.out.summary
     }
 
@@ -932,8 +937,15 @@ workflow MAG {
             ch_input_for_postbinning_bins_unbins,
             ch_cat_db
         )
+        // Group all classification results for each sample in a single file
+        ch_cat_summary = CAT.out.tax_classification_names
+            .collectFile(keepHeader: true) {
+                meta, classification ->
+                ["${meta.id}.txt", classification]
+            }
+        // Group all classification results for the whole run in a single file
         CAT_SUMMARY(
-            CAT.out.tax_classification_names.collect()
+            ch_cat_summary.collect()
         )
         ch_versions = ch_versions.mix(CAT.out.versions.first())
         ch_versions = ch_versions.mix(CAT_SUMMARY.out.versions)
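Note (illustration, not part of the patch): the Centrifuge hunks above now derive the `val(db_name)` key directly from the collected index files in each branch. A standalone sketch of that derivation, using hypothetical index file names (the files do not need to exist for `file()` to build the paths):

```nextflow
// Sketch only: hypothetical Centrifuge index names, not pipeline code.
workflow {
    Channel
        .of(
            file('p_compressed+h+v.1.cf'),
            file('p_compressed+h+v.2.cf'),
            file('p_compressed+h+v.3.cf')
        )
        .collect()
        .map { db ->
            // getBaseName() drops the trailing '.cf'; split('\\.')[0] then drops the '.1'/'.2'/... part
            def db_name = db[0].getBaseName().split('\\.')[0]
            [ db_name, db ]
        }
        .view()   // => [p_compressed+h+v, [<path>/p_compressed+h+v.1.cf, ...]]
}
```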