Skip to content

Commit

Permalink
Merge pull request #32 from c-cube/simon/subscribers
Browse files Browse the repository at this point in the history
trace-subscriber
  • Loading branch information
c-cube authored Sep 17, 2024
2 parents d362755 + 9dd2cf5 commit 11d313d
Show file tree
Hide file tree
Showing 27 changed files with 2,598 additions and 2,100 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ jobs:
- run: opam exec -- dune runtest -p trace-tef,trace-fuchsia

# with depopts
- run: opam install hmap
- run: opam install hmap mtime
- run: opam exec -- dune build '@install' -p trace,trace-tef,trace-fuchsia

2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ clean:

test:
@dune runtest $(DUNE_OPTS)
test-autopromote:
@dune runtest $(DUNE_OPTS) --auto-promote

doc:
@dune build $(DUNE_OPTS) @doc
Expand Down
73 changes: 49 additions & 24 deletions dune-project
Original file line number Diff line number Diff line change
@@ -1,64 +1,89 @@
(lang dune 2.9)

(name trace)

(generate_opam_files true)

(version 0.7)

(source
(github c-cube/ocaml-trace))

(authors "Simon Cruanes")

(maintainers "Simon Cruanes")

(license MIT)

;(documentation https://url/to/documentation)

(package
(name trace)
(synopsis "A stub for tracing/observability, agnostic in how data is collected")
(synopsis
"A stub for tracing/observability, agnostic in how data is collected")
(depends
(ocaml (>= 4.08))
dune)
(ocaml
(>= 4.08))
dune)
(depopts
hmap
(mtime (>= 2.0)))
hmap
(mtime
(>= 2.0)))
(tags
(trace tracing observability profiling)))

(package
(name ppx_trace)
(synopsis "A ppx-based preprocessor for trace")
(depends
(ocaml (>= 4.12)) ; we use __FUNCTION__
(ppxlib (>= 0.28))
(trace (= :version))
(trace-tef (and (= :version) :with-test))
dune)
(ocaml
(>= 4.12)) ; we use __FUNCTION__
(ppxlib
(>= 0.28))
(trace
(= :version))
(trace-tef
(and
(= :version)
:with-test))
dune)
(depopts
(mtime (>= 2.0)))
(tags
(trace ppx)))

(package
(name trace-tef)
(synopsis "A simple backend for trace, emitting Catapult/TEF JSON into a file")
(synopsis
"A simple backend for trace, emitting Catapult/TEF JSON into a file")
(depends
(ocaml (>= 4.08))
(trace (= :version))
(mtime (>= 2.0))
base-unix
dune)
(ocaml
(>= 4.08))
(trace
(= :version))
(mtime
(>= 2.0))
base-unix
dune)
(tags
(trace tracing catapult TEF chrome-format)))

(package
(name trace-fuchsia)
(synopsis "A high-performance backend for trace, emitting a Fuchsia trace into a file")
(synopsis
"A high-performance backend for trace, emitting a Fuchsia trace into a file")
(depends
(ocaml (>= 4.08))
(trace (= :version))
(mtime (>= 2.0))
(thread-local-storage (>= 0.2))
base-bigarray
base-unix
dune)
(ocaml
(>= 4.08))
(trace
(= :version))
(mtime
(>= 2.0))
(thread-local-storage
(>= 0.2))
base-bigarray
base-unix
dune)
(tags
(trace tracing fuchsia)))

