Process creation guidelines

Basic process creation

The addition of a new process to FlowCraft requires three main steps:

  1. Create process template: Create a jinja2 template in flowcraft.generator.templates with the nextflow code.
  2. Create Process class: Create a Process subclass in flowcraft.generator.process with information about the process (e.g., expected input/output, secondary inputs, etc.).

Create process template

First, create the nextflow template that will be integrated into the pipeline as a process. This file must be placed in flowcraft.generator.templates and have the .nf extension. In order to allow the template to be dynamically added to a pipeline file, we use the jinja2 template language to substitute key variables in the process, such as input/output channels.

An example created as a my_process.nf file is as follows:

some_channel_{{ pid }} = Channel.value(params.param1{{ param_id}})
other_channel_{{ pid }} = Chanel.fromPath(params.param2{{ param_id}})

process myProcess_{{ pid }} {

    {% include "post.txt" ignore missing %}

    publishDir "results/myProcess_{{ pid }}", pattern: "*.tsv"

    input:
    set sample_id, <data> from {{ input_channel }}
    val x from some_channel_{{ pid }}
    file y from other_channel_{{ pid }}
    val direct_from_parms from Channel.value(params.param3{{param_id}}

    // The output is optional
    output:
    set sample_id, <data> into {{ output_channel }}
    {% with task_name="abricate" %}
    {%- include "compiler_channels.txt" ignore missing -%}
    {% endwith %}

    """
    <process code/commands>
    """
}

{{ forks }}

The fields surrounded by curly brackets are jinja placeholders that will be dynamically substituted when building the pipeline. They will ensure that the processes and potential forks correctly link with each other and that channels are unique and correctly linked. This example contains all placeholder variables that are currently supported by FlowCraft.

{{pid}}

Used as a unique process identifier that prevent issues from process and channel duplication in the pipeline. Therefore, is should be appended to each process and channel name as _{{ pid }} (note the underscore):

some_channel_{{ pid }}
process myProcess_{{ pid }}

{{param_id}}

Same as the {{ pid }}, but sets the identified for nextflow params. It should be appended to each param as {{ param_id }}. This will allow parameters to be specific to each component in the pipeline:

Channel.value(params.param1{{ param_id}})

Note that the parameters used in the template, should also be defined in the Process class params attribute (see Parameters).

{% include “post.txt” %}

Inserts beforeScript and afterScript statements to the process that sets environmental variables and a series of dotfiles for the process to log their status, warnings, fails and reports (see Dotfiles for more information). It also includes scripts for sending requests to REST APIs (only when certain pipeline parameters are used).

{{input_channel}}

All processes must include one and only one input channel. In most cases, this channel should be defined with a two element tuple that contains the sample ID and then the actual data file/stream. We suggest the sample ID variable to be named sample_id as a standard. If other name variable name is specified and you include the compiler_channels.txt in the process, you’ll need to change the sample ID variable (see Sample ID variable).

{{output_channel}}

Terminal processes may skip the output channel entirely. However, if you want to link the main output of this process with subsequent ones, this placeholder must be used only once. Like in the input channel, this channel should be defined with a two element tuple with the sample ID and the data. The sample ID must match the one specified in the input_channel.

{% include “compiler_channels.txt %}

This will include the special channels that will compile the status/logging of the processes throughout the pipeline. You must include the whole block (see Status channels):

{% with task_name="abricate" %}
{%- include "compiler_channels.txt" ignore missing -%}
{% endwith %}

{{forks}}

Inserts potential forks of the main output channel. It is mandatory if the output_channel is set.

Complete example

As an example of a complete process, this is the template of spades.nf:

IN_spades_opts_{{ pid }} = Channel.value([params.spadesMinCoverage{{ param_id }},params.spadesMinKmerCoverage{{ param_id }}])
IN_spades_kmers_{{pid}} = Channel.value(params.spadesKmers{{ param_id }})

process spades_{{ pid }} {

    // Send POST request to platform
    {% include "post.txt" ignore missing %}

    tag { fastq_id + " getStats" }
    publishDir 'results/assembly/spades/', pattern: '*_spades.assembly.fasta', mode: 'copy'

    input:
    set fastq_id, file(fastq_pair), max_len from {{ input_channel }}.join(SIDE_max_len_{{ pid }})
    val opts from IN_spades_opts_{{ pid }}
    val kmers from IN_spades_kmers_{{ pid }}

    output:
    set fastq_id, file('*_spades.assembly.fasta') optional true into {{ output_channel }}
    set fastq_id, val("spades"), file(".status"), file(".warning"), file(".fail") into STATUS_{{ pid }}
    file ".report.json"

    script:
    template "spades.py"
}

{{ forks }}

Create Process class

The process class will contain the information that FlowCraft will use to build the pipeline and assess potential conflicts/dependencies between process. This class should be created in one the category files in the flowcraft.generator.components module (e.g.: assembly.py). If the new component does not fit in any of the existing categories, create a new one that imports flowcraft.generator.process.Process and add your new class. This class should inherit from the Process base class:

class MyProcess(Process):

    def __init__(self, **kwargs):

        super().__init__(**kwargs)

        self.input_type = "fastq"
        self.output_type = "fasta"

This is the simplest working example of a process class, which basically needs to inherit the parent class attributes (the super part). Then we only need to define the expected input and output types of the process. There are no limitations to the input/output types. However, a pipeline will only build successfully when all processes correctly link the output with the input type.

Depending on the process, other attributes may be required:

  • Parameters: Parameters provided by the user to be used in the process.
  • Secondary inputs: Channels created from parameters provided by the user.
  • Secondary Link start and Link end: Secondary links that connect secondary information between two processes.
  • Dependencies: List of other processes that may be required for the current process.
  • Directives: Default information for RAM/CPU/Container directives and more.

Add to available components

Contrary to previous implementation (version <= 1.3.1), the available components are now retrieved automatically by FlowCraft and there is no need to add the process to any dictionary (previous process_map). In order for the component to be accessible to flowcraft build the process template name in snake_case must match the process class in CamelCase. For instance, if the process template is named my_process.nf, the process class must be MyProcess, then the FlowCraft will be able to automatically add it to the list of available components.

Note

Note that the template string does not include the .nf extension.

Process attributes

This section describes the main attributes of the Process class: what they do and how do they impact the pipeline generation.

Input/Output types

The input_type and output_type attributes set the expected type of input and output of the process. There are no limitations to the type of input/output that are provided. However, processes will only link when the output of one process matches the input of the subsequent process (unless the ignore_type attribute is set to True). Otherwise, FlowCraft will raise an exception stating that two processes could not be linked.

Note

The input/ouput types that are currently used are fastq, fasta.

Parameters

The params attribute sets the parameters that can be used by the process. For each parameter, a default value and a description should be provided. The default value will be set in the params.config file in the pipeline directory and the description will be used to generated the custom help message of the pipeline:

self.params = {
    "genomeSize": {
        "default": 2.1,
        "description": "Expected genome size (default: params.genomeSiz)
    },
    "minCoverage": {
        "default": 15,
        "description": "Minimum coverage to proceed (default: params.minCoverage)"
    }
}

These parameters can be simple values that are not feed into any channel, or can be automatically set to a secondary input channel via Secondary inputs (see below).

They can be specified when running the pipeline like any nextflow parameter (e.g.: --genomeSize 5) and used in the nextflow process as usual (e.g.: params.genomeSize).

Note

These pairs are then used to populate the params.config file that is generated in the pipeline directory. Note that the values are replaced literally in the config file. For instance, "genomeSize": 2.1, will appear as genomeSize = 2.1, whereas "adapters": "'None'" will appear as adapters = 'None'. If you want a value to appear as a string, the double and single quotes are necessary.

Secondary inputs

Warning

The secondary_inputs attribute has been deprecated since v1.2.1. Instead, specify the secondary channels directly in the nextflow template files.

Any process can receive one or more input channels in addition to the main channel. These are particularly useful when the process needs to receive additional options from the parameters scope of nextflow. These additional inputs can be specified via the secondary_inputs attribute, which should store a list of dictionaries (a dictionary for each input). Each dictionary should contains a key:value pair with the name of the parameter (params) and the definition of the nextflow channel (channel). Consider the example below:

self.secondary_inputs = [
        {
            "params": "genomeSize",
            "channel": "IN_genome_size = Channel.value(params.genomeSize)"
        },
        {
            "params": "minCoverage",
            "channel": "IN_min_coverage = Channel.value(params.minCoverage)"
        }
    ]

This process will receive two secondary inputs that are given by the genomeSize and minCoverage parameters. These should be also specified in the params attribute (See Parameters above).

For each of these parameters, the dictionary also stores how the channel should be defined at the beginning of the pipeline file. Note that this channel definition mentions the parameters (e.g. params.genomeSize). An additional best practice for channel definition is to include one or more sanity checks to ensure that the provided arguments are correct. These checks can be added in the nextflow template file, or literally in the channel string:

self.secondary_inputs = [
    {
        "params": "genomeSize",
        "channel":
                "IN_genome_size = Channel.value(params.genomeSize)"
                "map{it -> it.toString().isNumber() ? it : exit(1, \"The genomeSize parameter must be a number or a float. Provided value: '${params.genomeSize}'\")}"
        }

Extra input

The extra_input attribute is mostly a user specified directive that allows the injection of additional input data from a parameter into the main input channel of the process. When a pipeline is defined as:

process1 process2={'extra_input':'var'}

FlowCraft will expose a new var parameter, setup an extra input channel and mix it with process2 main input channel. A more detailed explanation follows below.

First, FlowCraft will create a nextflow channel from the parameter name provided via the extra_input directive. The channel string will depend on the input type of the process (this string is fetched from the RAW_MAPPING attribute). For instance, if the input type of process2 is fastq, the new extra channel will be:

IN_var_extraInput = Channel.fromFilePairs(params.var)

Since the same extra input parameter may be used by more than one process, the IN_var_extraInput channel will be automatically forked into the final destination channels:

// When there is a single destination channel
IN_var_extraInput.set{ EXTRA_process2_1_2 }
// When there are multiple destination channels for the same parameter
IN_var_extraInput.into{ EXTRA_process2_1_2; EXTRA_process3_1_3 }

The destination channels are the ones that will be actually mixed with the main input channels:

process process2 {
    input:
    (...) main_channel.mix(EXTRA_process2_1_2)
}

In these cases, the processes that receive the extra input will process the data provided by the preceding channel AND by the parameter. The data provided via the extra input parameter does not have to wait for the main_channel, which means that they can run in parallel, if there are enough resources.

Compiler

The compiler attribute allows one or more channels of the process to be fed into a compiler process (See Compiler processes). These are special processes that collect information from one or more processes to execute a given task. Therefore, this parameter can only be used when there is an appropriate compiler process available (the available compiler processes are set in the compilers dictionary). In order to provide one or more channels to a compiler process, simply add a key:value to the attribute, where the key is the id of the compiler process present in the compilers dictionary and the value is the list of channels:

self.compiler["patlas_consensus"] = ["mappingOutputChannel"]

Dependencies

If a process depends on the presence of one or more processes upstream in the pipeline, these can be specific via the dependencies attribute. When building the pipeline if at least one of the dependencies is absent, FlowCraft will raise an exception informing of a missing dependency.

Directives

The directives attribute allows for information about cpu/RAM usage and container to be specified for each nextflow process in the template file. For instance, considering the case where a Process has a template with two nextflow processes:

process proc_A_{{ pid }} {
    // stuff
}

process proc_B_{{ pid }} {
    // stuff
}

Then, information about each process can be specified individually in the directives attribute:

class myProcess(Process):
    (...)
    self.directives = {
        "proc_A": {
            "cpus": 1
            "memory": "4GB"
        },
        "proc_B": {
            "cpus": 4
            "container": "my/container"
            "version": "1.0.0"
        }
    }

The information in this attribute will then be used to build the resources.config (containing the information about cpu/RAM) and containers.config (containing the container images) files. Whenever a directive is missing, such as the container and version from proc_A and memory from proc_B, nothing about them will be written into the config files and they will use the default pipeline values:

Ignore type

The ignore_type attribute, controls whether a match between the input of the current process and the output of the previous one is enforced or not. When there are multiple terminal processes that fork from the main channel, there is no need to enforce the type match and in that case this attribute can be set to False.

Process ID

The process ID, set via the pid attribute, is an arbitrarily and incremental number that is awarded to each process depending on its position in the pipeline. It is mainly used to ensure that there are no duplicated channels even when the same process is used multiple times in the same pipeline.

Template

The template attribute is used to fetch the jinja2 template file that corresponds to the current process. The path to the template file is determined as follows:

join(<template directory>, template + ".nf")

Status channels

The status channels are special channels dedicated to passing information regarding the status, warnings, fails and logging from each process (see Dotfiles for more information). They are used only when the nextflow template file contains the appropriate jinja2 placeholder:

output:
{% with task_name="<nextflow_template_name>" %}
{%- include "compiler_channels.txt" ignore missing -%}
{% endwith %}

By default, every Process class contains a status_channels list attribute that contains the template string:

self.status_channels = ["STATUS_{}".format(template)]

If there is only one nextflow process in the template and the task_name variable in the template matches the template attribute, then it’s all automatically set up.

If the template file contains more than one nextflow process definition, multiple placeholders can be provided in the template:

process A {
    (...)
    output:
    {% with task_name="A" %}
    {%- include "compiler_channels.txt" ignore missing -%}
    {% endwith %}
}

process B {
    (...)
    output:
    {% with task_name="B" %}
    {%- include "compiler_channels.txt" ignore missing -%}
    {% endwith %}
}

In this case, the status_channels attribute would need to be changed to:

self.status_channels = ["A", "B"]

Sample ID variable

In case you change the standard nextflow variable that stores the sample ID in the input of the process (sample_id), you also need to change it for the compiler_channels placeholder:

process A {

input:
set other_id, data from {{ input_channel }}

output:
{% with task_name="B", sample_id="other_id" %}
{%- include "compiler_channels.txt" ignore missing -%}
{% endwith %}

}

Advanced use cases

Compiler processes

Compilers are special processes that collect data from one or more processes and perform a given task with that compiled data. They are automatically included in the pipeline when at least one of the source channels is present. In the case there are multiple source channels, they are merged according to a specified operator.

Creating a compiler process

The creation of the compiler process is simpler than that of a regular process but follows the same three steps.

  1. Create a nextflow template file in flowcraft.generator.templates:

    process fullConsensus {
    
        input:
        set id, file(infile_list) from {{ compile_channels }}
    
        output:
        <output channels>
    
        script:
        """
        <commands/code/template>
        """
    
    }
    

The only requirement is the inclusion of a compiler_channels jinja placeholder in the main input channel.

  1. Create a Compiler class in the flowcraft.generator.process module:

    class PatlasConsensus(Compiler):
    
        def __init__(self, **kwargs):
    
            super().__init__(**kwargs)
    

This class must inherit from Compiler and does not require any more changes.

3. Map the compiler template file to the class in compilers attribute:

self.compilers = {
"patlas_consensus": {
    "cls": pc.PatlasConsensus,
    "template": "patlas_consensus",
    "operator": "join"
    }
}

Each compiler should contain a key:value entry. The key is the compiler id that is then specified in the compiler attribute of the component classes. The value is a json/dict object that species the compiler class in the cls key, the template string in the template string and the operator used to join the channels into the compiler via the operator key.

How a compiler process works

Consider the case where you have a compiler process named compiler_1 and two processes, process_1 and process_2, both of which feed a single channel to compiler_1. This means that the class definition of these processes include:

class Process_1(Process):
    (...)
    self.compiler["compiler_1"] = ["channel1"]

class Process_2(Process):
    (...)
    self.compiler["compiler_1"] = ["channel2"]

If a pipeline is built with at least one of these process, the compiler_1 process will be automatically included in the pipeline. If more than one channel is provided to the compiler, they will be merged with the specified operator:

process compiler_1 {

    input:
    set sample_id, file(infile_list) from channel2.join(channel1)

}

This will allow the output of multiple separate process to be processed by a single process in the pipeline, and it automatically adjusts according to the channels provided to the compiler.