Building Pipelines

Dependency Workflow Concepts

Operon pipelines don’t actually run anything at all; instead, they define a workflow. Parsl then uses the defined workflow to run the pipeline in a highly efficient and configurable manner. Operon simply acts as an abstraction over Parsl, so the developer need only think about what software needs to be run, and at what point in the pipeline it has all the information it needs to run successfully.

There are two main components to a dependency workflow graph:

  1. Executables
  2. Data

Executables generally take input, perform some computational work on that input, then produce some output. Data is any file on disk that is used as input or produced as output and that needs to be considered in the workflow graph.

When building a pipeline in Operon, the connections between Executables and Data need not be a part of the design process. The developer need only define the Executables that are part of the workflow, and the input and/or output files for each Executable. At runtime, Operon will examine the dependency workflow graph and feed the connections appropriately into Parsl.

As an example, consider the following scenario in a bioinformatics workflow: a FASTQ file is used as input to an aligner (say, bwa), which produces a BAM file. That BAM file then needs to be used as input to two programs, one to gather flagstats and one to quantify gene counts. The dependency workflow graph would look as follows:

[Figure: dependency workflow graph — the FASTQ feeds into bwa, which produces a BAM; the BAM feeds into both samtools (flagstats) and genecounter (gene counts)]

The developer for this pipeline only needs to define the following:

  • There’s a program called bwa, which lives at /path/to/bwa. As input, it takes in a file located at /path/to/fastq, and as output it generates a file called /path/to/bam.
  • There’s a program called samtools, which lives at /path/to/samtools. As input, it takes in a file located at /path/to/bam, and as output it generates a file called /path/to/flagstats.
  • There’s a program called genecounter, which lives at /path/to/genecounter. As input, it takes in a file located at /path/to/bam, and as output it generates a file called /path/to/genecounts.

This defines three Software instances (Apps in Parsl parlance): bwa, samtools, and genecounter. All three Software instances have data dependencies; that is, they require input data to run. However, two of the Software instances’ data dependencies are not yet available (because they haven’t been produced by the pipeline yet), so they will not run until those dependencies become available. The Software bwa, however, has all of its data dependencies available, so it begins running immediately. Once bwa is finished running, and consequently produces its output /path/to/bam, the Software samtools and genecounter both recognize that their data dependencies are now available, and so both begin running concurrently.

Note

/path/to/fastq, /path/to/bwa, etc are placeholders in the above example. In a real pipeline, the developer would gather those values either from the command line via pipeline_args or from the configuration via pipeline_config. As long as Data() inputs and outputs resolve to filesystem paths at the time of workflow generation, Parsl will be able to correctly determine data dependencies.
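As a preview of the components described later in this document (Software, Parameter, and Data), a minimal sketch of how this example might be expressed inside pipeline() is shown below. The paths are the same placeholders used above, and the flag names (--fastq, --input, --output) are purely illustrative:

from operon.components import Software, Parameter, Data

def pipeline(self, pipeline_args, pipeline_config):
    # Placeholder paths; a real pipeline would pull these from
    # pipeline_args or pipeline_config
    bwa = Software(name='bwa', path='/path/to/bwa')
    samtools = Software(name='samtools', path='/path/to/samtools')
    genecounter = Software(name='genecounter', path='/path/to/genecounter')

    # bwa has all of its data dependencies available, so it runs first
    bwa.register(
        Parameter('--fastq', Data('/path/to/fastq').as_input()),
        Parameter('--output', Data('/path/to/bam').as_output())
    )

    # Both of these depend on /path/to/bam, so they run once bwa has produced it
    samtools.register(
        Parameter('--input', Data('/path/to/bam').as_input()),
        Parameter('--output', Data('/path/to/flagstats').as_output())
    )
    genecounter.register(
        Parameter('--input', Data('/path/to/bam').as_input()),
        Parameter('--output', Data('/path/to/genecounts').as_output())
    )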

Pipeline Meta Definitions

Pipeline meta definitions describe how the pipeline should be installed, provisioned, and configured so that as little as possible needs to be done by the user before the pipeline is ready to run on the user’s platform.

All pipeline meta definitions (and logic, for that matter) are defined in a single document with a single class, always called Pipeline, which subclasses operon.components.ParslPipeline.

from operon.components import ParslPipeline

class Pipeline(ParslPipeline):
    def description(self):
        return 'An example pipeline'

    ...

    def pipeline(self, pipeline_args, pipeline_config):
        # Pipeline logic here

Description

The description of the pipeline is a string meant to be a human-readable overview of what the pipeline does, along with any other relevant information for the user.

def description(self):
    return 'An example pipeline, written in Operon, powered by Parsl'

The pipeline description is displayed when the user runs operon show.

Dependencies

Pipeline dependencies are Python packages which the pipeline logic uses. Dependencies are provided as a list of strings, where each string is the name of a package available on PyPI and suitable to be fed directly into pip.

def dependencies(self):
    return [
        'pysam==0.13',
        'pyvcf'
    ]

Upon pipeline installation, the user is given the option to use pip to install dependencies into their current Python environment. While this may be convenient, it may also cause package collisions or unnecessary muddying of a distribution Python environment, so the user can instead opt to get the dependencies from operon show and install them manually into a Python virtual environment.

Note

If the user accepts auto-installing dependencies into their current Python environment, pip will attempt to do so using the --upgrade flag. This may upgrade or downgrade packages already installed in the current Python environment if there are any collisions.

Conda/Bioconda

Executables provided by Conda/Bioconda can be installed and injected into the user’s pipeline configuration, provided the user has Miniconda installed and on their PATH. Executables are defined by a list of CondaPackage tuples, with the option to override the default conda channels that Operon loads.

from operon.components import CondaPackage

def conda(self):
    return {
        'channels': ['overriding', 'channels', 'here'],
        'packages': [
            CondaPackage(tag='star=2.4.2a', config_key='STAR', executable_path='bin/STAR'),
            CondaPackage(tag='picard', config_key='picard', executable_path='share/picard-2.15.0-0/picard.jar')
        ]
    }

If provided, channels will be loaded by Miniconda in list order, which means the last entry has the highest precedence, the second-to-last entry has the second-highest precedence, and so on.

A CondaPackage named tuple takes the following keys:

  • tag is the name of the executable and optional version number fed directly to Miniconda

  • config_key is the outermost key in the pipeline’s configuration(). When this executable is injected into a user’s pipeline config, it’s placed into pipeline_config[config_key]['path']

  • executable_path is only necessary if the basename of the installed executable is different from the conda tag, or if the developer wishes to use an executable outside conda’s default bin folder. Some examples:

    • The conda package star=2.4.2a is installed as STAR, so executable_path= must be set to bin/STAR
    • The conda package picard installs an executable into bin, but if the developer wishes to access the jar file directly, she must set executable_path= to share/picard-2.15.0-0/picard.jar
    • The conda package bwa installs an executable into bin called bwa, so executable_path does not need to be set

To see which executables are offered by Bioconda, please refer to their package index.

Pipeline Configuration

The pipeline configuration contains attributes passed into the pipeline logic which may change from platform to platform, but generally won’t change from run to run. For example, paths to software executables, paths to reference files, the number of threads to use, etc. will vary by platform but will be the same for every run.

def configuration(self):
    return {
        'software1': {
            'path': 'Full path to software1',
            'threads': {
                'q_type': 'list',
                'message': 'Run software1 with this many threads',
                'choices': ['1', '2', '4', '8', '16'],
                'default': '4'
            }
        },
        'software2': {
            'path': 'Full path to software2',
            'genome_reference': {
                'q_type': 'path',
                'message': 'Path to genome reference'
            }
        }
    }

The returned configuration dictionary may nest arbitrarily deep. All values must be either a dictionary or a string. Considering the configuration dictionary as a tree, there are two types of leaves: a string, or a dictionary that configures a question to the user. During configuration of the pipeline using operon configure, the user is presented with a prompt for each leaf, and the user’s input is gathered and stored in place of that leaf.

Note

The nesting of dictionaries inside the configuration dictionary is purely for the developer’s organizational convenience; the user will never see anything but the prompts defined at the leaves.

If the order of prompts is important, return a collections.OrderedDict instance.
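A minimal sketch of an ordered configuration, assuming the same keys as the example above:

from collections import OrderedDict

def configuration(self):
    # Prompts are presented in insertion order
    return OrderedDict([
        ('software1', OrderedDict([
            ('path', 'Full path to software1'),
            ('threads', 'Run software1 with this many threads')
        ])),
        ('software2', OrderedDict([
            ('path', 'Full path to software2'),
            ('genome_reference', {
                'q_type': 'path',
                'message': 'Path to genome reference'
            })
        ]))
    ])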

For a string leaf, the question type defaults to a Text question, where the prompt presented is the string itself. The exception is that if the word path is found in the most immediate key, the question type instead defaults to Path.

For a dictionary leaf, the question type can be fully configured. For a dictionary to be recognized as a leaf, it must contain the key q_type, or else it will be interpreted as another level in the nested configuration dictionary. The following options can be passed to a question configuration:

  • q_type must be one of {path, text, confirm, list, checkbox, password}
  • message is the prompt displayed to the user
  • default is a default value suggested to the user as part of the prompt
  • validate is a function which determines whether the user input is valid
  • ignore is a function which determines whether to display this question to the user
  • choices is a list of choices; only used by the List and Checkbox question types
  • always_default, if present with any value, forces the default to always be the value defined by the key default, regardless of whether another value was injected by Operon

The q_type and message keys are required for all question types, while the choices key is additionally required for List and Checkbox question types. For more information on how each of the question types operate, please refer to the inquirer documentation on question types.

For the above example configuration, the user will see and interactively fill in the prompts:

$ operon configure pipeline-name
[?] Full path to software1: (User enters) /path/to/soft1
[?] Run software1 with this many threads: (User selects) 8
   1
   2
   4
 > 8
   16

[?] Full path to software2: (User enters) /path/to/soft2
[?] Path to genome reference: (User enters) /path/to/genome

The input from the user is stored in the .operon folder, so the next time the pipeline is run with this configuration it will be made available in the pipeline_config parameter:

# Contents of pipeline_config
{
    'software1': {
        'path': '/path/to/soft1',
        'threads': '8'
    },
    'software2': {
        'path': '/path/to/soft2',
        'genome_reference': '/path/to/genome'
    }
}

So for any software which needs access to a genome reference, the path can be passed to the software as pipeline_config['software2']['genome_reference'].

Pipeline Arguments

The pipeline arguments are attributes that will change from run to run and are specified by the user as command line arguments on a per-run basis. Pipeline arguments are added by modifying the argparse.ArgumentParser object passed into self.arguments(); refer to the argparse documentation for further details on how pipeline arguments can be gathered.

def arguments(self, parser):
    parser.add_argument('--output-dir', help='Path to output directory')
    parser.add_argument('--fastqs', nargs='*', help='Paths to all input fastq files')
    parser.add_argument('--run-name', default='run001', help='Name of this run')
    # Nothing needs to be returned since parser is modified in place

Added arguments are exposed to the user when running operon run, according to the rules of the argparse module.

$ operon run pipeline -h
> operon run pipeline [-h] [--output-dir OUTPUT_DIR] [--fastqs [FASTQS [FASTQS ...]]]
>                     [--run-name RUN_NAME]
>
> Pipeline description is here
>
> optional arguments:
>   -h, --help            show this help message and exit
>   -c CONFIG, --config CONFIG
>                         Path to a config file to use for this run.
>   --output-dir OUTPUT_DIR
>                         Path to output directory
>   --fastqs [FASTQS [FASTQS ...]]
>                         Paths to all input fastq files
>   --run-name RUN_NAME   Name of this run
>
$ operon run pipeline --fastqs /path/to/fastq1.fq /path/to/fastq2.fq \
>                     --output-dir /path/to/output --run-name run005

Populated arguments are made available to the pipeline as a dictionary in the pipeline_args parameter:

# Contents of pipeline_args
{
    'fastqs': ['/path/to/fastq1.fq', '/path/to/fastq2.fq'],
    'output_dir': '/path/to/output',
    'run_name': 'run005'
}

Note

Parameters in argparse can have dashes in them (and should, to separate words), but when converted to a Python dictionary the dashes are replaced with underscores.

Ex. --output-dir is accessed by pipeline_args['output_dir']

Three pipeline arguments are always injected by Operon: --pipeline-config, --parsl-config, and --logs-dir. These arguments point to a pipeline config file to use for the run, a Parsl config file to use for the run, and a directory in which to store log files, respectively.
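Following the dash-to-underscore rule above, the injected --logs-dir argument, for example, would be accessed inside pipeline() roughly as follows (a sketch):

def pipeline(self, pipeline_args, pipeline_config):
    # Injected by Operon; dashes become underscores in pipeline_args
    logs_dir = pipeline_args['logs_dir']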

Pipeline Logic

Pipeline logic defines how the workflow dependency graph should be built. The work is done in the pipeline() method, which is given two parameters, pipeline_args and pipeline_config, which are populated at runtime with command line arguments from the user and the stored pipeline configuration file, respectively.

Executables and data are defined using a set of wrapper objects provided by Operon: this section details those components and how to use them.

def pipeline(self, pipeline_args, pipeline_config):
    # All logic to build the workflow graph goes here

Note

All the logic here is only to build the workflow dependency graph, which means that none of the executables are being run and none of the data is being produced until after pipeline() has completed. Parsl only begins actually running the software after it’s been fed the generated workflow graph.

All statements in the pipeline() method should be for generating the workflow graph, not handling or operating on data in any way. If needed, small blocks of Python can be written in a CodeBlock instance, which can be integrated into the workflow graph and so will execute at the correct time.

Data operon.components.Data

A Data instance wraps a file on the filesystem and registers it as a data node in the workflow graph. Any file that should be considered in the workflow graph needs to be wrapped in a Data instance; often this is only the input to or output from an executable, and may not include ancillary output such as log files.

When passed as an argument to a Parameter object, the data must be specified as either input or output by calling either the .as_input() or .as_output() method. This distinction is not necessary when passing as a part of extra_inputs= or extra_outputs= keyword arguments.

Data objects can be marked as temporary, which designates the underlying file on the filesystem to be deleted at the end of the run, by setting the tmp= parameter to True in .as_output().

from operon.components import Data, Parameter

bwa.register(
    Parameter('--fastq', Data('/path/to/fastq.fq').as_input()),
    Parameter('--tmp-bam', Data('/path/to/tmp.bam').as_output(tmp=True)),
    Parameter('--persistent-bam', Data('/path/to/persistent.bam').as_output())
)

samtools.register(
    Parameter('--input-bam', Data('/path/to/persistent.bam').as_input())
)

The developer does not need to keep track of individual Data instances because Data instances are uniquely identified by the filesystem paths they wrap; that is, if a Data instance is created as Data('/path/to/file'), any subsequent calls to Data('/path/to/file') will not create a new Data instance but rather simply refer to the instance already created. Of course, the developer could store Data instances in variables and pass those instead, if desired.
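For instance, both variables below refer to the same data node, and a stored instance can be used interchangeably with a fresh Data() call (a small sketch reusing the example above):

# Both variables refer to the same node in the workflow graph
bam_a = Data('/path/to/persistent.bam')
bam_b = Data('/path/to/persistent.bam')

# Equivalent to the samtools registration above, using the stored instance
samtools.register(
    Parameter('--input-bam', bam_a.as_input())
)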

Data instances can be used in-place anywhere a filesystem path would be passed; that includes both Parameter and Redirect objects.

Meta operon.meta.Meta

The Meta class has a method define_executor() used to give a name to a resource configuration.

from operon.meta import Meta

Meta.define_executor(label='small_site', resources={
    'cpu': '2',
    'mem': '2G'
})

Meta.define_executor(label='large_site', resources={
    'cpu': '8',
    'mem': '50G'
})

The value passed to resources must be a dictionary with the keys cpu and mem as in the above example. The value of mem should be an integer (as a string) followed by one of M, G, or T.

Software operon.components.Software

A Software instance is an abstraction of an executable program external to the pipeline.

from operon.components import Software

bwa = Software(name='bwa', path='/path/to/bwa')
samtools_flagstat = Software(name='samtools', subprogram='flagstat')
genecounter = Software(name='genecounter', path='/path/to/genecounter')

If the path= parameter isn’t given, Operon will try to infer the path by looking in pipeline_config[name]['path']. If the path can’t be inferred, a ValueError will be thrown.

To register an Executable node in the workflow graph, call the Software instance’s .register() method. register() takes any number of Parameter, Redirect, and Pipe objects. The keyword arguments extra_inputs= and extra_outputs= can also be given to pass in lists of Data() inputs and outputs that aren’t defined as command line arguments to the Executable.

bwa.register(
    Parameter('--fastq', Data('/path/to/fastq.fq').as_input()),
    Parameter('--phred', '33'),
    Redirect(stream=Redirect.STDERR, dest='/logs/bwa.log'),
    extra_inputs=[Data('/path/to/indexed_genome.fa')],
    extra_outputs=[Data('/path/to/bam')]
)

The register() method returns an object wrapping the Executable node’s id, which can be passed to other Software instances via the wait_on= keyword. If a Software is given other apps in its wait_on=, those apps are added to its input dependencies, so the Software won’t start running until all of its app and data dependencies are resolved.

first_app = first.register(
    Parameter('-a', '1')
)

second.register(
    Parameter('--output', Data('second.out').as_output())
)

third.register(
    Parameter('b', Data('second.out').as_input()),
    wait_on=[first_app]
)

In the above example, third won’t start running until first has finished running and the output second.out from second is available.

Multiexecutor Pipelines

For many workflows, the resource requirements of the software involved won’t be uniform. One solution is to calculate the largest resource need and allocate that to every piece of software, but this leads to a large amount of unused resources. A better solution is to define resource pools of varying sizes and assign each piece of software to an appropriate pool. This can be done with the meta= keyword argument.

The developer can define a resource configuration with a call to Meta.define_executor() and then pass that name to the meta= keyword argument:

from operon.components import Software, Parameter
from operon.meta import Meta

Meta.define_executor(label='small_site', resources={
    'cpu': '2',
    'mem': '2G'
})

soft1 = Software('soft1')
soft1.register(
    Parameter('-a', '1'),
    Parameter('-b', '2'),
    meta={
        'executor': 'small_site'  # Matches the above Meta definition
    }
)

CodeBlock operon.components.CodeBlock

A CodeBlock instance wraps a Python function that can be passed Data instances in much the same way as a Software instance, and so can be integrated into the workflow graph. That is, a function wrapped in a CodeBlock will wait to execute until all of its data dependencies are available.

The function wrapped by a CodeBlock instance can be defined as normal and registered with CodeBlock.register(), where arguments and data dependencies can be defined.

from operon.components import CodeBlock, Data

def get_mapped_reads_from_flagstats(star_output_bam):
    # Modules used by the function must be imported inside the function body
    import re
    with open(star_output_bam + '.flagstat') as flagstats:
        flagstats_contents = flagstats.read()
        target_line = re.search(r'(\d+) \+ \d+ mapped', flagstats_contents)
        if target_line is not None:
            with open('output.txt', 'w') as output_write:
                output_write.write(str(int(target_line.group(1))/2) + '\n')

# star_output_bam is assumed to be a filesystem path defined earlier in pipeline()
CodeBlock.register(
    func=get_mapped_reads_from_flagstats,
    args=[],
    kwargs={'star_output_bam': star_output_bam},
    inputs=[Data(star_output_bam + '.flagstat').as_input()],
    outputs=[Data('output.txt').as_output()]
)

Note

When a function wrapped by a CodeBlock actually executes, the scope in which it was defined will be long gone. That means that any variables or data structures declared in pipeline() can’t be counted on as available in the body of the function. It also means that any modules the function needs to use must be explicitly imported by the function, even if that module has already been imported by the pipeline document.

The return value of CodeBlock.register() is the same as that of Software.register(), and can be passed to other Software instances or CodeBlocks via the wait_on= keyword argument.

Parameter operon.components.Parameter

A Parameter object represents a parameter key and value(s) passed into a Software instance.

from operon.components import Parameter

Parameter('-a', '1')  # Becomes '-a 1'
Parameter('--type', 'gene', 'transcript')  # Becomes '--type gene transcript'
Parameter('--output=/path/to/output')  # Becomes '--output=/path/to/output'

When multiple Parameter instances are passed into a Software instance, order is preserved, which is important for positional arguments.
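Since order is preserved, positional values can be expressed as single-value Parameters. A small sketch (samtools_sort and the paths here are illustrative):

# The trailing single-value Parameter becomes a positional argument:
# samtools sort -o /path/to/sorted.bam /path/to/unsorted.bam
samtools_sort.register(
    Parameter('-o', Data('/path/to/sorted.bam').as_output()),
    Parameter(Data('/path/to/unsorted.bam').as_input())
)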

Redirect operon.components.Redirect

A Redirect object represents an output stream redirection. The keyword arguments stream= and dest= specify which stream(s) to redirect and where on the filesystem to send them, respectively.

from operon.components import Redirect

bwa.register(
    Parameter('-a', '1000'),
    Redirect(stream=Redirect.STDOUT, dest='/path/to/bwa.log')
)

stream= can be one of the provided constants:

Redirect.STDOUT         # >
Redirect.STDERR         # 2>
Redirect.BOTH           # &>

The order of Redirect objects passed to a Software instance, both relative to each other and to other Parameter objects, doesn’t matter. However, if more than two Redirects are passed in, only the first two will be considered.

Pipe operon.components.Pipe

A Pipe object represents piping the output of one executable into the input of another. The producing Software instance is passed a Pipe object, which contains the receiving Software instance.

from operon.components import Pipe

software1.register(
    Parameter('-a', '1'),
    Pipe(software2.prep(
        Parameter('-b', '2'),
        Parameter('-c', '3')
    ))
)
# Registers as: software1 -a 1 | software2 -b 2 -c 3

Note

Since the whole executable call needs to be registered with Parsl as a single unit, register() is only called on the outermost Software instances. Within a Pipe object, the receiving Software instance should instead call prep(), which takes all the same parameters as register().

Pipeline Logging

All stream output from the Executables in the workflow graph that isn’t explicitly redirected to a file with a Redirect is gathered and written to a single pipeline log file at the end of execution.

The location of this log file is defined by the user with the --logs-dir pipeline argument injected into every pipeline. It may be of interest to the developer to also put any explicitly redirected log files into this directory.
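For example, an explicitly redirected log could be placed under the user-supplied logs directory by building the destination path from pipeline_args (a sketch; bwa and the filename are illustrative):

import os

bwa.register(
    Parameter('--fastq', Data('/path/to/fastq.fq').as_input()),
    # Send stderr into the same directory Operon uses for the pipeline log
    Redirect(stream=Redirect.STDERR, dest=os.path.join(pipeline_args['logs_dir'], 'bwa.log'))
)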