flowcraft.generator.process module
-
class
flowcraft.generator.process.
Process
(template)[source]¶ Bases:
object
Main interface for basic process functionality
The
Process
class is intended to be inherited by specific process classes (e.g., IntegrityCoverage) and provides the basic functionality to build the channels and links between processes.
Child classes are expected to inherit the
__init__
execution, which means that, at a minimum, the child must be defined as:
    class ChildProcess(Process):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
This ensures that when the
ChildProcess
class is instantiated, it automatically sets the attributes of the parent class. This also means that child processes must be instantiated with information on the process type and the jinja2 template that contains the nextflow code.
Parameters: - template : str
Name of the jinja2 template with the nextflow code for that process. Templates are stored in
generator/templates
.
Attributes: template_str
Class property that returns a populated template string
Methods
get_user_channel(input_channel[, input_type]): Returns the main raw channel for the process
render(template, context): Wrapper to the jinja2 render method from a template file
set_channels(**kwargs): General purpose method that sets the main channels
set_main_channel_names(input_suffix, …): Sets the main channel names based on the provided input and output channel suffixes.
set_param_id(param_id): Sets the param_id for the process, which will be used to render the template.
set_secondary_channel(source, channel_list): General purpose method for setting a secondary channel
update_attributes(attr_dict): Updates the directives attribute from a dictionary object.
update_main_forks(sink): Updates the forks attribute with the sink channel destination
update_main_input
-
RAW_MAPPING
= {'accessions': {'channel': 'IN_accessions_raw', 'channel_str': 'Channel.fromPath(params.{0}).ifEmpty {{ exit 1, "No accessions file provided with path:\'${{params.{0}}}\'" }}', 'checks': 'if (!params.{0}){{ exit 1, "\'{0}\' parameter missing" }}\n', 'default_value': 'null', 'description': 'Path file with accessions, one perline. (default: $params.fastq)', 'params': 'accessions'}, 'fasta': {'channel': 'IN_fasta_raw', 'channel_str': 'Channel.fromPath(params.{0}).map{{ it -> file(it).exists() ? [it.toString().tokenize(\'/\').last().tokenize(\'.\')[0..-2].join(\'.\'), it] : null }}.ifEmpty {{ exit 1, "No fasta files provided with pattern:\'${{params.{0}}}\'" }}', 'checks': 'if (params.{0} instanceof Boolean){{exit 1, "\'{0}\' must be a path pattern. Provide value:\'$params.{0}\'"}}\nif (!params.{0}){{ exit 1, "\'{0}\' parameter missing"}}', 'default_value': "'fasta/*.fasta'", 'description': 'Path fasta files. (default: $params.fastq)', 'params': 'fasta'}, 'fastq': {'channel': 'IN_fastq_raw', 'channel_str': 'Channel.fromFilePairs(params.{0}).ifEmpty {{ exit 1, "No fastq files provided with pattern:\'${{params.{0}}}\'" }}', 'checks': 'if (params.{0} instanceof Boolean){{exit 1, "\'{0}\' must be a path pattern. Provide value:\'$params.{0}\'"}}\nif (!params.{0}){{ exit 1, "\'{0}\' parameter missing"}}', 'default_value': "'fastq/*_{1,2}.*'", 'description': 'Path expression to paired-end fastq files. (default: $params.fastq)', 'params': 'fastq'}}¶ dict: Contains the mapping between the
Process.input_type
attribute and the corresponding nextflow parameter and main channel definition, e.g.: "fastq": { "params": "fastq", "channel": "<channel>" }
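The `{0}` placeholder and the doubled braces in the `channel_str` and `checks` values suggest that these templates are filled in with Python's `str.format`, with the parameter name as the only argument. The following is a minimal sketch under that assumption, using the template copied from the `fastq` entry; how the generator actually consumes these strings is not stated here.

```python
# Template copied from the 'fastq' entry of RAW_MAPPING shown above.
channel_str = (
    "Channel.fromFilePairs(params.{0}).ifEmpty "
    "{{ exit 1, \"No fastq files provided with pattern:'${{params.{0}}}'\" }}"
)

# Assumption: the placeholder is filled with the parameter name via str.format.
# Doubled braces collapse to single braces, which nextflow needs for closures.
rendered = channel_str.format("fastq")
print(rendered)
```

Doubling the braces is what lets the nextflow closure syntax survive the formatting step unchanged.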
-
pid
= None¶ int: Process ID number that represents the order and position in the generated pipeline
-
template
= None¶ str: Template name for the current process. This string will be used to fetch the file containing the corresponding jinja2 template in the
_set_template()
method
-
input_type
= None¶ str: Type of expected input data. Used to verify the connection between two processes is viable.
-
output_type
= None¶ str: Type of output data. Used to verify the connection between two processes is viable.
-
ignore_type
= None¶ boolean: If True, this process will ignore the input/output type requirements. This attribute is set to True for terminal singleton forks in the pipeline.
-
ignore_pid
= None¶ boolean: If True, this process will not make the pid advance. This is used for terminal forks before the end of the pipeline.
-
dependencies
= None¶ list: Contains the dependencies of the current process in the form of the
Process.template
attribute (e.g., [fastqc
])
-
input_channel
= None¶ str: Place holder of the main input channel for the current process. This attribute can change dynamically depending on the forks and secondary channels in the final pipeline.
-
output_channel
= None¶ str: Place holder of the main output channel for the current process. This attribute can change dynamically depending on the forks and secondary channels in the final pipeline.
-
input_user_channel
= None¶ dict: Stores a dictionary of two key:value pairs containing the raw input channel for the process. This is automatically determined by the input_type attribute, and will fetch the information that is mapped in the RAW_MAPPING variable. It will only be used by the first process(es) defined in a pipeline.
-
link_start
= None¶ list: List of strings with the starting points for secondary channels. When building the pipeline, these strings will be matched with equal strings in the
link_end
attribute of other Processes.
-
link_end
= None¶ list: List of dictionaries, each containing a string with the ending point for a secondary channel. Each dictionary should contain at least two key:value pairs:
{"link": <link string>, "alias":<string for template>}
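The matching between `link_start` strings and `link_end` dictionaries described above can be pictured with a short sketch. The function name `match_links` and the channel names are hypothetical and chosen only for illustration; the actual pipeline builder may perform this matching differently.

```python
def match_links(link_start, link_end):
    """Pair each start string with the alias of every end link equal to it."""
    matches = []
    for start in link_start:
        for end in link_end:
            # A secondary channel is connected when both strings are equal.
            if end["link"] == start:
                matches.append((start, end["alias"]))
    return matches


# Hypothetical values: link_start of one process, link_end of another.
starts = ["SIDE_phred", "SIDE_max_len"]
ends = [
    {"link": "SIDE_max_len", "alias": "SIDE_max_len"},
    {"link": "SIDE_other", "alias": "SIDE_other_alias"},
]
matched = match_links(starts, ends)
```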
-
status_channels
= None¶ list: Names of the status channels produced by the process. By default, it sets a single status channel. If more than one status channel is required for the process, list each one in this attribute (e.g.,
FastQC.status_channels
)
-
status_strs
= None¶ str: Name of the status channel for the current process. These strings will be provided to the StatusCompiler process to collect and compile status reports
-
forks
= None¶ list: List of strings with the literal definition of the forks for the current process, ready to be added to the template string.
-
main_forks
= None¶ list: List of the channels into which the main output should be forked. They will be automatically added to the
main_forks
attribute when setting the secondary channels
-
secondary_inputs
= None¶ list: List of dictionaries with secondary input channels from nextflow parameters. Each dictionary should contain two key:value pairs: the
params
key, containing the parameter name, and the channel key, containing the nextflow channel definition:
    {"params": "pathoSpecies",
     "channel": "IN_pathoSpecies = Channel.value(params.pathoSpecies)"}
-
extra_input
= None¶ str: Name of the parameter that will be used to provide extra input to the process. This extra input will be mixed with the main input channel using nextflow’s
mix
operator. Its channel will be defined at the start of the pipeline, based on the channel_str key of the RAW_MAPPING entry for the corresponding input type.
-
params
= None¶ dict: Maps the parameter names to the corresponding default values.
-
param_id
= None¶ str: The parameter id suffix that will be added to each parameter. If it is empty, identical parameters in different components will be merged.
-
directives
= None¶ dict: Specifies the directives (cpus, memory, container) for each nextflow process in the template. If specified, these directives will be added to the nextflow configuration file. Otherwise, the default values for cpus and memory will be used, and the process will not run inside any container.
The currently supported directives are:
- cpus
- memory
- container
- container tag/version
An example of directives for two processes is as follows:
    self.directives = {
        "processA": {"cpus": 1, "memory": "1GB"},
        "processB": {"memory": "5GB", "container": "my/image", "version": "0.5.0"}
    }
-
compiler
= None¶ dict: Specifies channels from the current process that are received by a compiler process. Each key in this dictionary should match a compiler process key in
compilers
. The value should be a list of the channels that will be fed to the compiler process:
    self.compiler["patlas_consensus"] = ["mashScreenOutputChannel"]
-
set_main_channel_names
(input_suffix, output_suffix, lane)[source]¶ Sets the main channel names based on the provided input and output channel suffixes. This is performed when connecting processes.
Parameters: - input_suffix : str
Suffix added to the input channel. Should be based on the lane and an arbitrary unique id
- output_suffix : str
Suffix added to the output channel. Should be based on the lane and an arbitrary unique id
- lane : int
Sets the lane of the process.
-
set_param_id
(param_id)[source]¶ Sets the param_id for the process, which will be used to render the template.
Parameters: - param_id : str
The
param_id
attribute of the process.
-
get_user_channel
(input_channel, input_type=None)[source]¶ Returns the main raw channel for the process
Provided with at least a channel name, this method returns the raw channel name and specification (the nextflow string definition) for the process. By default, it will fork from the raw input of the process’
input_type
attribute. However, this behaviour can be overridden by providing the input_type argument.
If the specified or inferred input type exists in the RAW_MAPPING dictionary, the channel info dictionary will be retrieved along with the specified input channel. Otherwise, it will return None.
An example of the returned dictionary is:
    {"input_channel": "myChannel",
     "params": "fastq",
     "channel": "IN_fastq_raw",
     "channel_str": "IN_fastq_raw = Channel.fromFilePairs(params.fastq)"}
Returns: - dict or None
Dictionary with the complete raw channel info. None if no channel is found.
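The lookup described above can be sketched as a plain dictionary access that returns None for unmapped input types. This is only an illustration: `get_user_channel_sketch` is a hypothetical stand-alone function, and the `RAW_MAPPING` below is a trimmed stand-in whose values mirror the example dictionary above, not the full class attribute.

```python
# Trimmed stand-in for Process.RAW_MAPPING (values follow the example above).
RAW_MAPPING = {
    "fastq": {
        "params": "fastq",
        "channel": "IN_fastq_raw",
        "channel_str": "IN_fastq_raw = Channel.fromFilePairs(params.fastq)",
    }
}


def get_user_channel_sketch(input_channel, input_type):
    """Return the raw channel info for input_type, or None if it is unmapped."""
    if input_type not in RAW_MAPPING:
        return None
    # Combine the caller's channel name with the mapped raw channel info.
    return {"input_channel": input_channel, **RAW_MAPPING[input_type]}


info = get_user_channel_sketch("myChannel", "fastq")
missing = get_user_channel_sketch("myChannel", "bam")
```

Here `info` has the same shape as the example dictionary, while `missing` is None because `"bam"` has no entry in the stand-in mapping.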
-
static
render
(template, context)[source]¶ Wrapper to the jinja2 render method from a template file
Parameters: - template : str
Path to template file.
- context : dict
Dictionary with kwargs context to populate the template
-
template_str
¶ Class property that returns a populated template string
This property allows the template of a particular process to be dynamically generated and returned when doing
Process.template_str
.
Returns: - x : str
String with the complete and populated process template
-
set_channels
(**kwargs)[source]¶ General purpose method that sets the main channels
This method will take a variable number of keyword arguments to set the
Process._context
attribute with the information on the main channels for the process. This is done by appending the process ID (the Process.pid attribute) to the input, output and status channel prefix strings. In the output channel, the process ID is incremented by 1 to allow the connection with the channel in the next process.
The **kwargs system for setting the Process._context attribute also provides additional flexibility. In this way, individual processes can provide additional information not covered in this method, without changing it.
Parameters: - kwargs : dict
Dictionary with the keyword arguments for setting up the template context
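The naming scheme described above (process ID appended to each prefix, with the output channel using the ID plus one) can be sketched as follows. The function name, the prefix strings `MAIN_` and `STATUS_`, and the context keys are assumptions made for illustration; the real prefixes and keys used by `set_channels` are not listed in this page.

```python
def build_channel_context(pid, main_prefix="MAIN_", status_prefix="STATUS_", **kwargs):
    """Build a template context with channel names derived from the process ID."""
    context = {
        "input_channel": f"{main_prefix}{pid}",
        # pid + 1 lets this output connect to the input of the next process.
        "output_channel": f"{main_prefix}{pid + 1}",
        "status_channel": f"{status_prefix}{pid}",
    }
    # Extra keyword arguments are passed through to the context unchanged.
    context.update(kwargs)
    return context


ctx = build_channel_context(2, sample_id="A")
```

Passing extra keyword arguments straight into the context is what allows individual processes to add their own template variables without modifying the shared method.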
-
update_main_forks
(sink)[source]¶ Updates the forks attribute with the sink channel destination
Parameters: - sink : str
Channel to which the main input will be forked
-
set_secondary_channel
(source, channel_list)[source]¶ General purpose method for setting a secondary channel
This method allows a given source channel to be forked into one or more channels and sets those forks in the
Process.forks
attribute. Both the source and the channels in the channel_list argument must be the final channel strings, which means that this method should be called only after setting the main channels.
If the source is not a main channel, this will simply create a fork or set for every channel in the channel_list argument list:
    SOURCE_CHANNEL_1.into{SINK_1;SINK_2}
If the source is a main channel, this will apply some changes to the output channel of the process, to avoid overlapping main output channels. For instance, forking the main output channel for process 2 would create a MAIN_2.into{...}. The issue here is that the MAIN_2 channel is expected as the input of the next process, but now is being used to create the fork. To solve this issue, the output channel is renamed to _MAIN_2, and the fork is set to the provided channels plus the MAIN_2 channel:
    _MAIN_2.into{MAIN_2;MAIN_5;...}
Parameters: - source : str
String with the name of the source channel
- channel_list : list
List of channels that will receive a fork of the secondary channel
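The two cases described above can be sketched as a small string-building function. `fork_statement` and its `main_output` argument are hypothetical names used for illustration; the sketch only reproduces the two fork strings shown in the text and does not cover the "set" variant or the updates to the process attributes.

```python
def fork_statement(source, channel_list, main_output=None):
    """Build a nextflow 'into' statement forking source into channel_list."""
    sinks = list(channel_list)
    if source == main_output:
        # The main output is still needed by the next process, so it becomes
        # one of the sinks and the forked source gets an underscore prefix.
        sinks.insert(0, source)
        source = "_" + source
    return "{}.into{{{}}}".format(source, ";".join(sinks))


secondary = fork_statement("SOURCE_CHANNEL_1", ["SINK_1", "SINK_2"])
main = fork_statement("MAIN_2", ["MAIN_5"], main_output="MAIN_2")
```

With these inputs, `secondary` is `SOURCE_CHANNEL_1.into{SINK_1;SINK_2}` and `main` is `_MAIN_2.into{MAIN_2;MAIN_5}`.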
-
update_attributes
(attr_dict)[source]¶ Updates the directives attribute from a dictionary object.
This will only update the directives for processes that have been defined in the subclass.
Parameters: - attr_dict : dict
Dictionary containing the attributes that will be used to update the process attributes and/or directives.
-
class
flowcraft.generator.process.
Compiler
(**kwargs)[source]¶ Bases:
flowcraft.generator.process.Process
Extends the Process methods to status-type processes
Attributes: template_str
Class property that returns a populated template string
Methods
get_user_channel(input_channel[, input_type]): Returns the main raw channel for the process
render(template, context): Wrapper to the jinja2 render method from a template file
set_channels(**kwargs): General purpose method that sets the main channels
set_compiler_channels(channel_list[, operator]): General method for setting the input channels for the status process
set_main_channel_names(input_suffix, …): Sets the main channel names based on the provided input and output channel suffixes.
set_param_id(param_id): Sets the param_id for the process, which will be used to render the template.
set_secondary_channel(source, channel_list): General purpose method for setting a secondary channel
update_attributes(attr_dict): Updates the directives attribute from a dictionary object.
update_main_forks(sink): Updates the forks attribute with the sink channel destination
update_main_input
-
set_compiler_channels
(channel_list, operator='mix')[source]¶ General method for setting the input channels for the status process
Given a list of status channels that are gathered during the pipeline construction, this method will automatically set the input channel for the status process. This makes use of the
mix
channel operator of nextflow for multiple channels:
    STATUS_1.mix(STATUS_2,STATUS_3,...)
This will set the status_channels key for the _context attribute of the process.
Parameters: - channel_list : list
List of strings with the final name of the status channels
- operator : str
Specifies the operator used to join the compiler channels. Available options are ‘mix’ and ‘join’.
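The channel expression shown above can be assembled with a short helper. `join_channels` is a hypothetical name, and returning a single channel unchanged when the list has only one element is an assumption of this sketch rather than documented behaviour.

```python
def join_channels(channel_list, operator="mix"):
    """Combine channel names into one nextflow expression using operator."""
    if operator not in ("mix", "join"):
        raise ValueError("operator must be 'mix' or 'join'")
    first, *rest = channel_list
    if not rest:
        # Assumption: a single channel needs no operator call.
        return first
    return "{}.{}({})".format(first, operator, ",".join(rest))


mixed = join_channels(["STATUS_1", "STATUS_2", "STATUS_3"])
joined = join_channels(["REPORT_1", "REPORT_2"], operator="join")
```

Here `mixed` is `STATUS_1.mix(STATUS_2,STATUS_3)`, which matches the form given in the description.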
-
class
flowcraft.generator.process.
Init
(**kwargs)[source]¶ Bases:
flowcraft.generator.process.Process
Attributes: template_str
Class property that returns a populated template string
Methods
get_user_channel(input_channel[, input_type]): Returns the main raw channel for the process
render(template, context): Wrapper to the jinja2 render method from a template file
set_channels(**kwargs): General purpose method that sets the main channels
set_extra_inputs(channel_dict): Sets the initial definition of the extra input channels.
set_main_channel_names(input_suffix, …): Sets the main channel names based on the provided input and output channel suffixes.
set_param_id(param_id): Sets the param_id for the process, which will be used to render the template.
set_raw_inputs(raw_input): Sets the main input channels of the pipeline and their forks.
set_secondary_channel(source, channel_list): General purpose method for setting a secondary channel
set_secondary_inputs(channel_dict): Adds secondary inputs to the start of the pipeline.
update_attributes(attr_dict): Updates the directives attribute from a dictionary object.
update_main_forks(sink): Updates the forks attribute with the sink channel destination
update_main_input
-
set_raw_inputs
(raw_input)[source]¶ Sets the main input channels of the pipeline and their forks.
The
raw_input
dictionary input should contain one entry for each input type (fastq, fasta, etc). The corresponding value should be a dictionary/json with the following key:values:
- channel: Name of the raw input channel (e.g.: channel1)
- channel_str: The nextflow definition of the channel and any checks (e.g.: channel1 = Channel.fromPath(param))
- raw_forks: A list of channels to which the raw input channel will fork.
Each new type of input parameter is automatically added to the params attribute, so that they are automatically collected for the pipeline description and help.
Parameters: - raw_input : dict
Contains an entry for each input type with the channel name, channel string and forks.
-
set_secondary_inputs
(channel_dict)[source]¶ Adds secondary inputs to the start of the pipeline.
These channels are inserted into the pipeline file as they are provided in the values of the argument.
Parameters: - channel_dict : dict
Each entry should be <parameter>: <channel string>.
-
set_extra_inputs
(channel_dict)[source]¶ Sets the initial definition of the extra input channels.
The
channel_dict
argument should contain the input type and destination channel of each parameter (which is the key):
    channel_dict = {
        "param1": {
            "input_type": "fasta",
            "channels": ["abricate_2_3", "chewbbaca_3_4"]
        }
    }
Parameters: - channel_dict : dict
Dictionary with the extra_input parameter as key, and a dictionary as a value with the input_type and destination channels
-
class
flowcraft.generator.process.
StatusCompiler
(**kwargs)[source]¶ Bases:
flowcraft.generator.process.Compiler
Status compiler process template interface
This special process receives the status channels from all processes in the generated pipeline.
Attributes: template_str
Class property that returns a populated template string
Methods
get_user_channel(input_channel[, input_type]): Returns the main raw channel for the process
render(template, context): Wrapper to the jinja2 render method from a template file
set_channels(**kwargs): General purpose method that sets the main channels
set_compiler_channels(channel_list[, operator]): General method for setting the input channels for the status process
set_main_channel_names(input_suffix, …): Sets the main channel names based on the provided input and output channel suffixes.
set_param_id(param_id): Sets the param_id for the process, which will be used to render the template.
set_secondary_channel(source, channel_list): General purpose method for setting a secondary channel
update_attributes(attr_dict): Updates the directives attribute from a dictionary object.
update_main_forks(sink): Updates the forks attribute with the sink channel destination
update_main_input
-
class
flowcraft.generator.process.
ReportCompiler
(**kwargs)[source]¶ Bases:
flowcraft.generator.process.Compiler
Reports compiler process template interface
This special process receives the report channels from all processes in the generated pipeline.
Attributes: template_str
Class property that returns a populated template string
Methods
get_user_channel(input_channel[, input_type]): Returns the main raw channel for the process
render(template, context): Wrapper to the jinja2 render method from a template file
set_channels(**kwargs): General purpose method that sets the main channels
set_compiler_channels(channel_list[, operator]): General method for setting the input channels for the status process
set_main_channel_names(input_suffix, …): Sets the main channel names based on the provided input and output channel suffixes.
set_param_id(param_id): Sets the param_id for the process, which will be used to render the template.
set_secondary_channel(source, channel_list): General purpose method for setting a secondary channel
update_attributes(attr_dict): Updates the directives attribute from a dictionary object.
update_main_forks(sink): Updates the forks attribute with the sink channel destination
update_main_input
-
class
flowcraft.generator.process.
PatlasConsensus
(**kwargs)[source]¶ Bases:
flowcraft.generator.process.Compiler
Patlas consensus compiler process template interface
This special process receives the channels associated with the
patlas_consensus
key.
Attributes: template_str
Class property that returns a populated template string
Methods
get_user_channel(input_channel[, input_type]): Returns the main raw channel for the process
render(template, context): Wrapper to the jinja2 render method from a template file
set_channels(**kwargs): General purpose method that sets the main channels
set_compiler_channels(channel_list[, operator]): General method for setting the input channels for the status process
set_main_channel_names(input_suffix, …): Sets the main channel names based on the provided input and output channel suffixes.
set_param_id(param_id): Sets the param_id for the process, which will be used to render the template.
set_secondary_channel(source, channel_list): General purpose method for setting a secondary channel
update_attributes(attr_dict): Updates the directives attribute from a dictionary object.
update_main_forks(sink): Updates the forks attribute with the sink channel destination
update_main_input