Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental Eio-port #113

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
_build
_opam
.vscode

tar.*
lwt_eio.*
eioio
4 changes: 3 additions & 1 deletion dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
(name main)
(package obuilder)
(preprocess (pps ppx_deriving.show))
(libraries lwt lwt.unix fmt fmt.cli fmt.tty tar-unix obuilder cmdliner logs.fmt logs.cli))
(libraries eio_main fmt fmt.cli fmt.tty tar-unix obuilder cmdliner logs.fmt logs.cli))

(vendored_dirs lwt_eio.0.2 eioio tar.2.0.1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's called eio now. You might want to rename your fork.

134 changes: 67 additions & 67 deletions lib/btrfs_store.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
open Lwt.Infix
open Eio

let strf = Printf.sprintf

Expand All @@ -11,48 +11,50 @@ let running_as_root = Unix.getuid () = 0
since being cloned. The counter starts from zero when the in-memory cache
value is created (i.e. you cannot compare across restarts). *)
type cache = {
lock : Lwt_mutex.t;
lock : Eio.Mutex.t;
mutable gen : int;
}

type t = {
root : string; (* The top-level directory (containing `result`, etc). *)
process : Process.t;
caches : (string, cache) Hashtbl.t;
mutable next : int; (* Used to generate unique temporary IDs. *)
}

let ( / ) = Filename.concat

