Skip to content

Commit

Permalink
eval: add rewrite settings to reference.conf (#1691)
Browse files Browse the repository at this point in the history
Move the config settings to be explicitly in the config
so it is easy to confirm when checking settings.
  • Loading branch information
brharrington authored Aug 28, 2024
1 parent f786abe commit 52a9337
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 35 deletions.
7 changes: 7 additions & 0 deletions atlas-eval/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ atlas.eval {

// Which version of the LWC server API to use
lwcapi-version = 2

// Rewriting for data sources. Can be enabled to call an endpoint for performing rewrites
// on the URIs for a data source.
rewrite {
enabled = false
uri = ""
}
}

graph {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,14 @@ class DataSourceRewriter(
implicit val system: ActorSystem
) extends StrictLogging {

private val (enabled, rewriteUrl) = {
val enabled = config.hasPath("atlas.eval.stream.rewrite-url")
val url = if (enabled) config.getString("atlas.eval.stream.rewrite-url") else ""
if (enabled) {
logger.info(s"Rewriting enabled with url: ${url}")
} else {
logger.info("Rewriting is disabled")
}
(enabled, url)
private val rewriteConfig = config.getConfig("atlas.eval.stream.rewrite")
private val enabled = rewriteConfig.getBoolean("enabled")
private val rewriteUri = rewriteConfig.getString("uri")

if (enabled) {
logger.info(s"rewriting enabled with uri: $rewriteUri")
} else {
logger.info(s"rewriting is disabled")
}

private val client = PekkoHttpClient
Expand Down Expand Up @@ -215,7 +214,7 @@ class DataSourceRewriter(
Some(new DataSource(ds.id, ds.step(), rewrite))
}
}
DataSources.of(rewrites: _*)
DataSources.of(rewrites*)
}

private[stream] def constructRequest(dss: List[DataSource]): HttpRequest = {
Expand All @@ -226,7 +225,7 @@ class DataSourceRewriter(
json.writeEndArray()
}
HttpRequest(
uri = rewriteUrl,
uri = rewriteUri,
method = HttpMethods.POST,
entity = HttpEntity(ContentTypes.`application/json`, baos.toByteArray)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.netflix.atlas.pekko.PekkoHttpClient
import com.netflix.spectator.api.NoopRegistry
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigValueFactory
import munit.FunSuite
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
Expand All @@ -51,7 +50,7 @@ import scala.util.Try

class DataSourceRewriterSuite extends FunSuite with TestKitBase {

val dss = DataSources.of(
private val dss = DataSources.of(
new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq"),
new DataSource(
"bar",
Expand All @@ -60,19 +59,21 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase {
)
)

var config: Config = _
var logger: MockLogger = _
var ctx: StreamContext = null
private var config: Config = _
private var logger: MockLogger = _
private var ctx: StreamContext = null

override implicit def system: ActorSystem = ActorSystem("Test")

override def beforeEach(context: BeforeEach): Unit = {
config = ConfigFactory
.load()
.withValue(
"atlas.eval.stream.rewrite-url",
ConfigValueFactory.fromAnyRef("http://localhost/api/v1/rewrite")
)
.parseString("""
|atlas.eval.stream.rewrite {
| enabled = true
| uri = "http://localhost/api/v1/rewrite"
|}
|""".stripMargin)
.withFallback(ConfigFactory.load())
logger = new MockLogger()
ctx = new StreamContext(config, Materializer(system), dsLogger = logger)
}
Expand All @@ -85,7 +86,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase {

test("rewrite: OK") {
val client = mockClient(
StatusCode.int2StatusCode(200),
StatusCodes.OK,
okRewrite()
)
val expected = DataSources.of(
Expand All @@ -103,7 +104,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase {

test("rewrite: Bad URI in datasources") {
val client = mockClient(
StatusCode.int2StatusCode(200),
StatusCodes.OK,
Map(
"http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite(
"OK",
Expand All @@ -129,7 +130,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase {

test("rewrite: Malformed response JSON") {
val client = mockClient(
StatusCode.int2StatusCode(200),
StatusCodes.OK,
Map(
"http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite(
"OK",
Expand All @@ -153,7 +154,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase {

test("rewrite: 500") {
val client = mockClient(
StatusCode.int2StatusCode(500),
StatusCodes.InternalServerError,
Map.empty
)
val expected = DataSources.of()
Expand All @@ -164,7 +165,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase {

test("rewrite: Missing a rewrite") {
val client = mockClient(
StatusCode.int2StatusCode(200),
StatusCodes.OK,
Map(
"http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite(
"OK",
Expand All @@ -185,9 +186,9 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase {
test("rewrite: source changes with good, bad, good") {
val client = new MockClient(
List(
StatusCode.int2StatusCode(200),
StatusCode.int2StatusCode(500),
StatusCode.int2StatusCode(200)
StatusCodes.OK,
StatusCodes.InternalServerError,
StatusCodes.OK
),
List(
okRewrite(true),
Expand Down Expand Up @@ -224,9 +225,9 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase {
test("rewrite: retry initial flow with 500s") {
val client = new MockClient(
List(
StatusCode.int2StatusCode(500),
StatusCode.int2StatusCode(500),
StatusCode.int2StatusCode(200)
StatusCodes.InternalServerError,
StatusCodes.InternalServerError,
StatusCodes.OK
),
List(
Map.empty,
Expand All @@ -250,9 +251,9 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase {
test("rewrite: retry initial flow with 500, exception, ok") {
val client = new MockClient(
List(
StatusCode.int2StatusCode(500),
StatusCodes.InternalServerError,
StatusCodes.custom(0, "no conn", "no conn", false, true),
StatusCode.int2StatusCode(200)
StatusCodes.OK
),
List(
Map.empty,
Expand Down Expand Up @@ -343,7 +344,9 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase {

var called = 0

override def singleRequest(request: HttpRequest): Future[HttpResponse] = ???
override def singleRequest(request: HttpRequest): Future[HttpResponse] = {
Future.failed(new UnsupportedOperationException())
}

override def superPool[C](
config: PekkoHttpClient.ClientConfig
Expand Down

0 comments on commit 52a9337

Please sign in to comment.