Skip to content

Commit

Permalink
Merge pull request #2157 from rsksmart/add-logs-peer-discovery
Browse files Browse the repository at this point in the history
Add logs peer discovery
  • Loading branch information
Vovchyk authored Dec 11, 2023
2 parents 8c3b618 + 8d7333e commit a8006f4
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package co.rsk.net.discovery;

import co.rsk.net.discovery.message.PeerDiscoveryMessage;
import org.apache.commons.lang3.builder.ToStringBuilder;

import java.net.InetSocketAddress;

Expand All @@ -38,4 +39,12 @@ public PeerDiscoveryMessage getMessage() {
public InetSocketAddress getAddress() {
return address;
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("message", this.message)
.append("address", this.address)
.toString();
}
}
10 changes: 10 additions & 0 deletions rskj-core/src/main/java/co/rsk/net/discovery/NodeChallenge.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package co.rsk.net.discovery;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.ethereum.net.rlpx.Node;

/**
Expand Down Expand Up @@ -45,4 +46,13 @@ public Node getChallenger() {
public String getChallengeId() {
return challengeId;
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("challengedNode", this.challengedNode)
.append("challenger", this.challenger)
.append("challengeId", this.challengeId)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import co.rsk.net.discovery.message.PingPeerMessage;
import com.google.common.annotations.VisibleForTesting;
import org.ethereum.net.rlpx.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -29,9 +31,14 @@
* Created by mario on 22/02/17.
*/
public class NodeChallengeManager {
private static final Logger logger = LoggerFactory.getLogger(NodeChallengeManager.class);

private Map<String, NodeChallenge> activeChallenges = new ConcurrentHashMap<>();

public NodeChallenge startChallenge(Node challengedNode, Node challenger, PeerExplorer explorer) {
logger.debug("startChallenge - Starting challenge for node: [{}] by challenger: [{}]",
challengedNode.getHexId(), challenger.getHexId());

PingPeerMessage pingMessage = explorer.sendPing(challengedNode.getAddress(), 1, challengedNode);
String messageId = pingMessage.getMessageId();
NodeChallenge challenge = new NodeChallenge(challengedNode, challenger, messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import co.rsk.net.discovery.message.DiscoveryMessageType;
import co.rsk.net.discovery.message.PeerDiscoveryMessage;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.ethereum.net.rlpx.Node;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -73,4 +74,14 @@ public boolean validateMessageResponse(InetSocketAddress responseAddress, PeerDi
public boolean hasExpired() {
return System.currentTimeMillis() > expirationDate;
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("messageId", this.messageId)
.append("message", this.message)
.append("address", this.address)
.append("relatedNode", this.relatedNode)
.toString();
}
}
125 changes: 118 additions & 7 deletions rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
package co.rsk.net.discovery;

import co.rsk.net.NodeID;
import co.rsk.net.discovery.message.*;
import co.rsk.net.discovery.message.DiscoveryMessageType;
import co.rsk.net.discovery.message.FindNodePeerMessage;
import co.rsk.net.discovery.message.NeighborsPeerMessage;
import co.rsk.net.discovery.message.PingPeerMessage;
import co.rsk.net.discovery.message.PongPeerMessage;
import co.rsk.net.discovery.table.NodeDistanceTable;
import co.rsk.net.discovery.table.OperationResult;
import co.rsk.net.discovery.table.PeerDiscoveryRequestBuilder;
Expand All @@ -37,7 +41,12 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -160,6 +169,12 @@ void setUDPChannel(UDPChannel udpChannel) {
}

synchronized void handleMessage(DiscoveryEvent event) {
logger.debug("handleMessage - Handling message with " +
"type: [{}], " +
"networkId: [{}]",
event.getMessage().getMessageType(),
event.getMessage().getNetworkId());

if (state != ExecState.RUNNING) {
logger.warn("Cannot handle message as current state is {}", state);
return;
Expand All @@ -170,6 +185,8 @@ synchronized void handleMessage(DiscoveryEvent event) {
//have a networkId in the message yet, then just let them through, for now.
if (event.getMessage().getNetworkId().isPresent() &&
event.getMessage().getNetworkId().getAsInt() != this.networkId) {
logger.warn("handleMessage - Message ignored because remote peer's network id: [{}] is different from local network id: [{}]",
event.getMessage().getNetworkId(), this.networkId);
return;
}
if (type == DiscoveryMessageType.PING) {
Expand All @@ -191,9 +208,14 @@ synchronized void handleMessage(DiscoveryEvent event) {

private void handlePingMessage(InetSocketAddress address, PingPeerMessage message) {
this.sendPong(address, message);

Node connectedNode = this.establishedConnections.get(message.getNodeId());

logger.debug("handlePingMessage - Handling ping message with " +
"address: [{}/{}], " +
"nodeId: [{}], " +
"connectedNode: [{}]"
, address.getHostName(), address.getPort(), message.getNodeId(), connectedNode);

if (connectedNode == null) {
this.sendPing(address, 1);
} else {
Expand All @@ -204,12 +226,20 @@ private void handlePingMessage(InetSocketAddress address, PingPeerMessage messag
private void handlePong(InetSocketAddress pongAddress, PongPeerMessage message) {
PeerDiscoveryRequest request = this.pendingPingRequests.get(message.getMessageId());

logger.debug("handlePong - Handling pong message with " +
"address: [{}/{}], " +
"messageId: [{}], " +
"request: [{}]"
, pongAddress.getHostName(), pongAddress.getPort(), message.getMessageId(), request);

if (request != null && request.validateMessageResponse(pongAddress, message)) {
this.pendingPingRequests.remove(message.getMessageId());
NodeChallenge challenge = this.challengeManager.removeChallenge(message.getMessageId());
if (challenge == null) {
this.addConnection(message, request.getAddress().getHostString(), request.getAddress().getPort());
}
} else {
logger.warn("handlePong - Peer discovery request with id [{}] is either null or invalid", message.getMessageId());
}
}

Expand All @@ -219,17 +249,29 @@ private void handleFindNode(FindNodePeerMessage message) {

if (connectedNode != null) {
List<Node> nodesToSend = this.distanceTable.getClosestNodes(nodeId);
logger.debug("About to send [{}] neighbors to ip[{}] port[{}] nodeId[{}]", nodesToSend.size(), connectedNode.getHost(), connectedNode.getPort(), connectedNode.getHexId());
logger.debug("handleFindNode - About to send [{}] neighbors to address: [{}/{}], nodeId: [{}] with messageId: [{}]",
nodesToSend.size(), connectedNode.getHost(), connectedNode.getPort(), connectedNode.getHexId(), message.getMessageId());
this.sendNeighbors(connectedNode.getAddress(), nodesToSend, message.getMessageId());
updateEntry(connectedNode);
} else {
logger.warn("handleFindNode - Node with id: [{}] is not connected. Ignored", nodeId);
}
}

private void handleNeighborsMessage(InetSocketAddress neighborsResponseAddress, NeighborsPeerMessage message) {
Node connectedNode = this.establishedConnections.get(message.getNodeId());
NodeID nodeId = message.getNodeId();
Node connectedNode = this.establishedConnections.get(nodeId);

if (connectedNode != null) {
logger.debug("Neighbors received from [{}]", connectedNode.getHexId());
logger.debug("handleNeighborsMessage - Neighbors received from id: [{}], address: [{}/{}] with " +
"nodeId: [{}]," +
"messageId: [{}], " +
"nodesCount: [{}], " +
"nodes: [{}], " +
"connectedNode: [{}]",
connectedNode.getHexId(), neighborsResponseAddress.getHostName(), neighborsResponseAddress.getPort(), message.getNodeId(),
message.getMessageId(), message.countNodes(), message.getNodes(), connectedNode);

PeerDiscoveryRequest request = this.pendingFindNodeRequests.remove(message.getMessageId());

if (request != null && request.validateMessageResponse(neighborsResponseAddress, message)) {
Expand All @@ -239,6 +281,8 @@ private void handleNeighborsMessage(InetSocketAddress neighborsResponseAddress,
this.startConversationWithNewNodes();
}
updateEntry(connectedNode);
} else {
logger.warn("handleFindNode - Node with id: [{}] is not connected. Ignored", nodeId);
}
}

Expand All @@ -254,6 +298,8 @@ synchronized PingPeerMessage sendPing(InetSocketAddress nodeAddress, int attempt
PingPeerMessage nodeMessage = checkPendingPeerToAddress(nodeAddress);

if (nodeMessage != null) {
logger.warn("sendPing - No ping message has been sent to address: [{}/{}], as there's pending one", nodeAddress.getHostName(), nodeAddress.getPort());

return nodeMessage;
}

Expand All @@ -263,6 +309,13 @@ synchronized PingPeerMessage sendPing(InetSocketAddress nodeAddress, int attempt
localAddress.getAddress().getHostAddress(),
localAddress.getPort(),
id, this.key, this.networkId);

logger.debug("sendPing - Sending ping message to " +
"address: [{}/{}], " +
"attempt: [{}], " +
"nodeMessage: [{}]"
, nodeAddress.getHostName(), nodeAddress.getPort(), attempt, nodeMessage);

udpChannel.write(new DiscoveryEvent(nodeMessage, nodeAddress));

PeerDiscoveryRequest request = PeerDiscoveryRequestBuilder.builder().messageId(id)
Expand All @@ -275,6 +328,7 @@ synchronized PingPeerMessage sendPing(InetSocketAddress nodeAddress, int attempt
}

private void updateEntry(Node connectedNode) {
logger.trace("updateEntry - Updating node [{}]", connectedNode.getHexId());
try {
updateEntryLock.lock();
this.distanceTable.updateEntry(connectedNode);
Expand All @@ -296,6 +350,13 @@ private PingPeerMessage checkPendingPeerToAddress(InetSocketAddress address) {
private PongPeerMessage sendPong(InetSocketAddress nodeAddress, PingPeerMessage message) {
InetSocketAddress localAddress = this.localNode.getAddress();
PongPeerMessage pongPeerMessage = PongPeerMessage.create(localAddress.getHostName(), localAddress.getPort(), message.getMessageId(), this.key, this.networkId);

logger.debug("sendPong - Sending pong message to " +
"address: [{}/{}], " +
"messageId: [{}], " +
"pongPeerMessage: [{}]"
, nodeAddress.getHostName(), nodeAddress.getPort(), message.getMessageId(), pongPeerMessage);

udpChannel.write(new DiscoveryEvent(pongPeerMessage, nodeAddress));

return pongPeerMessage;
Expand All @@ -306,10 +367,18 @@ FindNodePeerMessage sendFindNode(Node node) {
InetSocketAddress nodeAddress = node.getAddress();
String id = UUID.randomUUID().toString();
FindNodePeerMessage findNodePeerMessage = FindNodePeerMessage.create(this.key.getNodeId(), id, this.key, this.networkId);

udpChannel.write(new DiscoveryEvent(findNodePeerMessage, nodeAddress));
PeerDiscoveryRequest request = PeerDiscoveryRequestBuilder.builder().messageId(id).relatedNode(node)
.message(findNodePeerMessage).address(nodeAddress).expectedResponse(DiscoveryMessageType.NEIGHBORS)
.expirationPeriod(requestTimeout).build();

logger.debug("sendFindNode - Sending find node message to " +
"address: [{}/{}], " +
"findNodePeerMessage: [{}], " +
"request: [{}]"
, nodeAddress.getHostName(), nodeAddress.getPort(), findNodePeerMessage, request);

pendingFindNodeRequests.put(findNodePeerMessage.getMessageId(), request);

return findNodePeerMessage;
Expand All @@ -318,8 +387,16 @@ FindNodePeerMessage sendFindNode(Node node) {
private NeighborsPeerMessage sendNeighbors(InetSocketAddress nodeAddress, List<Node> nodes, String id) {
List<Node> nodesToSend = getRandomizeLimitedList(nodes, MAX_NODES_PER_MSG, 5);
NeighborsPeerMessage sendNodesMessage = NeighborsPeerMessage.create(nodesToSend, id, this.key, networkId);

logger.debug("sendNeighbors - Sending neighbors message to " +
"address: [{}/{}], " +
"id: [{}], " +
"nodes: [{}], " +
"nodesToSend: [{}], " +
"sendNodesMessage: [{}]"
, nodeAddress.getHostName(), nodeAddress.getPort(), id, nodes, nodesToSend, sendNodesMessage);

udpChannel.write(new DiscoveryEvent(sendNodesMessage, nodeAddress));
logger.debug(" [{}] Neighbors Sent to ip:[{}] port:[{}]", nodesToSend.size(), nodeAddress.getAddress().getHostAddress(), nodeAddress.getPort());

return sendNodesMessage;
}
Expand All @@ -340,6 +417,7 @@ synchronized void clean() {
return;
}

logger.trace("clean - Cleaning obsolete requests");
this.purgeRequests();
}

Expand All @@ -350,17 +428,28 @@ synchronized void update() {
}

List<Node> closestNodes = this.distanceTable.getClosestNodes(this.localNode.getId());

logger.trace("update - closestNodes: [{}]", closestNodes);

this.askForMoreNodes(closestNodes);
this.checkPeersPulse(closestNodes);
}

private void checkPeersPulse(List<Node> closestNodes) {
List<Node> nodesToCheck = this.getRandomizeLimitedList(closestNodes, MAX_NODES_TO_CHECK, 10);

logger.trace("checkPeersPulse - Checking peers pulse for nodes: [{}], nodesToCheck: [{}]"
, closestNodes, nodesToCheck);

nodesToCheck.forEach(node -> sendPing(node.getAddress(), 1, node));
}

private void askForMoreNodes(List<Node> closestNodes) {
List<Node> nodesToAsk = getRandomizeLimitedList(closestNodes, MAX_NODES_TO_ASK, 5);

logger.trace("askForMoreNodes - Asking for more nodes from closestNodes: [{}], nodesToAsk: [{}]"
, closestNodes, nodesToAsk);

nodesToAsk.forEach(this::sendFindNode);
}

Expand All @@ -369,6 +458,9 @@ private List<PeerDiscoveryRequest> removeExpiredRequests(Map<String, PeerDiscove
.filter(PeerDiscoveryRequest::hasExpired).collect(Collectors.toList());
requests.forEach(r -> pendingRequests.remove(r.getMessageId()));

logger.trace("removeExpiredRequests - Removing expired requests from pendingRequests: [{}], requests: [{}]"
, pendingRequests, requests);

return requests;
}

Expand All @@ -377,6 +469,8 @@ private void removeExpiredChallenges(List<PeerDiscoveryRequest> peerDiscoveryReq
}

private void resendExpiredPing(List<PeerDiscoveryRequest> peerDiscoveryRequests) {
logger.trace("resendExpiredPing - Resending expired pings form peerDiscoveryRequests: [{}]", peerDiscoveryRequests);

peerDiscoveryRequests.stream().filter(r -> r.getAttemptNumber() < RETRIES_COUNT)
.forEach(r -> sendPing(r.getAddress(), r.getAttemptNumber() + 1, r.getRelatedNode()));
}
Expand All @@ -392,6 +486,12 @@ private void removeConnections(List<PeerDiscoveryRequest> expiredRequests) {
}

private void removeConnection(Node node) {
if (logger.isDebugEnabled()) {
InetSocketAddress address = node.getAddress();
logger.debug("removeConnection - Removing node: [{}], " +
"nodeAddress address: [{}/{}]", node.getHexId(), address.getHostName(), address.getPort());
}

this.establishedConnections.remove(node.getId());
this.distanceTable.removeNode(node);
this.knownHosts.remove(node.getAddressAsString());
Expand All @@ -400,6 +500,14 @@ private void removeConnection(Node node) {
private void addConnection(PongPeerMessage message, String ip, int port) {
Node senderNode = new Node(message.getNodeId().getID(), ip, port);
boolean isLocalNode = StringUtils.equals(senderNode.getHexId(), this.localNode.getHexId());

logger.debug("addConnection - Adding node with " +
"address: [{}/{}], " +
"messageId: [{}], " +
"allowMultipleConnectionsPerHostPort: [{}], " +
"senderNode: [{}], " +
"isLocalNode: [{}], ", ip, port, message.getMessageId(), this.allowMultipleConnectionsPerHostPort, senderNode, isLocalNode);

if (isLocalNode) {
return;
}
Expand All @@ -409,6 +517,9 @@ private void addConnection(PongPeerMessage message, String ip, int port) {
}

OperationResult result = this.distanceTable.addNode(senderNode);

logger.debug("addConnection - result: [{}]", result);

if (result.isSuccess()) {
this.knownHosts.put(senderNode.getAddressAsString(), senderNode.getId());
this.establishedConnections.put(senderNode.getId(), senderNode);
Expand Down
Loading

0 comments on commit a8006f4

Please sign in to comment.