Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(p2p): remove builder methods from P2PManager [part ?] #1159

Draft
wants to merge 1 commit into
base: refactor/p2p/renames
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
from hathor.manager import HathorManager
from hathor.mining.cpu_mining_service import CpuMiningService
from hathor.p2p import P2PDependencies, P2PManager, SingleProcessP2PDependencies
from hathor.p2p.p2p_manager import SyncFactoryConfig
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.sync_version import SyncVersion
from hathor.pubsub import PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.storage import RocksDBStorage
Expand All @@ -62,28 +64,38 @@ class SyncSupportLevel(IntEnum):
ENABLED = 2 # available and enabled by default, possible to disable at runtime

@classmethod
def add_factories(
def get_factories(
cls,
p2p_manager: P2PManager,
tx_storage: TransactionStorage,
dependencies: P2PDependencies,
sync_v1_support: 'SyncSupportLevel',
sync_v2_support: 'SyncSupportLevel',
) -> None:
"""Adds the sync factory to the manager according to the support level."""
) -> dict[SyncVersion, SyncFactoryConfig]:
"""Create sync factories according to the support level."""
from hathor.p2p.sync_v1.factory import SyncV11Factory
from hathor.p2p.sync_v2.factory import SyncV2Factory
from hathor.p2p.sync_version import SyncVersion
log = logger.new()
sync_factories: dict[SyncVersion, SyncFactoryConfig] = {}

# sync-v1 support:
if sync_v1_support > cls.UNAVAILABLE:
p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(dependencies))
if sync_v1_support is cls.ENABLED:
p2p_manager.enable_sync_version(SyncVersion.V1_1)
sync_factories[SyncVersion.V1_1] = SyncFactoryConfig(
factory=SyncV11Factory(dependencies),
enabled=sync_v1_support is cls.ENABLED
)

# sync-v2 support:
if sync_v2_support > cls.UNAVAILABLE:
p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(dependencies))
if sync_v2_support is cls.ENABLED:
p2p_manager.enable_sync_version(SyncVersion.V2)
sync_factories[SyncVersion.V2] = SyncFactoryConfig(
factory=SyncV2Factory(dependencies),
enabled=sync_v2_support is cls.ENABLED
)
log.debug('enable sync-v2 indexes')
assert tx_storage.indexes is not None
tx_storage.indexes.enable_mempool_index()

return sync_factories


class StorageType(Enum):
Expand Down Expand Up @@ -266,7 +278,6 @@ def build(self) -> BuildArtifacts:
**kwargs
)

p2p_manager.finalize_factories()
if poa_block_producer:
poa_block_producer.manager = manager

Expand Down Expand Up @@ -407,16 +418,23 @@ def _get_or_create_p2p_manager(self) -> P2PManager:

enable_ssl = True
my_peer = self._get_peer()
tx_storage = self._get_or_create_tx_storage()

dependencies = SingleProcessP2PDependencies(
reactor=self._get_reactor(),
settings=self._get_or_create_settings(),
vertex_parser=self._get_or_create_vertex_parser(),
tx_storage=self._get_or_create_tx_storage(),
tx_storage=tx_storage,
vertex_handler=self._get_or_create_vertex_handler(),
verification_service=self._get_or_create_verification_service(),
pubsub=self._get_or_create_pubsub(),
)
sync_factories = SyncSupportLevel.get_factories(
tx_storage,
dependencies,
self._sync_v1_support,
self._sync_v2_support,
)

self._p2p_manager = P2PManager(
dependencies=dependencies,
Expand All @@ -425,12 +443,7 @@ def _get_or_create_p2p_manager(self) -> P2PManager:
whitelist_only=False,
rng=self._rng,
capabilities=self._get_or_create_capabilities(),
)
SyncSupportLevel.add_factories(
self._p2p_manager,
dependencies,
self._sync_v1_support,
self._sync_v2_support,
sync_factories=sync_factories,
)
return self._p2p_manager

Expand Down
19 changes: 11 additions & 8 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Any, Optional

from structlog import get_logger
from typing_extensions import assert_never

from hathor.cli.run_node_args import RunNodeArgs
from hathor.cli.side_dag import SideDagArgs
Expand Down Expand Up @@ -238,6 +239,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
case SyncChoice.V2_ONLY:
sync_v1_support = SyncSupportLevel.UNAVAILABLE
sync_v2_support = SyncSupportLevel.ENABLED
case _:
assert_never(sync_choice)

pubsub = PubSubManager(reactor)

