Skip to content

Connecting to Epmd no longer blocks. #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
<version>4.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-tools.testing</groupId>
<artifactId>specs_${scala.version}</artifactId>
Expand Down
114 changes: 100 additions & 14 deletions src/main/scala/scalang/epmd/Epmd.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,81 @@ import netty.bootstrap._
import netty.channel._
import socket.nio._
import overlock.threadpool._
import com.codahale.logula.Logging

object Epmd {
case class EpmdConfig(
host: String,
port: Int,
connectOnInit: Boolean = true,
retries: Option[Int] = Some(Epmd.defaultRetries),
retryInterval: Option[Double] = None,
connectionTimeout: Option[Int] = None
)

object Epmd extends Logging {
private val futurePollingInterval = 100 /* nanos */
val defaultPort = 4369
val defaultRetries = 5
val defaultRetryInterval = 2.0 /*seconds*/

lazy val bossPool = ThreadPool.instrumentedElastic("scalang.epmd", "boss", 1, 20)
lazy val workerPool = ThreadPool.instrumentedElastic("scalang.epmd", "worker", 1, 20)

def apply(host : String) : Epmd = {
val port = Option(System.getenv("ERL_EPMD_PORT")).map(_.toInt).getOrElse(defaultPort)
new Epmd(host, port)
def apply(host: String, port: Option[Int] = None): Epmd = {
val epmdPort = port match {
case Some(p) => p
case None => Option(System.getenv("ERL_EPMD_PORT")).map(_.toInt).getOrElse(defaultPort)
}
Epmd(new EpmdConfig(host, epmdPort))
}

def apply(host : String, port : Int) : Epmd = {
new Epmd(host, port)
def apply(cfg: EpmdConfig): Epmd = {
val epmd = new Epmd(cfg.host, cfg.port, cfg.connectionTimeout)
if (cfg.connectOnInit) {
connectWithRetries(epmd, cfg)
}
epmd
}

def connectWithRetries(epmd: Epmd, cfg: EpmdConfig) {
var future = epmd.connect
while (!future.isDone) {
Thread.sleep(futurePollingInterval)
}

if (cfg.retries.isDefined) {
val retries = cfg.retries.get

val retryInterval = cfg.retryInterval.getOrElse(defaultRetryInterval)
val retryIntervalMillis = (retryInterval * 1000.0).toInt

var numRetries = 0
while (!epmd.connected && numRetries < retries) {
// Retry the connection
if (!future.isSuccess) {
log.warn("epmd connection failed. Retrying in %.1f seconds", retryInterval)
Thread.sleep(retryIntervalMillis)
future = epmd.connect
numRetries += 1
}
// Poll the future
while (!future.isDone) {
Thread.sleep(futurePollingInterval)
}
}
}
}
}

class Epmd(val host : String, val port : Int) {
class Epmd(val host : String, val port : Int, val defaultTimeout: Option[Int] = None) extends Logging {
var channel: Channel = null
val handler = new EpmdHandler

val bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Epmd.bossPool,
Epmd.workerPool))

val handler = new EpmdHandler

bootstrap.setPipelineFactory(new ChannelPipelineFactory {
def getPipeline : ChannelPipeline = {
Channels.pipeline(
Expand All @@ -53,29 +104,63 @@ class Epmd(val host : String, val port : Int) {
handler)
}
})
setTimeout(defaultTimeout)


def setTimeout(timeout: Option[Int]) {
if (timeout.isDefined) {
bootstrap.setOption("connectTimeoutMillis", timeout.get * 1000)
}
}

def connect: ChannelFuture = {
val connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
connectFuture.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (!future.isSuccess) {
log.error(future.getCause, "Failed to connect to epmd on %s:%s", host, port)
} else {
channel = future.getChannel
}
}
})
connectFuture
}

val connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
val channel = connectFuture.awaitUninterruptibly.getChannel
if(!connectFuture.isSuccess) {
throw connectFuture.getCause
def connectBlocking: Epmd = {
val connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
channel = connectFuture.awaitUninterruptibly.getChannel
this
}

def close {
channel.close
}

def connected = (channel != null)

def alive(portNo : Int, nodeName : String) : Option[Int] = {
if (!connected) {
log.error("'alive(%s, %s)' called before Epmd connected!", portNo, nodeName)
return None
}

channel.write(AliveReq(portNo,nodeName))
val response = handler.response.call.asInstanceOf[AliveResp]
if (response.result == 0) {
Some(response.creation)
} else {
error("Epmd response was: " + response.result)
log.error("Epmd response was: " + response.result)
None
}
}

def lookupPort(nodeName : String) : Option[Int] = {
if(!connected) {
log.error("'lookupPort(%s)' called before Epmd connected!", nodeName)
return None
}

channel.write(PortPleaseReq(nodeName))
handler.response.call match {
case PortPleaseResp(portNo, _) => Some(portNo)
Expand All @@ -84,3 +169,4 @@ class Epmd(val host : String, val port : Int) {
}
}


2 changes: 2 additions & 0 deletions src/main/scala/scalang/epmd/EpmdMessages.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
//
package scalang.epmd

// Requests:
case class AliveReq(portNo : Int, nodeName : String)

case class AliveResp(result : Int, creation : Int)

// Responses:
case class PortPleaseReq(nodeName : String)

case class PortPleaseError(result : Int)
Expand Down
11 changes: 10 additions & 1 deletion src/test/scala/scalang/TestHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ object Escript {

object EpmdCmd {
def apply() : SysProcess = {
val builder = new ProcessBuilder("epmd")
val osName = System.getProperty("os.name").toLowerCase
var builder : ProcessBuilder = null
if (!osName.contains("win")) {
builder = new ProcessBuilder("bash", "-c", "export PATH=" + formatPath + " && epmd")
} else {
builder = new ProcessBuilder("epmd")
}
builder.start
}

val additionalPaths = List("/usr/local/bin", "/usr/local/sbin")
def formatPath: String = additionalPaths.mkString(":") + ":" + System.getenv("PATH")
}

object ReadLine {
Expand Down
48 changes: 46 additions & 2 deletions src/test/scala/scalang/epmd/EpmdSpec.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package scalang.epmd

import org.specs._
import org.specs.runner._
import mock.Mockito
import java.lang.{Process => SysProcess}
import scalang._
import org.jboss.netty.channel.ChannelFuture

class EpmdSpec extends SpecificationWithJUnit {
class EpmdSpec extends SpecificationWithJUnit with Mockito {
"Epmd" should {
var proc : SysProcess = null
doBefore {
Expand Down Expand Up @@ -36,4 +37,47 @@ class EpmdSpec extends SpecificationWithJUnit {
epmdQuery.close
}
}

"Epmd object" should {
"return an Epmd directly" in {
val noConnectConfig = new EpmdConfig("localhost", Epmd.defaultPort, connectOnInit = false)
val epmd = Epmd(noConnectConfig)
epmd.connected must(be(false))
}

"connect with retries" in {
val epmd = mock[Epmd]
val future = mock[ChannelFuture]
epmd.connect.returns(future)

// Always returns true so that it's never polled
future.isDone.returns(true)

// 'connected' is checked before 'future.isSuccess' since there should be a polling step
// that's being skipped in each test.
epmd.connected
.returns(false)
.thenReturns(false)
.thenReturns(false)
.thenReturns(false)
.thenReturns(true)
future.isSuccess
.returns(false)
.thenReturns(false)
.thenReturns(false)
.thenReturns(true)

val retryCfg = new EpmdConfig(
"localhost",
Epmd.defaultPort,
connectOnInit = true,
connectionTimeout = Some(1),
retries = Some(10),
retryInterval = Some(1)
)
Epmd.connectWithRetries(epmd, retryCfg)

there was 4.times(epmd).connect
}
}
}