Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #1413 from ChihChengLiang/peer-sync-update
Browse files Browse the repository at this point in the history
Rework Syncer
  • Loading branch information
ChihChengLiang authored Dec 23, 2019
2 parents d81ec9e + 560db71 commit d76e8be
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 27 deletions.
2 changes: 1 addition & 1 deletion eth2/beacon/scripts/run_beacon_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def cmd(self) -> str:
"--enable-metrics",
f"--metrics-port={self.metrics_port}",
"--disable-discovery",
"-l debug2",
"-l debug",
"interop",
f"--validators={','.join(str(v) for v in self.validators)}",
f"--start-time={self.start_time}",
Expand Down
5 changes: 5 additions & 0 deletions eth2/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from lahja import BaseEvent


class SyncRequest(BaseEvent):
...
12 changes: 10 additions & 2 deletions tests/libp2p/bcc/test_syncing.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,23 @@ async def get_sync_setup(
bob_chaindb=bob_chaindb,
bob_branch=bob_branch,
genesis_state=genesis_state,
alice_event_bus=event_bus,
handshake=False,
)

async with peer_pair as (alice, bob):
alice_syncer = BeaconChainSyncerFactory(
chain_db__db=alice.chain.chaindb.db, peer_pool=alice.handshaked_peers
chain_db__db=alice.chain.chaindb.db,
peer_pool=alice.handshaked_peers,
event_bus=event_bus,
)

try:
await asyncio.wait_for(alice_syncer.run(), timeout=sync_timeout)
task = asyncio.ensure_future(alice_syncer.run())
# sync will start when alice request_status
await alice.request_status(bob.peer_id)
# Wait sync to complete
await asyncio.wait_for(task, timeout=sync_timeout)
except asyncio.TimeoutError:
# After sync is cancelled, return to let the caller do assertions about the state
pass
Expand Down
13 changes: 13 additions & 0 deletions trinity/components/eth2/beacon/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ def configure_parser(cls, arg_parser: ArgumentParser, subparser: _SubParsersActi
help="Metrics server port",
default=8008,
)
arg_parser.add_argument(
"--debug-libp2p",
action="store_true",
help="Enable debug logging of libp2p",
)

@property
def is_enabled(self) -> bool:
Expand Down Expand Up @@ -116,6 +121,12 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None:
key_pair = cls._load_or_create_node_key(boot_info)
beacon_app_config = trinity_config.get_app_config(BeaconAppConfig)
base_db = DBClient.connect(trinity_config.database_ipc_path)

if boot_info.args.debug_libp2p:
logging.getLogger("libp2p").setLevel(logging.DEBUG)
else:
logging.getLogger("libp2p").setLevel(logging.INFO)

with base_db:
chain_config = beacon_app_config.get_chain_config()
chain = chain_config.beacon_chain_class(
Expand All @@ -135,6 +146,7 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None:
preferred_nodes=trinity_config.preferred_nodes,
chain=chain,
subnets=subnets,
event_bus=event_bus,
)

receive_server = BCCReceiveServer(
Expand Down Expand Up @@ -185,6 +197,7 @@ async def do_run(cls, boot_info: BootInfo, event_bus: EndpointAPI) -> None:
peer_pool=libp2p_node.handshaked_peers,
block_importer=SyncBlockImporter(chain),
genesis_config=chain_config.genesis_config,
event_bus=event_bus,
token=libp2p_node.cancel_token,
)
http_server = HTTPServer(
Expand Down
39 changes: 34 additions & 5 deletions trinity/protocol/bcc_libp2p/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
Sequence,
Set,
Tuple,
Iterator,
)

from lahja import (
EndpointAPI,
)

from cancel_token import (
Expand Down Expand Up @@ -39,6 +44,7 @@
SigningRoot,
SubnetId,
)
from eth2.events import SyncRequest

