Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add notes regarding closing sessions when working with streams #252

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/mongo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,19 @@ defmodule Mongo do
end

@doc """
Performs aggregation operation using the aggregation pipeline.
Performs aggregation operation using the aggregation pipeline and returns a Mongo.Stream.
It should be noted that code that uses the paginated query results without engaging Mongo.Streams Enumerable behavior
can result in the sessions hanging around and causing resource exhaustion.

Example:
# Results in an open session
%Mongo.Stream{docs: docs} = Mongo.aggregate(@topology, collection, pipeline, opts)
docs |> Enum.map(fn elem -> elem end)

# Results in a closed session via the Enumerable protocol
Mongo.aggregate(@topology, collection, pipeline, opts)
|> Enum.map(fn elem -> elem end)


For all options see [Options](https://docs.mongodb.com/manual/reference/command/aggregate/#aggregate)

Expand Down
2 changes: 2 additions & 0 deletions lib/mongo/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ defmodule Mongo.Session do

For more information about causal consistency see the [officially documentation](https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency).

Note that Mongo.Stream implements the Enumerable protocol and the reduce/3 function calls Mongo.Stream.checkin_session/3 after the stream is exhausted.

If you want to use transaction, then you need to create a session as well:

alias Mongo.Session
Expand Down
5 changes: 4 additions & 1 deletion lib/mongo/stream.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
defmodule Mongo.Stream do
@moduledoc false
@moduledoc """
This module provides a stream implementation that automatically checks out a session when the stream is started and an Enumerable
protocol that checks it back in when the stream has been consumed.
"""

alias Mongo.Session
alias Mongo.Error
Expand Down
Loading