try:
    from generator.process_details import colored_print
    import generator.error_handling as eh
    from generator import recipes
except ImportError:
    from flowcraft.generator.process_details import colored_print
    import flowcraft.generator.error_handling as eh
    from flowcraft.generator import recipes

from collections import OrderedDict

import sys
import json
import logging
import pkgutil

logger = logging.getLogger("main.{}".format(__name__))
class InnuendoRecipe:

    def __init__(self):
        """Class to build automatic pipelines based on the provided processes.

        This class provides the methods to build the most efficient pipeline
        based on the provided processes. It automatically creates the
        flowcraft pipeline string based on the relationships between the
        possible processes.
        """

        self.count_forks = 0
        """
        int : counts the total possible number of forks
        """

        self.forks = []
        """
        list : a list with all the possible forks
        """

        self.pipeline_string = ""
        """
        str : the generated pipeline string
        """

        self.process_to_id = {}
        """
        dict : mapping between each process name and its identifier
        """

        self.process_descriptions = {}
    @staticmethod
    def validate_pipeline(pipeline_string):
        """Validates the pipeline string by searching for forbidden
        characters.

        Parameters
        ----------
        pipeline_string : str
            String with the provided processes

        Returns
        -------
        bool : True if the pipeline string contains no forbidden characters,
        False otherwise
        """

        if "(" in pipeline_string or ")" in pipeline_string or "|" in \
                pipeline_string:
            logger.error(
                colored_print("Please provide a valid task list!", "red_bold")
            )
            return False

        return True
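    # A minimal usage sketch (not part of the original module): the validator
    # rejects strings that already contain fork syntax, since forks are built
    # automatically by this recipe class.
    #
    #   InnuendoRecipe.validate_pipeline("integrity_coverage fastqc")  # True
    #   InnuendoRecipe.validate_pipeline("( spades | skesa )")  # False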
    def build_upstream(self, process_descriptions, task, all_tasks,
                       task_pipeline, count_forks, total_tasks, forks):
        """Builds the upstream pipeline of the current process.

        Checks for the processes upstream of the current process and adds
        them to the current pipeline fragment if they were provided in the
        process list.

        Parameters
        ----------
        process_descriptions : dict
            Information on each process's input, output and whether it is
            forkable
        task : str
            Current process
        all_tasks : list
            A list of all provided processes
        task_pipeline : list
            Current pipeline fragment
        count_forks : int
            Current number of forks
        total_tasks : str
            All space-separated processes
        forks : list
            Current forks

        Returns
        -------
        list : resulting pipeline fragment
        """

        if task in process_descriptions:
            if process_descriptions[task][1] is not None:
                if len(process_descriptions[task][1].split("|")) > 1:
                    local_forks = process_descriptions[task][1].split("|")

                    # Produces a new pipeline fragment for each forkable
                    # process
                    for local_fork in local_forks:
                        if local_fork in total_tasks:
                            count_forks += 1
                            task_pipeline.insert(
                                0,
                                process_descriptions[task][1]
                            )
                            self.define_pipeline_string(
                                process_descriptions,
                                local_fork,
                                False,
                                True,
                                count_forks,
                                total_tasks,
                                forks
                            )

                    return task_pipeline
                else:
                    # Adds the process to the pipeline fragment in case it is
                    # provided in the task list
                    if process_descriptions[task][1] in total_tasks:
                        task_pipeline.insert(
                            0,
                            process_descriptions[task][1].split("|")[0]
                        )

                        # Proceeds building upstream until the input for a
                        # process is None
                        self.build_upstream(
                            process_descriptions,
                            process_descriptions[task][1].split("|")[0],
                            all_tasks,
                            task_pipeline,
                            count_forks,
                            total_tasks,
                            forks
                        )
                    else:
                        logger.error(
                            colored_print(
                                "{} not in provided protocols as input "
                                "for {}".format(
                                    process_descriptions[task][1], task),
                                "red_bold"
                            )
                        )
                        sys.exit()

                    return task_pipeline
            else:
                return task_pipeline
    def build_downstream(self, process_descriptions, task, all_tasks,
                         task_pipeline, count_forks, total_tasks, forks):
        """Builds the downstream pipeline of the current process.

        Checks for the processes downstream of the current process and adds
        them to the current pipeline fragment.

        Parameters
        ----------
        process_descriptions : dict
            Information on each process's input, output and whether it is
            forkable
        task : str
            Current process
        all_tasks : list
            A list of all provided processes
        task_pipeline : list
            Current pipeline fragment
        count_forks : int
            Current number of forks
        total_tasks : str
            All space-separated processes
        forks : list
            Current forks

        Returns
        -------
        list : resulting pipeline fragment
        """

        if task in process_descriptions:
            if process_descriptions[task][2] is not None:
                if len(process_descriptions[task][2].split("|")) > 1:
                    local_forks = process_descriptions[task][2].split("|")

                    # Adds the process to the pipeline fragment downstream
                    # and defines a new pipeline fragment for each fork.
                    # Those will only look for downstream processes
                    for local_fork in local_forks:
                        if local_fork in total_tasks:
                            count_forks += 1
                            task_pipeline.append(
                                process_descriptions[task][2]
                            )
                            self.define_pipeline_string(
                                process_descriptions,
                                local_fork,
                                False,
                                True,
                                count_forks,
                                total_tasks,
                                forks
                            )

                    return task_pipeline
                else:
                    if process_descriptions[task][2] in total_tasks:
                        task_pipeline.append(
                            process_descriptions[task][2].split("|")[0]
                        )

                        # Proceeds building downstream until the output for a
                        # process is None
                        self.build_downstream(
                            process_descriptions,
                            process_descriptions[task][2].split("|")[0],
                            all_tasks,
                            task_pipeline,
                            count_forks,
                            total_tasks,
                            forks
                        )

                    return task_pipeline
            else:
                return task_pipeline
    def define_pipeline_string(self, process_descriptions, tasks,
                               check_upstream, check_downstream, count_forks,
                               total_tasks, forks):
        """Builds the possible forks and connections between the provided
        processes.

        This method loops through all the provided tasks and builds the
        upstream and downstream pipeline if required. It then returns all
        possible forks, which need to be merged a posteriori.

        Parameters
        ----------
        process_descriptions : dict
            Information on each process's input, output and whether it is
            forkable
        tasks : str
            Space-separated processes
        check_upstream : bool
            Whether to build the upstream pipeline of the current task
        check_downstream : bool
            Whether to build the downstream pipeline of the current task
        count_forks : int
            Number of current forks
        total_tasks : str
            All space-separated processes
        forks : list
            Current forks

        Returns
        -------
        list : List with all the possible pipeline forks
        """

        tasks_array = tasks.split()

        for task_unsplit in tasks_array:

            task = task_unsplit.split("=")[0]

            if task not in process_descriptions.keys():
                logger.error(
                    colored_print(
                        "{} not in the possible processes".format(task),
                        "red_bold"
                    )
                )
                sys.exit()
            else:
                process_split = task_unsplit.split("=")

                if len(process_split) > 1:
                    self.process_to_id[process_split[0]] = process_split[1]

            # Only uses the process if it is not already in the possible forks
            if not bool([x for x in forks if task in x]) and \
                    not bool([y for y in forks
                              if process_descriptions[task][2] in y]):
                task_pipeline = []

                if task in process_descriptions:
                    if check_upstream:
                        task_pipeline = self.build_upstream(
                            process_descriptions,
                            task,
                            tasks_array,
                            task_pipeline,
                            count_forks,
                            total_tasks,
                            forks
                        )

                    task_pipeline.append(task)

                    if check_downstream:
                        task_pipeline = self.build_downstream(
                            process_descriptions,
                            task,
                            tasks_array,
                            task_pipeline,
                            count_forks,
                            total_tasks,
                            forks
                        )

                # Adds the pipeline fragment to the list of possible forks
                forks.append(list(OrderedDict.fromkeys(task_pipeline)))

            # Checks for the task in an existing fork, in case the order of
            # the input processes is reversed
            elif bool([y for y in forks
                       if process_descriptions[task][2] in y]):
                for fork in forks:
                    if task not in fork:
                        try:
                            dependent_index = fork.index(
                                process_descriptions[task][2]
                            )
                            fork.insert(dependent_index, task)
                        except ValueError:
                            continue

        # Splits forkable entries into lists, keeping only the fork branches
        # that were actually provided in the task list
        for i in range(0, len(forks)):
            for j in range(0, len(forks[i])):
                try:
                    if len(forks[i][j].split("|")) > 1:
                        forks[i][j] = forks[i][j].split("|")
                        tmp_fork = []
                        for s in forks[i][j]:
                            if s in total_tasks:
                                tmp_fork.append(s)

                        forks[i][j] = tmp_fork
                except AttributeError:
                    continue

        return forks
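    # Illustrative sketch (not part of the original module), using the
    # Innuendo process descriptions defined below: for the task list
    # "reads_download seq_typing patho_typing" this method returns
    #
    #   [["seq_typing"], ["patho_typing"],
    #    ["reads_download", ["seq_typing", "patho_typing"]]]
    #
    # i.e. one fragment per fork branch plus the fragment containing the
    # fork point. build_pipeline_string() below merges these fragments.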
    def build_pipeline_string(self, forks):
        """Parses, filters and merges all possible pipeline forks into the
        final pipeline string.

        This method checks for shared start and end sections between forks
        and merges them according to the shared processes::

            [[spades, ...], [skesa, ...], [...,[spades, skesa]]]
            -> [..., [[spades, ...], [skesa, ...]]]

        Then it defines the pipeline string by replacing the array levels
        with the flowcraft fork format::

            [..., [[spades, ...], [skesa, ...]]]
            -> ( ... ( spades ... | skesa ... ) )

        Parameters
        ----------
        forks : list
            List with all the possible pipeline forks.

        Returns
        -------
        str : String with the pipeline definition used as input for
        parse_pipeline
        """

        final_forks = []

        for i in range(0, len(forks)):
            needs_merge = [False, 0, 0, 0, 0, ""]
            is_merged = False
            for i2 in range(0, len(forks[i])):
                for j in range(i, len(forks)):
                    needs_merge[0] = False
                    for j2 in range(0, len(forks[j])):
                        try:
                            j2_fork = forks[j][j2].split("|")
                        except AttributeError:
                            j2_fork = forks[j][j2]

                        # Gets the indexes of the forks matrix that need to
                        # be merged
                        if forks[i][i2] in j2_fork \
                                and (i2 == 0 or j2 == 0) and i != j:
                            needs_merge[0] = True
                            needs_merge[1] = i
                            needs_merge[2] = i2
                            needs_merge[3] = j
                            needs_merge[4] = j2
                            needs_merge[5] = forks[i][i2]

                    if needs_merge[0]:
                        index_merge_point = \
                            forks[needs_merge[3]][-1].index(needs_merge[5])

                        # Merges the forks. If only one fork is possible,
                        # that fork is dropped and it merges into a single
                        # channel.
                        if needs_merge[2] == 0:
                            if len(forks[needs_merge[3]][-1]) < 2:
                                forks[needs_merge[3]] = \
                                    forks[needs_merge[3]][:-1] + \
                                    forks[needs_merge[1]][::]
                            else:
                                forks[needs_merge[3]][-1][index_merge_point] \
                                    = forks[needs_merge[1]]
                        elif needs_merge[4] == 0:
                            if len(forks[needs_merge[3]][-1]) < 2:
                                forks[needs_merge[3]] = \
                                    forks[needs_merge[3]][:-1] + \
                                    forks[needs_merge[1]][::]
                            else:
                                forks[needs_merge[3]][-1][index_merge_point] \
                                    = forks[needs_merge[1]]

                        is_merged = True

            # Adds forks that don't need merging to the final forks
            if needs_merge[0] is not None and not is_merged:
                if bool([nf for nf in forks[i] if "|" in nf]):
                    continue
                final_forks.append(forks[i])

        if len(final_forks) == 1:
            final_forks = str(final_forks[0])

        # Parses the string array into the flowcraft nomenclature
        pipeline_string = " " + str(final_forks)\
            .replace("[[", "( ")\
            .replace("]]", " )")\
            .replace("]", " |")\
            .replace(", [", " ")\
            .replace("'", "")\
            .replace(",", "")\
            .replace("[", "")

        if pipeline_string[-1] == "|":
            pipeline_string = pipeline_string[:-1]

        to_search = " {} "
        to_replace = " {}={} "

        # Replaces process names with name=id pairs, including the case of a
        # single process in the pipeline
        for key, val in self.process_to_id.items():
            pipeline_string = pipeline_string\
                .replace(to_search.format(key), to_replace.format(key, val))

        return pipeline_string
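    # Illustrative sketch (not part of the original module): continuing the
    # example above, merging
    #
    #   [["seq_typing"], ["patho_typing"],
    #    ["reads_download", ["seq_typing", "patho_typing"]]]
    #
    # should yield, roughly, the flowcraft string
    #
    #   " reads_download ( seq_typing | patho_typing ) "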
    def run_auto_pipeline(self, tasks):
        """Main method to run the automatic pipeline creation.

        This method aggregates the functions required to build the pipeline
        string that can be used as input for the workflow generator.

        Parameters
        ----------
        tasks : str
            A string with the space-separated tasks to be included in the
            pipeline

        Returns
        -------
        str : String with the pipeline definition used as input for
        parse_pipeline
        """

        self.forks = self.define_pipeline_string(
            self.process_descriptions,
            tasks,
            True,
            True,
            self.count_forks,
            tasks,
            self.forks
        )

        self.pipeline_string = self.build_pipeline_string(self.forks)

        return self.pipeline_string

    # def get_process_info(self):
    #     return list(self.process_descriptions.keys())
class Innuendo(InnuendoRecipe):
    """
    Recipe class for the INNUENDO Project. It includes all the processes
    available in the platform, for quick use within the scope of the project.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # The description of the processes:
        # [forkable, input_process, output_process]
        self.process_descriptions = {
            "reads_download": [False, None,
                               "integrity_coverage|seq_typing|patho_typing"],
            "patho_typing": [True, None, None],
            "seq_typing": [True, None, None],
            "integrity_coverage": [True, None, "fastqc_trimmomatic"],
            "fastqc_trimmomatic": [False, "integrity_coverage",
                                   "true_coverage"],
            "true_coverage": [False, "fastqc_trimmomatic", "fastqc"],
            "fastqc": [False, "true_coverage", "check_coverage"],
            "check_coverage": [False, "fastqc", "spades|skesa"],
            "spades": [False, "fastqc_trimmomatic", "process_spades"],
            "skesa": [False, "fastqc_trimmomatic", "process_skesa"],
            "process_spades": [False, "spades", "assembly_mapping"],
            "process_skesa": [False, "skesa", "assembly_mapping"],
            "assembly_mapping": [False, "process_spades", "pilon"],
            "pilon": [False, "assembly_mapping", "mlst"],
            "mlst": [False, "pilon", "abricate|prokka|chewbbaca|sistr"],
            "sistr": [True, "mlst", None],
            "abricate": [True, "mlst", None],
            # "prokka": [True, "mlst", None],
            "chewbbaca": [True, "mlst", None]
        }
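# A minimal usage sketch (not part of the original module): brewing a small
# subset of the INNUENDO processes.
#
#   recipe = Innuendo()
#   recipe.run_auto_pipeline("reads_download seq_typing patho_typing")
#   # -> " reads_download ( seq_typing | patho_typing ) " (roughly)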
def brew_innuendo(args):
    """Brews a given list of processes according to the recipe.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments passed through argparse that will be used to check
        the recipe and tasks, and brew the process

    Returns
    -------
    str
        The final pipeline string, ready for the engine.
    """

    # Create recipe class instance
    automatic_pipeline = Innuendo()

    if not args.tasks:
        input_processes = " ".join(
            automatic_pipeline.process_descriptions.keys())
    else:
        input_processes = args.tasks

    # Validate the provided pipeline processes
    validated = automatic_pipeline.validate_pipeline(input_processes)
    if not validated:
        sys.exit(1)

    # Get the final pipeline string
    pipeline_string = automatic_pipeline.run_auto_pipeline(input_processes)

    return pipeline_string
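# A minimal usage sketch (not part of the original module); the Namespace
# below stands in for the parsed command-line arguments:
#
#   import argparse
#   args = argparse.Namespace(tasks="reads_download seq_typing patho_typing")
#   pipeline_str = brew_innuendo(args)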
class Recipe:

    def __init__(self):

        self.pipeline_str = None
        """
        str: The raw pipeline string, with no attributes or directives,
        except for number indicators for when there are duplicate components.
        e.g.: "fastqc trimmomatic spades"
        e.g.: "fastqc trimmomatic (spades#1 | spades#2)"
        """

        self.directives = {}
        """
        dict: Dictionary with the parameters and directives for each
        component in the pipeline_str attribute. Missing components will be
        left with the default parameters and directives.
        """

    def brew(self):

        if not hasattr(self, "name"):
            raise eh.RecipeError("Recipe class '{}' does not have a 'name' "
                                 "attribute set".format(self.__class__))

        if not self.pipeline_str:
            raise eh.RecipeError("Recipe with name '{}' does not have a "
                                 "pipeline_str attribute set".format(
                                     self.name))

        for component, vals in self.directives.items():

            params = vals.get("params", None)
            directives = vals.get("directives", None)

            # Check for the component number symbol
            if "#" in component:
                _component = component.split("#")[0]
            else:
                _component = component

            component_str = self._get_component_str(_component, params,
                                                    directives)

            self.pipeline_str = self.pipeline_str.replace(component,
                                                          component_str)

        return self.pipeline_str
    @staticmethod
    def _get_component_str(component, params=None, directives=None):
        """Generates a component string based on the provided parameters
        and directives.

        Parameters
        ----------
        component : str
            Component name
        params : dict
            Dictionary with parameter information
        directives : dict
            Dictionary with directives information

        Returns
        -------
        str
            Component string with the parameters and directives, ready for
            parsing by the flowcraft engine
        """

        final_directives = {}

        if directives:
            final_directives = directives

        if params:
            final_directives["params"] = params

        if final_directives:
            return "{}={}".format(
                component,
                json.dumps(final_directives, separators=(",", ":")))
        else:
            return component
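    # Illustrative sketch (not part of the original module); the parameter
    # value "custom.fasta" is hypothetical:
    #
    #   Recipe._get_component_str("fastqc",
    #                             params={"adapters": "custom.fasta"})
    #   # -> 'fastqc={"params":{"adapters":"custom.fasta"}}'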
def brew_recipe(recipe_name):
    """Returns a pipeline string from a recipe name.

    Parameters
    ----------
    recipe_name : str
        Name of the recipe. Must match the name attribute in one of the
        classes defined in :mod:`flowcraft.generator.recipes`

    Returns
    -------
    str
        Pipeline string ready for parsing and processing by the flowcraft
        engine
    """

    # Iterate over all modules included in the recipes subpackage.
    # This returns the module importer and the module name, along with the
    # correct prefix.
    prefix = "{}.".format(recipes.__name__)
    for importer, modname, _ in pkgutil.iter_modules(recipes.__path__,
                                                     prefix):

        # Import the current module
        _module = importer.find_module(modname).load_module(modname)

        # Fetch all available classes in the module
        _recipe_classes = [cls for cls in _module.__dict__.values() if
                           isinstance(cls, type)]

        # Iterate over each Recipe class and check for a match with the
        # provided recipe name.
        for cls in _recipe_classes:
            # Create an instance of the class to allow fetching the name
            # attribute
            recipe_cls = cls()
            if getattr(recipe_cls, "name", None) == recipe_name:
                return recipe_cls.brew()

    logger.error(
        colored_print("Recipe name '{}' does not exist.".format(recipe_name))
    )
    sys.exit(1)
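# A minimal sketch (not part of the original module) of a recipe class that
# brew_recipe could discover. It would have to live in a module under
# flowcraft.generator.recipes; the name "demo" and the directive values are
# hypothetical:
#
#   class Demo(Recipe):
#       def __init__(self):
#           super().__init__()
#           self.name = "demo"
#           self.pipeline_str = "fastqc trimmomatic spades"
#           self.directives = {
#               "trimmomatic": {"params": {"adapters": "x.fa"}}
#           }
#
#   brew_recipe("demo")
#   # -> 'fastqc trimmomatic={"params":{"adapters":"x.fa"}} spades'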
def list_recipes(full=False):
    """Iterates over all available recipes and prints their information to
    the standard output.

    Parameters
    ----------
    full : bool
        If True, also prints the docstring and pipeline string along with
        the recipe name
    """

    logger.info(colored_print(
        "\n===== L I S T  O F  R E C I P E S =====\n",
        "green_bold"))

    # Iterate over all modules included in the recipes subpackage.
    # This returns the module importer and the module name, along with the
    # correct prefix.
    prefix = "{}.".format(recipes.__name__)
    for importer, modname, _ in pkgutil.iter_modules(recipes.__path__,
                                                     prefix):

        # Import the current module
        _module = importer.find_module(modname).load_module(modname)

        # Fetch all available classes in the module
        _recipe_classes = [cls for cls in _module.__dict__.values() if
                           isinstance(cls, type)]

        # Iterate over each Recipe class and print its information
        for cls in _recipe_classes:
            recipe_cls = cls()
            if hasattr(recipe_cls, "name"):
                logger.info(colored_print(
                    "=> {}".format(recipe_cls.name), "blue_bold"))
                if full:
                    logger.info(colored_print(
                        "\t {}".format(recipe_cls.__doc__), "purple_bold"))
                    logger.info(colored_print(
                        "Pipeline string: {}\n".format(
                            recipe_cls.pipeline_str), "yellow_bold"))

    sys.exit(0)