Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Resolvers to unify finding of resources #20

Merged
merged 7 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 302 additions & 0 deletions pyiron_snippets/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
"""
Classes to find data files and executables in global paths.
"""

from abc import ABC, abstractmethod
from collections.abc import Iterator, Iterable
import os
import os.path
from fnmatch import fnmatch
from glob import glob
import re
from typing import Any

# Default file ending of executable wrapper scripts: "bat" on Windows ("nt"), "sh" everywhere else.
EXE_SUFFIX = "bat" if os.name == "nt" else "sh"


class ResourceNotFound(RuntimeError):
    """Raised when a resolver is asked for a match but finds none."""


class AbstractResolver(ABC):
"""
Interface for resolvers.

Implementations must define :meth:`._search`, taking a tuple of names to search for and yielding instances of any
type. Implementations should pick a single type to yield, e.g. :class:`.ResourceResolver` always yields absolute
paths, while :class:`.ExecutableResolver` always yields 2-tuples of a version tag and absolute paths.
"""

@abstractmethod
def _search(self, name: tuple[str]) -> Iterator[Any]:
pass

def search(self, name: Iterable[str] | str = "*") -> Iterator[Any]:
"""
Yield all matches.

When `name` is given as an iterable, returned results match at least one of the `name` globs.

Args:
name (str, iterable of str): file name to search for; can be an exact file name, a glob or list of those

Yields:
object: resources matching `name`
"""
if name is not None and not isinstance(name, str):
name = tuple(name)
else:
name = (name,)
yield from self._search(name)

def list(self, name: Iterable[str] | str = "*") -> list[Any]:
"""
Return all matches.

Args:
name (str, iterable of str): file name to search for; can be an exact file name, a glob or list of those

Returns:
list: all matches returned by :meth:`.search`.
"""
return list(self.search(name))

def first(self, name: Iterable[str] | str = "*") -> Any:
"""
Return first match.

Args:
name (str, iterable of str): file name to search for; can be an exact file name, a glob or list of those

Returns:
object: the first match returned by :meth:`.search`.

Raises:
:class:`~.ResourceNotFound`: if no matches are found.
"""
try:
return next(iter(self.search(name)))
except StopIteration:
raise ResourceNotFound(f"Could not find {name} in {self}!") from None

def chain(self, *resolvers: "AbstractResolver") -> "ResolverChain":
"""
Return a new resolver that searches this and all given resolvers sequentially.

You will likely want to ensure that all given resolvers yield the same types and e.g. not mix ExecutableResolver
and ResourceResolver, but this is not checked.

The advantage of using :meth:`.chain` rather than adding more paths to one resolver is when different paths have
different internal sub structure, such as when combining resources from pyiron resources and conda data
packages. When searching for lammps potential files, e.g. we have some folders that are set up as

<resources>/lammps/potentials/...

but iprpy conda package that ships the NIST potentials doesn't have the lammps/potentials

<iprpy>/...

With chaining we can do very easily

>>> ResourceResolver([<resources>], "lammps", "potentials").chain(
... ResourceResolver([<iprpy>])) # doctest: +SKIP

without we'd need to modify the resource paths ourselves explicitly

>>> ResourceResolver([r + '/lammps/potentials' for r in <resources>] + [<iprpy>]) # doctest: +SKIP

which is a bit more awkward.

Args:
resolvers (:class:`.AbstractResolver`): any number of sub resolvers

Returns:
self: if `resolvers` is empty
:class:`.ResolverChain`: otherwise
"""
if resolvers == ():
return self
return ResolverChain(self, *resolvers)


