diff --git a/eth2/beacon/scripts/run_beacon_nodes.py b/eth2/beacon/scripts/run_beacon_nodes.py index b3f6a9b08d..c9aae368d3 100755 --- a/eth2/beacon/scripts/run_beacon_nodes.py +++ b/eth2/beacon/scripts/run_beacon_nodes.py @@ -132,7 +132,7 @@ def cmd(self) -> str: "--enable-metrics", f"--metrics-port={self.metrics_port}", "--disable-discovery", - "-l debug2", + "-l debug", "interop", f"--validators={','.join(str(v) for v in self.validators)}", f"--start-time={self.start_time}", diff --git a/eth2/events.py b/eth2/events.py new file mode 100644 index 0000000000..f1278339b3 --- /dev/null +++ b/eth2/events.py @@ -0,0 +1,5 @@ +from lahja import BaseEvent + + +class SyncRequest(BaseEvent): + ... diff --git a/tests/libp2p/bcc/test_syncing.py b/tests/libp2p/bcc/test_syncing.py index bdc870f65d..837485c209 100644 --- a/tests/libp2p/bcc/test_syncing.py +++ b/tests/libp2p/bcc/test_syncing.py @@ -31,15 +31,23 @@ async def get_sync_setup( bob_chaindb=bob_chaindb, bob_branch=bob_branch, genesis_state=genesis_state, + alice_event_bus=event_bus, + handshake=False, ) async with peer_pair as (alice, bob): alice_syncer = BeaconChainSyncerFactory( - chain_db__db=alice.chain.chaindb.db, peer_pool=alice.handshaked_peers + chain_db__db=alice.chain.chaindb.db, + peer_pool=alice.handshaked_peers, + event_bus=event_bus, ) try: - await asyncio.wait_for(alice_syncer.run(), timeout=sync_timeout) + task = asyncio.ensure_future(alice_syncer.run()) + # sync will start when alice request_status + await alice.request_status(bob.peer_id) + # Wait sync to complete + await asyncio.wait_for(task, timeout=sync_timeout) except asyncio.TimeoutError: # After sync is cancelled, return to let the caller do assertions about the state pass diff --git a/trinity/components/eth2/beacon/component.py b/trinity/components/eth2/beacon/component.py index 4f5e9f8dcd..2e44d824cd 100644 --- a/trinity/components/eth2/beacon/component.py +++ b/trinity/components/eth2/beacon/component.py @@ -78,6 +78,11 @@ def configure_parser(cls, arg_parser: ArgumentParser, subparser: _SubParsersActi help="Metrics server port", default=8008, ) + arg_parser.add_argument( + "--debug-libp2p", + action="store_true", + help="Enable debug logging of libp2p", + ) @property def is_enabled(self) -> bool: @@ -116,6 +121,12 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: key_pair = cls._load_or_create_node_key(boot_info) beacon_app_config = trinity_config.get_app_config(BeaconAppConfig) base_db = DBClient.connect(trinity_config.database_ipc_path) + + if boot_info.args.debug_libp2p: + logging.getLogger("libp2p").setLevel(logging.DEBUG) + else: + logging.getLogger("libp2p").setLevel(logging.INFO) + with base_db: chain_config = beacon_app_config.get_chain_config() chain = chain_config.beacon_chain_class( @@ -135,6 +146,7 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: preferred_nodes=trinity_config.preferred_nodes, chain=chain, subnets=subnets, + event_bus=event_bus, ) receive_server = BCCReceiveServer( @@ -185,6 +197,7 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None: peer_pool=libp2p_node.handshaked_peers, block_importer=SyncBlockImporter(chain), genesis_config=chain_config.genesis_config, + event_bus=event_bus, token=libp2p_node.cancel_token, ) http_server = HTTPServer( diff --git a/trinity/protocol/bcc_libp2p/node.py b/trinity/protocol/bcc_libp2p/node.py index bb5ed7c348..da1b96e5ff 100644 --- a/trinity/protocol/bcc_libp2p/node.py +++ b/trinity/protocol/bcc_libp2p/node.py @@ -10,6 +10,11 @@ Sequence, Set, Tuple, + Iterator, +) + +from lahja import ( + EndpointAPI, ) from cancel_token import ( @@ -39,6 +44,7 @@ SigningRoot, SubnetId, ) +from eth2.events import SyncRequest from libp2p import ( initialize_default_swarm, @@ -140,7 +146,7 @@ make_rpc_v1_ssz_protocol_id, make_tcp_ip_maddr, Interaction, - compare_chain_tip_and_finalized_epoch, + peer_is_ahead, validate_peer_status, get_my_status, get_requested_beacon_blocks, @@ -160,6 +166,7 @@ REQ_RESP_BEACON_BLOCKS_BY_ROOT_SSZ = make_rpc_v1_ssz_protocol_id( REQ_RESP_BEACON_BLOCKS_BY_ROOT ) +NEXT_UPDATE_INTERVAL = 10 @dataclass @@ -241,6 +248,10 @@ def get_best(self, field: str) -> Peer: def get_best_head_slot_peer(self) -> Peer: return self.get_best("head_slot") + @property + def peer_ids(self) -> Iterator[ID]: + return iter(self.peers.keys()) + DIAL_RETRY_COUNT = 10 @@ -258,6 +269,7 @@ class Node(BaseService): preferred_nodes: Tuple[Multiaddr, ...] chain: BaseBeaconChain subnets: Set[SubnetId] + _event_bus: EndpointAPI handshaked_peers: PeerPool = None @@ -267,6 +279,7 @@ def __init__( listen_ip: str, listen_port: int, chain: BaseBeaconChain, + event_bus: EndpointAPI, security_protocol_ops: Dict[TProtocol, BaseSecureTransport] = None, muxer_protocol_ops: Dict[TProtocol, IMuxedConn] = None, gossipsub_params: Optional[GossipsubParams] = None, @@ -316,6 +329,7 @@ def __init__( ) self.chain = chain + self._event_bus = event_bus self.handshaked_peers = PeerPool() @@ -327,6 +341,7 @@ def is_started(self) -> bool: async def _run(self) -> None: self.logger.info("libp2p node %s is up", self.listen_maddr) + self.run_daemon_task(self.update_status()) await self.cancellation() async def start(self) -> None: @@ -595,8 +610,12 @@ async def _handle_status(self, stream: INetStream) -> None: self._add_peer_from_status(peer_id, peer_status) - # Check if we are behind the peer - compare_chain_tip_and_finalized_epoch(self.chain, peer_status) + if peer_is_ahead(self.chain, peer_status): + logger.debug( + "Peer's chain is ahead of us, start syncing with the peer(%s)", + str(peer_id), + ) + await self._event_bus.broadcast(SyncRequest()) async def request_status(self, peer_id: ID) -> None: self.logger.info("Initiate handshake with %s", str(peer_id)) @@ -615,8 +634,12 @@ async def request_status(self, peer_id: ID) -> None: self._add_peer_from_status(peer_id, peer_status) - # Check if we are behind the peer - compare_chain_tip_and_finalized_epoch(self.chain, peer_status) + if peer_is_ahead(self.chain, peer_status): + logger.debug( + "Peer's chain is ahead of us, start syncing with the peer(%s)", + str(peer_id), + ) + await self._event_bus.broadcast(SyncRequest()) async def _handle_goodbye(self, stream: INetStream) -> None: async with Interaction(stream) as interaction: @@ -720,3 +743,9 @@ async def request_beacon_blocks_by_root( ]) return blocks + + async def update_status(self) -> None: + while True: + for peer_id in self.handshaked_peers.peer_ids: + asyncio.ensure_future(self.request_status(peer_id)) + await asyncio.sleep(NEXT_UPDATE_INTERVAL) diff --git a/trinity/protocol/bcc_libp2p/utils.py b/trinity/protocol/bcc_libp2p/utils.py index 17d06b3f42..2300d01e28 100644 --- a/trinity/protocol/bcc_libp2p/utils.py +++ b/trinity/protocol/bcc_libp2p/utils.py @@ -163,20 +163,17 @@ async def validate_peer_status(chain: BaseBeaconChain, peer_status: Status) -> N ) -def compare_chain_tip_and_finalized_epoch(chain: BaseBeaconChain, peer_status: Status) -> None: +def peer_is_ahead(chain: BaseBeaconChain, peer_status: Status) -> bool: checkpoint = chain.get_head_state().finalized_checkpoint head_block = chain.get_canonical_head() peer_has_higher_finalized_epoch = peer_status.finalized_epoch > checkpoint.epoch peer_has_equal_finalized_epoch = peer_status.finalized_epoch == checkpoint.epoch peer_has_higher_head_slot = peer_status.head_slot > head_block.slot - if ( + return ( peer_has_higher_finalized_epoch or (peer_has_equal_finalized_epoch and peer_has_higher_head_slot) - ): - # TODO: kickoff syncing process with this peer - logger.debug("Peer's chain is ahead of us, start syncing with the peer.") - pass + ) def validate_start_slot(chain: BaseBeaconChain, start_slot: Slot) -> None: @@ -410,7 +407,7 @@ def peer_id(self) -> ID: return self.stream.muxed_conn.peer_id def debug(self, message: str) -> None: - self.logger.debug( + self.logger.debug2( "Interaction %s with %s %s", self.stream.get_protocol().split("/")[4], str(self.peer_id)[:15], diff --git a/trinity/sync/beacon/chain.py b/trinity/sync/beacon/chain.py index bb39437fd2..29fa0d5dc6 100644 --- a/trinity/sync/beacon/chain.py +++ b/trinity/sync/beacon/chain.py @@ -1,4 +1,3 @@ -import asyncio from typing import ( Tuple, AsyncGenerator, @@ -7,6 +6,9 @@ from eth_utils import ( ValidationError, ) +from lahja import ( + EndpointAPI, +) from cancel_token import ( CancelToken, @@ -24,13 +26,12 @@ from eth2.beacon.typing import ( Slot, ) +from eth2.events import SyncRequest from trinity.db.beacon.chain import BaseAsyncBeaconChainDB from trinity.protocol.bcc_libp2p.node import PeerPool, Peer from trinity.sync.beacon.constants import ( MAX_BLOCKS_PER_REQUEST, - NEXT_SYNC_CHECK_INTERVAL, - PEER_SELECTION_RETRY_INTERVAL, ) from trinity.sync.common.chain import ( SyncBlockImporter, @@ -51,12 +52,14 @@ class BeaconChainSyncer(BaseService): block_importer: SyncBlockImporter genesis_config: Eth2GenesisConfig sync_peer: Peer + _event_bus: EndpointAPI def __init__(self, chain_db: BaseAsyncBeaconChainDB, peer_pool: PeerPool, block_importer: SyncBlockImporter, genesis_config: Eth2GenesisConfig, + event_bus: EndpointAPI, token: CancelToken = None) -> None: super().__init__(token) @@ -64,17 +67,16 @@ def __init__(self, self.peer_pool = peer_pool self.block_importer = block_importer self.genesis_config = genesis_config + self._event_bus = event_bus self.sync_peer = None async def _run(self) -> None: - while True: + async for event in self.wait_iter(self._event_bus.stream(SyncRequest)): try: self.sync_peer = await self.wait(self.select_sync_peer()) except LeadingPeerNotFonud as exception: self.logger.info("No suitable peers to sync with: %s", exception) - # wait some time and try again - await asyncio.sleep(PEER_SELECTION_RETRY_INTERVAL) continue else: # sync peer selected successfully @@ -87,7 +89,6 @@ async def _run(self) -> None: ) # Reset the sync peer self.sync_peer = None - await asyncio.sleep(NEXT_SYNC_CHECK_INTERVAL) async def select_sync_peer(self) -> Peer: if len(self.peer_pool) == 0: diff --git a/trinity/sync/beacon/constants.py b/trinity/sync/beacon/constants.py index 19c4224a48..ec8b873f64 100644 --- a/trinity/sync/beacon/constants.py +++ b/trinity/sync/beacon/constants.py @@ -1,3 +1 @@ MAX_BLOCKS_PER_REQUEST = 64 -NEXT_SYNC_CHECK_INTERVAL = 10 -PEER_SELECTION_RETRY_INTERVAL = 5 diff --git a/trinity/tools/bcc_factories.py b/trinity/tools/bcc_factories.py index 573218102c..82f09f0100 100644 --- a/trinity/tools/bcc_factories.py +++ b/trinity/tools/bcc_factories.py @@ -15,13 +15,13 @@ from cancel_token import CancelToken +from lahja import EndpointAPI from libp2p.crypto.secp256k1 import create_new_key_pair from libp2p.peer.id import ID from libp2p.peer.peerinfo import ( PeerInfo, ) - from eth_utils import to_tuple from eth.constants import ( @@ -59,7 +59,6 @@ AtomicDBFactory, ) - try: import factory except ImportError: @@ -89,6 +88,7 @@ class Meta: preferred_nodes: Tuple[Multiaddr, ...] = tuple() subnets: None chain = factory.SubFactory(BeaconChainFactory) + event_bus = None @classmethod def create_batch(cls, number: int) -> Tuple[Node, ...]: @@ -116,6 +116,7 @@ async def ConnectionPairFactory( bob_chaindb: AsyncBeaconChainDB = None, bob_branch: Collection[BaseBeaconBlock] = None, genesis_state: BeaconState = None, + alice_event_bus: EndpointAPI = None, cancel_token: CancelToken = None, handshake: bool = True, ) -> AsyncIterator[Tuple[Node, Node]]: @@ -136,7 +137,7 @@ async def ConnectionPairFactory( alice_kwargs["chain__genesis_state"] = genesis_state bob_kwargs["chain__genesis_state"] = genesis_state - alice = NodeFactory(cancel_token=cancel_token, **alice_kwargs) + alice = NodeFactory(cancel_token=cancel_token, event_bus=alice_event_bus, **alice_kwargs) bob = NodeFactory(cancel_token=cancel_token, **bob_kwargs) async with run_service(alice), run_service(bob): await asyncio.sleep(0.01) @@ -266,4 +267,5 @@ class Meta: lambda obj: SimpleWriterBlockImporter(obj.chain_db) ) genesis_config = SERENITY_GENESIS_CONFIG + event_bus = None token = None