diff --git a/dune-project b/dune-project index c65eada..0c7e4d7 100644 --- a/dune-project +++ b/dune-project @@ -52,6 +52,7 @@ (ocaml (>= 4.08)) (trace (= :version)) (mtime (>= 2.0)) + base-bigarray base-unix dune) (tags diff --git a/src/fuchsia/bg_thread.ml b/src/fuchsia/bg_thread.ml index b8c9100..ecaf1c0 100644 --- a/src/fuchsia/bg_thread.ml +++ b/src/fuchsia/bg_thread.ml @@ -51,18 +51,6 @@ let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit = let st = { oc; buf_pool; events } in bg_loop st -(* TODO: - (* write a message about us closing *) - Writer.emit_instant_event ~name:"tef-worker.exit" - ~tid:(Thread.id @@ Thread.self ()) - ~ts:(now_us ()) ~args:[] writer; - - (* warn if app didn't close all spans *) - if Span_tbl.length spans > 0 then - Printf.eprintf "trace-tef: warning: %d spans were not closed\n%!" - (Span_tbl.length spans); -*) - (** Thread that simply regularly "ticks", sending events to the background thread so it has a chance to write to the file, and call [f()] *) diff --git a/src/fuchsia/common_.ml b/src/fuchsia/common_.ml index 38ded80..14b78bf 100644 --- a/src/fuchsia/common_.ml +++ b/src/fuchsia/common_.ml @@ -5,12 +5,6 @@ module Buf = FWrite.Buf module Buf_pool = FWrite.Buf_pool module Output = FWrite.Output -module Span_tbl = Hashtbl.Make (struct - include Int64 - - let hash : t -> int = Hashtbl.hash -end) - let on_tracing_error = ref (fun s -> Printf.eprintf "trace-fuchsia error: %s\n%!" s) diff --git a/src/fuchsia/dune b/src/fuchsia/dune index aedeea1..f67f2d4 100644 --- a/src/fuchsia/dune +++ b/src/fuchsia/dune @@ -5,5 +5,5 @@ (public_name trace-fuchsia) (synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file") (libraries trace.core trace.private.util thread-local-storage - (re_export trace-fuchsia.write) + (re_export trace-fuchsia.write) bigarray mtime mtime.clock.os atomic unix threads)) diff --git a/src/fuchsia/fcollector.ml b/src/fuchsia/fcollector.ml index aa43342..223def1 100644 --- a/src/fuchsia/fcollector.ml +++ b/src/fuchsia/fcollector.ml @@ -5,11 +5,101 @@ module Int_map = Map.Make (Int) let pid = Unix.getpid () -type span_info = { - start_time_ns: int64; - name: string; - mutable data: (string * user_data) list; -} +(** Thread-local stack of span info *) +module Span_info_stack : sig + type t + + val create : unit -> t + + val push : + t -> + span -> + name:string -> + start_time_ns:int64 -> + data:(string * user_data) list -> + unit + + val pop : t -> int64 * string * int64 * (string * user_data) list + val find_ : t -> span -> int option + val add_data : t -> int -> (string * user_data) list -> unit +end = struct + module BA = Bigarray + module BA1 = Bigarray.Array1 + + type int64arr = (int64, BA.int64_elt, BA.c_layout) BA1.t + + type t = { + mutable len: int; + mutable span: int64arr; + mutable start_time_ns: int64arr; + mutable name: string array; + mutable data: (string * user_data) list array; + } + + let create () : t = + { + len = 0; + span = BA1.create BA.Int64 BA.C_layout 64; + start_time_ns = BA1.create BA.Int64 BA.C_layout 64; + name = Array.make 64 ""; + data = Array.make 64 []; + } + + let[@inline] cap self = Array.length self.name + + let grow_ (self : t) : unit = + let new_cap = 2 * cap self in + let new_span = BA1.create BA.Int64 BA.C_layout new_cap in + BA1.blit self.span (BA1.sub new_span 0 self.len); + let new_startime_ns = BA1.create BA.Int64 BA.C_layout new_cap in + BA1.blit self.start_time_ns (BA1.sub new_startime_ns 0 self.len); + let new_name = Array.make new_cap "" in + Array.blit self.name 0 new_name 0 self.len; + let new_data = Array.make new_cap [] in + Array.blit self.data 0 new_data 0 self.len; + self.span <- new_span; + self.start_time_ns <- new_startime_ns; + self.name <- new_name; + self.data <- new_data + + let push (self : t) (span : int64) ~name ~start_time_ns ~data = + if cap self = self.len then grow_ self; + BA1.set self.span self.len span; + BA1.set self.start_time_ns self.len start_time_ns; + Array.set self.name self.len name; + Array.set self.data self.len data; + self.len <- self.len + 1 + + let pop (self : t) = + assert (self.len > 0); + self.len <- self.len - 1; + + let span = BA1.get self.span self.len in + let name = self.name.(self.len) in + let start_time_ns = BA1.get self.start_time_ns self.len in + let data = self.data.(self.len) in + + (* avoid holding onto old values *) + Array.set self.name self.len ""; + Array.set self.data self.len []; + + span, name, start_time_ns, data + + let[@inline] add_data self i d : unit = + assert (i < self.len); + self.data.(i) <- List.rev_append d self.data.(i) + + exception Found of int + + let[@inline] find_ (self : t) span : _ option = + try + for i = self.len - 1 downto 0 do + if Int64.equal (BA1.get self.span i) span then raise_notrace (Found i) + done; + + None + with Found i -> Some i +end type async_span_info = { async_id: int; @@ -33,7 +123,7 @@ type per_thread_state = { local_span_id_gen: int A.t; (** Used for thread-local spans *) mutable thread_ref: FWrite.Thread_ref.t; mutable out: Output.t option; - spans: span_info Span_tbl.t; (** In-flight spans *) + spans: Span_info_stack.t; (** In-flight spans *) } type state = { @@ -43,9 +133,9 @@ type state = { bg_thread: Thread.t; buf_pool: Buf_pool.t; next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *) - per_thread: per_thread_state Int_map.t A.t; + per_thread: per_thread_state Int_map.t A.t array; (** the state keeps tabs on thread-local state, so it can flush writers - at the end *) + at the end. This is a tid-sharded array of maps. *) } let key_thread_local_st : per_thread_state TLS.key = @@ -57,7 +147,7 @@ let key_thread_local_st : per_thread_state TLS.key = thread_ref = FWrite.Thread_ref.inline ~pid ~tid; local_span_id_gen = A.make 0; out = None; - spans = Span_tbl.create 32; + spans = Span_info_stack.create (); }) let out_of_st (st : state) : Output.t = @@ -74,7 +164,8 @@ struct let state_id = 1 + A.fetch_and_add state_id_ 1 (** prepare the thread's state *) - let[@inline never] update_local_state (self : per_thread_state) : unit = + let[@inline never] update_or_init_local_state (self : per_thread_state) : unit + = (* get an output *) let out = out_of_st st in self.out <- Some out; @@ -87,17 +178,22 @@ struct ); (* add to [st]'s list of threads *) + let shard_of_per_thread = st.per_thread.(self.tid land 0b1111) in while - let old = A.get st.per_thread in - not (A.compare_and_set st.per_thread old (Int_map.add self.tid self old)) + let old = A.get shard_of_per_thread in + not + (A.compare_and_set shard_of_per_thread old + (Int_map.add self.tid self old)) do () done; let on_exit _ = while - let old = A.get st.per_thread in - not (A.compare_and_set st.per_thread old (Int_map.remove self.tid old)) + let old = A.get shard_of_per_thread in + not + (A.compare_and_set shard_of_per_thread old + (Int_map.remove self.tid old)) do () done; @@ -111,21 +207,29 @@ struct (** Obtain the output for the current thread *) let[@inline] get_thread_output () : Output.t * per_thread_state = let tls = TLS.get key_thread_local_st in - if tls.state_id != state_id || tls.out == None then update_local_state tls; - Option.get tls.out, tls + if tls.state_id != state_id || tls.out == None then + update_or_init_local_state tls; + let out = + match tls.out with + | None -> assert false + | Some o -> o + in + out, tls + + let close_per_thread (tls : per_thread_state) = + Option.iter Output.flush tls.out + + (** flush all outputs *) + let flush_all_outputs_ () = + Array.iter + (fun shard -> + let tls_l = A.get shard in + Int_map.iter (fun _tid tls -> close_per_thread tls) tls_l) + st.per_thread let shutdown () = if A.exchange st.active false then ( - (* flush all outputs *) - let tls_l = A.get st.per_thread in - - (* FIXME: there's a potential race condition here. How to fix it - without overhead on every regular event? *) - Int_map.iter - (fun _tid (tls : per_thread_state) -> - Printf.eprintf "flush for %d\n%!" tls.tid; - Option.iter Output.flush tls.out) - tls_l; + flush_all_outputs_ (); B_queue.close st.events; (* wait for writer thread to be done. The writer thread will exit @@ -137,32 +241,34 @@ struct let tls = TLS.get key_thread_local_st in let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in let time_ns = Time.now_ns () in - Span_tbl.add tls.spans span { name; data; start_time_ns = time_ns }; + Span_info_stack.push tls.spans span ~name ~data ~start_time_ns:time_ns; span let exit_span span : unit = let out, tls = get_thread_output () in let end_time_ns = Time.now_ns () in - match Span_tbl.find_opt tls.spans span with - | None -> !on_tracing_error (spf "unknown span %Ld" span) - | Some info -> - Span_tbl.remove tls.spans span; - FWrite.Event.Duration_complete.encode out ~name:info.name - ~t_ref:tls.thread_ref ~time_ns:info.start_time_ns ~end_time_ns - ~args:info.data () + + let span', name, start_time_ns, data = Span_info_stack.pop tls.spans in + if span <> span' then + !on_tracing_error + (spf "span mismatch: top is %Ld, expected %Ld" span' span) + else + FWrite.Event.Duration_complete.encode out ~name ~t_ref:tls.thread_ref + ~time_ns:start_time_ns ~end_time_ns ~args:data () let with_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name f = let out, tls = get_thread_output () in let time_ns = Time.now_ns () in let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in - let info = { start_time_ns = time_ns; data; name } in - Span_tbl.add tls.spans span info; + Span_info_stack.push tls.spans span ~start_time_ns:time_ns ~data ~name; let[@inline] exit () : unit = let end_time_ns = Time.now_ns () in - Span_tbl.remove tls.spans span; + + let _span', _, _, data = Span_info_stack.pop tls.spans in + assert (span = _span'); FWrite.Event.Duration_complete.encode out ~name ~time_ns ~end_time_ns - ~t_ref:tls.thread_ref ~args:info.data () + ~t_ref:tls.thread_ref ~args:data () in try @@ -175,9 +281,9 @@ struct let add_data_to_span span data = let tls = TLS.get key_thread_local_st in - match Span_tbl.find_opt tls.spans span with + match Span_info_stack.find_ tls.spans span with | None -> !on_tracing_error (spf "unknown span %Ld" span) - | Some info -> info.data <- List.rev_append data info.data + | Some idx -> Span_info_stack.add_data tls.spans idx data let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span = @@ -262,7 +368,7 @@ let create ~out () : collector = events; span_id_gen = A.make 0; next_thread_ref = A.make 1; - per_thread = A.make Int_map.empty; + per_thread = Array.init 16 (fun _ -> A.make Int_map.empty); } in diff --git a/src/fuchsia/write/output.ml b/src/fuchsia/write/output.ml index 3911192..0ee6be7 100644 --- a/src/fuchsia/write/output.ml +++ b/src/fuchsia/write/output.ml @@ -10,10 +10,20 @@ let create ~(buf_pool : Buf_pool.t) ~send_buf () : t = { buf; send_buf; buf_pool } open struct + (* NOTE: there is a potential race condition if an output is + flushed from the main thread upon closing, while + the local thread is blissfully writing new records to it + as we're winding down the collector. This is trying to reduce + the likelyhood of a race happening. *) + let[@poll error] replace_buf_ (self : t) (new_buf : Buf.t) : Buf.t = + let old_buf = self.buf in + self.buf <- new_buf; + old_buf + let flush_ (self : t) : unit = - self.send_buf self.buf; - let buf = Buf_pool.alloc self.buf_pool in - self.buf <- buf + let new_buf = Buf_pool.alloc self.buf_pool in + let old_buf = replace_buf_ self new_buf in + self.send_buf old_buf let[@inline never] cycle_buf (self : t) ~available : Buf.t = flush_ self; diff --git a/trace-fuchsia.opam b/trace-fuchsia.opam index 20fa305..62ef782 100644 --- a/trace-fuchsia.opam +++ b/trace-fuchsia.opam @@ -13,6 +13,7 @@ depends: [ "ocaml" {>= "4.08"} "trace" {= version} "mtime" {>= "2.0"} + "base-bigarray" "base-unix" "dune" {>= "2.9"} "odoc" {with-doc}