Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Change Stream #205

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ before_script:

env:
matrix:
- MONGOVERSION=2.4.14 TRAVIS_NODE_VERSION=4
- MONGOVERSION=2.6.12 TRAVIS_NODE_VERSION=4
- MONGOVERSION=3.0.15 TRAVIS_NODE_VERSION=4
- MONGOVERSION=3.2.13 TRAVIS_NODE_VERSION=4
- MONGOVERSION=3.4.4 TRAVIS_NODE_VERSION=4
- MONGOVERSION=3.6.0 TRAVIS_NODE_VERSION=4
- MONGOVERSION=3.2.20 TRAVIS_NODE_VERSION=4
- MONGOVERSION=3.4.15 TRAVIS_NODE_VERSION=4
- MONGOVERSION=3.6.4 TRAVIS_NODE_VERSION=4
14 changes: 13 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
## v0.4.6
## v0.5.0-dev

* Breaking Changes
* No longer supporting MongoDB < 3.2
* Rename `:connect_timeout_ms` option to `:connect_timeout`

* Bug Fixes
* `:timeout` option will now be properly used per query

* Enhancements
* Support for MongoDB 3.6 collection [Change Streams](https://docs.mongodb.com/manual/changeStreams/)

## v0.4.6 (2018-05-20)

* Enhancements
* Added `:connect_timout_ms` to `Mongo.start_link/1`
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

## Features

* Supports MongoDB versions 2.4, 2.6, 3.0, 3.2, 3.4, 3.6
* Supports MongoDB versions 3.2, 3.4, 3.6
* Connection pooling (through db_connection)
* Streaming cursors
* Performant ObjectID generation
Expand Down
44 changes: 27 additions & 17 deletions lib/mongo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ defmodule Mongo do
* `:idle` - The idle strategy, `:passive` to avoid checkin when idle and
`:active` to checking when idle (default: `:passive`)
* `:idle_timeout` - The idle timeout to ping the database (default: `1_000`)
* `:connect_timeout_ms` - The maximum timeout for the initial connection
* `:connect_timeout` - The maximum timeout for the initial connection
(default: `5_000`)
* `:backoff_min` - The minimum backoff interval (default: `1_000`)
* `:backoff_max` - The maximum backoff interval (default: `30_000`)
Expand Down Expand Up @@ -138,14 +138,26 @@ defmodule Mongo do
Mongo.IdServer.new
end

@doc """
Performs a $changeStream operation

## Options

"""
@spec watch_collection(GenServer.server, collection, [BSON.document], Keyword.it) :: cursor
def watch_collection(topology_pid, coll, pipeline, opts \\ []) do
full_document = opts[:full_document] || "default"
opts = Keyword.drop(opts, ~w(full_document))
aggregate(topology_pid, coll, [%{"$changeStream" => %{fullDocument: full_document}} | pipeline], opts)
end

@doc """
Performs aggregation operation using the aggregation pipeline.

## Options

* `:allow_disk_use` - Enables writing to temporary files (Default: false)
* `:max_time` - Specifies a time limit in milliseconds
* `:use_cursor` - Use a cursor for a batched response (Default: true)
"""
@spec aggregate(GenServer.server, collection, [BSON.document], Keyword.t) :: cursor
def aggregate(topology_pid, coll, pipeline, opts \\ []) do
Expand All @@ -157,17 +169,11 @@ defmodule Mongo do
] |> filter_nils
wv_query = %Query{action: :wire_version}

with {:ok, conn, _, _} <- select_server(topology_pid, :read, opts),
{:ok, version} <- DBConnection.execute(conn, wv_query, [], defaults(opts)) do
cursor? = version >= 1 and Keyword.get(opts, :use_cursor, true)
opts = Keyword.drop(opts, ~w(allow_disk_use max_time use_cursor)a)
with {:ok, conn, _, _} <- select_server(topology_pid, :read, opts) do
opts = Keyword.drop(opts, ~w(allow_disk_use use_cursor)a)

if cursor? do
query = query ++ [cursor: filter_nils(%{batchSize: opts[:batch_size]})]
aggregation_cursor(conn, "$cmd", query, nil, opts)
else
singly_cursor(conn, "$cmd", query, nil, opts)
end
query = query ++ [cursor: filter_nils(%{batchSize: opts[:batch_size]})]
aggregation_cursor(conn, "$cmd", query, nil, opts)
end
end

Expand Down Expand Up @@ -438,11 +444,15 @@ defmodule Mongo do

