Skip to content

Commit

Permalink
Merge pull request #705 from talex5/linux-ll
Browse files Browse the repository at this point in the history
eio_linux: expose more functions in the Low_level module
  • Loading branch information
talex5 authored Feb 29, 2024
2 parents 7b58999 + fb731b5 commit 5e66181
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 225 deletions.
13 changes: 10 additions & 3 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,18 @@ end = struct
let d = v ~label ~path:(native_internal t path) (Low_level.FD fd) in
Eio.Resource.T (d, Dir_handler.v)

(* [mkdir t ~perm path] creates a directory at [path] with permissions [perm],
   passing [t.fd] to the low-level layer as the directory handle.
   The two lines are the before/after sides of the diff: the only change is that
   the callee is renamed from [Low_level.mkdir_beneath] to [Low_level.mkdir]. *)
let mkdir t ~perm path = Low_level.mkdir_beneath ~perm t.fd path
let mkdir t ~perm path = Low_level.mkdir ~perm t.fd path

(* [read_dir t path] opens the directory [path] (relative to [t.fd]) and returns
   the result of [Low_level.read_dir] on the opened FD.
   An empty [path] is replaced by ["."].
   The FD is opened under a local switch named "read_dir";
   NOTE(review): presumably the FD is released when [Switch.run] returns - confirm. *)
let read_dir t path =
Switch.run ~name:"read_dir" @@ fun sw ->
(* Pre-change line: used the [Low_level.open_dir] helper, which this commit removes. *)
let fd = Low_level.open_dir ~sw t.fd (if path = "" then "." else path) in
(* Post-change lines: the same open is now spelled out with [Low_level.openat]. *)
let path = if path = "" then "." else path in
let fd =
Low_level.openat ~sw t.fd path
~seekable:false
~access:`R
(* [directory] is requested together with [cloexec]; [perm] is 0 on this read-only open. *)
~flags:Uring.Open_flags.(cloexec + directory)
~perm:0
in
Low_level.read_dir fd

(* [read_link t path] delegates to [Low_level.read_link], passing [t.fd] and [path] unchanged. *)
let read_link t path = Low_level.read_link t.fd path
Expand All @@ -562,7 +569,7 @@ end = struct
if !Sched.statx_works then (
let module X = Uring.Statx in
let x = X.create () in
Low_level.statx_confined ~follow ~mask:X.Mask.basic_stats t.fd path x;
Low_level.statx ~follow ~mask:X.Mask.basic_stats t.fd path x;
{ Eio.File.Stat.
dev = X.dev x;
ino = X.ino x;
Expand Down
205 changes: 1 addition & 204 deletions lib_eio_linux/eio_linux.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open Eio.Std

type fd := Eio_unix.Fd.t

(** {1 Main Loop} *)

type stdenv = Eio_unix.Stdenv.base
Expand All @@ -51,203 +47,4 @@ val run :
The argument is a message describing the problem (for logging).
The default simply raises an exception. *)

(** {1 Low-level API} *)

(** Low-level API for using uring directly. *)
module Low_level : sig
val noop : unit -> unit
(** [noop ()] performs a uring noop. This is only useful for benchmarking. *)

(** {1 Time functions} *)

val sleep_until : Mtime.t -> unit
(** [sleep_until time] blocks until the current time is [time]. *)

(** {1 Fixed-buffer memory allocation functions}
The size of the fixed buffer is set when calling {!run}, which attempts to allocate a fixed buffer.
However, that may fail due to resource limits. *)

val alloc_fixed : unit -> Uring.Region.chunk option
(** Allocate a chunk of memory from the fixed buffer.
Warning: The memory is NOT zeroed out.
Passing such memory to Linux can be faster than using normal memory, in certain cases.
There is a limited amount of such memory, and this will return [None] if none is available at present. *)

val alloc_fixed_or_wait : unit -> Uring.Region.chunk
(** Like {!alloc_fixed}, but if there are no chunks available then it waits until one is. *)

val free_fixed : Uring.Region.chunk -> unit
(** [free_fixed chunk] frees [chunk].
NOTE(review): presumably [chunk] must have come from {!alloc_fixed} or {!alloc_fixed_or_wait}
and is returned to the fixed buffer - confirm against the implementation. *)

val with_chunk : fallback:(unit -> 'a) -> (Uring.Region.chunk -> 'a) -> 'a
(** [with_chunk ~fallback fn] runs [fn chunk] with a freshly allocated chunk and then frees it.
If no chunks are available, it runs [fallback ()] instead. *)

(** {1 File manipulation functions} *)

val openat2 :
sw:Switch.t ->
?seekable:bool ->
access:[`R|`W|`RW] ->
flags:Uring.Open_flags.t ->
perm:Unix.file_perm ->
resolve:Uring.Resolve.t ->
?dir:fd -> string -> fd
(** [openat2 ~sw ~access ~flags ~perm ~resolve ~dir path] opens [dir/path].
See {!Uring.openat2} for details. *)

