Source code for flowcraft.templates.process_abricate

#!/usr/bin/env python3

"""
Purpose
-------

This module is intended parse the results of the Abricate for one or more
samples.

Expected input
--------------

The following variables are expected whether using NextFlow or the
:py:func:`main` executor.

- ``abricate_files`` : Path to abricate output file.
    - e.g.: ``'abr_resfinder.tsv'``

Generated output
----------------

None


Code documentation
------------------

"""

__version__ = "1.0.1"
__build__ = "26032018"
__template__ = "process_abricate-nf"

import re
import os
import json
import operator
import subprocess

from subprocess import PIPE

from flowcraft_utils.flowcraft_base import get_logger, MainWrapper

logger = get_logger(__file__)


def __get_version_abricate():

    try:

        # Get abricate version
        cli = ["abricate", "--version"]
        p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
        stdout, _ = p.communicate()

        version = stdout.strip().split()[-1].decode("utf8")

    except Exception as e:
        logger.debug(e)
        version = "undefined"

    try:

        # Get abricate database versions
        cli = ["abricate", "--list"]
        p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
        dbout, _ = p.communicate()

        databases = [[u.decode("utf8") for u in i.strip().split()]
                     for i in dbout.splitlines()][1:]

    except Exception as e:
        logger.debug(e)
        databases = "undefined"

    return {
        "program": "abricate",
        "version": version,
        "databases": databases
    }


if __file__.endswith(".command.sh"):
    ABRICATE_FILES = '$abricate_file'.split()
    logger.debug("Running {} with parameters:".format(
        os.path.basename(__file__)))
    logger.debug("ABRICATE_FILE: {}".format(ABRICATE_FILES))


