Fix tests
Makes S3MultipartUpload return a failure on subscription, not before.
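For context, a minimal sketch of the behavioural change described above. The real S3MultipartUpload is a Monix Consumer and its validation logic is not shown in this diff, so the method names, the 5MB constant and the check itself are assumptions used purely for illustration:

import monix.eval.Task

object MultipartUploadSketch {

  // Hypothetical pre-condition; the connector's real checks are not shown in this commit.
  def checkMinChunkSize(minChunkSize: Int): Unit =
    require(minChunkSize >= 5 * 1024 * 1024, "minChunkSize must be at least 5MB")

  // Before: the check ran while the value was being built, so a bad argument threw
  // immediately, even if the caller never subscribed to (ran) the returned effect.
  def multipartUploadEager(minChunkSize: Int): Task[Unit] = {
    checkMinChunkSize(minChunkSize)
    Task.unit // upload logic elided
  }

  // After: the check is deferred inside the effect, so the failure only surfaces on subscription.
  def multipartUploadDeferred(minChunkSize: Int): Task[Unit] =
    Task.eval(checkMinChunkSize(minChunkSize)).flatMap(_ => Task.unit) // upload logic elided
}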



Fix test
Scalafmt
Fix test
Remove unused attempt
a
Test fix
Test fix
Run IT tests first, then unit tests
Fix concurrent mongodb tests
Fix redis IT tests
Fix redis uri property updates
Fix redis tests
S3 tests
Parallel sqs test
Gcs and elasticsearch tests
ScalafmtAll
Increase random key range
Synchronous gcs tests
paualarco committed Oct 26, 2021
1 parent 1ac01cf commit d1a1e06
Showing 52 changed files with 899 additions and 1,097 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build.yml
@@ -37,13 +37,13 @@ jobs:

- name: Run checks
run: sbt scalafmtCheckAll
- name: Run Unit Tests for Java ${{ matrix.java }}, Scala ${{ matrix.scala }}
run: sbt test
- name: Start docker dependencies
run: sh start-dependencies.sh
- name: Run Functional Tests for Java ${{ matrix.java }}, Scala ${{ matrix.scala }}
run: sbt it:test

- name: Run Unit Tests for Java ${{ matrix.java }}, Scala ${{ matrix.scala }}
run: sbt test

mima:
name: Mima binary compatibility test
needs: tests
2 changes: 2 additions & 0 deletions .gitignore
@@ -57,3 +57,5 @@ project/metals.sbt
website/.docusaurus/

gcs/tmp/

gcs/blob-test/
@@ -63,7 +63,7 @@ class MonixAwsConfigSpec extends AsyncFlatSpec with MonixTaskSpec with Matchers
|}
|""".stripMargin)

Task(configSource.loadOrThrow[AppConf].monixAws).asserting{ monixAwsConf =>
Task(configSource.loadOrThrow[AppConf].monixAws).asserting { monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("localhost:4566"))
monixAwsConf.httpClient.isDefined shouldBe false
@@ -132,12 +132,11 @@ class MonixAwsConfigSpec extends AsyncFlatSpec with MonixTaskSpec with Matchers
}

it can "read config in kebabCase" in {
MonixAwsConf.file(new File("aws-auth/src/test/resources/kebab-case.conf"), KebabCase)
.asserting{ monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("kebab-case:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
}
MonixAwsConf.file(new File("aws-auth/src/test/resources/kebab-case.conf"), KebabCase).asserting { monixAwsConf =>
monixAwsConf.credentials shouldBe a[DefaultCredentialsProvider]
monixAwsConf.endpoint shouldBe Some(URI.create("kebab-case:12345"))
monixAwsConf.region shouldBe Region.EU_WEST_1
}
}

it can "read config in snake_case" in {
31 changes: 16 additions & 15 deletions build.sbt
@@ -66,13 +66,11 @@ lazy val sharedSettings = Seq(
"-sourcepath",
file(".").getAbsolutePath.replaceAll("[.]$", "")
),
parallelExecution in Test := false,
parallelExecution in IntegrationTest := false,
parallelExecution in ThisBuild := false,
testForkedParallel in Test := false,
testForkedParallel in IntegrationTest := false,
testForkedParallel in ThisBuild := false,
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
parallelExecution in Test := true,
parallelExecution in ThisBuild := true,
testForkedParallel in Test := true,
testForkedParallel in ThisBuild := true,
concurrentRestrictions in Global += Tags.limit(Tags.Test, 3),
logBuffered in Test := false,
logBuffered in IntegrationTest := false,
//dependencyClasspath in IntegrationTest := (dependencyClasspath in IntegrationTest).value ++ (exportedProducts in Test).value,
@@ -132,7 +130,7 @@ lazy val dynamodb = monixConnector("dynamodb", Dependencies.DynamoDb).aggregate(

lazy val hdfs = monixConnector("hdfs", Dependencies.Hdfs)

lazy val mongodb = monixConnector("mongodb", Dependencies.MongoDb, isMimaEnabled = false)
lazy val mongodb = monixConnector("mongodb", Dependencies.MongoDb, isMimaEnabled = false, isITParallelExecution = true)

lazy val parquet = monixConnector("parquet", Dependencies.Parquet)

@@ -143,18 +141,18 @@ val protoTestSettings = Seq(
//Compile / PB.protoSources := Seq(new File("src/test/protobuf"))
)

lazy val redis = monixConnector("redis", Dependencies.Redis)
lazy val redis = monixConnector("redis", Dependencies.Redis, isITParallelExecution = false)
.settings(protoTestSettings)

lazy val s3 = monixConnector("s3", Dependencies.S3, isMimaEnabled = false)
lazy val s3 = monixConnector("s3", Dependencies.S3, isMimaEnabled = false, isITParallelExecution = true)
.aggregate(awsAuth).dependsOn(awsAuth % "compile->compile;test->test")

lazy val sqs = monixConnector("sqs", Dependencies.Sqs, isMimaEnabled = false)
lazy val sqs = monixConnector("sqs", Dependencies.Sqs, isMimaEnabled = false, isITParallelExecution = true)
.aggregate(awsAuth).dependsOn(awsAuth % "compile->compile;test->test")

lazy val gcs = monixConnector("gcs", Dependencies.GCS)
lazy val gcs = monixConnector("gcs", Dependencies.GCS, isITParallelExecution = false)

lazy val elasticsearch = monixConnector("elasticsearch", Dependencies.Elasticsearch)
lazy val elasticsearch = monixConnector("elasticsearch", Dependencies.Elasticsearch, isITParallelExecution = true)

//internal

@@ -163,10 +161,13 @@ lazy val awsAuth = monixConnector("aws-auth", Dependencies.AwsAuth, isMimaEnable
def monixConnector(
connectorName: String,
projectDependencies: Seq[ModuleID],
isMimaEnabled: Boolean = true): Project = {
isMimaEnabled: Boolean = true,
isITParallelExecution: Boolean = false): Project = {
Project(id = connectorName, base = file(connectorName))
.enablePlugins(AutomateHeaderPlugin)
.settings(name := s"monix-$connectorName", libraryDependencies ++= projectDependencies, Defaults.itSettings)
.settings(name := s"monix-$connectorName", libraryDependencies ++= projectDependencies, Defaults.itSettings,
IntegrationTest / parallelExecution := isITParallelExecution,
IntegrationTest / testForkedParallel := isITParallelExecution)
.settings(sharedSettings)
.configs(IntegrationTest, IT)
.enablePlugins(AutomateHeaderPlugin)
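As a rough illustration of the new isITParallelExecution flag, this is approximately what it expands to when a single project is written out by hand; the project name and dependency list below are hypothetical, and only the IntegrationTest settings mirror the helper shown in the hunk above:

// build.sbt sketch — hypothetical standalone project, roughly equivalent to
// monixConnector("example", Dependencies.Example, isITParallelExecution = true)
lazy val exampleConnector = Project(id = "example", base = file("example"))
  .enablePlugins(AutomateHeaderPlugin)
  .configs(IntegrationTest)
  .settings(Defaults.itSettings)
  .settings(
    name := "monix-example",
    libraryDependencies ++= Dependencies.Example, // hypothetical dependency list
    // opting in runs this connector's integration tests in parallel; the default stays sequential
    IntegrationTest / parallelExecution := true,
    IntegrationTest / testForkedParallel := true
  )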
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -36,7 +36,7 @@ services:
- SERVICES=dynamodb

elasticmq:
image: softwaremill/elasticmq-native:latest
image: softwaremill/elasticmq-native:1.0.0
ports:
- '9324:9324'

@@ -2,34 +2,32 @@ package monix.connect.elasticsearch

import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import monix.testing.scalatest.MonixTaskSpec
import org.scalacheck.Gen
import org.scalatest.BeforeAndAfterEach
import org.scalatest.flatspec.{AnyFlatSpecLike, AsyncFlatSpec}
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.util.Try

class ElasticsearchSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with Matchers with BeforeAndAfterEach {
import com.sksamuel.elastic4s.ElasticDsl._

override implicit val scheduler: Scheduler = Scheduler.io("elasticsearch-sink-suite")

"ElasticsearchSink" should "execute update requests in batches" in {
val updateRequests = Gen.listOfN(10, genUpdateRequest).sample.get
val updateRequests = Gen.listOfN(5, genUpdateRequest).sample.get

esResource.use { es =>
Observable
.from(updateRequests)
.bufferTumbling(5)
.consumeWith(es.bulkRequestSink()) *>
Task
.parSequence(updateRequests.map { request =>
.parTraverse(updateRequests){ request =>
getById(request.index.name, request.id)
.map(_.sourceAsString)
})
}.asserting(_ should contain theSameElementsAs updateRequests.flatMap(_.documentSource))
}
}.asserting(_ should contain allElementsOf updateRequests.flatMap(_.documentSource))
}

it should "execute delete requests in batches" in {
@@ -50,7 +48,7 @@ class ElasticsearchSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixtu
getById(request.index.name, request.id)
.map(_.sourceAsString)
})
}.asserting(_ should contain theSameElementsAs List.fill(5)("{}") ++ updateRequests.takeRight(5).flatMap(_.documentSource))
}.asserting(_ should contain allElementsOf List.fill(5)("{}") ++ updateRequests.takeRight(5).flatMap(_.documentSource))
}

it should "execute index requests batches" in {
@@ -62,11 +60,11 @@ class ElasticsearchSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixtu
.bufferTumbling(5)
.consumeWith(es.bulkRequestSink()) *>
Task
.parSequence(indexRequests.map { request =>
.parTraverse(indexRequests){ request =>
getById(request.index.name, request.id.get)
.map(_.sourceAsString)
})
}.asserting{ _ should contain theSameElementsAs indexRequests.flatMap(_.source) }
}
}.asserting{ _ should contain allElementsOf indexRequests.flatMap(_.source) }
}

it should "fails when the es index not exists" in {
7 changes: 4 additions & 3 deletions gcs/src/it/scala/monix/connect/gcp/storage/GcsBlobSuite.scala
@@ -15,12 +15,13 @@ import monix.testing.scalatest.MonixTaskSpec
import org.scalacheck.Gen
import org.scalatest.BeforeAndAfterAll
import org.apache.commons.io.FileUtils
import scala.concurrent.duration._

class GcsBlobSuite extends AsyncWordSpec with MonixTaskSpec with IdiomaticMockito with Matchers with ArgumentMatchersSugar with BeforeAndAfterAll {

override implicit val scheduler: Scheduler = Scheduler.io("gcs-blob-suite")
val storage: Storage = LocalStorageHelper.getOptions.getService
val dir = new File("gcs/tmp").toPath
val dir = new File("gcs/blob-test").toPath
val genLocalPath = Gen.identifier.map(s => dir.toAbsolutePath.toString + "/" + s)
val testBucketName = Gen.identifier.sample.get

@@ -100,8 +101,8 @@ class GcsBlobSuite extends AsyncWordSpec with MonixTaskSpec with IdiomaticMockit
val gcsBlob = new GcsBlob(blob)

for {
_ <- gcsBlob.downloadToFile(filePath)
exists = gcsBlob.exists()
_ <- gcsBlob.downloadToFile(filePath) >> Task.sleep(2.seconds)
exists <- gcsBlob.exists()
r = Files.readAllBytes(filePath)
} yield {
exists shouldBe true
@@ -19,7 +19,7 @@ import org.scalatest.wordspec.{AnyWordSpecLike, AsyncWordSpec}
class GcsUploaderSuite extends AsyncWordSpec with MonixTaskSpec with IdiomaticMockito with Matchers with ArgumentMatchersSugar with BeforeAndAfterAll {

val storage = LocalStorageHelper.getOptions.getService
val dir = new File("gcs/tmp").toPath
val dir = new File("gcs/uploader-test").toPath
val genLocalPath = Gen.identifier.map(s => dir.toAbsolutePath.toString + "/" + s)
val testBucketName = Gen.identifier.sample.get
override implicit val scheduler: Scheduler = Scheduler.io("gcs-storage-suite")
@@ -32,6 +32,7 @@ class GcsUploaderSuite extends AsyncWordSpec with MonixTaskSpec with IdiomaticMo

override def afterAll(): Unit = {
super.beforeAll()

}

s"$GcsUploader consumer implementation" should {
50 changes: 26 additions & 24 deletions hdfs/src/test/scala/monix/connect/hdfs/HdfsSpec.scala
@@ -28,7 +28,9 @@ import monix.reactive.{Consumer, Observable}
import monix.execution.Scheduler
import monix.testing.scalatest.MonixTaskSpec

class HdfsSpec extends AsyncWordSpec with MonixTaskSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEach with HdfsFixture {
class HdfsSpec
extends AsyncWordSpec with MonixTaskSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEach
with HdfsFixture {

override implicit val scheduler = Scheduler.io("hdfs-spec")
private var miniHdfs: MiniDFSCluster = _
@@ -128,27 +130,27 @@ class HdfsSpec extends AsyncWordSpec with MonixTaskSpec with Matchers with Befor
val chunksB: List[Array[Byte]] = genChunks.sample.get
val existedBefore: Boolean = fs.exists(path)

for {
offsetA <- Observable
.from(chunksA)
.consumeWith(hdfsWriter)
resultA <- Hdfs.read(fs, path).headL
failedOverwriteAttempt <- Observable
.from(chunksB)
.consumeWith(hdfsWriter)
.attempt
resultAfterOverwriteAttempt <- Hdfs.read(fs, path).headL
} yield {
existedBefore shouldBe false
fs.exists(path) shouldBe true
resultA shouldBe chunksA.flatten
offsetA shouldBe chunksA.flatten.size
failedOverwriteAttempt.isLeft shouldBe true
failedOverwriteAttempt.left.get shouldBe a[org.apache.hadoop.fs.FileAlreadyExistsException]
fs.exists(path) shouldBe true
resultAfterOverwriteAttempt shouldBe chunksA.flatten
resultAfterOverwriteAttempt.length shouldBe chunksA.flatten.size
}
for {
offsetA <- Observable
.from(chunksA)
.consumeWith(hdfsWriter)
resultA <- Hdfs.read(fs, path).headL
failedOverwriteAttempt <- Observable
.from(chunksB)
.consumeWith(hdfsWriter)
.attempt
resultAfterOverwriteAttempt <- Hdfs.read(fs, path).headL
} yield {
existedBefore shouldBe false
fs.exists(path) shouldBe true
resultA shouldBe chunksA.flatten
offsetA shouldBe chunksA.flatten.size
failedOverwriteAttempt.isLeft shouldBe true
failedOverwriteAttempt.left.get shouldBe a[org.apache.hadoop.fs.FileAlreadyExistsException]
fs.exists(path) shouldBe true
resultAfterOverwriteAttempt shouldBe chunksA.flatten
resultAfterOverwriteAttempt.length shouldBe chunksA.flatten.size
}
}

"allow appending to existing files" in {
@@ -165,7 +167,7 @@ class HdfsSpec extends AsyncWordSpec with MonixTaskSpec with Matchers with Befor
resultA <- Hdfs.read(fs, path).headL
fileExistsA = fs.exists(path)

finalOffset <- Observable
finalOffset <- Observable
.from(chunksB)
.consumeWith(Hdfs.append(fs, path))
finalResult <- Hdfs.read(fs, path).headL
@@ -191,7 +193,7 @@ class HdfsSpec extends AsyncWordSpec with MonixTaskSpec with Matchers with Befor
.from(chunksA)
.consumeWith(Hdfs.append(fs, path))
.attempt
.asserting{appendAttempt =>
.asserting { appendAttempt =>
existed shouldBe false
appendAttempt.isLeft shouldBe true
}