Source code for flowcraft.templates.megahit

#!/usr/bin/env python3

"""
Purpose
-------

This module is intended execute megahit on paired-end FastQ files.

Expected input
--------------

The following variables are expected whether using NextFlow or the
:py:func:`main` executor.

- ``sample_id`` : Sample Identification string.
    - e.g.: ``'SampleA'``
- ``fastq_pair`` : Pair of FastQ file paths.
    - e.g.: ``'SampleA_1.fastq.gz SampleA_2.fastq.gz'``
- ``kmers`` : Setting for megahit kmers. Can be either ``'auto'``, \
    ``'default'`` or a user provided list. All must be odd, in the range 15-255, increment <= 28
    - e.g.: ``'auto'`` or ``'default'`` or ``'55 77 99 113 127'``
- ``clear`` : If 'true', remove the input fastq files at the end of the
    component run, IF THE FILES ARE IN THE WORK DIRECTORY

Generated output
----------------

- ``contigs.fa`` : Main output of megahit with the assembly
    - e.g.: ``contigs.fa``
- ``megahit_status`` :  Stores the status of the megahit run. If it was \
    successfully executed, it stores ``'pass'``. Otherwise, it stores the\
    ``STDERR`` message.
    - e.g.: ``'pass'``

Code documentation
------------------

"""

__version__ = "1.0.1"
__build__ = "26042018"
__template__ = "megahit-nf"

import os
import re
import subprocess

from subprocess import PIPE

from flowcraft_utils.flowcraft_base import get_logger, MainWrapper

logger = get_logger(__file__)


[docs]def is_odd(k_mer): for i in k_mer: if i % 2 != 0: return True return False
def __get_version_megahit(): try: cli = ["megahit", "--version"] p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE) stdout, _ = p.communicate() version = stdout.strip().split()[-1][1:].decode("utf8") except Exception as e: logger.debug(e) version = "undefined" return { "program": "megahit", "version": version, } if __file__.endswith(".command.sh"): SAMPLE_ID = '$sample_id' FASTQ_PAIR = '$fastq_pair'.split() MAX_LEN = int('$max_len'.strip()) KMERS = '$kmers'.strip() MEM = '$task.memory' CLEAR = '$clear' logger.debug("Running {} with parameters:".format( os.path.basename(__file__))) logger.debug("SAMPLE_ID: {}".format(SAMPLE_ID)) logger.debug("FASTQ_PAIR: {}".format(FASTQ_PAIR)) logger.debug("MAX_LEN: {}".format(MAX_LEN)) logger.debug("KMERS: {}".format(KMERS)) logger.debug("CLEAR: {}".format(CLEAR))
[docs]def set_kmers(kmer_opt, max_read_len): """Returns a kmer list based on the provided kmer option and max read len. Parameters ---------- kmer_opt : str The k-mer option. Can be either ``'auto'``, ``'default'`` or a sequence of space separated integers, ``'23, 45, 67'``. max_read_len : int The maximum read length of the current sample. Returns ------- kmers : list List of k-mer values that will be provided to megahit. """ logger.debug("Kmer option set to: {}".format(kmer_opt)) # Check if kmer option is set to auto if kmer_opt == "auto": if max_read_len >= 175: kmers = [55, 77, 99, 113, 127] else: kmers = [21, 33, 55, 67, 77] logger.debug("Kmer range automatically selected based on max read" "length of {}: {}".format(max_read_len, kmers)) # Check if manual k-mers were specified elif len(kmer_opt.split()) > 1: kmers = kmer_opt.split() if kmers[0]<15 or kmers[-1]>255 or is_odd(kmers): kmers = [] logger.debug("Kmer out of range or with even numbers" "(will be automatically determined by megahit") else: logger.debug("Kmer range manually set to: {}".format(kmers)) else: kmers = [] logger.debug("Kmer range set to empty (will be automatically " "determined by megahit") return kmers
[docs]def fix_contig_names(asseembly_path): """Removes whitespace from the assembly contig names Parameters ---------- asseembly_path : path to assembly file Returns ------- str: Path to new assembly file with fixed contig names """ fixed_assembly = "fixed_assembly.fa" with open(asseembly_path) as in_hf, open(fixed_assembly, "w") as ou_fh: for line in in_hf: if line.startswith(">"): fixed_line = line.replace(" ", "_") ou_fh.write(fixed_line) else: ou_fh.write(line) return fixed_assembly
[docs]def clean_up(fastq): """ Cleans the temporary fastq files. If they are symlinks, the link source is removed Parameters ---------- fastq : list List of fastq files. """ for fq in fastq: # Get real path of fastq files, following symlinks rp = os.path.realpath(fq) logger.debug("Removing temporary fastq file path: {}".format(rp)) if re.match(".*/work/.{2}/.{30}/.*", rp): os.remove(rp)
@MainWrapper def main(sample_id, fastq_pair, max_len, kmer, mem, clear): """Main executor of the megahit template. Parameters ---------- sample_id : str Sample Identification string. fastq_pair : list Two element list containing the paired FastQ files. max_len : int Maximum read length. This value is determined in :py:class:`templates.integrity_coverage` kmer : str Can be either ``'auto'``, ``'default'`` or a sequence of space separated integers, ``'23, 45, 67'``. """ logger.info("Starting megahit") logger.info("Setting megahit kmers") kmers = set_kmers(kmer, max_len) logger.info("megahit kmers set to: {}".format(kmers)) mem_bytes = int(mem.replace(" GB", "")) * 1073741824 cli = [ "megahit", "--num-cpu-threads", "$task.cpus", "--memory", str(mem_bytes), "-o", "megahit" ] # Add kmers, if any were specified if kmers: cli += [ "--k-list", ",".join([str(x) for x in kmers]) ] # Add FastQ files cli += [ "-1", fastq_pair[0], "-2", fastq_pair[1] ] logger.debug("Running megahit subprocess with command: {}".format(cli)) p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE) stdout, stderr = p.communicate() # Attempt to decode STDERR output from bytes. If unsuccessful, coerce to # string try: stderr = stderr.decode("utf8") stdout = stdout.decode("utf8") except (UnicodeDecodeError, AttributeError): stderr = str(stderr) stdout = str(stdout) logger.info("Finished megahit subprocess with STDOUT:\\n" "======================================\\n{}".format(stdout)) logger.info("Fished megahit subprocesswith STDERR:\\n" "======================================\\n{}".format(stderr)) logger.info("Finished megahit with return code: {}".format( p.returncode)) with open(".status", "w") as fh: if p.returncode != 0: fh.write("error") return else: fh.write("pass") assembly_path = "megahit/final.contigs.fa" fixed_assembly = fix_contig_names(assembly_path) # Change the default final.contigs.fa assembly name to a more informative # one if "_trim." in fastq_pair[0]: sample_id += "_trim" # Get megahit version for output name info = __get_version_megahit() assembly_file = "{}_megahit{}.fasta".format( sample_id, info["version"].replace(".", "")) os.rename(fixed_assembly, assembly_file) logger.info("Setting main assembly file to: {}".format(assembly_file)) # Remove input fastq files when clear option is specified. # Only remove temporary input when the expected output exists. if clear == "true" and os.path.exists(assembly_file): clean_up(fastq_pair) if __name__ == '__main__': main(SAMPLE_ID, FASTQ_PAIR, MAX_LEN, KMERS, MEM, CLEAR)