From e9846b9764ae6ba7ad01612a42b18cab2068ce50 Mon Sep 17 00:00:00 2001 From: Ross Geesman Date: Thu, 19 Sep 2024 15:11:26 -0700 Subject: [PATCH] Add notes regarding closing sessions when working with streams --- lib/mongo.ex | 14 +++++++++++++- lib/mongo/session.ex | 2 ++ lib/mongo/stream.ex | 5 ++++- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/lib/mongo.ex b/lib/mongo.ex index 2ef69435..1932e76c 100644 --- a/lib/mongo.ex +++ b/lib/mongo.ex @@ -418,7 +418,19 @@ defmodule Mongo do end @doc """ - Performs aggregation operation using the aggregation pipeline. + Performs aggregation operation using the aggregation pipeline and returns a Mongo.Stream. + It should be noted that code that uses the paginated query results without engaging Mongo.Streams Enumerable behavior + can result in the sessions hanging around and causing resource exhaustion. + + Example: + # Results in an open session + %Mongo.Stream{docs: docs} = Mongo.aggregate(@topology, collection, pipeline, opts) + docs |> Enum.map(fn elem -> elem end) + + # Results in a closed session via the Enumerable protocol + Mongo.aggregate(@topology, collection, pipeline, opts) + |> Enum.map(fn elem -> elem end) + For all options see [Options](https://docs.mongodb.com/manual/reference/command/aggregate/#aggregate) diff --git a/lib/mongo/session.ex b/lib/mongo/session.ex index a1ea4149..0ed33d6b 100644 --- a/lib/mongo/session.ex +++ b/lib/mongo/session.ex @@ -62,6 +62,8 @@ defmodule Mongo.Session do For more information about causal consistency see the [officially documentation](https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency). + Note that Mongo.Stream implements the Enumerable protocol and the reduce/3 function calls Mongo.Stream.checkin_session/3 after the stream is exhausted. + If you want to use transaction, then you need to create a session as well: alias Mongo.Session diff --git a/lib/mongo/stream.ex b/lib/mongo/stream.ex index c32d5f54..d158ec5d 100644 --- a/lib/mongo/stream.ex +++ b/lib/mongo/stream.ex @@ -1,5 +1,8 @@ defmodule Mongo.Stream do - @moduledoc false + @moduledoc """ + This module provides a stream implementation that automatically checks out a session when the stream is started and an Enumerable + protocol that checks it back in when the stream has been consumed. + """ alias Mongo.Session alias Mongo.Error