Skip to content

Commit

Permalink
refactor(p2p): implement initial state
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Nov 6, 2024
1 parent e9616e9 commit 260d851
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 139 deletions.
2 changes: 1 addition & 1 deletion hathor/p2p/dependencies/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def on_peer_connect(self, protocol: HathorProtocol) -> None: ...
def on_peer_ready(self, protocol: HathorProtocol) -> None: ...
def on_handshake_disconnect(self, *, addr: PeerAddress) -> None: ...
def on_ready_disconnect(self, *, addr: PeerAddress, peer_id: PeerId) -> None: ...
def on_unknown_disconnect(self, *, addr: PeerAddress) -> None: ...
def on_initial_disconnect(self, *, addr: PeerAddress) -> None: ...
def get_randbytes(self, n: int) -> bytes: ...
def is_peer_ready(self, peer_id: PeerId) -> bool: ...
def send_tx_to_peers(self, tx: BaseTransaction) -> None: ...
Expand Down
13 changes: 10 additions & 3 deletions hathor/p2p/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from abc import ABC
from typing import Callable

from twisted.internet import protocol
from twisted.internet.interfaces import IAddress
Expand All @@ -21,7 +22,7 @@
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peer_endpoint import PeerAddress
from hathor.p2p.protocol import HathorLineReceiver
from hathor.p2p.protocol import HathorLineReceiver, HathorProtocol


class _HathorLineReceiverFactory(ABC, protocol.Factory):
Expand All @@ -34,22 +35,28 @@ def __init__(
*,
dependencies: P2PDependencies,
use_ssl: bool,
built_protocol_callback: Callable[[PeerAddress, HathorProtocol], None] | None,
):
super().__init__()
self.my_peer = my_peer
self.p2p_manager = p2p_manager
self.dependencies = dependencies
self.use_ssl = use_ssl
self._built_protocol_callback = built_protocol_callback

