Skip to content

Commit

Permalink
Merge pull request #307 from nojb/seek
Browse files Browse the repository at this point in the history
Add API for seekable read/writes
  • Loading branch information
talex5 authored Sep 17, 2022
2 parents 995e255 + 5be0ec4 commit 5a77787
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 37 deletions.
2 changes: 1 addition & 1 deletion lib_eio/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
(name eio)
(public_name eio)
(flags (:standard -open Eio__core -open Eio__core.Private))
(libraries eio__core cstruct lwt-dllist fmt bigstringaf))
(libraries eio__core cstruct lwt-dllist fmt bigstringaf optint))
1 change: 1 addition & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module Buf_write = Buf_write
module Net = Net
module Domain_manager = Domain_manager
module Time = Time
module File = File
module Fs = Fs
module Path = Path

Expand Down
3 changes: 3 additions & 0 deletions lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ module Time : sig
raising exception [Timeout]. *)
end

(** Operations on open files. *)
module File = File

(** File-system types. *)
module Fs = Fs

Expand Down
52 changes: 52 additions & 0 deletions lib_eio/file.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
(** A file opened for reading. *)
class virtual ro = object (_ : <Generic.t; Flow.source; ..>)
method probe _ = None
method read_methods = []
method virtual pread : file_offset:Optint.Int63.t -> Cstruct.t list -> int
end

(** A file opened for reading and writing. *)
class virtual rw = object (_ : <Generic.t; Flow.source; Flow.sink; ..>)
inherit ro
method virtual pwrite : file_offset:Optint.Int63.t -> Cstruct.t list -> int
end

