generated from pyiron/pyiron_module_template
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Resolvers to unify finding of resources
Add two classes to find files in pyiron resource paths, one for any resource files one specifically for executables that also parses out versions.
- Loading branch information
Showing
12 changed files
with
350 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,284 @@ | ||
""" | ||
Classes to find data files and executables in global paths. | ||
""" | ||
from abc import ABC, abstractmethod | ||
from collections.abc import Iterator, Iterable | ||
import os | ||
import os.path | ||
from fnmatch import fnmatch | ||
from glob import glob | ||
import re | ||
from typing import Any | ||
|
||
if os.name == "nt": | ||
EXE_SUFFIX = "bat" | ||
else: | ||
EXE_SUFFIX = "sh" | ||
|
||
class ResourceNotFound(RuntimeError): | ||
pass | ||
|
||
class AbstractResolver(ABC): | ||
""" | ||
Interface for resolvers. | ||
Implementations must define :meth:`._search`, taking a tuple of names to search for and yielding instances of any | ||
type. Implementations should pick a single type to yield, e.g. :class:`.ResourceResolver` always yields absolute | ||
paths, while :class:`.ExecutableResolver` always yields 2-tuples of a version tag and absolute paths. | ||
""" | ||
@abstractmethod | ||
def _search(self, name: tuple[str]) -> Iterator[Any]: | ||
pass | ||
|
||
def search(self, name: Iterable[str] | str = "*") -> Iterator[Any]: | ||
""" | ||
Yield all matches. | ||
When `name` is given as an iterable, returned results match at least one of the `name` globs. | ||
Args: | ||
name (str, iterable of str): file name to search for; can be an exact file name, a glob or list of those | ||
Yields: | ||
object: resources matching `name` | ||
""" | ||
if name is not None and not isinstance(name, str): | ||
name = tuple(name) | ||
else: | ||
name = (name,) | ||
yield from self._search(name) | ||
|
||
def list(self, name: Iterable[str] | str = "*") -> list[Any]: | ||
""" | ||
Return all matches. | ||
Args: | ||
name (str, iterable of str): file name to search for; can be an exact file name, a glob or list of those | ||
Returns: | ||
list: all matches returned by :meth:`.search`. | ||
""" | ||
return list(self.search(name)) | ||
|
||
def first(self, name: Iterable[str] | str = "*") -> Any: | ||
""" | ||
Return first match. | ||
Args: | ||
name (str, iterable of str): file name to search for; can be an exact file name, a glob or list of those | ||
Returns: | ||
object: the first match returned by :meth:`.search`. | ||
Raises: | ||
:class:`~.ResourceNotFound`: if no matches are found. | ||
""" | ||
try: | ||
return next(iter(self.search(name))) | ||
except StopIteration: | ||
raise ResourceNotFound(f"Could not find {name} in {self}!") from None | ||
|
||
def chain(self, *resolvers: "AbstractResolver") -> "ResolverChain": | ||
""" | ||
Return a new resolver that searches this and all given resolvers sequentially. | ||
You will likely want to ensure that all given resolvers yield the same types and e.g. not mix ExecutableResolver | ||
and ResourceResolver, but this is not checked. | ||
The advantage of using :meth:`.chain` rather than adding more paths to one resolver is when different paths have | ||
different internal sub structure, such as when combining resources from pyiron resources and conda data | ||
packages. When searching for lammps potential files, e.g. we have some folders that are set up as | ||
<resources>/lammps/potentials/... | ||
but iprpy conda package that ships the NIST potentials doesn't have the lammps/potentials | ||
<iprpy>/... | ||
With chaining we can do very easily | ||
>>> ResourceResolver([<resources>], "lammps", "potentials").chain( | ||
... ResourceResolver([<iprpy>])) # doctest: +SKIP | ||
without we'd need to modify the resource paths ourselves explicitly | ||
>>> ResourceResolver([r + '/lammps/potentials' for r in <resources>] + [<iprpy>]) # doctest: +SKIP | ||
which is a bit more awkward. | ||
Args: | ||
resolvers (:class:`.AbstractResolver`): any number of sub resolvers | ||
Returns: | ||
self: if `resolvers` is empty | ||
:class:`.ResolverChain`: otherwise | ||
""" | ||
if resolvers == (): | ||
return self | ||
return ResolverChain(self, *resolvers) | ||
|
||
|
||
class ResolverChain(AbstractResolver): | ||
""" | ||
A chain of resolvers. Matches are returned sequentially. | ||
""" | ||
__slots__ = ("_resolvers",) | ||
def __init__(self, *resolvers): | ||
""" | ||
Args: | ||
*resolvers (:class:`.AbstractResolver`): sub resolvers to use | ||
""" | ||
self._resolvers = resolvers | ||
|
||
def _search(self, name): | ||
for resolver in self._resolvers: | ||
yield from resolver.search(name) | ||
|
||
def __repr__(self): | ||
inner = ", ".join(repr(r) for r in self._resolvers) | ||
return f'{type(self).__name__}({inner})' | ||
|
||
|
||
class ResourceResolver(AbstractResolver): | ||
""" | ||
Generic resolver for files and directories. | ||
Resources are expected to conform to the following format: | ||
<resource_path>/<module>/<subdir0>/<subdir1>/... | ||
*All* entries within in this final `subdir` are yielded by :meth:`.search`, whether they are files or directories. | ||
Search results can be restricted by passing a (list of) globs. If a list is given, entries matching at least one of | ||
them are returned. | ||
>>> exe = ResourceResolver(..., "lammps") | ||
>>> exe.list() # doctest: +SKIP | ||
[ | ||
('v1', '/my/resources/lammps/bin/run_lammps_v1.sh), | ||
('v1_mpi', '/my/resources/lammps/bin/run_lammps_v1_mpi.sh), | ||
('v2_default', '/my/resources/lammps/bin/run_lammps_v2_default.sh), | ||
] | ||
>>> exe.default_version # doctest: +SKIP | ||
"v2_default" | ||
>>> exe.dict("v1*") # doctest: +SKIP | ||
{ | ||
'v1': '/my/resources/lammps/bin/run_lammps_v1.sh), | ||
'v1_mpi': '/my/resources/lammps/bin/run_lammps_v1_mpi.sh) | ||
} | ||
""" | ||
__slots__ = "_resource_paths", "_module", "_subdirs" | ||
def __init__(self, resource_paths, module, *subdirs): | ||
""" | ||
Args: | ||
resource_paths (list of str): base paths for resource locations | ||
module (str): name of the module | ||
*subdirs (str): additional sub directories to descend into | ||
""" | ||
self._resource_paths = resource_paths | ||
self._module = module | ||
self._subdirs = subdirs | ||
|
||
def __repr__(self): | ||
inner = repr(self._resource_paths) | ||
inner += f", {repr(self._module)}" | ||
inner += ", ".join(repr(s) for s in self._subdirs) | ||
return f"{type(self).__name__}({inner})" | ||
|
||
def _search(self, name): | ||
for p in self._resource_paths: | ||
sub = os.path.join(p, self._module, *self._subdirs) | ||
if os.path.exists(sub): | ||
for n in name: | ||
yield from sorted(glob(os.path.join(sub, n))) | ||
|
||
|
||
class ExecutableResolver(AbstractResolver): | ||
""" | ||
A resolver for executable scripts. | ||
Executables are expected to conform to the following format: | ||
<resource_path>/<module>/bin/run_<code>_<version_string>.<suffix> | ||
and have the executable bit set. :meth:`.search` yields tuples of version strings and full paths to the executable | ||
instead of plain strings. | ||
>>> exe = ExecutableResolver(..., "lammps") | ||
>>> exe.list() # doctest: +SKIP | ||
[ | ||
('v1', '/my/resources/lammps/bin/run_lammps_v1.sh), | ||
('v1_mpi', '/my/resources/lammps/bin/run_lammps_v1_mpi.sh), | ||
('v2_default', '/my/resources/lammps/bin/run_lammps_v2_default.sh), | ||
] | ||
>>> exe.default_version # doctest: +SKIP | ||
"v2_default" | ||
>>> exe.dict("v1*") # doctest: +SKIP | ||
{ | ||
'v1': '/my/resources/lammps/bin/run_lammps_v1.sh), | ||
'v1_mpi': '/my/resources/lammps/bin/run_lammps_v1_mpi.sh) | ||
} | ||
""" | ||
__slots__ = "_regex", "_resolver" | ||
def __init__(self, resource_paths, code, module=None, suffix=EXE_SUFFIX): | ||
""" | ||
Args: | ||
resource_paths (list of str): base paths for resource locations | ||
code (str): name of the simulation code | ||
module (str): name of the module the code is part of, same as `code` by default | ||
suffix (str): file ending; 'bat' on Windows 'sh' elsewhere | ||
""" | ||
if module is None: | ||
module = code | ||
self._regex = re.compile(f"run_{code}_(.*)\.{suffix}$") | ||
self._glob = f'run_{code}_*.{suffix}' | ||
self._resolver = ResourceResolver( | ||
resource_paths, | ||
module, 'bin', | ||
) | ||
|
||
def _search(self, name): | ||
seen = set() | ||
def cond(path): | ||
isfile = os.path.isfile(path) | ||
isexec = os.access(path, os.X_OK, effective_ids=os.access in os.supports_effective_ids) | ||
return isfile and isexec | ||
for path in filter(cond, self._resolver.search(self._glob)): | ||
# we know that the regex has to match, because we constrain the resolver with the glob | ||
version = self._regex.search(path).group(1) | ||
if version not in seen and any(fnmatch(version, n) for n in name): | ||
yield (version, path) | ||
seen.add(version) | ||
|
||
def dict(self, name="*") -> dict[str, str]: | ||
""" | ||
Construct dict from :meth:`.search` results. | ||
Args: | ||
name (str or list of str): glob(s) to filter the version strings | ||
Returns: | ||
dict: mapping version strings to full paths | ||
""" | ||
return dict(self.search(name=name)) | ||
|
||
@property | ||
def available_versions(self): | ||
""" | ||
list of str: all found versions | ||
""" | ||
return [x[0] for x in self.search("*")] | ||
|
||
@property | ||
def default_version(self): | ||
""" | ||
str: the first version found in resources | ||
If a version matching `*default*` exists, the first matching is returned. | ||
Raises: | ||
:class:`.ResourceNotFound`: if no executables are found at all | ||
""" | ||
try: | ||
return self.first("*default*")[0] | ||
except ResourceNotFound: | ||
pass | ||
# try again outside the except clause to avoid nested error in case this fails as well | ||
return self.first("*")[0] |
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import os | ||
import os.path | ||
import unittest | ||
from pyiron_snippets.resources import ResourceNotFound, ResourceResolver, ExecutableResolver | ||
|
||
class TestResolvers(unittest.TestCase): | ||
""" | ||
Class to test resolvers | ||
""" | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
cls.static_path = os.path.join(os.path.dirname(__file__), "static", "resources") | ||
cls.res1 = os.path.join(cls.static_path, "res1") | ||
cls.res2 = os.path.join(cls.static_path, "res2") | ||
|
||
def test_resource_resolver(self): | ||
res = ResourceResolver([self.res1], "module1") | ||
self.assertEqual(set(res.search()), | ||
{os.path.join(self.res1, "module1", "bin"), | ||
os.path.join(self.res1, "module1", "data")}, | ||
"Simple search does not return all resources!") | ||
self.assertEqual(res.first(), tuple(res.search())[0], | ||
"first does not return first result!") | ||
self.assertEqual(list(res.search()), res.list(), "list not equal to search!") | ||
with self.assertRaises(ResourceNotFound, msg="first does not raise error on non existing resource!"): | ||
res.first("nonexisting") | ||
res = ResourceResolver([self.res1, self.res2], "module3") | ||
self.assertTrue(len(res.list("empty.txt")) == 2, | ||
msg="should find all instances of files with the same name.") | ||
|
||
def test_order(self): | ||
"""search must return results in the order given by the resource paths.""" | ||
self.assertTrue("res1" in ResourceResolver([self.res1, self.res2], "module3").first(), | ||
"resolver does not respect order of given resource paths!") | ||
self.assertTrue("res2" in ResourceResolver([self.res2, self.res1], "module3").first(), | ||
"resolver does not respect order of given resource paths!") | ||
self.assertEqual(tuple(os.path.basename(r) for r in ResourceResolver([self.res1], "module1").search()), | ||
tuple(sorted(("bin", "data"))), | ||
"search does not return results from the same folder in alphabetical order!") | ||
|
||
def test_chain(self): | ||
"""chained resolvers must behave like normal resolvers.""" | ||
chain = ResourceResolver([self.res1], "module3").chain(ResourceResolver([self.res2], "module3")) | ||
resol = ResourceResolver([self.res1, self.res2], "module3") | ||
|
||
self.assertEqual(chain.first(), resol.first(), | ||
"first returns different result for chained and normal resolver!") | ||
self.assertEqual(tuple(chain.search()), tuple(resol.search()), | ||
"search returns different result for chained and normal resolver!") | ||
|
||
def test_executable(self): | ||
res = ExecutableResolver([self.res1], code="code1", module="module1") | ||
self.assertNotIn("versionnonexec", res.available_versions, | ||
"ExecutableResolver must not list scripts that are not executable.") | ||
self.assertNotIn("wrong_format", res.available_versions, | ||
"ExecutableResolver must not list scripts that do not follow the correct format.") | ||
self.assertEqual("version1", res.default_version, | ||
"default version should be chosen in alphabetical order if not explicitly set.") | ||
res = ExecutableResolver([self.res1], code="code2", module="module1") | ||
print(res.list(), res.available_versions) | ||
self.assertEqual(res.default_version, "version2_default", | ||
"default version should be chosen as explicitly set.") | ||
|
||
if __name__ == "__main__": | ||
unittest.main() |