The task function is the fundamental unit to build pipelines. It is provided with the default export of bionode-watermill:
const watermill = require('bionode-watermill')
const task = watermill.task
// Or, with assignment destructuring:
const { task } = watermill
task takes two parameters: props and operationCreator:
const myTask = task(props, operationCreator)
props is an object with the following structure:
const props = {
  input: '*.txt', // valid input, see below
  output: '*.txt', // valid output, see below
  name: 'My Task',
  params: { output: 'bar.txt' }, // the actual output name, passed on to
  // operationCreator, along with any other params you may wish to hand to
  // operationCreator
  alwaysRun: true // force rerun even if output already exists
  // other options, see options
}
input and output patterns are required for tasks that deal with files. params.output lets you name the output files, while output is used to check whether the output was properly resolved.
Example
// example task with input/output files
const myTask = task({
  input: '*.txt',
  output: '*.txt',
  params: {
    output: 'bar.txt'
  }
}, ({ input, params }) => `cp ${input} ${params.output}`)
operationCreator is a function that will be provided with the resolved props object and is responsible for the execution of the task itself. operationCreator should return a stream or a promise. If the operation creator does not return a stream, it will be wrapped into a stream internally (e.g. StreamFromPromise).
Example
function operationCreator(resolvedProps) {
  const fs = require('fs')
  const intoStream = require('into-stream')
  // stream the resolved input into bar.txt and return the writable stream
  const ws = intoStream(resolvedProps.input).pipe(fs.createWriteStream('bar.txt'))
  return ws
}
Note
With assignment destructuring and arrow functions, you can write cleaner operation creators:
const operationCreator = ({ input }) => intoStream(input).pipe(fs.createWriteStream('bar.txt'))
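Operation creators may also return a promise, which is wrapped into a stream internally (via StreamFromPromise, as mentioned above). A minimal sketch of a promise-returning operation creator, assuming the task declares input, output and params.output as in the earlier examples (the uppercasing step is purely illustrative):
const fs = require('fs').promises

// Returns a promise; bionode-watermill wraps it into a stream internally.
// The uppercasing step is only an illustrative transformation.
const operationCreator = ({ input, params }) =>
  fs.readFile(input, 'utf8')
    .then(contents => fs.writeFile(params.output, contents.toUpperCase()))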
Bionode-watermill lets users either execute JavaScript code within operationCreator or run Unix shell commands within tasks, by returning a string (using ES6 template literals) from operationCreator:
Example
// creates a test.txt file
const operationCreator = () => `touch test.txt`
// or, without ES6 arrow functions
function operationCreator() {
  return `touch test.txt`
}
The input and output objects can be a string glob pattern, or a plain object of them. The glob will be resolved to an absolute path when it is passed to the operationCreator.
Bionode-watermill manages the input and output files and folders handled by a task. All inputs and outputs are saved within the data folder (generated by running bionode-watermill) in the current working directory. The input files of the first task can be elsewhere, outside the data folder. Inside the data folder, subdirectories are created (one per task), each named with the uid (unique id) of its respective task. This uid is generated by bionode-watermill from the props of the task and of its parent tasks (check Uid).
For example,
{ input: '**/*.sra' }
will resolve to something like:
{ input: '/data/<uid>/ERR1229296.sra' }
And
{
  input: {
    reference: '*_genomic.fna.gz',
    reads: ['*_1.fastq.gz', '*_2.fastq.gz']
  }
}
will resolve to something like:
{
  input: {
    reference: '/data/<uid>/GCA_000988525.2_ASM98852v2_genomic.fna.gz',
    reads: ['/data/<uid>/ERR1229296_1.fastq.gz', '/data/<uid>/ERR1229296_2.fastq.gz']
  }
}
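As a sketch of how such a resolved object might be consumed, the absolute paths can be interpolated directly into a shell command. The task below, its output pattern and its ls-based command are purely illustrative and not part of bionode-watermill:
// Illustrative only: input.reference and input.reads arrive as absolute
// paths, so they can be interpolated straight into a command string.
const listInputs = task({
  input: {
    reference: '*_genomic.fna.gz',
    reads: ['*_1.fastq.gz', '*_2.fastq.gz']
  },
  output: '*.txt',
  params: { output: 'input-sizes.txt' }
}, ({ input, params }) =>
  `ls -l ${input.reference} ${input.reads.join(' ')} > ${params.output}`
)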
If the task is the first in the pipeline, glob patterns are matched against the filesystem, using the current working directory to search for the input files.
// this is our very first task glob pattern
'*.fas'
// Resolved Input
'./some.fas'
Otherwise, glob patterns are matched against the collection. The collection allows bionode-watermill to search within the ./data folder for the first folder containing files that match the glob pattern. Keep in mind that it crawls the task tree from downstream to upstream until it finds the desired match, and that this is run specific, i.e. currentCollection can only reference outputs and inputs generated by the current pipeline run. If nothing is found in the collection, bionode-watermill will also look in the current working directory for a file that matches the desired glob pattern.
// now, this is not the first task
'*.fas'
// Resolved Input
'./data/<uid>/some.fas'
Notice the difference in the location of the inputs.
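To make this concrete, here is a sketch of a two-task pipeline, assuming join is exported alongside task (join is covered in its own section); the task names and commands are hypothetical. The first task's '*.fas' is matched against the current working directory, while the second task's '*.fas' is matched against the first task's output inside ./data:
const { task, join } = require('bionode-watermill')

// Hypothetical two-step pipeline: `copy` resolves '*.fas' from the working
// directory, while `compress` resolves '*.fas' from copy's output in ./data
const copy = task({
  input: '*.fas',
  output: '*.fas',
  params: { output: 'copy.fas' }
}, ({ input, params }) => `cp ${input} ${params.output}`)

const compress = task({
  input: '*.fas',
  output: '*.gz',
  params: { output: 'copy.fas.gz' }
}, ({ input, params }) => `gzip -c ${input} > ${params.output}`)

join(copy, compress)()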
Also, and importantly, if you have multiple inputs to pass to a task, make sure they are in the same directory. If it is the first task, make sure these inputs are all in the current working directory. The same goes for tasks that fetch inputs from ./data: make sure every file is within the same folder. If the desired outputs (of previous tasks) are spread across several folders and share the same file extension, you will have to make the glob pattern suffix more specific. For example,
// we have two fasta files in different directories
// fasta1.fas and fasta2.fas
const someTask = task({
  input: ['*1.fas', '*2.fas']
}, operationCreator)
So, if this is your case, make sure your glob pattern is specific enough not to match other, undesired files.
For more details on multiple inputs check this link.
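Continuing the example above, a sketch of what its operationCreator might look like, assuming the resolved input arrives as an array of absolute paths (the cat command and the merged.fas name are illustrative):
// Illustrative operation creator for someTask above: the two resolved
// paths arrive as an array and are concatenated into one output file.
const operationCreator = ({ input }) => `cat ${input.join(' ')} > merged.fas`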
The resolved values can be accessed from myTask.resolvedOutput after the task has emitted a task.finish event.
// Atomic item
// Output
'*.sra'
// Resolved Output
'./data/<uid>/human.sra'
// Array of items
// Output
['*.bam', '*.fastq.gz']
// Resolved Output
['./data/<uid>/reads.bam', './data/<uid>/human.fastq.gz']
// Plain object of items
// Output
{
  reads: [1, 2].map(n => `*_${n}.fastq.gz`),
  alignment: '*.bam'
}
// Resolved Output
{
  reads: ['./data/<uid>/human_1.fastq.gz', './data/<uid>/human_2.fastq.gz'],
  alignment: './data/<uid>/reads.bam'
}
To sum up, a task is the fundamental unit for building pipelines. It
- has a unique input/output pair defined by glob pattern(s) and/or streams
- has a single params object. Params should be used for values that apply to a task with the same input/output but alter its output, e.g. command flags
- emits task.finish with a reference to that task object in state
Example
/*
* Usage: samtools index [-bc] [-m INT] <in.bam> [out.index]
* Options:
* -b Generate BAI-format index for BAM files [default]
* -c Generate CSI-format index for BAM files
* -m INT Set minimum interval size for CSI indices to 2^INT [14]
*/
const samtoolsIndex = task({
  input: '*.bam',
  output: '*.bai',
  params: { format: 'b' },
  name: 'samtools index'
}, ({ input, params }) => shell(`samtools index -${params.format} ${input}`))

samtoolsIndex()
  .on('task.finish', (results) => console.log(results.resolvedOutput))
Warning: the following content is a conceptual idea for bionode-watermill that is not yet implemented.
If either input or output is not provided, the task is assumed to be a streaming task, i.e. a duplex stream with writable and/or readable portions. Consider:
// through = require('through2') - a helper to create through streams
const throughCapitalize = through(function (chunk, enc, next) {
  // receives a chunk, its encoding, and a callback to call once done
  // push the capitalized chunk to the readable portion of this through stream
  this.push(chunk.toString().toUpperCase())
  // then call next so that the next chunk can be handled
  next()
})
You could connect capitalize to a readable (readFile) and writable (writeFile) file stream with:
const capitalize = task({
  name: 'Capitalize Through Stream'
},
// Here, input is a readable stream that came from the previous task
// Let's return a through stream that takes the input and capitalizes it
({ input }) => input.pipe(throughCapitalize))
const readFile = task({
  input: '*.lowercase',
  name: 'Read from *.lowercase'
}, ({ input }) => {
  const rs = fs.createReadStream(input)
  // Add file information to stream object so we have it later
  rs.inFile = input
  return rs
})
// Note: swapExt is a hypothetical helper that swaps the file extension
const writeFile = task({
  output: '*.uppercase',
  name: 'Write to *.uppercase'
}, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase')))
// Can now connect the three:
join(readFile, capitalize, writeFile)
Of course, this could be written as one single task. This is somewhat simpler, but the power of splitting up the read, transform, and write portions of a task will become apparent once we can provide multiple sets of parameters to the transform and observe the effect, without having to manually rewire input and output filenames. As a single task the above would become:
const capitalize = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Capitalize *.lowercase -> *.uppercase'
}, ({ input }) =>
  fs.createReadStream(input)
    .pipe(throughCapitalize)
    // swap the .lowercase extension for .uppercase on the resolved input path
    .pipe(fs.createWriteStream(input.replace(/\.lowercase$/, '.uppercase')))
)
It is fine to run a task with no name; a hashed one will be generated for you. However, properly named tasks will greatly help when reading pipeline output.
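For instance, a task declared without a name (a sketch; the task itself is hypothetical) still runs, it is just harder to spot in the logs:
// No `name` field: a hashed name is generated for this task automatically,
// which makes it harder to recognise in the pipeline output.
const anonymousCopy = task({
  input: '*.txt',
  output: '*.txt',
  params: { output: 'copy.txt' }
}, ({ input, params }) => `cp ${input} ${params.output}`)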