assemblerflow.generator.engine module

assemblerflow.generator.engine.process_map = {
    'seq_typing': <class 'assemblerflow.generator.components.typing.SeqTyping'>,
    'trimmomatic': <class 'assemblerflow.generator.components.reads_quality_control.Trimmomatic'>,
    'abricate': <class 'assemblerflow.generator.components.annotation.Abricate'>,
    'reads_download': <class 'assemblerflow.generator.components.downloads.DownloadReads'>,
    'chewbbaca': <class 'assemblerflow.generator.components.mlst.Chewbbaca'>,
    'fastqc_trimmomatic': <class 'assemblerflow.generator.components.reads_quality_control.FastqcTrimmomatic'>,
    'mash_dist': <class 'assemblerflow.generator.components.distance_estimation.PatlasMashDist'>,
    'process_spades': <class 'assemblerflow.generator.components.assembly_processing.ProcessSpades'>,
    'check_coverage': <class 'assemblerflow.generator.components.reads_quality_control.CheckCoverage'>,
    'skesa': <class 'assemblerflow.generator.components.assembly.Skesa'>,
    'mash_screen': <class 'assemblerflow.generator.components.distance_estimation.PatlasMashScreen'>,
    'mlst': <class 'assemblerflow.generator.components.mlst.Mlst'>,
    'spades': <class 'assemblerflow.generator.components.assembly.Spades'>,
    'integrity_coverage': <class 'assemblerflow.generator.components.reads_quality_control.IntegrityCoverage'>,
    'prokka': <class 'assemblerflow.generator.components.annotation.Prokka'>,
    'patho_typing': <class 'assemblerflow.generator.components.typing.PathoTyping'>,
    'pilon': <class 'assemblerflow.generator.components.assembly_processing.Pilon'>,
    'mapping_patlas': <class 'assemblerflow.generator.components.patlas_mapping.PatlasMapping'>,
    'assembly_mapping': <class 'assemblerflow.generator.components.assembly_processing.AssemblyMapping'>,
    'process_skesa': <class 'assemblerflow.generator.components.assembly_processing.ProcessSkesa'>,
    'true_coverage': <class 'assemblerflow.generator.components.reads_quality_control.TrueCoverage'>,
    'fastqc': <class 'assemblerflow.generator.components.reads_quality_control.FastQC'>}

dict: Maps the process ids to the corresponding template interface class with the format:

{
    "<template_string>": module.TemplateClass
}
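
For example, a component class can be retrieved directly from this mapping by its process id (a minimal sketch; only the lookup is shown, not how the engine instantiates the class):

from assemblerflow.generator.engine import process_map

# Look up the component class registered under the 'fastqc' process id
fastqc_cls = process_map["fastqc"]
print(fastqc_cls.__name__)  # FastQC
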
class assemblerflow.generator.engine.NextflowGenerator(process_connections, nextflow_file, pipeline_name='assemblerflow', ignore_dependencies=False, auto_dependency=True)[source]

Bases: object

Methods

build() Main pipeline builder
render_pipeline() Write pipeline attributes to json
write_configs(project_root) Wrapper method that writes all configuration files to the pipeline
processes = None

list: Stores the process interfaces in the specified order

_fork_tree = None

dict: A dictionary with the fork tree of the pipeline, which consists of the paths of each lane. For instance, a single fork with two sinks is represented as: {1: [2,3]}. Subsequent forks are then added sequentially: {1:[2,3], 2:[3,4,5]}. This allows the path upstream of a process in a given lane to be traversed back to the start of the pipeline.

lanes = None

int: Stores the number of lanes in the pipeline

nf_file = None

str: Path to file where the pipeline will be generated

pipeline_name = None

str: Name of the pipeline, for customization and help purposes.

template = None

str: String that will harbour the pipeline code

secondary_channels = None

dict: Stores secondary channel links

main_raw_inputs = None

list: Stores the main raw inputs from the user parameters into the first process(es).

secondary_inputs = None

dict: Stores the secondary input channels that may be required by some processes. The key is the params variable and the value is the channel definition for nextflow:

{"genomeSize": "IN_genome_size = Channel.value(params.genomeSize)"}
extra_inputs = None
status_channels = None

list: Stores the status channels from each process

skip_class = None

list: Stores the Process classes that should be skipped when iterating over the processes list.

resources = None

str: Stores the resource directives string for each nextflow process. See NextflowGenerator._get_resources_string().

containers = None

str: Stores the container directives string for each nextflow process. See NextflowGenerator._get_container_string().

params = None

str: Stores the params directives string for the nextflow pipeline. See NextflowGenerator._get_params_string()

user_config = None

str: Stores the user configuration file placeholder. This is an empty configuration file that is only added to a project directory the first time; if the file already exists, it will not be overwritten.

compilers = None

dict: Maps the information about each available compiler process in assemblerflow. The key of each entry is the name/signature of the compiler process. The value is a json/dict object that contains two key:value pairs (see the sketch after this list):

  • cls: The reference to the compiler class object.
  • template: The nextflow template file of the process.
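
A hypothetical entry in this mapping could look like the following (the compiler name and class shown here are illustrative only, not actual assemblerflow compilers):

compilers = {
    "example_compiler": {                 # hypothetical compiler name/signature
        "cls": ExampleCompilerClass,      # reference to the compiler Process class
        "template": "example_compiler"    # nextflow template file of the process
    }
}
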
static _parse_process_name(name_str)[source]

Parses the process string and returns the process name and its directives

Process strings may contain directive information with the following syntax:

proc_name={'directive':'val'}

This method parses this string and returns the process name as a string and the directives information as a dictionary.

Parameters:
name_str : str

Raw string with process name and, potentially, directive information

Returns:
str

Process name

dict or None

Process directives
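
A minimal sketch of this behaviour, assuming the directives are provided as a dictionary literal after the = sign (this is not the actual implementation):

import ast

def parse_process_name(name_str):
    fields = name_str.split("=", 1)
    process_name = fields[0].strip()
    directives = None
    if len(fields) == 2:
        # literal_eval safely parses the directive dictionary, e.g. {'cpus': '2'}
        directives = ast.literal_eval(fields[1])
    return process_name, directives

parse_process_name("trimmomatic={'cpus':'2'}")
# ('trimmomatic', {'cpus': '2'})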

_build_connections(process_list, ignore_dependencies, auto_dependency)[source]

Parses the process connections dictionaries into a process list

This method is called upon instantiation of the NextflowGenerator class. Essentially, it sets the main input/output channel names of the processes so that they can be linked correctly.

If a connection between two consecutive processes is not possible due to a mismatch in the input/output types, it exits with an error.

_get_process_names(con, pid)[source]

Returns the input/output process names and output process directives

Parameters:
con : dict

Dictionary with the connection information between two processes.

pid : int

Arbitrary and unique process ID.

Returns:
input_name : str

Name of the input process

output_name : str

Name of the output process

output_directives : dict

Parsed directives from the output process

_add_dependency(p, template, inlane, outlane, pid)[source]

Automatically adds a dependency of a process.

This method adds a template to the process list attribute as a dependency. It will adapt the input lane, output lane and process id of the process that depends on it.

Parameters:
p : Process

Process class that contains the dependency.

template : str

Template name of the dependency.

inlane : int

Input lane.

outlane : int

Output lane.

pid : int

Process ID.

_search_tree_backwards(template, parent_lanes)[source]

Searches the process tree backwards in search of a provided process

The search takes into consideration the provided parent lanes and searches only those lanes.

Parameters:
template : str

Name of the process template attribute being searched

parent_lanes : list

List of integers with the parent lanes to be searched

Returns:
bool

Returns True when the template is found. Otherwise returns False.
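
A rough sketch of such a backwards search, assuming each process exposes the lane and template attributes described in these docs (not the actual implementation):

def search_tree_backwards(processes, template, parent_lanes):
    # Walk the process list from the end, i.e. backwards through the pipeline
    for proc in processes[::-1]:
        # Only consider processes placed in one of the parent lanes
        if proc.lane not in parent_lanes:
            continue
        if proc.template == template:
            return True
    return False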

static _test_connection(parent_process, child_process)[source]

Tests if two processes can be connected by input/output type

Parameters:
parent_process : assemblerflow.generator.process.Process

Process that will be sending output.

child_process : assemblerflow.generator.process.Process

Process that will receive output.
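
A simplified illustration of this check, assuming Process instances expose input_type and output_type attributes (the error handling shown is only indicative):

def test_connection(parent_process, child_process):
    # The output type of the parent must match the input type of the child
    if parent_process.output_type != child_process.input_type:
        raise SystemExit(
            "Output type '{}' of '{}' cannot feed input type '{}' of '{}'".format(
                parent_process.output_type, parent_process.template,
                child_process.input_type, child_process.template))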

_build_header()[source]

Adds the header template to the master template string

_build_footer()[source]

Adds the footer template to the master template string

_update_raw_input(p, sink_channel=None, input_type=None)[source]

Given a process, this method updates the main_raw_inputs attribute with the corresponding raw input channel of that process. The input channel and input type can be overridden if the sink_channel and input_type arguments are provided.

Parameters:
p : assemblerflow.generator.Process.Process

Process instance whose raw input will be modified

sink_channel: str

Sets the channel where the raw input will fork into. It overrides the process’s input_channel attribute.

input_type: str

Sets the type of the raw input. It overrides the process’s input_type attribute.

_update_secondary_inputs(p)[source]

Given a process, this method updates the secondary_inputs attribute with the corresponding secondary inputs of that process.

Parameters:
p : assemblerflow.Process.Process
_update_extra_inputs(p)[source]

Given a process, this method updates the extra_inputs attribute with the corresponding extra inputs of that process

Parameters:
p : assemblerflow.Process.Process
_get_fork_tree(lane)[source]

Returns a list with the parent lanes from the provided lane

Parameters:
lane : int

Target lane

Returns:
list

List of the lanes preceding the provided lane.
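
A minimal sketch of this traversal over the _fork_tree structure described above ({parent_lane: [child_lanes]}), not the actual implementation:

def get_fork_tree(fork_tree, lane):
    parent_lanes = [lane]
    # Walk up the fork tree until no parent lane is found
    while True:
        parents = [k for k, v in fork_tree.items() if lane in v]
        if not parents:
            break
        lane = parents[0]
        parent_lanes.append(lane)
    return parent_lanes

get_fork_tree({1: [2, 3], 2: [4, 5]}, 5)
# [5, 2, 1]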

_update_secondary_channels(p)[source]

Given a process, this method updates the secondary_channels attribute with the corresponding secondary channels of that process.

The rationale of the secondary channels is the following:

  • Start storing any secondary emitting channels, by checking the link_start list attribute of each process. If there are channel names in the link start, they are added to the secondary channels dictionary.
  • Check for secondary receiving channels, by checking the link_end list attribute. If the link name starts with a __ signature, it will create an implicit link with the last process whose output type matches the name after the signature. Otherwise, it will check whether a corresponding link start already exists in at least one process upstream in the pipeline and, if so, it will update the secondary_channels attribute with the new link.
Parameters:
p : assemblerflow.Process.Process
_set_channels()[source]

Sets the main channels for the pipeline

This method will parse the processes attribute and perform the following tasks for each process:

  • Sets the input/output channels and main input forks and adds them to the process’s assemblerflow.process.Process._context attribute (See set_channels()).
  • Automatically updates the main input channel of the first process of each lane so that they fork from the user-provided parameters (See _update_raw_input()).
  • Checks for the presence of secondary inputs and adds them to the secondary_inputs attribute.
  • Checks for the presence of secondary channels and adds them to the secondary_channels attribute.

Notes

On the secondary channel setup: with this approach, there can only be one secondary link start for each type of secondary link. For instance, if there are two processes that start a secondary channel for the SIDE_max_len channel, only the last one will be recorded, and all receiving processes will get the channel from the latest process. Secondary channels can only link if the source process is upstream of the sink process in its "forking" path.

_set_init_process()[source]

Sets the main raw inputs and secondary inputs on the init process

This method will fetch the assemblerflow.process.Init process instance and set the raw inputs (assemblerflow.process.Init.set_raw_inputs()) and the secondary inputs (assemblerflow.process.Init.set_secondary_inputs()) for that process. This handles the connection of the user parameters with channels that are then consumed in the pipeline.

_set_secondary_channels()[source]

Sets the secondary channels for the pipeline

This will iterate over the NextflowGenerator.secondary_channels dictionary that is populated when executing _update_secondary_channels() method.

_set_compiler_channels()[source]

Wrapper method that calls functions related to compiler channels

_set_general_compilers()[source]

Adds compiler channels to the processes attribute.

This method will iterate over the pipeline’s processes and check if any process is feeding channels to a compiler process. If so, that compiler process is added to the pipeline and those channels are linked to the compiler via some operator.

_set_status_channels()[source]

Compiles all status channels for the status compiler process

static _get_resources_string(res_dict, pid)[source]

Returns the nextflow resources string from a dictionary object

If the dictionary has at least one of the resource directives, these will be compiled for each process in the dictionary and returned as a string ready for injection into the nextflow config file template.

This dictionary should be:

dict = {"processA": {"cpus": 1, "memory": "4GB"},
        "processB": {"cpus": 2}}
Parameters:
res_dict : dict

Dictionary with the resources for processes.

pid : int

Unique identifier of the process

Returns:
str

nextflow config string
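
A sketch of how the dictionary above could be compiled into config lines (the exact nextflow config layout used by assemblerflow is an assumption here):

def get_resources_string(res_dict, pid):
    config_lines = []
    for process_name, directives in res_dict.items():
        for directive, value in directives.items():
            # e.g. $processA_1.cpus = 1  (selector layout is an assumption)
            config_lines.append(
                "${}_{}.{} = {}".format(process_name, pid, directive, repr(value)))
    return "\n".join(config_lines)

get_resources_string({"processA": {"cpus": 1, "memory": "4GB"}}, 1)
# "$processA_1.cpus = 1\n$processA_1.memory = '4GB'"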

static _get_container_string(cont_dict, pid)[source]

Returns the nextflow containers string from a dictionary object

If the dictionary has at least one of the container directives, these will be compiled for each process in the dictionary and returned as a string ready for injection into the nextflow config file template.

This dictionary should be:

dict = {"processA": {"container": "asd", "version": "1.0.0"},
        "processB": {"container": "dsd"}}
Parameters:
cont_dict : dict

Dictionary with the containers for processes.

pid : int

Unique identifier of the process

Returns:
str

nextflow config string

_get_params_string()[source]

Returns the nextflow params string from a dictionary object.

The params dict should be a set of key:value pairs with the parameter name, and the default parameter value:

self.params = {
    "genomeSize": 2.1,
    "minCoverage": 15
}

The values are then added to the string as they are. For instance, a 2.1 float will appear as param = 2.1 and a "'teste'" string will appear as param = 'teste' (note the quotes in the string).

Returns:
str

Nextflow params configuration string
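
For instance, the params dictionary above could be rendered as follows (a simplified sketch of the behaviour, not the actual template used by assemblerflow):

params = {"genomeSize": 2.1, "minCoverage": 15}
params_str = "\n".join("{} = {}".format(k, v) for k, v in params.items())
# genomeSize = 2.1
# minCoverage = 15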

_get_params_help()[source]
static _render_config(template, context)[source]
_set_configurations()[source]

This method will iterate over all processes in the pipeline and populate the nextflow configuration files with the directives of each process.

render_pipeline()[source]

Write pipeline attributes to json

This function writes the pipeline and its attributes to a json file, which is intended to be read by resources/pipeline_graph.html to render a graphical output showing the DAG.

write_configs(project_root)[source]

Wrapper method that writes all configuration files to the pipeline directory

build()[source]

Main pipeline builder

This method is responsible for building the NextflowGenerator.template attribute that will contain the nextflow code of the pipeline.

First it builds the header, then sets the main channels, the secondary inputs, the secondary channels and finally the status channels. When the pipeline is built, it writes the code to a nextflow file.
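
A hypothetical end-to-end usage of the generator (the process_connections value would normally come from assemblerflow's pipeline string parser; the file names are illustrative):

# 'connections' is a placeholder for the parsed process connections structure
nfg = NextflowGenerator(process_connections=connections,
                        nextflow_file="pipeline.nf",
                        pipeline_name="my_pipeline")
nfg.build()                           # builds the template and writes pipeline.nf
nfg.write_configs(project_root=".")   # writes the configuration files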