From 1ac01cfb4af9ae7fb0f3521119ff2298a51359d2 Mon Sep 17 00:00:00 2001
From: Pau
Date: Sun, 17 Oct 2021 19:42:24 +0200
Subject: [PATCH] Add test support for Monix task - Elasticsearch

---
 .../connect/aws/auth/MonixAwsConfigSpec.scala | 144 ++++++---------
 .../ElasticsearchSinkSuite.scala              |  98 ++++------
 .../ElasticsearchSourceSuite.scala            |  47 ++---
 .../elasticsearch/ElasticsearchSuite.scala    | 171 +++++++-----------
 .../monix/connect/elasticsearch/Fixture.scala |   2 +
 5 files changed, 177 insertions(+), 285 deletions(-)

diff --git a/aws-auth/src/test/scala/monix/connect/aws/auth/MonixAwsConfigSpec.scala b/aws-auth/src/test/scala/monix/connect/aws/auth/MonixAwsConfigSpec.scala
index c1d89b117..1ab1ddb3d 100644
--- a/aws-auth/src/test/scala/monix/connect/aws/auth/MonixAwsConfigSpec.scala
+++ b/aws-auth/src/test/scala/monix/connect/aws/auth/MonixAwsConfigSpec.scala
@@ -18,7 +18,7 @@ package monix.connect.aws.auth
 
 import java.net.URI
 
-import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.flatspec.AsyncFlatSpec
 import pureconfig.{CamelCase, ConfigFieldMapping, ConfigSource, KebabCase, PascalCase, SnakeCase}
 import pureconfig.generic.auto._
 import org.scalatest.matchers.should.Matchers
@@ -26,27 +26,28 @@ import pureconfig.error.ConfigReaderException
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
 import software.amazon.awssdk.regions.Region
 import monix.connect.aws.auth.MonixAwsConf._
-import monix.execution.Scheduler.Implicits.global
+import monix.eval.Task
+import monix.execution.Scheduler
+import monix.testing.scalatest.MonixTaskSpec
 import pureconfig.generic.ProductHint
 
 import java.io.File
 import scala.util.Try
 
