Skip to content

Commit

Permalink
feat: Subscriber.tee
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Sep 9, 2024
1 parent 5b1ad72 commit d8059e9
Showing 1 changed file with 86 additions and 39 deletions.
125 changes: 86 additions & 39 deletions src/subscriber/subscriber.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,89 @@ type t =

let dummy : t = Sub { st = (); callbacks = Callbacks.dummy () }

(* TODO:
let multiplex (l : t list) : t =
match l with
| [] -> dummy
| [ s ] -> s
| _ ->
let module M = struct
type st = t list
let on_init l ~time_ns =
List.iter
(fun (Sub { st; callbacks = (module CB) }) -> CB.on_init st ~time_ns)
l
let on_shutdown _ ~time_ns:_ = ()
let on_tick _ = ()
let on_name_thread _ ~time_ns:_ ~tid:_ ~name:_ = ()
let on_name_process _ ~time_ns:_ ~tid:_ ~name:_ = ()
let on_message _ ~time_ns:_ ~tid:_ ~span:_ ~data:_ _msg = ()
let on_counter _ ~time_ns:_ ~tid:_ ~data:_ ~name:_ _v = ()
let on_enter_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_ ~time_ns:_
~tid:_ ~data:_ ~name:_ _sp =
()
let on_exit_span _ ~time_ns:_ ~tid:_ _ = ()
let on_add_data _ ~data:_ _sp = ()
let on_enter_manual_span _ ~__FUNCTION__:_ ~__FILE__:_ ~__LINE__:_
~time_ns:_ ~tid:_ ~parent:_ ~data:_ ~name:_ ~flavor:_ ~trace_id:_ _sp
=
()
let on_exit_manual_span _ ~time_ns:_ ~tid:_ ~name:_ ~data:_ ~flavor:_
~trace_id:_ _ =
()
end in
Sub { st = l; callbacks = (module M) }
*)
open struct
module Tee_cb : Callbacks.S with type st = t * t = struct
type nonrec st = t * t

let on_init
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns =
CB1.on_init s1 ~time_ns;
CB2.on_init s2 ~time_ns

let on_shutdown
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns =
CB1.on_shutdown s1 ~time_ns;
CB2.on_shutdown s2 ~time_ns

let on_name_thread
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name =
CB1.on_name_thread s1 ~time_ns ~tid ~name;
CB2.on_name_thread s2 ~time_ns ~tid ~name

let on_name_process
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name =
CB1.on_name_process s1 ~time_ns ~tid ~name;
CB2.on_name_process s2 ~time_ns ~tid ~name

let on_enter_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~__FUNCTION__ ~__FILE__
~__LINE__ ~time_ns ~tid ~data ~name span =
CB1.on_enter_span s1 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
~name span;
CB2.on_enter_span s2 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns ~tid ~data
~name span

let on_exit_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid span =
CB1.on_exit_span s1 ~time_ns ~tid span;
CB2.on_exit_span s2 ~time_ns ~tid span

let on_add_data
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~data span =
CB1.on_add_data s1 ~data span;
CB2.on_add_data s2 ~data span

let on_message
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~span ~data
msg =
CB1.on_message s1 ~time_ns ~tid ~span ~data msg;
CB2.on_message s2 ~time_ns ~tid ~span ~data msg

let on_counter
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~data ~name
n =
CB1.on_counter s1 ~time_ns ~tid ~data ~name n;
CB2.on_counter s2 ~time_ns ~tid ~data ~name n

let on_enter_manual_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~__FUNCTION__ ~__FILE__
~__LINE__ ~time_ns ~tid ~parent ~data ~name ~flavor ~trace_id span =
CB1.on_enter_manual_span s1 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns
~tid ~parent ~data ~name ~flavor ~trace_id span;
CB2.on_enter_manual_span s2 ~__FUNCTION__ ~__FILE__ ~__LINE__ ~time_ns
~tid ~parent ~data ~name ~flavor ~trace_id span

let on_exit_manual_span
( Sub { st = s1; callbacks = (module CB1) },
Sub { st = s2; callbacks = (module CB2) } ) ~time_ns ~tid ~name ~data
~flavor ~trace_id span =
CB1.on_exit_manual_span s1 ~time_ns ~tid ~name ~data ~flavor ~trace_id
span;
CB2.on_exit_manual_span s2 ~time_ns ~tid ~name ~data ~flavor ~trace_id
span
end
end

let tee (s1 : t) (s2 : t) : t =
let st = s1, s2 in
Sub { st; callbacks = (module Tee_cb) }

0 comments on commit d8059e9

Please sign in to comment.