Skip to content

Commit

Permalink
chore: avoiding calling through the mesh to the proxy (#1055)
Browse files Browse the repository at this point in the history
  • Loading branch information
franciscolopezsancho authored Aug 4, 2022
1 parent 96cdf54 commit 76ede4d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public KalixTestKit start(final Config config) {
proxyContainer.start();
started = true;
// pass on proxy and host to GrpcClients to allow for inter-component communication
GrpcClients.get(runner.system()).setSelfServicePort(proxyContainer.getProxyPort());
GrpcClients.get(runner.system()).setProxyPort(proxyContainer.getProxyPort());
return this;
}

Expand Down
26 changes: 16 additions & 10 deletions sdk/java-sdk/src/main/scala/kalix/javasdk/impl/DiscoveryImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ class DiscoveryImpl(system: ActorSystem, services: Map[String, Service]) extends
* Discover what components the user function wishes to serve.
*/
override def discover(in: ProxyInfo): scala.concurrent.Future[Spec] = {
log.info(
"Received discovery call from [{} {}] at [{}]:[{}] supporting Kalix protocol {}.{}",
in.proxyName,
in.proxyVersion,
in.internalProxyHostname,
in.proxyPort,
in.protocolMajorVersion,
in.protocolMinorVersion)
if (isVersionProbe(in)) {
// only (silently) send service info for hybrid proxy version probe
Future.successful(Spec(serviceInfo = Some(serviceInfo)))
Expand All @@ -78,23 +86,21 @@ class DiscoveryImpl(system: ActorSystem, services: Map[String, Service]) extends
val proxyTerminatedPromise = if (in.devMode) Promise.successful[Done](Done) else Promise[Done]()
proxyTerminatedRef.getAndSet(proxyTerminatedPromise).trySuccess(Done)

log.info(
"Received discovery call from [{} {}] at [{}] supporting Kalix protocol {}.{}",
in.proxyName,
in.proxyVersion,
in.proxyHostname,
in.protocolMajorVersion,
in.protocolMinorVersion)
log.debug(s"Supported sidecar entity types: {}", in.supportedEntityTypes.mkString("[", ",", "]"))

val unsupportedServices = services.values.filterNot { service =>
in.supportedEntityTypes.contains(service.componentType)
}

val grpcClients = GrpcClients.get(system)
val grpcClients = GrpcClients(system)
// pass the deployed name of the service on to GrpcClients for cross component calls
GrpcClients.get(system).setProxyHostname(in.proxyHostname)

if (in.internalProxyHostname.isEmpty) {
// for backward compatibiliy with proxy 1.0.14 or older
grpcClients.setProxyHostname(in.proxyHostname)
} else {
grpcClients.setProxyHostname(in.internalProxyHostname)
}
grpcClients.setProxyPort(in.proxyPort)
grpcClients.setIdentificationInfo(in.identificationInfo)

if (unsupportedServices.nonEmpty) {
Expand Down
30 changes: 19 additions & 11 deletions sdk/java-sdk/src/main/scala/kalix/javasdk/impl/GrpcClients.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ final class GrpcClients(system: ExtendedActorSystem) extends Extension {
private val log = LoggerFactory.getLogger(classOf[GrpcClients])

@volatile private var proxyHostname: Option[String] = None
@volatile private var selfPort: Option[Int] = None
@volatile private var proxyPort: Option[Int] = None
@volatile private var identificationInfo: Option[IdentificationInfo] = None
private implicit val ec: ExecutionContext = system.dispatcher
private val clients = new ConcurrentHashMap[Key, AnyRef]()
Expand All @@ -84,15 +84,14 @@ final class GrpcClients(system: ExtendedActorSystem) extends Extension {
identificationInfo = info
}

def setSelfServicePort(port: Int): Unit = {
def setProxyPort(port: Int): Unit = {
log.debug("Setting port to: [{}]", port)
selfPort = Some(port)
proxyPort = Some(port)
}

def getComponentGrpcClient[T](serviceClass: Class[T]): T = {
getLocalGrpcClient(serviceClass)
getProxyGrpcClient(serviceClass)
}

def getProxyGrpcClient[T](serviceClass: Class[T]): T = {
getLocalGrpcClient(serviceClass)
}
Expand All @@ -104,13 +103,22 @@ final class GrpcClients(system: ExtendedActorSystem) extends Extension {
def getGrpcClient[T](serviceClass: Class[T], service: String): T =
getGrpcClient(serviceClass, service, port = 80, remoteAddHeader)

/** Local gRPC clients point to services (user components or Kalix services) in the same deployable */
/** gRPC clients point to services (user components or Kalix services) in the same deployable */
private def getLocalGrpcClient[T](serviceClass: Class[T]): T = {
proxyHostname match {
case Some("localhost") => getGrpcClient(serviceClass, "localhost", selfPort.getOrElse(9000), localAddHeader)
case Some(selfName) => getGrpcClient(serviceClass, selfName, 80, localAddHeader)
case None =>
throw new IllegalStateException("Self service name not set by proxy at discovery, too old proxy version?")
(proxyHostname, proxyPort) match {
case (Some(internalProxyHostname), Some(port)) =>
getGrpcClient(serviceClass, internalProxyHostname, port, localAddHeader)
// for backward compatibiliy with proxy 1.0.14 or older.
case (Some("localhost"), None) =>
log.warn("you are using an old version of the Kalix proxy")
getGrpcClient(serviceClass, "localhost", proxyPort.getOrElse(9000), localAddHeader)
// for backward compatibiliy with proxy 1.0.14 or older
case (Some(proxyHostname), None) =>
log.warn("you are using an old version of the Kalix proxy")
getGrpcClient(serviceClass, proxyHostname, 80, localAddHeader)
case _ =>
throw new IllegalStateException(
"Service proxy hostname and port are not set by proxy at discovery, too old proxy version?")
}
}

Expand Down

0 comments on commit 76ede4d

Please sign in to comment.