val read_upto : ?file_offset:Optint.Int63.t -> fd -> Uring.Region.chunk -> int -> int
(** [read_upto fd chunk len] reads at most [len] bytes from [fd],
returning as soon as some data is available.
@param file_offset Read from the given position in [fd] (default: 0).
@raise End_of_file Raised if all data has already been read. *)

val read_exactly : ?file_offset:Optint.Int63.t -> fd -> Uring.Region.chunk -> int -> unit
(** [read_exactly fd chunk len] reads exactly [len] bytes from [fd],
performing multiple read operations if necessary.
@param file_offset Read from the given position in [fd] (default: 0).
@raise End_of_file Raised if the stream ends before [len] bytes have been read. *)

val readv : ?file_offset:Optint.Int63.t -> fd -> Cstruct.t list -> int
(** [readv] is like {!read_upto} but can read into any cstruct(s),
not just chunks of the pre-shared buffer.
If multiple buffers are given, they are filled in order. *)

val write : ?file_offset:Optint.Int63.t -> fd -> Uring.Region.chunk -> int -> unit
(** [write fd buf len] writes exactly [len] bytes from [buf] to [fd].
It blocks until the OS confirms the write is done,
and resubmits automatically if the OS doesn't write all of it at once. *)

val writev : ?file_offset:Optint.Int63.t -> fd -> Cstruct.t list -> unit
(** [writev] is like {!write} but can write from any cstruct(s),
not just chunks of the pre-shared buffer.
If multiple buffers are given, they are sent in order.
It will make multiple OS calls if the OS doesn't write all of it at once. *)

val writev_single : ?file_offset:Optint.Int63.t -> fd -> Cstruct.t list -> int
(** [writev_single] is like {!writev} but only performs a single write operation.
It returns the number of bytes written, which may be smaller than the requested amount. *)

val splice : fd -> dst:fd -> len:int -> int
(** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst].
@return The number of bytes copied.
@raise End_of_file [src] is at the end of the file.
@raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *)

val connect : fd -> Unix.sockaddr -> unit
(** [connect fd addr] attempts to connect socket [fd] to [addr]. *)

val await_readable : fd -> unit
(** [await_readable fd] blocks until [fd] is readable (or has an error). *)

val await_writable : fd -> unit
(** [await_writable fd] blocks until [fd] is writable (or has an error). *)

val fstat : fd -> Eio.File.Stat.t
(** Like {!Unix.LargeFile.fstat}. *)

val statx : ?fd:fd -> mask:Uring.Statx.Mask.t -> string -> Uring.Statx.t -> Uring.Statx.Flags.t -> unit
(** [statx ?fd ~mask path buf flags] stats [path], which is resolved relative to [fd]
(or the current directory if [fd] is not given).
The results are written to [buf]. *)

val read_dir : fd -> string list
(** [read_dir dir] reads all directory entries from [dir].
The entries are not returned in any particular order
(not even necessarily the order in which Linux returns them). *)

val lseek : fd -> Optint.Int63.t -> [`Set | `Cur | `End] -> Optint.Int63.t
(** Set and/or get the current file position.
Like {!Unix.lseek}. *)

val fsync : fd -> unit
(** Flush file buffers to disk.
Like {!Unix.fsync}. *)

val ftruncate : fd -> Optint.Int63.t -> unit
(** Set the length of a file.
Like {!Unix.ftruncate}. *)

(** {1 Sockets} *)

val accept : sw:Switch.t -> fd -> (fd * Unix.sockaddr)
(** [accept ~sw t] blocks until a new connection is received on listening socket [t].
It returns the new connection and the address of the connecting peer.
The new connection has the close-on-exec flag set automatically.
The new connection is attached to [sw] and will be closed when that finishes, if
not already closed manually by then. *)

val shutdown : fd -> Unix.shutdown_command -> unit
(** Like {!Unix.shutdown}. *)

val send_msg : fd -> ?fds:fd list -> ?dst:Unix.sockaddr -> Cstruct.t list -> int
(** [send_msg socket bufs] is like [writev socket bufs], but also allows setting the destination address
(for unconnected sockets) and attaching FDs (for Unix-domain sockets). *)

val recv_msg : fd -> Cstruct.t list -> Uring.Sockaddr.t * int
(** [recv_msg socket bufs] is like [readv socket bufs] but also returns the address of the sender. *)

val recv_msg_with_fds : sw:Switch.t -> max_fds:int -> fd -> Cstruct.t list -> Uring.Sockaddr.t * int * fd list
(** [recv_msg_with_fds] is like [recv_msg] but also allows receiving up to [max_fds] file descriptors
(sent using SCM_RIGHTS over a Unix domain socket). *)

(** {1 Randomness} *)

val getrandom : Cstruct.t -> unit
(** [getrandom buf] fills [buf] with random bytes.
It uses Linux's [getrandom] call, which is like reading from /dev/urandom
except that it will block (the whole domain) if used at early boot
when the random system hasn't been initialised yet. *)

(** {1 DNS functions} *)

val getaddrinfo : service:string -> string -> Eio.Net.Sockaddr.t list
(** [getaddrinfo ~service host] returns a list of IP addresses for [host]. [host] is either a domain name or
an IP address.
NOTE(review): [service] is presumably a service name or port number, as for [Unix.getaddrinfo] - confirm. *)

(** {1 Processes} *)

module Process : sig
type t
(** A child process. *)

module Fork_action = Eio_unix.Private.Fork_action
(** Setup actions to perform in the child process. *)

val spawn : sw:Switch.t -> Fork_action.t list -> t
(** [spawn ~sw actions] forks a child process, which executes [actions].
The last action should be {!Fork_action.execve}.
You will typically want to do [Promise.await (exit_status child)] after this.
@param sw The child will be sent {!Sys.sigkill} if [sw] finishes. *)

val signal : t -> int -> unit
(** [signal t x] sends signal [x] to [t].
This is similar to doing [Unix.kill t.pid x],
except that it ensures no signal is sent after [t] has been reaped. *)

val pid : t -> int
(** [pid t] returns an integer identifying [t];
NOTE(review): presumably the operating-system process ID of the child - confirm. *)

val exit_status : t -> Unix.process_status Promise.t
(** [exit_status t] is a promise for the process's exit status. *)
end

end
(** Low-level API for using uring directly.
This is an alias for the [Low_level] module, replacing the inline signature
that this commit removes from this interface file. *)
module Low_level = Low_level
21 changes: 8 additions & 13 deletions lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ let with_parent_dir op dir path fn =
Fd.use_exn op parent @@ fun parent ->
fn parent leaf

let statx ?fd ~mask path buf flags =
let statx_raw ?fd ~mask path buf flags =
let res =
match fd with
| None -> Sched.enter "statx" (enqueue_statx (None, path, buf, flags, mask))
Expand All @@ -411,24 +411,25 @@ let statx ?fd ~mask path buf flags =
in
if res <> 0 then raise @@ Err.wrap_fs (Uring.error_of_errno res) "statx" path

(* [statx ~mask ~follow fd path buf] stats [path] relative to the directory handle [fd]
   (one of [Fs], [Cwd] or [FD _]), passing [buf] through to the raw statx call.
   This is a diff view: [statx_confined] is the pre-change name of this function,
   and the old unconfined [statx] is renamed to [statx_raw]; wherever two near-identical
   lines appear in a row below, the first is the old line and the second its replacement. *)
let statx_confined ~mask ~follow fd path buf =
let statx ~mask ~follow fd path buf =
let module X = Uring.Statx in
(* Old: flags were [empty] or [symlink_nofollow], and [empty_path] was added only in the last branch. *)
let flags = if follow then X.Flags.empty else X.Flags.symlink_nofollow in
(* New: [empty_path] is always part of [flags]; [symlink_nofollow] is added when [follow] is false. *)
let flags = if follow then X.Flags.empty_path else X.Flags.(empty_path + symlink_nofollow) in
match fd with
(* [Fs]: call the raw statx with no [~fd] argument. *)
| Fs -> statx ~mask path buf flags
| Fs -> statx_raw ~mask path buf flags
(* New case: an explicit FD with an empty path is passed straight to the raw call.
   NOTE(review): with [empty_path] set this presumably stats the FD itself - confirm. *)
| FD fd when path = "" -> statx_raw ~fd ~mask "" buf flags
(* Not following symlinks: obtain the parent directory and stat the final component [leaf] relative to it. *)
| Cwd | FD _ when not follow ->
with_parent_dir_fd fd path @@ fun parent leaf ->
statx ~mask ~fd:parent leaf buf flags
statx_raw ~mask ~fd:parent leaf buf flags
(* Following symlinks: open the target first (empty path replaced by "."), then stat the opened FD
   using an empty path. *)
| Cwd | FD _ ->
Switch.run ~name:"statx" @@ fun sw ->
let fd = openat ~sw ~seekable:false fd (if path = "" then "." else path)
~access:`R
(* Inside this local open, [path] names the [Uring.Open_flags.path] flag, not the function argument. *)
~flags:Uring.Open_flags.(cloexec + path)
~perm:0
in
statx ~fd ~mask "" buf Uring.Statx.Flags.(flags + empty_path)
statx_raw ~fd ~mask "" buf flags

let mkdir_beneath ~perm dir path =
let mkdir ~perm dir path =
(* [mkdir] is really an operation on [path]'s parent. Get a reference to that first: *)
with_parent_dir "mkdir" dir path @@ fun parent leaf ->
try eio_mkdirat parent leaf perm
Expand Down Expand Up @@ -470,12 +471,6 @@ let accept ~sw fd =
client, client_addr
)

(* [open_dir ~sw dir path] opens [path] relative to [dir] by calling [openat] with
   [~seekable:false], read access ([`R]), the [cloexec] and [directory] open flags, and [perm] 0.
   This helper is deleted by the commit; the same [openat] call now appears inline
   in [read_dir] in eio_linux.ml. *)
let open_dir ~sw dir path =
openat ~sw ~seekable:false dir path
~access:`R
~flags:Uring.Open_flags.(cloexec + directory)
~perm:0

let read_dir fd =
Fd.use_exn "read_dir" fd @@ fun fd ->
let rec read_all acc fd =
Expand Down
Loading

0 comments on commit 5e66181

Please sign in to comment.