finish testing multipart and read_range for integration
ferd committed Apr 5, 2024
1 parent 5aee60b commit a1d8470
Showing 2 changed files with 109 additions and 13 deletions.
27 changes: 18 additions & 9 deletions apps/revault/src/revault_s3.erl
@@ -163,8 +163,14 @@ read_range(Path, Offset, Bytes) ->
Res = aws_s3:get_object(client(), bucket(), PathBin, #{}, #{<<"Range">> => RangeStr}),
maybe
ok ?= handle_result(Res),
{ok, #{<<"Body">> := Contents}, _} = Res,
{ok, Contents}
{ok, #{<<"Body">> := Contents,
<<"ContentLength">> := LengthBin}, _} = Res,
%% AWS will give us data shorter than expected if we read after
%% the end of a file, so catch that.
case binary_to_integer(LengthBin) =:= Bytes of
true -> {ok, Contents};
false -> {error, invalid_range}
end
end.
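
For reference, the HTTP Range header used above is inclusive at both ends, so reading Bytes bytes at Offset corresponds to bytes=Offset-(Offset+Bytes-1). A minimal sketch of that construction (the actual RangeStr code is elided just above the shown lines and may differ; range_header/2 is a hypothetical name):

%% Inclusive byte range per RFC 9110: range_header(5, 10) yields
%% <<"bytes=5-14">>, i.e. bytes 5 through 14.
range_header(Offset, Bytes) ->
    Last = Offset + Bytes - 1,
    <<"bytes=", (integer_to_binary(Offset))/binary,
      "-", (integer_to_binary(Last))/binary>>.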


@@ -178,7 +184,7 @@ multipart_init(Path, PartsTotal, Hash) ->
%% so we do a cheat here by uploading them to a temporary location, then
%% moving them via the S3 API. Under the covers, the S3 API does the copy
%% as a single atomic operation, which re-computes the hash we need.
-    Tmp = tmp(),
+    Tmp = unicode:characters_to_binary(tmp()),
{ok, #{<<"InitiateMultipartUploadResult">> :=
#{<<"UploadId">> := UploadId}},
{200, _Headers, _Ref}} =
@@ -217,7 +223,7 @@ multipart_update({state, Path, _PartsSeen, PartsTotal, Hash,
Chk = base64:encode(PartHash),
NewRollingHash = crypto:hash_update(RollingHash, Bin),
Res = aws_s3:upload_part(
-            client(), client(), Tmp,
+            client(), bucket(), Tmp,
#{<<"Body">> => Bin,
<<"ChecksumAlgorithm">> => <<"SHA256">>,
<<"ChecksumSHA256">> => Chk,
@@ -229,9 +235,9 @@
maybe
ok ?= handle_result(Res),
{ok, #{<<"ETag">> := PartETag, <<"ChecksumSHA256">> := Chk}, _} = Res,
-        {state, Path, PartNum, PartsTotal, Hash,
-         {Tmp, UploadId, NewRollingHash,
-          [{PartNum, {PartHash, PartETag}} | PartsAcc]}}
+        {ok, {state, Path, PartNum, PartsTotal, Hash,
+              {Tmp, UploadId, NewRollingHash,
+               [{PartNum, {PartHash, PartETag}} | PartsAcc]}}}
end.
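
Returning {ok, State} here (rather than the bare state tuple the old code produced) lets a caller thread successive parts through ?= and short-circuit on the first S3 error. A sketch of such a driver, assuming the multipart_update/6 argument order exercised by the test suite below (upload_parts/4 is a hypothetical helper):

%% Upload parts in order; any {error, _} from multipart_update
%% short-circuits the maybe block and is returned as-is.
upload_parts(State, _Path, _Hash, []) ->
    {ok, State};
upload_parts(State, Path, Hash, [{PartNum, PartsTotal, Bin} | Rest]) ->
    maybe
        {ok, NextState} ?= revault_s3:multipart_update(State, Path, PartNum,
                                                       PartsTotal, Hash, Bin),
        upload_parts(NextState, Path, Hash, Rest)
    end.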

-spec multipart_final(State, Path, PartsTotal, Hash) -> ok when
@@ -248,7 +254,9 @@ multipart_final({state, Path, _PartsSeen, PartsTotal, Hash,
%% S3 multipart upload does a checksum of checksums with a -N suffix.
%% We'll move the upload to get the final hash, but we still want to do
%% a partial check.
-    MultipartHash = << <<PartHash/binary>> || {_, {PartHash, _}} <- lists:reverse(PartsAcc) >>,
+    MultipartHash = revault_file:hash_bin(
+        << <<PartHash/binary>> || {_, {PartHash, _}} <- lists:reverse(PartsAcc) >>
+    ),
MultipartChk = << (base64:encode(MultipartHash))/binary,
"-", (integer_to_binary(length(PartsAcc)))/binary>>,
%% finalize the upload
@@ -268,7 +276,7 @@ multipart_final({state, Path, _PartsSeen, PartsTotal, Hash,
{ok, #{<<"CompleteMultipartUploadResult">> :=
#{<<"ChecksumSHA256">> := MultipartChk}},
_} = ResFinalize,
-        CopyRes = copy_raw(Tmp, Path),
+        CopyRes = copy_raw(Tmp, unicode:characters_to_binary(Path)),
ok ?= handle_result(CopyRes),
{ok, #{<<"CopyObjectResult">> := #{<<"ChecksumSHA256">> := Chk}}, _} = CopyRes,
handle_result(aws_s3:delete_object(client(), bucket(), Tmp, #{}))
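
The MultipartChk compared above follows S3's composite checksum scheme for multipart uploads: SHA-256 over the concatenation of the raw per-part SHA-256 digests, base64-encoded, with the part count appended after a dash. A standalone sketch of that computation using stock crypto and base64 (composite_checksum/1 and its list-of-part-binaries input are illustrative, assuming revault_file:hash_bin/1 is SHA-256):

%% Compute the "checksum of checksums" S3 reports for a completed
%% multipart upload, e.g. <<"...base64...-11">> for 11 parts.
composite_checksum(PartBins) ->
    PartHashes = << <<(crypto:hash(sha256, Part))/binary>> || Part <- PartBins >>,
    <<(base64:encode(crypto:hash(sha256, PartHashes)))/binary,
      "-", (integer_to_binary(length(PartBins)))/binary>>.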
@@ -304,6 +312,7 @@ handle_result(_Unknown) ->
{error, badarg}.

translate_code(<<"NoSuchKey">>) -> enoent;
translate_code(<<"InvalidRange">>) -> invalid_range;
translate_code(Code) -> Code.

copy_raw(From, To) ->
95 changes: 91 additions & 4 deletions apps/revault/test/s3_integration_SUITE.erl
@@ -7,13 +7,12 @@ all() -> [{group, api},
{group, abstraction},
{group, cache}].

-%% TODO: test range reads, multipart abstraction, range abstraction

groups() ->
[{api, [sequence], [list_objects_empty, crud_object, rename_raw, pagination,
-                        multipart_upload]},
+                        multipart_upload, get_object_range]},
{abstraction, [sequence], [read_write_delete, hasht, copy, rename,
-                               find_hashes, consult, is_regular]},
+                               find_hashes, consult, is_regular,
+                               multipart, read_range]},
{cache, [sequence], [find_hashes_cached]}
].

@@ -283,6 +282,47 @@ multipart_upload(Config) ->
aws_s3:delete_object(Client, Bucket, KeyCopy, #{}),
ok.

+get_object_range() ->
+    [{doc, "Range queries can be submitted to read a subset of objects"}].
+get_object_range(Config) ->
+    Client = ?config(aws_client, Config),
+    Bucket = ?config(bucket, Config),
+    Dir = ?config(bucket_dir, Config),
+    Key = filename:join([Dir, "crud_object"]),
+    Body = <<"0123456789abcdef">>,
+    Chk = hash(Body),
+    %% Create, with SHA256
+    ?assertMatch({ok, #{<<"ChecksumSHA256">> := Chk}, _Http},
+                 aws_s3:put_object(Client, Bucket, Key,
+                                   #{<<"Body">> => Body,
+                                     <<"ChecksumAlgorithm">> => <<"SHA256">>,
+                                     <<"ChecksumSHA256">> => Chk})),
+    %% Read, range being firstbyte-lastbyte
+    ?assertMatch({ok, #{<<"Body">> := <<"0">>}, _Http},
+                 aws_s3:get_object(Client, Bucket, Key, #{},
+                                   #{<<"Range">> => <<"bytes=0-0">>})),
+    ?assertMatch({ok, #{<<"Body">> := <<"01">>}, _Http},
+                 aws_s3:get_object(Client, Bucket, Key, #{},
+                                   #{<<"Range">> => <<"bytes=0-1">>})),
+    ?assertMatch({ok, #{<<"Body">> := <<"1">>}, _Http},
+                 aws_s3:get_object(Client, Bucket, Key, #{},
+                                   #{<<"Range">> => <<"bytes=1-1">>})),
+    ?assertMatch({ok, #{<<"Body">> := <<"9abcd">>}, _Http},
+                 aws_s3:get_object(Client, Bucket, Key, #{},
+                                   #{<<"Range">> => <<"bytes=9-13">>})),
+    %% It ignores incomplete endings so long as the start position is valid
+    ?assertMatch({ok, #{<<"Body">> := <<"9abcdef">>,
+                        <<"ContentLength">> := <<"7">>}, _Http},
+                 aws_s3:get_object(Client, Bucket, Key, #{},
+                                   #{<<"Range">> => <<"bytes=9-130">>})),
+    ?assertMatch({error, #{<<"Error">> := #{<<"Code">> := <<"InvalidRange">>}}, _Http},
+                 aws_s3:get_object(Client, Bucket, Key, #{},
+                                   #{<<"Range">> => <<"bytes=19-130">>})),
+    %% Delete
+    ?assertMatch({ok, #{}, _Http},
+                 aws_s3:delete_object(Client, Bucket, Key, #{})),
+    ok.

%%%%%%%%%%%%%%%%%%%%%%%%%
%%% ABSTRACTION GROUP %%%
%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -386,6 +426,53 @@ is_regular(Config) ->
?assertEqual(ok, revault_s3:delete(Path)),
ok.

+multipart() ->
+    [{doc, "Test the multipart upload functionality"}].
+multipart(Config) ->
+    Dir = ?config(bucket_dir, Config),
+    Path = filename:join([Dir, "mpart"]),
+
+    WidthBytes = 1024*1024*5,
+    WidthBits = 8*WidthBytes,
+    Parts = 11,
+    Bin = <<0:WidthBits, 1:WidthBits, 2:WidthBits, 3:WidthBits, 4:WidthBits,
+            5:WidthBits, 6:WidthBits, 7:WidthBits, 8:WidthBits, 9:WidthBits, 10>>,
+    Hash = revault_file:hash_bin(Bin),
+    {_, State} = lists:foldl(
+        fun(Part, {N,S}) ->
+            {ok, NewS} = revault_s3:multipart_update(S, Path, N, Parts, Hash, Part),
+            {N+1, NewS}
+        end,
+        {1, revault_s3:multipart_init(Path, Parts, Hash)},
+        [<<N:WidthBits>> || N <- lists:seq(0,Parts-2)]++[<<10>>]
+    ),
+    ok = revault_s3:multipart_final(State, Path, Parts, Hash),
+    ?assertEqual({ok, Bin}, revault_s3:read_file(Path)),
+    ?assertEqual(ok, revault_s3:delete(Path)),
+    ok.
+
+read_range() ->
+    [{doc, "Test the read_range functionality"}].
+read_range(Config) ->
+    Dir = ?config(bucket_dir, Config),
+    Path = filename:join([Dir, "read_range"]),
+    WidthBytes = 100,
+    WidthBits = 8*WidthBytes,
+    Bin = <<0:WidthBits, 1:WidthBits, 2:WidthBits, 3:WidthBits, 4:WidthBits,
+            5:WidthBits, 6:WidthBits, 7:WidthBits, 8:WidthBits, 9:WidthBits>>,
+    revault_s3:write_file(Path, Bin),
+    ?assertMatch({ok, Bin}, revault_s3:read_range(Path, 0, WidthBytes*10)),
+    ?assertMatch({error, _}, revault_s3:read_range(Path, 0, WidthBytes*10+1000)),
+    ?assertMatch({error, _}, revault_s3:read_range(Path, WidthBytes*1000, 1)),
+    ?assertMatch({ok, <<5:100/unit:8, _:400/binary>>},
+                 revault_s3:read_range(Path, WidthBytes*5, WidthBytes*5)),
+    ?assertMatch({ok, <<5:100/unit:8>>},
+                 revault_s3:read_range(Path, WidthBytes*5, WidthBytes)),
+    ?assertMatch({ok, <<5:100/unit:8, 0>>},
+                 revault_s3:read_range(Path, WidthBytes*5, WidthBytes+1)),
+    ?assertEqual(ok, revault_s3:delete(Path)),
+    ok.

%%%%%%%%%%%%%%%%%%%
%%% CACHE GROUP %%%
%%%%%%%%%%%%%%%%%%%
