Skip to content

Commit

Permalink
Fix vector_read data server size query
Browse files Browse the repository at this point in the history
  • Loading branch information
nsmith- committed Mar 5, 2024
1 parent b12503e commit c895455
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 28 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ strict = true
show_error_codes = true
enable_error_code = ["ignore-without-code", "redundant-expr", "truthy-bool"]
warn_unreachable = true
ignore_missing_imports = true


[tool.check-manifest]
Expand Down
63 changes: 35 additions & 28 deletions src/fsspec_xrootd/xrootd.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,17 @@
from functools import partial
from typing import Any, Callable, Iterable

from fsspec.asyn import ( # type: ignore[import-not-found]
AsyncFileSystem,
_run_coros_in_chunks,
sync_wrapper,
)
from fsspec.spec import AbstractBufferedFile # type: ignore[import-not-found]
from XRootD import client # type: ignore[import-not-found]
from XRootD.client.flags import ( # type: ignore[import-not-found]
from fsspec.asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper
from fsspec.spec import AbstractBufferedFile
from XRootD import client

Check failure on line 14 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD'
from XRootD.client.flags import (

Check failure on line 15 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client.flags'
DirListFlags,
MkDirFlags,
OpenFlags,
QueryCode,
StatInfoFlags,
)
from XRootD.client.responses import ( # type: ignore[import-not-found]
HostList,
XRootDStatus,
)
from XRootD.client.responses import HostList, XRootDStatus

Check failure on line 22 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client.responses'


class ErrorCodes(IntEnum):

Check warning on line 25 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring
Expand Down Expand Up @@ -73,9 +66,11 @@ async def _async_wrap(func: Callable[..., Any], *args: Any) -> Any:
An asyncio future. Result is set when _handle() is called back.
"""
future = asyncio.get_running_loop().create_future()
status = func(*args, callback=partial(_handle, future))
if not status.ok:
raise OSError(status.message.strip())
submit_status = func(*args, callback=partial(_handle, future))
if not submit_status.ok:
raise OSError(
f"Failed to submit {func!r} request: {submit_status.message.strip()}"
)
return await future


Expand Down Expand Up @@ -149,6 +144,8 @@ class XRootDFileSystem(AsyncFileSystem): # type: ignore[misc]
root_marker = "/"
default_timeout = 60
async_impl = True
default_max_num_chunks = 1024
default_max_chunk_size = 2097136

_dataserver_info_cache: dict[str, Any] = defaultdict(dict)

Expand Down Expand Up @@ -458,7 +455,8 @@ async def _get_file(
# Close the remote file
await _async_wrap(remote_file.close, self.timeout)

async def _get_max_chunk_info(self, file: Any) -> tuple[int, int]:
@classmethod
async def _get_max_chunk_info(cls, file: Any) -> tuple[int, int]:
"""Queries the XRootD server for info required for pyxrootd vector_read() function.
Queries for maximum number of chunks and the maximum chunk size allowed by the server.
Expand All @@ -471,20 +469,31 @@ async def _get_max_chunk_info(self, file: Any) -> tuple[int, int]:
Tuple of max chunk size and max number of chunks. Both ints.
"""
data_server = file.get_property("DataServer")
if data_server not in XRootDFileSystem._dataserver_info_cache:
if data_server == "":
return cls.default_max_num_chunks, cls.default_max_chunk_size
# Normalize to URL
data_server = client.URL(data_server)
data_server = f"{data_server.protocol}://{data_server.hostid}/"
if data_server not in cls._dataserver_info_cache:
fs = client.FileSystem(data_server)
status, result = await _async_wrap(
self._myclient.query, QueryCode.CONFIG, "readv_iov_max readv_ior_max"
fs.query, QueryCode.CONFIG, "readv_iov_max readv_ior_max"
)
if not status.ok:
raise OSError(
f"Server query for vector read info failed: {status.message}"
)
max_num_chunks, max_chunk_size = map(int, result.split(b"\n", 1))
XRootDFileSystem._dataserver_info_cache[data_server] = {
"max_num_chunks": int(max_num_chunks),
"max_chunk_size": int(max_chunk_size),
try:
max_num_chunks, max_chunk_size = map(int, result.split(b"\n", 1))
except ValueError:
raise OSError(
f"Server query for vector read info failed: could not parse {result!r}"
) from None
cls._dataserver_info_cache[data_server] = {
"max_num_chunks": max_num_chunks,
"max_chunk_size": max_chunk_size,
}
info = XRootDFileSystem._dataserver_info_cache[data_server]
info = cls._dataserver_info_cache[data_server]
return (info["max_num_chunks"], info["max_chunk_size"])

async def _cat_vector_read(
Expand Down Expand Up @@ -649,10 +658,8 @@ def open(
**kwargs,
)
if compression is not None:
from fsspec.compression import compr # type: ignore[import-not-found]
from fsspec.core import ( # type: ignore[import-not-found]
get_compression,
)
from fsspec.compression import compr
from fsspec.core import get_compression

compression = get_compression(path, compression)
compress = compr[compression]
Expand Down Expand Up @@ -697,7 +704,7 @@ def __init__(

self._myFile = client.File()
status, _n = self._myFile.open(
fs.protocol + "://" + fs.storage_options["hostid"] + "/" + path,
fs.unstrip_protocol(path),
self.mode,
timeout=self.timeout,
)
Expand Down

0 comments on commit c895455

Please sign in to comment.