Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API for seekable read/writes #307

Merged
merged 3 commits into from
Sep 17, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib_eio/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
(name eio)
(public_name eio)
(flags (:standard -open Eio__core -open Eio__core.Private))
(libraries eio__core cstruct lwt-dllist fmt bigstringaf))
(libraries eio__core cstruct lwt-dllist fmt bigstringaf optint))
1 change: 1 addition & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module Buf_write = Buf_write
module Net = Net
module Domain_manager = Domain_manager
module Time = Time
module File = File
module Fs = Fs
module Path = Path

Expand Down
3 changes: 3 additions & 0 deletions lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ module Time : sig
raising exception [Timeout]. *)
end

(** Operations on open files. *)
module File = File

(** File-system types. *)
module Fs = Fs

Expand Down
52 changes: 52 additions & 0 deletions lib_eio/file.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
(** A file opened for reading. *)
class virtual ro = object (_ : <Generic.t; Flow.source; ..>)
method probe _ = None
method read_methods = []
method virtual pread : file_offset:Optint.Int63.t -> Cstruct.t list -> int
end

(** A file opened for reading and writing. *)
class virtual rw = object (_ : <Generic.t; Flow.source; Flow.sink; ..>)
inherit ro
method virtual pwrite : file_offset:Optint.Int63.t -> Cstruct.t list -> int
end

