diff --git a/bin/edgerNormalize.R b/bin/edgerNormalize.R
index 316c9cf..659cb04 100755
--- a/bin/edgerNormalize.R
+++ b/bin/edgerNormalize.R
@@ -6,6 +6,12 @@ library(edgeR)
 args = commandArgs(trailingOnly=TRUE)
+tpm3 <- function(counts,len) {
+  x <- counts/len
+  return(t(t(x)*1e6/colSums(x)))
+}
+
+
 fc.df <- read.delim(args[1], comment.char="#")
 # Normalize
 counts <- data.frame(fc.df[,8:ncol(fc.df)])
@@ -15,7 +21,9 @@ x <- DGEList(counts=counts, genes=fc.df[,c("Geneid","Length")] )
 # RPKM/CPM normalization
 df.rpkm <- rpkm(x,x$genes$Length, normalized.lib.sizes=F)
 df.cpm <- cpm(x)
+df.tpm <- tpm3(counts, x$genes$Length)
 write.table(df.rpkm, file=paste0(args[2],"_featureCounts_RPKM.txt"), row.names=T, quote=F,sep=" ")
 write.table(df.cpm, file=paste0(args[2],"_featureCounts_CPM.txt"), row.names=T, quote=F,sep=" ")
+write.table(df.tpm, file=paste0(args[2],"_featureCounts_TPM.txt"), row.names=T, quote=F,sep=" ")
diff --git a/main.nf b/main.nf
index 5b9e9db..5089b72 100755
--- a/main.nf
+++ b/main.nf
@@ -24,7 +24,7 @@ def helpMessage() {
     The typical command for running the pipeline is as follows:
     nextflow run UMCUGenetics/RNASeq-NF --fastq_path --out_dir -c
     ${c_blue} Mandatory arguments: ${c_reset}
-    ${c_yellow} --fastq_path [str] ${c_reset} Path to a directory containing fastq files.
+    ${c_yellow} --fastq_path [str] OR --bam_path [str] ${c_reset} Path to a directory containing fastq files or bam files.
     Files should be named in the following format: xxx_xxx_xxxx
     ${c_yellow} --out_dir [str] ${c_reset} The output directory where the results will be saved.
     ${c_yellow} --email [str] ${c_reset} The email address to send workflow summary and MultiQC report to.
@@ -128,13 +128,17 @@ if (!params.out_dir) {
 if (!params.email) {
    exit 1, "Please provide an email address"
 }
-if (!params.fastq_path) {
-   exit 1, "fastq files not found, please provide the correct path! (--fastq_path)"
+if (!params.fastq_path && !params.bam_path) {
+   exit 1, "Please specify either a 'fastq_path' or a 'bam_path'! (--fastq_path or --bam_path)"
 }
-if (params.runMapping || params.runPostQC || params.runFeatureCounts) {
+/* Extended input and settings checking
+*/
+
+// Check for gtf file when needed
+if ( ( params.runMapping && params.fastq_path ) || params.runFeatureCounts || (params.runPostQC && !params.genome_bed)) {
    if (!params.genome_gtf ) {
-      exit 1, "A GTF file is required for STAR, RSeQC &featureCounts. Please provide the correct filepath! (--genome_gtf)"
+      exit 1, "A GTF file is required for STAR, RSeQC (in case --genome_bed is not specified) & featureCounts. Please provide the correct filepath! (--genome_gtf)"
    } else {
       // Try importing.
      genome_gtf = Channel
@@ -142,17 +146,66 @@ if (params.runMapping || params.runPostQC || params.runFeatureCounts) {
        .ifEmpty { exit 1, "GTF file not found: ${params.genome_gtf}"}
    }
 }
-def run_name = params.fastq_path.split('/')[-1]
+// Check for bed file when needed
+if (params.runPostQC && !params.genome_gtf) {
+   if (!params.genome_bed ) {
+      exit 1, "A .bed file is required for RSeQC in case --genome_gtf is not specified. Please provide the correct filepath! (--genome_bed)"
+   } else {
+      // Try importing.
+      genome_bed = Channel
+         .fromPath( params.genome_bed, checkIfExists: true )
+         .ifEmpty { exit 1, "BED file not found: ${params.genome_bed}"}
+   }
+}
+
+//mapping requires fastq files
+if ( params.runMapping && !params.fastq_path) {
+   //if none specified, check for bam input as alternative
+   if(params.bam_path){
+      println "Mapping requested but no --fastq_path specified. Using --bam_path as a substitute for mapping results."
+   }
+   else{
+      exit 1, "Mapping is specified, but neither --fastq_path nor a substitute --bam_path is specified. Please specify one of them!"
+   }
+}
+
+if ( params.runPostQC) {
+   if(!params.bam_path && !params.runMapping){
+      exit 1, "--runPostQC is specified. This requires mapped bam files, but neither --runMapping nor --bam_path is specified. Exiting!"
+   }
+}
+
+if ( params.runFeatureCounts) {
+   if(!params.bam_path && !params.runMapping){
+      exit 1, "--runFeatureCounts is specified. This requires mapped bam files, but neither --runMapping nor --bam_path is specified. Exiting!"
+   }
+}
+
+if ( params.runSalmon && !params.fastq_path) {
+   exit 1, "--runSalmon is specified. This requires fastq files, but no --fastq_path is specified. Exiting!"
+}
+
+if ( params.runGermlineCallingGATK ) {
+   if(!params.bam_path && !params.runMapping){
+      exit 1, "--runGermlineCallingGATK is specified. This requires mapped bam files, but neither --runMapping nor --bam_path is specified. Exiting!"
+   }
+}
+
+def run_name
+if( params.fastq_path ){
+   run_name = params.fastq_path.split('/')[-1]
+}
+else{
+   if( params.bam_path ){
+      run_name = params.bam_path.split('/')[-1]
+   }
+}
 if ( params.custom_run_name) {
   run_name = params.custom_run_name
 }
 //Start workflow
 workflow {
   main :
-    //Set run and retrieve input fastqs
-    include extractAllFastqFromDir from './NextflowModules/Utils/fastq.nf' params(params)
-    include MergeFastqLanes from './NextflowModules/Utils/MergeFastqLanes.nf' params( params )
-    fastq_files = extractAllFastqFromDir(params.fastq_path).map { [it[0],it[1],it[4]]}
     //Pipeline log info
     log.info """=======================================================
 RNASeq-NF ${ workflow.manifest.version}"
@@ -177,76 +230,109 @@ workflow {
     summary['Config Profile'] = workflow.profile
     log.info summary.collect { k,v -> "${k.padRight(15)}: $v" }.join("\n")
     log.info "========================================="
-    // # 1) Pre-processing / QC
-    include pre_processing from './sub-workflows/pre_processing.nf' params(params)
-    pre_processing ( fastq_files )
-    //Logs
-    trim_logs = pre_processing.out.trim_logs
-    fastqc_logs = pre_processing.out.fastqc_logs
-    sortmerna_logs = pre_processing.out.srna_logs
-    // Determine final fastqs files
-    final_fastqs = pre_processing.out.processed_fastqs
-    //Transform output channels
-    fastqs_transformed = final_fastqs.groupTuple(by:0).map { sample_id, rg_id, reads -> [sample_id, rg_id[0], reads.flatten()]}
-    //Merge Fastqs lanes before proceeding.
-    if ( params.MergeFQ ) {
-       fastqs_processed = MergeFastqLanes (fastqs_transformed)
-    } else {
-       fastqs_processed = fastqs_transformed
+
+    def bam_files
+
+    //handle first steps (qc and merging) when fastq files are used as input
+    fastqc_logs = Channel.empty()
+    trim_logs = Channel.empty()
+    sortmerna_logs = Channel.empty()
+
+
+    if ( params.fastq_path ){
+
+       //Set run and retrieve input fastqs
+       include extractAllFastqFromDir from './NextflowModules/Utils/fastq.nf' params(params)
+       fastq_files = extractAllFastqFromDir(params.fastq_path).map { [it[0],it[1],it[4]]}
+
+       // # 1) Pre-processing / QC
+       include pre_processing from './sub-workflows/pre_processing.nf' params(params)
+       pre_processing ( fastq_files )
+       //Logs
+       trim_logs = pre_processing.out.trim_logs
+       fastqc_logs = pre_processing.out.fastqc_logs
+       sortmerna_logs = pre_processing.out.srna_logs
+       // Determine final fastqs files
+       final_fastqs = pre_processing.out.processed_fastqs
+       //Transform output channels
+       fastqs_transformed = final_fastqs.groupTuple(by:0).map { sample_id, rg_id, reads -> [sample_id, rg_id[0], reads.flatten()]}
+       //Merge Fastqs lanes before proceeding.
+       if ( params.MergeFQ ) {
+          include MergeFastqLanes from './NextflowModules/Utils/MergeFastqLanes.nf' params( params )
+          fastqs_processed = MergeFastqLanes (fastqs_transformed)
+       } else {
+          fastqs_processed = fastqs_transformed
+       }
     }
+    //otherwise set up bam files for further use
+    else {
+
+       //alternative, direct way
+       include { extractBamFromDir } from './NextflowModules/Utils/bam.nf' params( params )
+       //get bams from path
+       sorted_bams = extractBamFromDir(params.bam_path).map { sample_id, bam, bai -> [sample_id, sample_id, bam, bai]}
+       empty_logs = Channel.empty()
+       empty_flagstat = Channel.empty()
+       mapped = [ "bam_sorted":sorted_bams, "logs":empty_logs, "star_flagstat":empty_flagstat ]
+
+       //else bam files will be used for further processing as handled above
+       include Flagstat as Flagstat_raw from './NextflowModules/Sambamba/0.7.0/Flagstat.nf' params( params )
+       Flagstat_raw( mapped.bam_sorted.map {sample_id, rg_id, bam, bai -> [sample_id, bam, bai] } )
+       flagstat_logs = Flagstat_raw.out
+    }
+
+
     // # 2) STAR alignment | Sambamba markdup
-    if ( params.runMapping ) {
-       include star_alignment from './sub-workflows/star_alignment.nf' params(params)
-       mapped = star_alignment ( fastqs_processed, genome_gtf )
-       star_logs = mapped.logs
-       flagstat_logs = mapped.star_flagstat
-
-    } else {
-       flagstat_logs = Channel.empty()
-       star_logs = Channel.empty()
+    flagstat_logs = Channel.empty()
+    star_logs = Channel.empty()
+
+    if ( params.runMapping ) {
+       if ( params.fastq_path ) {
+          include star_alignment from './sub-workflows/star_alignment.nf' params(params)
+          mapped = star_alignment ( fastqs_processed, genome_gtf )
+          star_logs = mapped.logs
+          flagstat_logs = mapped.star_flagstat
+       }/* else {
+          //else bam files will be used for further processing as handled above
+          include Flagstat as Flagstat_raw from './NextflowModules/Sambamba/0.7.0/Flagstat.nf' params( params )
+          Flagstat_raw( mapped.bam_sorted.map {sample_id, rg_id, bam, bai -> [sample_id, bam, bai] } )
+          flagstat_logs = Flagstat_raw.out
+       }
+*/
     }
     // # 3) Post-mapping QC
+    post_qc_logs = Channel.empty()
+
     if ( params.runPostQC) {
-       if (params.runMapping) {
-          include post_mapping_QC from './sub-workflows/post_mapping_QC.nf' params(params)
-          post_mapping_QC( mapped.bam_sorted.map { sample_id, rg_id, bam, bai -> [sample_id, bam, bai] }, genome_gtf )
-          post_qc_logs = post_mapping_QC.out[0].map {it[1]}.mix(post_mapping_QC.out[2].map {it[1]}).mix(post_mapping_QC.out[1].map {it[1]}).collect()
-       } else {
-          exit 1, "PostQC requires alignment step. Please enable runMapping!"
-       }
-    } else {
-       post_qc_logs = Channel.empty()
-    }
+       include post_mapping_QC from './sub-workflows/post_mapping_QC.nf' params(params)
+       post_mapping_QC( mapped.bam_sorted.map { sample_id, rg_id, bam, bai -> [sample_id, bam, bai] }, genome_gtf )
+       //post_mapping_QC( mapped.bam_sorted, genome_gtf )
+       post_qc_logs = post_mapping_QC.out[0].map {it[1]}.mix(post_mapping_QC.out[2].map {it[1]}).mix(post_mapping_QC.out[1].map {it[1]}).collect()
+    }
+
     // # 4) featureCounts
+    fc_logs = Channel.empty()
+
     if ( params.runFeatureCounts) {
-       if (params.runMapping) {
-          include alignment_based_quant from './sub-workflows/alignment_based_quant.nf' params(params)
-          alignment_based_quant ( run_name, mapped.bam_sorted.map { it[2] }, genome_gtf )
-          fc_logs = alignment_based_quant.out.fc_summary
-       } else {
-          exit 1, "featureCounts requires alignment step. Please enable runMapping!"
-       }
-    } else {
-       fc_logs = Channel.empty()
+       include alignment_based_quant from './sub-workflows/alignment_based_quant.nf' params(params)
+       alignment_based_quant ( run_name, mapped.bam_sorted.map { it[2] }, genome_gtf )
+       fc_logs = alignment_based_quant.out.fc_summary
     }
     // # 5) Salmon
+    salmon_logs = Channel.empty()
+
     if ( params.runSalmon ) {
        include alignment_free_quant from './sub-workflows/alignment_free_quant.nf' params(params)
        alignment_free_quant( fastqs_processed, run_name )
        salmon_logs = alignment_free_quant.out.logs
-    } else {
-       salmon_logs = Channel.empty()
     }
     // # 6) GATK4 germline variant calling with optional BQSR
     if ( params.runGermlineCallingGATK ) {
-       if ( params.runMapping ) {
-          include gatk_germline_calling from './sub-workflows/gatk_germline_calling.nf' params(params)
-          gatk_germline_calling( run_name, mapped.bam_sorted )
-       } else {
-          exit 1, "GATK4 requires alignment, markdup step. Please enable runMapping and runMarkDup!"
-       }
+       include gatk_germline_calling from './sub-workflows/gatk_germline_calling.nf' params(params)
+       gatk_germline_calling( run_name, mapped.bam_sorted )
     }
+
     // # 7) MultiQC report
     if ( params.runMultiQC ) {
        include qc_report from './sub-workflows/qc_report.nf' params(params)
diff --git a/sub-workflows/alignment_based_quant.nf b/sub-workflows/alignment_based_quant.nf
index c9761ae..bd7fa25 100644
--- a/sub-workflows/alignment_based_quant.nf
+++ b/sub-workflows/alignment_based_quant.nf
@@ -20,10 +20,11 @@ workflow alignment_based_quant {
     main:
       FeatureCounts( run_name, bam_files.collect(), genome_gtf.collect() )
+      //if ( params.normalize_counts ) {
       if ( params.normalize_counts ) {
         EdgerNormalize (run_name, FeatureCounts.out.count_table )
         Deseq2Normalize (run_name, FeatureCounts.out.count_table )
-      }
+      }
     emit:
       fc_read_counts = FeatureCounts.out.count_table
diff --git a/sub-workflows/post_mapping_QC.nf b/sub-workflows/post_mapping_QC.nf
index 7f9b0d7..48fa72c 100644
--- a/sub-workflows/post_mapping_QC.nf
+++ b/sub-workflows/post_mapping_QC.nf
@@ -10,6 +10,9 @@ workflow post_mapping_QC {
       genome_gtf

    main:
+      rseqc_logs = Channel.empty()
+      rseqc_tin_logs = Channel.empty()
+      preseq_lce_logs = Channel.empty()
      if (params.genome_bed ) {
         //Create bed12 index file
        genome_bed = Channel
@@ -20,14 +23,16 @@ workflow post_mapping_QC {
        GenePredToBed ( GtfToGenePred.out.genome_genepred )
        genome_bed = GenePredToBed.out.genome_bed12
      }
-     RSeQC(bams_in, genome_bed.collect())
+     RSeQC(bams_in, genome_bed.collect())
+
      if (params.runRSeQC_TIN) {
        RSeQC_TIN(bams_in, genome_bed.collect())
+       rseqc_tin_logs = RSeQC_TIN.out
      }
      LCExtrap(bams_in)
    emit:
+     rseqc_tin_logs = rseqc_tin_logs
     rseqc_logs = RSeQC.out
-    rseqc_tin_logs = RSeQC_TIN.out
     preseq_lce_logs = LCExtrap.out
 }
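
Side note on the TPM output added in bin/edgerNormalize.R: tpm3() first length-normalizes the counts (counts/len) and then rescales each sample so its column sums to one million, with len taken from the featureCounts "Length" column. A minimal, standalone R sketch (toy counts and hypothetical gene lengths, not part of the patch) to sanity-check the helper:

    tpm3 <- function(counts, len) {
      x <- counts / len                   # reads per base of gene length
      return(t(t(x) * 1e6 / colSums(x)))  # scale each sample (column) to 1e6
    }

    # toy data: 4 genes x 2 samples, lengths in bp (made up for illustration)
    counts <- matrix(c(10, 20, 5, 0, 100, 50, 25, 10), ncol = 2,
                     dimnames = list(paste0("gene", 1:4), c("s1", "s2")))
    len <- c(1000, 2000, 500, 300)

    tpm <- tpm3(counts, len)
    colSums(tpm)   # both columns should equal 1e6

Unlike RPKM, TPM columns always sum to the same total, so the colSums() check is a quick way to confirm the transposed scaling runs along the sample axis.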