Skip to content

Commit

Permalink
Make Repo::flush interruptible
Browse files Browse the repository at this point in the history
  • Loading branch information
roberth committed Aug 28, 2024
1 parent f015c5b commit a5804e2
Showing 1 changed file with 87 additions and 30 deletions.
117 changes: 87 additions & 30 deletions src/libfetchers/git-utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,46 @@ static Object peelToTreeOrBlob(git_object * obj)
return peelObject<Object>(obj, GIT_OBJECT_TREE);
}

struct PackBuilderContext {
std::exception_ptr exception;

void handleException(const char * activity, int errCode)
{
switch (errCode) {
case GIT_OK:
break;
case GIT_EUSER:
if (!exception)
panic("PackBuilderContext::handleException: user error, but exception not set");

std::rethrow_exception(exception);
panic("PackBuilderContext::handleException: exception not thrown");
default:
throw Error("%s: %i, %s", Uncolored(activity), errCode, git_error_last()->message);
}
}
};

extern "C" {

/**
* A `git_packbuilder_progress` implementation that aborts the pack building if needed.
*/
static int packBuilderProgressCheckInterrupt(int stage, uint32_t current, uint32_t total, void *payload)
{
PackBuilderContext & args = * (PackBuilderContext *) payload;
try {
checkInterrupt();
return GIT_OK;
} catch (const std::exception & e) {
args.exception = std::current_exception();
return GIT_EUSER;
}
};
static git_packbuilder_progress PACKBUILDER_PROGRESS_CHECK_INTERRUPT = &packBuilderProgressCheckInterrupt;

} // extern "C"

struct GitRepoImpl : GitRepo, std::enable_shared_from_this<GitRepoImpl>
{
/** Location of the repository on disk. */
Expand Down Expand Up @@ -213,42 +253,59 @@ struct GitRepoImpl : GitRepo, std::enable_shared_from_this<GitRepoImpl>
}

void flush() override {
checkInterrupt();

git_buf buf = GIT_BUF_INIT;
try {
PackBuilder packBuilder;
git_packbuilder_new(Setter(packBuilder), *this);
checkInterrupt();
git_mempack_write_thin_pack(mempack_backend, packBuilder.get());
checkInterrupt();
// TODO make git_packbuilder_write_buf() interruptible
git_packbuilder_write_buf(&buf, packBuilder.get());
checkInterrupt();
Finally _disposeBuf { [&] { git_buf_dispose(&buf); } };
PackBuilder packBuilder;
PackBuilderContext packBuilderContext;
git_packbuilder_new(Setter(packBuilder), *this);
git_packbuilder_set_callbacks(packBuilder.get(), PACKBUILDER_PROGRESS_CHECK_INTERRUPT, &packBuilderContext);
git_packbuilder_set_threads(packBuilder.get(), 0 /* autodetect */);

// TODO: For an optimal pack it's mandatory to insert objects in recency order, commits followed by trees and blobs.
packBuilderContext.handleException(
"preparing packfile",
git_mempack_write_thin_pack(mempack_backend, packBuilder.get())
);
checkInterrupt();
packBuilderContext.handleException(
"writing packfile",
git_packbuilder_write_buf(&buf, packBuilder.get())
);
checkInterrupt();

std::string repo_path = std::string(git_repository_path(repo.get()));
while (!repo_path.empty() && repo_path.back() == '/')
repo_path.pop_back();
std::string pack_dir_path = repo_path + "/objects/pack";

// TODO: could the indexing be done in a separate thread?
Indexer indexer;
git_indexer_progress stats;
if (git_indexer_new(Setter(indexer), pack_dir_path.c_str(), 0, nullptr, nullptr))
throw Error("creating git packfile indexer: %s", git_error_last()->message);
// TODO: feed buf in (fairly large) chunk to make this interruptible
if (git_indexer_append(indexer.get(), buf.ptr, buf.size, &stats))
std::string repo_path = std::string(git_repository_path(repo.get()));
while (!repo_path.empty() && repo_path.back() == '/')
repo_path.pop_back();
std::string pack_dir_path = repo_path + "/objects/pack";

// TODO (performance): could the indexing be done in a separate thread?
// we'd need a more streaming variation of
// git_packbuilder_write_buf, or incur the cost of
// copying parts of the buffer to a separate thread.
// (synchronously on the git_packbuilder_write_buf thread)
Indexer indexer;
git_indexer_progress stats;
if (git_indexer_new(Setter(indexer), pack_dir_path.c_str(), 0, nullptr, nullptr))
throw Error("creating git packfile indexer: %s", git_error_last()->message);

// TODO: provide index callback for checkInterrupt() termination
// though this is about an order of magnitude faster than the packbuilder
// expect up to 1 sec latency due to uninterruptible git_indexer_append.
constexpr size_t chunkSize = 128 * 1024;
for (size_t offset = 0; offset < buf.size; offset += chunkSize) {
if (git_indexer_append(indexer.get(), buf.ptr + offset, std::min(chunkSize, buf.size - offset), &stats))
throw Error("appending to git packfile index: %s", git_error_last()->message);
checkInterrupt();
if (git_indexer_commit(indexer.get(), &stats))
throw Error("committing git packfile index: %s", git_error_last()->message);
}

if (git_mempack_reset(mempack_backend))
throw Error("resetting git mempack backend: %s", git_error_last()->message);
if (git_indexer_commit(indexer.get(), &stats))
throw Error("committing git packfile index: %s", git_error_last()->message);

if (git_mempack_reset(mempack_backend))
throw Error("resetting git mempack backend: %s", git_error_last()->message);

git_buf_dispose(&buf);
} catch (...) {
git_buf_dispose(&buf);
throw;
}
checkInterrupt();
}

Expand Down

0 comments on commit a5804e2

Please sign in to comment.