Skip to content

Commit

Permalink
perf fuchsia: use a stack to hold in-flight spans, not a hashtable
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Dec 27, 2023
1 parent 2e4971d commit bc92d97
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 63 deletions.
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
(ocaml (>= 4.08))
(trace (= :version))
(mtime (>= 2.0))
base-bigarray
base-unix
dune)
(tags
Expand Down
12 changes: 0 additions & 12 deletions src/fuchsia/bg_thread.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,6 @@ let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit =
let st = { oc; buf_pool; events } in
bg_loop st

(* TODO:
(* write a message about us closing *)
Writer.emit_instant_event ~name:"tef-worker.exit"
~tid:(Thread.id @@ Thread.self ())
~ts:(now_us ()) ~args:[] writer;
(* warn if app didn't close all spans *)
if Span_tbl.length spans > 0 then
Printf.eprintf "trace-tef: warning: %d spans were not closed\n%!"
(Span_tbl.length spans);
*)

(** Thread that simply regularly "ticks", sending events to
the background thread so it has a chance to write to the file,
and call [f()] *)
Expand Down
6 changes: 0 additions & 6 deletions src/fuchsia/common_.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ module Buf = FWrite.Buf
module Buf_pool = FWrite.Buf_pool
module Output = FWrite.Output

module Span_tbl = Hashtbl.Make (struct
include Int64

let hash : t -> int = Hashtbl.hash
end)

let on_tracing_error =
ref (fun s -> Printf.eprintf "trace-fuchsia error: %s\n%!" s)

Expand Down
2 changes: 1 addition & 1 deletion src/fuchsia/dune
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
(public_name trace-fuchsia)
(synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file")
(libraries trace.core trace.private.util thread-local-storage
(re_export trace-fuchsia.write)
(re_export trace-fuchsia.write) bigarray
mtime mtime.clock.os atomic unix threads))
188 changes: 147 additions & 41 deletions src/fuchsia/fcollector.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,101 @@ module Int_map = Map.Make (Int)

let pid = Unix.getpid ()

type span_info = {
start_time_ns: int64;
name: string;
mutable data: (string * user_data) list;
}
(** Thread-local stack of span info *)
module Span_info_stack : sig
type t

val create : unit -> t

val push :
t ->
span ->
name:string ->
start_time_ns:int64 ->
data:(string * user_data) list ->
unit

val pop : t -> int64 * string * int64 * (string * user_data) list
val find_ : t -> span -> int option
val add_data : t -> int -> (string * user_data) list -> unit
end = struct
module BA = Bigarray
module BA1 = Bigarray.Array1

type int64arr = (int64, BA.int64_elt, BA.c_layout) BA1.t

type t = {
mutable len: int;
mutable span: int64arr;
mutable start_time_ns: int64arr;
mutable name: string array;
mutable data: (string * user_data) list array;
}

let create () : t =
{
len = 0;
span = BA1.create BA.Int64 BA.C_layout 64;
start_time_ns = BA1.create BA.Int64 BA.C_layout 64;
name = Array.make 64 "";
data = Array.make 64 [];
}

let[@inline] cap self = Array.length self.name

let grow_ (self : t) : unit =
let new_cap = 2 * cap self in
let new_span = BA1.create BA.Int64 BA.C_layout new_cap in
BA1.blit self.span (BA1.sub new_span 0 self.len);
let new_startime_ns = BA1.create BA.Int64 BA.C_layout new_cap in
BA1.blit self.start_time_ns (BA1.sub new_startime_ns 0 self.len);
let new_name = Array.make new_cap "" in
Array.blit self.name 0 new_name 0 self.len;
let new_data = Array.make new_cap [] in
Array.blit self.data 0 new_data 0 self.len;
self.span <- new_span;
self.start_time_ns <- new_startime_ns;
self.name <- new_name;
self.data <- new_data

let push (self : t) (span : int64) ~name ~start_time_ns ~data =
if cap self = self.len then grow_ self;
BA1.set self.span self.len span;
BA1.set self.start_time_ns self.len start_time_ns;
Array.set self.name self.len name;
Array.set self.data self.len data;
self.len <- self.len + 1

let pop (self : t) =
assert (self.len > 0);
self.len <- self.len - 1;

let span = BA1.get self.span self.len in
let name = self.name.(self.len) in
let start_time_ns = BA1.get self.start_time_ns self.len in
let data = self.data.(self.len) in

(* avoid holding onto old values *)
Array.set self.name self.len "";
Array.set self.data self.len [];

span, name, start_time_ns, data

let[@inline] add_data self i d : unit =
assert (i < self.len);
self.data.(i) <- List.rev_append d self.data.(i)

exception Found of int

let[@inline] find_ (self : t) span : _ option =
try
for i = self.len - 1 downto 0 do
if Int64.equal (BA1.get self.span i) span then raise_notrace (Found i)
done;

None
with Found i -> Some i
end

type async_span_info = {
async_id: int;
Expand All @@ -33,7 +123,7 @@ type per_thread_state = {
local_span_id_gen: int A.t; (** Used for thread-local spans *)
mutable thread_ref: FWrite.Thread_ref.t;
mutable out: Output.t option;
spans: span_info Span_tbl.t; (** In-flight spans *)
spans: Span_info_stack.t; (** In-flight spans *)
}

type state = {
Expand All @@ -43,9 +133,9 @@ type state = {
bg_thread: Thread.t;
buf_pool: Buf_pool.t;
next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *)
per_thread: per_thread_state Int_map.t A.t;
per_thread: per_thread_state Int_map.t A.t array;
(** the state keeps tabs on thread-local state, so it can flush writers
at the end *)
at the end. This is a tid-sharded array of maps. *)
}

let key_thread_local_st : per_thread_state TLS.key =
Expand All @@ -57,7 +147,7 @@ let key_thread_local_st : per_thread_state TLS.key =
thread_ref = FWrite.Thread_ref.inline ~pid ~tid;
local_span_id_gen = A.make 0;
out = None;
spans = Span_tbl.create 32;
spans = Span_info_stack.create ();
})

