Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Eureka discovery #242

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
9 changes: 9 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ val userProjects: Seq[ProjectReference] = Seq[ProjectReference](
discoveryAwsApi,
discoveryAwsApiAsync,
discoveryConsul,
discoveryEureka,
discoveryKubernetesApi,
discoveryMarathonApi,
leaseKubernetes,
Expand Down Expand Up @@ -105,6 +106,14 @@ lazy val discoveryConsul = pekkoModule("discovery-consul")
libraryDependencies := Dependencies.discoveryConsul,
mimaPreviousArtifactsSet)

lazy val discoveryEureka = pekkoModule("discovery-eureka")
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
.disablePlugins(MimaPlugin)
.settings(
name := "pekko-discovery-eureka",
libraryDependencies := Dependencies.discoveryEureka,
mimaPreviousArtifactsSet)

// gathers all enabled routes and serves them (HTTP or otherwise)
lazy val management = pekkoModule("management")
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
Expand Down
19 changes: 19 additions & 0 deletions discovery-eureka/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# SPDX-License-Identifier: Apache-2.0

######################################################
# Apache Pekko Service Discovery Eureka Config #
######################################################

pekko.discovery {
eureka {
class = org.apache.pekko.discovery.eureka.EurekaServiceDiscovery
# default eureka scheme
eureka-scheme = "http"
# default eureka host
eureka-host = "127.0.0.1"
# default eureka port
eureka-port = 8761
# default eureka-path
eureka-path = "eureka"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pekko.discovery.eureka

import scala.collection.immutable

object EurekaResponse {
case class Application(name: String, instance: immutable.Seq[Instance])
case class Instance(hostName: String, app: String, vipAddress: String, ipAddr: Option[String],
status: String, port: PortWrapper, securePort: PortWrapper, dataCenterInfo: Option[DataCenterInfo],
lastDirtyTimestamp: String)
case class Status()
case class PortWrapper(port: Int, enabled: Boolean)
case class DataCenterInfo(name: String = "MyOwn",
clz: String = "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo")
}

import EurekaResponse._

case class EurekaResponse(application: Application, errorCode: Option[String])
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pekko.discovery.eureka

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import pekko.discovery.eureka.EurekaServiceDiscovery.{ pick, targets }
import pekko.discovery.eureka.JsonFormat._
import pekko.discovery.{ Lookup, ServiceDiscovery }
import pekko.event.{ LogSource, Logging }
import pekko.http.scaladsl.Http
import pekko.http.scaladsl.model.{ HttpRequest, MediaRange, MediaTypes, Uri }
import pekko.http.scaladsl.model.headers._
import pekko.http.scaladsl.unmarshalling.Unmarshal

import java.net.InetAddress
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

object EurekaServiceDiscovery {
private[eureka] def pick(
instances: immutable.Seq[EurekaResponse.Instance]): Future[immutable.Seq[EurekaResponse.Instance]] = {
Future.successful(instances.collect {
case instance if instance.status == "UP" => instance
})
}

private[eureka] def targets(instances: immutable.Seq[EurekaResponse.Instance]): immutable.Seq[ResolvedTarget] = {
instances.map { instance =>
ResolvedTarget(
host = instance.hostName,
port = Some(instance.port.port),
address = instance.ipAddr.flatMap(ip => Try(InetAddress.getByName(ip)).toOption))
}
}
}

class EurekaServiceDiscovery(implicit system: ActorSystem) extends ServiceDiscovery {

import system.dispatcher

private val log = Logging(system, getClass)(LogSource.fromClass)
private val settings = EurekaSettings(system)
private val (scheme, host, port, path) =
(settings.scheme, settings.host, settings.port, settings.path)
private val http = Http()

override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[ServiceDiscovery.Resolved] = {

val uriPath = Uri.Path.Empty / path / "apps" / lookup.serviceName
val uri = Uri.from(scheme = scheme, host = host, port = port).withPath(uriPath)
val request = HttpRequest(uri = uri,
headers = immutable.Seq(`Accept-Encoding`(HttpEncodings.gzip), Accept(MediaRange(MediaTypes.`application/json`))))

log.info("Requesting seed nodes by: {}", request.uri)

for {
response <- http.singleRequest(request)
entity <- response.entity.toStrict(resolveTimeout)
response <- {
log.debug("Eureka response: [{}]", entity.data.utf8String)
val unmarshalled = Unmarshal(entity).to[EurekaResponse]
unmarshalled.failed.foreach { _ =>
log.error(
"Failed to unmarshal Eureka response status [{}], entity: [{}], uri: [{}]",
response.status.value,
entity.data.utf8String,
uri)
}
unmarshalled
}
instances <- pick(response.application.instance)
} yield Resolved(lookup.serviceName, targets(instances))

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pekko.discovery.eureka

import org.apache.pekko
import pekko.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}
import pekko.annotation.ApiMayChange

@ApiMayChange
final class EurekaSettings(system: ExtendedActorSystem) extends Extension {
private val eurekaConfig = system.settings.config.getConfig("pekko.discovery.eureka")

val scheme: String = eurekaConfig.getString("eureka-scheme")
val host: String = eurekaConfig.getString("eureka-host")
val port: Int = eurekaConfig.getInt("eureka-port")
val path: String = eurekaConfig.getString("eureka-path")
}

@ApiMayChange
object EurekaSettings extends ExtensionId[EurekaSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): EurekaSettings = super.get(system)

override def get(system: ClassicActorSystemProvider): EurekaSettings = super.get(system)

override def lookup: EurekaSettings.type = EurekaSettings

override def createExtension(system: ExtendedActorSystem): EurekaSettings = new EurekaSettings(system)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.discovery.eureka

import org.apache.pekko.discovery.eureka.EurekaResponse.{ Application, DataCenterInfo, Instance, PortWrapper }
import org.apache.pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json._

object JsonFormat extends SprayJsonSupport with DefaultJsonProtocol {
implicit val portFormat: JsonFormat[PortWrapper] = new JsonFormat[PortWrapper] {

override def read(json: JsValue): PortWrapper = {
json.asJsObject.getFields("$", "@enabled") match {
case Seq(JsNumber(port), JsString(enabled)) => PortWrapper(port.toInt, enabled.toBoolean)
case _ => throw DeserializationException("PortWrapper expected")
}
}

override def write(obj: PortWrapper): JsValue = JsObject(
"$" -> JsNumber(obj.port),
"@enabled" -> JsString(obj.enabled.toString))
}
implicit val dataCenterInfoFormat: JsonFormat[DataCenterInfo] = new JsonFormat[DataCenterInfo] {

override def read(json: JsValue): DataCenterInfo = {
json.asJsObject.getFields("name", "@class") match {
case Seq(JsString(name), JsString(clz)) => DataCenterInfo(name, clz)
case _ => throw DeserializationException("DataCenterInfo expected")
}
}

override def write(obj: DataCenterInfo): JsValue = JsObject(
"name" -> JsString(obj.name),
"@class" -> JsString(obj.clz))
}
implicit val instanceFormat: JsonFormat[Instance] = jsonFormat9(Instance.apply)
implicit val applicationFormat: JsonFormat[Application] = jsonFormat2(Application.apply)
implicit val rootFormat: RootJsonFormat[EurekaResponse] = jsonFormat2(EurekaResponse.apply)
}
15 changes: 15 additions & 0 deletions discovery-eureka/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoder>
<pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} - %m%n%xException</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>

</configuration>
Loading
Loading