Skip to content

Commit

Permalink
Refactor PubMLST data fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
ahdamin committed Dec 6, 2024
1 parent 45ff7d7 commit a2864c6
Showing 1 changed file with 86 additions and 89 deletions.
175 changes: 86 additions & 89 deletions microSALT/utils/referencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,23 @@
import shutil
import subprocess
import urllib.request
import xml.etree.ElementTree as ET
import zipfile

from Bio import Entrez
import xml.etree.ElementTree as ET

from microSALT.store.db_manipulator import DB_Manipulator
from microSALT.utils.pubmlst.api import (
check_database_metadata,
download_locus,
download_profiles,
fetch_schemes,
query_databases,
)
from microSALT.utils.pubmlst.authentication import (
get_new_session_token,
load_session_token,
)


class Referencer:
Expand Down Expand Up @@ -44,8 +56,12 @@ def __init__(self, config, log, sampleinfo={}, force=False):
self.name = self.sampleinfo.get("CG_ID_sample")
self.sample = self.sampleinfo

self.token, self.secret = load_session_token()
if not self.token or not self.secret:
self.token, self.secret = get_new_session_token()

def identify_new(self, cg_id="", project=False):
""" Automatically downloads pubMLST & NCBI organisms not already downloaded """
"""Automatically downloads pubMLST & NCBI organisms not already downloaded"""
neworgs = list()
newrefs = list()
try:
Expand Down Expand Up @@ -88,9 +104,7 @@ def index_db(self, full_dir, suffix):
"""Check for indexation, makeblastdb job if not enough of them."""
reindexation = False
files = os.listdir(full_dir)
sufx_files = glob.glob(
"{}/*{}".format(full_dir, suffix)
) # List of source files
sufx_files = glob.glob("{}/*{}".format(full_dir, suffix)) # List of source files
for file in sufx_files:
subsuf = "\{}$".format(suffix)
base = re.sub(subsuf, "", file)
Expand All @@ -102,10 +116,7 @@ def index_db(self, full_dir, suffix):
if os.path.basename(base) == elem[: elem.rfind(".")]:
bases = bases + 1
# Number of index files fresher than source (6)
if (
os.stat(file).st_mtime
< os.stat("{}/{}".format(full_dir, elem)).st_mtime
):
if os.stat(file).st_mtime < os.stat("{}/{}".format(full_dir, elem)).st_mtime:
newer = newer + 1
# 7 for parse_seqids, 4 for not.
if not (bases == 7 or newer == 6) and not (bases == 4 and newer == 3):
Expand All @@ -118,18 +129,16 @@ def index_db(self, full_dir, suffix):
)
# MLST locis
else:
bash_cmd = "makeblastdb -in {}/{} -dbtype nucl -parse_seqids -out {}".format(
full_dir, os.path.basename(file), os.path.basename(base)
bash_cmd = (
"makeblastdb -in {}/{} -dbtype nucl -parse_seqids -out {}".format(
full_dir, os.path.basename(file), os.path.basename(base)
)
)
proc = subprocess.Popen(
bash_cmd.split(), cwd=full_dir, stdout=subprocess.PIPE
)
proc = subprocess.Popen(bash_cmd.split(), cwd=full_dir, stdout=subprocess.PIPE)
proc.communicate()
except Exception as e:
self.logger.error(
"Unable to index requested target {} in {}".format(
file, full_dir
)
"Unable to index requested target {} in {}".format(file, full_dir)
)
if reindexation:
self.logger.info("Re-indexed contents of {}".format(full_dir))
Expand All @@ -142,7 +151,7 @@ def fetch_external(self, force=False):
for entry in root:
# Check organism
species = entry.text.strip()
organ = species.lower().replace(" ", "_")
organ = species.lower().replace(" ", "_")
if "escherichia_coli" in organ and "#1" in organ:
organ = organ[:-2]
if organ in self.organisms:
Expand All @@ -151,15 +160,11 @@ def fetch_external(self, force=False):
st_link = entry.find("./mlst/database/profiles/url").text
profiles_query = urllib.request.urlopen(st_link)
profile_no = profiles_query.readlines()[-1].decode("utf-8").split("\t")[0]
if (
organ.replace("_", " ") not in self.updated
and (
int(profile_no.replace("-", "")) > int(currver.replace("-", ""))
or force
)
if organ.replace("_", " ") not in self.updated and (
int(profile_no.replace("-", "")) > int(currver.replace("-", "")) or force
):
# Download MLST profiles
self.logger.info("Downloading new MLST profiles for " + species)
self.logger.info("Downloading new MLST profiles for " + species)
output = "{}/{}".format(self.config["folders"]["profiles"], organ)
urllib.request.urlretrieve(st_link, output)
# Clear existing directory and download allele files
Expand All @@ -169,7 +174,9 @@ def fetch_external(self, force=False):
for locus in entry.findall("./mlst/database/loci/locus"):
locus_name = locus.text.strip()
locus_link = locus.find("./url").text
urllib.request.urlretrieve(locus_link, "{}/{}.tfa".format(out, locus_name))
urllib.request.urlretrieve(
locus_link, "{}/{}.tfa".format(out, locus_name)
)
# Create new indexes
self.index_db(out, ".tfa")
# Update database
Expand All @@ -180,9 +187,7 @@ def fetch_external(self, force=False):
)
self.db_access.reload_profiletable(organ)
except Exception as e:
self.logger.warning(
"Unable to update pubMLST external data: {}".format(e)
)
self.logger.warning("Unable to update pubMLST external data: {}".format(e))

def resync(self, type="", sample="", ignore=False):
"""Manipulates samples that have an internal ST that differs from pubMLST ST"""
Expand Down Expand Up @@ -225,9 +230,7 @@ def fetch_resistances(self, force=False):

for file in os.listdir(hiddensrc):
if file not in actual and (".fsa" in file):
self.logger.info(
"resFinder database files corrupted. Syncing..."
)
self.logger.info("resFinder database files corrupted. Syncing...")
wipeIndex = True
break

Expand Down Expand Up @@ -259,12 +262,12 @@ def fetch_resistances(self, force=False):
self.index_db(self.config["folders"]["resistances"], ".fsa")

def existing_organisms(self):
    """Return the list of all organisms currently added to microSALT."""
    return self.organisms

def organism2reference(self, normal_organism_name):
"""Finds which reference contains the same words as the organism
and returns it in a format for database calls. Returns empty string if none found"""
and returns it in a format for database calls. Returns empty string if none found"""
orgs = os.listdir(self.config["folders"]["references"])
organism = re.split(r"\W+", normal_organism_name.lower())
try:
Expand Down Expand Up @@ -293,13 +296,11 @@ def organism2reference(self, normal_organism_name):
)

def download_ncbi(self, reference):
""" Checks available references, downloads from NCBI if not present """
"""Checks available references, downloads from NCBI if not present"""
try:
DEVNULL = open(os.devnull, "wb")
Entrez.email = "[email protected]"
record = Entrez.efetch(
db="nucleotide", id=reference, rettype="fasta", retmod="text"
)
record = Entrez.efetch(db="nucleotide", id=reference, rettype="fasta", retmod="text")
sequence = record.read()
output = "{}/{}.fasta".format(self.config["folders"]["genomes"], reference)
with open(output, "w") as f:
Expand All @@ -322,20 +323,16 @@ def download_ncbi(self, reference):
out, err = proc.communicate()
self.logger.info("Downloaded reference {}".format(reference))
except Exception as e:
self.logger.warning(
"Unable to download genome '{}' from NCBI".format(reference)
)
self.logger.warning("Unable to download genome '{}' from NCBI".format(reference))

def add_pubmlst(self, organism):
""" Checks pubmlst for references of given organism and downloads them """
"""Checks pubmlst for references of given organism and downloads them"""
# Organism must be in binomial format and only resolve to one hit
errorg = organism
try:
organism = organism.lower().replace(".", " ")
if organism.replace(" ", "_") in self.organisms and not self.force:
self.logger.info(
"Organism {} already stored in microSALT".format(organism)
)
self.logger.info("Organism {} already stored in microSALT".format(organism))
return
db_query = self.query_pubmlst()

Expand All @@ -357,9 +354,7 @@ def add_pubmlst(self, organism):
seqdef_url = subtype["href"]
desc = subtype["description"]
counter += 1.0
self.logger.info(
"Located pubMLST hit {} for sample".format(desc)
)
self.logger.info("Located pubMLST hit {} for sample".format(desc))
if counter > 2.0:
raise Exception(
"Reference '{}' resolved to {} organisms. Please be more stringent".format(
Expand All @@ -369,9 +364,7 @@ def add_pubmlst(self, organism):
elif counter < 1.0:
# add external
raise Exception(
"Unable to find requested organism '{}' in pubMLST database".format(
errorg
)
"Unable to find requested organism '{}' in pubMLST database".format(errorg)
)
else:
truename = desc.lower().split(" ")
Expand All @@ -384,7 +377,7 @@ def add_pubmlst(self, organism):
self.logger.warning(e.args[0])

def query_pubmlst(self):
""" Returns a json object containing all organisms available via pubmlst.org """
"""Returns a json object containing all organisms available via pubmlst.org"""
# Example request URI: http://rest.pubmlst.org/db/pubmlst_neisseria_seqdef/schemes/1/profiles_csv
seqdef_url = dict()
databases = "http://rest.pubmlst.org/db"
Expand All @@ -394,7 +387,7 @@ def query_pubmlst(self):
return db_query

def get_mlst_scheme(self, subtype_href):
""" Returns the path for the MLST data scheme at pubMLST """
"""Returns the path for the MLST data scheme at pubMLST"""
try:
mlst = False
record_req_1 = urllib.request.Request("{}/schemes/1".format(subtype_href))
Expand All @@ -412,13 +405,13 @@ def get_mlst_scheme(self, subtype_href):
if mlst:
self.logger.debug("Found data at pubMLST: {}".format(mlst))
return mlst
else:
else:
self.logger.warning("Could not find MLST data at {}".format(subtype_href))
except Exception as e:
self.logger.warning(e)

def external_version(self, organism, subtype_href):
""" Returns the version (date) of the data available on pubMLST """
"""Returns the version (date) of the data available on pubMLST"""
mlst_href = self.get_mlst_scheme(subtype_href)
try:
with urllib.request.urlopen(mlst_href) as response:
Expand All @@ -429,17 +422,13 @@ def external_version(self, organism, subtype_href):
self.logger.warning(e)

def download_pubmlst(self, organism, subtype_href, force=False):
""" Downloads ST and loci for a given organism stored on pubMLST if it is more recent. Returns update date """
"""Downloads ST and loci for a given organism stored on pubMLST if it is more recent. Returns update date"""
organism = organism.lower().replace(" ", "_")

# Pull version
extver = self.external_version(organism, subtype_href)
currver = self.db_access.get_version("profile_{}".format(organism))
if (
int(extver.replace("-", ""))
<= int(currver.replace("-", ""))
and not force
):
if int(extver.replace("-", "")) <= int(currver.replace("-", "")) and not force:
# self.logger.info("Profile for {} already at latest version".format(organism.replace('_' ,' ').capitalize()))
return currver

Expand Down Expand Up @@ -473,32 +462,40 @@ def download_pubmlst(self, organism, subtype_href, force=False):
self.index_db(output, ".tfa")

def fetch_pubmlst(self, force=False):
    """Fetch and update PubMLST reference data via the authenticated REST API.

    Queries PubMLST for all available databases; for each database not
    already stored locally (or always, when *force* is set) it fetches the
    schemes, downloads MLST profiles and every locus, and logs the database
    metadata. All failures are logged rather than raised, so a PubMLST
    outage does not abort the surrounding analysis run.

    :param force: when True, re-fetch data even for organisms already stored
    """
    try:
        self.logger.info("Querying available PubMLST databases...")
        databases = query_databases(self.token, self.secret)

        for db in databases.get("databases", []):
            db_name = db["description"]
            # Skip organisms already present locally unless a refresh is forced
            if db_name.replace(" ", "_").lower() in self.organisms and not force:
                self.logger.info(f"Database {db_name} is already up-to-date.")
                continue

            self.logger.info(f"Fetching schemes for {db_name}...")
            schemes = fetch_schemes(db["name"], self.token, self.secret)

            for scheme in schemes.get("schemes", []):
                # Only MLST schemes are relevant for microSALT typing
                if "MLST" in scheme["description"]:
                    self.logger.info(f"Downloading profiles for {db_name}...")
                    profiles = download_profiles(
                        db["name"], scheme["id"], self.token, self.secret
                    )
                    self.logger.info(
                        f"Profiles fetched for {db_name}. Total: {len(profiles)}."
                    )

                    # Download every locus of the scheme; the helper persists
                    # the data, so the return value is not needed here.
                    for locus in scheme.get("loci", []):
                        self.logger.info(
                            f"Downloading locus {locus} for {db_name}..."
                        )
                        download_locus(db["name"], locus, self.token, self.secret)
                        self.logger.info(f"Locus {locus} downloaded successfully.")

                    # Record when the upstream database was last updated
                    metadata = check_database_metadata(
                        db["name"], self.token, self.secret
                    )
                    self.logger.info(
                        f"Database metadata for {db_name}: {metadata.get('last_updated')}"
                    )

    except Exception as e:
        # Broad catch is deliberate: reference refresh is best-effort
        self.logger.error(f"Failed to fetch PubMLST data: {e}")

0 comments on commit a2864c6

Please sign in to comment.