diff --git a/.gitignore b/.gitignore index 9a70a80..da14214 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ _opam _build *.json *.exe +perf.* +*.fxt diff --git a/README.md b/README.md index 722a536..8657920 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,21 @@ let f x y z = raise e ``` +Alternatively, a name can be provided for the span, which is useful if you want +to access it and use functions like `Trace.add_data_to_span`: + + +```ocaml +let%trace f x y z = + do_sth x; + do_sth y; + begin + let%trace _sp = "sub-span" in + do_sth z; + Trace.add_data_to_span _sp ["x", `Int 42] + end +``` + ### Dune configuration In your `library` or `executable` stanza, add: `(preprocess (pps ppx_trace))`. diff --git a/bench/bench_fuchsia_write.ml b/bench/bench_fuchsia_write.ml new file mode 100644 index 0000000..a62149e --- /dev/null +++ b/bench/bench_fuchsia_write.ml @@ -0,0 +1,58 @@ +open Trace_fuchsia_write +module B = Benchmark + +let pf = Printf.printf + +let encode_1_span (out : Output.t) () = + Event.Duration_complete.encode out ~name:"span" ~t_ref:(Thread_ref.Ref 5) + ~time_ns:100_000L ~end_time_ns:5_000_000L ~args:[] () + +let encode_3_span (out : Output.t) () = + Event.Duration_complete.encode out ~name:"outer" ~t_ref:(Thread_ref.Ref 5) + ~time_ns:100_000L ~end_time_ns:5_000_000L ~args:[] (); + Event.Duration_complete.encode out ~name:"inner" ~t_ref:(Thread_ref.Ref 5) + ~time_ns:180_000L ~end_time_ns:4_500_000L ~args:[] (); + Event.Instant.encode out ~name:"hello" ~time_ns:1_234_567L + ~t_ref:(Thread_ref.Ref 5) + ~args:[ "x", `Int 42 ] + () + +let time_per_iter_ns (samples : B.t list) : float = + let n_iters = ref 0L in + let time = ref 0. in + List.iter + (fun (s : B.t) -> + n_iters := Int64.add !n_iters s.iters; + time := !time +. s.stime +. s.utime) + samples; + !time *. 1e9 /. 
Int64.to_float !n_iters + +let () = + let buf_pool = Buf_pool.create () in + let out = + Output.create ~buf_pool + ~send_buf:(fun buf -> Buf_pool.recycle buf_pool buf) + () + in + + let samples = B.throughput1 4 ~name:"encode_1_span" (encode_1_span out) () in + B.print_gc samples; + + let [ (_, samples) ] = samples [@@warning "-8"] in + + let iter_per_ns = time_per_iter_ns samples in + pf "%.3f ns/iter\n" iter_per_ns; + + () + +let () = + let buf_pool = Buf_pool.create () in + let out = + Output.create ~buf_pool + ~send_buf:(fun buf -> Buf_pool.recycle buf_pool buf) + () + in + + let samples = B.throughput1 4 ~name:"encode_3_span" (encode_3_span out) () in + B.print_gc samples; + () diff --git a/bench/dune b/bench/dune index c6fec56..ade51c9 100644 --- a/bench/dune +++ b/bench/dune @@ -1,4 +1,16 @@ (executable (name trace1) + (modules trace1) (libraries trace.core trace-tef)) + +(executable + (name trace_fx) + (modules trace_fx) + (preprocess (pps ppx_trace)) + (libraries trace.core trace-fuchsia)) + +(executable + (name bench_fuchsia_write) + (modules bench_fuchsia_write) + (libraries benchmark trace-fuchsia.write)) diff --git a/bench/trace_fx.ml b/bench/trace_fx.ml new file mode 100644 index 0000000..b2c0b8f --- /dev/null +++ b/bench/trace_fx.ml @@ -0,0 +1,50 @@ +module Trace = Trace_core + +let ( let@ ) = ( @@ ) + +let work ~dom_idx ~n () : unit = + Trace_core.set_thread_name (Printf.sprintf "worker%d" dom_idx); + for _i = 1 to n do + let%trace _sp = "outer" in + Trace_core.add_data_to_span _sp [ "i", `Int _i ]; + for _k = 1 to 10 do + let%trace _sp = "inner" in + () + done; + + (* Thread.delay 1e-6 *) + if dom_idx = 0 && _i mod 4096 = 0 then ( + Trace_core.message "gc stats"; + let stat = Gc.quick_stat () in + Trace_core.counter_float "gc.minor" (8. *. stat.minor_words); + Trace_core.counter_float "gc.major" (8. *. 
stat.major_words) + ) + done + +let main ~n ~j () : unit = + let domains = + Array.init j (fun dom_idx -> Domain.spawn (fun () -> work ~dom_idx ~n ())) + in + + let%trace () = "join" in + Array.iter Domain.join domains + +let () = + let@ () = Trace_fuchsia.with_setup () in + Trace_core.set_process_name "trace_fxt1"; + Trace_core.set_thread_name "main"; + + let%trace () = "main" in + + let n = ref 10_000 in + let j = ref 4 in + + let args = + [ + "-n", Arg.Set_int n, " number of iterations"; + "-j", Arg.Set_int j, " set number of workers"; + ] + |> Arg.align + in + Arg.parse args ignore "bench1"; + main ~n:!n ~j:!j () diff --git a/bench_fx.sh b/bench_fx.sh new file mode 100755 index 0000000..f2ba8ae --- /dev/null +++ b/bench_fx.sh @@ -0,0 +1,3 @@ +#!/bin/sh +DUNE_OPTS="--profile=release --display=quiet" +exec dune exec $DUNE_OPTS bench/trace_fx.exe -- "$@" diff --git a/dune b/dune index b6f39d0..db65d82 100644 --- a/dune +++ b/dune @@ -1,4 +1,4 @@ (env - (_ (flags :standard -strict-sequence -warn-error -a+8+26+27 -w +a-4-40-70))) + (_ (flags :standard -strict-sequence -warn-error -a+8+26+27 -w +a-4-40-42-44-70))) diff --git a/dune-project b/dune-project index c2f4f4b..0c7e4d7 100644 --- a/dune-project +++ b/dune-project @@ -41,9 +41,21 @@ (trace (= :version)) (mtime (>= 2.0)) base-unix - atomic dune) (tags (trace tracing catapult))) +(package + (name trace-fuchsia) + (synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file") + (depends + (ocaml (>= 4.08)) + (trace (= :version)) + (mtime (>= 2.0)) + base-bigarray + base-unix + dune) + (tags + (trace tracing fuchsia))) + ; See the complete stanza docs at https://dune.readthedocs.io/en/stable/dune-files.html#dune-project diff --git a/src/fuchsia/bg_thread.ml b/src/fuchsia/bg_thread.ml new file mode 100644 index 0000000..ecaf1c0 --- /dev/null +++ b/src/fuchsia/bg_thread.ml @@ -0,0 +1,63 @@ +open Common_ + +type out = + [ `Stdout + | `Stderr + | `File of string + ] + +type event = + | 
E_write_buf of Buf.t + | E_tick + +type state = { + buf_pool: Buf_pool.t; + oc: out_channel; + events: event B_queue.t; +} + +let with_out_ (out : out) f = + let oc, must_close = + match out with + | `Stdout -> stdout, false + | `Stderr -> stderr, false + | `File path -> open_out_bin path, true (* binary mode: the trace is written as raw bytes *) + in + + if must_close then ( + let finally () = close_out_noerr oc in + Fun.protect ~finally (fun () -> f oc) + ) else + f oc + +let handle_ev (self : state) (ev : event) : unit = + match ev with + | E_tick -> flush self.oc + | E_write_buf buf -> + output self.oc buf.buf 0 buf.offset; + Buf_pool.recycle self.buf_pool buf + +let bg_loop (self : state) : unit = + let continue = ref true in + + while !continue do + match B_queue.pop_all self.events with + | exception B_queue.Closed -> continue := false + | evs -> List.iter (handle_ev self) evs + done + +let bg_thread ~buf_pool ~out ~(events : event B_queue.t) () : unit = + let@ oc = with_out_ out in + let st = { oc; buf_pool; events } in + bg_loop st + +(** Thread that simply regularly "ticks", pushing [E_tick] events to + the background thread so it has a chance to flush its output channel. + Returns once the event queue is closed. *) +let tick_thread events : unit = + try + while true do + Thread.delay 0.5; + B_queue.push events E_tick + done + with B_queue.Closed -> () diff --git a/src/fuchsia/common_.ml b/src/fuchsia/common_.ml new file mode 100644 index 0000000..14b78bf --- /dev/null +++ b/src/fuchsia/common_.ml @@ -0,0 +1,12 @@ +module A = Trace_core.Internal_.Atomic_ +module FWrite = Trace_fuchsia_write +module B_queue = Trace_private_util.B_queue +module Buf = FWrite.Buf +module Buf_pool = FWrite.Buf_pool +module Output = FWrite.Output + +let on_tracing_error = + ref (fun s -> Printf.eprintf "trace-fuchsia error: %s\n%!" 
s) + +let ( let@ ) = ( @@ ) +let spf = Printf.sprintf diff --git a/src/fuchsia/dune b/src/fuchsia/dune new file mode 100644 index 0000000..f67f2d4 --- /dev/null +++ b/src/fuchsia/dune @@ -0,0 +1,9 @@ + + +(library + (name trace_fuchsia) + (public_name trace-fuchsia) + (synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file") + (libraries trace.core trace.private.util thread-local-storage + (re_export trace-fuchsia.write) bigarray + mtime mtime.clock.os atomic unix threads)) diff --git a/src/fuchsia/fcollector.ml b/src/fuchsia/fcollector.ml new file mode 100644 index 0000000..0f5b486 --- /dev/null +++ b/src/fuchsia/fcollector.ml @@ -0,0 +1,395 @@ +open Trace_core +open Common_ +module TLS = Thread_local_storage +module Int_map = Map.Make (Int) + +let pid = Unix.getpid () + +(** Thread-local stack of span info *) +module Span_info_stack : sig + type t + + val create : unit -> t + + val push : + t -> + span -> + name:string -> + start_time_ns:int64 -> + data:(string * user_data) list -> + unit + + val pop : t -> int64 * string * int64 * (string * user_data) list + val find_ : t -> span -> int option + val add_data : t -> int -> (string * user_data) list -> unit +end = struct + module BA = Bigarray + module BA1 = Bigarray.Array1 + + type int64arr = (int64, BA.int64_elt, BA.c_layout) BA1.t + + type t = { + mutable len: int; + mutable span: int64arr; + mutable start_time_ns: int64arr; + mutable name: string array; + mutable data: (string * user_data) list array; + } + + let init_size_ = 1 + + let create () : t = + { + len = 0; + span = BA1.create BA.Int64 BA.C_layout init_size_; + start_time_ns = BA1.create BA.Int64 BA.C_layout init_size_; + name = Array.make init_size_ ""; + data = Array.make init_size_ []; + } + + let[@inline] cap self = Array.length self.name + + let grow_ (self : t) : unit = + let new_cap = 2 * cap self in + let new_span = BA1.create BA.Int64 BA.C_layout new_cap in + BA1.blit self.span (BA1.sub new_span 0 self.len); + 
let new_startime_ns = BA1.create BA.Int64 BA.C_layout new_cap in + BA1.blit self.start_time_ns (BA1.sub new_startime_ns 0 self.len); + let new_name = Array.make new_cap "" in + Array.blit self.name 0 new_name 0 self.len; + let new_data = Array.make new_cap [] in + Array.blit self.data 0 new_data 0 self.len; + self.span <- new_span; + self.start_time_ns <- new_startime_ns; + self.name <- new_name; + self.data <- new_data + + let push (self : t) (span : int64) ~name ~start_time_ns ~data = + if cap self = self.len then grow_ self; + BA1.set self.span self.len span; + BA1.set self.start_time_ns self.len start_time_ns; + Array.set self.name self.len name; + Array.set self.data self.len data; + self.len <- self.len + 1 + + let pop (self : t) = + assert (self.len > 0); + self.len <- self.len - 1; + + let span = BA1.get self.span self.len in + let name = self.name.(self.len) in + let start_time_ns = BA1.get self.start_time_ns self.len in + let data = self.data.(self.len) in + + (* avoid holding onto old values *) + Array.set self.name self.len ""; + Array.set self.data self.len []; + + span, name, start_time_ns, data + + let[@inline] add_data self i d : unit = + assert (i < self.len); + self.data.(i) <- List.rev_append d self.data.(i) + + exception Found of int + + let[@inline] find_ (self : t) span : _ option = + try + for i = self.len - 1 downto 0 do + if Int64.equal (BA1.get self.span i) span then raise_notrace (Found i) + done; + + None + with Found i -> Some i +end + +type async_span_info = { + async_id: int; + flavor: [ `Sync | `Async ] option; + name: string; + mutable data: (string * user_data) list; +} + +let key_async_data : async_span_info Meta_map.Key.t = Meta_map.Key.create () + +open struct + let state_id_ = A.make 0 + + (* re-raise exception with its backtrace *) + external reraise : exn -> 'a = "%reraise" +end + +type per_thread_state = { + tid: int; + state_id: int; (** ID of the current collector state *) + local_span_id_gen: int A.t; (** Used for 
thread-local spans *) + mutable thread_ref: FWrite.Thread_ref.t; + mutable out: Output.t option; + spans: Span_info_stack.t; (** In-flight spans *) +} + +type state = { + active: bool A.t; + events: Bg_thread.event B_queue.t; + span_id_gen: int A.t; (** Used for async spans *) + bg_thread: Thread.t; + buf_pool: Buf_pool.t; + next_thread_ref: int A.t; (** in [0x01 .. 0xff], to allocate thread refs *) + per_thread: per_thread_state Int_map.t A.t array; + (** the state keeps tabs on thread-local state, so it can flush writers + at the end. This is a tid-sharded array of maps. *) +} + +let key_thread_local_st : per_thread_state TLS.key = + TLS.new_key (fun () -> + let tid = Thread.id @@ Thread.self () in + { + tid; + state_id = A.get state_id_; + thread_ref = FWrite.Thread_ref.inline ~pid ~tid; + local_span_id_gen = A.make 0; + out = None; + spans = Span_info_stack.create (); + }) + +let out_of_st (st : state) : Output.t = + FWrite.Output.create () ~buf_pool:st.buf_pool ~send_buf:(fun buf -> + try B_queue.push st.events (E_write_buf buf) with B_queue.Closed -> ()) + +module C (St : sig + val st : state +end) +() = +struct + open St + + let state_id = 1 + A.fetch_and_add state_id_ 1 + + (** prepare the thread's state *) + let[@inline never] update_or_init_local_state (self : per_thread_state) : unit + = + (* get an output *) + let out = out_of_st st in + self.out <- Some out; + + (* try to allocate a thread ref for current thread *) + let th_ref = A.fetch_and_add st.next_thread_ref 1 in + if th_ref <= 0xff then ( + self.thread_ref <- FWrite.Thread_ref.ref th_ref; + FWrite.Thread_record.encode out ~as_ref:th_ref ~tid:self.tid ~pid () + ); + + (* add to [st]'s list of threads *) + let shard_of_per_thread = st.per_thread.(self.tid land 0b1111) in + while + let old = A.get shard_of_per_thread in + not + (A.compare_and_set shard_of_per_thread old + (Int_map.add self.tid self old)) + do + () + done; + + let on_exit _ = + while + let old = A.get shard_of_per_thread in + not + 
(A.compare_and_set shard_of_per_thread old + (Int_map.remove self.tid old)) + do + () + done; + Option.iter Output.flush self.out + in + + (* after thread exits, flush output and remove from global list *) + Gc.finalise on_exit (Thread.self ()); + () + + (** Obtain the output for the current thread *) + let[@inline] get_thread_output () : Output.t * per_thread_state = + let tls = TLS.get key_thread_local_st in + if tls.state_id != state_id || tls.out == None then + update_or_init_local_state tls; + let out = + match tls.out with + | None -> assert false + | Some o -> o + in + out, tls + + let close_per_thread (tls : per_thread_state) = + Option.iter Output.flush tls.out + + (** flush all outputs *) + let flush_all_outputs_ () = + Array.iter + (fun shard -> + let tls_l = A.get shard in + Int_map.iter (fun _tid tls -> close_per_thread tls) tls_l) + st.per_thread + + let shutdown () = + if A.exchange st.active false then ( + flush_all_outputs_ (); + + B_queue.close st.events; + (* wait for writer thread to be done. 
The writer thread will exit + after processing remaining events because the queue is now closed *) + Thread.join st.bg_thread + ) + + let enter_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name : span = + let tls = TLS.get key_thread_local_st in + let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in + let time_ns = Time.now_ns () in + Span_info_stack.push tls.spans span ~name ~data ~start_time_ns:time_ns; + span + + let exit_span span : unit = + let out, tls = get_thread_output () in + let end_time_ns = Time.now_ns () in + + let span', name, start_time_ns, data = Span_info_stack.pop tls.spans in + if span <> span' then + !on_tracing_error + (spf "span mismatch: top is %Ld, expected %Ld" span' span) + else + FWrite.Event.Duration_complete.encode out ~name ~t_ref:tls.thread_ref + ~time_ns:start_time_ns ~end_time_ns ~args:data () + + let with_span ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~data name f = + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + let span = Int64.of_int (A.fetch_and_add tls.local_span_id_gen 1) in + Span_info_stack.push tls.spans span ~start_time_ns:time_ns ~data ~name; + + let[@inline] exit () : unit = + let end_time_ns = Time.now_ns () in + + let _span', _, _, data = Span_info_stack.pop tls.spans in + assert (span = _span'); + FWrite.Event.Duration_complete.encode out ~name ~time_ns ~end_time_ns + ~t_ref:tls.thread_ref ~args:data () + in + + try + let x = f span in + exit (); + x + with exn -> + exit (); + reraise exn + + let add_data_to_span span data = + let tls = TLS.get key_thread_local_st in + match Span_info_stack.find_ tls.spans span with + | None -> !on_tracing_error (spf "unknown span %Ld" span) + | Some idx -> Span_info_stack.add_data tls.spans idx data + + let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__:_ + ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span = + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + + (* get the id, or 
make a new one *) + let async_id = + match parent with + | Some m -> (Meta_map.find_exn key_async_data m.meta).async_id + | None -> A.fetch_and_add st.span_id_gen 1 + in + + FWrite.Event.Async_begin.encode out ~name ~args:data ~t_ref:tls.thread_ref + ~time_ns ~async_id (); + { + span = 0L; + meta = + Meta_map.( + empty |> add key_async_data { async_id; name; flavor; data = [] }); + } + + let exit_manual_span (es : explicit_span) : unit = + let { async_id; name; data; flavor = _ } = + Meta_map.find_exn key_async_data es.meta + in + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + + FWrite.Event.Async_end.encode out ~name ~t_ref:tls.thread_ref ~time_ns + ~args:data ~async_id () + + let add_data_to_manual_span (es : explicit_span) data = + let m = Meta_map.find_exn key_async_data es.meta in + m.data <- List.rev_append data m.data + + let message ?span:_ ~data msg : unit = + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + FWrite.Event.Instant.encode out ~name:msg ~time_ns ~t_ref:tls.thread_ref + ~args:data () + + let counter_float ~data name f = + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref + ~args:((name, `Float f) :: data) + () + + let counter_int ~data name i = + let out, tls = get_thread_output () in + let time_ns = Time.now_ns () in + FWrite.Event.Counter.encode out ~name:"c" ~time_ns ~t_ref:tls.thread_ref + ~args:((name, `Int i) :: data) + () + + let name_process name : unit = + let out, _tls = get_thread_output () in + FWrite.Kernel_object.(encode out ~name ~ty:ty_process ~kid:pid ~args:[] ()) + + let name_thread name : unit = + let out, tls = get_thread_output () in + FWrite.Kernel_object.( + encode out ~name ~ty:ty_thread ~kid:tls.tid + ~args:[ "process", `Kid pid ] + ()) +end + +let create ~out () : collector = + let buf_pool = Buf_pool.create () in + let events = B_queue.create () in + + let bg_thread = 
+ Thread.create (Bg_thread.bg_thread ~buf_pool ~out ~events) () + in + + let st = + { + active = A.make true; + buf_pool; + bg_thread; + events; + span_id_gen = A.make 0; + next_thread_ref = A.make 1; + per_thread = Array.init 16 (fun _ -> A.make Int_map.empty); + } + in + + (* NOTE: the trailing [()] is what actually spawns the thread; without it this is a partial application *) let _tick_thread : Thread.t = Thread.create (fun () -> Bg_thread.tick_thread events) () in + + (* write header *) + let out = out_of_st st in + FWrite.Metadata.Magic_record.encode out; + FWrite.Metadata.Initialization_record.( + encode out ~ticks_per_secs:default_ticks_per_sec ()); + FWrite.Metadata.Provider_info.encode out ~id:0 ~name:"ocaml-trace" (); + Output.flush out; + Output.dispose out; + + let module Coll = + C + (struct + let st = st + end) + () + in + (module Coll) diff --git a/src/fuchsia/fcollector.mli b/src/fuchsia/fcollector.mli new file mode 100644 index 0000000..780b3f1 --- /dev/null +++ b/src/fuchsia/fcollector.mli @@ -0,0 +1,3 @@ +open Trace_core + +val create : out:Bg_thread.out -> unit -> collector diff --git a/src/fuchsia/time.ml b/src/fuchsia/time.ml new file mode 100644 index 0000000..d988369 --- /dev/null +++ b/src/fuchsia/time.ml @@ -0,0 +1,6 @@ +let counter = Mtime_clock.counter () + +(** Now, in nanoseconds *) +let[@inline] now_ns () : int64 = + let t = Mtime_clock.count counter in + Mtime.Span.to_uint64_ns t diff --git a/src/fuchsia/trace_fuchsia.ml b/src/fuchsia/trace_fuchsia.ml new file mode 100644 index 0000000..102a744 --- /dev/null +++ b/src/fuchsia/trace_fuchsia.ml @@ -0,0 +1,38 @@ +open Common_ + +type output = + [ `Stdout + | `Stderr + | `File of string + ] + +let collector = Fcollector.create + +let setup ?(out = `Env) () = + match out with + | `Stderr -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr () + | `Stdout -> Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout () + | `File path -> + Trace_core.setup_collector @@ Fcollector.create ~out:(`File path) () + | `Env -> + (match Sys.getenv_opt "TRACE" with + | Some ("1" | "true") -> + let path = 
"trace.fxt" in + let c = Fcollector.create ~out:(`File path) () in + Trace_core.setup_collector c + | Some "stdout" -> + Trace_core.setup_collector @@ Fcollector.create ~out:`Stdout () + | Some "stderr" -> + Trace_core.setup_collector @@ Fcollector.create ~out:`Stderr () + | Some path -> + let c = Fcollector.create ~out:(`File path) () in + Trace_core.setup_collector c + | None -> ()) + +let with_setup ?out () f = + setup ?out (); + Fun.protect ~finally:Trace_core.shutdown f + +module Internal_ = struct + let on_tracing_error = on_tracing_error +end diff --git a/src/fuchsia/trace_fuchsia.mli b/src/fuchsia/trace_fuchsia.mli new file mode 100644 index 0000000..b08620a --- /dev/null +++ b/src/fuchsia/trace_fuchsia.mli @@ -0,0 +1,46 @@ +val collector : + out:[ `File of string | `Stderr | `Stdout ] -> unit -> Trace_core.collector +(** Make a collector that writes into the given output. + See {!setup} for more details. *) + +type output = + [ `Stdout + | `Stderr + | `File of string + ] +(** Output for tracing. + + - [`Stdout] will enable tracing and print events on stdout + - [`Stderr] will enable tracing and print events on stderr + - [`File "foo"] will enable tracing and print events into file + named "foo" +*) + +val setup : ?out:[ output | `Env ] -> unit -> unit +(** [setup ()] installs the collector depending on [out]. + + @param out can take different values: + - regular {!output} value to specify where events go + - [`Env] will enable tracing if the environment + variable "TRACE" is set. + + - If it's set to "1" or "true", then the file is "trace.fxt". + - If it's set to "stdout", then logging happens on stdout (since 0.2) + - If it's set to "stderr", then logging happens on stderr (since 0.2) + - Otherwise, if it's set to a non empty string, the value is taken + to be the file path into which to write. 
+*) + +val with_setup : ?out:[ output | `Env ] -> unit -> (unit -> 'a) -> 'a +(** [with_setup () f] (optionally) sets a collector up, calls [f()], + and makes sure to shutdown before exiting. + since 0.2 a () argument was added. +*) + +(**/**) + +module Internal_ : sig + val on_tracing_error : (string -> unit) ref +end + +(**/**) diff --git a/src/fuchsia/write/buf.ml b/src/fuchsia/write/buf.ml new file mode 100644 index 0000000..52ac55a --- /dev/null +++ b/src/fuchsia/write/buf.ml @@ -0,0 +1,42 @@ +open Util + +type t = { + buf: bytes; + mutable offset: int; +} + +let empty : t = { buf = Bytes.empty; offset = 0 } + +let create (n : int) : t = + let buf = Bytes.create (round_to_word n) in + { buf; offset = 0 } + +let[@inline] clear self = self.offset <- 0 +let[@inline] available self = Bytes.length self.buf - self.offset +let[@inline] size self = self.offset + +(* see below: we assume little endian *) +let () = assert (not Sys.big_endian) + +let[@inline] add_i64 (self : t) (i : int64) : unit = + (* NOTE: we use LE, most systems are this way, even though fuchsia + says we should use the system's native endianess *) + Bytes.set_int64_le self.buf self.offset i; + self.offset <- self.offset + 8 + +let[@inline] add_string (self : t) (s : string) : unit = + let len = String.length s in + let missing = missing_to_round len in + + (* bound check *) + assert (len + missing + self.offset <= Bytes.length self.buf); + Bytes.unsafe_blit_string s 0 self.buf self.offset len; + self.offset <- self.offset + len; + + (* add 0-padding *) + if missing != 0 then ( + Bytes.unsafe_fill self.buf self.offset missing '\x00'; + self.offset <- self.offset + missing + ) + +let to_string (self : t) : string = Bytes.sub_string self.buf 0 self.offset diff --git a/src/fuchsia/write/buf_pool.ml b/src/fuchsia/write/buf_pool.ml new file mode 100644 index 0000000..961a2d3 --- /dev/null +++ b/src/fuchsia/write/buf_pool.ml @@ -0,0 +1,58 @@ +open struct + module A = Trace_core.Internal_.Atomic_ + + 
exception Got_buf of Buf.t +end + +module List_with_len = struct + type +'a t = + | Nil + | Cons of int * 'a * 'a t + + let empty : _ t = Nil + + let[@inline] len = function + | Nil -> 0 + | Cons (i, _, _) -> i + + let[@inline] cons x self = Cons (len self + 1, x, self) +end + +type t = { + max_len: int; + buf_size: int; + bufs: Buf.t List_with_len.t A.t; +} + +let create ?(max_len = 64) ?(buf_size = 1 lsl 16) () : t = + let buf_size = min (1 lsl 22) (max buf_size (1 lsl 15)) in + { max_len; buf_size; bufs = A.make List_with_len.empty } + +let alloc (self : t) : Buf.t = + try + while + match A.get self.bufs with + | Nil -> false + | Cons (_, buf, tl) as old -> + if A.compare_and_set self.bufs old tl then + raise (Got_buf buf) + else + false + do + () + done; + Buf.create self.buf_size + with Got_buf b -> b + +let recycle (self : t) (buf : Buf.t) : unit = + Buf.clear buf; + try + while + match A.get self.bufs with + | Cons (i, _, _) when i >= self.max_len -> raise Exit + | old -> + not (A.compare_and_set self.bufs old (List_with_len.cons buf old)) + do + () + done + with Exit -> () (* do not recycle *) diff --git a/src/fuchsia/write/dune b/src/fuchsia/write/dune new file mode 100644 index 0000000..b728e53 --- /dev/null +++ b/src/fuchsia/write/dune @@ -0,0 +1,9 @@ + +(library + (name trace_fuchsia_write) + (public_name trace-fuchsia.write) + (synopsis "Serialization part of trace-fuchsia") + (ocamlopt_flags :standard -S + ;-dlambda + ) + (libraries trace.core threads)) diff --git a/src/fuchsia/write/output.ml b/src/fuchsia/write/output.ml new file mode 100644 index 0000000..0ee6be7 --- /dev/null +++ b/src/fuchsia/write/output.ml @@ -0,0 +1,56 @@ +type t = { + mutable buf: Buf.t; + mutable send_buf: Buf.t -> unit; + buf_pool: Buf_pool.t; +} + +let create ~(buf_pool : Buf_pool.t) ~send_buf () : t = + let buf_size = buf_pool.buf_size in + let buf = Buf.create buf_size in + { buf; send_buf; buf_pool } + +open struct + (* NOTE: there is a potential race condition if an 
output is + flushed from the main thread upon closing, while + the local thread is blissfully writing new records to it + as we're winding down the collector. This is trying to reduce + the likelyhood of a race happening. *) + let[@poll error] replace_buf_ (self : t) (new_buf : Buf.t) : Buf.t = + let old_buf = self.buf in + self.buf <- new_buf; + old_buf + + let flush_ (self : t) : unit = + let new_buf = Buf_pool.alloc self.buf_pool in + let old_buf = replace_buf_ self new_buf in + self.send_buf old_buf + + let[@inline never] cycle_buf (self : t) ~available : Buf.t = + flush_ self; + let buf = self.buf in + + if Buf.available buf < available then + failwith "fuchsia: buffer is too small"; + buf +end + +let[@inline] flush (self : t) : unit = if Buf.size self.buf > 0 then flush_ self + +(** Obtain a buffer with at least [available] bytes *) +let[@inline] get_buf (self : t) ~(available_word : int) : Buf.t = + let available = available_word lsl 3 in + if Buf.available self.buf >= available then + self.buf + else + cycle_buf self ~available + +let into_buffer ~buf_pool (buffer : Buffer.t) : t = + let send_buf (buf : Buf.t) = + Buffer.add_subbytes buffer buf.buf 0 buf.offset + in + create ~buf_pool ~send_buf () + +let dispose (self : t) : unit = + flush_ self; + Buf_pool.recycle self.buf_pool self.buf; + self.buf <- Buf.empty diff --git a/src/fuchsia/write/trace_fuchsia_write.ml b/src/fuchsia/write/trace_fuchsia_write.ml new file mode 100644 index 0000000..ebcc8cf --- /dev/null +++ b/src/fuchsia/write/trace_fuchsia_write.ml @@ -0,0 +1,541 @@ +(** Write fuchsia events into buffers. 
+ +Reference: https://fuchsia.dev/fuchsia-src/reference/tracing/trace-format *) + +module Util = Util +module Buf = Buf +module Output = Output +module Buf_pool = Buf_pool + +open struct + let spf = Printf.sprintf +end + +open Util + +type user_data = Trace_core.user_data + +module I64 = struct + include Int64 + + let ( + ) = add + let ( - ) = sub + let ( = ) = equal + let ( land ) = logand + let ( lor ) = logor + let lnot = lognot + let ( lsl ) = shift_left + let ( lsr ) = shift_right_logical + let ( asr ) = shift_right +end + +module Str_ref = struct + type t = int + (** 16 bits *) + + let inline (size : int) : t = (* inline ref: bit 15 set, low bits hold the byte length; 0 for the empty string *) + if size > 32_000 then invalid_arg "fuchsia: max length of strings is 32_000"; + if size = 0 then + 0 + else + (1 lsl 15) lor size +end + +module Thread_ref = struct + type t = + | Ref of int + | Inline of { + pid: int; + tid: int; + } + + let inline ~pid ~tid : t = Inline { pid; tid } + + let ref x : t = (* same valid range as [Thread_record.encode]: 1..255 *) + if x <= 0 || x > 255 then + invalid_arg "fuchsia: thread inline ref must be >0 < 256"; + Ref x + + let size_word (self : t) : int = + match self with + | Ref _ -> 0 + | Inline _ -> 2 + + (** 8-bit int for the reference *) + let as_i8 (self : t) : int = + match self with + | Ref i -> i + | Inline _ -> 0 +end + +(** record type = 0 *) +module Metadata = struct + (** First record in the trace *) + module Magic_record = struct + let value = 0x0016547846040010L + let size_word = 1 + + let encode (out : Output.t) = + let buf = Output.get_buf out ~available_word:size_word in + Buf.add_i64 buf value + end + + module Initialization_record = struct + let size_word = 2 + + (** Default: 1 tick = 1 ns *) + let default_ticks_per_sec = 1_000_000_000L + + let encode (out : Output.t) ~ticks_per_secs () : unit = + let buf = Output.get_buf out ~available_word:size_word in + let hd = I64.(1L lor (of_int size_word lsl 4)) in + Buf.add_i64 buf hd; + Buf.add_i64 buf ticks_per_secs + end + + module Provider_info = struct + let size_word ~name () = 1 + (round_to_word 
(String.length name) lsr 3) + + let encode (out : Output.t) ~(id : int) ~name () : unit = + let size = size_word ~name () in + let buf = Output.get_buf out ~available_word:size in + let hd = + I64.( + (of_int size lsl 4) + lor (1L lsl 16) + lor (of_int id lsl 20) + lor (of_int (Str_ref.inline (String.length name)) lsl 52)) + in + Buf.add_i64 buf hd; + Buf.add_string buf name + end + + module Provider_section = struct end + module Trace_info = struct end +end + +module Argument = struct + type 'a t = string * ([< user_data | `Kid of int ] as 'a) + + let check_valid _ = () + (* TODO: check string length *) + + let[@inline] is_i32_ (i : int) : bool = Int32.(to_int (of_int i) = i) + + let size_word (self : _ t) = + let name, data = self in + match data with + | `None | `Bool _ -> 1 + (round_to_word (String.length name) lsr 3) + | `Int i when is_i32_ i -> 1 + (round_to_word (String.length name) lsr 3) + | `Int _ -> (* int64 *) 2 + (round_to_word (String.length name) lsr 3) + | `Float _ -> 2 + (round_to_word (String.length name) lsr 3) + | `String s -> + 1 + + (round_to_word (String.length s) lsr 3) + + (round_to_word (String.length name) lsr 3) + | `Kid _ -> 2 + (round_to_word (String.length name) lsr 3) + + open struct + external int_of_bool : bool -> int = "%identity" + end + + let encode (buf : Buf.t) (self : _ t) : unit = + let name, data = self in + let size = size_word self in + + (* part of header with argument name + size *) + let hd_arg_size = + I64.( + (of_int size lsl 4) + lor (of_int (Str_ref.inline (String.length name)) lsl 16)) + in + + match data with + | `None -> + let hd = hd_arg_size in + Buf.add_i64 buf hd; + Buf.add_string buf name + | `Int i when is_i32_ i -> + let hd = I64.(1L lor hd_arg_size lor (of_int i lsl 32)) in + Buf.add_i64 buf hd; + Buf.add_string buf name + | `Int i -> + (* int64 *) + let hd = I64.(3L lor hd_arg_size) in + Buf.add_i64 buf hd; + Buf.add_string buf name; + Buf.add_i64 buf (I64.of_int i) + | `Float f -> + let hd = I64.(5L 
lor hd_arg_size) in + Buf.add_i64 buf hd; + Buf.add_string buf name; + Buf.add_i64 buf (I64.bits_of_float f) + | `String s -> + let hd = + I64.( + 6L lor hd_arg_size + lor (of_int (Str_ref.inline (String.length s)) lsl 32)) + in + Buf.add_i64 buf hd; + Buf.add_string buf name; + Buf.add_string buf s + | `Bool b -> (* the boolean value is bit 32 of the header; bits 16..31 already hold the name ref *) + let hd = I64.(9L lor hd_arg_size lor (of_int (int_of_bool b) lsl 32)) in + Buf.add_i64 buf hd; + Buf.add_string buf name + | `Kid kid -> + (* int64 *) + let hd = I64.(8L lor hd_arg_size) in + Buf.add_i64 buf hd; + Buf.add_string buf name; + Buf.add_i64 buf (I64.of_int kid) +end + +module Arguments = struct + type 'a t = 'a Argument.t list + + let[@inline] len (self : _ t) : int = + match self with + | [] -> 0 + | [ _ ] -> 1 + | _ :: _ :: tl -> 2 + List.length tl + + let check_valid (self : _ t) = + let len = len self in + if len > 15 then + invalid_arg (spf "fuchsia: can have at most 15 args, got %d" len); + List.iter Argument.check_valid self; + () + + let[@inline] size_word (self : _ t) = + match self with + | [] -> 0 + | [ a ] -> Argument.size_word a + | a :: b :: tl -> + List.fold_left + (fun n arg -> n + Argument.size_word arg) + (Argument.size_word a + Argument.size_word b) + tl + + let[@inline] encode (buf : Buf.t) (self : _ t) = + let rec aux buf l = + match l with + | [] -> () + | x :: tl -> + Argument.encode buf x; + aux buf tl + in + + match self with + | [] -> () + | [ x ] -> Argument.encode buf x + | x :: tl -> + Argument.encode buf x; + aux buf tl +end + +(** record type = 3 *) +module Thread_record = struct + let size_word : int = 3 + + (** Record that [Thread_ref.ref as_ref] represents the pair [pid, tid] *) + let encode (out : Output.t) ~as_ref ~pid ~tid () : unit = + if as_ref <= 0 || as_ref > 255 then + invalid_arg "fuchsia: thread_record: invalid ref"; + + let buf = Output.get_buf out ~available_word:size_word in + + let hd = I64.(3L lor (of_int size_word lsl 4) lor (of_int as_ref lsl 16)) in + Buf.add_i64 buf hd; + Buf.add_i64 buf 
(I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) +end + +(** record type = 4 *) +module Event = struct + (** type=0 *) + module Instant = struct + let size_word ~name ~t_ref ~args () : int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) / 8) + + Arguments.size_word args + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () + : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + (* set category = 0 *) + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + () + end + + (** type=1 *) + module Counter = struct + let size_word ~name ~t_ref ~args () : int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + 1 (* counter id *) + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () + : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (1L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + (* just use 0 as counter id *) + 
Buf.add_i64 buf 0L; + () + end + + (** type=2 *) + module Duration_begin = struct + let size_word ~name ~t_ref ~args () : int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () + : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (2L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + () + end + + (** type=3 *) + module Duration_end = struct + let size_word ~name ~t_ref ~args () : int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns ~args () + : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (3L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + () + end + + (** type=4 *) + module Duration_complete = struct + let size_word ~name ~t_ref ~args () : int = + 1 + 
Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + 1 (* end timestamp *) + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns + ~end_time_ns ~args () : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + (* set category = 0 *) + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (4L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + Buf.add_i64 buf end_time_ns; + () + end + + (** type=5 *) + module Async_begin = struct + let size_word ~name ~t_ref ~args () : int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + 1 (* async id *) + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns + ~(async_id : int) ~args () : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (5L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + Buf.add_i64 buf (I64.of_int async_id); + () + end + + (** type=7 *) + module Async_end = struct + let size_word ~name ~t_ref ~args () : 
int = + 1 + Thread_ref.size_word t_ref + 1 + (* timestamp *) + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + 1 (* async id *) + + let encode (out : Output.t) ~name ~(t_ref : Thread_ref.t) ~time_ns + ~(async_id : int) ~args () : unit = + let size = size_word ~name ~t_ref ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 4L + lor (of_int size lsl 4) + lor (7L lsl 16) + lor (of_int (Arguments.len args) lsl 20) + lor (of_int (Thread_ref.as_i8 t_ref) lsl 24) + lor (of_int (Str_ref.inline (String.length name)) lsl 48)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf time_ns; + + (match t_ref with + | Thread_ref.Inline { pid; tid } -> + Buf.add_i64 buf (I64.of_int pid); + Buf.add_i64 buf (I64.of_int tid) + | Thread_ref.Ref _ -> ()); + + Buf.add_string buf name; + Arguments.encode buf args; + Buf.add_i64 buf (I64.of_int async_id); + () + end +end + +(** record type = 7 *) +module Kernel_object = struct + let size_word ~name ~args () : int = + 1 + 1 + + (round_to_word (String.length name) lsr 3) + + Arguments.size_word args + + (* see: + https://cs.opensource.google/fuchsia/fuchsia/+/main:zircon/system/public/zircon/types.h;l=441?q=ZX_OBJ_TYPE&ss=fuchsia%2Ffuchsia + *) + + type ty = int + + let ty_process : ty = 1 + let ty_thread : ty = 2 + + let encode (out : Output.t) ~name ~(ty : ty) ~(kid : int) ~args () : unit = + let size = size_word ~name ~args () in + let buf = Output.get_buf out ~available_word:size in + + let hd = + I64.( + 7L + lor (of_int size lsl 4) + lor (of_int ty lsl 16) + lor (of_int (Arguments.len args) lsl 40) + lor (of_int (Str_ref.inline (String.length name)) lsl 24)) + in + Buf.add_i64 buf hd; + Buf.add_i64 buf (I64.of_int kid); + Buf.add_string buf name; + Arguments.encode buf args; + () +end diff --git a/src/fuchsia/write/util.ml b/src/fuchsia/write/util.ml new file mode 100644 index 0000000..7af7dec --- /dev/null +++ b/src/fuchsia/write/util.ml @@ -0,0 +1,5 @@ +(** How many bytes are 
missing for [n] to be a multiple of 8 *) +let[@inline] missing_to_round (n : int) : int = lnot (n - 1) land 0b111 + +(** Round up to a multiple of 8 *) +let[@inline] round_to_word (n : int) : int = n + (lnot (n - 1) land 0b111) diff --git a/src/ppx/ppx_trace.ml b/src/ppx/ppx_trace.ml index 8f59221..af2dab3 100644 --- a/src/ppx/ppx_trace.ml +++ b/src/ppx/ppx_trace.ml @@ -8,19 +8,30 @@ let location_errorf ~loc fmt = (** {2 let expression} *) -let expand_let ~ctxt (name : string) body = +let expand_let ~ctxt (var : [ `Var of label loc | `Unit ]) (name : string) body + = let loc = Expansion_context.Extension.extension_point_loc ctxt in Ast_builder.Default.( + let var_pat = + match var with + | `Var v -> ppat_var ~loc:v.loc v + | `Unit -> ppat_var ~loc { loc; txt = "_trace_span" } + in + let var_exp = + match var with + | `Var v -> pexp_ident ~loc:v.loc { txt = lident v.txt; loc = v.loc } + | `Unit -> [%expr _trace_span] + in [%expr - let _trace_span = + let [%p var_pat] = Trace_core.enter_span ~__FILE__ ~__LINE__ [%e estring ~loc name] in try let res = [%e body] in - Trace_core.exit_span _trace_span; + Trace_core.exit_span [%e var_exp]; res with exn -> - Trace_core.exit_span _trace_span; + Trace_core.exit_span [%e var_exp]; raise exn]) let extension_let = @@ -29,7 +40,13 @@ let extension_let = single_expr_payload (pexp_let nonrecursive (value_binding - ~pat:(ppat_construct (lident (string "()")) none) + ~pat: + (let pat_var = ppat_var __' |> map ~f:(fun f v -> f (`Var v)) in + let pat_unit = + as__ @@ ppat_construct (lident (string "()")) none + |> map ~f:(fun f _ -> f `Unit) + in + alt pat_var pat_unit) ~expr:(estring __) ^:: nil) __)) diff --git a/src/tef/dune b/src/tef/dune index acacd95..156eec1 100644 --- a/src/tef/dune +++ b/src/tef/dune @@ -3,7 +3,4 @@ (name trace_tef) (public_name trace-tef) (synopsis "Simple and lightweight tracing using TEF/Catapult format, in-process") - (libraries trace.core mtime mtime.clock.os atomic unix threads - (select relax_.ml from 
- (base-domain -> relax_.real.ml) - ( -> relax_.dummy.ml)))) + (libraries trace.core trace.private.util mtime mtime.clock.os unix threads)) diff --git a/src/tef/relax_.real.ml b/src/tef/relax_.real.ml deleted file mode 100644 index f3dab5c..0000000 --- a/src/tef/relax_.real.ml +++ /dev/null @@ -1 +0,0 @@ -let cpu_relax = Domain.cpu_relax diff --git a/src/tef/trace_tef.ml b/src/tef/trace_tef.ml index aed6653..bd36ecb 100644 --- a/src/tef/trace_tef.ml +++ b/src/tef/trace_tef.ml @@ -1,4 +1,5 @@ open Trace_core +open Trace_private_util module A = Trace_core.Internal_.Atomic_ module Mock_ = struct @@ -14,7 +15,7 @@ end let counter = Mtime_clock.counter () (** Now, in microseconds *) -let now_us () : float = +let[@inline] now_us () : float = if !Mock_.enabled then Mock_.now_us () else ( @@ -22,16 +23,6 @@ let now_us () : float = Mtime.Span.to_float_ns t /. 1e3 ) -let protect ~finally f = - try - let x = f () in - finally (); - x - with exn -> - let bt = Printexc.get_raw_backtrace () in - finally (); - Printexc.raise_with_backtrace exn bt - let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" 
s) type event = @@ -144,7 +135,7 @@ module Writer = struct let with_ ~out f = let writer = create ~out () in - protect ~finally:(fun () -> close writer) (fun () -> f writer) + Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer) let[@inline] flush (self : t) : unit = flush self.oc @@ -499,7 +490,7 @@ let setup ?(out = `Env) () = let with_setup ?out () f = setup ?out (); - protect ~finally:Trace_core.shutdown f + Fun.protect ~finally:Trace_core.shutdown f module Internal_ = struct let mock_all_ () = Mock_.enabled := true diff --git a/src/tef/b_queue.ml b/src/util/b_queue.ml similarity index 84% rename from src/tef/b_queue.ml rename to src/util/b_queue.ml index 1a77aa3..f5ee5f3 100644 --- a/src/tef/b_queue.ml +++ b/src/util/b_queue.ml @@ -1,9 +1,11 @@ +module A = Trace_core.Internal_.Atomic_ + type 'a t = { mutex: Mutex.t; cond: Condition.t; q: 'a Mpsc_bag.t; mutable closed: bool; - consumer_waiting: bool Atomic.t; + consumer_waiting: bool A.t; } exception Closed @@ -14,7 +16,7 @@ let create () : _ t = cond = Condition.create (); q = Mpsc_bag.create (); closed = false; - consumer_waiting = Atomic.make false; + consumer_waiting = A.make false; } let close (self : _ t) = @@ -29,7 +31,7 @@ let push (self : _ t) x : unit = if self.closed then raise Closed; Mpsc_bag.add self.q x; if self.closed then raise Closed; - if Atomic.get self.consumer_waiting then ( + if A.get self.consumer_waiting then ( (* wakeup consumer *) Mutex.lock self.mutex; Condition.broadcast self.cond; @@ -42,14 +44,14 @@ let rec pop_all (self : 'a t) : 'a list = | None -> if self.closed then raise Closed; Mutex.lock self.mutex; - Atomic.set self.consumer_waiting true; + A.set self.consumer_waiting true; (* check again, a producer might have pushed an element since we last checked. However if we still find nothing, because this comes after [consumer_waiting:=true], any producer arriving after that will know to wake us up. 
*) (match Mpsc_bag.pop_all self.q with | Some l -> - Atomic.set self.consumer_waiting false; + A.set self.consumer_waiting false; Mutex.unlock self.mutex; l | None -> @@ -58,6 +60,6 @@ let rec pop_all (self : 'a t) : 'a list = raise Closed ); Condition.wait self.cond self.mutex; - Atomic.set self.consumer_waiting false; + A.set self.consumer_waiting false; Mutex.unlock self.mutex; pop_all self) diff --git a/src/tef/b_queue.mli b/src/util/b_queue.mli similarity index 100% rename from src/tef/b_queue.mli rename to src/util/b_queue.mli diff --git a/src/tef/relax_.dummy.ml b/src/util/domain_util.dummy.ml similarity index 51% rename from src/tef/relax_.dummy.ml rename to src/util/domain_util.dummy.ml index 3c5fd6f..2a59baf 100644 --- a/src/tef/relax_.dummy.ml +++ b/src/util/domain_util.dummy.ml @@ -1 +1,2 @@ let cpu_relax () = () +let n_domains () = 1 diff --git a/src/util/domain_util.mli b/src/util/domain_util.mli new file mode 100644 index 0000000..666b1f5 --- /dev/null +++ b/src/util/domain_util.mli @@ -0,0 +1,2 @@ +val cpu_relax : unit -> unit +val n_domains : unit -> int diff --git a/src/util/domain_util.real.ml b/src/util/domain_util.real.ml new file mode 100644 index 0000000..ea4c225 --- /dev/null +++ b/src/util/domain_util.real.ml @@ -0,0 +1,2 @@ +let cpu_relax = Domain.cpu_relax +let n_domains = Domain.recommended_domain_count diff --git a/src/util/dune b/src/util/dune new file mode 100644 index 0000000..39f8bcb --- /dev/null +++ b/src/util/dune @@ -0,0 +1,9 @@ + +(library + (public_name trace.private.util) + (synopsis "internal utilities for trace. 
No guarantees of stability.") + (name trace_private_util) + (libraries trace.core mtime mtime.clock.os unix threads + (select domain_util.ml from + (base-domain -> domain_util.real.ml) + ( -> domain_util.dummy.ml)))) diff --git a/src/tef/mpsc_bag.ml b/src/util/mpsc_bag.ml similarity index 60% rename from src/tef/mpsc_bag.ml rename to src/util/mpsc_bag.ml index 004e8f5..02aeadf 100644 --- a/src/tef/mpsc_bag.ml +++ b/src/util/mpsc_bag.ml @@ -1,7 +1,9 @@ -type 'a t = { bag: 'a list Atomic.t } [@@unboxed] +module A = Trace_core.Internal_.Atomic_ + +type 'a t = { bag: 'a list A.t } [@@unboxed] let create () = - let bag = Atomic.make [] in + let bag = A.make [] in { bag } module Backoff = struct @@ -11,20 +13,20 @@ module Backoff = struct let once (b : t) : t = for _i = 1 to b do - Relax_.cpu_relax () + Domain_util.cpu_relax () done; min (b * 2) 256 end let rec add backoff t x = - let before = Atomic.get t.bag in + let before = A.get t.bag in let after = x :: before in - if not (Atomic.compare_and_set t.bag before after) then + if not (A.compare_and_set t.bag before after) then add (Backoff.once backoff) t x let[@inline] add t x = add Backoff.default t x let[@inline] pop_all t : _ list option = - match Atomic.exchange t.bag [] with + match A.exchange t.bag [] with | [] -> None | l -> Some (List.rev l) diff --git a/src/tef/mpsc_bag.mli b/src/util/mpsc_bag.mli similarity index 100% rename from src/tef/mpsc_bag.mli rename to src/util/mpsc_bag.mli diff --git a/test/fuchsia/write/dune b/test/fuchsia/write/dune new file mode 100644 index 0000000..1261c4b --- /dev/null +++ b/test/fuchsia/write/dune @@ -0,0 +1,5 @@ + +(tests + (names t1 t2) + (package trace-fuchsia) + (libraries trace-fuchsia.write)) diff --git a/test/fuchsia/write/t1.ml b/test/fuchsia/write/t1.ml new file mode 100644 index 0000000..8b59e85 --- /dev/null +++ b/test/fuchsia/write/t1.ml @@ -0,0 +1,65 @@ +open Trace_fuchsia_write + +module Str_ = struct + open String + + let to_hex (s : string) : string = + let 
i_to_hex (i : int) = + if i < 10 then + Char.chr (i + Char.code '0') + else + Char.chr (i - 10 + Char.code 'a') + in + + let res = Bytes.create (2 * length s) in + for i = 0 to length s - 1 do + let n = Char.code (get s i) in + Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4)); + Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f)) + done; + Bytes.unsafe_to_string res + + let of_hex_exn (s : string) : string = + let n_of_c = function + | '0' .. '9' as c -> Char.code c - Char.code '0' + | 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a' + | 'A' .. 'F' as c -> 10 + Char.code c - Char.code 'A' + | _ -> invalid_arg "string: invalid hex" + in + if String.length s mod 2 <> 0 then + invalid_arg "string: hex sequence must be of even length"; + let res = Bytes.make (String.length s / 2) '\x00' in + for i = 0 to (String.length s / 2) - 1 do + let n1 = n_of_c (String.get s (2 * i)) in + let n2 = n_of_c (String.get s ((2 * i) + 1)) in + let n = (n1 lsl 4) lor n2 in + Bytes.set res i (Char.chr n) + done; + Bytes.unsafe_to_string res +end + +let () = + let l = List.init 100 (fun i -> Util.round_to_word i) in + assert (List.for_all (fun x -> x mod 8 = 0) l) + +let () = + assert (Str_ref.inline 0 = 0b0000_0000_0000_0000); + assert (Str_ref.inline 1 = 0b1000_0000_0000_0001); + assert (Str_ref.inline 6 = 0b1000_0000_0000_0110); + assert (Str_ref.inline 31999 = 0b1111_1100_1111_1111); + () + +let () = + let buf = Buf.create 128 in + Buf.add_i64 buf 42L; + assert (Buf.to_string buf = "\x2a\x00\x00\x00\x00\x00\x00\x00") + +let () = + let buf = Buf.create 128 in + Buf.add_string buf ""; + assert (Buf.to_string buf = "") + +let () = + let buf = Buf.create 128 in + Buf.add_string buf "hello"; + assert (Buf.to_string buf = "hello\x00\x00\x00") diff --git a/test/fuchsia/write/t2.expected b/test/fuchsia/write/t2.expected new file mode 100644 index 0000000..7ea7404 --- /dev/null +++ b/test/fuchsia/write/t2.expected @@ -0,0 +1,4 @@ +first trace 
+100004467854160033000500000000000100000000000000560000000000000054001005000005804e61bc000000000068656c6c6f000000210001802a0000007800000000000000 +second trace +1000044678541600210000000000000000ca9a3b00000000330005000000000001000000000000005600000000000000300011000000b0006f63616d6c2d747261636500000000004400040500000580a0860100000000006f75746572000000404b4c0000000000440004050000058020bf020000000000696e6e657200000020aa440000000000540010050000058087d612000000000068656c6c6f000000210001802a0000007800000000000000 diff --git a/test/fuchsia/write/t2.ml b/test/fuchsia/write/t2.ml new file mode 100644 index 0000000..0dcc5e9 --- /dev/null +++ b/test/fuchsia/write/t2.ml @@ -0,0 +1,89 @@ +open Trace_fuchsia_write + +let pf = Printf.printf + +module Str_ = struct + open String + + let to_hex (s : string) : string = + let i_to_hex (i : int) = + if i < 10 then + Char.chr (i + Char.code '0') + else + Char.chr (i - 10 + Char.code 'a') + in + + let res = Bytes.create (2 * length s) in + for i = 0 to length s - 1 do + let n = Char.code (get s i) in + Bytes.set res (2 * i) (i_to_hex ((n land 0xf0) lsr 4)); + Bytes.set res ((2 * i) + 1) (i_to_hex (n land 0x0f)) + done; + Bytes.unsafe_to_string res + + let of_hex_exn (s : string) : string = + let n_of_c = function + | '0' .. '9' as c -> Char.code c - Char.code '0' + | 'a' .. 'f' as c -> 10 + Char.code c - Char.code 'a' + | 'A' .. 
'F' as c -> 10 + Char.code c - Char.code 'A' + | _ -> invalid_arg "string: invalid hex" + in + if String.length s mod 2 <> 0 then + invalid_arg "string: hex sequence must be of even length"; + let res = Bytes.make (String.length s / 2) '\x00' in + for i = 0 to (String.length s / 2) - 1 do + let n1 = n_of_c (String.get s (2 * i)) in + let n2 = n_of_c (String.get s ((2 * i) + 1)) in + let n = (n1 lsl 4) lor n2 in + Bytes.set res i (Char.chr n) + done; + Bytes.unsafe_to_string res +end + +let with_buf_output (f : Output.t -> unit) : string = + let buf_pool = Buf_pool.create () in + let buffer = Buffer.create 32 in + let out = Output.into_buffer ~buf_pool buffer in + f out; + Output.flush out; + Buffer.contents buffer + +let () = pf "first trace\n" + +let () = + let str = + with_buf_output (fun out -> + Metadata.Magic_record.encode out; + Thread_record.encode out ~as_ref:5 ~pid:1 ~tid:86 (); + Event.Instant.encode out ~name:"hello" ~time_ns:1234_5678L + ~t_ref:(Thread_ref.Ref 5) + ~args:[ "x", `Int 42 ] + ()) + in + pf "%s\n" (Str_.to_hex str) + +let () = pf "second trace\n" + +let () = + let str = + with_buf_output (fun out -> + Metadata.Magic_record.encode out; + Metadata.Initialization_record.( + encode out ~ticks_per_secs:default_ticks_per_sec ()); + Thread_record.encode out ~as_ref:5 ~pid:1 ~tid:86 (); + Metadata.Provider_info.encode out ~id:1 ~name:"ocaml-trace" (); + Event.Duration_complete.encode out ~name:"outer" + ~t_ref:(Thread_ref.Ref 5) ~time_ns:100_000L ~end_time_ns:5_000_000L + ~args:[] (); + Event.Duration_complete.encode out ~name:"inner" + ~t_ref:(Thread_ref.Ref 5) ~time_ns:180_000L ~end_time_ns:4_500_000L + ~args:[] (); + Event.Instant.encode out ~name:"hello" ~time_ns:1_234_567L + ~t_ref:(Thread_ref.Ref 5) + ~args:[ "x", `Int 42 ] + ()) + in + (let oc = open_out "foo.fxt" in + output_string oc str; + close_out oc); + pf "%s\n" (Str_.to_hex str) diff --git a/trace-fuchsia.opam b/trace-fuchsia.opam new file mode 100644 index 0000000..62ef782 --- 
/dev/null +++ b/trace-fuchsia.opam @@ -0,0 +1,37 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +version: "0.5" +synopsis: + "A high-performance backend for trace, emitting a Fuchsia trace into a file" +maintainer: ["Simon Cruanes"] +authors: ["Simon Cruanes"] +license: "MIT" +tags: ["trace" "tracing" "fuchsia"] +homepage: "https://github.com/c-cube/ocaml-trace" +bug-reports: "https://github.com/c-cube/ocaml-trace/issues" +depends: [ + "ocaml" {>= "4.08"} + "trace" {= version} + "mtime" {>= "2.0"} + "base-bigarray" + "base-unix" + "dune" {>= "2.9"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "--promote-install-files=false" + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] + ["dune" "install" "-p" name "--create-install-files" name] +] +dev-repo: "git+https://github.com/c-cube/ocaml-trace.git" diff --git a/trace-tef.opam b/trace-tef.opam index 464cee8..d984efa 100644 --- a/trace-tef.opam +++ b/trace-tef.opam @@ -14,7 +14,6 @@ depends: [ "trace" {= version} "mtime" {>= "2.0"} "base-unix" - "atomic" "dune" {>= "2.9"} "odoc" {with-doc} ]