(** [pread t ~file_offset bufs] performs a single read of [t] at [file_offset] into [bufs].

It returns the number of bytes read, which may be less than the space in [bufs],
even if more bytes are available. Use {!pread_exact} instead if you require
the buffer to be filled.

To read at the current offset, use {!Flow.read} instead. *)
let pread (t : #ro) ~file_offset bufs =
let got = t#pread ~file_offset bufs in
assert (got > 0 && got <= Cstruct.lenv bufs);
got

(** [pread_exact t ~file_offset bufs] reads from [t] into [bufs] until [bufs] is full.

@raise End_of_file if the buffer could not be filled. *)
let rec pread_exact (t : #ro) ~file_offset bufs =
if Cstruct.lenv bufs > 0 then (
let got = t#pread ~file_offset bufs in
let file_offset = Optint.Int63.add file_offset (Optint.Int63.of_int got) in
pread_exact t ~file_offset (Cstruct.shiftv bufs got)
)

(** [pwrite_single t ~file_offset bufs] performs a single write operation, writing
data from [bufs] to location [file_offset] in [t].

It returns the number of bytes written, which may be less than the length of [bufs].
In most cases, you will want to use {!pwrite_all} instead. *)
let pwrite_single (t : #rw) ~file_offset bufs =
let got = t#pwrite ~file_offset bufs in
assert (got > 0 && got <= Cstruct.lenv bufs);
got

(** [pwrite_all t ~file_offset bufs] writes all the data in [bufs] to location [file_offset] in [t]. *)
let rec pwrite_all (t : #rw) ~file_offset bufs =
if Cstruct.lenv bufs > 0 then (
let got = t#pwrite ~file_offset bufs in
let file_offset = Optint.Int63.add file_offset (Optint.Int63.of_int got) in
pwrite_all t ~file_offset (Cstruct.shiftv bufs got)
)
9 changes: 2 additions & 7 deletions lib_eio/fs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ exception Already_exists of path * exn
exception Not_found of path * exn
exception Permission_denied of path * exn

class virtual rw = object (_ : <Generic.t; Flow.source; Flow.sink; ..>)
method probe _ = None
method read_methods = []
end

(** When to create a new file. *)
type create = [
| `Never (** fail if the named file doesn't exist *)
Expand All @@ -29,12 +24,12 @@ type create = [
(** Note: use the functions in {!Path} to access directories. *)
class virtual dir = object (_ : #Generic.t)
method probe _ = None
method virtual open_in : sw:Switch.t -> path -> <Flow.source; Flow.close>
method virtual open_in : sw:Switch.t -> path -> <File.ro; Flow.close>
method virtual open_out :
sw:Switch.t ->
append:bool ->
create:create ->
path -> <rw; Flow.close>
path -> <File.rw; Flow.close>
method virtual mkdir : perm:Unix_perm.t -> path -> unit
method virtual open_dir : sw:Switch.t -> path -> dir_with_close
method virtual read_dir : path -> string list
Expand Down
8 changes: 4 additions & 4 deletions lib_eio/path.mli
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ val load : _ t -> string

This is a convenience wrapper around {!with_open_in}. *)

val open_in : sw:Switch.t -> _ t -> <Flow.source; Flow.close>
val open_in : sw:Switch.t -> _ t -> <File.ro; Flow.close>
(** [open_in ~sw t] opens [t] for reading.

Note: files are always opened in binary mode. *)

val with_open_in : _ t -> (<Flow.source; Flow.close> -> 'a) -> 'a
val with_open_in : _ t -> (<File.ro; Flow.close> -> 'a) -> 'a
(** [with_open_in] is like [open_in], but calls [fn flow] with the new flow and closes
it automatically when [fn] returns (if it hasn't already been closed by then). *)

Expand All @@ -72,7 +72,7 @@ val open_out :
sw:Switch.t ->
?append:bool ->
create:create ->
_ t -> <rw; Flow.close>
_ t -> <File.rw; Flow.close>
(** [open_out ~sw t] opens [t] for reading and writing.

Note: files are always opened in binary mode.
Expand All @@ -82,7 +82,7 @@ val open_out :
val with_open_out :
?append:bool ->
create:create ->
_ t -> (<rw; Flow.close> -> 'a) -> 'a
_ t -> (<File.rw; Flow.close> -> 'a) -> 'a
(** [with_open_out] is like [open_out], but calls [fn flow] with the new flow and closes
it automatically when [fn] returns (if it hasn't already been closed by then). *)

Expand Down
24 changes: 15 additions & 9 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ let rec enqueue_recv_msg fd msghdr st action =
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q

let rec enqueue_accept fd client_addr st action =
Log.debug (fun l -> l "accept: submitting call");
Expand Down Expand Up @@ -999,20 +999,20 @@ let udp_socket sock = object

method close = FD.close sock

method send sockaddr buf =
let addr = match sockaddr with
| `Udp (host, port) ->
method send sockaddr buf =
let addr = match sockaddr with
| `Udp (host, port) ->
let host = Eio_unix.Ipaddr.to_unix host in
Unix.ADDR_INET (host, port)
in
Low_level.send_msg sock ~dst:addr [buf]
Low_level.send_msg sock ~dst:addr [buf]

method recv buf =
let addr, recv = Low_level.recv_msg sock [buf] in
match Uring.Sockaddr.get addr with
| Unix.ADDR_INET (inet, port) ->
`Udp (Eio_unix.Ipaddr.of_unix inet, port), recv
| Unix.ADDR_UNIX _ ->
| Unix.ADDR_UNIX _ ->
raise (Failure "Expected INET UDP socket address but got Unix domain socket address.")
end

Expand All @@ -1035,6 +1035,12 @@ let flow fd =
);
Low_level.readv fd [buf]

method pread ~file_offset bufs =
Low_level.readv ~file_offset fd bufs

method pwrite ~file_offset bufs =
Low_level.writev_single ~file_offset fd bufs

method read_methods = []

method copy src =
Expand Down Expand Up @@ -1204,7 +1210,7 @@ class dir ~label (fd : dir_fd) = object
~flags:Uring.Open_flags.cloexec
~perm:0
in
(flow fd :> <Eio.Flow.source; Eio.Flow.close>)
(flow fd :> <Eio.File.ro; Eio.Flow.close>)

method open_out ~sw ~append ~create path =
let perm, flags =
Expand All @@ -1220,7 +1226,7 @@ class dir ~label (fd : dir_fd) = object
~flags:Uring.Open_flags.(cloexec + flags)
~perm
in
(flow fd :> <Eio.Fs.rw; Eio.Flow.close>)
(flow fd :> <Eio.File.rw; Eio.Flow.close>)

method open_dir ~sw path =
let fd = Low_level.openat ~sw ~seekable:false fd path
Expand Down
24 changes: 20 additions & 4 deletions lib_eio_luv/eio_luv.ml
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,13 @@ module Low_level = struct
await_with_cancel ~request (fun loop -> Luv.File.open_ ~loop ?mode ~request path flags)
|> Result.map (of_luv ~sw)

let read fd bufs =
let read ?file_offset fd bufs =
let request = Luv.File.Request.make () in
await_with_cancel ~request (fun loop -> Luv.File.read ~loop ~request (get "read" fd) bufs)
await_with_cancel ~request (fun loop -> Luv.File.read ~loop ~request ?file_offset (get "read" fd) bufs)

let write_single ?file_offset fd bufs =
let request = Luv.File.Request.make () in
await_with_cancel ~request (fun loop -> Luv.File.write ~loop ~request ?file_offset (get "write" fd) bufs)

let rec write fd bufs =
let request = Luv.File.Request.make () in
Expand Down Expand Up @@ -548,6 +552,18 @@ let flow fd = object (_ : <source; sink; ..>)
| 0 -> raise End_of_file
| got -> got

method pread ~file_offset bufs =
let bufs = List.map Cstruct.to_bigarray bufs in
let file_offset = Optint.Int63.to_int64 file_offset in
match File.read ~file_offset fd bufs |> or_raise |> Unsigned.Size_t.to_int with
| 0 -> raise End_of_file
| got -> got

method pwrite ~file_offset bufs =
let bufs = List.map Cstruct.to_bigarray bufs in
let file_offset = Optint.Int63.to_int64 file_offset in
File.write_single ~file_offset fd bufs |> or_raise |> Unsigned.Size_t.to_int

method read_methods = []

method copy src =
Expand Down Expand Up @@ -879,7 +895,7 @@ class dir ~label (dir_path : string) = object (self)

method open_in ~sw path =
let fd = File.open_ ~sw (self#resolve path) [`NOFOLLOW; `RDONLY] |> or_raise_path path in
(flow fd :> <Eio.Flow.source; Eio.Flow.close>)
(flow fd :> <Eio.File.ro; Eio.Flow.close>)

method open_out ~sw ~append ~create path =
let mode, flags =
Expand All @@ -896,7 +912,7 @@ class dir ~label (dir_path : string) = object (self)
else self#resolve_new path
in
let fd = File.open_ ~sw real_path flags ~mode:[`NUMERIC mode] |> or_raise_path path in
(flow fd :> <Eio.Fs.rw; Eio.Flow.close>)
(flow fd :> <Eio.File.rw; Eio.Flow.close>)

method open_dir ~sw path =
Switch.check sw;
Expand Down
2 changes: 1 addition & 1 deletion lib_eio_luv/eio_luv.mli
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ module Low_level : sig
string -> Luv.File.Open_flag.t list -> t or_error
(** Wraps {!Luv.File.open_} *)

val read : t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error
val read : ?file_offset:int64 -> t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error
(** Wraps {!Luv.File.read} *)

val write : t -> Luv.Buffer.t list -> unit
Expand Down