From fbe8a71cb88efdf224dc79ca7e9794d45fa2d2bd Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Sun, 31 Dec 2023 10:17:29 +0000 Subject: [PATCH 1/2] Add Flow.copy benchmark --- bench/bench_copy.ml | 42 ++++++++++++++++++++++++++++++++++++++++++ bench/main.ml | 1 + 2 files changed, 43 insertions(+) create mode 100644 bench/bench_copy.ml diff --git a/bench/bench_copy.ml b/bench/bench_copy.ml new file mode 100644 index 000000000..e369813ea --- /dev/null +++ b/bench/bench_copy.ml @@ -0,0 +1,42 @@ +open Eio.Std + +let chunk_size = 1 lsl 16 +let n_chunks = 10000 +let n_bytes = n_chunks * chunk_size + +let run_client sock = + Fiber.both + (fun () -> + let chunk = Cstruct.create chunk_size in + for _ = 1 to n_chunks do + Eio.Flow.write sock [chunk] + done; + Eio.Flow.shutdown sock `Send + ) + (fun () -> + let chunk = Cstruct.create chunk_size in + for _ = 1 to n_chunks do + Eio.Flow.read_exact sock chunk + done + ) + +let time name service = + Switch.run @@ fun sw -> + let client_sock, server_sock = Eio_unix.Net.socketpair_stream ~sw () in + let t0 = Unix.gettimeofday () in + Fiber.both + (fun () -> service server_sock) + (fun () -> run_client client_sock); + let t1 = Unix.gettimeofday () in + let time = t1 -. t0 in + let bytes_per_second = float n_bytes /. time in + traceln "%s: %.2f MB/s" name (bytes_per_second /. 1024. /. 1024.); + (Metric.create name (`Float bytes_per_second) "bytes/s" (name ^ " Flow.copy")) + +let run _env = + [ + time "default" (fun sock -> Eio.Flow.copy sock sock); + time "buf_read" (fun sock -> + let r = Eio.Buf_read.of_flow sock ~initial_size:(64 * 1024) ~max_size:(64 * 1024) |> Eio.Buf_read.as_flow in + Eio.Flow.copy r sock); + ] diff --git a/bench/main.ml b/bench/main.ml index 866aff92c..49825eb49 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -11,6 +11,7 @@ let benchmarks = [ "Eio_unix.Fd", Bench_fd.run; "File.stat", Bench_fstat.run; "Path.stat", Bench_stat.run; + "Flow.copy", Bench_copy.run; ] let usage_error () = From 19c43d71536d60dd79e77e55a2377c4689b87c0b Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Sun, 31 Dec 2023 10:18:27 +0000 Subject: [PATCH 2/2] Optimise Flow.copy with Buf_read.as_flow By default, Flow.copy creates a 4KB buffer and copies the data through that. However, if the source of the copy is a buffered reader then it is much more efficient to use its buffer directly. This updates the flow you get from `Buf_read.as_flow` to offer this optimisation, and also updates the eio_posix backend's flow to use this optimisation when available (eio_linux already supported this). Detected in a benchmark by Leandro Ostera. --- bench/bench_copy.ml | 4 +++- lib_eio/buf_read.ml | 8 +++++++- lib_eio_posix/flow.ml | 17 ++++++++++++++++- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/bench/bench_copy.ml b/bench/bench_copy.ml index e369813ea..b39fece19 100644 --- a/bench/bench_copy.ml +++ b/bench/bench_copy.ml @@ -1,3 +1,5 @@ +(* A client opens a connection to an echo service and sends a load of data via it. *) + open Eio.Std let chunk_size = 1 lsl 16 @@ -31,7 +33,7 @@ let time name service = let time = t1 -. t0 in let bytes_per_second = float n_bytes /. time in traceln "%s: %.2f MB/s" name (bytes_per_second /. 1024. /. 1024.); - (Metric.create name (`Float bytes_per_second) "bytes/s" (name ^ " Flow.copy")) + Metric.create name (`Float bytes_per_second) "bytes/s" (name ^ " Flow.copy") let run _env = [ diff --git a/lib_eio/buf_read.ml b/lib_eio/buf_read.ml index 5baad9093..51e2213b7 100644 --- a/lib_eio/buf_read.ml +++ b/lib_eio/buf_read.ml @@ -140,7 +140,13 @@ module F = struct consume t len; len - let read_methods = [] + let rsb t fn = + ensure t 1; + let data = peek t in + let sent = fn [data] in + consume t sent + + let read_methods = [Flow.Read_source_buffer rsb] end let as_flow = diff --git a/lib_eio_posix/flow.ml b/lib_eio_posix/flow.ml index 02aa42946..89f663375 100644 --- a/lib_eio_posix/flow.ml +++ b/lib_eio_posix/flow.ml @@ -64,7 +64,22 @@ module Impl = struct with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) - let copy t ~src = Eio.Flow.Pi.simple_copy ~single_write t ~src + (* Copy using the [Read_source_buffer] optimisation. + Avoids a copy if the source already has the data. *) + let copy_with_rsb rsb dst = + try + while true do rsb (single_write dst) done + with End_of_file -> () + + let copy t ~src = + let Eio.Resource.T (src_t, ops) = src in + let module Src = (val (Eio.Resource.get ops Eio.Flow.Pi.Source)) in + let rec aux = function + | Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb (rsb src_t) t + | _ :: xs -> aux xs + | [] -> Eio.Flow.Pi.simple_copy ~single_write t ~src + in + aux Src.read_methods let single_read t buf = match Low_level.readv t [| buf |] with