@doc false
def get_more(conn, coll, cursor, opts) do
query = %Query{action: :get_more, extra: {coll, cursor}}
with {:ok, reply} <- DBConnection.execute(conn, query, [], defaults(opts)),
:ok <- maybe_failure(reply),
op_reply(docs: docs, cursor_id: cursor_id, from: from, num: num) = reply,
do: {:ok, %{from: from, num: num, cursor_id: cursor_id, docs: docs}}
query = [
{"getMore", cursor},
{"collection", coll},
{"batchSize", opts[:batch_size]},
{"maxTimeMS", opts[:max_time]}
]
|> filter_nils()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a function call when a pipeline is only one function long


direct_command(conn, query, opts)
end

@doc false
Expand Down
10 changes: 5 additions & 5 deletions lib/mongo/cursor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ defmodule Mongo.Cursor do
opts = batch_size(limit, opts)

case Mongo.get_more(conn, coll, cursor, opts) do
{:ok, %{cursor_id: cursor, docs: []}} ->
{:ok, %{"cursor" => %{"id" => cursor, "nextBatch" => []}}} ->
{:halt, state(state, cursor: cursor)}
{:ok, %{cursor_id: cursor, docs: docs, num: num}} ->
{docs, state(state, cursor: cursor, limit: new_limit(limit, num))}
{:ok, %{"cursor" => %{"id" => cursor, "nextBatch" => docs}}} ->
{docs, state(state, cursor: cursor, limit: new_limit(limit, length(docs)))}
{:error, error} ->
raise error
end
Expand Down Expand Up @@ -163,9 +163,9 @@ defmodule Mongo.AggregationCursor do

state(buffer: [], conn: conn, cursor: cursor, coll: coll) = state ->
case Mongo.get_more(conn, coll, cursor, opts) do
{:ok, %{cursor_id: cursor, docs: []}} ->
{:ok, %{"cursor" => %{"id" => cursor, "nextBatch" => []}}} ->
{:halt, state(state, cursor: cursor)}
{:ok, %{cursor_id: cursor, docs: docs}} ->
{:ok, %{"cursor" => %{"id" => cursor, "nextBatch" => docs}}} ->
{docs, state(state, cursor: cursor)}
{:error, error} ->
raise error
Expand Down
42 changes: 17 additions & 25 deletions lib/mongo/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Mongo.Protocol do
use Mongo.Messages
alias Mongo.Protocol.Utils

@timeout 5000
@timeout 5_000
@find_flags ~w(tailable_cursor slave_ok no_cursor_timeout await_data exhaust allow_partial_results oplog_replay)a
@find_one_flags ~w(slave_ok exhaust partial)a
@insert_flags ~w(continue_on_error)a
Expand All @@ -29,7 +29,7 @@ defmodule Mongo.Protocol do
socket: nil,
request_id: 0,
timeout: opts[:timeout] || @timeout,
connect_timeout_ms: opts[:connect_timeout_ms] || @timeout,
connect_timeout: opts[:connect_timeout] || @timeout,
database: Keyword.fetch!(opts, :database),
write_concern: Map.new(write_concern),
wire_version: nil,
Expand Down Expand Up @@ -86,7 +86,7 @@ defmodule Mongo.Protocol do
defp ssl(%{socket: {:gen_tcp, sock}} = s, opts) do
host = (opts[:hostname] || "localhost") |> to_charlist
ssl_opts = Keyword.put_new(opts[:ssl_opts] || [], :server_name_indication, host)
case :ssl.connect(sock, ssl_opts, s.connect_timeout_ms) do
case :ssl.connect(sock, ssl_opts, s.connect_timeout) do
{:ok, ssl_sock} ->
{:ok, %{s | socket: {:ssl, ssl_sock}}}
{:error, reason} ->
Expand All @@ -102,7 +102,7 @@ defmodule Mongo.Protocol do

s = Map.put(s, :host, "#{host}:#{port}")

case :gen_tcp.connect(host, port, sock_opts, s.connect_timeout_ms) do
case :gen_tcp.connect(host, port, sock_opts, s.connect_timeout) do
{:ok, socket} ->
# A suitable :buffer is only set if :recbuf is included in
# :socket_options.
Expand Down Expand Up @@ -212,15 +212,7 @@ defmodule Mongo.Protocol do

op_query(coll: Utils.namespace(coll, s, opts[:database]), query: query, select: select,
num_skip: num_skip, num_return: num_return, flags: flags(flags))
|> message_reply(s)
end

defp handle_execute(:get_more, {coll, cursor_id}, [], opts, s) do
num_return = Keyword.get(opts, :batch_size, 0)

