Skip to content

Commit

Permalink
refactor(p2p): implement P2PConnectionProtocol
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Nov 7, 2024
1 parent 36da2f5 commit 73b394a
Show file tree
Hide file tree
Showing 22 changed files with 168 additions and 203 deletions.
14 changes: 5 additions & 9 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,23 @@ class SyncSupportLevel(IntEnum):
ENABLED = 2 # available and enabled by default, possible to disable at runtime

@classmethod
def add_factories(
def add_versions(
cls,
p2p_manager: ConnectionsManager,
dependencies: P2PDependencies,
sync_v1_support: 'SyncSupportLevel',
sync_v2_support: 'SyncSupportLevel',
) -> None:
"""Adds the sync factory to the manager according to the support level."""
from hathor.p2p.sync_v1.factory import SyncV11Factory
from hathor.p2p.sync_v2.factory import SyncV2Factory
"""Adds the sync version to the manager according to the support level."""
from hathor.p2p.sync_version import SyncVersion

# sync-v1 support:
if sync_v1_support > cls.UNAVAILABLE:
p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(dependencies))
p2p_manager.add_sync_version(SyncVersion.V1_1)
if sync_v1_support is cls.ENABLED:
p2p_manager.enable_sync_version(SyncVersion.V1_1)
# sync-v2 support:
if sync_v2_support > cls.UNAVAILABLE:
p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(dependencies))
p2p_manager.add_sync_version(SyncVersion.V2)
if sync_v2_support is cls.ENABLED:
p2p_manager.enable_sync_version(SyncVersion.V2)

Expand Down Expand Up @@ -427,9 +424,8 @@ def _get_or_create_p2p_manager(self) -> ConnectionsManager:
ssl=enable_ssl,
rng=self._rng,
)
SyncSupportLevel.add_factories(
SyncSupportLevel.add_versions(
self._p2p_manager,
dependencies,
self._sync_v1_support,
self._sync_v2_support,
)
Expand Down
10 changes: 8 additions & 2 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,13 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
log_vertex_bytes=self._args.log_vertex_bytes,
)

if self._args.x_multiprocess_p2p:
self.check_or_raise(
self._args.x_remove_sync_v1,
'multiprocess support for P2P is only available if sync-v1 is removed (use --x-remove-sync-v1)'
)
raise NotImplementedError('Multiprocess support for P2P is not yet implemented.')

p2p_dependencies = P2PDependencies(
reactor=reactor,
settings=settings,
Expand All @@ -351,9 +358,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
rng=Random(),
)

