Skip to content

Commit

Permalink
Make GitFileSystemObjectSink multi-threaded
Browse files Browse the repository at this point in the history
  • Loading branch information
edolstra committed Dec 17, 2024
1 parent 00f08de commit ae72c4a
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 158 deletions.
312 changes: 158 additions & 154 deletions src/libfetchers/git-utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "users.hh"
#include "fs-sink.hh"
#include "sync.hh"
#include "thread-pool.hh"

#include <git2/attr.h>
#include <git2/blob.h>
Expand Down Expand Up @@ -256,6 +257,7 @@ struct GitRepoImpl : GitRepo, std::enable_shared_from_this<GitRepoImpl>
if (git_repository_open(Setter(repo), path.string().c_str()))
throw Error("opening Git repository %s: %s", path, git_error_last()->message);

#if 0
ObjectDb odb;
if (git_repository_odb(Setter(odb), repo.get()))
throw Error("getting Git object database: %s", git_error_last()->message);
Expand All @@ -266,6 +268,7 @@ struct GitRepoImpl : GitRepo, std::enable_shared_from_this<GitRepoImpl>

if (git_odb_add_backend(odb.get(), mempack_backend, 999))
throw Error("adding mempack backend to Git object database: %s", git_error_last()->message);
#endif
}

operator git_repository * ()
Expand Down Expand Up @@ -977,216 +980,217 @@ struct GitFileSystemObjectSinkImpl : GitFileSystemObjectSink
{
ref<GitRepoImpl> repo;

struct PendingDir
{
std::string name;
TreeBuilder builder;
};
ThreadPool workers;

std::vector<PendingDir> pendingDirs;
GitFileSystemObjectSinkImpl(ref<GitRepoImpl> repo) : repo(repo)
{ }

struct Directory;

void pushBuilder(std::string name)
struct Directory
{
const git_tree_entry * entry;
Tree prevTree = nullptr;
#if 0
Directory * parent = nullptr; // FIXME: remove
std::string name;
#endif
using Child = std::pair<git_filemode_t, std::variant<Directory, git_oid>>;
std::map<std::string, Child> children;
std::optional<git_oid> oid;

if (!pendingDirs.empty() &&
(entry = git_treebuilder_get(pendingDirs.back().builder.get(), name.c_str())))
#if 0
CanonPath toPath() const
{
/* Clone a tree that we've already finished. This happens
if a tarball has directory entries that are not
contiguous. */
if (git_tree_entry_type(entry) != GIT_OBJECT_TREE)
throw Error("parent of '%s' is not a directory", name);

if (git_tree_entry_to_object((git_object * *) (git_tree * *) Setter(prevTree), *repo, entry))
throw Error("looking up parent of '%s': %s", name, git_error_last()->message);
if (!parent) return CanonPath::root;
auto res = parent->toPath();
res.push(name);
return res;
}
#endif

git_treebuilder * b;
if (git_treebuilder_new(&b, *repo, prevTree.get()))
throw Error("creating a tree builder: %s", git_error_last()->message);
pendingDirs.push_back({ .name = std::move(name), .builder = TreeBuilder(b) });
};

GitFileSystemObjectSinkImpl(ref<GitRepoImpl> repo) : repo(repo)
{
pushBuilder("");
}
Child & lookup(const CanonPath & path)
{
assert(!path.isRoot());
auto parent = path.parent();
auto cur = this;
for (auto & name : *parent) {
auto i = cur->children.find(std::string(name));
if (i == cur->children.end())
throw Error("path '%s' does not exist", path);
auto dir = std::get_if<Directory>(&i->second.second);
if (!dir)
throw Error("path '%s' has a non-directory parent", path);
cur = dir;
}

std::pair<git_oid, std::string> popBuilder()
{
assert(!pendingDirs.empty());
auto pending = std::move(pendingDirs.back());
git_oid oid;
if (git_treebuilder_write(&oid, pending.builder.get()))
throw Error("creating a tree object: %s", git_error_last()->message);
pendingDirs.pop_back();
return {oid, pending.name};
auto i = cur->children.find(std::string(*path.baseName()));
if (i == cur->children.end())
throw Error("path '%s' does not exist", path);
return i->second;
}
};

void addToTree(const std::string & name, const git_oid & oid, git_filemode_t mode)
struct State
{
assert(!pendingDirs.empty());
auto & pending = pendingDirs.back();
if (git_treebuilder_insert(nullptr, pending.builder.get(), name.c_str(), &oid, mode))
throw Error("adding a file to a tree builder: %s", git_error_last()->message);
Directory root;
};

void updateBuilders(std::span<const std::string> names)
{
// Find the common prefix of pendingDirs and names.
size_t prefixLen = 0;
for (; prefixLen < names.size() && prefixLen + 1 < pendingDirs.size(); ++prefixLen)
if (names[prefixLen] != pendingDirs[prefixLen + 1].name)
break;
Sync<State> _state;

// Finish the builders that are not part of the common prefix.
for (auto n = pendingDirs.size(); n > prefixLen + 1; --n) {
auto [oid, name] = popBuilder();
addToTree(name, oid, GIT_FILEMODE_TREE);
}

// Create builders for the new directories.
for (auto n = prefixLen; n < names.size(); ++n)
pushBuilder(names[n]);
};

bool prepareDirs(const std::vector<std::string> & pathComponents, bool isDir)
void addNode(State & state, const CanonPath & path, Directory::Child && child)
{
std::span<const std::string> pathComponents2{pathComponents};
assert(!path.isRoot());
auto parent = path.parent();

updateBuilders(
isDir
? pathComponents2
: pathComponents2.first(pathComponents2.size() - 1));
Directory * cur = &state.root;

for (auto & i : *parent) {
auto child = std::get_if<Directory>(&cur->children.emplace(
std::string(i),
Directory::Child{GIT_FILEMODE_TREE, {Directory()}}).first->second.second);
assert(child);
#if 0
child->parent = cur;
child->name = i;
#endif
cur = child;
}

return true;
// FIXME: handle conflicts
cur->children.emplace(std::string(*path.baseName()), std::move(child));
}

void createRegularFile(
const CanonPath & path,
std::function<void(CreateRegularFileSink &)> func) override
{
auto pathComponents = tokenizeString<std::vector<std::string>>(path.rel(), "/");
if (!prepareDirs(pathComponents, false)) return;

git_writestream * stream = nullptr;
if (git_blob_create_from_stream(&stream, *repo, nullptr))
throw Error("creating a blob stream object: %s", git_error_last()->message);

struct CRF : CreateRegularFileSink {
const CanonPath & path;
GitFileSystemObjectSinkImpl & back;
git_writestream * stream;
std::string data;
bool executable = false;
CRF(const CanonPath & path, GitFileSystemObjectSinkImpl & back, git_writestream * stream)
: path(path), back(back), stream(stream)
{}

void operator () (std::string_view data) override
{
if (stream->write(stream, data.data(), data.size()))
throw Error("writing a blob for tarball member '%s': %s", path, git_error_last()->message);
this->data += data;
}

void isExecutable() override
{
executable = true;
}
} crf { path, *this, stream };
func(crf);
} crf;

git_oid oid;
if (git_blob_create_from_stream_commit(&oid, stream))
throw Error("creating a blob object for tarball member '%s': %s", path, git_error_last()->message);
func(crf);

addToTree(*pathComponents.rbegin(), oid,
crf.executable
? GIT_FILEMODE_BLOB_EXECUTABLE
: GIT_FILEMODE_BLOB);
workers.enqueue([this, path, data{std::move(crf.data)}, executable(crf.executable)]()
{
// FIXME: leak
git_writestream * stream = nullptr;
if (git_blob_create_from_stream(&stream, *repo, nullptr))
throw Error("creating a blob stream object: %s", git_error_last()->message);

if (stream->write(stream, data.data(), data.size()))
throw Error("writing a blob for tarball member '%s': %s", path, git_error_last()->message);

git_oid oid;
if (git_blob_create_from_stream_commit(&oid, stream))
throw Error("creating a blob object for tarball member '%s': %s", path, git_error_last()->message);

auto state(_state.lock());
addNode(*state, path,
Directory::Child{
executable
? GIT_FILEMODE_BLOB_EXECUTABLE
: GIT_FILEMODE_BLOB,
oid});
});
}

