diff --git a/lib/xapi-fdcaps/operations.ml b/lib/xapi-fdcaps/operations.ml index 12e74d60..7f7c9067 100644 --- a/lib/xapi-fdcaps/operations.ml +++ b/lib/xapi-fdcaps/operations.ml @@ -56,6 +56,8 @@ let pp ppf = let close t = Safefd.idempotent_close_exn t.fd +let fsync t = Unix.fsync (Safefd.unsafe_to_file_descr_exn t.fd) + let with_fd t f = let finally () = close t in Fun.protect ~finally (fun () -> f t) @@ -107,6 +109,14 @@ let creat path flags perm = (Unix.O_RDWR :: Unix.O_CREAT :: Unix.O_EXCL :: Unix.O_CLOEXEC :: flags) perm +let kind_of_fd fd = of_unix_kind Unix.LargeFile.((fstat fd).st_kind) + +let stdin = make_ro_exn (kind_of_fd Unix.stdin) Unix.stdin + +let stdout = make_wo_exn (kind_of_fd Unix.stdout) Unix.stdout + +let stderr = make_wo_exn (kind_of_fd Unix.stderr) Unix.stderr + let dev_null_out () = openfile_wo `chr "/dev/null" [] let dev_null_in () = openfile_ro `chr "/dev/null" [] @@ -122,6 +132,9 @@ let shutdown_send t = let shutdown_all t = Unix.shutdown (Safefd.unsafe_to_file_descr_exn t.fd) Unix.SHUTDOWN_ALL +let setsockopt_float t opt value = + Unix.setsockopt_float (Safefd.unsafe_to_file_descr_exn t.fd) opt value + let ftruncate t size = match t.custom_ftruncate with | None -> @@ -138,6 +151,18 @@ let read t buf off len = let single_write_substring t buf off len = Unix.single_write_substring (Safefd.unsafe_to_file_descr_exn t.fd) buf off len +let fstat t = Unix.LargeFile.fstat (Safefd.unsafe_to_file_descr_exn t.fd) + +let dup t = + { + t with + fd= + t.fd + |> Safefd.unsafe_to_file_descr_exn + |> Unix.dup + |> Safefd.of_file_descr + } + let set_nonblock t = Unix.set_nonblock (Safefd.unsafe_to_file_descr_exn t.fd) let clear_nonblock t = Unix.clear_nonblock (Safefd.unsafe_to_file_descr_exn t.fd) @@ -200,6 +225,60 @@ let with_temp_blk ?(sector_size = 512) name f = let setup () = Sys.set_signal Sys.sigpipe Sys.Signal_ignore +type ('a, 'b) operation = 'a t -> 'b -> int -> int -> int + +let repeat_read op fd buf off len = + let rec loop consumed = + let off = off + consumed and len = len - consumed in + if len = 0 then + consumed (* we filled the buffer *) + else + match op fd buf off len with + | 0 (* EOF *) + | (exception + Unix.( + Unix_error + ((ECONNRESET | ENOTCONN | EAGAIN | EWOULDBLOCK | EINTR), _, _)) + ) (* connection error or non-blocking socket *) -> + consumed + | n -> + assert (n >= 0) ; + assert (n <= len) ; + loop (consumed + n) + in + loop 0 + +let repeat_write op fd buf off len = + let rec loop written = + let off = off + written and len = len - written in + if len = 0 then + written (* we've written the entire buffer *) + else + match op fd buf off len with + | 0 + (* should never happen, but we cannot retry now or we'd enter an infinite loop *) + | (exception + Unix.( + Unix_error + ( ( ECONNRESET + | EPIPE + | EINTR + | ENETDOWN + | ENETUNREACH + | EAGAIN + | EWOULDBLOCK ) + , _ + , _ + )) + ) (* connection error or nonblocking socket *) -> + written + | n -> + assert (n >= 0) ; + assert (n <= len) ; + loop (written + n) + in + loop 0 + module For_test = struct let unsafe_fd_exn t = Safefd.unsafe_to_file_descr_exn t.fd end diff --git a/lib/xapi-fdcaps/operations.mli b/lib/xapi-fdcaps/operations.mli index e320c681..ee4a9f36 100644 --- a/lib/xapi-fdcaps/operations.mli +++ b/lib/xapi-fdcaps/operations.mli @@ -62,11 +62,26 @@ end (** {1 {!mod:Unix} wrappers} *) +val stdin : ([> rdonly], kind) make +(** [stdin] is a readonly file descriptor of unknown kind *) + +val stdout : ([> wronly], kind) make +(** [stdout] is a writeonly file descriptor of unknown kind *) + +val stderr : ([> wronly], kind) make +(** [stderr] is a writeonly file descriptor of unknown kind *) + val close : _ t -> unit (** [close t] closes t. Doesn't raise an exception if it is already closed. Other errors from the underlying {!val:Unix.close} are propagated. *) +val fsync : _ t -> unit +(** [fsync t] flushes [t] buffer to disk. + + Note that the file doesn't necessarily have to be writable, e.g. you can fsync a readonly open directory. + *) + val pipe : unit -> ([> rdonly], [> fifo]) make * ([> wronly], [> fifo]) make (** [pipe ()] creates an unnamed pipe. @see {!val:Unix.pipe} @@ -173,6 +188,12 @@ val single_write_substring : @see {!Unix.single_write_substring} *) +val fstat : _ t -> Unix.LargeFile.stats +(** [fstat t] is {!val:Unix.LargeFile.fstat} *) + +val dup : 'a t -> 'a t +(** [dup t] is {!val:Unix.dup} on [t]. *) + val set_nonblock : (_, [< espipe]) make -> unit (** [set_nonblock t]. @@ -191,6 +212,10 @@ val clear_nonblock : _ t -> unit @see {!Unix.clear_nonblock} *) +val setsockopt_float : + (_, [< sock]) make -> Unix.socket_float_option -> float -> unit +(** [set_sockopt_float t opt val] sets the socket option [opt] to [val] for [t]. *) + (** {1 Temporary files} *) val with_tempfile : @@ -209,6 +234,38 @@ val with_temp_blk : @param sector_size between 512 and 4096 *) +(** {1 Operation wrappers} + + The low-level {!val:read} and {!val:single_write_substring} can raise different exceptions + to mean end-of-file/disconnected depending on the file's kind. + + If you want to consider disconnectins as end-of-file then use these wrappers. + *) + +(** a buffered operation on a file descriptors. + + @see {!val:read} and {!val:single_write_substring} + *) +type ('a, 'b) operation = 'a t -> 'b -> int -> int -> int + +val repeat_read : ('a, bytes) operation -> ('a, bytes) operation +(** [repeat_read op buf off len] repeats [op] on the supplied buffer until EOF or a connection error is encountered. + The following connection errors are treated as EOF and are not reraised: + {!val:Unix.ECONNRESET}, {!val:Unix.ENOTCONN}. + {!val:Unix.EAGAIN} and {!val:Unix.EWOULDBLOCK} also cause the iteration to stop. + + The returned value may be less than [len] if EOF was encountered. +*) + +val repeat_write : ('a, string) operation -> ('a, string) operation +(** [repeat_write op buf off len] repeats [op] on the supplied buffer until a connection error is encountered or the entire buffer is written. + The following are treated as connection errors and not reraised: + {!val:Unix.ECONNRESET}, {!val:Unix.EPIPE}, {!val:Unix.ENETDOWN}, {!val:Unix.ENETUNREACH} + {!val:Unix.EAGAIN} and {!val:Unix.EWOULDBLOCK} also cause the iteration to stop. + + The returned value may be less than [len] if we were not able to complete the write due to a connection error. +*) + (**/**) module For_test : sig diff --git a/lib/xapi-fdcaps/properties.ml b/lib/xapi-fdcaps/properties.ml index 9e359a9b..d26194cf 100644 --- a/lib/xapi-fdcaps/properties.ml +++ b/lib/xapi-fdcaps/properties.ml @@ -59,6 +59,24 @@ let to_unix_kind = | #sock -> S_SOCK +let of_unix_kind = + let open Unix in + function + | S_REG -> + `reg + | S_BLK -> + `blk + | S_CHR -> + `chr + | S_DIR -> + `dir + | S_LNK -> + `lnk + | S_FIFO -> + `fifo + | S_SOCK -> + `sock + let pp_kind fmt = Fmt.using to_unix_kind Safefd.pp_kind fmt let pp fmt = diff --git a/lib/xapi-fdcaps/properties.mli b/lib/xapi-fdcaps/properties.mli index 0afc529c..6b51a3ab 100644 --- a/lib/xapi-fdcaps/properties.mli +++ b/lib/xapi-fdcaps/properties.mli @@ -161,6 +161,9 @@ val as_writable_opt : ([< rw], 'a) t -> ([> writable], 'a) t option val to_unix_kind : kind -> Unix.file_kind (** [to_unix_kind kind] converts the polymorphic variant [kind] to {!type:Unix.file_kind} *) +val of_unix_kind : Unix.file_kind -> kind +(** [of_unix_kind kind] converts the {!type:Unix.file_kind} to {!type:kind}. *) + (** pipe, FIFO or socket that may raise {!val:Unix.ESPIPE} *) type espipe = [fifo | sock] diff --git a/lib/xapi-fdcaps/test/test_operations.ml b/lib/xapi-fdcaps/test/test_operations.ml index f3c22f36..fa60e5f6 100644 --- a/lib/xapi-fdcaps/test/test_operations.ml +++ b/lib/xapi-fdcaps/test/test_operations.ml @@ -227,6 +227,45 @@ let test_creat () = let@ fd2 = with_fd @@ openfile_rw `reg name [] in pp Fmt.stdout fd2 ; read_fd fd2 ; write_fd fd2 +let test_repeat_read () = + let buf = String.init 255 Char.chr in + let read _ dst off len = + let available = String.length buf - off in + let len = Int.min len 11 in + let len = Int.min len available in + Bytes.blit_string buf off dst off len ; + len + in + let dst = Bytes.make 300 '_' in + let@ placeholder = with_fd @@ dev_zero () in + (* not actually used, just to make the types work, we simulate the read using string ops *) + let actual = repeat_read read placeholder dst 0 (Bytes.length dst) in + Alcotest.(check' int) ~msg:"amount read" ~actual ~expected:(String.length buf) ; + Alcotest.(check' string) + ~msg:"contents" + ~actual:(Bytes.sub_string dst 0 actual) + ~expected:buf + +let test_repeat_write () = + let buf = Bytes.make 255 '_' in + let write _ src off len = + let available = Bytes.length buf - off in + let len = Int.min len 11 in + let len = Int.min len available in + Bytes.blit_string src off buf off len ; + len + in + let src = String.init 255 Char.chr in + let@ placeholder = with_fd @@ dev_zero () in + (* not actually used, just to make the types work, we simulate the read using string ops *) + let actual = repeat_write write placeholder src 0 (String.length src) in + Alcotest.(check' int) + ~msg:"amount written" ~actual ~expected:(Bytes.length buf) ; + Alcotest.(check' string) + ~msg:"contents" + ~actual:(Bytes.sub_string buf 0 actual) + ~expected:src + let tests = Alcotest. [ @@ -238,6 +277,8 @@ let tests = ; test_case "socket shutdown write" `Quick test_sock_shutdown_w ; test_case "socket shutdown both" `Quick test_sock_shutdown_all ; test_case "create" `Quick test_creat + ; test_case "repeat_read" `Quick test_repeat_read + ; test_case "repeat_write" `Quick test_repeat_write ] (* this must be the last test *)