Skip to content

Commit

Permalink
Nhse d34 nhskv.i33 token (#35)
Browse files Browse the repository at this point in the history
Start a token manager on each node.  The token manager will on request register a session against a token, and remove the registration when the session terminates.  If another request is received for the same token, it will be queued until the previous session releases the token (or the queued session dies).

There is a session server that can be started to run a session (which request a token from the local token_manager on startup), and will allow for the session to be used by making calls to riak_client functions.  Each call will renew the session.  After token timeout, the session will terminate.

https://github.com/orgs/OpenRiak/discussions/3
  • Loading branch information
martinsumner authored Dec 23, 2024
1 parent 9934efe commit d8d7e86
Show file tree
Hide file tree
Showing 13 changed files with 2,079 additions and 269 deletions.
4 changes: 3 additions & 1 deletion eqc/ec_eqc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -671,13 +671,15 @@ get_fsm_proc(ReqId, #params{n = N, r = R}) ->
DeletedVclock = true,
ExpectedVclock = false,
NodeConfirms = 0,
ReturnBody = true,
GetCore = riak_kv_get_core:init(N, R,
0, %% SLF hack
FailThreshold,
NotFoundOk, AllowMult, DeletedVclock,
[{Idx, primary} || Idx <- lists:seq(1, N)], %% SLF hack
ExpectedVclock,
NodeConfirms
NodeConfirms,
ReturnBody
),
#proc{name = {get_fsm, ReqId}, handler = get_fsm,
procst = #getfsmst{getcore = GetCore}}.
Expand Down
68 changes: 68 additions & 0 deletions priv/riak_kv.schema
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,74 @@
hidden
]}.

%% @doc Mode for handling conditional checks on PUTs
%% Handling if-not-modified (vclock-based) and if-none-match conditionals on
%% PUTS, there are three possible modes:
%% - api_only (default)
%% - prefer_token
%% - mandate_token (not currently implemented)
%%
%% In the api_only mode, a read will be done before the write within the API,
%% and if the read passes the condition the PUT will be allowed (even though
%% a parallel conditional PUT may be in-flight).
%% In the prefer_token mode a token must be requested for the key to be
%% updated, and the read and the write will be managed within the token
%% session. Only a single update will normally have access to the token,
%% requests will queue for use of the token. When a token cannot be secured
%% within a timeout, then the api_only method will be used.
%%
%% Future releases may support a mandate_token mode which will error on failure
%% to get a token, rather than proceed and accept eventual consistency (as with
%% prefer_token).
%%
%% Only conditional PUTs are impacted by this setting, non-conditional PUTs
%% being sent in parallel to conditional PUTs may cause siblings. The
%% HTTP standard headers of If-Unmodified-Since and If-Match, are always
%% applied as api_only checks. The riak-specific vector-clock based
%% if-not-modified header, and the HTTP-default if-none-match header are the
%% only conditional PUTs that will be subject to stronger, token-based
%% restrictions.
{mapping, "conditional_put_mode", "riak_kv.conditional_put_mode", [
{default, api_only},
{datatype, {enum, [api_only, prefer_token]}}
]}.

%% @doc Set the level of verification required on token access
%% When requesting access to a token, this can be done in three different
%% verification modes:
%% - head_only
%% - basic_consensus
%% - primary_consensus (default)
%%
%% The head_only mode will make the node currently at the head of each preflist
%% responsible for granting tokens in isolation. This is intended to meet
%% constraints only in healthy clusters (or single-node clusters).
%%
%% In a consensus mode, the token will be granted by the node at the head of
%% the preflist, and the issuance will be validated by up to two "downstream"
%% nodes. This means that when a node recovers from failure, and becomes head
%% of the preflist it is prevented from making grants which are duplicates of
%% ones made by a downstream node during the failure.
%%
%% There are two forms of consensus - basic and primary. With basic
%% consensus, any avaliable unique nodes in the preflist (either primary or
%% fallback) can be used for consensus. With primary consensus, the nodes
%% must be 3 of 5 primary nodes (and hence a target_n_val of at least 5 is
%% required in this mode).
%%
%% With basic_consensus, tokens can still be granted in a wide range of failure
%% scenarios, but with a risk of duplicate grants, in particular should a
%% cluster be partitioned.
%%
%% No mode provides strict guarantees, including primary_consensus, especially
%% in complex partition scenarios where different nodes have alternative views
%% of node reachability.
{mapping, "token_request_mode", "riak_kv.token_request_mode", [
{default, primary_consensus},
{datatype, {enum, [head_only, basic_consensus, primary_consensus]}}
]}.


