Skip to content

Commit

Permalink
Merge pull request #79 from UMCUGenetics/rnaseq_features_fixes
Browse files Browse the repository at this point in the history
Add bam input, tpm counting and bugfixes to new beta branch
  • Loading branch information
ffmmulder authored Jun 17, 2021
2 parents 76fad46 + 6dc071f commit 6548cc6
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 65 deletions.
8 changes: 8 additions & 0 deletions bin/edgerNormalize.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ library(edgeR)

args = commandArgs(trailingOnly=TRUE)

tpm3 <- function(counts,len) {
x <- counts/len
return(t(t(x)*1e6/colSums(x)))
}


fc.df <- read.delim(args[1], comment.char="#")
# Normalize
counts <- data.frame(fc.df[,8:ncol(fc.df)])
Expand All @@ -15,7 +21,9 @@ x <- DGEList(counts=counts, genes=fc.df[,c("Geneid","Length")] )
# RPKM/CPM normalization
df.rpkm <- rpkm(x,x$genes$Length, normalized.lib.sizes=F)
df.cpm <- cpm(x)
df.tpm = tpm3(counts, x$genes$Length)

write.table(df.rpkm, file=paste0(args[2],"_featureCounts_RPKM.txt"), row.names=T, quote=F,sep=" ")
write.table(df.cpm, file=paste0(args[2],"_featureCounts_CPM.txt"), row.names=T, quote=F,sep=" ")
write.table(df.tpm, file=paste0(args[2],"_featureCounts_TPM.txt"), row.names=T, quote=F,sep=" ")

