Skip to content

Commit

Permalink
CP-32622: replace select in proxy with polly
Browse files Browse the repository at this point in the history
Signed-off-by: Edwin Török <[email protected]>
  • Loading branch information
edwintorok committed Nov 28, 2023
1 parent d9951c6 commit 4456546
Showing 1 changed file with 47 additions and 36 deletions.
83 changes: 47 additions & 36 deletions lib/xapi-stdext-unix/unixext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -422,31 +422,47 @@ let string_of_signal x =
else
Printf.sprintf "(ocaml signal %d with an unknown name)" x

let with_polly f =
let polly = Polly.create () in
let finally () = Polly.close polly in
Xapi_stdext_pervasives.Pervasiveext.finally (fun () -> f polly) finally

let proxy (a : Unix.file_descr) (b : Unix.file_descr) =
let size = 64 * 1024 in
(* [a'] is read from [a] and will be written to [b] *)
(* [b'] is read from [b] and will be written to [a] *)
let a' = CBuf.empty size and b' = CBuf.empty size in
Unix.set_nonblock a ;
Unix.set_nonblock b ;
with_polly @@ fun polly ->
try
while true do
let r =
(if CBuf.should_read a' then [a] else [])
@ if CBuf.should_read b' then [b] else []
in
let w =
(if CBuf.should_write a' then [b] else [])
@ if CBuf.should_write b' then [a] else []
in
let any = ref false in
if CBuf.should_read a' then (
Polly.add polly a Polly.Events.(inp lor oneshot) ;
any := true
) ;
if CBuf.should_read b' then (
Polly.add polly b Polly.Events.(inp lor oneshot) ;
any := true
) ;
if CBuf.should_write a' then (
Polly.add polly b Polly.Events.(out lor oneshot) ;
any := true
) ;
if CBuf.should_write b' then (
Polly.add polly a Polly.Events.(out lor oneshot) ;
any := true
) ;
(* If we can't make any progress (because fds have been closed), then stop *)
if r = [] && w = [] then raise End_of_file ;
let r, w, _ = Unix.select r w [] (-1.0) in
(* Do the writing before the reading *)
List.iter
(fun fd -> if a = fd then CBuf.write b' a else CBuf.write a' b)
w ;
List.iter (fun fd -> if a = fd then CBuf.read a' a else CBuf.read b' b) r ;
if not !any then raise End_of_file ;
Polly.wait_fold polly 4 (-1) () (fun _polly fd events () ->
(* Do the writing before the reading *)
if Polly.Events.(test out events) then
if a = fd then CBuf.write b' a else CBuf.write a' b ;
if Polly.Events.(test inp events) then
if a = fd then CBuf.read a' a else CBuf.read b' b
) ;
(* If there's nothing else to read or write then signal the other end *)
List.iter
(fun (buf, fd) ->
Expand Down Expand Up @@ -539,32 +555,27 @@ let to_milliseconds ms = ms *. 1000. |> int_of_float
[setsockopt_float] to set the timeout
[clear_nonblock] to ensure the socket is non-blocking
*)
let with_polly kind fd f =
let polly = Polly.create () in
let finally () = Polly.close polly in
Xapi_stdext_pervasives.Pervasiveext.finally
(fun () ->
Polly.add polly fd kind ;
let wait remaining_time =
if remaining_time < 0. then raise Timeout ;
(* allow a timeout of 0 to check for current state without waiting *)
let ready =
Polly.wait polly 1 (to_milliseconds remaining_time)
@@ fun _ event_on_fd _ -> assert (event_on_fd == fd)
in
if ready = 0 then raise Timeout
in
f wait fd
)
finally
let with_polly_wait kind fd f =
with_polly @@ fun polly ->
Polly.add polly fd kind ;
let wait remaining_time =
if remaining_time < 0. then raise Timeout ;
(* allow a timeout of 0 to check for current state without waiting *)
let ready =
Polly.wait polly 1 (to_milliseconds remaining_time)
@@ fun _ event_on_fd _ -> assert (event_on_fd == fd)
in
if ready = 0 then raise Timeout
in
f wait fd

(* Write as many bytes to a file descriptor as possible from data before a given clock time. *)
(* Raises Timeout exception if the number of bytes written is less than the specified length. *)
(* Writes into the file descriptor at the current cursor position. *)
let time_limited_write_internal
(write : Unix.file_descr -> 'a -> int -> int -> int) filedesc length data
target_response_time =
with_polly Polly.Events.out filedesc @@ fun wait filedesc ->
with_polly_wait Polly.Events.out filedesc @@ fun wait filedesc ->
let total_bytes_to_write = length in
let bytes_written = ref 0 in
let now = ref (Unix.gettimeofday ()) in
Expand Down Expand Up @@ -601,7 +612,7 @@ let time_limited_write_substring filedesc length data target_response_time =
(* Raises Timeout exception if the number of bytes read is less than the desired number. *)
(* Reads from the file descriptor at the current cursor position. *)
let time_limited_read filedesc length target_response_time =
with_polly Polly.Events.inp filedesc @@ fun wait filedesc ->
with_polly_wait Polly.Events.inp filedesc @@ fun wait filedesc ->
let total_bytes_to_read = length in
let bytes_read = ref 0 in
let buf = Bytes.make total_bytes_to_read '\000' in
Expand Down Expand Up @@ -632,7 +643,7 @@ let time_limited_read filedesc length target_response_time =

let time_limited_single_read filedesc length ~max_wait =
let buf = Bytes.make length '\000' in
with_polly Polly.Events.inp filedesc @@ fun wait filedesc ->
with_polly_wait Polly.Events.inp filedesc @@ fun wait filedesc ->
wait max_wait ;
let bytes =
try Unix.read filedesc buf 0 length
Expand Down

0 comments on commit 4456546

Please sign in to comment.