Developer Tutorial

This tutorial is meant to guide you, the pipeline developer, through all the necessary concepts to write an optimally parallel and completely portable working pipeline in under 150 lines of code.

Since Operon was originally developed with bioinformatics in mind, we’re going to be developing a bioinformatics pipeline and using bioinformatics concepts, but Operon certainly isn’t limited to any single field. The developer can easily take the concepts talked about here and apply them to any set of software.

Let’s get started. This tutorial assumes you have Operon installed. First we need a plain Python file; create one called tutorial.py in whatever editor you desire. The very first thing we need to do is create a class called Pipeline which is a subclass of operon.components.ParslPipeline. Add the following to tutorial.py:

from operon.components import ParslPipeline


class Pipeline(ParslPipeline):
    def description(self):
        pass

    def dependencies(self):
        pass

    def conda(self):
        pass

    def arguments(self, parser):
        pass

    def configuration(self):
        pass

    def pipeline(self, pipeline_args, pipeline_config):
        pass

Pipelines are defined by overriding methods in ParslPipeline as above. We’ll fill in those methods now, going from simple to more complex.

description(self)

The description(self) method needs only to return a string that is used to describe the pipeline on various help screens. Fill it in like so:

def description(self):
    return 'A tutorial pipeline'

dependencies(self)

The dependencies(self) method needs to return a list, where each element is a string representation of a Python package acceptable to pip; the user has the option to install these at the time of pipeline installation. Generally these packages should be ones that are used in the pipeline but aren’t part of the standard library. In this tutorial, we’re going to use PyVCF like so:

def dependencies(self):
    return ['pyvcf']

Note

When the user installs Python dependencies, they’re installed into the current Python environment, which may or may not be the correct environment for eventual pipeline execution. There isn’t much the developer can do about this, so don’t stress too much.

conda(self)

The conda(self) method is used to define what external software the pipeline uses, and returns a dictionary with two keys, both of which are optional:

{
    'packages': [
        # List of bioconda packages to install
    ],
    'channels': [
        # List of anaconda channels to use
    ]
}

Unless channels is provided, Operon will use bioconda to download packages. The elements of the packages key should be operon.components.CondaPackage objects (add that import now). We will be using the following:

def conda(self):
    return {
        'packages': [
            CondaPackage(tag='bwa', config_key='bwa'),
            CondaPackage(tag='macs2=2.1.1', config_key='macs2'),
            CondaPackage(tag='samtools=1.6', config_key='samtools'),
            CondaPackage(tag='picard=2.9.2', config_key='picard'),
            CondaPackage(tag='freebayes=1.1.0', config_key='freebayes')
        ]
    }
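
This tutorial only needs bioconda, but if a pipeline ever needed packages from another channel, the channels key is where that would go. As a purely hypothetical sketch:

def conda(self):
    return {
        'channels': ['bioconda', 'conda-forge'],  # hypothetical: also pull from conda-forge
        'packages': [
            CondaPackage(tag='bwa', config_key='bwa')
        ]
    }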

Note

The conda(self) method is not required but is a massive convenience to the user (which might also be you) because it enables the user to not have to track down and manually install software to run the pipeline. Everything defined here, which presumably encompasses all or most software used in the pipeline, can be automatically gathered and injected into the pipeline’s configuration dictionary at the time of configuration.

arguments(self, parser)

For this simple tutorial, we’ll only take three basic arguments like so:

def arguments(self, parser):
    parser.add_argument('--read', help='Path to read in fastq format')
    parser.add_argument('--output-dir', help='Path to an output directory')
    parser.add_argument('--lib', help='Name of this sample library')
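
These are ordinary argparse arguments, so dashes in flag names become underscores in the parsed result. As a rough, hypothetical sketch of what pipeline_args will hold inside pipeline() later on (Operon also supplies some entries of its own, such as the logs_dir we use further down):

# Hypothetical invocation: --read /data/sample.fastq --output-dir /data/out --lib sample1
pipeline_args = {
    'read': '/data/sample.fastq',
    'output_dir': '/data/out',  # argparse turns --output-dir into output_dir
    'lib': 'sample1',
    # ...plus entries added by Operon itself, such as 'logs_dir'
}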

configuration(self)

Most of our configuration will be paths, which is common practice, with a threading question thrown in. Notice that bwa|reference and freebayes|reference_fasta use the expanded leaf form so that those questions are asked as path questions instead of plain text, since we’re asking for a path.

def configuration(self):
    return {
        'bwa': {
            'path': 'Path to bwa',
            'reference': {
                'q_type': 'path',
                'message': 'Path to a reference genome prefix for bwa'
            },
            'threads': 'Number of threads to run bwa'
        },
        'macs2': {
            'path': 'Path to macs2'
        },
        'samtools': {
            'path': 'Path to samtools'
        },
        'picard': {
            'path': 'Path to picard'
        },
        'freebayes': {
            'path': 'Path to freebayes',
            'reference_fasta': {
                'q_type': 'path',
                'message': 'Full path to reference fasta'
            }
        }
    }
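
For reference, once the user has answered these questions (and Operon has injected the paths of the conda packages it installed), the pipeline_config handed to pipeline() is just a nested dictionary. A hypothetical filled-in version might look like this; every value shown is made up:

pipeline_config = {
    'bwa': {
        'path': '/home/user/miniconda3/envs/operon/bin/bwa',
        'reference': '/refs/dm6/dm6.fa.gz',
        'threads': '4'
    },
    'macs2': {'path': '/home/user/miniconda3/envs/operon/bin/macs2'},
    'samtools': {'path': '/home/user/miniconda3/envs/operon/bin/samtools'},
    'picard': {'path': '/home/user/miniconda3/envs/operon/bin/picard'},
    'freebayes': {
        'path': '/home/user/miniconda3/envs/operon/bin/freebayes',
        'reference_fasta': '/refs/dm6/dm6.fa'
    }
}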

pipeline(self, pipeline_args, pipeline_config)

Now for the main part of the pipeline building process. We’ve defined our periphery, so we can assume that the parameters pipeline_args and pipeline_config have been populated and that all the software we’ve asked for is installed.

Generally the first step is to define operon.components.Software instances:

def pipeline(self, pipeline_args, pipeline_config):
    freebayes = Software('freebayes')
    bwa_mem = Software('bwa', subprogram='mem')
    macs2 = Software('macs2', subprogram='callpeak')
    picard_markduplicates = Software(
        name='picard_markduplicates',
        path=pipeline_config['picard']['path'],
        subprogram='MarkDuplicates'
    )
    samtools_flagstat = Software(
        name='samtools_flagstat',
        path=pipeline_config['samtools']['path'],
        subprogram='flagstat'
    )
    samtools_sort = Software(
        name='samtools_sort',
        path=pipeline_config['samtools']['path'],
        subprogram='sort'
    )

For freebayes the instantiation is very simple: the default path resolution is fine, and there isn’t a subprogram to call. bwa mem and macs2 callpeak are slightly more involved, but only by adding a subprogram= keyword argument.

For picard and samtools, we’re giving names that don’t have a match in the configuration dictionary. That means the default path resolution won’t work, so we need to provide paths explicitly with the path= keyword argument.
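
To make the distinction concrete, the default resolution we’re relying on presumably boils down to a lookup like the following (an illustrative sketch only, not Operon’s actual code):

# Illustrative only: default path resolution keyed on the Software name
def resolve_path(name, pipeline_config):
    return pipeline_config[name]['path']

resolve_path('freebayes', pipeline_config)        # fine: 'freebayes' is a config key
# resolve_path('samtools_sort', pipeline_config)  # KeyError: no such key, hence path= above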

Next comes some very simple pipeline setup: creating the output directory where the user wants output to go. More complicated pipelines may need more setup here. Add:

# Set up output directory
os.makedirs(pipeline_args['output_dir'], exist_ok=True)

Now we can start constructing our pipeline workflow. Modify the import statements at the top of the file to:

import os
from operon.components import ParslPipeline, CondaPackage, Software, Parameter, Redirect, Data, CodeBlock

We’ll need all those components eventually.

The general idea the developer should take when constructing the pipeline workflow is to think of software as stationary steps and data as elements flowing through those steps. Each stationary step has data coming in and data going out. With that mental model, we can construct the following workflow:

First we will run the software bwa, whose output will flow into samtools sort; this part is sequential, so there’s no parallelization involved quite yet. The output of samtools sort will flow into both samtools flagstat and picard markduplicates, forming our first two-way branch. These two programs will run in parallel the moment samtools sort produces its output. From there, the output of picard markduplicates flows as input into both the freebayes variant caller and the macs2 peak caller, forming another two-way branch. Finally, the output of freebayes will flow as input into a Python code block which uses PyVCF. The overall workflow will terminate when all leaves have completed; in this case, samtools flagstat, macs2, and the Python code block.
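
Sketched out, the data flow we’re about to build looks like this (arrows show output flowing into the next step as input):

bwa mem -> samtools sort -> samtools flagstat
                         -> picard markduplicates -> macs2 callpeak
                                                  -> freebayes -> PyVCF code block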

Let’s dive in. The first software we need to insert into the workflow is bwa:

alignment_sam_filepath = os.path.join(pipeline_args['output_dir'], pipeline_args['lib'] + '.sam')
bwa_mem.register(
    Parameter('-t', pipeline_config['bwa']['threads']),
    Parameter(pipeline_config['bwa']['reference']),
    Parameter(Data(pipeline_args['read']).as_input()),
    Redirect(stream='>', dest=Data(alignment_sam_filepath).as_output(tmp=True))
)

There’s a lot going on here. First we define a filepath where we’ll send the alignment output from bwa. Then we call the .register() method of bwa, which signals to Operon that we want to insert bwa into the workflow as a stationary step; the input and output data flow is defined in the arguments to .register().

The first parameter is simple enough, just passing a -t in with the number of threads coming from our pipeline configuration. The second parameter is a positional argument pointing to the reference genome we wish to use for this alignment.

The third argument is importantly different. It’s another positional argument meant to tell bwa where to find its input fastq file, but the path is wrapped in a Data object. Using a Data object is how Operon knows which data/files on the filesystem should be considered part of the workflow; failure to specify input or output paths inside Data objects will cause Operon to miss them and may result in workflow programs running before they have all their inputs! Notice also that the .as_input() method is called on the Data object, which tells Operon not only that this path is important, but that it should be treated as input data to bwa.

Finally, the Redirect object (bwa sends its output to stdout) sends the stdout stream to a filepath, again wrapped in a Data object and marked as output. The tmp=True keyword argument tells Operon to delete this file after the whole pipeline has finished, since we’re not too interested in keeping that file around in our final results.

sorted_sam_filepath = os.path.join(pipeline_args['output_dir'], pipeline_args['lib'] + '.sorted.sam')
samtools_sort.register(
    Parameter('-o', Data(sorted_sam_filepath).as_output()),
    Parameter(Data(alignment_sam_filepath).as_input())
)

The next step in the workflow is samtools sort, which takes the output from bwa as input and produces some output of its own. Notice again the important filepaths are wrapped in Data objects and given a specification as input or output.

Note

Although there is certainly a conceptual link between the output from bwa and the input here, that link does not need to be explicitly defined. As long as the same filepath is used, Operon will automatically recognize and link together input and output data flows between bwa and samtools sort.

After samtools sort, we’re at our first intersection. It doesn’t matter in which order we define the next steps, so add something similar to:

samtools_flagstat.register(
    Parameter(Data(sorted_sam_filepath).as_input()),
    Redirect(stream='>', dest=sorted_sam_filepath + '.flagstat')
)

dup_sorted_sam_filepath = os.path.join(pipeline_args['output_dir'], pipeline_args['lib'] + '.dup.sorted.sam')
picard_markduplicates.register(
    Parameter('I={}'.format(sorted_sam_filepath)),
    Parameter('O={}'.format(dup_sorted_sam_filepath)),
    Parameter('M={}'.format(os.path.join(pipeline_args['logs_dir'], 'marked_dup_metrics.txt'))),
    extra_inputs=[Data(sorted_sam_filepath)],
    extra_outputs=[Data(dup_sorted_sam_filepath)]
)

There are a couple of things to notice here. samtools flagstat takes its input in a Data object, but its output is a plain string. This is because the output of samtools flagstat isn’t used as input into any other program, so there’s no need to treat it as a special file.

Notice also that both samtools flagstat and picard markduplicates use the same filepath as input; in fact, that’s the point! Operon (really Parsl) will recognize this and make a parallel fork here, running both programs at the same time. As the developer, you didn’t have to explicitly say to fork here; it’s simply what the workflow calls for. This is what we mean by the workflow being automatically and optimally parallel.

Finally, there are some funny keyword arguments to picard_markduplicates.register(). This is because of how parameters are passed to picard markduplicates: it wants the form I=/some/path, which is most easily achieved with a format string, as we’ve done. But if we were to pass it in like this:

Parameter('I={}'.format(Data(sorted_sam_filepath).as_input()))

that wouldn’t work. Why not? After all, it’s how we’ve passed in special input data for the other programs!

When a Data object is coerced into a string, it just becomes its path as a plain string. If a Data object is used in a format string’s .format() method, it will become its string representation before Operon can ever recognize it. To get around that, we can explicitly tell Operon what the special input and output Data items are with the extra_inputs= and extra_outputs= keyword arguments, respectively. Notice that in those keyword arguments we don’t need to call .as_input() or .as_output(), because Operon can determine which is which from the keyword.
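
A quick illustration of the coercion problem, with a hypothetical path:

# Once .format() runs, all that's left is a plain string; the Data object never
# reaches .register(), so Operon can't see it as part of the data flow
'I={}'.format(Data('/path/to/sorted.sam').as_input())  # -> 'I=/path/to/sorted.sam'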

Now we come to our final two-way fork:

macs2_output_dir = os.path.join(pipeline_args['output_dir'], 'macs2')
os.makedirs(macs2_output_dir, exist_ok=True)
macs2.register(
    Parameter('--treatment', Data(dup_sorted_sam_filepath).as_input()),
    Parameter('--name', pipeline_args['lib']),
    Parameter('--outdir', macs2_output_dir),
    Parameter('--call-summits'),
    Parameter('--shift', '100')
)

vcf_output_path = os.path.join(pipeline_args['output_dir'], pipeline_args['lib'] + '.vcf')
freebayes.register(
    Parameter('--fasta-reference', pipeline_config['freebayes']['reference_fasta']),
    Parameter(Data(dup_sorted_sam_filepath).as_input()),
    Redirect(stream='>', dest=Data(vcf_output_path).as_output())
)

There isn’t much going on here that we haven’t already discussed, so we won’t go into detail. Both macs2 and freebayes use the output of picard markduplicates, so they will run once the output of picard markduplicates is available.

Finally, the output of freebayes is used as input into a Python code block:

def get_first_five_positions(vcf_filepath, output_filepath):
    import vcf
    from itertools import islice
    vcf_reader = vcf.Reader(open(vcf_filepath))
    with open(output_filepath, 'w') as output_file:
        # vcf.Reader is an iterator, so take the first five records with islice
        for record in islice(vcf_reader, 5):
            # record.POS is an integer, so convert it before writing
            output_file.write(str(record.POS) + '\n')

CodeBlock.register(
    func=get_first_five_positions,
    kwargs={
        'vcf_filepath': vcf_output_path,
        'output_filepath': os.path.join(pipeline_args['output_dir'], 'first_five.txt')
    },
    inputs=[Data(vcf_output_path)]
)

CodeBlocks can be thought of as very similar to Software instances with the same data flow model.

Running the Tutorial Pipeline

The pipeline is complete! If you wish to run it, simply install it into Operon, configure it (you might need to create a reference genome index), and run it with the small data bundle linked below.
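
Assuming Operon’s command line exposes install, configure, and run subcommands (check operon --help for the exact names and syntax in your version), the overall sequence might look something like this, with hypothetical paths:

$ operon install /path/to/tutorial.py
$ operon configure tutorial
$ operon run tutorial --read /path/to/reads.fastq --output-dir /path/to/output --lib tutorial_sample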

Download the tutorial data bundle.

An awkward step in the installation and configuration process is the need for a reference genome index. To generate one, we need a reference genome fasta file over which we then run bwa index; this means that when we first configure our pipeline and it asks for the bwa reference genome prefix, we won’t have one yet, but we can come back and fill it in later.

$ /path/to/bwa index /path/to/downloaded/dm6.fa.gz

bwa dumps its index in the same folder as the genome fasta, so the reference prefix is just the path to the reference fasta.

Use the included Drosophila melanogaster DNA fastq as your --read input.

freebayes can’t deal with a compressed reference genome fasta, so uncompress it before passing the path in the pipeline config.
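
Something along these lines will do it, leaving the compressed copy in place for the bwa index prefix:

$ gunzip -c /path/to/downloaded/dm6.fa.gz > /path/to/downloaded/dm6.fa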