diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index 228791355..ea3afd8f7 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -614,7 +614,6 @@ def _get_or_create_vertex_handler(self) -> VertexHandler: tx_storage=self._get_or_create_tx_storage(), verification_service=self._get_or_create_verification_service(), consensus=self._get_or_create_consensus(), - p2p_manager=self._get_or_create_p2p_manager(), feature_service=self._get_or_create_feature_service(), pubsub=self._get_or_create_pubsub(), wallet=self._get_or_create_wallet(), diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index 493d31b6d..b2ffc747a 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -334,7 +334,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager: tx_storage=tx_storage, verification_service=verification_service, consensus=consensus_algorithm, - p2p_manager=p2p_manager, feature_service=self.feature_service, pubsub=pubsub, wallet=self.wallet, diff --git a/hathor/manager.py b/hathor/manager.py index 0b6bd94b1..ce40cd896 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -48,7 +48,6 @@ from hathor.p2p.manager import ConnectionsManager from hathor.p2p.peer import PrivatePeer from hathor.p2p.peer_id import PeerId -from hathor.profiler import get_cpu_profiler from hathor.pubsub import HathorEvents, PubSubManager from hathor.reactor import ReactorProtocol as Reactor from hathor.reward_lock import is_spent_reward_locked @@ -69,7 +68,6 @@ from hathor.websocket.factory import HathorAdminWebsocketFactory logger = get_logger() -cpu = get_cpu_profiler() class HathorManager: @@ -171,8 +169,6 @@ def __init__( self.is_started: bool = False - self.cpu = cpu - # XXX: first checkpoint must be genesis (height=0) self.checkpoints: list[Checkpoint] = checkpoints or [] self.checkpoints_ready: list[bool] = [False] * len(self.checkpoints) @@ -960,7 +956,6 @@ def propagate_tx(self, tx: BaseTransaction, fails_silently: bool = True) -> bool return self.on_new_tx(tx, fails_silently=fails_silently, propagate_to_peers=True) - @cpu.profiler('on_new_tx') def on_new_tx( self, tx: BaseTransaction, @@ -977,14 +972,18 @@ def on_new_tx( :param fails_silently: if False will raise an exception when tx cannot be added :param propagate_to_peers: if True will relay the tx to other peers if it is accepted """ - return self.vertex_handler.on_new_vertex( + success = self.vertex_handler.on_new_vertex( tx, quiet=quiet, fails_silently=fails_silently, - propagate_to_peers=propagate_to_peers, reject_locked_reward=reject_locked_reward, ) + if propagate_to_peers and success: + self.connections.send_tx_to_peers(tx) + + return success + def has_sync_version_capability(self) -> bool: return self._settings.CAPABILITY_SYNC_VERSION in self.capabilities diff --git a/hathor/p2p/sync_v1/agent.py b/hathor/p2p/sync_v1/agent.py index 72ef6b6d0..68fe401ec 100644 --- a/hathor/p2p/sync_v1/agent.py +++ b/hathor/p2p/sync_v1/agent.py @@ -636,8 +636,10 @@ def handle_data(self, payload: str) -> None: self.log.info('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id()) # If we have not requested the data, it is a new transaction being propagated # in the network, thus, we propagate it as well. - result = self.manager.on_new_tx(tx, propagate_to_peers=True) - self.update_received_stats(tx, result) + success = self.manager.vertex_handler.on_new_vertex(tx) + if success: + self.protocol.connections.send_tx_to_peers(tx) + self.update_received_stats(tx, success) def update_received_stats(self, tx: 'BaseTransaction', result: bool) -> None: """ Update protocol metrics when receiving a new tx @@ -685,7 +687,9 @@ def on_tx_success(self, tx: 'BaseTransaction') -> 'BaseTransaction': success = True else: # Add tx to the DAG. - success = self.manager.on_new_tx(tx) + success = self.manager.vertex_handler.on_new_vertex(tx) + if success: + self.protocol.connections.send_tx_to_peers(tx) # Updating stats data self.update_received_stats(tx, success) return tx diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index b27ced303..5393080b4 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -488,7 +488,9 @@ def handle_tips(self, payload: str) -> None: data = [bytes.fromhex(x) for x in data] # filter-out txs we already have try: - self._receiving_tips.extend(VertexId(tx_id) for tx_id in data if not self.partial_vertex_exists(tx_id)) + self._receiving_tips.extend( + VertexId(tx_id) for tx_id in data if not self.tx_storage.partial_vertex_exists(tx_id) + ) except ValueError: self.protocol.send_error_and_close_connection('Invalid trasaction ID received') # XXX: it's OK to do this *after* the extend because the payload is limited by the line protocol @@ -553,12 +555,6 @@ def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> assert self.protocol.state is not None self.protocol.state.send_message(cmd, payload) - def partial_vertex_exists(self, vertex_id: VertexId) -> bool: - """ Return true if the vertex exists no matter its validation state. - """ - with self.tx_storage.allow_partially_validated_context(): - return self.tx_storage.transaction_exists(vertex_id) - @inlineCallbacks def find_best_common_block(self, my_best_block: _HeightInfo, @@ -621,11 +617,11 @@ def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> G try: for tx in vertex_list: if not self.tx_storage.transaction_exists(tx.hash): - self.vertex_handler.on_new_vertex(tx, propagate_to_peers=False, fails_silently=False) + self.vertex_handler.on_new_vertex(tx, fails_silently=False) yield deferLater(self.reactor, 0, lambda: None) if not self.tx_storage.transaction_exists(blk.hash): - self.vertex_handler.on_new_vertex(blk, propagate_to_peers=False, fails_silently=False) + self.vertex_handler.on_new_vertex(blk, fails_silently=False) except InvalidNewTransaction: self.protocol.send_error_and_close_connection('invalid vertex received') @@ -1163,7 +1159,7 @@ def handle_data(self, payload: str) -> None: tx.storage = self.protocol.node.tx_storage - if self.partial_vertex_exists(tx.hash): + if self.tx_storage.partial_vertex_exists(tx.hash): # transaction already added to the storage, ignore it # XXX: maybe we could add a hash blacklist and punish peers propagating known bad txs self.tx_storage.compare_bytes_with_local_tx(tx) @@ -1174,7 +1170,9 @@ def handle_data(self, payload: str) -> None: if self.tx_storage.can_validate_full(tx): self.log.debug('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id()) try: - self.vertex_handler.on_new_vertex(tx, propagate_to_peers=True, fails_silently=False) + success = self.vertex_handler.on_new_vertex(tx, fails_silently=False) + if success: + self.protocol.connections.send_tx_to_peers(tx) except InvalidNewTransaction: self.protocol.send_error_and_close_connection('invalid vertex received') else: diff --git a/hathor/p2p/sync_v2/blockchain_streaming_client.py b/hathor/p2p/sync_v2/blockchain_streaming_client.py index 6f0a3f236..e78ec056b 100644 --- a/hathor/p2p/sync_v2/blockchain_streaming_client.py +++ b/hathor/p2p/sync_v2/blockchain_streaming_client.py @@ -27,7 +27,6 @@ from hathor.p2p.sync_v2.streamers import StreamEnd from hathor.transaction import Block from hathor.transaction.exceptions import HathorError -from hathor.types import VertexId if TYPE_CHECKING: from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo @@ -75,11 +74,6 @@ def fails(self, reason: 'StreamingError') -> None: """Fail the execution by resolving the deferred with an error.""" self._deferred.errback(reason) - def partial_vertex_exists(self, vertex_id: VertexId) -> bool: - """Return true if the vertex exists no matter its validation state.""" - with self.tx_storage.allow_partially_validated_context(): - return self.tx_storage.transaction_exists(vertex_id) - def handle_blocks(self, blk: Block) -> None: """This method is called by the sync agent when a BLOCKS message is received.""" if self._deferred.called: @@ -105,7 +99,7 @@ def handle_blocks(self, blk: Block) -> None: # Check for repeated blocks. is_duplicated = False - if self.partial_vertex_exists(blk.hash): + if self.tx_storage.partial_vertex_exists(blk.hash): # We reached a block we already have. Skip it. self._blk_repeated += 1 is_duplicated = True @@ -132,7 +126,7 @@ def handle_blocks(self, blk: Block) -> None: if self.tx_storage.can_validate_full(blk): try: - self.vertex_handler.on_new_vertex(blk, propagate_to_peers=False, fails_silently=False) + self.vertex_handler.on_new_vertex(blk, fails_silently=False) except HathorError: self.fails(InvalidVertexError(blk.hash.hex())) return diff --git a/hathor/p2p/sync_v2/mempool.py b/hathor/p2p/sync_v2/mempool.py index 806c16849..03651642e 100644 --- a/hathor/p2p/sync_v2/mempool.py +++ b/hathor/p2p/sync_v2/mempool.py @@ -140,7 +140,9 @@ def _add_tx(self, tx: BaseTransaction) -> None: if self.tx_storage.transaction_exists(tx.hash): return try: - self.vertex_handler.on_new_vertex(tx, fails_silently=False) + success = self.vertex_handler.on_new_vertex(tx, fails_silently=False) + if success: + self.sync_agent.protocol.connections.send_tx_to_peers(tx) except InvalidNewTransaction: self.sync_agent.protocol.send_error_and_close_connection('invalid vertex received') raise diff --git a/hathor/profiler/cpu.py b/hathor/profiler/cpu.py index 63064df7e..34bad0512 100644 --- a/hathor/profiler/cpu.py +++ b/hathor/profiler/cpu.py @@ -15,12 +15,15 @@ import time from collections import defaultdict from functools import wraps -from typing import Any, Callable, Union +from typing import Callable, ParamSpec, TypeVar, Union from twisted.internet.task import LoopingCall Key = tuple[str, ...] +T = TypeVar('T') +P = ParamSpec('P') + class ProcItem: """Store information for each process.""" @@ -184,7 +187,7 @@ def update(self) -> None: t1 = time.process_time() self.measures[('profiler',)].add_time(t1 - t0) - def profiler(self, key: Union[str, Callable[..., str]]) -> Callable[[Callable[..., Any]], Any]: + def profiler(self, key: Union[str, Callable[..., str]]) -> Callable[[Callable[P, T]], Callable[P, T]]: """Decorator to collect data. The `key` must be the key itself or a method that returns the key. diff --git a/hathor/transaction/storage/transaction_storage.py b/hathor/transaction/storage/transaction_storage.py index f91e3b795..82a364b92 100644 --- a/hathor/transaction/storage/transaction_storage.py +++ b/hathor/transaction/storage/transaction_storage.py @@ -1154,6 +1154,11 @@ def can_validate_full(self, vertex: Vertex) -> bool: return True return all_exist and all_valid + def partial_vertex_exists(self, vertex_id: VertexId) -> bool: + """Return true if the vertex exists no matter its validation state.""" + with self.allow_partially_validated_context(): + return self.transaction_exists(vertex_id) + class BaseTransactionStorage(TransactionStorage): indexes: Optional[IndexesManager] diff --git a/hathor/vertex_handler/vertex_handler.py b/hathor/vertex_handler/vertex_handler.py index 473516a56..59650e83e 100644 --- a/hathor/vertex_handler/vertex_handler.py +++ b/hathor/vertex_handler/vertex_handler.py @@ -20,7 +20,7 @@ from hathor.consensus import ConsensusAlgorithm from hathor.exception import HathorError, InvalidNewTransaction from hathor.feature_activation.feature_service import FeatureService -from hathor.p2p.manager import ConnectionsManager +from hathor.profiler import get_cpu_profiler from hathor.pubsub import HathorEvents, PubSubManager from hathor.reactor import ReactorProtocol from hathor.transaction import BaseTransaction, Block @@ -30,6 +30,7 @@ from hathor.wallet import BaseWallet logger = get_logger() +cpu = get_cpu_profiler() class VertexHandler: @@ -40,7 +41,6 @@ class VertexHandler: '_tx_storage', '_verification_service', '_consensus', - '_p2p_manager', '_feature_service', '_pubsub', '_wallet', @@ -55,7 +55,6 @@ def __init__( tx_storage: TransactionStorage, verification_service: VerificationService, consensus: ConsensusAlgorithm, - p2p_manager: ConnectionsManager, feature_service: FeatureService, pubsub: PubSubManager, wallet: BaseWallet | None, @@ -67,19 +66,18 @@ def __init__( self._tx_storage = tx_storage self._verification_service = verification_service self._consensus = consensus - self._p2p_manager = p2p_manager self._feature_service = feature_service self._pubsub = pubsub self._wallet = wallet self._log_vertex_bytes = log_vertex_bytes + @cpu.profiler('on_new_vertex') def on_new_vertex( self, vertex: BaseTransaction, *, quiet: bool = False, fails_silently: bool = True, - propagate_to_peers: bool = True, reject_locked_reward: bool = True, ) -> bool: """ New method for adding transactions or blocks that steps the validation state machine. @@ -87,7 +85,6 @@ def on_new_vertex( :param vertex: transaction to be added :param quiet: if True will not log when a new tx is accepted :param fails_silently: if False will raise an exception when tx cannot be added - :param propagate_to_peers: if True will relay the tx to other peers if it is accepted """ is_valid = self._validate_vertex( vertex, @@ -102,7 +99,6 @@ def on_new_vertex( self._post_consensus( vertex, quiet=quiet, - propagate_to_peers=propagate_to_peers, reject_locked_reward=reject_locked_reward ) @@ -177,7 +173,6 @@ def _post_consensus( vertex: BaseTransaction, *, quiet: bool, - propagate_to_peers: bool, reject_locked_reward: bool, ) -> None: """ Handle operations that need to happen once the tx becomes fully validated. @@ -208,10 +203,6 @@ def _post_consensus( self._log_new_object(vertex, 'new {}', quiet=quiet) - if propagate_to_peers: - # Propagate to our peers. - self._p2p_manager.send_tx_to_peers(vertex) - def _log_new_object(self, tx: BaseTransaction, message_fmt: str, *, quiet: bool) -> None: """ A shortcut for logging additional information for block/txs. """