diff --git a/hathor/builder/builder.py b/hathor/builder/builder.py index 7d4398ae5..0a107d20e 100644 --- a/hathor/builder/builder.py +++ b/hathor/builder/builder.py @@ -34,6 +34,7 @@ from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager from hathor.manager import HathorManager from hathor.mining.cpu_mining_service import CpuMiningService +from hathor.p2p import P2PDependencies from hathor.p2p.manager import ConnectionsManager from hathor.p2p.peer import PrivatePeer from hathor.pubsub import PubSubManager @@ -64,12 +65,10 @@ class SyncSupportLevel(IntEnum): @classmethod def add_factories( cls, - settings: HathorSettingsType, p2p_manager: ConnectionsManager, + dependencies: P2PDependencies, sync_v1_support: 'SyncSupportLevel', sync_v2_support: 'SyncSupportLevel', - vertex_parser: VertexParser, - vertex_handler: VertexHandler, ) -> None: """Adds the sync factory to the manager according to the support level.""" from hathor.p2p.sync_v1.factory import SyncV11Factory @@ -78,18 +77,12 @@ def add_factories( # sync-v1 support: if sync_v1_support > cls.UNAVAILABLE: - p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(p2p_manager, vertex_parser=vertex_parser)) + p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(dependencies)) if sync_v1_support is cls.ENABLED: p2p_manager.enable_sync_version(SyncVersion.V1_1) # sync-v2 support: if sync_v2_support > cls.UNAVAILABLE: - sync_v2_factory = SyncV2Factory( - settings, - p2p_manager, - vertex_parser=vertex_parser, - vertex_handler=vertex_handler, - ) - p2p_manager.add_sync_factory(SyncVersion.V2, sync_v2_factory) + p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(dependencies)) if sync_v2_support is cls.ENABLED: p2p_manager.enable_sync_version(SyncVersion.V2) @@ -232,7 +225,6 @@ def build(self) -> BuildArtifacts: vertex_handler = self._get_or_create_vertex_handler() vertex_parser = self._get_or_create_vertex_parser() poa_block_producer = self._get_or_create_poa_block_producer() - capabilities = self._get_or_create_capabilities() if self._enable_address_index: indexes.enable_address_index(pubsub) @@ -264,7 +256,6 @@ def build(self) -> BuildArtifacts: wallet=wallet, rng=self._rng, checkpoints=self._checkpoints, - capabilities=capabilities, environment_info=get_environment_info(self._cmdline, str(peer.id)), bit_signaling_service=bit_signaling_service, verification_service=verification_service, @@ -416,25 +407,31 @@ def _get_or_create_p2p_manager(self) -> ConnectionsManager: return self._p2p_manager enable_ssl = True - reactor = self._get_reactor() my_peer = self._get_peer() - self._p2p_manager = ConnectionsManager( + dependencies = P2PDependencies( + reactor=self._get_reactor(), settings=self._get_or_create_settings(), - reactor=reactor, + vertex_parser=self._get_or_create_vertex_parser(), + tx_storage=self._get_or_create_tx_storage(), + vertex_handler=self._get_or_create_vertex_handler(), + verification_service=self._get_or_create_verification_service(), + capabilities=self._get_or_create_capabilities(), + whitelist_only=False, + ) + + self._p2p_manager = ConnectionsManager( + dependencies=dependencies, my_peer=my_peer, pubsub=self._get_or_create_pubsub(), ssl=enable_ssl, - whitelist_only=False, rng=self._rng, ) SyncSupportLevel.add_factories( - self._get_or_create_settings(), self._p2p_manager, + dependencies, self._sync_v1_support, self._sync_v2_support, - self._get_or_create_vertex_parser(), - self._get_or_create_vertex_handler(), ) return self._p2p_manager diff --git a/hathor/builder/cli_builder.py b/hathor/builder/cli_builder.py index cc52e3bbc..d052cbe8f 100644 --- a/hathor/builder/cli_builder.py +++ b/hathor/builder/cli_builder.py @@ -34,6 +34,7 @@ from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager from hathor.manager import HathorManager from hathor.mining.cpu_mining_service import CpuMiningService +from hathor.p2p import P2PDependencies from hathor.p2p.manager import ConnectionsManager from hathor.p2p.peer import PrivatePeer from hathor.p2p.peer_endpoint import PeerEndpoint @@ -319,16 +320,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager: cpu_mining_service = CpuMiningService() capabilities = settings.get_default_capabilities() - p2p_manager = ConnectionsManager( - settings=settings, - reactor=reactor, - my_peer=peer, - pubsub=pubsub, - ssl=True, - whitelist_only=False, - rng=Random(), - ) - vertex_handler = VertexHandler( reactor=reactor, settings=settings, @@ -341,13 +332,30 @@ def create_manager(self, reactor: Reactor) -> HathorManager: log_vertex_bytes=self._args.log_vertex_bytes, ) + p2p_dependencies = P2PDependencies( + reactor=reactor, + settings=settings, + vertex_parser=vertex_parser, + tx_storage=tx_storage, + vertex_handler=vertex_handler, + verification_service=verification_service, + whitelist_only=False, + capabilities=capabilities, + ) + + p2p_manager = ConnectionsManager( + dependencies=p2p_dependencies, + my_peer=peer, + pubsub=pubsub, + ssl=True, + rng=Random(), + ) + SyncSupportLevel.add_factories( - settings, p2p_manager, + p2p_dependencies, sync_v1_support, sync_v2_support, - vertex_parser, - vertex_handler, ) from hathor.consensus.poa import PoaBlockProducer, PoaSignerFile @@ -385,7 +393,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager: vertex_handler=vertex_handler, vertex_parser=vertex_parser, poa_block_producer=poa_block_producer, - capabilities=capabilities, ) if self._args.x_ipython_kernel: diff --git a/hathor/cli/quick_test.py b/hathor/cli/quick_test.py index 2bf6f16fe..2bdd20711 100644 --- a/hathor/cli/quick_test.py +++ b/hathor/cli/quick_test.py @@ -12,14 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import os from argparse import ArgumentParser -from typing import Any +from typing import TYPE_CHECKING from structlog import get_logger from hathor.cli.run_node import RunNode +if TYPE_CHECKING: + from hathor.transaction import Vertex + logger = get_logger() @@ -30,18 +35,17 @@ def __init__(self, vertex_handler, manager, n_blocks): self._manager = manager self._n_blocks = n_blocks - def on_new_vertex(self, *args: Any, **kwargs: Any) -> bool: + def on_new_vertex(self, vertex: Vertex, *, fails_silently: bool) -> bool: from hathor.transaction import Block from hathor.transaction.base_transaction import GenericVertex msg: str | None = None - res = self._vertex_handler.on_new_vertex(*args, **kwargs) + res = self._vertex_handler.on_new_vertex(vertex=vertex, fails_silently=fails_silently) if self._n_blocks is None: should_quit = res msg = 'added a tx' else: - vertex = args[0] should_quit = False assert isinstance(vertex, GenericVertex) @@ -77,7 +81,7 @@ def prepare(self, *, register_resources: bool = True) -> None: self.log.info('patching vertex_handler.on_new_vertex to quit on success') p2p_factory = self.manager.connections.get_sync_factory(SyncVersion.V2) assert isinstance(p2p_factory, SyncV2Factory) - p2p_factory.vertex_handler = VertexHandlerWrapper( + p2p_factory.dependencies.vertex_handler = VertexHandlerWrapper( self.manager.vertex_handler, self.manager, self._args.quit_after_n_blocks, diff --git a/hathor/manager.py b/hathor/manager.py index 3e8811ab6..aa50e3766 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -109,7 +109,6 @@ def __init__( execution_manager: ExecutionManager, vertex_handler: VertexHandler, vertex_parser: VertexParser, - capabilities: list[str], hostname: Optional[str] = None, wallet: Optional[BaseWallet] = None, checkpoints: Optional[list[Checkpoint]] = None, @@ -230,9 +229,6 @@ def __init__( # List of whitelisted peers self.peers_whitelist: list[PeerId] = [] - # List of capabilities of the peer - self.capabilities = capabilities - # This is included in some logs to provide more context self.environment_info = environment_info @@ -975,9 +971,6 @@ def on_new_tx( return success - def has_sync_version_capability(self) -> bool: - return self._settings.CAPABILITY_SYNC_VERSION in self.capabilities - def add_peer_to_whitelist(self, peer_id: PeerId) -> None: if not self._settings.ENABLE_PEER_WHITELIST: return diff --git a/hathor/p2p/__init__.py b/hathor/p2p/__init__.py index e69de29bb..00b4ca1a0 100644 --- a/hathor/p2p/__init__.py +++ b/hathor/p2p/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.p2p.dependencies.p2p_dependencies import P2PDependencies + +__all__ = [ + 'P2PDependencies', +] diff --git a/hathor/p2p/dependencies/__init__.py b/hathor/p2p/dependencies/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/hathor/p2p/dependencies/p2p_dependencies.py b/hathor/p2p/dependencies/p2p_dependencies.py new file mode 100644 index 000000000..46c263ad4 --- /dev/null +++ b/hathor/p2p/dependencies/p2p_dependencies.py @@ -0,0 +1,68 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.conf.settings import HathorSettings +from hathor.p2p.dependencies.protocols import ( + P2PTransactionStorageProtocol, + P2PVerificationServiceProtocol, + P2PVertexHandlerProtocol, +) +from hathor.reactor import ReactorProtocol +from hathor.transaction.vertex_parser import VertexParser + + +class P2PDependencies: + """A simple class to unify all node dependencies that are required by P2P.""" + + __slots__ = ( + 'reactor', + 'settings', + 'vertex_parser', + 'vertex_handler', + 'verification_service', + 'tx_storage', + 'capabilities', + 'whitelist_only', + '_has_sync_version_capability', + ) + + def __init__( + self, + *, + reactor: ReactorProtocol, + settings: HathorSettings, + vertex_parser: VertexParser, + vertex_handler: P2PVertexHandlerProtocol, + verification_service: P2PVerificationServiceProtocol, + tx_storage: P2PTransactionStorageProtocol, + capabilities: list[str], + whitelist_only: bool, + ) -> None: + self.reactor = reactor + self.settings = settings + self.vertex_parser = vertex_parser + self.vertex_handler = vertex_handler + self.verification_service = verification_service + self.tx_storage = tx_storage + + # List of capabilities of the peer + self.capabilities = capabilities + + # Parameter to explicitly enable whitelist-only mode, when False it will still check the whitelist for sync-v1 + self.whitelist_only = whitelist_only + + self._has_sync_version_capability = settings.CAPABILITY_SYNC_VERSION in capabilities + + def has_sync_version_capability(self) -> bool: + return self._has_sync_version_capability diff --git a/hathor/p2p/dependencies/protocols.py b/hathor/p2p/dependencies/protocols.py new file mode 100644 index 000000000..662d61703 --- /dev/null +++ b/hathor/p2p/dependencies/protocols.py @@ -0,0 +1,46 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Protocol + +from hathor.indexes.height_index import HeightInfo +from hathor.transaction import Block, Vertex +from hathor.types import VertexId + + +class P2PVertexHandlerProtocol(Protocol): + """Abstract the VertexHandler as a Python protocol to be used in P2P classes.""" + + def on_new_vertex(self, vertex: Vertex, *, fails_silently: bool = True) -> bool: ... + + +class P2PVerificationServiceProtocol(Protocol): + """Abstract the VerificationService as a Python protocol to be used in P2P classes.""" + + def verify_basic(self, vertex: Vertex) -> None: ... + + +class P2PTransactionStorageProtocol(Protocol): + """Abstract the TransactionStorage as a Python protocol to be used in P2P classes.""" + + def get_vertex(self, vertex_id: VertexId) -> Vertex: ... + def get_block(self, block_id: VertexId) -> Block: ... + def transaction_exists(self, vertex_id: VertexId) -> bool: ... + def can_validate_full(self, vertex: Vertex) -> bool: ... + def compare_bytes_with_local_tx(self, vertex: Vertex) -> bool: ... + def get_best_block(self) -> Block: ... + def get_n_height_tips(self, n_blocks: int) -> list[HeightInfo]: ... + def get_mempool_tips(self) -> set[VertexId]: ... + def get_block_id_by_height(self, height: int) -> VertexId | None: ... + def partial_vertex_exists(self, vertex_id: VertexId) -> bool: ... diff --git a/hathor/p2p/factory.py b/hathor/p2p/factory.py index 1b449d84f..9b80fc5e7 100644 --- a/hathor/p2p/factory.py +++ b/hathor/p2p/factory.py @@ -17,7 +17,7 @@ from twisted.internet import protocol from twisted.internet.interfaces import IAddress -from hathor.conf.settings import HathorSettings +from hathor.p2p import P2PDependencies from hathor.p2p.manager import ConnectionsManager from hathor.p2p.peer import PrivatePeer from hathor.p2p.peer_endpoint import PeerAddress @@ -32,13 +32,13 @@ def __init__( my_peer: PrivatePeer, p2p_manager: ConnectionsManager, *, - settings: HathorSettings, + dependencies: P2PDependencies, use_ssl: bool, ): super().__init__() - self._settings = settings self.my_peer = my_peer self.p2p_manager = p2p_manager + self.dependencies = dependencies self.use_ssl = use_ssl def buildProtocol(self, addr: IAddress) -> HathorLineReceiver: @@ -46,9 +46,9 @@ def buildProtocol(self, addr: IAddress) -> HathorLineReceiver: addr=PeerAddress.from_address(addr), my_peer=self.my_peer, p2p_manager=self.p2p_manager, + dependencies=self.dependencies, use_ssl=self.use_ssl, inbound=self.inbound, - settings=self._settings, ) diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py index e15d9620e..352d6f4f2 100644 --- a/hathor/p2p/manager.py +++ b/hathor/p2p/manager.py @@ -26,7 +26,7 @@ from twisted.python.failure import Failure from twisted.web.client import Agent -from hathor.conf.settings import HathorSettings +from hathor.p2p import P2PDependencies from hathor.p2p.netfilter.factory import NetfilterFactory from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer from hathor.p2p.peer_connections import PeerConnections @@ -41,7 +41,6 @@ from hathor.p2p.sync_version import SyncVersion from hathor.p2p.utils import parse_whitelist from hathor.pubsub import HathorEvents, PubSubManager -from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction import BaseTransaction from hathor.util import Random @@ -77,7 +76,6 @@ class GlobalRateLimiter: SEND_TIPS = 'NodeSyncTimestamp.send_tips' manager: Optional['HathorManager'] - whitelist_only: bool unverified_peer_storage: UnverifiedPeerStorage verified_peer_storage: VerifiedPeerStorage _sync_factories: dict[SyncVersion, SyncAgentFactory] @@ -87,24 +85,23 @@ class GlobalRateLimiter: def __init__( self, - settings: HathorSettings, - reactor: Reactor, + dependencies: P2PDependencies, my_peer: PrivatePeer, pubsub: PubSubManager, ssl: bool, rng: Random, - whitelist_only: bool, ) -> None: self.log = logger.new() - self._settings = settings + self.dependencies = dependencies + self._settings = dependencies.settings self.rng = rng self.manager = None - self.MAX_ENABLED_SYNC = settings.MAX_ENABLED_SYNC - self.SYNC_UPDATE_INTERVAL = settings.SYNC_UPDATE_INTERVAL - self.PEER_DISCOVERY_INTERVAL = settings.PEER_DISCOVERY_INTERVAL + self.MAX_ENABLED_SYNC = self._settings.MAX_ENABLED_SYNC + self.SYNC_UPDATE_INTERVAL = self._settings.SYNC_UPDATE_INTERVAL + self.PEER_DISCOVERY_INTERVAL = self._settings.PEER_DISCOVERY_INTERVAL - self.reactor = reactor + self.reactor = dependencies.reactor self.my_peer = my_peer # List of address descriptions to listen for new connections (eg: [tcp:8000]) @@ -123,10 +120,16 @@ def __init__( from hathor.p2p.factory import HathorClientFactory, HathorServerFactory self.use_ssl = ssl self.server_factory = HathorServerFactory( - self.my_peer, p2p_manager=self, use_ssl=self.use_ssl, settings=self._settings + my_peer=self.my_peer, + p2p_manager=self, + dependencies=dependencies, + use_ssl=self.use_ssl, ) self.client_factory = HathorClientFactory( - self.my_peer, p2p_manager=self, use_ssl=self.use_ssl, settings=self._settings + my_peer=self.my_peer, + p2p_manager=self, + dependencies=dependencies, + use_ssl=self.use_ssl, ) # Global maximum number of connections. @@ -171,9 +174,6 @@ def __init__( # Pubsub object to publish events self.pubsub = pubsub - # Parameter to explicitly enable whitelist-only mode, when False it will still check the whitelist for sync-v1 - self.whitelist_only = whitelist_only - # Timestamp when the last discovery ran self._last_discovery: float = 0. diff --git a/hathor/p2p/protocol.py b/hathor/p2p/protocol.py index 5f9f1fcaf..5159eb075 100644 --- a/hathor/p2p/protocol.py +++ b/hathor/p2p/protocol.py @@ -24,7 +24,7 @@ from twisted.protocols.basic import LineReceiver from twisted.python.failure import Failure -from hathor.conf.settings import HathorSettings +from hathor.p2p import P2PDependencies from hathor.p2p.messages import ProtocolMessages from hathor.p2p.peer import PrivatePeer, PublicPeer from hathor.p2p.peer_endpoint import PeerAddress @@ -101,12 +101,13 @@ def __init__( my_peer: PrivatePeer, p2p_manager: 'ConnectionsManager', *, - settings: HathorSettings, + dependencies: P2PDependencies, use_ssl: bool, inbound: bool, addr: PeerAddress, ) -> None: - self._settings = settings + self.dependencies = dependencies + self._settings = dependencies.settings self.my_peer = my_peer self.connections = p2p_manager self.addr = addr @@ -173,7 +174,7 @@ def change_state(self, state_enum: PeerState) -> None: """Called to change the state of the connection.""" if state_enum not in self._state_instances: state_cls = state_enum.value - instance = state_cls(self, self._settings) + instance = state_cls(self, dependencies=self.dependencies) instance.state_name = state_enum.name self._state_instances[state_enum] = instance new_state = self._state_instances[state_enum] diff --git a/hathor/p2p/states/base.py b/hathor/p2p/states/base.py index f08401cc0..75a69140e 100644 --- a/hathor/p2p/states/base.py +++ b/hathor/p2p/states/base.py @@ -19,6 +19,7 @@ from twisted.internet.defer import Deferred from hathor.conf.settings import HathorSettings +from hathor.p2p import P2PDependencies from hathor.p2p.messages import ProtocolMessages if TYPE_CHECKING: @@ -34,9 +35,10 @@ class BaseState: Callable[[str], None] | Callable[[str], Deferred[None]] | Callable[[str], Coroutine[Deferred[None], Any, None]] ] - def __init__(self, protocol: 'HathorProtocol', settings: HathorSettings): + def __init__(self, protocol: 'HathorProtocol', *, dependencies: P2PDependencies): self.log = logger.new(**protocol.get_logger_context()) - self._settings = settings + self.dependencies = dependencies + self._settings: HathorSettings = dependencies.settings self.protocol = protocol self.cmd_map = { ProtocolMessages.ERROR: self.handle_error, diff --git a/hathor/p2p/states/hello.py b/hathor/p2p/states/hello.py index 47c9cf4e5..9c034b7cb 100644 --- a/hathor/p2p/states/hello.py +++ b/hathor/p2p/states/hello.py @@ -18,8 +18,8 @@ import hathor from hathor.conf.get_settings import get_global_settings -from hathor.conf.settings import HathorSettings from hathor.exception import HathorError +from hathor.p2p import P2PDependencies from hathor.p2p.messages import ProtocolMessages from hathor.p2p.states.base import BaseState from hathor.p2p.sync_version import SyncVersion @@ -33,8 +33,8 @@ class HelloState(BaseState): - def __init__(self, protocol: 'HathorProtocol', settings: HathorSettings) -> None: - super().__init__(protocol, settings) + def __init__(self, protocol: 'HathorProtocol', *, dependencies: P2PDependencies) -> None: + super().__init__(protocol, dependencies=dependencies) self.log = logger.new(**protocol.get_logger_context()) self.cmd_map.update({ ProtocolMessages.HELLO: self.handle_hello, @@ -55,11 +55,11 @@ def _get_hello_data(self) -> dict[str, Any]: 'network': self._settings.NETWORK_NAME, 'remote_address': format_address(remote), 'genesis_short_hash': get_genesis_short_hash(), - 'timestamp': protocol.node.reactor.seconds(), + 'timestamp': self.dependencies.reactor.seconds(), 'settings_dict': get_settings_hello_dict(self._settings), - 'capabilities': protocol.node.capabilities, + 'capabilities': self.dependencies.capabilities, } - if self.protocol.node.has_sync_version_capability(): + if self.dependencies.has_sync_version_capability(): data['sync_versions'] = [x.value for x in self._get_sync_versions()] return data @@ -143,7 +143,7 @@ def handle_hello(self, payload: str) -> None: protocol.send_error_and_close_connection('Different genesis.') return - dt = data['timestamp'] - protocol.node.reactor.seconds() + dt = data['timestamp'] - self.dependencies.reactor.seconds() if abs(dt) > self._settings.MAX_FUTURE_TIMESTAMP_ALLOWED / 2: protocol.send_error_and_close_connection('Nodes timestamps too far apart.') return diff --git a/hathor/p2p/states/peer_id.py b/hathor/p2p/states/peer_id.py index d9f82e8a3..2ca93ea59 100644 --- a/hathor/p2p/states/peer_id.py +++ b/hathor/p2p/states/peer_id.py @@ -16,7 +16,7 @@ from structlog import get_logger -from hathor.conf.settings import HathorSettings +from hathor.p2p import P2PDependencies from hathor.p2p.messages import ProtocolMessages from hathor.p2p.peer import PublicPeer from hathor.p2p.peer_id import PeerId @@ -30,8 +30,8 @@ class PeerIdState(BaseState): - def __init__(self, protocol: 'HathorProtocol', settings: HathorSettings) -> None: - super().__init__(protocol, settings) + def __init__(self, protocol: 'HathorProtocol', *, dependencies: P2PDependencies) -> None: + super().__init__(protocol, dependencies=dependencies) self.log = logger.new(remote=protocol.get_short_remote()) self.cmd_map.update({ ProtocolMessages.PEER_ID: self.handle_peer_id, @@ -161,10 +161,8 @@ def _should_block_peer(self, peer_id: PeerId) -> bool: return True # otherwise we block non-whitelisted peers when on "whitelist-only mode" - if self.protocol.connections is not None: - protocol_is_whitelist_only = self.protocol.connections.whitelist_only - if protocol_is_whitelist_only and not peer_is_whitelisted: - return True + if self.dependencies.whitelist_only and not peer_is_whitelisted: + return True # default is not blocking, this will be sync-v2 peers not on whitelist when not on whitelist-only mode return False diff --git a/hathor/p2p/states/ready.py b/hathor/p2p/states/ready.py index d7e6f0686..fb3db3ab9 100644 --- a/hathor/p2p/states/ready.py +++ b/hathor/p2p/states/ready.py @@ -18,8 +18,8 @@ from structlog import get_logger from twisted.internet.task import LoopingCall -from hathor.conf.settings import HathorSettings from hathor.indexes.height_index import HeightInfo +from hathor.p2p import P2PDependencies from hathor.p2p.messages import ProtocolMessages from hathor.p2p.peer import PublicPeer, UnverifiedPeer from hathor.p2p.states.base import BaseState @@ -35,12 +35,12 @@ class ReadyState(BaseState): - def __init__(self, protocol: 'HathorProtocol', settings: HathorSettings) -> None: - super().__init__(protocol, settings) + def __init__(self, protocol: 'HathorProtocol', *, dependencies: P2PDependencies) -> None: + super().__init__(protocol, dependencies=dependencies) self.log = logger.new(**self.protocol.get_logger_context()) - self.reactor = self.protocol.node.reactor + self.reactor = self.dependencies.reactor # It triggers an event to send a ping message if necessary. self.lc_ping = LoopingCall(self.send_ping_if_necessary) @@ -85,7 +85,7 @@ def __init__(self, protocol: 'HathorProtocol', settings: HathorSettings) -> None self.lc_get_best_blockchain: Optional[LoopingCall] = None # if the peer has the GET-BEST-BLOCKCHAIN capability - common_capabilities = protocol.capabilities & set(protocol.node.capabilities) + common_capabilities = protocol.capabilities & set(self.dependencies.capabilities) if (self._settings.CAPABILITY_GET_BEST_BLOCKCHAIN in common_capabilities): # set the loop to get the best blockchain from the peer self.lc_get_best_blockchain = LoopingCall(self.send_get_best_blockchain) @@ -106,7 +106,7 @@ def __init__(self, protocol: 'HathorProtocol', settings: HathorSettings) -> None self.log.debug(f'loading {sync_version}') sync_factory = connections.get_sync_factory(sync_version) - self.sync_agent: SyncAgent = sync_factory.create_sync_agent(self.protocol, reactor=self.reactor) + self.sync_agent: SyncAgent = sync_factory.create_sync_agent(self.protocol) self.cmd_map.update(self.sync_agent.get_cmd_dict()) def on_enter(self) -> None: @@ -254,7 +254,7 @@ def handle_get_best_blockchain(self, payload: str) -> None: ) return - best_blockchain = self.protocol.node.tx_storage.get_n_height_tips(n_blocks) + best_blockchain = self.dependencies.tx_storage.get_n_height_tips(n_blocks) self.send_best_blockchain(best_blockchain) def send_best_blockchain(self, best_blockchain: list[HeightInfo]) -> None: diff --git a/hathor/p2p/sync_factory.py b/hathor/p2p/sync_factory.py index f4883f21a..da32bd68b 100644 --- a/hathor/p2p/sync_factory.py +++ b/hathor/p2p/sync_factory.py @@ -16,7 +16,6 @@ from typing import TYPE_CHECKING from hathor.p2p.sync_agent import SyncAgent -from hathor.reactor import ReactorProtocol as Reactor if TYPE_CHECKING: from hathor.p2p.protocol import HathorProtocol @@ -24,5 +23,5 @@ class SyncAgentFactory(ABC): @abstractmethod - def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Reactor) -> SyncAgent: - pass + def create_sync_agent(self, protocol: 'HathorProtocol') -> SyncAgent: + raise NotImplementedError diff --git a/hathor/p2p/sync_v1/agent.py b/hathor/p2p/sync_v1/agent.py index 68fe401ec..7291db0e9 100644 --- a/hathor/p2p/sync_v1/agent.py +++ b/hathor/p2p/sync_v1/agent.py @@ -22,14 +22,13 @@ from twisted.internet.defer import CancelledError, Deferred, inlineCallbacks from twisted.internet.interfaces import IDelayedCall -from hathor.conf.get_settings import get_global_settings +from hathor.p2p import P2PDependencies from hathor.p2p.messages import GetNextPayload, GetTipsPayload, NextPayload, ProtocolMessages, TipsPayload from hathor.p2p.sync_agent import SyncAgent from hathor.p2p.sync_v1.downloader import Downloader -from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction import BaseTransaction +from hathor.transaction.storage import TransactionStorage from hathor.transaction.storage.exceptions import TransactionDoesNotExist -from hathor.transaction.vertex_parser import VertexParser from hathor.util import json_dumps, json_loads logger = get_logger() @@ -64,9 +63,8 @@ def __init__( self, protocol: 'HathorProtocol', downloader: Downloader, - reactor: Reactor, *, - vertex_parser: VertexParser, + dependencies: P2PDependencies, ) -> None: """ :param protocol: Protocol of the connection. @@ -75,13 +73,17 @@ def __init__( :param reactor: Reactor to schedule later calls. (default=twisted.internet.reactor) :type reactor: Reactor """ - self._settings = get_global_settings() - self.vertex_parser = vertex_parser + self._settings = dependencies.settings + self.dependencies = dependencies self.protocol = protocol - self.manager = protocol.node self.downloader = downloader + self.reactor = dependencies.reactor - self.reactor: Reactor = reactor + # Since Sync-v1 does not support multiprocess P2P, the dependencies.tx_storage will always be a concrete + # TransactionStorage in the same process, with no IPC. + # This reduces the number of IPC endpoints we have to implement. + assert isinstance(self.dependencies.tx_storage, TransactionStorage) + self.tx_storage = self.dependencies.tx_storage # Rate limit for this connection. assert protocol.connections is not None @@ -184,7 +186,7 @@ def is_synced(self) -> bool: See the `send_tx_to_peer_if_possible` method for the exact process and to understand why this condition has to be this way. """ - return self.manager.tx_storage.latest_timestamp - self.synced_timestamp <= self.sync_threshold + return self.tx_storage.latest_timestamp - self.synced_timestamp <= self.sync_threshold def is_errored(self) -> bool: # XXX: this sync manager does not have an error state, this method exists for API parity with sync-v2 @@ -203,7 +205,7 @@ def send_tx_to_peer_if_possible(self, tx: BaseTransaction) -> None: # parents' timestamps are below synced_timestamp, i.e., we know that the peer # has all the parents. for parent_hash in tx.parents: - parent = self.protocol.node.tx_storage.get_transaction(parent_hash) + parent = self.tx_storage.get_vertex(parent_hash) if parent.timestamp > self.synced_timestamp: return @@ -286,7 +288,7 @@ def sync_from_timestamp(self, next_timestamp: int) -> Generator[Deferred, Any, N next_offset=payload.next_offset, hashes=len(payload.hashes)) count = 0 for h in payload.hashes: - if not self.manager.tx_storage.transaction_exists(h): + if not self.tx_storage.transaction_exists(h): pending.add(self.get_data(h)) count += 1 self.log.debug('...', next_ts=next_timestamp, count=count, pending=len(pending)) @@ -321,18 +323,18 @@ def find_synced_timestamp(self) -> Generator[Deferred, Any, Optional[int]]: # Maximum of ceil(log(k)), where k is the number of items between the new one and the latest item. prev_cur = None cur = self.peer_timestamp - local_merkle_tree, _ = self.manager.tx_storage.get_merkle_tree(cur) + local_merkle_tree, _ = self.tx_storage.get_merkle_tree(cur) step = 1 while tips.merkle_tree != local_merkle_tree: - if cur <= self.manager.tx_storage.first_timestamp: + if cur <= self.tx_storage.first_timestamp: raise Exception( 'We cannot go before genesis. Peer is probably running with wrong configuration or database.' ) prev_cur = cur - assert self.manager.tx_storage.first_timestamp > 0 - cur = max(cur - step, self.manager.tx_storage.first_timestamp) + assert self.tx_storage.first_timestamp > 0 + cur = max(cur - step, self.tx_storage.first_timestamp) tips = (yield self.get_peer_tips(cur)) - local_merkle_tree, _ = self.manager.tx_storage.get_merkle_tree(cur) + local_merkle_tree, _ = self.tx_storage.get_merkle_tree(cur) step *= 2 # Here, both nodes are synced at timestamp `cur` and not synced at timestamp `prev_cur`. @@ -348,7 +350,7 @@ def find_synced_timestamp(self) -> Generator[Deferred, Any, Optional[int]]: while high - low > 1: mid = (low + high + 1) // 2 tips = (yield self.get_peer_tips(mid)) - local_merkle_tree, _ = self.manager.tx_storage.get_merkle_tree(mid) + local_merkle_tree, _ = self.tx_storage.get_merkle_tree(mid) if tips.merkle_tree == local_merkle_tree: low = mid else: @@ -442,9 +444,9 @@ def send_next(self, timestamp: int, offset: int = 0) -> None: from hathor.indexes.timestamp_index import RangeIdx count = self.MAX_HASHES - assert self.manager.tx_storage.indexes is not None + assert self.tx_storage.indexes is not None from_idx = RangeIdx(timestamp, offset) - hashes, next_idx = self.manager.tx_storage.indexes.sorted_all.get_hashes_and_next_idx(from_idx, count) + hashes, next_idx = self.tx_storage.indexes.sorted_all.get_hashes_and_next_idx(from_idx, count) if next_idx is None: # this means we've reached the end and there's nothing else to sync next_timestamp, next_offset = inf, 0 @@ -524,7 +526,7 @@ def _send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = Fal """ Send a TIPS message. """ if timestamp is None: - timestamp = self.manager.tx_storage.latest_timestamp + timestamp = self.tx_storage.latest_timestamp # All tips # intervals = self.manager.tx_storage.get_all_tips(timestamp) @@ -532,7 +534,7 @@ def _send_tips(self, timestamp: Optional[int] = None, include_hashes: bool = Fal # raise Exception('No tips for timestamp {}'.format(timestamp)) # Calculate list of hashes to be sent - merkle_tree, hashes = self.manager.tx_storage.get_merkle_tree(timestamp) + merkle_tree, hashes = self.tx_storage.get_merkle_tree(timestamp) has_more = False if not include_hashes: @@ -577,7 +579,7 @@ def handle_get_data(self, payload: str) -> None: hash_hex = payload # self.log.debug('handle_get_data', payload=hash_hex) try: - tx = self.protocol.node.tx_storage.get_transaction(bytes.fromhex(hash_hex)) + tx = self.tx_storage.get_vertex(bytes.fromhex(hash_hex)) self.send_data(tx) except TransactionDoesNotExist: # In case the tx does not exist we send a NOT-FOUND message @@ -605,7 +607,7 @@ def handle_data(self, payload: str) -> None: data = base64.b64decode(payload) try: - tx = self.vertex_parser.deserialize(data) + tx = self.dependencies.vertex_parser.deserialize(data) except struct.error: # Invalid data for tx decode return @@ -614,11 +616,10 @@ def handle_data(self, payload: str) -> None: self.log.debug('tx received from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id()) - if self.protocol.node.tx_storage.get_genesis(tx.hash): + if self.tx_storage.get_genesis(tx.hash): # We just got the data of a genesis tx/block. What should we do? # Will it reduce peer reputation score? return - tx.storage = self.protocol.node.tx_storage key = self.get_data_key(tx.hash) deferred = self.deferred_by_key.pop(key, None) @@ -627,16 +628,16 @@ def handle_data(self, payload: str) -> None: assert tx.timestamp is not None self.requested_data_arrived(tx.timestamp) deferred.callback(tx) - elif self.manager.tx_storage.transaction_exists(tx.hash): + elif self.tx_storage.transaction_exists(tx.hash): # transaction already added to the storage, ignore it # XXX: maybe we could add a hash blacklist and punish peers propagating known bad txs - self.manager.tx_storage.compare_bytes_with_local_tx(tx) + self.tx_storage.compare_bytes_with_local_tx(tx) return else: self.log.info('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id()) # If we have not requested the data, it is a new transaction being propagated # in the network, thus, we propagate it as well. - success = self.manager.vertex_handler.on_new_vertex(tx) + success = self.dependencies.vertex_handler.on_new_vertex(tx) if success: self.protocol.connections.send_tx_to_peers(tx) self.update_received_stats(tx, success) @@ -682,12 +683,12 @@ def on_tx_success(self, tx: 'BaseTransaction') -> 'BaseTransaction': # the parameter of the second callback is the return of the first # so I need to return the same tx to guarantee that all peers will receive it if tx: - if self.manager.tx_storage.transaction_exists(tx.hash): - self.manager.tx_storage.compare_bytes_with_local_tx(tx) + if self.tx_storage.transaction_exists(tx.hash): + self.tx_storage.compare_bytes_with_local_tx(tx) success = True else: # Add tx to the DAG. - success = self.manager.vertex_handler.on_new_vertex(tx) + success = self.dependencies.vertex_handler.on_new_vertex(tx) if success: self.protocol.connections.send_tx_to_peers(tx) # Updating stats data diff --git a/hathor/p2p/sync_v1/downloader.py b/hathor/p2p/sync_v1/downloader.py index d8b3c12cf..8e22d9f7f 100644 --- a/hathor/p2p/sync_v1/downloader.py +++ b/hathor/p2p/sync_v1/downloader.py @@ -22,10 +22,10 @@ from twisted.python.failure import Failure from hathor.conf.get_settings import get_global_settings +from hathor.p2p import P2PDependencies from hathor.transaction.storage.exceptions import TransactionDoesNotExist if TYPE_CHECKING: - from hathor.manager import HathorManager from hathor.p2p.sync_v1.agent import NodeSyncTimestamp from hathor.transaction import BaseTransaction @@ -145,10 +145,10 @@ class Downloader: # Size of the sliding window used to download transactions. window_size: int - def __init__(self, manager: 'HathorManager', window_size: int = 100): - self._settings = get_global_settings() + def __init__(self, dependencies: P2PDependencies, window_size: int = 100): + self.dependencies = dependencies + self._settings = dependencies.settings self.log = logger.new() - self.manager = manager self.pending_transactions = {} self.waiting_deque = deque() @@ -172,7 +172,7 @@ def get_tx(self, tx_id: bytes, connection: 'NodeSyncTimestamp') -> Deferred: # If I already have this tx in the storage just return a defer already success # In the node_sync code we already handle this case but in a race condition situation # we might get here but it's not common - tx = self.manager.tx_storage.get_transaction(tx_id) + tx = self.dependencies.tx_storage.get_vertex(tx_id) self.log.debug('requesting to download a tx that is already in the storage', tx=tx_id.hex()) return defer.succeed(tx) except TransactionDoesNotExist: diff --git a/hathor/p2p/sync_v1/factory.py b/hathor/p2p/sync_v1/factory.py index 2a205d728..68fd740e1 100644 --- a/hathor/p2p/sync_v1/factory.py +++ b/hathor/p2p/sync_v1/factory.py @@ -14,34 +14,29 @@ from typing import TYPE_CHECKING, Optional -from hathor.p2p.manager import ConnectionsManager +from hathor.p2p import P2PDependencies from hathor.p2p.sync_agent import SyncAgent from hathor.p2p.sync_factory import SyncAgentFactory from hathor.p2p.sync_v1.agent import NodeSyncTimestamp from hathor.p2p.sync_v1.downloader import Downloader -from hathor.reactor import ReactorProtocol as Reactor -from hathor.transaction.vertex_parser import VertexParser if TYPE_CHECKING: from hathor.p2p.protocol import HathorProtocol class SyncV11Factory(SyncAgentFactory): - def __init__(self, connections: ConnectionsManager, *, vertex_parser: VertexParser): - self.connections = connections - self.vertex_parser = vertex_parser + def __init__(self, dependencies: P2PDependencies) -> None: + self.dependencies = dependencies self._downloader: Optional[Downloader] = None def get_downloader(self) -> Downloader: if self._downloader is None: - assert self.connections.manager is not None - self._downloader = Downloader(self.connections.manager) + self._downloader = Downloader(self.dependencies) return self._downloader - def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Reactor) -> SyncAgent: + def create_sync_agent(self, protocol: 'HathorProtocol') -> SyncAgent: return NodeSyncTimestamp( protocol, downloader=self.get_downloader(), - reactor=reactor, - vertex_parser=self.vertex_parser + dependencies=self.dependencies, ) diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index ddcef4e00..2a3b75f64 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -24,8 +24,8 @@ from twisted.internet.defer import Deferred, inlineCallbacks from twisted.internet.task import LoopingCall, deferLater -from hathor.conf.settings import HathorSettings from hathor.exception import InvalidNewTransaction +from hathor.p2p import P2PDependencies from hathor.p2p.messages import ProtocolMessages from hathor.p2p.sync_agent import SyncAgent from hathor.p2p.sync_v2.blockchain_streaming_client import BlockchainStreamingClient, StreamingError @@ -38,18 +38,14 @@ TransactionsStreamingServer, ) from hathor.p2p.sync_v2.transaction_streaming_client import TransactionStreamingClient -from hathor.reactor import ReactorProtocol as Reactor from hathor.transaction import BaseTransaction, Block, Transaction from hathor.transaction.genesis import is_genesis from hathor.transaction.storage.exceptions import TransactionDoesNotExist -from hathor.transaction.vertex_parser import VertexParser from hathor.types import VertexId from hathor.util import collect_n -from hathor.vertex_handler import VertexHandler if TYPE_CHECKING: from hathor.p2p.protocol import HathorProtocol - from hathor.transaction.storage import TransactionStorage logger = get_logger() @@ -89,30 +85,23 @@ class NodeBlockSync(SyncAgent): def __init__( self, - settings: HathorSettings, protocol: 'HathorProtocol', - reactor: Reactor, *, - vertex_parser: VertexParser, - vertex_handler: VertexHandler, + dependencies: P2PDependencies, ) -> None: """ :param protocol: Protocol of the connection. :type protocol: HathorProtocol - - :param reactor: Reactor to schedule later calls. (default=twisted.internet.reactor) - :type reactor: Reactor """ - self._settings = settings - self.vertex_parser = vertex_parser - self.vertex_handler = vertex_handler + self.dependencies = dependencies + self._settings = dependencies.settings + self.vertex_parser = dependencies.vertex_parser self.protocol = protocol - self.tx_storage: 'TransactionStorage' = protocol.node.tx_storage self.state = PeerState.UNKNOWN self.DEFAULT_STREAMING_LIMIT = DEFAULT_STREAMING_LIMIT - self.reactor: Reactor = reactor + self.reactor = dependencies.reactor self._is_streaming: bool = False # Create logger with context @@ -153,7 +142,7 @@ def __init__( # Saves if I am in the middle of a mempool sync # we don't execute any sync while in the middle of it - self.mempool_manager = SyncMempoolManager(self) + self.mempool_manager = SyncMempoolManager(self, dependencies=self.dependencies) self._receiving_tips: Optional[list[VertexId]] = None self.max_receiving_tips: int = self._settings.MAX_MEMPOOL_RECEIVING_TIPS @@ -179,9 +168,7 @@ def __init__( def get_status(self) -> dict[str, Any]: """ Return the status of the sync. """ - assert self.tx_storage.indexes is not None - assert self.tx_storage.indexes.mempool_tips is not None - tips = self.tx_storage.indexes.mempool_tips.get() + tips = self.dependencies.tx_storage.get_mempool_tips() tips_limited, tips_has_more = collect_n(iter(tips), MAX_MEMPOOL_STATUS_TIPS) res = { 'is_enabled': self.is_sync_enabled(), @@ -348,7 +335,7 @@ def run_sync_mempool(self) -> Generator[Any, Any, None]: def get_my_best_block(self) -> _HeightInfo: """Return my best block info.""" - bestblock = self.tx_storage.get_best_block() + bestblock = self.dependencies.tx_storage.get_best_block() meta = bestblock.get_metadata() assert meta.validation.is_fully_connected() return _HeightInfo(height=bestblock.get_height(), id=bestblock.hash) @@ -359,7 +346,6 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]: Notice that we might already have all other peer's blocks while the other peer is still syncing. """ - assert self.tx_storage.indexes is not None self.state = PeerState.SYNCING_BLOCKS # Get my best block. @@ -382,7 +368,7 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]: # Not synced but same blockchain? if self.peer_best_block.height <= my_best_block.height: # Is peer behind me at the same blockchain? - common_block_hash = self.tx_storage.get_block_id_by_height(self.peer_best_block.height) + common_block_hash = self.dependencies.tx_storage.get_block_id_by_height(self.peer_best_block.height) if common_block_hash == self.peer_best_block.id: # If yes, nothing to sync from this peer. if not self.is_synced(): @@ -466,7 +452,7 @@ def handle_get_tips(self, _payload: str) -> None: return self.log.debug('handle_get_tips') # TODO Use a streaming of tips - for tx_id in self.tx_storage.get_mempool_tips(): + for tx_id in self.dependencies.tx_storage.get_mempool_tips(): self.send_tips(tx_id) self.log.debug('tips end') self.send_message(ProtocolMessages.TIPS_END) @@ -488,7 +474,7 @@ def handle_tips(self, payload: str) -> None: # filter-out txs we already have try: self._receiving_tips.extend( - VertexId(tx_id) for tx_id in data if not self.tx_storage.partial_vertex_exists(tx_id) + VertexId(tx_id) for tx_id in data if not self.dependencies.tx_storage.partial_vertex_exists(tx_id) ) except ValueError: self.protocol.send_error_and_close_connection('Invalid trasaction ID received') @@ -532,7 +518,9 @@ def start_blockchain_streaming(self, start_block: _HeightInfo, end_block: _HeightInfo) -> Deferred[StreamEnd]: """Request peer to start streaming blocks to us.""" - self._blk_streaming_client = BlockchainStreamingClient(self, start_block, end_block) + self._blk_streaming_client = BlockchainStreamingClient( + self, start_block, end_block, dependencies=self.dependencies + ) quantity = self._blk_streaming_client._blk_max_quantity self.log.info('requesting blocks streaming', start_block=start_block, @@ -596,7 +584,7 @@ def find_best_common_block(self, for info in block_info_list: try: # We must check only fully validated transactions. - blk = self.tx_storage.get_transaction(info.id) + blk = self.dependencies.tx_storage.get_vertex(info.id) except TransactionDoesNotExist: hi = info else: @@ -615,12 +603,12 @@ def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> G # Note: Any vertex and block could have already been added by another concurrent syncing peer. try: for tx in vertex_list: - if not self.tx_storage.transaction_exists(tx.hash): - self.vertex_handler.on_new_vertex(tx, fails_silently=False) + if not self.dependencies.tx_storage.transaction_exists(tx.hash): + self.dependencies.vertex_handler.on_new_vertex(tx, fails_silently=False) yield deferLater(self.reactor, 0, lambda: None) - if not self.tx_storage.transaction_exists(blk.hash): - self.vertex_handler.on_new_vertex(blk, fails_silently=False) + if not self.dependencies.tx_storage.transaction_exists(blk.hash): + self.dependencies.vertex_handler.on_new_vertex(blk, fails_silently=False) except InvalidNewTransaction: self.protocol.send_error_and_close_connection('invalid vertex received') @@ -649,10 +637,10 @@ def handle_get_peer_block_hashes(self, payload: str) -> None: return data = [] for h in heights: - blk_hash = self.tx_storage.get_block_id_by_height(h) + blk_hash = self.dependencies.tx_storage.get_block_id_by_height(h) if blk_hash is None: break - blk = self.tx_storage.get_transaction(blk_hash) + blk = self.dependencies.tx_storage.get_vertex(blk_hash) if blk.get_metadata().voided_by: break data.append((h, blk_hash.hex())) @@ -703,7 +691,7 @@ def handle_get_next_blocks(self, payload: str) -> None: def _validate_block(self, _hash: VertexId) -> Optional[Block]: """Validate block given in the GET-NEXT-BLOCKS and GET-TRANSACTIONS-BFS messages.""" try: - blk = self.tx_storage.get_transaction(_hash) + blk = self.dependencies.tx_storage.get_vertex(_hash) except TransactionDoesNotExist: self.log.debug('requested block not found', blk_id=_hash.hex()) self.send_message(ProtocolMessages.NOT_FOUND, _hash.hex()) @@ -780,7 +768,6 @@ def handle_blocks(self, payload: str) -> None: if not isinstance(blk, Block): # Not a block. Punish peer? return - blk.storage = self.tx_storage assert self._blk_streaming_client is not None self._blk_streaming_client.handle_blocks(blk) @@ -841,7 +828,7 @@ def send_get_best_block(self) -> None: def handle_get_best_block(self, _payload: str) -> None: """ Handle a GET-BEST-BLOCK message. """ - best_block = self.tx_storage.get_best_block() + best_block = self.dependencies.tx_storage.get_best_block() meta = best_block.get_metadata() assert meta.validation.is_fully_connected() payload = BestBlockPayload( @@ -863,9 +850,9 @@ def handle_best_block(self, payload: str) -> None: def start_transactions_streaming(self, partial_blocks: list[Block]) -> Deferred[StreamEnd]: """Request peer to start streaming transactions to us.""" - self._tx_streaming_client = TransactionStreamingClient(self, - partial_blocks, - limit=self.DEFAULT_STREAMING_LIMIT) + self._tx_streaming_client = TransactionStreamingClient( + self, partial_blocks, limit=self.DEFAULT_STREAMING_LIMIT, dependencies=self.dependencies + ) start_from: list[bytes] = [] first_block_hash = partial_blocks[0].hash @@ -954,7 +941,7 @@ def handle_get_transactions_bfs(self, payload: str) -> None: start_from_txs = [] for start_from_hash in data.start_from: try: - tx = self.tx_storage.get_transaction(start_from_hash) + tx = self.dependencies.tx_storage.get_vertex(start_from_hash) except TransactionDoesNotExist: # In case the tx does not exist we send a NOT-FOUND message self.log.debug('requested start_from_hash not found', start_from_hash=start_from_hash.hex()) @@ -980,11 +967,14 @@ def send_transactions_bfs(self, """ if self._tx_streaming_server is not None and self._tx_streaming_server.is_running: self.stop_tx_streaming_server(StreamEnd.PER_REQUEST) - self._tx_streaming_server = TransactionsStreamingServer(self, - start_from, - first_block, - last_block, - limit=self.DEFAULT_STREAMING_LIMIT) + self._tx_streaming_server = TransactionsStreamingServer( + self, + start_from, + first_block, + last_block, + limit=self.DEFAULT_STREAMING_LIMIT, + dependencies=self.dependencies, + ) self._tx_streaming_server.start() def send_transaction(self, tx: Transaction) -> None: @@ -1031,7 +1021,6 @@ def handle_transaction(self, payload: str) -> None: self.log.warn('not a transaction', hash=tx.hash_hex) # Not a transaction. Punish peer? return - tx.storage = self.tx_storage assert self._tx_streaming_client is not None self._tx_streaming_client.handle_transaction(tx) @@ -1045,7 +1034,7 @@ def get_tx(self, tx_id: bytes) -> Generator[Deferred, Any, BaseTransaction]: self.log.debug('tx in cache', tx=tx_id.hex()) return tx try: - tx = self.tx_storage.get_transaction(tx_id) + tx = self.dependencies.tx_storage.get_vertex(tx_id) except TransactionDoesNotExist: tx = yield self.get_data(tx_id, 'mempool') assert tx is not None @@ -1115,7 +1104,7 @@ def handle_get_data(self, payload: str) -> None: origin = data.get('origin', '') # self.log.debug('handle_get_data', payload=hash_hex) try: - tx = self.protocol.node.tx_storage.get_transaction(bytes.fromhex(txid_hex)) + tx = self.dependencies.tx_storage.get_vertex(bytes.fromhex(txid_hex)) self.send_data(tx, origin=origin) except TransactionDoesNotExist: # In case the tx does not exist we send a NOT-FOUND message @@ -1150,25 +1139,23 @@ def handle_data(self, payload: str) -> None: return assert tx is not None - if is_genesis(tx.hash, settings=self._settings): + if is_genesis(tx.hash, settings=self.dependencies.settings): # We just got the data of a genesis tx/block. What should we do? # Will it reduce peer reputation score? return - tx.storage = self.protocol.node.tx_storage - - if self.tx_storage.partial_vertex_exists(tx.hash): + if self.dependencies.tx_storage.partial_vertex_exists(tx.hash): # transaction already added to the storage, ignore it # XXX: maybe we could add a hash blacklist and punish peers propagating known bad txs - self.tx_storage.compare_bytes_with_local_tx(tx) + self.dependencies.tx_storage.compare_bytes_with_local_tx(tx) return else: # If we have not requested the data, it is a new transaction being propagated # in the network, thus, we propagate it as well. - if self.tx_storage.can_validate_full(tx): + if self.dependencies.tx_storage.can_validate_full(tx): self.log.debug('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id()) try: - success = self.vertex_handler.on_new_vertex(tx, fails_silently=False) + success = self.dependencies.vertex_handler.on_new_vertex(tx, fails_silently=False) if success: self.protocol.connections.send_tx_to_peers(tx) except InvalidNewTransaction: diff --git a/hathor/p2p/sync_v2/blockchain_streaming_client.py b/hathor/p2p/sync_v2/blockchain_streaming_client.py index 07881bc02..3859a576d 100644 --- a/hathor/p2p/sync_v2/blockchain_streaming_client.py +++ b/hathor/p2p/sync_v2/blockchain_streaming_client.py @@ -17,6 +17,7 @@ from structlog import get_logger from twisted.internet.defer import Deferred +from hathor.p2p import P2PDependencies from hathor.p2p.sync_v2.exception import ( BlockNotConnectedToPreviousBlock, InvalidVertexError, @@ -35,11 +36,17 @@ class BlockchainStreamingClient: - def __init__(self, sync_agent: 'NodeBlockSync', start_block: '_HeightInfo', end_block: '_HeightInfo') -> None: + def __init__( + self, + sync_agent: 'NodeBlockSync', + start_block: '_HeightInfo', + end_block: '_HeightInfo', + *, + dependencies: P2PDependencies, + ) -> None: + self.dependencies = dependencies self.sync_agent = sync_agent self.protocol = self.sync_agent.protocol - self.tx_storage = self.sync_agent.tx_storage - self.vertex_handler = self.sync_agent.vertex_handler self.log = logger.new(peer=self.protocol.get_short_peer_id()) @@ -99,7 +106,7 @@ def handle_blocks(self, blk: Block) -> None: # Check for repeated blocks. is_duplicated = False - if self.tx_storage.partial_vertex_exists(blk.hash): + if self.dependencies.tx_storage.partial_vertex_exists(blk.hash): # We reached a block we already have. Skip it. self._blk_repeated += 1 if self._blk_repeated > self.max_repeated_blocks: @@ -123,9 +130,9 @@ def handle_blocks(self, blk: Block) -> None: else: self.log.debug('block received', blk_id=blk.hash.hex()) - if self.tx_storage.can_validate_full(blk): + if self.dependencies.tx_storage.can_validate_full(blk): try: - self.vertex_handler.on_new_vertex(blk, fails_silently=False) + self.dependencies.vertex_handler.on_new_vertex(blk, fails_silently=False) except HathorError: self.fails(InvalidVertexError(blk.hash.hex())) return diff --git a/hathor/p2p/sync_v2/factory.py b/hathor/p2p/sync_v2/factory.py index b9be356b3..4e46c8dfb 100644 --- a/hathor/p2p/sync_v2/factory.py +++ b/hathor/p2p/sync_v2/factory.py @@ -14,38 +14,21 @@ from typing import TYPE_CHECKING -from hathor.conf.settings import HathorSettings -from hathor.p2p.manager import ConnectionsManager +from hathor.p2p import P2PDependencies from hathor.p2p.sync_agent import SyncAgent from hathor.p2p.sync_factory import SyncAgentFactory from hathor.p2p.sync_v2.agent import NodeBlockSync -from hathor.reactor import ReactorProtocol as Reactor -from hathor.transaction.vertex_parser import VertexParser -from hathor.vertex_handler import VertexHandler if TYPE_CHECKING: from hathor.p2p.protocol import HathorProtocol class SyncV2Factory(SyncAgentFactory): - def __init__( - self, - settings: HathorSettings, - connections: ConnectionsManager, - *, - vertex_parser: VertexParser, - vertex_handler: VertexHandler, - ): - self._settings = settings - self.connections = connections - self.vertex_parser = vertex_parser - self.vertex_handler = vertex_handler + def __init__(self, dependencies: P2PDependencies) -> None: + self.dependencies = dependencies - def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Reactor) -> SyncAgent: + def create_sync_agent(self, protocol: 'HathorProtocol') -> SyncAgent: return NodeBlockSync( - self._settings, - protocol, - reactor=reactor, - vertex_parser=self.vertex_parser, - vertex_handler=self.vertex_handler, + protocol=protocol, + dependencies=self.dependencies, ) diff --git a/hathor/p2p/sync_v2/mempool.py b/hathor/p2p/sync_v2/mempool.py index 03651642e..c7ff2c363 100644 --- a/hathor/p2p/sync_v2/mempool.py +++ b/hathor/p2p/sync_v2/mempool.py @@ -19,6 +19,7 @@ from twisted.internet.defer import Deferred, inlineCallbacks from hathor.exception import InvalidNewTransaction +from hathor.p2p import P2PDependencies from hathor.transaction import BaseTransaction if TYPE_CHECKING: @@ -30,15 +31,14 @@ class SyncMempoolManager: """Manage the sync-v2 mempool with one peer. """ - def __init__(self, sync_agent: 'NodeBlockSync'): + def __init__(self, sync_agent: 'NodeBlockSync', *, dependencies: P2PDependencies): """Initialize the sync-v2 mempool manager.""" self.log = logger.new(peer=sync_agent.protocol.get_short_peer_id()) + self.dependencies = dependencies # Shortcuts. self.sync_agent = sync_agent - self.vertex_handler = self.sync_agent.vertex_handler - self.tx_storage = self.sync_agent.tx_storage - self.reactor = self.sync_agent.reactor + self.reactor = dependencies.reactor self._deferred: Optional[Deferred[bool]] = None @@ -90,7 +90,7 @@ def _unsafe_run(self) -> Generator[Deferred, Any, bool]: if not self.missing_tips: # No missing tips? Let's get them! tx_hashes: list[bytes] = yield self.sync_agent.get_tips() - self.missing_tips.update(h for h in tx_hashes if not self.tx_storage.transaction_exists(h)) + self.missing_tips.update(h for h in tx_hashes if not self.dependencies.tx_storage.transaction_exists(h)) while self.missing_tips: self.log.debug('We have missing tips! Let\'s start!', missing_tips=[x.hex() for x in self.missing_tips]) @@ -127,20 +127,20 @@ def _next_missing_dep(self, tx: BaseTransaction) -> Optional[bytes]: """Get the first missing dependency found of tx.""" assert not tx.is_block for txin in tx.inputs: - if not self.tx_storage.transaction_exists(txin.tx_id): + if not self.dependencies.tx_storage.transaction_exists(txin.tx_id): return txin.tx_id for parent in tx.parents: - if not self.tx_storage.transaction_exists(parent): + if not self.dependencies.tx_storage.transaction_exists(parent): return parent return None def _add_tx(self, tx: BaseTransaction) -> None: """Add tx to the DAG.""" self.missing_tips.discard(tx.hash) - if self.tx_storage.transaction_exists(tx.hash): + if self.dependencies.tx_storage.transaction_exists(tx.hash): return try: - success = self.vertex_handler.on_new_vertex(tx, fails_silently=False) + success = self.dependencies.vertex_handler.on_new_vertex(tx, fails_silently=False) if success: self.sync_agent.protocol.connections.send_tx_to_peers(tx) except InvalidNewTransaction: diff --git a/hathor/p2p/sync_v2/streamers.py b/hathor/p2p/sync_v2/streamers.py index b866ecf5f..11308d2e7 100644 --- a/hathor/p2p/sync_v2/streamers.py +++ b/hathor/p2p/sync_v2/streamers.py @@ -19,6 +19,7 @@ from twisted.internet.interfaces import IConsumer, IDelayedCall, IPushProducer from zope.interface import implementer +from hathor.p2p import P2PDependencies from hathor.transaction import BaseTransaction, Block, Transaction from hathor.transaction.storage.traversal import BFSOrderWalk from hathor.util import not_none @@ -68,7 +69,6 @@ def __str__(self): class _StreamingServerBase: def __init__(self, sync_agent: 'NodeBlockSync', *, limit: int = DEFAULT_STREAMING_LIMIT): self.sync_agent = sync_agent - self.tx_storage = self.sync_agent.tx_storage self.protocol: 'HathorProtocol' = sync_agent.protocol assert self.protocol.transport is not None @@ -212,16 +212,20 @@ class TransactionsStreamingServer(_StreamingServerBase): should there be interruptions or issues. """ - def __init__(self, - sync_agent: 'NodeBlockSync', - start_from: list[BaseTransaction], - first_block: Block, - last_block: Block, - *, - limit: int = DEFAULT_STREAMING_LIMIT) -> None: + def __init__( + self, + sync_agent: 'NodeBlockSync', + start_from: list[BaseTransaction], + first_block: Block, + last_block: Block, + *, + limit: int = DEFAULT_STREAMING_LIMIT, + dependencies: P2PDependencies, + ) -> None: # XXX: is limit needed for tx streaming? Or let's always send all txs for # a block? Very unlikely we'll reach this limit super().__init__(sync_agent, limit=limit) + self.dependencies = dependencies self.first_block: Block = first_block self.last_block: Block = last_block @@ -233,7 +237,10 @@ def __init__(self, self.current_block: Optional[Block] = self.first_block self.bfs = BFSOrderWalk( - self.tx_storage.get_vertex, is_dag_verifications=True, is_dag_funds=True, is_left_to_right=False + self.dependencies.tx_storage.get_vertex, + is_dag_verifications=True, + is_dag_funds=True, + is_left_to_right=False, ) self.iter = self.get_iter() @@ -298,7 +305,7 @@ def send_next(self) -> None: # Check if tx is confirmed by the `self.current_block` or any next block. assert cur_metadata.first_block is not None assert self.current_block is not None - first_block = self.tx_storage.get_block(cur_metadata.first_block) + first_block = self.dependencies.tx_storage.get_block(cur_metadata.first_block) if not_none(first_block.static_metadata.height) < not_none(self.current_block.static_metadata.height): self.log.debug('skipping tx: out of current block') self.bfs.skip_neighbors(cur) diff --git a/hathor/p2p/sync_v2/transaction_streaming_client.py b/hathor/p2p/sync_v2/transaction_streaming_client.py index e784a41cc..d63f2fc96 100644 --- a/hathor/p2p/sync_v2/transaction_streaming_client.py +++ b/hathor/p2p/sync_v2/transaction_streaming_client.py @@ -18,6 +18,7 @@ from structlog import get_logger from twisted.internet.defer import Deferred, inlineCallbacks +from hathor.p2p import P2PDependencies from hathor.p2p.sync_v2.exception import ( InvalidVertexError, StreamingError, @@ -37,16 +38,18 @@ class TransactionStreamingClient: - def __init__(self, - sync_agent: 'NodeBlockSync', - partial_blocks: list['Block'], - *, - limit: int) -> None: + def __init__( + self, + sync_agent: 'NodeBlockSync', + partial_blocks: list['Block'], + *, + limit: int, + dependencies: P2PDependencies, + ) -> None: + self.dependencies = dependencies self.sync_agent = sync_agent self.protocol = self.sync_agent.protocol - self.tx_storage = self.sync_agent.tx_storage - self.verification_service = self.protocol.node.verification_service - self.reactor = sync_agent.reactor + self.reactor = self.dependencies.reactor self.log = logger.new(peer=self.protocol.get_short_peer_id()) @@ -153,7 +156,7 @@ def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None] # Run basic verification. if not tx.is_genesis: try: - self.verification_service.verify_basic(tx) + self.dependencies.verification_service.verify_basic(tx) except TxValidationError as e: self.fails(InvalidVertexError(repr(e))) return @@ -194,7 +197,7 @@ def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None] def _update_dependencies(self, tx: BaseTransaction) -> None: """Update _existing_deps and _waiting_for with the dependencies.""" for dep in tx.get_all_dependencies(): - if self.tx_storage.transaction_exists(dep) or dep in self._db: + if self.dependencies.tx_storage.transaction_exists(dep) or dep in self._db: self._existing_deps.add(dep) else: self._waiting_for.add(dep) diff --git a/hathor/simulator/fake_connection.py b/hathor/simulator/fake_connection.py index 37056f491..aefd804b4 100644 --- a/hathor/simulator/fake_connection.py +++ b/hathor/simulator/fake_connection.py @@ -25,6 +25,7 @@ from hathor.p2p.peer import PrivatePeer from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint from hathor.p2p.peer_id import PeerId +from hathor.transaction.storage import TransactionStorage if TYPE_CHECKING: from hathor.manager import HathorManager @@ -157,14 +158,18 @@ def is_both_synced(self, *, errmsgs: Optional[list[str]] = None) -> bool: self.log.debug('peer not synced', peer1_synced=state1_is_synced, peer2_synced=state2_is_synced) errmsgs.append('peer not synced') return False - [best_block_info1] = state1.protocol.node.tx_storage.get_n_height_tips(1) - [best_block_info2] = state2.protocol.node.tx_storage.get_n_height_tips(1) + [best_block_info1] = state1.dependencies.tx_storage.get_n_height_tips(1) + [best_block_info2] = state2.dependencies.tx_storage.get_n_height_tips(1) if best_block_info1.id != best_block_info2.id: self.log.debug('best block is different') errmsgs.append('best block is different') return False - tips1 = {i.data for i in state1.protocol.node.tx_storage.get_tx_tips()} - tips2 = {i.data for i in state2.protocol.node.tx_storage.get_tx_tips()} + tx_storage1 = state1.dependencies.tx_storage + tx_storage2 = state2.dependencies.tx_storage + assert isinstance(tx_storage1, TransactionStorage) + assert isinstance(tx_storage2, TransactionStorage) + tips1 = {i.data for i in tx_storage1.get_tx_tips()} + tips2 = {i.data for i in tx_storage2.get_tx_tips()} if tips1 != tips2: self.log.debug('tx tips are different') errmsgs.append('tx tips are different') diff --git a/hathor/verification/transaction_verifier.py b/hathor/verification/transaction_verifier.py index 906df38c2..cb7893b5f 100644 --- a/hathor/verification/transaction_verifier.py +++ b/hathor/verification/transaction_verifier.py @@ -51,8 +51,6 @@ def __init__(self, *, settings: HathorSettings, daa: DifficultyAdjustmentAlgorit def verify_parents_basic(self, tx: Transaction) -> None: """Verify number and non-duplicity of parents.""" - assert tx.storage is not None - # check if parents are duplicated parents_set = set(tx.parents) if len(tx.parents) > len(parents_set): diff --git a/tests/others/test_metrics.py b/tests/others/test_metrics.py index 45ca40631..8a8dc546e 100644 --- a/tests/others/test_metrics.py +++ b/tests/others/test_metrics.py @@ -234,8 +234,8 @@ def build_hathor_protocol(): p2p_manager=manager.connections, use_ssl=False, inbound=False, - settings=self._settings, - addr=PeerAddress.parse(f'tcp://localhost:{port}') + addr=PeerAddress.parse(f'tcp://localhost:{port}'), + dependencies=Mock(), ) protocol._peer = PrivatePeer.auto_generated().to_public_peer() port += 1 diff --git a/tests/p2p/test_bootstrap.py b/tests/p2p/test_bootstrap.py index 75f7afcdf..16bec0d0a 100644 --- a/tests/p2p/test_bootstrap.py +++ b/tests/p2p/test_bootstrap.py @@ -1,4 +1,5 @@ from typing import Callable +from unittest.mock import Mock from twisted.internet.defer import Deferred from twisted.internet.interfaces import IProtocol @@ -61,7 +62,13 @@ class BootstrapTestCase(unittest.TestCase): def test_mock_discovery(self) -> None: pubsub = PubSubManager(self.clock) peer = PrivatePeer.auto_generated() - connections = ConnectionsManager(self._settings, self.clock, peer, pubsub, True, self.rng, True) + connections = ConnectionsManager( + dependencies=Mock(), + my_peer=peer, + pubsub=pubsub, + rng=self.rng, + ssl=True, + ) host_ports1 = [ ('foobar', 1234, None), ('127.0.0.99', 9999, None), @@ -89,7 +96,13 @@ def test_mock_discovery(self) -> None: def test_dns_discovery(self) -> None: pubsub = PubSubManager(self.clock) peer = PrivatePeer.auto_generated() - connections = ConnectionsManager(self._settings, self.clock, peer, pubsub, True, self.rng, True) + connections = ConnectionsManager( + dependencies=Mock(), + my_peer=peer, + pubsub=pubsub, + rng=self.rng, + ssl=True, + ) bootstrap_a = [ '127.0.0.99', '127.0.0.88', diff --git a/tests/p2p/test_get_best_blockchain.py b/tests/p2p/test_get_best_blockchain.py index 200fc41e2..67ed0bc74 100644 --- a/tests/p2p/test_get_best_blockchain.py +++ b/tests/p2p/test_get_best_blockchain.py @@ -204,11 +204,11 @@ def test_node_without_get_best_blockchain_capability(self) -> None: manager1 = self.create_peer() manager2 = self.create_peer() - cababilities_without_get_best_blockchain = [ + capabilities_without_get_best_blockchain = [ self._settings.CAPABILITY_WHITELIST, self._settings.CAPABILITY_SYNC_VERSION, ] - manager2.capabilities = cababilities_without_get_best_blockchain + manager2.connections.dependencies.capabilities = capabilities_without_get_best_blockchain conn12 = FakeConnection(manager1, manager2, latency=0.05) self.simulator.add_connection(conn12) @@ -222,7 +222,7 @@ def test_node_without_get_best_blockchain_capability(self) -> None: # assert the peers have the proper capabilities protocol2 = connected_peers1[0] - self.assertTrue(protocol2.capabilities.issuperset(set(cababilities_without_get_best_blockchain))) + self.assertTrue(protocol2.capabilities.issuperset(set(capabilities_without_get_best_blockchain))) protocol1 = connected_peers2[0] default_capabilities = self._settings.get_default_capabilities() self.assertTrue(protocol1.capabilities.issuperset(set(default_capabilities))) diff --git a/tests/p2p/test_sync.py b/tests/p2p/test_sync.py index fc7712495..533d14192 100644 --- a/tests/p2p/test_sync.py +++ b/tests/p2p/test_sync.py @@ -2,6 +2,7 @@ from hathor.checkpoint import Checkpoint as cp from hathor.crypto.util import decode_address +from hathor.p2p import P2PDependencies from hathor.p2p.protocol import PeerIdState from hathor.p2p.sync_version import SyncVersion from hathor.simulator import FakeConnection @@ -268,13 +269,29 @@ def test_downloader(self) -> None: downloader = conn.proto2.connections.get_sync_factory(SyncVersion.V1_1).get_downloader() - node_sync1 = NodeSyncTimestamp( - conn.proto1, downloader, reactor=conn.proto1.node.reactor, vertex_parser=self.manager1.vertex_parser + p2p_dependencies1 = P2PDependencies( + reactor=self.manager1.reactor, + settings=self._settings, + vertex_parser=self.manager1.vertex_parser, + tx_storage=self.manager1.tx_storage, + vertex_handler=self.manager1.vertex_handler, + verification_service=self.manager1.verification_service, + capabilities=[], + whitelist_only=False, ) - node_sync1.start() - node_sync2 = NodeSyncTimestamp( - conn.proto2, downloader, reactor=conn.proto2.node.reactor, vertex_parser=manager2.vertex_parser + p2p_dependencies2 = P2PDependencies( + reactor=manager2.reactor, + settings=self._settings, + vertex_parser=manager2.vertex_parser, + tx_storage=manager2.tx_storage, + vertex_handler=manager2.vertex_handler, + verification_service=manager2.verification_service, + capabilities=[], + whitelist_only=False, ) + node_sync1 = NodeSyncTimestamp(conn.proto1, downloader, dependencies=p2p_dependencies1) + node_sync1.start() + node_sync2 = NodeSyncTimestamp(conn.proto2, downloader, dependencies=p2p_dependencies2) node_sync2.start() self.assertTrue(isinstance(conn.proto1.state, PeerIdState))