from libp2p import (
initialize_default_swarm,
Expand Down Expand Up @@ -140,7 +146,7 @@
make_rpc_v1_ssz_protocol_id,
make_tcp_ip_maddr,
Interaction,
compare_chain_tip_and_finalized_epoch,
peer_is_ahead,
validate_peer_status,
get_my_status,
get_requested_beacon_blocks,
Expand All @@ -160,6 +166,7 @@
REQ_RESP_BEACON_BLOCKS_BY_ROOT_SSZ = make_rpc_v1_ssz_protocol_id(
REQ_RESP_BEACON_BLOCKS_BY_ROOT
)
NEXT_UPDATE_INTERVAL = 10


@dataclass
Expand Down Expand Up @@ -241,6 +248,10 @@ def get_best(self, field: str) -> Peer:
def get_best_head_slot_peer(self) -> Peer:
return self.get_best("head_slot")

@property
def peer_ids(self) -> Iterator[ID]:
return iter(self.peers.keys())


DIAL_RETRY_COUNT = 10

Expand All @@ -258,6 +269,7 @@ class Node(BaseService):
preferred_nodes: Tuple[Multiaddr, ...]
chain: BaseBeaconChain
subnets: Set[SubnetId]
_event_bus: EndpointAPI

handshaked_peers: PeerPool = None

Expand All @@ -267,6 +279,7 @@ def __init__(
listen_ip: str,
listen_port: int,
chain: BaseBeaconChain,
event_bus: EndpointAPI,
security_protocol_ops: Dict[TProtocol, BaseSecureTransport] = None,
muxer_protocol_ops: Dict[TProtocol, IMuxedConn] = None,
gossipsub_params: Optional[GossipsubParams] = None,
Expand Down Expand Up @@ -316,6 +329,7 @@ def __init__(
)

self.chain = chain
self._event_bus = event_bus

self.handshaked_peers = PeerPool()

Expand All @@ -327,6 +341,7 @@ def is_started(self) -> bool:

async def _run(self) -> None:
self.logger.info("libp2p node %s is up", self.listen_maddr)
self.run_daemon_task(self.update_status())
await self.cancellation()

async def start(self) -> None:
Expand Down Expand Up @@ -595,8 +610,12 @@ async def _handle_status(self, stream: INetStream) -> None:

self._add_peer_from_status(peer_id, peer_status)

# Check if we are behind the peer
compare_chain_tip_and_finalized_epoch(self.chain, peer_status)
if peer_is_ahead(self.chain, peer_status):
logger.debug(
"Peer's chain is ahead of us, start syncing with the peer(%s)",
str(peer_id),
)
await self._event_bus.broadcast(SyncRequest())

async def request_status(self, peer_id: ID) -> None:
self.logger.info("Initiate handshake with %s", str(peer_id))
Expand All @@ -615,8 +634,12 @@ async def request_status(self, peer_id: ID) -> None:

self._add_peer_from_status(peer_id, peer_status)

# Check if we are behind the peer
compare_chain_tip_and_finalized_epoch(self.chain, peer_status)
if peer_is_ahead(self.chain, peer_status):
logger.debug(
"Peer's chain is ahead of us, start syncing with the peer(%s)",
str(peer_id),
)
await self._event_bus.broadcast(SyncRequest())

async def _handle_goodbye(self, stream: INetStream) -> None:
async with Interaction(stream) as interaction:
Expand Down Expand Up @@ -720,3 +743,9 @@ async def request_beacon_blocks_by_root(
])

return blocks

async def update_status(self) -> None:
while True:
for peer_id in self.handshaked_peers.peer_ids:
asyncio.ensure_future(self.request_status(peer_id))
await asyncio.sleep(NEXT_UPDATE_INTERVAL)
11 changes: 4 additions & 7 deletions trinity/protocol/bcc_libp2p/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,20 +163,17 @@ async def validate_peer_status(chain: BaseBeaconChain, peer_status: Status) -> N
)


def compare_chain_tip_and_finalized_epoch(chain: BaseBeaconChain, peer_status: Status) -> None:
def peer_is_ahead(chain: BaseBeaconChain, peer_status: Status) -> bool:
checkpoint = chain.get_head_state().finalized_checkpoint
head_block = chain.get_canonical_head()

peer_has_higher_finalized_epoch = peer_status.finalized_epoch > checkpoint.epoch
peer_has_equal_finalized_epoch = peer_status.finalized_epoch == checkpoint.epoch
peer_has_higher_head_slot = peer_status.head_slot > head_block.slot
if (
return (
peer_has_higher_finalized_epoch or
(peer_has_equal_finalized_epoch and peer_has_higher_head_slot)
):
# TODO: kickoff syncing process with this peer
logger.debug("Peer's chain is ahead of us, start syncing with the peer.")
pass
)


def validate_start_slot(chain: BaseBeaconChain, start_slot: Slot) -> None:
Expand Down Expand Up @@ -410,7 +407,7 @@ def peer_id(self) -> ID:
return self.stream.muxed_conn.peer_id