class ResolverChain(AbstractResolver):
    """
    Resolver that delegates to a sequence of sub resolvers.

    All matches of the first sub resolver are yielded first, then all matches of the second one and so on.
    """

    __slots__ = ("_resolvers",)

    def __init__(self, *resolvers):
        """
        Args:
            *resolvers (:class:`.AbstractResolver`): sub resolvers to use
        """
        self._resolvers = resolvers

    def _search(self, name):
        # `name` is already a tuple here; the public `search` of each sub resolver accepts it as an iterable of names
        for sub in self._resolvers:
            for match in sub.search(name):
                yield match

    def __repr__(self):
        args = [repr(sub) for sub in self._resolvers]
        return "{}({})".format(type(self).__name__, ", ".join(args))


class ResourceResolver(AbstractResolver):
    """
    Generic resolver for files and directories.

    Resources are expected to conform to the following format:
        <resource_path>/<module>/<subdir0>/<subdir1>/...

    *All* entries within this final `subdir` are yielded by :meth:`.search`, whether they are files or directories.
    Search results can be restricted by passing a (list of) globs. If a list is given, entries matching at least one of
    them are returned.

    >>> res = ResourceResolver(..., "lammps")
    >>> res.list() # doctest: +SKIP
    [
        "bin",
        "potentials",
        "potentials.csv"
    ]
    """

    __slots__ = "_resource_paths", "_module", "_subdirs"

    def __init__(self, resource_paths, module, *subdirs):
        """
        Args:
            resource_paths (list of str): base paths for resource locations
            module (str): name of the module
            *subdirs (str): additional sub directories to descend into
        """
        self._resource_paths = resource_paths
        self._module = module
        self._subdirs = subdirs

    def __repr__(self):
        # Mirror the constructor signature; every argument, including each sub directory, is comma separated.
        args = [repr(self._resource_paths), repr(self._module)]
        args.extend(repr(s) for s in self._subdirs)
        return f"{type(self).__name__}({', '.join(args)})"

    def _search(self, name):
        """
        Yield paths below ``<resource_path>/<module>/<subdirs...>`` that match any of the globs in `name`.

        Resource paths are visited in the order given; within one directory the matches of each glob are yielded in
        sorted order. An entry matching several of the globs is yielded once per matching glob.

        Args:
            name (tuple of str): globs to match directory entries against

        Yields:
            str: path of a matching file or directory
        """
        # Only the function `glob` is imported at module level; import `escape` locally to keep this self-contained.
        from glob import escape as glob_escape

        for p in self._resource_paths:
            sub = os.path.join(p, self._module, *self._subdirs)
            if not os.path.exists(sub):
                continue
            # The directory part is a literal path: escape it so that glob meta characters in it ('*', '?', '[')
            # are not interpreted as part of the pattern. Only `n` is meant to be a glob.
            base = glob_escape(sub)
            for n in name:
                yield from sorted(glob(os.path.join(base, n)))


