#!/usr/bin/env python3
"""
Purpose
-------
This module is intended execute trimmomatic on paired-end FastQ files.
Expected input
--------------
The following variables are expected whether using NextFlow or the
:py:func:`main` executor.
- ``fastq_id`` : Pair of FastQ file paths.
- e.g.: ``'SampleA'``
- ``fastq_pair`` : Pair of FastQ file paths.
- e.g.: ``'SampleA_1.fastq.gz SampleA_2.fastq.gz'``
- ``trim_range`` : Crop range detected using FastQC.
- e.g.: ``'15 151'``
- ``opts`` : List of options for trimmomatic
- e.g.: ``'["5:20", "3", "3", "55"]'``
- e.g.: ``'[trim_sliding_window, trim_leading, trim_trailing, trim_min_length]'``
- ``phred`` : List of guessed phred values for each sample
- e.g.: ``'[SampleA: 33, SampleB: 33]'``
Generated output
----------------
The generated output are output files that contain an object, usually a string.
(Values within ``${}`` are substituted by the corresponding variable.)
- ``${fastq_id}_*P*``: Pair of paired FastQ files generated by Trimmomatic
- e.g.: ``'SampleA_1_P.fastq.gz SampleA_2_P.fastq.gz'``
- ``trimmomatic_status``: Stores the status of the trimmomatic run. If it was\
successfully executed, it stores 'pass'. Otherwise, it stores the \
``STDERR`` message.
- e.g.: ``'pass'``
Code documentation
------------------
"""
# TODO: More control over read trimming
# TODO: Add option to remove adapters
# TODO: What to do when there is encoding failure
__version__ = "1.0.2"
__build__ = "28032018"
__template__ = "trimmomatic-nf"
import os
import json
import fileinput
import subprocess
from subprocess import PIPE
from collections import OrderedDict
from assemblerflow_utils.assemblerflow_base import get_logger, MainWrapper
logger = get_logger(__file__)
def __get_version_trimmomatic():
try:
cli = ["java", "-jar", TRIM_PATH, "-version"]
p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
stdout, _ = p.communicate()
version = stdout.strip().decode("utf8")
except Exception as e:
logger.debug(e)
version = "undefined"
return {
"program": "Trimmomatic",
"version": version,
}
if __file__.endswith(".command.sh"):
FASTQ_ID = '$fastq_id'
FASTQ_PAIR = '$fastq_pair'.split()
TRIM_RANGE = '$trim_range'.split()
TRIM_OPTS = [x.strip() for x in '$opts'.strip("[]").split(",")]
PHRED = '$phred'
ADAPTERS_FILE = '$ad'
logger.debug("Running {} with parameters:".format(
os.path.basename(__file__)))
logger.debug("FASTQ_ID: {}".format(FASTQ_ID))
logger.debug("FASTQ_PAIR: {}".format(FASTQ_PAIR))
logger.debug("TRIM_RANGE: {}".format(TRIM_RANGE))
logger.debug("TRIM_OPTS: {}".format(TRIM_OPTS))
logger.debug("PHRED: {}".format(PHRED))
logger.debug("ADAPTERS_FILE: {}".format(ADAPTERS_FILE))
TRIM_PATH = "/NGStools/Trimmomatic-0.36/trimmomatic.jar"
ADAPTERS_PATH = "/NGStools/Trimmomatic-0.36/adapters"
[docs]def parse_log(log_file):
"""Retrieves some statistics from a single Trimmomatic log file.
This function parses Trimmomatic's log file and stores some trimming
statistics in an :py:class:`OrderedDict` object. This object contains
the following keys:
- ``clean_len``: Total length after trimming.
- ``total_trim``: Total trimmed base pairs.
- ``total_trim_perc``: Total trimmed base pairs in percentage.
- ``5trim``: Total base pairs trimmed at 5' end.
- ``3trim``: Total base pairs trimmed at 3' end.
Parameters
----------
log_file : str
Path to trimmomatic log file.
Returns
-------
x : :py:class:`OrderedDict`
Object storing the trimming statistics.
"""
template = OrderedDict([
# Total length after trimming
("clean_len", 0),
# Total trimmed base pairs
("total_trim", 0),
# Total trimmed base pairs in percentage
("total_trim_perc", 0),
# Total trimmed at 5' end
("5trim", 0),
# Total trimmed at 3' end
("3trim", 0),
# Bad reads (completely trimmed)
("bad_reads", 0)
])
with open(log_file) as fh:
for line in fh:
# This will split the log fields into:
# 0. read length after trimming
# 1. amount trimmed from the start
# 2. last surviving base
# 3. amount trimmed from the end
fields = [int(x) for x in line.strip().split()[-4:]]
if not fields[0]:
template["bad_reads"] += 1
template["5trim"] += fields[1]
template["3trim"] += fields[3]
template["total_trim"] += fields[1] + fields[3]
template["clean_len"] += fields[0]
total_len = template["clean_len"] + template["total_trim"]
if total_len:
template["total_trim_perc"] = round(
(template["total_trim"] / total_len) * 100, 2)
else:
template["total_trim_perc"] = 0
return template
[docs]def write_report(storage_dic, output_file):
""" Writes a report from multiple samples.
Parameters
----------
storage_dic : dict or :py:class:`OrderedDict`
Storage containing the trimming statistics. See :py:func:`parse_log`
for its generation.
output_file : str
Path where the output file will be generated.
"""
with open(output_file, "w") as fh, open(".report.json", "w") as json_rep:
# Write header
fh.write("Sample,Total length,Total trimmed,%,5end Trim,3end Trim,"
"bad_reads\\n")
# Write contents
for sample, vals in storage_dic.items():
fh.write("{},{}\\n".format(
sample, ",".join([str(x) for x in vals.values()])))
json_dic = {
"tableRow": [
{"header": "Trimmed (%)",
"value": vals["total_trim_perc"],
"table": "assembly",
"columnBar": True},
],
"plotData": {
"sparkline": vals["clean_len"]
},
"badReads": vals["bad_reads"]
}
json_rep.write(json.dumps(json_dic, separators=(",", ":")))
[docs]def trimmomatic_log(log_file):
log_storage = OrderedDict()
log_id = log_file.rstrip("_trimlog.txt")
log_storage[log_id] = parse_log(log_file)
os.remove(log_file)
write_report(log_storage, "trimmomatic_report.csv")
[docs]def clean_up():
"""Cleans the working directory of unwanted temporary files"""
# Find unpaired fastq files
unpaired_fastq = [f for f in os.listdir(".")
if f.endswith("_U.fastq.gz")]
# Remove unpaired fastq files, if any
for fpath in unpaired_fastq:
os.remove(fpath)
[docs]def merge_default_adapters():
"""Merges the default adapters file in the trimmomatic adapters directory
Returns
-------
str
Path with the merged adapters file.
"""
default_adapters = [os.path.join(ADAPTERS_PATH, x) for x in
os.listdir(ADAPTERS_PATH)]
filepath = os.path.join(os.getcwd(), "default_adapters.fasta")
with open(filepath, "w") as fh, \
fileinput.input(default_adapters) as in_fh:
for line in in_fh:
fh.write(line)
return filepath
@MainWrapper
def main(fastq_id, fastq_pair, trim_range, trim_opts, phred, adapters_file):
""" Main executor of the trimmomatic template.
Parameters
----------
fastq_id : str
Sample Identification string.
fastq_pair : list
Two element list containing the paired FastQ files.
trim_range : list
Two element list containing the trimming range.
trim_opts : list
Four element list containing several trimmomatic options:
[*SLIDINGWINDOW*; *LEADING*; *TRAILING*; *MINLEN*]
phred : int
Guessed phred score for the sample. The phred score is a generated
output from :py:class:`templates.integrity_coverage`.
adapters_file : str
Path to adapters file. If not provided, or the path is not available,
it will use the default adapters from Trimmomatic will be used
"""
logger.info("Starting trimmomatic")
# Create base CLI
cli = [
"java",
"-Xmx{}".format("$task.memory"[:-1].lower().replace(" ", "")),
"-jar",
TRIM_PATH.strip(),
"PE",
"-threads",
"$task.cpus"
]
# If the phred encoding was detected, provide it
try:
# Check if the provided PHRED can be converted to int
phred = int(phred)
phred_flag = "-phred{}".format(str(phred))
cli += [phred_flag]
# Could not detect phred encoding. Do not add explicit encoding to
# trimmomatic and let it guess
except ValueError:
pass
# Add input samples to CLI
cli += fastq_pair
# Add output file names
output_names = []
for i in range(len(fastq_pair)):
output_names.append("{}_{}_trim.fastq.gz".format(
FASTQ_ID, str(i + 1)))
output_names.append("{}_{}_U.fastq.gz".format(
FASTQ_ID, str(i + 1)))
cli += output_names
if trim_range != ["None"]:
cli += [
"CROP:{}".format(trim_range[1]),
"HEADCROP:{}".format(trim_range[0]),
]
if os.path.exists(adapters_file):
logger.debug("Using the provided adapters file '{}'".format(
adapters_file))
else:
logger.debug("Adapters file '{}' not provided or does not exist. Using"
" default adapters".format(adapters_file))
adapters_file = merge_default_adapters()
cli += [
"ILLUMINACLIP:{}:3:30:10:6:true".format(adapters_file)
]
# Add trimmomatic options
cli += [
"SLIDINGWINDOW:{}".format(trim_opts[0]),
"LEADING:{}".format(trim_opts[1]),
"TRAILING:{}".format(trim_opts[2]),
"MINLEN:{}".format(trim_opts[3]),
"TOPHRED33",
"-trimlog",
"{}_trimlog.txt".format(fastq_id)
]
logger.debug("Running trimmomatic subprocess with command: {}".format(cli))
p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate()
# Attempt to decode STDERR output from bytes. If unsuccessful, coerce to
# string
try:
stderr = stderr.decode("utf8")
except (UnicodeDecodeError, AttributeError):
stderr = str(stderr)
logger.info("Finished trimmomatic subprocess with STDOUT:\\n"
"======================================\\n{}".format(stdout))
logger.info("Fished trimmomatic subprocesswith STDERR:\\n"
"======================================\\n{}".format(stderr))
logger.info("Finished trimmomatic with return code: {}".format(
p.returncode))
trimmomatic_log("{}_trimlog.txt".format(fastq_id))
clean_up()
# Check if trimmomatic ran successfully. If not, write the error message
# to the status channel and exit.
with open(".status", "w") as status_fh:
if p.returncode != 0:
status_fh.write("fail")
return
else:
status_fh.write("pass")
if __name__ == '__main__':
main(FASTQ_ID, FASTQ_PAIR, TRIM_RANGE, TRIM_OPTS, PHRED, ADAPTERS_FILE)