From ce0b6b4618610d4befcf89bf89f44d0fc21c01b9 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Thu, 7 Sep 2023 17:33:52 +0100 Subject: [PATCH] Make meio-runtime-events a library --- dune-project | 2 +- example/zero.ml | 6 +- meio-runtime-events.opam | 31 ++ src/bin/dune | 1 + src/bin/meio.ml | 8 +- src/lib/console.ml | 2 +- src/lib/dune | 9 +- src/lib/help.ml | 2 +- src/lib/lf_queue.ml | 140 +++++++++ src/lib/logging.ml | 2 +- src/lib/meio.ml | 10 +- src/lib/state.ml | 17 +- src/lib/state.mli | 3 +- src/lib/task.ml | 20 +- src/lib/task.mli | 17 +- src/lib/task_tree.ml | 49 ++-- src/lib/task_tree.mli | 9 +- src/runtime_events/dune | 6 + src/runtime_events/meio_runtime_events.ml | 323 +++++++++++++++++++++ src/runtime_events/meio_runtime_events.mli | 73 +++++ 20 files changed, 664 insertions(+), 66 deletions(-) create mode 100644 meio-runtime-events.opam create mode 100644 src/lib/lf_queue.ml create mode 100644 src/runtime_events/dune create mode 100644 src/runtime_events/meio_runtime_events.ml create mode 100644 src/runtime_events/meio_runtime_events.mli diff --git a/dune-project b/dune-project index 361600e..548126f 100644 --- a/dune-project +++ b/dune-project @@ -1,2 +1,2 @@ (lang dune 2.9) -(name ec) +(name meio) diff --git a/example/zero.ml b/example/zero.ml index bbca728..09f607f 100644 --- a/example/zero.ml +++ b/example/zero.ml @@ -8,9 +8,9 @@ let main clock = (* Eio.Switch.run ~name:"forked context" @@ fun sw -> traceln "in another context I do this" *)) *) (* for i = 0 to 10 do - Eio.Switch.run ~name:(Fmt.str "switch %d" i) @@ fun sw -> - traceln "welcome %d" i - done; *) + Eio.Switch.run ~name:(Fmt.str "switch %d" i) @@ fun sw -> + traceln "welcome %d" i + done; *) Eio.Time.sleep clock 100000.0 let () = diff --git a/meio-runtime-events.opam b/meio-runtime-events.opam new file mode 100644 index 0000000..90c2e4b --- /dev/null +++ b/meio-runtime-events.opam @@ -0,0 +1,31 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Monitor live Eio programs" +description: + "Eio-console provides an executable that allows you to monitor OCaml programs using Eventring." +maintainer: ["patrick@sirref.org"] +authors: ["Patrick Ferris"] +license: "MIT" +homepage: "https://github.com/patricoferris/eio-console" +bug-reports: "https://github.com/patricoferris/eio-console/issues" +depends: [ + "dune" {>= "2.9"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "--promote-install-files=false" + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] + ["dune" "install" "-p" name "--create-install-files" name] +] +dev-repo: "git+https://github.com/patricoferris/eio-console.git" diff --git a/src/bin/dune b/src/bin/dune index 60ac72f..e8e3eff 100644 --- a/src/bin/dune +++ b/src/bin/dune @@ -1,4 +1,5 @@ (executable (name meio) (public_name meio) + (package meio) (libraries meio)) diff --git a/src/bin/meio.ml b/src/bin/meio.ml index fbed467..f86bf30 100644 --- a/src/bin/meio.ml +++ b/src/bin/meio.ml @@ -1,4 +1,4 @@ -let run _stdenv exec_args = +let run exec_args = let argsl = String.split_on_char ' ' exec_args in let executable_filename = List.hd argsl in let argsl = Array.of_list argsl in @@ -12,7 +12,9 @@ let run _stdenv exec_args = |] (Unix.environment ()) in - let dev_null = Unix.openfile Filename.null [ Unix.O_WRONLY; Unix.O_KEEPEXEC ] 0o666 in + let dev_null = + Unix.openfile Filename.null [ Unix.O_WRONLY; Unix.O_KEEPEXEC ] 0o666 + in let child_pid = Unix.create_process_env executable_filename argsl env Unix.stdin dev_null dev_null @@ -27,4 +29,4 @@ let run _stdenv exec_args = in Unix.unlink ring_file -let () = Eio_main.run @@ fun stdenv -> run stdenv Sys.argv.(1) +let () = run Sys.argv.(1) diff --git a/src/lib/console.ml b/src/lib/console.ml index 7b5fe2e..3960060 100644 --- a/src/lib/console.ml +++ b/src/lib/console.ml @@ -147,7 +147,7 @@ let render_task sort now ~depth ~filtered let kind = W.string ~attr (match kind with - | Cancellation_context _ -> "cc" + | Meio_runtime_events.Cancellation_context _ -> "cc" | Task -> "task" | _ -> "??") in diff --git a/src/lib/dune b/src/lib/dune index 671de41..aadb032 100644 --- a/src/lib/dune +++ b/src/lib/dune @@ -4,4 +4,11 @@ (foreign_stubs (language c) (names meio_console_stubs)) - (libraries runtime_events ptime logs fmt nottui hdr_histogram)) + (libraries + runtime_events + ptime + logs + fmt + nottui + hdr_histogram + meio-runtime-events)) diff --git a/src/lib/help.ml b/src/lib/help.ml index 29b6df5..aac3e65 100644 --- a/src/lib/help.ml +++ b/src/lib/help.ml @@ -22,7 +22,7 @@ let footer sort screen = space; key_help ~attr:(attr `Task) 'e' "Fiber info"; space; - key_help ~attr:(attr `Logs) 'l' ("Logs"); + key_help ~attr:(attr `Logs) 'l' "Logs"; space; key_help 's' ("Sort " ^ Sort.to_string sort); space; diff --git a/src/lib/lf_queue.ml b/src/lib/lf_queue.ml new file mode 100644 index 0000000..e0601ca --- /dev/null +++ b/src/lib/lf_queue.ml @@ -0,0 +1,140 @@ +(* Copyright (C) 2021 Anil Madhavapeddy + Copyright (C) 2022 Thomas Leonard + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) + +(* A lock-free multi-producer, single-consumer, thread-safe queue without support for cancellation. + This makes a good data structure for a scheduler's run queue. + + See: "Implementing lock-free queues" + https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf + + It is simplified slightly because we don't need multiple consumers. + Therefore [head] is not atomic. *) + +exception Closed + +module Node : sig + type 'a t = { next : 'a opt Atomic.t; mutable value : 'a } + and +'a opt + + val make : next:'a opt -> 'a -> 'a t + + val none : 'a opt + (** [t.next = none] means that [t] is currently the last node. *) + + val closed : 'a opt + (** [t.next = closed] means that [t] will always be the last node. *) + + val some : 'a t -> 'a opt + val fold : 'a opt -> none:(unit -> 'b) -> some:('a t -> 'b) -> 'b +end = struct + (* https://github.com/ocaml/RFCs/pull/14 should remove the need for magic here *) + + type +'a opt (* special | 'a t *) + type 'a t = { next : 'a opt Atomic.t; mutable value : 'a } + type special = Nothing | Closed + + let none : 'a. 'a opt = Obj.magic Nothing + let closed : 'a. 'a opt = Obj.magic Closed + let some (t : 'a t) : 'a opt = Obj.magic t + + let fold (opt : 'a opt) ~none:n ~some = + if opt == none then n () + else if opt == closed then raise Closed + else some (Obj.magic opt : 'a t) + + let make ~next value = { value; next = Atomic.make next } +end + +type 'a t = { tail : 'a Node.t Atomic.t; mutable head : 'a Node.t } +(* [head] is the last node dequeued (or a dummy node, initially). + [head.next] gives the real first node, if not [Node.none]. + If [tail.next] is [none] then it is the last node in the queue. + Otherwise, [tail.next] is a node that is closer to the tail. *) + +let push t x = + let node = Node.(make ~next:none) x in + let rec aux () = + let p = Atomic.get t.tail in + (* While [p.next == none], [p] is the last node in the queue. *) + if Atomic.compare_and_set p.next Node.none (Node.some node) then + (* [node] has now been added to the queue (and possibly even consumed). + Update [tail], unless someone else already did it for us. *) + ignore (Atomic.compare_and_set t.tail p node : bool) + else + (* Someone else added a different node first ([p.next] is not [none]). + Make [t.tail] more up-to-date, if it hasn't already changed, and try again. *) + Node.fold (Atomic.get p.next) + ~none:(fun () -> assert false) + ~some:(fun p_next -> + ignore (Atomic.compare_and_set t.tail p p_next : bool); + aux ()) + in + aux () + +let rec push_head t x = + let p = t.head in + let next = Atomic.get p.next in + if next == Node.closed then raise Closed; + let node = Node.make ~next x in + if Atomic.compare_and_set p.next next (Node.some node) then + if + (* We don't want to let [tail] get too far behind, so if the queue was empty, move it to the new node. *) + next == Node.none + then ignore (Atomic.compare_and_set t.tail p node : bool) + else + ( (* If the queue wasn't empty, there's nothing to do. + Either tail isn't at head or there is some [push] thread working to update it. + Either [push] will update it directly to the new tail, or will update it to [node] + and then retry. Either way, it ends up at the real tail. *) ) + else ( + (* Someone else changed it first. This can only happen if the queue was empty. *) + assert (next == Node.none); + push_head t x) + +let rec close (t : 'a t) = + (* Mark the tail node as final. *) + let p = Atomic.get t.tail in + if not (Atomic.compare_and_set p.next Node.none Node.closed) then + (* CAS failed because [p] is no longer the tail (or is already closed). *) + Node.fold (Atomic.get p.next) + ~none:(fun () -> assert false) + (* Can't switch from another state to [none] *) + ~some:(fun p_next -> + (* Make [tail] more up-to-date if it hasn't changed already *) + ignore (Atomic.compare_and_set t.tail p p_next : bool); + (* Retry *) + close t) + +let pop t = + let p = t.head in + (* [p] is the previously-popped item. *) + let node = Atomic.get p.next in + Node.fold node + ~none:(fun () -> None) + ~some:(fun node -> + t.head <- node; + let v = node.value in + node.value <- Obj.magic (); + (* So it can be GC'd *) + Some v) + +let is_empty t = + Node.fold (Atomic.get t.head.next) + ~none:(fun () -> true) + ~some:(fun _ -> false) + +let create () = + let dummy = { Node.value = Obj.magic (); next = Atomic.make Node.none } in + { tail = Atomic.make dummy; head = dummy } diff --git a/src/lib/logging.ml b/src/lib/logging.ml index 0f6b41d..ecfd6a3 100644 --- a/src/lib/logging.ml +++ b/src/lib/logging.ml @@ -1,4 +1,4 @@ -module Queue = Eio_utils.Lf_queue +module Queue = Lf_queue let table = Lwd_table.make () let waiting = Queue.create () diff --git a/src/lib/meio.ml b/src/lib/meio.ml index 41d66c8..fca15d0 100644 --- a/src/lib/meio.ml +++ b/src/lib/meio.ml @@ -1,10 +1,10 @@ -module Ctf = Eio.Private.Ctf +module Ctf = Meio_runtime_events let add_callback = Runtime_events.Callbacks.add_user_event let task_events ~latency_begin ~latency_end q = let current_id = ref (-1) in - let module Queue = Eio_utils.Lf_queue in + let module Queue = Lf_queue in let evs = Runtime_events.Callbacks.create ~runtime_begin:latency_begin ~runtime_end:latency_end @@ -14,7 +14,9 @@ let task_events ~latency_begin ~latency_end q = in let id_event_callback d ts c ((i : Ctf.id), v) = match (Runtime_events.User.tag c, v) with - | Ctf.Created, (Ctf.Task | Ctf.Cancellation_context _) -> + | ( Meio_runtime_events.Created, + (Meio_runtime_events.Task | Meio_runtime_events.Cancellation_context _) + ) -> Queue.push q (`Created ((i :> int), !current_id, d, ts, v)) | _ -> () in @@ -82,7 +84,7 @@ let screens duration hist sort = let min_thresh = 1e-6 let sleepf f = if f > min_thresh then Unix.sleepf f -module Queue = Eio_utils.Lf_queue +module Queue = Lf_queue let runtime_event_loop ~child_pid ~q ~stop ~cursor ~callbacks = let max_sleep = 0.01 in diff --git a/src/lib/state.ml b/src/lib/state.ml index f0a87fc..42c6a2b 100644 --- a/src/lib/state.ml +++ b/src/lib/state.ml @@ -2,11 +2,13 @@ let tasks = Task_tree.make () let set_parent ~child ~parent ts = - Task_tree.set_parent tasks ~child:(Task.Id.eio_of_int child) - ~parent:(Task.Id.eio_of_int parent) + Task_tree.set_parent tasks + ~child:(Task.Id.extern_of_int child) + ~parent:(Task.Id.extern_of_int parent) (Runtime_events.Timestamp.to_int64 ts) let add_tasks (id, parent_id, domain, ts, kind) = + Logs.info (fun f -> f "Created %i" id); let task = Task.create ~id ~domain ~parent_id (Runtime_events.Timestamp.to_int64 ts) @@ -15,23 +17,24 @@ let add_tasks (id, parent_id, domain, ts, kind) = Task_tree.add tasks task let update_loc i loc = - Task_tree.update tasks (Task.Id.eio_of_int i) (fun t -> + Task_tree.update tasks (Task.Id.extern_of_int i) (fun t -> { t with Task.loc = loc :: t.loc }) let update_logs i logs = - Task_tree.update tasks (Task.Id.eio_of_int i) (fun t -> + Task_tree.update tasks (Task.Id.extern_of_int i) (fun t -> + (* ignore (failwith(Fmt.str "Log updating for %a" Task.Id.pp t.id)); *) { t with Task.logs = logs :: t.logs }) let update_name i name = - Task_tree.update tasks (Task.Id.eio_of_int i) (fun t -> + Task_tree.update tasks (Task.Id.extern_of_int i) (fun t -> { t with Task.name = name :: t.name }) let switch_to ~id ~domain:_ ts = - Task_tree.update_active tasks ~id:(Task.Id.eio_of_int id) + Task_tree.update_active tasks ~id:(Task.Id.extern_of_int id) (Runtime_events.Timestamp.to_int64 ts) let resolved v ts = - Task_tree.update tasks (Task.Id.eio_of_int v) (fun t -> + Task_tree.update tasks (Task.Id.extern_of_int v) (fun t -> { t with status = Resolved (Runtime_events.Timestamp.to_int64 ts) }) let terminated status ts = diff --git a/src/lib/state.mli b/src/lib/state.mli index b95c57c..8a05921 100644 --- a/src/lib/state.mli +++ b/src/lib/state.mli @@ -1,7 +1,8 @@ val tasks : Task_tree.t val add_tasks : - int * int * int * Runtime_events.Timestamp.t * Eio.Private.Ctf.event -> unit + int * int * int * Runtime_events.Timestamp.t * Meio_runtime_events.event -> + unit val update_loc : int -> string -> unit val update_logs : int -> string -> unit diff --git a/src/lib/task.ml b/src/lib/task.ml index 5a4d81a..ce7e972 100644 --- a/src/lib/task.ml +++ b/src/lib/task.ml @@ -54,29 +54,29 @@ end type display = Auto | Yes | No | Toggle_requested module Id = struct - type t = { eio : int; counter : int; global_counter : int ref } + type t = { extern : int; counter : int; global_counter : int ref } - let make eio = { eio; counter = 0; global_counter = ref 0 } + let make extern = { extern; counter = 0; global_counter = ref 0 } let fork t = incr t.global_counter; { t with counter = !(t.global_counter) } let pp fmt t = - if t.counter == 0 then Fmt.int fmt t.eio - else Fmt.pf fmt "%d.%d" t.eio t.counter + if t.counter == 0 then Fmt.int fmt t.extern + else Fmt.pf fmt "%d.%d" t.extern t.counter let compare a b = - match Int.compare a.eio b.eio with + match Int.compare a.extern b.extern with | 0 -> Int.compare a.counter b.counter | v -> v - type eio = int + type extern = int - let pp_eio = Fmt.int - let eio t = t.eio + let pp_extern = Fmt.int + let to_extern t = t.extern - let eio_of_int t = + let extern_of_int t = assert (t >= -1); t end @@ -91,7 +91,7 @@ type t = { loc : string list; logs : string list; status : status; - kind : Eio.Private.Ctf.event; + kind : Meio_runtime_events.event; selected : bool ref; display : display ref; } diff --git a/src/lib/task.mli b/src/lib/task.mli index 23ab6c7..939eae1 100644 --- a/src/lib/task.mli +++ b/src/lib/task.mli @@ -17,14 +17,14 @@ type display = Auto | Yes | No | Toggle_requested module Id : sig type t - type eio + type extern val pp : t Fmt.t val compare : t -> t -> int val fork : t -> t - val eio : t -> eio - val pp_eio : eio Fmt.t - val eio_of_int : int -> eio + val to_extern : t -> extern + val pp_extern : extern Fmt.t + val extern_of_int : int -> extern end type t = { @@ -37,7 +37,7 @@ type t = { loc : string list; logs : string list; status : status; - kind : Eio.Private.Ctf.event; + kind : Meio_runtime_events.event; selected : bool ref; display : display ref; } @@ -45,6 +45,11 @@ type t = { val is_active : t -> bool val create : - id:int -> domain:int -> parent_id:int -> int64 -> Eio.Private.Ctf.event -> t + id:int -> + domain:int -> + parent_id:int -> + int64 -> + Meio_runtime_events.event -> + t val ui : t -> Nottui.ui diff --git a/src/lib/task_tree.ml b/src/lib/task_tree.ml index fe1ebeb..eaa5b1c 100644 --- a/src/lib/task_tree.ml +++ b/src/lib/task_tree.ml @@ -3,9 +3,9 @@ and 'a c = 'a tree list ref type t = { root : Task.t tree; - pending : (Task.Id.eio, Task.t) Hashtbl.t; - by_id : (Task.Id.eio, Task.t tree) Hashtbl.t; - mutable active_id : Task.Id.eio; + pending : (Task.Id.extern, Task.t) Hashtbl.t; + by_id : (Task.Id.extern, Task.t tree) Hashtbl.t; + mutable active_id : Task.Id.extern; mutable waiters : Task.t option Lwd.prim list; } @@ -21,34 +21,33 @@ let make () = } in let root = { node; parent = ref []; children = ref [] } in - let eio_id = Task.Id.eio node.id in - Hashtbl.add by_id eio_id root; - { - root; - by_id; - active_id = eio_id; - pending = Hashtbl.create 100; - waiters = []; - } + let e_id = Task.Id.to_extern node.id in + Hashtbl.add by_id e_id root; + { root; by_id; active_id = e_id; pending = Hashtbl.create 100; waiters = [] } let node p task = { children = ref []; node = task; parent = p } let invalidate t = List.iter Lwd.invalidate t.waiters let add t (task : Task.t) = match task.kind with - | Eio.Private.Ctf.Cancellation_context _ -> ( - match Hashtbl.find_opt t.by_id (Task.Id.eio_of_int task.parent_id) with + | Meio_runtime_events.Cancellation_context _ | Meio_runtime_events.Task -> ( + match Hashtbl.find_opt t.by_id (Task.Id.extern_of_int task.parent_id) with | None -> Logs.warn (fun f -> f "Couldn't find parent %d of %a" task.parent_id Task.Id.pp task.id) | Some p -> + Logs.info (fun f -> + f "Adding parent %a with %a" Task.Id.pp p.node.id Task.Id.pp + task.id); let node = node p.children task in - Hashtbl.add t.by_id (Task.Id.eio task.id) node; + Hashtbl.add t.by_id (Task.Id.to_extern task.id) node; p.children := node :: !(p.children); invalidate t) | _ -> - Hashtbl.add t.pending (Task.Id.eio task.id) task; + Logs.info (fun f -> + f "Adding %a" Task.Id.pp_extern (Task.Id.to_extern task.id)); + Hashtbl.add t.pending (Task.Id.to_extern task.id) task; invalidate t let update t id fn = @@ -56,7 +55,7 @@ let update t id fn = | None -> ( match Hashtbl.find_opt t.pending id with | None -> - Logs.warn (fun f -> f "Couldn't update fiber %a" Task.Id.pp_eio id) + Logs.warn (fun f -> f "Couldn't update fiber %a" Task.Id.pp_extern id) | Some v -> Hashtbl.replace t.pending id (fn v); invalidate t) @@ -71,7 +70,7 @@ let update_active t ~id ts = let value = Int64.sub ts start in if value < 0L then Logs.err (fun f -> - f "Invalid timestamp for %a (%Ld)" Task.Id.pp_eio t.active_id + f "Invalid timestamp for %a (%Ld)" Task.Id.pp_extern t.active_id value) else Task.Busy.add node.busy value; { node with status = Paused ts } @@ -82,18 +81,18 @@ let update_active t ~id ts = let is_cancellation_context task = match task.Task.kind with - | Eio.Private.Ctf.Cancellation_context _ -> true + | Meio_runtime_events.Cancellation_context _ -> true | _ -> false let set_parent t ~child ~parent ts = Logs.debug (fun f -> - f "set parent %a -> %a" Task.Id.pp_eio child Task.Id.pp_eio parent); + f "set parent %a -> %a" Task.Id.pp_extern child Task.Id.pp_extern parent); match Hashtbl.find_opt t.by_id parent with | None -> () | Some parent -> ( match Hashtbl.find_opt t.pending child with | Some child_task -> - Logs.debug (fun f -> f "new child %a" Task.Id.pp_eio child); + Logs.debug (fun f -> f "new child %a" Task.Id.pp_extern child); let node = node parent.children child_task in parent.children := node :: !(parent.children); Hashtbl.remove t.pending child; @@ -108,7 +107,8 @@ let set_parent t ~child ~parent ts = let parent_already_has_child = List.find_opt (fun c -> - Task.Id.eio c.node.Task.id = Task.Id.eio child.node.id) + Task.Id.to_extern c.node.Task.id + = Task.Id.to_extern child.node.id) !(parent.children) in match parent_already_has_child with @@ -130,13 +130,14 @@ let set_parent t ~child ~parent ts = let new_child_node = node parent.children new_child in parent.children := new_child_node :: !(parent.children); new_child_node.parent <- parent.children; - Hashtbl.replace t.by_id (Task.Id.eio new_child.id) + Hashtbl.replace t.by_id + (Task.Id.to_extern new_child.id) new_child_node; invalidate t | Some parent_child -> child.node <- { child.node with status = Resolved ts }; Hashtbl.replace t.by_id - (Task.Id.eio parent_child.node.Task.id) + (Task.Id.to_extern parent_child.node.Task.id) parent_child; invalidate t))) diff --git a/src/lib/task_tree.mli b/src/lib/task_tree.mli index 53ffb78..2c750f6 100644 --- a/src/lib/task_tree.mli +++ b/src/lib/task_tree.mli @@ -2,9 +2,12 @@ type t val make : unit -> t val add : t -> Task.t -> unit -val update : t -> Task.Id.eio -> (Task.t -> Task.t) -> unit -val update_active : t -> id:Task.Id.eio -> int64 -> unit -val set_parent : t -> child:Task.Id.eio -> parent:Task.Id.eio -> int64 -> unit +val update : t -> Task.Id.extern -> (Task.t -> Task.t) -> unit +val update_active : t -> id:Task.Id.extern -> int64 -> unit + +val set_parent : + t -> child:Task.Id.extern -> parent:Task.Id.extern -> int64 -> unit + val iter : t -> (Task.t -> unit) -> unit val iter_mut : t -> (Task.t -> Task.t) -> unit val iter_with_prev : t -> (prev:Task.t option -> Task.t -> unit) -> unit diff --git a/src/runtime_events/dune b/src/runtime_events/dune new file mode 100644 index 0000000..4386c44 --- /dev/null +++ b/src/runtime_events/dune @@ -0,0 +1,6 @@ +(library + (name meio_runtime_events) + (public_name meio-runtime-events) + (libraries astring fmt runtime_events) + (enabled_if + (>= %{ocaml_version} 5.1.0))) diff --git a/src/runtime_events/meio_runtime_events.ml b/src/runtime_events/meio_runtime_events.ml new file mode 100644 index 0000000..5c15e2b --- /dev/null +++ b/src/runtime_events/meio_runtime_events.ml @@ -0,0 +1,323 @@ +(* ID allocation *) + +type id = int + +(* Event types *) + +type hiatus_reason = Wait_for_work + +type cancellation_context = + | Choose + | Pick + | Join + | Switch + | Protect + | Sub + | Root + +type event = + | Wait + | Task + | Bind + | Try + | Map + | Condition + | On_success + | On_failure + | On_termination + | On_any + | Ignore_result + | Async + | Promise + | Semaphore + | Switch + | Stream + | Mutex + | Cancellation_context of { purpose : cancellation_context; protected : bool } + | System_thread + +let int_of_cc_type = function + | Choose -> 0 + | Pick -> 1 + | Join -> 2 + | Switch -> 3 + | Protect -> 4 + | Sub -> 5 + | Root -> 6 + +let serialize_thread_type ~ofs buf t = + let id = + match t with + | Wait -> 0 + | Task -> 1 + | Bind -> 2 + | Try -> 3 + | Map -> 7 + | Condition -> 8 + | On_success -> 9 + | On_failure -> 10 + | On_termination -> 11 + | On_any -> 12 + | Ignore_result -> 13 + | Async -> 14 + | Promise -> 15 + | Semaphore -> 16 + | Switch -> 17 + | Stream -> 18 + | Mutex -> 19 + | Cancellation_context _ -> 20 + | System_thread -> 21 + in + Bytes.set_int8 buf ofs id; + match t with + | Cancellation_context { protected; purpose } -> + Bytes.set_int8 buf (ofs + 1) (int_of_cc_type purpose); + Bytes.set_int8 buf (ofs + 2) (Bool.to_int protected); + 3 + | _ -> 1 + +let cc_to_string = function + | Choose -> "choose" + | Pick -> "pick" + | Join -> "join" + | Switch -> "switch" + | Protect -> "protect" + | Sub -> "sub" + | Root -> "root" + +let event_to_string (t : event) = + match t with + | Wait -> "wait" + | Task -> "task" + | Bind -> "bind" + | Map -> "map" + | Try -> "try" + | Condition -> "condition" + | On_success -> "on-success" + | On_failure -> "on-failure" + | On_termination -> "on-termination" + | On_any -> "on-any" + | Ignore_result -> "ignore-result" + | Async -> "async" + | Promise -> "promise" + | Semaphore -> "semaphore" + | Switch -> "switch" + | Stream -> "stream" + | Mutex -> "mutex" + | Cancellation_context { purpose; _ } -> + "cancellation-context(" ^ cc_to_string purpose ^ ")" + | System_thread -> "system-thread" + +let int_to_cc_type = function + | 0 -> Choose + | 1 -> Pick + | 2 -> Join + | 3 -> Switch + | 4 -> Protect + | 5 -> Sub + | 6 -> Root + | _ -> assert false + +let parse_thread_type ~ofs buf = + match Bytes.get_int8 buf ofs with + | 0 -> Wait + | 1 -> Task + | 2 -> Bind + | 3 -> Try + | 7 -> Map + | 8 -> Condition + | 9 -> On_success + | 10 -> On_failure + | 11 -> On_termination + | 12 -> On_any + | 13 -> Ignore_result + | 14 -> Async + | 15 -> Promise + | 16 -> Semaphore + | 17 -> Switch + | 18 -> Stream + | 19 -> Mutex + | 20 -> + let purpose = Bytes.get_int8 buf (ofs + 1) |> int_to_cc_type in + let protected = Bytes.get_int8 buf (ofs + 2) == 1 in + Cancellation_context { purpose; protected } + | 21 -> System_thread + | _ -> assert false + +(* Runtime events registration *) + +type Runtime_events.User.tag += Created + +let created_type = + let encode buf ((child : int), (thread_type : event)) = + Bytes.set_int32_le buf 0 (Int32.of_int child); + 4 + serialize_thread_type ~ofs:4 buf thread_type + in + let decode buf _size = + let child = Bytes.get_int32_le buf 0 |> Int32.to_int in + let thread_type = parse_thread_type ~ofs:4 buf in + (child, thread_type) + in + Runtime_events.Type.register ~encode ~decode + +let created = Runtime_events.User.register "eio.created" Created created_type + +let two_ids_type = + let encode buf ((child : int), i) = + Bytes.set_int32_le buf 0 (Int32.of_int child); + Bytes.set_int32_le buf 4 (Int32.of_int i); + 8 + in + let decode buf _size = + let child = Bytes.get_int32_le buf 0 |> Int32.to_int in + let i = Bytes.get_int32_le buf 4 |> Int32.to_int in + (child, i) + in + Runtime_events.Type.register ~encode ~decode + +type Runtime_events.User.tag += Read + +let read = Runtime_events.User.register "eio.read" Read two_ids_type + +type Runtime_events.User.tag += Try_read + +let try_read = Runtime_events.User.register "eio.try_read" Try_read two_ids_type + +type Runtime_events.User.tag += Parent + +let parent = Runtime_events.User.register "eio.parent" Parent two_ids_type + +type Runtime_events.User.tag += Failed + +let labelled_type = + let encode buf ((child : int), exn) = + (* Check size of buf and use smallest size which means we may + have to truncate the label. *) + let available_buf_len = Bytes.length buf - 1 in + let exn_len = String.length exn in + let data_len = min available_buf_len exn_len in + Bytes.set_int32_le buf 0 (Int32.of_int child); + Bytes.blit_string exn 0 buf 4 data_len; + data_len + 4 + in + let decode buf size = + let child = Bytes.get_int32_le buf 0 |> Int32.to_int in + let size = size - 4 in + let target = Bytes.create size in + Bytes.blit buf 4 target 0 size; + (child, Bytes.unsafe_to_string target) + in + Runtime_events.Type.register ~encode ~decode + +let failed = Runtime_events.User.register "eio.fail" Failed labelled_type + +type Runtime_events.User.tag += Resolved + +let resolved = + Runtime_events.User.(register "eio.resolved" Resolved Runtime_events.Type.int) + +type Runtime_events.User.tag += Name + +let named = Runtime_events.User.register "eio.name" Name labelled_type + +type Runtime_events.User.tag += Loc + +let located = Runtime_events.User.register "eio.loc" Loc labelled_type + +type Runtime_events.User.tag += Log + +let logged = Runtime_events.User.register "eio.log" Log labelled_type + +type Runtime_events.User.tag += Switch + +let switch = + Runtime_events.User.register "eio.switch" Switch Runtime_events.Type.int + +type Runtime_events.User.tag += Signal + +let signal = Runtime_events.User.register "eio.signal" Signal two_ids_type + +type Runtime_events.User.tag += Suspend + +let suspend = + Runtime_events.User.register "eio.suspend" Suspend Runtime_events.Type.unit + +(* Runtime events generation *) + +let add_event = Runtime_events.User.write + +let note_created child thread_type = + assert ((child :> int) >= 0); + add_event created (child, thread_type) + +let note_read ~reader input = add_event read (reader, input) +let note_try_read thread input = add_event try_read (thread, input) +let note_signal ~src dst = add_event signal (src, dst) + +let note_resolved p ~ex = + match ex with + | Some ex -> + let msg = Printexc.to_string ex in + add_event failed (p, msg) + | None -> add_event resolved p + +let note_log thread msg = add_event logged (thread, msg) +let note_location thread msg = add_event located (thread, msg) +let note_name thread msg = add_event named (thread, msg) +let note_switch new_current = add_event switch new_current +let note_hiatus Wait_for_work = add_event suspend () +let note_parent ~child ~parent:p = add_event parent (child, p) + +let demangle x = + List.flatten + (List.map + (fun i -> + Astring.String.cuts ~sep:"__" i + |> List.fold_left + (fun a b -> + match (a, b) with + | [], b -> [ b ] + | v, "" -> v + | a :: v, s when Astring.Char.Ascii.is_lower s.[0] -> + (a ^ "_" ^ s) :: v + | v, s -> s :: v) + [] + |> List.rev) + x) + +let is_outer raw_entry = + let slot = Printexc.backtrace_slots_of_raw_entry raw_entry in + match slot with + | None -> None + | Some slots -> + Array.find_map + (fun slot -> + let ( let* ) = Option.bind in + let* loc = Printexc.Slot.location slot in + let* name = Printexc.Slot.name slot in + let* name = + match String.split_on_char '.' name |> demangle with + | "Eio_core" :: _ -> None + | "Eio" :: _ -> None + | "Eio_linux" :: _ -> None + | "Eio_luv" :: _ -> None + | "Eio_main" :: _ -> None + | "Stdlib" :: _ -> None + | "Meio_runtime_events" :: _ -> None + | "Miou" :: _ -> None + | "Dune_exe" :: v -> Some (String.concat "." v) + | v -> Some (String.concat "." v) + in + Some (Fmt.str "%s (%s:%d)" name loc.filename loc.line_number)) + slots + +let dune_exe_strategy stack = + let first acc s = match acc with Some _ as v -> v | _ -> is_outer s in + List.fold_left first None stack + +let get_caller () = + let p = Printexc.get_callstack 30 |> Printexc.raw_backtrace_to_string in + let stack = + Printexc.get_callstack 30 |> Printexc.raw_backtrace_entries |> Array.to_list + in + match dune_exe_strategy stack with Some v -> v | None -> p diff --git a/src/runtime_events/meio_runtime_events.mli b/src/runtime_events/meio_runtime_events.mli new file mode 100644 index 0000000..e3dee58 --- /dev/null +++ b/src/runtime_events/meio_runtime_events.mli @@ -0,0 +1,73 @@ +(** This library is used to write event traces using OCaml's runtime events infrastructure. *) + +type id = int +type hiatus_reason = Wait_for_work + +type cancellation_context = + | Choose + | Pick + | Join + | Switch + | Protect + | Sub + | Root + +type event = + | Wait + | Task + | Bind + | Try + | Map + | Condition + | On_success + | On_failure + | On_termination + | On_any + | Ignore_result + | Async + | Promise + | Semaphore + | Switch + | Stream + | Mutex + | Cancellation_context of { purpose : cancellation_context; protected : bool } + | System_thread (** Types of threads or other recorded objects. *) + +val event_to_string : event -> string + +(** Meio event types and tags *) + +val created_type : (id * event) Runtime_events.Type.t + +type Runtime_events.User.tag += Created + +val labelled_type : (id * string) Runtime_events.Type.t + +type Runtime_events.User.tag += Failed | Log | Name | Loc + +val two_ids_type : (id * id) Runtime_events.Type.t + +type Runtime_events.User.tag += Read | Try_read | Signal | Parent + +(* int type *) + +type Runtime_events.User.tag += Resolved | Switch + +(* unit type *) + +type Runtime_events.User.tag += Suspend + +(** Producing events *) + +val note_created : id -> event -> unit +val note_read : reader:id -> id -> unit +val note_try_read : id -> id -> unit +val note_signal : src:id -> id -> unit +val note_resolved : id -> ex:exn option -> unit +val note_log : id -> string -> unit +val note_location : id -> string -> unit +val note_name : id -> string -> unit +val note_switch : id -> unit +val note_hiatus : hiatus_reason -> unit +val note_parent : child:id -> parent:id -> unit +val get_caller : unit -> string