diff --git a/README.md b/README.md index f5d7c59..c21dcd1 100644 --- a/README.md +++ b/README.md @@ -3,19 +3,17 @@ meio *Status: Experimental* -Meio is a CLI tool for monitoring programs using [Eio][]. It allows you to see, in real-time, the running and idle fibers of your program along with the structure of the fiber tree. Fibers are also labelled with where they were spawned from and each individual fiber carries extra information about how busy it is, how often it is entered and the debug logs that have been run from the fiber. Here's what Meio looks like: +Meio is a CLI tool for monitoring asynchronous programs like those that use [Eio][]. It allows you to see, in real-time, the running and idle fibers of your program along with the structure of the fiber tree. Fibers are also labelled with where they were spawned from and each individual fiber carries extra information about how busy it is, how often it is entered and the debug logs that have been run from the fiber. Here's what Meio looks like: ![Meio on asciicast](./.screencast/example.gif) ### Building meio -Meio uses custom events which will be available in OCaml 5.1. This means currently you must use an unreleased compiler and a few forked libraries. - -To build Meio locally, you can add a temporary opam-repository and use the custom-events compiler: +Meio uses custom events, which are only available in OCaml 5.1 and beyond. ``` -$ opam repo add custom-events https://github.com/TheLortex/custom-events-opam-repository.git -$ opam switch create --no-install custom-events --packages ocaml-variants.5.0.0+custom-events --repositories=custom-events,default +$ opam update +$ opam switch create 5.1.0 $ opam install --deps-only . 
$ dune build ``` diff --git a/dune-project b/dune-project index 361600e..548126f 100644 --- a/dune-project +++ b/dune-project @@ -1,2 +1,2 @@ (lang dune 2.9) -(name ec) +(name meio) diff --git a/example/burn.ml b/example/burn.ml index 84a63ce..bc87300 100644 --- a/example/burn.ml +++ b/example/burn.ml @@ -1,17 +1,17 @@ open Eio let woops_sleepy ~clock = - Private.Ctf.set_name "sleeper"; + Private.Tracing.set_name "sleeper"; Switch.run ~name:"sleeper" @@ fun sw -> Fiber.fork ~sw (fun () -> - Private.Ctf.set_name "unix sleeper"; + Private.Tracing.set_name "unix sleeper"; (* Woops! Wrong sleep function, we blocked the fiber *) traceln "Woops! Blocked by Unix.sleepf"; Unix.sleepf 5.; Time.sleep clock 10.) let spawn ~clock min max = - Private.Ctf.set_name (Fmt.str "spawn %d %d" min max); + Private.Tracing.set_name (Fmt.str "spawn %d %d" min max); (* Some GC action *) for _i = 0 to 100 do ignore (Sys.opaque_identity @@ Array.init 1000000 float_of_int) @@ -19,7 +19,7 @@ let spawn ~clock min max = Switch.run @@ fun sw -> for i = min to max do Fiber.fork ~sw (fun () -> - Private.Ctf.set_name (Fmt.str "worker>%d" i); + Private.Tracing.set_name (Fmt.str "worker>%d" i); for i = 0 to max do (* Some more GC action *) for _i = 0 to 100 do @@ -39,7 +39,7 @@ let main clock = Switch.run ~name:"main" @@ fun sw -> (* A long running task *) Fiber.fork ~sw (fun () -> - Private.Ctf.set_name "waiter"; + Private.Tracing.set_name "waiter"; traceln "stuck waiting :("; Promise.await p; traceln "Done"); @@ -53,6 +53,6 @@ let main clock = let () = Eio_main.run @@ fun env -> - Ctf.with_tracing @@ fun () -> + Tracing.with_tracing @@ fun () -> let clock = Stdenv.clock env in main clock diff --git a/example/burn_domains.ml b/example/burn_domains.ml index 73da2ec..a1574a2 100644 --- a/example/burn_domains.ml +++ b/example/burn_domains.ml @@ -42,7 +42,7 @@ let main dom_mgr clock = let () = Eio_main.run @@ fun env -> - Ctf.with_tracing @@ fun () -> + Tracing.with_tracing @@ fun () -> let clock = 
Stdenv.clock env in let dom_mgr = Stdenv.domain_mgr env in main dom_mgr clock diff --git a/example/deadlock.ml b/example/deadlock.ml index 86bb168..d80dd6c 100644 --- a/example/deadlock.ml +++ b/example/deadlock.ml @@ -4,7 +4,7 @@ let fork wait = Switch.run @@ fun sw -> Fiber.fork ~sw (fun () -> (* Also add a really big label to test the handling of that in CTF. *) - Eio.Private.Ctf.log (String.make 5000 'e'); + Eio.Private.Tracing.log (String.make 5000 'e'); Promise.await wait) let main () = diff --git a/example/main.ml b/example/main.ml index a908654..3f37fbd 100644 --- a/example/main.ml +++ b/example/main.ml @@ -21,6 +21,6 @@ let main clock = let () = Eio_main.run @@ fun env -> - Ctf.with_tracing @@ fun () -> + Tracing.with_tracing @@ fun () -> let clock = Stdenv.clock env in main clock diff --git a/example/zero.ml b/example/zero.ml index bbca728..09f607f 100644 --- a/example/zero.ml +++ b/example/zero.ml @@ -8,9 +8,9 @@ let main clock = (* Eio.Switch.run ~name:"forked context" @@ fun sw -> traceln "in another context I do this" *)) *) (* for i = 0 to 10 do - Eio.Switch.run ~name:(Fmt.str "switch %d" i) @@ fun sw -> - traceln "welcome %d" i - done; *) + Eio.Switch.run ~name:(Fmt.str "switch %d" i) @@ fun sw -> + traceln "welcome %d" i + done; *) Eio.Time.sleep clock 100000.0 let () = diff --git a/meio-runtime-events.opam b/meio-runtime-events.opam new file mode 100644 index 0000000..90c2e4b --- /dev/null +++ b/meio-runtime-events.opam @@ -0,0 +1,31 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Monitor live Eio programs" +description: + "Eio-console provides an executable that allows you to monitor OCaml programs using Eventring." 
+maintainer: ["patrick@sirref.org"] +authors: ["Patrick Ferris"] +license: "MIT" +homepage: "https://github.com/patricoferris/eio-console" +bug-reports: "https://github.com/patricoferris/eio-console/issues" +depends: [ + "dune" {>= "2.9"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "--promote-install-files=false" + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] + ["dune" "install" "-p" name "--create-install-files" name] +] +dev-repo: "git+https://github.com/patricoferris/eio-console.git" diff --git a/meio.opam b/meio.opam index d744978..c2c7a68 100644 --- a/meio.opam +++ b/meio.opam @@ -9,15 +9,15 @@ license: "MIT" homepage: "https://github.com/patricoferris/eio-console" bug-reports: "https://github.com/patricoferris/eio-console/issues" depends: [ - "dune" {>= "2.9"} - "eio_main" - "eio" {= "0.8.1+custom-events.2"} # available in https://github.com/TheLortex/custom-events-opam-repository + "ocaml" {>= "5.1.0"} + "dune" {>= "2.9"} "astring" "ptime" "nottui" "cmdliner" "hdr_histogram" - "odoc" {with-doc} + "eio_main" {with-test} + "odoc" {with-doc} ] build: [ ["dune" "subst"] {dev} @@ -36,3 +36,7 @@ build: [ ["dune" "install" "-p" name "--create-install-files" name] ] dev-repo: "git+https://github.com/patricoferris/eio-console.git" +pin-depends:[ + [ "eio.dev" "git+https://github.com/patricoferris/eio#dbd52c66a6e463720b48cb835d01f0e5c7c0b170"] + [ "eio_main.dev" "git+https://github.com/patricoferris/eio#dbd52c66a6e463720b48cb835d01f0e5c7c0b170" ] +] diff --git a/src/bin/dune b/src/bin/dune index 2bea978..e8e3eff 100644 --- a/src/bin/dune +++ b/src/bin/dune @@ -1,4 +1,5 @@ (executable (name meio) (public_name meio) - (libraries meio eio_main)) + (package meio) + (libraries meio)) diff --git a/src/bin/meio.ml b/src/bin/meio.ml index fbed467..f86bf30 100644 --- a/src/bin/meio.ml +++ b/src/bin/meio.ml @@ -1,4 +1,4 @@ -let run _stdenv exec_args = +let run exec_args = let argsl = 
String.split_on_char ' ' exec_args in let executable_filename = List.hd argsl in let argsl = Array.of_list argsl in @@ -12,7 +12,9 @@ let run _stdenv exec_args = |] (Unix.environment ()) in - let dev_null = Unix.openfile Filename.null [ Unix.O_WRONLY; Unix.O_KEEPEXEC ] 0o666 in + let dev_null = + Unix.openfile Filename.null [ Unix.O_WRONLY; Unix.O_KEEPEXEC ] 0o666 + in let child_pid = Unix.create_process_env executable_filename argsl env Unix.stdin dev_null dev_null @@ -27,4 +29,4 @@ let run _stdenv exec_args = in Unix.unlink ring_file -let () = Eio_main.run @@ fun stdenv -> run stdenv Sys.argv.(1) +let () = run Sys.argv.(1) diff --git a/src/lib/console.ml b/src/lib/console.ml index 7b5fe2e..3960060 100644 --- a/src/lib/console.ml +++ b/src/lib/console.ml @@ -147,7 +147,7 @@ let render_task sort now ~depth ~filtered let kind = W.string ~attr (match kind with - | Cancellation_context _ -> "cc" + | Meio_runtime_events.Cancellation_context _ -> "cc" | Task -> "task" | _ -> "??") in diff --git a/src/lib/dune b/src/lib/dune index 1c3cf6a..aadb032 100644 --- a/src/lib/dune +++ b/src/lib/dune @@ -4,4 +4,11 @@ (foreign_stubs (language c) (names meio_console_stubs)) - (libraries eio eio.utils runtime_events ptime logs nottui hdr_histogram)) + (libraries + runtime_events + ptime + logs + fmt + nottui + hdr_histogram + meio-runtime-events)) diff --git a/src/lib/help.ml b/src/lib/help.ml index 29b6df5..aac3e65 100644 --- a/src/lib/help.ml +++ b/src/lib/help.ml @@ -22,7 +22,7 @@ let footer sort screen = space; key_help ~attr:(attr `Task) 'e' "Fiber info"; space; - key_help ~attr:(attr `Logs) 'l' ("Logs"); + key_help ~attr:(attr `Logs) 'l' "Logs"; space; key_help 's' ("Sort " ^ Sort.to_string sort); space; diff --git a/src/lib/lf_queue.ml b/src/lib/lf_queue.ml new file mode 100644 index 0000000..e0601ca --- /dev/null +++ b/src/lib/lf_queue.ml @@ -0,0 +1,140 @@ +(* Copyright (C) 2021 Anil Madhavapeddy + Copyright (C) 2022 Thomas Leonard + + Permission to use, copy, modify, and 
distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) + +(* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation. + This makes a good data structure for a scheduler's run queue. + + See: "Implementing lock-free queues" + https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf + + It is simplified slightly because we don't need multiple consumers. + Therefore [head] is not atomic. *) + +exception Closed + +module Node : sig + type 'a t = { next : 'a opt Atomic.t; mutable value : 'a } + and +'a opt + + val make : next:'a opt -> 'a -> 'a t + + val none : 'a opt + (** [t.next = none] means that [t] is currently the last node. *) + + val closed : 'a opt + (** [t.next = closed] means that [t] will always be the last node. *) + + val some : 'a t -> 'a opt + val fold : 'a opt -> none:(unit -> 'b) -> some:('a t -> 'b) -> 'b +end = struct + (* https://github.com/ocaml/RFCs/pull/14 should remove the need for magic here *) + + type +'a opt (* special | 'a t *) + type 'a t = { next : 'a opt Atomic.t; mutable value : 'a } + type special = Nothing | Closed + + let none : 'a. 'a opt = Obj.magic Nothing + let closed : 'a. 
'a opt = Obj.magic Closed + let some (t : 'a t) : 'a opt = Obj.magic t + + let fold (opt : 'a opt) ~none:n ~some = + if opt == none then n () + else if opt == closed then raise Closed + else some (Obj.magic opt : 'a t) + + let make ~next value = { value; next = Atomic.make next } +end + +type 'a t = { tail : 'a Node.t Atomic.t; mutable head : 'a Node.t } +(* [head] is the last node dequeued (or a dummy node, initially). + [head.next] gives the real first node, if not [Node.none]. + If [tail.next] is [none] then it is the last node in the queue. + Otherwise, [tail.next] is a node that is closer to the tail. *) + +let push t x = + let node = Node.(make ~next:none) x in + let rec aux () = + let p = Atomic.get t.tail in + (* While [p.next == none], [p] is the last node in the queue. *) + if Atomic.compare_and_set p.next Node.none (Node.some node) then + (* [node] has now been added to the queue (and possibly even consumed). + Update [tail], unless someone else already did it for us. *) + ignore (Atomic.compare_and_set t.tail p node : bool) + else + (* Someone else added a different node first ([p.next] is not [none]). + Make [t.tail] more up-to-date, if it hasn't already changed, and try again. *) + Node.fold (Atomic.get p.next) + ~none:(fun () -> assert false) + ~some:(fun p_next -> + ignore (Atomic.compare_and_set t.tail p p_next : bool); + aux ()) + in + aux () + +let rec push_head t x = + let p = t.head in + let next = Atomic.get p.next in + if next == Node.closed then raise Closed; + let node = Node.make ~next x in + if Atomic.compare_and_set p.next next (Node.some node) then + if + (* We don't want to let [tail] get too far behind, so if the queue was empty, move it to the new node. *) + next == Node.none + then ignore (Atomic.compare_and_set t.tail p node : bool) + else + ( (* If the queue wasn't empty, there's nothing to do. + Either tail isn't at head or there is some [push] thread working to update it. 
+ Either [push] will update it directly to the new tail, or will update it to [node] + and then retry. Either way, it ends up at the real tail. *) ) + else ( + (* Someone else changed it first. This can only happen if the queue was empty. *) + assert (next == Node.none); + push_head t x) + +let rec close (t : 'a t) = + (* Mark the tail node as final. *) + let p = Atomic.get t.tail in + if not (Atomic.compare_and_set p.next Node.none Node.closed) then + (* CAS failed because [p] is no longer the tail (or is already closed). *) + Node.fold (Atomic.get p.next) + ~none:(fun () -> assert false) + (* Can't switch from another state to [none] *) + ~some:(fun p_next -> + (* Make [tail] more up-to-date if it hasn't changed already *) + ignore (Atomic.compare_and_set t.tail p p_next : bool); + (* Retry *) + close t) + +let pop t = + let p = t.head in + (* [p] is the previously-popped item. *) + let node = Atomic.get p.next in + Node.fold node + ~none:(fun () -> None) + ~some:(fun node -> + t.head <- node; + let v = node.value in + node.value <- Obj.magic (); + (* So it can be GC'd *) + Some v) + +let is_empty t = + Node.fold (Atomic.get t.head.next) + ~none:(fun () -> true) + ~some:(fun _ -> false) + +let create () = + let dummy = { Node.value = Obj.magic (); next = Atomic.make Node.none } in + { tail = Atomic.make dummy; head = dummy } diff --git a/src/lib/logging.ml b/src/lib/logging.ml index 0f6b41d..ecfd6a3 100644 --- a/src/lib/logging.ml +++ b/src/lib/logging.ml @@ -1,4 +1,4 @@ -module Queue = Eio_utils.Lf_queue +module Queue = Lf_queue let table = Lwd_table.make () let waiting = Queue.create () diff --git a/src/lib/meio.ml b/src/lib/meio.ml index 41d66c8..fca15d0 100644 --- a/src/lib/meio.ml +++ b/src/lib/meio.ml @@ -1,10 +1,10 @@ -module Ctf = Eio.Private.Ctf +module Ctf = Meio_runtime_events let add_callback = Runtime_events.Callbacks.add_user_event let task_events ~latency_begin ~latency_end q = let current_id = ref (-1) in - let module Queue = Eio_utils.Lf_queue in 
+ let module Queue = Lf_queue in let evs = Runtime_events.Callbacks.create ~runtime_begin:latency_begin ~runtime_end:latency_end @@ -14,7 +14,9 @@ let task_events ~latency_begin ~latency_end q = in let id_event_callback d ts c ((i : Ctf.id), v) = match (Runtime_events.User.tag c, v) with - | Ctf.Created, (Ctf.Task | Ctf.Cancellation_context _) -> + | ( Meio_runtime_events.Created, + (Meio_runtime_events.Task | Meio_runtime_events.Cancellation_context _) + ) -> Queue.push q (`Created ((i :> int), !current_id, d, ts, v)) | _ -> () in @@ -82,7 +84,7 @@ let screens duration hist sort = let min_thresh = 1e-6 let sleepf f = if f > min_thresh then Unix.sleepf f -module Queue = Eio_utils.Lf_queue +module Queue = Lf_queue let runtime_event_loop ~child_pid ~q ~stop ~cursor ~callbacks = let max_sleep = 0.01 in diff --git a/src/lib/state.ml b/src/lib/state.ml index f0a87fc..42c6a2b 100644 --- a/src/lib/state.ml +++ b/src/lib/state.ml @@ -2,11 +2,13 @@ let tasks = Task_tree.make () let set_parent ~child ~parent ts = - Task_tree.set_parent tasks ~child:(Task.Id.eio_of_int child) - ~parent:(Task.Id.eio_of_int parent) + Task_tree.set_parent tasks + ~child:(Task.Id.extern_of_int child) + ~parent:(Task.Id.extern_of_int parent) (Runtime_events.Timestamp.to_int64 ts) let add_tasks (id, parent_id, domain, ts, kind) = + Logs.info (fun f -> f "Created %i" id); let task = Task.create ~id ~domain ~parent_id (Runtime_events.Timestamp.to_int64 ts) @@ -15,23 +17,24 @@ let add_tasks (id, parent_id, domain, ts, kind) = Task_tree.add tasks task let update_loc i loc = - Task_tree.update tasks (Task.Id.eio_of_int i) (fun t -> + Task_tree.update tasks (Task.Id.extern_of_int i) (fun t -> { t with Task.loc = loc :: t.loc }) let update_logs i logs = - Task_tree.update tasks (Task.Id.eio_of_int i) (fun t -> + Task_tree.update tasks (Task.Id.extern_of_int i) (fun t -> + (* ignore (failwith(Fmt.str "Log updating for %a" Task.Id.pp t.id)); *) { t with Task.logs = logs :: t.logs }) let update_name i name = 
- Task_tree.update tasks (Task.Id.eio_of_int i) (fun t -> + Task_tree.update tasks (Task.Id.extern_of_int i) (fun t -> { t with Task.name = name :: t.name }) let switch_to ~id ~domain:_ ts = - Task_tree.update_active tasks ~id:(Task.Id.eio_of_int id) + Task_tree.update_active tasks ~id:(Task.Id.extern_of_int id) (Runtime_events.Timestamp.to_int64 ts) let resolved v ts = - Task_tree.update tasks (Task.Id.eio_of_int v) (fun t -> + Task_tree.update tasks (Task.Id.extern_of_int v) (fun t -> { t with status = Resolved (Runtime_events.Timestamp.to_int64 ts) }) let terminated status ts = diff --git a/src/lib/state.mli b/src/lib/state.mli index b95c57c..8a05921 100644 --- a/src/lib/state.mli +++ b/src/lib/state.mli @@ -1,7 +1,8 @@ val tasks : Task_tree.t val add_tasks : - int * int * int * Runtime_events.Timestamp.t * Eio.Private.Ctf.event -> unit + int * int * int * Runtime_events.Timestamp.t * Meio_runtime_events.event -> + unit val update_loc : int -> string -> unit val update_logs : int -> string -> unit diff --git a/src/lib/task.ml b/src/lib/task.ml index 5a4d81a..ce7e972 100644 --- a/src/lib/task.ml +++ b/src/lib/task.ml @@ -54,29 +54,29 @@ end type display = Auto | Yes | No | Toggle_requested module Id = struct - type t = { eio : int; counter : int; global_counter : int ref } + type t = { extern : int; counter : int; global_counter : int ref } - let make eio = { eio; counter = 0; global_counter = ref 0 } + let make extern = { extern; counter = 0; global_counter = ref 0 } let fork t = incr t.global_counter; { t with counter = !(t.global_counter) } let pp fmt t = - if t.counter == 0 then Fmt.int fmt t.eio - else Fmt.pf fmt "%d.%d" t.eio t.counter + if t.counter == 0 then Fmt.int fmt t.extern + else Fmt.pf fmt "%d.%d" t.extern t.counter let compare a b = - match Int.compare a.eio b.eio with + match Int.compare a.extern b.extern with | 0 -> Int.compare a.counter b.counter | v -> v - type eio = int + type extern = int - let pp_eio = Fmt.int - let eio t = t.eio + let 
pp_extern = Fmt.int + let to_extern t = t.extern - let eio_of_int t = + let extern_of_int t = assert (t >= -1); t end @@ -91,7 +91,7 @@ type t = { loc : string list; logs : string list; status : status; - kind : Eio.Private.Ctf.event; + kind : Meio_runtime_events.event; selected : bool ref; display : display ref; } diff --git a/src/lib/task.mli b/src/lib/task.mli index 23ab6c7..939eae1 100644 --- a/src/lib/task.mli +++ b/src/lib/task.mli @@ -17,14 +17,14 @@ type display = Auto | Yes | No | Toggle_requested module Id : sig type t - type eio + type extern val pp : t Fmt.t val compare : t -> t -> int val fork : t -> t - val eio : t -> eio - val pp_eio : eio Fmt.t - val eio_of_int : int -> eio + val to_extern : t -> extern + val pp_extern : extern Fmt.t + val extern_of_int : int -> extern end type t = { @@ -37,7 +37,7 @@ type t = { loc : string list; logs : string list; status : status; - kind : Eio.Private.Ctf.event; + kind : Meio_runtime_events.event; selected : bool ref; display : display ref; } @@ -45,6 +45,11 @@ type t = { val is_active : t -> bool val create : - id:int -> domain:int -> parent_id:int -> int64 -> Eio.Private.Ctf.event -> t + id:int -> + domain:int -> + parent_id:int -> + int64 -> + Meio_runtime_events.event -> + t val ui : t -> Nottui.ui diff --git a/src/lib/task_tree.ml b/src/lib/task_tree.ml index fe1ebeb..9703dfa 100644 --- a/src/lib/task_tree.ml +++ b/src/lib/task_tree.ml @@ -3,9 +3,9 @@ and 'a c = 'a tree list ref type t = { root : Task.t tree; - pending : (Task.Id.eio, Task.t) Hashtbl.t; - by_id : (Task.Id.eio, Task.t tree) Hashtbl.t; - mutable active_id : Task.Id.eio; + pending : (Task.Id.extern, Task.t) Hashtbl.t; + by_id : (Task.Id.extern, Task.t tree) Hashtbl.t; + mutable active_id : Task.Id.extern; mutable waiters : Task.t option Lwd.prim list; } @@ -21,34 +21,33 @@ let make () = } in let root = { node; parent = ref []; children = ref [] } in - let eio_id = Task.Id.eio node.id in - Hashtbl.add by_id eio_id root; - { - root; - by_id; - 
active_id = eio_id; - pending = Hashtbl.create 100; - waiters = []; - } + let e_id = Task.Id.to_extern node.id in + Hashtbl.add by_id e_id root; + { root; by_id; active_id = e_id; pending = Hashtbl.create 100; waiters = [] } let node p task = { children = ref []; node = task; parent = p } let invalidate t = List.iter Lwd.invalidate t.waiters let add t (task : Task.t) = match task.kind with - | Eio.Private.Ctf.Cancellation_context _ -> ( - match Hashtbl.find_opt t.by_id (Task.Id.eio_of_int task.parent_id) with + | Meio_runtime_events.Cancellation_context _ -> ( + match Hashtbl.find_opt t.by_id (Task.Id.extern_of_int task.parent_id) with | None -> Logs.warn (fun f -> f "Couldn't find parent %d of %a" task.parent_id Task.Id.pp task.id) | Some p -> + Logs.info (fun f -> + f "Adding parent %a with %a" Task.Id.pp p.node.id Task.Id.pp + task.id); let node = node p.children task in - Hashtbl.add t.by_id (Task.Id.eio task.id) node; + Hashtbl.add t.by_id (Task.Id.to_extern task.id) node; p.children := node :: !(p.children); invalidate t) | _ -> - Hashtbl.add t.pending (Task.Id.eio task.id) task; + Logs.info (fun f -> + f "Adding %a" Task.Id.pp_extern (Task.Id.to_extern task.id)); + Hashtbl.add t.pending (Task.Id.to_extern task.id) task; invalidate t let update t id fn = @@ -56,7 +55,7 @@ let update t id fn = | None -> ( match Hashtbl.find_opt t.pending id with | None -> - Logs.warn (fun f -> f "Couldn't update fiber %a" Task.Id.pp_eio id) + Logs.warn (fun f -> f "Couldn't update fiber %a" Task.Id.pp_extern id) | Some v -> Hashtbl.replace t.pending id (fn v); invalidate t) @@ -71,7 +70,7 @@ let update_active t ~id ts = let value = Int64.sub ts start in if value < 0L then Logs.err (fun f -> - f "Invalid timestamp for %a (%Ld)" Task.Id.pp_eio t.active_id + f "Invalid timestamp for %a (%Ld)" Task.Id.pp_extern t.active_id value) else Task.Busy.add node.busy value; { node with status = Paused ts } @@ -82,18 +81,18 @@ let update_active t ~id ts = let is_cancellation_context task = 
match task.Task.kind with - | Eio.Private.Ctf.Cancellation_context _ -> true + | Meio_runtime_events.Cancellation_context _ -> true | _ -> false let set_parent t ~child ~parent ts = Logs.debug (fun f -> - f "set parent %a -> %a" Task.Id.pp_eio child Task.Id.pp_eio parent); + f "set parent %a -> %a" Task.Id.pp_extern child Task.Id.pp_extern parent); match Hashtbl.find_opt t.by_id parent with | None -> () | Some parent -> ( match Hashtbl.find_opt t.pending child with | Some child_task -> - Logs.debug (fun f -> f "new child %a" Task.Id.pp_eio child); + Logs.debug (fun f -> f "new child %a" Task.Id.pp_extern child); let node = node parent.children child_task in parent.children := node :: !(parent.children); Hashtbl.remove t.pending child; @@ -108,7 +107,8 @@ let set_parent t ~child ~parent ts = let parent_already_has_child = List.find_opt (fun c -> - Task.Id.eio c.node.Task.id = Task.Id.eio child.node.id) + Task.Id.to_extern c.node.Task.id + = Task.Id.to_extern child.node.id) !(parent.children) in match parent_already_has_child with @@ -130,13 +130,14 @@ let set_parent t ~child ~parent ts = let new_child_node = node parent.children new_child in parent.children := new_child_node :: !(parent.children); new_child_node.parent <- parent.children; - Hashtbl.replace t.by_id (Task.Id.eio new_child.id) + Hashtbl.replace t.by_id + (Task.Id.to_extern new_child.id) new_child_node; invalidate t | Some parent_child -> child.node <- { child.node with status = Resolved ts }; Hashtbl.replace t.by_id - (Task.Id.eio parent_child.node.Task.id) + (Task.Id.to_extern parent_child.node.Task.id) parent_child; invalidate t))) diff --git a/src/lib/task_tree.mli b/src/lib/task_tree.mli index 53ffb78..2c750f6 100644 --- a/src/lib/task_tree.mli +++ b/src/lib/task_tree.mli @@ -2,9 +2,12 @@ type t val make : unit -> t val add : t -> Task.t -> unit -val update : t -> Task.Id.eio -> (Task.t -> Task.t) -> unit -val update_active : t -> id:Task.Id.eio -> int64 -> unit -val set_parent : t -> 
child:Task.Id.eio -> parent:Task.Id.eio -> int64 -> unit +val update : t -> Task.Id.extern -> (Task.t -> Task.t) -> unit +val update_active : t -> id:Task.Id.extern -> int64 -> unit + +val set_parent : + t -> child:Task.Id.extern -> parent:Task.Id.extern -> int64 -> unit + val iter : t -> (Task.t -> unit) -> unit val iter_mut : t -> (Task.t -> Task.t) -> unit val iter_with_prev : t -> (prev:Task.t option -> Task.t -> unit) -> unit diff --git a/src/runtime_events/dune b/src/runtime_events/dune new file mode 100644 index 0000000..4386c44 --- /dev/null +++ b/src/runtime_events/dune @@ -0,0 +1,6 @@ +(library + (name meio_runtime_events) + (public_name meio-runtime-events) + (libraries astring fmt runtime_events) + (enabled_if + (>= %{ocaml_version} 5.1.0))) diff --git a/src/runtime_events/meio_runtime_events.ml b/src/runtime_events/meio_runtime_events.ml new file mode 100644 index 0000000..5c15e2b --- /dev/null +++ b/src/runtime_events/meio_runtime_events.ml @@ -0,0 +1,323 @@ +(* ID allocation *) + +type id = int + +(* Event types *) + +type hiatus_reason = Wait_for_work + +type cancellation_context = + | Choose + | Pick + | Join + | Switch + | Protect + | Sub + | Root + +type event = + | Wait + | Task + | Bind + | Try + | Map + | Condition + | On_success + | On_failure + | On_termination + | On_any + | Ignore_result + | Async + | Promise + | Semaphore + | Switch + | Stream + | Mutex + | Cancellation_context of { purpose : cancellation_context; protected : bool } + | System_thread + +let int_of_cc_type = function + | Choose -> 0 + | Pick -> 1 + | Join -> 2 + | Switch -> 3 + | Protect -> 4 + | Sub -> 5 + | Root -> 6 + +let serialize_thread_type ~ofs buf t = + let id = + match t with + | Wait -> 0 + | Task -> 1 + | Bind -> 2 + | Try -> 3 + | Map -> 7 + | Condition -> 8 + | On_success -> 9 + | On_failure -> 10 + | On_termination -> 11 + | On_any -> 12 + | Ignore_result -> 13 + | Async -> 14 + | Promise -> 15 + | Semaphore -> 16 + | Switch -> 17 + | Stream -> 18 + | Mutex 
-> 19 + | Cancellation_context _ -> 20 + | System_thread -> 21 + in + Bytes.set_int8 buf ofs id; + match t with + | Cancellation_context { protected; purpose } -> + Bytes.set_int8 buf (ofs + 1) (int_of_cc_type purpose); + Bytes.set_int8 buf (ofs + 2) (Bool.to_int protected); + 3 + | _ -> 1 + +let cc_to_string = function + | Choose -> "choose" + | Pick -> "pick" + | Join -> "join" + | Switch -> "switch" + | Protect -> "protect" + | Sub -> "sub" + | Root -> "root" + +let event_to_string (t : event) = + match t with + | Wait -> "wait" + | Task -> "task" + | Bind -> "bind" + | Map -> "map" + | Try -> "try" + | Condition -> "condition" + | On_success -> "on-success" + | On_failure -> "on-failure" + | On_termination -> "on-termination" + | On_any -> "on-any" + | Ignore_result -> "ignore-result" + | Async -> "async" + | Promise -> "promise" + | Semaphore -> "semaphore" + | Switch -> "switch" + | Stream -> "stream" + | Mutex -> "mutex" + | Cancellation_context { purpose; _ } -> + "cancellation-context(" ^ cc_to_string purpose ^ ")" + | System_thread -> "system-thread" + +let int_to_cc_type = function + | 0 -> Choose + | 1 -> Pick + | 2 -> Join + | 3 -> Switch + | 4 -> Protect + | 5 -> Sub + | 6 -> Root + | _ -> assert false + +let parse_thread_type ~ofs buf = + match Bytes.get_int8 buf ofs with + | 0 -> Wait + | 1 -> Task + | 2 -> Bind + | 3 -> Try + | 7 -> Map + | 8 -> Condition + | 9 -> On_success + | 10 -> On_failure + | 11 -> On_termination + | 12 -> On_any + | 13 -> Ignore_result + | 14 -> Async + | 15 -> Promise + | 16 -> Semaphore + | 17 -> Switch + | 18 -> Stream + | 19 -> Mutex + | 20 -> + let purpose = Bytes.get_int8 buf (ofs + 1) |> int_to_cc_type in + let protected = Bytes.get_int8 buf (ofs + 2) == 1 in + Cancellation_context { purpose; protected } + | 21 -> System_thread + | _ -> assert false + +(* Runtime events registration *) + +type Runtime_events.User.tag += Created + +let created_type = + let encode buf ((child : int), (thread_type : event)) = + 
Bytes.set_int32_le buf 0 (Int32.of_int child); + 4 + serialize_thread_type ~ofs:4 buf thread_type + in + let decode buf _size = + let child = Bytes.get_int32_le buf 0 |> Int32.to_int in + let thread_type = parse_thread_type ~ofs:4 buf in + (child, thread_type) + in + Runtime_events.Type.register ~encode ~decode + +let created = Runtime_events.User.register "eio.created" Created created_type + +let two_ids_type = + let encode buf ((child : int), i) = + Bytes.set_int32_le buf 0 (Int32.of_int child); + Bytes.set_int32_le buf 4 (Int32.of_int i); + 8 + in + let decode buf _size = + let child = Bytes.get_int32_le buf 0 |> Int32.to_int in + let i = Bytes.get_int32_le buf 4 |> Int32.to_int in + (child, i) + in + Runtime_events.Type.register ~encode ~decode + +type Runtime_events.User.tag += Read + +let read = Runtime_events.User.register "eio.read" Read two_ids_type + +type Runtime_events.User.tag += Try_read + +let try_read = Runtime_events.User.register "eio.try_read" Try_read two_ids_type + +type Runtime_events.User.tag += Parent + +let parent = Runtime_events.User.register "eio.parent" Parent two_ids_type + +type Runtime_events.User.tag += Failed + +let labelled_type = + let encode buf ((child : int), exn) = + (* The label is written after the 4-byte id header, so only + [Bytes.length buf - 4] bytes are available; longer labels are truncated. 
*) + let available_buf_len = Bytes.length buf - 4 in + let exn_len = String.length exn in + let data_len = min available_buf_len exn_len in + Bytes.set_int32_le buf 0 (Int32.of_int child); + Bytes.blit_string exn 0 buf 4 data_len; + data_len + 4 + in + let decode buf size = + let child = Bytes.get_int32_le buf 0 |> Int32.to_int in + let size = size - 4 in + let target = Bytes.create size in + Bytes.blit buf 4 target 0 size; + (child, Bytes.unsafe_to_string target) + in + Runtime_events.Type.register ~encode ~decode + +let failed = Runtime_events.User.register "eio.fail" Failed labelled_type + +type Runtime_events.User.tag += Resolved + +let resolved = + Runtime_events.User.(register "eio.resolved" Resolved Runtime_events.Type.int) + +type Runtime_events.User.tag += Name + +let named = Runtime_events.User.register "eio.name" Name labelled_type + +type Runtime_events.User.tag += Loc + +let located = Runtime_events.User.register "eio.loc" Loc labelled_type + +type Runtime_events.User.tag += Log + +let logged = Runtime_events.User.register "eio.log" Log labelled_type + +type Runtime_events.User.tag += Switch + +let switch = + Runtime_events.User.register "eio.switch" Switch Runtime_events.Type.int + +type Runtime_events.User.tag += Signal + +let signal = Runtime_events.User.register "eio.signal" Signal two_ids_type + +type Runtime_events.User.tag += Suspend + +let suspend = + Runtime_events.User.register "eio.suspend" Suspend Runtime_events.Type.unit + +(* Runtime events generation *) + +let add_event = Runtime_events.User.write + +let note_created child thread_type = + assert ((child :> int) >= 0); + add_event created (child, thread_type) + +let note_read ~reader input = add_event read (reader, input) +let note_try_read thread input = add_event try_read (thread, input) +let note_signal ~src dst = add_event signal (src, dst) + +let note_resolved p ~ex = + match ex with + | Some ex -> + let msg = Printexc.to_string ex in + add_event failed (p, msg) + | None -> add_event 
resolved p + +(* Thin wrappers: each one writes its arguments to the matching user event. *) +let note_log id text = add_event logged (id, text) +let note_location id text = add_event located (id, text) +let note_name id text = add_event named (id, text) +let note_switch id = add_event switch id +let note_hiatus = function Wait_for_work -> add_event suspend () +let note_parent ~child ~parent:parent_id = add_event parent (child, parent_id) + +(* [demangle x] cuts every string of [x] on "__", drops empty pieces (other than a leading one), glues a piece that starts with a lowercase ASCII letter back onto the previous piece with a single "_", and returns all resulting pieces in order as one flat list. *) +let demangle x = + (* Fold step; the accumulator holds the pieces seen so far, most recent first. *) + let step acc piece = + match (acc, piece) with + | [], first -> [ first ] + | kept, "" -> kept + | prev :: rest, s when Astring.Char.Ascii.is_lower s.[0] -> + (prev ^ "_" ^ s) :: rest + | kept, s -> s :: kept + in + let split_one segment = + Astring.String.cuts ~sep:"__" segment + |> List.fold_left step [] + |> List.rev + in + List.concat_map split_one x + +(* [is_outer raw_entry] looks through the slots of [raw_entry] for the first one that has both a location and a name and whose demangled module path does not start with one of the modules listed below; the result is formatted as "name (file:line)", with a leading "Dune_exe" component dropped. *) +let is_outer raw_entry = + let slot = Printexc.backtrace_slots_of_raw_entry raw_entry in + match slot with + | None -> None + | Some slots -> + Array.find_map + (fun slot -> + let ( let* ) = Option.bind in + let* loc = Printexc.Slot.location slot in + let* name = Printexc.Slot.name slot in + let* name = + match String.split_on_char '.' name |> demangle with + | "Eio_core" :: _ -> None + | "Eio" :: _ -> None + | "Eio_linux" :: _ -> None + | "Eio_luv" :: _ -> None + | "Eio_main" :: _ -> None + | "Stdlib" :: _ -> None + | "Meio_runtime_events" :: _ -> None + | "Miou" :: _ -> None + | "Dune_exe" :: v -> Some (String.concat "." 
v) + in + Some (Fmt.str "%s (%s:%d)" name loc.filename loc.line_number)) + slots + +(* First description that [is_outer] yields for an entry of [stack], if any. *) +let dune_exe_strategy stack = List.find_map is_outer stack + +(* [get_caller ()] captures up to 30 frames of the current call stack once and returns the [is_outer] description of the first matching frame; only when no frame matches is the whole backtrace rendered to a string as a fallback. *) +let get_caller () = + let callstack = Printexc.get_callstack 30 in + let stack = callstack |> Printexc.raw_backtrace_entries |> Array.to_list in + match dune_exe_strategy stack with + | Some v -> v + | None -> Printexc.raw_backtrace_to_string callstack diff --git a/src/runtime_events/meio_runtime_events.mli b/src/runtime_events/meio_runtime_events.mli new file mode 100644 index 0000000..e3dee58 --- /dev/null +++ b/src/runtime_events/meio_runtime_events.mli @@ -0,0 +1,73 @@ +(** This library is used to write event traces using OCaml's runtime events infrastructure. *) + +type id = int +type hiatus_reason = Wait_for_work + +type cancellation_context = + | Choose + | Pick + | Join + | Switch + | Protect + | Sub + | Root + +type event = + | Wait + | Task + | Bind + | Try + | Map + | Condition + | On_success + | On_failure + | On_termination + | On_any + | Ignore_result + | Async + | Promise + | Semaphore + | Switch + | Stream + | Mutex + | Cancellation_context of { purpose : cancellation_context; protected : bool } + | System_thread (** Types of threads or other recorded objects. 
*) + +val event_to_string : event -> string + +(** Meio event types and tags *) + +(** Payload type pairing an id with the kind of object it denotes. *) val created_type : (id * event) Runtime_events.Type.t + +type Runtime_events.User.tag += Created + +(** Payload type pairing an id with a string; the events behind the [Failed], [Log], [Name] and [Loc] tags are registered with it. *) val labelled_type : (id * string) Runtime_events.Type.t + +type Runtime_events.User.tag += Failed | Log | Name | Loc + +(** Payload type holding a pair of ids; the [Signal] event is registered with it (presumably [Read], [Try_read] and [Parent] as well, given the grouping below -- confirm against the implementation). *) val two_ids_type : (id * id) Runtime_events.Type.t + +type Runtime_events.User.tag += Read | Try_read | Signal | Parent + +(* Tags whose events carry a single [int] payload. *) + +type Runtime_events.User.tag += Resolved | Switch + +(* Tag whose event carries no payload ([unit]). *) + +type Runtime_events.User.tag += Suspend + +(** Producing events *) + +(** [note_created id kind] writes a [Created] event for [id]; [id] is asserted to be non-negative. *) val note_created : id -> event -> unit +(** [note_read ~reader input] writes the pair [(reader, input)] to the read event. *) val note_read : reader:id -> id -> unit +(** [note_try_read thread input] writes the pair [(thread, input)] to the try-read event. *) val note_try_read : id -> id -> unit +(** [note_signal ~src dst] writes the pair [(src, dst)] to the "eio.signal" event. *) val note_signal : src:id -> id -> unit +(** [note_resolved id ~ex] writes "eio.fail" with [Printexc.to_string] of the exception when [ex] is [Some _], and "eio.resolved" with [id] alone when it is [None]. *) val note_resolved : id -> ex:exn option -> unit +(** [note_log id msg] writes [(id, msg)] to the "eio.log" event. *) val note_log : id -> string -> unit +(** [note_location id loc] writes [(id, loc)] to the "eio.loc" event. *) val note_location : id -> string -> unit +(** [note_name id name] writes [(id, name)] to the "eio.name" event. *) val note_name : id -> string -> unit +(** [note_switch id] writes [id] to the "eio.switch" event. *) val note_switch : id -> unit +(** [note_hiatus reason] writes the payload-less "eio.suspend" event. *) val note_hiatus : hiatus_reason -> unit +(** [note_parent ~child ~parent] writes the pair [(child, parent)] to the parent event. *) val note_parent : child:id -> parent:id -> unit +(** [get_caller ()] inspects up to 30 frames of the current call stack and returns "name (file:line)" for the first frame that is not attributed to one of the Eio, Stdlib, Miou or Meio_runtime_events modules; if there is no such frame it returns the whole backtrace as a string. *) val get_caller : unit -> string