-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathclient.ml
78 lines (71 loc) · 2.33 KB
/
client.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
open! Eio.Std
module O = Ocons_core
open Reckon_shim
(*let () = Ocons_conn_mgr.set_debug_flag ()*)
module Ocons_cli_shim : Types.S = struct
type mgr =
{ cmgr: O.Client.cmgr
; clock: float Eio.Time.clock_ty Eio.Time.clock
; callback: Types.recv_callback }
let submit_rate, run_sr = O.Utils.InternalReporter.rate_reporter "submit_rate"
let submit (t : mgr) (rid, op) =
run_sr := true ;
submit_rate () ;
let op =
match op with
| Types.Write (k, v) ->
[|O.Types.Write (k, v)|]
| Types.Read k ->
[|O.Types.Read k|]
in
try
let cmd =
O.Types.Command.{op; id= rid; submitted = Eio.Time.now t.clock; trace_start= Eio.Time.now t.clock}
in
dtraceln "Submitting request %d" rid ;
O.Client.submit_request t.cmgr cmd
with e when O.Utils.is_not_cancel e -> t.callback (rid, Failure (`Error e))
let create ~sw ~(env : Eio_unix.Stdenv.base) ~(f : Types.recv_callback) ~urls
~id =
O.Utils.InternalReporter.run ~sw env#clock 1. ;
let create_conn addr sw =
let c = Eio.Net.connect ~sw env#net addr in
O.Utils.set_nodelay c ; c
in
let con_ress =
urls
|> List.mapi (fun idx addr ->
(idx, (create_conn addr :> O.Client.Cmgr.resolver)) )
in
let recv_callback ((_, (rid, res, trace)) : _ * O.Client.response) =
O.Utils.TRACE.ex_cli trace ;
let res =
match res with
| O.Types.Failure msg ->
Types.Failure (`Msg msg)
| O.Types.(Success _) ->
Types.Success
in
f (rid, res)
in
let cmgr, r = Promise.create () in
Fiber.fork ~sw (fun () ->
try
Eio.Switch.run
@@ fun sw ->
let cmgr =
O.Client.create_cmgr (*~use_domain:env#domain_mgr*)
~kind:(Ocons_conn_mgr.Iter recv_callback) ~sw con_ress id
(fun () -> Eio.Time.sleep env#clock 1.)
in
Promise.resolve r cmgr
with e ->
Eio.traceln "Error while receiving: %a" Fmt.exn_backtrace
(e, Printexc.get_raw_backtrace ()) ) ;
let cmgr = Promise.await cmgr in
{cmgr; callback= f; clock= (env#clock)}
let flush (t : mgr) =
if false then Ocons_conn_mgr.flush_all t.cmgr else Fiber.yield ()
end
module T = Make (Ocons_cli_shim)
let () = exit @@ Cmdliner.Cmd.eval T.cmd