Skip to content

Commit

Permalink
refactor(p2p): remove HathorManager and P2PManager dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Nov 7, 2024
1 parent 56993b9 commit 45ede7d
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 74 deletions.
32 changes: 30 additions & 2 deletions hathor/p2p/dependencies/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,40 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Protocol
from __future__ import annotations

from typing import TYPE_CHECKING, Iterable, Protocol

from hathor.indexes.height_index import HeightInfo
from hathor.transaction import Block, Vertex
from hathor.p2p.peer_endpoint import PeerAddress
from hathor.transaction import BaseTransaction, Block, Vertex
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.peer import PublicPeer, UnverifiedPeer
from hathor.p2p.peer_id import PeerId
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_version import SyncVersion


class P2PManagerProtocol(Protocol):
"""Abstract the P2PManager as a Python protocol to be used in P2P classes."""

def is_peer_whitelisted(self, peer_id: PeerId) -> bool: ...
def get_enabled_sync_versions(self) -> set[SyncVersion]: ...
def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory: ...
def get_verified_peers(self) -> Iterable[PublicPeer]: ...
def on_receive_peer(self, peer: UnverifiedPeer) -> None: ...
def on_peer_connect(self, protocol: HathorProtocol) -> None: ...
def on_peer_ready(self, protocol: HathorProtocol) -> None: ...
def on_handshake_disconnect(self, *, addr: PeerAddress) -> None: ...
def on_ready_disconnect(self, *, addr: PeerAddress, peer_id: PeerId) -> None: ...
def on_unknown_disconnect(self, *, addr: PeerAddress) -> None: ...
def get_randbytes(self, n: int) -> bytes: ...
def is_peer_ready(self, peer_id: PeerId) -> bool: ...
def send_tx_to_peers(self, tx: BaseTransaction) -> None: ...