-class MonixAwsConfigSpec extends AnyFlatSpec with Matchers {
+class MonixAwsConfigSpec extends AsyncFlatSpec with MonixTaskSpec with Matchers {
 
-  "MonixAwsConf" should "load from default config file" in {
-    //given/when
-    val monixAwsConf = MonixAwsConf.load().runSyncUnsafe()
+  override implicit val scheduler: Scheduler = Scheduler.io("monix-aws-config-spec")
 
-    //then
-    monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
-    monixAwsConf.endpoint.isDefined shouldBe true
-    monixAwsConf.region shouldBe Region.EU_WEST_1
+  "MonixAwsConf" should "load from default config file" in {
+    MonixAwsConf.load().asserting { monixAwsConf =>
+      monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
+      monixAwsConf.endpoint.isDefined shouldBe true
+      monixAwsConf.region shouldBe Region.EU_WEST_1
+    }
   }
 
   it should "read the local endpoint as a uri" in {
-    //given
     implicit val hint: ProductHint[AppConf] =
       ProductHint(ConfigFieldMapping(CamelCase, KebabCase), useDefaultArgs = false, allowUnknownKeys = true)
@@ -62,18 +63,15 @@ class MonixAwsConfigSpec extends AnyFlatSpec with Matchers {
         |}
         |""".stripMargin)
 
-    //when
-    val monixAwsConf = configSource.loadOrThrow[AppConf].monixAws
-
-    //then
-    monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
-    monixAwsConf.endpoint shouldBe Some(URI.create("localhost:4566"))
-    monixAwsConf.httpClient.isDefined shouldBe false
-    monixAwsConf.region shouldBe Region.AWS_GLOBAL
+    Task(configSource.loadOrThrow[AppConf].monixAws).asserting { monixAwsConf =>
+      monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
+      monixAwsConf.endpoint shouldBe Some(URI.create("localhost:4566"))
+      monixAwsConf.httpClient.isDefined shouldBe false
+      monixAwsConf.region shouldBe Region.AWS_GLOBAL
+    }
   }
 
   it should "not require endpoint nor http client settings" in {
-    //given
     val configSource = ConfigSource.string(
       "" +
         s"""
@@ -87,18 +85,15 @@ class MonixAwsConfigSpec extends AnyFlatSpec with Matchers {
         |}
         |""".stripMargin)
 
-    //when
-    val monixAwsConf = configSource.loadOrThrow[AppConf].monixAws
-
-    //then
-    monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
-    monixAwsConf.endpoint.isDefined shouldBe false
-    monixAwsConf.httpClient.isDefined shouldBe false
-    monixAwsConf.region shouldBe Region.AWS_GLOBAL
+    Task(configSource.loadOrThrow[AppConf].monixAws).asserting { monixAwsConf =>
+      monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
+      monixAwsConf.endpoint.isDefined shouldBe false
+      monixAwsConf.httpClient.isDefined shouldBe false
+      monixAwsConf.region shouldBe Region.AWS_GLOBAL
+    }
   }
 
   it should "fail when credentials are not present" in {
-    //given
     val configSource = ConfigSource.string(
       "" +
         s"""
@@ -109,17 +104,14 @@ class MonixAwsConfigSpec extends AnyFlatSpec with Matchers {
         |}
         |""".stripMargin)
 
-    //when
     val monixAwsConf = Try(configSource.loadOrThrow[AppConf]).map(_.monixAws)
 
-    //then
     monixAwsConf.isFailure shouldBe true
     monixAwsConf.failed.get shouldBe a[ConfigReaderException[_]]
     monixAwsConf.failed.get.getMessage should include("Key not found: 'credentials'")
   }
 
   it should "fail when credentials region is not present" in {
-    //given
     val configSource = ConfigSource.string(
       "" +
         s"""
@@ -132,77 +124,61 @@ class MonixAwsConfigSpec extends AnyFlatSpec with Matchers {
         |}
         |""".stripMargin)
 
-    //when
-    val monixAwsConf = Try(configSource.loadOrThrow[AppConf]).map(_.monixAws)
-
-    //then
-    monixAwsConf.isFailure shouldBe true
-    monixAwsConf.failed.get shouldBe a[ConfigReaderException[_]]
-    monixAwsConf.failed.get.getMessage should include("Key not found: 'region'")
+    Task(configSource.loadOrThrow[AppConf]).map(_.monixAws).attempt.asserting { monixAwsConf =>
+      monixAwsConf.isLeft shouldBe true
+      monixAwsConf.left.get shouldBe a[ConfigReaderException[_]]
+      monixAwsConf.left.get.getMessage should include("Key not found: 'region'")
+    }
   }
 
   it can "read config in kebabCase" in {
-    //given/when
-    val monixAwsConf =
-      MonixAwsConf.file(new File("aws-auth/src/test/resources/kebab-case.conf"), KebabCase).runSyncUnsafe()
-
-    //then
-    monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
-    monixAwsConf.endpoint shouldBe Some(URI.create("kebab-case:12345"))
-    monixAwsConf.region shouldBe Region.EU_WEST_1
+    MonixAwsConf.file(new File("aws-auth/src/test/resources/kebab-case.conf"), KebabCase)
+      .asserting { monixAwsConf =>
+        monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
+        monixAwsConf.endpoint shouldBe Some(URI.create("kebab-case:12345"))
+        monixAwsConf.region shouldBe Region.EU_WEST_1
+      }
   }
 
   it can "read config in snake_case" in {
-    //given/when
-    val monixAwsConf =
-      MonixAwsConf.file(new File("aws-auth/src/test/resources/snake_case.conf"), SnakeCase).runSyncUnsafe()
-
-    //then
-    monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
-    monixAwsConf.endpoint shouldBe Some(URI.create("snake:12345"))
-    monixAwsConf.region shouldBe Region.EU_WEST_1
+    MonixAwsConf.file(new File("aws-auth/src/test/resources/snake_case.conf"), SnakeCase).asserting { monixAwsConf =>
+      monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
+      monixAwsConf.endpoint shouldBe Some(URI.create("snake:12345"))
+      monixAwsConf.region shouldBe Region.EU_WEST_1
+    }
   }
 
   it can "read config in PascalCase" in {
-    //given/when
-    val monixAwsConf =
-      MonixAwsConf.file(new File("aws-auth/src/test/resources/PascalCase.conf"), PascalCase).runSyncUnsafe()
-
-    //then
-    monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
-    monixAwsConf.endpoint shouldBe Some(URI.create("PascalCase:12345"))
-    monixAwsConf.region shouldBe Region.EU_WEST_1
+    MonixAwsConf.file(new File("aws-auth/src/test/resources/PascalCase.conf"), PascalCase).asserting { monixAwsConf =>
+      monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
+      monixAwsConf.endpoint shouldBe Some(URI.create("PascalCase:12345"))
+      monixAwsConf.region shouldBe Region.EU_WEST_1
+    }
   }
 
   it can "read config in camelCase" in {
-    //given/when
-    val monixAwsConf =
-      MonixAwsConf.file(new File("aws-auth/src/test/resources/camelCase.conf"), CamelCase).runSyncUnsafe()
-
-    //then
-    monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
-    monixAwsConf.endpoint shouldBe Some(URI.create("camelCase:12345"))
-    monixAwsConf.region shouldBe Region.EU_WEST_1
+    MonixAwsConf.file(new File("aws-auth/src/test/resources/camelCase.conf"), CamelCase).asserting { monixAwsConf =>
+      monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
+      monixAwsConf.endpoint shouldBe Some(URI.create("camelCase:12345"))
+      monixAwsConf.region shouldBe Region.EU_WEST_1
+    }
   }
 
   it should "read config in kebab-case by default from reference.conf " in {
-    //given/when
-    val monixAwsConf = MonixAwsConf.load().runSyncUnsafe()
-
-    //then
-    monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
-    monixAwsConf.endpoint shouldBe Some(URI.create("localhost:4566"))
-    monixAwsConf.region shouldBe Region.EU_WEST_1
+    MonixAwsConf.load().asserting { monixAwsConf =>
+      monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
+      monixAwsConf.endpoint shouldBe Some(URI.create("localhost:4566"))
+      monixAwsConf.region shouldBe Region.EU_WEST_1
+    }
  }
 
   it should "read config in camelCase by default when reading from path" in {
-    //given/when
-    val monixAwsConf = MonixAwsConf.file(new File("aws-auth/src/test/resources/kebab-case.conf")).runSyncUnsafe()
+    MonixAwsConf.file(new File("aws-auth/src/test/resources/kebab-case.conf")).asserting { monixAwsConf =>
+      monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
+      monixAwsConf.endpoint shouldBe Some(URI.create("kebab-case:12345"))
+      monixAwsConf.region shouldBe Region.EU_WEST_1
+    }
 
-    //then
-    monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
-    monixAwsConf.endpoint shouldBe Some(URI.create("kebab-case:12345"))
-    monixAwsConf.region shouldBe Region.EU_WEST_1
   }
 }
diff --git a/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSinkSuite.scala b/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSinkSuite.scala
index e69ab1834..31ea552ec 100644
--- a/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSinkSuite.scala
+++ b/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSinkSuite.scala
@@ -1,108 +1,82 @@
 package monix.connect.elasticsearch
 
 import monix.eval.Task
+import monix.execution.Scheduler
 import monix.execution.Scheduler.Implicits.global
 import monix.reactive.Observable
+import monix.testing.scalatest.MonixTaskSpec
 import org.scalacheck.Gen
 import org.scalatest.BeforeAndAfterEach
-import org.scalatest.flatspec.AnyFlatSpecLike
+import org.scalatest.flatspec.AsyncFlatSpec
 import org.scalatest.matchers.should.Matchers
 
 import scala.util.Try
 
-class ElasticsearchSinkSuite extends AnyFlatSpecLike with Fixture with Matchers with BeforeAndAfterEach {
+class ElasticsearchSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with Matchers with BeforeAndAfterEach {
   import com.sksamuel.elastic4s.ElasticDsl._
+  override implicit val scheduler: Scheduler = Scheduler.io("elasticsearch-sink-suite")
 
   "ElasticsearchSink" should "execute update requests in batches" in {
-    // given
     val updateRequests = Gen.listOfN(10, genUpdateRequest).sample.get
 
-    // when
     esResource.use { es =>
       Observable
         .from(updateRequests)
         .bufferTumbling(5)
-        .consumeWith(es.bulkRequestSink())
-    }.runSyncUnsafe()
-
-    // then
-    val r =
-      Task
-        .parSequence(updateRequests.map { request =>
-          getById(request.index.name, request.id)
-            .map(_.sourceAsString)
-        })
-        .runSyncUnsafe()
-    r should contain theSameElementsAs updateRequests.flatMap(_.documentSource)
+        .consumeWith(es.bulkRequestSink()) *>
+        Task
+          .parSequence(updateRequests.map { request =>
+            getById(request.index.name, request.id)
+              .map(_.sourceAsString)
+          })
+    }.asserting(_ should contain theSameElementsAs updateRequests.flatMap(_.documentSource))
   }
 
   it should "execute delete requests in batches" in {
-    // given
     val updateRequests = Gen.listOfN(10, genUpdateRequest).sample.get
     val deleteRequests = updateRequests.take(5).map(r => deleteById(r.index, r.id))
 
-    // when
     esResource.use { es =>
       Observable
         .from(updateRequests)
        .bufferTumbling(5)
-        .consumeWith(es.bulkRequestSink())
-    }.runSyncUnsafe()
-    esResource.use { es =>
-      Observable
-        .from(deleteRequests)
-        .bufferTumbling(5)
-        .consumeWith(es.bulkRequestSink())
-    }.runSyncUnsafe()
-    // then
-    val r =
-      Task
-        .parSequence(updateRequests.map { request =>
-          getById(request.index.name, request.id)
-            .map(_.sourceAsString)
-        })
-        .runSyncUnsafe()
-    r should contain theSameElementsAs List.fill(5)("{}") ++ updateRequests.takeRight(5).flatMap(_.documentSource)
+        .consumeWith(es.bulkRequestSink()) *>
+        Observable
+          .from(deleteRequests)
+          .bufferTumbling(5)
+          .consumeWith(es.bulkRequestSink()) *>
+        Task
+          .parSequence(updateRequests.map { request =>
+            getById(request.index.name, request.id)
+              .map(_.sourceAsString)
+          })
+    }.asserting(_ should contain theSameElementsAs List.fill(5)("{}") ++ updateRequests.takeRight(5).flatMap(_.documentSource))
   }
 
   it should "execute index requests batches" in {
-    // given
     val indexRequests = Gen.listOfN(10, genIndexRequest).sample.get
 
-    // when
     esResource.use { es =>
       Observable
         .from(indexRequests)
         .bufferTumbling(5)
-        .consumeWith(es.bulkRequestSink())
-    }.runSyncUnsafe()
-
-    // then
-    val r =
-      Task
-        .parSequence(indexRequests.map { request =>
-          getById(request.index.name, request.id.get)
-            .map(_.sourceAsString)
-        })
-        .runSyncUnsafe()
-    r should contain theSameElementsAs indexRequests.flatMap(_.source)
+        .consumeWith(es.bulkRequestSink()) *>
+        Task
+          .parSequence(indexRequests.map { request =>
+            getById(request.index.name, request.id.get)
+              .map(_.sourceAsString)
+          })
+    }.asserting(_ should contain theSameElementsAs indexRequests.flatMap(_.source))
   }
 
   it should "fails when the es index not exists" in {
-    // given
     val requests = Seq(updateById("test_index", "test_id"))
 
-    // when
-    val ob =
-      esResource.use { es =>
-        Observable
-          .from(requests)
-          .bufferTumbling(5)
-          .consumeWith(es.bulkRequestSink())
-      }
-
-    // then
-    val r = Try(ob.runSyncUnsafe())
-    r.isFailure shouldBe true
+    esResource.use { es =>
+      Observable
+        .from(requests)
+        .bufferTumbling(5)
+        .consumeWith(es.bulkRequestSink())
+    }.attempt.asserting(_.isLeft shouldBe true)
   }
 }
diff --git a/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSourceSuite.scala b/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSourceSuite.scala
index d98c3c5c1..363ebc6ee 100644
--- a/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSourceSuite.scala
+++ b/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSourceSuite.scala
@@ -1,24 +1,26 @@
 package monix.connect.elasticsearch
 
 import monix.eval.Task
+import monix.execution.Scheduler
 import monix.execution.Scheduler.Implicits.global
+import monix.testing.scalatest.MonixTaskSpec
 import org.scalacheck.Gen
 import org.scalatest.BeforeAndAfterEach
-import org.scalatest.flatspec.AnyFlatSpecLike
+import org.scalatest.flatspec.AsyncFlatSpec
 import org.scalatest.matchers.should.Matchers
 
 import scala.util.Try
 
-class ElasticsearchSourceSuite extends AnyFlatSpecLike with Fixture with Matchers with BeforeAndAfterEach {
+class ElasticsearchSourceSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with Matchers with BeforeAndAfterEach {
   import com.sksamuel.elastic4s.ElasticDsl._
 
+  override implicit val scheduler: Scheduler = Scheduler.io("elasticsearch-source-suite")
+
   "ElasticsearchSource" should "emit searched results to downstream" in {
-    // given
     val index = genIndex.sample.get
     val updateRequests = Gen.listOfN(1000, genUpdateRequest(index)).sample.get
     val searchRequest = search(index).query(matchAllQuery()).keepAlive("1m")
 
-    // when
     esResource.use { es =>
       Task
         .sequence(
@@ -26,60 +28,45 @@ class ElasticsearchSourceSuite extends AnyFlatSpecLike with Fixture with Matcher
             es.bulkExecuteRequest(updateRequests),
             es.refresh(index)
           )
-      )
-    }.runSyncUnsafe()
-    // then
-    val r = esResource.use { es =>
+        ) *>
         es.scroll(searchRequest)
           .map(_.id)
           .toListL
-    }.runSyncUnsafe()
-    r should contain theSameElementsAs updateRequests.map(_.id).distinct
+    }.asserting(_ should contain theSameElementsAs updateRequests.map(_.id).distinct)
   }
 
   it should "emit searched results when the param `keepAlive` is empty" in {
-    // given
     val index = genIndex.sample.get
     val updateRequests = Gen.listOfN(1000, genUpdateRequest(index)).sample.get
     val searchRequest = search(index).query(matchAllQuery())
 
-    // when
     esResource.use { es =>
       Task.sequence(
         Seq(
           es.bulkExecuteRequest(updateRequests),
          es.refresh(index)
        )
-      )
-    }.runSyncUnsafe()
-
-    // then
-    val r = esResource.use(_.scroll(searchRequest).map(_.id).toListL).runSyncUnsafe()
-    r should contain theSameElementsAs updateRequests.map(_.id).distinct
+      ) *>
+        es.scroll(searchRequest).map(_.id).toListL
+    }.asserting(_ should contain theSameElementsAs updateRequests.map(_.id).distinct)
   }
 
   it should "fails when the es index not exists" in {
-    // given
     val index = genIndex.sample.get
     val searchRequest = search(index).query(matchAllQuery()).keepAlive("1m")
 
-    // when
-    val r = Try(esResource.use(_.scroll(searchRequest).map(_.id).toListL).runSyncUnsafe())
-
-    // then
-    r.isFailure shouldBe true
+    esResource.use(_.scroll(searchRequest).map(_.id).toListL)
+      .attempt.asserting(_.isLeft shouldBe true)
  }
 
   it should "returns an empty list when result is empty" in {
-    // given
     val index = genIndex.sample.get
-    esResource.use(_.createIndex(createIndex(index))).runSyncUnsafe()
     val searchRequest = search(index).query(matchAllQuery()).keepAlive("1m")
 
-    // when
-    val result = esResource.use(_.scroll(searchRequest).map(_.id).toListL).runSyncUnsafe()
-    // then
-    result.isEmpty shouldBe true
+    esResource.use { es =>
+      es.createIndex(createIndex(index)) *>
+        es.scroll(searchRequest).map(_.id).toListL
+    }.asserting(_.isEmpty shouldBe true)
   }
 }
diff --git a/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSuite.scala b/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSuite.scala
index 2cfefbe4e..0f141c058 100644
--- a/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSuite.scala
+++ b/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSuite.scala
@@ -2,39 +2,36 @@ package monix.connect.elasticsearch
 
 import com.sksamuel.elastic4s.Indexes
 import monix.eval.Task
+import monix.execution.Scheduler
 import monix.execution.Scheduler.Implicits.global
+import monix.testing.scalatest.MonixTaskSpec
 import org.scalacheck.Gen
 import org.scalatest.BeforeAndAfterEach
-import org.scalatest.flatspec.AnyFlatSpecLike
+import org.scalatest.flatspec.AsyncFlatSpec
 import org.scalatest.matchers.should.Matchers
 
-class ElasticsearchSuite extends AnyFlatSpecLike with Fixture with Matchers with BeforeAndAfterEach {
+class ElasticsearchSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with Matchers with BeforeAndAfterEach {
   import com.sksamuel.elastic4s.ElasticDsl._
 
+  override implicit val scheduler: Scheduler = Scheduler.io("elasticsearch-suite")
+
   "Elasticsearch" should "execute many update requests" in {
-    // given
     val updateRequests = Gen.listOfN(10, genUpdateRequest).sample.get
 
-    // when
-    esResource.use(_.bulkExecuteRequest(updateRequests)).runSyncUnsafe()
-
-    // then
-    val r =
-      Task
-        .parSequence(updateRequests.map { request =>
-          getById(request.index.name, request.id)
-            .map(_.sourceAsString)
-        })
-        .runSyncUnsafe()
-    r should contain theSameElementsAs updateRequests.flatMap(_.documentSource)
+    esResource.use { es =>
+      es.bulkExecuteRequest(updateRequests) *>
+        Task
+          .parSequence(updateRequests.map { request =>
+            getById(request.index.name, request.id)
+              .map(_.sourceAsString)
+          })
+    }.asserting(_ should contain theSameElementsAs updateRequests.flatMap(_.documentSource))
   }
 
   it should "execute many delete requests" in {
-    // given
     val updateRequests = Gen.listOfN(10, genUpdateRequest).sample.get
     val deleteRequests = updateRequests.take(5).map(r => deleteById(r.index, r.id))
-
-    // when
+    val expectedDocuments = List.fill(5)("{}") ++ updateRequests.takeRight(5).flatMap(_.documentSource)
     esResource.use { es =>
       Task.sequence(
         Seq(
@@ -42,80 +39,59 @@ class ElasticsearchSuite extends AnyFlatSpecLike with Fixture with Matchers with
           es.refresh(updateRequests.map(_.index.name)),
           es.bulkExecuteRequest(deleteRequests)
         )
-      )
-    }.runSyncUnsafe()
-
-    // then
-    val r =
-      Task
-        .parSequence(updateRequests.map { request =>
-          getById(request.index.name, request.id)
-            .map(_.sourceAsString)
-        })
-        .runSyncUnsafe()
-    r should contain theSameElementsAs List.fill(5)("{}") ++ updateRequests.takeRight(5).flatMap(_.documentSource)
+      ) *>
+        Task
+          .parSequence(updateRequests.map { request =>
+            getById(request.index.name, request.id)
+              .map(_.sourceAsString)
+          })
+    }.asserting(_ should contain theSameElementsAs expectedDocuments)
   }
 
   it should "execute many index requests" in {
-    // given
     val indexRequests = Gen.listOfN(10, genIndexRequest).sample.get
 
-    // when
-    esResource.use(_.bulkExecuteRequest(indexRequests)).runSyncUnsafe()
-
-    // then
-    val r =
+    esResource.use {
+      _.bulkExecuteRequest(indexRequests)
+    } *>
       Task
        .parSequence(indexRequests.map { request =>
          getById(request.index.name, request.id.get)
            .map(_.sourceAsString)
-        })
-        .runSyncUnsafe()
-    r should contain theSameElementsAs indexRequests.flatMap(_.source)
+        }).asserting {
+          _ should contain theSameElementsAs indexRequests.flatMap(_.source)
+        }
   }
 
   it should "execute a single update request" in {
-    // given
     val request = genUpdateRequest.sample.get
 
-    // when
-    esResource.use(_.singleUpdate(request)).runSyncUnsafe()
-
-    // then
-    val r = getById(request.index.name, request.id)
-      .map(_.sourceAsString)
-      .runSyncUnsafe()
-    r shouldBe request.documentSource.get
-
+    esResource.use { _.singleUpdate(request) *>
+      getById(request.index.name, request.id)
+        .map(_.sourceAsString)
+    }.asserting(_ shouldBe request.documentSource.get)
   }
 
   it should "execute a single delete by id request" in {
-    // given
     val updateRequests = Gen.listOfN(2, genUpdateRequest).sample.get
     val deleteRequest = deleteById(updateRequests.head.index, updateRequests.head.id)
+    val expectedDocs = List("{}") ++ updateRequests.takeRight(1).flatMap(_.documentSource)
 
-    // when
     esResource.use { es =>
       Task
         .parSequence(updateRequests.map(es.singleUpdate))
-        .flatMap(_ => es.singleDeleteById(deleteRequest))
-    }.runSyncUnsafe()
-
-    // then
-    val r = updateRequests.map { request =>
-      getById(request.index.name, request.id)
-        .map(_.sourceAsString)
-        .runSyncUnsafe()
-    }
-    r should contain theSameElementsAs List("{}") ++ updateRequests.takeRight(1).flatMap(_.documentSource)
+        .flatMap(_ => es.singleDeleteById(deleteRequest)) *>
+        Task.traverse(updateRequests) { request =>
+          getById(request.index.name, request.id)
+            .map(_.sourceAsString)
+        }
+    }.asserting(_ should contain theSameElementsAs expectedDocs)
   }
 
   it should "execute a single delete by query request" in {
-    // given
     val updateRequest = genUpdateRequest.sample.get
     val deleteRequest = deleteByQuery(updateRequest.index, idsQuery(updateRequest.id))
 
-    // when
     esResource.use { es =>
       Task.sequence(
         Seq(
@@ -124,24 +100,17 @@ class ElasticsearchSuite extends AnyFlatSpecLike with Fixture with Matchers with
           es.singleDeleteByQuery(deleteRequest),
           es.refresh(Seq(updateRequest.index.name))
         )
-      )
-    }.runSyncUnsafe()
-
-    // then
-    val r = esResource.use {
-      _.search(search(updateRequest.index).query(matchAllQuery()))
-        .map(_.result.hits.total.value)
-    }.runSyncUnsafe()
-    r shouldBe 0
+      ) *>
+        es.search(search(updateRequest.index).query(matchAllQuery()))
+          .map(_.result.hits.total.value)
+    }.asserting(_ shouldBe 0)
   }
 
   it should "execute a single search request" in {
-    // given
     val updateRequest = genUpdateRequest.sample.get
     val searchRequest = search(updateRequest.index.name).query(matchAllQuery())
 
-    // when
-    val r = esResource.use { es =>
+    esResource.use { es =>
       Task
         .sequence(
           Seq(
@@ -150,72 +119,56 @@ class ElasticsearchSuite extends AnyFlatSpecLike with Fixture with Matchers with
           )
         )
         .flatMap(_ => es.search(searchRequest))
-    }.runSyncUnsafe()
-
-    // then
-    r.result.hits.hits.head.sourceAsString shouldBe updateRequest.documentSource.get
+    }.asserting(_.result.hits.hits.head.sourceAsString shouldBe updateRequest.documentSource.get)
   }
 
   it should "execute a create index request" in {
-    // given
     val indexName = genIndex.sample.get
     val indexSource = """{"settings":{"number_of_shards":1},"mappings":{"properties":{"a":{"type":"text"}}}}"""
     val createIndexRequest = createIndex(indexName).source(indexSource)
 
-    // when
-    esResource.use { es => es.createIndex(createIndexRequest) }.runSyncUnsafe()
-
-    // then
-    val indexResult = esResource
-      .use(_.getIndex(getIndex(indexName)))
-      .runSyncUnsafe()
-      .result(indexName)
-    val mappings = indexResult.mappings.properties("a")
-    val settings = indexResult.settings("index.number_of_shards")
-    mappings.`type`.get shouldBe "text"
-    settings shouldBe "1"
+    esResource.use { es =>
+      es.createIndex(createIndexRequest) *>
+        es.getIndex(getIndex(indexName))
+    }.asserting { response =>
+      val indexResult = response.result(indexName)
+      val mappings = indexResult.mappings.properties("a")
+      val settings = indexResult.settings("index.number_of_shards")
+      mappings.`type`.get shouldBe "text"
+      settings shouldBe "1"
+    }
   }
 
   it should "execute a delete index request" in {
-    // given
     val indexName = genIndex.sample.get
     val indexSource = """{"settings":{"number_of_shards":1},"mappings":{"properties":{"a":{"type":"text"}}}}"""
     val createIndexRequest = createIndex(indexName).source(indexSource)
     val deleteIndexRequest = deleteIndex(indexName)
 
-    // when
     esResource.use { es =>
       Task.sequence(
         Seq(
           es.createIndex(createIndexRequest),
           es.deleteIndex(deleteIndexRequest)
         )
-      )
-    }
-
-    // then
-    intercept[NoSuchElementException] {
-      esResource
-        .use(_.getIndex(getIndex(indexName)))
-        .runSyncUnsafe()
-        .result(indexName)
+      ) *>
+        es.getIndex(getIndex(indexName)).map(_.result(indexName)).attempt
+    }.asserting { getIndexAttempt =>
+      getIndexAttempt.isLeft shouldBe true
+      getIndexAttempt.left.get shouldBe a[NoSuchElementException]
     }
   }
 
   it should "execute a single count request" in {
-    // given
     val index = genIndex.sample.get
     val updateRequests = Gen.listOfN(100, genUpdateRequest(index)).sample.get
     val countRequest = count(Indexes(index)).query(matchAllQuery())
 
-    // when
-    val r = esResource.use { es =>
+    esResource.use { es =>
       Task
         .parSequence(updateRequests.map(es.singleUpdate))
         .flatMap(_ => es.refresh(Seq(index)))
        .flatMap(_ => es.singleCount(countRequest))
-    }.runSyncUnsafe()
-    // then
-    r.result.count shouldBe updateRequests.map(_.id).distinct.length
+    }.asserting(_.result.count shouldBe updateRequests.map(_.id).distinct.length)
   }
 }
diff --git a/elasticsearch/src/it/scala/monix/connect/elasticsearch/Fixture.scala b/elasticsearch/src/it/scala/monix/connect/elasticsearch/Fixture.scala
index 712232014..bb308077d 100644
--- a/elasticsearch/src/it/scala/monix/connect/elasticsearch/Fixture.scala
+++ b/elasticsearch/src/it/scala/monix/connect/elasticsearch/Fixture.scala
@@ -8,8 +8,10 @@ import com.sksamuel.elastic4s.requests.update.UpdateRequest
 import com.sksamuel.elastic4s.{RequestFailure, RequestSuccess}
 import monix.eval.Task
 import org.scalacheck.Gen
+import org.scalatest.AsyncTestSuite
 
 trait Fixture {
+  this: AsyncTestSuite =>
   import com.sksamuel.elastic4s.ElasticDsl._
 
   protected val esResource: Resource[Task, Elasticsearch] = Elasticsearch.create("http://localhost:9200")