From b566eda9ce04319c89b40b399687c73733a98de5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 18 Sep 2024 14:42:19 -0400 Subject: [PATCH 01/10] fixup IPNS --- ipfsspec/async_ipfs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index a6d003d..3e8a1eb 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -167,14 +167,14 @@ def get_gateway(protocol="ipfs"): ipfs_gateway = os.environ.get("IPFS_GATEWAY", "") if ipfs_gateway: logger.debug("using IPFS gateway from IPFS_GATEWAY environment variable: %s", ipfs_gateway) - return AsyncGateway(ipfs_gateway, protocol) + return AsyncIPFSGateway(ipfs_gateway, protocol) # internal configuration: accept IPFSSPEC_GATEWAYS for backwards compatibility if ipfsspec_gateways := os.environ.get("IPFSSPEC_GATEWAYS", ""): ipfs_gateway = ipfsspec_gateways.split()[0] logger.debug("using IPFS gateway from IPFSSPEC_GATEWAYS environment variable: %s", ipfs_gateway) warnings.warn("The IPFSSPEC_GATEWAYS environment variable is deprecated, please configure your IPFS Gateway according to IPIP-280, e.g. 
by using the IPFS_GATEWAY environment variable or using the ~/.ipfs/gateway file.", DeprecationWarning) - return AsyncGateway(ipfs_gateway, protocol) + return AsyncIPFSGateway(ipfs_gateway, protocol) # check various well-known files for possible gateway configurations if ipfs_path := os.environ.get("IPFS_PATH", ""): From ddc41d445e5d3a7d96016cd330fb0000bb49f27d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 18 Sep 2024 16:47:43 -0400 Subject: [PATCH 02/10] refactor: don't need AsyncIPFSGatewayBase anymore --- ipfsspec/async_ipfs.py | 104 ++++++++++++++++++++--------------------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 3e8a1eb..ad492cb 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -21,7 +21,58 @@ def __init__(self, retry_after=None): self.retry_after = retry_after -class AsyncIPFSGatewayBase: +class AsyncIPFSGateway: + resolution = "path" + + def __init__(self, url, protocol="ipfs"): + self.url = url + self.protocol = protocol + + async def api_get(self, endpoint, session, **kwargs): + res = await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + self._raise_requests_too_quick(res) + return res + + async def api_post(self, endpoint, session, **kwargs): + res = await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) + self._raise_requests_too_quick(res) + return res + + async def _cid_req(self, method, path, headers=None, **kwargs): + headers = headers or {} + if self.resolution == "path": + res = await method("/".join((self.url, self.protocol, path)), trace_request_ctx={'gateway': self.url}, headers=headers, **kwargs) + elif self.resolution == "subdomain": + raise NotImplementedError("subdomain resolution is not yet implemented") + else: + raise NotImplementedError(f"'{self.resolution}' resolution is not known") + # TODO: maybe handle 
301 here + self._raise_requests_too_quick(res) + return res + + async def cid_head(self, path, session, headers=None, **kwargs): + return await self._cid_req(session.head, path, headers=headers, **kwargs) + + async def cid_get(self, path, session, headers=None, **kwargs): + return await self._cid_req(session.get, path, headers=headers, **kwargs) + + async def version(self, session): + res = await self.api_get("version", session) + res.raise_for_status() + return await res.json() + + @staticmethod + def _raise_requests_too_quick(response): + if response.status == 429: + if "retry-after" in response.headers: + retry_after = int(response.headers["retry-after"]) + else: + retry_after = None + raise RequestsTooQuick(retry_after) + + def __str__(self): + return f"GW({self.url})" + async def stat(self, path, session): res = await self.api_get("files/stat", session, arg=path) self._raise_not_found_for_status(res, path) @@ -87,57 +138,6 @@ def _raise_not_found_for_status(self, response, url): response.raise_for_status() -class AsyncIPFSGateway(AsyncIPFSGatewayBase): - resolution = "path" - - def __init__(self, url, protocol="ipfs"): - self.url = url - self.protocol = protocol - - async def api_get(self, endpoint, session, **kwargs): - res = await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) - self._raise_requests_too_quick(res) - return res - - async def api_post(self, endpoint, session, **kwargs): - res = await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) - self._raise_requests_too_quick(res) - return res - - async def _cid_req(self, method, path, headers=None, **kwargs): - headers = headers or {} - if self.resolution == "path": - res = await method("/".join((self.url, self.protocol, path)), trace_request_ctx={'gateway': self.url}, headers=headers, **kwargs) - elif self.resolution == "subdomain": - raise NotImplementedError("subdomain resolution is not yet 
implemented") - else: - raise NotImplementedError(f"'{self.resolution}' resolution is not known") - # TODO: maybe handle 301 here - self._raise_requests_too_quick(res) - return res - - async def cid_head(self, path, session, headers=None, **kwargs): - return await self._cid_req(session.head, path, headers=headers, **kwargs) - - async def cid_get(self, path, session, headers=None, **kwargs): - return await self._cid_req(session.get, path, headers=headers, **kwargs) - - async def version(self, session): - res = await self.api_get("version", session) - res.raise_for_status() - return await res.json() - - @staticmethod - def _raise_requests_too_quick(response): - if response.status == 429: - if "retry-after" in response.headers: - retry_after = int(response.headers["retry-after"]) - else: - retry_after = None - raise RequestsTooQuick(retry_after) - - def __str__(self): - return f"GW({self.url})" async def get_client(**kwargs): From 85e873faf7ada6d3023a02694d8592d8b4c91e95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 18 Sep 2024 17:06:28 -0400 Subject: [PATCH 03/10] deprecate: remove version() from gateway There's no real standard way of retrieving IPFS Gateway versions, so we shouldn't pretend there'd be one. 
--- ipfsspec/async_ipfs.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index ad492cb..0929245 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -56,11 +56,6 @@ async def cid_head(self, path, session, headers=None, **kwargs): async def cid_get(self, path, session, headers=None, **kwargs): return await self._cid_req(session.get, path, headers=headers, **kwargs) - async def version(self, session): - res = await self.api_get("version", session) - res.raise_for_status() - return await res.json() - @staticmethod def _raise_requests_too_quick(response): if response.status == 429: From 3053d9ddc54383b3d6db9e37e78197c65df54b82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 18 Sep 2024 17:07:43 -0400 Subject: [PATCH 04/10] remove api_get and api_post as there's no API anymore --- ipfsspec/async_ipfs.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 0929245..287cd8e 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -28,16 +28,6 @@ def __init__(self, url, protocol="ipfs"): self.url = url self.protocol = protocol - async def api_get(self, endpoint, session, **kwargs): - res = await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) - self._raise_requests_too_quick(res) - return res - - async def api_post(self, endpoint, session, **kwargs): - res = await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) - self._raise_requests_too_quick(res) - return res - async def _cid_req(self, method, path, headers=None, **kwargs): headers = headers or {} if self.resolution == "path": From 27b074890d24cf6b7dffd1d7fac92e44f99dc988 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Wed, 18 Sep 2024 17:08:17 -0400 Subject: [PATCH 05/10] rename cid_(get|head) to get|head These are the 
only methods in the current gateway spec. --- ipfsspec/async_ipfs.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 287cd8e..e8c62bc 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -40,10 +40,10 @@ async def _cid_req(self, method, path, headers=None, **kwargs): self._raise_requests_too_quick(res) return res - async def cid_head(self, path, session, headers=None, **kwargs): + async def head(self, path, session, headers=None, **kwargs): return await self._cid_req(session.head, path, headers=headers, **kwargs) - async def cid_get(self, path, session, headers=None, **kwargs): + async def get(self, path, session, headers=None, **kwargs): return await self._cid_req(session.get, path, headers=headers, **kwargs) @staticmethod @@ -67,7 +67,7 @@ async def file_info(self, path, session): info = {"name": path} headers = {"Accept-Encoding": "identity"} # this ensures correct file size - res = await self.cid_head(path, session, headers=headers, allow_redirects=True) + res = await self.head(path, session, headers=headers, allow_redirects=True) async with res: self._raise_not_found_for_status(res, path) @@ -92,7 +92,7 @@ async def file_info(self, path, session): return info async def cat(self, path, session): - res = await self.cid_get(path, session) + res = await self.get(path, session) async with res: self._raise_not_found_for_status(res, path) if res.status != 200: From faa0cc0b0e5092bfcd9a805973867f1bdd5c5ac5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Thu, 19 Sep 2024 15:25:18 -0400 Subject: [PATCH 06/10] implement ls() and info() using IPFS Gateway APIs This change uses the IPFS Trustless Gateway Spec to retrieve information required for ls() and info() by decoding the IPFS data blocks themselves. 
--- ipfsspec/async_ipfs.py | 120 ++++++++++++++++++++++++----------------- ipfsspec/car.py | 116 +++++++++++++++++++++++++++++++++++++++ ipfsspec/unixfsv1.py | 109 +++++++++++++++++++++++++++++++++++++ ipfsspec/utils.py | 21 ++++++++ setup.py | 3 ++ test/test_async.py | 39 +++++++++----- 6 files changed, 346 insertions(+), 62 deletions(-) create mode 100644 ipfsspec/car.py create mode 100644 ipfsspec/unixfsv1.py create mode 100644 ipfsspec/utils.py diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index e8c62bc..43a67bf 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -6,15 +6,22 @@ from pathlib import Path import warnings +import asyncio import aiohttp from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper from fsspec.exceptions import FSTimeoutError +from multiformats import CID, multicodec +from .car import read_car +from . import unixfsv1 + import logging logger = logging.getLogger("ipfsspec") +DagPbCodec = multicodec.get("dag-pb") +RawCodec = multicodec.get("raw") class RequestsTooQuick(OSError): def __init__(self, retry_after=None): @@ -58,38 +65,52 @@ def _raise_requests_too_quick(response): def __str__(self): return f"GW({self.url})" - async def stat(self, path, session): - res = await self.api_get("files/stat", session, arg=path) + async def info(self, path, session): + res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"}) self._raise_not_found_for_status(res, path) - return await res.json() - - async def file_info(self, path, session): - info = {"name": path} - - headers = {"Accept-Encoding": "identity"} # this ensures correct file size - res = await self.head(path, session, headers=headers, allow_redirects=True) - - async with res: - self._raise_not_found_for_status(res, path) - if res.status != 200: - # TODO: maybe handle 301 here - raise FileNotFoundError(path) - if "Content-Length" in res.headers: - info["size"] = int(res.headers["Content-Length"]) - elif 
"Content-Range" in res.headers: - info["size"] = int(res.headers["Content-Range"].split("/")[1]) - - if "ETag" in res.headers: - etag = res.headers["ETag"].strip("\"") - info["ETag"] = etag - if etag.startswith("DirIndex"): - info["type"] = "directory" - info["CID"] = etag.split("-")[-1] - else: - info["type"] = "file" - info["CID"] = etag - - return info + cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1]) + resdata = await res.read() + + if cid.codec == RawCodec: + return { + "name": path, + "CID": str(cid), + "type": "file", + "size": len(resdata), + } + elif cid.codec == DagPbCodec: + node = unixfsv1.PBNode.loads(resdata) + data = unixfsv1.Data.loads(node.Data) + if data.Type == unixfsv1.DataType.Raw: + raise ValueError(f"The path '{path}' is only a subsection of a file") + elif data.Type == unixfsv1.DataType.Directory: + return { + "name": path, + "CID": str(cid), + "type": "directory", + "islink": False, + } + elif data.Type == unixfsv1.DataType.File: + return { + "name": path, + "CID": str(cid), + "type": "file", + "size": data.filesize, + "islink": False, + } + elif data.Type == unixfsv1.DataType.Metadata: + raise NotImplementedError(f"The path '{path}' contains a Metadata node, this is currently not implemented") + elif data.Type == unixfsv1.DataType.Symlink: + return { + "name": path, + "CID": str(cid), + "type": "other", # TODO: maybe we should have directory or file as returning type, but that probably would require resolving at least another level of blocks + "islink": True, + } + elif data.Type == unixfsv1.DataType.HAMTShard: + raise NotImplementedError(f"The path '{path}' contains a HAMTSharded directory, this is currently not implemented") + else: + raise ValueError(f"The path '{path}' is not an IPFS UNIXFSv1 object") async def cat(self, path, session): res = await self.get(path, session) @@ -99,18 +120,25 @@ async def cat(self, path, session): return await res.read() - async def ls(self, path, session): - 
res = await self.api_get("ls", session, arg=path) + async def ls(self, path, session, detail=False): + res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "entity"}) self._raise_not_found_for_status(res, path) - resdata = await res.json() - types = {1: "directory", 2: "file"} - return [{ - "name": path + "/" + link["Name"], - "CID": link["Hash"], - "type": types[link["Type"]], - "size": link["Size"], - } - for link in resdata["Objects"][0]["Links"]] + resdata = await res.read() + root = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1]) + assert root.codec == DagPbCodec, "this is not a directory" + _, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/ + blocks = {cid: data for cid, data, _ in blocks} + root_block = unixfsv1.PBNode.loads(blocks[root]) + root_data = unixfsv1.Data.loads(root_block.Data) + if root_data.Type != unixfsv1.DataType.Directory: + raise ValueError(f"The path '{path}' is not a directory") + + if detail: + return await asyncio.gather(*( + self.info(path + "/" + link.Name, session) + for link in root_block.Links)) + else: + return [path + "/" + link.Name for link in root_block.Links] def _raise_not_found_for_status(self, response, url): """ @@ -259,11 +287,7 @@ async def set_session(self): async def _ls(self, path, detail=True, **kwargs): path = self._strip_protocol(path) session = await self.set_session() - res = await self.gateway.ls(path, session) - if detail: - return res - else: - return [r["name"] for r in res] + return await self.gateway.ls(path, session, detail=detail) ls = sync_wrapper(_ls) @@ -275,7 +299,7 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): async def _info(self, path, **kwargs): path = self._strip_protocol(path) session = await self.set_session() - return await self.gateway.file_info(path, session) + return await self.gateway.info(path, session) def open(self, path, 
mode="rb", block_size=None, cache_options=None, **kwargs): if mode != "rb": diff --git a/ipfsspec/car.py b/ipfsspec/car.py new file mode 100644 index 0000000..028e41e --- /dev/null +++ b/ipfsspec/car.py @@ -0,0 +1,116 @@ +""" +CAR handling functions. +""" + +from typing import List, Optional, Tuple, Union, Iterator, BinaryIO +import dataclasses + +import dag_cbor +from multiformats import CID, varint, multicodec, multihash + +from .utils import is_cid_list, StreamLike, ensure_stream + +DagPbCodec = multicodec.get("dag-pb") +Sha256Hash = multihash.get("sha2-256") + +@dataclasses.dataclass +class CARBlockLocation: + varint_size: int + cid_size: int + payload_size: int + offset: int = 0 + + @property + def cid_offset(self) -> int: + return self.offset + self.varint_size + + @property + def payload_offset(self) -> int: + return self.offset + self.varint_size + self.cid_size + + @property + def size(self) -> int: + return self.varint_size + self.cid_size + self.payload_size + + +def decode_car_header(stream: BinaryIO) -> Tuple[List[CID], int]: + """ + Decodes a CAR header and returns the list of contained roots. 
+ """ + header_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase + header = dag_cbor.decode(stream.read(header_size)) + if not isinstance(header, dict): + raise ValueError("no valid CAR header found") + if header["version"] != 1: + raise ValueError("CAR is not version 1") + roots = header["roots"] + if not isinstance(roots, list): + raise ValueError("CAR header doesn't contain roots") + if not is_cid_list(roots): + raise ValueError("CAR roots do not only contain CIDs") + return roots, visize + header_size + + +def decode_raw_car_block(stream: BinaryIO) -> Optional[Tuple[CID, bytes, CARBlockLocation]]: + try: + block_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase + except ValueError: + # stream has likely been consumed entirely + return None + + data = stream.read(block_size) + # as the size of the CID is variable but not explicitly given in + # the CAR format, we need to partially decode each CID to determine + # its size and the location of the payload data + if data[0] == 0x12 and data[1] == 0x20: + # this is CIDv0 + cid_version = 0 + default_base = "base58btc" + cid_codec: Union[int, multicodec.Multicodec] = DagPbCodec + hash_codec: Union[int, multihash.Multihash] = Sha256Hash + cid_digest = data[2:34] + data = data[34:] + else: + # this is CIDv1(+) + cid_version, _, data = varint.decode_raw(data) + if cid_version != 1: + raise ValueError(f"CIDv{cid_version} is currently not supported") + default_base = "base32" + cid_codec, _, data = multicodec.unwrap_raw(data) + hash_codec, _, data = varint.decode_raw(data) + digest_size, _, data = varint.decode_raw(data) + cid_digest = data[:digest_size] + data = data[digest_size:] + cid = CID(default_base, cid_version, cid_codec, (hash_codec, cid_digest)) + + if not cid.hashfun.digest(data) == cid.digest: + raise ValueError(f"CAR is corrupted. 
Entry '{cid}' could not be verified") + + return cid, bytes(data), CARBlockLocation(visize, block_size - len(data), len(data)) + + +def read_car(stream_or_bytes: StreamLike) -> Tuple[List[CID], Iterator[Tuple[CID, bytes, CARBlockLocation]]]: + """ + Reads a CAR. + + Parameters + ---------- + stream_or_bytes: StreamLike + Stream to read CAR from + + Returns + ------- + roots : List[CID] + Roots as given by the CAR header + blocks : Iterator[Tuple[cid, BytesLike, CARBlockLocation]] + Iterator over all blocks contained in the CAR + """ + stream = ensure_stream(stream_or_bytes) + roots, header_size = decode_car_header(stream) + def blocks() -> Iterator[Tuple[CID, bytes, CARBlockLocation]]: + offset = header_size + while (next_block := decode_raw_car_block(stream)) is not None: + cid, data, sizes = next_block + yield cid, data, dataclasses.replace(sizes, offset=offset) + offset += sizes.size + return roots, blocks() diff --git a/ipfsspec/unixfsv1.py b/ipfsspec/unixfsv1.py new file mode 100644 index 0000000..b3bb03c --- /dev/null +++ b/ipfsspec/unixfsv1.py @@ -0,0 +1,109 @@ +""" +from UNIXFS spec (https://github.com/ipfs/specs/blob/master/UNIXFS.md): + +message Data { + enum DataType { + Raw = 0; + Directory = 1; + File = 2; + Metadata = 3; + Symlink = 4; + HAMTShard = 5; + } + + required DataType Type = 1; + optional bytes Data = 2; + optional uint64 filesize = 3; + repeated uint64 blocksizes = 4; + optional uint64 hashType = 5; + optional uint64 fanout = 6; + optional uint32 mode = 7; + optional UnixTime mtime = 8; +} + +message Metadata { + optional string MimeType = 1; +} + +message UnixTime { + required int64 Seconds = 1; + optional fixed32 FractionalNanoseconds = 2; +} + + + +from DAG-PB spec (https://ipld.io/specs/codecs/dag-pb/spec/): + +message PBLink { + // binary CID (with no multibase prefix) of the target object + optional bytes Hash = 1; + + // UTF-8 string name + optional string Name = 2; + + // cumulative size of target object + optional uint64 Tsize = 3; 
+} + +message PBNode { + // refs to other objects + repeated PBLink Links = 2; + + // opaque user data + optional bytes Data = 1; +} +""" + +from dataclasses import dataclass +from enum import IntEnum +from typing import List, Optional + +from pure_protobuf.dataclasses_ import field, message # type: ignore +from pure_protobuf.types import uint32, uint64, int64, fixed32 # type: ignore + +class DataType(IntEnum): + Raw = 0 + Directory = 1 + File = 2 + Metadata = 3 + Symlink = 4 + HAMTShard = 5 + +@message +@dataclass +class UnixTime: + Seconds: int64 = field(1) + FractionalNanoseconds: Optional[fixed32] = field(2) + +@message +@dataclass +class Data: + # pylint: disable=too-many-instance-attributes + Type: DataType = field(1) + Data: Optional[bytes] = field(2, default=None) + filesize: Optional[uint64] = field(3, default=None) + blocksizes: List[uint64] = field(4, default_factory=list, packed=False) + hashType: Optional[uint64] = field(5, default=None) + fanout: Optional[uint64] = field(6, default=None) + mode: Optional[uint32] = field(7, default=None) + mtime: Optional[UnixTime] = field(8, default=None) + +@message +@dataclass +class Metadata: + MimeType: Optional[str] = field(1, default=None) + + +@message +@dataclass +class PBLink: + Hash: Optional[bytes] = field(1, default=None) + Name: Optional[str] = field(2, default=None) + Tsize: Optional[uint64] = field(3, default=None) + +Data_ = Data +@message +@dataclass +class PBNode: + Links: List[PBLink] = field(2, default_factory=list) + Data: Optional[bytes] = field(1, default=None) diff --git a/ipfsspec/utils.py b/ipfsspec/utils.py new file mode 100644 index 0000000..d1bb255 --- /dev/null +++ b/ipfsspec/utils.py @@ -0,0 +1,21 @@ +""" +Some utilities. 
+""" + +from io import BytesIO +from typing import List, Union, BinaryIO + +from multiformats import CID +from typing_extensions import TypeGuard + +StreamLike = Union[BinaryIO, bytes] + +def ensure_stream(stream_or_bytes: StreamLike) -> BinaryIO: + if isinstance(stream_or_bytes, bytes): + return BytesIO(stream_or_bytes) + else: + return stream_or_bytes + + +def is_cid_list(os: List[object]) -> TypeGuard[List[CID]]: + return all(isinstance(o, CID) for o in os) diff --git a/setup.py b/setup.py index 832f002..04060e3 100644 --- a/setup.py +++ b/setup.py @@ -26,6 +26,9 @@ "fsspec>=0.9.0", "requests", "aiohttp", + "multiformats", + "dag-cbor >= 0.2.2", + "pure-protobuf >= 2.1.0, <3", ], entry_points={ 'fsspec.specs': [ diff --git a/test/test_async.py b/test/test_async.py index 9e1165d..7c108c0 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -1,6 +1,7 @@ import pytest import pytest_asyncio from ipfsspec.async_ipfs import AsyncIPFSGateway, AsyncIPFSFileSystem +import asyncio import aiohttp TEST_ROOT = "QmW3CrGFuFyF3VH1wvrap4Jend5NRTgtESDjuQ7QhHD5dd" @@ -14,6 +15,18 @@ async def session(): yield session +@pytest_asyncio.fixture +async def get_client(session): + async def get_client(**kwargs): + return session + + +@pytest_asyncio.fixture +async def fs(get_client): + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + return AsyncIPFSFileSystem(asynchronous=True, loop=asyncio.get_running_loop(), get_client=get_client) + + @pytest.mark.local_gw @pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) @pytest.mark.parametrize("filename", TEST_FILENAMES) @@ -22,7 +35,7 @@ async def test_different_file_representations(filename, gw_host, session): gw = AsyncIPFSGateway(gw_host) path = TEST_ROOT + "/" + filename - info = await gw.file_info(path, session) + info = await gw.info(path, session) assert info["size"] == len(REF_CONTENT) assert info["type"] == "file" content = await gw.cat(path, session) @@ -35,14 +48,12 @@ async def 
test_different_file_representations(filename, gw_host, session): async def test_get_cid_of_folder(gw_host, session): gw = AsyncIPFSGateway(gw_host) - info = await gw.file_info(TEST_ROOT, session) + info = await gw.info(TEST_ROOT, session) assert info["CID"] == TEST_ROOT @pytest.mark.asyncio -async def test_ls(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop - fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) +async def test_ls(fs): res = await fs._ls(TEST_ROOT, detail=False) assert res == [TEST_ROOT + fs.sep + fn for fn in TEST_FILENAMES] res = await fs._ls(TEST_ROOT, detail=True) @@ -51,9 +62,13 @@ async def test_ls(event_loop): @pytest.mark.asyncio -async def test_cat_file(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop - fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) +async def test_glob(fs): + res = await fs._glob(TEST_ROOT + "/w*") + assert res == [TEST_ROOT + fs.sep + fn for fn in TEST_FILENAMES if fn.startswith("w")] + + +@pytest.mark.asyncio +async def test_cat_file(fs): res = await fs._cat_file(TEST_ROOT + "/default") assert res == REF_CONTENT res = await fs._cat_file(TEST_ROOT + "/default", start=3, end=7) @@ -61,9 +76,7 @@ async def test_cat_file(event_loop): @pytest.mark.asyncio -async def test_exists(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop - fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) +async def test_exists(fs): res = await fs._exists(TEST_ROOT + "/default") assert res is True res = await fs._exists(TEST_ROOT + "/missing") @@ -71,9 +84,7 @@ async def test_exists(event_loop): @pytest.mark.asyncio -async def test_isfile(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop - fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) +async def test_isfile(fs): res = await fs._isfile(TEST_ROOT + "/default") assert res is True res = await 
fs._isfile(TEST_ROOT) From 857914bc44464cdc782e4717c806d39092446b55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Thu, 19 Sep 2024 15:28:32 -0400 Subject: [PATCH 07/10] bump IPFS version in tests (should now work) --- .github/workflows/local_gateway.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index eb1dee9..4c2267b 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -10,7 +10,7 @@ jobs: max-parallel: 4 matrix: python-version: ["3.8", "3.9", "3.10"] - ipfs-version: ["0.27.0"] # this is the latest IPFS version supporting /api/v0, see issue #28 + ipfs-version: ["0.30.0"] steps: - uses: actions/checkout@v1 - name: Set up Python ${{ matrix.python-version }} From 4878f4947ec976ad8e48248342e3b0c95a9af665 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Thu, 19 Sep 2024 16:44:49 -0400 Subject: [PATCH 08/10] fixup: remove CAR handling (for now) --- ipfsspec/async_ipfs.py | 20 ++++--- ipfsspec/car.py | 116 ----------------------------------------- ipfsspec/utils.py | 21 -------- 3 files changed, 9 insertions(+), 148 deletions(-) delete mode 100644 ipfsspec/car.py delete mode 100644 ipfsspec/utils.py diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 43a67bf..2f18060 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -13,7 +13,6 @@ from fsspec.exceptions import FSTimeoutError from multiformats import CID, multicodec -from .car import read_car from . 
import unixfsv1 import logging @@ -121,24 +120,23 @@ async def cat(self, path, session): return await res.read() async def ls(self, path, session, detail=False): - res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "entity"}) + res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"}) self._raise_not_found_for_status(res, path) resdata = await res.read() - root = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1]) - assert root.codec == DagPbCodec, "this is not a directory" - _, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/ - blocks = {cid: data for cid, data, _ in blocks} - root_block = unixfsv1.PBNode.loads(blocks[root]) - root_data = unixfsv1.Data.loads(root_block.Data) - if root_data.Type != unixfsv1.DataType.Directory: + cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1]) + assert cid.codec == DagPbCodec, "this is not a directory" + node = unixfsv1.PBNode.loads(resdata) + data = unixfsv1.Data.loads(node.Data) + if data.Type != unixfsv1.DataType.Directory: + # TODO: we might need support for HAMTShard here (for large directories) raise ValueError(f"The path '{path}' is not a directory") if detail: return await asyncio.gather(*( self.info(path + "/" + link.Name, session) - for link in root_block.Links)) + for link in node.Links)) else: - return [path + "/" + link.Name for link in root_block.Links] + return [path + "/" + link.Name for link in node.Links] def _raise_not_found_for_status(self, response, url): """ diff --git a/ipfsspec/car.py b/ipfsspec/car.py deleted file mode 100644 index 028e41e..0000000 --- a/ipfsspec/car.py +++ /dev/null @@ -1,116 +0,0 @@ -""" -CAR handling functions. 
-""" - -from typing import List, Optional, Tuple, Union, Iterator, BinaryIO -import dataclasses - -import dag_cbor -from multiformats import CID, varint, multicodec, multihash - -from .utils import is_cid_list, StreamLike, ensure_stream - -DagPbCodec = multicodec.get("dag-pb") -Sha256Hash = multihash.get("sha2-256") - -@dataclasses.dataclass -class CARBlockLocation: - varint_size: int - cid_size: int - payload_size: int - offset: int = 0 - - @property - def cid_offset(self) -> int: - return self.offset + self.varint_size - - @property - def payload_offset(self) -> int: - return self.offset + self.varint_size + self.cid_size - - @property - def size(self) -> int: - return self.varint_size + self.cid_size + self.payload_size - - -def decode_car_header(stream: BinaryIO) -> Tuple[List[CID], int]: - """ - Decodes a CAR header and returns the list of contained roots. - """ - header_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase - header = dag_cbor.decode(stream.read(header_size)) - if not isinstance(header, dict): - raise ValueError("no valid CAR header found") - if header["version"] != 1: - raise ValueError("CAR is not version 1") - roots = header["roots"] - if not isinstance(roots, list): - raise ValueError("CAR header doesn't contain roots") - if not is_cid_list(roots): - raise ValueError("CAR roots do not only contain CIDs") - return roots, visize + header_size - - -def decode_raw_car_block(stream: BinaryIO) -> Optional[Tuple[CID, bytes, CARBlockLocation]]: - try: - block_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase - except ValueError: - # stream has likely been consumed entirely - return None - - data = stream.read(block_size) - # as the size of the CID is variable but not explicitly given in - # the CAR format, we need to partially decode each CID to determine - # its size and the location of the payload data - if data[0] == 0x12 and data[1] == 0x20: 
- # this is CIDv0 - cid_version = 0 - default_base = "base58btc" - cid_codec: Union[int, multicodec.Multicodec] = DagPbCodec - hash_codec: Union[int, multihash.Multihash] = Sha256Hash - cid_digest = data[2:34] - data = data[34:] - else: - # this is CIDv1(+) - cid_version, _, data = varint.decode_raw(data) - if cid_version != 1: - raise ValueError(f"CIDv{cid_version} is currently not supported") - default_base = "base32" - cid_codec, _, data = multicodec.unwrap_raw(data) - hash_codec, _, data = varint.decode_raw(data) - digest_size, _, data = varint.decode_raw(data) - cid_digest = data[:digest_size] - data = data[digest_size:] - cid = CID(default_base, cid_version, cid_codec, (hash_codec, cid_digest)) - - if not cid.hashfun.digest(data) == cid.digest: - raise ValueError(f"CAR is corrupted. Entry '{cid}' could not be verified") - - return cid, bytes(data), CARBlockLocation(visize, block_size - len(data), len(data)) - - -def read_car(stream_or_bytes: StreamLike) -> Tuple[List[CID], Iterator[Tuple[CID, bytes, CARBlockLocation]]]: - """ - Reads a CAR. - - Parameters - ---------- - stream_or_bytes: StreamLike - Stream to read CAR from - - Returns - ------- - roots : List[CID] - Roots as given by the CAR header - blocks : Iterator[Tuple[cid, BytesLike, CARBlockLocation]] - Iterator over all blocks contained in the CAR - """ - stream = ensure_stream(stream_or_bytes) - roots, header_size = decode_car_header(stream) - def blocks() -> Iterator[Tuple[CID, bytes, CARBlockLocation]]: - offset = header_size - while (next_block := decode_raw_car_block(stream)) is not None: - cid, data, sizes = next_block - yield cid, data, dataclasses.replace(sizes, offset=offset) - offset += sizes.size - return roots, blocks() diff --git a/ipfsspec/utils.py b/ipfsspec/utils.py deleted file mode 100644 index d1bb255..0000000 --- a/ipfsspec/utils.py +++ /dev/null @@ -1,21 +0,0 @@ -""" -Some utilities. 
-""" - -from io import BytesIO -from typing import List, Union, BinaryIO - -from multiformats import CID -from typing_extensions import TypeGuard - -StreamLike = Union[BinaryIO, bytes] - -def ensure_stream(stream_or_bytes: StreamLike) -> BinaryIO: - if isinstance(stream_or_bytes, bytes): - return BytesIO(stream_or_bytes) - else: - return stream_or_bytes - - -def is_cid_list(os: List[object]) -> TypeGuard[List[CID]]: - return all(isinstance(o, CID) for o in os) From e8ab93656dd22953c7867806757dfc1aaa0b2a6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Thu, 19 Sep 2024 17:25:43 -0400 Subject: [PATCH 09/10] revisit exceptions --- ipfsspec/async_ipfs.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 2f18060..572a388 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -81,7 +81,7 @@ async def info(self, path, session): node = unixfsv1.PBNode.loads(resdata) data = unixfsv1.Data.loads(node.Data) if data.Type == unixfsv1.DataType.Raw: - raise ValueError(f"The path '{path}' is only a subsection of a file") + raise FileNotFoundError(path) # this is not a file, it's only a part of it elif data.Type == unixfsv1.DataType.Directory: return { "name": path, @@ -109,14 +109,12 @@ async def info(self, path, session): elif data.Type == unixfsv1.DataType.HAMTShard: raise NotImplementedError(f"The path '{path}' contains a HAMTSharded directory, this is currently not implemented") else: - raise ValueError(f"The path '{path}' is neiter an IPFS UNIXFSv1 object") + raise FileNotFoundError(path) # it exists, but is not a UNIXFSv1 object, so it's not a file async def cat(self, path, session): res = await self.get(path, session) async with res: self._raise_not_found_for_status(res, path) - if res.status != 200: - raise FileNotFoundError(path) return await res.read() async def ls(self, path, session, detail=False): @@ -129,7 +127,7 @@ async def ls(self, path, 
session, detail=False): data = unixfsv1.Data.loads(node.Data) if data.Type != unixfsv1.DataType.Directory: # TODO: we might need support for HAMTShard here (for large directories) - raise ValueError(f"The path '{path}' is not a directory") + raise NotADirectoryError(path) if detail: return await asyncio.gather(*( @@ -142,9 +140,9 @@ def _raise_not_found_for_status(self, response, url): """ Raises FileNotFoundError for 404s, otherwise uses raise_for_status. """ - if response.status == 404: + if response.status == 404: # returned for known missing files raise FileNotFoundError(url) - elif response.status == 400: + elif response.status == 400: # returned for invalid requests, so it's also certainly not there raise FileNotFoundError(url) response.raise_for_status() @@ -301,7 +299,7 @@ async def _info(self, path, **kwargs): def open(self, path, mode="rb", block_size=None, cache_options=None, **kwargs): if mode != "rb": - raise NotImplementedError + raise NotImplementedError("opening modes other than read binary are not implemented") data = self.cat_file(path) # load whole chunk into memory return io.BytesIO(data) From 37aa0922772e8bea1b22d03a01daf33b5f0f57fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Thu, 19 Sep 2024 17:26:17 -0400 Subject: [PATCH 10/10] add test for missing folder in ls --- test/test_async.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/test_async.py b/test/test_async.py index 7c108c0..1978bc5 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -61,6 +61,13 @@ async def test_ls(fs): assert all([r["size"] == len(REF_CONTENT) for r in res]) +@pytest.mark.parametrize("detail", [False, True]) +@pytest.mark.asyncio +async def test_ls_missing(fs, detail): + with pytest.raises(FileNotFoundError): + await fs._ls(TEST_ROOT + "/missing", detail=detail) + + @pytest.mark.asyncio async def test_glob(fs): res = await fs._glob(TEST_ROOT + "/w*")