#!/usr/bin/env python3
"""
Purpose
-------
This module is intended execute trimmomatic on paired-end FastQ files.
Expected input
--------------
The following variables are expected whether using NextFlow or the
:py:func:`main` executor.
- ``sample_id`` : Pair of FastQ file paths.
- e.g.: ``'SampleA'``
- ``fastq_pair`` : Pair of FastQ file paths.
- e.g.: ``'SampleA_1.fastq.gz SampleA_2.fastq.gz'``
- ``trim_range`` : Crop range detected using FastQC.
- e.g.: ``'15 151'``
- ``opts`` : List of options for trimmomatic
- e.g.: ``'["5:20", "3", "3", "55"]'``
- e.g.: ``'[trim_sliding_window, trim_leading, trim_trailing, trim_min_length]'``
- ``phred`` : List of guessed phred values for each sample
- e.g.: ``'[SampleA: 33, SampleB: 33]'``
- ``clear`` : If 'true', remove the input fastq files at the end of the
component run, IF THE FILES ARE IN THE WORK DIRECTORY
Generated output
----------------
The generated output are output files that contain an object, usually a string.
(Values within ``${}`` are substituted by the corresponding variable.)
- ``${sample_id}_*P*``: Pair of paired FastQ files generated by Trimmomatic
- e.g.: ``'SampleA_1_P.fastq.gz SampleA_2_P.fastq.gz'``
- ``trimmomatic_status``: Stores the status of the trimmomatic run. If it was\
successfully executed, it stores 'pass'. Otherwise, it stores the \
``STDERR`` message.
- e.g.: ``'pass'``
Code documentation
------------------
"""
# TODO: More control over read trimming
# TODO: Add option to remove adapters
# TODO: What to do when there is encoding failure
__version__ = "1.0.3"
__build__ = "29062018"
__template__ = "trimmomatic-nf"
import os
import re
import json
import fileinput
import subprocess
import tempfile
from subprocess import PIPE
from collections import OrderedDict
from flowcraft_utils.flowcraft_base import get_logger, MainWrapper
logger = get_logger(__file__)
def __get_version_trimmomatic():
try:
cli = ["java", "-jar", TRIM_PATH, "-version"]
p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
stdout, _ = p.communicate()
version = stdout.strip().decode("utf8")
except Exception as e:
logger.debug(e)
version = "undefined"
return {
"program": "Trimmomatic",
"version": version,
}
if __file__.endswith(".command.sh"):
SAMPLE_ID = '$sample_id'
FASTQ_PAIR = '$fastq_pair'.split()
TRIM_RANGE = '$trim_range'.split()
TRIM_OPTS = [x.strip() for x in '$opts'.strip("[]").split(",")]
PHRED = '$phred'
ADAPTERS_FILE = '$ad'
CLEAR = '$clear'
logger.debug("Running {} with parameters:".format(
os.path.basename(__file__)))
logger.debug("SAMPLE_ID: {}".format(SAMPLE_ID))
logger.debug("FASTQ_PAIR: {}".format(FASTQ_PAIR))
logger.debug("TRIM_RANGE: {}".format(TRIM_RANGE))
logger.debug("TRIM_OPTS: {}".format(TRIM_OPTS))
logger.debug("PHRED: {}".format(PHRED))
logger.debug("ADAPTERS_FILE: {}".format(ADAPTERS_FILE))
logger.debug("CLEAR: {}".format(CLEAR))
TRIM_PATH = "/NGStools/Trimmomatic-0.36/trimmomatic.jar"
ADAPTERS_PATH = "/NGStools/Trimmomatic-0.36/adapters"
[docs]def parse_log(log_file):
"""Retrieves some statistics from a single Trimmomatic log file.
This function parses Trimmomatic's log file and stores some trimming
statistics in an :py:class:`OrderedDict` object. This object contains
the following keys:
- ``clean_len``: Total length after trimming.
- ``total_trim``: Total trimmed base pairs.
- ``total_trim_perc``: Total trimmed base pairs in percentage.
- ``5trim``: Total base pairs trimmed at 5' end.
- ``3trim``: Total base pairs trimmed at 3' end.
Parameters
----------
log_file : str
Path to trimmomatic log file.
Returns
-------
x : :py:class:`OrderedDict`
Object storing the trimming statistics.
"""
template = OrderedDict([
# Total length after trimming
("clean_len", 0),
# Total trimmed base pairs
("total_trim", 0),
# Total trimmed base pairs in percentage
("total_trim_perc", 0),
# Total trimmed at 5' end
("5trim", 0),
# Total trimmed at 3' end
("3trim", 0),
# Bad reads (completely trimmed)
("bad_reads", 0)
])
with open(log_file) as fh:
for line in fh:
# This will split the log fields into:
# 0. read length after trimming
# 1. amount trimmed from the start
# 2. last surviving base
# 3. amount trimmed from the end
fields = [int(x) for x in line.strip().split()[-4:]]
if not fields[0]:
template["bad_reads"] += 1
template["5trim"] += fields[1]
template["3trim"] += fields[3]
template["total_trim"] += fields[1] + fields[3]
template["clean_len"] += fields[0]
total_len = template["clean_len"] + template["total_trim"]
if total_len:
template["total_trim_perc"] = round(
(template["total_trim"] / total_len) * 100, 2)
else:
template["total_trim_perc"] = 0
return template
[docs]def write_report(storage_dic, output_file, sample_id):
""" Writes a report from multiple samples.
Parameters
----------
storage_dic : dict or :py:class:`OrderedDict`
Storage containing the trimming statistics. See :py:func:`parse_log`
for its generation.
output_file : str
Path where the output file will be generated.
"""
with open(output_file, "w") as fh, open(".report.json", "w") as json_rep:
# Write header
fh.write("Sample,Total length,Total trimmed,%,5end Trim,3end Trim,"
"bad_reads\\n")
# Write contents
for sample, vals in storage_dic.items():
fh.write("{},{}\\n".format(
sample, ",".join([str(x) for x in vals.values()])))
json_dic = {
"tableRow": [{
"sample": sample_id,
"data": [
{"header": "Trimmed (%)",
"value": vals["total_trim_perc"],
"table": "qc",
"columnBar": True},
]
}],
"plotData": [{
"sample": sample_id,
"data": {
"sparkline": vals["clean_len"]
}
}],
"badReads": vals["bad_reads"]
}
json_rep.write(json.dumps(json_dic, separators=(",", ":")))
[docs]def trimmomatic_log(log_file, sample_id):
log_storage = OrderedDict()
log_storage[sample_id] = parse_log(log_file)
#remove temp dir where log file is stored
tempdir = os.path.dirname(log_file)
os.remove(log_file)
os.rmdir(tempdir)
write_report(log_storage, "trimmomatic_report.csv", sample_id)
[docs]def clean_up(fastq_pairs, clear):
"""Cleans the working directory of unwanted temporary files"""
# Find unpaired fastq files
unpaired_fastq = [f for f in os.listdir(".")
if f.endswith("_U.fastq.gz")]
# Remove unpaired fastq files, if any
for fpath in unpaired_fastq:
os.remove(fpath)
# Expected output to assess whether it is safe to remove temporary input
expected_out = [f for f in os.listdir(".") if f.endswith("_trim.fastq.gz")]
if clear == "true" and len(expected_out) == 2:
for fq in fastq_pairs:
# Get real path of fastq files, following symlinks
rp = os.path.realpath(fq)
logger.debug("Removing temporary fastq file path: {}".format(rp))
if re.match(".*/work/.{2}/.{30}/.*", rp):
os.remove(rp)
[docs]def merge_default_adapters():
"""Merges the default adapters file in the trimmomatic adapters directory
Returns
-------
str
Path with the merged adapters file.
"""
default_adapters = [os.path.join(ADAPTERS_PATH, x) for x in
os.listdir(ADAPTERS_PATH)]
filepath = os.path.join(os.getcwd(), "default_adapters.fasta")
with open(filepath, "w") as fh, \
fileinput.input(default_adapters) as in_fh:
for line in in_fh:
fh.write("{}{}".format(line, "\\n"))
return filepath
@MainWrapper
def main(sample_id, fastq_pair, trim_range, trim_opts, phred, adapters_file,
clear):
""" Main executor of the trimmomatic template.
Parameters
----------
sample_id : str
Sample Identification string.
fastq_pair : list
Two element list containing the paired FastQ files.
trim_range : list
Two element list containing the trimming range.
trim_opts : list
Four element list containing several trimmomatic options:
[*SLIDINGWINDOW*; *LEADING*; *TRAILING*; *MINLEN*]
phred : int
Guessed phred score for the sample. The phred score is a generated
output from :py:class:`templates.integrity_coverage`.
adapters_file : str
Path to adapters file. If not provided, or the path is not available,
it will use the default adapters from Trimmomatic will be used
clear : str
Can be either 'true' or 'false'. If 'true', the input fastq files will
be removed at the end of the run, IF they are in the working directory
"""
logger.info("Starting trimmomatic")
# Create base CLI
cli = [
"java",
"-Xmx{}".format("$task.memory"[:-1].lower().replace(" ", "")),
"-jar",
TRIM_PATH.strip(),
"PE",
"-threads",
"$task.cpus"
]
# If the phred encoding was detected, provide it
try:
# Check if the provided PHRED can be converted to int
phred = int(phred)
phred_flag = "-phred{}".format(str(phred))
cli += [phred_flag]
# Could not detect phred encoding. Do not add explicit encoding to
# trimmomatic and let it guess
except ValueError:
pass
# Add input samples to CLI
cli += fastq_pair
# Add output file names
output_names = []
for i in range(len(fastq_pair)):
output_names.append("{}_{}_trim.fastq.gz".format(
SAMPLE_ID, str(i + 1)))
output_names.append("{}_{}_U.fastq.gz".format(
SAMPLE_ID, str(i + 1)))
cli += output_names
if trim_range != ["None"]:
cli += [
"CROP:{}".format(trim_range[1]),
"HEADCROP:{}".format(trim_range[0]),
]
if os.path.exists(adapters_file):
logger.debug("Using the provided adapters file '{}'".format(
adapters_file))
else:
logger.debug("Adapters file '{}' not provided or does not exist. Using"
" default adapters".format(adapters_file))
adapters_file = merge_default_adapters()
cli += [
"ILLUMINACLIP:{}:3:30:10:6:true".format(adapters_file)
]
#create log file im temporary dir to avoid issues when running on a docker container in macOS
logfile = os.path.join(tempfile.mkdtemp(prefix='tmp'), "{}_trimlog.txt".format(sample_id))
# Add trimmomatic options
cli += [
"SLIDINGWINDOW:{}".format(trim_opts[0]),
"LEADING:{}".format(trim_opts[1]),
"TRAILING:{}".format(trim_opts[2]),
"MINLEN:{}".format(trim_opts[3]),
"TOPHRED33",
"-trimlog",
logfile
]
logger.debug("Running trimmomatic subprocess with command: {}".format(cli))
p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate()
# Attempt to decode STDERR output from bytes. If unsuccessful, coerce to
# string
try:
stderr = stderr.decode("utf8")
except (UnicodeDecodeError, AttributeError):
stderr = str(stderr)
logger.info("Finished trimmomatic subprocess with STDOUT:\\n"
"======================================\\n{}".format(stdout))
logger.info("Finished trimmomatic subprocesswith STDERR:\\n"
"======================================\\n{}".format(stderr))
logger.info("Finished trimmomatic with return code: {}".format(
p.returncode))
trimmomatic_log(logfile, sample_id)
if p.returncode == 0 and os.path.exists("{}_1_trim.fastq.gz".format(
SAMPLE_ID)):
clean_up(fastq_pair, clear)
# Check if trimmomatic ran successfully. If not, write the error message
# to the status channel and exit.
with open(".status", "w") as status_fh:
if p.returncode != 0:
status_fh.write("fail")
return
else:
status_fh.write("pass")
if __name__ == '__main__':
main(SAMPLE_ID, FASTQ_PAIR, TRIM_RANGE, TRIM_OPTS, PHRED, ADAPTERS_FILE,
CLEAR)