Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nhse d34 nhskv.i33 token #35

Open
wants to merge 37 commits into
base: nhse-develop-3.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
79a0e55
Initial support for OTP26
martinsumner May 2, 2024
246774a
Remove tracers
martinsumner May 2, 2024
6c46b9d
Update rebar.config
martinsumner May 11, 2024
1d04e59
Update rebar.config
martinsumner May 14, 2024
38907f2
Token Manager
martinsumner May 29, 2024
4e3489d
Add strong conditional check to PB API
martinsumner May 29, 2024
b615d2e
Reset state on PB connection after completion
martinsumner May 29, 2024
cb53d98
Add downstream recording of state
martinsumner May 30, 2024
deb3ce3
Fix uniquenes of node list
martinsumner May 31, 2024
6854977
Refactor riak_kv_pb_object
martinsumner May 31, 2024
47bd106
Move strong conditional check to FSM (#34)
martinsumner May 31, 2024
39a3e87
Add support to HTTP API
martinsumner May 31, 2024
09fcda2
Conditional PUT to require GET not HEAD
martinsumner Jun 5, 2024
56b6833
Add configuration
martinsumner Jun 5, 2024
a0b027b
Add profile function
martinsumner Jun 5, 2024
ef61d8b
Add profiler
martinsumner Jun 6, 2024
a11d4d2
Merge branch 'nhse-d34-otp26' into nhse-d34-nhskv.i30-profiler
martinsumner Jun 6, 2024
664b6e5
Type fix
martinsumner Jun 6, 2024
5b8cb9f
Add to extending list of defaults
martinsumner Jun 7, 2024
346f276
Merge branch 'nhse-d34-nhskv.i30-profiler' into nhse-d34-nhskv.i33-token
martinsumner Jun 12, 2024
87cf5b1
Initial write-up of riak_kv_token_manager
martinsumner Jun 12, 2024
5c18813
Refactor to use monitoring
martinsumner Jun 13, 2024
665d467
Change to config parameters
martinsumner Jun 13, 2024
2f95b40
Remove duplicate
martinsumner Jun 13, 2024
55615f7
Rename (again), make messages async
martinsumner Jun 16, 2024
30a871b
Clarify erpc errors
martinsumner Jun 17, 2024
7758686
Use token_manager to control downstream messages
martinsumner Jun 18, 2024
ce10992
Add GC process
martinsumner Jun 19, 2024
140f883
Avoid re-fetching objects (#36)
martinsumner Jun 19, 2024
c4a033f
Merge branch 'nhse-d34-nhskv.i33-token' of https://github.com/nhs-ria…
martinsumner Jun 19, 2024
09ad183
Extend eunit test of token_manager
martinsumner Jun 19, 2024
10de288
Try and keep github formatter happy
martinsumner Jun 19, 2024
11caa89
Use monitor rather than spoof 'DOWN'
martinsumner Jun 19, 2024
6ba1736
Use uniq code in OTP 24
martinsumner Jun 19, 2024
8f9b5e0
Update comments for perceived clarity
martinsumner Jun 21, 2024
904fd69
Merge branch 'nhse-develop-3.4' into nhse-d34-nhskv.i33-token
martinsumner Sep 23, 2024
3a102b3
Remove double-definition on merge
martinsumner Sep 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion eqc/ec_eqc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -671,13 +671,15 @@ get_fsm_proc(ReqId, #params{n = N, r = R}) ->
DeletedVclock = true,
ExpectedVclock = false,
NodeConfirms = 0,
ReturnBody = true,
GetCore = riak_kv_get_core:init(N, R,
0, %% SLF hack
FailThreshold,
NotFoundOk, AllowMult, DeletedVclock,
[{Idx, primary} || Idx <- lists:seq(1, N)], %% SLF hack
ExpectedVclock,
NodeConfirms
NodeConfirms,
ReturnBody
),
#proc{name = {get_fsm, ReqId}, handler = get_fsm,
procst = #getfsmst{getcore = GetCore}}.
Expand Down
68 changes: 68 additions & 0 deletions priv/riak_kv.schema
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,74 @@
hidden
]}.

%% @doc Mode for handling conditional checks on PUTs
%% Handling if-not-modified (vclock-based) and if-none-match conditionals on
%% PUTS, there are three possible modes:
%% - api_only (default)
%% - prefer_token
%% - mandate_token (not currently implemented)
%%
%% In the api_only mode, a read will be done before the write within the API,
%% and if the read passes the condition the PUT will be allowed (even though
%% a parallel conditional PUT may be in-flight).
%% In the prefer_token mode a token must be requested for the key to be
%% updated, and the read and the write will be managed within the token
%% session. Only a single update will normally have access to the token,
%% requests will queue for use of the token. When a token cannot be secured
%% within a timeout, then the api_only method will be used.
%%
%% Future releases may support a mandate_token mode which will error on failure
%% to get a token, rather than proceed and accept eventual consistency (as with
%% prefer_token).
%%
%% Only conditional PUTs are impacted by this setting, non-conditional PUTs
%% being sent in parallel to conditional PUTs may cause siblings. The
%% HTTP standard headers of If-Unmodified-Since and If-Match, are always
%% applied as api_only checks. The riak-specific vector-clock based
%% if-not-modified header, and the HTTP-default if-none-match header are the
%% only conditional PUTs that will be subject to stronger, token-based
%% restrictions.
{mapping, "conditional_put_mode", "riak_kv.conditional_put_mode", [
{default, api_only},
{datatype, {enum, [api_only, prefer_token]}}
]}.

%% @doc Set the level of verification required on token access
%% When requesting access to a token, this can be done in three different
%% verification modes:
%% - head_only
%% - basic_consensus
%% - primary_consensus (default)
%%
%% The head_only mode will make the node currently at the head of each preflist
%% responsible for granting tokens in isolation. This is intended to meet
%% constraints only in healthy clusters (or single-node clusters).
%%
%% In a consensus mode, the token will be granted by the node at the head of
%% the preflist, and the issuance will be validated by up to two "downstream"
%% nodes. This means that when a node recovers from failure, and becomes head
%% of the preflist it is prevented from making grants which are duplicates of
%% ones made by a downstream node during the failure.
%%
%% There are two forms of consensus - basic and primary. With basic
%% consensus, any avaliable unique nodes in the preflist (either primary or
%% fallback) can be used for consensus. With primary consensus, the nodes
%% must be 3 of 5 primary nodes (and hence a target_n_val of at least 5 is
%% required in this mode).
%%
%% With basic_consensus, tokens can still be granted in a wide range of failure
%% scenarios, but with a risk of duplicate grants, in particular should a
%% cluster be partitioned.
%%
%% No mode provides strict guarantees, including primary_consensus, especially
%% in complex partition scenarios where different nodes have alternative views
%% of node reachability.
{mapping, "token_request_mode", "riak_kv.token_request_mode", [
{default, primary_consensus},
{datatype, {enum, [head_only, basic_consensus, primary_consensus]}}
]}.


%% @doc Controls which binary representation of a riak value is stored
%% on disk.
%% * 0: Original erlang:term_to_binary format. Higher space overhead.
Expand Down
1 change: 0 additions & 1 deletion src/riak_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
%% @type default_timeout() = 60000
-define(DEFAULT_TIMEOUT, 60000).
-define(DEFAULT_FOLD_TIMEOUT, 3600000).
-define(DEFAULT_ERRTOL, 0.00003).

%% TODO: This type needs to be better specified and validated against
%% any dependents on riak_kv.
Expand Down
84 changes: 29 additions & 55 deletions src/riak_kv_get_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
%%
%% -------------------------------------------------------------------
-module(riak_kv_get_core).
-export([init/10, update_init/2, head_merge/1,
-export([init/11, update_init/2, head_merge/1,
add_result/4, update_result/5, result_shortcode/1,
enough/1, response/1, has_all_results/1, final_action/1, info/1]).
-export_type([getcore/0, result/0, reply/0, final_action/0]).
Expand Down Expand Up @@ -76,6 +76,7 @@
head_merge = false :: boolean(),
expected_fetchclock = false :: boolean()|vclock:vclock(),
node_confirms = 0 :: non_neg_integer(),
return_body = true :: boolean(),
confirmed_nodes = []}).
-opaque getcore() :: #getcore{}.

Expand All @@ -89,26 +90,30 @@
AllowMult::boolean(), DeletedVClock::boolean(),
IdxType::idx_type(),
ExpClock::false|vclock:vclock(),
NodeConfirms::non_neg_integer()) -> getcore().
NodeConfirms::non_neg_integer(),
ReturnBody::boolean()
) -> getcore().
init(N, R, PR, FailThreshold, NotFoundOk, AllowMult,
DeletedVClock, IdxType, ExpClock, NodeConfirms) ->
#getcore{n = N,
r = case ExpClock of false -> R; _ -> N end,
pr = PR,
ur = 0,
fail_threshold = FailThreshold,
notfound_ok = NotFoundOk,
allow_mult = AllowMult,
deletedvclock = DeletedVClock,
idx_type = IdxType,
expected_fetchclock = ExpClock,
node_confirms = NodeConfirms}.
DeletedVClock, IdxType, ExpClock, NodeConfirms, ReturnBody) ->
#getcore{
n = N,
r = case ExpClock of false -> R; _ -> N end,
pr = PR,
ur = 0,
fail_threshold = FailThreshold,
notfound_ok = NotFoundOk,
allow_mult = AllowMult,
deletedvclock = DeletedVClock,
idx_type = IdxType,
expected_fetchclock = ExpClock,
node_confirms = NodeConfirms,
return_body = ReturnBody
}.