void createDirectory(const CanonPath & path) override
{
auto pathComponents = tokenizeString<std::vector<std::string>>(path.rel(), "/");
(void) prepareDirs(pathComponents, true);
if (path.isRoot()) return;
auto state(_state.lock());
addNode(*state, path, {GIT_FILEMODE_TREE, Directory()});
}

void createSymlink(const CanonPath & path, const std::string & target) override
{
auto pathComponents = tokenizeString<std::vector<std::string>>(path.rel(), "/");
if (!prepareDirs(pathComponents, false)) return;

git_oid oid;
if (git_blob_create_from_buffer(&oid, *repo, target.c_str(), target.size()))
throw Error("creating a blob object for tarball symlink member '%s': %s", path, git_error_last()->message);
workers.enqueue([this, path, target]()
{
git_oid oid;
if (git_blob_create_from_buffer(&oid, *repo, target.c_str(), target.size()))
throw Error("creating a blob object for tarball symlink member '%s': %s", path, git_error_last()->message);

addToTree(*pathComponents.rbegin(), oid, GIT_FILEMODE_LINK);
auto state(_state.lock());
addNode(*state, path, Directory::Child{GIT_FILEMODE_LINK, oid});
});
}

std::map<CanonPath, CanonPath> hardLinks;

void createHardlink(const CanonPath & path, const CanonPath & target) override
{
std::vector<std::string> pathComponents;
for (auto & c : path)
pathComponents.emplace_back(c);

if (!prepareDirs(pathComponents, false)) return;

// We can't just look up the path from the start of the root, since
// some parent directories may not have finished yet, so we compute
// a relative path that helps us find the right git_tree_builder or object.
auto relTarget = CanonPath(path).parent()->makeRelative(target);

auto dir = pendingDirs.rbegin();

// For each ../ component at the start, go up one directory.
// CanonPath::makeRelative() always puts all .. elements at the start,
// so they're all handled by this loop:
std::string_view relTargetLeft(relTarget);
while (hasPrefix(relTargetLeft, "../")) {
if (dir == pendingDirs.rend())
throw Error("invalid hard link target '%s' for path '%s'", target, path);
++dir;
relTargetLeft = relTargetLeft.substr(3);
}
if (dir == pendingDirs.rend())
throw Error("invalid hard link target '%s' for path '%s'", target, path);

// Look up the remainder of the target, starting at the
// top-most `git_treebuilder`.
std::variant<git_treebuilder *, git_oid> curDir{dir->builder.get()};
Object tree; // needed to keep `entry` alive
const git_tree_entry * entry = nullptr;

for (auto & c : CanonPath(relTargetLeft)) {
if (auto builder = std::get_if<git_treebuilder *>(&curDir)) {
assert(*builder);
if (!(entry = git_treebuilder_get(*builder, std::string(c).c_str())))
throw Error("cannot find hard link target '%s' for path '%s'", target, path);
curDir = *git_tree_entry_id(entry);
} else if (auto oid = std::get_if<git_oid>(&curDir)) {
tree = lookupObject(*repo, *oid, GIT_OBJECT_TREE);
if (!(entry = git_tree_entry_byname((const git_tree *) &*tree, std::string(c).c_str())))
throw Error("cannot find hard link target '%s' for path '%s'", target, path);
curDir = *git_tree_entry_id(entry);
hardLinks.insert_or_assign(path, target);
}

Hash flush() override
{
workers.process();

/* Create hard links. */
{
auto state(_state.lock());
for (auto & [path, target] : hardLinks) {
if (target.isRoot()) continue;
auto [mode, child] = state->root.lookup(target);
auto oid = std::get_if<git_oid>(&child);
if (!oid)
throw Error("cannot create a hard link from '%s' to directory '%s'", path, target);
addNode(*state, path, {mode, *oid});
}
}

assert(entry);
ThreadPool workers2;

addToTree(*pathComponents.rbegin(),
*git_tree_entry_id(entry),
git_tree_entry_filemode(entry));
}
auto & root = _state.lock()->root;

Hash flush() override
{
updateBuilders({});
processGraph<Directory *>(
workers2,
{&root},
[&](Directory * const & node) -> std::set<Directory *>
{
std::set<Directory *> edges;
for (auto & child : node->children)
if (auto dir = std::get_if<Directory>(&child.second.second))
edges.insert(dir);
return edges;
},
[&](Directory * const & node)
{
//auto state(_state.lock());

git_treebuilder * b;
if (git_treebuilder_new(&b, *repo, nullptr))
throw Error("creating a tree builder: %s", git_error_last()->message);
TreeBuilder builder(b);

for (auto & [name, child] : node->children) {
auto oid_p = std::get_if<git_oid>(&child.second);
auto oid = oid_p ? *oid_p : std::get<Directory>(child.second).oid.value();
if (git_treebuilder_insert(nullptr, builder.get(), name.c_str(), &oid, child.first))
throw Error("adding a file to a tree builder: %s", git_error_last()->message);
}

auto [oid, _name] = popBuilder();
git_oid oid;
if (git_treebuilder_write(&oid, builder.get()))
throw Error("creating a tree object: %s", git_error_last()->message);
node->oid = oid;
},
true);

#if 0
repo->flush();
#endif

return toHash(oid);
return toHash(root.oid.value());
}
};

Expand Down
Loading

0 comments on commit ae72c4a

Please sign in to comment.