diff --git a/include/riakc.hrl b/include/riakc.hrl
index 6f711d7d..ea063deb 100644
--- a/include/riakc.hrl
+++ b/include/riakc.hrl
@@ -24,7 +24,7 @@
-define(PROTO_MAJOR, 1).
-define(PROTO_MINOR, 0).
-define(DEFAULT_PB_TIMEOUT, 60000).
--define(FIRST_RECONNECT_INTERVAL, 100).
+-define(FIRST_RECONNECT_INTERVAL, 10).
-define(MAX_RECONNECT_INTERVAL, 30000).
-type client_option() :: queue_if_disconnected |
@@ -33,7 +33,8 @@
auto_reconnect |
{auto_reconnect, boolean()} |
keepalive |
- {keepalive, boolean()}.
+ {keepalive, boolean()} |
+ {stats, non_neg_integer()}.
%% Options for starting or modifying the connection:
%% `queue_if_disconnected' when present or true will cause requests to
%% be queued while the connection is down. `auto_reconnect' when
diff --git a/src/riakc_pb_socket.erl b/src/riakc_pb_socket.erl
index 59aca82b..fd4344ee 100644
--- a/src/riakc_pb_socket.erl
+++ b/src/riakc_pb_socket.erl
@@ -42,6 +42,8 @@
set_options/2, set_options/3,
is_connected/1, is_connected/2,
ping/1, ping/2,
+ queue_len/1,
+ stats_peek/1, stats_take/1, stats_change_level/2,
get_client_id/1, get_client_id/2,
set_client_id/2, set_client_id/3,
get_server_info/1, get_server_info/2,
@@ -96,6 +98,7 @@
-deprecated({get_index,'_', eventually}).
+-type timeout2() :: timeout() | {timeout(), timeout()}.
-type ctx() :: any().
-type rpb_req() :: {tunneled, msg_id(), binary()} | atom() | tuple().
-type rpb_resp() :: atom() | tuple().
@@ -128,7 +131,7 @@
%% of the same name on the `riakc' application, for example:
%% `application:set_env(riakc, ping_timeout, 5000).'
-record(request, {ref :: reference(), msg :: rpb_req(), from, ctx :: ctx(), timeout :: timeout(),
- tref :: reference() | undefined }).
+ tref :: reference() | undefined , timestamp}).
-ifdef(namespaced_types).
-type request_queue_t() :: queue:queue(#request{}).
@@ -154,6 +157,7 @@
transport = gen_tcp :: 'gen_tcp' | 'ssl',
active :: #request{} | undefined, % active request
queue :: request_queue_t() | undefined, % queue of pending requests
+ queue_len=0 :: non_neg_integer(), % queue size
connects=0 :: non_neg_integer(), % number of successful connects
failed=[] :: [connection_failure()], % breakdown of failed connects
connect_timeout=infinity :: timeout(), % timeout of TCP connection
@@ -165,6 +169,7 @@
% certificate authentication
ssl_opts = [], % Arbitrary SSL options, see the erlang SSL
% documentation.
+ stats,
reconnect_interval=?FIRST_RECONNECT_INTERVAL :: non_neg_integer()}).
%% @private Like `gen_server:call/3', but with the timeout hardcoded
@@ -244,6 +249,22 @@ ping(Pid) ->
ping(Pid, Timeout) ->
call_infinity(Pid, {req, rpbpingreq, Timeout}).
+%% @doc Return the number of requests currently waiting in the connection's
+%% request queue. The value is the `queue_len' counter kept in the connection
+%% process state, read with a synchronous `{check, queue_len}' call.
+queue_len(Pid) ->
+ call_infinity(Pid, {check, queue_len}).
+
+%% @doc Return the statistics collected so far without flushing them.
+%% The stats held by the connection process are left untouched and are passed
+%% through `riakc_stats:stats_format/1' before being returned.
+stats_peek(Pid) ->
+ riakc_stats:stats_format(call_infinity(Pid, stats_peek)).
+
+%% @doc Return the statistics collected so far and flush them.
+%% The connection process swaps in a fresh stats record at the same level; the
+%% previous record is passed through `riakc_stats:stats_format/1' and returned.
+stats_take(Pid) ->
+ riakc_stats:stats_format(call_infinity(Pid, stats_take)).
+
+%% @doc Change the stats level.
+%% The connection process replaces its stats with a fresh record at `NewLevel',
+%% so changing the level also discards the values accumulated so far. The stats
+%% gathered before the change are returned, formatted with
+%% `riakc_stats:stats_format/1'.
+stats_change_level(Pid, NewLevel) ->
+ riakc_stats:stats_format(call_infinity(Pid, {stats_change_level, NewLevel})).
+
%% @doc Get the client id for this connection
%% @equiv get_client_id(Pid, default_timeout(get_client_id_timeout))
-spec get_client_id(pid()) -> {ok, client_id()} | {error, term()}.
@@ -289,8 +310,10 @@ get(Pid, Bucket, Key) ->
%% @doc Get bucket/key from the server specifying timeout.
%% Will return {error, notfound} if the key is not on the server.
%% @equiv get(Pid, Bucket, Key, Options, Timeout)
--spec get(pid(), bucket(), key(), TimeoutOrOptions::timeout() | get_options()) ->
+-spec get(pid(), bucket(), key(), TimeoutOrOptions::timeout2() | get_options()) ->
{ok, riakc_obj()} | {error, term()} | unchanged.
+get(Pid, Bucket, Key, {T1,T2} = Timeout) when is_integer(T1), is_integer(T2) ->
+ get(Pid, Bucket, Key, [], Timeout);
get(Pid, Bucket, Key, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
get(Pid, Bucket, Key, [], Timeout);
get(Pid, Bucket, Key, Options) ->
@@ -300,7 +323,7 @@ get(Pid, Bucket, Key, Options) ->
%% `unchanged' will be returned when the
%% `{if_modified, Vclock}' option is specified and the
%% object is unchanged.
--spec get(pid(), bucket(), key(), get_options(), timeout()) ->
+-spec get(pid(), bucket(), key(), get_options(), timeout2()) ->
{ok, riakc_obj()} | {error, term()} | unchanged.
get(Pid, Bucket, Key, Options, Timeout) ->
{T, B} = maybe_bucket_type(Bucket),
@@ -318,8 +341,10 @@ put(Pid, Obj) ->
%% @doc Put the metadata/value in the object under bucket/key with options or timeout.
%% @equiv put(Pid, Obj, Options, Timeout)
%% @see put/4
--spec put(pid(), riakc_obj(), TimeoutOrOptions::timeout() | put_options()) ->
+-spec put(pid(), riakc_obj(), TimeoutOrOptions::timeout2() | put_options()) ->
ok | {ok, riakc_obj()} | riakc_obj() | {ok, key()} | {error, term()}.
+put(Pid, Obj, {T1,T2} = Timeout) when is_integer(T1), is_integer(T2) ->
+ put(Pid, Obj, [], Timeout);
put(Pid, Obj, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
put(Pid, Obj, [], Timeout);
put(Pid, Obj, Options) ->
@@ -336,7 +361,7 @@ put(Pid, Obj, Options) ->
%% `return_body' was specified.
%% @throws siblings
%% @end
--spec put(pid(), riakc_obj(), put_options(), timeout()) ->
+-spec put(pid(), riakc_obj(), put_options(), timeout2()) ->
ok | {ok, riakc_obj()} | riakc_obj() | {ok, key()} | {error, term()}.
put(Pid, Obj, Options, Timeout) ->
Content = riak_pb_kv_codec:encode_content({riakc_obj:get_update_metadata(Obj),
@@ -357,15 +382,17 @@ delete(Pid, Bucket, Key) ->
%% @doc Delete the key/value specifying timeout or options. Note that the rw quorum is deprecated, use r and w.
%% @equiv delete(Pid, Bucket, Key, Options, Timeout)
--spec delete(pid(), bucket(), key(), TimeoutOrOptions::timeout() | delete_options()) ->
+-spec delete(pid(), bucket(), key(), TimeoutOrOptions::timeout2() | delete_options()) ->
ok | {error, term()}.
+delete(Pid, Bucket, Key, {T1,T2} = Timeout) when is_integer(T1), is_integer(T2) ->
+ delete(Pid, Bucket, Key, [], Timeout);
delete(Pid, Bucket, Key, Timeout) when is_integer(Timeout); Timeout =:= infinity ->
delete(Pid, Bucket, Key, [], Timeout);
delete(Pid, Bucket, Key, Options) ->
delete(Pid, Bucket, Key, Options, default_timeout(delete_timeout)).
%% @doc Delete the key/value with options and timeout. Note that the rw quorum is deprecated, use r and w.
--spec delete(pid(), bucket(), key(), delete_options(), timeout()) -> ok | {error, term()}.
+-spec delete(pid(), bucket(), key(), delete_options(), timeout2()) -> ok | {error, term()}.
delete(Pid, Bucket, Key, Options, Timeout) ->
{T, B} = maybe_bucket_type(Bucket),
Req = delete_options(Options, #rpbdelreq{type = T, bucket = B, key = Key}),
@@ -419,7 +446,7 @@ delete_obj(Pid, Obj, Options) ->
%% @doc Delete the riak object with options and timeout.
%% @equiv delete_vclock(Pid, riakc_obj:bucket(Obj), riakc_obj:key(Obj), riakc_obj:vclock(Obj), Options, Timeout)
%% @see delete_vclock/6
--spec delete_obj(pid(), riakc_obj(), delete_options(), timeout()) -> ok | {error, term()}.
+-spec delete_obj(pid(), riakc_obj(), delete_options(), timeout2()) -> ok | {error, term()}.
delete_obj(Pid, Obj, Options, Timeout) ->
delete_vclock(Pid, riakc_obj:bucket(Obj), riakc_obj:key(Obj),
riakc_obj:vclock(Obj), Options, Timeout).
@@ -1136,7 +1163,7 @@ cs_bucket_fold(Pid, Bucket, Opts) when is_pid(Pid), (is_binary(Bucket) orelse
%% @doc Return the default timeout for an operation if none is provided.
%% Falls back to the default timeout.
--spec default_timeout(timeout_name()) -> timeout().
+-spec default_timeout(timeout_name()) -> timeout2().
default_timeout(OpTimeout) ->
case application:get_env(riakc, OpTimeout) of
{ok, EnvTimeout} ->
@@ -1316,33 +1343,44 @@ handle_call(is_connected, _From, State) ->
end;
handle_call({set_options, Options}, _From, State) ->
{reply, ok, parse_options(Options, State)};
+handle_call({check, queue_len}, _From, #state{queue_len = QueueLen} = State) ->
+ {reply, QueueLen, State};
+handle_call(stats_peek, _From, #state{stats = Stats} = State) ->
+ {reply, Stats, State};
+handle_call(stats_take, _From, #state{stats = Stats} = State) ->
+ {reply, Stats, State#state{stats = riakc_stats:init_stats(Stats)}};
+handle_call({stats_change_level, NewLevel}, _From, #state{stats = Stats} = State) ->
+ {reply, Stats, State#state{stats = riakc_stats:init_stats(NewLevel)}};
handle_call(stop, _From, State) ->
- _ = disconnect(State),
- {stop, normal, ok, State}.
+ _ = disconnect(State, true),
+ {stop, normal, ok, State};
+handle_call(get_state, _From, State) ->
+ {reply, State, State}.
%% @private
handle_info({tcp_error, _Socket, Reason}, State) ->
error_logger:error_msg("PBC client TCP error for ~p:~p - ~p\n",
[State#state.address, State#state.port, Reason]),
- disconnect(State);
+ disconnect(State, true);
handle_info({tcp_closed, _Socket}, State) ->
- disconnect(State);
+ disconnect(State, true);
handle_info({ssl_error, _Socket, Reason}, State) ->
error_logger:error_msg("PBC client SSL error for ~p:~p - ~p\n",
[State#state.address, State#state.port, Reason]),
- disconnect(State);
+ disconnect(State, true);
handle_info({ssl_closed, _Socket}, State) ->
- disconnect(State);
+ disconnect(State, true);
%% Make sure the two Sock's match. If a request timed out, but there was
%% a response queued up behind it we do not want to process it. Instead
%% it should drop through and be ignored.
-handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active})
+handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active, stats = Stats0})
when Proto == tcp; Proto == ssl ->
<> = Data,
+ Stats1 = riakc_stats:record_stat(recv, iolist_size(Data), Stats0),
Resp = case Active#request.msg of
{tunneled, _MsgID} ->
%% don't decode tunneled replies, we may not recognize the msgid
@@ -1352,16 +1390,21 @@ handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active})
end,
NewState = case Resp of
#rpberrorresp{} ->
- NewState1 = maybe_reply(on_error(Active, Resp, State)),
+ NewState1 = maybe_reply(on_error(Active, Resp, State#state{stats = Stats1})),
dequeue_request(NewState1#state{active = undefined});
_ ->
- case process_response(Active, Resp, State) of
- {reply, Response, NewState0} ->
+ case process_response(Active, Resp, State#state{stats = Stats1}) of
+ {reply, Response, NewState0 = #state{stats = Stats2}} ->
%% Send reply and get ready for the next request - send the next request
%% if one is queued up
cancel_req_timer(Active#request.tref),
_ = send_caller(Response, NewState0#state.active),
- dequeue_request(NewState0#state{active = undefined});
+ Stats3 =
+ riakc_stats:record_stat(
+ {service_time, op_timeout(Active#request.timeout),
+ op_type(Active#request.msg), bucket_name(Active)},
+ timer:now_diff(os:timestamp(), Active#request.timestamp), Stats2),
+ dequeue_request(NewState0#state{active = undefined, stats = Stats3});
{pending, NewState0} -> %% Request is still pending - do not queue up a new one
NewActive = restart_req_timer(Active),
NewState0#state{active = NewActive}
@@ -1374,19 +1417,20 @@ handle_info({Proto, Sock, Data}, State=#state{sock = Sock, active = Active})
ok = ssl:setopts(Sock, [{active, once}])
end,
{noreply, NewState};
-handle_info({req_timeout, Ref}, State) ->
- case State#state.active of %%
- undefined ->
- {noreply, remove_queued_request(Ref, State)};
- Active ->
- case Ref == Active#request.ref of
- true -> %% Matches the current operation
- NewState = maybe_reply(on_timeout(State#state.active, State)),
- disconnect(NewState#state{active = undefined});
- false ->
- {noreply, remove_queued_request(Ref, State)}
- end
- end;
+handle_info({TimeoutTag, Ref}, #state{active = #request{ref = Ref, msg = Msg, timeout = Timeout},
+ stats = Stats0} = State)
+ when TimeoutTag == op_timeout; TimeoutTag == req_timeout ->
+ NewState = maybe_reply(
+ on_timeout(
+ State#state.active,
+ State#state{stats = riakc_stats:record_cntr(
+ {TimeoutTag, op_timeout(Timeout),
+ op_type(Msg), bucket_name(State#state.active)},
+ Stats0)})),
+ disconnect(NewState#state{active = undefined}, false);
+handle_info({TimeoutTag, Ref}, State)
+ when TimeoutTag == q_timeout; TimeoutTag == req_timeout ->
+ {noreply, remove_queued_request(Ref, State, TimeoutTag)};
handle_info(reconnect, State) ->
case connect(State) of
{ok, NewState} ->
@@ -1394,7 +1438,7 @@ handle_info(reconnect, State) ->
{error, Reason} ->
%% Update the failed count and reschedule a reconnection
NewState = State#state{failed = orddict:update_counter(Reason, 1, State#state.failed)},
- disconnect(NewState)
+ disconnect(NewState, true)
end;
handle_info(_, State) ->
{noreply, State}.
@@ -1415,14 +1459,18 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}.
%% @private
%% Parse options
-parse_options([], State) ->
+parse_options([], #state{stats=Stats0} = State) ->
+ Stats1 = case Stats0 of
+ undefined -> riakc_stats:init_stats(0);
+ _ -> Stats0
+ end,
%% Once all options are parsed, make sure auto_reconnect is enabled
%% if queue_if_disconnected is enabled.
case State#state.queue_if_disconnected of
true ->
- State#state{auto_reconnect = true};
+ State#state{auto_reconnect = true, stats = Stats1};
_ ->
- State
+ State#state{stats = Stats1}
end;
parse_options([{connect_timeout, T}|Options], State) when is_integer(T) ->
parse_options(Options, State#state{connect_timeout = T});
@@ -1449,7 +1497,9 @@ parse_options([{cacertfile, File}|Options], State) ->
parse_options([{keyfile, File}|Options], State) ->
parse_options(Options, State#state{keyfile=File});
parse_options([{ssl_opts, Opts}|Options], State) ->
- parse_options(Options, State#state{ssl_opts=Opts}).
+ parse_options(Options, State#state{ssl_opts=Opts});
+parse_options([{stats, Level}|Options], State) ->
+ parse_options(Options, State#state{stats=riakc_stats:init_stats(Level)}).
maybe_reply({reply, Reply, State}) ->
Request = State#state.active,
@@ -1982,6 +2032,8 @@ create_req_timer(infinity, _Ref) ->
undefined;
create_req_timer(undefined, _Ref) ->
undefined;
+create_req_timer({Msecs,_}, Ref) ->
+ erlang:send_after(Msecs, self(), {q_timeout, Ref});
create_req_timer(Msecs, Ref) ->
erlang:send_after(Msecs, self(), {req_timeout, Ref}).
@@ -2010,14 +2062,18 @@ restart_req_timer(Request) ->
%% @private
%% Connect the socket if disconnected
connect(State) when State#state.sock =:= undefined ->
- #state{address = Address, port = Port, connects = Connects} = State,
+ #state{address = Address, port = Port, connects = Connects, stats = Stats0} = State,
+ TS0 = os:timestamp(),
case gen_tcp:connect(Address, Port,
[binary, {active, once}, {packet, 4},
{keepalive, State#state.keepalive}],
State#state.connect_timeout) of
{ok, Sock} ->
+ TS1 = os:timestamp(),
State1 = State#state{sock = Sock, connects = Connects+1,
- reconnect_interval = ?FIRST_RECONNECT_INTERVAL},
+ reconnect_interval = ?FIRST_RECONNECT_INTERVAL,
+ stats = riakc_stats:record_stat(
+ connect, timer:now_diff(TS1, TS0), Stats0)},
case State#state.credentials of
undefined ->
{ok, State1};
@@ -2089,7 +2145,7 @@ start_auth(State=#state{credentials={User,Pass}, sock=Sock}) ->
%% @private
%% Disconnect socket if connected
-disconnect(State) ->
+disconnect(State, DelayReconnect) ->
%% Tell any pending requests we've disconnected
_ = case State#state.active of
undefined ->
@@ -2109,12 +2165,15 @@ disconnect(State) ->
%% Decide whether to reconnect or exit
NewState = State#state{sock = undefined, active = undefined},
- case State#state.auto_reconnect of
- true ->
+ case {State#state.auto_reconnect, DelayReconnect} of
+ {true, true} ->
%% Schedule the reconnect message and return state
erlang:send_after(State#state.reconnect_interval, self(), reconnect),
{noreply, increase_reconnect_interval(NewState)};
- false ->
+ {true, false} ->
+ self() ! reconnect,
+ {noreply, NewState};
+ {false, _} ->
{stop, disconnected, NewState}
end.
@@ -2130,16 +2189,29 @@ increase_reconnect_interval(State) ->
%% Send a request to the server and prepare the state for the response
%% @private
-send_request(Request0, State) when State#state.active =:= undefined ->
- {Request, Pkt} = encode_request_message(Request0),
+
+send_request(#request{ref = Ref,
+ tref = TRef,
+ timeout = Timeout} = Request0,
+ State = #state{stats = Stats0})
+ when State#state.active =:= undefined ->
+ {Request1, Pkt} = encode_request_message(Request0#request{timestamp = os:timestamp()}),
+ Stats1 = riakc_stats:record_stat(send, iolist_size(Pkt), Stats0),
Transport = State#state.transport,
case Transport:send(State#state.sock, Pkt) of
ok ->
- maybe_reply(after_send(Request, State#state{active = Request}));
+ case Timeout of
+ {_,Msecs} ->
+ cancel_req_timer(TRef),
+ Request2 = Request1#request{tref = erlang:send_after(Msecs, self(), {op_timeout, Ref})},
+ maybe_reply(after_send(Request2, State#state{active = Request2, stats = Stats1}));
+ _ ->
+ maybe_reply(after_send(Request1, State#state{active = Request1, stats = Stats1}))
+ end;
{error, Reason} ->
error_logger:warning_msg("Socket error while sending riakc request: ~p.", [Reason]),
Transport:close(State#state.sock),
- maybe_enqueue_and_reconnect(Request, State#state{sock=undefined})
+ maybe_enqueue_and_reconnect(Request1, State#state{sock=undefined, stats = Stats1})
end.
%% Already encoded (for tunneled messages), but must provide Message Id
@@ -2164,37 +2236,53 @@ maybe_reconnect(_) -> ok.
%% If we can queue while disconnected, do so, otherwise tell the
%% caller that the socket was disconnected.
enqueue_or_reply_error(Request, #state{queue_if_disconnected=true}=State) ->
- queue_request(Request, State);
+ case Request#request.timeout of
+ {_,_} ->
+ %% Split {QueueTimeout, OpTimeout} request: part of the op timeout
+ %% has already been used, so fail it now instead of re-queueing.
+ %% Every branch of this function must return the state; the return
+ %% value of send_caller/2 is discarded, as in the clause below.
+ _ = send_caller({error, timeout}, Request),
+ State;
+ %% Single-timeout request: put it back at the head of the queue.
+ _ -> queue_request_head(Request, State)
+ end;
enqueue_or_reply_error(Request, State) ->
_ = send_caller({error, disconnected}, Request),
State.
%% Queue up a request if one is pending
%% @private
-queue_request(Request, State) ->
- State#state{queue = queue:in(Request, State#state.queue)}.
-
+queue_request(Request, State) -> queue_request(Request, State, in).
+queue_request_head(Request, State) -> queue_request(Request, State, in_r).
+queue_request(Request0, #state{queue_len = QLen, queue = Q, stats = Stats0} = State, Infunc) ->
+ Request1 = Request0#request{timestamp = os:timestamp()},
+ Stats1 = riakc_stats:record_cntr({queue_len, QLen + 1}, Stats0),
+ State#state{queue_len = QLen + 1, queue = queue:Infunc(Request1, Q), stats = Stats1}.
%% Try and dequeue request and send onto the server if one is waiting
%% @private
-dequeue_request(State) ->
+dequeue_request(#state{queue_len = QLen, stats = Stats0} = State) ->
case queue:out(State#state.queue) of
{empty, _} ->
- State;
- {{value, Request}, Q2} ->
- send_request(Request, State#state{queue = Q2})
+ State#state{active = undefined};
+ {{value, #request{timestamp = TS0, timeout = Timeout} = Request}, Q2} ->
+ Now = os:timestamp(),
+ Stats1 = riakc_stats:record_stat(
+ {queue_time, q_timeout(Timeout)}, timer:now_diff(Now, TS0), Stats0),
+ send_request(Request#request{timestamp = Now},
+ State#state{active = undefined,
+ queue_len = QLen - 1,
+ queue = Q2,
+ stats = Stats1})
end.
%% Remove a queued request by reference - returns same queue if ref not present
%% @private
-remove_queued_request(Ref, State) ->
- L = queue:to_list(State#state.queue),
- case lists:keytake(Ref, #request.ref, L) of
+remove_queued_request(Ref, #state{queue_len = QLen, queue = Q, stats = Stats0} = State, TimeoutTag) ->
+ case lists:keytake(Ref, #request.ref, queue:to_list(Q)) of
false -> % Ref not queued up
State;
- {value, Req, L2} ->
+ {value, #request{timeout = Timeout} = Req, L2} ->
{reply, Reply, NewState} = on_timeout(Req, State),
_ = send_caller(Reply, Req),
- NewState#state{queue = queue:from_list(L2)}
+ NewState#state{queue_len = QLen - 1,
+ queue = queue:from_list(L2),
+ stats = riakc_stats:record_cntr(
+ {TimeoutTag, q_timeout(Timeout), bucket_name(Req)},
+ Stats0)}
end.
%% @private
@@ -2357,6 +2445,50 @@ set_index_create_req_timeout(Timeout, Req) when Timeout =:= infinity ->
set_index_create_req_timeout(Timeout, _Req) when not is_integer(Timeout) ->
erlang:error(badarg).
+%% @private
+%% Map a request message (a bare atom, a `{tunneled, _}' tuple or a request
+%% record) to the short atom used to label the operation in stats keys.
+%% Any message not listed here is labelled `unknown_op'.
+op_type(rpbpingreq ) -> ping;
+op_type(rpbgetclientidreq ) -> get_client_id;
+op_type(rpbgetserverinforeq ) -> get_server_info;
+op_type({tunneled,_} ) -> tunneled;
+op_type(#dtfetchreq{} ) -> dt_fetch;
+op_type(#dtupdatereq{} ) -> dt_update;
+op_type(#rpbcountergetreq{} ) -> counter_get;
+op_type(#rpbcounterupdatereq{} ) -> counter_update;
+op_type(#rpbcsbucketreq{} ) -> cs_bucket;
+op_type(#rpbdelreq{} ) -> del;
+op_type(#rpbgetbucketreq{} ) -> get_bucket;
+op_type(#rpbgetbuckettypereq{} ) -> get_bucket_type;
+op_type(#rpbgetreq{} ) -> get;
+op_type(#rpbindexreq{} ) -> index;
+op_type(#rpblistbucketsreq{} ) -> list_buckets;
+op_type(#rpblistkeysreq{} ) -> list_keys;
+op_type(#rpbmapredreq{} ) -> mapred;
+op_type(#rpbputreq{} ) -> put;
+op_type(#rpbresetbucketreq{} ) -> reset_bucket;
+op_type(#rpbsearchqueryreq{} ) -> search_query;
+op_type(#rpbsetbucketreq{} ) -> set_bucket;
+op_type(#rpbsetbuckettypereq{} ) -> set_bucket_type;
+op_type(#rpbsetclientidreq{} ) -> set_client_id;
+op_type(#rpbyokozunaindexdeletereq{}) -> yokozuna_index_delete;
+op_type(#rpbyokozunaindexgetreq{} ) -> yokozuna_index_get;
+op_type(#rpbyokozunaindexputreq{} ) -> yokozuna_index_put;
+op_type(#rpbyokozunaschemagetreq{} ) -> yokozuna_schema_get;
+op_type(#rpbyokozunaschemaputreq{} ) -> yokozuna_schema_put;
+op_type(_ ) -> unknown_op.
+
+%% @private
+%% Extract the operation timeout: the second element of a
+%% `{QueueTimeout, OpTimeout}' pair, or the value itself when a single
+%% timeout was given.
+op_timeout({_,OPTimeout}) -> OPTimeout;
+op_timeout(Timeout) -> Timeout.
+
+%% @private
+%% Extract the queue timeout: the first element of a
+%% `{QueueTimeout, OpTimeout}' pair, or the value itself when a single
+%% timeout was given.
+q_timeout({QTimeout,_}) -> QTimeout;
+q_timeout(Timeout) -> Timeout.
+
+%% @private
+%% Label a request for stats keys: the `bucket' field of a get, put or delete
+%% request, the atom `ping' for a ping request, and the atom `other_bucket'
+%% for every other message.
+bucket_name(Req) ->
+ case Req#request.msg of
+ #rpbgetreq{bucket = B} -> B;
+ #rpbputreq{bucket = B} -> B;
+ #rpbdelreq{bucket = B} -> B;
+ rpbpingreq -> ping;
+ _ -> other_bucket
+ end.
%% ====================================================================
%% unit tests
@@ -3921,4 +4053,380 @@ live_node_tests() ->
end)}
].
+timeout_no_conn_test() ->
+ % test req & {q, op} timeouts when there's no connection
+
+ % test requests have to be spawned, so there's some variance in the response times
+ % this makes it difficult to decide on the acceptable tolerance for the pass/fail decision
+ % so the test is variable, it may fail the first time but try it again
+
+ {ok, Pid} = start_link(test_ip(), 65225, [auto_reconnect, queue_if_disconnected]),
+ Self = self(),
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ {T,Info} = (catch timer:tc(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ Self ! {self(), {T div 1000, Info}}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 3000 -> error
+ end
+ end,
+
+ P01 = REQ(get, {1500,100}), timer:sleep(1), % sleep to ensure that spawned requests
+ P02 = REQ(get, {1000,100}), timer:sleep(1), % are actually sent in this order
+ P03 = REQ(get, {1500,100}), timer:sleep(1),
+ P04 = REQ(get, {1000,100}), timer:sleep(1),
+ P05 = REQ(get, {1500,100}), timer:sleep(1),
+ P06 = REQ(get, {1000,100}), timer:sleep(1),
+ P07 = REQ(get, {1500,100}), timer:sleep(1),
+ P08 = REQ(get, {1000,100}), timer:sleep(1),
+ P09 = REQ(get, {1500,100}), timer:sleep(1),
+ P10 = REQ(get, 200), timer:sleep(1),
+ P11 = REQ(get, 400), timer:sleep(1),
+ P12 = REQ(get, 600), timer:sleep(1),
+ P13 = REQ(get, 800), timer:sleep(1),
+ P14 = REQ(get, 200), timer:sleep(1),
+ P15 = REQ(get, 1000), timer:sleep(2500), 0 = queue_len(Pid),
+ P16 = REQ(get, {200,1000}), timer:sleep(1),
+ P17 = REQ(get, {200,1000}),
+
+ {T01, {error, timeout}} = RES(P01),
+ {T02, {error, timeout}} = RES(P02),
+ {T03, {error, timeout}} = RES(P03),
+ {T04, {error, timeout}} = RES(P04),
+ {T05, {error, timeout}} = RES(P05),
+ {T06, {error, timeout}} = RES(P06),
+ {T07, {error, timeout}} = RES(P07),
+ {T08, {error, timeout}} = RES(P08),
+ {T09, {error, timeout}} = RES(P09),
+ {T10, {error, timeout}} = RES(P10),
+ {T11, {error, timeout}} = RES(P11),
+ {T12, {error, timeout}} = RES(P12),
+ {T13, {error, timeout}} = RES(P13),
+ {T14, {error, timeout}} = RES(P14),
+ {T15, {error, timeout}} = RES(P15),
+ {T16, {error, timeout}} = RES(P16),
+ {T17, {error, timeout}} = RES(P17),
+
+ % All these requests should spend ~1500ms in the queue
+ io:format(user, "1500ms TIMES: ~p ~p ~p ~p ~p~n", [T01,T03,T05,T07,T09]),
+ lists:foreach(fun(T) -> true = T > 1490, true = T < 1515 end, [T01,T03,T05,T07,T09]),
+
+ % All these requests should spend ~1000ms in the queue
+ io:format(user, "1000ms TIMES: ~p ~p ~p ~p~n", [T02,T04,T06,T08]),
+ lists:foreach(fun(T) -> true = T > 990, true = T < 1015 end, [T02,T04,T06,T08]),
+
+ % These test the old timeouts
+ io:format(user, "TIMES: ~p ~p ~p ~p ~p ~p ~p ~p~n", [T10,T11,T12,T13,T14,T15,T16,T17]),
+ true = T10 > 190, true = T10 < 215,
+ true = T11 > 390, true = T11 < 415,
+ true = T12 > 590, true = T12 < 615,
+ true = T13 > 790, true = T13 < 815,
+ true = T14 > 190, true = T14 < 215,
+ true = T15 > 990, true = T15 < 1015,
+
+ % These 2 requests should spend ~200ms in the queue
+ true = T16 > 190, true = T16 < 215,
+ true = T17 > 190, true = T17 < 215,
+
+ stop(Pid).
+
+%% Timing test for plain timeouts and `{QueueTimeout, OpTimeout}' pairs against
+%% a dummy server that accepts the connection but never replies.
+timeout_conn_test() ->
+ % test req & {q, op} timeouts when there is a connection that never responds
+
+ % test requests have to be spawned, so there's some variance in the response times
+ % this makes it difficult to decide on the acceptable tolerance for the pass/fail decision
+ % so the test is timing-sensitive: it may fail the first time, in which case try it again
+
+ % Set up a dummy socket to send requests on
+ {ok, DummyServerPid, Port} = dummy_server(noreply),
+ {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected]),
+ erlang:monitor(process, DummyServerPid),
+ Self = self(),
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ {T,Info} = (catch timer:tc(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ Self ! {self(), {T div 1000, Info}}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 3000 -> error
+ end
+ end,
+
+ P01 = REQ(get, {1000, 200}), timer:sleep(1), % sleep to ensure that spawned requests
+ P02 = REQ(get, {1000, 200}), timer:sleep(1), % are actually sent in this order
+ P03 = REQ(get, {1000, 200}), timer:sleep(1),
+ P04 = REQ(get, {1000, 200}), timer:sleep(1),
+ P05 = REQ(get, {1000, 200}), timer:sleep(1),
+ P06 = REQ(get, {1000, 200}), timer:sleep(1),
+ P07 = REQ(get, {1000, 200}), timer:sleep(1),
+ P08 = REQ(get, {1000, 200}), timer:sleep(1),
+ P09 = REQ(get, {1000, 200}), timer:sleep(1),
+ P10 = REQ(get, {1000, 200}), timer:sleep(1),
+ P11 = REQ(get, 200), timer:sleep(1),
+ P12 = REQ(get, 400), timer:sleep(1),
+ P13 = REQ(get, 600), timer:sleep(1),
+ P14 = REQ(get, 800), timer:sleep(1),
+ P15 = REQ(get, 200), timer:sleep(1),
+ P16 = REQ(get, 1000), timer:sleep(2000), 0 = queue_len(Pid),
+ P17 = REQ(get, {200, 1000}), timer:sleep(1),
+ P18 = REQ(get, {200, 1000}),
+
+ {T01, {error, timeout}} = RES(P01),
+ {T02, {error, timeout}} = RES(P02),
+ {T03, {error, timeout}} = RES(P03),
+ {T04, {error, timeout}} = RES(P04),
+ {T05, {error, timeout}} = RES(P05),
+ {T06, {error, timeout}} = RES(P06),
+ {T07, {error, timeout}} = RES(P07),
+ {T08, {error, timeout}} = RES(P08),
+ {T09, {error, timeout}} = RES(P09),
+ {T10, {error, timeout}} = RES(P10),
+ {T11, {error, timeout}} = RES(P11),
+ {T12, {error, timeout}} = RES(P12),
+ {T13, {error, timeout}} = RES(P13),
+ {T14, {error, timeout}} = RES(P14),
+ {T15, {error, timeout}} = RES(P15),
+ {T16, {error, timeout}} = RES(P16),
+ {T17, {error, timeout}} = RES(P17),
+ {T18, {error, timeout}} = RES(P18),
+
+ catch DummyServerPid ! stop,
+ timer:sleep(10),
+ receive _Msg -> ok
+ after 1 -> ok
+ end,
+
+ io:format(user, "TIMES: ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p ~p~n",
+ [T01,T02,T03,T04,T05,T06,T07,T08,T09,T10,T11,T12,T13,T14,T15,T16,T17,T18]),
+ true = T01 > 190, true = T01 < 215, % This one is serviced right away & should timeout after 200ms
+ true = T02 > 390, true = T02 < 415, % This one is serviced after the 1st one has timed out, ~400ms
+ true = T03 > 590, true = T03 < 615, % This one is serviced after the 1st two have timed out, ~600ms
+ true = T04 > 790, true = T04 < 815, % This one is serviced after the 1st three have timed out, ~800ms
+ true = T05 > 990, true = T05 < 1015, % This one is serviced after the 1st four have timed out, ~1000ms
+
+ [HD|TL] = lists:reverse(lists:sort([T06,T07,T08,T09,T10])),
+ % One will have queued & been serviced, ~1200ms
+ true = HD > 1190, true = HD < 1215,
+ % All these will timeout in the queue, ~1000ms
+ lists:foreach(fun(T) -> true = T > 990, true = T < 1015 end, TL),
+
+ % These test for backward compatibility
+ true = T11 > 190, true = T11 < 215,
+ true = T12 > 390, true = T12 < 415,
+ true = T13 > 590, true = T13 < 615,
+ true = T14 > 790, true = T14 < 815,
+ true = T15 > 190, true = T15 < 215,
+ true = T16 > 990, true = T16 < 1015,
+
+ true = T17 > 990, true = T17 < 1015, % This one will be serviced right away & timeout after ~1000ms (its op timeout)
+ true = T18 > 190, true = T18 < 215, % This one will timeout in the queue waiting for the previous one, ~200ms (its queue timeout)
+
+ stop(Pid).
+
+stats_demo_test() ->
+ % Set up a dummy socket to send requests on
+ {ok, DummyServerPid, Port} = dummy_server(noreply),
+ {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected]),
+ erlang:monitor(process, DummyServerPid),
+ % {ok, Pid} = start_link(test_ip(), test_port(), [auto_reconnect, queue_if_disconnected]),
+ Self = self(),
+
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ {T,Info} = (catch timer:tc(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ % io:format(user, "RES: ~p~n", [{T,Info}]),
+ Self ! {self(), {T div 1000, Info}}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 3000 -> error
+ end
+ end,
+
+ Traffic = fun() ->
+ P01 = REQ(get, {100, 20}),
+ P02 = REQ(get, {100, 20}),
+ P03 = REQ(get, {100, 20}),
+ P04 = REQ(get, {100, 20}),
+ P05 = REQ(get, {100, 20}),
+ P06 = REQ(get, {100, 20}),
+ P07 = REQ(get, {100, 20}),
+ P08 = REQ(get, {100, 20}),
+ P09 = REQ(get, {100, 20}),
+ P10 = REQ(get, {100, 20}),
+ P11 = REQ(get, 20),
+ P12 = REQ(get, 40),
+ P13 = REQ(get, 60),
+ P14 = REQ(get, 80),
+ P15 = REQ(get, 20),
+ P16 = REQ(get, 100),
+ P17 = REQ(get, {20, 100}),
+ P18 = REQ(get, {20, 100}),
+
+ lists:foreach(
+ fun(P) -> RES(P) end,
+ [P01,P02,P03,P04,P05,P06,P07,P08,P09,P10,
+ P11,P12,P13,P14,P15,P16,P17,P18])
+ end,
+
+ Traffic(),
+ {_,[],[_]} = stats_take(Pid),
+
+ stats_change_level(Pid, 1),
+ Traffic(),
+ {_,CL1,HL1} = stats_take(Pid),
+
+ stats_change_level(Pid, 2),
+ Traffic(),
+ {_,CL2,HL2} = stats_take(Pid),
+
+ % io:format(user, "~n~n~p~n~n~p~n~n", [CL2, CL1]),
+ % io:format(user, "~n~n~p~n~n~p~n~n", [HL2, HL1]),
+ true = length(CL1) >= 2,
+ true = length(CL2) >= 2,
+ true = length(HL1) >= 2,
+ true = length(HL2) >= 2,
+
+ lists:foreach(fun({_,_,_,[]}) -> ok end, HL1),
+ lists:foreach(fun({_,_,_,L}) -> true = length(L) >= 10 end, HL2),
+
+ catch DummyServerPid ! stop,
+ timer:sleep(10),
+ receive _Msg -> ok
+ after 1 -> ok
+ end,
+
+ stop(Pid).
+
+overload_demo_test() ->
+ {ok, DummyServerPid, Port} = dummy_server({5, <<10>>}),
+ {ok, Pid} = start("127.0.0.1", Port, [auto_reconnect, queue_if_disconnected, {stats,1}]),
+ timer:sleep(50),
+ erlang:monitor(process, DummyServerPid),
+
+ Self = self(),
+
+ REQ = fun(Func, TO) ->
+ erlang:spawn(fun() ->
+ Info = (catch apply(?MODULE, Func,
+ [Pid, <<"qwer">>, <<"qwer">>, [], TO])),
+ Self ! {self(), Info}
+ end)
+ end,
+
+ RES = fun(RPid) ->
+ receive
+ {RPid, Result} -> Result
+ after
+ 4000 -> error
+ end
+ end,
+
+ TEST = fun(TO) ->
+
+ PidList = lists:foldl(fun(_,Acc) ->
+ timer:sleep(4),
+ [REQ(get, TO) | Acc]
+ end, [], lists:seq(1,200)),
+ timer:sleep(20),
+
+ ReplyList = lists:foldl(fun(RPid,Acc) ->
+ [RES(RPid) | Acc]
+ end, [], PidList),
+
+ {_,_,KL} = stats_take(Pid),
+ Reconns = case lists:keyfind(connect, 1, KL) of
+ false -> 0;
+ {_,V,_,_} -> V
+ end,
+ Replies = lists:foldl(
+ fun({error,Reply},Acc) ->
+ case lists:keyfind(Reply, 1, Acc) of
+ false -> [{Reply, 1} | Acc];
+ {Reply, C} ->
+ lists:keyreplace(Reply, 1, Acc, {Reply, C+1})
+ end
+ end, [], ReplyList),
+ TimeOuts = case lists:keyfind(timeout, 1, Replies) of
+ {_,TOV} -> TOV;
+ false -> 0
+ end,
+ NotFounds = case lists:keyfind(notfound, 1, Replies) of
+ {_,NFV} -> NFV;
+ false -> 0
+ end,
+
+ % io:format(user, "~nSTATS: ~p~n", [Stats])
+ io:format(user, " With timeout: ~p we got ~p reconnections, ~p timeouts and ~p replies~n",
+ [TO, Reconns, TimeOuts, NotFounds]),
+ {TimeOuts, NotFounds}
+
+
+ end,
+
+ {OldTO, OldNF} = TEST(9),
+ {NewTO, NewNF} = TEST({1,8}),
+
+ catch DummyServerPid ! stop,
+ timer:sleep(10),
+ receive _Msg -> ok % io:format(user, "MSG: ~p~n", [_Msg])
+ after 1 -> ok % io:format(user, "NO MSG: ~p~n", [process_info(DummyServerPid, messages)])
+ end,
+
+ % the worst-case response time for both timeout mechanisms is the same, 60ms
+ % using the old mechanism we typically get 2 or 3 responses
+ % and the link times out as long as it remains overloaded
+ true = OldTO > 170,
+ true = OldNF < 30,
+ % but using the new mechanism we get an almost 66% success rate
+ % consistent during the overloaded state
+ true = NewTO < 80,
+ true = NewNF > 120,
+
+ stop(Pid).
+
+%% Start a throwaway TCP server for the timeout and stats tests.
+%% Listens on port 0 (so the OS picks a free port) with 4-byte packet framing,
+%% spawns dummy_server_loop/1 to accept and serve the connection, and returns
+%% `{ok, ServerPid, Port}'. `Directive' is handed to the loop and selects how
+%% incoming requests are answered.
+dummy_server(Directive) ->
+ {ok, Listen} = gen_tcp:listen(0, [binary, {packet, 4}, {active, true}]),
+ {ok, Port} = inet:port(Listen),
+ Pid = spawn(?MODULE, dummy_server_loop, [{Listen, no_conn, Directive}]),
+ {ok, Pid, Port}.
+
+%% Server loop for dummy_server/1.
+%% With no connection (`no_conn') it blocks in accept. Once connected it
+%% handles one message at a time: `stop' ends the loop, `{tcp_closed, Sock}'
+%% goes back to accepting, and any other message is treated as a request.
+%% With the `noreply' directive the request is ignored; with `{SleepMs, Reply}'
+%% a helper process sends `Reply' on the socket after `SleepMs' milliseconds.
+dummy_server_loop({Listen, no_conn, Directive}) ->
+ % case Directive of
+ % {SleepMs, _} -> timer:sleep(SleepMs div 2);
+ % _ -> ok
+ % end,
+ {ok, Sock} = gen_tcp:accept(Listen),
+ dummy_server_loop({Listen, Sock, Directive});
+dummy_server_loop({Listen, Sock, Directive}) ->
+ receive
+ stop -> ok;
+ {tcp_closed, Sock} -> dummy_server_loop({Listen, no_conn, Directive});
+ _Data ->
+ case Directive of
+ noreply -> dummy_server_loop({Listen, Sock, Directive}); % ignore requests, let them timeout
+ {SleepMs, Reply} ->
+ % reply from a separate process so the delay does not block this loop
+ spawn(fun() -> timer:sleep(SleepMs), gen_tcp:send(Sock, Reply) end),
+ dummy_server_loop({Listen, Sock, Directive})
+ end
+ end.
+
-endif.
diff --git a/src/riakc_stats.erl b/src/riakc_stats.erl
new file mode 100644
index 00000000..e3b02ec3
--- /dev/null
+++ b/src/riakc_stats.erl
@@ -0,0 +1,230 @@
+-module(riakc_stats).
+-export([stats_format/1,
+ init_stats/1,
+ record_stat/3,
+ record_cntr/2,
+ merge_stats/2,
+ print/1]).
+
+-export([time/1,
+ bytes/1]).
+
+%% Statistics accumulator.
+%%   timestamp - `os:timestamp()' taken when the accumulator was created by init_stats/1
+%%   level     - 0: recording functions return the accumulator unchanged,
+%%               1: record_stat/3 keeps a count and a total per key,
+%%               2: record_stat/3 additionally keeps a histogram bucket per key
+%%   dict      - `dict' of counters keyed by `{count, Key}', `{total, Key}'
+%%               and `{histogram, Bucket, Key}'
+-record(stats, {timestamp, level, dict}).
+
+%% @doc Increment the plain occurrence counter for `Key' by one.
+%% At level 0 the accumulator is returned unchanged.
+record_cntr(_Key, Stats = #stats{level = 0}) ->
+    Stats;
+record_cntr(Key, Stats = #stats{dict = Counters0}) ->
+    Counters1 = dict:update_counter({count, Key}, 1, Counters0),
+    Stats#stats{dict = Counters1}.
+
+%% @doc Record one sample `Val' for `Key'.
+%% Level 0 records nothing, level 1 bumps the sample count and adds `Val' to
+%% the running total, level 2 additionally bumps the histogram bucket that
+%% `granulate(Val)' selects.
+record_stat(_Key, _Val, Stats = #stats{level = 0}) ->
+    Stats;
+record_stat(Key, Val, Stats = #stats{level = 1, dict = Dict0}) ->
+    Stats#stats{dict = count_and_total(Key, Val, Dict0)};
+record_stat(Key, Val, Stats = #stats{level = 2, dict = Dict0}) ->
+    Dict1 = count_and_total(Key, Val, Dict0),
+    Bucket = {histogram, granulate(Val), Key},
+    Stats#stats{dict = dict:update_counter(Bucket, 1, Dict1)}.
+
+%% Bump `{count, Key}' by one and `{total, Key}' by `Val'.
+count_and_total(Key, Val, Dict0) ->
+    Dict1 = dict:update_counter({count, Key}, 1, Dict0),
+    dict:update_counter({total, Key}, Val, Dict1).
+
+%% @doc Create an empty accumulator stamped with the current `os:timestamp()'.
+%% Accepts either a level or an existing accumulator, in which case only its
+%% level is carried over and everything else starts afresh.
+init_stats(#stats{level = Level}) ->
+    init_stats(Level);
+init_stats(Level) ->
+    #stats{level = Level,
+           dict = dict:new(),
+           timestamp = os:timestamp()}.
+
+%% @doc Turn an accumulator into the reportable form
+%% `{{ElapsedMicros, 1}, Counters, Histograms}'.
+%% `ElapsedMicros' is the time since the accumulator was created; the `1' is
+%% the number of accumulators represented (merge_stats/2 adds these up).
+%% `Histograms' always contains a `{key, count, total, Divisions}' row that
+%% carries the bucket bounds in ascending order.
+stats_format(#stats{timestamp = Started, level = Level, dict = Dict}) ->
+    Elapsed = timer:now_diff(os:timestamp(), Started),
+    Entries = lists:sort(dict:to_list(Dict)),
+    KeyRow = {key, count, total, lists:reverse(steps(Level))},
+    {Cntrs, Hists} = stats_format(Level, Entries, [], [KeyRow]),
+    {{Elapsed, 1}, Cntrs, Hists}.
+
+%% Walk the raw dict entries. Each `{count, Key}' entry at the head is paired
+%% with its `{total, Key}' entry if there is one:
+%%   * no total   -> a plain counter `{Key, Count}',
+%%   * with total -> a histogram row `{Key, Count, Total, Buckets}', whose
+%%     bucket entries are removed from the remaining list as well.
+stats_format(_Level, [], Counters, Hists) ->
+    {Counters, Hists};
+stats_format(Level, [{{count, Key}, Count} | Rest], Counters, Hists) ->
+    case lists:keytake({total, Key}, 1, Rest) of
+        {value, {{total, Key}, Total}, Rest1} ->
+            {Rest2, Buckets} = take_buckets(Key, steps(Level), Rest1),
+            stats_format(Level, Rest2, Counters, [{Key, Count, Total, Buckets} | Hists]);
+        false ->
+            stats_format(Level, Rest, [{Key, Count} | Counters], Hists)
+    end.
+
+%% Pull the histogram entry of `Key' for every step out of `Entries'.
+%% Returns the remaining entries and one hit count per step (0 when a bucket
+%% was never hit); because hits are prepended, the result is in the reverse
+%% order of `Steps'.
+take_buckets(Key, Steps, Entries) ->
+    lists:foldl(
+        fun(Step, {Remaining, Hits}) ->
+            case lists:keytake({histogram, Step, Key}, 1, Remaining) of
+                {value, {{_, _, Key}, Hit}, Remaining1} -> {Remaining1, [Hit | Hits]};
+                false -> {Remaining, [0 | Hits]}
+            end
+        end,
+        {Entries, []},
+        Steps).
+
+%% @doc Map a sample value onto the upper bound of its histogram bucket.
+%% The integers 0, 1 and 2 are buckets of their own. Every other value gets
+%% the first bound of the 4-7-10-20-40-70-... ladder that is not smaller than
+%% the value; anything above 700000000 lands in the 1000000000 bucket.
+granulate(N) when N =:= 0; N =:= 1; N =:= 2 ->
+    N;
+granulate(N) ->
+    granulate(N,
+              [4, 7,
+               10, 20, 40, 70,
+               100, 200, 400, 700,
+               1000, 2000, 4000, 7000,
+               10000, 20000, 40000, 70000,
+               100000, 200000, 400000, 700000,
+               1000000, 2000000, 4000000, 7000000,
+               10000000, 20000000, 40000000, 70000000,
+               100000000, 200000000, 400000000, 700000000]).
+
+%% Return the first bound that is >= N, or the overflow bucket.
+granulate(N, [Bound | _]) when N =< Bound -> Bound;
+granulate(N, [_ | Bounds]) -> granulate(N, Bounds);
+granulate(_, []) -> 1000000000.
+
+%% @doc Histogram bucket bounds in descending order.
+%% Only level 2 keeps histograms: 1000000000 first, then 7/4/2/1 times every
+%% power of ten from 100000000 down to 1, and finally 0. Every other level
+%% has no buckets.
+steps(2) ->
+    Decades = [100000000, 10000000, 1000000, 100000, 10000, 1000, 100, 10, 1],
+    [1000000000] ++ [Mult * Decade || Decade <- Decades, Mult <- [7, 4, 2, 1]] ++ [0];
+steps(_) ->
+    [].
+
+%% @doc Combine two formatted stats (as produced by stats_format/1 or by an
+%% earlier merge). Elapsed times and accumulator counts are summed, counters
+%% and histograms are merged key by key.
+merge_stats({{TAcc1, TCnt1}, Cntrs1, HistG1}, {{TAcc2, TCnt2}, Cntrs2, HistG2}) ->
+    Period = {TAcc1 + TAcc2, TCnt1 + TCnt2},
+    Cntrs = add_cntrs(Cntrs1, Cntrs2, []),
+    Hists = add_hists(HistG1, HistG2, []),
+    {Period, Cntrs, Hists}.
+
+%% Merge two `{Name, Value}' counter lists: counters present in both have
+%% their values added, the others are carried over as they are. Whatever is
+%% left in one list once the other is exhausted is put in front of the
+%% accumulated result.
+add_cntrs([], Cntrs2, Acc) ->
+    Cntrs2 ++ Acc;
+add_cntrs(Cntrs1, [], Acc) ->
+    Cntrs1 ++ Acc;
+add_cntrs([{Name, V1} | Rest1], Cntrs2, Acc) ->
+    case lists:keytake(Name, 1, Cntrs2) of
+        {value, {Name, V2}, Rest2} ->
+            add_cntrs(Rest1, Rest2, [{Name, V1 + V2} | Acc]);
+        false ->
+            add_cntrs(Rest1, Cntrs2, [{Name, V1} | Acc])
+    end.
+
+%% Merge two lists of histogram rows `{Key, Count, Total, Buckets}'.
+%% Rows present in both lists have their counts, totals and buckets added;
+%% rows present in only one list are carried over unchanged.
+%%
+%% The special `{key, count, total, Divisions}' row describes the bucket
+%% bounds. The two inputs may have been collected at different stats levels,
+%% in which case one of them has no divisions (and empty bucket lists) while
+%% the other has the full set. The row with the longer division list is
+%% kept, so the merged histograms can still be labelled when printed.
+add_hists([], Hists2, Acc) ->
+    Hists2 ++ Acc;
+add_hists(Hists1, [], Acc) ->
+    Hists1 ++ Acc;
+add_hists([{key, _, _, Divs1} = KeyRec1 | Hists1], Hists2, Acc) ->
+    KeyRec =
+        case lists:keyfind(key, 1, Hists2) of
+            {key, _, _, Divs2} = KeyRec2 when length(Divs2) > length(Divs1) -> KeyRec2;
+            _ -> KeyRec1
+        end,
+    add_hists(Hists1, lists:keydelete(key, 1, Hists2), [KeyRec | Acc]);
+add_hists([{Key, Cntr1, Ttl1, Hist1} = HistRec1 | Hists1], Hists2, Acc) ->
+    case lists:keytake(Key, 1, Hists2) of
+        false ->
+            add_hists(Hists1, Hists2, [HistRec1 | Acc]);
+        {value, {Key, Cntr2, Ttl2, Hist2}, NewHists2} ->
+            Merged = {Key, Cntr1 + Cntr2, Ttl1 + Ttl2, add_hist(Hist1, Hist2, [])},
+            add_hists(Hists1, NewHists2, [Merged | Acc])
+    end.
+
+%% Add two bucket lists position by position. When one list is shorter (for
+%% example empty, because it was collected without histograms) the surplus
+%% buckets of the longer list are kept as they are instead of crashing.
+add_hist([], Rest, Acc) ->
+    lists:reverse(Acc, Rest);
+add_hist(Rest, [], Acc) ->
+    lists:reverse(Acc, Rest);
+add_hist([H1 | Hist1], [H2 | Hist2], Acc) ->
+    add_hist(Hist1, Hist2, [H1 + H2 | Acc]).
+
+%% @doc Print a formatted stats term to the `user' device.
+%% Output order: a header with the number of accumulators merged and the
+%% average monitoring period per accumulator, then every plain counter, then
+%% every histogram row. The `key' row is taken out of the histogram list
+%% first because it only carries the bucket bounds used for labelling.
+print({{Time, Conns}, Cntrs, Hists}) ->
+    Header =
+        "~n~nRiak Connection Stats"
+        "~n connections established: ~p"
+        "~n stat monitoring period: " ++ time(Time div Conns) ++ "~n",
+    io:format(user, Header, [Conns]),
+    PrintCntr = fun({Key, Cnt}) -> print_cntr_key(Key, Cnt) end,
+    lists:foreach(PrintCntr, Cntrs),
+    {value, {key, _, _, KeyDivs}, RestHists} = lists:keytake(key, 1, Hists),
+    PrintHist =
+        fun({Key, Cnt, Ttl, Divs}) ->
+            %% The header printer reports whether the buckets hold bytes or times.
+            TypeFunc = print_hist_key(Key, Cnt, Ttl),
+            print_hist(TypeFunc, 0, Divs, KeyDivs)
+        end,
+    lists:foreach(PrintHist, RestHists),
+    io:format(user, "~n~n", []).
+
+%% Print the heading of one histogram row and return the name of the
+%% function (`bytes' or `time') that renders its bucket bounds.
+print_hist_key(Key, Cnt, Ttl) ->
+    {Format, Args, TypeFunc} = hist_key_heading(Key, Cnt, Ttl),
+    io:format(user, Format, Args),
+    TypeFunc.
+
+%% Build `{Format, Args, TypeFunc}' for the heading of a histogram row.
+%% `Cnt' is the number of samples and `Ttl' their sum, so `Ttl div Cnt' is
+%% the (integer) average.
+hist_key_heading(send, Cnt, Ttl) ->
+    {"~n~n data sent: " ++ bytes(Ttl) ++
+     "~n packets: ~p"
+     "~n average packet size: " ++ bytes(Ttl div Cnt),
+     [Cnt],
+     bytes};
+hist_key_heading(recv, Cnt, Ttl) ->
+    {"~n~n data received: " ++ bytes(Ttl) ++
+     "~n packets: ~p"
+     "~n average packet size: " ++ bytes(Ttl div Cnt),
+     [Cnt],
+     bytes};
+hist_key_heading(connect, Cnt, Ttl) ->
+    {"~n~n reconnections: ~p"
+     "~n average reconnect time: " ++ time(Ttl div Cnt),
+     [Cnt],
+     time};
+hist_key_heading({service_time, OpTimeout, OpType, BucketName}, Cnt, Ttl) ->
+    {"~n~n ~p ~p operations on bucket: ~p"
+     "~n with timeout: ~p"
+     "~n average service time: " ++ time(Ttl div Cnt),
+     [Cnt, OpType, BucketName, OpTimeout],
+     time};
+hist_key_heading({queue_time, QTimeout}, Cnt, Ttl) ->
+    {"~n~n ~p requests got queued with timeout: ~p"
+     "~n average queueing time: " ++ time(Ttl div Cnt),
+     [Cnt, QTimeout],
+     time}.
+
+%% @doc Render a duration given in microseconds as a short string, using the
+%% largest unit (us, ms, s) that fits. The value is truncated, not rounded.
+time(Val) ->
+    {Scaled, Unit} =
+        if
+            Val < 1000 -> {Val, " us"};
+            Val < 1000000 -> {Val div 1000, " ms"};
+            true -> {Val div 1000000, " s"}
+        end,
+    integer_to_list(Scaled) ++ Unit.
+
+%% @doc Render a byte count as a short string, using the largest decimal
+%% unit (B, KB, MB, GB) that fits. The value is truncated, not rounded.
+bytes(Val) ->
+    {Scaled, Unit} =
+        if
+            Val < 1000 -> {Val, " B"};
+            Val < 1000000 -> {Val div 1000, " KB"};
+            Val < 1000000000 -> {Val div 1000000, " MB"};
+            true -> {Val div 1000000000, " GB"}
+        end,
+    integer_to_list(Scaled) ++ Unit.
+
+
+%% Print one line per non-empty histogram bucket.
+%% `Divs' holds the hit counts and `KeyDivs' the matching upper bounds; both
+%% are walked in lock step. `PredDiv' is the bound of the previous bucket and
+%% serves as the lower end of the printed range. `TypeFunc' names the
+%% exported function of this module (`time' or `bytes') that renders a bound.
+print_hist(_TypeFunc, _PredDiv, [], []) ->
+    ok;
+print_hist(TypeFunc, PredDiv, [Hits | Divs], [Bound | KeyDivs]) ->
+    case Hits of
+        0 ->
+            %% Empty bucket: nothing to print, but it still moves the lower bound.
+            ok;
+        _ ->
+            io:format(user,
+                      "~n ~p between " ++ apply(?MODULE, TypeFunc, [PredDiv])
+                      ++ " and " ++ apply(?MODULE, TypeFunc, [Bound]), [Hits])
+    end,
+    print_hist(TypeFunc, Bound, Divs, KeyDivs).
+
+%% Print one plain counter. Three key shapes are understood:
+%%   `{op_timeout | req_timeout, Timeout, OpType, Bucket}' - timed out while being serviced,
+%%   `{q_timeout | req_timeout, Timeout, Bucket}'          - timed out while queued,
+%%   `{queue_len, QLen}'                                   - queue length seen when enqueueing.
+print_cntr_key({Tag, Timeout, OPType, BucketName}, Cnt)
+  when Tag =:= op_timeout orelse Tag =:= req_timeout ->
+    Args = [Cnt, OPType, BucketName, Timeout],
+    io:format(user, "~n ~p ~p requests on bucket: ~p timed out while being serviced, timeout: ~p.~n",
+              Args);
+print_cntr_key({Tag, Timeout, BucketName}, Cnt)
+  when Tag =:= q_timeout orelse Tag =:= req_timeout ->
+    Args = [Cnt, BucketName, Timeout],
+    io:format(user,
+              "~n ~p requests on bucket: ~p timed out while in the queue, timeout: ~p.~n",
+              Args);
+print_cntr_key({queue_len, QLen}, Cnt) ->
+    Args = [Cnt, QLen],
+    io:format(user,
+              "~n on ~p occassions the queue was ~p requests long when new requests were added.~n",
+              Args).