class ExecutableResolver(AbstractResolver):
    """
    A resolver for executable scripts.

    Executables are expected to conform to the following format:
        <resource_path>/<module>/bin/run_<code>_<version_string>.<suffix>

    and have the executable bit set. :meth:`.search` yields tuples of version strings and full paths to the executable
    instead of plain strings.

    >>> exe = ExecutableResolver(..., "lammps")
    >>> exe.list() # doctest: +SKIP
    [
        ('v1', '/my/resources/lammps/bin/run_lammps_v1.sh'),
        ('v1_mpi', '/my/resources/lammps/bin/run_lammps_v1_mpi.sh'),
        ('v2_default', '/my/resources/lammps/bin/run_lammps_v2_default.sh'),
    ]
    >>> exe.default_version # doctest: +SKIP
    "v2_default"
    >>> exe.dict("v1*") # doctest: +SKIP
    {
        'v1': '/my/resources/lammps/bin/run_lammps_v1.sh',
        'v1_mpi': '/my/resources/lammps/bin/run_lammps_v1_mpi.sh'
    }
    """

    __slots__ = "_regex", "_glob", "_resolver", "_code", "_suffix"

    def __init__(self, resource_paths, code, module=None, suffix=EXE_SUFFIX):
        """
        Args:
            resource_paths (list of str): base paths for resource locations
            code (str): name of the simulation code
            module (str): name of the module the code is part of, same as `code` by default
            suffix (str, optional): file ending; if `None`, 'bat' on Windows 'sh' elsewhere
        """
        if suffix is None:
            suffix = EXE_SUFFIX
        if module is None:
            module = code
        # kept verbatim so that __repr__ can show the original constructor arguments
        self._code = code
        self._suffix = suffix
        # `code` and `suffix` are literal text, escape them so regex meta characters in them are not interpreted
        self._regex = re.compile(f"run_{re.escape(code)}_(.*)\\.{re.escape(suffix)}$")
        self._glob = f"run_{code}_*.{suffix}"
        self._resolver = ResourceResolver(
            resource_paths,
            module,
            "bin",
        )

    def __repr__(self):
        # Mirror the constructor signature: resource_paths, code, module, suffix
        args = (
            self._resolver._resource_paths,
            self._code,
            self._resolver._module,
            self._suffix,
        )
        return f"{type(self).__name__}({', '.join(map(repr, args))})"

    def _search(self, name):
        """
        Yield `(version, path)` for executable scripts whose version string matches any glob in `name`.

        Only regular files with the executable permission are considered. If the same version string is found more
        than once, only the first occurrence is yielded.

        Args:
            name (tuple of str): globs to filter the version strings

        Yields:
            tuple of str: version string and path to the script
        """
        seen = set()
        # check permissions against the effective ids where the platform supports it
        effective = os.access in os.supports_effective_ids

        for path in self._resolver.search(self._glob):
            if not os.path.isfile(path):
                continue
            if not os.access(path, os.X_OK, effective_ids=effective):
                continue
            # Match on the file name only, so that parent directories that happen to look like `run_<code>_...`
            # cannot leak into the version string.
            match = self._regex.match(os.path.basename(path))
            if match is None:
                # The glob should guarantee a match, but glob matching can be more lenient than the regex
                # (presumably e.g. case-insensitive matching on Windows); skip instead of failing.
                continue
            version = match.group(1)
            if version in seen:
                continue
            if any(fnmatch(version, n) for n in name):
                seen.add(version)
                yield (version, path)

    def dict(self, name="*") -> dict[str, str]:
        """
        Construct dict from :meth:`.search` results.

        Args:
            name (str or list of str): glob(s) to filter the version strings

        Returns:
            dict: mapping version strings to full paths
        """
        return dict(self.search(name=name))

    @property
    def available_versions(self):
        """
        list of str: all found versions
        """
        return [version for version, _ in self.search("*")]

    @property
    def default_version(self):
        """
        str: the first version found in resources

        If a version matching `*default*` exists, the first matching is returned.

        Raises:
            :class:`.ResourceNotFound`: if no executables are found at all
        """
        try:
            return self.first("*default*")[0]
        except ResourceNotFound:
            pass
        # try again outside the except clause to avoid nested error in case this fails as well
        return self.first("*")[0]
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
92 changes: 92 additions & 0 deletions tests/unit/test_resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import os
import os.path
import unittest
from pyiron_snippets.resources import ResourceNotFound, ResourceResolver, ExecutableResolver