op_get_more(coll: Utils.namespace(coll, s, opts[:database]), cursor_id: cursor_id,
num_return: num_return)
|> message_reply(s)
|> message_reply(s, opts[:timeout])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a function call when a pipeline is only one function long

end

defp handle_execute(:kill_cursors, cursor_ids, [], _opts, s) do
Expand All @@ -232,57 +224,57 @@ defmodule Mongo.Protocol do
defp handle_execute(:insert_one, coll, [doc], opts, s) do
flags = flags(Keyword.take(opts, @insert_flags))
op = op_insert(coll: Utils.namespace(coll, s, opts[:database]), docs: [doc], flags: flags)
message_gle(-11, op, opts, s)
message_gle(-11, op, opts, s, opts[:timeout])
end

defp handle_execute(:insert_many, coll, docs, opts, s) do
flags = flags(Keyword.take(opts, @insert_flags))
op = op_insert(coll: Utils.namespace(coll, s, opts[:database]), docs: docs, flags: flags)
message_gle(-12, op, opts, s)
message_gle(-12, op, opts, s, opts[:timeout])
end

defp handle_execute(:delete_one, coll, [query], opts, s) do
flags = [:single]
op = op_delete(coll: Utils.namespace(coll, s, opts[:database]), query: query, flags: flags)
message_gle(-13, op, opts, s)
message_gle(-13, op, opts, s, opts[:timeout])
end

defp handle_execute(:delete_many, coll, [query], opts, s) do
flags = []
op = op_delete(coll: Utils.namespace(coll, s, opts[:database]), query: query, flags: flags)
message_gle(-14, op, opts, s)
message_gle(-14, op, opts, s, opts[:timeout])
end

defp handle_execute(:replace_one, coll, [query, replacement], opts, s) do
flags = flags(Keyword.take(opts, @update_flags))
op = op_update(coll: Utils.namespace(coll, s, opts[:database]), query: query, update: replacement,
flags: flags)
message_gle(-15, op, opts, s)
message_gle(-15, op, opts, s, opts[:timeout])
end

defp handle_execute(:update_one, coll, [query, update], opts, s) do
flags = flags(Keyword.take(opts, @update_flags))
op = op_update(coll: Utils.namespace(coll, s, opts[:database]), query: query, update: update,
flags: flags)
message_gle(-16, op, opts, s)
message_gle(-16, op, opts, s, opts[:timeout])
end

defp handle_execute(:update_many, coll, [query, update], opts, s) do
flags = [:multi | flags(Keyword.take(opts, @update_flags))]
op = op_update(coll: Utils.namespace(coll, s, opts[:database]), query: query, update: update,
flags: flags)
message_gle(-17, op, opts, s)
message_gle(-17, op, opts, s, opts[:timeout])
end

defp handle_execute(:command, nil, [query], opts, s) do
flags = Keyword.take(opts, @find_one_flags)
op_query(coll: Utils.namespace("$cmd", s, opts[:database]), query: query, select: "",
num_skip: 0, num_return: 1, flags: flags(flags))
|> message_reply(s)
|> message_reply(s, opts[:timeout])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a function call when a pipeline is only one function long

end

defp message_reply(op, s) do
with {:ok, reply} <- Utils.message(s.request_id, op, s),
defp message_reply(op, s, timeout) do
with {:ok, reply} <- Utils.message(s.request_id, op, s, timeout),
s = %{s | request_id: s.request_id + 1},
do: {:ok, reply, s}
end
Expand All @@ -294,7 +286,7 @@ defmodule Mongo.Protocol do
end)
end

defp message_gle(id, op, opts, s) do
defp message_gle(id, op, opts, s, timeout) do
write_concern = Keyword.take(opts, @write_concern) |> Map.new
write_concern = Map.merge(s.write_concern, write_concern)

Expand All @@ -306,7 +298,7 @@ defmodule Mongo.Protocol do
select: "", num_skip: 0, num_return: -1, flags: [])

ops = [{id, op}, {s.request_id, gle_op}]
message_reply(ops, s)
message_reply(ops, s, timeout)
end
end

Expand Down
32 changes: 16 additions & 16 deletions lib/mongo/protocol/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@ defmodule Mongo.Protocol.Utils do
import Kernel, except: [send: 2]
import Mongo.Messages

def message(id, ops, s) when is_list(ops) do
def message(id, ops, s, timeout) when is_list(ops) do
with :ok <- send(ops, s),
{:ok, ^id, reply} <- recv(s),
{:ok, ^id, reply} <- recv(s, timeout),
do: {:ok, reply}
end
def message(id, op, s) do
def message(id, op, s, timeout) do
with :ok <- send(id, op, s),
{:ok, ^id, reply} <- recv(s),
{:ok, ^id, reply} <- recv(s, timeout),
do: {:ok, reply}
end