%% @doc Controls which binary representation of a riak value is stored
%% on disk.
%% * 0: Original erlang:term_to_binary format. Higher space overhead.
Expand Down
1 change: 0 additions & 1 deletion src/riak_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
%% @type default_timeout() = 60000
-define(DEFAULT_TIMEOUT, 60000).
-define(DEFAULT_FOLD_TIMEOUT, 3600000).
-define(DEFAULT_ERRTOL, 0.00003).

%% TODO: This type needs to be better specified and validated against
%% any dependents on riak_kv.
Expand Down
84 changes: 29 additions & 55 deletions src/riak_kv_get_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
%%
%% -------------------------------------------------------------------
-module(riak_kv_get_core).
-export([init/10, update_init/2, head_merge/1,
-export([init/11, update_init/2, head_merge/1,
add_result/4, update_result/5, result_shortcode/1,
enough/1, response/1, has_all_results/1, final_action/1, info/1]).
-export_type([getcore/0, result/0, reply/0, final_action/0]).
Expand Down Expand Up @@ -76,6 +76,7 @@
head_merge = false :: boolean(),
expected_fetchclock = false :: boolean()|vclock:vclock(),
node_confirms = 0 :: non_neg_integer(),
return_body = true :: boolean(),
confirmed_nodes = []}).
-opaque getcore() :: #getcore{}.

Expand All @@ -89,26 +90,30 @@
AllowMult::boolean(), DeletedVClock::boolean(),
IdxType::idx_type(),
ExpClock::false|vclock:vclock(),
NodeConfirms::non_neg_integer()) -> getcore().
NodeConfirms::non_neg_integer(),
ReturnBody::boolean()
) -> getcore().
init(N, R, PR, FailThreshold, NotFoundOk, AllowMult,
DeletedVClock, IdxType, ExpClock, NodeConfirms) ->
#getcore{n = N,
r = case ExpClock of false -> R; _ -> N end,
pr = PR,
ur = 0,
fail_threshold = FailThreshold,
notfound_ok = NotFoundOk,
allow_mult = AllowMult,
deletedvclock = DeletedVClock,
idx_type = IdxType,
expected_fetchclock = ExpClock,
node_confirms = NodeConfirms}.
DeletedVClock, IdxType, ExpClock, NodeConfirms, ReturnBody) ->
#getcore{
n = N,
r = case ExpClock of false -> R; _ -> N end,
pr = PR,
ur = 0,
fail_threshold = FailThreshold,
notfound_ok = NotFoundOk,
allow_mult = AllowMult,
deletedvclock = DeletedVClock,
idx_type = IdxType,
expected_fetchclock = ExpClock,
node_confirms = NodeConfirms,
return_body = ReturnBody
}.

%% Re-initialise a get to a restricted number of vnodes (that must all respond)
-spec update_init(N::pos_integer(), getcore()) -> getcore().
update_init(N, PrevGetCore) ->
PrevGetCore#getcore{ur = N,
head_merge = true}.
PrevGetCore#getcore{ur = N, head_merge = true}.

