diff --git a/.github/workflows/local_gateway.yml b/.github/workflows/local_gateway.yml index eb1dee9..4c2267b 100644 --- a/.github/workflows/local_gateway.yml +++ b/.github/workflows/local_gateway.yml @@ -10,7 +10,7 @@ jobs: max-parallel: 4 matrix: python-version: ["3.8", "3.9", "3.10"] - ipfs-version: ["0.27.0"] # this is the latest IPFS version supporting /api/v0, see issue #28 + ipfs-version: ["0.30.0"] steps: - uses: actions/checkout@v1 - name: Set up Python ${{ matrix.python-version }} diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index a6d003d..572a388 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -6,104 +6,34 @@ from pathlib import Path import warnings +import asyncio import aiohttp from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper from fsspec.exceptions import FSTimeoutError +from multiformats import CID, multicodec +from . import unixfsv1 + import logging logger = logging.getLogger("ipfsspec") +DagPbCodec = multicodec.get("dag-pb") +RawCodec = multicodec.get("raw") class RequestsTooQuick(OSError): def __init__(self, retry_after=None): self.retry_after = retry_after -class AsyncIPFSGatewayBase: - async def stat(self, path, session): - res = await self.api_get("files/stat", session, arg=path) - self._raise_not_found_for_status(res, path) - return await res.json() - - async def file_info(self, path, session): - info = {"name": path} - - headers = {"Accept-Encoding": "identity"} # this ensures correct file size - res = await self.cid_head(path, session, headers=headers, allow_redirects=True) - - async with res: - self._raise_not_found_for_status(res, path) - if res.status != 200: - # TODO: maybe handle 301 here - raise FileNotFoundError(path) - if "Content-Length" in res.headers: - info["size"] = int(res.headers["Content-Length"]) - elif "Content-Range" in res.headers: - info["size"] = int(res.headers["Content-Range"].split("/")[1]) - - if "ETag" in res.headers: - etag = res.headers["ETag"].strip("\"") - 
info["ETag"] = etag - if etag.startswith("DirIndex"): - info["type"] = "directory" - info["CID"] = etag.split("-")[-1] - else: - info["type"] = "file" - info["CID"] = etag - - return info - - async def cat(self, path, session): - res = await self.cid_get(path, session) - async with res: - self._raise_not_found_for_status(res, path) - if res.status != 200: - raise FileNotFoundError(path) - return await res.read() - - async def ls(self, path, session): - res = await self.api_get("ls", session, arg=path) - self._raise_not_found_for_status(res, path) - resdata = await res.json() - types = {1: "directory", 2: "file"} - return [{ - "name": path + "/" + link["Name"], - "CID": link["Hash"], - "type": types[link["Type"]], - "size": link["Size"], - } - for link in resdata["Objects"][0]["Links"]] - - def _raise_not_found_for_status(self, response, url): - """ - Raises FileNotFoundError for 404s, otherwise uses raise_for_status. - """ - if response.status == 404: - raise FileNotFoundError(url) - elif response.status == 400: - raise FileNotFoundError(url) - response.raise_for_status() - - -class AsyncIPFSGateway(AsyncIPFSGatewayBase): +class AsyncIPFSGateway: resolution = "path" def __init__(self, url, protocol="ipfs"): self.url = url self.protocol = protocol - async def api_get(self, endpoint, session, **kwargs): - res = await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) - self._raise_requests_too_quick(res) - return res - - async def api_post(self, endpoint, session, **kwargs): - res = await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url}) - self._raise_requests_too_quick(res) - return res - async def _cid_req(self, method, path, headers=None, **kwargs): headers = headers or {} if self.resolution == "path": @@ -116,17 +46,12 @@ async def _cid_req(self, method, path, headers=None, **kwargs): self._raise_requests_too_quick(res) return res - async def cid_head(self, 
path, session, headers=None, **kwargs): + async def head(self, path, session, headers=None, **kwargs): return await self._cid_req(session.head, path, headers=headers, **kwargs) - async def cid_get(self, path, session, headers=None, **kwargs): + async def get(self, path, session, headers=None, **kwargs): return await self._cid_req(session.get, path, headers=headers, **kwargs) - async def version(self, session): - res = await self.api_get("version", session) - res.raise_for_status() - return await res.json() - @staticmethod def _raise_requests_too_quick(response): if response.status == 429: @@ -139,6 +64,90 @@ def _raise_requests_too_quick(response): def __str__(self): return f"GW({self.url})" + async def info(self, path, session): + res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"}) + self._raise_not_found_for_status(res, path) + cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1]) + resdata = await res.read() + + if cid.codec == RawCodec: + return { + "name": path, + "CID": str(cid), + "type": "file", + "size": len(resdata), + } + elif cid.codec == DagPbCodec: + node = unixfsv1.PBNode.loads(resdata) + data = unixfsv1.Data.loads(node.Data) + if data.Type == unixfsv1.DataType.Raw: + raise FileNotFoundError(path) # this is not a file, it's only a part of it + elif data.Type == unixfsv1.DataType.Directory: + return { + "name": path, + "CID": str(cid), + "type": "directory", + "islink": False, + } + elif data.Type == unixfsv1.DataType.File: + return { + "name": path, + "CID": str(cid), + "type": "file", + "size": data.filesize, + "islink": False, + } + elif data.Type == unixfsv1.DataType.Metadata: + raise NotImplementedError(f"The path '{path}' contains a Metadata node, this is currently not implemented") + elif data.Type == unixfsv1.DataType.Symlink: + return { + "name": path, + "CID": str(cid), + "type": "other", # TODO: maybe we should have directory or file as returning type, but that probably would 
require resolving at least another level of blocks + "islink": True, + } + elif data.Type == unixfsv1.DataType.HAMTShard: + raise NotImplementedError(f"The path '{path}' contains a HAMTSharded directory, this is currently not implemented") + else: + raise FileNotFoundError(path) # it exists, but is not a UNIXFSv1 object, so it's not a file + + async def cat(self, path, session): + res = await self.get(path, session) + async with res: + self._raise_not_found_for_status(res, path) + return await res.read() + + async def ls(self, path, session, detail=False): + res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"}) + self._raise_not_found_for_status(res, path) + resdata = await res.read() + cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1]) + assert cid.codec == DagPbCodec, "this is not a directory" + node = unixfsv1.PBNode.loads(resdata) + data = unixfsv1.Data.loads(node.Data) + if data.Type != unixfsv1.DataType.Directory: + # TODO: we might need support for HAMTShard here (for large directories) + raise NotADirectoryError(path) + + if detail: + return await asyncio.gather(*( + self.info(path + "/" + link.Name, session) + for link in node.Links)) + else: + return [path + "/" + link.Name for link in node.Links] + + def _raise_not_found_for_status(self, response, url): + """ + Raises FileNotFoundError for 404s, otherwise uses raise_for_status. 
+ """ + if response.status == 404: # returned for known missing files + raise FileNotFoundError(url) + elif response.status == 400: # returned for invalid requests, so it's also certainly not there + raise FileNotFoundError(url) + response.raise_for_status() + + + async def get_client(**kwargs): timeout = aiohttp.ClientTimeout(sock_connect=1, sock_read=5) @@ -167,14 +176,14 @@ def get_gateway(protocol="ipfs"): ipfs_gateway = os.environ.get("IPFS_GATEWAY", "") if ipfs_gateway: logger.debug("using IPFS gateway from IPFS_GATEWAY environment variable: %s", ipfs_gateway) - return AsyncGateway(ipfs_gateway, protocol) + return AsyncIPFSGateway(ipfs_gateway, protocol) # internal configuration: accept IPFSSPEC_GATEWAYS for backwards compatibility if ipfsspec_gateways := os.environ.get("IPFSSPEC_GATEWAYS", ""): ipfs_gateway = ipfsspec_gateways.split()[0] logger.debug("using IPFS gateway from IPFSSPEC_GATEWAYS environment variable: %s", ipfs_gateway) warnings.warn("The IPFSSPEC_GATEWAYS environment variable is deprecated, please configure your IPFS Gateway according to IPIP-280, e.g. 
by using the IPFS_GATEWAY environment variable or using the ~/.ipfs/gateway file.", DeprecationWarning) - return AsyncGateway(ipfs_gateway, protocol) + return AsyncIPFSGateway(ipfs_gateway, protocol) # check various well-known files for possible gateway configurations if ipfs_path := os.environ.get("IPFS_PATH", ""): @@ -274,11 +283,7 @@ async def set_session(self): async def _ls(self, path, detail=True, **kwargs): path = self._strip_protocol(path) session = await self.set_session() - res = await self.gateway.ls(path, session) - if detail: - return res - else: - return [r["name"] for r in res] + return await self.gateway.ls(path, session, detail=detail) ls = sync_wrapper(_ls) @@ -290,11 +295,11 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): async def _info(self, path, **kwargs): path = self._strip_protocol(path) session = await self.set_session() - return await self.gateway.file_info(path, session) + return await self.gateway.info(path, session) def open(self, path, mode="rb", block_size=None, cache_options=None, **kwargs): if mode != "rb": - raise NotImplementedError + raise NotImplementedError("opening modes other than read binary are not implemented") data = self.cat_file(path) # load whole chunk into memory return io.BytesIO(data) diff --git a/ipfsspec/unixfsv1.py b/ipfsspec/unixfsv1.py new file mode 100644 index 0000000..b3bb03c --- /dev/null +++ b/ipfsspec/unixfsv1.py @@ -0,0 +1,109 @@ +""" +from UNIXFS spec (https://github.com/ipfs/specs/blob/master/UNIXFS.md): + +message Data { + enum DataType { + Raw = 0; + Directory = 1; + File = 2; + Metadata = 3; + Symlink = 4; + HAMTShard = 5; + } + + required DataType Type = 1; + optional bytes Data = 2; + optional uint64 filesize = 3; + repeated uint64 blocksizes = 4; + optional uint64 hashType = 5; + optional uint64 fanout = 6; + optional uint32 mode = 7; + optional UnixTime mtime = 8; +} + +message Metadata { + optional string MimeType = 1; +} + +message UnixTime { + required int64 Seconds = 1; 
+ optional fixed32 FractionalNanoseconds = 2; +} + + + +from DAG-PB spec (https://ipld.io/specs/codecs/dag-pb/spec/): + +message PBLink { + // binary CID (with no multibase prefix) of the target object + optional bytes Hash = 1; + + // UTF-8 string name + optional string Name = 2; + + // cumulative size of target object + optional uint64 Tsize = 3; +} + +message PBNode { + // refs to other objects + repeated PBLink Links = 2; + + // opaque user data + optional bytes Data = 1; +} +""" + +from dataclasses import dataclass +from enum import IntEnum +from typing import List, Optional + +from pure_protobuf.dataclasses_ import field, message # type: ignore +from pure_protobuf.types import uint32, uint64, int64, fixed32 # type: ignore + +class DataType(IntEnum): + Raw = 0 + Directory = 1 + File = 2 + Metadata = 3 + Symlink = 4 + HAMTShard = 5 + +@message +@dataclass +class UnixTime: + Seconds: int64 = field(1) + FractionalNanoseconds: Optional[fixed32] = field(2) + +@message +@dataclass +class Data: + # pylint: disable=too-many-instance-attributes + Type: DataType = field(1) + Data: Optional[bytes] = field(2, default=None) + filesize: Optional[uint64] = field(3, default=None) + blocksizes: List[uint64] = field(4, default_factory=list, packed=False) + hashType: Optional[uint64] = field(5, default=None) + fanout: Optional[uint64] = field(6, default=None) + mode: Optional[uint32] = field(7, default=None) + mtime: Optional[UnixTime] = field(8, default=None) + +@message +@dataclass +class Metadata: + MimeType: Optional[str] = field(1, default=None) + + +@message +@dataclass +class PBLink: + Hash: Optional[bytes] = field(1, default=None) + Name: Optional[str] = field(2, default=None) + Tsize: Optional[uint64] = field(3, default=None) + +Data_ = Data +@message +@dataclass +class PBNode: + Links: List[PBLink] = field(2, default_factory=list) + Data: Optional[bytes] = field(1, default=None) diff --git a/setup.py b/setup.py index 832f002..04060e3 100644 --- a/setup.py +++ 
b/setup.py @@ -26,6 +26,9 @@ "fsspec>=0.9.0", "requests", "aiohttp", + "multiformats", + "dag-cbor >= 0.2.2", + "pure-protobuf >= 2.1.0, <3", ], entry_points={ 'fsspec.specs': [ diff --git a/test/test_async.py b/test/test_async.py index 9e1165d..1978bc5 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -1,6 +1,7 @@ import pytest import pytest_asyncio from ipfsspec.async_ipfs import AsyncIPFSGateway, AsyncIPFSFileSystem +import asyncio import aiohttp TEST_ROOT = "QmW3CrGFuFyF3VH1wvrap4Jend5NRTgtESDjuQ7QhHD5dd" @@ -14,6 +15,19 @@ async def session(): yield session +@pytest_asyncio.fixture +async def get_client(session): + async def get_client(**kwargs): + return session + return get_client + + +@pytest_asyncio.fixture +async def fs(get_client): + AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop + return AsyncIPFSFileSystem(asynchronous=True, loop=asyncio.get_running_loop(), get_client=get_client) + + @pytest.mark.local_gw @pytest.mark.parametrize("gw_host", ["http://127.0.0.1:8080"]) @pytest.mark.parametrize("filename", TEST_FILENAMES) @@ -22,7 +36,7 @@ async def test_different_file_representations(filename, gw_host, session): gw = AsyncIPFSGateway(gw_host) path = TEST_ROOT + "/" + filename - info = await gw.file_info(path, session) + info = await gw.info(path, session) assert info["size"] == len(REF_CONTENT) assert info["type"] == "file" content = await gw.cat(path, session) @@ -35,14 +49,12 @@ async def test_get_cid_of_folder(gw_host, session): gw = AsyncIPFSGateway(gw_host) - info = await gw.file_info(TEST_ROOT, session) + info = await gw.info(TEST_ROOT, session) assert info["CID"] == TEST_ROOT @pytest.mark.asyncio -async def test_ls(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop - fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) +async def test_ls(fs): res = await fs._ls(TEST_ROOT, detail=False) assert res == [TEST_ROOT + 
fs.sep + fn for fn in TEST_FILENAMES] res = await fs._ls(TEST_ROOT, detail=True) @@ -50,10 +62,21 @@ async def test_ls(event_loop): assert all([r["size"] == len(REF_CONTENT) for r in res]) +@pytest.mark.parametrize("detail", [False, True]) @pytest.mark.asyncio -async def test_cat_file(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop - fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) +async def test_ls_missing(fs, detail): + with pytest.raises(FileNotFoundError): + await fs._ls(TEST_ROOT + "/missing", detail=detail) + + +@pytest.mark.asyncio +async def test_glob(fs): + res = await fs._glob(TEST_ROOT + "/w*") + assert res == [TEST_ROOT + fs.sep + fn for fn in TEST_FILENAMES if fn.startswith("w")] + + +@pytest.mark.asyncio +async def test_cat_file(fs): res = await fs._cat_file(TEST_ROOT + "/default") assert res == REF_CONTENT res = await fs._cat_file(TEST_ROOT + "/default", start=3, end=7) @@ -61,9 +84,7 @@ @pytest.mark.asyncio -async def test_exists(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop - fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) +async def test_exists(fs): res = await fs._exists(TEST_ROOT + "/default") assert res is True res = await fs._exists(TEST_ROOT + "/missing") @@ -71,9 +92,7 @@ @pytest.mark.asyncio -async def test_isfile(event_loop): - AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop - fs = AsyncIPFSFileSystem(asynchronous=True, loop=event_loop) +async def test_isfile(fs): res = await fs._isfile(TEST_ROOT + "/default") assert res is True res = await fs._isfile(TEST_ROOT)