class TestResolvers(unittest.TestCase):
    """
    Tests for the resolvers.

    All tests run against the fixture tree in ``static/resources`` next to this file, which provides the two resource
    paths ``res1`` and ``res2``.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.static_path = os.path.join(os.path.dirname(__file__), "static", "resources")
        cls.res1 = os.path.join(cls.static_path, "res1")
        cls.res2 = os.path.join(cls.static_path, "res2")

    def test_resource_resolver(self):
        """search, first and list must agree with each other and with the fixture tree."""
        res = ResourceResolver([self.res1], "module1")
        self.assertEqual(set(res.search()),
                         {os.path.join(self.res1, "module1", "bin"),
                          os.path.join(self.res1, "module1", "data")},
                         "Simple search does not return all resources!")
        self.assertEqual(res.first(), tuple(res.search())[0],
                         "first does not return first result!")
        self.assertEqual(list(res.search()), res.list(), "list not equal to search!")
        with self.assertRaises(ResourceNotFound, msg="first does not raise error on non existing resource!"):
            res.first("nonexisting")
        res = ResourceResolver([self.res1, self.res2], "module3")
        # assertEqual reports the actual number of matches on failure
        self.assertEqual(len(res.list("empty.txt")), 2,
                         msg="should find all instances of files with the same name.")

    def test_order(self):
        """search must return results in the order given by the resource paths."""
        # assertIn reports the offending path on failure
        self.assertIn("res1", ResourceResolver([self.res1, self.res2], "module3").first(),
                      "resolver does not respect order of given resource paths!")
        self.assertIn("res2", ResourceResolver([self.res2, self.res1], "module3").first(),
                      "resolver does not respect order of given resource paths!")
        self.assertEqual(tuple(os.path.basename(r) for r in ResourceResolver([self.res1], "module1").search()),
                         tuple(sorted(("bin", "data"))),
                         "search does not return results from the same folder in alphabetical order!")

    def test_chain(self):
        """chained resolvers must behave like normal resolvers."""
        chain = ResourceResolver([self.res1], "module3").chain(ResourceResolver([self.res2], "module3"))
        resol = ResourceResolver([self.res1, self.res2], "module3")

        self.assertEqual(chain.first(), resol.first(),
                         "first returns different result for chained and normal resolver!")
        self.assertEqual(tuple(chain.search()), tuple(resol.search()),
                         "search returns different result for chained and normal resolver!")

        self.assertIs(resol, resol.chain(), "Empty chain does not return the same resolver!")

    def test_executable(self):
        """ExecutableResolver must filter by executable bit and file name format and pick the right default."""
        for suffix in (None, "sh", "bat"):
            with self.subTest(suffix=suffix):
                res = ExecutableResolver([self.res1], code="code1", module="module1", suffix=suffix)
                if os.name != "nt":
                    # no exec bits are present on windows it seems
                    self.assertNotIn("versionnonexec", res.available_versions,
                                     "ExecutableResolver must not list scripts that are not executable.")
                self.assertNotIn("wrong_format", res.available_versions,
                                 "ExecutableResolver must not list scripts that do not follow the correct format.")
                self.assertEqual("version1", res.default_version,
                                 "default version should be chosen in alphabetical order if not explicitly set.")
                res = ExecutableResolver([self.res1], code="code2", module="module1", suffix=suffix)
                self.assertEqual(res.default_version, "version2_default",
                                 "default version should be chosen as explicitly set.")
                self.assertEqual(dict(res.search()), res.dict(), "dict not equal to search!")

    def test_resource_resolver_subdirs(self):
        """Resolver constructor should take any additional args to search sub directories."""
        res = ResourceResolver([self.res1], "module1", "bin")
        expected_results = {
            os.path.join(self.res1, "module1", "bin", path)
            for path in ("run_code1_versionnonexec.sh", "run_code1_version1.sh", "run_code1_version2.sh")
        }
        self.assertEqual(set(res.search("*code1*.sh")), expected_results,
                         "Search with subdirectories does not return all resources!")

    def test_resource_resolver_name_globs(self):
        """search must accept a list of globs and return entries matching any of them."""
        res = ResourceResolver([self.res1], "module1", "bin")
        expected_results = {
            os.path.join(self.res1, "module1", "bin", "run_code1_version1.sh"),
            os.path.join(self.res1, "module1", "bin", "run_code1_version2.sh"),
        }
        results = set(res.search(["*code1*version1.sh", "*code1*sion2.sh"]))
        self.assertEqual(results, expected_results,
                         "Search with multiple glob patterns does not return all resources!")

# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Loading