Skip to content

Commit

Permalink
refactor(p2p): minor pre-IPC refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Nov 7, 2024
1 parent 07ecfc0 commit 3b27e43
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 53 deletions.
10 changes: 9 additions & 1 deletion hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ def build(self) -> BuildArtifacts:
vertex_handler = self._get_or_create_vertex_handler()
vertex_parser = self._get_or_create_vertex_parser()
poa_block_producer = self._get_or_create_poa_block_producer()
capabilities = self._get_or_create_capabilities()

if self._enable_address_index:
indexes.enable_address_index(pubsub)
Expand Down Expand Up @@ -263,7 +264,7 @@ def build(self) -> BuildArtifacts:
wallet=wallet,
rng=self._rng,
checkpoints=self._checkpoints,
capabilities=self._capabilities,
capabilities=capabilities,
environment_info=get_environment_info(self._cmdline, str(peer.id)),
bit_signaling_service=bit_signaling_service,
verification_service=verification_service,
Expand Down Expand Up @@ -642,6 +643,13 @@ def _get_or_create_poa_block_producer(self) -> PoaBlockProducer | None:

return self._poa_block_producer

def _get_or_create_capabilities(self) -> list[str]:
if self._capabilities is None:
settings = self._get_or_create_settings()
self._capabilities = settings.get_default_capabilities()

return self._capabilities

def use_memory(self) -> 'Builder':
self.check_if_can_modify()
self._storage_type = StorageType.MEMORY
Expand Down
2 changes: 2 additions & 0 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
)

cpu_mining_service = CpuMiningService()
capabilities = settings.get_default_capabilities()

p2p_manager = ConnectionsManager(
settings=settings,
Expand Down Expand Up @@ -384,6 +385,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
vertex_handler=vertex_handler,
vertex_parser=vertex_parser,
poa_block_producer=poa_block_producer,
capabilities=capabilities,
)

if self._args.x_ipython_kernel:
Expand Down
8 changes: 8 additions & 0 deletions hathor/conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,14 @@ def from_yaml(cls, *, filepath: str) -> 'HathorSettings':
validators=_VALIDATORS
)

def get_default_capabilities(self) -> list[str]:
"""Return the default capabilities."""
return [
self.CAPABILITY_WHITELIST,
self.CAPABILITY_SYNC_VERSION,
self.CAPABILITY_GET_BEST_BLOCKCHAIN
]


