Skip to content

Commit

Permalink
Refined documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
paualarco committed Jun 4, 2020
1 parent add95b7 commit b8a2fa4
Show file tree
Hide file tree
Showing 18 changed files with 324 additions and 260 deletions.
7 changes: 7 additions & 0 deletions .github/scripts/create-web-site.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env bash

set -e

echo "GIT_DEPLOY_KEY: $GIT_DEPLOY_KEY"

sbt docs/docusaurusPublishGhpages
26 changes: 7 additions & 19 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,7 @@ jobs:

- name: Run Mima binary compatibility test for Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt mimaReportBinaryIssues

publish-page:
name: Publish documentation page
runs-on: ubuntu-latest
strategy:
matrix:
java: [ 8 ]
scala: [ 2.12.11 ]

steps:
- uses: actions/checkout@v2
- uses: olafurpg/setup-scala@v7
with:
java-version: "adopt@1.${{ matrix.java }}"

- name: Publishes documentation page
run: sbt docs/docusaurusPublishGhpages


publish:
name: Publish
needs: [ tests, mima ]
Expand All @@ -88,11 +71,16 @@ jobs:
- uses: actions/checkout@v1
- uses: olafurpg/setup-scala@v2
- uses: olafurpg/setup-gpg@v2
- name: Publish ${{ github.ref }}
- name: Publish release ${{ github.ref }}
run: sbt ci-release
env:
PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }}
PGP_SECRET: ${{ secrets.PGP_SECRET }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
- name: Publis documentation web page
run: |
./.github/scripts/create-web-site.sh
env:
GIT_DEPLOY_KEY: ${{ secrets.GIT_DEPLOY_KEY }}

File renamed without changes.
39 changes: 13 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,49 +1,36 @@
# Monix Connect



[![release-badge][]][release] [![workflow-badge][]][workflow]
[![Gitter](https://badges.gitter.im/monix/monix-connect.svg)](https://gitter.im/monix/monix-connect?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)
[![Scala Steward badge](https://img.shields.io/badge/Scala_Steward-helping-blue.svg?style=flat&logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAA4AAAAQCAMAAAARSr4IAAAAVFBMVEUAAACHjojlOy5NWlrKzcYRKjGFjIbp293YycuLa3pYY2LSqql4f3pCUFTgSjNodYRmcXUsPD/NTTbjRS+2jomhgnzNc223cGvZS0HaSD0XLjbaSjElhIr+AAAAAXRSTlMAQObYZgAAAHlJREFUCNdNyosOwyAIhWHAQS1Vt7a77/3fcxxdmv0xwmckutAR1nkm4ggbyEcg/wWmlGLDAA3oL50xi6fk5ffZ3E2E3QfZDCcCN2YtbEWZt+Drc6u6rlqv7Uk0LdKqqr5rk2UCRXOk0vmQKGfc94nOJyQjouF9H/wCc9gECEYfONoAAAAASUVORK5CYII=)](https://scala-steward.org)

[workflow]: https://github.com/monix/monix-connect/actions?query=branch%3Amaster+workflow%3Abuild
[workflow-badge]: https://github.com/monix/monix-connect/workflows/build/badge.svg


[release]: https://search.maven.org/search?q=a:monix-connect*
[release-badge]: https://img.shields.io/github/v/tag/monix/monix-connect.svg

_Warning:_ Mind that the project is yet in early stages and its API is likely to be changed.
⚠️ _Mind that the project isn't yet stable, so binary compatibility is not guaranteed._

[Monix Connect]() is an initiative to implement stream integrations for [Monix](https://monix.io/).

The `connector` describes a connection between the application and a specific data point, which could be a file, a database or any system in which the appication
can interact by sending or receiving information.
Therefore, the aim of this project is to implement the most common
connections that users could need when developing reactive applications with Monix, these would basically reduce boilerplate code and furthermore will let users to greatly save time and complexity in their implementing projects.

The latest stable version of `monix-connect` is compatible with Monix 3.x, Scala 2.12.x and 2.13.x, you can import
all of the connectors by adding the following dependency (find and fill your release [version](https://github.com/monix/monix-connect/releases)):

```scala
libraryDependencies += "io.monix" %% "monix-connect" % "VERSION"
```
Monix Connect is an initiative to implement stream integrations for [Monix](https://monix.io/).

But you can also only include to a specific connector to your library dependencies, see below how to do so and how to get started with each of the available [connectors](#Connectors).
Learn more on how to get started in the [documentation page](https://monix.github.io/monix-connect/).

### Connectors
1. [Akka](#Akka)
2. [DynamoDB](#DynamoDB)
3. [Hdfs](#HDFS)
4. [Parquet](#Parquet)
5. [Redis](#Redis)
6. [S3](#S3)
The below list comprehends the current set of connectors that are available to use.
1. [Akka](https://monix.github.io/monix-connect/docs/akka)
2. [DynamoDB](https://monix.github.io/monix-connect/docs/dynamodb)
3. [Hdfs](https://monix.github.io/monix-connect/docs/hdfs)
4. [Parquet](https://monix.github.io/monix-connect/docs/parquet)
5. [Redis](https://monix.github.io/monix-connect/docs/redis)
6. [S3](https://monix.github.io/monix-connect/docs/s3)

### Contributing

The Monix Connect project welcomes contributions from anybody wishing to
participate. All code or documentation that is provided must be
licensed with the same license that Monix Connect is licensed with (Apache
2.0, see LICENSE.txt).
2.0, see [LICENCE](./LICENSE)).

People are expected to follow the
[Scala Code of Conduct](./CODE_OF_CONDUCT.md) when
Expand All @@ -57,5 +44,5 @@ gladly accepted. For more information, check out the
## License

All code in this repository is licensed under the Apache License,
Version 2.0. See [LICENCE.txt](./LICENSE.txt).
Version 2.0. See [LICENCE](./LICENSE).

28 changes: 24 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@ lazy val sharedSettings = Seq(
publishArtifact in Test := false,
pomIncludeRepository := { _ => false }, // removes optional dependencies

// ScalaDoc settings
autoAPIMappings := true,
apiURL := Some(url("https://monix.github.io/monix-connect/api/")),
apiMappings ++= {
val cp: Seq[Attributed[File]] = (fullClasspath in Compile).value
def findManagedDependency(organization: String, name: String): File = {
( for {
entry <- cp
module <- entry.get(moduleID.key)
if module.organization == organization
if module.name.startsWith(name)
} yield entry.data
).head
}
Map(
findManagedDependency("io.monix","monix-execution") -> url("https://monix.io/api/3.1/"),
findManagedDependency("io.monix","monix-catnap") -> url("https://monix.io/api/3.1/"),
findManagedDependency("org.typelevel","cats-effect") -> url("https://typelevel.org/cats-effect/api/")
)
},

licenses := Seq("APL2" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")),
//homepage := Some(url("https://monix.io")), //todo homepage settings
headerLicense := Some(HeaderLicense.Custom(
Expand Down Expand Up @@ -104,6 +125,8 @@ lazy val sharedSettings = Seq(
doctestTestFramework := DoctestTestFramework.ScalaTest,
doctestTestFramework := DoctestTestFramework.ScalaCheck,
doctestOnlyCodeBlocksMode := true


)

def mimaSettings(projectName: String) = Seq(
Expand Down Expand Up @@ -144,12 +167,9 @@ lazy val monix = (project in file("."))
.settings(name := "monix-connect")
.aggregate(akka, dynamodb, hdfs, parquet, redis, s3)
.dependsOn(akka, dynamodb, hdfs, parquet, redis, s3)
//.settings(unidocSettings) //todo enable unidoc settings
//.enablePlugins(ScalaUnidocPlugin)

lazy val akka = monixConnector("akka", Dependencies.Akka)


lazy val dynamodb = monixConnector("dynamodb", Dependencies.DynamoDb)

lazy val hdfs = monixConnector("hdfs", Dependencies.Hdfs)
Expand Down Expand Up @@ -230,7 +250,7 @@ lazy val mdocSettings = Seq(
scalacOptions in (ScalaUnidoc, unidoc) ++= Seq(
"-doc-source-url", s"https://github.com/monix/monix-connect/tree/v${version.value}€{FILE_PATH}.scala",
"-sourcepath", baseDirectory.in(LocalRootProject).value.getAbsolutePath,
"-doc-title", "Monix BIO",
"-doc-title", "Monix Connect",
"-doc-version", s"v${version.value}",
"-groups"
),
Expand Down
72 changes: 41 additions & 31 deletions docs/akka.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,26 @@ title: Akka Streams

This module makes interoperability with akka streams easier by simply defining implicit extended classes for reactive stream conversions between akka and monix.

The three main core abstractions defined in akka streams are _Source_, _Flow_ and _Sink_,
they were designed following the JVM [reactive streams](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md) standards,
meaning that under it's design they implement the `Publisher` and `Subscriber` contract.
Which at the end it means that they can interoperate with other libraries that also implements the reactive stream specification, such as [Monix Reactive](https://monix.io/api/3.2/monix/reactive/index.html).
So this module aims to provide a nice interoperability between these two reactive streams libraries.

In order to achieve it, this module will provide an extended conversion method for each of the stream abstractions mentioned before.
The [reactive streams](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md) specification is an standard to follow for those
JVM libraries that aims to provide asynchronous stream processing with non-blocking back pressure.
As you might probably know, both _Akka Streams_ and _[Monix Reactive](https://monix.io/api/3.2/monix/reactive/index.html)_ follows the same standard, meaning that it is possible to convert between
their own different stream data types (since both implement the `Publisher` and `Subscriber` contract).

So this module aims to provide a nice and easy inter-operability between the two mentioned libraries, and to
achieve that it provides extended conversion methods.
These implicit extended methods can be imported from: `monix.connect.akka.stream.Converters._`.
Therefore, under the scope of the import, the signatures `.asObservable` and `.asConsumer` will be available for the `Source`, `Flow`, and `Sink`.
Therefore, under the scope of the import, the signatures `.asObservable` and `.asConsumer` will be available from `Source`, `Flow`, and `Sink` instances,
whereas `asSource` and `asSink` would be for the monix `Observable` and `Consumer`

The below table shows in more detail the specs for the conversion from akka stremas to monix:
The below table shows that in more detail:

| _Akka_ | _Monix_ | _Akka &rarr; Monix_ | _Akka &larr; Monix_ |
| :---: | :---: | :---: | :---: |
| _Source[+In, +Mat]_ | _Observable[+In]_ | `source.asObservable[In]` | `observable.asSource[In]` |
| _Flow[-In, +Out, +Mat]_ | _Consumer[-In, +Out]_ | `flow.asConsumer[Out]` | - |
| _Sink[-In, +Out <: Future[Mat]]_ | _Consumer[-In, +Mat]_ | `sink.asConsumer[Mat]` | `consumer.asSink[In]` |

Note that two methods does not need to be typed as it has been done explicitly in the example table, the compiler will infer it for you.
Note that when calling the methods it is not needed to pass the _type parameter_ (as it has been explicitly indicated in the example table), the compiler will infer it for you.

Also, in order to perform these conversion it is required to have an implicit instance of `akka.stream.Materializer` and `monix.execution.Scheduler` in the scope.

Expand All @@ -38,39 +39,39 @@ libraryDependencies += "io.monix" %% "monix-akka" % "0.1.0"

## Getting started

The following code shows how these implicits can be initialized, but `Scheduler` and `ActorSystem` can be initialized differently and configured differently depending on the use case.
The only three things we will need to perform these conversions would be the implicit conversion, and an instance of the monix `Scheduler` and the akka `ActorMaterializer`.
```scala
import monix.connect.akka.stream.Converters._
import monix.execution.Scheduler.Implicits.global

val actorSystem: ActorSystem = ActorSystem("Akka-Streams-InterOp")
implicit val materializer = ActorMaterializer()
val actorSystem: ActorSystem = ActorSystem("Akka-Streams-InterOp")
implicit val materializer = ActorMaterializer() //setting actorSystem as implicit variable might have ben enough
```

### Akka &rarr; Monix

#### asObservable

Let's see an example for converting an `Source[+In, +Mat]` to `Observable[+In]`:
Let's see how easy can be converting an `Source[+In, +Mat]` to `Observable[+In]`:

```scala
//given
val elements = 1 until 50
val source: Source[Int] = Source.from(1 until 50)

//when
val ob: Observable[Int] = Source.from(elements).asObservable //`asObservable` converter as extended method of source.
val ob: Observable[Int] = source.asObservable //`asObservable` converter as extended method of source.

//then
//then
ob.toListL.runSyncUnsafe() should contain theSameElementsAs elements
```

In this case we have not needed to consume the `Observable` since we directly used an operator that collects
to a list `.toList`, but note that in case you need to use an specific consumer, you can also directly call `consumeWith`, as a shortcut for `source.asObservable.consumeWith(consumer)`, see an example below:

```scala
//given the same `elements` and `source` as above example`
//given the same `source` as the above example`

//then
//when
val t: Task[List[Int]] = source.consumeWith(Consumer.toList) //`consumeWith` as extended method of `Source`

//then
Expand All @@ -80,16 +81,16 @@ t.runSyncUnsafe() should contain theSameElementsAs elements
#### asConsumer

On the other hand, see how to convert an `Sink[-In, +Out <: Future[Mat]]` into a `Consumer[+In, +Mat]`.

```scala
//given
val foldSumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((acc, num) => acc + num)

//when
val (consumer: Consumer[Int, Int]) = foldSumSink.asConsumer[Int] //`asConsumer` as an extended method of `Sink`
val t: Task[Int] = Observable.fromIterable(Seq(1, 2, 3)).consumeWith(consumer)
val consumer: Consumer[Int, Int] = foldSumSink.asConsumer[Int] //`asConsumer` as an extended method of `Sink`

//then the future value of `t` should be 6
//then
val t: Task[Int] = Observable.fromIterable(Seq(1, 2, 3)).consumeWith(consumer)
t.runSyncUnsafe() should be 6
```

Finally, you can also convert `Flow[-In, +Out, +Mat]` into `Consumer[+In, +Out]` in the same way you did with `Sink`
Expand All @@ -103,7 +104,7 @@ val foldSumFlow: Flow[Int, Int, NotUsed] = Flow[Int].fold[Int](0)((acc, num) =>
val (consumer: Consumer[Int, Int]) = foldSumFlow.asConsumer //`asConsumer` as an extended method of `Flow`
val t: Task[Int] = Observable.fromIterable(Seq(1, 2, 3)).consumeWith(consumer)

//then the future value of `t` should be 6
t.runSyncUnsafe() should be 6
```

Notice that this interoperability would allow the Monix user to take advantage of the already pre built integrations
Expand All @@ -115,19 +116,28 @@ from [Alpakka](https://doc.akka.io/docs/alpakka/current/index.html) or any other
On the other hand, for converting from Monix `Observable[+In]` to Akka Streams `Source[+In, NotUsed]` we would use the conversion signature `asSource`.

```scala
import monix.connect.akka.stream.Converters._
val f: Future[Seq[Long]] = Observable.range(1, 100).asSource.runWith(Sink.seq)
//eventualy will return a sequence from 1 to 100
//given
val ob: Observable[Int] = Observable.range(1, 100)

//when
val f: Future[Seq[Long]] = ob.asSource.runWith(Sink.seq)

//then eventualy will return a sequence from 1 to 100
```

#### asConsumer

Finally, for converting from `Sink[-In, +Out <: Future[Mat]]` to `Consumer[-In, +Mat]`.
Finally, use `asConsumer` for converting from `Sink[-In, +Out <: Future[Mat]]` to `Consumer[-In, +Mat]`.

```scala
//given
import monix.connect.akka.stream.Converters._
val sink: Sink[Int, Future[String]] = Sink.fold[String, Int]("")((s, i) => s + i.toString)
val t: Task[String] = Observable.fromIterable(1 until 10).consumeWith(sink.asConsumer[String])
//eventually will return "123456789"

//and
val ob: Observable[Int] = Observable.fromIterable(1 until 10)

//when
val t: Task[String] = ob.consumeWith(sink.asConsumer[String])

//then eventually will return "123456789"
```
41 changes: 26 additions & 15 deletions docs/dynamo.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,46 @@ libraryDependencies += "io.monix" %% "monix-dynamodb" % "0.1.0"

The `DynamoDbAsyncClient` needs to be defined as `implicit` as it will be required by the _consumer_ and _tranformer_ implementations.

It is also required and additional import for bringing the implicit conversions between DynamoDB requests and `DynamoDbOp`, the upper abstraction layer will allow to exacute them all (no need to worry about that):

```scala
import monix.connect.dynamodb.DynamoDbOp._
```
It is also required and additional import `monix.connect.dynamodb.DynamoDbOp._` for bringing the implicit conversions between the specific `DynamoDBRequest` to `DynamoDbOp`, you don't have to worry about the second data type
, it is an abstraction that provides with the functionality to execute each request with its onw operation:

See below an example for transforming and consuming DynamoDb operations with monix.

###_Transformer:_
```scala
### Transformer

//this is an example of a stream that transforms and executes DynamoDb `GetItemRequests`:


```scala
import monix.connect.dynamodb.DynamoDbOp._
import software.amazon.awssdk.services.dynamodb.model.{GetItemRequest, GetItemResponse}

//presumably you will have a stream of dynamo db `GetItemRequest` requests coming inin this case of type
val dynamoDbRequests = List[GetItemRequest] = ???

val ob = Observable[Task[GetItemResponse]] = {
Observable
.fromIterable(dynamoDbRequests)
.transform(DynamoDb.transofrmer())
} //for each element transforms the get request operations into its respective get response
.transform(DynamoDb.transofrmer()) // transforms each get request operation into its respective get response
}
//the resulted observable would be of type Observable[Task[GetItemResponse]]
```

###_Consumer:_
### Consumer

An example of a stream that consumes and executes DynamoDb `PutItemRequest`s:
```scala
//this is an example of a stream that consumes and executes DynamoDb `PutItemRequest`:
val putItemRequests = List[PutItemRequests] = ???
import monix.connect.dynamodb.DynamoDbOp._
import software.amazon.awssdk.services.dynamodb.model.{PutItemRequest, PutItemResponse}

//presumably you will have a stream of dynamo db `PutItemrequest` coming in.
val putItemRequests = List[PutItemRequest] = ???

Observable
.fromIterable(dynamoDbRequests)
.consumeWith(DynamoDb.consumer()) //a safe and syncronous consumer that executes dynamodb requests
val ob = Task[PutItemResponse] = {
Observable
.fromIterable(dynamoDbRequests)
.consumeWith(DynamoDb.consumer()) // asynchronous consumer that executes put item requests
}
//the materialized value would be of type Task[PutItemResponse]
```

Expand Down
Loading

0 comments on commit b8a2fa4

Please sign in to comment.