210 changes: 148 additions & 62 deletions main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def helpMessage() {
The typical command for running the pipeline is as follows:
nextflow run UMCUGenetics/RNASeq-NF --fastq_path <fastq_dir> --out_dir <output_dir> -c <path/to/analysis.config --email <email address>
${c_blue} Mandatory arguments: ${c_reset}
${c_yellow} --fastq_path [str] ${c_reset} Path to a directory containing fastq files.
${c_yellow} --fastq_path [str] OR --bam_path [str] ${c_reset} Path to a directory containing fastq files or bam files.
Files should be named in the following format: xxx_xxx_xxxx
${c_yellow} --out_dir [str] ${c_reset} The output directory where the results will be saved.
${c_yellow} --email [str] ${c_reset} The email address to send workflow summary and MultiQC report to.
Expand Down Expand Up @@ -128,31 +128,84 @@ if (!params.out_dir) {
if (!params.email) {
exit 1, "Please provide an email address"
}
if (!params.fastq_path) {
exit 1, "fastq files not found, please provide the correct path! (--fastq_path)"
if (!params.fastq_path && !params.bam_path) {
exit 1, "Please specify either a 'fastq_path' or a 'bam_path'! (--fastq_path or --bam_path)"
}

if (params.runMapping || params.runPostQC || params.runFeatureCounts) {
/* Extended input and settings checking
*/

// Check for gtf file when needed
if ( ( params.runMapping && params.fastq_path ) || params.runFeatureCounts || (params.runPostQC && !params.genome_bed)) {
if (!params.genome_gtf ) {
exit 1, "A GTF file is required for STAR, RSeQC &featureCounts. Please provide the correct filepath! (--genome_gtf)"
exit 1, "A GTF file is required for STAR, RSeQC (in case --genome_bed is not specifed) & featureCounts. Please provide the correct filepath! (--genome_gtf)"
} else {
// Try importing.
genome_gtf = Channel
.fromPath( params.genome_gtf, checkIfExists: true )
.ifEmpty { exit 1, "GTF file not found: ${params.genome_gtf}"}
}
}
def run_name = params.fastq_path.split('/')[-1]
// Check for bed file when needed
if (params.runPostQC && !params.genome_gtf) {
if (!params.genome_bed ) {
exit 1, "A .bed file is required for RSeQC in case --genome_gtf is not specifed. Please provide the correct filepath! (--genome_bed)"
} else {
// Try importing.
genome_bed = Channel
.fromPath( params.genome_bed, checkIfExists: true )
.ifEmpty { exit 1, "BED file not found: ${params.genome_bed}"}
}
}

//mapping requires fastq files
if ( params.runMapping && !params.fastq_path) {
//if none specified, check for bam input as alternative
if(params.bam_path){
println "Mapping but no --fastq_path specified. Using --bam_path as substitute for mapping results."
}
else{
exit 1, "Mapping is specified, but no --fastq_path or substitute --bam_path specified. Please specify either one of them!"
}
}

if ( params.runPostQC) {
if(!params.bam_path && !params.runMapping){
exit 1, "--runPostQC is specified. This requires mapped bam files, but neither --runMapping or --bam_path are specified. Exiting!"
}
}

if ( params.runFeatureCounts) {
if(!params.bam_path && !params.runMapping){
exit 1, "--runFeatureCounts is specified. This requires mapped bam files, but neither --runMapping nor --bam_path are specified. Exiting!"
}
}

if ( params.runSalmon && !params.fastq_path) {
exit 1, "--runSalmon is specified. This requires fastq files but no --fastq_path specified. Exiting!"
}

if ( params.runGermlineCallingGATK ) {
if(!params.bam_path && !params.runMapping){
exit 1, "--runGermlineCallingGATK specified. This requires mapped bam files, but neither --runMapping or --bam_path are specified. Exiting!"
}
}

def run_name
if( params.fastq_path ){
run_name = params.fastq_path.split('/')[-1]
}
else{
if( params.bam_path ){
run_name = params.bam_path.split('/')[-1]
}
}
if ( params.custom_run_name) {
run_name = params.custom_run_name
}
//Start workflow
workflow {
main :
//Set run and retrieve input fastqs
include extractAllFastqFromDir from './NextflowModules/Utils/fastq.nf' params(params)
include MergeFastqLanes from './NextflowModules/Utils/MergeFastqLanes.nf' params( params )
fastq_files = extractAllFastqFromDir(params.fastq_path).map { [it[0],it[1],it[4]]}
//Pipeline log info
log.info """=======================================================
RNASeq-NF ${ workflow.manifest.version}"
Expand All @@ -177,76 +230,109 @@ workflow {
summary['Config Profile'] = workflow.profile
log.info summary.collect { k,v -> "${k.padRight(15)}: $v" }.join("\n")
log.info "========================================="
// # 1) Pre-processing / QC
include pre_processing from './sub-workflows/pre_processing.nf' params(params)
pre_processing ( fastq_files )
//Logs
trim_logs = pre_processing.out.trim_logs
fastqc_logs = pre_processing.out.fastqc_logs
sortmerna_logs = pre_processing.out.srna_logs
// Determine final fastqs files
final_fastqs = pre_processing.out.processed_fastqs
//Transform output channels
fastqs_transformed = final_fastqs.groupTuple(by:0).map { sample_id, rg_id, reads -> [sample_id, rg_id[0], reads.flatten()]}
//Merge Fastqs lanes before proceeding.
if ( params.MergeFQ ) {
fastqs_processed = MergeFastqLanes (fastqs_transformed)
} else {
fastqs_processed = fastqs_transformed

def bam_files

//handle first steps (qc and merging) when fastq files are used as input
fastqc_logs = Channel.empty()
trim_logs = Channel.empty()
sortmerna_logs = Channel.empty()


if ( params.fastq_path ){

//Set run and retrieve input fastqs
include extractAllFastqFromDir from './NextflowModules/Utils/fastq.nf' params(params)
fastq_files = extractAllFastqFromDir(params.fastq_path).map { [it[0],it[1],it[4]]}

// # 1) Pre-processing / QC
include pre_processing from './sub-workflows/pre_processing.nf' params(params)
pre_processing ( fastq_files )
//Logs
trim_logs = pre_processing.out.trim_logs
fastqc_logs = pre_processing.out.fastqc_logs
sortmerna_logs = pre_processing.out.srna_logs
// Determine final fastqs files
final_fastqs = pre_processing.out.processed_fastqs
//Transform output channels
fastqs_transformed = final_fastqs.groupTuple(by:0).map { sample_id, rg_id, reads -> [sample_id, rg_id[0], reads.flatten()]}
//Merge Fastqs lanes before proceeding.
if ( params.MergeFQ ) {
include MergeFastqLanes from './NextflowModules/Utils/MergeFastqLanes.nf' params( params )
fastqs_processed = MergeFastqLanes (fastqs_transformed)
} else {
fastqs_processed = fastqs_transformed
}
}
//otherwise set up bam files for further use
else {

//alternative, direct way
include { extractBamFromDir } from './NextflowModules/Utils/bam.nf' params( params )
//get bams from path
sorted_bams = extractBamFromDir(params.bam_path).map { sample_id, bam, bai -> [sample_id, sample_id, bam, bai]}
empty_logs = Channel.empty()
empty_flagstat = Channel.empty()
mapped = [ "bam_sorted":sorted_bams, "logs":empty_logs, "star_flagstat":empty_flagstat ]

//else bam files will be used for further processing as handled above
include Flagstat as Flagstat_raw from './NextflowModules/Sambamba/0.7.0/Flagstat.nf' params( params )
Flagstat_raw( mapped.bam_sorted.map {sample_id, rg_id, bam, bai -> [sample_id, bam, bai] } )
flagstat_logs = Flagstat_raw.out
}


// # 2) STAR alignment | Sambamba markdup
if ( params.runMapping ) {
include star_alignment from './sub-workflows/star_alignment.nf' params(params)
mapped = star_alignment ( fastqs_processed, genome_gtf )
star_logs = mapped.logs
flagstat_logs = mapped.star_flagstat

} else {
flagstat_logs = Channel.empty()
star_logs = Channel.empty()
flagstat_logs = Channel.empty()
star_logs = Channel.empty()

if ( params.runMapping ) {
if ( params.fastq_path ) {
include star_alignment from './sub-workflows/star_alignment.nf' params(params)
mapped = star_alignment ( fastqs_processed, genome_gtf )
star_logs = mapped.logs
flagstat_logs = mapped.star_flagstat
}/* else {
//else bam files will be used for further processing as handled above
include Flagstat as Flagstat_raw from './NextflowModules/Sambamba/0.7.0/Flagstat.nf' params( params )
Flagstat_raw( mapped.bam_sorted.map {sample_id, rg_id, bam, bai -> [sample_id, bam, bai] } )
flagstat_logs = Flagstat_raw.out
}
*/
}

// # 3) Post-mapping QC
post_qc_logs = Channel.empty()

if ( params.runPostQC) {
if (params.runMapping) {
include post_mapping_QC from './sub-workflows/post_mapping_QC.nf' params(params)
post_mapping_QC( mapped.bam_sorted.map { sample_id, rg_id, bam, bai -> [sample_id, bam, bai] }, genome_gtf )
post_qc_logs = post_mapping_QC.out[0].map {it[1]}.mix(post_mapping_QC.out[2].map {it[1]}).mix(post_mapping_QC.out[1].map {it[1]}).collect()
} else {
exit 1, "PostQC requires alignment step. Please enable runMapping!"
}
} else {
post_qc_logs = Channel.empty()
}
include post_mapping_QC from './sub-workflows/post_mapping_QC.nf' params(params)
post_mapping_QC( mapped.bam_sorted.map { sample_id, rg_id, bam, bai -> [sample_id, bam, bai] }, genome_gtf )
//post_mapping_QC( mapped.bam_sorted, genome_gtf )
post_qc_logs = post_mapping_QC.out[0].map {it[1]}.mix(post_mapping_QC.out[2].map {it[1]}).mix(post_mapping_QC.out[1].map {it[1]}).collect()
}

// # 4) featureCounts
fc_logs = Channel.empty()

if ( params.runFeatureCounts) {
if (params.runMapping) {
include alignment_based_quant from './sub-workflows/alignment_based_quant.nf' params(params)
alignment_based_quant ( run_name, mapped.bam_sorted.map { it[2] }, genome_gtf )
fc_logs = alignment_based_quant.out.fc_summary
} else {
exit 1, "featureCounts requires alignment step. Please enable runMapping!"
}
} else {
fc_logs = Channel.empty()
include alignment_based_quant from './sub-workflows/alignment_based_quant.nf' params(params)
alignment_based_quant ( run_name, mapped.bam_sorted.map { it[2] }, genome_gtf )
fc_logs = alignment_based_quant.out.fc_summary
}
// # 5) Salmon
salmon_logs = Channel.empty()

if ( params.runSalmon ) {
include alignment_free_quant from './sub-workflows/alignment_free_quant.nf' params(params)
alignment_free_quant( fastqs_processed, run_name )
salmon_logs = alignment_free_quant.out.logs
} else {
salmon_logs = Channel.empty()
}
// # 6) GATK4 germline variant calling with optional BQSR
if ( params.runGermlineCallingGATK ) {
if ( params.runMapping ) {
include gatk_germline_calling from './sub-workflows/gatk_germline_calling.nf' params(params)
gatk_germline_calling( run_name, mapped.bam_sorted )
} else {
exit 1, "GATK4 requires alignment, markdup step. Please enable runMapping and runMarkDup!"
}
include gatk_germline_calling from './sub-workflows/gatk_germline_calling.nf' params(params)
gatk_germline_calling( run_name, mapped.bam_sorted )
}

// # 7) MultiQC report
if ( params.runMultiQC ) {
include qc_report from './sub-workflows/qc_report.nf' params(params)
Expand Down
3 changes: 2 additions & 1 deletion sub-workflows/alignment_based_quant.nf
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ workflow alignment_based_quant {

main:
FeatureCounts( run_name, bam_files.collect(), genome_gtf.collect() )
//if ( params.normalize_counts ) {
if ( params.normalize_counts ) {
EdgerNormalize (run_name, FeatureCounts.out.count_table )
Deseq2Normalize (run_name, FeatureCounts.out.count_table )
}
}

emit:
fc_read_counts = FeatureCounts.out.count_table
Expand Down
9 changes: 7 additions & 2 deletions sub-workflows/post_mapping_QC.nf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ workflow post_mapping_QC {
genome_gtf

main:
rseqc_logs = Channel.empty()
rseqc_tin_logs = Channel.empty()
preseq_lce_logs = Channel.empty()
if (params.genome_bed ) {
//Create bed12 index file
genome_bed = Channel
Expand All @@ -20,14 +23,16 @@ workflow post_mapping_QC {
GenePredToBed ( GtfToGenePred.out.genome_genepred )
genome_bed = GenePredToBed.out.genome_bed12
}
RSeQC(bams_in, genome_bed.collect())
RSeQC(bams_in, genome_bed.collect())

if (params.runRSeQC_TIN) {
RSeQC_TIN(bams_in, genome_bed.collect())
rseqc_tin_logs = RSeQC_TIN.out
}
LCExtrap(bams_in)

emit:
rseqc_tin_logs = rseqc_tin_logs
rseqc_logs = RSeQC.out
rseqc_tin_logs = RSeQC_TIN.out
preseq_lce_logs = LCExtrap.out
}

0 comments on commit 6548cc6

Please sign in to comment.