Make plugin-based caching versioned and lazy
cthoyt committed Jan 7, 2021
1 parent 2ccd5f6 commit 5803f97
Showing 3 changed files with 129 additions and 61 deletions.
44 changes: 27 additions & 17 deletions src/pyobo/extract.py
@@ -20,6 +20,7 @@
from .identifier_utils import normalize_curie, wrap_norm_prefix
from .path_utils import prefix_cache_join
from .registries import not_available_as_obo
from .sources import has_nomenclature_plugin, run_nomenclature_plugin
from .struct import Reference, TypeDef, get_reference_tuple
from .struct.typedef import has_member, is_a, part_of

@@ -64,7 +65,14 @@

RelationHint = Union[Reference, TypeDef, Tuple[str, str]]

NO_ALTS = {'ncbigene'}
NO_ALTS = {
'ncbigene',
}


def _get_version(prefix: str) -> Optional[str]:
if has_nomenclature_plugin(prefix):
return run_nomenclature_plugin(prefix).data_version


def get_name_by_curie(curie: str) -> Optional[str]:
@@ -131,7 +139,7 @@ def get_id_name_mapping(prefix: str, force: bool = False, **kwargs) -> Mapping[s
logger.info('[%s] done loading name mappings', prefix)
return rv

path = prefix_cache_join(prefix, 'names.tsv')
path = prefix_cache_join(prefix, 'names.tsv', version=_get_version(prefix))

@cached_mapping(path=path, header=[f'{prefix}_id', 'name'], force=force)
def _get_id_name_mapping() -> Mapping[str, str]:
@@ -164,7 +172,7 @@ def get_id_species_mapping(prefix: str, force: bool = False, **kwargs) -> Mappin
logger.info('[%s] done loading species mappings', prefix)
return rv

path = prefix_cache_join(prefix, 'species.tsv')
path = prefix_cache_join(prefix, 'species.tsv', version=_get_version(prefix))

@cached_mapping(path=path, header=[f'{prefix}_id', 'species'], force=force)
def _get_id_species_mapping() -> Mapping[str, str]:
@@ -180,7 +188,7 @@ def _get_id_species_mapping() -> Mapping[str, str]:
@wrap_norm_prefix
def get_typedef_id_name_mapping(prefix: str, force: bool = False, **kwargs) -> Mapping[str, str]:
"""Get an identifier to name mapping for the typedefs in an OBO file."""
path = prefix_cache_join(prefix, 'typedefs.tsv')
path = prefix_cache_join(prefix, 'typedefs.tsv', version=_get_version(prefix))

@cached_mapping(path=path, header=[f'{prefix}_id', 'name'], force=force)
def _get_typedef_id_name_mapping() -> Mapping[str, str]:
@@ -195,7 +203,7 @@ def _get_typedef_id_name_mapping() -> Mapping[str, str]:
@wrap_norm_prefix
def get_id_synonyms_mapping(prefix: str, force: bool = False, **kwargs) -> Mapping[str, List[str]]:
"""Get the OBO file and output a synonym dictionary."""
path = prefix_cache_join(prefix, "synonyms.tsv")
path = prefix_cache_join(prefix, "synonyms.tsv", version=_get_version(prefix))

@cached_multidict(path=path, header=[f'{prefix}_id', 'synonym'], force=force)
def _get_multidict() -> Mapping[str, List[str]]:
@@ -209,7 +217,7 @@ def _get_multidict() -> Mapping[str, List[str]]:
@wrap_norm_prefix
def get_properties_df(prefix: str, force: bool = False, **kwargs) -> pd.DataFrame:
"""Extract properties."""
path = prefix_cache_join(prefix, "properties.tsv")
path = prefix_cache_join(prefix, "properties.tsv", version=_get_version(prefix))

@cached_df(path=path, dtype=str, force=force)
def _df_getter() -> pd.DataFrame:
@@ -231,8 +239,8 @@ def get_filtered_properties_mapping(
**kwargs,
) -> Mapping[str, str]:
"""Extract a single property for each term as a dictionary."""
path = prefix_cache_join(prefix, 'properties', f"{prop}.tsv")
all_properties_path = prefix_cache_join(prefix, 'properties.tsv')
path = prefix_cache_join(prefix, 'properties', f"{prop}.tsv", version=_get_version(prefix))
all_properties_path = prefix_cache_join(prefix, 'properties.tsv', version=_get_version(prefix))

@cached_mapping(path=path, header=[f'{prefix}_id', prop], force=force)
def _mapping_getter() -> Mapping[str, str]:
@@ -260,8 +268,8 @@ def get_filtered_properties_df(
**kwargs,
) -> pd.DataFrame:
"""Extract a single property for each term."""
path = prefix_cache_join(prefix, 'properties', f"{prop}.tsv")
all_properties_path = prefix_cache_join(prefix, 'properties.tsv')
path = prefix_cache_join(prefix, 'properties', f"{prop}.tsv", version=_get_version(prefix))
all_properties_path = prefix_cache_join(prefix, 'properties.tsv', version=_get_version(prefix))

@cached_df(path=path, dtype=str, force=force)
def _df_getter() -> pd.DataFrame:
@@ -288,7 +296,7 @@ def get_relations_df(
**kwargs,
) -> pd.DataFrame:
"""Get all relations from the OBO."""
path = prefix_cache_join(prefix, 'relations.tsv')
path = prefix_cache_join(prefix, 'relations.tsv', version=_get_version(prefix))

@cached_df(path=path, dtype=str, force=force)
def _df_getter() -> pd.DataFrame:
@@ -317,8 +325,10 @@ def get_filtered_relations_df(
) -> pd.DataFrame:
"""Get all of the given relation."""
relation_prefix, relation_identifier = relation = get_reference_tuple(relation)
path = prefix_cache_join(prefix, 'relations', f'{relation_prefix}:{relation_identifier}.tsv')
all_relations_path = prefix_cache_join(prefix, 'relations.tsv')
path = prefix_cache_join(
prefix, 'relations', f'{relation_prefix}:{relation_identifier}.tsv', version=_get_version(prefix),
)
all_relations_path = prefix_cache_join(prefix, 'relations.tsv', version=_get_version(prefix))

@cached_df(path=path, dtype=str, force=force)
def _df_getter() -> pd.DataFrame:
@@ -361,8 +371,8 @@ def get_filtered_xrefs(
**kwargs,
) -> Mapping[str, str]:
"""Get xrefs to a given target."""
path = prefix_cache_join(prefix, 'xrefs', f"{xref_prefix}.tsv")
all_xrefs_path = prefix_cache_join(prefix, 'xrefs.tsv')
path = prefix_cache_join(prefix, 'xrefs', f"{xref_prefix}.tsv", version=_get_version(prefix))
all_xrefs_path = prefix_cache_join(prefix, 'xrefs.tsv', version=_get_version(prefix))
header = [f'{prefix}_id', f'{xref_prefix}_id']

@cached_mapping(path=path, header=header, use_tqdm=use_tqdm, force=force)
@@ -388,7 +398,7 @@ def _get_mapping() -> Mapping[str, str]:
@wrap_norm_prefix
def get_xrefs_df(prefix: str, *, use_tqdm: bool = False, force: bool = False, **kwargs) -> pd.DataFrame:
"""Get all xrefs."""
path = prefix_cache_join(prefix, 'xrefs.tsv')
path = prefix_cache_join(prefix, 'xrefs.tsv', version=_get_version(prefix))

@cached_df(path=path, dtype=str, force=force)
def _df_getter() -> pd.DataFrame:
@@ -406,7 +416,7 @@ def get_id_to_alts(prefix: str, force: bool = False, **kwargs) -> Mapping[str, L
if prefix in NO_ALTS:
return {}

path = prefix_cache_join(prefix, 'alt_ids.tsv')
path = prefix_cache_join(prefix, 'alt_ids.tsv', version=_get_version(prefix))
header = [f'{prefix}_id', 'alt_id']

@cached_multidict(path=path, header=header, force=force)
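
The recurring change in extract.py is that every prefix_cache_join call now passes version=_get_version(prefix), so cached TSVs land in a per-version directory and a new data release no longer silently reuses stale files. Below is a minimal, self-contained sketch of that layout idea; the cache root and the cache_path helper name are illustrative stand-ins, not pyobo's actual API.

from pathlib import Path
from typing import Optional

# Illustrative cache root, not pyobo's real on-disk location.
CACHE_ROOT = Path.home() / ".data" / "pyobo"

def cache_path(prefix: str, *parts: str, version: Optional[str] = None) -> Path:
    """Join cache parts under an optional per-version directory."""
    directory = CACHE_ROOT / prefix
    if version is not None:
        directory = directory / version  # e.g. .../hgnc/2021-01-01/cache/names.tsv
    path = directory.joinpath("cache", *parts)
    path.parent.mkdir(parents=True, exist_ok=True)
    return path

# Caches from two releases of the same resource no longer collide:
print(cache_path("hgnc", "names.tsv", version="2021-01-01"))
print(cache_path("hgnc", "names.tsv", version="2020-12-01"))
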
13 changes: 7 additions & 6 deletions src/pyobo/getters.py
@@ -63,14 +63,13 @@ def get(prefix: str, *, url: Optional[str] = None, local: bool = False) -> Obo:
if path.exists() and not local:
logger.debug('[%s] using obonet cache at %s', prefix, path)
return Obo.from_obonet_gz(path)
else:
logger.debug('[%s] no obonet cache found at %s', prefix, path)

if has_nomenclature_plugin(prefix):
elif has_nomenclature_plugin(prefix):
obo = run_nomenclature_plugin(prefix)
logger.info('[%s] caching OBO at %s', prefix, path)
logger.info('[%s] caching nomenclature plugin', prefix)
obo.write_default()
return obo
else:
logger.debug('[%s] no obonet cache found at %s', prefix, path)

obo = _get_obo_via_obonet(prefix=prefix, url=url, local=local)
if not local:
@@ -176,9 +175,11 @@ def iter_helper_helper(f: Callable[[str], X], strict: bool = True) -> Iterable[T
:raises URLError: If another problem was encountered during download
:raises ValueError: If the data was not in the format that was expected (e.g., OWL)
"""
for prefix in sorted(bioregistry.read_bioregistry()):
it = tqdm(sorted(bioregistry.read_bioregistry()))
for prefix in it:
if prefix in SKIP:
continue
it.set_postfix({'prefix': prefix})
try:
mapping = f(prefix)
except NoBuild:
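
The iter_helper_helper change wraps the prefix loop in a tqdm progress bar and uses set_postfix to show which resource is currently being processed. A small runnable sketch of the same pattern follows; the prefix list and worker function are placeholders, not bioregistry data.

from tqdm import tqdm

SKIP = {"skipped-example"}
prefixes = ["chebi", "go", "hgnc", "skipped-example"]

def process(prefix: str) -> int:
    return len(prefix)  # stand-in for the real per-prefix work

it = tqdm(sorted(prefixes))
for prefix in it:
    if prefix in SKIP:
        continue
    it.set_postfix({"prefix": prefix})  # progress bar shows the current prefix
    process(prefix)
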
133 changes: 95 additions & 38 deletions src/pyobo/struct/struct.py
@@ -366,62 +366,119 @@ def from_obonet_gz(cls, path: Union[str, pathlib.Path]) -> 'Obo':
"""Read OBO from a pre-compiled Obonet JSON."""
return cls.from_obonet(get_gzipped_graph(path))

def _path(self, *parts: str):
def _path(self, *parts: str) -> Path:
return prefix_directory_join(self.ontology, *parts, version=self.data_version)

def _cache(self, *parts: str):
def _cache(self, *parts: str) -> Path:
return self._path('cache', *parts)

def write_default(self, use_tqdm: bool = True, write_obo: bool = False, write_obonet: bool = False) -> None:
@property
def _names_path(self) -> Path:
return self._cache('names.tsv')

@property
def _species_path(self) -> Path:
return self._cache('species.tsv')

@property
def _synonyms_path(self) -> Path:
return self._cache('synonyms.tsv')

@property
def _alts_path(self):
return self._cache('alt_ids.tsv')

@property
def _xrefs_path(self) -> Path:
return self._cache('xrefs.tsv')

@property
def _relations_path(self) -> Path:
return self._cache('relations.tsv')

@property
def _properties_path(self) -> Path:
return self._cache('properties.tsv')

@property
def _obo_path(self) -> Path:
return get_prefix_obo_path(self.ontology, version=self.data_version)

@property
def _obonet_gz_path(self) -> Path:
return self._path(f"{self.ontology}.obonet.json.gz")

def write_default(
self,
use_tqdm: bool = True,
force: bool = False,
write_obo: bool = False,
write_obonet: bool = False,
) -> None:
"""Write the OBO to the default path."""
write_map_tsv(
path=self._cache('names.tsv'),
header=[f'{self.ontology}_id', 'name'],
rv=self.get_id_name_mapping(),
)
write_map_tsv(
path=self._cache('species.tsv'),
header=[f'{self.ontology}_id', 'taxonomy_id'],
rv=self.get_id_species_mapping(),
)
write_multimap_tsv(
path=self._cache('synonyms.tsv'),
header=[f'{self.ontology}_id', 'synonym'],
rv=self.get_id_synonyms_mapping(),
)
write_multimap_tsv(
path=self._cache('alt_ids.tsv'),
header=[f'{self.ontology}_id', 'alt_id'],
rv=self.get_id_alts_mapping(),
)
if not self._names_path.exists() or force:
logger.info('[%s] caching names', self.ontology)
write_map_tsv(
path=self._names_path,
header=[f'{self.ontology}_id', 'name'],
rv=self.get_id_name_mapping(),
)

for df_name, get_df in [
('xrefs', self.get_xrefs_df),
('relations', self.get_relations_df),
('properties', self.get_properties_df),
if not self._species_path.exists() or force:
logger.info('[%s] caching species', self.ontology)
write_map_tsv(
path=self._species_path,
header=[f'{self.ontology}_id', 'taxonomy_id'],
rv=self.get_id_species_mapping(),
)

if not self._synonyms_path.exists() or force:
logger.info('[%s] caching synonyms', self.ontology)
write_multimap_tsv(
path=self._synonyms_path,
header=[f'{self.ontology}_id', 'synonym'],
rv=self.get_id_synonyms_mapping(),
)

if not self._alts_path.exists() or force:
logger.info('[%s] caching alts', self.ontology)
write_multimap_tsv(
path=self._alts_path,
header=[f'{self.ontology}_id', 'alt_id'],
rv=self.get_id_alts_mapping(),
)

for path, get_df in [
(self._xrefs_path, self.get_xrefs_df),
(self._relations_path, self.get_relations_df),
(self._properties_path, self.get_properties_df),
]:
if path.exists() and not force:
continue
logger.info('[%s] caching %s', self.ontology, path)
df: pd.DataFrame = get_df(use_tqdm=use_tqdm)
if len(df.index):
df.sort_values(list(df.columns), inplace=True)
df.to_csv(self._cache(f'{df_name}.tsv'), sep='\t', index=False)
df.sort_values(list(df.columns), inplace=True)
df.to_csv(path, sep='\t', index=False)

for relation in (is_a, has_part, part_of, from_species, orthologous):
if relation is not is_a and relation not in self.typedefs:
continue
relations_path = self._cache('relations', f'{relation.curie}.tsv')
if relations_path.exists() and not force:
continue
logger.info('[%s] caching relation %s ! %s', self.ontology, relation.curie, relation.name)
relation_df = self.get_filtered_relations_df(relation)
if not len(relation_df.index):
continue
relation_df.sort_values(list(relation_df.columns), inplace=True)
relation_df.to_csv(self._cache('relations', f'{relation.curie}.tsv'), sep='\t', index=False)
relation_df.to_csv(relations_path, sep='\t', index=False)

if write_obo:
obo_path = get_prefix_obo_path(self.ontology, version=self.data_version)
self.write_obo(obo_path, use_tqdm=use_tqdm)
if write_obo and (not self._obo_path.exists() or force):
self.write_obo(self._obo_path, use_tqdm=use_tqdm)

if write_obonet:
obonet_gz_path = self._path(f"{self.ontology}.obonet.json.gz")
logger.info('writing obonet to %s', obonet_gz_path)
self.write_obonet_gz(obonet_gz_path)
if write_obonet and (not self._obonet_gz_path.exists() or force):
logger.info('writing obonet to %s', self._obonet_gz_path)
self.write_obonet_gz(self._obonet_gz_path)

def __iter__(self): # noqa: D105
if self.iter_only:
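
In struct.py, write_default now checks each cache artifact before writing, so repeated calls only rebuild what is missing (or everything when force=True). A minimal sketch of that lazy pattern, using an illustrative write_tsv helper rather than pyobo's write_map_tsv:

from pathlib import Path
from typing import Callable, Mapping

def write_tsv(path: Path, rows: Mapping[str, str]) -> None:
    with path.open("w") as file:
        for key, value in rows.items():
            print(key, value, sep="\t", file=file)

def cache_if_missing(path: Path, getter: Callable[[], Mapping[str, str]], force: bool = False) -> None:
    if path.exists() and not force:
        return  # reuse the existing file instead of recomputing
    path.parent.mkdir(parents=True, exist_ok=True)
    write_tsv(path, getter())

cache_if_missing(Path("names.tsv"), lambda: {"1": "example name"})
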