(** [pread t ~file_offset bufs] performs a single read of [t] at [file_offset] into [bufs].
It returns the number of bytes read, which may be less than the space in [bufs],
even if more bytes are available. Use {!pread_exact} instead if you require
the buffer to be filled.
To read at the current offset, use {!Flow.read} instead. *)
let pread (t : #ro) ~file_offset bufs =
let got = t#pread ~file_offset bufs in
assert (got > 0 && got <= Cstruct.lenv bufs);
got

(** [pread_exact t ~file_offset bufs] reads from [t] into [bufs] until [bufs] is full.
@raise End_of_file if the buffer could not be filled. *)
let rec pread_exact (t : #ro) ~file_offset bufs =
if Cstruct.lenv bufs > 0 then (
let got = t#pread ~file_offset bufs in
let file_offset = Optint.Int63.add file_offset (Optint.Int63.of_int got) in
pread_exact t ~file_offset (Cstruct.shiftv bufs got)
)

(** [pwrite_single t ~file_offset bufs] performs a single write operation, writing
data from [bufs] to location [file_offset] in [t].
It returns the number of bytes written, which may be less than the length of [bufs].
In most cases, you will want to use {!pwrite_all} instead. *)
let pwrite_single (t : #rw) ~file_offset bufs =
let got = t#pwrite ~file_offset bufs in
assert (got > 0 && got <= Cstruct.lenv bufs);
got

(** [pwrite_all t ~file_offset bufs] writes all the data in [bufs] to location [file_offset] in [t]. *)
let rec pwrite_all (t : #rw) ~file_offset bufs =
if Cstruct.lenv bufs > 0 then (
let got = t#pwrite ~file_offset bufs in
let file_offset = Optint.Int63.add file_offset (Optint.Int63.of_int got) in
pwrite_all t ~file_offset (Cstruct.shiftv bufs got)
)
9 changes: 2 additions & 7 deletions lib_eio/fs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ exception Already_exists of path * exn
exception Not_found of path * exn
exception Permission_denied of path * exn

class virtual rw = object (_ : <Generic.t; Flow.source; Flow.sink; ..>)
method probe _ = None
method read_methods = []
end

(** When to create a new file. *)
type create = [
| `Never (** fail if the named file doesn't exist *)
Expand All @@ -29,12 +24,12 @@ type create = [
(** Note: use the functions in {!Path} to access directories. *)
class virtual dir = object (_ : #Generic.t)
method probe _ = None
method virtual open_in : sw:Switch.t -> path -> <Flow.source; Flow.close>
method virtual open_in : sw:Switch.t -> path -> <File.ro; Flow.close>
method virtual open_out :
sw:Switch.t ->
append:bool ->
create:create ->
path -> <rw; Flow.close>
path -> <File.rw; Flow.close>
method virtual mkdir : perm:Unix_perm.t -> path -> unit
method virtual open_dir : sw:Switch.t -> path -> dir_with_close
method virtual read_dir : path -> string list
Expand Down
8 changes: 4 additions & 4 deletions lib_eio/path.mli
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ val load : _ t -> string
This is a convenience wrapper around {!with_open_in}. *)

val open_in : sw:Switch.t -> _ t -> <Flow.source; Flow.close>
val open_in : sw:Switch.t -> _ t -> <File.ro; Flow.close>
(** [open_in ~sw t] opens [t] for reading.
Note: files are always opened in binary mode. *)

val with_open_in : _ t -> (<Flow.source; Flow.close> -> 'a) -> 'a
val with_open_in : _ t -> (<File.ro; Flow.close> -> 'a) -> 'a
(** [with_open_in] is like [open_in], but calls [fn flow] with the new flow and closes
it automatically when [fn] returns (if it hasn't already been closed by then). *)

Expand All @@ -72,7 +72,7 @@ val open_out :
sw:Switch.t ->
?append:bool ->
create:create ->
_ t -> <rw; Flow.close>
_ t -> <File.rw; Flow.close>
(** [open_out ~sw t] opens [t] for reading and writing.
Note: files are always opened in binary mode.
Expand All @@ -82,7 +82,7 @@ val open_out :
val with_open_out :
?append:bool ->
create:create ->
_ t -> (<rw; Flow.close> -> 'a) -> 'a
_ t -> (<File.rw; Flow.close> -> 'a) -> 'a
(** [with_open_out] is like [open_out], but calls [fn flow] with the new flow and closes
it automatically when [fn] returns (if it hasn't already been closed by then). *)

Expand Down
24 changes: 15 additions & 9 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ let rec enqueue_recv_msg fd msghdr st action =
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q

let rec enqueue_accept fd client_addr st action =
Log.debug (fun l -> l "accept: submitting call");
Expand Down Expand Up @@ -999,20 +999,20 @@ let udp_socket sock = object

method close = FD.close sock

method send sockaddr buf =
let addr = match sockaddr with
| `Udp (host, port) ->
method send sockaddr buf =
let addr = match sockaddr with
| `Udp (host, port) ->
let host = Eio_unix.Ipaddr.to_unix host in
Unix.ADDR_INET (host, port)
in
Low_level.send_msg sock ~dst:addr [buf]
Low_level.send_msg sock ~dst:addr [buf]

method recv buf =
let addr, recv = Low_level.recv_msg sock [buf] in
match Uring.Sockaddr.get addr with
| Unix.ADDR_INET (inet, port) ->
`Udp (Eio_unix.Ipaddr.of_unix inet, port), recv
| Unix.ADDR_UNIX _ ->
| Unix.ADDR_UNIX _ ->
raise (Failure "Expected INET UDP socket address but got Unix domain socket address.")
end

Expand All @@ -1035,6 +1035,12 @@ let flow fd =
);
Low_level.readv fd [buf]

method pread ~file_offset bufs =
Low_level.readv ~file_offset fd bufs

method pwrite ~file_offset bufs =
Low_level.writev_single ~file_offset fd bufs

method read_methods = []

method copy src =
Expand Down Expand Up @@ -1204,7 +1210,7 @@ class dir ~label (fd : dir_fd) = object
~flags:Uring.Open_flags.cloexec
~perm:0
in
(flow fd :> <Eio.Flow.source; Eio.Flow.close>)
(flow fd :> <Eio.File.ro; Eio.Flow.close>)

method open_out ~sw ~append ~create path =
let perm, flags =
Expand All @@ -1220,7 +1226,7 @@ class dir ~label (fd : dir_fd) = object
~flags:Uring.Open_flags.(cloexec + flags)
~perm
in
(flow fd :> <Eio.Fs.rw; Eio.Flow.close>)
(flow fd :> <Eio.File.rw; Eio.Flow.close>)

method open_dir ~sw path =
let fd = Low_level.openat ~sw ~seekable:false fd path
Expand Down
43 changes: 30 additions & 13 deletions lib_eio_luv/eio_luv.ml
Original file line number Diff line number Diff line change
Expand Up @@ -357,19 +357,24 @@ module Low_level = struct
await_with_cancel ~request (fun loop -> Luv.File.open_ ~loop ?mode ~request path flags)
|> Result.map (of_luv ~sw)

let read fd bufs =
let read ?file_offset fd bufs =
let request = Luv.File.Request.make () in
await_with_cancel ~request (fun loop -> Luv.File.read ~loop ~request (get "read" fd) bufs)
await_with_cancel ~request (fun loop -> Luv.File.read ~loop ~request ?file_offset (get "read" fd) bufs)

let rec write fd bufs =
let write_single ?file_offset fd bufs =
let request = Luv.File.Request.make () in
let sent = await_with_cancel ~request (fun loop -> Luv.File.write ~loop ~request (get "write" fd) bufs) |> or_raise in
let rec aux = function
| [] -> ()
| x :: xs when Luv.Buffer.size x = 0 -> aux xs
| bufs -> write fd bufs
in
aux @@ Luv.Buffer.drop bufs (Unsigned.Size_t.to_int sent)
await_with_cancel ~request (fun loop -> Luv.File.write ~loop ~request ?file_offset (get "write" fd) bufs)

let rec write fd bufs =
match write_single fd bufs with
| Error _ as e -> e
| Ok sent ->
let rec aux = function
| [] -> Ok ()
| x :: xs when Luv.Buffer.size x = 0 -> aux xs
| bufs -> write fd bufs
in
aux @@ Luv.Buffer.drop bufs (Unsigned.Size_t.to_int sent)

let realpath path =
let request = Luv.File.Request.make () in
Expand Down Expand Up @@ -548,6 +553,18 @@ let flow fd = object (_ : <source; sink; ..>)
| 0 -> raise End_of_file
| got -> got

method pread ~file_offset bufs =
let bufs = List.map Cstruct.to_bigarray bufs in
let file_offset = Optint.Int63.to_int64 file_offset in
match File.read ~file_offset fd bufs |> or_raise |> Unsigned.Size_t.to_int with
| 0 -> raise End_of_file
| got -> got

method pwrite ~file_offset bufs =
let bufs = List.map Cstruct.to_bigarray bufs in
let file_offset = Optint.Int63.to_int64 file_offset in
File.write_single ~file_offset fd bufs |> or_raise |> Unsigned.Size_t.to_int

method read_methods = []

method copy src =
Expand All @@ -556,7 +573,7 @@ let flow fd = object (_ : <source; sink; ..>)
while true do
let got = Eio.Flow.read src (Cstruct.of_bigarray buf) in
let sub = Luv.Buffer.sub buf ~offset:0 ~length:got in
File.write fd [sub]
File.write fd [sub] |> or_raise
done
with End_of_file -> ()
end
Expand Down Expand Up @@ -879,7 +896,7 @@ class dir ~label (dir_path : string) = object (self)

method open_in ~sw path =
let fd = File.open_ ~sw (self#resolve path) [`NOFOLLOW; `RDONLY] |> or_raise_path path in
(flow fd :> <Eio.Flow.source; Eio.Flow.close>)
(flow fd :> <Eio.File.ro; Eio.Flow.close>)

method open_out ~sw ~append ~create path =
let mode, flags =
Expand All @@ -896,7 +913,7 @@ class dir ~label (dir_path : string) = object (self)
else self#resolve_new path
in
let fd = File.open_ ~sw real_path flags ~mode:[`NUMERIC mode] |> or_raise_path path in
(flow fd :> <Eio.Fs.rw; Eio.Flow.close>)
(flow fd :> <Eio.File.rw; Eio.Flow.close>)

method open_dir ~sw path =
Switch.check sw;
Expand Down
10 changes: 7 additions & 3 deletions lib_eio_luv/eio_luv.mli
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ module Low_level : sig
string -> Luv.File.Open_flag.t list -> t or_error
(** Wraps {!Luv.File.open_} *)

val read : t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error
val read : ?file_offset:int64 -> t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error
(** Wraps {!Luv.File.read} *)

val write : t -> Luv.Buffer.t list -> unit
(** [write t bufs] writes all the data in [bufs] (which may take several calls to {!Luv.File.write}). *)
val write_single : ?file_offset:int64 -> t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error
(** [write_single t bufs] performs a single write call and returns the number of bytes written,
which may be less than the amount of data provided in [bufs]. *)

val write : t -> Luv.Buffer.t list -> unit or_error
(** [write t bufs] writes all the data in [bufs] (which may take several calls to {!write_single}). *)

val realpath : string -> string or_error
(** Wraps {!Luv.File.realpath} *)
Expand Down

0 comments on commit 5a77787

Please sign in to comment.