Expand Down Expand Up @@ -341,23 +344,24 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
)

capabilities = settings.get_default_capabilities()
sync_factories = SyncSupportLevel.get_factories(
tx_storage,
p2p_dependencies,
sync_v1_support,
sync_v2_support,
)

p2p_manager = P2PManager(
dependencies=p2p_dependencies,
my_peer=peer,
ssl=True,
whitelist_only=False,
rng=Random(),
capabilities=capabilities,
sync_factories=sync_factories,
hostname=hostname,
)

SyncSupportLevel.add_factories(
p2p_manager,
p2p_dependencies,
sync_v1_support,
sync_v2_support,
)

from hathor.consensus.poa import PoaBlockProducer, PoaSignerFile
poa_block_producer: PoaBlockProducer | None = None
if settings.CONSENSUS_ALGORITHM.is_poa():
Expand Down Expand Up @@ -399,7 +403,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
'--x-ipython-kernel must be used with --x-asyncio-reactor')
self._start_ipykernel()

p2p_manager.finalize_factories()
if poa_block_producer:
poa_block_producer.manager = self.manager

Expand Down
4 changes: 0 additions & 4 deletions hathor/p2p/dependencies/p2p_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,3 @@ def get_best_block_tips(self) -> list[VertexId]:
@abstractmethod
def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
raise NotImplementedError

@abstractmethod
def enable_mempool_index(self) -> None:
raise NotImplementedError
4 changes: 0 additions & 4 deletions hathor/p2p/dependencies/single_process_p2p_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,3 @@ def get_best_block_tips(self) -> list[VertexId]:
@override
def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
return self._tx_storage.partial_vertex_exists(vertex_id)

@override
def enable_mempool_index(self) -> None:
self._indexes.enable_mempool_index()
62 changes: 30 additions & 32 deletions hathor/p2p/p2p_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
from twisted.python.failure import Failure
from twisted.web.client import Agent
from typing_extensions import Self

from hathor.p2p import P2PDependencies
from hathor.p2p.entrypoint import Entrypoint
Expand Down Expand Up @@ -67,6 +68,17 @@ class PeerConnectionsMetrics(NamedTuple):
known_peers_count: int


class SyncFactoryConfig(NamedTuple):
factory: SyncAgentFactory
enabled: bool

def enable(self) -> Self:
return self._replace(enabled=True)

def disable(self) -> Self:
return self._replace(enabled=False)


class P2PManager:
""" It manages all peer-to-peer connections and events related to control messages.
"""
Expand All @@ -81,8 +93,6 @@ class GlobalRateLimiter:
whitelist_only: bool
unverified_peer_storage: UnverifiedPeerStorage
verified_peer_storage: VerifiedPeerStorage
_sync_factories: dict[SyncVersion, SyncAgentFactory]
_enabled_sync_versions: set[SyncVersion]

rate_limiter: RateLimiter

