diff --git a/.github/scripts/create-web-site.sh b/.github/scripts/create-web-site.sh new file mode 100755 index 000000000..aa8aa6f93 --- /dev/null +++ b/.github/scripts/create-web-site.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +set -e + +echo "GIT_DEPLOY_KEY: $GIT_DEPLOY_KEY" + +sbt docs/docusaurusPublishGhpages \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 7a7cf54c9..1702b5b82 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -61,24 +61,7 @@ jobs: - name: Run Mima binary compatibility test for Java ${{ matrix.java }} and Scala ${{ matrix.scala }} run: sbt mimaReportBinaryIssues - - publish-page: - name: Publish documentation page - runs-on: ubuntu-latest - strategy: - matrix: - java: [ 8 ] - scala: [ 2.12.11 ] - - steps: - - uses: actions/checkout@v2 - - uses: olafurpg/setup-scala@v7 - with: - java-version: "adopt@1.${{ matrix.java }}" - - - name: Publishes documentation page - run: sbt docs/docusaurusPublishGhpages - + publish: name: Publish needs: [ tests, mima ] @@ -88,11 +71,16 @@ jobs: - uses: actions/checkout@v1 - uses: olafurpg/setup-scala@v2 - uses: olafurpg/setup-gpg@v2 - - name: Publish ${{ github.ref }} + - name: Publish release ${{ github.ref }} run: sbt ci-release env: PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }} PGP_SECRET: ${{ secrets.PGP_SECRET }} SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + - name: Publis documentation web page + run: | + ./.github/scripts/create-web-site.sh + env: + GIT_DEPLOY_KEY: ${{ secrets.GIT_DEPLOY_KEY }} diff --git a/LICENSE.txt b/LICENSE similarity index 100% rename from LICENSE.txt rename to LICENSE diff --git a/README.md b/README.md index 3ed49d88a..3a299f23c 100644 --- a/README.md +++ b/README.md @@ -1,49 +1,36 @@ # Monix Connect - - [![release-badge][]][release] [![workflow-badge][]][workflow] [![Gitter](https://badges.gitter.im/monix/monix-connect.svg)](https://gitter.im/monix/monix-connect?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge) + [![Scala Steward badge](https://img.shields.io/badge/Scala_Steward-helping-blue.svg?style=flat&logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAA4AAAAQCAMAAAARSr4IAAAAVFBMVEUAAACHjojlOy5NWlrKzcYRKjGFjIbp293YycuLa3pYY2LSqql4f3pCUFTgSjNodYRmcXUsPD/NTTbjRS+2jomhgnzNc223cGvZS0HaSD0XLjbaSjElhIr+AAAAAXRSTlMAQObYZgAAAHlJREFUCNdNyosOwyAIhWHAQS1Vt7a77/3fcxxdmv0xwmckutAR1nkm4ggbyEcg/wWmlGLDAA3oL50xi6fk5ffZ3E2E3QfZDCcCN2YtbEWZt+Drc6u6rlqv7Uk0LdKqqr5rk2UCRXOk0vmQKGfc94nOJyQjouF9H/wCc9gECEYfONoAAAAASUVORK5CYII=)](https://scala-steward.org) [workflow]: https://github.com/monix/monix-connect/actions?query=branch%3Amaster+workflow%3Abuild [workflow-badge]: https://github.com/monix/monix-connect/workflows/build/badge.svg - [release]: https://search.maven.org/search?q=a:monix-connect* [release-badge]: https://img.shields.io/github/v/tag/monix/monix-connect.svg - _Warning:_ Mind that the project is yet in early stages and its API is likely to be changed. + ⚠️ _Mind that the project isn't yet stable, so binary compatibility is not guaranteed._ -[Monix Connect]() is an initiative to implement stream integrations for [Monix](https://monix.io/). - - The `connector` describes a connection between the application and a specific data point, which could be a file, a database or any system in which the appication - can interact by sending or receiving information. 
- Therefore, the aim of this project is to implement the most common - connections that users could need when developing reactive applications with Monix, these would basically reduce boilerplate code and furthermore will let users to greatly save time and complexity in their implementing projects. - - The latest stable version of `monix-connect` is compatible with Monix 3.x, Scala 2.12.x and 2.13.x, you can import - all of the connectors by adding the following dependency (find and fill your release [version](https://github.com/monix/monix-connect/releases)): - - ```scala - libraryDependencies += "io.monix" %% "monix-connect" % "VERSION" -``` +Monix Connect is an initiative to implement stream integrations for [Monix](https://monix.io/). -But you can also only include to a specific connector to your library dependencies, see below how to do so and how to get started with each of the available [connectors](#Connectors). +Learn more on how to get started in the [documentation page](https://monix.github.io/monix-connect/). ### Connectors -1. [Akka](#Akka) -2. [DynamoDB](#DynamoDB) -3. [Hdfs](#HDFS) -4. [Parquet](#Parquet) -5. [Redis](#Redis) -6. [S3](#S3) +The below list comprehends the current set of connectors that are available to use. +1. [Akka](https://monix.github.io/monix-connect/docs/akka) +2. [DynamoDB](https://monix.github.io/monix-connect/docs/dynamodb) +3. [Hdfs](https://monix.github.io/monix-connect/docs/hdfs) +4. [Parquet](https://monix.github.io/monix-connect/docs/parquet) +5. [Redis](https://monix.github.io/monix-connect/docs/redis) +6. [S3](https://monix.github.io/monix-connect/docs/s3) ### Contributing The Monix Connect project welcomes contributions from anybody wishing to participate. All code or documentation that is provided must be licensed with the same license that Monix Connect is licensed with (Apache -2.0, see LICENSE.txt). +2.0, see [LICENCE](./LICENSE)). People are expected to follow the [Scala Code of Conduct](./CODE_OF_CONDUCT.md) when @@ -57,5 +44,5 @@ gladly accepted. For more information, check out the ## License All code in this repository is licensed under the Apache License, -Version 2.0. See [LICENCE.txt](./LICENSE.txt). +Version 2.0. See [LICENCE](./LICENSE). 
diff --git a/build.sbt b/build.sbt index 8354f9f8b..c45c6724a 100644 --- a/build.sbt +++ b/build.sbt @@ -75,6 +75,27 @@ lazy val sharedSettings = Seq( publishArtifact in Test := false, pomIncludeRepository := { _ => false }, // removes optional dependencies + // ScalaDoc settings + autoAPIMappings := true, + apiURL := Some(url("https://monix.github.io/monix-connect/api/")), + apiMappings ++= { + val cp: Seq[Attributed[File]] = (fullClasspath in Compile).value + def findManagedDependency(organization: String, name: String): File = { + ( for { + entry <- cp + module <- entry.get(moduleID.key) + if module.organization == organization + if module.name.startsWith(name) + } yield entry.data + ).head + } + Map( + findManagedDependency("io.monix","monix-execution") -> url("https://monix.io/api/3.1/"), + findManagedDependency("io.monix","monix-catnap") -> url("https://monix.io/api/3.1/"), + findManagedDependency("org.typelevel","cats-effect") -> url("https://typelevel.org/cats-effect/api/") + ) + }, + licenses := Seq("APL2" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")), //homepage := Some(url("https://monix.io")), //todo homepage settings headerLicense := Some(HeaderLicense.Custom( @@ -104,6 +125,8 @@ lazy val sharedSettings = Seq( doctestTestFramework := DoctestTestFramework.ScalaTest, doctestTestFramework := DoctestTestFramework.ScalaCheck, doctestOnlyCodeBlocksMode := true + + ) def mimaSettings(projectName: String) = Seq( @@ -144,12 +167,9 @@ lazy val monix = (project in file(".")) .settings(name := "monix-connect") .aggregate(akka, dynamodb, hdfs, parquet, redis, s3) .dependsOn(akka, dynamodb, hdfs, parquet, redis, s3) - //.settings(unidocSettings) //todo enable unidoc settings - //.enablePlugins(ScalaUnidocPlugin) lazy val akka = monixConnector("akka", Dependencies.Akka) - lazy val dynamodb = monixConnector("dynamodb", Dependencies.DynamoDb) lazy val hdfs = monixConnector("hdfs", Dependencies.Hdfs) @@ -230,7 +250,7 @@ lazy val mdocSettings = Seq( scalacOptions in (ScalaUnidoc, unidoc) ++= Seq( "-doc-source-url", s"https://github.com/monix/monix-connect/tree/v${version.value}€{FILE_PATH}.scala", "-sourcepath", baseDirectory.in(LocalRootProject).value.getAbsolutePath, - "-doc-title", "Monix BIO", + "-doc-title", "Monix Connect", "-doc-version", s"v${version.value}", "-groups" ), diff --git a/docs/akka.md b/docs/akka.md index a7ee9745b..1689b08c5 100644 --- a/docs/akka.md +++ b/docs/akka.md @@ -7,17 +7,18 @@ title: Akka Streams This module makes interoperability with akka streams easier by simply defining implicit extended classes for reactive stream conversions between akka and monix. -The three main core abstractions defined in akka streams are _Source_, _Flow_ and _Sink_, -they were designed following the JVM [reactive streams](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md) standards, -meaning that under it's design they implement the `Publisher` and `Subscriber` contract. -Which at the end it means that they can interoperate with other libraries that also implements the reactive stream specification, such as [Monix Reactive](https://monix.io/api/3.2/monix/reactive/index.html). - So this module aims to provide a nice interoperability between these two reactive streams libraries. - - In order to achieve it, this module will provide an extended conversion method for each of the stream abstractions mentioned before. 
+The [reactive streams](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md) specification is a standard for
+JVM libraries that aim to provide asynchronous stream processing with non-blocking back pressure.
+Both _Akka Streams_ and _[Monix Reactive](https://monix.io/api/3.2/monix/reactive/index.html)_ follow this standard, meaning that it is possible to convert between
+their respective stream data types (since both implement the `Publisher` and `Subscriber` contract).
+
+So this module aims to provide easy interoperability between the two libraries, and to
+achieve that it exposes extended conversion methods. These implicit extended methods can be imported from: `monix.connect.akka.stream.Converters._`.
-Therefore, under the scope of the import, the signatures `.asObservable` and `.asConsumer` will be available for the `Source`, `Flow`, and `Sink`.
+Therefore, under the scope of the import, the signatures `.asObservable` and `.asConsumer` will be available on `Source`, `Flow`, and `Sink` instances,
+whereas `asSource` and `asSink` will be available on the monix `Observable` and `Consumer`.

-The below table shows in more detail the specs for the conversion from akka stremas to monix:
+The below table shows that in more detail:

 | _Akka_ | _Monix_ | _Akka → Monix_ | _Akka ← Monix_ |
 | :---: | :---: | :---: | :---: |
@@ -25,7 +26,7 @@ The below table shows in more detail the specs for the conversion from akka stre
 | _Flow[-In, +Out, +Mat]_ | _Consumer[-In, +Out]_ | `flow.asConsumer[Out]` | - |
 | _Sink[-In, +Out <: Future[Mat]]_ | _Consumer[-In, +Mat]_ | `sink.asConsumer[Mat]` | `consumer.asSink[In]` |

-Note that two methods does not need to be typed as it has been done explicitly in the example table, the compiler will infer it for you.
+Note that when calling these methods you do not need to pass the _type parameter_ explicitly (as done in the table above); the compiler will infer it for you.

 Also, in order to perform these conversion it is required to have an implicit instance of `akka.stream.Materializer` and `monix.execution.Scheduler` in the scope.

@@ -38,29 +39,29 @@ libraryDependencies += "io.monix" %% "monix-akka" % "0.1.0"

 ## Getting started

-The following code shows how these implicits can be initialized, but `Scheduler` and `ActorSystem` can be initialized differently and configured differently depending on the use case.
+The only three things we need to perform these conversions are the converters import, a monix `Scheduler` and an akka `ActorMaterializer`.

 ```scala
 import monix.connect.akka.stream.Converters._
 import monix.execution.Scheduler.Implicits.global

-val actorSystem: ActorSystem = ActorSystem("Akka-Streams-InterOp")
-implicit val materializer = ActorMaterializer()
+implicit val actorSystem: ActorSystem = ActorSystem("Akka-Streams-InterOp") //needs to be implicit so the materializer can pick it up
+implicit val materializer = ActorMaterializer() //in recent akka versions the implicit actor system alone might have been enough
 ```

 ### Akka → Monix

 #### asObservable

-Let's see an example for converting an `Source[+In, +Mat]` to `Observable[+In]`:
+Let's see how easy it is to convert a `Source[+In, +Mat]` into an `Observable[+In]`:

 ```scala
 //given
-val elements = 1 until 50
+val elements = 1 until 50
+val source: Source[Int, NotUsed] = Source(elements)

 //when
-val ob: Observable[Int] = Source.from(elements).asObservable //`asObservable` converter as extended method of source.
+val ob: Observable[Int] = source.asObservable //`asObservable` converter as extended method of source. -//then +//then ob.toListL.runSyncUnsafe() should contain theSameElementsAs elements ``` @@ -68,9 +69,9 @@ In this case we have not needed to consume the `Observable` since we directly us to a list `.toList`, but note that in case you need to use an specific consumer, you can also directly call `consumeWith`, as a shortcut for `source.asObservable.consumeWith(consumer)`, see an example below: ```scala -//given the same `elements` and `source` as above example` +//given the same `source` as the above example` -//then +//when val t: Task[List[Int]] = source.consumeWith(Consumer.toList) //`consumeWith` as extended method of `Source` //then @@ -80,16 +81,16 @@ t.runSyncUnsafe() should contain theSameElementsAs elements #### asConsumer On the other hand, see how to convert an `Sink[-In, +Out <: Future[Mat]]` into a `Consumer[+In, +Mat]`. - ```scala //given val foldSumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((acc, num) => acc + num) //when -val (consumer: Consumer[Int, Int]) = foldSumSink.asConsumer[Int] //`asConsumer` as an extended method of `Sink` -val t: Task[Int] = Observable.fromIterable(Seq(1, 2, 3)).consumeWith(consumer) +val consumer: Consumer[Int, Int] = foldSumSink.asConsumer[Int] //`asConsumer` as an extended method of `Sink` -//then the future value of `t` should be 6 +//then +val t: Task[Int] = Observable.fromIterable(Seq(1, 2, 3)).consumeWith(consumer) +t.runSyncUnsafe() should be 6 ``` Finally, you can also convert `Flow[-In, +Out, +Mat]` into `Consumer[+In, +Out]` in the same way you did with `Sink` @@ -103,7 +104,7 @@ val foldSumFlow: Flow[Int, Int, NotUsed] = Flow[Int].fold[Int](0)((acc, num) => val (consumer: Consumer[Int, Int]) = foldSumFlow.asConsumer //`asConsumer` as an extended method of `Flow` val t: Task[Int] = Observable.fromIterable(Seq(1, 2, 3)).consumeWith(consumer) -//then the future value of `t` should be 6 +t.runSyncUnsafe() should be 6 ``` Notice that this interoperability would allow the Monix user to take advantage of the already pre built integrations @@ -115,19 +116,28 @@ from [Alpakka](https://doc.akka.io/docs/alpakka/current/index.html) or any other On the other hand, for converting from Monix `Observable[+In]` to Akka Streams `Source[+In, NotUsed]` we would use the conversion signature `asSource`. ```scala -import monix.connect.akka.stream.Converters._ -val f: Future[Seq[Long]] = Observable.range(1, 100).asSource.runWith(Sink.seq) -//eventualy will return a sequence from 1 to 100 +//given +val ob: Observable[Int] = Observable.range(1, 100) + +//when +val f: Future[Seq[Long]] = ob.asSource.runWith(Sink.seq) + +//then eventualy will return a sequence from 1 to 100 ``` #### asConsumer -Finally, for converting from `Sink[-In, +Out <: Future[Mat]]` to `Consumer[-In, +Mat]`. +Finally, use `asConsumer` for converting from `Sink[-In, +Out <: Future[Mat]]` to `Consumer[-In, +Mat]`. 
 ```scala
 //given
-import monix.connect.akka.stream.Converters._
 val sink: Sink[Int, Future[String]] = Sink.fold[String, Int]("")((s, i) => s + i.toString)
-val t: Task[String] = Observable.fromIterable(1 until 10).consumeWith(sink.asConsumer[String])
-//eventually will return "123456789"
+
+//and
+val ob: Observable[Int] = Observable.fromIterable(1 until 10)
+
+//when
+val t: Task[String] = ob.consumeWith(sink.asConsumer[String])
+
+//then eventually will return "123456789"
 ```
\ No newline at end of file
diff --git a/docs/dynamo.md b/docs/dynamo.md
index 2d13853dd..bba55adfe 100644
--- a/docs/dynamo.md
+++ b/docs/dynamo.md
@@ -30,35 +30,46 @@ libraryDependencies += "io.monix" %% "monix-dynamodb" % "0.1.0"

 The `DynamoDbAsyncClient` needs to be defined as `implicit` as it will be required by the _consumer_ and _tranformer_ implementations.

-It is also required and additional import for bringing the implicit conversions between DynamoDB requests and `DynamoDbOp`, the upper abstraction layer will allow to exacute them all (no need to worry about that):
-
- ```scala
-import monix.connect.dynamodb.DynamoDbOp._
-```
+An additional import `monix.connect.dynamodb.DynamoDbOp._` is also required to bring the implicit conversions from each specific `DynamoDbRequest` to its `DynamoDbOp`; you don't have to worry about the latter data type,
+it is an abstraction that provides the functionality to execute each request with its own operation:

 See below an example for transforming and consuming DynamoDb operations with monix.

-###_Transformer:_
-```scala
+### Transformer
+
+An example of a stream that transforms and executes DynamoDb `GetItemRequest`s:
+
+```scala
+import monix.connect.dynamodb.DynamoDbOp._
+import software.amazon.awssdk.services.dynamodb.model.{GetItemRequest, GetItemResponse}
+
+//presumably you will have a stream of dynamo db `GetItemRequest` coming in, in this case as a list
 val dynamoDbRequests = List[GetItemRequest] = ???

 val ob = Observable[Task[GetItemResponse]] = {
   Observable
     .fromIterable(dynamoDbRequests)
-    .transform(DynamoDb.transofrmer())
-} //for each element transforms the get request operations into its respective get response
+    .transform(DynamoDb.transofrmer()) // transforms each get request operation into its respective get response
+} //the resulting observable would be of type Observable[Task[GetItemResponse]]
 ```

-###_Consumer:_
+### Consumer
+
+An example of a stream that consumes and executes DynamoDb `PutItemRequest`s:

 ```scala
-//this is an example of a stream that consumes and executes DynamoDb `PutItemRequest`:
-val putItemRequests = List[PutItemRequests] = ???
+import monix.connect.dynamodb.DynamoDbOp._
+import software.amazon.awssdk.services.dynamodb.model.{PutItemRequest, PutItemResponse}
+
+//presumably you will have a stream of dynamo db `PutItemRequest` coming in.
+val putItemRequests: List[PutItemRequest] = ???
-Observable
-.fromIterable(dynamoDbRequests)
-.consumeWith(DynamoDb.consumer()) //a safe and syncronous consumer that executes dynamodb requests
+val t: Task[PutItemResponse] = {
+  Observable
+    .fromIterable(putItemRequests)
+    .consumeWith(DynamoDb.consumer()) // asynchronous consumer that executes put item requests
+} //the materialized value is of type Task[PutItemResponse]
 ```

diff --git a/docs/hdfs.md b/docs/hdfs.md
index 1a203a8c9..cc4a97ec2 100644
--- a/docs/hdfs.md
+++ b/docs/hdfs.md
@@ -6,13 +6,12 @@ title: HDFS
 ## Introduction

 The _Hadoop Distributed File System_ ([HDFS](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html)) is a distributed file system designed to run on commodity hardware,
-it is is highly fault-tolerant and it provides high throughput access which makes it suitable for applications that have to handle with large data sets.
+it is also highly fault-tolerant and provides high throughput access, which makes it very suitable for applications that have to handle large data sets.

-This connector then allows to read and write hdfs files of any size in a streaming fashion.
-
-The methods to perform these operations are exposed under the object ```monix.connect.hdfs.Hdfs```, in which
-it has been built on top of the the official _apache hadoop_ api.
+This connector allows you to _read_ and _write_ hdfs files of any size in a streaming fashion.
+The methods to perform these operations are exposed under the object ```monix.connect.hdfs.Hdfs```,
+and they are built on top of the official _apache hadoop_ api.

 ## Dependency

 Add the following dependency to get started:
 ```scala
 libraryDependencies += "io.monix" %% "monix-hdfs" % "0.1.0"
 ```

-By default the connector uses Hadoop version 3.1.1. In case you need a different one you can replace it by excluding `org.apache.hadoop` from `monix-hdfs` and add the new one to your library dependencies.
-
+By default the connector uses _Hadoop 3.1.1_. In case you need a different one you can replace it by excluding `org.apache.hadoop` from `monix-hdfs` and adding the desired one to your library dependencies.

 ## Getting started

 The following import is a common requirement for all those methods defined in the `Hdfs` object:
 ```scala
 import org.apache.hadoop.fs.FileSystem
-//The abstract representation of a file system which could be a distributed or a local one.
+//abstract representation of a file system which could be a distributed or a local one.
 import org.apache.hadoop.fs.Path
-//Represents a file or directory in a FileSystem
+//represents a file or directory in a FileSystem
 ```

 Each use case would need different settings to create the hadoop configurations, but for testing purposes we would just need a plain one:
 ```scala
-val conf = new Configuration() //Provides access to the hadoop configurable parameters
-conf.set("fs.default.name", s"hdfs://localhost:$port") //especifies the local endpoint of the test hadoop minicluster
+import org.apache.hadoop.conf.Configuration
+
+val conf = new Configuration() //provides access to the hadoop configurable parameters
 val fs: FileSystem = FileSystem.get(conf)
 ```

 ### Reader
+
+Let's start interacting with hdfs. The following example shows how to construct a pipeline that reads from the specified hdfs file.

-Then we can start interacting with hdfs, the following example shows how to construct a pipeline that reads from the specified hdfs file.
+Normally, working with hdfs means dealing with big data, which makes it difficult or very expensive (if not impossible) to read the whole file at once from a single machine,
+ therefore the application will read the file in small parts, of a size configured by the user, until eventually the whole file has been processed.

 ```scala
-
 val sourcePath: Path = new Path("/source/hdfs/file_source.txt")
 val chunkSize: Int = 8192 //size of the chunks to be pulled

-//Once we have the hadoop classes we can create the hdfs monix reader
+//once we have the hadoop classes we can create the hdfs monix reader
 val ob: Observable[Array[Byte]] = Hdfs.read(fs, path, chunkSize)
 ```

-Since using hdfs means we are dealing with big data, it makes it difficult or very expensive to read the whole file at once,
- therefore with the above example we will read the file in small parts configured by `chunkSize` that eventually will end with the whole file being processed.
-
 ### Writer

-Using the stream generated when reading form hdfs in the previous we could write them back into a path like:
+The following example shows how to consume a stream of bytes, create a file and write the chunks into it:
+
 ```scala
 val destinationPath: Path = new Path("/destination/hdfs/file_dest.txt")
 val hdfsWriter: Consumer[Array[Byte], Task[Long]] = Hdfs.write(fs, destinationPath)

-// eventually it will return the size of the written file
+//eventually it will return the size of the written file
 val t: Task[Long] = ob.consumeWith(hdfsWriter)
 ```

-The returned type would represent the total size in bytes of the written data.
+It materializes to a `Long` value that represents the file size.

-Note that the write hdfs consumer implementation provides different configurations to be passed as parameters such as
-enable overwrite (true by default), replication factor (3), the bufferSize (4096 bytes), blockSize (134217728 bytes =~ 128 MB)
-and finally a line separator which is not used by default (None).
+Note that the hdfs `write` signature allows different configurations to be passed as parameters, such as
+_enable overwrite_ (`true` by default), _replication factor_ (`3`), _bufferSize_ (`4096` bytes), _blockSize_ (`134217728` bytes, roughly 128 MB)
+and finally a _line separator_, which is not used by default (`None`).

-Below example shows an example on how can them be tweaked:
+The example below shows how easily they can be tweaked:

 ```scala
 val hdfsWriter: Consumer[Array[Byte], Long] =
@@ -92,20 +91,24 @@ val hdfsWriter: Consumer[Array[Byte], Long] =

 ### Appender

-Finally, the hdfs connector also exposes an append operation, in which in this case it materializes to a `Long`
-that would represent the only the size of the appended data, but not of the whole file.
+Finally, the hdfs connector also exposes an _append_ operation, which is very similar to the _write_ implementation,
+but in this case the materialized `Long` value only represents the size of the appended data, not of the whole file.

-Note also that this method does not allow to configure neither the replication factor nor block size and so on, this is because
-these configurations are only set whenever a file is created, but an append operation would reuse them from the existing file.
+On the other hand, this method does not allow configuring the _replication factor_, _block size_ and so on; this is because
+these configurations are only set whenever a file is created, and an append operation reuses them from the existing file.
 See below an example:

 ```scala
-// you would probably need to tweak the hadoop configuration to allow the append operation
-conf.setBoolean("dfs.support.append", true)
-conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
+val hadoopConf = new Configuration()
+//enables the append operation
+hadoopConf.setBoolean("dfs.support.append", true)
+//found necessary when running tests on the hadoop mini-cluster, but you should tweak the hadoopConf according to your use case
+hadoopConf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
+hadoopConf.set("fs.default.name", s"hdfs://localhost:$port") //specifies the local endpoint of the test hadoop minicluster
+val fs: FileSystem = FileSystem.get(hadoopConf)

-// note that we are re-using the `destinationPath` of the last example since should already exist
+//note that we are re-using the `destinationPath` of the last example since it should already exist
 val hdfsAppender: Consumer[Array[Byte], Task[Long]] = Hdfs.append(fs, destinationPath)
 val ob: Observer[Array[Byte]] = ???
 val t: Task[Long] = ob.consumeWith(hdfsAppender)
 ```

 ## Local testing

- Apache Hadoop has a sub project called [Mini Cluster](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-minicluster)
- that allows to locally spin up a single-node Hadoop cluster without the need to set any environment variables or manage configuration files.
+ _Apache Hadoop_ has a sub project called [Mini Cluster](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-minicluster)
+ that allows you to locally spin up a single-node Hadoop cluster without the need to set any environment variables or manage hadoop configuration files.

 Add to your library dependencies with the desired version:

 ```scala
 "org.apache.hadoop" % "hadoop-minicluster" % "VERSION" % Test
 ```

-From there on, since in this case the tests won't depend on a docker container but as per using a library dependency they will run against the JVM, so you will have to specify where to start and stop the
-hadoop mini cluster on the same test, it is a good practice do that on `BeforeAndAfterAll`:
+From there on, since in this case the tests won't depend on a docker container but on an emulation running in the JVM,
+you will have to start and stop the hadoop mini cluster from the test itself, as a good practice using `BeforeAndAfterAll`:

 ```scala
 import java.io.File
diff --git a/docs/overview.md b/docs/overview.md
index 720cec355..53d7316af 100644
--- a/docs/overview.md
+++ b/docs/overview.md
@@ -3,17 +3,19 @@ id: overview
 title: Overview
 ---

-Monix Connect is an initiative to implement stream integrations for [Monix](https://monix.io/).
+_Monix Connect_ is an initiative to implement stream integrations for [Monix](https://monix.io/).

- A connector describes the connection between the application and a specific data point, which could be a file, a database or any system in which the appication
- can interact by sending or receiving information. Therefore, the aim of this project is to catch the most common
- connections that users could need when developing reactive applications with Monix, these would basically reduce boilerplate code and furthermore, will let the users to greatly save time and complexity in their implementing projects.
+ A _connector_ describes the connection between the application and a specific data point, which could be a file, a database or any system in which the application
+ can interact by sending or receiving information.
+
+ Therefore, the aim of this project is to provide the user with the most common
+ connections they could need when developing reactive applications with Monix; these basically reduce boilerplate code and, furthermore, let them save a lot of time and complexity when implementing their projects.
+
+ The latest stable version of `monix-connect` is compatible with _Monix 3.x_, _Scala 2.12.x_ and _2.13.x_; you can import
 all of the connectors by adding the following dependency (find and fill your release [version](https://github.com/monix/monix-connect/releases)):

 ```scala
- libraryDependencies += "io.monix" %% "monix-connect" % "VERSION"
+ libraryDependencies += "io.monix" %% "monix-connect" % VERSION
 ```

-But you would probably only want to add a specific connector to your library dependencies, see how to do so and how to get started with each of the available connectors in the next sections.
+But you would probably only want to add a specific connector to your library dependencies; see how to do so and how to get started with them in the next sections.
diff --git a/docs/parquet.md b/docs/parquet.md
index b004cc657..ce0c0d0f6 100644
--- a/docs/parquet.md
+++ b/docs/parquet.md
@@ -4,11 +4,11 @@ title: Apache Parquet
 ---

 ## Introduction

-[Apache Parquet](http://parquet.apache.org/) is a columnar storage format that provides the advantages of compressed, efficient data representation available to any project in the Hadoop ecosystem.
+[Apache Parquet](http://parquet.apache.org/) is a columnar storage format that provides the advantages of compressed, efficient data representation available to any project in the _Hadoop_ ecosystem.

 It has already been proved by multiple projects that have demonstrated the performance impact of applying the right compression and encoding scheme to the data.

-Therefore, the parquet connector basically exposes stream integrations for reading and writing into and from parquet files either in the _local system_, _hdfs_ or _S3_.
+Therefore, the `monix-parquet` _connector_ basically exposes stream integrations for _reading_ from and _writing_ to parquet files, either in the _local filesystem_, _hdfs_ or _S3_ (at least for the `avro` and `protobuf` parquet sub-modules).

 ## Set up

 Add the following dependency:
@@ -20,13 +20,20 @@ Add the following dependency:

 ## Getting started

-These two signatures `Parquet.write` and `Parquet.read` are built on top of the _apache parquet_ `ParquetWriter[T]` and `ParquetReader[T]`, therefore they need an instance of these types to be passed.
+These two signatures `write` and `read` are built on top of the _apache parquet_ `ParquetWriter[T]` and `ParquetReader[T]` respectively; therefore they need an instance of these types to be passed.

-The below example shows how to construct a parquet consumer that expects _Protobuf_ messages and pushes
-them into the same parquet file of the specified location.
+The _type parameter_ `T` represents the data type that is expected to be read or written in the parquet file.
+It depends on the parquet implementation chosen, since `ParquetReader` and `ParquetWriter` are just the
+ generic classes, and you would need to use the implementation that fits your use case.
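In other words, the general shape looks roughly like the sketch below (the helper names are illustrative, not part of the library); the concrete construction of a `ParquetWriter[T]` and `ParquetReader[T]` for _protobuf_ and _avro_ is shown in the next two sections.

```scala
import monix.connect.parquet.Parquet
import monix.reactive.Observable
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}

//rough sketch: `T` is the record type of the parquet implementation you choose
def writeRecords[T](records: List[T], parquetWriter: ParquetWriter[T]) =
  Observable.fromIterable(records).consumeWith(Parquet.writer(parquetWriter))

def readRecords[T](parquetReader: ParquetReader[T]): Observable[T] =
  Parquet.reader(parquetReader)
```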
+
 ### Writer

+The below example shows how to construct a parquet consumer that expects _Protobuf_ messages and pushes
+them into the parquet file at the specified location.
+In this case, the type parameter `T` needs to implement `com.google.protobuf.Message`;
+here the message classes were generated using [ScalaPB](https://scalapb.github.io/).
+
 ```scala
 import monix.connect.parquet.Parquet
 import org.apache.hadoop.fs.Path
@@ -42,12 +49,15 @@ val w = new ParquetWriter[ProtoMessage](new Path(file), writeSupport)
 Observable
  .fromIterable(messages)
  .consumeWith(Parquet.writer(w))
-//ProtoMessage implements [[com.google.protobuf.Message]]
 ```

 ### Reader

-On the other hand, the following code shows how to pull _Avro_ records from a parquet file:
+On the other hand, the following code shows how to pull _Avro_ records from a parquet file.
+In contrast, this time the type parameter `T` needs to be a subtype of `org.apache.avro.generic.GenericRecord`.
+In this case we used `com.sksamuel.avro4s.Record`, generated using [Avro4s](https://github.com/sksamuel/avro4s),
+ but there are other libraries out there, such as [Avrohugger](https://github.com/julianpeeters/avrohugger), to generate these classes.

 ```scala
 import monix.connect.parquet.Parquet
@@ -63,23 +73,21 @@ val r: ParquetReader[AvroRecord] = {
 }

 val ob: Observable[AvroRecord] = Parquet.reader(r)
-//AvroRecord implements [[org.apache.avro.generic.GenericRecord]]
+
 ```

-Warning: This connector provides with the logic of building a publisher and subscriber from a given apache hadoop `ParquetReader` and `ParquetWriter` respectively,
-but it does not cover any existing issue within the support interoperability of the apache parquet library with external ones.
-Notice that p.e we have found an issue when reading parquet as protobuf messages with `org.apache.parquet.hadoop.ParquetReader` but not when writing.
-Follow the state of this [issue](https://github.com/monix/monix-connect/issues/34).
-On the other hand, it was all fine the integration between `Avro` and `Parquet`.
+_Warning_: This connector provides the logic for building a publisher and a subscriber from a given apache hadoop `ParquetReader` and `ParquetWriter` respectively,
+but it does not cover any existing interoperability issue between the apache parquet library and external ones.
+Notice that, for example, we have found an issue when reading parquet as protobuf messages generated with `ScalaPB`, but not when writing.

 ## Local testing

-It will depend on the specific use case, as we mentioned earlier in the introductory section it can operate on the local filesystem on hdfs or even in S3.
+It will depend on the specific use case; as we mentioned earlier in the introductory section, it can operate on the _local filesystem_, on _hdfs_ or even on _S3_ (for the _avro_ and _protobuf_ parquet sub-modules).
 Therefore, depending on the application requirements, the hadoop `Configuration` class will need to be configured accordingly.

-__Local:__ So far in the examples has been shown how to use it locally, in which in that case it would just be needed to create a plain instance like: ```new org.apache.hadoop.conf.Configuration()``` and the local path will be
-specified like: `new org.apache.hadoop.fs.Path("/this/represents/a/local/path")`.
+__Local:__ So far the examples have shown how to use it locally, in which case you would just need to create a plain instance of the hadoop configuration, and a `Path` that
+ represents the file in the local filesystem, as shown in the sketch below.
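A minimal sketch of that local setup (the path below is just illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

//a plain hadoop configuration is enough when targeting the local filesystem
val conf = new Configuration()
val path = new Path("/this/represents/a/local/path")
```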
__Hdfs:__ On the other hand, the most common case is to work with parquet files in hdfs, in that case my recommendation is to find specific posts and examples on how to set up your configuration for that. But on some extend, for setting up the local test environment you would need to use the hadoop minicluster and set the configuration accordingly.

diff --git a/docs/redis.md b/docs/redis.md
index 04cd03a05..4f2f85a99 100644
--- a/docs/redis.md
+++ b/docs/redis.md
@@ -8,21 +8,20 @@ _Redis_ is an open source, in-memory data structure store, used as a database, c
 It supports data structures such as string, hashes, lists, sets, sorted sets with range queries, streams and more.
 It has a defined a set of [commands](https://redis.io/commands) to inter-operate with, and most of them are also available from the java api.

-This connector has been built on top of [lettuce](https://lettuce.io/), the most popular java library for operating with a non blocking Redis client.
+This connector has been built on top of [lettuce](https://lettuce.io/), the most popular java library for operating with a _non blocking_ Redis client.
-
-Then `monix-redis` creates the interoperability between the reactive types returned by the lettuce api like (`Mono` and `Flux`) from [Reactor](https://projectreactor.io/docs/core/release/reference/) or `RedisFuture[T]`
-At the same time that it returns the right values from scala lang and not form java, resulting in a greatly reduction of boilerplate code that makes the user to have a nice experience while integrating
+Then `monix-redis` is nothing else than a nice interoperability layer between the reactive types returned by the lettuce api, like `Mono` and `Flux` from the [Reactor](https://projectreactor.io/docs/core/release/reference/) project or `RedisFuture[T]`,
+and the scala ones, returning proper scala values rather than java ones, resulting in an idiomatic api that greatly reduces boilerplate code and makes the user have a nice experience while integrating
 redis operations using monix. See an example in below table:

- | Signature | Lettuce _Async_ | Lettuce _Reactive_ | _Monix_ |
- | :---: | :---: | :---: | :---: |
- | __del__ | _RedisFuture_ | _Mono_ | _Task[Long]_ |
- | __hset__ | _RedisFuture_ | _Mono_ | _Task[Boolean]_ |
- | __hvals__ | _RedisFuture>_ | _Flux>_ | _Observable[V]_ |
- | __...__ | ... | ... | ... |
+ | _Async_ | _Reactive_ | _Monix_ |
+ | :---: | :---: | :---: |
+ | _RedisFuture_ | _Mono_ | _Task[Long]_ |
+ | _RedisFuture_ | _Mono_ | _Task[Boolean]_ |
+ | _RedisFuture_ | _Flux_ | _Observable[V]_ |
+ | ... | ... | ... |

 ## Dependency

@@ -35,40 +34,42 @@ libraryDependencies += "io.monix" %% "monix-redis" % "0.1.0"

 ## Getting started

-Redis provides a wide range of commands to perform a different range of operations, in which it has been splitted between 15 different groups.
-Monix Redis connector only currently provides support for the most common used ones: ([Keys](https://redis.io/commands#generic), [Hashes](https://redis.io/commands#hash), [List](https://redis.io/commands#list), [Pub/Sub](https://redis.io/commands#pubsub), [Server](https://redis.io/commands#server), [Sets](https://redis.io/commands#set), [SortedSets](https://redis.io/commands#sorted_set), [Streams](https://redis.io/commands#stream) and [Strings](https://redis.io/commands#string)).
-Each of these modules has its own object located under `monix.connect.redis`, being `Redis` the one that aggregates them all. But they can be individually used too.
+Redis provides a wide range of commands to perform different types of operations, divided into 15 different groups.
+Currently, this connector only provides support for the most commonly used ones: ([Keys](https://redis.io/commands#generic), [Hashes](https://redis.io/commands#hash), [List](https://redis.io/commands#list), [Pub/Sub](https://redis.io/commands#pubsub), [Server](https://redis.io/commands#server), [Sets](https://redis.io/commands#set), [SortedSets](https://redis.io/commands#sorted_set), [Streams](https://redis.io/commands#stream) and [Strings](https://redis.io/commands#string)).
+Each of these modules has its own object located under the `monix.connect.redis` package, with `Redis` being the one that aggregates them all. But they can also be used individually.

-The only extra thing you need to do for start using it is to have an implicit `StatefulRedisConnection[K, V]` in the scope.
+Apart from that, you will only need to define an implicit instance of `StatefulRedisConnection[K, V]` in scope.

 On continuation let's show an example for each of the redis data group:

 ### __Keys__

-The following example uses the redis keys api `monix.connect.redis.RedisKey` to show a little example on working with some basic key operations.
+The following example uses the redis keys api `RedisKey` to show some basic key operations.

 ```scala
+import monix.connect.redis.RedisKey
+
 //given two keys and a value
-val key1: K //assuming that k1 initially exists
-val key2: K //k2 does not exists
+val key1: K // assuming that k1 initially exists
+val key2: K // k2 does not exist
 val value: String

 //when
-val f: Task[Long, Boolean, Long, Long, Long, Long] = {
+val t: Task[(Long, Boolean, Long, Long, Long, Long)] = {
   for {
-    initialTtl <- RedisKey.ttl(key1) //checks the ttl when it hasn't been set yet
-    expire <- RedisKey.expire(key1, 5) //sets the ttl to 5 seconds
-    finalTtl <- RedisKey.ttl(key1) //checks the ttl again
-    existsWithinTtl <- RedisKey.exists(key1) //checks whether k1 exists or not
-    _ <- RedisKey.rename(key1, key2) //renames k1 to k2
-    existsRenamed <- RedisKey.exists(key2) //checks that it exists after being renamed
+    initialTtl <- RedisKey.ttl(key1) //checks the ttl when it hasn't been set yet
+    expire <- RedisKey.expire(key1, 5) //sets the ttl to 5 seconds
+    finalTtl <- RedisKey.ttl(key1) //checks the ttl again
+    existsWithinTtl <- RedisKey.exists(key1) //checks whether k1 exists or not
+    _ <- RedisKey.rename(key1, key2) //renames k1 to k2
+    existsRenamed <- RedisKey.exists(key2) //checks that it exists after being renamed
     _ <- Task.sleep(6.seconds)
     existsAfterFiveSeconds <- RedisKey.exists(key2) //after 6 seconds checks ttl again
   } yield (initialTtl, expire, finalTtl, existsWithinTtl, existsRenamed, existsAfterFiveSeconds)
 }

 //then
-val (initialTtl, expire, finalTtl, existsWithinTtl, existsRenamed, existsAfterFiveSeconds).runSyncUnsafe()
+val (initialTtl, expire, finalTtl, existsWithinTtl, existsRenamed, existsAfterFiveSeconds) = t.runSyncUnsafe()
 initialTtl should be < 0L
 finalTtl should be > 0L
 expire shouldBe true
@@ -79,15 +80,17 @@ existsAfterFiveSeconds shouldBe 0L

 ### __Hashes__

-The following example uses the redis hash api `monix.connect.redis.RedisHash` to insert a single element into a hash and read it back from the hash.
+The following example uses the redis hash api `RedisHash` to insert a single element into a hash and read it back from the hash.

 ```scala
+import monix.connect.redis.RedisHash
+
 val key: String = ???
 val field: String = ???
val value: String = ??? val t: Task[String] = for { - _ <- RedisHash.hset(key, field, value).runSyncUnsafe() + _ <- RedisHash.hset(key, field, value) v <- RedisHash.hget(key, field) } yield v @@ -96,9 +99,11 @@ val fv: Future[String] = t.runToFuture() ### __Lists__ -The following example uses the redis list api `monix.connect.redis.RedisList` to insert elements into a redis list and reading them back with limited size. +The following example uses the redis list api `RedisList` to insert elements into a redis list and reading them back with limited size. ```scala +import monix.connect.redis.RedisList + val key: String = String val values: List[String] @@ -107,8 +112,7 @@ val tl: Task[List[String]] = for { l <- RedisList.lrange(key, 0, values.size).toListL } yield l -//a different alternative to use whether there is a risk of fetching a big list of elements -//or that you want to keep working with Observable[v] type rather than Task[List[V]] +//a safer alternative to use that will return Observable[v] rather than Task[List[V]] val ob: Observable[String] = for { _ <- Observable.fromTask(RedisList.lpush(key, values: _*)) ob <- RedisList.lrange(key, 0, values.size) @@ -117,21 +121,25 @@ val ob: Observable[String] = for { ### __Pub/Sub__ -Example coming soon. +Coming soon. ### __Server__ -The following code shows how to remove all keys from all dbs in redis using the server api `monix.connect.redis.RedisServer` a very basic but also common use case: +The following code shows how to remove all keys from all dbs in redis using the server api `RedisServer` a very basic but also common use case: ```scala +import monix.connect.redis.RedisServer + val t: Task[String] = RedisServer.flushall() //returns a simple string reply ``` ### __Sets__ -The following code uses the redis sets api from `monix.connect.redis.RedisSet`, this one is a bit longer than the others but not more complex. +The following code sample uses the redis sets api from `RedisSet` object, this one is a bit longer than the others but not more complex. ```scala +import monix.connect.redis.RedisSet + //given three keys and two redis set of values val k1: K val m1: Set[String] @@ -177,10 +185,12 @@ diff should contain theSameElementsAs List(m1.head) ### __SortedSets__ -The following example uses the redis sorted sets api `monix.connect.redis.RedisSortedSet` to insert three scored elements into a redis sorted set, +The following example uses the redis sorted sets api from `RedisSortedSet` to insert three scored elements into a redis sorted set, incrementing the middle one and then check that the scores are correctly reflected: ```scala +import monix.connect.redis.RedisSortedSet + //given val k: String = "randomKey" val v0: String = "v0" @@ -192,7 +202,6 @@ val maxScore: Double = 4 val increment: Double = 2 //when -RedisSortedSet.zadd(k, minScore, v0) val t: Task[(ScoredValue[String], ScoredValue[String])] = for { _ <- RedisSortedSet.zadd(k, minScore, v0) _ <- RedisSortedSet.zadd(k, middleScore, v1) @@ -202,7 +211,7 @@ val t: Task[(ScoredValue[String], ScoredValue[String])] = for { max <- RedisSortedSet.zpopmax(k) } yield (min, max) -//then we can assume that: +//then we can confirm that: val (min, max) = t.runSyncUnsafe() min.getScore shouldBe minScore min.getValue shouldBe v0 @@ -211,13 +220,15 @@ max.getValue shouldBe v1 ``` ### __Streams__ -Example coming soon. +Coming soon. 
 ### __Strings__

- The following example uses the redis keys api `monix.connect.redis.RedisString` to insert a string into the given key and get its size from redis
+ The following example uses the redis strings api `RedisString` to insert a string into the given key and get its size from redis

 ```scala
+import monix.connect.redis.RedisString
+
 val ts: Task[Long] = for {
    _ <- RedisString.set(key, value).runSyncUnsafe()
   size <- RedisString.strlen(key)
 } yield size

 ts.runToFuture() //eventually will return a failure if there was a redis server error
@@ -227,7 +238,7 @@

 ### __All in one__

- See below a complete demonstration on how to compose a different set of Redis commands from different
+ See below a complete demonstration on how to compose a different set of redis commands from different
 modules in the same for comprehension:

 ```scala
@@ -243,7 +254,7 @@ val k2: K
 val values: List[V]
 val k3: K

-val (v: String, len: Long, l: List[V], keys: List[K]) = {
+val t: Task[(String, Long, List[V], List[K])] = {
   for {
     _ <- Redis.flushallAsync() //removes all keys
     _ <- Redis.touch(k1) //creates the `k1`
@@ -257,9 +268,10 @@ val k3: K
     l <- Redis.lrange(k3, 0, len).toListL //this is not safe unless you have a reasonable limit
     keys <- Redis.keys("*").toListL //get all the keys
   } yield (v, len, l, keys)
-}.runSyncUnsafe() // this is unsafe, and only used for testing purposes
+}

+//after this comprehension of redis operations it can be confirmed that:
+val (v: String, len: Long, l: List[V], keys: List[K]) = t.runSyncUnsafe() // this is unsafe, and only used for testing purposes
 v shouldBe value
 len shouldBe values.size + 1
 l should contain theSameElementsAs value :: values
@@ -269,9 +281,9 @@ keys.head shouldBe k3

 ## Local testing

-The local tests will be use the [redis docker image](https://hub.docker.com/_/redis/).
+The local tests will use the [redis docker image](https://hub.docker.com/_/redis/) from docker hub.

-Add the following service description to your `docker-compose.yaml` file:
+Add the following service description to your `docker-compose.yml` file:

 ```yaml
 redis:
   image: redis
   ports:
     - 6379:6379
 ```

-Run the following command to build, and start the redis server:
+Run the following command to build and start the redis server:

 ```shell script
-docker-compose -f docker-compose.yml up -d redis
+docker-compose -f ./docker-compose.yml up -d redis
 ```

 Check out that the service has started correctly.

-Finally, as you might have seen in some of the examples, you can create the redis connection just by:
+Finally, the following code shows how you can create the redis connection to the local server, but
+you would have to modify it to fit your use case (i.e. it will be different when connecting to a redis cluster, or when authentication to the server with key and secret is needed, etc.).

 ```scala
 val redisClient: RedisClient = RedisClient.create("redis://host:port")
 implicit val connection: StatefulRedisConnection[String, String] = redisClient.connect()
 ```

 And now you are ready to run your application!
-_Note that the above example defines the client as `implicit`, since it is how the api will expect it._ +_Note that the above example defines the `connection` as `implicit`, since it is how the api will expect it._ diff --git a/docs/s3.md b/docs/s3.md index 17b81abaa..1653097ff 100644 --- a/docs/s3.md +++ b/docs/s3.md @@ -5,12 +5,8 @@ title: AWS S3 ## Introduction -The object storage service that offers industry leading scalability, availability, security and performance. +_AWS Simple Storage Service_ ([S3](https://aws.amazon.com/s3/?nc=sn&loc=0)) is an object storage service that offers industry leading scalability, availability, security and performance. It allows data storage of any amount of data, commonly used as a data lake for big data applications which can now be easily integrated with monix. - - The module has been implemented using the `S3AsyncClient` since it only exposes non blocking methods. - Therefore, all of the monix s3 methods defined in the `S3` object would expect an implicit instance of - this class to be in the scope of the call. ## Dependency @@ -22,13 +18,19 @@ It allows data storage of any amount of data, commonly used as a data lake for b ## Getting started - First thing is to create the s3 client that will allow us to authenticate and create an channel between our + + First of all, we need to create the s3 client that will allow us to authenticate and create an channel between our application and the AWS S3 service. - ### Connection - So the below code shows an example on how to set up this connection. Note that in this case - the authentication is done thorugh AWS S3 using access and secret keys, - but you might use another method such as IAM roles. + ### asyncClient + + + This module has been implemented using the `S3AsyncClient` from the [aws sdk](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/model/package-summary.html), since it only exposes non blocking methods. + So, all of the methods exposed under `monix.connect.s3.S3` object would expect an implicit instance of + this async client class to be in the scope of the call. + + Below code shows an example on how to set up this connection. + Note that in this case the authentication is using AWS access and secret keys, but you might use another method such an _IAM_ role. ```scala import java.net.URI @@ -39,80 +41,96 @@ import software.amazon.awssdk.regions.Region.AWS_GLOBAL val basicAWSCredentials: AwsBasicCredentials = AwsBasicCredentials.create(s3AccessKey, s3SecretKey) val credentialsProvider: StaticCredentialsProvider = StaticCredentialsProvider.create(basicAWSCredentials) -// Note that the client is defined as implicit, this is on purpose since each of the methods defined in -// the monix s3 connector will expect that. +//note that the client is defined as implicit, this is on purpose since each of the methods defined in +//the monix s3 connector will expect that. 
 implicit val s3Client: S3AsyncClient = S3AsyncClient
   .builder()
   .credentialsProvider(credentialsProvider)
   .region(AWS_GLOBAL)
-  .endpointOverride(URI.create(endPoint))//this one is used to point to the localhost s3 service, not used in prod
+  .endpointOverride(URI.create(endPoint)) //this one is used to point to the localhost s3 service, not used in prod
   .build
 ```

- ### Create and delete

-Once we have configured the s3 client, let's start with the basic operations to _create_ and _delete_ buckets:
+ ### createBucket
+
+Once you have configured the client, you would probably need to create a bucket:
+
 ```scala
-import software.amazon.awssdk.services.s3.model.{CreateBucketResponse, DeleteBucketResponse}
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse

 val bucketName: String = "myBucket"
-val _: Task[CreateBucketResponse] = S3.createBucket(bucketName)
-val _: Task[DeleteBucketResponse] = S3.deleteBucket(bucketName)
+val t: Task[CreateBucketResponse] = S3.createBucket(bucketName)
 ```

+ ### deleteBucket

- ### Delete object
-You can also operate at object level within a bucket with:
+ On the other hand, if you want to remove the bucket:
+
 ```scala
-import software.amazon.awssdk.services.s3.model.{DeleteObjectResponse, ListObjectsResponse}
+import software.amazon.awssdk.services.s3.model.DeleteBucketResponse

 val bucketName: String = "myBucket"
-val _: Task[DeleteObjectResponse] = S3.deleteObject(bucketName)
-val _: Task[ListObjectsResponse] = S3.listObjects(bucketName)
+val t: Task[DeleteBucketResponse] = S3.deleteBucket(bucketName)
 ```

- ### List objects
+ ### putObject
+
+You can also easily create and write into an S3 object with the put object operation.
+Note that if you need to upload a large amount of data you should not use this method; see instead the [multipartUpload](#multipartupload) section below.
+
+ ```scala
+import software.amazon.awssdk.services.s3.model.PutObjectResponse
+
+val objectKey: String = "/object/file.txt"
+val content: Array[Byte] = "file content".getBytes()
+val t: Task[PutObjectResponse] = S3.putObject(bucketName, objectKey, content)
+```

+ ### deleteObject
+
+You can also operate at object level within a bucket with:

 ```scala
 import software.amazon.awssdk.services.s3.model.{DeleteObjectResponse, ListObjectsResponse}

-val bucketName: String = "myBucket"
-val _: Task[DeleteObjectResponse] = S3.deleteObject(bucketName)
-val _: Task[ListObjectsResponse] = S3.listObjects(bucketName)
+val t: Task[DeleteObjectResponse] = S3.deleteObject(bucketName)
 ```

- ### Get object
+ ### listObjects
+
+Lists all the objects within a bucket:

 ```scala
-val objectKey: String = "/object/file.txt"
-val _: Task[Array[Byte]] = S3.getObject(bucketName, objectKey)
-}
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse
+
+val _: Task[ListObjectsResponse] = S3.listObjects(bucketName)
 ```

- ### Put object
+ ### getObject
+
+Downloads the given S3 object as a single byte array.
+Note that this operation is dangerous to perform on large objects; `multipartDownload` is expected to be supported in future releases to
+cover those use cases.
-On the other hand, to put objects: ```scala -import software.amazon.awssdk.services.s3.model.PutObjectResponse +val objectKey: String = "/object/file.txt" +val _: Task[Array[Byte]] = S3.getObject(bucketName, objectKey) -val content: Array[Byte] = "file content".getBytes() -val _: Task[PutObjectResponse] = S3.putObject(bucketName, objectKey, content) -} ``` -### Multipart update +### multipartUpload -Finally, for dealing with large files of data you might want to use the `multipartUpload` consumer. -This one consumes an observable and synchronously makes partial uploads of the incoming chunks. +Finally, for dealing with large files of data you might want to use the `multipartUpload` implementation. +This one can be used to consume an observable of bytes that would send a partial upload request for each received element if it was bigger than the minimum size, otherwise the chunk will be concatenated on the next request. -Thus, it reduces substantially the risk on having jvm overhead errors or getting http requests failures, -since the whole file does not need to be allocated in the memory and the http request body won't be that big. +Thus, it reduces substantially the risk on having _OOM_ errors or getting http requests failures, +since the whole file does not need to be allocated in the memory and the http request body won't be that big because it would have been done by parts. -The partial uploads can be fine tuned by the minimum chunksize that will be sent, being 5MB the default minimum size (equally as an integer value of 5242880). +Note that the method can be tuned with specific aws configurations such as `acl's`, `requestPayer`, etc. But a very important one to have present is by the minimum `chunksize` that will be sent, being 5MB the default and minimum size (lower values would result in failure). ```scala import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse -// given an strem of chunks (Array[Byte]) +// given an stream of array bytes val ob: Observable[Array[Byte]] = Observable.fromIterable(chunks) // and a multipart upload consumer @@ -125,7 +143,7 @@ ob.fromIterable(chunks).consumeWith(multipartUploadConsumer) ## Local testing -For AWS S3 local testing we went with [minio](https://github.com/minio/minio) instead of localstack, since we found an [issue](https://github.com/localstack/localstack/issues/538) that can block you on writing your functional tests. +In order test `AWS S3` localy, we used a docker image provided by [minio](https://github.com/minio/minio). Add the following service description to your `docker-compose.yaml` file: @@ -147,13 +165,13 @@ minio: command: server --compat /data ``` -Run the following command to build, and start the redis server: +Then, run the following command to build and start the S3 service: ```shell script docker-compose -f docker-compose.yml up -d minio ``` Check out that the service has started correctly, notice that a healthcheck has been defined on the description of the minio service, -that's because minio s3 is a very heavy image and sometimes it takes too long to be set up or sometime it even fails, so that would prevent those cases. +that's because it is a heavy image and sometimes it takes bit long to start or sometimes it even fails, so that would prevent that. 
Finally, create the connection with AWS S3, note that minio does not has support for `AnonymousCredentialsProvider`, therefore you'll have to use `AwsBasicCredentials`, in which the _key_ and _secret_ will correspond respectively to the @@ -166,8 +184,8 @@ import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCrede val minioEndPoint: String = "http://localhost:9000" -val s3AccessKey: String = "TESTKEY" //see docker minio env var `MINIO_ACCESS_KEY` -val s3SecretKey: String = "TESTSECRET" //see docker minio env var `MINIO_SECRET_KEY` +val s3AccessKey: String = "TESTKEY" //has to be equal to the `MINIO_ACCESS_KEY` env var value +val s3SecretKey: String = "TESTSECRET" //has to be equal to the `MINIO_SECRET_KEY` env var value val basicAWSCredentials = AwsBasicCredentials.create(s3AccessKey, s3SecretKey) implicit val s3AsyncClient: S3AsyncClient = S3AsyncClient @@ -176,7 +194,7 @@ implicit val s3AsyncClient: S3AsyncClient = S3AsyncClient .region(AWS_GLOBAL) .endpointOverride(URI.create(minioEndPoint)) .build -``` +``` Now you are ready to run your application! diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 86596fd9c..d1f037cfc 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,4 +1,3 @@ -import sbt.Keys.sourceManaged import sbt._ object Dependencies { @@ -29,8 +28,6 @@ object Dependencies { "io.monix" %% "monix-reactive" % DependencyVersions.Monix, "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.6", //todo use as replacement for `collection.JavaConverters` "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.1" - // "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2", - //"org.slf4j" % "log4j-over-slf4j" % "1.7.30" ) private val CommonTestDependencies = Seq( diff --git a/website/i18n/en.json b/website/i18n/en.json index f497e3bbb..c6f2e6611 100644 --- a/website/i18n/en.json +++ b/website/i18n/en.json @@ -5,6 +5,9 @@ "previous": "Previous", "tagline": "A set of connectors and stream integrations for Monix.", "docs": { + "overview": { + "title": "Overview" + }, "akka": { "title": "Akka Streams" }, @@ -17,11 +20,8 @@ "hdfs": { "title": "HDFS" }, - "overview": { - "title": "Overview" - }, "parquet": { - "title": "Apache Parquet" + "title": "Parquet" }, "redis": { "title": "Redis" diff --git a/website/pages/en/index.js b/website/pages/en/index.js index bb5ef86eb..bd79db5b0 100644 --- a/website/pages/en/index.js +++ b/website/pages/en/index.js @@ -107,7 +107,7 @@ const index = ` {[ { - image: `${baseUrl}img/redis-1.png`, + image: `${baseUrl}img/redis.png`, imageAlign: 'right', } , diff --git a/website/sidebars.json b/website/sidebars.json index 0627fd00a..c0c811f15 100644 --- a/website/sidebars.json +++ b/website/sidebars.json @@ -1,5 +1,5 @@ { "docs": { - "Documentation": ["overview", "akka", "dynamodb", "hdfs", "parquet", "redis", "s3"] + "Documentation": ["overview", "akka", "dynamodb", "s3", "hdfs", "parquet", "redis"] } } \ No newline at end of file diff --git a/website/static/img/parquet.png b/website/static/img/parquet.png deleted file mode 100644 index c1914e57d..000000000 Binary files a/website/static/img/parquet.png and /dev/null differ diff --git a/website/static/img/redis-1.png b/website/static/img/redis.png similarity index 100% rename from website/static/img/redis-1.png rename to website/static/img/redis.png