class P2PVertexHandlerProtocol(Protocol):
"""Abstract the VertexHandler as a Python protocol to be used in P2P classes."""
Expand Down
5 changes: 2 additions & 3 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.rate_limiter import RateLimiter
from hathor.p2p.states.ready import ReadyState
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_version import SyncVersion
from hathor.p2p.utils import parse_whitelist
Expand Down Expand Up @@ -458,7 +457,7 @@ def is_peer_ready(self, peer_id: PeerId) -> bool:
"""
return self._connections.is_peer_ready(peer_id)

def on_receive_peer(self, peer: UnverifiedPeer, origin: Optional[ReadyState] = None) -> None:
def on_receive_peer(self, peer: UnverifiedPeer) -> None:
""" Update a peer information in our storage, and instantly attempt to connect
to it if it is not connected yet.
"""
Expand Down Expand Up @@ -623,7 +622,7 @@ def listen(self, description: str) -> None:
if self.use_ssl:
factory = TLSMemoryBIOFactory(self.my_peer.certificate_options, False, factory)

factory = NetfilterFactory(self, factory)
factory = NetfilterFactory(factory)

self.log.info('trying to listen on', endpoint=description)
deferred: Deferred[IListeningPort] = endpoint.listen(factory)
Expand Down
5 changes: 1 addition & 4 deletions hathor/p2p/netfilter/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
if TYPE_CHECKING:
from twisted.internet.interfaces import IAddress

from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.protocol import HathorProtocol


class NetfilterContext:
"""Context sent to the targets when a match occurs."""
def __init__(self, *, connections: Optional['ConnectionsManager'] = None, addr: Optional['IAddress'] = None,
protocol: Optional['HathorProtocol'] = None):
def __init__(self, *, addr: Optional['IAddress'] = None, protocol: Optional['HathorProtocol'] = None):
"""Initialize the context."""
self.addr = addr
self.protocol = protocol
self.connections = connections
9 changes: 2 additions & 7 deletions hathor/p2p/netfilter/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Optional
from typing import Optional

from twisted.internet.interfaces import IAddress, IProtocolFactory
from twisted.internet.protocol import Protocol
Expand All @@ -21,19 +21,14 @@
from hathor.p2p.netfilter import get_table
from hathor.p2p.netfilter.context import NetfilterContext

if TYPE_CHECKING:
from hathor.p2p.manager import ConnectionsManager


class NetfilterFactory(WrappingFactory):
"""Wrapper factory to easily check new connections."""
def __init__(self, connections: 'ConnectionsManager', wrappedFactory: 'IProtocolFactory'):
def __init__(self, wrappedFactory: 'IProtocolFactory'):
super().__init__(wrappedFactory)
self.connections = connections

def buildProtocol(self, addr: IAddress) -> Optional[Protocol]:
context = NetfilterContext(
connections=self.connections,
addr=addr,
)
verdict = get_table('filter').get_chain('pre_conn').process(context)
Expand Down
25 changes: 9 additions & 16 deletions hathor/p2p/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from twisted.python.failure import Failure

from hathor.p2p import P2PDependencies
from hathor.p2p.dependencies.protocols import P2PManagerProtocol
from hathor.p2p.messages import ProtocolMessages
from hathor.p2p.peer import PrivatePeer, PublicPeer
from hathor.p2p.peer_endpoint import PeerAddress
Expand Down Expand Up @@ -75,8 +76,6 @@ class WarningFlags(str, Enum):
NO_ENTRYPOINTS = 'no_entrypoints'

my_peer: PrivatePeer
connections: 'ConnectionsManager'
node: 'HathorManager'
app_version: str
last_message: float
_peer: Optional[PublicPeer]
Expand All @@ -99,7 +98,7 @@ def peer(self) -> PublicPeer:
def __init__(
self,
my_peer: PrivatePeer,
p2p_manager: 'ConnectionsManager',
p2p_manager: P2PManagerProtocol,
*,
dependencies: P2PDependencies,
use_ssl: bool,
Expand All @@ -109,14 +108,10 @@ def __init__(
self.dependencies = dependencies
self._settings = dependencies.settings
self.my_peer = my_peer
self.connections = p2p_manager
self.p2p_manager: P2PManagerProtocol = p2p_manager
self.addr = addr

assert p2p_manager.manager is not None
self.node = p2p_manager.manager

assert self.connections.reactor is not None
self.reactor = self.connections.reactor
self.reactor = self.dependencies.reactor

# Indicate whether it is an inbound connection (true) or an outbound connection (false).
self.inbound = inbound
Expand Down Expand Up @@ -250,8 +245,7 @@ def on_connect(self) -> None:
# The initial state is HELLO.
self.change_state(self.PeerState.HELLO)

if self.connections:
self.connections.on_peer_connect(self)
self.p2p_manager.on_peer_connect(self)

def on_outbound_connect(self, peer_id: PeerId | None) -> None:
"""Called when we successfully establish an outbound connection to a peer."""
Expand All @@ -260,10 +254,9 @@ def on_outbound_connect(self, peer_id: PeerId | None) -> None:
self.expected_peer_id = peer_id

def on_peer_ready(self) -> None:
assert self.connections is not None
assert self.peer is not None
self.update_log_context()
self.connections.on_peer_ready(self)
self.p2p_manager.on_peer_ready(self)
self.log.info('peer connected', peer_id=self.peer.id)

def on_disconnect(self, reason: Failure) -> None:
Expand All @@ -288,19 +281,19 @@ def on_disconnect(self, reason: Failure) -> None:
addr=str(self.addr),
peer_id=str(self.get_peer_id()),
)
self.connections.on_unknown_disconnect(addr=self.addr)
self.p2p_manager.on_unknown_disconnect(addr=self.addr)
return
self.state.on_exit()
state_name = self.state.state_name

if self.is_state(self.PeerState.HELLO) or self.is_state(self.PeerState.PEER_ID):
self.state = None
self.connections.on_handshake_disconnect(addr=self.addr)
self.p2p_manager.on_handshake_disconnect(addr=self.addr)
return

if self.is_state(self.PeerState.READY):
self.state = None
self.connections.on_ready_disconnect(addr=self.addr, peer_id=self.peer.id)
self.p2p_manager.on_ready_disconnect(addr=self.addr, peer_id=self.peer.id)
return

self.state = None
Expand Down
5 changes: 1 addition & 4 deletions hathor/p2p/states/hello.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ def _get_hello_data(self) -> dict[str, Any]:

def _get_sync_versions(self) -> set[SyncVersion]:
"""Shortcut to ConnectionManager.get_enabled_sync_versions"""
connections_manager = self.protocol.connections
assert connections_manager is not None
return connections_manager.get_enabled_sync_versions()
return self.protocol.p2p_manager.get_enabled_sync_versions()

def on_enter(self) -> None:
# After a connection is made, we just send a HELLO message.
Expand Down Expand Up @@ -162,7 +160,6 @@ def handle_hello(self, payload: str) -> None:

context = NetfilterContext(
protocol=protocol,
connections=protocol.connections,
addr=protocol.transport.getPeer(),
)
verdict = get_table('filter').get_chain('post_hello').process(context)
Expand Down
10 changes: 4 additions & 6 deletions hathor/p2p/states/peer_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,9 @@ async def handle_peer_id(self, payload: str) -> None:
protocol.send_error_and_close_connection('Are you my clone?!')
return

if protocol.connections is not None:
if protocol.connections.is_peer_ready(peer.id):
protocol.send_error_and_close_connection('We are already connected.')
return
if self.protocol.p2p_manager.is_peer_ready(peer.id):
protocol.send_error_and_close_connection('We are already connected.')
return

entrypoint_valid = await peer.info.validate_entrypoint(protocol)
if not entrypoint_valid:
Expand All @@ -131,7 +130,6 @@ async def handle_peer_id(self, payload: str) -> None:

context = NetfilterContext(
protocol=protocol,
connections=protocol.connections,
addr=protocol.transport.getPeer(),
)
verdict = get_table('filter').get_chain('post_peerid').process(context)
Expand All @@ -146,7 +144,7 @@ def _should_block_peer(self, peer_id: PeerId) -> bool:
Currently this is only because the peer is not in a whitelist and whitelist blocking is active.
"""
peer_is_whitelisted = self.protocol.connections.is_peer_whitelisted(peer_id)
peer_is_whitelisted = self.protocol.p2p_manager.is_peer_whitelisted(peer_id)
# never block whitelisted peers
if peer_is_whitelisted:
return False
Expand Down
19 changes: 7 additions & 12 deletions hathor/p2p/states/ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,18 @@ def __init__(self, protocol: 'HathorProtocol', *, dependencies: P2PDependencies)
ProtocolMessages.BEST_BLOCKCHAIN: self.handle_best_blockchain,
})

