diff --git a/lib_eio/dune b/lib_eio/dune index 15546ee80..139259d5b 100644 --- a/lib_eio/dune +++ b/lib_eio/dune @@ -2,4 +2,4 @@ (name eio) (public_name eio) (flags (:standard -open Eio__core -open Eio__core.Private)) - (libraries eio__core cstruct lwt-dllist fmt bigstringaf)) + (libraries eio__core cstruct lwt-dllist fmt bigstringaf optint)) diff --git a/lib_eio/eio.ml b/lib_eio/eio.ml index d6fe93c49..6e28c206d 100644 --- a/lib_eio/eio.ml +++ b/lib_eio/eio.ml @@ -25,6 +25,7 @@ module Buf_write = Buf_write module Net = Net module Domain_manager = Domain_manager module Time = Time +module File = File module Fs = Fs module Path = Path diff --git a/lib_eio/eio.mli b/lib_eio/eio.mli index 6e396fe42..4cbbb17f2 100644 --- a/lib_eio/eio.mli +++ b/lib_eio/eio.mli @@ -134,6 +134,9 @@ module Time : sig raising exception [Timeout]. *) end +(** Operations on open files. *) +module File = File + (** File-system types. *) module Fs = Fs diff --git a/lib_eio/file.ml b/lib_eio/file.ml new file mode 100644 index 000000000..559506b0a --- /dev/null +++ b/lib_eio/file.ml @@ -0,0 +1,52 @@ +(** A file opened for reading. *) +class virtual ro = object (_ : ) + method probe _ = None + method read_methods = [] + method virtual pread : file_offset:Optint.Int63.t -> Cstruct.t list -> int +end + +(** A file opened for reading and writing. *) +class virtual rw = object (_ : ) + inherit ro + method virtual pwrite : file_offset:Optint.Int63.t -> Cstruct.t list -> int +end + +(** [pread t ~file_offset bufs] performs a single read of [t] at [file_offset] into [bufs]. + + It returns the number of bytes read, which may be less than the space in [bufs], + even if more bytes are available. Use {!pread_exact} instead if you require + the buffer to be filled. + + To read at the current offset, use {!Flow.read} instead. *) +let pread (t : #ro) ~file_offset bufs = + let got = t#pread ~file_offset bufs in + assert (got > 0 && got <= Cstruct.lenv bufs); + got + +(** [pread_exact t ~file_offset bufs] reads from [t] into [bufs] until [bufs] is full. + + @raise End_of_file if the buffer could not be filled. *) +let rec pread_exact (t : #ro) ~file_offset bufs = + if Cstruct.lenv bufs > 0 then ( + let got = t#pread ~file_offset bufs in + let file_offset = Optint.Int63.add file_offset (Optint.Int63.of_int got) in + pread_exact t ~file_offset (Cstruct.shiftv bufs got) + ) + +(** [pwrite_single t ~file_offset bufs] performs a single write operation, writing + data from [bufs] to location [file_offset] in [t]. + + It returns the number of bytes written, which may be less than the length of [bufs]. + In most cases, you will want to use {!pwrite_all} instead. *) +let pwrite_single (t : #rw) ~file_offset bufs = + let got = t#pwrite ~file_offset bufs in + assert (got > 0 && got <= Cstruct.lenv bufs); + got + +(** [pwrite_all t ~file_offset bufs] writes all the data in [bufs] to location [file_offset] in [t]. *) +let rec pwrite_all (t : #rw) ~file_offset bufs = + if Cstruct.lenv bufs > 0 then ( + let got = t#pwrite ~file_offset bufs in + let file_offset = Optint.Int63.add file_offset (Optint.Int63.of_int got) in + pwrite_all t ~file_offset (Cstruct.shiftv bufs got) + ) diff --git a/lib_eio/fs.ml b/lib_eio/fs.ml index 25fbb5ced..867ee74a4 100644 --- a/lib_eio/fs.ml +++ b/lib_eio/fs.ml @@ -12,11 +12,6 @@ exception Already_exists of path * exn exception Not_found of path * exn exception Permission_denied of path * exn -class virtual rw = object (_ : ) - method probe _ = None - method read_methods = [] -end - (** When to create a new file. *) type create = [ | `Never (** fail if the named file doesn't exist *) @@ -29,12 +24,12 @@ type create = [ (** Note: use the functions in {!Path} to access directories. *) class virtual dir = object (_ : #Generic.t) method probe _ = None - method virtual open_in : sw:Switch.t -> path -> + method virtual open_in : sw:Switch.t -> path -> method virtual open_out : sw:Switch.t -> append:bool -> create:create -> - path -> + path -> method virtual mkdir : perm:Unix_perm.t -> path -> unit method virtual open_dir : sw:Switch.t -> path -> dir_with_close method virtual read_dir : path -> string list diff --git a/lib_eio/path.mli b/lib_eio/path.mli index 4eaabbb94..6cce230da 100644 --- a/lib_eio/path.mli +++ b/lib_eio/path.mli @@ -47,12 +47,12 @@ val load : _ t -> string This is a convenience wrapper around {!with_open_in}. *) -val open_in : sw:Switch.t -> _ t -> +val open_in : sw:Switch.t -> _ t -> (** [open_in ~sw t] opens [t] for reading. Note: files are always opened in binary mode. *) -val with_open_in : _ t -> ( -> 'a) -> 'a +val with_open_in : _ t -> ( -> 'a) -> 'a (** [with_open_in] is like [open_in], but calls [fn flow] with the new flow and closes it automatically when [fn] returns (if it hasn't already been closed by then). *) @@ -72,7 +72,7 @@ val open_out : sw:Switch.t -> ?append:bool -> create:create -> - _ t -> + _ t -> (** [open_out ~sw t] opens [t] for reading and writing. Note: files are always opened in binary mode. @@ -82,7 +82,7 @@ val open_out : val with_open_out : ?append:bool -> create:create -> - _ t -> ( -> 'a) -> 'a + _ t -> ( -> 'a) -> 'a (** [with_open_out] is like [open_out], but calls [fn flow] with the new flow and closes it automatically when [fn] returns (if it hasn't already been closed by then). *) diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 80240261f..60fb0007c 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -470,7 +470,7 @@ let rec enqueue_recv_msg fd msghdr st action = ) in if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q + Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q let rec enqueue_accept fd client_addr st action = Log.debug (fun l -> l "accept: submitting call"); @@ -999,20 +999,20 @@ let udp_socket sock = object method close = FD.close sock - method send sockaddr buf = - let addr = match sockaddr with - | `Udp (host, port) -> + method send sockaddr buf = + let addr = match sockaddr with + | `Udp (host, port) -> let host = Eio_unix.Ipaddr.to_unix host in Unix.ADDR_INET (host, port) in - Low_level.send_msg sock ~dst:addr [buf] - + Low_level.send_msg sock ~dst:addr [buf] + method recv buf = let addr, recv = Low_level.recv_msg sock [buf] in match Uring.Sockaddr.get addr with | Unix.ADDR_INET (inet, port) -> `Udp (Eio_unix.Ipaddr.of_unix inet, port), recv - | Unix.ADDR_UNIX _ -> + | Unix.ADDR_UNIX _ -> raise (Failure "Expected INET UDP socket address but got Unix domain socket address.") end @@ -1035,6 +1035,12 @@ let flow fd = ); Low_level.readv fd [buf] + method pread ~file_offset bufs = + Low_level.readv ~file_offset fd bufs + + method pwrite ~file_offset bufs = + Low_level.writev_single ~file_offset fd bufs + method read_methods = [] method copy src = @@ -1204,7 +1210,7 @@ class dir ~label (fd : dir_fd) = object ~flags:Uring.Open_flags.cloexec ~perm:0 in - (flow fd :> ) + (flow fd :> ) method open_out ~sw ~append ~create path = let perm, flags = @@ -1220,7 +1226,7 @@ class dir ~label (fd : dir_fd) = object ~flags:Uring.Open_flags.(cloexec + flags) ~perm in - (flow fd :> ) + (flow fd :> ) method open_dir ~sw path = let fd = Low_level.openat ~sw ~seekable:false fd path diff --git a/lib_eio_luv/eio_luv.ml b/lib_eio_luv/eio_luv.ml index 729a5acd9..6e34bf119 100644 --- a/lib_eio_luv/eio_luv.ml +++ b/lib_eio_luv/eio_luv.ml @@ -357,19 +357,24 @@ module Low_level = struct await_with_cancel ~request (fun loop -> Luv.File.open_ ~loop ?mode ~request path flags) |> Result.map (of_luv ~sw) - let read fd bufs = + let read ?file_offset fd bufs = let request = Luv.File.Request.make () in - await_with_cancel ~request (fun loop -> Luv.File.read ~loop ~request (get "read" fd) bufs) + await_with_cancel ~request (fun loop -> Luv.File.read ~loop ~request ?file_offset (get "read" fd) bufs) - let rec write fd bufs = + let write_single ?file_offset fd bufs = let request = Luv.File.Request.make () in - let sent = await_with_cancel ~request (fun loop -> Luv.File.write ~loop ~request (get "write" fd) bufs) |> or_raise in - let rec aux = function - | [] -> () - | x :: xs when Luv.Buffer.size x = 0 -> aux xs - | bufs -> write fd bufs - in - aux @@ Luv.Buffer.drop bufs (Unsigned.Size_t.to_int sent) + await_with_cancel ~request (fun loop -> Luv.File.write ~loop ~request ?file_offset (get "write" fd) bufs) + + let rec write fd bufs = + match write_single fd bufs with + | Error _ as e -> e + | Ok sent -> + let rec aux = function + | [] -> Ok () + | x :: xs when Luv.Buffer.size x = 0 -> aux xs + | bufs -> write fd bufs + in + aux @@ Luv.Buffer.drop bufs (Unsigned.Size_t.to_int sent) let realpath path = let request = Luv.File.Request.make () in @@ -548,6 +553,18 @@ let flow fd = object (_ : ) | 0 -> raise End_of_file | got -> got + method pread ~file_offset bufs = + let bufs = List.map Cstruct.to_bigarray bufs in + let file_offset = Optint.Int63.to_int64 file_offset in + match File.read ~file_offset fd bufs |> or_raise |> Unsigned.Size_t.to_int with + | 0 -> raise End_of_file + | got -> got + + method pwrite ~file_offset bufs = + let bufs = List.map Cstruct.to_bigarray bufs in + let file_offset = Optint.Int63.to_int64 file_offset in + File.write_single ~file_offset fd bufs |> or_raise |> Unsigned.Size_t.to_int + method read_methods = [] method copy src = @@ -556,7 +573,7 @@ let flow fd = object (_ : ) while true do let got = Eio.Flow.read src (Cstruct.of_bigarray buf) in let sub = Luv.Buffer.sub buf ~offset:0 ~length:got in - File.write fd [sub] + File.write fd [sub] |> or_raise done with End_of_file -> () end @@ -879,7 +896,7 @@ class dir ~label (dir_path : string) = object (self) method open_in ~sw path = let fd = File.open_ ~sw (self#resolve path) [`NOFOLLOW; `RDONLY] |> or_raise_path path in - (flow fd :> ) + (flow fd :> ) method open_out ~sw ~append ~create path = let mode, flags = @@ -896,7 +913,7 @@ class dir ~label (dir_path : string) = object (self) else self#resolve_new path in let fd = File.open_ ~sw real_path flags ~mode:[`NUMERIC mode] |> or_raise_path path in - (flow fd :> ) + (flow fd :> ) method open_dir ~sw path = Switch.check sw; diff --git a/lib_eio_luv/eio_luv.mli b/lib_eio_luv/eio_luv.mli index b8a426867..3f19416e9 100644 --- a/lib_eio_luv/eio_luv.mli +++ b/lib_eio_luv/eio_luv.mli @@ -61,11 +61,15 @@ module Low_level : sig string -> Luv.File.Open_flag.t list -> t or_error (** Wraps {!Luv.File.open_} *) - val read : t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error + val read : ?file_offset:int64 -> t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error (** Wraps {!Luv.File.read} *) - val write : t -> Luv.Buffer.t list -> unit - (** [write t bufs] writes all the data in [bufs] (which may take several calls to {!Luv.File.write}). *) + val write_single : ?file_offset:int64 -> t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error + (** [write_single t bufs] performs a single write call and returns the number of bytes written, + which may be less than the amount of data provided in [bufs]. *) + + val write : t -> Luv.Buffer.t list -> unit or_error + (** [write t bufs] writes all the data in [bufs] (which may take several calls to {!write_single}). *) val realpath : string -> string or_error (** Wraps {!Luv.File.realpath} *)