let out_of_st (st : state) : Output.t =
Expand All @@ -74,7 +164,8 @@ struct
let state_id = 1 + A.fetch_and_add state_id_ 1

(** prepare the thread's state *)
let[@inline never] update_local_state (self : per_thread_state) : unit =
let[@inline never] update_or_init_local_state (self : per_thread_state) : unit
=
(* get an output *)
let out = out_of_st st in
self.out <- Some out;
Expand All @@ -87,17 +178,22 @@ struct
);

(* add to [st]'s list of threads *)
let shard_of_per_thread = st.per_thread.(self.tid land 0b1111) in
while
let old = A.get st.per_thread in
not (A.compare_and_set st.per_thread old (Int_map.add self.tid self old))
let old = A.get shard_of_per_thread in
not
(A.compare_and_set shard_of_per_thread old
(Int_map.add self.tid self old))
do
()
done;

let on_exit _ =
while
let old = A.get st.per_thread in
not (A.compare_and_set st.per_thread old (Int_map.remove self.tid old))
let old = A.get shard_of_per_thread in
not
(A.compare_and_set shard_of_per_thread old
(Int_map.remove self.tid old))
do
()
done;
Expand All @@ -111,21 +207,29 @@ struct
(** Obtain the output for the current thread *)
let[@inline] get_thread_output () : Output.t * per_thread_state =
let tls = TLS.get key_thread_local_st in
if tls.state_id != state_id || tls.out == None then update_local_state tls;
Option.get tls.out, tls
if tls.state_id != state_id || tls.out == None then
update_or_init_local_state tls;
let out =
match tls.out with
| None -> assert false
| Some o -> o
in
out, tls

let close_per_thread (tls : per_thread_state) =
Option.iter Output.flush tls.out

(** flush all outputs *)
let flush_all_outputs_ () =
Array.iter
(fun shard ->
let tls_l = A.get shard in
Int_map.iter (fun _tid tls -> close_per_thread tls) tls_l)
st.per_thread

let shutdown () =
if A.exchange st.active false then (
(* flush all outputs *)
let tls_l = A.get st.per_thread in

(* FIXME: there's a potential race condition here. How to fix it
without overhead on every regular event? *)
Int_map.iter
(fun _tid (tls : per_thread_state) ->
Printf.eprintf "flush for %d\n%!" tls.tid;
Option.iter Output.flush tls.out)
tls_l;
flush_all_outputs_ ();

B_queue.close st.events;
(* wait for writer thread to be done. The writer thread will exit
Expand All @@ -137,32 +241,34 @@ struct
let tls = TLS.get key_thread_local_st in
let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in
let time_ns = Time.now_ns () in
Span_tbl.add tls.spans span { name; data; start_time_ns = time_ns };
Span_info_stack.push tls.spans span ~name ~data ~start_time_ns:time_ns;
span

let exit_span span : unit =
let out, tls = get_thread_output () in
let end_time_ns = Time.now_ns () in
match Span_tbl.find_opt tls.spans span with
| None -> !on_tracing_error (spf "unknown span %Ld" span)
| Some info ->
Span_tbl.remove tls.spans span;
FWrite.Event.Duration_complete.encode out ~name:info.name
~t_ref:tls.thread_ref ~time_ns:info.start_time_ns ~end_time_ns
~args:info.data ()

let span', name, start_time_ns, data = Span_info_stack.pop tls.spans in
if span <> span' then
!on_tracing_error
(spf "span mismatch: top is %Ld, expected %Ld" span' span)
else
FWrite.Event.Duration_complete.encode out ~name ~t_ref:tls.thread_ref
~time_ns:start_time_ns ~end_time_ns ~args:data ()

let with_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name f =
let out, tls = get_thread_output () in
let time_ns = Time.now_ns () in
let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in
let info = { start_time_ns = time_ns; data; name } in
Span_tbl.add tls.spans span info;
Span_info_stack.push tls.spans span ~start_time_ns:time_ns ~data ~name;

let[@inline] exit () : unit =
let end_time_ns = Time.now_ns () in
Span_tbl.remove tls.spans span;

let _span', _, _, data = Span_info_stack.pop tls.spans in
assert (span = _span');
FWrite.Event.Duration_complete.encode out ~name ~time_ns ~end_time_ns
~t_ref:tls.thread_ref ~args:info.data ()
~t_ref:tls.thread_ref ~args:data ()
in

try
Expand All @@ -175,9 +281,9 @@ struct

let add_data_to_span span data =
let tls = TLS.get key_thread_local_st in
match Span_tbl.find_opt tls.spans span with
match Span_info_stack.find_ tls.spans span with
| None -> !on_tracing_error (spf "unknown span %Ld" span)
| Some info -> info.data <- List.rev_append data info.data
| Some idx -> Span_info_stack.add_data tls.spans idx data

let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__:_
~__FILE__:_ ~__LINE__:_ ~data name : explicit_span =
Expand Down Expand Up @@ -262,7 +368,7 @@ let create ~out () : collector =
events;
span_id_gen = A.make 0;
next_thread_ref = A.make 1;
per_thread = A.make Int_map.empty;
per_thread = Array.init 16 (fun _ -> A.make Int_map.empty);
}
in

Expand Down
16 changes: 13 additions & 3 deletions src/fuchsia/write/output.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,20 @@ let create ~(buf_pool : Buf_pool.t) ~send_buf () : t =
{ buf; send_buf; buf_pool }

open struct
(* NOTE: there is a potential race condition if an output is
flushed from the main thread upon closing, while
the local thread is blissfully writing new records to it
as we're winding down the collector. This is trying to reduce
the likelyhood of a race happening. *)
let[@poll error] replace_buf_ (self : t) (new_buf : Buf.t) : Buf.t =
let old_buf = self.buf in
self.buf <- new_buf;
old_buf

let flush_ (self : t) : unit =
self.send_buf self.buf;
let buf = Buf_pool.alloc self.buf_pool in
self.buf <- buf
let new_buf = Buf_pool.alloc self.buf_pool in
let old_buf = replace_buf_ self new_buf in
self.send_buf old_buf

let[@inline never] cycle_buf (self : t) ~available : Buf.t =
flush_ self;
Expand Down
1 change: 1 addition & 0 deletions trace-fuchsia.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ depends: [
"ocaml" {>= "4.08"}
"trace" {= version}
"mtime" {>= "2.0"}
"base-bigarray"
"base-unix"
"dune" {>= "2.9"}
"odoc" {with-doc}
Expand Down

0 comments on commit bc92d97

Please sign in to comment.