# Initialize sync manager and add its commands to the list of available commands.
connections = self.protocol.connections
assert connections is not None

# Get the sync factory and create a sync manager from it
# Get the sync factory and create a sync agent from it
sync_version = self.protocol.sync_version
assert sync_version is not None
self.log.debug(f'loading {sync_version}')
sync_factory = connections.get_sync_factory(sync_version)
sync_factory = self.protocol.p2p_manager.get_sync_factory(sync_version)

# Initialize sync agent and add its commands to the list of available commands.
self.sync_agent: SyncAgent = sync_factory.create_sync_agent(self.protocol)
self.cmd_map.update(self.sync_agent.get_cmd_dict())

def on_enter(self) -> None:
if self.protocol.connections:
self.protocol.on_peer_ready()
self.protocol.on_peer_ready()

self.lc_ping.start(1, now=False)

Expand Down Expand Up @@ -155,7 +151,7 @@ def handle_get_peers(self, payload: str) -> None:
""" Executed when a GET-PEERS command is received. It just responds with
a list of all known peers.
"""
for peer in self.protocol.connections.get_verified_peers():
for peer in self.protocol.p2p_manager.get_verified_peers():
self.send_peers([peer])

def send_peers(self, peer_list: Iterable[PublicPeer]) -> None:
Expand All @@ -175,8 +171,7 @@ def handle_peers(self, payload: str) -> None:
received_peers = json_loads(payload)
for data in received_peers:
peer = UnverifiedPeer.create_from_json(data)
if self.protocol.connections:
self.protocol.connections.on_receive_peer(peer, origin=self)
self.protocol.p2p_manager.on_receive_peer(peer)
self.log.debug('received peers', payload=payload)

def send_ping_if_necessary(self) -> None:
Expand All @@ -195,7 +190,7 @@ def send_ping(self) -> None:
"""
# Add a salt number to prevent peers from faking rtt.
self.ping_start_time = self.reactor.seconds()
self.ping_salt = self.protocol.connections.get_randbytes(self.ping_salt_size).hex()
self.ping_salt = self.protocol.p2p_manager.get_randbytes(self.ping_salt_size).hex()
self.send_message(ProtocolMessages.PING, self.ping_salt)

def send_pong(self, salt: str) -> None:
Expand Down
11 changes: 6 additions & 5 deletions hathor/p2p/sync_v1/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from twisted.internet.interfaces import IDelayedCall

from hathor.p2p import P2PDependencies
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.messages import GetNextPayload, GetTipsPayload, NextPayload, ProtocolMessages, TipsPayload
from hathor.p2p.sync_agent import SyncAgent
from hathor.p2p.sync_v1.downloader import Downloader
Expand Down Expand Up @@ -86,9 +87,9 @@ def __init__(
self.tx_storage = self.dependencies.tx_storage

# Rate limit for this connection.
assert protocol.connections is not None
self.global_rate_limiter: 'RateLimiter' = protocol.connections.rate_limiter
self.GlobalRateLimiter = protocol.connections.GlobalRateLimiter
assert isinstance(self.protocol.p2p_manager, ConnectionsManager)
self.global_rate_limiter: 'RateLimiter' = self.protocol.p2p_manager.rate_limiter
self.GlobalRateLimiter = self.protocol.p2p_manager.GlobalRateLimiter

self.call_later_id: Optional[IDelayedCall] = None
self.call_later_interval: int = 1 # seconds
Expand Down Expand Up @@ -639,7 +640,7 @@ def handle_data(self, payload: str) -> None:
# in the network, thus, we propagate it as well.
success = self.dependencies.vertex_handler.on_new_vertex(tx)
if success:
self.protocol.connections.send_tx_to_peers(tx)
self.protocol.p2p_manager.send_tx_to_peers(tx)
self.update_received_stats(tx, success)

def update_received_stats(self, tx: 'BaseTransaction', result: bool) -> None:
Expand Down Expand Up @@ -690,7 +691,7 @@ def on_tx_success(self, tx: 'BaseTransaction') -> 'BaseTransaction':
# Add tx to the DAG.
success = self.dependencies.vertex_handler.on_new_vertex(tx)
if success:
self.protocol.connections.send_tx_to_peers(tx)
self.protocol.p2p_manager.send_tx_to_peers(tx)
# Updating stats data
self.update_received_stats(tx, success)
return tx
Expand Down
Loading

0 comments on commit 45ede7d

Please sign in to comment.