diff --git a/ci/environment-py38.yml b/ci/environment-py38.yml index 25f68e5dd..9c514de98 100644 --- a/ci/environment-py38.yml +++ b/ci/environment-py38.yml @@ -3,7 +3,7 @@ channels: - conda-forge dependencies: - pip - - git + - git <2.45.0 - py - pip: - hadoop-test-cluster diff --git a/docs/source/api.rst b/docs/source/api.rst index cb14fe7e1..c01b7802f 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -47,7 +47,9 @@ Base Classes fsspec.core.OpenFiles fsspec.core.get_fs_token_paths fsspec.core.url_to_fs - fsspec.dircache.DirCache + fsspec.dircache.DisabledListingsCache + fsspec.dircache.MemoryListingsCache + fsspec.dircache.FileListingsCache fsspec.FSMap fsspec.generic.GenericFileSystem fsspec.registry.register_implementation @@ -82,7 +84,13 @@ Base Classes .. autofunction:: fsspec.core.url_to_fs -.. autoclass:: fsspec.dircache.DirCache +.. autoclass:: fsspec.dircache.DisabledListingsCache + :members: __init__ + +.. autoclass:: fsspec.dircache.MemoryListingsCache + :members: __init__ + +.. autoclass:: fsspec.dircache.FileListingsCache :members: __init__ .. autoclass:: fsspec.FSMap diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index f5f30fdd9..71ff51efc 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -1,6 +1,14 @@ Changelog ========= +Dev +-------- + +Enhancements + +- add file-based listing cache using diskcache (#895) + warning: use new ``listings_cache_options`` instead of ``use_listings_cache`` etc. + 2024.3.1 -------- diff --git a/docs/source/features.rst b/docs/source/features.rst index 907084e0d..1d9ac5e0a 100644 --- a/docs/source/features.rst +++ b/docs/source/features.rst @@ -181,15 +181,26 @@ Listings Caching ---------------- For some implementations, getting file listings (i.e., ``ls`` and anything that -depends on it) is expensive. These implementations use dict-like instances of -:class:`fsspec.dircache.DirCache` to manage the listings. 
- -The cache allows for time-based expiry of entries with the ``listings_expiry_time`` -parameter, or LRU expiry with the ``max_paths`` parameter. These can be -set on any implementation instance that uses listings caching; or to skip the -caching altogether, use ``use_listings_cache=False``. That would be appropriate -when the target location is known to be volatile because it is being written -to from other sources. +depends on it) is expensive. These implementations may use either dict-like instances of +:class:`fsspec.dircache.MemoryListingsCache` or file-based caching with instances of +:class:`fsspec.dircache.FileListingsCache` to manage the listings. + +The listings cache can be controlled via the keyword ``listings_cache_options`` which is a dictionary. +The type of cache that is used can be controlled via the keyword ``cache_type`` (`disabled`, `memory` or `file`). +The cache allows for time-based expiry of entries with the keyword ``expiry_time``. If the target location is known to +be volatile because e.g. it is being written to from other sources, we recommend disabling the listings cache. +If you want to use the file-based caching, you can also provide the argument +``directory`` to determine where the cache file is stored. + +Example for ``listings_cache_options``: + +.. code-block:: json + + { + "cache_type": "file", + "expiry_time": 3600, + "directory": "/tmp/cache" + } When the ``fsspec`` instance writes to the backend, the method ``invalidate_cache`` is called, so that subsequent listing of the given paths will force a refresh. 
In diff --git a/fsspec/archive.py b/fsspec/archive.py index f466780fc..17bc0b893 100644 --- a/fsspec/archive.py +++ b/fsspec/archive.py @@ -37,19 +37,19 @@ def _all_dirnames(self, paths): def info(self, path, **kwargs): self._get_dirs() path = self._strip_protocol(path) - if path in {"", "/"} and self.dir_cache: + if path in {"", "/"} and self.dircache: return {"name": "", "type": "directory", "size": 0} - if path in self.dir_cache: - return self.dir_cache[path] - elif path + "/" in self.dir_cache: - return self.dir_cache[path + "/"] + if path in self.dircache: + return self.dircache[path] + elif path + "/" in self.dircache: + return self.dircache[path + "/"] else: raise FileNotFoundError(path) def ls(self, path, detail=True, **kwargs): self._get_dirs() paths = {} - for p, f in self.dir_cache.items(): + for p, f in self.dircache.items(): p = p.rstrip("/") if "/" in p: root = p.rsplit("/", 1)[0] diff --git a/fsspec/asyn.py b/fsspec/asyn.py index a040efc4b..caf7b0897 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -312,7 +312,15 @@ class AsyncFileSystem(AbstractFileSystem): mirror_sync_methods = True disable_throttling = False - def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs): + def __init__( + self, + *args, + asynchronous=False, + loop=None, + batch_size=None, + listings_cache_options=None, + **kwargs, + ): self.asynchronous = asynchronous self._pid = os.getpid() if not asynchronous: @@ -320,7 +328,7 @@ def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwar else: self._loop = None self.batch_size = batch_size - super().__init__(*args, **kwargs) + super().__init__(listings_cache_options, *args, **kwargs) @property def loop(self): diff --git a/fsspec/dircache.py b/fsspec/dircache.py deleted file mode 100644 index eca19566b..000000000 --- a/fsspec/dircache.py +++ /dev/null @@ -1,98 +0,0 @@ -import time -from collections.abc import MutableMapping -from functools import lru_cache - - -class 
DirCache(MutableMapping): - """ - Caching of directory listings, in a structure like:: - - {"path0": [ - {"name": "path0/file0", - "size": 123, - "type": "file", - ... - }, - {"name": "path0/file1", - }, - ... - ], - "path1": [...] - } - - Parameters to this class control listing expiry or indeed turn - caching off - """ - - def __init__( - self, - use_listings_cache=True, - listings_expiry_time=None, - max_paths=None, - **kwargs, - ): - """ - - Parameters - ---------- - use_listings_cache: bool - If False, this cache never returns items, but always reports KeyError, - and setting items has no effect - listings_expiry_time: int or float (optional) - Time in seconds that a listing is considered valid. If None, - listings do not expire. - max_paths: int (optional) - The number of most recent listings that are considered valid; 'recent' - refers to when the entry was set. - """ - self._cache = {} - self._times = {} - if max_paths: - self._q = lru_cache(max_paths + 1)(lambda key: self._cache.pop(key, None)) - self.use_listings_cache = use_listings_cache - self.listings_expiry_time = listings_expiry_time - self.max_paths = max_paths - - def __getitem__(self, item): - if self.listings_expiry_time is not None: - if self._times.get(item, 0) - time.time() < -self.listings_expiry_time: - del self._cache[item] - if self.max_paths: - self._q(item) - return self._cache[item] # maybe raises KeyError - - def clear(self): - self._cache.clear() - - def __len__(self): - return len(self._cache) - - def __contains__(self, item): - try: - self[item] - return True - except KeyError: - return False - - def __setitem__(self, key, value): - if not self.use_listings_cache: - return - if self.max_paths: - self._q(key) - self._cache[key] = value - if self.listings_expiry_time is not None: - self._times[key] = time.time() - - def __delitem__(self, key): - del self._cache[key] - - def __iter__(self): - entries = list(self._cache) - - return (k for k in entries if k in self) - - def 
__reduce__(self): - return ( - DirCache, - (self.use_listings_cache, self.listings_expiry_time, self.max_paths), - ) diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py index 4580764ce..1d9961020 100644 --- a/fsspec/implementations/http.py +++ b/fsspec/implementations/http.py @@ -3,7 +3,6 @@ import logging import re import weakref -from copy import copy from urllib.parse import urlparse import aiohttp @@ -58,6 +57,7 @@ def __init__( client_kwargs=None, get_client=get_client, encoded=False, + listings_cache_options=None, **storage_options, ): """ @@ -83,11 +83,39 @@ def __init__( A callable which takes keyword arguments and constructs an aiohttp.ClientSession. It's state will be managed by the HTTPFileSystem class. + listings_cache_options: dict + Options for the listings cache. storage_options: key-value Any other parameters passed on to requests cache_type, cache_options: defaults used in open """ - super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options) + # TODO: remove in future release + # Clean caching-related parameters from `storage_options` + # before propagating them as `request_options` through `self.kwargs`. + old_listings_cache_kwargs = { + "use_listings_cache", + "listings_expiry_time", + "max_paths", + "skip_instance_cache", + } + # intersection of old_listings_cache_kwargs and storage_options + old_listings_cache_kwargs = old_listings_cache_kwargs.intersection( + storage_options + ) + if old_listings_cache_kwargs: + logger.warning( + f"The following parameters are not used anymore and will be ignored: {old_listings_cache_kwargs}. " + f"Use new `listings_cache_options` instead." 
+ ) + for key in old_listings_cache_kwargs: + del storage_options[key] + super().__init__( + self, + asynchronous=asynchronous, + loop=loop, + listings_cache_options=listings_cache_options, + **storage_options, + ) self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE self.simple_links = simple_links self.same_schema = same_scheme @@ -96,19 +124,10 @@ def __init__( self.client_kwargs = client_kwargs or {} self.get_client = get_client self.encoded = encoded - self.kwargs = storage_options - self._session = None - - # Clean caching-related parameters from `storage_options` - # before propagating them as `request_options` through `self.kwargs`. # TODO: Maybe rename `self.kwargs` to `self.request_options` to make # it clearer. - request_options = copy(storage_options) - self.use_listings_cache = request_options.pop("use_listings_cache", False) - request_options.pop("listings_expiry_time", None) - request_options.pop("max_paths", None) - request_options.pop("skip_instance_cache", None) - self.kwargs = request_options + self.kwargs = storage_options + self._session = None @property def fsid(self): @@ -201,7 +220,7 @@ async def _ls_real(self, url, detail=True, **kwargs): return sorted(out) async def _ls(self, url, detail=True, **kwargs): - if self.use_listings_cache and url in self.dircache: + if url in self.dircache: out = self.dircache[url] else: out = await self._ls_real(url, detail=detail, **kwargs) diff --git a/fsspec/implementations/libarchive.py b/fsspec/implementations/libarchive.py index eb6f14535..da3b3361d 100644 --- a/fsspec/implementations/libarchive.py +++ b/fsspec/implementations/libarchive.py @@ -115,7 +115,7 @@ def __init__( Kwargs passed when instantiating the target FS, if ``fo`` is a string. 
""" - super().__init__(self, **kwargs) + super().__init__(False, self, **kwargs) if mode != "r": raise ValueError("Only read from archive files accepted") if isinstance(fo, str): diff --git a/fsspec/implementations/local.py b/fsspec/implementations/local.py index 7938a9147..9881606f1 100644 --- a/fsspec/implementations/local.py +++ b/fsspec/implementations/local.py @@ -15,12 +15,6 @@ logger = logging.getLogger("fsspec.local") -def _remove_prefix(text: str, prefix: str): - if text.startswith(prefix): - return text[len(prefix) :] - return text - - class LocalFileSystem(AbstractFileSystem): """Interface to files on local storage @@ -121,8 +115,8 @@ def lexists(self, path, **kwargs): return osp.lexists(path) def cp_file(self, path1, path2, **kwargs): - path1 = self._strip_protocol(path1, remove_trailing_slash=True) - path2 = self._strip_protocol(path2, remove_trailing_slash=True) + path1 = self._strip_protocol(path1) + path2 = self._strip_protocol(path2) if self.auto_mkdir: self.makedirs(self._parent(path2), exist_ok=True) if self.isfile(path1): @@ -151,8 +145,8 @@ def put_file(self, path1, path2, callback=None, **kwargs): return self.cp_file(path1, path2, **kwargs) def mv(self, path1, path2, **kwargs): - path1 = self._strip_protocol(path1, remove_trailing_slash=True) - path2 = self._strip_protocol(path2, remove_trailing_slash=True) + path1 = self._strip_protocol(path1) + path2 = self._strip_protocol(path2) shutil.move(path1, path2) def link(self, src, dst, **kwargs): @@ -176,7 +170,7 @@ def rm(self, path, recursive=False, maxdepth=None): path = [path] for p in path: - p = self._strip_protocol(p, remove_trailing_slash=True) + p = self._strip_protocol(p) if self.isdir(p): if not recursive: raise ValueError("Cannot delete directory, set recursive=True") @@ -219,7 +213,7 @@ def modified(self, path): @classmethod def _parent(cls, path): - path = cls._strip_protocol(path, remove_trailing_slash=True) + path = cls._strip_protocol(path) if os.sep == "/": # posix native return 
path.rsplit("/", 1)[0] or "/" @@ -234,17 +228,43 @@ def _parent(cls, path): return path_ @classmethod - def _strip_protocol(cls, path, remove_trailing_slash=False): + def _strip_protocol(cls, path): path = stringify_path(path) - if path.startswith("file:"): - path = _remove_prefix(_remove_prefix(path, "file://"), "file:") - if os.sep == "\\": - path = path.lstrip("/") + if path.startswith("file://"): + path = path[7:] + elif path.startswith("file:"): + path = path[5:] + elif path.startswith("local://"): + path = path[8:] elif path.startswith("local:"): - path = _remove_prefix(_remove_prefix(path, "local://"), "local:") - if os.sep == "\\": - path = path.lstrip("/") - return make_path_posix(path, remove_trailing_slash) + path = path[6:] + + path = make_path_posix(path) + if os.sep != "/": + # This code-path is a stripped down version of + # > drive, path = ntpath.splitdrive(path) + if path[1:2] == ":": + # Absolute drive-letter path, e.g. X:\Windows + # Relative path with drive, e.g. X:Windows + drive, path = path[:2], path[2:] + elif path[:2] == "//": + # UNC drives, e.g. \\server\share or \\?\UNC\server\share + # Device drives, e.g. \\.\device or \\?\device + if (index1 := path.find("/", 2)) == -1 or ( + index2 := path.find("/", index1 + 1) + ) == -1: + drive, path = path, "" + else: + drive, path = path[:index2], path[index2:] + else: + # Relative path, e.g. Windows + drive = "" + + path = path.rstrip("/") or cls.root_marker + return drive + path + + else: + return path.rstrip("/") or cls.root_marker def _isfilestore(self): # Inheriting from DaskFileSystem makes this False (S3, etc. 
were) @@ -257,42 +277,55 @@ def chmod(self, path, mode): return os.chmod(path, mode) -def make_path_posix(path, remove_trailing_slash=False): - """Make path generic for current OS""" +def make_path_posix(path): + """Make path generic and absolute for current OS""" if not isinstance(path, str): if isinstance(path, (list, set, tuple)): - return type(path)(make_path_posix(p, remove_trailing_slash) for p in path) + return type(path)(make_path_posix(p) for p in path) else: - path = str(stringify_path(path)) + path = stringify_path(path) + if not isinstance(path, str): + raise TypeError(f"could not convert {path!r} to string") if os.sep == "/": # Native posix if path.startswith("/"): # most common fast case for posix - return path.rstrip("/") or "/" if remove_trailing_slash else path + return path elif path.startswith("~"): - return make_path_posix(osp.expanduser(path), remove_trailing_slash) + return osp.expanduser(path) elif path.startswith("./"): path = path[2:] - path = f"{os.getcwd()}/{path}" - return path.rstrip("/") or "/" if remove_trailing_slash else path + elif path == ".": + path = "" return f"{os.getcwd()}/{path}" else: # NT handling - if len(path) > 1: - if path[1] == ":": - # windows full path like "C:\\local\\path" - if len(path) <= 3: - # nt root (something like c:/) - return path[0] + ":/" - path = path.replace("\\", "/").replace("//", "/") - return path.rstrip("/") if remove_trailing_slash else path - elif path[0] == "~": - return make_path_posix(osp.expanduser(path), remove_trailing_slash) - elif path.startswith(("\\\\", "//")): - # windows UNC/DFS-style paths - path = "//" + path[2:].replace("\\", "/").replace("//", "/") - return path.rstrip("/") if remove_trailing_slash else path - return make_path_posix(osp.abspath(path), remove_trailing_slash) + if path[0:1] == "/" and path[2:3] == ":": + # path is like "/c:/local/path" + path = path[1:] + if path[1:2] == ":": + # windows full path like "C:\\local\\path" + if len(path) <= 3: + # nt root (something 
like c:/) + return path[0] + ":/" + path = path.replace("\\", "/") + return path + elif path[0:1] == "~": + return make_path_posix(osp.expanduser(path)) + elif path.startswith(("\\\\", "//")): + # windows UNC/DFS-style paths + return "//" + path[2:].replace("\\", "/") + elif path.startswith(("\\", "/")): + # windows relative path with root + path = path.replace("\\", "/") + return f"{osp.splitdrive(os.getcwd())[0]}{path}" + else: + path = path.replace("\\", "/") + if path.startswith("./"): + path = path[2:] + elif path == ".": + path = "" + return f"{make_path_posix(os.getcwd())}/{path}" def trailing_sep(path): diff --git a/fsspec/implementations/tar.py b/fsspec/implementations/tar.py index 412e5ba4d..580932861 100644 --- a/fsspec/implementations/tar.py +++ b/fsspec/implementations/tar.py @@ -82,7 +82,7 @@ def __init__( self._fo_ref = fo self.fo = fo # the whole instance is a context self.tar = tarfile.TarFile(fileobj=self.fo) - self.dir_cache = None + self.dircache = None self.index_store = index_store self.index = None @@ -101,11 +101,11 @@ def _index(self): # TODO: save index to self.index_store here, if set def _get_dirs(self): - if self.dir_cache is not None: + if self.dircache is not None: return # This enables ls to get directories as children as well as files - self.dir_cache = { + self.dircache = { dirname: {"name": dirname, "size": 0, "type": "directory"} for dirname in self._all_dirnames(self.tar.getnames()) } @@ -113,7 +113,7 @@ def _get_dirs(self): info = member.get_info() info["name"] = info["name"].rstrip("/") info["type"] = typemap.get(info["type"], "file") - self.dir_cache[info["name"]] = info + self.dircache[info["name"]] = info def _open(self, path, mode="rb", **kwargs): if mode != "rb": diff --git a/fsspec/implementations/tests/test_http.py b/fsspec/implementations/tests/test_http.py index fdae51ff5..e91c5495f 100644 --- a/fsspec/implementations/tests/test_http.py +++ b/fsspec/implementations/tests/test_http.py @@ -11,6 +11,7 @@ import 
fsspec.asyn import fsspec.utils from fsspec.implementations.http import HTTPStreamFile +from fsspec.listings_cache import CacheType, FileListingsCache, MemoryListingsCache from fsspec.tests.conftest import data, reset_files, server, win # noqa: F401 @@ -27,87 +28,157 @@ def test_list_invalid_args(server): def test_list_cache(server): - h = fsspec.filesystem("http", use_listings_cache=True) + h = fsspec.filesystem("http", listings_cache_options=True) + assert h.listings_cache_options == { + "cache_type": CacheType.MEMORY, + "expiry_time": None, + } + assert issubclass(h.dircache.__class__, MemoryListingsCache) out = h.glob(server + "/index/*") assert out == [server + "/index/realfile"] -def test_list_cache_with_expiry_time_cached(server): - h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=30) +@pytest.mark.parametrize("expiry_time", [None, 10]) +def test_list_cache_memory(server, expiry_time): + h = fsspec.filesystem( + "http", + listings_cache_options={"cache_type": "memory", "expiry_time": expiry_time}, + ) + assert h.listings_cache_options == { + "cache_type": CacheType.MEMORY, + "expiry_time": expiry_time, + } + assert issubclass(h.dircache.__class__, MemoryListingsCache) + start = time.time() + out = h.glob(server + "/index/*") + normal_duration = time.time() - start + assert out == [server + "/index/realfile"] + # Verify cache content. + assert len(h.dircache) == 1 + start = time.time() + out = h.glob(server + "/index/*") + cached_duration = time.time() - start + assert out == [server + "/index/realfile"] + assert normal_duration / cached_duration > 1.5 - # First, the directory cache is not initialized. - assert not h.dircache - # By querying the filesystem with "use_listings_cache=True", - # the cache will automatically get populated. 
+@pytest.mark.parametrize("expiry_time", [None, 10]) +def test_list_cache_file(server, tmp_path, expiry_time): + h = fsspec.filesystem( + "http", + listings_cache_options={ + "cache_type": "file", + "expiry_time": expiry_time, + "directory": tmp_path, + }, + ) + assert h.listings_cache_options == { + "cache_type": CacheType.FILE, + "expiry_time": expiry_time, + "directory": tmp_path, + } + assert issubclass(h.dircache.__class__, FileListingsCache) + h.dircache.clear() # Needed for filedircache + start = time.time() out = h.glob(server + "/index/*") + normal_duration = time.time() - start assert out == [server + "/index/realfile"] - # Verify cache content. assert len(h.dircache) == 1 - + start = time.time() out = h.glob(server + "/index/*") + cached_duration = time.time() - start assert out == [server + "/index/realfile"] + assert normal_duration / cached_duration > 1.5 + h.dircache.clear() # clean up -def test_list_cache_with_expiry_time_purged(server): - h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=0.3) - +@pytest.mark.parametrize("listings_cache_type", ["memory", "file"]) +def test_list_cache_with_expiry_time_purged(server, listings_cache_type): + h = fsspec.filesystem( + "http", + listings_cache_options={ + "cache_type": listings_cache_type, + "expiry_time": 3, + }, + ) + expected_listings_cache_options = { + "cache_type": ( + CacheType.MEMORY if listings_cache_type == "memory" else CacheType.FILE + ), + "expiry_time": 3, + } + if listings_cache_type == "file": + expected_listings_cache_options["directory"] = None + assert h.listings_cache_options == expected_listings_cache_options + h.dircache.clear() # Needed for filedircache # First, the directory cache is not initialized. assert not h.dircache - # By querying the filesystem with "use_listings_cache=True", # the cache will automatically get populated. 
out = h.glob(server + "/index/*") assert out == [server + "/index/realfile"] assert len(h.dircache) == 1 - # Verify cache content. assert server + "/index/" in h.dircache assert len(h.dircache.get(server + "/index/")) == 1 - # Wait beyond the TTL / cache expiry time. - time.sleep(0.31) - + time.sleep(4) # Verify that the cache item should have been purged. cached_items = h.dircache.get(server + "/index/") assert cached_items is None - # Verify that after clearing the item from the cache, # it can get populated again. out = h.glob(server + "/index/*") assert out == [server + "/index/realfile"] cached_items = h.dircache.get(server + "/index/") assert len(cached_items) == 1 + h.dircache.clear() # clean up -def test_list_cache_reuse(server): - h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5) - +@pytest.mark.parametrize("listings_cache_type", ["memory", "file"]) +def test_list_cache_reuse(server, listings_cache_type): + h = fsspec.filesystem( + "http", + listings_cache_options={"cache_type": listings_cache_type, "expiry_time": 5}, + ) + expected_listings_cache_options = { + "cache_type": ( + CacheType.MEMORY if listings_cache_type == "memory" else CacheType.FILE + ), + "expiry_time": 5, + } + if listings_cache_type == "file": + expected_listings_cache_options["directory"] = None + assert h.listings_cache_options == expected_listings_cache_options + # Needed for filedircache + h.dircache.clear() # First, the directory cache is not initialized. assert not h.dircache - # By querying the filesystem with "use_listings_cache=True", # the cache will automatically get populated. out = h.glob(server + "/index/*") assert out == [server + "/index/realfile"] - # Verify cache content. assert len(h.dircache) == 1 - # Verify another instance without caching enabled does not have cache content. 
h = fsspec.filesystem("http", use_listings_cache=False) assert not h.dircache - # Verify that yet another new instance, with caching enabled, # will see the same cache content again. - h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5) + h = fsspec.filesystem( + "http", + listings_cache_options={"cache_type": listings_cache_type, "expiry_time": 5}, + ) assert len(h.dircache) == 1 - # However, yet another instance with a different expiry time will also not have # any valid cache content. - h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=666) + h = fsspec.filesystem( + "http", + listings_cache_options={"cache_type": listings_cache_type, "expiry_time": 666}, + ) assert len(h.dircache) == 0 + h.dircache.clear() # clean up def test_ls_raises_filenotfound(server): @@ -123,12 +194,6 @@ def test_list_cache_with_max_paths(server): assert out == [server + "/index/realfile"] -def test_list_cache_with_skip_instance_cache(server): - h = fsspec.filesystem("http", use_listings_cache=True, skip_instance_cache=True) - out = h.glob(server + "/index/*") - assert out == [server + "/index/realfile"] - - def test_glob_return_subfolders(server): h = fsspec.filesystem("http") out = h.glob(server + "/simple/*") diff --git a/fsspec/implementations/tests/test_local.py b/fsspec/implementations/tests/test_local.py index 428abc980..ef3927912 100644 --- a/fsspec/implementations/tests/test_local.py +++ b/fsspec/implementations/tests/test_local.py @@ -40,6 +40,35 @@ odir = os.getcwd() +@pytest.fixture() +def cwd(): + pth = os.getcwd().replace("\\", "/") + assert not pth.endswith("/") + yield pth + + +@pytest.fixture() +def current_drive(cwd): + drive = os.path.splitdrive(cwd)[0] + assert not drive or (len(drive) == 2 and drive.endswith(":")) + yield drive + + +@pytest.fixture() +def user_home(): + pth = os.path.expanduser("~").replace("\\", "/") + assert not pth.endswith("/") + yield pth + + +def winonly(*args): + return pytest.param(*args, 
marks=pytest.mark.skipif(not WIN, reason="Windows only")) + + +def posixonly(*args): + return pytest.param(*args, marks=pytest.mark.skipif(WIN, reason="Posix only")) + + @contextmanager def filetexts(d, open=open, mode="t"): """Dumps a number of textfiles to disk @@ -472,7 +501,6 @@ def test_make_path_posix(): assert make_path_posix("/posix") == f"{drive}:/posix" # Windows drive requires trailing slash assert make_path_posix("C:\\") == "C:/" - assert make_path_posix("C:\\", remove_trailing_slash=True) == "C:/" else: assert make_path_posix("/a/posix/path") == "/a/posix/path" assert make_path_posix("/posix") == "/posix" @@ -505,6 +533,71 @@ def test_make_path_posix(): assert userpath.endswith("/path") +@pytest.mark.parametrize( + "path", + [ + "/abc/def", + "abc/def", + "", + ".", + "//server/share/", + "\\\\server\\share\\", + "C:\\", + "d:/abc/def", + "e:", + pytest.param( + "\\\\server\\share", + marks=[ + pytest.mark.xfail( + WIN and sys.version_info < (3, 11), + reason="requires py3.11+ see: python/cpython#96290", + ) + ], + ), + pytest.param( + "f:foo", + marks=[pytest.mark.xfail(WIN, reason="unsupported")], + id="relative-path-with-drive", + ), + ], +) +def test_make_path_posix_returns_absolute_paths(path): + posix_pth = make_path_posix(path) + assert os.path.isabs(posix_pth) + + +@pytest.mark.parametrize("container_cls", [list, set, tuple]) +def test_make_path_posix_set_list_tuple(container_cls): + paths = container_cls( + [ + "/foo/bar", + "bar/foo", + ] + ) + posix_paths = make_path_posix(paths) + assert isinstance(posix_paths, container_cls) + assert posix_paths == container_cls( + [ + make_path_posix("/foo/bar"), + make_path_posix("bar/foo"), + ] + ) + + +@pytest.mark.parametrize( + "obj", + [ + 1, + True, + None, + object(), + ], +) +def test_make_path_posix_wrong_type(obj): + with pytest.raises(TypeError): + make_path_posix(obj) + + def test_parent(): if WIN: assert LocalFileSystem._parent("C:\\file or folder") == "C:/" @@ -514,6 +607,48 @@ def 
test_parent(): assert LocalFileSystem._parent("/") == "/" +@pytest.mark.parametrize( + "path,parent", + [ + ("C:\\", "C:/"), + ("C:\\.", "C:/"), + ("C:\\.\\", "C:/"), + ("file:C:/", "C:/"), + ("file://C:/", "C:/"), + ("local:C:/", "C:/"), + ("local://C:/", "C:/"), + ("\\\\server\\share", "//server/share"), + ("\\\\server\\share\\", "//server/share"), + ("\\\\server\\share\\path", "//server/share"), + ("//server/share", "//server/share"), + ("//server/share/", "//server/share"), + ("//server/share/path", "//server/share"), + ("C:\\file or folder", "C:/"), + ("C:\\file or folder\\", "C:/"), + ("file:///", "{current_drive}/"), + ("file:///path", "{current_drive}/"), + ] + if WIN + else [ + ("/", "/"), + ("/.", "/"), + ("/./", "/"), + ("file:/", "/"), + ("file:///", "/"), + ("local:/", "/"), + ("local:///", "/"), + ("/file or folder", "/"), + ("/file or folder/", "/"), + ("file:///path", "/"), + ("file://c/", "{cwd}"), + ], +) +def test_parent_edge_cases(path, parent, cwd, current_drive): + parent = parent.format(cwd=cwd, current_drive=current_drive) + + assert LocalFileSystem._parent(path) == parent + + def test_linked_files(tmpdir): tmpdir = str(tmpdir) fn0 = os.path.join(tmpdir, "target") @@ -647,26 +782,130 @@ def test_pickle(tmpdir): assert f2.read() == f.read() -def test_strip_protocol_expanduser(): - path = "file://~\\foo\\bar" if WIN else "file://~/foo/bar" - stripped = LocalFileSystem._strip_protocol(path) - assert path != stripped - assert "~" not in stripped - assert "file://" not in stripped - assert stripped.startswith(os.path.expanduser("~").replace("\\", "/")) - path = LocalFileSystem._strip_protocol("./", remove_trailing_slash=True) - assert not path.endswith("/") +@pytest.mark.parametrize( + "uri, expected", + [ + ("file://~/foo/bar", "{user_home}/foo/bar"), + ("~/foo/bar", "{user_home}/foo/bar"), + winonly("~\\foo\\bar", "{user_home}/foo/bar"), + winonly("file://~\\foo\\bar", "{user_home}/foo/bar"), + ], +) +def test_strip_protocol_expanduser(uri, 
expected, user_home): + expected = expected.format(user_home=user_home) + stripped = LocalFileSystem._strip_protocol(uri) + assert expected == stripped -def test_strip_protocol_no_authority(): - path = "file:\\foo\\bar" if WIN else "file:/foo/bar" - stripped = LocalFileSystem._strip_protocol(path) - assert "file:" not in stripped - assert stripped.endswith("/foo/bar") - if WIN: - assert ( - LocalFileSystem._strip_protocol("file://C:\\path\\file") == "C:/path/file" - ) + +@pytest.mark.parametrize( + "uri, expected", + [ + ("file://", "{cwd}"), + ("file://.", "{cwd}"), + ("file://./", "{cwd}"), + ("./", "{cwd}"), + ("file:path", "{cwd}/path"), + ("file://path", "{cwd}/path"), + ("path", "{cwd}/path"), + ("./path", "{cwd}/path"), + winonly(".\\", "{cwd}"), + winonly("file://.\\path", "{cwd}/path"), + ], +) +def test_strip_protocol_relative_paths(uri, expected, cwd): + expected = expected.format(cwd=cwd) + + stripped = LocalFileSystem._strip_protocol(uri) + assert expected == stripped + + +@pytest.mark.parametrize( + "uri, expected", + [ + posixonly("file:/foo/bar", "/foo/bar"), + winonly("file:/foo/bar", "{current_drive}/foo/bar"), + winonly("file:\\foo\\bar", "{current_drive}/foo/bar"), + winonly("file:D:\\path\\file", "D:/path/file"), + winonly("file:/D:\\path\\file", "D:/path/file"), + winonly("file://D:\\path\\file", "D:/path/file"), + ], +) +def test_strip_protocol_no_authority(uri, expected, cwd, current_drive): + expected = expected.format(cwd=cwd, current_drive=current_drive) + + stripped = LocalFileSystem._strip_protocol(uri) + assert expected == stripped + + +@pytest.mark.parametrize( + "uri, expected", + [ + ("file:/path", "/path"), + ("file:///path", "/path"), + ("file:////path", "//path"), + ("local:/path", "/path"), + ("s3://bucket/key", "{cwd}/s3://bucket/key"), + ("/path", "/path"), + ("file:///", "/"), + ] + if not WIN + else [ + ("file:c:/path", "c:/path"), + ("file:/c:/path", "c:/path"), + ("file:/C:/path", "C:/path"), + ("file://c:/path", 
"c:/path"), + ("file:///c:/path", "c:/path"), + ("local:/path", "{current_drive}/path"), + ("s3://bucket/key", "{cwd}/s3://bucket/key"), + ("c:/path", "c:/path"), + ("c:\\path", "c:/path"), + ("file:///", "{current_drive}/"), + pytest.param( + "file://localhost/c:/path", + "c:/path", + marks=pytest.mark.xfail( + reason="rfc8089 section3 'localhost uri' not supported" + ), + ), + ], +) +def test_strip_protocol_absolute_paths(uri, expected, current_drive, cwd): + expected = expected.format(current_drive=current_drive, cwd=cwd) + + stripped = LocalFileSystem._strip_protocol(uri) + assert expected == stripped + + +@pytest.mark.parametrize( + "uri, expected", + [ + ("file:c|/path", "c:/path"), + ("file:/D|/path", "D:/path"), + ("file:///C|/path", "C:/path"), + ], +) +@pytest.mark.skipif(not WIN, reason="Windows only") +@pytest.mark.xfail(WIN, reason="legacy dos uris not supported") +def test_strip_protocol_legacy_dos_uris(uri, expected): + stripped = LocalFileSystem._strip_protocol(uri) + assert expected == stripped + + +@pytest.mark.parametrize( + "uri, stripped", + [ + ("file://remote/share/pth", "{cwd}/remote/share/pth"), + ("file:////remote/share/pth", "//remote/share/pth"), + ("file://///remote/share/pth", "///remote/share/pth"), + ("//remote/share/pth", "//remote/share/pth"), + winonly("\\\\remote\\share\\pth", "//remote/share/pth"), + ], +) +def test_strip_protocol_windows_remote_shares(uri, stripped, cwd): + stripped = stripped.format(cwd=cwd) + + assert LocalFileSystem._strip_protocol(uri) == stripped def test_mkdir_twice_faile(tmpdir): diff --git a/fsspec/implementations/zip.py b/fsspec/implementations/zip.py index 9d9c046bf..97f90eca6 100644 --- a/fsspec/implementations/zip.py +++ b/fsspec/implementations/zip.py @@ -44,7 +44,7 @@ def __init__( compression, allowZip64, compresslevel: passed to ZipFile Only relevant when creating a ZIP """ - super().__init__(self, **kwargs) + super().__init__(False, self, **kwargs) if mode not in set("rwa"): raise 
ValueError(f"mode '{mode}' no understood") self.mode = mode diff --git a/fsspec/spec.py b/fsspec/spec.py index bcc01f514..0aa19294b 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -9,16 +9,17 @@ from errno import ESPIPE from glob import has_magic from hashlib import sha256 -from typing import ClassVar +from typing import ClassVar, Optional, Union from .callbacks import DEFAULT_CALLBACK from .config import apply_config, conf -from .dircache import DirCache +from .listings_cache import create_listings_cache from .transaction import Transaction from .utils import ( _unstrip_protocol, glob_translate, isfilelike, + normalize_listings_cache_options, other_paths, read_block, stringify_path, @@ -115,7 +116,12 @@ class AbstractFileSystem(metaclass=_Cached): #: Extra *class attributes* that should be considered when hashing. _extra_tokenize_attributes = () - def __init__(self, *args, **storage_options): + def __init__( + self, + listings_cache_options: Optional[Union[bool, dict]] = None, + *args, + **storage_options, + ): """Create and configure file-system instance Instances may be cachable, so if similar enough arguments are seen @@ -128,10 +134,11 @@ def __init__(self, *args, **storage_options): Parameters ---------- - use_listings_cache, listings_expiry_time, max_paths: - passed to ``DirCache``, if the implementation supports - directory listing caching. Pass use_listings_cache=False - to disable such caching. + listings_cache_options: bool or dict + If True, a default MemoryListingsCache cache is created. + If dict of arguments, used to create a directory cache using + argument cache_type ("memory" or "file"), expiry_time, and + other arguments passed to ``MemoryListingsCache`` or ``FileListingsCache``. 
skip_instance_cache: bool If this is a cachable implementation, pass True here to force creating a new instance even if a matching instance exists, and prevent @@ -146,7 +153,11 @@ def __init__(self, *args, **storage_options): self._intrans = False self._transaction = None self._invalidated_caches_in_transaction = [] - self.dircache = DirCache(**storage_options) + listings_cache_options = normalize_listings_cache_options( + listings_cache_options + ) + self.listings_cache_options = listings_cache_options + self.dircache = create_listings_cache(**listings_cache_options) if storage_options.pop("add_docs", None): warnings.warn("add_docs is no longer supported.", FutureWarning) @@ -358,6 +369,7 @@ def _ls_from_cache(self, path): but contains nothing), None if not in cache. """ parent = self._parent(path) + try: return self.dircache[path.rstrip("/")] except KeyError: diff --git a/fsspec/tests/test_spec.py b/fsspec/tests/test_spec.py index de23d783d..b267e0444 100644 --- a/fsspec/tests/test_spec.py +++ b/fsspec/tests/test_spec.py @@ -859,7 +859,7 @@ def test_json(): def test_ls_from_cache(): - fs = DummyTestFS() + fs = DummyTestFS(listings_cache_options=True) uncached_results = fs.ls("top_level/second_level/", refresh=True) assert fs.ls("top_level/second_level/", refresh=False) == uncached_results diff --git a/fsspec/tests/test_utils.py b/fsspec/tests/test_utils.py index 0efd5d91d..b007fa738 100644 --- a/fsspec/tests/test_utils.py +++ b/fsspec/tests/test_utils.py @@ -1,11 +1,12 @@ import io import sys -from pathlib import Path +from pathlib import Path, PurePath from unittest.mock import Mock import pytest import fsspec.utils +from fsspec.listings_cache import CacheType from fsspec.utils import ( can_be_local, common_prefix, @@ -13,6 +14,7 @@ infer_storage_options, merge_offset_ranges, mirror_from, + normalize_listings_cache_options, other_paths, read_block, seek_delimiter, @@ -440,3 +442,85 @@ def test_size(): f = io.BytesIO(b"hello") assert fsspec.utils.file_size(f) == 
5 assert f.tell() == 0 + + +class _HasFspath: + def __fspath__(self): + return "foo" + + +class _HasPathAttr: + def __init__(self): + self.path = "foo" + + +@pytest.mark.parametrize( + "path,expected", + [ + # coerce to string + ("foo", "foo"), + (Path("foo"), "foo"), + (PurePath("foo"), "foo"), + (_HasFspath(), "foo"), + (_HasPathAttr(), "foo"), + # passthrough + (b"bytes", b"bytes"), + (None, None), + (1, 1), + (True, True), + (o := object(), o), + ([], []), + ((), ()), + (set(), set()), + ], +) +def test_stringify_path(path, expected): + path = fsspec.utils.stringify_path(path) + + assert path == expected + + +@pytest.mark.parametrize("listings_cache_options", [None, False, {}]) +def test_normalize_listings_cache_options_disable(listings_cache_options): + assert normalize_listings_cache_options(listings_cache_options) == { + "cache_type": CacheType.DISABLED, + "expiry_time": None, + } + + +def test_normalize_listings_cache_options_enable(): + assert normalize_listings_cache_options(True) == { + "cache_type": CacheType.MEMORY, + "expiry_time": None, + } + + +def test_normalize_listings_cache_options_with_cache_type(): + assert normalize_listings_cache_options({"cache_type": CacheType.FILE}) == { + "cache_type": CacheType.FILE, + "directory": None, + "expiry_time": None, + } + + +def test_normalize_listings_cache_options_with_expiry_time(): + assert normalize_listings_cache_options({"expiry_time": 10}) == { + "cache_type": CacheType.MEMORY, + "expiry_time": 10, + } + + +def test_normalize_listings_cache_options_file_cache_with_directory(): + assert normalize_listings_cache_options( + {"cache_type": CacheType.FILE, "directory": "foo"} + ) == {"cache_type": CacheType.FILE, "directory": "foo", "expiry_time": None} + + +def test_normalize_listings_cache_options_invalid_cache_type(): + with pytest.raises(ValueError): + normalize_listings_cache_options({"cache_type": "invalid"}) + + +def test_normalize_listings_cache_options_invalid_expiry_time(): + with 
pytest.raises(ValueError): + normalize_listings_cache_options({"expiry_time": -1}) diff --git a/fsspec/utils.py b/fsspec/utils.py index ba3f80be3..b59011630 100644 --- a/fsspec/utils.py +++ b/fsspec/utils.py @@ -18,17 +18,20 @@ Callable, Iterable, Iterator, + Optional, Sequence, TypeVar, + Union, ) from urllib.parse import urlsplit +from fsspec.listings_cache import CacheType + if TYPE_CHECKING: from typing_extensions import TypeGuard from fsspec.spec import AbstractFileSystem - DEFAULT_BLOCK_SIZE = 5 * 2**20 T = TypeVar("T") @@ -350,8 +353,6 @@ def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str: return filepath elif hasattr(filepath, "__fspath__"): return filepath.__fspath__() - elif isinstance(filepath, pathlib.Path): - return str(filepath) elif hasattr(filepath, "path"): return filepath.path else: @@ -740,3 +741,49 @@ def glob_translate(pat): results.append(any_sep) res = "".join(results) return rf"(?s:{res})\Z" + + +def normalize_listings_cache_options( + listings_cache_options: Optional[Union[bool, dict]], +) -> dict: + """Normalize listings cache options + Cases: + - listings_cache_options is None: return disabled cache options (cache_type=CacheType.DISABLED, expiry_time=None) + - listings_cache_options is True: return default cache options (cache_type=CacheType.MEMORY, expiry_time=None) + - listings_cache_options is dict: return normalized cache options + """ + default_listings_cache_options = { + "cache_type": CacheType.MEMORY, + "expiry_time": None, + } + if not listings_cache_options: + return {"cache_type": CacheType.DISABLED, "expiry_time": None} + elif listings_cache_options is True: + return default_listings_cache_options + else: + normalized_listings_cache_options = default_listings_cache_options.copy() + normalized_listings_cache_options.update(listings_cache_options) + # disassemble and reassemble normalized_listings_cache_options + cache_type = normalized_listings_cache_options["cache_type"] + expiry_time = 
normalized_listings_cache_options["expiry_time"] + directory = normalized_listings_cache_options.get("directory") + try: + cache_type = CacheType(cache_type) + except ValueError: + try: + cache_type = CacheType[cache_type.upper()] + except KeyError as e: + raise ValueError( + f"Cache type must be one of {', '.join(ct.name.lower() for ct in CacheType)}" + ) from e + if expiry_time: + if expiry_time < 0: + raise ValueError("Expiry time must be positive") + expiry_time = int(expiry_time) if expiry_time else None + normalized_listings_cache_options = { + "cache_type": cache_type, + "expiry_time": expiry_time, + } + if cache_type == CacheType.FILE: + normalized_listings_cache_options["directory"] = directory + return normalized_listings_cache_options diff --git a/pyproject.toml b/pyproject.toml index caee973d2..6d3e819d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,12 +29,14 @@ adl = ["adlfs"] arrow = ["pyarrow >= 1"] dask = ["dask", "distributed"] dev = ["ruff", "pre-commit"] +dircache = ["diskcache", "platformdirs"] dropbox = ["dropbox", "dropboxdrivefs", "requests"] entrypoints = [] full = [ 'adlfs', 'aiohttp !=4.0.0a0, !=4.0.0a1', 'dask', + 'diskcache', 'distributed', 'dropbox', 'dropboxdrivefs', @@ -44,6 +46,7 @@ full = [ 'ocifs', 'panel', 'paramiko', + 'platformdirs', 'pyarrow >= 1', 'pygit2', 'requests', @@ -84,6 +87,7 @@ test_full = [ 'aiohttp!=4.0.0a0, !=4.0.0a1', 'cloudpickle', 'dask', + 'diskcache', 'distributed', 'dropbox', 'dropboxdrivefs', @@ -99,6 +103,7 @@ test_full = [ 'pandas', 'panel', 'paramiko', + 'platformdirs', 'pyarrow', 'pyarrow >= 1', 'pyftpdlib',