module Btrfs = struct
let btrfs ?(sudo=false) args =
let btrfs ?(sudo=false) t args =
let args = "btrfs" :: args in
let args = if sudo && not running_as_root then "sudo" :: args else args in
Os.exec ~stdout:`Dev_null args
Switch.run @@ fun sw ->
Os.exec ~sw ~process:t.process args
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Os.exec shouldn't take a switch, because when it returns the process has already finished.


let subvolume_create path =
let subvolume_create t path =
assert (not (Sys.file_exists path));
btrfs ["subvolume"; "create"; "--"; path]
btrfs t ["subvolume"; "create"; "--"; path]

let subvolume_delete path =
btrfs ~sudo:true ["subvolume"; "delete"; "--"; path]
let subvolume_delete t path =
btrfs t ~sudo:true ["subvolume"; "delete"; "--"; path]

let subvolume_sync path =
btrfs ~sudo:true ["subvolume"; "sync"; "--"; path]
let subvolume_sync t path =
btrfs ~sudo:true t ["subvolume"; "sync"; "--"; path]

let subvolume_snapshot mode ~src dst =
let subvolume_snapshot mode t ~src dst =
assert (not (Sys.file_exists dst));
let readonly =
match mode with
| `RO -> ["-r"]
| `RW -> []
in
btrfs ~sudo:true (["subvolume"; "snapshot"] @ readonly @ ["--"; src; dst])
btrfs ~sudo:true t (["subvolume"; "snapshot"] @ readonly @ ["--"; src; dst])
end

let delete_snapshot_if_exists path =
let delete_snapshot_if_exists t path =
match Os.check_dir path with
| `Missing -> Lwt.return_unit
| `Present -> Btrfs.subvolume_delete path
| `Missing -> ()
| `Present -> Btrfs.subvolume_delete t path

module Path = struct
(* A btrfs store contains several subdirectories:
Expand All @@ -73,24 +75,24 @@ module Path = struct
end

let delete t id =
delete_snapshot_if_exists (Path.result t id)
delete_snapshot_if_exists t (Path.result t id)

let purge path =
Sys.readdir path |> Array.to_list |> Lwt_list.iter_s (fun item ->
let purge t path =
Sys.readdir path |> Array.to_list |> Fiber.iter (fun item ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: the direct translation of Lwt_list.iter_s is List.iter.

let item = path / item in
Log.warn (fun f -> f "Removing left-over temporary item %S" item);
Btrfs.subvolume_delete item
Btrfs.subvolume_delete t item
)

let check_kernel_version () =
Os.pread ["uname"; "-r"] >>= fun kver ->
let check_kernel_version process =
let kver = Switch.run @@ fun sw -> Os.pread ~sw ~process ["uname"; "-r"] in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't have a switch here either.

match String.split_on_char '.' kver with
| maj :: min :: _ ->
begin match int_of_string_opt maj, int_of_string_opt min with
| Some maj, Some min when (maj, min) >= (5, 8) ->
Lwt.return_unit
()
| Some maj, Some min ->
Lwt.fail_with
failwith
(Fmt.str
"You need at least linux 5.8 to use the btrfs backend, \
but current kernel version is '%d.%d'"
Expand All @@ -101,41 +103,38 @@ let check_kernel_version () =
| _ ->
Fmt.failwith "Could not parse output of 'uname -r' (%S)" kver

let create root =
check_kernel_version () >>= fun () ->
let create process root =
check_kernel_version process;
Os.ensure_dir (root / "result");
Os.ensure_dir (root / "result-tmp");
Os.ensure_dir (root / "state");
Os.ensure_dir (root / "cache");
Os.ensure_dir (root / "cache-tmp");
purge (root / "result-tmp") >>= fun () ->
purge (root / "cache-tmp") >>= fun () ->
Lwt.return { root; caches = Hashtbl.create 10; next = 0 }
let t = { root; process; caches = Hashtbl.create 10; next = 0 } in
purge t (root / "result-tmp");
purge t (root / "cache-tmp");
t

let build t ?base ~id fn =
let result = Path.result t id in
let result_tmp = Path.result_tmp t id in
assert (not (Sys.file_exists result)); (* Builder should have checked first *)
begin match base with
| None -> Btrfs.subvolume_create result_tmp
| Some base -> Btrfs.subvolume_snapshot `RW ~src:(Path.result t base) result_tmp
end
>>= fun () ->
Lwt.try_bind
(fun () -> fn result_tmp)
(fun r ->
begin match r with
| Ok () -> Btrfs.subvolume_snapshot `RO ~src:result_tmp result
| Error _ -> Lwt.return_unit
end >>= fun () ->
Btrfs.subvolume_delete result_tmp >>= fun () ->
Lwt.return r
)
(fun ex ->
Log.warn (fun f -> f "Uncaught exception from %S build function: %a" id Fmt.exn ex);
Btrfs.subvolume_delete result_tmp >>= fun () ->
Lwt.fail ex
)
| None -> Btrfs.subvolume_create t result_tmp
| Some base -> Btrfs.subvolume_snapshot `RW t ~src:(Path.result t base) result_tmp
end;
try
let r = fn result_tmp in
begin match r with
| Ok () -> Btrfs.subvolume_snapshot `RO t ~src:result_tmp result
| Error _ -> ()
end;
Btrfs.subvolume_delete t result_tmp;
r
with ex ->
Log.warn (fun f -> f "Uncaught exception from %S build function: %a" id Fmt.exn ex);
Btrfs.subvolume_delete t result_tmp;
raise ex

let result t id =
let dir = Path.result t id in
Expand All @@ -147,52 +146,53 @@ let get_cache t name =
match Hashtbl.find_opt t.caches name with
| Some c -> c
| None ->
let c = { lock = Lwt_mutex.create (); gen = 0 } in
let c = { lock = Mutex.create (); gen = 0 } in
Hashtbl.add t.caches name c;
c

let cache ~user t name : (string * (unit -> unit Lwt.t)) Lwt.t =
let cache ~user t name : (string * (unit -> unit)) =
let cache = get_cache t name in
Lwt_mutex.with_lock cache.lock @@ fun () ->
Mutex.use_ro cache.lock @@ fun () ->
let tmp = Path.cache_tmp t t.next name in
t.next <- t.next + 1;
let snapshot = Path.cache t name in
(* Create cache if it doesn't already exist. *)
begin match Os.check_dir snapshot with
| `Missing -> Btrfs.subvolume_create snapshot
| `Present -> Lwt.return_unit
end >>= fun () ->
| `Missing -> Btrfs.subvolume_create t snapshot
| `Present -> ()
end;
(* Create writeable clone. *)
let gen = cache.gen in
Btrfs.subvolume_snapshot `RW ~src:snapshot tmp >>= fun () ->
Btrfs.subvolume_snapshot `RW t ~src:snapshot tmp;
let { Obuilder_spec.uid; gid } = user in
Os.sudo ["chown"; Printf.sprintf "%d:%d" uid gid; tmp] >>= fun () ->
Switch.run @@ fun sw ->
Os.sudo ~sw ~process:t.process ["chown"; Printf.sprintf "%d:%d" uid gid; tmp];
let release () =
Lwt_mutex.with_lock cache.lock @@ fun () ->
Mutex.use_ro cache.lock @@ fun () ->
begin
if cache.gen = gen then (
(* The cache hasn't changed since we cloned it. Update it. *)
(* todo: check if it has actually changed. *)
cache.gen <- cache.gen + 1;
Btrfs.subvolume_delete snapshot >>= fun () ->
Btrfs.subvolume_snapshot `RO ~src:tmp snapshot
) else Lwt.return_unit
end >>= fun () ->
Btrfs.subvolume_delete tmp
Btrfs.subvolume_delete t snapshot;
Btrfs.subvolume_snapshot `RO t ~src:tmp snapshot
) else ()
end;
Btrfs.subvolume_delete t tmp
in
Lwt.return (tmp, release)
(tmp, release)

let delete_cache t name =
let cache = get_cache t name in
Lwt_mutex.with_lock cache.lock @@ fun () ->
Mutex.use_ro cache.lock @@ fun () ->
cache.gen <- cache.gen + 1; (* Ensures in-progress writes will be discarded *)
let snapshot = Path.cache t name in
if Sys.file_exists snapshot then (
Btrfs.subvolume_delete snapshot >>= fun () ->
Lwt_result.return ()
) else Lwt_result.return ()
Btrfs.subvolume_delete t snapshot;
Ok ()
) else Ok ()

let state_dir = Path.state

let complete_deletes t =
Btrfs.subvolume_sync t.root
Btrfs.subvolume_sync t t.root
2 changes: 1 addition & 1 deletion lib/btrfs_store.mli
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

include S.STORE

val create : string -> t Lwt.t
val create : Eio.Process.t -> string -> t
(** [create path] is a new store in btrfs directory [path]. *)
Loading