Expand Down
3 changes: 3 additions & 0 deletions ppx_trace.opam
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ depends: [
"dune" {>= "2.9"}
"odoc" {with-doc}
]
depopts: [
"mtime" {>= "2.0"}
]
build: [
["dune" "subst"] {dev}
[
Expand Down
129 changes: 129 additions & 0 deletions src/subscriber/callbacks.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
open Trace_core
open Types

(** First class module signature for callbacks *)
module type S = sig
type st
(** Type of the state passed to every callback. *)

val on_init : st -> time_ns:float -> unit
(** Called when the subscriber is initialized in a collector *)

val on_shutdown : st -> time_ns:float -> unit
(** Called when the collector is shutdown *)

val on_name_thread : st -> time_ns:float -> tid:int -> name:string -> unit
(** Current thread is being named *)

val on_name_process : st -> time_ns:float -> tid:int -> name:string -> unit
(** Current process is being named *)

val on_enter_span :
st ->
__FUNCTION__:string option ->
__FILE__:string ->
__LINE__:int ->
time_ns:float ->
tid:int ->
data:(string * user_data) list ->
name:string ->
span ->
unit
(** Enter a regular (sync) span *)

val on_exit_span : st -> time_ns:float -> tid:int -> span -> unit
(** Exit a span. This and [on_enter_span] must follow strict stack discipline *)

val on_add_data : st -> data:(string * user_data) list -> span -> unit
(** Add data to a regular span (which must be active) *)

val on_message :
st ->
time_ns:float ->
tid:int ->
span:span option ->
data:(string * user_data) list ->
string ->
unit
(** Emit a log message *)

val on_counter :
st ->
time_ns:float ->
tid:int ->
data:(string * user_data) list ->
name:string ->
float ->
unit
(** Emit the current value of a counter *)

val on_enter_manual_span :
st ->
__FUNCTION__:string option ->
__FILE__:string ->
__LINE__:int ->
time_ns:float ->
tid:int ->
parent:span option ->
data:(string * user_data) list ->
name:string ->
flavor:flavor option ->
trace_id:int ->
span ->
unit
(** Enter a manual (possibly async) span *)

val on_exit_manual_span :
st ->
time_ns:float ->
tid:int ->
name:string ->
data:(string * user_data) list ->
flavor:flavor option ->
trace_id:int ->
span ->
unit
(** Exit a manual span *)
end

type 'st t = (module S with type st = 'st)
(** Callbacks for a subscriber. There is one callback per event
in {!Trace}. The type ['st] is the state that is passed to
every single callback. *)

(** Dummy callbacks.
It can be useful to reuse some of these functions in a
real subscriber that doesn't want to handle {b all}
events, but only some of them. *)
module Dummy = struct
let on_init _ ~time_ns:_ = ()
let on_shutdown _ ~time_ns:_ = ()
let on_name_thread _ ~time_ns:_ ~tid:_ ~name:_ = ()
let on_name_process _ ~time_ns:_ ~tid:_ ~name:_ = ()
let on_message _ ~time_ns:_ ~tid:_ ~span:_ ~data:_ _msg = ()
let on_counter _ ~time_ns:_ ~tid:_ ~data:_ ~name:_ _v = ()

let on_enter_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_ ~tid:_
~data:_ ~name:_ _sp =
()

let on_exit_span _ ~time_ns:_ ~tid:_ _ = ()
let on_add_data _ ~data:_ _sp = ()

let on_enter_manual_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_
~tid:_ ~parent:_ ~data:_ ~name:_ ~flavor:_ ~trace_id:_ _sp =
()

let on_exit_manual_span _ ~time_ns:_ ~tid:_ ~name:_ ~data:_ ~flavor:_
~trace_id:_ _ =
()
end

(** Dummy callbacks, do nothing. *)
let dummy (type st) () : st t =
let module M = struct
type nonrec st = st

include Dummy
end in
(module M)
13 changes: 13 additions & 0 deletions src/subscriber/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

(library
(name trace_subscriber)
(public_name trace.subscriber)
(libraries (re_export trace.core)
(select thread_.ml from
(threads -> thread_.real.ml)
( -> thread_.dummy.ml))
(select time_.ml from
(mtime mtime.clock.os -> time_.mtime.ml)
(mtime mtime.clock.jsoo -> time_.mtime.ml)
( -> time_.dummy.ml))))

104 changes: 104 additions & 0 deletions src/subscriber/subscriber.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
(** A trace subscriber. It pairs a set of callbacks
with the state they need (which can contain a file handle,
a socket, config, etc.).
The design goal for this is that it should be possible to avoid allocations
when the trace collector calls the callbacks. *)
type t =
| Sub : {
st: 'st;
callbacks: 'st Callbacks.t;
}
-> t

(** Dummy subscriber that ignores every call. *)
let dummy : t = Sub { st = (); callbacks = Callbacks.dummy () }

open struct
module Tee_cb : Callbacks.S with type st = t * t = struct
type nonrec st = t * t

let on_init
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns =
CB1.on_init s1 ~time_ns;
CB2.on_init s2 ~time_ns

let on_shutdown
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns =
CB1.on_shutdown s1 ~time_ns;
CB2.on_shutdown s2 ~time_ns

let on_name_thread
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name =
CB1.on_name_thread s1 ~time_ns ~tid ~name;
CB2.on_name_thread s2 ~time_ns ~tid ~name

let on_name_process
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name =
CB1.on_name_process s1 ~time_ns ~tid ~name;
CB2.on_name_process s2 ~time_ns ~tid ~name

let on_enter_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~__FUNCTION__ ~__FILE__
~__LINE__ ~time_ns ~tid ~data ~name span =
CB1.on_enter_span s1 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
~name span;
CB2.on_enter_span s2 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
~name span

let on_exit_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid span =
CB1.on_exit_span s1 ~time_ns ~tid span;
CB2.on_exit_span s2 ~time_ns ~tid span

let on_add_data
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~data span =
CB1.on_add_data s1 ~data span;
CB2.on_add_data s2 ~data span

let on_message
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~span ~data
msg =
CB1.on_message s1 ~time_ns ~tid ~span ~data msg;
CB2.on_message s2 ~time_ns ~tid ~span ~data msg

let on_counter
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~data ~name
n =
CB1.on_counter s1 ~time_ns ~tid ~data ~name n;
CB2.on_counter s2 ~time_ns ~tid ~data ~name n

let on_enter_manual_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~__FUNCTION__ ~__FILE__
~__LINE__ ~time_ns ~tid ~parent ~data ~name ~flavor ~trace_id span =
CB1.on_enter_manual_span s1 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns
~tid ~parent ~data ~name ~flavor ~trace_id span;
CB2.on_enter_manual_span s2 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns
~tid ~parent ~data ~name ~flavor ~trace_id span

let on_exit_manual_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name ~data
~flavor ~trace_id span =
CB1.on_exit_manual_span s1 ~time_ns ~tid ~name ~data ~flavor ~trace_id
span;
CB2.on_exit_manual_span s2 ~time_ns ~tid ~name ~data ~flavor ~trace_id
span
end
end

(** [tee s1 s2] is a subscriber that forwards every
call to [s1] and [s2] both. *)
let tee (s1 : t) (s2 : t) : t =
let st = s1, s2 in
Sub { st; callbacks = (module Tee_cb) }
1 change: 1 addition & 0 deletions src/subscriber/thread_.dummy.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let[@inline] get_tid () = 0
2 changes: 2 additions & 0 deletions src/subscriber/thread_.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
val get_tid : unit -> int
(** Get current thread ID *)
1 change: 1 addition & 0 deletions src/subscriber/thread_.real.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let[@inline] get_tid () = Thread.id @@ Thread.self ()
1 change: 1 addition & 0 deletions src/subscriber/time_.dummy.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let[@inline] get_time_ns () : float = 0.
Loading

0 comments on commit 11d313d

Please sign in to comment.