diff --git a/build.sbt b/build.sbt
index 68a3ee569..bee84cd5d 100644
--- a/build.sbt
+++ b/build.sbt
@@ -25,7 +25,7 @@ lazy val commonSettings = Seq(
Wart.OptionPartial),
organization := "io.horizen",
organizationName := "Zen Blockchain Foundation",
- version := "2.0.0-RC11",
+ version := "2.0.1",
licenses := Seq("CC0" -> url("https://creativecommons.org/publicdomain/zero/1.0/legalcode")),
homepage := Some(url("https://github.com/HorizenOfficial/Sparkz")),
pomExtra :=
diff --git a/ci/README.md b/ci/README.md
new file mode 100644
index 000000000..0933f5d2f
--- /dev/null
+++ b/ci/README.md
@@ -0,0 +1,58 @@
+# New version build functionality
+
+As of 2022/06/27, the CI/CD pipeline adds functionality to build and publish multiple `DIGIT.DIGIT.DIGIT-SNAPSHOT` versions of the `sparkz-core` package
+with the help of the `set_version.sh` script.
+
+The `set_version.sh` script is located in the **ci/devtools** directory and automates the preparation steps for building/releasing a new
+version of the artifacts by setting the provided version for all the required dependencies across the configuration files.
+
+---
+## Prerequisites for publishing a package:
+ - A commit signed with a GPG key and a valid GitHub tag in the format of `DIGIT.DIGIT.DIGIT` or `DIGIT.DIGIT.DIGIT-SNAPSHOT`
+ - GitHub tag matching the `"${pom_version_of_package}"[0-9]*$` regex
+ - Your GPG key (i.e. the key of the person pushing the tag) added to the CI/CD pipeline build settings
+
+If these prerequisites are not met, the build process will run without entering the publishing stage.
+
+A `DIGIT.DIGIT.DIGIT-SNAPSHOT` package version can be built multiple times by adjusting the GitHub tag name accordingly. For example:
+```
+GitHub tag = 1.1.1-SNAPSHOT can build 1.1.1-SNAPSHOT package
+GitHub tag = 1.1.1-SNAPSHOT1 can build 1.1.1-SNAPSHOT package
+GitHub tag = 1.1.1-SNAPSHOT2 can build 1.1.1-SNAPSHOT package
+```
+All SNAPSHOT packages are pushed to the snapshot repository configured in the pom.xml file:
+```
+  <snapshotRepository>
+    <id>ossrh</id>
+    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+  </snapshotRepository>
+```
+and can be referenced inside the configuration files by providing the full version, which can be found in the Nexus [repository](https://oss.sonatype.org/content/repositories/snapshots/io/horizen/).
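+
+If a SNAPSHOT is consumed from an sbt build (this repository itself uses sbt), a minimal sketch of the corresponding resolver configuration could look like this (the resolver name is only illustrative):
+```
+resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
+```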
+
+---
+## Usage
+Before starting the build process, run the `set_version.sh` script if needed, providing two arguments in the following format:
+```
+ ./ci/devtools/set_version.sh --help
+ Usage: Provide OLD and NEW versions as the 1st and 2nd arguments respectively.
+ It has to match the following format:
+ DIGIT.DIGIT.DIGIT or DIGIT.DIGIT.DIGIT-SNAPSHOT
+
+ For example:
+ ./set_version.sh 5.5.5 5.5.5-SNAPSHOT
+ ./set_version.sh 5.5.5-SNAPSHOT 5.5.5
+```
+| Changes made by the `set_version.sh` script need to be committed before the build. |
+|------------------------------------------------------------------------------|
+
+---
+## How to reference a SNAPSHOT package
+- Find all the existing versions of the [2.0.1-SNAPSHOT package](https://oss.sonatype.org/content/repositories/snapshots/io/horizen/sparkz-core_2.13/2.0.1-SNAPSHOT/)
+- Use the full version of the SNAPSHOT package as a dependency in your project, in the following format:
+```
+  <dependency>
+    <groupId>io.horizen</groupId>
+    <artifactId>sparkz-core_2.12</artifactId>
+    <version>2.0.1-20230310.201529-1</version>
+  </dependency>
+```
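+
+If the project uses sbt instead of Maven, a minimal sketch of the equivalent dependency declaration (with the same illustrative snapshot version) would be:
+```
+libraryDependencies += "io.horizen" % "sparkz-core_2.12" % "2.0.1-20230310.201529-1"
+```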
\ No newline at end of file
diff --git a/ci/publish.sh b/ci/publish.sh
index d7cc9af4f..c91bde056 100755
--- a/ci/publish.sh
+++ b/ci/publish.sh
@@ -3,10 +3,10 @@
set -eo pipefail
retval=0
-if [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?(-SNAPSHOT){1}[0-9]*$ ]]; then
+if [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}[0-9]*$ ]]; then
echo "" && echo "=== Publishing development release on Sonatype Nexus repository. Timestamp is: $(date '+%a %b %d %H:%M:%S %Z %Y') ===" && echo ""
sbt -ivy ./.ivy2 -sbt-dir ./.sbt +publish
-elif [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?$ ]]; then
+elif [[ "${TRAVIS_TAG}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
echo "" && echo "=== Publishing production release on Maven repository. Timestamp is: $(date '+%Y-%m-%d %H:%M') ===" && echo ""
sbt -ivy ./.ivy2 -sbt-dir ./.sbt +publishSigned sonatypeBundleRelease
else
diff --git a/ci/setup_env.sh b/ci/setup_env.sh
index d6eae7017..cc63d3d9d 100755
--- a/ci/setup_env.sh
+++ b/ci/setup_env.sh
@@ -19,12 +19,13 @@ function import_gpg_keys() {
# shellcheck disable=SC2207
declare -r my_arr=( $(echo "${@}" | tr " " "\n") )
- for key in "${my_arr[@]}"; do
+for key in "${my_arr[@]}"; do
echo "Importing key: ${key}"
gpg -v --batch --keyserver hkps://keys.openpgp.org --recv-keys "${key}" ||
gpg -v --batch --keyserver hkp://keyserver.ubuntu.com --recv-keys "${key}" ||
gpg -v --batch --keyserver hkp://pgp.mit.edu:80 --recv-keys "${key}" ||
- gpg -v --batch --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys "${key}"
+ gpg -v --batch --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys "${key}" ||
+ echo -e "${key} can not be found on GPG key servers. Please upload it to at least one of the following GPG key servers:\nhttps://keys.openpgp.org/\nhttps://keyserver.ubuntu.com/\nhttps://pgp.mit.edu/"
done
}
@@ -60,7 +61,7 @@ if [ -n "${TRAVIS_TAG}" ]; then
# Prod vs dev release
if ( git branch -r --contains "${TRAVIS_TAG}" | grep -xqE ". origin\/${PROD_RELEASE_BRANCH}$" ); then
# Checking if package version matches PROD release version
- if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?$ ]]; then
+ if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
echo "Aborting, package version is in the wrong format for production release."
exit 1
fi
@@ -80,7 +81,7 @@ if [ -n "${TRAVIS_TAG}" ]; then
export CONTAINER_PUBLISH="true"
else
# Checking if package version matches DEV release version
- if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-RC[0-9]+)?(-SNAPSHOT){1}$ ]]; then
+ if ! [[ "${package_version}" =~ ^[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT){1}[0-9]*$ ]]; then
echo "Aborting, package version is in the wrong format for development release."
exit 1
fi
diff --git a/release-notes.md b/release-notes.md
index 2405772b6..3a73330c9 100644
--- a/release-notes.md
+++ b/release-notes.md
@@ -1,3 +1,7 @@
+2.0.1
+---------
+* P2P rate limiting feature
+
2.0.0-RC11
---------
* Changed library for the Bcrypt hashing algorithm and added additional unit tests
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index 765adc2a7..aa7addd94 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -167,6 +167,20 @@ sparkz {
# Limit for number of modifiers to request and process at once
maxRequestedPerPeer = 1024
+ # Enables or disables slow mode - when the node is overloaded, new transactions are neither requested nor broadcast.
+ slowModeFeatureFlag = false
+
+ # Threshold of the average time it takes the node to apply a new modifier (block or transaction),
+ # after which the node is considered overloaded and throttling starts.
+ slowModeThresholdMs = 2000
+
+ # Maximum number of modifiers that can be requested from remote peers
+ # when the node is considered overloaded. Only affects transactions.
+ slowModeMaxRequested = 150
+
+ # The impact a single measurement has on the average processing time.
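+ # The average is an exponential moving average, updated for each applied modifier roughly as:
+ #   newAverage = oldAverage * (1 - slowModeMeasurementImpact) + measurement * slowModeMeasurementImpact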
+ slowModeMeasurementImpact = 0.1
+
# Desired number of inv objects. Our requests will have this size.
desiredInvObjects = 512
diff --git a/src/main/scala/sparkz/core/network/DeliveryTracker.scala b/src/main/scala/sparkz/core/network/DeliveryTracker.scala
index bd05ebbd2..c9821d7ec 100644
--- a/src/main/scala/sparkz/core/network/DeliveryTracker.scala
+++ b/src/main/scala/sparkz/core/network/DeliveryTracker.scala
@@ -4,10 +4,12 @@ import akka.actor.{ActorRef, ActorSystem, Cancellable}
import sparkz.core.consensus.ContainsModifiers
import sparkz.core.network.ModifiersStatus._
import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.CheckDelivery
+import sparkz.core.settings.NetworkSettings
import sparkz.util.SparkzEncoding
import sparkz.core.{ModifierTypeId, NodeViewModifier}
import sparkz.util.{ModifierId, SparkzLogging}
+import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
@@ -35,11 +37,17 @@ import scala.util.{Failure, Try}
* and its methods should not be called from lambdas, Future, Future.map, etc.
*/
class DeliveryTracker(system: ActorSystem,
- deliveryTimeout: FiniteDuration,
- maxDeliveryChecks: Int,
- maxRequestedPerPeer: Int,
+ networkSettings: NetworkSettings,
nvsRef: ActorRef) extends SparkzLogging with SparkzEncoding {
+ protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout
+ protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks
+ protected val maxRequestedPerPeer: Int = networkSettings.maxRequestedPerPeer
+ protected val slowModeFeatureFlag: Boolean = networkSettings.slowModeFeatureFlag
+ protected val slowModeThresholdMs: Long = networkSettings.slowModeThresholdMs
+ protected val slowModeMaxRequested: Int = networkSettings.slowModeMaxRequested
+ protected val slowModeMeasurementImpact: Double = networkSettings.slowModeMeasurementImpact
+
protected case class RequestedInfo(peer: ConnectedPeer, cancellable: Cancellable, checks: Int)
// when a remote peer is asked for a modifier we add the requested data to `requested`
@@ -52,7 +60,10 @@ class DeliveryTracker(system: ActorSystem,
protected val invalid: mutable.HashSet[ModifierId] = mutable.HashSet()
// when our node received a modifier we put it to `received`
- protected val received: mutable.Map[ModifierId, ConnectedPeer] = mutable.Map()
+ protected val received: mutable.Map[ModifierId, (ConnectedPeer, Long)] = mutable.Map()
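+ // the Long component is the System.nanoTime() timestamp captured when the modifier was received;
+ // it is used by updateProcessingTime below to measure how long the modifier took to process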
+
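+ // exponential moving average of modifier processing time (ms); when it rises above
+ // slowModeThresholdMs the slowMode flag below is switched on, and switched off once it drops back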
+ private var averageProcessingTimeMs: Long = 0
+ var slowMode: Boolean = false
/**
* @return status of modifier `id`.
@@ -99,8 +110,8 @@ class DeliveryTracker(system: ActorSystem,
require(isCorrectTransition(status(id), Requested), s"Illegal status transition: ${status(id)} -> Requested")
val cancellable = system.scheduler.scheduleOnce(deliveryTimeout, nvsRef, CheckDelivery(supplier, typeId, id))
requested.put(id, RequestedInfo(supplier, cancellable, checksDone)) match {
- case Some(RequestedInfo(peer,_,_)) if supplier.connectionId == peer.connectionId => //we already had this modifier, it is counted
- case Some(RequestedInfo(peer,_,_)) => decrementPeerLimitCounter(peer); incrementPeerLimitCounter(supplier)
+ case Some(RequestedInfo(peer, _, _)) if supplier.connectionId == peer.connectionId => //we already had this modifier, it is counted
+ case Some(RequestedInfo(peer, _, _)) => decrementPeerLimitCounter(peer); incrementPeerLimitCounter(supplier)
case None => incrementPeerLimitCounter(supplier)
}
}
@@ -127,6 +138,10 @@ class DeliveryTracker(system: ActorSystem,
.map(decrementPeerLimitCounter)
case Received =>
received.remove(modifierId)
+ .collect { case (peer, timestamp) =>
+ updateProcessingTime(timestamp)
+ peer
+ }
case _ =>
None
}
@@ -166,12 +181,12 @@ class DeliveryTracker(system: ActorSystem,
def setReceived(id: ModifierId, sender: ConnectedPeer): Unit =
tryWithLogging {
val oldStatus: ModifiersStatus = status(id)
- require(isCorrectTransition(oldStatus, Invalid), s"Illegal status transition: $oldStatus -> Received")
+ require(isCorrectTransition(oldStatus, Received), s"Illegal status transition: $oldStatus -> Received")
if (oldStatus != Received) {
requested(id).cancellable.cancel()
requested.remove(id)
decrementPeerLimitCounter(sender)
- received.put(id, sender)
+ received.put(id, (sender, System.nanoTime()))
}
}
@@ -181,7 +196,7 @@ class DeliveryTracker(system: ActorSystem,
case Requested =>
requested.get(id).map(_.peer)
case Received =>
- received.get(id)
+ received.get(id).map(_._1)
case _ =>
None
}
@@ -191,6 +206,19 @@ class DeliveryTracker(system: ActorSystem,
maxRequestedPerPeer - peerLimits.getOrElse(peer, 0)
}
+ /**
+ * Check if we have capacity to request more transactions from remote peers.
+ * To decide that the node cannot request more transactions, all 3 conditions must be satisfied:
+ * - the feature flag is enabled
+ * - the node is in slow mode - the average time it takes to process a modifier is above the threshold
+ * - the number of concurrently requested modifiers exceeds the maximum allowed value
+ *
+ * @return true if the node can request more transactions from remote peers, false otherwise
+ */
+ def canRequestMoreTransactions: Boolean = {
+ !(slowModeFeatureFlag && slowMode && requested.size > slowModeMaxRequested)
+ }
+
private def incrementPeerLimitCounter(peer: ConnectedPeer): Unit = {
peerLimits.get(peer) match {
case Some(value) => peerLimits.put(peer, value + 1)
@@ -234,6 +262,7 @@ class DeliveryTracker(system: ActorSystem,
.map(decrementPeerLimitCounter)
case Received =>
received.remove(id)
+ .foreach { case (_, timestamp) => updateProcessingTime(timestamp) }
case _ =>
()
}
@@ -250,4 +279,18 @@ class DeliveryTracker(system: ActorSystem,
log.warn("Unexpected error", e)
Failure(e)
}
+
+ private def updateProcessingTime(startTime: Long): Unit = {
+ if (slowModeFeatureFlag) {
+ val elapsedMs: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)
+ averageProcessingTimeMs = (averageProcessingTimeMs * (1 - slowModeMeasurementImpact)).toLong + (elapsedMs * slowModeMeasurementImpact).toLong
+ if (averageProcessingTimeMs > slowModeThresholdMs && !slowMode) {
+ slowMode = true
+ logger.warn("Slow mode enabled on P2P layer due to high load. Transactions won't be requested and tx broadcast will be limited.")
+ } else if (averageProcessingTimeMs < slowModeThresholdMs && slowMode) {
+ slowMode = false
+ logger.warn("Slow mode disabled on P2P layer. Transactions will be requested or broadcasted.")
+ }
+ }
+ }
}
diff --git a/src/main/scala/sparkz/core/network/NetworkController.scala b/src/main/scala/sparkz/core/network/NetworkController.scala
index 52db1c467..3c17495f4 100644
--- a/src/main/scala/sparkz/core/network/NetworkController.scala
+++ b/src/main/scala/sparkz/core/network/NetworkController.scala
@@ -534,13 +534,17 @@ class NetworkController(settings: NetworkSettings,
}
}
- private def closeConnection(peerAddress: InetSocketAddress): Unit =
- connections.get(peerAddress).foreach { peer =>
- connections = connections.filterNot { case (address, _) => // clear all connections related to banned peer ip
- Option(peer.connectionId.remoteAddress.getAddress).exists(Option(address.getAddress).contains(_))
- }
- peer.handlerRef ! CloseConnection
+ private def closeConnection(peerAddress: InetSocketAddress): Unit = {
+ connections = connections.filter { case (_, connectedPeer) =>
+ Option(connectedPeer)
+ .filter(_.connectionId.remoteAddress.equals(peerAddress))
+ .map { peer =>
+ peer.handlerRef ! CloseConnection
+ context.system.eventStream.publish(DisconnectedPeer(peerAddress))
+ }
+ .isEmpty
}
+ }
/**
* Register a new penalty for given peer address.
diff --git a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala
index e989f31af..cb3a7a3ae 100644
--- a/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala
+++ b/src/main/scala/sparkz/core/network/NodeViewSynchronizer.scala
@@ -54,9 +54,6 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
modifierSerializers: Map[ModifierTypeId, SparkzSerializer[_ <: NodeViewModifier]])(implicit ec: ExecutionContext)
extends Actor with Synchronizer with SparkzLogging with SparkzEncoding {
- protected val deliveryTimeout: FiniteDuration = networkSettings.deliveryTimeout
- protected val maxDeliveryChecks: Int = networkSettings.maxDeliveryChecks
- protected val maxRequestedPerPeer: Int = networkSettings.maxRequestedPerPeer
protected val invSpec = new InvSpec(networkSettings.maxInvObjects)
protected val requestModifierSpec = new RequestModifierSpec(networkSettings.maxInvObjects)
protected val modifiersSpec = new ModifiersSpec(networkSettings.maxModifiersSpecMessageSize)
@@ -68,7 +65,7 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
case (_: ModifiersSpec, data: ModifiersData, remote) => modifiersFromRemote(data, remote)
}
- protected val deliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, self)
+ protected val deliveryTracker = new DeliveryTracker(context.system, networkSettings, self)
protected val statusTracker = new SyncTracker(self, context, networkSettings, timeProvider)
protected var historyReaderOpt: Option[HR] = None
@@ -98,8 +95,12 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
private def readersOpt: Option[(HR, MR)] = historyReaderOpt.flatMap(h => mempoolReaderOpt.map(mp => (h, mp)))
protected def broadcastModifierInv[M <: NodeViewModifier](m: M): Unit = {
- val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None)
- networkControllerRef ! SendToNetwork(msg, Broadcast)
+ m.modifierTypeId match {
+ case Transaction.ModifierTypeId if deliveryTracker.slowMode => // will not broadcast due to the high load
+ case _ =>
+ val msg = Message(invSpec, Right(InvData(m.modifierTypeId, Seq(m.id))), None)
+ networkControllerRef ! SendToNetwork(msg, Broadcast)
+ }
}
protected def viewHolderEvents: Receive = {
@@ -224,7 +225,10 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
val modifierTypeId = invData.typeId
val newModifierIds = (modifierTypeId match {
case Transaction.ModifierTypeId =>
- invData.ids.filter(mid => deliveryTracker.status(mid, mempool) == ModifiersStatus.Unknown)
+ if (deliveryTracker.canRequestMoreTransactions)
+ invData.ids.filter(mid => deliveryTracker.status(mid, mempool) == ModifiersStatus.Unknown)
+ else
+ Seq() // do not request transactions due to the high load
case _ =>
invData.ids.filter(mid => deliveryTracker.status(mid, history) == ModifiersStatus.Unknown)
})
@@ -275,10 +279,12 @@ class NodeViewSynchronizer[TX <: Transaction, SI <: SyncInfo, SIS <: SyncInfoMes
case Some(serializer: SparkzSerializer[TX]@unchecked) if typeId == Transaction.ModifierTypeId =>
// parse all transactions and send them to node view holder
val parsed: Iterable[TX] = parseModifiers(requestedModifiers, serializer, remote)
+ parsed.foreach(tx => deliveryTracker.setReceived(tx.id, remote))
viewHolderRef ! TransactionsFromRemote(parsed)
case Some(serializer: SparkzSerializer[PMOD]@unchecked) =>
// parse all modifiers and put them to modifiers cache
+ log.info(s"Received block ids ${modifiers.keySet.map(encoder.encodeId).mkString(",")}")
val parsed: Iterable[PMOD] = parseModifiers(requestedModifiers, serializer, remote)
val valid: Iterable[PMOD] = parsed.filter(validateAndSetStatus(remote, _))
if (valid.nonEmpty) viewHolderRef ! ModifiersFromRemote[PMOD](valid)
diff --git a/src/main/scala/sparkz/core/settings/Settings.scala b/src/main/scala/sparkz/core/settings/Settings.scala
index e5a612a78..0645da1c5 100644
--- a/src/main/scala/sparkz/core/settings/Settings.scala
+++ b/src/main/scala/sparkz/core/settings/Settings.scala
@@ -31,6 +31,10 @@ case class NetworkSettings(nodeName: String,
maxDeliveryChecks: Int,
penalizeNonDelivery: Boolean,
maxRequestedPerPeer: Int,
+ slowModeFeatureFlag: Boolean,
+ slowModeThresholdMs: Long,
+ slowModeMaxRequested: Int,
+ slowModeMeasurementImpact: Double,
appVersion: String,
agentName: String,
maxModifiersSpecMessageSize: Int,
diff --git a/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala b/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala
index 6ebcd53c3..c0d2007bc 100644
--- a/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala
+++ b/src/test/scala/sparkz/core/network/DeliveryTrackerSpecification.scala
@@ -2,6 +2,8 @@ package sparkz.core.network
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.TestProbe
+import org.mockito.Mockito.when
+import org.mockito.MockitoSugar.mock
import org.scalatest.matchers.should.Matchers
import org.scalatest.propspec.AnyPropSpec
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
@@ -9,9 +11,10 @@ import sparkz.ObjectGenerators
import sparkz.core.consensus.ContainsModifiers
import sparkz.core.network.ModifiersStatus._
import sparkz.core.serialization.SparkzSerializer
-import sparkz.core.{PersistentNodeViewModifier, ModifierTypeId}
+import sparkz.core.settings.NetworkSettings
+import sparkz.core.{ModifierTypeId, PersistentNodeViewModifier}
import sparkz.crypto.hash.Blake2b256
-import sparkz.util.{bytesToId, ModifierId}
+import sparkz.util.{ModifierId, bytesToId}
import scala.collection.concurrent.TrieMap
import scala.concurrent.ExecutionContext.Implicits.global
@@ -154,12 +157,76 @@ class DeliveryTrackerSpecification extends AnyPropSpec
tracker.getPeerLimit(otherPeer) shouldBe 3
}
+ property("slow mode should be enabled when average processing time exceeds threshold") {
+ val system = ActorSystem()
+ val probe = TestProbe("p")(system)
+ implicit val nvsStub: ActorRef = probe.testActor
+ val dt = FiniteDuration(3, MINUTES)
+ val networkSettings = mock[NetworkSettings]
+ when(networkSettings.deliveryTimeout).thenReturn(dt)
+ when(networkSettings.maxDeliveryChecks).thenReturn(2)
+ when(networkSettings.maxRequestedPerPeer).thenReturn(3)
+ when(networkSettings.slowModeFeatureFlag).thenReturn(true)
+ when(networkSettings.slowModeThresholdMs).thenReturn(100)
+ when(networkSettings.slowModeMeasurementImpact).thenReturn(0.1)
+ val deliveryTracker = new DeliveryTracker(
+ system,
+ networkSettings,
+ nvsRef = nvsStub)
+ deliveryTracker.slowMode shouldBe false
+ val modifiers = (1 to 10).map(int => bytesToId(Blake2b256(int.toString)))
+
+ deliveryTracker.setRequested(modifiers, mtid, cp)
+ modifiers.foreach(deliveryTracker.setReceived(_, cp))
+ deliveryTracker.slowMode shouldBe false
+ Thread.sleep(200)
+ deliveryTracker.slowMode shouldBe false
+ modifiers.foreach(deliveryTracker.setHeld)
+ deliveryTracker.slowMode shouldBe true
+ }
+
+ property("slow mode should depend on the feature flag") {
+ val system = ActorSystem()
+ val probe = TestProbe("p")(system)
+ implicit val nvsStub: ActorRef = probe.testActor
+ val dt = FiniteDuration(3, MINUTES)
+ val networkSettings = mock[NetworkSettings]
+ when(networkSettings.deliveryTimeout).thenReturn(dt)
+ when(networkSettings.maxDeliveryChecks).thenReturn(2)
+ when(networkSettings.maxRequestedPerPeer).thenReturn(3)
+ when(networkSettings.slowModeFeatureFlag).thenReturn(false)
+ when(networkSettings.slowModeThresholdMs).thenReturn(100)
+ val deliveryTracker = new DeliveryTracker(
+ system,
+ networkSettings,
+ nvsRef = nvsStub)
+ deliveryTracker.slowMode shouldBe false
+ val modifiers = (1 to 10).map(int => bytesToId(Blake2b256(int.toString)))
+
+ deliveryTracker.setRequested(modifiers, mtid, cp)
+ modifiers.foreach(deliveryTracker.setReceived(_, cp))
+ deliveryTracker.slowMode shouldBe false
+ Thread.sleep(200)
+ deliveryTracker.slowMode shouldBe false
+ modifiers.foreach(deliveryTracker.setHeld)
+ deliveryTracker.slowMode shouldBe false
+ }
+
private def genDeliveryTracker = {
val system = ActorSystem()
val probe = TestProbe("p")(system)
implicit val nvsStub: ActorRef = probe.testActor
val dt = FiniteDuration(3, MINUTES)
- new DeliveryTracker(system, deliveryTimeout = dt, maxDeliveryChecks = 2, maxRequestedPerPeer = 3, nvsStub)
+ val networkSettings = mock[NetworkSettings]
+ when(networkSettings.deliveryTimeout).thenReturn(dt)
+ when(networkSettings.maxDeliveryChecks).thenReturn(2)
+ when(networkSettings.maxRequestedPerPeer).thenReturn(3)
+ when(networkSettings.slowModeFeatureFlag).thenReturn(true)
+ when(networkSettings.slowModeThresholdMs).thenReturn(100)
+ new DeliveryTracker(
+ system,
+ networkSettings,
+ nvsRef = nvsStub)
}
}
diff --git a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala
index d69563adf..2cd55441a 100644
--- a/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala
+++ b/src/test/scala/sparkz/core/network/NetworkControllerSpec.scala
@@ -15,8 +15,9 @@ import org.scalatest.matchers.should.Matchers
import sparkz.core.app.{SparkzContext, Version}
import sparkz.core.network.NetworkController.ReceivableMessages.Internal.ConnectionToPeer
import sparkz.core.network.NetworkController.ReceivableMessages.{ConnectTo, GetConnectedPeers, GetPeersStatus}
+import sparkz.core.network.NodeViewSynchronizer.ReceivableMessages.DisconnectedPeer
import sparkz.core.network.message._
-import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, ConfirmConnection, GetAllPeers, RandomPeerForConnectionExcluding}
+import sparkz.core.network.peer.PeerManager.ReceivableMessages.{AddOrUpdatePeer, ConfirmConnection, DisconnectFromAddress, GetAllPeers, RandomPeerForConnectionExcluding}
import sparkz.core.network.peer._
import sparkz.core.settings.SparkzSettings
import sparkz.core.utils.LocalTimeProvider
@@ -522,7 +523,7 @@ class NetworkControllerSpec extends NetworkTests with ScalaFutures {
peerManagerProbe.expectMsg(RandomPeerForConnectionExcluding(Seq()))
peerManagerProbe.reply(Some(getPeerInfo(peerAddressOne)))
// Wait for the message to be received
- Thread.sleep(2)
+ Thread.sleep(200)
// Second attempt, discarding the peer we tried just before
networkControllerRef ! ConnectionToPeer(emptyActiveConnections, emptyUnconfirmedConnections)
diff --git a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala
index 19bc251e1..aea418ad3 100644
--- a/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala
+++ b/src/test/scala/sparkz/core/network/NodeViewSynchronizerSpecification.scala
@@ -83,7 +83,7 @@ class NodeViewSynchronizerSpecification extends NetworkTests with TestImplementa
timeProvider,
modifierSerializers
) {
- override val deliveryTracker: DeliveryTracker = new DeliveryTracker(context.system, deliveryTimeout, maxDeliveryChecks, maxRequestedPerPeer, self) {
+ override val deliveryTracker: DeliveryTracker = new DeliveryTracker(context.system, settings.network, self) {
override def status(modifierId: ModifierId): ModifiersStatus = ModifiersStatus.Requested
}
}