Skip to content

Commit

Permalink
consider peer reputation score when deciding to disconnect (hyperledg…
Browse files Browse the repository at this point in the history
…er#6187)

* don't disconnect if peer has enough of a score increase to have been useful

* use threshold not increase

Signed-off-by: Sally MacFarlane <[email protected]>

---------

Signed-off-by: Sally MacFarlane <[email protected]>
  • Loading branch information
macfarla authored Nov 21, 2023
1 parent eaf5682 commit ea376ba
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private boolean registerDisconnect(
disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer));
peer.handleDisconnect();
abortPendingRequestsAssignedToDisconnectedPeers();
LOG.debug("Disconnected EthPeer {}", peer.getShortNodeId());
LOG.debug("Disconnected EthPeer {}...", peer.getShortNodeId());
LOG.trace("Disconnected EthPeer {}", peer);
}
}
Expand Down Expand Up @@ -391,7 +391,7 @@ public void disconnectWorstUselessPeer() {
peer -> {
LOG.atDebug()
.setMessage(
"disconnecting peer {}. Waiting for better peers. Current {} of max {}")
"disconnecting peer {}... Waiting for better peers. Current {} of max {}")
.addArgument(peer::getShortNodeId)
.addArgument(this::peerCount)
.addArgument(this::getMaxPeers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void handleDisconnect(
"Disconnect - {} - {} - {}... - {} peers left\n{}",
initiatedByPeer ? "Inbound" : "Outbound",
reason,
connection.getPeer().getId().slice(0, 16),
connection.getPeer().getId().slice(0, 8),
ethPeers.peerCount(),
ethPeers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,21 @@
public class PeerReputation implements Comparable<PeerReputation> {
static final long USELESS_RESPONSE_WINDOW_IN_MILLIS =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
static final int DEFAULT_MAX_SCORE = 150;
static final int DEFAULT_MAX_SCORE = 200;
// how much above the initial score you need to be to not get disconnected for timeouts/useless
// responses
private final int hasBeenUsefulThreshold;
static final int DEFAULT_INITIAL_SCORE = 100;
private static final Logger LOG = LoggerFactory.getLogger(PeerReputation.class);
private static final int TIMEOUT_THRESHOLD = 3;
private static final int TIMEOUT_THRESHOLD = 5;
private static final int USELESS_RESPONSE_THRESHOLD = 5;

private final ConcurrentMap<Integer, AtomicInteger> timeoutCountByRequestType =
new ConcurrentHashMap<>();
private final Queue<Long> uselessResponseTimes = new ConcurrentLinkedQueue<>();

private static final int SMALL_ADJUSTMENT = 1;
private static final int LARGE_ADJUSTMENT = 10;

private static final int LARGE_ADJUSTMENT = 5;
private int score;

private final int maxScore;
Expand All @@ -59,22 +61,37 @@ public PeerReputation(final int initialScore, final int maxScore) {
checkArgument(
initialScore <= maxScore, "Initial score must be less than or equal to max score");
this.maxScore = maxScore;
this.hasBeenUsefulThreshold = Math.min(maxScore, initialScore + 10);
this.score = initialScore;
}

public Optional<DisconnectReason> recordRequestTimeout(final int requestCode) {
final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet();
if (newTimeoutCount >= TIMEOUT_THRESHOLD) {
LOG.debug(
"Disconnection triggered by {} repeated timeouts for requestCode {}",
newTimeoutCount,
requestCode);
score -= LARGE_ADJUSTMENT;
return Optional.of(DisconnectReason.TIMEOUT);
// don't trigger disconnect if this peer has a sufficiently high reputation score
if (peerHasNotBeenUseful()) {
LOG.debug(
"Disconnection triggered by {} repeated timeouts for requestCode {}, peer score {}",
newTimeoutCount,
requestCode,
score);
return Optional.of(DisconnectReason.TIMEOUT);
}

LOG.trace(
"Not triggering disconnect for {} repeated timeouts for requestCode {} because peer has high score {}",
newTimeoutCount,
requestCode,
score);
} else {
score -= SMALL_ADJUSTMENT;
return Optional.empty();
}
return Optional.empty();
}

private boolean peerHasNotBeenUseful() {
return score < hasBeenUsefulThreshold;
}

public void resetTimeoutCount(final int requestCode) {
Expand All @@ -96,12 +113,19 @@ public Optional<DisconnectReason> recordUselessResponse(final long timestamp) {
}
if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) {
score -= LARGE_ADJUSTMENT;
LOG.debug("Disconnection triggered by exceeding useless response threshold");
return Optional.of(DisconnectReason.USELESS_PEER);
// don't trigger disconnect if this peer has a sufficiently high reputation score
if (peerHasNotBeenUseful()) {
LOG.debug(
"Disconnection triggered by exceeding useless response threshold, score {}", score);
return Optional.of(DisconnectReason.USELESS_PEER);
}
LOG.trace(
"Not triggering disconnect for exceeding useless response threshold because peer has high score {}",
score);
} else {
score -= SMALL_ADJUSTMENT;
return Optional.empty();
}
return Optional.empty();
}

public void recordUsefulResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public void shouldThrowOnInvalidInitialScore() {

@Test
public void shouldOnlyDisconnectWhenTimeoutLimitReached() {
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
Expand All @@ -45,6 +47,11 @@ public void shouldOnlyDisconnectWhenTimeoutLimitReached() {
public void shouldTrackTimeoutsSeparatelyForDifferentRequestTypes() {
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();

assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();

Expand All @@ -57,6 +64,8 @@ public void shouldResetTimeoutCountForRequestType() {
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();

assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();

Expand Down

0 comments on commit ea376ba

Please sign in to comment.