Skip to content

Commit

Permalink
refactor(vertex-handler): remove p2p_manager dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Oct 18, 2024
1 parent d569b9a commit b35cea4
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 46 deletions.
1 change: 0 additions & 1 deletion hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,6 @@ def _get_or_create_vertex_handler(self) -> VertexHandler:
tx_storage=self._get_or_create_tx_storage(),
verification_service=self._get_or_create_verification_service(),
consensus=self._get_or_create_consensus(),
p2p_manager=self._get_or_create_p2p_manager(),
feature_service=self._get_or_create_feature_service(),
pubsub=self._get_or_create_pubsub(),
wallet=self._get_or_create_wallet(),
Expand Down
1 change: 0 additions & 1 deletion hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
tx_storage=tx_storage,
verification_service=verification_service,
consensus=consensus_algorithm,
p2p_manager=p2p_manager,
feature_service=self.feature_service,
pubsub=pubsub,
wallet=self.wallet,
Expand Down
13 changes: 6 additions & 7 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peer_id import PeerId
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.reward_lock import is_spent_reward_locked
Expand All @@ -69,7 +68,6 @@
from hathor.websocket.factory import HathorAdminWebsocketFactory

logger = get_logger()
cpu = get_cpu_profiler()


class HathorManager:
Expand Down Expand Up @@ -171,8 +169,6 @@ def __init__(

self.is_started: bool = False

self.cpu = cpu

# XXX: first checkpoint must be genesis (height=0)
self.checkpoints: list[Checkpoint] = checkpoints or []
self.checkpoints_ready: list[bool] = [False] * len(self.checkpoints)
Expand Down Expand Up @@ -960,7 +956,6 @@ def propagate_tx(self, tx: BaseTransaction, fails_silently: bool = True) -> bool

return self.on_new_tx(tx, fails_silently=fails_silently, propagate_to_peers=True)

@cpu.profiler('on_new_tx')
def on_new_tx(
self,
tx: BaseTransaction,
Expand All @@ -977,14 +972,18 @@ def on_new_tx(
:param fails_silently: if False will raise an exception when tx cannot be added
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
"""
return self.vertex_handler.on_new_vertex(
success = self.vertex_handler.on_new_vertex(
tx,
quiet=quiet,
fails_silently=fails_silently,
propagate_to_peers=propagate_to_peers,
reject_locked_reward=reject_locked_reward,
)

if propagate_to_peers and success:
self.connections.send_tx_to_peers(tx)

return success

def has_sync_version_capability(self) -> bool:
return self._settings.CAPABILITY_SYNC_VERSION in self.capabilities

Expand Down
10 changes: 7 additions & 3 deletions hathor/p2p/sync_v1/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,10 @@ def handle_data(self, payload: str) -> None:
self.log.info('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id())
# If we have not requested the data, it is a new transaction being propagated
# in the network, thus, we propagate it as well.
result = self.manager.on_new_tx(tx, propagate_to_peers=True)
self.update_received_stats(tx, result)
success = self.manager.vertex_handler.on_new_vertex(tx)
if success:
self.protocol.connections.send_tx_to_peers(tx)
self.update_received_stats(tx, success)

def update_received_stats(self, tx: 'BaseTransaction', result: bool) -> None:
""" Update protocol metrics when receiving a new tx
Expand Down Expand Up @@ -685,7 +687,9 @@ def on_tx_success(self, tx: 'BaseTransaction') -> 'BaseTransaction':
success = True
else:
# Add tx to the DAG.
success = self.manager.on_new_tx(tx)
success = self.manager.vertex_handler.on_new_vertex(tx)
if success:
self.protocol.connections.send_tx_to_peers(tx)
# Updating stats data
self.update_received_stats(tx, success)
return tx
Expand Down
20 changes: 9 additions & 11 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,9 @@ def handle_tips(self, payload: str) -> None:
data = [bytes.fromhex(x) for x in data]
# filter-out txs we already have
try:
self._receiving_tips.extend(VertexId(tx_id) for tx_id in data if not self.partial_vertex_exists(tx_id))
self._receiving_tips.extend(
VertexId(tx_id) for tx_id in data if not self.tx_storage.partial_vertex_exists(tx_id)
)
except ValueError:
self.protocol.send_error_and_close_connection('Invalid trasaction ID received')
# XXX: it's OK to do this *after* the extend because the payload is limited by the line protocol
Expand Down Expand Up @@ -553,12 +555,6 @@ def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) ->
assert self.protocol.state is not None
self.protocol.state.send_message(cmd, payload)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
""" Return true if the vertex exists no matter its validation state.
"""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

@inlineCallbacks
def find_best_common_block(self,
my_best_block: _HeightInfo,
Expand Down Expand Up @@ -621,11 +617,11 @@ def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> G
try:
for tx in vertex_list:
if not self.tx_storage.transaction_exists(tx.hash):
self.vertex_handler.on_new_vertex(tx, propagate_to_peers=False, fails_silently=False)
self.vertex_handler.on_new_vertex(tx, fails_silently=False)
yield deferLater(self.reactor, 0, lambda: None)

if not self.tx_storage.transaction_exists(blk.hash):
self.vertex_handler.on_new_vertex(blk, propagate_to_peers=False, fails_silently=False)
self.vertex_handler.on_new_vertex(blk, fails_silently=False)
except InvalidNewTransaction:
self.protocol.send_error_and_close_connection('invalid vertex received')

Expand Down Expand Up @@ -1163,7 +1159,7 @@ def handle_data(self, payload: str) -> None:

tx.storage = self.protocol.node.tx_storage

if self.partial_vertex_exists(tx.hash):
if self.tx_storage.partial_vertex_exists(tx.hash):
# transaction already added to the storage, ignore it
# XXX: maybe we could add a hash blacklist and punish peers propagating known bad txs
self.tx_storage.compare_bytes_with_local_tx(tx)
Expand All @@ -1174,7 +1170,9 @@ def handle_data(self, payload: str) -> None:
if self.tx_storage.can_validate_full(tx):
self.log.debug('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id())
try:
self.vertex_handler.on_new_vertex(tx, propagate_to_peers=True, fails_silently=False)
success = self.vertex_handler.on_new_vertex(tx, fails_silently=False)
if success:
self.protocol.connections.send_tx_to_peers(tx)
except InvalidNewTransaction:
self.protocol.send_error_and_close_connection('invalid vertex received')
else:
Expand Down
10 changes: 2 additions & 8 deletions hathor/p2p/sync_v2/blockchain_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from hathor.p2p.sync_v2.streamers import StreamEnd
from hathor.transaction import Block
from hathor.transaction.exceptions import HathorError
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo
Expand Down Expand Up @@ -75,11 +74,6 @@ def fails(self, reason: 'StreamingError') -> None:
"""Fail the execution by resolving the deferred with an error."""
self._deferred.errback(reason)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
"""Return true if the vertex exists no matter its validation state."""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

def handle_blocks(self, blk: Block) -> None:
"""This method is called by the sync agent when a BLOCKS message is received."""
if self._deferred.called:
Expand All @@ -105,7 +99,7 @@ def handle_blocks(self, blk: Block) -> None:

# Check for repeated blocks.
is_duplicated = False
if self.partial_vertex_exists(blk.hash):
if self.tx_storage.partial_vertex_exists(blk.hash):
# We reached a block we already have. Skip it.
self._blk_repeated += 1
is_duplicated = True
Expand All @@ -132,7 +126,7 @@ def handle_blocks(self, blk: Block) -> None:

if self.tx_storage.can_validate_full(blk):
try:
self.vertex_handler.on_new_vertex(blk, propagate_to_peers=False, fails_silently=False)
self.vertex_handler.on_new_vertex(blk, fails_silently=False)
except HathorError:
self.fails(InvalidVertexError(blk.hash.hex()))
return
Expand Down
4 changes: 3 additions & 1 deletion hathor/p2p/sync_v2/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ def _add_tx(self, tx: BaseTransaction) -> None:
if self.tx_storage.transaction_exists(tx.hash):
return
try:
self.vertex_handler.on_new_vertex(tx, fails_silently=False)
success = self.vertex_handler.on_new_vertex(tx, fails_silently=False)
if success:
self.sync_agent.protocol.connections.send_tx_to_peers(tx)
except InvalidNewTransaction:
self.sync_agent.protocol.send_error_and_close_connection('invalid vertex received')
raise
7 changes: 5 additions & 2 deletions hathor/profiler/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
import time
from collections import defaultdict
from functools import wraps
from typing import Any, Callable, Union
from typing import Callable, ParamSpec, TypeVar, Union

from twisted.internet.task import LoopingCall

Key = tuple[str, ...]

T = TypeVar('T')
P = ParamSpec('P')


class ProcItem:
"""Store information for each process."""
Expand Down Expand Up @@ -184,7 +187,7 @@ def update(self) -> None:
t1 = time.process_time()
self.measures[('profiler',)].add_time(t1 - t0)

def profiler(self, key: Union[str, Callable[..., str]]) -> Callable[[Callable[..., Any]], Any]:
def profiler(self, key: Union[str, Callable[..., str]]) -> Callable[[Callable[P, T]], Callable[P, T]]:
"""Decorator to collect data. The `key` must be the key itself
or a method that returns the key.
Expand Down
5 changes: 5 additions & 0 deletions hathor/transaction/storage/transaction_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,11 @@ def can_validate_full(self, vertex: Vertex) -> bool:
return True
return all_exist and all_valid

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
"""Return true if the vertex exists no matter its validation state."""
with self.allow_partially_validated_context():
return self.transaction_exists(vertex_id)


class BaseTransactionStorage(TransactionStorage):
indexes: Optional[IndexesManager]
Expand Down
15 changes: 3 additions & 12 deletions hathor/vertex_handler/vertex_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from hathor.consensus import ConsensusAlgorithm
from hathor.exception import HathorError, InvalidNewTransaction
from hathor.feature_activation.feature_service import FeatureService
from hathor.p2p.manager import ConnectionsManager
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol
from hathor.transaction import BaseTransaction, Block
Expand All @@ -30,6 +30,7 @@
from hathor.wallet import BaseWallet

logger = get_logger()
cpu = get_cpu_profiler()


class VertexHandler:
Expand All @@ -40,7 +41,6 @@ class VertexHandler:
'_tx_storage',
'_verification_service',
'_consensus',
'_p2p_manager',
'_feature_service',
'_pubsub',
'_wallet',
Expand All @@ -55,7 +55,6 @@ def __init__(
tx_storage: TransactionStorage,
verification_service: VerificationService,
consensus: ConsensusAlgorithm,
p2p_manager: ConnectionsManager,
feature_service: FeatureService,
pubsub: PubSubManager,
wallet: BaseWallet | None,
Expand All @@ -67,27 +66,25 @@ def __init__(
self._tx_storage = tx_storage
self._verification_service = verification_service
self._consensus = consensus
self._p2p_manager = p2p_manager
self._feature_service = feature_service
self._pubsub = pubsub
self._wallet = wallet
self._log_vertex_bytes = log_vertex_bytes

@cpu.profiler('on_new_vertex')
def on_new_vertex(
self,
vertex: BaseTransaction,
*,
quiet: bool = False,
fails_silently: bool = True,
propagate_to_peers: bool = True,
reject_locked_reward: bool = True,
) -> bool:
""" New method for adding transactions or blocks that steps the validation state machine.
:param vertex: transaction to be added
:param quiet: if True will not log when a new tx is accepted
:param fails_silently: if False will raise an exception when tx cannot be added
:param propagate_to_peers: if True will relay the tx to other peers if it is accepted
"""
is_valid = self._validate_vertex(
vertex,
Expand All @@ -102,7 +99,6 @@ def on_new_vertex(
self._post_consensus(
vertex,
quiet=quiet,
propagate_to_peers=propagate_to_peers,
reject_locked_reward=reject_locked_reward
)

Expand Down Expand Up @@ -177,7 +173,6 @@ def _post_consensus(
vertex: BaseTransaction,
*,
quiet: bool,
propagate_to_peers: bool,
reject_locked_reward: bool,
) -> None:
""" Handle operations that need to happen once the tx becomes fully validated.
Expand Down Expand Up @@ -208,10 +203,6 @@ def _post_consensus(

self._log_new_object(vertex, 'new {}', quiet=quiet)

if propagate_to_peers:
# Propagate to our peers.
self._p2p_manager.send_tx_to_peers(vertex)

def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None:
""" A shortcut for logging additional information for block/txs.
"""
Expand Down

0 comments on commit b35cea4

Please sign in to comment.