Skip to content

Commit

Permalink
Add test support for Monix task - Elasticsearch
Browse files Browse the repository at this point in the history
  • Loading branch information
paualarco committed Oct 18, 2021
1 parent 43625dd commit 1ac01cf
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 285 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,36 @@
package monix.connect.aws.auth

import java.net.URI
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.flatspec.AsyncFlatSpec
import pureconfig.{CamelCase, ConfigFieldMapping, ConfigSource, KebabCase, PascalCase, SnakeCase}
import pureconfig.generic.auto._
import org.scalatest.matchers.should.Matchers
import pureconfig.error.ConfigReaderException
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.regions.Region
import monix.connect.aws.auth.MonixAwsConf._
import monix.execution.Scheduler.Implicits.global
import monix.eval.Task
import monix.execution.Scheduler
import monix.testing.scalatest.MonixTaskSpec
import pureconfig.generic.ProductHint

import java.io.File
import scala.util.Try

class MonixAwsConfigSpec extends AnyFlatSpec with Matchers {
class MonixAwsConfigSpec extends AsyncFlatSpec with MonixTaskSpec with Matchers {

"MonixAwsConf" should "load from default config file" in {
//given/when
val monixAwsConf = MonixAwsConf.load().runSyncUnsafe()
override implicit val scheduler: Scheduler = Scheduler.io("monix-aws-config-spec")

//then
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint.isDefined shouldBe true
monixAwsConf.region shouldBe Region.EU_WEST_1
"MonixAwsConf" should "load from default config file" in {
MonixAwsConf.load().asserting { monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint.isDefined shouldBe true
monixAwsConf.region shouldBe Region.EU_WEST_1
}
}

it should "read the local endpoint as a uri" in {

//given
implicit val hint: ProductHint[AppConf] =
ProductHint(ConfigFieldMapping(CamelCase, KebabCase), useDefaultArgs = false, allowUnknownKeys = true)

Expand All @@ -62,18 +63,15 @@ class MonixAwsConfigSpec extends AnyFlatSpec with Matchers {
|}
|""".stripMargin)

//when
val monixAwsConf = configSource.loadOrThrow[AppConf].monixAws

//then
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("localhost:4566"))
monixAwsConf.httpClient.isDefined shouldBe false
monixAwsConf.region shouldBe Region.AWS_GLOBAL
Task(configSource.loadOrThrow[AppConf].monixAws).asserting{ monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("localhost:4566"))
monixAwsConf.httpClient.isDefined shouldBe false
monixAwsConf.region shouldBe Region.AWS_GLOBAL
}
}

it should "not require endpoint nor http client settings" in {
//given
val configSource = ConfigSource.string(
"" +
s"""
Expand All @@ -87,18 +85,15 @@ class MonixAwsConfigSpec extends AnyFlatSpec with Matchers {
|}
|""".stripMargin)

//when
val monixAwsConf = configSource.loadOrThrow[AppConf].monixAws

//then
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint.isDefined shouldBe false
monixAwsConf.httpClient.isDefined shouldBe false
monixAwsConf.region shouldBe Region.AWS_GLOBAL
Task(configSource.loadOrThrow[AppConf].monixAws).asserting { monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint.isDefined shouldBe false
monixAwsConf.httpClient.isDefined shouldBe false
monixAwsConf.region shouldBe Region.AWS_GLOBAL
}
}

it should "fail when credentials are not present" in {
//given
val configSource = ConfigSource.string(
"" +
s"""
Expand All @@ -109,17 +104,14 @@ class MonixAwsConfigSpec extends AnyFlatSpec with Matchers {
|}
|""".stripMargin)

//when
val monixAwsConf = Try(configSource.loadOrThrow[AppConf]).map(_.monixAws)

//then
monixAwsConf.isFailure shouldBe true
monixAwsConf.failed.get shouldBe a[ConfigReaderException[_]]
monixAwsConf.failed.get.getMessage should include("Key not found: 'credentials'")
}

it should "fail when credentials region is not present" in {
//given
val configSource = ConfigSource.string(
"" +
s"""
Expand All @@ -132,77 +124,61 @@ class MonixAwsConfigSpec extends AnyFlatSpec with Matchers {
|}
|""".stripMargin)

//when
val monixAwsConf = Try(configSource.loadOrThrow[AppConf]).map(_.monixAws)

//then
monixAwsConf.isFailure shouldBe true
monixAwsConf.failed.get shouldBe a[ConfigReaderException[_]]
monixAwsConf.failed.get.getMessage should include("Key not found: 'region'")
Task(configSource.loadOrThrow[AppConf]).map(_.monixAws).attempt.asserting { monixAwsConf =>
monixAwsConf.isLeft shouldBe true
monixAwsConf.left.get shouldBe a[ConfigReaderException[_]]
monixAwsConf.left.get.getMessage should include("Key not found: 'region'")
}
}

it can "read config in kebabCase" in {
//given/when
val monixAwsConf =
MonixAwsConf.file(new File("aws-auth/src/test/resources/kebab-case.conf"), KebabCase).runSyncUnsafe()

//then
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("kebab-case:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
MonixAwsConf.file(new File("aws-auth/src/test/resources/kebab-case.conf"), KebabCase)
.asserting{ monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("kebab-case:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
}
}

it can "read config in snake_case" in {
//given/when
val monixAwsConf =
MonixAwsConf.file(new File("aws-auth/src/test/resources/snake_case.conf"), SnakeCase).runSyncUnsafe()

//then
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("snake:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
MonixAwsConf.file(new File("aws-auth/src/test/resources/snake_case.conf"), SnakeCase).asserting { monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("snake:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
}
}

it can "read config in PascalCase" in {
//given/when
val monixAwsConf =
MonixAwsConf.file(new File("aws-auth/src/test/resources/PascalCase.conf"), PascalCase).runSyncUnsafe()

//then
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("PascalCase:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
MonixAwsConf.file(new File("aws-auth/src/test/resources/PascalCase.conf"), PascalCase).asserting { monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("PascalCase:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
}
}

it can "read config in camelCase" in {
//given/when
val monixAwsConf =
MonixAwsConf.file(new File("aws-auth/src/test/resources/camelCase.conf"), CamelCase).runSyncUnsafe()

//then
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("camelCase:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
MonixAwsConf.file(new File("aws-auth/src/test/resources/camelCase.conf"), CamelCase).asserting { monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("camelCase:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
}
}

it should "read config in kebab-case by default from reference.conf " in {
//given/when
val monixAwsConf = MonixAwsConf.load().runSyncUnsafe()

//then
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("localhost:4566"))
monixAwsConf.region shouldBe Region.EU_WEST_1
MonixAwsConf.load().asserting { monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("localhost:4566"))
monixAwsConf.region shouldBe Region.EU_WEST_1
}
}

it should "read config in camelCase by default when reading from path" in {
//given/when
val monixAwsConf = MonixAwsConf.file(new File("aws-auth/src/test/resources/kebab-case.conf")).runSyncUnsafe()
MonixAwsConf.file(new File("aws-auth/src/test/resources/kebab-case.conf")).asserting { monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("kebab-case:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
}

//then
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("kebab-case:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
}

}
Original file line number Diff line number Diff line change
@@ -1,108 +1,82 @@
package monix.connect.elasticsearch

import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import monix.testing.scalatest.MonixTaskSpec
import org.scalacheck.Gen
import org.scalatest.BeforeAndAfterEach
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.flatspec.{AnyFlatSpecLike, AsyncFlatSpec}
import org.scalatest.matchers.should.Matchers

import scala.util.Try

class ElasticsearchSinkSuite extends AnyFlatSpecLike with Fixture with Matchers with BeforeAndAfterEach {
class ElasticsearchSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with Matchers with BeforeAndAfterEach {
import com.sksamuel.elastic4s.ElasticDsl._

override implicit val scheduler: Scheduler = Scheduler.io("elasticsearch-sink-suite")
"ElasticsearchSink" should "execute update requests in batches" in {
// given
val updateRequests = Gen.listOfN(10, genUpdateRequest).sample.get

// when
esResource.use { es =>
Observable
.from(updateRequests)
.bufferTumbling(5)
.consumeWith(es.bulkRequestSink())
}.runSyncUnsafe()

// then
val r =
Task
.parSequence(updateRequests.map { request =>
getById(request.index.name, request.id)
.map(_.sourceAsString)
})
.runSyncUnsafe()
r should contain theSameElementsAs updateRequests.flatMap(_.documentSource)
.consumeWith(es.bulkRequestSink()) *>
Task
.parSequence(updateRequests.map { request =>
getById(request.index.name, request.id)
.map(_.sourceAsString)
})
}.asserting(_ should contain theSameElementsAs updateRequests.flatMap(_.documentSource))
}

it should "execute delete requests in batches" in {
// given
val updateRequests = Gen.listOfN(10, genUpdateRequest).sample.get
val deleteRequests = updateRequests.take(5).map(r => deleteById(r.index, r.id))

// when
esResource.use { es =>
Observable
.from(updateRequests)
.bufferTumbling(5)
.consumeWith(es.bulkRequestSink())
}.runSyncUnsafe()
esResource.use { es =>
Observable
.from(deleteRequests)
.bufferTumbling(5)
.consumeWith(es.bulkRequestSink())
}.runSyncUnsafe()
// then
val r =
Task
.parSequence(updateRequests.map { request =>
getById(request.index.name, request.id)
.map(_.sourceAsString)
})
.runSyncUnsafe()
r should contain theSameElementsAs List.fill(5)("{}") ++ updateRequests.takeRight(5).flatMap(_.documentSource)
.consumeWith(es.bulkRequestSink()) *>
Observable
.from(deleteRequests)
.bufferTumbling(5)
.consumeWith(es.bulkRequestSink()) *>
Task
.parSequence(updateRequests.map { request =>
getById(request.index.name, request.id)
.map(_.sourceAsString)
})
}.asserting(_ should contain theSameElementsAs List.fill(5)("{}") ++ updateRequests.takeRight(5).flatMap(_.documentSource))
}

it should "execute index requests batches" in {
// given
val indexRequests = Gen.listOfN(10, genIndexRequest).sample.get

// when
esResource.use { es =>
Observable
.from(indexRequests)
.bufferTumbling(5)
.consumeWith(es.bulkRequestSink())
}.runSyncUnsafe()

// then
val r =
Task
.parSequence(indexRequests.map { request =>
getById(request.index.name, request.id.get)
.map(_.sourceAsString)
})
.runSyncUnsafe()
r should contain theSameElementsAs indexRequests.flatMap(_.source)
.consumeWith(es.bulkRequestSink()) *>
Task
.parSequence(indexRequests.map { request =>
getById(request.index.name, request.id.get)
.map(_.sourceAsString)
})
}.asserting{ _ should contain theSameElementsAs indexRequests.flatMap(_.source) }
}

it should "fails when the es index not exists" in {
// given
val requests = Seq(updateById("test_index", "test_id"))

// when
val ob =
esResource.use { es =>
Observable
.from(requests)
.bufferTumbling(5)
.consumeWith(es.bulkRequestSink())
}

// then
val r = Try(ob.runSyncUnsafe())
r.isFailure shouldBe true
esResource.use { es =>
Observable
.from(requests)
.bufferTumbling(5)
.consumeWith(es.bulkRequestSink())
}.attempt.asserting(_.isLeft shouldBe true)
}
}
Loading

0 comments on commit 1ac01cf

Please sign in to comment.