From fa6d2d491aa1b47b6be2038396513857c9e85312 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20Ko=CC=88lling?= Date: Tue, 6 Aug 2024 01:26:02 +0200 Subject: [PATCH] remove multi-gateway functionality In the past, the multi-gateway functionality often created more harm than use. The functionality mainly existed, to provide a zero-configuration experience for new users. Due to IPIP-280, this is now less of a concern. Further, the new auto-config method encourages new users to briefly interact with IPFS, maybe leading to a better performing setup. --- ipfsspec/async_ipfs.py | 104 ------------------------------------ test/test_async.py | 19 +------ test/test_async_fallback.py | 91 ------------------------------- 3 files changed, 1 insertion(+), 213 deletions(-) delete mode 100644 test/test_async_fallback.py diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 902de4e..b4a4ac2 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -141,110 +141,6 @@ def __str__(self): return f"GW({self.url})" -class GatewayState: - def __init__(self): - self.reachable = True - self.next_request_time = 0 - self.backoff_time = 0 - self.start_backoff = 1e-5 - self.max_backoff = 5 - - def schedule_next(self): - self.next_request_time = time.monotonic() + self.backoff_time - - def backoff(self): - if self.backoff_time < self.start_backoff: - self.backoff_time = self.start_backoff - else: - self.backoff_time *= 2 - self.reachable = True - self.schedule_next() - - def speedup(self, not_below=0): - did_speed_up = False - if self.backoff_time > not_below: - self.backoff_time *= 0.9 - did_speed_up = True - self.reachable = True - self.schedule_next() - return did_speed_up - - def broken(self): - self.backoff_time = self.max_backoff - self.reachable = False - self.schedule_next() - - def trying_to_reach(self): - self.next_request_time = time.monotonic() + 1 - - -class MultiGateway(AsyncIPFSGatewayBase): - def __init__(self, gws, max_backoff_rounds=50): - self.gws = [(GatewayState(), gw) for gw in gws] - self.max_backoff_rounds = max_backoff_rounds - - @property - def _gws_in_priority_order(self): - now = time.monotonic() - return sorted(self.gws, key=lambda x: max(now, x[0].next_request_time)) - - async def _gw_op(self, op): - for _ in range(self.max_backoff_rounds): - for state, gw in self._gws_in_priority_order: - not_before = state.next_request_time - if not state.reachable: - state.trying_to_reach() - else: - state.schedule_next() - now = time.monotonic() - if not_before > now: - await asyncio.sleep(not_before - now) - logger.debug("tring %s", gw) - try: - res = await op(gw) - if state.speedup(time.monotonic() - now): - logger.debug("%s speedup", gw) - return res - except FileNotFoundError: # early exit if object doesn't exist - raise - except (RequestsTooQuick, aiohttp.ClientResponseError, asyncio.TimeoutError) as e: - state.backoff() - logger.debug("%s backoff %s", gw, e) - break - except IOError as e: - exception = e - state.broken() - logger.debug("%s broken", gw) - continue - else: - raise exception - raise RequestsTooQuick() - - async def api_get(self, endpoint, session, **kwargs): - return await self._gw_op(lambda gw: gw.api_get(endpoint, session, **kwargs)) - - async def api_post(self, endpoint, session, **kwargs): - return await self._gw_op(lambda gw: gw.api_post(endpoint, session, **kwargs)) - - async def cid_head(self, path, session, headers=None, **kwargs): - return await self._gw_op(lambda gw: gw.cid_head(path, session, headers=headers, **kwargs)) - - async def cid_get(self, path, session, headers=None, **kwargs): - return await self._gw_op(lambda gw: gw.cid_get(path, session, headers=headers, **kwargs)) - - async def cat(self, path, session): - return await self._gw_op(lambda gw: gw.cat(path, session)) - - async def ls(self, path, session): - return await self._gw_op(lambda gw: gw.ls(path, session)) - - def state_report(self): - return "\n".join(f"{s.next_request_time}, {gw}" for s, gw in self.gws) - - def __str__(self): - return "Multi-GW(" + ", ".join(str(gw) for _, gw in self.gws) + ")" - - async def get_client(**kwargs): timeout = aiohttp.ClientTimeout(sock_connect=1, sock_read=5) kwargs = {"timeout": timeout, **kwargs} diff --git a/test/test_async.py b/test/test_async.py index b54a1b1..9e1165d 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -1,6 +1,6 @@ import pytest import pytest_asyncio -from ipfsspec.async_ipfs import AsyncIPFSGateway, MultiGateway, AsyncIPFSFileSystem +from ipfsspec.async_ipfs import AsyncIPFSGateway, AsyncIPFSFileSystem import aiohttp TEST_ROOT = "QmW3CrGFuFyF3VH1wvrap4Jend5NRTgtESDjuQ7QhHD5dd" @@ -39,23 +39,6 @@ async def test_get_cid_of_folder(gw_host, session): assert info["CID"] == TEST_ROOT -@pytest.mark.local_gw -@pytest.mark.parametrize("gw_hosts", [ - ["http://127.0.0.1:8080"], - ["http://127.0.0.1:9999", "http://127.0.0.1:8080"], - ["http://127.0.0.1:8080", "http://127.0.0.1:9999"], - ["https://ipfs.io", "http://127.0.0.1:8080"], - ["http://127.0.0.1:8080", "https://ipfs.io"], -]) -@pytest.mark.asyncio -async def test_multi_gw_cat(gw_hosts, session): - gws = [AsyncIPFSGateway(gw_host) for gw_host in gw_hosts] - gw = MultiGateway(gws) - - res = await gw.cat(TEST_ROOT + "/default", session) - assert res == REF_CONTENT - - @pytest.mark.asyncio async def test_ls(event_loop): AsyncIPFSFileSystem.clear_instance_cache() # avoid reusing old event loop diff --git a/test/test_async_fallback.py b/test/test_async_fallback.py deleted file mode 100644 index 07ca331..0000000 --- a/test/test_async_fallback.py +++ /dev/null @@ -1,91 +0,0 @@ -import pytest -import time - -from ipfsspec.async_ipfs import MultiGateway, AsyncIPFSGatewayBase, RequestsTooQuick - - -class MockGateway(AsyncIPFSGatewayBase): - def __init__(self, objects): - self.objects = objects - self.request_count = 0 - - async def cid_get(self, path, session, headers=None, **kwargs): - self.request_count += 1 - try: - return self.objects[path] - except KeyError: - raise FileNotFoundError(path) - - -class RateLimitedMockGateway(AsyncIPFSGatewayBase): - def __init__(self, max_rate, base, report_time=True): - self.request_count = 0 - self.next_allowed_request = time.monotonic() - - self.max_rate = max_rate - self.base = base - self.report_time = report_time - - def _rate_limit(self): - self.request_count += 1 - now = time.monotonic() - if now <= self.next_allowed_request: - raise RequestsTooQuick(self.next_allowed_request - now if self.report_time else None) - else: - self.next_allowed_request = now + self.max_rate - - async def cid_get(self, path, session, headers=None, **kwargs): - self._rate_limit() - return await self.base.cid_get(path, session, headers=headers, **kwargs) - - -@pytest.fixture -def session(): - return None - - -@pytest.mark.asyncio -async def test_backoff(session): - base = MockGateway({ - "QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM": "bar", - }) - gws = [RateLimitedMockGateway(0.01, base)] - gw = MultiGateway(gws) - - for _ in range(50): - obj = await gw.cid_get("QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM", session) - assert obj == "bar" - assert 50 <= gws[0].request_count < 240 - - -@pytest.mark.asyncio -async def test_backoff_use_faster_server(session): - base = MockGateway({ - "QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM": "zapp", - }) - gws = [ - RateLimitedMockGateway(0.1, base), - RateLimitedMockGateway(0.01, base) - ] - gw = MultiGateway(gws) - - for _ in range(100): - obj = await gw.cid_get("QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM", session) - assert obj == "zapp" - assert gws[0].request_count < gws[1].request_count - - -@pytest.mark.asyncio -async def test_stop_on_not_found(session): - objects = { - "QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM": "zapp", - } - gws = [ - MockGateway(objects), - MockGateway(objects) - ] - gw = MultiGateway(gws) - - with pytest.raises(FileNotFoundError): - await gw.cid_get("QmTz3oc4gdpRMKP2sdGUPZTAGRngqjsi99BPoztyP53JMM/not_there", session) - assert gws[1].request_count == 0