Skip to content

Commit

Permalink
add a shared memory port allocator to allow multiple processes to sha… (
Browse files Browse the repository at this point in the history
corda#5223)

* add a shared memory port allocator to allow multiple processes to share a single allocation pool

* remove dangerous reset function on port allocator

* set forkCount = 2 in node integration test

* only allow one build of a cordapp at any given time for Driver tests

* make all portallocation requests use same starting point

* globally set forks to 6

* tweak forking parameters to allow parallel builds

* tweak unit test parallelism

* 2 workers for integrationTest

* some more tweaks for parallel builds

* some more tweaks for parallel builds

* seems that 49K is not the start of ephemeral ports on all kernels

* tweak parallel settings

* try fix RPC shutdown test in parallel env

* add some logging for RPC shutdown test

* added some logging around PortAllocation tests - try figure out where they are getting stuck

* added some logging around PortAllocation tests - try figure out where they are getting stuck

* fix api-scanner tests

* minimize api changes

* revert to complying with existing API

* add the AtomicInteger for api compatibility reasons

* make sizing script executable

* address review comments pt1

* address review comments pt2

* fix compile errors after review comments

* return to using home dir as temp dir seemed to interact badly with gradle
  • Loading branch information
roastario authored Jul 2, 2019
1 parent f89008c commit 88894bc
Show file tree
Hide file tree
Showing 36 changed files with 340 additions and 111 deletions.
11 changes: 11 additions & 0 deletions BUILD.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Corda Build

## Build Environment Variables

CORDA_CORE_TESTING_FORKS : Number of JVMS to fork for running unit tests in :core

CORDA_NODE_INT_TESTING_FORKS : Number of JVMS to fork for running integration tests in :node
CORDA_NODE_TESTING_FORKS : Number of JVMS to fork for running unit tests in :node

CORDA_INT_TESTING_FORKS : Global number of JVMS to fork for running integration tests
CORDA_TESTING_FORKS : Global number of JVMS to fork for running unit tests
20 changes: 11 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -232,22 +232,14 @@ allprojects {
}

tasks.withType(Test) {
forkEvery = 10
failFast = project.hasProperty('tests.failFast') ? project.property('tests.failFast').toBoolean() : false

// Prevent the project from creating temporary files outside of the build directory.
systemProperty 'java.io.tmpdir', buildDir.absolutePath

maxHeapSize = "1g"

if (project.hasProperty('test.parallel') && project.property('test.parallel').toBoolean()) {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) as int ?: 1
}

if (System.getProperty("test.maxParallelForks") != null) {
maxParallelForks = Integer.getInteger('test.maxParallelForks')
logger.debug("System property test.maxParallelForks found - setting max parallel forks to $maxParallelForks for $project")
}

if (project.path.startsWith(':experimental') && System.getProperty("experimental.test.enable") == null) {
enabled = false
}
Expand All @@ -257,6 +249,16 @@ allprojects {
extensions.configure(TypeOf.typeOf(JacocoTaskExtension)) { ex ->
ex.append = false
}

maxParallelForks = (System.env.CORDA_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_TESTING_FORKS".toInteger()

systemProperty 'java.security.egd', 'file:/dev/./urandom'
}

tasks.withType(Test){
if (name.contains("integrationTest")){
maxParallelForks = (System.env.CORDA_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_INT_TESTING_FORKS".toInteger()
}
}

group 'net.corda'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import net.corda.core.internal.toPath
import net.corda.core.messaging.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.POUNDS
import net.corda.finance.USD
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.workflows.getCashBalance
import net.corda.finance.workflows.getCashBalances
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.workflows.getCashBalance
import net.corda.finance.workflows.getCashBalances
import net.corda.node.internal.NodeWithInfo
import net.corda.node.services.Permissions.Companion.all
import net.corda.testing.common.internal.checkNotOnClasspath
Expand Down Expand Up @@ -47,6 +48,7 @@ import kotlin.test.assertTrue
class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries = listOf(DUMMY_NOTARY_NAME)) {
companion object {
val rpcUser = User("user1", "test", permissions = setOf(all()))
val log = contextLogger()
}

private lateinit var node: NodeWithInfo
Expand Down Expand Up @@ -97,13 +99,13 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
val nodeIsShut: PublishSubject<Unit> = PublishSubject.create()
val latch = CountDownLatch(1)
var successful = false
val maxCount = 20
val maxCount = 120
var count = 0
CloseableExecutor(Executors.newSingleThreadScheduledExecutor()).use { scheduler ->

val task = scheduler.scheduleAtFixedRate({
try {
println("Checking whether node is still running...")
log.info("Checking whether node is still running...")
client.start(rpcUser.username, rpcUser.password).use {
println("... node is still running.")
if (count == maxCount) {
Expand All @@ -112,7 +114,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
count++
}
} catch (e: RPCException) {
println("... node is not running.")
log.info("... node is not running.")
nodeIsShut.onCompleted()
} catch (e: ActiveMQSecurityException) {
// nothing here - this happens if trying to connect before the node is started
Expand All @@ -122,7 +124,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
}, 1, 1, TimeUnit.SECONDS)

nodeIsShut.doOnError { error ->
error.printStackTrace()
log.error("FAILED TO SHUT DOWN NODE DUE TO", error)
successful = false
task.cancel(true)
latch.countDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class RPCStabilityTests {
val testSerialization = SerializationEnvironmentRule(true)

private val pool = Executors.newFixedThreadPool(10, testThreadFactory())
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()

@After
fun shutdown() {
Expand Down
6 changes: 3 additions & 3 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ configurations {
testArtifacts.extendsFrom testRuntimeClasspath
}

tasks.withType(Test) {
// fork a new test process for every test class
forkEvery = 10

test{
maxParallelForks = (System.env.CORDA_CORE_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_CORE_TESTING_FORKS".toInteger()
}

task testJar(type: Jar) {
Expand Down
9 changes: 3 additions & 6 deletions node/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,15 @@ tasks.withType(JavaCompile) {
options.compilerArgs << '-proc:none'
}

tasks.withType(Test) {
test {
maxHeapSize = "2g"
// fork a new test process for every test class
forkEvery = 10
maxParallelForks = (System.env.CORDA_NODE_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_NODE_TESTING_FORKS".toInteger()
}

task integrationTest(type: Test) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'testing.global.port.allocation.enabled', true
systemProperty 'testing.global.port.allocation.starting.port', 10000
maxParallelForks = (System.env.CORDA_NODE_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_NODE_INT_TESTING_FORKS".toInteger()
}

// quasar exclusions upon agent code instrumentation at run-time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import java.net.ServerSocket
class AddressBindingFailureTests {

companion object {
private val portAllocation = incrementalPortAllocation(20_000)
private val portAllocation = incrementalPortAllocation()
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class AMQPBridgeTest {

private val BOB = TestIdentity(BOB_NAME)

private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val artemisAddress = portAllocation.nextHostAndPort()
private val amqpAddress = portAllocation.nextHostAndPort()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class CertificateRevocationListNodeTests {
private val ROOT_CA = DEV_ROOT_CA
private lateinit var INTERMEDIATE_CA: CertificateAndKeyPair

private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val serverPort = portAllocation.nextPort()

private lateinit var server: CrlServer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ProtonWrapperTests {
@JvmField
val temporaryFolder = TemporaryFolder()

private val portAllocation = incrementalPortAllocation(15000) // use 15000 to move us out of harms way
private val portAllocation = incrementalPortAllocation() // use 15000 to move us out of harms way
private val serverPort = portAllocation.nextPort()
private val serverPort2 = portAllocation.nextPort()
private val artemisPort = portAllocation.nextPort()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService

class FlowsDrainingModeContentionTest {
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val user = User("mark", "dadada", setOf(all()))
private val users = listOf(user)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.logging.logFile
import net.corda.node.services.Permissions
import net.corda.nodeapi.internal.hasCancelledDrainingShutdown
import net.corda.testing.core.ALICE_NAME
Expand All @@ -21,7 +19,6 @@ import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.internal.chooseIdentity
import net.corda.testing.node.User
import net.corda.testing.node.internal.waitForShutdown
import org.assertj.core.api.Assertions
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
import org.junit.After
import org.junit.Before
Expand All @@ -37,7 +34,7 @@ class P2PFlowsDrainingModeTest {
private val logger = contextLogger()
}

private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val user = User("mark", "dadada", setOf(Permissions.all()))
private val users = listOf(user)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.junit.Test

class RpcFlowsDrainingModeTest {

private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val user = User("mark", "dadada", setOf(Permissions.all()))
private val users = listOf(user)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kotlin.test.assertTrue

class H2SecurityTests {
companion object {
private val port = incrementalPortAllocation(21_000)
private val port = incrementalPortAllocation()
private fun getFreePort() = port.nextPort()
private const val h2AddressKey = "h2Settings.address"
private const val dbPasswordKey = "dataSourceProperties.dataSource.password"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class ArtemisMessagingTest {
val temporaryFolder = TemporaryFolder()

// THe
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val serverPort = portAllocation.nextPort()
private val identity = generateKeyPair()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
val testSerialization = SerializationEnvironmentRule(true)

private val cacheTimeout = 1.seconds
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()

private lateinit var networkMapServer: NetworkMapServer
private lateinit var compatibilityZone: CompatibilityZoneParams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import java.nio.file.Path
import javax.security.auth.x500.X500Principal

class ArtemisRpcTests {
private val ports: PortAllocation = incrementalPortAllocation(10000)
private val ports: PortAllocation = incrementalPortAllocation()

private val user = User("mark", "dadada", setOf(all()))
private val users = listOf(user)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@ import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.services.Permissions
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.OutOfProcessImpl
import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import org.junit.ClassRule
import org.junit.Test
import java.util.*
import java.util.concurrent.CountDownLatch
Expand All @@ -47,7 +44,7 @@ class RpcReconnectTests {
private val log = contextLogger()
}

private val portAllocator = incrementalPortAllocation(20006)
private val portAllocator = incrementalPortAllocation()

/**
* This test showcases and stress tests the demo [ReconnectingCordaRPCOps].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class HardRestartTest {
fun restartShortPingPongFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
driver(DriverParameters(
portAllocation = incrementalPortAllocation(10000),
portAllocation = incrementalPortAllocation(),
startNodesInProcess = false,
inMemoryDB = false,
notarySpecs = emptyList(),
Expand Down Expand Up @@ -101,7 +101,7 @@ class HardRestartTest {
fun restartLongPingPongFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
driver(DriverParameters(
portAllocation = incrementalPortAllocation(10000),
portAllocation = incrementalPortAllocation(),
startNodesInProcess = false,
inMemoryDB = false,
notarySpecs = emptyList(),
Expand Down Expand Up @@ -139,7 +139,7 @@ class HardRestartTest {
fun softRestartLongPingPongFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
driver(DriverParameters(
portAllocation = incrementalPortAllocation(10000),
portAllocation = incrementalPortAllocation(),
startNodesInProcess = false,
inMemoryDB = false,
notarySpecs = emptyList(),
Expand Down Expand Up @@ -221,7 +221,7 @@ class HardRestartTest {
fun restartRecursiveFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<RecursiveA>(), Permissions.all()))
driver(DriverParameters(
portAllocation = incrementalPortAllocation(10000),
portAllocation = incrementalPortAllocation(),
startNodesInProcess = false,
inMemoryDB = false,
notarySpecs = emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class NodeRegistrationTest {
@JvmField
val testSerialization = SerializationEnvironmentRule(true)

private val portAllocation = incrementalPortAllocation(13000)
private val portAllocation = incrementalPortAllocation()
private val registrationHandler = RegistrationHandler(DEV_ROOT_CA)
private lateinit var server: NetworkMapServer
private lateinit var serverHostAndPort: NetworkHostAndPort
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.junit.Test
import java.util.*

class AdditionP2PAddressModeTest {
private val portAllocation = incrementalPortAllocation(27182)
private val portAllocation = incrementalPortAllocation()
@Test
fun `runs nodes with one configured to use additionalP2PAddresses`() {
val testUser = User("test", "test", setOf(all()))
Expand Down
4 changes: 3 additions & 1 deletion node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ class CordaRPCOpsImplTest {

@After
fun cleanUp() {
mockNet.stopNodes()
if (::mockNet.isInitialized) {
mockNet.stopNodes()
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import net.corda.testing.core.singleIdentityAndCert
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestIdentityService
import org.bouncycastle.asn1.DEROctetString
import org.junit.Ignore
import org.junit.Test
import kotlin.test.assertEquals

class KMSUtilsTests {
@Test
@Ignore
fun `should generate certificates with the correct role`() {
val aliceKey = generateKeyPair()
val alice = getTestPartyAndCertificate(ALICE_NAME, aliceKey.public)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import net.corda.core.utilities.getOrThrow
import net.corda.node.services.schema.NodeSchemaService
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.notary.experimental.raft.RaftNotarySchemaV1
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.internal.incrementalPortAllocation
Expand Down Expand Up @@ -45,7 +44,7 @@ class RaftTransactionCommitLogTests {
val testSerialization = SerializationEnvironmentRule(true)

private val databases: MutableList<CordaPersistence> = mutableListOf()
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()

private lateinit var cluster: List<Member>

Expand Down
Loading

0 comments on commit 88894bc

Please sign in to comment.