def buildProtocol(self, addr: IAddress) -> HathorLineReceiver:
return HathorLineReceiver(
addr=PeerAddress.from_address(addr),
peer_addr = PeerAddress.from_address(addr)
hathor_protocol = HathorLineReceiver(
addr=peer_addr,
my_peer=self.my_peer,
p2p_manager=self.p2p_manager,
dependencies=self.dependencies,
use_ssl=self.use_ssl,
inbound=self.inbound,
)
if self._built_protocol_callback:
self._built_protocol_callback(peer_addr, hathor_protocol)
return hathor_protocol


class HathorServerFactory(_HathorLineReceiverFactory, protocol.ServerFactory):
Expand Down
15 changes: 10 additions & 5 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ def __init__(
p2p_manager=self,
dependencies=dependencies,
use_ssl=self.use_ssl,
built_protocol_callback=self._on_built_protocol,
)
self.client_factory = HathorClientFactory(
my_peer=self.my_peer,
p2p_manager=self,
dependencies=dependencies,
use_ssl=self.use_ssl,
built_protocol_callback=self._on_built_protocol,
)

# Global maximum number of connections.
Expand Down Expand Up @@ -360,7 +362,7 @@ def on_peer_connect(self, protocol: HathorProtocol) -> None:
protocol.disconnect(force=True)
return

self._connections.on_connected(protocol=protocol)
self._connections.on_connected(addr=protocol.addr, inbound=protocol.inbound)
self.pubsub.publish(
HathorEvents.NETWORK_PEER_CONNECTED,
protocol=protocol,
Expand Down Expand Up @@ -423,9 +425,9 @@ def on_ready_disconnect(self, *, addr: PeerAddress, peer_id: PeerId) -> None:
peers_count=self._get_peers_count()
)

def on_unknown_disconnect(self, *, addr: PeerAddress) -> None:
"""Called when a peer disconnects from an unknown state (None)."""
self._connections.on_unknown_disconnect(addr=addr)
def on_initial_disconnect(self, *, addr: PeerAddress) -> None:
"""Called when a peer disconnects from the initial state."""
self._connections.on_initial_disconnect(addr=addr)
self.pubsub.publish(
HathorEvents.NETWORK_PEER_DISCONNECTED,
peers_count=self._get_peers_count()
Expand Down Expand Up @@ -575,7 +577,7 @@ def connect_to(
if endpoint.peer_id is not None and peer is not None:
assert endpoint.peer_id == peer.id, 'the entrypoint peer_id does not match the actual peer_id'

already_exists = self._connections.on_connecting(addr=endpoint.addr)
already_exists = self._connections.on_start_connection(addr=endpoint.addr)
if already_exists:
self.log.debug('skipping because we are already connected(ing) to this endpoint', endpoint=str(endpoint))
return None
Expand Down Expand Up @@ -643,6 +645,9 @@ def _on_listen_success(self, listening_port: IListeningPort, description: str) -
if self.manager.hostname:
self._add_hostname_entrypoint(self.manager.hostname, address)

def _on_built_protocol(self, addr: PeerAddress, protocol: HathorProtocol) -> None:
self._connections.on_built_protocol(addr=addr, protocol=protocol)

def update_hostname_entrypoints(self, *, old_hostname: str | None, new_hostname: str) -> None:
"""Add new hostname entrypoints according to the listen addresses, and remove any old entrypoint."""
assert self.manager is not None
Expand Down
46 changes: 32 additions & 14 deletions hathor/p2p/peer_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@ class PeerConnections:
It's also responsible for reacting for state changes on those connections.
"""

__slots__ = ('_connecting_outbound', '_handshaking', '_ready', '_addr_by_id')
__slots__ = ('_connecting_outbound', '_built', '_handshaking', '_ready', '_addr_by_id')

def __init__(self) -> None:
# Peers that are in the "connecting" state, between starting a connection and Twisted calling `connectionMade`.
# This is only for outbound peers, that is, connections initiated by us.
# They're uniquely identified by the address we're connecting to.
self._connecting_outbound: set[PeerAddress] = set()

# Peers that had their protocol instances built, before getting connected.
self._built: dict[PeerAddress, HathorProtocol] = {}

# Peers that are handshaking, in a state after being connected and before reaching the READY state.
# They're uniquely identified by the address we're connected to.
self._handshaking: dict[PeerAddress, HathorProtocol] = {}
Expand Down Expand Up @@ -71,7 +74,7 @@ def ready_peers(self) -> dict[PeerAddress, HathorProtocol]:

def not_ready_peers(self) -> list[PeerAddress]:
"""Get not ready peers, that is, peers that are either connecting or handshaking."""
return list(self._connecting_outbound) + list(self._handshaking)
return list(self._built) + list(self._connecting_outbound) + list(self._handshaking)

def connected_peers(self) -> dict[PeerAddress, HathorProtocol]:
"""
Expand Down Expand Up @@ -104,39 +107,49 @@ def is_peer_ready(self, peer_id: PeerId) -> bool:
"""Return whether a peer is ready, by its PeerId."""
return peer_id in self._addr_by_id

def on_connecting(self, *, addr: PeerAddress) -> bool:
def on_start_connection(self, *, addr: PeerAddress) -> bool:
"""
Callback for when an outbound connection is initiated.
Returns True if this address already exists, either connecting or connected, and False otherwise."""
Returns True if this address already exists, either connecting or connected, and False otherwise.
"""
if addr in self.all_peers():
return True

self._connecting_outbound.add(addr)
return False

def on_built_protocol(self, *, addr: PeerAddress, protocol: HathorProtocol) -> None:
"""Callback for when a HathorProtocol instance is built."""
assert addr not in self._built
assert addr not in self.connected_peers()
self._built[addr] = protocol

def on_failed_to_connect(self, *, addr: PeerAddress) -> None:
"""Callback for when an outbound connection fails before getting connected."""
assert addr in self._connecting_outbound
assert addr not in self.connected_peers()
self._connecting_outbound.remove(addr)

def on_connected(self, *, protocol: HathorProtocol) -> None:
"""Callback for when an outbound connection gets connected."""
assert protocol.addr not in self.connected_peers()
def on_connected(self, *, addr: PeerAddress, inbound: bool) -> None:
"""Callback for when a connection is made from both inbound and outbound peers."""
assert addr in self._built
assert addr not in self.connected_peers()

if protocol.inbound:
assert protocol.addr not in self._connecting_outbound
if inbound:
assert addr not in self._connecting_outbound
else:
assert protocol.addr in self._connecting_outbound
self._connecting_outbound.remove(protocol.addr)
assert addr in self._connecting_outbound
self._connecting_outbound.remove(addr)

self._handshaking[protocol.addr] = protocol
protocol = self._built.pop(addr)
self._handshaking[addr] = protocol

def on_handshake_disconnect(self, *, addr: PeerAddress) -> None:
"""
Callback for when a connection is closed during a handshaking state, that is,
after getting connected and before getting READY.
"""
assert addr not in self._built
assert addr not in self._connecting_outbound
assert addr in self._handshaking
assert addr not in self._ready
Expand All @@ -148,6 +161,7 @@ def on_ready(self, *, addr: PeerAddress, peer_id: PeerId) -> HathorProtocol | No
If the PeerId of this connection is duplicate, return the protocol that we should disconnect.
Return None otherwise.
"""
assert addr not in self._built
assert addr not in self._connecting_outbound
assert addr in self._handshaking
assert addr not in self._ready
Expand All @@ -173,6 +187,7 @@ def on_ready(self, *, addr: PeerAddress, peer_id: PeerId) -> HathorProtocol | No

def on_ready_disconnect(self, *, addr: PeerAddress, peer_id: PeerId) -> None:
"""Callback for when a connection is closed during the READY state."""
assert addr not in self._built
assert addr not in self._connecting_outbound
assert addr not in self._handshaking
assert addr in self._ready
Expand All @@ -181,10 +196,13 @@ def on_ready_disconnect(self, *, addr: PeerAddress, peer_id: PeerId) -> None:
if self._addr_by_id[peer_id] == addr:
self._addr_by_id.pop(peer_id)

def on_unknown_disconnect(self, *, addr: PeerAddress) -> None:
"""Callback for when a connection is closed during an unknown state."""
def on_initial_disconnect(self, *, addr: PeerAddress) -> None:
"""Callback for when a connection is closed during the initial state."""
assert addr in self._built
assert addr not in self._handshaking
assert addr not in self._ready

self._built.pop(addr)
if addr in self._connecting_outbound:
self._connecting_outbound.remove(addr)

Expand Down
4 changes: 4 additions & 0 deletions hathor/p2p/peer_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ def __eq__(self, other: Any) -> bool:
def __ne__(self, other: Any) -> bool:
return not self == other

def __hash__(self):
host = 'localhost' if self.is_localhost() else self.host
return hash((self.protocol, host, self.port))

@classmethod
def parse(cls, description: str) -> Self:
protocol, host, port, query = _parse_address_parts(description)
Expand Down
Loading

0 comments on commit 260d851

Please sign in to comment.