def _parse_checkpoints(checkpoints: Union[dict[int, str], list[Checkpoint]]) -> list[Checkpoint]:
"""Parse a dictionary of raw checkpoint data into a list of checkpoints."""
Expand Down
15 changes: 2 additions & 13 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ def __init__(
execution_manager: ExecutionManager,
vertex_handler: VertexHandler,
vertex_parser: VertexParser,
capabilities: list[str],
hostname: Optional[str] = None,
wallet: Optional[BaseWallet] = None,
capabilities: Optional[list[str]] = None,
checkpoints: Optional[list[Checkpoint]] = None,
rng: Optional[Random] = None,
environment_info: Optional[EnvironmentInfo] = None,
Expand Down Expand Up @@ -231,10 +231,7 @@ def __init__(
self.peers_whitelist: list[PeerId] = []

# List of capabilities of the peer
if capabilities is not None:
self.capabilities = capabilities
else:
self.capabilities = self.get_default_capabilities()
self.capabilities = capabilities

# This is included in some logs to provide more context
self.environment_info = environment_info
Expand All @@ -246,14 +243,6 @@ def __init__(
self.lc_check_sync_state.clock = self.reactor
self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL

def get_default_capabilities(self) -> list[str]:
"""Return the default capabilities for this manager."""
return [
self._settings.CAPABILITY_WHITELIST,
self._settings.CAPABILITY_SYNC_VERSION,
self._settings.CAPABILITY_GET_BEST_BLOCKCHAIN
]

def start(self) -> None:
""" A factory must be started only once. And it is usually automatically started.
"""
Expand Down
20 changes: 11 additions & 9 deletions hathor/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,23 +248,25 @@ def collect_peer_connection_metrics(self) -> None:
self.peer_connection_metrics.clear()

for connection in self.connections.get_connected_peers():
if not connection._peer:
peer = connection.get_peer_if_set()
if not peer:
# A connection without peer will not be able to communicate
# So we can just discard it for the sake of the metrics
continue

metrics = connection.get_metrics()
metric = PeerConnectionMetrics(
connection_string=str(connection.addr),
peer_id=str(connection.peer.id),
network=settings.NETWORK_NAME,
received_messages=connection.metrics.received_messages,
sent_messages=connection.metrics.sent_messages,
received_bytes=connection.metrics.received_bytes,
sent_bytes=connection.metrics.sent_bytes,
received_txs=connection.metrics.received_txs,
discarded_txs=connection.metrics.discarded_txs,
received_blocks=connection.metrics.received_blocks,
discarded_blocks=connection.metrics.discarded_blocks,
received_messages=metrics.received_messages,
sent_messages=metrics.sent_messages,
received_bytes=metrics.received_bytes,
sent_bytes=metrics.sent_bytes,
received_txs=metrics.received_txs,
discarded_txs=metrics.discarded_txs,
received_blocks=metrics.received_blocks,
discarded_blocks=metrics.discarded_blocks,
)

self.peer_connection_metrics.append(metric)
Expand Down
33 changes: 21 additions & 12 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,7 @@ def has_synced_peer(self) -> bool:
"""
connections = list(self.iter_ready_connections())
for conn in connections:
assert conn.state is not None
assert isinstance(conn.state, ReadyState)
if conn.state.is_synced():
if conn.is_synced():
return True
return False

Expand All @@ -341,9 +339,7 @@ def send_tx_to_peers(self, tx: BaseTransaction) -> None:
connections = list(self.iter_ready_connections())
self.rng.shuffle(connections)
for conn in connections:
assert conn.state is not None
assert isinstance(conn.state, ReadyState)
conn.state.send_tx_to_peer(tx)
conn.send_tx_to_peer(tx)

def disconnect_all_peers(self, *, force: bool = False) -> None:
"""Disconnect all peers."""
Expand Down Expand Up @@ -408,10 +404,9 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None:
"""Relay peer to all ready connections."""
for conn in self.iter_ready_connections():
if conn.peer == peer:
if conn.get_peer() == peer:
continue
assert isinstance(conn.state, ReadyState)
conn.state.send_peers([peer])
conn.send_peers([peer])

def on_handshake_disconnect(self, *, addr: PeerAddress) -> None:
"""Called when a peer disconnects from a handshaking state (HELLO or PEER-ID)."""
Expand Down Expand Up @@ -666,9 +661,9 @@ def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Add
def drop_connection(self, protocol: HathorProtocol) -> None:
""" Drop a connection
"""
assert protocol.peer is not None
self.log.debug('dropping connection', peer_id=protocol.peer.id, protocol=type(protocol).__name__)
protocol.send_error_and_close_connection('Connection dropped')
protocol_peer = protocol.get_peer()
self.log.debug('dropping connection', peer_id=protocol_peer.id, protocol=type(protocol).__name__)
protocol.send_error_and_close_connection('Connection droped')

def drop_connection_by_peer_id(self, peer_id: PeerId) -> None:
""" Drop a connection by peer id
Expand Down Expand Up @@ -767,3 +762,17 @@ def reload_entrypoints_and_connections(self) -> None:
self.log.warn('Killing all connections and resetting entrypoints...')
self.disconnect_all_peers(force=True)
self.my_peer.reload_entrypoints_from_source_file()

def get_peers_whitelist(self) -> list[PeerId]:
assert self.manager is not None
return self.manager.peers_whitelist

def get_verified_peers(self) -> Iterable[PublicPeer]:
return self.verified_peer_storage.values()

def get_randbytes(self, n: int) -> bytes:
return self.rng.randbytes(n)

def is_peer_whitelisted(self, peer_id: PeerId) -> bool:
assert self.manager is not None
return peer_id in self.manager.peers_whitelist
24 changes: 23 additions & 1 deletion hathor/p2p/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import time
from enum import Enum
from typing import TYPE_CHECKING, Optional, cast
from typing import TYPE_CHECKING, Iterable, Optional, cast

from structlog import get_logger
from twisted.internet import defer
Expand All @@ -34,6 +34,7 @@
from hathor.p2p.sync_version import SyncVersion
from hathor.p2p.utils import format_address
from hathor.profiler import get_cpu_profiler
from hathor.transaction import BaseTransaction

if TYPE_CHECKING:
from hathor.manager import HathorManager # noqa: F401
Expand Down Expand Up @@ -410,6 +411,27 @@ def disable_sync(self) -> None:
self.log.info('disable sync')
self.state.sync_agent.disable_sync()

def is_synced(self) -> bool:
assert isinstance(self.state, ReadyState)
return self.state.is_synced()

def send_tx_to_peer(self, tx: BaseTransaction) -> None:
assert isinstance(self.state, ReadyState)
return self.state.send_tx_to_peer(tx)

def get_peer(self) -> PublicPeer:
return self.peer

def get_peer_if_set(self) -> PublicPeer | None:
return self._peer

def send_peers(self, peers: Iterable[PublicPeer]) -> None:
assert isinstance(self.state, ReadyState)
self.state.send_peers(peers)

def get_metrics(self) -> 'ConnectionMetrics':
return self.metrics


class HathorLineReceiver(LineReceiver, HathorProtocol):
""" Implements HathorProtocol in a LineReceiver protocol.
Expand Down
2 changes: 1 addition & 1 deletion hathor/p2p/states/peer_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def _should_block_peer(self, peer_id: PeerId) -> bool:
Currently this is only because the peer is not in a whitelist and whitelist blocking is active.
"""
peer_is_whitelisted = peer_id in self.protocol.node.peers_whitelist
peer_is_whitelisted = self.protocol.connections.is_peer_whitelisted(peer_id)
# never block whitelisted peers
if peer_is_whitelisted:
return False
Expand Down
5 changes: 2 additions & 3 deletions hathor/p2p/states/ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def handle_get_peers(self, payload: str) -> None:
""" Executed when a GET-PEERS command is received. It just responds with
a list of all known peers.
"""
for peer in self.protocol.connections.verified_peer_storage.values():
for peer in self.protocol.connections.get_verified_peers():
self.send_peers([peer])

def send_peers(self, peer_list: Iterable[PublicPeer]) -> None:
Expand Down Expand Up @@ -195,8 +195,7 @@ def send_ping(self) -> None:
"""
# Add a salt number to prevent peers from faking rtt.
self.ping_start_time = self.reactor.seconds()
rng = self.protocol.connections.rng
self.ping_salt = rng.randbytes(self.ping_salt_size).hex()
self.ping_salt = self.protocol.connections.get_randbytes(self.ping_salt_size).hex()
self.send_message(ProtocolMessages.PING, self.ping_salt)

def send_pong(self, salt: str) -> None:
Expand Down
12 changes: 5 additions & 7 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from hathor.p2p.sync_v2.transaction_streaming_client import TransactionStreamingClient
from hathor.reactor import ReactorProtocol as Reactor
from hathor.transaction import BaseTransaction, Block, Transaction
from hathor.transaction.genesis import is_genesis
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.transaction.vertex_parser import VertexParser
from hathor.types import VertexId
Expand Down Expand Up @@ -381,7 +382,7 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
# Not synced but same blockchain?
if self.peer_best_block.height <= my_best_block.height:
# Is peer behind me at the same blockchain?
common_block_hash = self.tx_storage.indexes.height.get(self.peer_best_block.height)
common_block_hash = self.tx_storage.get_block_id_by_height(self.peer_best_block.height)
if common_block_hash == self.peer_best_block.id:
# If yes, nothing to sync from this peer.
if not self.is_synced():
Expand Down Expand Up @@ -459,15 +460,13 @@ def send_get_tips(self) -> None:
def handle_get_tips(self, _payload: str) -> None:
""" Handle a GET-TIPS message.
"""
assert self.tx_storage.indexes is not None
assert self.tx_storage.indexes.mempool_tips is not None
if self._is_streaming:
self.log.warn('can\'t send while streaming') # XXX: or can we?
self.send_message(ProtocolMessages.MEMPOOL_END)
return
self.log.debug('handle_get_tips')
# TODO Use a streaming of tips
for tx_id in self.tx_storage.indexes.mempool_tips.get():
for tx_id in self.tx_storage.get_mempool_tips():
self.send_tips(tx_id)
self.log.debug('tips end')
self.send_message(ProtocolMessages.TIPS_END)
Expand Down Expand Up @@ -643,15 +642,14 @@ def send_get_peer_block_hashes(self, heights: list[int]) -> None:
def handle_get_peer_block_hashes(self, payload: str) -> None:
""" Handle a GET-PEER-BLOCK-HASHES message.
"""
assert self.tx_storage.indexes is not None
heights = json.loads(payload)
if len(heights) > 20:
self.log.info('too many heights', heights_qty=len(heights))
self.protocol.send_error_and_close_connection('GET-PEER-BLOCK-HASHES: too many heights')
return
data = []
for h in heights:
blk_hash = self.tx_storage.indexes.height.get(h)
blk_hash = self.tx_storage.get_block_id_by_height(h)
if blk_hash is None:
break
blk = self.tx_storage.get_transaction(blk_hash)
Expand Down Expand Up @@ -1152,7 +1150,7 @@ def handle_data(self, payload: str) -> None:
return

assert tx is not None
if self.protocol.node.tx_storage.get_genesis(tx.hash):
if is_genesis(tx.hash, settings=self._settings):
# We just got the data of a genesis tx/block. What should we do?
# Will it reduce peer reputation score?
return
Expand Down
1 change: 0 additions & 1 deletion hathor/p2p/sync_v2/blockchain_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def handle_blocks(self, blk: Block) -> None:
if self.tx_storage.partial_vertex_exists(blk.hash):
# We reached a block we already have. Skip it.
self._blk_repeated += 1
is_duplicated = True
if self._blk_repeated > self.max_repeated_blocks:
self.log.info('too many repeated block received', total_repeated=self._blk_repeated)
self.fails(TooManyRepeatedVerticesError())
Expand Down
15 changes: 11 additions & 4 deletions hathor/transaction/storage/transaction_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,12 +533,14 @@ def get_transaction(self, hash_bytes: bytes) -> BaseTransaction:
self.post_get_validation(tx)
return tx

def get_block_by_height(self, height: int) -> Optional[Block]:
"""Return a block in the best blockchain from the height index. This is fast."""
def get_block_id_by_height(self, height: int) -> VertexId | None:
assert self.indexes is not None
ancestor_hash = self.indexes.height.get(height)
return self.indexes.height.get(height)

return None if ancestor_hash is None else self.get_block(ancestor_hash)
def get_block_by_height(self, height: int) -> Optional[Block]:
"""Return a block in the best blockchain from the height index. This is fast."""
block_id = self.get_block_id_by_height(height)
return None if block_id is None else self.get_block(block_id)

def get_metadata(self, hash_bytes: bytes) -> Optional[TransactionMetadata]:
"""Returns the transaction metadata with hash `hash_bytes`.
Expand Down Expand Up @@ -1137,6 +1139,11 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
with self.allow_partially_validated_context():
return self.transaction_exists(vertex_id)

def get_mempool_tips(self) -> set[VertexId]:
assert self.indexes is not None
assert self.indexes.mempool_tips is not None
return self.indexes.mempool_tips.get()


class BaseTransactionStorage(TransactionStorage):
indexes: Optional[IndexesManager]
Expand Down
2 changes: 1 addition & 1 deletion tests/p2p/test_get_best_blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def test_node_without_get_best_blockchain_capability(self) -> None:
protocol2 = connected_peers1[0]
self.assertTrue(protocol2.capabilities.issuperset(set(cababilities_without_get_best_blockchain)))
protocol1 = connected_peers2[0]
default_capabilities = manager2.get_default_capabilities()
default_capabilities = self._settings.get_default_capabilities()
self.assertTrue(protocol1.capabilities.issuperset(set(default_capabilities)))

# assert the peers don't engage in get_best_blockchain messages
Expand Down

0 comments on commit 3b27e43

Please sign in to comment.