assemblerflow.generator.engine module¶
-
assemblerflow.generator.engine.
process_map
= {'seq_typing': <class 'assemblerflow.generator.components.typing.SeqTyping'>, 'trimmomatic': <class 'assemblerflow.generator.components.reads_quality_control.Trimmomatic'>, 'abricate': <class 'assemblerflow.generator.components.annotation.Abricate'>, 'reads_download': <class 'assemblerflow.generator.components.downloads.DownloadReads'>, 'chewbbaca': <class 'assemblerflow.generator.components.mlst.Chewbbaca'>, 'fastqc_trimmomatic': <class 'assemblerflow.generator.components.reads_quality_control.FastqcTrimmomatic'>, 'mash_dist': <class 'assemblerflow.generator.components.distance_estimation.PatlasMashDist'>, 'process_spades': <class 'assemblerflow.generator.components.assembly_processing.ProcessSpades'>, 'check_coverage': <class 'assemblerflow.generator.components.reads_quality_control.CheckCoverage'>, 'skesa': <class 'assemblerflow.generator.components.assembly.Skesa'>, 'mash_screen': <class 'assemblerflow.generator.components.distance_estimation.PatlasMashScreen'>, 'mlst': <class 'assemblerflow.generator.components.mlst.Mlst'>, 'spades': <class 'assemblerflow.generator.components.assembly.Spades'>, 'integrity_coverage': <class 'assemblerflow.generator.components.reads_quality_control.IntegrityCoverage'>, 'prokka': <class 'assemblerflow.generator.components.annotation.Prokka'>, 'patho_typing': <class 'assemblerflow.generator.components.typing.PathoTyping'>, 'pilon': <class 'assemblerflow.generator.components.assembly_processing.Pilon'>, 'mapping_patlas': <class 'assemblerflow.generator.components.patlas_mapping.PatlasMapping'>, 'assembly_mapping': <class 'assemblerflow.generator.components.assembly_processing.AssemblyMapping'>, 'process_skesa': <class 'assemblerflow.generator.components.assembly_processing.ProcessSkesa'>, 'true_coverage': <class 'assemblerflow.generator.components.reads_quality_control.TrueCoverage'>, 'fastqc': <class 'assemblerflow.generator.components.reads_quality_control.FastQC'>}¶ dict: Maps the process ids to the corresponding template 
interface class with the format:
{ "<template_string>": module.TemplateClass }
-
class
assemblerflow.generator.engine.
NextflowGenerator
(process_connections, nextflow_file, pipeline_name='assemblerflow', ignore_dependencies=False, auto_dependency=True)[source]¶ Bases:
object
Methods
build() Main pipeline builder
render_pipeline() Write pipeline attributes to json
write_configs(project_root) Wrapper method that writes all configuration files to the pipeline directory
-
processes
= None¶ list: Stores the process interfaces in the specified order
-
_fork_tree
= None¶ dict: A dictionary with the fork tree of the pipeline, which consists of the paths of each lane. For instance, a single fork with two sinks is represented as: {1: [2,3]}. Subsequent forks are then added sequentially: {1:[2,3], 2:[3,4,5]}. This allows the path upstream of a process in a given lane to be traversed until the start of the pipeline.
-
lanes
= None¶ int: Stores the number of lanes in the pipeline
-
nf_file
= None¶ str: Path to file where the pipeline will be generated
-
pipeline_name
= None¶ str: Name of the pipeline, for customization and help purposes.
-
template
= None¶ str: String that will harbour the pipeline code
-
secondary_channels
= None¶ dict: Stores secondary channel links
-
main_raw_inputs
= None¶ list: Stores the main raw inputs from the user parameters into the first process(es).
-
secondary_inputs
= None¶ dict: Stores the secondary input channels that may be required by some processes. The key is the params variable and the value is the channel definition for nextflow:
{"genomeSize": "IN_genome_size = Channel.value(params.genomeSize)"}
-
extra_inputs
= None¶
-
status_channels
= None¶ list: Stores the status channels from each process
-
skip_class
= None¶ list: Stores the Process classes that should be skipped when iterating over the
processes
list.
-
resources
= None¶ str: Stores the resource directives string for each nextflow process. See
NextflowGenerator._get_resources_string()
.
-
containers
= None¶ str: Stores the container directives string for each nextflow process. See
NextflowGenerator._get_container_string()
.
-
params
= None¶ str: Stores the params directives string for the nextflow pipeline. See
NextflowGenerator._get_params_string()
-
user_config
= None¶ str: Stores the user configuration file placeholder. This is an empty configuration file that is only added the first time to a project directory. If the file already exists, it will not overwrite it.
-
compilers
= None¶ dict: Maps the information about each available compiler process in assemblerflow. The key of each entry is the name/signature of the compiler process. The value is a json/dict object that contains two key:value pairs:
cls
: The reference to the compiler class object.
template
: The nextflow template file of the process.
-
static
_parse_process_name
(name_str)[source]¶ Parses the process string and returns the process name and its directives
Process strings may contain directive information with the following syntax:
proc_name={'directive':'val'}
This method parses this string and returns the process name as a string and the directives information as a dictionary.
Parameters: - name_str : str
Raw string with process name and, potentially, directive information
Returns: - str
Process name
- dict or None
Process directives
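The parsing described above can be sketched as follows. This is a minimal illustration, not the module's actual implementation: the function name parse_process_name and the use of ast.literal_eval to read the directive dictionary are assumptions made for the example.

```python
import ast


def parse_process_name(name_str):
    """Split 'proc_name={...}' into (process name, directives dict or None).

    Hypothetical helper written for illustration only.
    """
    # Without an '=' there is no directive information to parse.
    if "=" not in name_str:
        return name_str.strip(), None

    # Split only on the first '=' so directive values may contain '='.
    name, raw_directives = name_str.split("=", 1)

    # literal_eval safely evaluates a Python literal such as {'cpus': '4'}.
    directives = ast.literal_eval(raw_directives.strip())
    if not isinstance(directives, dict):
        raise ValueError("Directives must be a dictionary: " + raw_directives)

    return name.strip(), directives


print(parse_process_name("spades={'cpus':'4'}"))
print(parse_process_name("fastqc"))
```

A process string without directives yields None as the second element, matching the "dict or None" return type documented above.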
-
_build_connections
(process_list, ignore_dependencies, auto_dependency)[source]¶ Parses the process connections dictionaries into a process list
This method is called upon instantiation of the NextflowGenerator class. Essentially, it sets the main input/output channel names of the processes so that they can be linked correctly.
If a connection between two consecutive processes is not possible due to a mismatch in the input/output types, it exits with an error.
-
_get_process_names
(con, pid)[source]¶ Returns the input/output process names and output process directives
Parameters: - con : dict
Dictionary with the connection information between two processes.
- pid : int
Arbitrary and unique process ID.
Returns: - input_name : str
Name of the input process
- output_name : str
Name of the output process
- output_directives : dict
Parsed directives from the output process
-
_add_dependency
(p, template, inlane, outlane, pid)[source]¶ Automatically adds a dependency of a process.
This method adds a template to the process list attribute as a dependency. It will adapt the input lane, output lane and process id of the process that depends on it.
Parameters: - p : Process
Process class that contains the dependency.
- template : str
Template name of the dependency.
- inlane : int
Input lane.
- outlane : int
Output lane.
- pid : int
Process ID.
-
_search_tree_backwards
(template, parent_lanes)[source]¶ Searches the process tree backwards in search of a provided process
The search takes into consideration the provided parent lanes and searches only those lanes.
Parameters: - template : str
Name of the process template attribute being searched
- parent_lanes : list
List of integers with the parent lanes to be searched
Returns: - bool
Returns True when the template is found. Otherwise returns False.
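A rough sketch of such a backwards search is shown below. It assumes that each process exposes template and lane attributes and that the processes are passed in as a list; the Proc tuple and the free function are hypothetical stand-ins, not the classes used by the module.

```python
from collections import namedtuple

# Hypothetical stand-in for a process object; only the two attributes
# used by the search are modelled here.
Proc = namedtuple("Proc", ["template", "lane"])


def search_tree_backwards(processes, template, parent_lanes):
    """Return True if `template` occurs in one of `parent_lanes`."""
    # Walk from the end of the pipeline towards its start.
    for proc in reversed(processes):
        # Processes outside the requested lanes are ignored.
        if proc.lane not in parent_lanes:
            continue
        if proc.template == template:
            return True
    return False


processes = [
    Proc("integrity_coverage", 1),
    Proc("spades", 2),
    Proc("skesa", 3),
]

print(search_tree_backwards(processes, "spades", [1, 2]))
print(search_tree_backwards(processes, "spades", [1, 3]))
```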
-
static
_test_connection
(parent_process, child_process)[source]¶ Tests if two processes can be connected by input/output type
Parameters: - parent_process : assemblerflow.generator.process.Process
Process that will be sending output.
- child_process : assemblerflow.generator.process.Process
Process that will receive output.
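The type check can be illustrated with the sketch below. It assumes that a process carries input_type and output_type attributes and that a mismatch is reported by raising an exception; the Proc tuple, the check_connection name and the "fastq"/"fasta" type labels are hypothetical and chosen for the example.

```python
from collections import namedtuple

# Hypothetical stand-in for a Process instance.
Proc = namedtuple("Proc", ["template", "input_type", "output_type"])


def check_connection(parent_process, child_process):
    """Raise ValueError when the parent's output does not match the child's input."""
    if parent_process.output_type != child_process.input_type:
        raise ValueError(
            "Cannot connect '{}' (output: {}) to '{}' (input: {})".format(
                parent_process.template,
                parent_process.output_type,
                child_process.template,
                child_process.input_type,
            )
        )


assembler = Proc("spades", "fastq", "fasta")
annotator = Proc("prokka", "fasta", None)

# Compatible: the assembler emits what the annotator consumes.
check_connection(assembler, annotator)

# Incompatible: the annotator's output cannot feed the assembler.
try:
    check_connection(annotator, assembler)
except ValueError as err:
    print(err)
```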
Adds the footer template to the master template string
-
_update_raw_input
(p, sink_channel=None, input_type=None)[source]¶ Given a process, this method updates the
main_raw_inputs
attribute with the corresponding raw input channel of that process. The input channel and input type can be overridden if the sink_channel and input_type arguments are provided.
Parameters: - p : assemblerflow.generator.Process.Process
Process instance whose raw input will be modified
- sink_channel: str
Sets the channel where the raw input will fork into. It overrides the process’s input_channel attribute.
- input_type: str
Sets the type of the raw input. It overrides the process’s input_type attribute.
-
_update_secondary_inputs
(p)[source]¶ Given a process, this method updates the
secondary_inputs
attribute with the corresponding secondary inputs of that process.
Parameters: - p : assemblerflow.Process.Process
-
_update_extra_inputs
(p)[source]¶ Given a process, this method updates the
extra_inputs
attribute with the corresponding extra inputs of that process.
Parameters: - p : assemblerflow.Process.Process
-
_get_fork_tree
(lane)[source]¶ Returns a list with the parent lanes from the provided lane
Parameters: - lane : int
Target lane
Returns: - list
List of the lanes preceding the provided lane.
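Using the fork tree layout described for the _fork_tree attribute (a source lane mapped to its sink lanes), the lookup of parent lanes can be sketched as below. The free function, the example tree, and the nearest-parent-first ordering of the result are assumptions made for illustration; the sketch also assumes the tree contains no cycles.

```python
def get_parent_lanes(fork_tree, lane):
    """Return the lanes upstream of `lane`, nearest parent first."""
    parents = []
    current = lane
    while True:
        # Find the lane whose fork produced the current lane, if any.
        parent = next(
            (src for src, sinks in fork_tree.items() if current in sinks),
            None,
        )
        if parent is None:
            # Reached a lane that no fork points to: start of the pipeline.
            break
        parents.append(parent)
        current = parent
    return parents


# Lane 1 forks into lanes 2 and 3; lane 2 forks into lanes 4 and 5.
fork_tree = {1: [2, 3], 2: [4, 5]}

print(get_parent_lanes(fork_tree, 5))  # [2, 1]
print(get_parent_lanes(fork_tree, 1))  # []
```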
-
_update_secondary_channels
(p)[source]¶ Given a process, this method updates the
secondary_channels
attribute with the corresponding secondary inputs of that channel.
The rationale of the secondary channels is the following:
- Start storing any secondary emitting channels, by checking the link_start list attribute of each process. If there are channel names in the link start, it adds them to the secondary channels dictionary.
- Check for secondary receiving channels, by checking the
link_end list attribute. If the link name starts with a
__ signature, it will create an implicit link with the last
process with an output type after the signature. Otherwise,
it will check if a corresponding link start already exists in
at least one process upstream of the pipeline and if so,
it will update the
secondary_channels
attribute with the new link.
Parameters: - p : assemblerflow.Process.Process
-
_set_channels
()[source]¶ Sets the main channels for the pipeline
This method will parse the
processes
attribute and perform the following tasks for each process:
- Sets the input/output channels and main input forks and adds them to the process’s assemblerflow.process.Process._context attribute (See set_channels()).
- Automatically updates the main input channel of the first process of each lane so that they fork from the user-provided parameters (See _update_raw_input()).
- Checks for the presence of secondary inputs and adds them to the secondary_inputs attribute.
- Checks for the presence of secondary channels and adds them to the secondary_channels attribute.
Notes
On the secondary channel setup: With this approach, there can only be one secondary link start for each type of secondary link. For instance, if there are two processes that start a secondary channel for the
SIDE_max_len
channel, only the last one will be recorded, and all receiving processes will get the channel from the latest process. Secondary channels can only link if the source process is upstream of the sink process in its “forking” path.
-
_set_init_process
()[source]¶ Sets the main raw inputs and secondary inputs on the init process
This method will fetch the
assemblerflow.process.Init
process instance and set the raw inputs (assemblerflow.process.Init.set_raw_inputs()) and the secondary inputs (assemblerflow.process.Init.set_secondary_inputs()) for that process. This handles the connection of the user parameters with channels that are then consumed in the pipeline.
-
_set_secondary_channels
()[source]¶ Sets the secondary channels for the pipeline
This will iterate over the
NextflowGenerator.secondary_channels
dictionary that is populated when executing the _update_secondary_channels() method.
-
_set_general_compilers
()[source]¶ Adds compiler channels to the
processes
attribute.
This method will iterate over the pipeline’s processes and check if any process is feeding channels to a compiler process. If so, that compiler process is added to the pipeline and those channels are linked to the compiler via some operator.
-
static
_get_resources_string
(res_dict, pid)[source]¶ Returns the nextflow resources string from a dictionary object
If the dictionary has at least one of the resource directives, these will be compiled for each process in the dictionary and returned as a string ready for injection in the nextflow config file template.
This dictionary should be:
dict = {"processA": {"cpus": 1, "memory": "4GB"}, "processB": {"cpus": 2}}
Parameters: - res_dict : dict
Dictionary with the resources for processes.
- pid : int
Unique identifier of the process
Returns: - str
nextflow config string
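The compilation step can be sketched as follows. The exact layout of the string produced by the real method is not given in this reference, so the `$<process>_<pid>.<directive> = <value>` line format, the quoting of string values, and the restriction to the cpus and memory directives are all assumptions made for illustration.

```python
def resources_string(res_dict, pid):
    """Build a config snippet from a {process: {directive: value}} dictionary."""
    lines = []
    for process, directives in res_dict.items():
        # Only the resource directives named in the example dictionary
        # above are handled in this sketch.
        for directive in ("cpus", "memory"):
            if directive not in directives:
                continue
            value = directives[directive]
            # Strings such as "4GB" are quoted; numbers are written as-is.
            if isinstance(value, str):
                rendered = "'{}'".format(value)
            else:
                rendered = str(value)
            lines.append(
                "${}_{}.{} = {}".format(process, pid, directive, rendered)
            )
    return "\n".join(lines)


res_dict = {"processA": {"cpus": 1, "memory": "4GB"}, "processB": {"cpus": 2}}
print(resources_string(res_dict, 1))
```

A dictionary without any of the handled directives yields an empty string, so nothing would be injected into the config template in that case.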
-
static
_get_container_string
(cont_dict, pid)[source]¶ Returns the nextflow containers string from a dictionary object
If the dictionary has at least one of the container directives, these will be compiled for each process in the dictionary and returned as a string ready for injection in the nextflow config file template.
This dictionary should be:
dict = {"processA": {"container": "asd", "version": "1.0.0"}, "processB": {"container": "dsd"}}
Parameters: - cont_dict : dict
Dictionary with the containers for processes.
- pid : int
Unique identifier of the process
Returns: - str
nextflow config string
-
_get_params_string
()[source]¶ Returns the nextflow params string from a dictionary object.
The params dict should be a set of key:value pairs with the parameter name, and the default parameter value:
self.params = { "genomeSize": 2.1, "minCoverage": 15 }
The values are then added to the string as they are. For instance, a 2.1 float will appear as param = 2.1 and a "'teste'" string will appear as param = 'teste' (note the quotes inside the string).
Returns: - str
Nextflow params configuration string
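The "added as they are" behaviour can be illustrated with the sketch below. The standalone function and the one-parameter-per-line layout are assumptions; the real method reads self.params and may format the string differently.

```python
def params_string(params):
    """Render each parameter as 'name = value', inserting values verbatim."""
    # str.format inserts the value's plain string form, so a Python string
    # must carry its own quotes to appear quoted in the output.
    return "\n".join(
        "{} = {}".format(name, value) for name, value in params.items()
    )


params = {"genomeSize": 2.1, "minCoverage": 15, "label": "'teste'"}
print(params_string(params))
```

Because values are not quoted automatically, a string parameter has to be stored with its inner quotes, as in the 'teste' example above.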
-
_set_configurations
()[source]¶ This method will iterate over all processes in the pipeline and populate the nextflow configuration files with the directives of each process in the pipeline.
-
render_pipeline
()[source]¶ Write pipeline attributes to json
This function writes the pipeline and its attributes to a json file that is intended to be read by resources/pipeline_graph.html to render a graphical output showing the DAG.
-
write_configs
(project_root)[source]¶ Wrapper method that writes all configuration files to the pipeline directory
-
build
()[source]¶ Main pipeline builder
This method is responsible for building the
NextflowGenerator.template
attribute that will contain the nextflow code of the pipeline.
First it builds the header, then sets the main channels, the secondary inputs, secondary channels and finally the status channels. When the pipeline is built, it writes the code to a nextflow file.
-