From c8954559d87761881313ea9593898849cc516d3a Mon Sep 17 00:00:00 2001 From: Nick Smith Date: Tue, 5 Mar 2024 17:34:20 -0600 Subject: [PATCH] Fix vector_read data server size query --- pyproject.toml | 1 + src/fsspec_xrootd/xrootd.py | 63 ++++++++++++++++++++----------------- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7f9e4cb..1d48c4c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ strict = true show_error_codes = true enable_error_code = ["ignore-without-code", "redundant-expr", "truthy-bool"] warn_unreachable = true +ignore_missing_imports = true [tool.check-manifest] diff --git a/src/fsspec_xrootd/xrootd.py b/src/fsspec_xrootd/xrootd.py index 2a22219..6ec6e66 100644 --- a/src/fsspec_xrootd/xrootd.py +++ b/src/fsspec_xrootd/xrootd.py @@ -9,24 +9,17 @@ from functools import partial from typing import Any, Callable, Iterable -from fsspec.asyn import ( # type: ignore[import-not-found] - AsyncFileSystem, - _run_coros_in_chunks, - sync_wrapper, -) -from fsspec.spec import AbstractBufferedFile # type: ignore[import-not-found] -from XRootD import client # type: ignore[import-not-found] -from XRootD.client.flags import ( # type: ignore[import-not-found] +from fsspec.asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper +from fsspec.spec import AbstractBufferedFile +from XRootD import client +from XRootD.client.flags import ( DirListFlags, MkDirFlags, OpenFlags, QueryCode, StatInfoFlags, ) -from XRootD.client.responses import ( # type: ignore[import-not-found] - HostList, - XRootDStatus, -) +from XRootD.client.responses import HostList, XRootDStatus class ErrorCodes(IntEnum): @@ -73,9 +66,11 @@ async def _async_wrap(func: Callable[..., Any], *args: Any) -> Any: An asyncio future. Result is set when _handle() is called back. """ future = asyncio.get_running_loop().create_future() - status = func(*args, callback=partial(_handle, future)) - if not status.ok: - raise OSError(status.message.strip()) + submit_status = func(*args, callback=partial(_handle, future)) + if not submit_status.ok: + raise OSError( + f"Failed to submit {func!r} request: {submit_status.message.strip()}" + ) return await future @@ -149,6 +144,8 @@ class XRootDFileSystem(AsyncFileSystem): # type: ignore[misc] root_marker = "/" default_timeout = 60 async_impl = True + default_max_num_chunks = 1024 + default_max_chunk_size = 2097136 _dataserver_info_cache: dict[str, Any] = defaultdict(dict) @@ -458,7 +455,8 @@ async def _get_file( # Close the remote file await _async_wrap(remote_file.close, self.timeout) - async def _get_max_chunk_info(self, file: Any) -> tuple[int, int]: + @classmethod + async def _get_max_chunk_info(cls, file: Any) -> tuple[int, int]: """Queries the XRootD server for info required for pyxrootd vector_read() function. Queries for maximum number of chunks and the maximum chunk size allowed by the server. @@ -471,20 +469,31 @@ async def _get_max_chunk_info(self, file: Any) -> tuple[int, int]: Tuple of max chunk size and max number of chunks. Both ints. """ data_server = file.get_property("DataServer") - if data_server not in XRootDFileSystem._dataserver_info_cache: + if data_server == "": + return cls.default_max_num_chunks, cls.default_max_chunk_size + # Normalize to URL + data_server = client.URL(data_server) + data_server = f"{data_server.protocol}://{data_server.hostid}/" + if data_server not in cls._dataserver_info_cache: + fs = client.FileSystem(data_server) status, result = await _async_wrap( - self._myclient.query, QueryCode.CONFIG, "readv_iov_max readv_ior_max" + fs.query, QueryCode.CONFIG, "readv_iov_max readv_ior_max" ) if not status.ok: raise OSError( f"Server query for vector read info failed: {status.message}" ) - max_num_chunks, max_chunk_size = map(int, result.split(b"\n", 1)) - XRootDFileSystem._dataserver_info_cache[data_server] = { - "max_num_chunks": int(max_num_chunks), - "max_chunk_size": int(max_chunk_size), + try: + max_num_chunks, max_chunk_size = map(int, result.split(b"\n", 1)) + except ValueError: + raise OSError( + f"Server query for vector read info failed: could not parse {result!r}" + ) from None + cls._dataserver_info_cache[data_server] = { + "max_num_chunks": max_num_chunks, + "max_chunk_size": max_chunk_size, } - info = XRootDFileSystem._dataserver_info_cache[data_server] + info = cls._dataserver_info_cache[data_server] return (info["max_num_chunks"], info["max_chunk_size"]) async def _cat_vector_read( @@ -649,10 +658,8 @@ def open( **kwargs, ) if compression is not None: - from fsspec.compression import compr # type: ignore[import-not-found] - from fsspec.core import ( # type: ignore[import-not-found] - get_compression, - ) + from fsspec.compression import compr + from fsspec.core import get_compression compression = get_compression(path, compression) compress = compr[compression] @@ -697,7 +704,7 @@ def __init__( self._myFile = client.File() status, _n = self._myFile.open( - fs.protocol + "://" + fs.storage_options["hostid"] + "/" + path, + fs.unstrip_protocol(path), self.mode, timeout=self.timeout, )