Skip to content

Commit

Permalink
eval: add support for IPv6 (#1647)
Browse files Browse the repository at this point in the history
Remove support for Eureka as we are no longer using
it and clarify that the lookup is based on Edda.
Updates the mapping to support using the IPv6 address
of the LWC API instances.
  • Loading branch information
brharrington authored Apr 2, 2024
1 parent 47bd1af commit 28feb66
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import org.apache.pekko.stream.stage.GraphStage
import org.apache.pekko.stream.stage.GraphStageLogic
import org.apache.pekko.stream.stage.InHandler
import org.apache.pekko.stream.stage.OutHandler
import com.netflix.atlas.eval.stream.EurekaSource.GroupResponse
import com.netflix.atlas.eval.stream.EurekaSource.Groups
import com.netflix.atlas.eval.stream.EddaSource.GroupResponse
import com.netflix.atlas.eval.stream.EddaSource.Groups
import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.pekko.DiagnosticMessage
import com.typesafe.scalalogging.StrictLogging
Expand All @@ -38,7 +38,7 @@ import scala.concurrent.duration.FiniteDuration
* Takes in a collection of data sources and outputs a source that will regularly emit
* the metadata for the groups needed by those sources.
*/
private[stream] class EurekaGroupsLookup(context: StreamContext, frequency: FiniteDuration)
private[stream] class EddaGroupsLookup(context: StreamContext, frequency: FiniteDuration)
extends GraphStage[FlowShape[DataSources, Source[SourcesAndGroups, NotUsed]]]
with StrictLogging {

Expand Down Expand Up @@ -72,7 +72,7 @@ private[stream] class EurekaGroupsLookup(context: StreamContext, frequency: Fini
val eurekaSources = next.sources.asScala
.flatMap { s =>
try {
Option(context.findEurekaBackendForUri(Uri(s.uri)).eurekaUri)
Option(context.findEurekaBackendForUri(Uri(s.uri)).eddaUri)
} catch {
case e: Exception =>
val msg = DiagnosticMessage.error(e)
Expand All @@ -83,7 +83,7 @@ private[stream] class EurekaGroupsLookup(context: StreamContext, frequency: Fini
.toList
.distinct
.map { uri =>
EurekaSource(uri, context)
EddaSource(uri, context)
}

// Perform lookup for each vip and create groups composite
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.stream

import org.apache.pekko.NotUsed
import org.apache.pekko.http.scaladsl.model.HttpMethods
import org.apache.pekko.http.scaladsl.model.HttpRequest
import org.apache.pekko.http.scaladsl.model.HttpResponse
import org.apache.pekko.http.scaladsl.model.MediaTypes
import org.apache.pekko.http.scaladsl.model.StatusCodes
import org.apache.pekko.http.scaladsl.model.headers.*
import org.apache.pekko.stream.scaladsl.Compression
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString
import com.fasterxml.jackson.annotation.JsonProperty
import com.netflix.atlas.json.Json
import com.netflix.atlas.pekko.ByteStringInputStream
import com.typesafe.scalalogging.StrictLogging

import java.nio.charset.StandardCharsets
import scala.util.Failure
import scala.util.Success

private[stream] object EddaSource extends StrictLogging {

/**
* Subscribes to all instances that are available for an app or a vip in eureka.
*
* @param eddaUri
* Should be the `/api/v2/group/autoScalingGroups` endpoint for an Edda service.
* @param context
* Shared context for the evaluation stream.
*/
def apply(eddaUri: String, context: StreamContext): Source[GroupResponse, NotUsed] = {

val headers =
List(Accept(MediaTypes.`application/json`), `Accept-Encoding`(HttpEncodings.gzip))
val request = HttpRequest(HttpMethods.GET, eddaUri, headers)

Source
.single(request)
.via(context.httpClient("edda"))
.flatMapConcat {
case Success(res: HttpResponse) if res.status == StatusCodes.OK =>
parseResponse(eddaUri, res)
case Success(res: HttpResponse) =>
logger.warn(s"edda refresh failed with status ${res.status}: $eddaUri")
res.discardEntityBytes(context.materializer)
Source.empty[GroupResponse]
case Failure(t) =>
logger.warn(s"edda refresh failed with exception: $eddaUri", t)
Source.empty[GroupResponse]
}
}

private def unzipIfNeeded(res: HttpResponse): Source[ByteString, Any] = {
val isCompressed = res.headers.contains(`Content-Encoding`(HttpEncodings.gzip))
if (isCompressed) res.entity.dataBytes.via(Compression.gunzip()) else res.entity.dataBytes
}

private def parseResponse(
uri: String,
res: HttpResponse
): Source[GroupResponse, Any] = {
unzipIfNeeded(res)
.reduce(_ ++ _)
.recover {
case t: Throwable =>
logger.warn(s"exception while processing edda response: $uri", t)
ByteString.empty
}
.filter(_.nonEmpty)
.map { bs =>
decodeEddaResponse(new ByteStringInputStream(bs)).copy(uri = uri)
}
}

private def decodeEddaResponse(in: ByteStringInputStream): EddaResponse = {
val responses = Json.decode[List[EddaResponse]](in)
require(responses != null, "EddaResponse list cannot be null")
EddaResponse(null, responses.flatMap(_.instances))
}

//
// Model objects for Edda response payloads
//

case class Groups(groups: List[GroupResponse])

sealed trait GroupResponse {

def uri: String
def instances: List[Instance]
}

case class EddaResponse(uri: String, instances: List[Instance]) extends GroupResponse {

require(instances != null, "instances cannot be null")
}

case class Instance(
instanceId: String,
privateIpAddress: Option[String] = None,
ipv6Address: Option[String] = None
) {

require(instanceId != null, "instanceId cannot be null")

private val variables = {
val builder = Map.newBuilder[String, String]

privateIpAddress.foreach { ip =>
builder.addOne("local-ipv4" -> ip) // Backwards compatibility with Eureka
builder.addOne("ipv4" -> ip)
builder.addOne("ip" -> ip)
}

ipv6Address.foreach { ip =>
builder.addOne("ipv6" -> s"[$ip]")
builder.addOne("ip" -> s"[$ip]")
}

// For Edda, always assume default port of 7101
builder.addOne("port" -> "7101")

builder.result()
}

require(variables.contains("ip"), "IP address must be available")

def substitute(pattern: String): String = {
var tmp = pattern
variables.foreachEntry { (k, v) =>
tmp = tmp.replace(s"{$k}", v)
}
tmp
}
}
}

This file was deleted.

Loading

0 comments on commit 28feb66

Please sign in to comment.