Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix vector_read data server size query #56

Merged
merged 1 commit into the base branch on Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ strict = true
show_error_codes = true
enable_error_code = ["ignore-without-code", "redundant-expr", "truthy-bool"]
warn_unreachable = true
ignore_missing_imports = true


[tool.check-manifest]
Expand Down
63 changes: 35 additions & 28 deletions src/fsspec_xrootd/xrootd.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

Check warning on line 1 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing module docstring

import asyncio
import io
Expand All @@ -9,27 +9,20 @@
from functools import partial
from typing import Any, Callable, Iterable

from fsspec.asyn import ( # type: ignore[import-not-found]
AsyncFileSystem,
_run_coros_in_chunks,
sync_wrapper,
)
from fsspec.spec import AbstractBufferedFile # type: ignore[import-not-found]
from XRootD import client # type: ignore[import-not-found]
from XRootD.client.flags import ( # type: ignore[import-not-found]
from fsspec.asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper
from fsspec.spec import AbstractBufferedFile
from XRootD import client

Check failure on line 14 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD'
from XRootD.client.flags import (

Check failure on line 15 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client.flags'
DirListFlags,
MkDirFlags,
OpenFlags,
QueryCode,
StatInfoFlags,
)
from XRootD.client.responses import ( # type: ignore[import-not-found]
HostList,
XRootDStatus,
)
from XRootD.client.responses import HostList, XRootDStatus

Check failure on line 22 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client.responses'


class ErrorCodes(IntEnum):

Check warning on line 25 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring
INVALID_PATH = 400


Expand All @@ -37,7 +30,7 @@
future: asyncio.Future[tuple[XRootDStatus, Any]],
status: XRootDStatus,
content: Any,
servers: HostList,

Check warning on line 33 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unused argument 'servers'
) -> None:
"""Sets result of _async_wrap() future.

Expand All @@ -56,7 +49,7 @@
return
try:
future.get_loop().call_soon_threadsafe(future.set_result, (status, content))
except Exception as exc:

Check warning on line 52 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Catching too general exception Exception
future.get_loop().call_soon_threadsafe(future.set_exception, exc)


Expand All @@ -73,9 +66,11 @@
An asyncio future. Result is set when _handle() is called back.
"""
future = asyncio.get_running_loop().create_future()
status = func(*args, callback=partial(_handle, future))
if not status.ok:
raise OSError(status.message.strip())
submit_status = func(*args, callback=partial(_handle, future))
if not submit_status.ok:
raise OSError(
f"Failed to submit {func!r} request: {submit_status.message.strip()}"
)
return await future


Expand Down Expand Up @@ -144,11 +139,13 @@
return deets


class XRootDFileSystem(AsyncFileSystem): # type: ignore[misc]

Check warning on line 142 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring

Check warning on line 142 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method '_cp_file' is abstract in class 'AsyncFileSystem' but is not overridden in child class 'XRootDFileSystem'

Check warning on line 142 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method '_pipe_file' is abstract in class 'AsyncFileSystem' but is not overridden in child class 'XRootDFileSystem'

Check warning on line 142 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method '_put_file' is abstract in class 'AsyncFileSystem' but is not overridden in child class 'XRootDFileSystem'

Check warning on line 142 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method 'cp_file' is abstract in class 'AbstractFileSystem' but is not overridden in child class 'XRootDFileSystem'

Check warning on line 142 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method 'created' is abstract in class 'AbstractFileSystem' but is not overridden in child class 'XRootDFileSystem'
protocol = "root"
root_marker = "/"
default_timeout = 60
async_impl = True
default_max_num_chunks = 1024
default_max_chunk_size = 2097136

_dataserver_info_cache: dict[str, Any] = defaultdict(dict)

Expand Down Expand Up @@ -458,7 +455,8 @@
# Close the remote file
await _async_wrap(remote_file.close, self.timeout)

async def _get_max_chunk_info(self, file: Any) -> tuple[int, int]:
@classmethod
async def _get_max_chunk_info(cls, file: Any) -> tuple[int, int]:
"""Queries the XRootD server for info required for pyxrootd vector_read() function.
Queries for maximum number of chunks and the maximum chunk size allowed by the server.

Expand All @@ -471,20 +469,31 @@
Tuple of max chunk size and max number of chunks. Both ints.
"""
data_server = file.get_property("DataServer")
if data_server not in XRootDFileSystem._dataserver_info_cache:
if data_server == "":
return cls.default_max_num_chunks, cls.default_max_chunk_size
# Normalize to URL
data_server = client.URL(data_server)
data_server = f"{data_server.protocol}://{data_server.hostid}/"
if data_server not in cls._dataserver_info_cache:
fs = client.FileSystem(data_server)
status, result = await _async_wrap(
self._myclient.query, QueryCode.CONFIG, "readv_iov_max readv_ior_max"
fs.query, QueryCode.CONFIG, "readv_iov_max readv_ior_max"
)
if not status.ok:
raise OSError(
f"Server query for vector read info failed: {status.message}"
)
max_num_chunks, max_chunk_size = map(int, result.split(b"\n", 1))
XRootDFileSystem._dataserver_info_cache[data_server] = {
"max_num_chunks": int(max_num_chunks),
"max_chunk_size": int(max_chunk_size),
try:
max_num_chunks, max_chunk_size = map(int, result.split(b"\n", 1))
except ValueError:
raise OSError(
f"Server query for vector read info failed: could not parse {result!r}"
) from None
cls._dataserver_info_cache[data_server] = {
"max_num_chunks": max_num_chunks,
"max_chunk_size": max_chunk_size,
}
info = XRootDFileSystem._dataserver_info_cache[data_server]
info = cls._dataserver_info_cache[data_server]
return (info["max_num_chunks"], info["max_chunk_size"])

async def _cat_vector_read(
Expand Down Expand Up @@ -649,10 +658,8 @@
**kwargs,
)
if compression is not None:
from fsspec.compression import compr # type: ignore[import-not-found]
from fsspec.core import ( # type: ignore[import-not-found]
get_compression,
)
from fsspec.compression import compr
from fsspec.core import get_compression

compression = get_compression(path, compression)
compress = compr[compression]
Expand Down Expand Up @@ -697,7 +704,7 @@

self._myFile = client.File()
status, _n = self._myFile.open(
fs.protocol + "://" + fs.storage_options["hostid"] + "/" + path,
fs.unstrip_protocol(path),
self.mode,
timeout=self.timeout,
)
Expand Down
Loading