SyncSupportLevel.add_factories(
SyncSupportLevel.add_versions(
p2p_manager,
p2p_dependencies,
sync_v1_support,
sync_v2_support,
)
Expand Down
7 changes: 1 addition & 6 deletions hathor/cli/quick_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,11 @@ def create_parser(cls) -> ArgumentParser:
return parser

def prepare(self, *, register_resources: bool = True) -> None:
from hathor.p2p.sync_v2.factory import SyncV2Factory
from hathor.p2p.sync_version import SyncVersion

super().prepare(register_resources=False)
self._no_wait = self._args.no_wait

self.log.info('patching vertex_handler.on_new_vertex to quit on success')
p2p_factory = self.manager.connections.get_sync_factory(SyncVersion.V2)
assert isinstance(p2p_factory, SyncV2Factory)
p2p_factory.dependencies.vertex_handler = VertexHandlerWrapper(
self.manager.connections.dependencies.vertex_handler = VertexHandlerWrapper(
self.manager.vertex_handler,
self.manager,
self._args.quit_after_n_blocks,
Expand Down
2 changes: 2 additions & 0 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class RunNode:
('--x-enable-event-queue', lambda args: bool(args.x_enable_event_queue)),
('--x-asyncio-reactor', lambda args: bool(args.x_asyncio_reactor)),
('--x-ipython-kernel', lambda args: bool(args.x_ipython_kernel)),
('--x-multiprocess-p2p', lambda args: bool(args.x_multiprocess_p2p)),
]

env_vars_prefix: str | None = None
Expand Down Expand Up @@ -162,6 +163,7 @@ def create_parser(cls) -> ArgumentParser:
help='Log tx bytes for debugging')
parser.add_argument('--disable-ws-history-streaming', action='store_true',
help='Disable websocket history streaming API')
parser.add_argument('--x-multiprocess-p2p', action='store_true', help='Enable multiprocess support for P2P.')
return parser

def prepare(self, *, register_resources: bool = True) -> None:
Expand Down
1 change: 1 addition & 0 deletions hathor/cli/run_node_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@ class RunNodeArgs(BaseModel, extra=Extra.allow):
nano_testnet: bool
log_vertex_bytes: bool
disable_ws_history_streaming: bool
x_multiprocess_p2p: bool
2 changes: 1 addition & 1 deletion hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ def remove_peer_from_whitelist_and_disconnect(self, peer_id: PeerId) -> None:
if peer_id in self.peers_whitelist:
self.peers_whitelist.remove(peer_id)
# disconnect from node
self.connections.drop_connection_by_peer_id(peer_id)
self.connections.drop_connection(peer_id)

def has_recent_activity(self) -> bool:
current_timestamp = time.time()
Expand Down
6 changes: 3 additions & 3 deletions hathor/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def collect_peer_connection_metrics(self) -> None:
"""
self.peer_connection_metrics.clear()

for connection in self.connections.get_connected_peers():
for addr, connection in self.connections.get_connected_peers().items():
peer = connection.get_peer_if_set()
if not peer:
# A connection without peer will not be able to communicate
Expand All @@ -256,8 +256,8 @@ def collect_peer_connection_metrics(self) -> None:

metrics = connection.get_metrics()
metric = PeerConnectionMetrics(
connection_string=str(connection.addr),
peer_id=str(connection.peer.id),
connection_string=str(addr),
peer_id=str(peer.id),
network=settings.NETWORK_NAME,
received_messages=metrics.received_messages,
sent_messages=metrics.sent_messages,
Expand Down
29 changes: 21 additions & 8 deletions hathor/p2p/dependencies/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,44 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Iterable, Protocol
from typing import TYPE_CHECKING, Any, Iterable, Protocol

from hathor.indexes.height_index import HeightInfo
from hathor.p2p.peer_endpoint import PeerAddress
from hathor.transaction import BaseTransaction, Block, Vertex
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.peer import PublicPeer, UnverifiedPeer
from hathor.p2p.peer import PublicPeer
from hathor.p2p.peer_id import PeerId
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.protocol import ConnectionMetrics
from hathor.p2p.sync_version import SyncVersion


class P2PConnectionProtocol(Protocol):
"""Abstract HathorProtocol as a Python protocol to be used in P2PManager."""

def is_synced(self) -> bool: ...
def send_tx_to_peer(self, tx: BaseTransaction) -> None: ...
def disconnect(self, reason: str = '', *, force: bool = False) -> None: ...
def get_peer(self) -> PublicPeer: ...
def get_peer_if_set(self) -> PublicPeer | None: ...
def enable_sync(self) -> None: ...
def disable_sync(self) -> None: ...
def is_sync_enabled(self) -> bool: ...
def send_peers(self, peers: Iterable[PublicPeer]) -> None: ...
def get_metrics(self) -> ConnectionMetrics: ...


class P2PManagerProtocol(Protocol):
"""Abstract the P2PManager as a Python protocol to be used in P2P classes."""

def is_peer_whitelisted(self, peer_id: PeerId) -> bool: ...
def get_enabled_sync_versions(self) -> set[SyncVersion]: ...
def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory: ...
def get_verified_peers(self) -> Iterable[PublicPeer]: ...
def on_receive_peer(self, peer: UnverifiedPeer) -> None: ...
def on_peer_connect(self, protocol: HathorProtocol) -> None: ...
def on_peer_ready(self, protocol: HathorProtocol) -> None: ...
def on_receive_peer(self, peer_data: dict[str, Any]) -> None: ...
def on_peer_connect(self, *, addr: PeerAddress, inbound: bool) -> None: ...
def on_peer_ready(self, *, addr: PeerAddress, peer: PublicPeer) -> None: ...
def on_handshake_disconnect(self, *, addr: PeerAddress) -> None: ...
def on_ready_disconnect(self, *, addr: PeerAddress, peer_id: PeerId) -> None: ...
def on_initial_disconnect(self, *, addr: PeerAddress) -> None: ...
Expand Down
Loading

0 comments on commit 73b394a

Please sign in to comment.