Merge pull request #248 from Ecogenomics/checkpointing_#207
Checkpointing #207
aaronmussig authored Apr 30, 2020
2 parents a84fdd2 + f5ad000 commit 4715994
Showing 10 changed files with 291 additions and 167 deletions.
5 changes: 3 additions & 2 deletions docs/announcements.md
@@ -8,12 +8,13 @@ nav_order: 1
# Announcements

**Note (x, 2020)**:
* GTDB-Tk v1.1.1 has been released
* GTDB-Tk v??? has been released
* *Bug fixes:*
* none?
* Fixed an issue where `--scratch_dir` would fail and not clean up the mmap file.
* *Features:*
* Added the `infer_rank` method which establishes the taxonomic ranks of internal nodes of user trees based on RED
* Added the `decorate` command allowing the `de novo workflow` to be run
* Skip gene calling if existing files are found.

**Note (Apr 9, 2020)**:
* GTDB-Tk v1.1.0 has been released (**we recommend all users update to this version**)
12 changes: 6 additions & 6 deletions gtdbtk/classify.py
@@ -94,13 +94,13 @@ def place_genomes(self,
mem_gb = get_memory_gb()
if mem_gb is not None:
mem_total = mem_gb['MemTotal']
if marker_set_id == 'bac120' and mem_total < 114:
if marker_set_id == 'bac120' and mem_total < 100:
self.logger.warning(f'pplacer requires ~113 GB of RAM to fully '
f'load the bacterial tree into memory. '
f'However, {mem_total}GB was detected. '
f'This may affect pplacer performance, '
f'or fail if there is insufficient scratch space.')
elif marker_set_id == 'ar122' and mem_total < 7:
elif marker_set_id == 'ar122' and mem_total < 5:
self.logger.warning(f'pplacer requires ~6.2 GB of RAM to fully '
f'load the archaeal tree into memory. '
f'However, {mem_total}GB was detected. '
@@ -127,10 +127,10 @@ def place_genomes(self,
# check if a scratch file is to be created
pplacer_mmap_file = None
if scratch_dir:
self.logger.info(
'Using a scratch file for pplacer allocations. This decreases memory usage and performance.')
pplacer_mmap_file = ' --mmap-file {}'.format(
os.path.join(scratch_dir, prefix + ".pplacer.scratch"))
self.logger.info('Using a scratch file for pplacer allocations. '
'This decreases memory usage and performance.')
pplacer_mmap_file = os.path.join(scratch_dir, prefix + ".pplacer.scratch")
make_sure_path_exists(scratch_dir)

# get path to pplacer reference package
if marker_set_id == 'bac120':
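Not shown in this diff is `get_memory_gb()` from `gtdbtk.tools`, which the memory checks above rely on. As a rough sketch only: on Linux such a helper typically parses `/proc/meminfo` and returns a dict keyed by field name (e.g. `MemTotal`) in GB; the actual implementation in `gtdbtk.tools` may differ.

```python
# Illustrative sketch only: gtdbtk.tools.get_memory_gb is not part of this
# diff, and the real implementation may differ.
import re


def get_memory_gb(meminfo='/proc/meminfo'):
    """Return a dict mapping /proc/meminfo fields (e.g. 'MemTotal') to GB,
    or None on systems where the file is unavailable (i.e. non-Linux)."""
    try:
        out = {}
        with open(meminfo) as fh:
            for line in fh:
                hit = re.match(r'(\w+):\s+(\d+)\s+kB', line)
                if hit:
                    # /proc/meminfo reports kB; 1e6 kB taken as 1 GB here.
                    out[hit.group(1)] = int(hit.group(2)) / 1e6
        return out
    except OSError:
        return None
```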
40 changes: 27 additions & 13 deletions gtdbtk/external/pfam_search.py
@@ -14,15 +14,15 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. #
# #
###############################################################################

import logging
import multiprocessing as mp
import os
import sys

from gtdbtk.exceptions import GTDBTkExit
from gtdbtk.external.pypfam.Scan.PfamScan import PfamScan
from gtdbtk.io.marker.tophit import TopHitPfamFile
from gtdbtk.tools import sha256
from gtdbtk.tools import sha256, file_has_checksum


class PfamSearch(object):
@@ -39,7 +39,8 @@ def __init__(self,
"""Initialization."""

self.threads = threads

self.logger = logging.getLogger('timestamp')
self.warnings = logging.getLogger('warnings')
self.pfam_hmm_dir = pfam_hmm_dir
self.protein_file_suffix = protein_file_suffix
self.pfam_suffix = pfam_suffix
@@ -80,7 +81,7 @@ def _topHit(self, pfam_file):

tophit_file.write()

def _workerThread(self, queueIn, queueOut):
def _workerThread(self, queueIn, queueOut, n_skipped):
"""Process each data item in parallel."""
try:
while True:
@@ -93,16 +94,23 @@ def _workerThread(self, queueIn, queueOut):
output_hit_file = os.path.join(self.output_dir, genome_id, filename.replace(self.protein_file_suffix,
self.pfam_suffix))

pfam_scan = PfamScan(cpu=self.cpus_per_genome, fasta=gene_file, dir=self.pfam_hmm_dir)
pfam_scan.search()
pfam_scan.write_results(output_hit_file, None, None, None, None)
# Check if this has already been processed.
out_files = (output_hit_file, TopHitPfamFile.get_path(self.output_dir, genome_id))
if all([file_has_checksum(x) for x in out_files]):
self.warnings.info(f'Skipped Pfam processing for: {genome_id}')
with n_skipped.get_lock():
n_skipped.value += 1
else:
pfam_scan = PfamScan(cpu=self.cpus_per_genome, fasta=gene_file, dir=self.pfam_hmm_dir)
pfam_scan.search()
pfam_scan.write_results(output_hit_file, None, None, None, None)

# calculate checksum
with open(output_hit_file + self.checksum_suffix, 'w') as fh:
fh.write(sha256(output_hit_file))
# calculate checksum
with open(output_hit_file + self.checksum_suffix, 'w') as fh:
fh.write(sha256(output_hit_file))

# identify top hit for each gene
self._topHit(output_hit_file)
# identify top hit for each gene
self._topHit(output_hit_file)

queueOut.put(gene_file)
except Exception as error:
@@ -138,6 +146,7 @@ def run(self, gene_files):
# populate worker queue with data to process
workerQueue = mp.Queue()
writerQueue = mp.Queue()
n_skipped = mp.Value('i', 0)

for f in gene_files:
workerQueue.put(f)
@@ -147,7 +156,7 @@

try:
workerProc = [mp.Process(target=self._workerThread, args=(
workerQueue, writerQueue)) for _ in range(self.threads)]
workerQueue, writerQueue, n_skipped)) for _ in range(self.threads)]
writeProc = mp.Process(target=self._writerThread, args=(
len(gene_files), writerQueue))

@@ -169,3 +178,8 @@

writeProc.terminate()
raise

if n_skipped.value > 0:
genome_s = 'genome' if n_skipped.value == 1 else 'genomes'
self.logger.warning(f'Pfam skipped {n_skipped.value} {genome_s} '
f'due to pre-existing data, see warnings.log')
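The checkpointing above depends on `file_has_checksum` (imported from `gtdbtk.tools` alongside `sha256`), which is not part of this diff. A minimal sketch of the pattern, assuming each output file's digest is stored next to it with a suffix (the `.sha256` name here is illustrative):

```python
# Minimal sketch of the checksum-based skip; the real helpers live in
# gtdbtk.tools and may differ in detail (e.g. the checksum suffix).
import hashlib
import os


def sha256(path, block_size=65536):
    """Compute the SHA-256 hex digest of a file, reading it in chunks."""
    h = hashlib.sha256()
    with open(path, 'rb') as fh:
        for block in iter(lambda: fh.read(block_size), b''):
            h.update(block)
    return h.hexdigest()


def file_has_checksum(path, checksum_suffix='.sha256'):
    """True iff `path` exists and matches the digest stored at path + suffix."""
    check_path = path + checksum_suffix
    if not (os.path.isfile(path) and os.path.isfile(check_path)):
        return False
    with open(check_path) as fh:
        expected = fh.read().strip()
    return sha256(path) == expected
```

A genome is only re-processed when an output file or its recorded digest is missing or stale, which is what makes an interrupted run resumable.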
196 changes: 112 additions & 84 deletions gtdbtk/external/pplacer.py
@@ -16,80 +16,15 @@
###############################################################################

import logging
import multiprocessing as mp
import os
import queue
import re
import subprocess
import sys

from gtdbtk.exceptions import PplacerException, TogException


class PplacerLogger(object):
"""Helper class for writing pplacer output."""

def __init__(self, fh):
"""Initialise the class.
Parameters
----------
fh : BinaryIO
The file to write to.
"""
self.fh = fh

def _disp_progress(self, line):
"""Calculates the progress and writes it to stdout.
Parameters
----------
line : str
The line passed from pplacer stdout.
"""
if not line.startswith('working on '):
sys.stdout.write(f'\rInitialising pplacer [{line[0:50].center(50)}]')
sys.stdout.flush()
else:
re_hits = re.search(r'\((\d+)\/(\d+)\)', line)
current = int(re_hits.group(1))
total = int(re_hits.group(2))
sys.stdout.write('\r{}'.format(self._get_progress_str(current,
total)))
sys.stdout.flush()

def _get_progress_str(self, current, total):
"""Determines the format of the genomes % string.
Parameters
----------
current : int
The current number of genomes which have been placed.
total : int
The total number of genomes which are to be placed.
Returns
-------
out : str
A string formatted to show the progress of placement.
"""
width = 50
bar = str()
prop = float(current) / total
bar += '#' * int(prop * width)
bar += '-' * (width - len(bar))
return 'Placing genomes |{}| {}/{} ({:.2f}%)'.format(bar, current,
total, prop * 100)

def read(self, line):
"""Reads a line and writes the progress to stdout and the file.
Parameters
----------
line : str
A line returned from pplacer stdout.
"""
self.fh.write(line)
line = line.strip()
self._disp_progress(line)
from gtdbtk.tools import get_proc_memory_gb


class Pplacer(object):
@@ -106,7 +41,7 @@ def _get_version(self):
try:
env = os.environ.copy()
proc = subprocess.Popen(['pplacer', '--version'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env, encoding='utf-8')
stderr=subprocess.PIPE, env=env, encoding='utf-8')

output, error = proc.communicate()
return output.strip()
@@ -131,35 +66,128 @@ def run(self, cpus, model, ref_pkg, json_out, msa_file, pplacer_out,
file isn't generated.
"""

args = ['pplacer', '-m', model, '-j', str(cpus), '-c', ref_pkg, '-o',
json_out, msa_file]
if mmap_file:
args.append('--mmap-file')
args.append(mmap_file)
self.logger.debug(' '.join(args))

proc = subprocess.Popen(args, stdout=subprocess.PIPE, encoding='utf-8')
with open(pplacer_out, 'w') as fh:
pplacer_logger = PplacerLogger(fh)
while True:
line = proc.stdout.readline()
if not line:
sys.stdout.write('\n')
break
pplacer_logger.read(line)
proc.wait()
out_q = mp.Queue()
pid = mp.Value('i', 0)
p_worker = mp.Process(target=self._worker, args=(args, out_q, pplacer_out, pid))
p_writer = mp.Process(target=self._writer, args=(out_q, pid))

if proc.returncode != 0:
raise PplacerException('An error was encountered while '
'running pplacer, check the log '
'file: {}'.format(pplacer_out))
try:
p_worker.start()
p_writer.start()

p_worker.join()
out_q.put(None)
p_writer.join()

if p_worker.exitcode != 0:
raise PplacerException('An error was encountered while running pplacer.')
except Exception:
p_worker.terminate()
p_writer.terminate()
raise
finally:
if mmap_file:
os.remove(mmap_file)

if not os.path.isfile(json_out):
self.logger.error('pplacer returned a zero exit code but no output '
'file was generated.')
raise PplacerException

def _worker(self, args, out_q, pplacer_out, pid):
"""The worker thread writes the piped output of pplacer to disk and
shares it with the writer thread for logging."""
with subprocess.Popen(args, stdout=subprocess.PIPE, encoding='utf-8') as proc:
with pid.get_lock():
pid.value = proc.pid

with open(pplacer_out, 'w') as fh:
while True:
line = proc.stdout.readline()
if not line:
break
fh.write(line)
out_q.put(line)
proc.wait()

if proc.returncode != 0:
raise PplacerException('An error was encountered while '
'running pplacer, check the log '
'file: {}'.format(pplacer_out))

def _writer(self, out_q, pid):
"""The writer subprocess is able to report on newly piped events from
subprocess in the worker thread, and report on memory usage while
waiting for new comands."""
states = ['Reading user alignment',
'Reading reference alignment',
'Pre-masking sequences',
'Determining figs',
'Allocating memory for internal nodes',
'Caching likelihood information on reference tree',
'Pulling exponents',
'Preparing the edges for baseball',
'Placing genomes']
cur_state = None
n_total, n_placed = None, 0
while True:
try:
state = out_q.get(block=True, timeout=5)
if not state:
break
elif state.startswith('Running pplacer'):
cur_state = 0
elif state.startswith("Didn't find any reference"):
cur_state = 1
elif state.startswith('Pre-masking sequences'):
cur_state = 2
elif state.startswith('Determining figs'):
cur_state = 3
elif state.startswith('Allocating memory for internal'):
cur_state = 4
elif state.startswith('Caching likelihood information'):
cur_state = 5
elif state.startswith('Pulling exponents'):
cur_state = 6
elif state.startswith('Preparing the edges'):
cur_state = 7
elif state.startswith('working on '):
cur_state = 8
else:
cur_state = None
sys.stdout.write(f'\r==> {state}')

if cur_state == 8:
if not n_total:
n_total = int(re.search(r'\((\d+)\/(\d+)\)', state).group(2))
n_placed += 1
sys.stdout.write(
f'\r==> Step 9 of 9: placing genome {n_placed} of {n_total} ({n_placed / n_total:.2%})')
elif cur_state is not None and 0 <= cur_state < 8:
sys.stdout.write(f'\r==> Step {cur_state + 1} of 9: {states[cur_state + 1]}.')
sys.stdout.flush()

# Report the memory usage if at a memory-reportable state.
except queue.Empty:
if cur_state == 3:
virt, res = get_proc_memory_gb(pid.value)
sys.stdout.write(f'\r==> Step {cur_state + 1} of 9: {states[4]} ({virt:.2f} GB)')
elif cur_state == 4:
virt, res = get_proc_memory_gb(pid.value)
sys.stdout.write(
f'\r==> Step {cur_state + 1} of 9: {states[5]} ({res:.2f}/{virt:.2f} GB, {res / virt:.2%})')
sys.stdout.flush()
except Exception:
pass
sys.stdout.write('\n')

def tog(self, pplacer_json_out, tree_file):
""" Convert the pplacer json output into a newick tree.
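The `_writer` process above polls `get_proc_memory_gb(pid.value)` (imported from `gtdbtk.tools`, not shown in this diff) to report pplacer's memory usage while no new output arrives. On Linux this is typically a small parse of `/proc/<pid>/status`; a hedged sketch, with the exact fields assumed:

```python
# Illustrative sketch only: gtdbtk.tools.get_proc_memory_gb is not part of
# this diff, and the real implementation may differ.
def get_proc_memory_gb(pid):
    """Return (virtual, resident) memory of `pid` in GB, read from the
    VmSize/VmRSS fields of /proc/<pid>/status; (None, None) if unavailable."""
    virt, res = None, None
    try:
        with open(f'/proc/{pid}/status') as fh:
            for line in fh:
                if line.startswith('VmSize:'):
                    virt = int(line.split()[1]) / 1e6  # kB -> GB (approx.)
                elif line.startswith('VmRSS:'):
                    res = int(line.split()[1]) / 1e6
    except OSError:
        pass
    return virt, res
```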