Skip to content

add gen_server mongoc to adapt #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
.PHONY: test

ERL=erl
BEAMDIR=./deps/*/ebin ./ebin
REBAR=./rebar
REBAR_GEN=../../rebar
DIALYZER=dialyzer

#update-deps
all: deps compile

deps:
@$(REBAR) get-deps

update-deps:
@$(REBAR) update-deps

compile:
@$(REBAR) compile

xref:
@$(REBAR) xref skip_deps=true

clean:
@$(REBAR) clean

test:
$(REBAR) compile ct skip_deps=true

edoc:
@$(REBAR) doc

dialyzer: compile
@$(DIALYZER) ebin deps/ossp_uuid/ebin

setup-dialyzer:
@$(DIALYZER) --build_plt --apps kernel stdlib mnesia eunit erts crypto
2 changes: 1 addition & 1 deletion ebin/mongodb.app
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{application, mongodb,
[{description, "Client interface to MongoDB, also known as the driver. See www.mongodb.org"},
{vsn, "0.2.1"},
{modules, [mongodb_app, mongo, mongo_protocol, mongo_connect, mongo_query, mongo_cursor, mvar, mongodb_tests, mongo_replset, resource_pool]},
{modules, [mongoc,mongodb_app, mongo, mongo_protocol, mongo_connect, mongo_query, mongo_cursor, mvar, mongodb_tests, mongo_replset, resource_pool]},
{registered, []},
{applications, [kernel, stdlib]},
{mod, {mongodb_app, []}}
Expand Down
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{deps, [
{bson, ".*", {git, "git://github.com/TonyGen/bson-erlang", "HEAD"}}
{bson, ".*", {git, "https://github.com/TonyGen/bson-erlang.git", "HEAD"}}
]}.

{lib_dirs, ["deps"]}.

{erl_opts, [debug_info, fail_on_warning]}.
{erl_opts, [debug_info, fail_on_warning]}.
456 changes: 235 additions & 221 deletions src/mongo.erl

Large diffs are not rendered by default.

143 changes: 72 additions & 71 deletions src/mongo_connect.erl
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
%@doc Thread-safe TCP connection to a MongoDB server with synchronous call and asynchronous send interface.
-module (mongo_connect).
-module(mongo_connect).

-export_type ([host/0, connection/0, dbconnection/0, failure/0]).
-export_type([host/0, connection/0, dbconnection/0, failure/0]).

-export ([host_port/1, read_host/1, show_host/1]).
-export ([connect/1, connect/2, conn_host/1, close/1, is_closed/1]).
-export([host_port/1, read_host/1, show_host/1]).
-export([connect/1, connect/2, conn_host/1, close/1, is_closed/1]).

-export ([call/3, send/2]). % for mongo_query and mongo_cursor
-export([call/3, send/2]). % for mongo_query and mongo_cursor

-include_lib ("bson/include/bson_binary.hrl").

-type host() :: {inet:hostname(), 0..65535} | inet:hostname().
% Hostname and port. Port defaults to 27017 when missing

-spec host_port (host()) -> host().
-spec host_port(host()) -> host().
%@doc Port explicitly filled in with defaut if missing
host_port ({Hostname, Port}) -> {hostname_string (Hostname), Port};
host_port (Hostname) -> {hostname_string (Hostname), 27017}.
host_port({Hostname, Port}) -> {hostname_string(Hostname), Port};
host_port(Hostname) -> {hostname_string(Hostname), 27017}.

-spec hostname_string (inet:hostname()) -> string().
-spec hostname_string(inet:hostname()) -> string().
%@doc Convert possible hostname atom to string
hostname_string (Name) when is_atom (Name) -> atom_to_list (Name);
hostname_string (Name) -> Name.
hostname_string(Name) when is_atom(Name) -> atom_to_list(Name);
hostname_string(Name) -> Name.

-spec show_host (host()) -> bson:utf8().
-spec show_host(host()) -> bson:utf8().
%@doc UString representation of host, ie. "Hostname:Port"
show_host (Host) ->
{Hostname, Port} = host_port (Host),
bson:utf8 (Hostname ++ ":" ++ integer_to_list (Port)).
show_host(Host) ->
{Hostname, Port} = host_port(Host),
bson:utf8(Hostname ++ ":" ++ integer_to_list(Port)).

-spec read_host (bson:utf8()) -> host().
-spec read_host(bson:utf8()) -> host().
%@doc Interpret ustring as host, ie. "Hostname:Port" -> {Hostname, Port}
read_host (UString) -> case string:tokens (bson:str (UString), ":") of
[Hostname] -> host_port (Hostname);
[Hostname, Port] -> {Hostname, list_to_integer (Port)} end.
read_host(UString) -> case string:tokens(bson:str(UString), ":") of
[Hostname] -> host_port(Hostname);
[Hostname, Port] -> {Hostname, list_to_integer(Port)} end.

-type reason() :: any().

Expand All @@ -42,77 +42,78 @@ read_host (UString) -> case string:tokens (bson:str (UString), ":") of
% Passive raw binary socket.
% Type not opaque to mongo:connection_mode/2

-spec connect (host()) -> {ok, connection()} | {error, reason()}. % IO
-spec connect(host()) -> {ok, connection()} | {error, reason()}. % IO
%@doc Create connection to given MongoDB server or return reason for connection failure.
connect (Host) -> connect (Host, infinity).
connect(Host) -> connect(Host, infinity).

-spec connect (host(), timeout()) -> {ok, connection()} | {error, reason()}. % IO
-spec connect(host(), timeout()) -> {ok, connection()} | {error, reason()}. % IO
%@doc Create connection to given MongoDB server or return reason for connection failure. Timeout is used for initial connection and every call.
connect (Host, TimeoutMS) -> try mvar:create (fun () -> tcp_connect (host_port (Host), TimeoutMS) end, fun gen_tcp:close/1)
of VSocket -> {ok, {connection, host_port (Host), VSocket, TimeoutMS}}
catch Reason -> {error, Reason} end.
connect(Host, TimeoutMS) -> try mvar:create(fun() -> tcp_connect(host_port(Host), TimeoutMS) end, fun gen_tcp:close/1)
of VSocket -> {ok, {connection, host_port(Host), VSocket, TimeoutMS}}
catch Reason -> {error, Reason} end.

-spec conn_host (connection()) -> host().
-spec conn_host(connection()) -> host().
%@doc Host this is connected to
conn_host ({connection, Host, _VSocket, _}) -> Host.
conn_host({connection, Host, _VSocket, _}) -> Host.

-spec close (connection()) -> ok. % IO
-spec close(connection()) -> ok. % IO
%@doc Close connection.
close ({connection, _Host, VSocket, _}) -> mvar:terminate (VSocket).
close({connection, _Host, VSocket, _}) -> mvar:terminate(VSocket).

-spec is_closed (connection()) -> boolean(). % IO
-spec is_closed(connection()) -> boolean(). % IO
%@doc Has connection been closed?
is_closed ({connection, _, VSocket, _}) -> mvar:is_terminated (VSocket).
is_closed({connection, _, VSocket, _}) -> mvar:is_terminated(VSocket).

-type dbconnection() :: {mongo_protocol:db(), connection()}.

-type failure() :: {connection_failure, connection(), reason()}.

-spec call (dbconnection(), [mongo_protocol:notice()], mongo_protocol:request()) -> mongo_protocol:reply(). % IO throws failure()
-spec call(dbconnection(), [mongo_protocol:notice()], mongo_protocol:request()) -> mongo_protocol:reply(). % IO throws failure()
%@doc Synchronous send and reply. Notices are sent right before request in single block. Exclusive access to connection during entire call.
call ({Db, Conn = {connection, _Host, VSocket, TimeoutMS}}, Notices, Request) ->
{MessagesBin, RequestId} = messages_binary (Db, Notices ++ [Request]),
Call = fun (Socket) ->
tcp_send (Socket, MessagesBin),
<<?get_int32 (N)>> = tcp_recv (Socket, 4, TimeoutMS),
tcp_recv (Socket, N-4, TimeoutMS) end,
try mvar:with (VSocket, Call) of
ReplyBin ->
{RequestId, Reply, <<>>} = mongo_protocol:get_reply (ReplyBin),
Reply % ^ ResponseTo must match RequestId
catch
throw: Reason -> close (Conn), throw ({connection_failure, Conn, Reason});
exit: {noproc, _} -> throw ({connection_failure, Conn, closed}) end.

-spec send (dbconnection(), [mongo_protocol:notice()]) -> ok. % IO throws failure()
call({Db, Conn = {connection, _Host, VSocket, TimeoutMS}}, Notices, Request) ->
{MessagesBin, RequestId} = messages_binary(Db, Notices ++ [Request]),
Call = fun(Socket) ->
tcp_send(Socket, MessagesBin),
<<?get_int32(N)>> = tcp_recv(Socket, 4, TimeoutMS),
tcp_recv(Socket, N - 4, TimeoutMS) end,
try mvar:with(VSocket, Call) of
ReplyBin ->
{RequestId, Reply, <<>>} = mongo_protocol:get_reply(ReplyBin),
Reply % ^ ResponseTo must match RequestId
catch
throw: Reason -> close(Conn), throw({connection_failure, Conn, Reason});
exit: {noproc, _} -> throw({connection_failure, Conn, closed}) end.

-spec send(dbconnection(), [mongo_protocol:notice()]) -> ok. % IO throws failure()
%@doc Asynchronous send (no reply). Don't know if send succeeded. Exclusive access to the connection during send.
send ({Db, Conn = {connection, _Host, VSocket, _}}, Notices) ->
{NoticesBin, _} = messages_binary (Db, Notices),
Send = fun (Socket) -> tcp_send (Socket, NoticesBin) end,
try mvar:with (VSocket, Send)
catch
throw: Reason -> close (Conn), throw ({connection_failure, Conn, Reason});
exit: {noproc, _} -> throw ({connection_failure, Conn, closed}) end.

-spec messages_binary (mongo_protocol:db(), [mongo_protocol:message()]) -> {binary(), mongo_protocol:requestid()}.
send({Db, Conn = {connection, _Host, VSocket, _}}, Notices) ->
{NoticesBin, _} = messages_binary(Db, Notices),
Send = fun(Socket) -> tcp_send(Socket, NoticesBin) end,
try mvar:with(VSocket, Send)
catch
throw: Reason -> close(Conn), throw({connection_failure, Conn, Reason});
exit: {noproc, _} -> throw({connection_failure, Conn, closed}) end.

-spec messages_binary(mongo_protocol:db(), [mongo_protocol:message()]) -> {binary(), mongo_protocol:requestid()}.
%@doc Binary representation of messages
messages_binary (Db, Messages) ->
Build = fun (Message, {Bin, _}) ->
RequestId = mongodb_app:next_requestid(),
MBin = mongo_protocol:put_message (Db, Message, RequestId),
{<<Bin /binary, ?put_int32 (byte_size (MBin) + 4), MBin /binary>>, RequestId} end,
lists:foldl (Build, {<<>>, 0}, Messages).
messages_binary(Db, Messages) ->
Build = fun(Message, {Bin, _}) ->
RequestId = mongodb_app:next_requestid(),
MBin = mongo_protocol:put_message(Db, Message, RequestId),
{<<Bin/binary, ?put_int32(byte_size(MBin) + 4), MBin/binary>>, RequestId} end,
lists:foldl(Build, {<<>>, 0}, Messages).

% Util %

tcp_connect ({Hostname, Port}, TimeoutMS) -> case gen_tcp:connect (Hostname, Port, [binary, {active, false}, {packet, 0}], TimeoutMS) of
{ok, Socket} -> Socket;
{error, Reason} -> throw (Reason) end.
tcp_connect({Hostname, Port}, TimeoutMS) ->
case gen_tcp:connect(Hostname, Port, [binary, {active, false}, {packet, 0}], TimeoutMS) of
{ok, Socket} -> Socket;
{error, Reason} -> throw(Reason) end.

tcp_send (Socket, Binary) -> case gen_tcp:send (Socket, Binary) of
ok -> ok;
{error, Reason} -> throw (Reason) end.
tcp_send(Socket, Binary) -> case gen_tcp:send(Socket, Binary) of
ok -> ok;
{error, Reason} -> throw(Reason) end.

tcp_recv (Socket, N, TimeoutMS) -> case gen_tcp:recv (Socket, N, TimeoutMS) of
{ok, Binary} -> Binary;
{error, Reason} -> throw (Reason) end.
tcp_recv(Socket, N, TimeoutMS) -> case gen_tcp:recv(Socket, N, TimeoutMS) of
{ok, Binary} -> Binary;
{error, Reason} -> throw(Reason) end.
89 changes: 45 additions & 44 deletions src/mongo_cursor.erl
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
%@doc A cursor references pending query results on a server.
% TODO: terminate cursor after idle for 10 minutes.
-module (mongo_cursor).
-module(mongo_cursor).

-export_type ([maybe/1]).
-export_type ([cursor/0, expired/0]).
-export_type([maybe/1]).
-export_type([cursor/0, expired/0]).

-export ([next/1, rest/1, close/1, is_closed/1]). % API
-export ([cursor/4]). % for mongo_query
-export([next/1, rest/1, close/1, is_closed/1]). % API
-export([cursor/4]). % for mongo_query

-include ("mongo_protocol.hrl").
-include("mongo_protocol.hrl").

-type maybe(A) :: {A} | {}.

Expand All @@ -21,56 +21,57 @@
-type env() :: {mongo_connect:dbconnection(), collection(), batchsize()}.
-type batch() :: {cursorid(), [bson:document()]}.

-spec cursor (mongo_connect:dbconnection(), collection(), batchsize(), {cursorid(), [bson:document()]}) -> cursor(). % IO
-spec cursor(mongo_connect:dbconnection(), collection(), batchsize(), {cursorid(), [bson:document()]}) -> cursor(). % IO
%@doc Create new cursor from result batch
cursor (DbConn, Collection, BatchSize, Batch) ->
mvar:new ({{DbConn, Collection, BatchSize}, Batch}, fun finalize/1).
cursor(DbConn, Collection, BatchSize, Batch) ->
mvar:new({{DbConn, Collection, BatchSize}, Batch}, fun finalize/1).

-spec close (cursor()) -> ok. % IO
-spec close(cursor()) -> ok. % IO
%@doc Close cursor
close (Cursor) -> mvar:terminate (Cursor).
close(Cursor) -> mvar:terminate(Cursor).

-spec is_closed (cursor()) -> boolean(). % IO
-spec is_closed(cursor()) -> boolean(). % IO
%@doc Is cursor closed
is_closed (Cursor) -> mvar:is_terminated (Cursor).
is_closed(Cursor) -> mvar:is_terminated(Cursor).

-spec rest (cursor()) -> [bson:document()]. % IO throws expired() & mongo_connect:failure()
-spec rest(cursor()) -> [bson:document()]. % IO throws expired() & mongo_connect:failure()
%@doc Return remaining documents in query result
rest (Cursor) -> case next (Cursor) of
{} -> [];
{Doc} -> [Doc | rest (Cursor)] end.
rest(Cursor) -> case next(Cursor) of
{} -> [];
{Doc} -> [Doc | rest(Cursor)] end.

-spec next (cursor()) -> maybe (bson:document()). % IO throws expired() & mongo_connect:failure()
-spec next(cursor()) -> maybe (bson:document()). % IO throws expired() & mongo_connect:failure()
%@doc Return next document in query result or nothing if finished.
next (Cursor) ->
Next = fun ({Env, Batch}) ->
{Batch1, MDoc} = xnext (Env, Batch),
{{Env, Batch1}, MDoc} end,
try mvar:modify (Cursor, Next)
of {} -> close (Cursor), {}; {Doc} -> {Doc}
catch expired -> close (Cursor), throw ({cursor_expired, Cursor}) end.
next(Cursor) ->
Next = fun({Env, Batch}) ->
{Batch1, MDoc} = xnext(Env, Batch),
{{Env, Batch1}, MDoc} end,
try mvar:modify(Cursor, Next)
of {} -> close(Cursor), {}; {Doc} -> {Doc}
catch expired -> close(Cursor), throw({cursor_expired, Cursor}) end.

-spec xnext (env(), batch()) -> {batch(), maybe (bson:document())}. % IO throws expired & mongo_connect:failure()
-spec xnext(env(), batch()) -> {batch(), maybe (bson:document())}. % IO throws expired & mongo_connect:failure()
%@doc Get next document in cursor, fetching next batch from server if necessary
xnext (Env = {DbConn, Coll, BatchSize}, {CursorId, Docs}) -> case Docs of
[Doc | Docs1] -> {{CursorId, Docs1}, {Doc}};
[] -> case CursorId of
0 -> {{0, []}, {}};
_ ->
Getmore = #getmore {collection = Coll, batchsize = BatchSize, cursorid = CursorId},
Reply = mongo_connect:call (DbConn, [], Getmore),
xnext (Env, batch_reply (Reply)) end end.
xnext(Env = {DbConn, Coll, BatchSize}, {CursorId, Docs}) -> case Docs of
[Doc | Docs1] -> {{CursorId, Docs1}, {Doc}};
[] -> case CursorId of
0 -> {{0, []}, {}};
_ ->
Getmore = #getmore{collection = Coll, batchsize = BatchSize, cursorid = CursorId},
Reply = mongo_connect:call(DbConn, [], Getmore),
xnext(Env, batch_reply(Reply)) end end.

-spec batch_reply (mongo_protocol:reply()) -> batch(). % IO throws expired
-spec batch_reply(mongo_protocol:reply()) -> batch(). % IO throws expired
%@doc Extract next batch of results from reply. Throw expired if cursor not found on server.
batch_reply (#reply {
cursornotfound = CursorNotFound, queryerror = false, awaitcapable = _,
cursorid = CursorId, startingfrom = _, documents = Docs }) -> if
CursorNotFound -> throw (expired);
true -> {CursorId, Docs} end.
batch_reply(#reply{
cursornotfound = CursorNotFound, queryerror = false, awaitcapable = _,
cursorid = CursorId, startingfrom = _, documents = Docs}) -> if
CursorNotFound -> throw(expired);
true -> {CursorId, Docs} end.

-spec finalize (state()) -> ok. % IO. Result ignored
-spec finalize(state()) -> ok. % IO. Result ignored
%@doc Kill cursor on server if not already
finalize ({{DbConn, _, _}, {CursorId, _}}) -> case CursorId of
0 -> ok;
_ -> mongo_connect:send (DbConn, [#killcursor {cursorids = [CursorId]}]) end.
finalize({{DbConn, _, _}, {CursorId, _}}) -> case CursorId of
0 -> ok;
_ ->
mongo_connect:send(DbConn, [#killcursor{cursorids = [CursorId]}]) end.
Loading