diff --git a/apps/revault/src/revault_fsm.erl b/apps/revault/src/revault_fsm.erl index 821c789..7185075 100644 --- a/apps/revault/src/revault_fsm.erl +++ b/apps/revault/src/revault_fsm.erl @@ -564,7 +564,7 @@ client_sync_files(enter, _, Data) -> client_sync_files(cast, {send, File}, DataTmp=#data{name=Name, path=Path, sub=S=#client_sync{remote=R, queue=Q}}) -> - Data = start_span(<<"file_send">>, DataTmp), + Data = start_span(<<"schedule_file_send">>, DataTmp), set_attributes([{<<"peer">>, ?str(R)}, {<<"path">>, File} | ?attrs(Data)]), Q2 = send_file_schedule(Q, R, Name, Path, File), NewData = end_span(DataTmp), @@ -577,22 +577,23 @@ client_sync_files(cast, {fetch, File}, #{attributes => [{<<"path">>, File}, {<<"peer">>, ?str(R)} | ?attrs(Data)]}, fun(_SpanCtx) -> Payload = revault_data_wrapper:fetch_file(File), - %% TODO: track the incoming transfers to know when we're done syncing {_Marker, NewCb} = apply_cb(Cb, send, [R, Payload]), NewQ = send_next_scheduled(Q), {keep_state, Data#data{callback=NewCb, sub=S#client_sync{queue=NewQ, acc=[File|Acc]}}} end); client_sync_files(cast, {send, R, Transfer}, - Data=#data{path=Path, callback=Cb, - sub=S=#client_sync{remote=R, queue=Q}}) -> - %% TODO: wrap in tracing + DataTmp=#data{path=Path, callback=Cb, + sub=S=#client_sync{remote=R, queue=Q}}) -> + Data = start_span(<<"file_send">>, DataTmp), + set_attributes([{<<"remote">>, ?str(R)} | ?attrs(Data)]), Payload = wrap(Path, Transfer), %% TODO: track the success or failures of transfers, detect disconnections {_Marker, NewCb} = apply_cb(Cb, send, [R, Payload]), + NewData = end_span(Data), NewQ = send_next_scheduled(Q), - {keep_state, Data#data{callback=NewCb, - sub=S#client_sync{queue=NewQ}}}; + {keep_state, NewData#data{callback=NewCb, + sub=S#client_sync{queue=NewQ}}}; client_sync_files(cast, sync_complete, Data=#data{sub=#client_sync{acc=[]}}) -> #data{callback=Cb, sub=#client_sync{remote=R}} = Data, Payload = revault_data_wrapper:sync_complete(), @@ -651,8 +652,14 @@ client_sync_files(info, {revault, _Marker, {conflict_file, WorkF, F, CountLeft, end; client_sync_files(info, {revault, _Marker, {file, F, Meta, PartNum, PartTotal, Bin}}, Data) -> #data{name=Name, id=Id, sub=S=#client_sync{multiparts=MP, queue=Q, acc=Acc}} = Data, - %% TODO: wrap in with_span - {MPStage, NewMP} = handle_multipart_file_sync(MP, Name, Id, F, Meta, PartNum, PartTotal, Bin), + {MPStage, NewMP} = ?with_span(<<"file_part_recv">>, + #{attributes => [{<<"path">>, F}, {<<"size">>, byte_size(Bin)}, + {<<"part.num">>,PartNum}, {<<"part.total">>, PartTotal}, + {<<"meta">>, ?str(Meta)} | ?attrs(Data)]}, + fun(_SpanCtx) -> + handle_multipart_file_sync(MP, Name, Id, F, Meta, PartNum, PartTotal, Bin) + end + ), NewQ = send_next_scheduled(Q), NewAcc = case MPStage of done -> Acc -- [F]; @@ -661,8 +668,14 @@ client_sync_files(info, {revault, _Marker, {file, F, Meta, PartNum, PartTotal, B {keep_state, Data#data{scan=true, sub=S#client_sync{queue=NewQ, acc=NewAcc, multiparts=NewMP}}}; client_sync_files(info, {revault, _Marker, {conflict_multipart_file, WorkF, F, CountLeft, Meta, PartNum, PartTotal, Bin}}, Data) -> #data{name=Name, sub=S=#client_sync{multiparts=MP, queue=Q, acc=Acc}} = Data, - %% TODO: wrap in with_span - {MPStage, NewMP} = handle_multipart_conflict_file_sync(MP, Name, WorkF, F, Meta, PartNum, PartTotal, Bin), + {MPStage, NewMP} = ?with_span(<<"conflict_part">>, + #{attributes => [{<<"path">>, F}, {<<"size">>, byte_size(Bin)}, + {<<"part.num">>,PartNum}, {<<"part.total">>, PartTotal}, + {<<"meta">>, ?str(Meta)} | ?attrs(Data)]}, + fun(_SpanCtx) -> + handle_multipart_conflict_file_sync(MP, Name, WorkF, F, Meta, PartNum, PartTotal, Bin) + end + ), case {MPStage, CountLeft} of {done, 0} -> %% conflict file complete. @@ -814,8 +827,14 @@ server_sync_files(info, {revault, _Marker, {file, F, Meta, Bin}}, server_sync_files(info, {revault, _Marker, {file, F, Meta, PartNum, PartTotal, Bin}}, Data=#data{name=Name, id=Id, sub=S=#server{multiparts=MP}}) -> - %% TODO: wrap in with_span - {_, NewMP} = handle_multipart_file_sync(MP, Name, Id, F, Meta, PartNum, PartTotal, Bin), + {_, NewMP} = ?with_span(<<"file_part_recv">>, + #{attributes => [{<<"path">>, F}, {<<"size">>, byte_size(Bin)}, + {<<"part.num">>,PartNum}, {<<"part.total">>, PartTotal}, + {<<"meta">>, ?str(Meta)} | ?attrs(Data)]}, + fun(_SpanCtx) -> + handle_multipart_file_sync(MP, Name, Id, F, Meta, PartNum, PartTotal, Bin) + end + ), {keep_state, Data#data{scan=true, sub=S#server{multiparts=NewMP}}}; server_sync_files(info, {revault, _Marker, {deleted_file, F, Meta}}, Data=#data{name=Name, id=Id}) -> @@ -840,8 +859,14 @@ server_sync_files(info, {revault, _M, {conflict_file, WorkF, F, CountLeft, Meta, {keep_state, Data#data{scan=true}}; server_sync_files(info, {revault, _M, {conflict_multipart_file, WorkF, F, _CountLeft, Meta, PartNum, PartTotal, Bin}}, Data=#data{name=Name, sub=S=#server{multiparts=MP}}) -> - %% TODO: wrap in with_span - {_, NewMP} = handle_multipart_conflict_file_sync(MP, Name, WorkF, F, Meta, PartNum, PartTotal, Bin), + {_, NewMP} = ?with_span(<<"conflict_part">>, + #{attributes => [{<<"path">>, F}, {<<"size">>, byte_size(Bin)}, + {<<"part.num">>,PartNum}, {<<"part.total">>, PartTotal}, + {<<"meta">>, ?str(Meta)} | ?attrs(Data)]}, + fun(_SpanCtx) -> + handle_multipart_conflict_file_sync(MP, Name, WorkF, F, Meta, PartNum, PartTotal, Bin) + end + ), {keep_state, Data#data{scan=true, sub=S#server{multiparts=NewMP}}}; server_sync_files(info, {revault, Marker, {fetch, F}}, Data=#data{name=Name, path=Path, sub=S=#server{queue=Q, remote=R}}) -> @@ -849,14 +874,16 @@ server_sync_files(info, {revault, Marker, {fetch, F}}, NewQ = send_next_scheduled(TmpQ), {keep_state, Data#data{sub=S#server{queue=NewQ}}}; server_sync_files(cast, {send_reply, R, Marker, Transfer}, - Data=#data{path=Path, callback=Cb, - sub=S=#server{remote=R, queue=Q}}) -> - %% TODO: wrap in tracing + DataTmp=#data{path=Path, callback=Cb, + sub=S=#server{remote=R, queue=Q}}) -> + Data = start_span(<<"file_send">>, DataTmp), + set_attributes([{<<"remote">>, ?str(R)} | ?attrs(Data)]), Payload = wrap(Path, Transfer), %% TODO: track the success or failures of transfers, detect disconnections {_Marker, NewCb} = apply_cb(Cb, reply, [R, Marker, Payload]), + NewData = end_span(Data), NewQ = send_next_scheduled(Q), - {keep_state, Data#data{callback=NewCb, sub=S#server{queue=NewQ}}}; + {keep_state, NewData#data{callback=NewCb, sub=S#server{queue=NewQ}}}; server_sync_files(info, {revault, Marker, sync_complete}, DataTmp=#data{name=Name, callback=Cb, scan=ScanNeeded, sub=#server{remote=R}}) -> @@ -1185,17 +1212,33 @@ file_transfer_schedule(Name, Path, File) -> wrap(_Path, {deleted, File, Vsn}) -> + set_attribute(<<"file">>, File), + set_attribute(<<"type">>, <<"deleted">>), revault_data_wrapper:send_deleted(File, Vsn); wrap(Path, {conflict_file, File, FHash, Ct, Meta}) -> + set_attribute(<<"file">>, FHash), + set_attribute(<<"type">>, <<"conflict_file">>), + set_attribute(<<"conflict.ct">>, Ct), {ok, Bin} = revault_file:read_file(filename:join(Path, FHash)), revault_data_wrapper:send_conflict_file(File, FHash, Ct, Meta, Bin); wrap(Path, {file, File, {Vsn, Hash}}) -> + set_attribute(<<"file">>, File), + set_attribute(<<"type">>, <<"file">>), {ok, Bin} = revault_file:read_file(filename:join(Path, File)), revault_data_wrapper:send_file(File, Vsn, Hash, Bin); wrap(Path, {part, file, File, Vsn, Hash, Offset, SizeBytes, NumPart, TotalParts}) -> + set_attribute(<<"file">>, File), + set_attribute(<<"type">>, <<"multipart_file">>), + set_attribute(<<"part.num">>, NumPart), + set_attribute(<<"part.total">>, TotalParts), {ok, Bin} = revault_file:read_range(filename:join(Path, File), Offset, SizeBytes), revault_data_wrapper:send_multipart_file(File, Vsn, Hash, NumPart, TotalParts, Bin); wrap(Path, {part, {conflict_file, F, Ct}, FHash, Vsn, Hash, Offset, SizeBytes, NumPart, TotalParts}) -> + set_attribute(<<"file">>, FHash), + set_attribute(<<"type">>, <<"multipart_conflict_file">>), + set_attribute(<<"conflict.ct">>, Ct), + set_attribute(<<"part.num">>, NumPart), + set_attribute(<<"part.total">>, TotalParts), {ok, Bin} = revault_file:read_range(filename:join(Path, FHash), Offset, SizeBytes), revault_data_wrapper:send_conflict_multipart_file(F, FHash, Ct, {Vsn, Hash}, NumPart, TotalParts, Bin). @@ -1273,9 +1316,9 @@ set_attributes(Attrs) -> SpanCtx = otel_tracer:current_span_ctx(otel_ctx:get_current()), otel_span:set_attributes(SpanCtx, Attrs). -%set_attribute(Attr, Val) -> -% SpanCtx = otel_tracer:current_span_ctx(otel_ctx:get_current()), -% otel_span:set_attribute(SpanCtx, Attr, Val). +set_attribute(Attr, Val) -> + SpanCtx = otel_tracer:current_span_ctx(otel_ctx:get_current()), + otel_span:set_attribute(SpanCtx, Attr, Val). end_span(Data=#data{ctx=[]}) -> Data;