%% Convert the get so that it is expecting to potentially receive the
%% responses to head requests (though for backwards compatibility these may
Expand Down Expand Up @@ -279,7 +284,7 @@ response(#getcore{r = R, num_ok = NumOK, pr= PR, num_pok = NumPOK,
when (NumOK >= R andalso NumPOK >= PR) orelse ExpClock == true ->
#getcore{results = Results, allow_mult=AllowMult,
deletedvclock = DeletedVClock} = GetCore,
Merged = merge_heads(Results, AllowMult),
Merged = merge_heads(Results, AllowMult, GetCore#getcore.return_body),
case Merged of
{ok, _MergedObj} ->
{Merged, GetCore#getcore{merged = Merged}}; % {ok, MObj}
Expand Down Expand Up @@ -430,11 +435,11 @@ merge(Replies, AllowMult) ->
%% a backend not supporting HEAD was called, or the operation was an UPDATE),
%% or body-less objects.
%%
-spec merge_heads(list(result()), boolean()) ->
-spec merge_heads(list(result()), boolean(), boolean()) ->
{notfound, undefined}|
{tombstone, riak_object:riak_object()}|{ok, riak_object:riak_object()}|
{fetch, list(non_neg_integer())}.
merge_heads(Replies, AllowMult) ->
merge_heads(Replies, AllowMult, ReturnBody) ->
% Replies should be a list of [{Idx, {ok, RObj}]
IdxObjs = [{I, {ok, RObj}} || {I, {ok, RObj}} <- Replies],
% Those that don't pattern match will be not_found
Expand All @@ -443,17 +448,16 @@ merge_heads(Replies, AllowMult) ->
{notfound, undefined};
_ ->
{BestReplies, FetchIdxObjL} = riak_object:find_bestobject(IdxObjs),
case FetchIdxObjL of
[] ->
case {FetchIdxObjL, ReturnBody} of
{_, false} ->
merge(BestReplies ++ FetchIdxObjL, AllowMult);
{[], true} ->
merge(BestReplies, AllowMult);
IdxL ->
{IdxL, true} ->
{fetch, lists:map(fun({Idx, _Rsp}) -> Idx end, IdxL)}
end
end.




%% @private Checks IdxType to see if Idx is a primary.
%% If the Idx is not in the IdxType the world must be
%% resizing (ring expanding). In that case, Idx is
Expand All @@ -474,36 +478,6 @@ num_pr(GetCore = #getcore{num_pok=NumPOK, idx_type=IdxType}, Idx) ->
GetCore
end.

%% @private Print a warning if objects are not equal. Only called on case of no read-repair
%% This situation could happen with pre 2.1 vclocks in very rare cases. Fixing the object
%% requires the user to rewrite the object in 2.1+ of Riak. Logic is enabled when capabilities
%% returns a version(all nodes at least 2.2) and the entropy_manager is not yet version 0
% maybe_log_old_vclock(Results) ->
% case riak_core_capability:get({riak_kv, object_hash_version}, legacy) of
% legacy ->
% ok;
% 0 ->
% Version = riak_kv_entropy_manager:get_version(),
% case [RObj || {_Idx, {ok, RObj}} <- Results] of
% [] ->
% ok;
% [_] ->
% ok;
% _ when Version == 0 ->
% ok;
% [R1|Rest] ->
% case [RObj || RObj <- Rest, not riak_object:equal(R1, RObj)] of
% [] ->
% ok;
% _ ->
% object:warning("Bucket: ~p Key: ~p should be rewritten to guarantee
% compatability with AAE version 0",
% [riak_object:bucket(R1),riak_object:key(R1)])
% end
% end;
% _ ->
% ok
% end.

-ifdef(TEST).

Expand Down
15 changes: 10 additions & 5 deletions src/riak_kv_get_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,16 @@ validate(timeout, StateData=#state{from = {raw, ReqId, _Pid}, options = Options,
NFOk0 = get_option(notfound_ok, Options, default),
NotFoundOk = riak_kv_util:expand_value(notfound_ok, NFOk0, BucketProps),
DeletedVClock = get_option(deletedvclock, Options, false),
GetCore = riak_kv_get_core:init(N, R, PR, FailThreshold,
NotFoundOk, AllowMult,
DeletedVClock, IdxType,
ExpClock,
NodeConfirms),
ReturnBody = get_option(return_body, Options, true),
GetCore =
riak_kv_get_core:init(
N, R, PR, FailThreshold,
NotFoundOk, AllowMult,
DeletedVClock, IdxType,
ExpClock,
NodeConfirms,
ReturnBody
),
new_state_timeout(execute, StateData#state{get_core = GetCore,
timeout = Timeout,
req_id = ReqId});
Expand Down
Loading

0 comments on commit d8d7e86

Please sign in to comment.