Expand All @@ -94,6 +104,7 @@ def __init__(
rng: Random,
whitelist_only: bool,
capabilities: list[str],
sync_factories: dict[SyncVersion, SyncFactoryConfig],
hostname: str | None = None,
) -> None:
self.log = logger.new()
Expand All @@ -108,7 +119,6 @@ def __init__(

self.reactor = dependencies.reactor
self.my_peer = my_peer
self._finalized_factories = False

# List of whitelisted peers
self.peers_whitelist: list[PeerId] = []
Expand Down Expand Up @@ -199,20 +209,15 @@ def __init__(
# Timestamp when the last discovery ran
self._last_discovery: float = 0.

if all(not config.enabled for config in sync_factories.values()):
raise TypeError('Class built incorrectly without any enabled sync version')

# sync-manager factories
self._sync_factories = {}
self._enabled_sync_versions = set()
self._sync_factories = sync_factories

# agent to perform HTTP requests
self._http_agent = Agent(self.reactor)

def add_sync_factory(self, sync_version: SyncVersion, sync_factory: SyncAgentFactory) -> None:
"""Add factory for the given sync version, must use a sync version that does not already exist."""
assert not self._finalized_factories, 'Cannot modify sync factories after it is finalized'
if sync_version in self._sync_factories:
raise ValueError('sync version already exists')
self._sync_factories[sync_version] = sync_factory

def get_available_sync_versions(self) -> set[SyncVersion]:
"""What sync versions the manager is capable of using, they are not necessarily enabled."""
return set(self._sync_factories.keys())
Expand All @@ -223,36 +228,30 @@ def is_sync_version_available(self, sync_version: SyncVersion) -> bool:

def get_enabled_sync_versions(self) -> set[SyncVersion]:
"""What sync versions are enabled for use, it is necessarily a subset of the available versions."""
return self._enabled_sync_versions.copy()
return {version for version, config in self._sync_factories.items() if config.enabled}

def is_sync_version_enabled(self, sync_version: SyncVersion) -> bool:
"""Whether the given sync version is enabled for use, being enabled implies being available."""
return sync_version in self._enabled_sync_versions
if config := self._sync_factories.get(sync_version):
return config.enabled
return False

def enable_sync_version(self, sync_version: SyncVersion) -> None:
"""Enable using the given sync version on new connections, it must be available before being enabled."""
assert sync_version in self._sync_factories
if sync_version in self._enabled_sync_versions:
config = self._sync_factories[sync_version]
if config.enabled:
self.log.info('tried to enable a sync verison that was already enabled, nothing to do')
return
self._enabled_sync_versions.add(sync_version)
self._sync_factories[sync_version] = config.enable()

def disable_sync_version(self, sync_version: SyncVersion) -> None:
"""Disable using the given sync version, it WILL NOT close connections using the given version."""
if sync_version not in self._enabled_sync_versions:
self.log.info('tried to disable a sync verison that was already disabled, nothing to do')
return
self._enabled_sync_versions.discard(sync_version)

def finalize_factories(self) -> None:
"""Signal that no more sync factories will be added. This method must be called before start()."""
self._finalized_factories = True
if len(self._enabled_sync_versions) == 0:
raise TypeError('Class built incorrectly without any enabled sync version')

if self.is_sync_version_available(SyncVersion.V2):
self.log.debug('enable sync-v2 indexes')
self.dependencies.enable_mempool_index()
if config := self._sync_factories.get(sync_version):
if not config.enabled:
self.log.info('tried to disable a sync verison that was already disabled, nothing to do')
return
self._sync_factories[sync_version] = config.disable()

def add_listen_address_description(self, addr: str) -> None:
"""Add address to listen for incoming connections."""
Expand Down Expand Up @@ -285,7 +284,6 @@ def enable_rate_limiter(self, max_hits: int = 16, window_seconds: float = 1) ->

def start(self) -> None:
"""Listen on the given address descriptions and start accepting and processing connections."""
assert self._finalized_factories, 'sync factories must be finalized by calling `finalize_factories`'
self.lc_reconnect.start(5, now=False)
self.lc_sync_update.start(self.lc_sync_update_interval, now=False)

Expand Down Expand Up @@ -332,7 +330,7 @@ def _get_peers_count(self) -> PeerConnectionsMetrics:
def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory:
"""Get the sync factory for a given version, MUST be available or it will raise an assert."""
assert sync_version in self._sync_factories, f'sync_version {sync_version} is not available'
return self._sync_factories[sync_version]
return self._sync_factories[sync_version].factory

def has_synced_peer(self) -> bool:
""" Return whether we are synced to at least one peer.
Expand Down
9 changes: 6 additions & 3 deletions tests/p2p/test_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from typing_extensions import override

from hathor.p2p.entrypoint import Entrypoint, Protocol
from hathor.p2p.p2p_manager import P2PManager
from hathor.p2p.p2p_manager import P2PManager, SyncFactoryConfig
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peer_discovery import DNSPeerDiscovery, PeerDiscovery
from hathor.p2p.peer_discovery.dns import LookupResult
from hathor.p2p.sync_version import SyncVersion
from tests import unittest
from tests.test_memory_reactor_clock import TestMemoryReactorClock

Expand Down Expand Up @@ -54,7 +55,8 @@ def test_mock_discovery(self) -> None:
whitelist_only=True,
rng=self.rng,
ssl=True,
capabilities=self._settings.get_default_capabilities()
capabilities=self._settings.get_default_capabilities(),
sync_factories={SyncVersion.V2: SyncFactoryConfig(Mock(), True)},
)
host_ports1 = [
('foobar', 1234),
Expand Down Expand Up @@ -84,7 +86,8 @@ def test_dns_discovery(self) -> None:
whitelist_only=True,
rng=self.rng,
ssl=True,
capabilities=self._settings.get_default_capabilities()
capabilities=self._settings.get_default_capabilities(),
sync_factories={SyncVersion.V2: SyncFactoryConfig(Mock(), True)},
)
bootstrap_a = [
'127.0.0.99',
Expand Down
Loading