#!/usr/bin/env python3
"""
Purpose
-------
This module is intended to provide a summary report for a given assembly
in Fasta format.
Expected input
--------------
The following variables are expected whether using NextFlow or the
:py:func:`main` executor.
- ``sample_id`` : Sample Identification string.
- e.g.: ``'SampleA'``
- ``assembly`` : Path to assembly file in Fasta format.
- e.g.: ``'assembly.fasta'``
- ``coverage_bp`` : Path to the per-base coverage file for the assembly.
- e.g.: ``'coverage.tab'``
Generated output
----------------
- ``${sample_id}_assembly_report.csv`` : CSV with summary information of the \
assembly.
- e.g.: ``'SampleA_assembly_report.csv'``
Code documentation
------------------
"""
__version__ = "1.0.1"
__build__ = "16012018"
__template__ = "assembly_report-nf"
import os
import re
import json
import traceback
import subprocess
from collections import OrderedDict
from subprocess import PIPE
from flowcraft_utils.flowcraft_base import get_logger, MainWrapper
logger = get_logger(__file__)
def __get_version_pilon(pilon_path="/NGStools/pilon-1.22.jar"):
    """Best-effort probe of the installed Pilon version.

    Parameters
    ----------
    pilon_path : str
        Path to the Pilon jar file. Defaults to the expected container
        location, so existing callers are unaffected.

    Returns
    -------
    dict
        ``{"program": "Pilon", "version": <version string or "undefined">}``
    """

    try:
        # Pilon prints a line like "Pilon version 1.22 ..." to stdout, so
        # the third whitespace-separated token is the version string.
        cli = ["java", "-jar", pilon_path, "--version"]
        p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
        stdout, _ = p.communicate()

        version = stdout.split()[2].decode("utf8")
    except Exception as e:
        # Any failure (missing java, missing jar, unexpected output) is
        # deliberately degraded to an "undefined" version instead of
        # aborting the report.
        logger.debug(e)
        version = "undefined"

    return {
        "program": "Pilon",
        "version": version,
    }
# NOTE(review): this looks like a Nextflow process template — when Nextflow
# renders it into a ".command.sh" script, the '$...' placeholders below are
# substituted with the actual channel values. The literals must therefore
# stay exactly as written.
if __file__.endswith(".command.sh"):
    SAMPLE_ID = '$sample_id'
    ASSEMBLY_FILE = '$assembly'
    COVERAGE_BP_FILE = '$coverage_bp'
    # Log the received parameters for debugging the rendered script.
    logger.debug("Running {} with parameters:".format(
        os.path.basename(__file__)))
    logger.debug("SAMPLE_ID: {}".format(SAMPLE_ID))
    logger.debug("ASSEMBLY_FILE: {}".format(ASSEMBLY_FILE))
    logger.debug("COVERAGE_BP_FILE: {}".format(COVERAGE_BP_FILE))
