diff --git a/lib/mongo.ex b/lib/mongo.ex index 2ef6943..1932e76 100644 --- a/lib/mongo.ex +++ b/lib/mongo.ex @@ -418,7 +418,19 @@ defmodule Mongo do end @doc """ - Performs aggregation operation using the aggregation pipeline. + Performs aggregation operation using the aggregation pipeline and returns a Mongo.Stream. + It should be noted that code that uses the paginated query results without engaging Mongo.Streams Enumerable behavior + can result in the sessions hanging around and causing resource exhaustion. + + Example: + # Results in an open session + %Mongo.Stream{docs: docs} = Mongo.aggregate(@topology, collection, pipeline, opts) + docs |> Enum.map(fn elem -> elem end) + + # Results in a closed session via the Enumerable protocol + Mongo.aggregate(@topology, collection, pipeline, opts) + |> Enum.map(fn elem -> elem end) + For all options see [Options](https://docs.mongodb.com/manual/reference/command/aggregate/#aggregate) diff --git a/lib/mongo/session.ex b/lib/mongo/session.ex index a1ea414..0ed33d6 100644 --- a/lib/mongo/session.ex +++ b/lib/mongo/session.ex @@ -62,6 +62,8 @@ defmodule Mongo.Session do For more information about causal consistency see the [officially documentation](https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency). + Note that Mongo.Stream implements the Enumerable protocol and the reduce/3 function calls Mongo.Stream.checkin_session/3 after the stream is exhausted. + If you want to use transaction, then you need to create a session as well: alias Mongo.Session diff --git a/lib/mongo/stream.ex b/lib/mongo/stream.ex index c32d5f5..d158ec5 100644 --- a/lib/mongo/stream.ex +++ b/lib/mongo/stream.ex @@ -1,5 +1,8 @@ defmodule Mongo.Stream do - @moduledoc false + @moduledoc """ + This module provides a stream implementation that automatically checks out a session when the stream is started and an Enumerable + protocol that checks it back in when the stream has been consumed. + """ alias Mongo.Session alias Mongo.Error