Source code for flowcraft.generator.pipeline_parser

import os
import logging
import re
from difflib import SequenceMatcher

try:
    from generator.error_handling import SanityError
    from generator.process_details import colored_print
except ImportError:
    from flowcraft.generator.error_handling import SanityError
    from flowcraft.generator.process_details import colored_print

logger = logging.getLogger("main.{}".format(__name__))

# Set the tokens used for the main syntax
# Token signaling the start of a fork
FORK_TOKEN = "("
# Token separating different lanes from a fork
LANE_TOKEN = "|"
# Token that closes a fork
CLOSE_TOKEN = ")"


def guess_process(query_str, process_map):
    """Guesses processes based on strings that are not available in
    process_map.

    If the string has typos and is somewhat similar (more than 50%) to any
    process available in flowcraft, it will print info to the terminal
    suggesting the most similar processes available in flowcraft.

    Parameters
    ----------
    query_str: str
        The string of the process with potential typos
    process_map: dict
        The dictionary that contains all the available processes
    """

    save_list = []

    # Loop over the processes available in process_map
    for process in process_map:
        similarity = SequenceMatcher(None, process, query_str)
        # Check if the similarity between the process and the query string
        # is higher than 50%
        if similarity.ratio() > 0.5:
            save_list.append(process)

    # Check if any process was stored in save_list
    if save_list:
        logger.info(colored_print(
            "Maybe you meant:\n\t{}".format("\n\t".join(save_list)),
            "white"))

    logger.info(colored_print(
        "Hint: check the available processes by using the '-l' or "
        "'-L' flag.", "white"))

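# Illustrative usage sketch (not part of the original module). The
# process_map keys shown here are hypothetical:
#
#   >>> process_map = {"trimmomatic": None, "fastqc": None, "spades": None}
#   >>> guess_process("trimomatic", process_map)
#   # logs something like:
#   #   Maybe you meant:
#   #       trimmomatic
#   #   Hint: check the available processes by using the '-l' or '-L' flag.
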
def remove_inner_forks(text):
    """Recursively removes nested brackets.

    This function is used to remove nested brackets from fork strings using
    regular expressions.

    Parameters
    ----------
    text: str
        The string that contains brackets with inner forks to be removed

    Returns
    -------
    text: str
        The string with only the processes that are not in inner forks, thus
        the processes that belong to a given fork.
    """

    # Run at least once for one level of fork. The loop then assures that
    # all brackets get removed in a nested structure.
    n = 1
    while n:
        # This removes the innermost (non-nested) brackets on each pass
        text, n = re.subn(r'\([^()]*\)', '', text)
    return text

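# Illustrative example (a sketch, not part of the original module): given
# the body of a fork, any nested fork is stripped so that only the
# processes of the current fork level remain.
#
#   >>> remove_inner_forks("B (C | D) | E")
#   'B  | E'
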
def empty_tasks(p_string):
    """Function to check if the pipeline string is empty or contains only
    whitespace.

    Parameters
    ----------
    p_string: str
        String with the definition of the pipeline, e.g.::
            'processA processB processC(ProcessD | ProcessE)'
    """
    if p_string.strip() == "":
        raise SanityError("'-t' parameter received an empty string or "
                          "an empty file.")

def brackets_but_no_lanes(p_string):
    """Function to check if a LANE_TOKEN is provided but no fork is
    initiated.

    Parameters
    ----------
    p_string: str
        String with the definition of the pipeline, e.g.::
            'processA processB processC(ProcessD | ProcessE)'
    """
    if "|" in p_string and "(" not in p_string:
        raise SanityError("No fork initiation character '(' was "
                          "provided but there is a fork lane separator "
                          "character '|'")

def brackets_insanity_check(p_string):
    """This function performs a check for a different number of '(' and ')'
    characters, which indicates that some forks are poorly constructed.

    Parameters
    ----------
    p_string: str
        String with the definition of the pipeline, e.g.::
            'processA processB processC(ProcessD | ProcessE)'
    """
    if p_string.count(FORK_TOKEN) != p_string.count(CLOSE_TOKEN):
        # Get the number of each type of bracket and determine which one
        # has the higher count
        dict_values = {
            FORK_TOKEN: p_string.count(FORK_TOKEN),
            CLOSE_TOKEN: p_string.count(CLOSE_TOKEN)
        }
        max_bracket = max(dict_values, key=dict_values.get)

        raise SanityError(
            "A different number of '(' and ')' was specified. There are "
            "{} extra '{}'. The number of '(' and ')' should be "
            "equal.".format(
                str(abs(p_string.count(FORK_TOKEN) -
                        p_string.count(CLOSE_TOKEN))),
                max_bracket))

def lane_char_insanity_check(p_string):
    """This function performs a sanity check for multiple '|' characters
    between two processes.

    Parameters
    ----------
    p_string: str
        String with the definition of the pipeline, e.g.::
            'processA processB processC(ProcessD | ProcessE)'
    """
    if LANE_TOKEN + LANE_TOKEN in p_string:
        raise SanityError("Duplicated fork separator character '|'.")

def final_char_insanity_check(p_string):
    """This function checks if the lane token is the last element of the
    pipeline string.

    Parameters
    ----------
    p_string: str
        String with the definition of the pipeline, e.g.::
            'processA processB processC(ProcessD | ProcessE)'
    """
    # Check if the last character of the string is a LANE_TOKEN
    if p_string.endswith(LANE_TOKEN):
        raise SanityError("Fork separator character '|' cannot be the "
                          "last element of the pipeline string")

def fork_procs_insanity_check(p_string):
    """This function checks if the pipeline string contains a process
    between the fork start token or end token and the separator (lane)
    token.

    Checks for the absence of processes in one of the branches of the fork
    ('|)' and '(|') and for the existence of a process before starting a
    fork (in an inner fork) ('|(').

    Parameters
    ----------
    p_string: str
        String with the definition of the pipeline, e.g.::
            'processA processB processC(ProcessD | ProcessE)'
    """
    # Check for '(|', '|)' and '|(' patterns, which all indicate a missing
    # process around a fork boundary
    if FORK_TOKEN + LANE_TOKEN in p_string or \
            LANE_TOKEN + CLOSE_TOKEN in p_string or \
            LANE_TOKEN + FORK_TOKEN in p_string:
        raise SanityError("There must be a process between the fork "
                          "start character '(' or end ')' and the "
                          "separator of processes character '|'")

def start_proc_insanity_check(p_string):
    """This function checks if there is a starting process after the
    beginning of each fork, by looking for duplicated start tokens ('((').

    Parameters
    ----------
    p_string: str
        String with the definition of the pipeline, e.g.::
            'processA processB processC(ProcessD | ProcessE)'
    """
    if FORK_TOKEN + FORK_TOKEN in p_string:
        raise SanityError("There must be a starting process after the "
                          "fork before adding a new fork. E.g.: proc1 "
                          "( proc2.1 (proc3.1 | proc3.2) | proc2.2 )")

def late_proc_insanity_check(p_string):
    """This function checks if there are processes after the close token.
    It searches for anything that isn't a '|' or ')' after a ')' token.

    Parameters
    ----------
    p_string: str
        String with the definition of the pipeline, e.g.::
            'processA processB processC(ProcessD | ProcessE)'
    """
    # A raw string is used so that the escaped close token is a valid regex
    if re.search(r'\{}[^|)]'.format(CLOSE_TOKEN), p_string):
        raise SanityError("After a fork it is not allowed to have any "
                          "alphanumeric value.")

def inner_fork_insanity_checks(pipeline_string):
    """This function performs a sanity check on each fork of the pipeline
    string, assuring that every fork contains a lane token '|'.

    Parameters
    ----------
    pipeline_string: str
        String with the definition of the pipeline, e.g.::
            'processA processB processC(ProcessD | ProcessE)'
    """

    # First, collect all forks into a list.
    list_of_forks = []  # stores forks
    left_indexes = []  # stores indexes of left brackets

    # Iterate through the string looking for '(' and ')'
    for pos, char in enumerate(pipeline_string):
        if char == FORK_TOKEN:
            # Save pos to the left_indexes list
            left_indexes.append(pos)
        elif char == CLOSE_TOKEN and len(left_indexes) > 0:
            # Save the fork substring to list_of_forks
            list_of_forks.append(pipeline_string[left_indexes[-1] + 1: pos])
            # Remove the last bracket from the left_indexes list
            left_indexes = left_indexes[:-1]

    # Sort the list in descending order of the number of inner forks
    list_of_forks.sort(key=lambda x: x.count(FORK_TOKEN), reverse=True)

    # Now, iterate through list_of_forks and check for errors in each fork
    for fork in list_of_forks:
        # Remove inner forks for this check, since each fork has its own
        # entry in list_of_forks. The descending sort above allows the
        # forks with more inner forks to be removed first.
        fork_simplified = fork
        for subfork in list_of_forks:
            # Check if subfork is contained in fork and that the two
            # differ, to avoid removing the fork from itself. The subfork
            # substring comes verbatim from the same string, so the
            # parenthesised text matches exactly.
            if subfork in fork and subfork != fork:
                # Remove the inner fork
                fork_simplified = fork_simplified.replace(
                    "({})".format(subfork), "")

        # Check if there is a fork separator character '|' within the fork
        if not len(fork_simplified.split(LANE_TOKEN)) > 1:
            raise SanityError("One of the forks doesn't have a '|' "
                              "separator between the processes to fork. "
                              "This is the prime suspect: "
                              "'({})'".format(fork))

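# Illustrative behaviour sketch (not part of the original module; the error
# message is abbreviated):
#
#   >>> inner_fork_insanity_checks("A (B | C)")   # valid fork, no error
#   >>> inner_fork_insanity_checks("A (B C)")     # fork without a '|'
#   Traceback (most recent call last):
#       ...
#   SanityError: One of the forks doesn't have a '|' separator ...
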
def insanity_checks(pipeline_str):
    """Wrapper that performs all sanity checks on the pipeline string.

    Parameters
    ----------
    pipeline_str : str
        String with the pipeline definition
    """

    # Get rid of all spaces in the string
    p_string = pipeline_str.replace(" ", "").strip()

    # Some of the check functions use pipeline_str as provided by the user,
    # but the majority uses the parsed p_string.
    checks = [
        [p_string, [
            empty_tasks,
            brackets_but_no_lanes,
            brackets_insanity_check,
            lane_char_insanity_check,
            final_char_insanity_check,
            fork_procs_insanity_check,
            start_proc_insanity_check,
            late_proc_insanity_check
        ]],
        [pipeline_str, [
            inner_fork_insanity_checks
        ]]
    ]

    # Execute the sanity checks on the pipeline string before parsing it
    for param, func_list in checks:
        for func in func_list:
            func(param)

def parse_pipeline(pipeline_str):
    """Parses a pipeline string into a list of dictionaries with the
    connections between processes.

    Parameters
    ----------
    pipeline_str : str
        String with the definition of the pipeline, e.g.::
            'processA processB processC(ProcessD | ProcessE)'

    Returns
    -------
    pipeline_links : list
    """

    if os.path.exists(pipeline_str):
        logger.debug("Found pipeline file: {}".format(pipeline_str))
        with open(pipeline_str) as fh:
            pipeline_str = "".join([x.strip() for x in fh.readlines()])
            logger.info(colored_print("Resulting pipeline string:\n"))
            logger.info(colored_print(pipeline_str + "\n"))

    # Perform pipeline sanity checks
    insanity_checks(pipeline_str)

    logger.debug("Parsing pipeline string: {}".format(pipeline_str))

    pipeline_links = []
    lane = 1

    # Add unique identifiers to each process to allow a correct connection
    # between forks with the same processes
    pipeline_str_modified, identifiers_to_tags = add_unique_identifiers(
        pipeline_str)

    # Get the number of forks in the pipeline
    nforks = pipeline_str_modified.count(FORK_TOKEN)
    logger.debug("Found {} fork(s)".format(nforks))

    # If there are no forks, connect the pipeline as purely linear
    if not nforks:
        logger.debug("Detected linear pipeline string : {}".format(
            pipeline_str))
        linear_pipeline = ["__init__"] + pipeline_str_modified.split()
        pipeline_links.extend(linear_connection(linear_pipeline, lane))
        # Remove the unique identifiers that were used to correctly assign
        # fork parents that may share the same process name
        pipeline_links = remove_unique_identifiers(identifiers_to_tags,
                                                   pipeline_links)
        return pipeline_links

    for i in range(nforks):

        logger.debug("Processing fork {} in lane {}".format(i, lane))

        # Split the pipeline at each fork start position. fields[-1] will
        # hold the processes after the fork. fields[-2] will hold the
        # processes before the fork.
        fields = pipeline_str_modified.split(FORK_TOKEN, i + 1)

        # Get the processes before the fork. This may be empty when the
        # fork is at the beginning of the pipeline.
        previous_process = fields[-2].split(LANE_TOKEN)[-1].split()
        logger.debug("Previous processes string: {}".format(fields[-2]))
        logger.debug("Previous processes list: {}".format(previous_process))

        # Get the lanes after the fork
        next_lanes = get_lanes(fields[-1])
        logger.debug("Next lanes object: {}".format(next_lanes))

        # Get the immediate targets of the fork
        fork_sink = [x[0] for x in next_lanes]
        logger.debug("The fork sinks into the processes: {}".format(
            fork_sink))

        # The first fork is a special case, where the processes before AND
        # after the fork (until the start of another fork) are added to
        # the ``pipeline_links`` variable. Otherwise, only the processes
        # after the fork will be added.
        if i == 0:
            # If there are no previous processes, the fork is at the
            # beginning of the pipeline string. In this case, inject the
            # special "init" process.
            if not previous_process:
                previous_process = ["__init__"]
                lane = 0
            else:
                previous_process = ["__init__"] + previous_process

            # Add the linear modules before the fork
            pipeline_links.extend(
                linear_connection(previous_process, lane))

        fork_source = previous_process[-1]
        logger.debug("Fork source is set to: {}".format(fork_source))
        fork_lane = get_source_lane(previous_process, pipeline_links)
        logger.debug("Fork lane is set to: {}".format(fork_lane))

        # Add the forking modules
        pipeline_links.extend(
            fork_connection(fork_source, fork_sink, fork_lane, lane))
        # Add the linear connections in the subsequent lanes
        pipeline_links.extend(
            linear_lane_connection(next_lanes, lane))

        lane += len(fork_sink)

    pipeline_links = remove_unique_identifiers(identifiers_to_tags,
                                               pipeline_links)

    return pipeline_links

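# Illustrative example (a sketch, not part of the original module; the
# process names "A", "B" and "C" are hypothetical and the output is shown
# pretty-printed):
#
#   >>> parse_pipeline("A (B | C)")
#   [{'input': {'process': '__init__', 'lane': 1},
#     'output': {'process': 'A', 'lane': 1}},
#    {'input': {'process': 'A', 'lane': 1},
#     'output': {'process': 'B', 'lane': 2}},
#    {'input': {'process': 'A', 'lane': 1},
#     'output': {'process': 'C', 'lane': 3}}]
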
def get_source_lane(fork_process, pipeline_list):
    """Returns the lane of the last process that matches fork_process.

    Parameters
    ----------
    fork_process : list
        List of processes before the fork.
    pipeline_list : list
        List with the pipeline connection dictionaries.

    Returns
    -------
    int
        Lane of the last process that matches fork_process
    """

    fork_source = fork_process[-1]
    fork_sig = [x for x in fork_process if x != "__init__"]

    for position, p in enumerate(pipeline_list[::-1]):
        if p["output"]["process"] == fork_source:
            lane = p["output"]["lane"]
            logger.debug("Possible source match found in position {} in "
                         "lane {}".format(position, lane))
            lane_sequence = [x["output"]["process"] for x in pipeline_list
                             if x["output"]["lane"] == lane]
            logger.debug("Testing lane sequence '{}' against fork "
                         "signature '{}'".format(lane_sequence, fork_sig))

            if lane_sequence == fork_sig:
                return p["output"]["lane"]

    return 0

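# Illustrative example (a sketch, not part of the original module; the
# connection list uses the same format produced by linear_connection below):
#
#   >>> links = [{'input': {'process': '__init__', 'lane': 1},
#   ...           'output': {'process': 'A', 'lane': 1}}]
#   >>> get_source_lane(['__init__', 'A'], links)
#   1
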
def get_lanes(lanes_str):
    """From a raw pipeline string, gets a list of lanes from the start of
    the current fork.

    When the pipeline is being parsed, it will be split at every fork
    position. The string at the right of the fork position will be provided
    to this function. Its job is to retrieve the lanes that result from
    that fork, ignoring any nested forks.

    Parameters
    ----------
    lanes_str : str
        Pipeline string after a fork split

    Returns
    -------
    lanes : list
        List of lists, with the list of processes for each lane
    """

    logger.debug("Parsing lanes from raw string: {}".format(lanes_str))

    # Temporarily stores the lanes string after removal of nested forks
    parsed_lanes = ""
    # Counter used to determine whether the cursor is inside or outside
    # the current fork
    infork = 0

    for i in lanes_str:

        # A nested fork started
        if i == FORK_TOKEN:
            infork += 1
        # A nested fork stopped
        if i == CLOSE_TOKEN:
            infork -= 1
            # The current fork was closed
            if infork < 0:
                break

        # Save characters only when inside the current fork
        if infork == 0:
            # Ignore forking syntax tokens
            if i not in [FORK_TOKEN, CLOSE_TOKEN]:
                parsed_lanes += i

    return [x.split() for x in parsed_lanes.split(LANE_TOKEN)]

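# Illustrative example (a sketch, not part of the original module): the
# string to the right of a fork split yields one process list per lane,
# with the nested fork and anything after the current fork ignored.
#
#   >>> get_lanes("B D | C (E | F)) G")
#   [['B', 'D'], ['C']]
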
def linear_connection(plist, lane):
    """Connects a linear list of processes into a list of dictionaries.

    Parameters
    ----------
    plist : list
        List with process names. This list should contain at least two
        entries.
    lane : int
        Corresponding lane of the processes

    Returns
    -------
    res : list
        List of dictionaries with the links between processes
    """

    logger.debug(
        "Establishing linear connection with processes: {}".format(plist))

    res = []
    previous = None

    for p in plist:

        # Skip the first process
        if not previous:
            previous = p
            continue

        res.append({
            "input": {
                "process": previous,
                "lane": lane
            },
            "output": {
                "process": p,
                "lane": lane
            }
        })
        previous = p

    return res

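# Illustrative example (a sketch, not part of the original module):
#
#   >>> linear_connection(['__init__', 'A', 'B'], 1)
#   [{'input': {'process': '__init__', 'lane': 1},
#     'output': {'process': 'A', 'lane': 1}},
#    {'input': {'process': 'A', 'lane': 1},
#     'output': {'process': 'B', 'lane': 1}}]
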
def fork_connection(source, sink, source_lane, lane):
    """Makes the connection between a process and the first processes in
    the lanes to which it forks.

    For each lane in ``sink``, the lane counter will increase.

    Parameters
    ----------
    source : str
        Name of the process that is forking
    sink : list
        List of the processes to which the source will fork. Each element
        corresponds to the start of a lane.
    source_lane : int
        Lane of the forking (source) process
    lane : int
        Current lane counter before the fork; the new lanes are numbered
        from ``lane + 1`` onwards

    Returns
    -------
    res : list
        List of dictionaries with the links between processes
    """

    logger.debug("Establishing forking of source '{}' into processes"
                 " '{}'. Source lane set to '{}' and lane set to "
                 "'{}'".format(source, sink, source_lane, lane))

    res = []
    # Increase the lane counter for the first lane
    lane_counter = lane + 1

    for p in sink:
        res.append({
            "input": {
                "process": source,
                "lane": source_lane
            },
            "output": {
                "process": p,
                "lane": lane_counter
            }
        })
        lane_counter += 1

    return res

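# Illustrative example (a sketch, not part of the original module): process
# 'A' in lane 1 forks into 'B' and 'C', which are placed in the two new
# lanes 2 and 3.
#
#   >>> fork_connection('A', ['B', 'C'], source_lane=1, lane=1)
#   [{'input': {'process': 'A', 'lane': 1},
#     'output': {'process': 'B', 'lane': 2}},
#    {'input': {'process': 'A', 'lane': 1},
#     'output': {'process': 'C', 'lane': 3}}]
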
def linear_lane_connection(lane_list, lane):
    """Connects the processes within each lane of a fork linearly.

    Parameters
    ----------
    lane_list : list
        Each element should correspond to a list of processes for a given
        lane
    lane : int
        Lane counter before the fork start

    Returns
    -------
    res : list
        List of dictionaries with the links between processes
    """

    logger.debug(
        "Establishing linear connections for lanes: {}".format(lane_list))

    res = []
    # Increase the lane counter for the first lane
    lane += 1

    for l in lane_list:
        res.extend(linear_connection(l, lane))
        lane += 1

    return res

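# Illustrative example (a sketch, not part of the original module): with
# the lane counter at 1 before the fork, the lane ['B', 'D'] becomes lane 2
# and the single-process lane ['C'] becomes lane 3, producing no links of
# its own.
#
#   >>> linear_lane_connection([['B', 'D'], ['C']], 1)
#   [{'input': {'process': 'B', 'lane': 2},
#     'output': {'process': 'D', 'lane': 2}}]
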
def add_unique_identifiers(pipeline_str):
    """Returns the pipeline string with unique identifiers and a dictionary
    with references between the unique keys and the original values.

    Parameters
    ----------
    pipeline_str : str
        Pipeline string

    Returns
    -------
    str
        Pipeline string with unique identifiers
    dict
        Match between the unique process identifiers and the original names
    """

    # Add a space at the beginning and end of the pipeline to allow regex
    # mapping of the final process in linear pipelines
    pipeline_str_modified = " {} ".format(pipeline_str)

    # Regex to get all process names: catch all words without spaces that
    # are not fork tokens or lane separators
    reg_find_proc = r"[^\s{}{}{}]+".format(LANE_TOKEN, FORK_TOKEN,
                                           CLOSE_TOKEN)
    process_names = re.findall(reg_find_proc, pipeline_str_modified)

    identifiers_to_tags = {}
    """
    dict: Matches the new process names (identifiers) with the original
    process names
    """

    new_process_names = []
    """
    list: New process names used to replace the ones in the pipeline string
    """

    # Assign the new process names by appending a numeric id to the end of
    # each process name
    for index, val in enumerate(process_names):
        if "=" in val:
            parts = val.split("=")
            new_id = "{}_{}={}".format(parts[0], index, parts[1])
        else:
            new_id = "{}_{}".format(val, index)
        # Add the new process name with its id
        new_process_names.append(new_id)
        # Match the new process name to the original process name
        identifiers_to_tags[new_id] = val

    # Add a space between forks, pipes and the process names for the
    # replacement regex below to work
    match_result = lambda match: " {} ".format(match.group())
    find = r'[{}{}{}]+'.format(FORK_TOKEN, LANE_TOKEN, CLOSE_TOKEN)
    pipeline_str_modified = re.sub(find, match_result,
                                   pipeline_str_modified)

    # Replace the original process names with the unique identifiers
    for index, val in enumerate(process_names):
        # Regex to replace process names that have not yet been assigned an
        # id (names already processed end in '_<id>'). Backslashes in the
        # process name are doubled so the regex treats them literally.
        find = r'{}[^_]'.format(val).replace("\\", "\\\\")
        pipeline_str_modified = re.sub(find, new_process_names[index] + " ",
                                       pipeline_str_modified, 1)

    return pipeline_str_modified, identifiers_to_tags

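# Illustrative example (a sketch, not part of the original module; the
# exact whitespace in the returned string may vary, so it is normalised
# here before printing):
#
#   >>> modified, id_map = add_unique_identifiers("A (B | B)")
#   >>> id_map
#   {'A_0': 'A', 'B_1': 'B', 'B_2': 'B'}
#   >>> "".join(modified.split())
#   'A_0(B_1|B_2)'
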
def remove_unique_identifiers(identifiers_to_tags, pipeline_links):
    """Removes the unique identifiers and adds the original process names
    back to the already parsed pipelines.

    Parameters
    ----------
    identifiers_to_tags : dict
        Match between the unique process identifiers and the process names
    pipeline_links: list
        Parsed pipeline list with unique identifiers

    Returns
    -------
    list
        Pipeline list with the original identifiers
    """

    # Replace the unique identifiers with the original process names
    for val in pipeline_links:
        if val["input"]["process"] != "__init__":
            val["input"]["process"] = identifiers_to_tags[
                val["input"]["process"]]
        if val["output"]["process"] != "__init__":
            val["output"]["process"] = identifiers_to_tags[
                val["output"]["process"]]

    return pipeline_links

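# Illustrative example (a sketch, not part of the original module):
#
#   >>> links = [{'input': {'process': 'A_0', 'lane': 1},
#   ...           'output': {'process': 'B_1', 'lane': 1}}]
#   >>> remove_unique_identifiers({'A_0': 'A', 'B_1': 'B'}, links)
#   [{'input': {'process': 'A', 'lane': 1},
#     'output': {'process': 'B', 'lane': 1}}]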