class Assembly:
    """Parses an assembly file in Fasta format and reports summary data.

    The assembly is parsed at instantiation time. Summary statistics and
    sliding-window GC/coverage data can then be generated on demand.

    Parameters
    ----------
    assembly_file : str
        Path to assembly file.
    sample_id : str
        Name of the sample for the current assembly.
    """

    def __init__(self, assembly_file, sample_id):

        # Summary statistics for the assembly. The list-valued entries
        # (avg_contig_size, avg_gc) accumulate per-contig values and are
        # collapsed into single floats by get_summary_stats(). Keys:
        #   - ncontigs: Number of contigs
        #   - avg_contig_size: Average size of contigs
        #   - n50: N50 metric
        #   - total_len: Total assembly length
        #   - avg_gc: Average GC proportion
        #   - missing_data: Count of missing data ("N") characters
        self.summary_info = OrderedDict([
            ("ncontigs", 0),
            ("avg_contig_size", []),
            ("n50", 0),
            ("total_len", 0),
            ("avg_gc", []),
            ("missing_data", 0)
        ])

        # Maps each contig header to the corresponding sequence string.
        self.contigs = OrderedDict()

        # Maps each contig header to the corresponding list of per-base
        # coverage values.
        self.contig_coverage = OrderedDict()

        # Sample id.
        self.sample = sample_id

        # Maps each contig id to its [start, end] boundaries in the
        # concatenated genome (populated by _get_window_labels).
        self.contig_boundaries = {}

        self._parse_assembly(assembly_file)

    def _parse_assembly(self, assembly_file):
        """Parse an assembly file in fasta format.

        This is a Fasta parsing method that populates the
        :py:attr:`Assembly.contigs` attribute with data for each contig in
        the assembly.

        Parameters
        ----------
        assembly_file : str
            Path to the assembly fasta file.
        """

        with open(assembly_file) as fh:

            header = None
            logger.debug("Starting iteration of assembly file: {}".format(
                assembly_file))

            for line in fh:
                # Skip empty lines
                if not line.strip():
                    continue

                if line.startswith(">"):
                    # Add contig header to contig dictionary
                    header = line[1:].strip()
                    self.contigs[header] = []
                elif header is not None:
                    # Add sequence string for the current contig. Sequence
                    # lines appearing before the first header are silently
                    # skipped instead of raising a KeyError.
                    self.contigs[header].append(line.strip())

        # After populating the contigs dictionary, convert the values
        # list into a string sequence
        self.contigs = OrderedDict(
            (header, "".join(seq)) for header, seq in self.contigs.items())

    @staticmethod
    def _get_contig_id(contig_str):
        """Tries to retrieve contig id. Returns the original string if it
        is unable to retrieve the id.

        Both SPAdes-style ("NODE_<id>_...") and "Contig_<id>_..." headers
        are recognised; the second pattern wins if both match.

        Parameters
        ----------
        contig_str : str
            Full contig string (fasta header)

        Returns
        -------
        str
            Contig id
        """

        contig_id = contig_str

        try:
            contig_id = re.search(".*NODE_([0-9]*)_.*", contig_str).group(1)
        except AttributeError:
            pass

        try:
            contig_id = re.search(".*Contig_([0-9]*)_.*", contig_str).group(1)
        except AttributeError:
            pass

        return contig_id

    def get_summary_stats(self, output_csv=None):
        """Generates a CSV report with summary statistics about the assembly

        The calculated statistics are:

            - Number of contigs
            - Average contig size
            - N50
            - Total assembly length
            - Average GC content
            - Amount of missing data

        Parameters
        ----------
        output_csv: str
            Name of the output CSV file.
        """

        contig_size_list = []

        self.summary_info["ncontigs"] = len(self.contigs)

        for contig_id, sequence in self.contigs.items():

            logger.debug("Processing contig: {}".format(contig_id))

            # Get contig sequence size
            contig_len = len(sequence)

            # Add size for average contig size
            contig_size_list.append(contig_len)

            # Add to total assembly length
            self.summary_info["total_len"] += contig_len

            # Add to average gc. A zero-length contig contributes nothing
            # (guards the division below).
            if contig_len:
                self.summary_info["avg_gc"].append(
                    sum(map(sequence.count, ["G", "C"])) / contig_len
                )

            # Add to missing data
            self.summary_info["missing_data"] += sequence.count("N")

        # Get average contig size. An empty assembly reports 0 instead of
        # raising ZeroDivisionError.
        logger.debug("Getting average contig size")
        if contig_size_list:
            self.summary_info["avg_contig_size"] = \
                sum(contig_size_list) / len(contig_size_list)
        else:
            self.summary_info["avg_contig_size"] = 0

        # Get average gc content
        logger.debug("Getting average GC content")
        if self.summary_info["avg_gc"]:
            self.summary_info["avg_gc"] = \
                sum(self.summary_info["avg_gc"]) / \
                len(self.summary_info["avg_gc"])
        else:
            self.summary_info["avg_gc"] = 0

        # Get N50: size of the smallest contig among the largest contigs
        # that together cover at least half of the assembly.
        logger.debug("Getting N50")
        cum_size = 0
        for l in sorted(contig_size_list, reverse=True):
            cum_size += l
            if cum_size >= self.summary_info["total_len"] / 2:
                self.summary_info["n50"] = l
                break

        if output_csv:
            logger.debug("Writing report to csv")
            # Write summary info to CSV
            with open(output_csv, "w") as fh:
                summary_line = "{}, {}\\n".format(
                    self.sample, ",".join(
                        [str(x) for x in self.summary_info.values()]))
                fh.write(summary_line)

    def _get_window_labels(self, window):
        """Returns the x-axis boundary information for each contig.

        Also populates :py:attr:`Assembly.contig_boundaries` with the
        [start, end] position of each contig in the concatenated genome.

        Parameters
        ----------
        window : int
            Size of the window (currently unused by this method; kept for
            interface stability).

        Returns
        -------
        xbars : list
            One ``(contig id, end position, full header)`` tuple per
            contig, in assembly order.
        """

        # Get contig boundary position
        c = 0
        xbars = []

        for contig, seq in self.contigs.items():
            contig_id = self._get_contig_id(contig)
            self.contig_boundaries[contig_id] = [c, c + len(seq)]
            c += len(seq)
            xbars.append((contig_id, c, contig))

        return xbars

    @staticmethod
    def _gc_prop(s, length):
        """Get proportion of GC from a string

        Only lowercase ``c``/``g`` are counted, so callers are expected to
        lowercase the sequence first (see :py:meth:`get_gc_sliding`).

        Parameters
        ----------
        s : str
            Arbitrary string
        length : int
            Denominator for the proportion (usually ``len(s)``).

        Returns
        -------
        x : float
            GC proportion.
        """

        gc = sum(map(s.count, ["c", "g"]))

        return gc / length

    def get_gc_sliding(self, window=2000):
        """Calculates a sliding window of the GC content for the assembly

        Parameters
        ----------
        window : int
            Size of the non-overlapping window (the last window may be
            shorter).

        Returns
        -------
        gc_res : list
            List of GC proportion floats for each data point in the
            sliding window
        """

        gc_res = []

        # Get complete sequence to calculate sliding window values
        complete_seq = "".join(self.contigs.values()).lower()

        for i in range(0, len(complete_seq), window):

            seq_window = complete_seq[i:i + window]

            # Get GC proportion
            gc_res.append(round(self._gc_prop(seq_window, len(seq_window)), 2))

        return gc_res

    def _get_coverage_from_file(self, coverage_file):
        """Populates :py:attr:`Assembly.contig_coverage` from a file.

        Parameters
        ----------
        coverage_file : str
            Path to a per-base coverage file with at least three
            whitespace-separated columns, where the first is the contig
            header and the third the integer coverage (samtools depth
            layout — TODO confirm against the upstream process).
        """

        with open(coverage_file) as fh:

            for line in fh:
                fields = line.strip().split()

                # Get header
                header = fields[0]
                coverage = int(fields[2])

                if header not in self.contig_coverage:
                    self.contig_coverage[header] = [coverage]
                else:
                    self.contig_coverage[header].append(coverage)

    def get_coverage_sliding(self, coverage_file, window=2000):
        """Calculates mean coverage over a sliding window of the assembly.

        Parameters
        ----------
        coverage_file : str
            Path to file containing the coverage info at the per-base level
            (as generated by samtools depth)
        window : int
            Size of sliding window

        Returns
        -------
        cov_res : list
            Mean (integer) coverage for each window across the genome.
        """

        # Parse the coverage file only once per instance.
        if not self.contig_coverage:
            self._get_coverage_from_file(coverage_file)

        # Stores the coverage results
        cov_res = []

        # Make flat list of coverage values across genome
        complete_cov = [x for y in self.contig_coverage.values() for x in y]

        for i in range(0, len(complete_cov), window):

            # Get coverage values for current window
            cov_window = complete_cov[i:i + window]

            # Get mean coverage
            cov_res.append(int(sum(cov_window) / len(cov_window)))

        return cov_res
