Created sink parallel execution (#624) (#917)
MattSzm authored Jun 19, 2022
1 parent 1827192 commit 4c53530
Showing 6 changed files with 528 additions and 7 deletions.
59 changes: 57 additions & 2 deletions docs/mongodb.md
@@ -500,6 +500,8 @@ connection.use(_.source.findOneAndUpdate(filter, update)).runToFuture
The `Single` and `Sink` components are documented together since they both implement exactly the same set of
operations: _insert_, _delete_, _replace_ and _update_. The difference is that the former executes the
operation in a single task, whereas the latter pipes the elements of the stream to a _Monix_ `Consumer` that executes them.
Most of the `Sink` consumers also come with an alternative variant suffixed `Par`, which executes the requests of each
emitted group in parallel, as batch operations.
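As a minimal sketch, assuming the `Par` sinks consume grouped elements exactly as in the examples below, a stream can
also be batched on the fly with Monix's `bufferTumbling` instead of pre-grouping a plain list:

```scala
import cats.effect.Resource
import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable

// hypothetical stream of employees, batched in groups of two before reaching the sink
val employeeStream: Observable[Employee] = Observable.from(List(
  Employee("Luke", "London", 41), Employee("Matt", "Warsaw", 23),
  Employee("Jack", "Berlin", 24), Employee("John", "London", 41)))

val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
  employeeStream
    .bufferTumbling(2) // Observable[Seq[Employee]]
    .map(_.toList)     // Observable[List[Employee]]
    .consumeWith(operator.sink.insertOnePar())
}.runToFuture
```

Each emitted batch of two is then handed to the sink as one group of `insertOne` requests executed in parallel.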

The following sub-sections list the different _operations_ and _sinks_ available, with a small example of each.
@@ -531,12 +533,19 @@ import monix.connect.mongodb.client.CollectionOperator
import monix.eval.Task
import monix.reactive.Observable

-val employees = List(Employee("Luke", "London", 41))
+val employee = List(Employee("Luke", "London", 41))
val connection: Resource[Task, CollectionOperator[Employee]]
connection.use { operator =>
-Observable.from(employees) // Observable[Employee]
+Observable.from(employee) // Observable[Employee]
.consumeWith(operator.sink.insertOne())
}.runToFuture

val employees = List(Employee("Luke", "London", 41), Employee("Matt", "Warsaw", 23),
Employee("Jack", "Berlin", 24), Employee("John", "London", 41))
connection.use { operator =>
Observable.from(employees.grouped(2).toList) // Observable[List[Employee]]
.consumeWith(operator.sink.insertOnePar())
}.runToFuture
```

#### insertMany
Expand Down Expand Up @@ -615,6 +624,13 @@ connection.use { operator =>
.map(name => Filters.eq("name", name)) // Observable[Bson]
.consumeWith(operator.sink.deleteOne())
}.runToFuture

connection.use { operator =>
// employees to be deleted from the collection
Observable.from(List("Lauren", "Marie", "Simon", "Matt").grouped(2).toList) // Observable[List[String]]
.map(names => names.map(name => Filters.eq("name", name))) // Observable[List[Bson]]
.consumeWith(operator.sink.deleteOnePar())
}.runToFuture
```

#### deleteMany
@@ -653,6 +669,13 @@ connection.use { operator =>
.map(city => Filters.eq("city", city)) // Observable[Bson]
.consumeWith(operator.sink.deleteMany())
}.runToFuture

connection.use { operator =>
// all employees from `Germany`, `Italy` or `Turkey` were fired (deleted)
Observable.from(List("Germany", "Italy", "Turkey").grouped(2).toList) // Observable[List[String]]
.map(cities => cities.map(city => Filters.eq("city", city))) // Observable[List[Bson]]
.consumeWith(operator.sink.deleteManyPar())
}.runToFuture
```

### Replace
@@ -699,6 +722,18 @@ connection.use { operator =>
Observable.from(replacements) // Observable[(Bson, Employee)]
.consumeWith(operator.sink.replaceOne())
}.runToFuture

val replacements2: Seq[(Bson, Employee)] =
List(
(Filters.eq("name", "Employee1"), Employee("Employee5", 43, "Rio")),
(Filters.eq("name", "Employee2"), Employee("Employee6", 37, "Rio")),
(Filters.eq("name", "Employee3"), Employee("Employee7", 43, "Rio")),
(Filters.eq("name", "Employee4"), Employee("Employee8", 37, "Rio"))
)
connection.use { operator =>
Observable.from(replacements2.grouped(2).toList) // Observable[List[(Bson, Employee)]]
.consumeWith(operator.sink.replaceOnePar())
}.runToFuture
```

### Update
@@ -743,6 +778,16 @@ connection.use { operator =>
Observable.from(updateElements) // Observable[(Bson, Bson)]
.consumeWith(operator.sink.updateOne())
}.runToFuture

connection.use { operator =>
val filter = Filters.eq("city", "Porto")
val update = Updates.set("city", "Lisbon")
val updateElements: List[List[(Bson, Bson)]] = List.fill(4)((filter, update)).grouped(2).toList
// imagine that a company wants to send four of its employees from Porto to Lisbon
Observable.from(updateElements) // Observable[List[(Bson, Bson)]]
.consumeWith(operator.sink.updateOnePar())
}.runToFuture

```

#### updateMany
@@ -781,6 +826,16 @@ connection.use { operator =>
.map(city => (Filters.eq("city", city), Updates.pull("hobbies", "Table Tennis"))) // Observable[(Bson, Bson)]
.consumeWith(operator.sink.updateMany())
}.runToFuture

connection.use { operator =>
val cities: Set[String] = Set("Seattle", "Nairobi", "Dakar")
Observable.from(cities.grouped(2).toList) // Observable[Set[String]]
.map(cityGroup => cityGroup.map(city => (
Filters.eq("city", city),
Updates.pull("hobbies", "Table Tennis")
)).toList) // Observable[List[(Bson, Bson)]]
.consumeWith(operator.sink.updateManyPar())
}.runToFuture
```

### Indexing
117 changes: 117 additions & 0 deletions mongodb/src/it/scala/monix/connect/mongodb/MongoSinkSuite.scala
@@ -46,6 +46,23 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with
.asserting(_ should contain theSameElementsAs e2)
}

"deleteOnePar" should "delete single elements by grouped filters, the single group of filters is executed " +
"at once in parallel" in {
val e1 = Gen.nonEmptyListOf(genEmployee).sample.get
val e2 = Gen.nonEmptyListOf(genEmployee).sample.get
val e3 = Gen.nonEmptyListOf(genEmployee).sample.get

val employeesColRef = randomEmployeesMongoCol
MongoSingle.insertMany(employeesColRef, e1 ++ e2 ++ e3) *>
Observable
.from(List(e1, e2))
.map(es => es.map(e => Filters.eq("name", e.name)))
.consumeWith(MongoSink.deleteOnePar(employeesColRef)) *>
MongoSource.findAll(employeesColRef)
.toListL
.asserting(_ should contain theSameElementsAs e3)
}

"deleteMany" should "delete multiple documents per each emitted filter" in {
val germans = genEmployeesWith(city = Some("Munich")).sample.get
val italians = genEmployeesWith(city = Some("Rome")).sample.get
@@ -63,6 +80,24 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with
.asserting(_ should contain theSameElementsAs egyptians)
}

"deleteManyPar" should "delete multiple documents per each emitted grouped filters, the single group " +
"of filters is executed at once in parallel" in {
val germans = genEmployeesWith(city = Some("Munich")).sample.get
val italians = genEmployeesWith(city = Some("Rome")).sample.get
val turks = genEmployeesWith(city = Some("Istanbul")).sample.get
val egyptians = genEmployeesWith(city = Some("El Caire")).sample.get
val employeesCol = randomEmployeesMongoCol

MongoSingle.insertMany(employeesCol, germans ++ italians ++ turks ++ egyptians) *>
Observable
.from(List(List("Munich", "Rome", "Istanbul")))
.map(cityList => cityList.map(city => Filters.eq("city", city)))
.consumeWith(MongoSink.deleteManyPar(employeesCol)) *>
MongoSource.findAll(employeesCol)
.toListL
.asserting(_ should contain theSameElementsAs egyptians)
}

"insertOne" should "insert a single document per each received element" in {
val employees = Gen.nonEmptyListOf(genEmployee).sample.get
MongoConnection
@@ -80,6 +115,16 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with
}.asserting(_ shouldBe 0L)
}

"insertOnePar" should "insert a single document per each received element from a group where " +
"elements from a single group are inserted at once in parallel" in {
val employees = Gen.listOfN(15, genEmployee).sample.get.grouped(3).toList
MongoConnection
.create1(mongoEndpoint, randomEmployeesColRef).use { operator =>
Observable.from(employees).consumeWith(operator.sink.insertOnePar()) *>
operator.source.findAll.toListL
}.asserting(_ should contain theSameElementsAs employees.flatten)
}

"insertMany" should "insert documents in batches" in {
val n = 20
val employees = Gen.listOfN(n, genEmployee).sample.get
@@ -115,6 +160,29 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with
}
}

"updateOnePar" should "update a single document per each received element whereas elements are grouped in lists. " +
"Elements from a single group are executed at once in parallel." in {
val porto = "Porto"
val lisbon = "Lisbon"
val age = 45
val employees = genEmployeesWith(city = Some(porto), age = Some(age), n = 10).sample.get
val filter = Filters.eq("city", porto)
val update = Updates.set("city", lisbon)
val updates: Seq[(Bson, Bson)] = List.fill(4)((filter, update))

MongoConnection
.create1(mongoEndpoint, randomEmployeesColRef).use { operator =>
operator.single.insertMany(employees) *>
Observable.from(List(updates.take(2), updates.drop(2))).consumeWith(operator.sink.updateOnePar()) *>
operator.source.findAll
.toListL
}.asserting { updatedEmployees =>
updatedEmployees.size shouldBe employees.size
updatedEmployees.count(_.city == porto) shouldBe employees.size - updates.size
updatedEmployees.count(_.city == lisbon) shouldBe updates.size
}
}

"replaceOne" should "replace a single document per each received element" in {
val e1 = Employee("Employee1", 45, "Rio")
val e2 = Employee("Employee2", 34, "Rio")
Expand All @@ -133,6 +201,25 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with
}
}

"replaceOnePar" should "replace a single document per each received element whereas elements are grouped in lists. " +
"Elements from a single group are executed at once in parallel." in {
val e1 = Employee("Employee1", 45, "Rio")
val e2 = Employee("Employee2", 34, "Rio")
val t1 = (Filters.eq("name", "Employee1"), Employee("Employee3", 43, "Rio"))
val t2 = (Filters.eq("name", "Employee2"), Employee("Employee4", 37, "Rio"))
val replacements: Seq[(Bson, Employee)] = List(t1, t2)

MongoConnection
.create1(mongoEndpoint, randomEmployeesColRef).use { operator =>
operator.single.insertMany(List(e1, e2)) *>
Observable.from(List(replacements)).consumeWith(operator.sink.replaceOnePar()) *>
operator.source.findAll.toListL
}.asserting { employees =>
employees.size shouldBe replacements.size
employees should contain theSameElementsAs replacements.map(_._2)
}
}

"updateMany" should "update many documents per each received request" in {
val name1 = "Name1"
val name2 = "Name2"
@@ -184,4 +271,34 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with
}
}

"updateManyPar" should "update many documents per each received request whereas requests are grouped in lists. " +
"Requests from a single group are executed at once in parallel." in {
val name1 = "Name1"
val name2 = "Name2"
val name3 = "Name3"
val name4 = "Name4"
val e1 = genEmployeesWith(name = Some(name1), n = 10).sample.get
val e2 = genEmployeesWith(name = Some(name2), age = Some(31), n = 20).sample.get
val e3 = genEmployeesWith(name = Some(name3), n = 30).sample.get

val u1 = (Filters.eq("name", name1), Updates.set("name", name3))
val u2 = (Filters.eq("name", name2), Updates.combine(Updates.set("name", name4), Updates.inc("age", 10)))
val updates: Seq[(Bson, Bson)] = List(u1, u2)

MongoConnection
.create1(mongoEndpoint, randomEmployeesColRef).use { operator =>
operator.single.insertMany(e1 ++ e2 ++ e3) *>
Observable.from(List(updates)).consumeWith(operator.sink.updateManyPar()) *>
operator.source.findAll.toListL
}.asserting { employees =>
employees.size shouldBe e1.size + e2.size + e3.size
employees.count(_.name == name3) shouldBe (e1 ++ e3).size
employees.count(_.name == name1) shouldBe 0
employees.count(_.name == name4) shouldBe e2.map(_.copy(name = name4)).size
employees.filter(_.name == name4) should contain theSameElementsAs e2
.map(_.copy(name = name4))
.map(e => e.copy(age = e.age + 10))
}
}

}