def command(id, command, s) do
def command(id, command, s), do: command(id, command, s, s.timeout)
def command(id, command, s, timeout) do
op = op_query(coll: namespace("$cmd", s, nil), query: BSON.Encoder.document(command),
select: "", num_skip: 0, num_return: 1, flags: [])
case message(id, op, s) do
case message(id, op, s, timeout) do
{:ok, op_reply(docs: docs)} ->
case BSON.Decoder.documents(docs) do
[] -> {:ok, nil}
Expand Down Expand Up @@ -68,32 +69,31 @@ defmodule Mongo.Protocol.Utils do
end
end

def recv(s) do
recv(nil, "", s)
end
def recv(s, nil), do: recv(nil, "", s, 5_000)
def recv(s, timeout), do: recv(nil, "", s, timeout)

# TODO: Optimize to reduce :gen_tcp.recv and decode_message calls
# based on message size in header.
# :gen.tcp.recv(socket, min(size, max_packet))
# where max_packet = 64mb
defp recv(nil, data, %{socket: {mod, sock}} = s) do
defp recv(nil, data, %{socket: {mod, sock}} = s, timeout) do
case decode_header(data) do
{:ok, header, rest} ->
recv(header, rest, s)
recv(header, rest, s, timeout)
:error ->
case mod.recv(sock, 0, s.timeout) do
{:ok, tail} -> recv(nil, [data|tail], s)
case mod.recv(sock, 0, timeout) do
{:ok, tail} -> recv(nil, [data|tail], s, timeout)
{:error, reason} -> recv_error(reason, s)
end
end
end
defp recv(header, data, %{socket: {mod, sock}} = s) do
defp recv(header, data, %{socket: {mod, sock}} = s, timeout) do
case decode_message(header, data) do
{:ok, id, reply, ""} ->
{:ok, id, reply}
:error ->
case mod.recv(sock, 0, s.timeout) do
{:ok, tail} -> recv(header, [data|tail], s)
case mod.recv(sock, 0, timeout) do
{:ok, tail} -> recv(header, [data|tail], s, timeout)
{:error, reason} -> recv_error(reason, s)
end
end
Expand Down
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
defmodule Mongodb.Mixfile do
use Mix.Project

@version "0.4.6"
@version "0.5.0-dev"

def project do
[app: :mongodb,
version: @version,
elixirc_paths: elixirc_paths(Mix.env),
elixir: "~> 1.3",
elixir: "~> 1.4",
name: "Mongodb",
deps: deps(),
docs: docs(),
Expand Down
8 changes: 4 additions & 4 deletions test/mongo/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ defmodule Mongo.ConnectionTest do

assert {:ok, %{cursor_id: cursor_id, from: 0, docs: [%{"foo" => 42}, %{"foo" => 43}]}} =
Mongo.raw_find(conn, coll, %{}, nil, batch_size: 2)
assert {:ok, %{cursor_id: ^cursor_id, from: 2, docs: [%{"foo" => 44}, %{"foo" => 45}]}} =
assert {:ok, %{"cursor" => %{"id" => ^cursor_id, "nextBatch" => [%{"foo" => 44}, %{"foo" => 45}]}}} =
Mongo.get_more(conn, coll, cursor_id, batch_size: 2)
assert {:ok, %{cursor_id: ^cursor_id, from: 4, docs: [%{"foo" => 46}, %{"foo" => 47}]}} =
assert {:ok, %{"cursor" => %{"id" => ^cursor_id, "nextBatch" => [%{"foo" => 46}, %{"foo" => 47}]}}} =
Mongo.get_more(conn, coll, cursor_id, batch_size: 2)
assert {:ok, %{cursor_id: 0, from: 6, docs: []}} =
assert {:ok, %{"cursor" => %{"id" => 0, "nextBatch" => []}}} =
Mongo.get_more(conn, coll, cursor_id, batch_size: 2)
end

Expand All @@ -141,7 +141,7 @@ defmodule Mongo.ConnectionTest do
Mongo.raw_find(conn, coll, %{}, nil, batch_size: 2)
assert :ok = Mongo.kill_cursors(conn, [cursor_id], [])

assert {:error, %Mongo.Error{code: nil, message: "cursor not found"}} =
assert {:error, %Mongo.Error{code: 43}} =
Mongo.get_more(conn, coll, cursor_id, [])
end

Expand Down
Loading