@MainWrapper
def main(sample_id, assembly_file, coverage_bp_file=None):
    """Main executor of the assembly_report template.

    Builds the ``<sample_id>_assembly_report.csv`` summary and the hidden
    ``.report.json`` / ``.status`` files consumed downstream.

    Parameters
    ----------
    sample_id : str
        Sample Identification string.
    assembly_file : str
        Path to assembly file in Fasta format.
    coverage_bp_file : str, optional
        Path to a per-base coverage file. When provided, GC and coverage
        sliding-window data are added to the JSON report.
    """

    logger.info("Starting assembly report")
    assembly_obj = Assembly(assembly_file, sample_id)

    logger.info("Retrieving summary statistics for assembly")
    assembly_obj.get_summary_stats("{}_assembly_report.csv".format(sample_id))

    # Contig size distribution for the report plot.
    size_dist = [len(x) for x in assembly_obj.contigs.values()]
    json_dic = {
        "tableRow": [{
            "sample": sample_id,
            "data": [
                {"header": "Contigs",
                 "value": assembly_obj.summary_info["ncontigs"],
                 "table": "assembly",
                 "columnBar": True},
                {"header": "Assembled BP",
                 "value": assembly_obj.summary_info["total_len"],
                 "table": "assembly",
                 "columnBar": True},
            ]
        }],
        "plotData": [{
            "sample": sample_id,
            "data": {
                "size_dist": size_dist
            }
        }]
    }

    # Sliding-window data is optional and must not break the base report.
    if coverage_bp_file:
        try:
            window = 2000
            gc_sliding_data = assembly_obj.get_gc_sliding(window=window)
            cov_sliding_data = \
                assembly_obj.get_coverage_sliding(coverage_bp_file,
                                                  window=window)
            # Get total basepairs based on the individual coverage of each
            # contig bpx
            total_bp = sum(
                [sum(x) for x in assembly_obj.contig_coverage.values()]
            )
            # Add data to json report
            json_dic["plotData"][0]["data"]["genomeSliding"] = {
                "gcData": gc_sliding_data,
                "covData": cov_sliding_data,
                "window": window,
                "xbars": assembly_obj._get_window_labels(window),
                "assemblyFile": os.path.basename(assembly_file)
            }
            json_dic["plotData"][0]["data"]["sparkline"] = total_bp
        except Exception:
            # Log and continue so the base report is still produced. Was a
            # bare "except:", which would also have swallowed SystemExit
            # and KeyboardInterrupt.
            logger.error("Unexpected error creating sliding window data:\\n"
                         "{}".format(traceback.format_exc()))

    # Write json report
    with open(".report.json", "w") as json_report:
        json_report.write(json.dumps(json_dic, separators=(",", ":")))

    with open(".status", "w") as status_fh:
        status_fh.write("pass")
# Entry point. SAMPLE_ID/ASSEMBLY_FILE/COVERAGE_BP_FILE are only bound when
# this file runs as a rendered ".command.sh" script, so running the raw
# template directly would raise NameError here.
if __name__ == '__main__':
    main(SAMPLE_ID, ASSEMBLY_FILE, COVERAGE_BP_FILE)