diff --git a/docs/mongodb.md b/docs/mongodb.md index 29bb34f42..34b862d2b 100644 --- a/docs/mongodb.md +++ b/docs/mongodb.md @@ -500,6 +500,8 @@ connection.use(_.source.findOneAndUpdate(filter, update)).runToFuture The `Single` and `Sink` components are documented together since they both implement exactly the same set of operations _insert_, _delete_, _replace_ and _update_, although with the difference that the first executes the operation in single task and the latter pipes the elements of the stream to a _Monix_ `Consumer` to execute them. +Most of the `Sink` consumers have an alternative method with the suffix `Par`, which executes requests grouped +in sequences in a parallel fashion as batch operations. The following sub-sections represent the list of different _operations_ and _sinks_ available to use, with a small example on each one. @@ -531,12 +533,19 @@ import monix.connect.mongodb.client.CollectionOperator import monix.eval.Task import monix.reactive.Observable -val employees = List(Employee("Luke", "London", 41)) +val employee = List(Employee("Luke", "London", 41)) val connection: Resource[Task, CollectionOperator[Employee]] connection.use { operator => - Observable.from(employees) // Observable[Employee] + Observable.from(employee) // Observable[Employee] .consumeWith(operator.sink.insertOne()) }.runToFuture + +val employees = List(Employee("Luke", "London", 41), Employee("Matt", "Warsaw", 23), + Employee("Jack", "Berlin", 24), Employee("John", "London", 41)) +connection.use { operator => + Observable.from(employees.grouped(2).toList) // Observable[List[Employee]] + .consumeWith(operator.sink.insertOnePar()) +}.runToFuture ``` #### insertMany @@ -615,6 +624,13 @@ connection.use { operator => .map(name => Filters.eq("name", name)) // Observable[Bson] .consumeWith(operator.sink.deleteOne()) }.runToFuture + +connection.use { operator => + // employees to be deleted from the collection + Observable.from(List("Lauren", "Marie", "Simon", 
"Matt").grouped(2).toList) // Observable[List[String]] + .map(names => names.map(name => Filters.eq("name", name))) // Observable[List[Bson]] + .consumeWith(operator.sink.deleteOnePar()) +}.runToFuture ``` #### deleteMany @@ -653,6 +669,13 @@ connection.use { operator => .map(city => Filters.eq("city", city)) // Observable[Bson] .consumeWith(operator.sink.deleteMany()) }.runToFuture + +connection.use { operator => + // all employees from `Germany`, `Italy` or `Turkey` were fired (deleted) + Observable.from(List("Germany", "Italy", "Turkey").grouped(2).toList) // Observable[List[String]] + .map(cities => cities.map(city => Filters.eq("city", city))) // Observable[List[Bson]] + .consumeWith(operator.sink.deleteManyPar()) +}.runToFuture ``` ### Replace @@ -699,6 +722,18 @@ connection.use { operator => Observable.from(replacements) // Observable[(Bson, Employee)] .consumeWith(operator.sink.replaceOne()) }.runToFuture + +val replacements2: Seq[(Bson, Employee)] = + List( + (Filters.eq("name", "Employee1"), Employee("Employee5", 43, "Rio")), + (Filters.eq("name", "Employee2"), Employee("Employee6", 37, "Rio")), + (Filters.eq("name", "Employee3"), Employee("Employee7", 43, "Rio")), + (Filters.eq("name", "Employee4"), Employee("Employee8", 37, "Rio")) + ) +connection.use { operator => + Observable.from(replacements2.grouped(2).toList) // Observable[List[(Bson, Employee)]] + .consumeWith(operator.sink.replaceOnePar()) +}.runToFuture ``` ### Update @@ -743,6 +778,16 @@ connection.use { operator => Observable.from(updateElements) // Observable[(Bson, Bson)] .consumeWith(operator.sink.updateOne()) }.runToFuture + +connection.use { operator => + val filter = Filters.eq("city", "Porto") + val update = Updates.set("city", "Lisbon") + val updateElements: List[List[(Bson, Bson)]] = List.fill(4)((filter, update)).grouped(2).toList + // imagine that a company wants to send four of its employees from Porto to Lisbon + Observable.from(updateElements) // Observable[List[(Bson, Bson)]] + 
.consumeWith(operator.sink.updateOnePar()) + }.runToFuture + ``` #### updateMany @@ -781,6 +826,16 @@ connection.use { operator => .map(city => (Filters.eq("city", city), Updates.pull("hobbies", "Table Tennis"))) // Observable[(Bson, Bson)] .consumeWith(operator.sink.updateMany()) }.runToFuture + +connection.use { operator => + val cities: Set[String] = Set("Seattle", "Nairobi", "Dakar") + Observable.from(cities.grouped(2).toList) // Observable[String] + .map(cities => cities.map(city => ( + Filters.eq("city", city), + Updates.pull("hobbies", "Table Tennis") + )).toList) // Observable[List[(Bson, Bson)]] + .consumeWith(operator.sink.updateManyPar()) +}.runToFuture ``` ### Indexing diff --git a/mongodb/src/it/scala/monix/connect/mongodb/MongoSinkSuite.scala b/mongodb/src/it/scala/monix/connect/mongodb/MongoSinkSuite.scala index 4a0e08acf..99cfb1afe 100644 --- a/mongodb/src/it/scala/monix/connect/mongodb/MongoSinkSuite.scala +++ b/mongodb/src/it/scala/monix/connect/mongodb/MongoSinkSuite.scala @@ -46,6 +46,23 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with .asserting(_ should contain theSameElementsAs e2) } + "deleteOnePar" should "delete single elements by grouped filters, the single group of filters is executed " + + "at once in parallel" in { + val e1 = Gen.nonEmptyListOf(genEmployee).sample.get + val e2 = Gen.nonEmptyListOf(genEmployee).sample.get + val e3 = Gen.nonEmptyListOf(genEmployee).sample.get + + val employeesColRef = randomEmployeesMongoCol + MongoSingle.insertMany(employeesColRef, e1 ++ e2 ++ e3) *> + Observable + .from(List(e1, e2)) + .map(es => es.map(e => Filters.eq("name", e.name))) + .consumeWith(MongoSink.deleteOnePar(employeesColRef)) *> + MongoSource.findAll(employeesColRef) + .toListL + .asserting(_ should contain theSameElementsAs e3) + } + "deleteMany" should "delete multiple documents per each emitted filter" in { val germans = genEmployeesWith(city = Some("Munich")).sample.get val italians = 
genEmployeesWith(city = Some("Rome")).sample.get @@ -63,6 +80,24 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with .asserting(_ should contain theSameElementsAs egyptians) } + "deleteManyPar" should "delete multiple documents per each emitted grouped filters, the single group " + + "of filters is executed at once in parallel" in { + val germans = genEmployeesWith(city = Some("Munich")).sample.get + val italians = genEmployeesWith(city = Some("Rome")).sample.get + val turks = genEmployeesWith(city = Some("Istanbul")).sample.get + val egyptians = genEmployeesWith(city = Some("El Caire")).sample.get + val employeesCol = randomEmployeesMongoCol + + MongoSingle.insertMany(employeesCol, germans ++ italians ++ turks ++ egyptians) *> + Observable + .from(List(List("Munich", "Rome", "Istanbul"))) + .map(cityList => cityList.map(city => Filters.eq("city", city))) + .consumeWith(MongoSink.deleteManyPar(employeesCol)) *> + MongoSource.findAll(employeesCol) + .toListL + .asserting(_ should contain theSameElementsAs egyptians) + } + "insertOne" should "insert a single document per each received element" in { val employees = Gen.nonEmptyListOf(genEmployee).sample.get MongoConnection @@ -80,6 +115,16 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with }.asserting(_ shouldBe 0L) } + "insertOnePar" should "insert a single document per each received element from a group where " + + "elements from a single group are inserted at once in parallel" in { + val employees = Gen.listOfN(15, genEmployee).sample.get.grouped(3).toList + MongoConnection + .create1(mongoEndpoint, randomEmployeesColRef).use { operator => + Observable.from(employees).consumeWith(operator.sink.insertOnePar()) *> + operator.source.findAll.toListL + }.asserting(_ should contain theSameElementsAs employees.flatten) + } + "insertMany" should "insert documents in batches" in { val n = 20 val employees = Gen.listOfN(n, genEmployee).sample.get @@ -115,6 
+160,29 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with } } + "updateOnePar" should "update a single document per each received element whereas elements are grouped in lists. " + + "Elements from a single group are executed at once in parallel." in { + val porto = "Porto" + val lisbon = "Lisbon" + val age = 45 + val employees = genEmployeesWith(city = Some(porto), age = Some(age), n = 10).sample.get + val filter = Filters.eq("city", porto) + val update = Updates.set("city", lisbon) + val updates: Seq[(Bson, Bson)] = List.fill(4)((filter, update)) + + MongoConnection + .create1(mongoEndpoint, randomEmployeesColRef).use { operator => + operator.single.insertMany(employees) *> + Observable.from(List(updates.take(2), updates.drop(2))).consumeWith(operator.sink.updateOnePar()) *> + operator.source.findAll + .toListL + }.asserting { retrieved => + retrieved.size shouldBe employees.size + retrieved.count(_.city == porto) shouldBe employees.size - updates.size + retrieved.count(_.city == lisbon) shouldBe updates.size + } + } + "replaceOne" should "replace a single document per each received element" in { val e1 = Employee("Employee1", 45, "Rio") val e2 = Employee("Employee2", 34, "Rio") @@ -133,6 +201,25 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with } } + "replaceOnePar" should "replace a single document per each received element whereas elements are grouped in lists. " + + "Elements from a single group are executed at once in parallel." 
in { + val e1 = Employee("Employee1", 45, "Rio") + val e2 = Employee("Employee2", 34, "Rio") + val t1 = (Filters.eq("name", "Employee1"), Employee("Employee3", 43, "Rio")) + val t2 = (Filters.eq("name", "Employee2"), Employee("Employee4", 37, "Rio")) + val replacements: Seq[(Bson, Employee)] = List(t1, t2) + + MongoConnection + .create1(mongoEndpoint, randomEmployeesColRef).use { operator => + operator.single.insertMany(List(e1, e2)) *> + Observable.from(List(replacements)).consumeWith(operator.sink.replaceOnePar()) *> + operator.source.findAll.toListL + }.asserting { employees => + employees.size shouldBe replacements.size + employees should contain theSameElementsAs replacements.map(_._2) + } + } + "updateMany" should "update many documents per each received request" in { val name1 = "Name1" val name2 = "Name2" @@ -184,4 +271,34 @@ class MongoSinkSuite extends AsyncFlatSpec with MonixTaskSpec with Fixture with } } + "updateManyPar" should "update many documents per each received request whereas requests are grouped in lists. " + + "Requests from a single group are executed at once in parallel." 
in { + val name1 = "Name1" + val name2 = "Name2" + val name3 = "Name3" + val name4 = "Name4" + val e1 = genEmployeesWith(name = Some(name1), n = 10).sample.get + val e2 = genEmployeesWith(name = Some(name2), age = Some(31), n = 20).sample.get + val e3 = genEmployeesWith(name = Some(name3), n = 30).sample.get + + val u1 = (Filters.eq("name", name1), Updates.set("name", name3)) + val u2 = (Filters.eq("name", name2), Updates.combine(Updates.set("name", name4), Updates.inc("age", 10))) + val updates: Seq[(Bson, Bson)] = List(u1, u2) + + MongoConnection + .create1(mongoEndpoint, randomEmployeesColRef).use { operator => + operator.single.insertMany(e1 ++ e2 ++ e3) *> + Observable.from(List(updates)).consumeWith(operator.sink.updateManyPar()) *> + operator.source.findAll.toListL + }.asserting { employees => + employees.size shouldBe e1.size + e2.size + e3.size + employees.count(_.name == name3) shouldBe (e1 ++ e3).size + employees.count(_.name == name1) shouldBe 0 + employees.count(_.name == name4) shouldBe e2.map(_.copy(name = name4)).size + employees.filter(_.name == name4) should contain theSameElementsAs e2 + .map(_.copy(name = name4)) + .map(e => e.copy(age = e.age + 10)) + } + } + } diff --git a/mongodb/src/main/scala/monix/connect/mongodb/MongoSink.scala b/mongodb/src/main/scala/monix/connect/mongodb/MongoSink.scala index e8e037197..b8984e5aa 100644 --- a/mongodb/src/main/scala/monix/connect/mongodb/MongoSink.scala +++ b/mongodb/src/main/scala/monix/connect/mongodb/MongoSink.scala @@ -51,7 +51,7 @@ object MongoSink extends MongoSinkImpl { * it is not passed by the user * @param retryStrategy defines the amount of retries and backoff delays for failed requests. * @tparam Doc the type of the collection - * @return a [[Consumer]] that expects query filters to apply the the delete operations. + * @return a [[Consumer]] that expects query filters to apply the delete operations. 
* @see [[com.mongodb.client.model.Filters]] and [[com.mongodb.client.model.Updates]] */ override def deleteOne[Doc]( @@ -61,6 +61,25 @@ object MongoSink extends MongoSinkImpl { super.deleteOne(collection, deleteOptions, retryStrategy) } + /** + * Provides a sink implementation for [[MongoSingle.deleteOne]] that consumes a sequence of elements. + * The elements inside a single sequence will be executed in parallel as a batch operation. + * For each incoming element will remove at most one document from the collection that matches the given filter. + * + * @param collection the abstraction to work with a determined MongoDB Collection + * @param deleteOptions the options to apply to all the delete operations, it will use default ones in case + * it is not passed by the user + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @tparam Doc the type of the collection + * @return a [[Consumer]] that expects sequences of query filters to apply the delete operations. + * @see [[com.mongodb.client.model.Filters]] and [[com.mongodb.client.model.Updates]] + */ + override def deleteOnePar[Doc]( + collection: MongoCollection[Doc], + deleteOptions: DeleteOptions = DefaultDeleteOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[Bson], Unit] = + super.deleteOnePar(collection, deleteOptions, retryStrategy) + /** * Provides a sink implementation for [[MongoSingle.deleteMany]] that per each element * removes all documents from the collection that matched the given query filter. @@ -70,7 +89,7 @@ object MongoSink extends MongoSinkImpl { * it is not passed by the user. * @param retryStrategy defines the amount of retries and backoff delays for failed requests. * @tparam Doc the type of the collection - * @return a [[Consumer]] that expects query filters to apply the the delete many operations. + * @return a [[Consumer]] that expects query filters to apply the delete many operations. 
* @see [[com.mongodb.client.model.Filters]] */ override def deleteMany[Doc]( @@ -79,6 +98,26 @@ object MongoSink extends MongoSinkImpl { retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Bson, Unit] = super.deleteMany(collection, deleteOptions, retryStrategy) + /** + * Provides a sink implementation for [[MongoSingle.deleteMany]] that per each element + * removes all documents from the collection that matched the given query filter. + * The sink requires a sequence of elements, whereas elements from a single sequence will + * be executed in parallel as a batch operation. + * + * @param collection the abstraction to work with a determined MongoDB Collection + * @param deleteOptions the options to apply to the delete operation, it will use default ones in case + * it is not passed by the user. + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @tparam Doc the type of the collection + * @return a [[Consumer]] that expects query filter sequences to apply the delete many operations. + * @see [[com.mongodb.client.model.Filters]] + */ + override def deleteManyPar[Doc]( + collection: MongoCollection[Doc], + deleteOptions: DeleteOptions = DefaultDeleteOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[Bson], Unit] = + super.deleteManyPar(collection, deleteOptions, retryStrategy) + /** * Provides a sink implementation for [[MongoSingle.insertOne]] that * expects documents to be passed and inserts them one by one. @@ -97,6 +136,23 @@ object MongoSink extends MongoSinkImpl { super.insertOne(collection, insertOneOptions, retryStrategy) } + /** + * Provides a sink implementation for [[MongoSingle.insertOne]] that expects sequences of documents to be + * passed and inserts all documents from a single sequence in parallel as a batch operation. + * If the document is missing an identifier, the driver should generate one. 
+ * + * @param collection the abstraction to work with the determined mongodb collection + * @param insertOneOptions the options to apply all the insert operations + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @tparam Doc the type of the collection and the incoming documents + * @return a [[Consumer]] that expects documents in batches of type [[Doc]] to be inserted. + */ + override def insertOnePar[Doc]( + collection: MongoCollection[Doc], + insertOneOptions: InsertOneOptions = DefaultInsertOneOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[Doc], Unit] = + super.insertOnePar(collection, insertOneOptions, retryStrategy) + /** * Provides a sink implementation for [[MongoSingle.insertMany]] that expects * batches of documents to be inserted at once. @@ -136,6 +192,28 @@ object MongoSink extends MongoSinkImpl { super.replaceOne(collection, replaceOptions, retryStrategy) } + /** + * Provides a sink implementation for [[MongoSingle.replaceOne]] that expects sequences of + * [[Tuple2]] of a filter and the document replacement that for each element + * will execute the replace operation to a single filtered element. + * All elements inside a single sequence will be executed in parallel as a batch operation. + * + * @see [[com.mongodb.client.model.Filters]] + * + * If the documents is missing an identifier, the driver should generate one. + * @param collection the abstraction to work with the determined mongodb collection + * @param replaceOptions the options to apply to the replace operation + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @tparam Doc the type of the collection + * @return a [[Consumer]] that expects [[Tuple2]] of filters and documents in batches of type [[Doc]] to be replaced. 
+ */ + override def replaceOnePar[Doc]( + collection: MongoCollection[Doc], + replaceOptions: ReplaceOptions = DefaultReplaceOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[(Bson, Doc)], Unit] = { + super.replaceOnePar(collection, replaceOptions, retryStrategy) + } + /** * Provides a sink implementation for [[MongoSingle.updateOne]] that expects [[Tuple2]] * of a filter and a update that will be executed against the single filtered element. @@ -156,6 +234,28 @@ object MongoSink extends MongoSinkImpl { super.updateOne(collection, updateOptions, retryStrategy) } + /** + * Provides a sink implementation for [[MongoSingle.updateOne]] that expects sequences of [[Tuple2]] + * of a filter and a update that will be executed against the single filtered element. + * All elements inside a single sequence will be executed in parallel as a batch operation. + * + * @see [[com.mongodb.client.model.Filters]] and [[com.mongodb.client.model.Updates]] + * + * If the documents is missing an identifier, the driver should generate one. + * @param collection the abstraction to work with the determined mongodb collection + * @param updateOptions the options to apply to the update operation + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @tparam Doc the type of the collection + * @return a [[Consumer]] that per each element expects a batch of [[Tuple2]] of a filter and the update + * in form of [[Bson]]. + */ + override def updateOnePar[Doc]( + collection: MongoCollection[Doc], + updateOptions: UpdateOptions = DefaultUpdateOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[(Bson, Bson)], Unit] = { + super.updateOnePar(collection, updateOptions, retryStrategy) + } + /** * Provides a sink implementation for [[MongoSingle.updateOne]] that expects [[Tuple2]] * of a filter and update that will be executed against all the filtered elements. 
@@ -176,6 +276,28 @@ object MongoSink extends MongoSinkImpl { super.updateMany(collection, updateOptions, retryStrategy) } + /** + * Provides a sink implementation for [[MongoSingle.updateMany]] that expects sequences of [[Tuple2]] + * of a filter and update that will be executed against all the filtered elements. + * All elements inside a single sequence will be executed in parallel as a batch operation. + * + * @see [[com.mongodb.client.model.Filters]] and [[com.mongodb.client.model.Updates]] + * + * If the document is missing an identifier, the driver should generate one. + * @param collection the abstraction to work with the determined mongodb collection + * @param updateOptions the options to apply to the update operation + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @tparam Doc the type of the collection + * @return a [[Consumer]] that per each element expects a batch of [[Tuple2]] of a filter and the update + * in form of [[Bson]]. + */ + override def updateManyPar[Doc]( + collection: MongoCollection[Doc], + updateOptions: UpdateOptions = DefaultUpdateOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[(Bson, Bson)], Unit] = { + super.updateManyPar(collection, updateOptions, retryStrategy) + } + } class MongoSink[Doc](private[mongodb] val collection: MongoCollection[Doc]) extends MongoSinkImpl { @@ -187,7 +309,7 @@ class MongoSink[Doc](private[mongodb] val collection: MongoCollection[Doc]) exte * @param deleteOptions the options to apply to all the delete operations, it will use default ones in case * it is not passed by the user * @param retryStrategy defines the amount of retries and backoff delays for failed requests. - * @return a [[Consumer]] that expects query filters to apply the the delete operations. + * @return a [[Consumer]] that expects query filters to apply the delete operations. 
* @see [[com.mongodb.client.model.Filters]] and [[com.mongodb.client.model.Updates]] */ def deleteOne( @@ -196,6 +318,23 @@ class MongoSink[Doc](private[mongodb] val collection: MongoCollection[Doc]) exte super.deleteOne(collection, deleteOptions, retryStrategy) } + /** + * Provides a sink implementation for [[MongoSingle.deleteOne]] that consumes a sequence of elements. + * The elements inside a single sequence will be executed in parallel as a batch operation. + * For each incoming element will remove at most one document from the collection that matches the given filter. + * + * @param deleteOptions the options to apply to all the delete operations, it will use default ones in case + * it is not passed by the user + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @return a [[Consumer]] that expects sequences of query filters to apply the delete operations. + * @see [[com.mongodb.client.model.Filters]] and [[com.mongodb.client.model.Updates]] + */ + def deleteOnePar( + deleteOptions: DeleteOptions = DefaultDeleteOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[Bson], Unit] = { + super.deleteOnePar(collection, deleteOptions, retryStrategy) + } + /** * Provides a sink implementation for [[MongoSingle.deleteMany]] that per each element * removes all documents from the collection that matched the given query filter. @@ -203,7 +342,7 @@ class MongoSink[Doc](private[mongodb] val collection: MongoCollection[Doc]) exte * @param deleteOptions the options to apply to the delete operation, it will use default ones in case * it is not passed by the user. * @param retryStrategy defines the amount of retries and backoff delays for failed requests. - * @return a [[Consumer]] that expects query filters to apply the the delete many operations. + * @return a [[Consumer]] that expects query filters to apply the delete many operations. 
* @see [[com.mongodb.client.model.Filters]] */ def deleteMany( @@ -211,6 +350,23 @@ class MongoSink[Doc](private[mongodb] val collection: MongoCollection[Doc]) exte retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Bson, Unit] = super.deleteMany(collection, deleteOptions, retryStrategy) + /** + * Provides a sink implementation for [[MongoSingle.deleteMany]] that per each element + * removes all documents from the collection that matched the given query filter. + * The sink requires a sequence of elements, whereas elements from a single sequence will + * be executed in parallel as a batch operation. + * + * @param deleteOptions the options to apply to the delete operation, it will use default ones in case + * it is not passed by the user. + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @return a [[Consumer]] that expects query filter sequences to apply the delete many operations. + * @see [[com.mongodb.client.model.Filters]] + */ + def deleteManyPar( + deleteOptions: DeleteOptions = DefaultDeleteOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[Bson], Unit] = + super.deleteManyPar(collection, deleteOptions, retryStrategy) + /** * Provides a sink implementation for [[MongoSingle.insertOne]] that * expects documents to be passed and inserts them one by one. @@ -226,6 +382,21 @@ class MongoSink[Doc](private[mongodb] val collection: MongoCollection[Doc]) exte super.insertOne(collection, insertOneOptions, retryStrategy) } + /** + * Provides a sink implementation for [[MongoSingle.insertOne]] that expects sequences of documents to be + * passed and inserts all documents from a single sequence in parallel as a batch operation. + * If the document is missing an identifier, the driver should generate one. + * + * @param insertOneOptions the options to apply all the insert operations + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. 
+ * @return a [[Consumer]] that expects documents in batches of type [[Doc]] to be inserted. + */ + def insertOnePar( + insertOneOptions: InsertOneOptions = DefaultInsertOneOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[Doc], Unit] = { + super.insertOnePar(collection, insertOneOptions, retryStrategy) + } + /** * Provides a sink implementation for [[MongoSingle.insertMany]] * that expects batches of documents to be inserted at once. @@ -259,6 +430,25 @@ class MongoSink[Doc](private[mongodb] val collection: MongoCollection[Doc]) exte super.replaceOne(collection, replaceOptions, retryStrategy) } + /** + * Provides a sink implementation for [[MongoSingle.replaceOne]] that expects sequences of + * [[Tuple2]] of a filter and the document replacement that for each element + * will execute the replace operation to a single filtered element. + * All elements inside a single sequence will be executed in parallel as a batch operation. + * + * @see [[com.mongodb.client.model.Filters]] + * + * If the documents is missing an identifier, the driver should generate one. + * @param replaceOptions the options to apply to the replace operation + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @return a [[Consumer]] that expects [[Tuple2]] of filters and documents in batches of type [[Doc]] to be replaced. + */ + def replaceOnePar( + replaceOptions: ReplaceOptions = DefaultReplaceOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[(Bson, Doc)], Unit] = { + super.replaceOnePar(collection, replaceOptions, retryStrategy) + } + /** * Provides a sink implementation for [[MongoSingle.updateOne]] that expects [[Tuple2]] * of a filter and a update that will be executed against the single filtered element. 
@@ -276,6 +466,25 @@ class MongoSink[Doc](private[mongodb] val collection: MongoCollection[Doc]) exte super.updateOne(collection, updateOptions, retryStrategy) } + /** + * Provides a sink implementation for [[MongoSingle.updateOne]] that expects sequences of [[Tuple2]] + * of a filter and a update that will be executed against the single filtered element. + * All elements inside a single sequence will be executed in parallel as a batch operation. + * + * @see [[com.mongodb.client.model.Filters]] and [[com.mongodb.client.model.Updates]] + * + * If the documents is missing an identifier, the driver should generate one. + * @param updateOptions the options to apply to the update operation + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @return a [[Consumer]] that per each element expects a batch of [[Tuple2]] of a filter and the update + * in form of [[Bson]]. + */ + def updateOnePar( + updateOptions: UpdateOptions = DefaultUpdateOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[(Bson, Bson)], Unit] = { + super.updateOnePar(collection, updateOptions, retryStrategy) + } + /** * Provides a sink implementation for [[MongoSingle.updateOne]] that expects [[Tuple2]] * of a filter and update that will be executed against all the filtered elements. @@ -293,4 +502,23 @@ class MongoSink[Doc](private[mongodb] val collection: MongoCollection[Doc]) exte super.updateMany(collection, updateOptions, retryStrategy) } + /** + * Provides a sink implementation for [[MongoSingle.updateOne]] that expects sequences of [[Tuple2]] + * of a filter and update that will be executed against all the filtered elements. + * All elements inside a single sequence will be executed in parallel as a batch operation. + * + * @see [[com.mongodb.client.model.Filters]] and [[com.mongodb.client.model.Updates]] + * + * If the documents is missing an identifier, the driver should generate one. 
+ * @param updateOptions the options to apply to the update operation + * @param retryStrategy defines the amount of retries and backoff delays for failed requests. + * @return a [[Consumer]] that per each element expects a batch of [[Tuple2]] of a filter and the update + * in form of [[Bson]]. + */ + def updateManyPar( + updateOptions: UpdateOptions = DefaultUpdateOptions, + retryStrategy: RetryStrategy = DefaultRetryStrategy): Consumer[Seq[(Bson, Bson)], Unit] = { + super.updateManyPar(collection, updateOptions, retryStrategy) + } + } diff --git a/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkImpl.scala b/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkImpl.scala index e2b3fa848..0627e714f 100644 --- a/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkImpl.scala +++ b/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkImpl.scala @@ -38,6 +38,14 @@ private[mongodb] class MongoSinkImpl { new MongoSinkSubscriber(deleteOneOp, retryStrategy) } + protected[this] def deleteOnePar[Doc]( + collection: MongoCollection[Doc], + deleteOptions: DeleteOptions, + retryStrategy: RetryStrategy): Consumer[Seq[Bson], Unit] = { + val deleteOneOp = (filter: Bson) => collection.deleteOne(filter, deleteOptions) + new MongoSinkParSubscriber(deleteOneOp, retryStrategy) + } + protected[this] def deleteMany[Doc]( collection: MongoCollection[Doc], deleteOptions: DeleteOptions, @@ -46,6 +54,14 @@ private[mongodb] class MongoSinkImpl { new MongoSinkSubscriber(deleteManyOnNext, retryStrategy) } + protected[this] def deleteManyPar[Doc]( + collection: MongoCollection[Doc], + deleteOptions: DeleteOptions, + retryStrategy: RetryStrategy): Consumer[Seq[Bson], Unit] = { + val deleteManyOnNext = (filter: Bson) => collection.deleteMany(filter, deleteOptions) + new MongoSinkParSubscriber(deleteManyOnNext, retryStrategy) + } + protected[this] def insertOne[Doc]( collection: MongoCollection[Doc], insertOneOptions: InsertOneOptions, @@ -54,6 +70,14 
@@ private[mongodb] class MongoSinkImpl { new MongoSinkSubscriber(insertOneOp, retryStrategy) } + protected[this] def insertOnePar[Doc]( + collection: MongoCollection[Doc], + insertOneOptions: InsertOneOptions, + retryStrategy: RetryStrategy): Consumer[Seq[Doc], Unit] = { + val insertOneOp = (document: Doc) => collection.insertOne(document, insertOneOptions) + new MongoSinkParSubscriber(insertOneOp, retryStrategy) + } + protected[this] def insertMany[Doc]( collection: MongoCollection[Doc], insertManyOptions: InsertManyOptions, @@ -70,6 +94,14 @@ private[mongodb] class MongoSinkImpl { new MongoSinkSubscriber(replaceOp, retryStrategy) } + protected[this] def replaceOnePar[Doc]( + collection: MongoCollection[Doc], + replaceOptions: ReplaceOptions, + retryStrategy: RetryStrategy): Consumer[Seq[(Bson, Doc)], Unit] = { + val replaceOp = (t: (Bson, Doc)) => collection.replaceOne(t._1, t._2, replaceOptions) + new MongoSinkParSubscriber(replaceOp, retryStrategy) + } + protected[this] def updateOne[Doc]( collection: MongoCollection[Doc], updateOptions: UpdateOptions, @@ -78,6 +110,14 @@ private[mongodb] class MongoSinkImpl { new MongoSinkSubscriber(updateOp, retryStrategy) } + protected[this] def updateOnePar[Doc]( + collection: MongoCollection[Doc], + updateOptions: UpdateOptions, + retryStrategy: RetryStrategy): Consumer[Seq[(Bson, Bson)], Unit] = { + val updateOp = (t: (Bson, Bson)) => collection.updateOne(t._1, t._2, updateOptions) + new MongoSinkParSubscriber(updateOp, retryStrategy) + } + protected[this] def updateMany[Doc]( collection: MongoCollection[Doc], updateOptions: UpdateOptions, @@ -86,4 +126,12 @@ private[mongodb] class MongoSinkImpl { new MongoSinkSubscriber(updateOp, retryStrategy) } + protected[this] def updateManyPar[Doc]( + collection: MongoCollection[Doc], + updateOptions: UpdateOptions, + retryStrategy: RetryStrategy): Consumer[Seq[(Bson, Bson)], Unit] = { + val updateOp = (t: (Bson, Bson)) => collection.updateMany(t._1, t._2, updateOptions) + new 
MongoSinkParSubscriber(updateOp, retryStrategy) + } + } diff --git a/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkParSubscriber.scala b/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkParSubscriber.scala new file mode 100644 index 000000000..82448863e --- /dev/null +++ b/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkParSubscriber.scala @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020-2021 by The Monix Connect Project Developers. + * See the project homepage at: https://connect.monix.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.connect.mongodb.internal + +import monix.connect.mongodb.domain.RetryStrategy +import monix.eval.Task +import monix.execution.{Ack, Callback, Scheduler} +import monix.execution.cancelables.AssignableCancelable +import monix.execution.internal.InternalApi +import monix.reactive.Consumer +import monix.reactive.observers.Subscriber +import org.reactivestreams.Publisher + +import scala.concurrent.Future + +/** + * A pre-built Monix [[Consumer]] implementation representing a Sink that expects sequences of events + * of type [[A]] and executes the mongodb operation [[op]] passed to the class constructor. + * + * @param op the mongodb operation that expects an event of type [[A]] and + * returns a reactivestreams [[Publisher]] of [[B]]. + * @param retryStrategy defines the amount of retries and backoff delays for failed requests.
+ * @tparam A the element type of the sequences that the [[Consumer]] expects to receive + */ +@InternalApi +private[mongodb] class MongoSinkParSubscriber[A, B](op: A => Publisher[B], retryStrategy: RetryStrategy) + extends Consumer[Seq[A], Unit] { + + override def createSubscriber( + cb: Callback[Throwable, Unit], + s: Scheduler): (Subscriber[Seq[A]], AssignableCancelable) = { + val sub = new Subscriber[Seq[A]] { + + implicit val scheduler: Scheduler = s + + def onNext(requests: Seq[A]): Future[Ack] = + Task + .parTraverse(requests)(req => retryOnFailure(op(req), retryStrategy)) + .redeem(ex => { + onError(ex) + Ack.Stop + }, _ => { + Ack.Continue + }) + .runToFuture + + def onComplete(): Unit = { + cb.onSuccess(()) + } + + def onError(ex: Throwable): Unit = { + cb.onError(ex) + } + } + (sub, AssignableCancelable.single()) + } + +} diff --git a/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkSubscriber.scala b/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkSubscriber.scala index fee38863b..92196fa70 100644 --- a/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkSubscriber.scala +++ b/mongodb/src/main/scala/monix/connect/mongodb/internal/MongoSinkSubscriber.scala @@ -43,7 +43,7 @@ private[mongodb] class MongoSinkSubscriber[A, B](op: A => Publisher[B], retryStr override def createSubscriber(cb: Callback[Throwable, Unit], s: Scheduler): (Subscriber[A], AssignableCancelable) = { val sub = new Subscriber[A] { - implicit val scheduler = s + implicit val scheduler: Scheduler = s def onNext(request: A): Future[Ack] = { retryOnFailure(op(request), retryStrategy)