Skip to content

Commit

Permalink
Fixing tracing for multipart
Browse files Browse the repository at this point in the history
  • Loading branch information
ferd committed Apr 6, 2024
1 parent c148d1e commit a1d1290
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 24 deletions.
2 changes: 0 additions & 2 deletions apps/revault/src/revault_file_disk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ read_range(Path, Offset, Bytes) ->
{ok, Fd} ?= file:open(Path, [read, raw, binary]),
Res = file:pread(Fd, Offset, Bytes),
file:close(Fd),
%% TODO: compare with S3 on the behavior of going past the EOF and see
%% that we align errors.
case Res of
{ok, Bin} when byte_size(Bin) =:= Bytes -> Res;
{error, _} -> Res;
Expand Down
87 changes: 65 additions & 22 deletions apps/revault/src/revault_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ client_sync_files(enter, _, Data) ->
client_sync_files(cast, {send, File},
DataTmp=#data{name=Name, path=Path,
sub=S=#client_sync{remote=R, queue=Q}}) ->
Data = start_span(<<"file_send">>, DataTmp),
Data = start_span(<<"schedule_file_send">>, DataTmp),
set_attributes([{<<"peer">>, ?str(R)}, {<<"path">>, File} | ?attrs(Data)]),
Q2 = send_file_schedule(Q, R, Name, Path, File),
NewData = end_span(DataTmp),
Expand All @@ -577,22 +577,23 @@ client_sync_files(cast, {fetch, File},
#{attributes => [{<<"path">>, File}, {<<"peer">>, ?str(R)} | ?attrs(Data)]},
fun(_SpanCtx) ->
Payload = revault_data_wrapper:fetch_file(File),
%% TODO: track the incoming transfers to know when we're done syncing
{_Marker, NewCb} = apply_cb(Cb, send, [R, Payload]),
NewQ = send_next_scheduled(Q),
{keep_state, Data#data{callback=NewCb,
sub=S#client_sync{queue=NewQ, acc=[File|Acc]}}}
end);
client_sync_files(cast, {send, R, Transfer},
Data=#data{path=Path, callback=Cb,
sub=S=#client_sync{remote=R, queue=Q}}) ->
%% TODO: wrap in tracing
DataTmp=#data{path=Path, callback=Cb,
sub=S=#client_sync{remote=R, queue=Q}}) ->
Data = start_span(<<"file_send">>, DataTmp),
set_attributes([{<<"remote">>, ?str(R)} | ?attrs(Data)]),
Payload = wrap(Path, Transfer),
%% TODO: track the success or failures of transfers, detect disconnections
{_Marker, NewCb} = apply_cb(Cb, send, [R, Payload]),
NewData = end_span(Data),
NewQ = send_next_scheduled(Q),
{keep_state, Data#data{callback=NewCb,
sub=S#client_sync{queue=NewQ}}};
{keep_state, NewData#data{callback=NewCb,
sub=S#client_sync{queue=NewQ}}};
client_sync_files(cast, sync_complete, Data=#data{sub=#client_sync{acc=[]}}) ->
#data{callback=Cb, sub=#client_sync{remote=R}} = Data,
Payload = revault_data_wrapper:sync_complete(),
Expand Down Expand Up @@ -651,8 +652,14 @@ client_sync_files(info, {revault, _Marker, {conflict_file, WorkF, F, CountLeft,
end;
client_sync_files(info, {revault, _Marker, {file, F, Meta, PartNum, PartTotal, Bin}}, Data) ->
#data{name=Name, id=Id, sub=S=#client_sync{multiparts=MP, queue=Q, acc=Acc}} = Data,
%% TODO: wrap in with_span
{MPStage, NewMP} = handle_multipart_file_sync(MP, Name, Id, F, Meta, PartNum, PartTotal, Bin),
{MPStage, NewMP} = ?with_span(<<"file_part_recv">>,
#{attributes => [{<<"path">>, F}, {<<"size">>, byte_size(Bin)},
{<<"part.num">>,PartNum}, {<<"part.total">>, PartTotal},
{<<"meta">>, ?str(Meta)} | ?attrs(Data)]},
fun(_SpanCtx) ->
handle_multipart_file_sync(MP, Name, Id, F, Meta, PartNum, PartTotal, Bin)
end
),
NewQ = send_next_scheduled(Q),
NewAcc = case MPStage of
done -> Acc -- [F];
Expand All @@ -661,8 +668,14 @@ client_sync_files(info, {revault, _Marker, {file, F, Meta, PartNum, PartTotal, B
{keep_state, Data#data{scan=true, sub=S#client_sync{queue=NewQ, acc=NewAcc, multiparts=NewMP}}};
client_sync_files(info, {revault, _Marker, {conflict_multipart_file, WorkF, F, CountLeft, Meta, PartNum, PartTotal, Bin}}, Data) ->
#data{name=Name, sub=S=#client_sync{multiparts=MP, queue=Q, acc=Acc}} = Data,
%% TODO: wrap in with_span
{MPStage, NewMP} = handle_multipart_conflict_file_sync(MP, Name, WorkF, F, Meta, PartNum, PartTotal, Bin),
{MPStage, NewMP} = ?with_span(<<"conflict_part">>,
#{attributes => [{<<"path">>, F}, {<<"size">>, byte_size(Bin)},
{<<"part.num">>,PartNum}, {<<"part.total">>, PartTotal},
{<<"meta">>, ?str(Meta)} | ?attrs(Data)]},
fun(_SpanCtx) ->
handle_multipart_conflict_file_sync(MP, Name, WorkF, F, Meta, PartNum, PartTotal, Bin)
end
),
case {MPStage, CountLeft} of
{done, 0} ->
%% conflict file complete.
Expand Down Expand Up @@ -814,8 +827,14 @@ server_sync_files(info, {revault, _Marker, {file, F, Meta, Bin}},
server_sync_files(info, {revault, _Marker, {file, F, Meta, PartNum, PartTotal, Bin}},
Data=#data{name=Name, id=Id,
sub=S=#server{multiparts=MP}}) ->
%% TODO: wrap in with_span
{_, NewMP} = handle_multipart_file_sync(MP, Name, Id, F, Meta, PartNum, PartTotal, Bin),
{_, NewMP} = ?with_span(<<"file_part_recv">>,
#{attributes => [{<<"path">>, F}, {<<"size">>, byte_size(Bin)},
{<<"part.num">>,PartNum}, {<<"part.total">>, PartTotal},
{<<"meta">>, ?str(Meta)} | ?attrs(Data)]},
fun(_SpanCtx) ->
handle_multipart_file_sync(MP, Name, Id, F, Meta, PartNum, PartTotal, Bin)
end
),
{keep_state, Data#data{scan=true, sub=S#server{multiparts=NewMP}}};
server_sync_files(info, {revault, _Marker, {deleted_file, F, Meta}},
Data=#data{name=Name, id=Id}) ->
Expand All @@ -840,23 +859,31 @@ server_sync_files(info, {revault, _M, {conflict_file, WorkF, F, CountLeft, Meta,
{keep_state, Data#data{scan=true}};
server_sync_files(info, {revault, _M, {conflict_multipart_file, WorkF, F, _CountLeft, Meta, PartNum, PartTotal, Bin}},
Data=#data{name=Name, sub=S=#server{multiparts=MP}}) ->
%% TODO: wrap in with_span
{_, NewMP} = handle_multipart_conflict_file_sync(MP, Name, WorkF, F, Meta, PartNum, PartTotal, Bin),
{_, NewMP} = ?with_span(<<"conflict_part">>,
#{attributes => [{<<"path">>, F}, {<<"size">>, byte_size(Bin)},
{<<"part.num">>,PartNum}, {<<"part.total">>, PartTotal},
{<<"meta">>, ?str(Meta)} | ?attrs(Data)]},
fun(_SpanCtx) ->
handle_multipart_conflict_file_sync(MP, Name, WorkF, F, Meta, PartNum, PartTotal, Bin)
end
),
{keep_state, Data#data{scan=true, sub=S#server{multiparts=NewMP}}};
server_sync_files(info, {revault, Marker, {fetch, F}},
Data=#data{name=Name, path=Path, sub=S=#server{queue=Q, remote=R}}) ->
TmpQ = send_file_reply_schedule(Q, R, Marker, Name, Path, F),
NewQ = send_next_scheduled(TmpQ),
{keep_state, Data#data{sub=S#server{queue=NewQ}}};
server_sync_files(cast, {send_reply, R, Marker, Transfer},
Data=#data{path=Path, callback=Cb,
sub=S=#server{remote=R, queue=Q}}) ->
%% TODO: wrap in tracing
DataTmp=#data{path=Path, callback=Cb,
sub=S=#server{remote=R, queue=Q}}) ->
Data = start_span(<<"file_send">>, DataTmp),
set_attributes([{<<"remote">>, ?str(R)} | ?attrs(Data)]),
Payload = wrap(Path, Transfer),
%% TODO: track the success or failures of transfers, detect disconnections
{_Marker, NewCb} = apply_cb(Cb, reply, [R, Marker, Payload]),
NewData = end_span(Data),
NewQ = send_next_scheduled(Q),
{keep_state, Data#data{callback=NewCb, sub=S#server{queue=NewQ}}};
{keep_state, NewData#data{callback=NewCb, sub=S#server{queue=NewQ}}};
server_sync_files(info, {revault, Marker, sync_complete},
DataTmp=#data{name=Name, callback=Cb, scan=ScanNeeded,
sub=#server{remote=R}}) ->
Expand Down Expand Up @@ -1185,17 +1212,33 @@ file_transfer_schedule(Name, Path, File) ->


wrap(_Path, {deleted, File, Vsn}) ->
set_attribute(<<"path">>, File),
set_attribute(<<"transfer_type">>, <<"deleted">>),
revault_data_wrapper:send_deleted(File, Vsn);
wrap(Path, {conflict_file, File, FHash, Ct, Meta}) ->
set_attribute(<<"path">>, FHash),
set_attribute(<<"transfer_type">>, <<"conflict_file">>),
set_attribute(<<"conflict.ct">>, Ct),
{ok, Bin} = revault_file:read_file(filename:join(Path, FHash)),
revault_data_wrapper:send_conflict_file(File, FHash, Ct, Meta, Bin);
wrap(Path, {file, File, {Vsn, Hash}}) ->
set_attribute(<<"path">>, File),
set_attribute(<<"transfer_type">>, <<"file">>),
{ok, Bin} = revault_file:read_file(filename:join(Path, File)),
revault_data_wrapper:send_file(File, Vsn, Hash, Bin);
wrap(Path, {part, file, File, Vsn, Hash, Offset, SizeBytes, NumPart, TotalParts}) ->
set_attribute(<<"path">>, File),
set_attribute(<<"transfer_type">>, <<"multipart_file">>),
set_attribute(<<"part.num">>, NumPart),
set_attribute(<<"part.total">>, TotalParts),
{ok, Bin} = revault_file:read_range(filename:join(Path, File), Offset, SizeBytes),
revault_data_wrapper:send_multipart_file(File, Vsn, Hash, NumPart, TotalParts, Bin);
wrap(Path, {part, {conflict_file, F, Ct}, FHash, Vsn, Hash, Offset, SizeBytes, NumPart, TotalParts}) ->
set_attribute(<<"path">>, FHash),
set_attribute(<<"transfer_type">>, <<"multipart_conflict_file">>),
set_attribute(<<"conflict.ct">>, Ct),
set_attribute(<<"part.num">>, NumPart),
set_attribute(<<"part.total">>, TotalParts),
{ok, Bin} = revault_file:read_range(filename:join(Path, FHash), Offset, SizeBytes),
revault_data_wrapper:send_conflict_multipart_file(F, FHash, Ct, {Vsn, Hash}, NumPart, TotalParts, Bin).

Expand Down Expand Up @@ -1273,9 +1316,9 @@ set_attributes(Attrs) ->
SpanCtx = otel_tracer:current_span_ctx(otel_ctx:get_current()),
otel_span:set_attributes(SpanCtx, Attrs).

%set_attribute(Attr, Val) ->
% SpanCtx = otel_tracer:current_span_ctx(otel_ctx:get_current()),
% otel_span:set_attribute(SpanCtx, Attr, Val).
set_attribute(Attr, Val) ->
SpanCtx = otel_tracer:current_span_ctx(otel_ctx:get_current()),
otel_span:set_attribute(SpanCtx, Attr, Val).

end_span(Data=#data{ctx=[]}) ->
Data;
Expand Down

0 comments on commit a1d1290

Please sign in to comment.