[docs]class Abricate: """Main parser for Abricate output files. This class parses one or more output files from Abricate, usually from different databases. In addition to the parsing methods, it also provides a flexible method to filter and re-format the content of the abricate files. Parameters ---------- fls : list List of paths to Abricate output files. """ def __init__(self, fls): self.storage = {} """ dic: Main storage of Abricate's file content. Each entry corresponds to a single line and contains the keys:: - ``log_file``: Name of the summary log file containing abricate results - ``infile``: Input file of Abricate. - ``reference``: Reference of the query sequence. - ``seq_range``: Range of the query sequence in the database sequence. - ``gene``: AMR gene name. - ``accession``: The genomic source of the sequence. - ``database``: The database the sequence came from. - ``coverage``: Proportion of gene covered. - ``identity``: Proportion of exact nucleotide matches. """ self._key = 0 """ int: Arbitrary key for unique entries in the storage attribute """ self.parse_files(fls)
[docs] def parse_files(self, fls): """Public method for parsing abricate output files. This method is called at at class instantiation for the provided output files. Additional abricate output files can be added using this method after the class instantiation. Parameters ---------- fls : list List of paths to Abricate files """ for f in fls: # Make sure paths exists if os.path.exists(f): self._parser(f) else: logger.warning("File {} does not exist".format(f))
def _parser(self, fl): """Parser for a single abricate output file. This parser will scan a single Abricate output file and populate the :py:attr:`Abricate.storage` attribute. Parameters ---------- fl : str Path to abricate output file Notes ----- This method will populate the :py:attr:`Abricate.storage` attribute with all compliant lines in the abricate output file. Entries are inserted using an arbitrary key that is set by the :py:attr:`Abricate._key` attribute. """ with open(fl) as fh: for line in fh: # Skip header and comment lines if line.startswith("#") or line.strip() == "": continue fields = line.strip().split("\t") try: coverage = float(fields[8]) except ValueError: coverage = None try: identity = float(fields[9]) except ValueError: identity = None try: accession = fields[11] except IndexError: accession = None self.storage[self._key] = { "log_file": os.path.basename(fl), "infile": fields[0], "reference": fields[1], "seq_range": (int(fields[2]), int(fields[3])), "gene": fields[4], "accession": accession, "database": fields[10], "coverage": coverage, "identity": identity } self._key += 1 @staticmethod def _test_truth(x, op, y): """ Test the truth of a comparison between x and y using an operator. If you want to compare '100 > 200', this method can be called as self._test_truth(100, ">", 200). Parameters ---------- x : int Arbitrary value to compare in the left. op : str Comparison operator. y : int Arbitrary value to compare in the right. Returns ------- x : bool The 'truthness' of the test. """ ops = { ">": operator.gt, "<": operator.lt, ">=": operator.ge, "<=": operator.le, "==": operator.eq, "!=": operator.ne } return ops[op](x, y)
[docs] def iter_filter(self, filters, databases=None, fields=None, filter_behavior="and"): """General purpose filter iterator. This general filter iterator allows the filtering of entries based on one or more custom filters. These filters must contain an entry of the `storage` attribute, a comparison operator, and the test value. For example, to filter out entries with coverage below 80:: my_filter = ["coverage", ">=", 80] Filters should always be provide as a list of lists:: iter_filter([["coverage", ">=", 80]]) # or my_filters = [["coverage", ">=", 80], ["identity", ">=", 50]] iter_filter(my_filters) As a convenience, a list of the desired databases can be directly specified using the `database` argument, which will only report entries for the specified databases:: iter_filter(my_filters, databases=["plasmidfinder"]) By default, this method will yield the complete entry record. However, the returned filters can be specified using the `fields` option:: iter_filter(my_filters, fields=["reference", "coverage"]) Parameters ---------- filters : list List of lists with the custom filter. Each list should have three elements. (1) the key from the entry to be compared; (2) the comparison operator; (3) the test value. Example: ``[["identity", ">", 80]]``. databases : list List of databases that should be reported. fields : list List of fields from each individual entry that are yielded. filter_behavior : str options: ``'and'`` ``'or'`` Sets the behaviour of the filters, if multiple filters have been provided. By default it is set to ``'and'``, which means that an entry has to pass all filters. It can be set to ``'or'``, in which case one one of the filters has to pass. yields ------ dic : dict Dictionary object containing a :py:attr:`Abricate.storage` entry that passed the filters. """ if filter_behavior not in ["and", "or"]: raise ValueError("Filter behavior must be either 'and' or 'or'") for dic in self.storage.values(): # This attribute will determine whether an entry will be yielded # or not _pass = False # Stores the flags with the test results for each filter # The results will be either True or False flag = [] # Filter for databases if databases: # Skip entry if not in specified database if dic["database"] not in databases: continue # Apply filters for f in filters: # Get value of current filter val = dic[f[0]] if not self._test_truth(val, f[1], f[2]): flag.append(False) else: flag.append(True) # Test whether the entry will pass based on the test results # and the filter behaviour if filter_behavior == "and": if all(flag): _pass = True elif filter_behavior == "or": if any(flag): _pass = True if _pass: if fields: yield dict((x, y) for x, y in dic.items() if x in fields) else: yield dic
[docs] def get_filter(self, *args, **kwargs): """ Wrapper of the iter_filter method that returns a list with results It should be called exactly as in the `iter_filter` Returns ------- _ : list List of dictionary entries that passed the filters in the `iter_filter` method. See Also -------- iter_filter """ return list(self.iter_filter(*args, **kwargs))
[docs]class AbricateReport(Abricate): """Report generator for single Abricate output files This class is intended to parse an Abricate output file from a single sample and database and generates a JSON report for the report webpage. Parameters ---------- fls : list List of paths to Abricate output files. database : (optional) str Name of the database for the current report. If not provided, it will be inferred based on the first entry of the Abricate file. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @staticmethod def _get_contig_id(contig_str): """Tries to retrieve contig id. Returns the original string if it is unable to retrieve the id. Parameters ---------- contig_str : str Full contig string (fasta header) Returns ------- str Contig id """ contig_id = contig_str try: contig_id = re.search(".*NODE_([0-9]*)_.*", contig_str).group(1) except AttributeError: pass try: contig_id = re.search(".*Contig_([0-9]*)_.*", contig_str).group(1) except AttributeError: pass return contig_id
[docs] def get_plot_data(self): """ Generates the JSON report to plot the gene boxes Following the convention of the reports platform, this method returns a list of JSON/dict objects with the information about each entry in the abricate file. The information contained in this JSON is:: {contig_id: <str>, seqRange: [<int>, <int>], gene: <str>, accession: <str>, coverage: <float>, identity: <float> } Note that the `seqRange` entry contains the position in the corresponding contig, not the absolute position in the whole assembly. Returns ------- json_dic : list List of JSON/dict objects with the report data. """ json_dic = {"plotData": []} sample_dic = {} sample_assembly_map = {} for entry in self.storage.values(): sample_id = re.match("(.*)_abr", entry["log_file"]).groups()[0] if sample_id not in sample_dic: sample_dic[sample_id] = {} # Get contig ID using the same regex as in `assembly_report.py` # template contig_id = self._get_contig_id(entry["reference"]) # Get database database = entry["database"] if database not in sample_dic[sample_id]: sample_dic[sample_id][database] = [] # Update the sample-assembly correspondence dict if sample_id not in sample_assembly_map: sample_assembly_map[sample_id] = entry["infile"] sample_dic[sample_id][database].append( {"contig": contig_id, "seqRange": entry["seq_range"], "gene": entry["gene"].replace("'", ""), "accession": entry["accession"], "coverage": entry["coverage"], "identity": entry["identity"], }, ) for sample, data in sample_dic.items(): json_dic["plotData"].append( { "sample": sample, "data": {"abricateXrange": data}, "assemblyFile": sample_assembly_map[sample] } ) return json_dic
[docs] def get_table_data(self): """ Returns ------- """ gene_storage = {} json_dic = {"tableRow": []} logger.info("Generating JSON table data") # Collect the gene lists for each database for key, entry in self.storage.items(): # Retrieve and initiate new sample entry, if not present already logger.debug("Retrieving sample if from: {}".format( entry["infile"])) sample_id = re.match("(.*)_abr", entry["log_file"]).groups()[0] database = entry["database"] if sample_id not in gene_storage: gene_storage[sample_id] = {} if database not in gene_storage[sample_id]: gene_storage[sample_id][database] = [] gene_storage[sample_id][database].append( entry["gene"].replace("'", "").replace('"', '') ) # For each database, create the JSON report for sample, table_data in gene_storage.items(): json_dic["tableRow"].append({ "sample": sample, "data": [] }) for db, gene_list in table_data.items(): ind_json = { "table": "abricate", "header": db, "value": len(gene_list), "geneList": gene_list } json_dic["tableRow"][-1]["data"].append(ind_json) return json_dic
[docs] def write_report_data(self): """Writes the JSON report to a json file """ json_plot = self.get_plot_data() json_table = self.get_table_data() json_dic = {**json_plot, **json_table} with open(".report.json", "w") as json_report: json_report.write(json.dumps(json_dic, separators=(",", ":")))
if __name__ == '__main__': @MainWrapper def main(abr_file): abr = AbricateReport(fls=abr_file) abr.write_report_data() main(ABRICATE_FILES)