%% Re-initialise a get to a restricted number of vnodes (that must all respond)
-spec update_init(N::pos_integer(), getcore()) -> getcore().
update_init(N, PrevGetCore) ->
PrevGetCore#getcore{ur = N,
head_merge = true}.
PrevGetCore#getcore{ur = N, head_merge = true}.

%% Convert the get so that it is expecting to potentially receive the
%% responses to head requests (though for backwards compatibility these may
Expand Down Expand Up @@ -279,7 +284,7 @@ response(#getcore{r = R, num_ok = NumOK, pr= PR, num_pok = NumPOK,
when (NumOK >= R andalso NumPOK >= PR) orelse ExpClock == true ->
#getcore{results = Results, allow_mult=AllowMult,
deletedvclock = DeletedVClock} = GetCore,
Merged = merge_heads(Results, AllowMult),
Merged = merge_heads(Results, AllowMult, GetCore#getcore.return_body),
case Merged of
{ok, _MergedObj} ->
{Merged, GetCore#getcore{merged = Merged}}; % {ok, MObj}
Expand Down Expand Up @@ -430,11 +435,11 @@ merge(Replies, AllowMult) ->
%% a backend not supporting HEAD was called, or the operation was an UPDATE),
%% or body-less objects.
%%
-spec merge_heads(list(result()), boolean()) ->
-spec merge_heads(list(result()), boolean(), boolean()) ->
{notfound, undefined}|
{tombstone, riak_object:riak_object()}|{ok, riak_object:riak_object()}|
{fetch, list(non_neg_integer())}.
merge_heads(Replies, AllowMult) ->
merge_heads(Replies, AllowMult, ReturnBody) ->
% Replies should be a list of [{Idx, {ok, RObj}]
IdxObjs = [{I, {ok, RObj}} || {I, {ok, RObj}} <- Replies],
% Those that don't pattern match will be not_found
Expand All @@ -443,17 +448,16 @@ merge_heads(Replies, AllowMult) ->
{notfound, undefined};
_ ->
{BestReplies, FetchIdxObjL} = riak_object:find_bestobject(IdxObjs),
case FetchIdxObjL of
[] ->
case {FetchIdxObjL, ReturnBody} of
{_, false} ->
merge(BestReplies ++ FetchIdxObjL, AllowMult);
{[], true} ->
merge(BestReplies, AllowMult);
IdxL ->
{IdxL, true} ->
{fetch, lists:map(fun({Idx, _Rsp}) -> Idx end, IdxL)}
end
end.




%% @private Checks IdxType to see if Idx is a primary.
%% If the Idx is not in the IdxType the world must be
%% resizing (ring expanding). In that case, Idx is
Expand All @@ -474,36 +478,6 @@ num_pr(GetCore = #getcore{num_pok=NumPOK, idx_type=IdxType}, Idx) ->
GetCore
end.

%% @private Print a warning if objects are not equal. Only called on case of no read-repair
%% This situation could happen with pre 2.1 vclocks in very rare cases. Fixing the object
%% requires the user to rewrite the object in 2.1+ of Riak. Logic is enabled when capabilities
%% returns a version(all nodes at least 2.2) and the entropy_manager is not yet version 0
% maybe_log_old_vclock(Results) ->
% case riak_core_capability:get({riak_kv, object_hash_version}, legacy) of
% legacy ->
% ok;
% 0 ->
% Version = riak_kv_entropy_manager:get_version(),
% case [RObj || {_Idx, {ok, RObj}} <- Results] of
% [] ->
% ok;
% [_] ->
% ok;
% _ when Version == 0 ->
% ok;
% [R1|Rest] ->
% case [RObj || RObj <- Rest, not riak_object:equal(R1, RObj)] of
% [] ->
% ok;
% _ ->
% object:warning("Bucket: ~p Key: ~p should be rewritten to guarantee
% compatability with AAE version 0",
% [riak_object:bucket(R1),riak_object:key(R1)])
% end
% end;
% _ ->
% ok
% end.

-ifdef(TEST).

Expand Down
15 changes: 10 additions & 5 deletions src/riak_kv_get_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,16 @@ validate(timeout, StateData=#state{from = {raw, ReqId, _Pid}, options = Options,
NFOk0 = get_option(notfound_ok, Options, default),
NotFoundOk = riak_kv_util:expand_value(notfound_ok, NFOk0, BucketProps),
DeletedVClock = get_option(deletedvclock, Options, false),
GetCore = riak_kv_get_core:init(N, R, PR, FailThreshold,
NotFoundOk, AllowMult,
DeletedVClock, IdxType,
ExpClock,
NodeConfirms),
ReturnBody = get_option(return_body, Options, true),
GetCore =
riak_kv_get_core:init(
N, R, PR, FailThreshold,
NotFoundOk, AllowMult,
DeletedVClock, IdxType,
ExpClock,
NodeConfirms,
ReturnBody
),
new_state_timeout(execute, StateData#state{get_core = GetCore,
timeout = Timeout,
req_id = ReqId});
Expand Down
Loading