def debug(self, message: str) -> None:
self.logger.debug(
self.logger.debug2(
"Interaction %s with %s %s",
self.stream.get_protocol().split("/")[4],
str(self.peer_id)[:15],
Expand Down
15 changes: 8 additions & 7 deletions trinity/sync/beacon/chain.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from typing import (
Tuple,
AsyncGenerator,
Expand All @@ -7,6 +6,9 @@
from eth_utils import (
ValidationError,
)
from lahja import (
EndpointAPI,
)

from cancel_token import (
CancelToken,
Expand All @@ -24,13 +26,12 @@
from eth2.beacon.typing import (
Slot,
)
from eth2.events import SyncRequest

from trinity.db.beacon.chain import BaseAsyncBeaconChainDB
from trinity.protocol.bcc_libp2p.node import PeerPool, Peer
from trinity.sync.beacon.constants import (
MAX_BLOCKS_PER_REQUEST,
NEXT_SYNC_CHECK_INTERVAL,
PEER_SELECTION_RETRY_INTERVAL,
)
from trinity.sync.common.chain import (
SyncBlockImporter,
Expand All @@ -51,30 +52,31 @@ class BeaconChainSyncer(BaseService):
block_importer: SyncBlockImporter
genesis_config: Eth2GenesisConfig
sync_peer: Peer
_event_bus: EndpointAPI

def __init__(self,
chain_db: BaseAsyncBeaconChainDB,
peer_pool: PeerPool,
block_importer: SyncBlockImporter,
genesis_config: Eth2GenesisConfig,
event_bus: EndpointAPI,
token: CancelToken = None) -> None:
super().__init__(token)

self.chain_db = chain_db
self.peer_pool = peer_pool
self.block_importer = block_importer
self.genesis_config = genesis_config
self._event_bus = event_bus

self.sync_peer = None

async def _run(self) -> None:
while True:
async for event in self.wait_iter(self._event_bus.stream(SyncRequest)):
try:
self.sync_peer = await self.wait(self.select_sync_peer())
except LeadingPeerNotFonud as exception:
self.logger.info("No suitable peers to sync with: %s", exception)
# wait some time and try again
await asyncio.sleep(PEER_SELECTION_RETRY_INTERVAL)
continue
else:
# sync peer selected successfully
Expand All @@ -87,7 +89,6 @@ async def _run(self) -> None:
)
# Reset the sync peer
self.sync_peer = None
await asyncio.sleep(NEXT_SYNC_CHECK_INTERVAL)

async def select_sync_peer(self) -> Peer:
if len(self.peer_pool) == 0:
Expand Down
2 changes: 0 additions & 2 deletions trinity/sync/beacon/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
MAX_BLOCKS_PER_REQUEST = 64
NEXT_SYNC_CHECK_INTERVAL = 10
PEER_SELECTION_RETRY_INTERVAL = 5
8 changes: 5 additions & 3 deletions trinity/tools/bcc_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@

from cancel_token import CancelToken

from lahja import EndpointAPI
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.peer.id import ID
from libp2p.peer.peerinfo import (
PeerInfo,
)


from eth_utils import to_tuple

from eth.constants import (
Expand Down Expand Up @@ -59,7 +59,6 @@
AtomicDBFactory,
)


try:
import factory
except ImportError:
Expand Down Expand Up @@ -89,6 +88,7 @@ class Meta:
preferred_nodes: Tuple[Multiaddr, ...] = tuple()
subnets: None
chain = factory.SubFactory(BeaconChainFactory)
event_bus = None

@classmethod
def create_batch(cls, number: int) -> Tuple[Node, ...]:
Expand Down Expand Up @@ -116,6 +116,7 @@ async def ConnectionPairFactory(
bob_chaindb: AsyncBeaconChainDB = None,
bob_branch: Collection[BaseBeaconBlock] = None,
genesis_state: BeaconState = None,
alice_event_bus: EndpointAPI = None,
cancel_token: CancelToken = None,
handshake: bool = True,
) -> AsyncIterator[Tuple[Node, Node]]:
Expand All @@ -136,7 +137,7 @@ async def ConnectionPairFactory(
alice_kwargs["chain__genesis_state"] = genesis_state
bob_kwargs["chain__genesis_state"] = genesis_state

alice = NodeFactory(cancel_token=cancel_token, **alice_kwargs)
alice = NodeFactory(cancel_token=cancel_token, event_bus=alice_event_bus, **alice_kwargs)
bob = NodeFactory(cancel_token=cancel_token, **bob_kwargs)
async with run_service(alice), run_service(bob):
await asyncio.sleep(0.01)
Expand Down Expand Up @@ -266,4 +267,5 @@ class Meta:
lambda obj: SimpleWriterBlockImporter(obj.chain_db)
)
genesis_config = SERENITY_GENESIS_CONFIG
event_bus = None
token = None

0 comments on commit d76e8be

Please sign in to comment.