From 73b394ac6c2a810c450ae6cc65fd5964088ce7a1 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Wed, 6 Nov 2024 23:00:00 -0300 Subject: [PATCH] refactor(p2p): implement P2PConnectionProtocol --- hathor/builder/builder.py | 14 ++-- hathor/builder/cli_builder.py | 10 ++- hathor/cli/quick_test.py | 7 +- hathor/cli/run_node.py | 2 + hathor/cli/run_node_args.py | 1 + hathor/manager.py | 2 +- hathor/metrics.py | 6 +- hathor/p2p/dependencies/protocols.py | 29 +++++-- hathor/p2p/manager.py | 104 ++++++++++++++------------ hathor/p2p/peer_connections.py | 18 +++-- hathor/p2p/protocol.py | 4 +- hathor/p2p/states/ready.py | 8 +- hathor/p2p/sync_agent.py | 32 +++++++- hathor/p2p/sync_factory.py | 27 ------- hathor/p2p/sync_v1/factory.py | 42 ----------- hathor/p2p/sync_v2/factory.py | 34 --------- tests/others/test_metrics.py | 1 - tests/p2p/test_connections.py | 4 +- tests/p2p/test_get_best_blockchain.py | 12 +++ tests/p2p/test_sync.py | 7 +- tests/p2p/test_sync_rate_limiter.py | 5 ++ tests/sysctl/test_p2p.py | 2 +- 22 files changed, 168 insertions(+), 203 deletions(-) delete mode 100644 hathor/p2p/sync_factory.py delete mode 100644 hathor/p2p/sync_v1/factory.py delete mode 100644 hathor/p2p/sync_v2/factory.py diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index 0a107d20e..6c7880cdd 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -63,26 +63,23 @@ class SyncSupportLevel(IntEnum): ENABLED = 2 # available and enabled by default, possible to disable at runtime @classmethod - def add_factories( + def add_versions( cls, p2p_manager: ConnectionsManager, - dependencies: P2PDependencies, sync_v1_support: 'SyncSupportLevel', sync_v2_support: 'SyncSupportLevel', ) -> None: - """Adds the sync factory to the manager according to the support level.""" - from hathor.p2p.sync_v1.factory import SyncV11Factory - from hathor.p2p.sync_v2.factory import SyncV2Factory + """Adds the sync version to the manager according to the support level.""" from hathor.p2p.sync_version import SyncVersion # sync-v1 support: if sync_v1_support > cls.UNAVAILABLE: - p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(dependencies)) + p2p_manager.add_sync_version(SyncVersion.V1_1) if sync_v1_support is cls.ENABLED: p2p_manager.enable_sync_version(SyncVersion.V1_1) # sync-v2 support: if sync_v2_support > cls.UNAVAILABLE: - p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(dependencies)) + p2p_manager.add_sync_version(SyncVersion.V2) if sync_v2_support is cls.ENABLED: p2p_manager.enable_sync_version(SyncVersion.V2) @@ -427,9 +424,8 @@ def _get_or_create_p2p_manager(self) -> ConnectionsManager: ssl=enable_ssl, rng=self._rng, ) - SyncSupportLevel.add_factories( + SyncSupportLevel.add_versions( self._p2p_manager, - dependencies, self._sync_v1_support, self._sync_v2_support, ) diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index d052cbe8f..4cbb418e4 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -332,6 +332,13 @@ def create_manager(self, reactor: Reactor) -> HathorManager: log_vertex_bytes=self._args.log_vertex_bytes, ) + if self._args.x_multiprocess_p2p: + self.check_or_raise( + self._args.x_remove_sync_v1, + 'multiprocess support for P2P is only available if sync-v1 is removed (use --x-remove-sync-v1)' + ) + raise NotImplementedError('Multiprocess support for P2P is not yet implemented.') + p2p_dependencies = P2PDependencies( reactor=reactor, settings=settings, @@ -351,9 +358,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager: rng=Random(), ) - SyncSupportLevel.add_factories( + SyncSupportLevel.add_versions( p2p_manager, - p2p_dependencies, sync_v1_support, sync_v2_support, ) diff --git a/hathor/cli/quick_test.py b/hathor/cli/quick_test.py index 2bdd20711..6ab17d9aa 100644 --- a/hathor/cli/quick_test.py +++ b/hathor/cli/quick_test.py @@ -72,16 +72,11 @@ def create_parser(cls) -> ArgumentParser: return parser def prepare(self, *, register_resources: bool = True) -> None: - from hathor.p2p.sync_v2.factory import SyncV2Factory - from hathor.p2p.sync_version import SyncVersion - super().prepare(register_resources=False) self._no_wait = self._args.no_wait self.log.info('patching vertex_handler.on_new_vertex to quit on success') - p2p_factory = self.manager.connections.get_sync_factory(SyncVersion.V2) - assert isinstance(p2p_factory, SyncV2Factory) - p2p_factory.dependencies.vertex_handler = VertexHandlerWrapper( + self.manager.connections.dependencies.vertex_handler = VertexHandlerWrapper( self.manager.vertex_handler, self.manager, self._args.quit_after_n_blocks, diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index 9498194ab..777846743 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -58,6 +58,7 @@ class RunNode: ('--x-enable-event-queue', lambda args: bool(args.x_enable_event_queue)), ('--x-asyncio-reactor', lambda args: bool(args.x_asyncio_reactor)), ('--x-ipython-kernel', lambda args: bool(args.x_ipython_kernel)), + ('--x-multiprocess-p2p', lambda args: bool(args.x_multiprocess_p2p)), ] env_vars_prefix: str | None = None @@ -162,6 +163,7 @@ def create_parser(cls) -> ArgumentParser: help='Log tx bytes for debugging') parser.add_argument('--disable-ws-history-streaming', action='store_true', help='Disable websocket history streaming API') + parser.add_argument('--x-multiprocess-p2p', action='store_true', help='Enable multiprocess support for P2P.') return parser def prepare(self, *, register_resources: bool = True) -> None: diff --git a/hathor/cli/run_node_args.py b/hathor/cli/run_node_args.py index f493a7d33..73ac907e9 100644 --- a/hathor/cli/run_node_args.py +++ b/hathor/cli/run_node_args.py @@ -83,3 +83,4 @@ class RunNodeArgs(BaseModel, extra=Extra.allow): nano_testnet: bool log_vertex_bytes: bool disable_ws_history_streaming: bool + x_multiprocess_p2p: bool diff --git a/hathor/manager.py b/hathor/manager.py index aa50e3766..df9e12dc3 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -987,7 +987,7 @@ def remove_peer_from_whitelist_and_disconnect(self, peer_id: PeerId) -> None: if peer_id in self.peers_whitelist: self.peers_whitelist.remove(peer_id) # disconnect from node - self.connections.drop_connection_by_peer_id(peer_id) + self.connections.drop_connection(peer_id) def has_recent_activity(self) -> bool: current_timestamp = time.time() diff --git a/hathor/metrics.py b/hathor/metrics.py index 4db9060dc..ac2a4f41e 100644 --- a/hathor/metrics.py +++ b/hathor/metrics.py @@ -247,7 +247,7 @@ def collect_peer_connection_metrics(self) -> None: """ self.peer_connection_metrics.clear() - for connection in self.connections.get_connected_peers(): + for addr, connection in self.connections.get_connected_peers().items(): peer = connection.get_peer_if_set() if not peer: # A connection without peer will not be able to communicate @@ -256,8 +256,8 @@ def collect_peer_connection_metrics(self) -> None: metrics = connection.get_metrics() metric = PeerConnectionMetrics( - connection_string=str(connection.addr), - peer_id=str(connection.peer.id), + connection_string=str(addr), + peer_id=str(peer.id), network=settings.NETWORK_NAME, received_messages=metrics.received_messages, sent_messages=metrics.sent_messages, diff --git a/hathor/p2p/dependencies/protocols.py b/hathor/p2p/dependencies/protocols.py index ba632d564..4822647ed 100644 --- a/hathor/p2p/dependencies/protocols.py +++ b/hathor/p2p/dependencies/protocols.py @@ -14,7 +14,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Iterable, Protocol +from typing import TYPE_CHECKING, Any, Iterable, Protocol from hathor.indexes.height_index import HeightInfo from hathor.p2p.peer_endpoint import PeerAddress @@ -22,23 +22,36 @@ from hathor.types import VertexId if TYPE_CHECKING: - from hathor.p2p.peer import PublicPeer, UnverifiedPeer + from hathor.p2p.peer import PublicPeer from hathor.p2p.peer_id import PeerId - from hathor.p2p.protocol import HathorProtocol - from hathor.p2p.sync_factory import SyncAgentFactory + from hathor.p2p.protocol import ConnectionMetrics from hathor.p2p.sync_version import SyncVersion +class P2PConnectionProtocol(Protocol): + """Abstract HathorProtocol as a Python protocol to be used in P2PManager.""" + + def is_synced(self) -> bool: ... + def send_tx_to_peer(self, tx: BaseTransaction) -> None: ... + def disconnect(self, reason: str = '', *, force: bool = False) -> None: ... + def get_peer(self) -> PublicPeer: ... + def get_peer_if_set(self) -> PublicPeer | None: ... + def enable_sync(self) -> None: ... + def disable_sync(self) -> None: ... + def is_sync_enabled(self) -> bool: ... + def send_peers(self, peers: Iterable[PublicPeer]) -> None: ... + def get_metrics(self) -> ConnectionMetrics: ... + + class P2PManagerProtocol(Protocol): """Abstract the P2PManager as a Python protocol to be used in P2P classes.""" def is_peer_whitelisted(self, peer_id: PeerId) -> bool: ... def get_enabled_sync_versions(self) -> set[SyncVersion]: ... - def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory: ... def get_verified_peers(self) -> Iterable[PublicPeer]: ... - def on_receive_peer(self, peer: UnverifiedPeer) -> None: ... - def on_peer_connect(self, protocol: HathorProtocol) -> None: ... - def on_peer_ready(self, protocol: HathorProtocol) -> None: ... + def on_receive_peer(self, peer_data: dict[str, Any]) -> None: ... + def on_peer_connect(self, *, addr: PeerAddress, inbound: bool) -> None: ... + def on_peer_ready(self, *, addr: PeerAddress, peer: PublicPeer) -> None: ... def on_handshake_disconnect(self, *, addr: PeerAddress) -> None: ... def on_ready_disconnect(self, *, addr: PeerAddress, peer_id: PeerId) -> None: ... def on_initial_disconnect(self, *, addr: PeerAddress) -> None: ... diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py index de15b1b5e..bca5e97cf 100644 --- a/hathor/p2p/manager.py +++ b/hathor/p2p/manager.py @@ -25,8 +25,10 @@ from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol from twisted.python.failure import Failure from twisted.web.client import Agent +from typing_extensions import assert_never from hathor.p2p import P2PDependencies +from hathor.p2p.dependencies.protocols import P2PConnectionProtocol from hathor.p2p.netfilter.factory import NetfilterFactory from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer from hathor.p2p.peer_connections import PeerConnections @@ -36,7 +38,7 @@ from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage from hathor.p2p.protocol import HathorProtocol from hathor.p2p.rate_limiter import RateLimiter -from hathor.p2p.sync_factory import SyncAgentFactory +from hathor.p2p.sync_v1.downloader import Downloader from hathor.p2p.sync_version import SyncVersion from hathor.p2p.utils import parse_whitelist from hathor.pubsub import HathorEvents, PubSubManager @@ -77,7 +79,6 @@ class GlobalRateLimiter: manager: Optional['HathorManager'] unverified_peer_storage: UnverifiedPeerStorage verified_peer_storage: VerifiedPeerStorage - _sync_factories: dict[SyncVersion, SyncAgentFactory] _enabled_sync_versions: set[SyncVersion] rate_limiter: RateLimiter @@ -179,27 +180,29 @@ def __init__( self._last_discovery: float = 0. # sync-manager factories - self._sync_factories = {} + self._available_sync_versions: set[SyncVersion] = set() self._enabled_sync_versions = set() # agent to perform HTTP requests self._http_agent = Agent(self.reactor) - def add_sync_factory(self, sync_version: SyncVersion, sync_factory: SyncAgentFactory) -> None: - """Add factory for the given sync version, must use a sync version that does not already exist.""" - # XXX: to allow code in `set_manager` to safely use the the available sync versions, we add this restriction: + self._sync_v1_downloader: Downloader | None = None + + def add_sync_version(self, sync_version: SyncVersion) -> None: + """Add a sync version, must use one that is not already set.""" + # XXX: to allow code in `set_manager` to safely use the available sync versions, we add this restriction: assert self.manager is None, 'Cannot modify sync factories after a manager is set' - if sync_version in self._sync_factories: + if sync_version in self._available_sync_versions: raise ValueError('sync version already exists') - self._sync_factories[sync_version] = sync_factory + self._available_sync_versions.add(sync_version) def get_available_sync_versions(self) -> set[SyncVersion]: """What sync versions the manager is capable of using, they are not necessarily enabled.""" - return set(self._sync_factories.keys()) + return self._available_sync_versions def is_sync_version_available(self, sync_version: SyncVersion) -> bool: """Whether the given sync version is available for use, is not necessarily enabled.""" - return sync_version in self._sync_factories + return sync_version in self._available_sync_versions def get_enabled_sync_versions(self) -> set[SyncVersion]: """What sync versions are enabled for use, it is necessarily a subset of the available versions.""" @@ -211,7 +214,7 @@ def is_sync_version_enabled(self, sync_version: SyncVersion) -> bool: def enable_sync_version(self, sync_version: SyncVersion) -> None: """Enable using the given sync version on new connections, it must be available before being enabled.""" - assert sync_version in self._sync_factories + assert sync_version in self._available_sync_versions if sync_version in self._enabled_sync_versions: self.log.info('tried to enable a sync verison that was already enabled, nothing to do') return @@ -224,6 +227,12 @@ def disable_sync_version(self, sync_version: SyncVersion) -> None: return self._enabled_sync_versions.discard(sync_version) + def get_sync_v1_downloader(self) -> Downloader: + assert self.is_sync_version_enabled(SyncVersion.V1_1) + if self._sync_v1_downloader is None: + self._sync_v1_downloader = Downloader(self.dependencies) + return self._sync_v1_downloader + def set_manager(self, manager: 'HathorManager') -> None: """Set the manager. This method must be called before start().""" if len(self._enabled_sync_versions) == 0: @@ -313,11 +322,6 @@ def _get_peers_count(self) -> PeerConnectionsMetrics: known_peers_count=len(self.verified_peer_storage) ) - def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory: - """Get the sync factory for a given version, MUST be available or it will raise an assert.""" - assert sync_version in self._sync_factories, f'sync_version {sync_version} is not available' - return self._sync_factories[sync_version] - def has_synced_peer(self) -> bool: """ Return whether we are synced to at least one peer. """ @@ -340,11 +344,11 @@ def send_tx_to_peers(self, tx: BaseTransaction) -> None: connections = list(self.iter_ready_connections()) self.rng.shuffle(connections) for conn in connections: - conn.send_tx_to_peer(tx) + self.reactor.callLater(0, conn.send_tx_to_peer, tx) def disconnect_all_peers(self, *, force: bool = False) -> None: """Disconnect all peers.""" - for conn in self.get_connected_peers(): + for conn in self.get_connected_peers().values(): conn.disconnect(force=force) def on_connection_failure(self, failure: Failure, endpoint: PeerEndpoint) -> None: @@ -355,52 +359,52 @@ def on_connection_failure(self, failure: Failure, endpoint: PeerEndpoint) -> Non peers_count=self._get_peers_count() ) - def on_peer_connect(self, protocol: HathorProtocol) -> None: + def on_peer_connect(self, *, addr: PeerAddress, inbound: bool) -> None: """Called when a new connection is established from both inbound and outbound peers.""" if len(self._connections.connected_peers()) >= self.max_connections: self.log.warn('reached maximum number of connections', max_connections=self.max_connections) - protocol.disconnect(force=True) + protocol = self._connections.get_peer_by_address(addr) + self.reactor.callLater(0, protocol.disconnect, force=True) return - self._connections.on_connected(addr=protocol.addr, inbound=protocol.inbound) + self._connections.on_connected(addr=addr, inbound=inbound) self.pubsub.publish( HathorEvents.NETWORK_PEER_CONNECTED, - protocol=protocol, peers_count=self._get_peers_count() ) - def on_peer_ready(self, protocol: HathorProtocol) -> None: + def on_peer_ready(self, *, addr: PeerAddress, peer: PublicPeer) -> None: """Called when a peer is ready.""" - self.verified_peer_storage.add_or_replace(protocol.peer) - self.unverified_peer_storage.pop(protocol.peer.id, None) - connection_to_drop = self._connections.on_ready(addr=protocol.addr, peer_id=protocol.peer.id) + self.verified_peer_storage.add_or_replace(peer) + self.unverified_peer_storage.pop(peer.id, None) + address_to_drop = self._connections.on_ready(addr=addr, peer_id=peer.id) # we emit the event even if it's a duplicate peer as a matching # NETWORK_PEER_DISCONNECTED will be emitted regardless self.pubsub.publish( HathorEvents.NETWORK_PEER_READY, - protocol=protocol, peers_count=self._get_peers_count() ) - if connection_to_drop: + if address_to_drop: # connected twice to same peer - self.log.warn('duplicate connection to peer', addr=str(protocol.addr), peer_id=str(protocol.peer.id)) - self.reactor.callLater(0, self.drop_connection, connection_to_drop) - if connection_to_drop == protocol: + self.log.warn('duplicate connection to peer', addr=str(addr), peer_id=str(peer.id)) + self.reactor.callLater(0, self.drop_connection, address_to_drop) + if address_to_drop == addr: return # In case it was a retry, we must reset the data only here, after it gets ready - protocol.peer.info.reset_retry_timestamp() + peer.info.reset_retry_timestamp() + protocol = self._connections.get_peer_by_address(addr) if len(self._connections.ready_peers()) <= self.MAX_ENABLED_SYNC: protocol.enable_sync() - if protocol.peer.id in self.always_enable_sync: + if peer.id in self.always_enable_sync: protocol.enable_sync() # Notify other peers about this new peer connection. - self.relay_peer_to_ready_connections(protocol.peer) + self.relay_peer_to_ready_connections(peer) def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None: """Relay peer to all ready connections.""" @@ -436,10 +440,10 @@ def on_initial_disconnect(self, *, addr: PeerAddress) -> None: def iter_connecting_outbound_peers(self) -> Iterable[PeerAddress]: yield from self._connections.connecting_outbound_peers() - def iter_handshaking_peers(self) -> Iterable[HathorProtocol]: + def iter_handshaking_peers(self) -> Iterable[P2PConnectionProtocol]: yield from self._connections.handshaking_peers().values() - def iter_ready_connections(self) -> Iterable[HathorProtocol]: + def iter_ready_connections(self) -> Iterable[P2PConnectionProtocol]: """Iterate over ready connections.""" yield from self._connections.ready_peers().values() @@ -447,8 +451,8 @@ def iter_not_ready_endpoints(self) -> Iterable[PeerAddress]: """Iterate over not-ready connections.""" yield from self._connections.not_ready_peers() - def get_connected_peers(self) -> Iterable[HathorProtocol]: - yield from self._connections.connected_peers().values() + def get_connected_peers(self) -> dict[PeerAddress, HathorProtocol]: + return self._connections.connected_peers() def get_ready_peer_by_id(self, peer_id: PeerId) -> HathorProtocol | None: return self._connections.get_ready_peer_by_id(peer_id) @@ -459,10 +463,11 @@ def is_peer_ready(self, peer_id: PeerId) -> bool: """ return self._connections.is_peer_ready(peer_id) - def on_receive_peer(self, peer: UnverifiedPeer) -> None: + def on_receive_peer(self, peer_data: dict[str, Any]) -> None: """ Update a peer information in our storage, and instantly attempt to connect to it if it is not connected yet. """ + peer = UnverifiedPeer.create_from_json(peer_data) if peer.id == self.my_peer.id: return peer = self.unverified_peer_storage.add_or_merge(peer) @@ -600,7 +605,7 @@ def connect_to( .addCallback(self._connect_to_callback, endpoint.addr, peer_id) \ .addErrback(self.on_connection_failure, endpoint) - self.log.info('connecting to', endpoint=str(endpoint), peer_id=str(peer_id)) + self.log.info('connecting to', addr=str(endpoint.addr), peer_id=str(peer_id)) self.pubsub.publish( HathorEvents.NETWORK_PEER_CONNECTING, peer=peer, @@ -662,20 +667,23 @@ def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Add hostname_entrypoint = PeerAddress.from_hostname_address(hostname, address) self.my_peer.info.entrypoints.append(hostname_entrypoint) - def drop_connection(self, protocol: HathorProtocol) -> None: + def drop_connection(self, peer_addr_or_id: PeerAddress | PeerId) -> None: """ Drop a connection """ + match peer_addr_or_id: + case PeerId(): + protocol = self.get_ready_peer_by_id(peer_addr_or_id) + if not protocol: + return + case PeerAddress(): + protocol = self._connections.get_peer_by_address(peer_addr_or_id) + case _: + assert_never(peer_addr_or_id) + protocol_peer = protocol.get_peer() self.log.debug('dropping connection', peer_id=protocol_peer.id, protocol=type(protocol).__name__) protocol.send_error_and_close_connection('Connection droped') - def drop_connection_by_peer_id(self, peer_id: PeerId) -> None: - """ Drop a connection by peer id - """ - protocol = self.get_ready_peer_by_id(peer_id) - if protocol: - self.drop_connection(protocol) - def sync_update(self) -> None: """Update the subset of connections that running the sync algorithm.""" try: diff --git a/hathor/p2p/peer_connections.py b/hathor/p2p/peer_connections.py index ec5475048..a9cc9f2e8 100644 --- a/hathor/p2p/peer_connections.py +++ b/hathor/p2p/peer_connections.py @@ -95,6 +95,12 @@ def get_ready_peer_by_id(self, peer_id: PeerId) -> HathorProtocol | None: addr = self._addr_by_id.get(peer_id) return self._ready[addr] if addr else None + def get_peer_by_address(self, addr: PeerAddress) -> HathorProtocol: + """Get a peer by its address. Should only be called for peers that must exist.""" + peer = self._built.get(addr) or self._handshaking.get(addr) or self._ready.get(addr) + assert peer is not None + return peer + def get_peer_counts(self) -> PeerCounts: """Return the peer counts, for metrics.""" return PeerCounts( @@ -155,10 +161,10 @@ def on_handshake_disconnect(self, *, addr: PeerAddress) -> None: assert addr not in self._ready self._handshaking.pop(addr) - def on_ready(self, *, addr: PeerAddress, peer_id: PeerId) -> HathorProtocol | None: + def on_ready(self, *, addr: PeerAddress, peer_id: PeerId) -> PeerAddress | None: """ Callback for when a connection gets to the READY state. - If the PeerId of this connection is duplicate, return the protocol that we should disconnect. + If the PeerId of this connection is duplicate, return the address of the protocol that we should disconnect. Return None otherwise. """ assert addr not in self._built @@ -169,7 +175,7 @@ def on_ready(self, *, addr: PeerAddress, peer_id: PeerId) -> HathorProtocol | No protocol = self._handshaking.pop(addr) self._ready[addr] = protocol # We always index it by address, even if its PeerId is duplicate. - connection_to_drop: HathorProtocol | None = None + addr_to_drop: PeerAddress | None = None # If there's an existing connection with the same PeerId, this is a duplicate connection if old_connection := self.get_ready_peer_by_id(protocol.peer.id): @@ -177,13 +183,13 @@ def on_ready(self, *, addr: PeerAddress, peer_id: PeerId) -> HathorProtocol | No if self._should_drop_new_connection(protocol): # We return early when we drop the new connection, # so we don't override the old connection in _addr_by_id with it below. - return protocol + return protocol.addr # When dropping the old connection, we do override it in _addr_by_id below. - connection_to_drop = old_connection + addr_to_drop = old_connection.addr self._addr_by_id[peer_id] = addr - return connection_to_drop + return addr_to_drop def on_ready_disconnect(self, *, addr: PeerAddress, peer_id: PeerId) -> None: """Callback for when a connection is closed during the READY state.""" diff --git a/hathor/p2p/protocol.py b/hathor/p2p/protocol.py index 4b93ec137..84aaf8d2b 100644 --- a/hathor/p2p/protocol.py +++ b/hathor/p2p/protocol.py @@ -226,7 +226,7 @@ def on_connect(self) -> None: # The first state after INITIAL is HELLO. self.advance_state() - self.p2p_manager.on_peer_connect(self) + self.p2p_manager.on_peer_connect(addr=self.addr, inbound=self.inbound) def on_outbound_connect(self, peer_id: PeerId | None) -> None: """Called when we successfully establish an outbound connection to a peer.""" @@ -237,7 +237,7 @@ def on_outbound_connect(self, peer_id: PeerId | None) -> None: def on_peer_ready(self) -> None: assert self.peer is not None self.update_log_context() - self.p2p_manager.on_peer_ready(self) + self.p2p_manager.on_peer_ready(addr=self.addr, peer=self.peer) self.log.info('peer connected', peer_id=self.peer.id) def on_disconnect(self, reason: Failure) -> None: diff --git a/hathor/p2p/states/ready.py b/hathor/p2p/states/ready.py index f7b561e12..5206e1302 100644 --- a/hathor/p2p/states/ready.py +++ b/hathor/p2p/states/ready.py @@ -24,7 +24,7 @@ from hathor.indexes.height_index import HeightInfo from hathor.p2p import P2PDependencies from hathor.p2p.messages import ProtocolMessages -from hathor.p2p.peer import PublicPeer, UnverifiedPeer +from hathor.p2p.peer import PublicPeer from hathor.p2p.states.base import BaseState from hathor.p2p.sync_agent import SyncAgent from hathor.p2p.utils import to_height_info, to_serializable_best_blockchain @@ -105,10 +105,9 @@ def __init__(self, dependencies: P2PDependencies, protocol: HathorProtocol): sync_version = self.protocol.sync_version assert sync_version is not None self.log.debug(f'loading {sync_version}') - sync_factory = self.protocol.p2p_manager.get_sync_factory(sync_version) # Initialize sync agent and add its commands to the list of available commands. - self.sync_agent: SyncAgent = sync_factory.create_sync_agent(self.protocol) + self.sync_agent = SyncAgent.create(sync_version=sync_version, protocol=protocol, dependencies=dependencies) self.cmd_map.update(self.sync_agent.get_cmd_dict()) def on_enter(self) -> None: @@ -175,8 +174,7 @@ def handle_peers(self, payload: str) -> None: """ received_peers = json_loads(payload) for data in received_peers: - peer = UnverifiedPeer.create_from_json(data) - self.protocol.p2p_manager.on_receive_peer(peer) + self.protocol.p2p_manager.on_receive_peer(data) self.log.debug('received peers', payload=payload) def send_ping_if_necessary(self) -> None: diff --git a/hathor/p2p/sync_agent.py b/hathor/p2p/sync_agent.py index a700335ed..1793fd722 100644 --- a/hathor/p2p/sync_agent.py +++ b/hathor/p2p/sync_agent.py @@ -12,14 +12,44 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + from abc import ABC, abstractmethod -from typing import Callable +from typing import TYPE_CHECKING, Callable + +from typing_extensions import assert_never +from hathor.p2p import P2PDependencies from hathor.p2p.messages import ProtocolMessages +from hathor.p2p.sync_version import SyncVersion from hathor.transaction import BaseTransaction +if TYPE_CHECKING: + from hathor.p2p.protocol import HathorProtocol + class SyncAgent(ABC): + @classmethod + def create( + cls, + *, + sync_version: SyncVersion, + protocol: HathorProtocol, + dependencies: P2PDependencies, + ) -> SyncAgent: + match sync_version: + case SyncVersion.V1_1: + from hathor.p2p.manager import ConnectionsManager + from hathor.p2p.sync_v1.agent import NodeSyncTimestamp + assert isinstance(protocol.p2p_manager, ConnectionsManager) + downloader = protocol.p2p_manager.get_sync_v1_downloader() + return NodeSyncTimestamp(protocol=protocol, dependencies=dependencies, downloader=downloader) + case SyncVersion.V2: + from hathor.p2p.sync_v2.agent import NodeBlockSync + return NodeBlockSync(protocol=protocol, dependencies=dependencies) + case _: + assert_never(sync_version) + @abstractmethod def is_started(self) -> bool: """Whether the manager started running""" diff --git a/hathor/p2p/sync_factory.py b/hathor/p2p/sync_factory.py deleted file mode 100644 index da32bd68b..000000000 --- a/hathor/p2p/sync_factory.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright 2021 Hathor Labs -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from abc import ABC, abstractmethod -from typing import TYPE_CHECKING - -from hathor.p2p.sync_agent import SyncAgent - -if TYPE_CHECKING: - from hathor.p2p.protocol import HathorProtocol - - -class SyncAgentFactory(ABC): - @abstractmethod - def create_sync_agent(self, protocol: 'HathorProtocol') -> SyncAgent: - raise NotImplementedError diff --git a/hathor/p2p/sync_v1/factory.py b/hathor/p2p/sync_v1/factory.py deleted file mode 100644 index 68fd740e1..000000000 --- a/hathor/p2p/sync_v1/factory.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright 2021 Hathor Labs -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import TYPE_CHECKING, Optional - -from hathor.p2p import P2PDependencies -from hathor.p2p.sync_agent import SyncAgent -from hathor.p2p.sync_factory import SyncAgentFactory -from hathor.p2p.sync_v1.agent import NodeSyncTimestamp -from hathor.p2p.sync_v1.downloader import Downloader - -if TYPE_CHECKING: - from hathor.p2p.protocol import HathorProtocol - - -class SyncV11Factory(SyncAgentFactory): - def __init__(self, dependencies: P2PDependencies) -> None: - self.dependencies = dependencies - self._downloader: Optional[Downloader] = None - - def get_downloader(self) -> Downloader: - if self._downloader is None: - self._downloader = Downloader(self.dependencies) - return self._downloader - - def create_sync_agent(self, protocol: 'HathorProtocol') -> SyncAgent: - return NodeSyncTimestamp( - protocol, - downloader=self.get_downloader(), - dependencies=self.dependencies, - ) diff --git a/hathor/p2p/sync_v2/factory.py b/hathor/p2p/sync_v2/factory.py deleted file mode 100644 index 4e46c8dfb..000000000 --- a/hathor/p2p/sync_v2/factory.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2021 Hathor Labs -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import TYPE_CHECKING - -from hathor.p2p import P2PDependencies -from hathor.p2p.sync_agent import SyncAgent -from hathor.p2p.sync_factory import SyncAgentFactory -from hathor.p2p.sync_v2.agent import NodeBlockSync - -if TYPE_CHECKING: - from hathor.p2p.protocol import HathorProtocol - - -class SyncV2Factory(SyncAgentFactory): - def __init__(self, dependencies: P2PDependencies) -> None: - self.dependencies = dependencies - - def create_sync_agent(self, protocol: 'HathorProtocol') -> SyncAgent: - return NodeBlockSync( - protocol=protocol, - dependencies=self.dependencies, - ) diff --git a/tests/others/test_metrics.py b/tests/others/test_metrics.py index dac804846..e0376b1a9 100644 --- a/tests/others/test_metrics.py +++ b/tests/others/test_metrics.py @@ -34,7 +34,6 @@ def test_p2p_network_events(self): # Execution pubsub.publish( HathorEvents.NETWORK_PEER_CONNECTED, - protocol=Mock(), peers_count=PeerConnectionsMetrics(3, 4, 5, 6) ) self.run_to_completion() diff --git a/tests/p2p/test_connections.py b/tests/p2p/test_connections.py index 64482a2bc..d0d33a4a4 100644 --- a/tests/p2p/test_connections.py +++ b/tests/p2p/test_connections.py @@ -21,5 +21,5 @@ def test_manager_connections(self) -> None: manager.connections.connect_to(endpoint) self.assertIn(endpoint.addr, manager.connections.iter_not_ready_endpoints()) - self.assertNotIn(endpoint.addr, [conn.addr for conn in manager.connections.iter_ready_connections()]) - self.assertNotIn(endpoint.addr, [conn.addr for conn in manager.connections.get_connected_peers()]) + self.assertNotIn(endpoint.addr, manager.connections._connections.ready_peers()) + self.assertNotIn(endpoint.addr, manager.connections.get_connected_peers()) diff --git a/tests/p2p/test_get_best_blockchain.py b/tests/p2p/test_get_best_blockchain.py index 67ed0bc74..c542ddc38 100644 --- a/tests/p2p/test_get_best_blockchain.py +++ b/tests/p2p/test_get_best_blockchain.py @@ -2,6 +2,7 @@ from hathor.indexes.height_index import HeightInfo from hathor.p2p.messages import ProtocolMessages +from hathor.p2p.protocol import HathorProtocol from hathor.p2p.resources import StatusResource from hathor.p2p.states import ReadyState from hathor.p2p.utils import to_height_info @@ -41,6 +42,8 @@ def test_get_best_blockchain(self) -> None: # HelloState is responsible to transmite to protocol the capabilities protocol1 = connected_peers2[0] protocol2 = connected_peers1[0] + assert isinstance(protocol1, HathorProtocol) + assert isinstance(protocol2, HathorProtocol) self.assertIsNotNone(protocol1.capabilities) self.assertIsNotNone(protocol2.capabilities) @@ -97,12 +100,14 @@ def test_handle_get_best_blockchain(self) -> None: connected_peers1 = list(manager1.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers1)) protocol2 = connected_peers1[0] + assert isinstance(protocol2, HathorProtocol) state2 = protocol2.state assert isinstance(state2, ReadyState) connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] + assert isinstance(protocol1, HathorProtocol) state1 = protocol1.state assert isinstance(state1, ReadyState) @@ -137,6 +142,7 @@ def test_handle_get_best_blockchain(self) -> None: connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] + assert isinstance(protocol1, HathorProtocol) state1 = protocol1.state assert isinstance(state1, ReadyState) @@ -156,12 +162,14 @@ def test_handle_best_blockchain(self) -> None: connected_peers1 = list(manager1.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers1)) protocol2 = connected_peers1[0] + assert isinstance(protocol2, HathorProtocol) state2 = protocol2.state assert isinstance(state2, ReadyState) connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] + assert isinstance(protocol1, HathorProtocol) state1 = protocol1.state assert isinstance(state1, ReadyState) @@ -222,8 +230,10 @@ def test_node_without_get_best_blockchain_capability(self) -> None: # assert the peers have the proper capabilities protocol2 = connected_peers1[0] + assert isinstance(protocol2, HathorProtocol) self.assertTrue(protocol2.capabilities.issuperset(set(capabilities_without_get_best_blockchain))) protocol1 = connected_peers2[0] + assert isinstance(protocol1, HathorProtocol) default_capabilities = self._settings.get_default_capabilities() self.assertTrue(protocol1.capabilities.issuperset(set(default_capabilities))) @@ -316,12 +326,14 @@ def test_stop_looping_on_exit(self) -> None: connected_peers1 = list(manager1.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers1)) protocol2 = connected_peers1[0] + assert isinstance(protocol2, HathorProtocol) state2 = protocol2.state assert isinstance(state2, ReadyState) connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] + assert isinstance(protocol1, HathorProtocol) state1 = protocol1.state assert isinstance(state1, ReadyState) diff --git a/tests/p2p/test_sync.py b/tests/p2p/test_sync.py index bd23a9e87..896601b19 100644 --- a/tests/p2p/test_sync.py +++ b/tests/p2p/test_sync.py @@ -4,7 +4,6 @@ from hathor.crypto.util import decode_address from hathor.p2p import P2PDependencies from hathor.p2p.protocol import PeerIdState -from hathor.p2p.sync_version import SyncVersion from hathor.simulator import FakeConnection from hathor.transaction import Block, Transaction from hathor.transaction.storage.exceptions import TransactionIsNotABlock @@ -267,7 +266,7 @@ def test_downloader(self) -> None: self.assertTrue(isinstance(conn.proto1.state, PeerIdState)) self.assertTrue(isinstance(conn.proto2.state, PeerIdState)) - downloader = conn.proto2.p2p_manager.get_sync_factory(SyncVersion.V1_1).get_downloader() + downloader = conn.proto2.p2p_manager.get_sync_v1_downloader() p2p_dependencies1 = P2PDependencies( reactor=self.manager1.reactor, @@ -350,8 +349,6 @@ def test_downloader(self) -> None: def _downloader_bug_setup(self) -> None: """ This is an auxiliary method to setup a bug scenario.""" - from hathor.p2p.sync_version import SyncVersion - # ## premise setup # # - peer_X will be self.manager @@ -380,7 +377,7 @@ def _downloader_bug_setup(self) -> None: # create the peer that will experience the bug self.manager_bug = self.create_peer(self.network) - self.downloader = self.manager_bug.connections.get_sync_factory(SyncVersion.V1_1).get_downloader() + self.downloader = self.manager_bug.connections.get_sync_v1_downloader() self.downloader.window_size = 1 self.conn1 = FakeConnection(self.manager_bug, self.manager1) self.conn2 = FakeConnection(self.manager_bug, self.manager2) diff --git a/tests/p2p/test_sync_rate_limiter.py b/tests/p2p/test_sync_rate_limiter.py index e550b7da4..88ff7f174 100644 --- a/tests/p2p/test_sync_rate_limiter.py +++ b/tests/p2p/test_sync_rate_limiter.py @@ -2,6 +2,7 @@ from twisted.python.failure import Failure +from hathor.p2p.protocol import HathorProtocol from hathor.p2p.states import ReadyState from hathor.p2p.sync_v1.agent import NodeSyncTimestamp from hathor.simulator import FakeConnection @@ -34,6 +35,7 @@ def test_sync_rate_limiter(self) -> None: connected_peers2 = list(manager2.connections.iter_ready_connections()) self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] + assert isinstance(protocol1, HathorProtocol) assert isinstance(protocol1.state, ReadyState) sync2 = protocol1.state.sync_agent assert isinstance(sync2, NodeSyncTimestamp) @@ -68,6 +70,7 @@ def test_sync_rate_limiter_disconnect(self) -> None: self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] + assert isinstance(protocol1, HathorProtocol) assert isinstance(protocol1.state, ReadyState) sync1 = protocol1.state.sync_agent assert isinstance(sync1, NodeSyncTimestamp) @@ -118,6 +121,7 @@ def test_sync_rate_limiter_delayed_calls_draining(self) -> None: self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] + assert isinstance(protocol1, HathorProtocol) assert isinstance(protocol1.state, ReadyState) sync1 = protocol1.state.sync_agent assert isinstance(sync1, NodeSyncTimestamp) @@ -158,6 +162,7 @@ def test_sync_rate_limiter_delayed_calls_stop(self) -> None: self.assertEqual(1, len(connected_peers2)) protocol1 = connected_peers2[0] + assert isinstance(protocol1, HathorProtocol) assert isinstance(protocol1.state, ReadyState) sync1 = protocol1.state.sync_agent assert isinstance(sync1, NodeSyncTimestamp) diff --git a/tests/sysctl/test_p2p.py b/tests/sysctl/test_p2p.py index bcadc3bbd..f47e82c9d 100644 --- a/tests/sysctl/test_p2p.py +++ b/tests/sysctl/test_p2p.py @@ -137,7 +137,7 @@ def test_available_sync_versions(self): self.assertEqual(sysctl.get('available_sync_versions'), ['v1', 'v2']) - del connections._sync_factories[SyncVersion.V2] + connections._available_sync_versions.remove(SyncVersion.V2) self.assertEqual(sysctl.get('available_sync_versions'), ['v1']) def _default_enabled_sync_versions(self) -> list[str]: