diff --git a/flake.lock b/flake.lock
index 897785ed0..1979a3d97 100644
--- a/flake.lock
+++ b/flake.lock
@@ -1,5 +1,26 @@
 {
   "nodes": {
+    "flake-parts": {
+      "inputs": {
+        "nixpkgs-lib": [
+          "nix-eval-jobs",
+          "nixpkgs"
+        ]
+      },
+      "locked": {
+        "lastModified": 1722555600,
+        "narHash": "sha256-XOQkdLafnb/p9ij77byFQjDf5m5QYl9b2REiVClC+x4=",
+        "owner": "hercules-ci",
+        "repo": "flake-parts",
+        "rev": "8471fe90ad337a8074e957b69ca4d0089218391d",
+        "type": "github"
+      },
+      "original": {
+        "owner": "hercules-ci",
+        "repo": "flake-parts",
+        "type": "github"
+      }
+    },
     "libgit2": {
       "flake": false,
       "locked": {
@@ -46,6 +67,30 @@
         "type": "github"
       }
     },
+    "nix-eval-jobs": {
+      "inputs": {
+        "flake-parts": "flake-parts",
+        "nix-github-actions": [],
+        "nixpkgs": [
+          "nixpkgs"
+        ],
+        "treefmt-nix": "treefmt-nix"
+      },
+      "locked": {
+        "lastModified": 1732550783,
+        "narHash": "sha256-HvIOV9Gf82Sc+WaQy7Wi9h8d5LXDkGwhbRUOmvGfEZM=",
+        "owner": "Ericson2314",
+        "repo": "nix-eval-jobs",
+        "rev": "bd5e93d30f87c936199af31d2448eb57b2357d57",
+        "type": "github"
+      },
+      "original": {
+        "owner": "Ericson2314",
+        "ref": "aggregate-jobs",
+        "repo": "nix-eval-jobs",
+        "type": "github"
+      }
+    },
     "nixpkgs": {
       "locked": {
         "lastModified": 1726688310,
@@ -66,8 +111,30 @@
       "inputs": {
         "libgit2": "libgit2",
         "nix": "nix",
+        "nix-eval-jobs": "nix-eval-jobs",
         "nixpkgs": "nixpkgs"
       }
+    },
+    "treefmt-nix": {
+      "inputs": {
+        "nixpkgs": [
+          "nix-eval-jobs",
+          "nixpkgs"
+        ]
+      },
+      "locked": {
+        "lastModified": 1723303070,
+        "narHash": "sha256-krGNVA30yptyRonohQ+i9cnK+CfCpedg6z3qzqVJcTs=",
+        "owner": "numtide",
+        "repo": "treefmt-nix",
+        "rev": "14c092e0326de759e16b37535161b3cb9770cea3",
+        "type": "github"
+      },
+      "original": {
+        "owner": "numtide",
+        "repo": "treefmt-nix",
+        "type": "github"
+      }
     }
   },
   "root": "root",
diff --git a/flake.nix b/flake.nix
index fccd45b96..048800c5b 100644
--- a/flake.nix
+++ b/flake.nix
@@ -8,6 +8,9 @@
   inputs.nix.inputs.nixpkgs.follows = "nixpkgs";
   inputs.nix.inputs.libgit2.follows = "libgit2";
 
+  inputs.nix-eval-jobs.url = "github:Ericson2314/nix-eval-jobs/aggregate-jobs";
+  inputs.nix-eval-jobs.inputs.nixpkgs.follows = "nixpkgs";
+
   # hide nix dev tooling from our lock file
   inputs.nix.inputs.flake-parts.follows = "";
   inputs.nix.inputs.git-hooks-nix.follows = "";
@@ -15,7 +18,10 @@
   inputs.nix.inputs.nixpkgs-23-11.follows = "";
   inputs.nix.inputs.flake-compat.follows = "";
 
-  outputs = { self, nixpkgs, nix, ... }:
+  # hide nix-eval-jobs dev tooling from our lock file
+  inputs.nix-eval-jobs.inputs.nix-github-actions.follows = "";
+
+  outputs = { self, nixpkgs, nix, nix-eval-jobs, ... }:
     let
       systems = [ "x86_64-linux" "aarch64-linux" ];
       forEachSystem = nixpkgs.lib.genAttrs systems;
@@ -26,6 +32,7 @@
       overlays.default = final: prev: {
         hydra = final.callPackage ./package.nix {
           inherit (nixpkgs.lib) fileset;
+          nix-eval-jobs = nix-eval-jobs.packages.${final.system}.default;
           rawSrc = self;
           nix-perl-bindings = final.nixComponents.nix-perl-bindings;
         };
@@ -69,6 +76,7 @@
       packages = forEachSystem (system: {
         hydra = nixpkgs.legacyPackages.${system}.callPackage ./package.nix {
           inherit (nixpkgs.lib) fileset;
+          nix-eval-jobs = nix-eval-jobs.packages.${system}.default;
           rawSrc = self;
           nix = nix.packages.${system}.nix;
           nix-perl-bindings = nix.hydraJobs.perlBindings.${system};
diff --git a/package.nix b/package.nix
index f944fe2b6..a9ec12c85 100644
--- a/package.nix
+++ b/package.nix
@@ -50,6 +50,7 @@
 , xz
 , gnutar
 , gnused
+, nix-eval-jobs
 
 , rpm
 , dpkg
@@ -190,6 +191,7 @@ stdenv.mkDerivation (finalAttrs: {
     openldap
     postgresql_13
     pixz
+    nix-eval-jobs
   ];
 
   checkInputs = [
@@ -218,6 +220,7 @@ stdenv.mkDerivation (finalAttrs: {
       darcs
       gnused
       breezy
+      nix-eval-jobs
     ] ++ lib.optionals stdenv.isLinux [ rpm dpkg cdrkit ]
   );
@@ -232,7 +235,7 @@ stdenv.mkDerivation (finalAttrs: {
 
   shellHook = ''
     pushd $(git rev-parse --show-toplevel) >/dev/null
-    PATH=$(pwd)/src/hydra-evaluator:$(pwd)/src/script:$(pwd)/src/hydra-eval-jobs:$(pwd)/src/hydra-queue-runner:$PATH
+    PATH=$(pwd)/src/hydra-evaluator:$(pwd)/src/script:$(pwd)/src/hydra-queue-runner:$PATH
     PERL5LIB=$(pwd)/src/lib:$PERL5LIB
     export HYDRA_HOME="$(pwd)/src/"
     mkdir -p .hydra-data
diff --git a/src/hydra-eval-jobs/hydra-eval-jobs.cc b/src/hydra-eval-jobs/hydra-eval-jobs.cc
deleted file mode 100644
index b83cae916..000000000
--- a/src/hydra-eval-jobs/hydra-eval-jobs.cc
+++ /dev/null
@@ -1,587 +0,0 @@
-#include <iostream>
-#include <thread>
-#include <optional>
-#include <unordered_map>
-
-#include "shared.hh"
-#include "store-api.hh"
-#include "eval.hh"
-#include "eval-gc.hh"
-#include "eval-inline.hh"
-#include "eval-settings.hh"
-#include "signals.hh"
-#include "terminal.hh"
-#include "util.hh"
-#include "get-drvs.hh"
-#include "globals.hh"
-#include "common-eval-args.hh"
-#include "flake/flakeref.hh"
-#include "flake/flake.hh"
-#include "attr-path.hh"
-#include "derivations.hh"
-#include "local-fs-store.hh"
-
-#include "hydra-config.hh"
-
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <sys/resource.h>
-
-#include <nlohmann/json.hpp>
-
-void check_pid_status_nonblocking(pid_t check_pid)
-{
-    // Only check 'initialized' and known PID's
-    if (check_pid <= 0) { return; }
-
-    int wstatus = 0;
-    pid_t pid = waitpid(check_pid, &wstatus, WNOHANG);
-    // -1 = failure, WNOHANG: 0 = no change
-    if (pid <= 0) { return; }
-
-    std::cerr << "child process (" << pid << ") ";
-
-    if (WIFEXITED(wstatus)) {
-        std::cerr << "exited with status=" << WEXITSTATUS(wstatus) << std::endl;
-    } else if (WIFSIGNALED(wstatus)) {
-        std::cerr << "killed by signal=" << WTERMSIG(wstatus) << std::endl;
-    } else if (WIFSTOPPED(wstatus)) {
-        std::cerr << "stopped by signal=" << WSTOPSIG(wstatus) << std::endl;
-    } else if (WIFCONTINUED(wstatus)) {
-        std::cerr << "continued" << std::endl;
-    }
-}
-
-using namespace nix;
-
-static Path gcRootsDir;
-static size_t maxMemorySize;
-
-struct MyArgs : MixEvalArgs, MixCommonArgs, RootArgs
-{
-    Path releaseExpr;
-    bool flake = false;
-    bool dryRun = false;
-
-    MyArgs() : MixCommonArgs("hydra-eval-jobs")
-    {
-        addFlag({
-            .longName = "gc-roots-dir",
-            .description = "garbage collector roots directory",
-            .labels = {"path"},
-            .handler = {&gcRootsDir}
-        });
-
-        addFlag({
-            .longName = "dry-run",
-            .description = "don't create store derivations",
-            .handler = {&dryRun, true}
-        });
-
-        addFlag({
-            .longName = "flake",
-            .description = "build a flake",
-            .handler = {&flake, true}
-        });
-
-        expectArg("expr", &releaseExpr);
-    }
-};
-
-static MyArgs myArgs;
-
-static std::string queryMetaStrings(EvalState & state, PackageInfo & drv, const std::string & name, const std::string & subAttribute)
-{
-    Strings res;
-    std::function<void(Value & v)> rec;
-
-    rec = [&](Value & v) {
-        state.forceValue(v, noPos);
-        if (v.type() == nString)
-            res.emplace_back(v.string_view());
-        else if (v.isList())
-            for (unsigned int n = 0; n < v.listSize(); ++n)
-                rec(*v.listElems()[n]);
-        else if (v.type() == nAttrs) {
-            auto a = v.attrs()->find(state.symbols.create(subAttribute));
-            if (a != v.attrs()->end())
-                res.push_back(std::string(state.forceString(*a->value, a->pos, "while evaluating meta attributes")));
-        }
-    };
-
-    Value * v = drv.queryMeta(name);
-    if (v) rec(*v);
-
-    return concatStringsSep(", ", res);
-}
-
-static void worker(
-    EvalState & state,
-    Bindings & autoArgs,
-    AutoCloseFD & to,
-    AutoCloseFD & from)
-{
-    Value vTop;
-
-    if (myArgs.flake) {
-        using namespace flake;
-
-        auto [flakeRef, fragment, outputSpec] = parseFlakeRefWithFragmentAndExtendedOutputsSpec(fetchSettings, myArgs.releaseExpr, absPath("."));
-
-        auto vFlake = state.allocValue();
-
-        auto lockedFlake = lockFlake(
-            flakeSettings,
-            state,
-            flakeRef,
-            LockFlags {
-                .updateLockFile = false,
-                .useRegistries = false,
-                .allowUnlocked = false,
-            });
-
-        callFlake(state, lockedFlake, *vFlake);
-
-        auto vOutputs = vFlake->attrs()->get(state.symbols.create("outputs"))->value;
-        state.forceValue(*vOutputs, noPos);
-
-        auto aHydraJobs = vOutputs->attrs()->get(state.symbols.create("hydraJobs"));
-        if (!aHydraJobs)
-            aHydraJobs = vOutputs->attrs()->get(state.symbols.create("checks"));
-        if (!aHydraJobs)
-            throw Error("flake '%s' does not provide any Hydra jobs or checks", flakeRef);
-
-        vTop = *aHydraJobs->value;
-
-    } else {
-        state.evalFile(lookupFileArg(state, myArgs.releaseExpr), vTop);
-    }
-
-    auto vRoot = state.allocValue();
-    state.autoCallFunction(autoArgs, vTop, *vRoot);
-
-    while (true) {
-        /* Wait for the master to send us a job name. */
-        writeLine(to.get(), "next");
-
-        auto s = readLine(from.get());
-        if (s == "exit") break;
-        if (!hasPrefix(s, "do ")) abort();
-        std::string attrPath(s, 3);
-
-        debug("worker process %d at '%s'", getpid(), attrPath);
-
-        /* Evaluate it and send info back to the master. */
-        nlohmann::json reply;
-
-        try {
-            auto vTmp = findAlongAttrPath(state, attrPath, autoArgs, *vRoot).first;
-
-            auto v = state.allocValue();
-            state.autoCallFunction(autoArgs, *vTmp, *v);
-
-            if (auto drv = getDerivation(state, *v, false)) {
-
-                // CA derivations do not have static output paths, so we
-                // have to defensively not query output paths in case we
-                // encounter one.
-                PackageInfo::Outputs outputs = drv->queryOutputs(
-                    !experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
-
-                if (drv->querySystem() == "unknown")
-                    state.error("derivation must have a 'system' attribute").debugThrow();
-
-                auto drvPath = state.store->printStorePath(drv->requireDrvPath());
-
-                nlohmann::json job;
-
-                job["nixName"] = drv->queryName();
-                job["system"] = drv->querySystem();
-                job["drvPath"] = drvPath;
-                job["description"] = drv->queryMetaString("description");
-                job["license"] = queryMetaStrings(state, *drv, "license", "shortName");
-                job["homepage"] = drv->queryMetaString("homepage");
-                job["maintainers"] = queryMetaStrings(state, *drv, "maintainers", "email");
-                job["schedulingPriority"] = drv->queryMetaInt("schedulingPriority", 100);
-                job["timeout"] = drv->queryMetaInt("timeout", 36000);
-                job["maxSilent"] = drv->queryMetaInt("maxSilent", 7200);
-                job["isChannel"] = drv->queryMetaBool("isHydraChannel", false);
-
-                /* If this is an aggregate, then get its constituents. */
-                auto a = v->attrs()->get(state.symbols.create("_hydraAggregate"));
-                if (a && state.forceBool(*a->value, a->pos, "while evaluating the `_hydraAggregate` attribute")) {
-                    auto a = v->attrs()->get(state.symbols.create("constituents"));
-                    if (!a)
-                        state.error("derivation must have a ‘constituents’ attribute").debugThrow();
-
-                    NixStringContext context;
-                    state.coerceToString(a->pos, *a->value, context, "while evaluating the `constituents` attribute", true, false);
-                    for (auto & c : context)
-                        std::visit(overloaded {
-                            [&](const NixStringContextElem::Built & b) {
-                                job["constituents"].push_back(b.drvPath->to_string(*state.store));
-                            },
-                            [&](const NixStringContextElem::Opaque & o) {
-                            },
-                            [&](const NixStringContextElem::DrvDeep & d) {
-                            },
-                        }, c.raw);
-
-                    state.forceList(*a->value, a->pos, "while evaluating the `constituents` attribute");
-                    for (unsigned int n = 0; n < a->value->listSize(); ++n) {
-                        auto v = a->value->listElems()[n];
-                        state.forceValue(*v, noPos);
-                        if (v->type() == nString)
-                            job["namedConstituents"].push_back(v->string_view());
-                    }
-                }
-
-                /* Register the derivation as a GC root.  !!! This
-                   registers roots for jobs that we may have already
-                   done. */
-                auto localStore = state.store.dynamic_pointer_cast<LocalFSStore>();
-                if (gcRootsDir != "" && localStore) {
-                    Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
-                    if (!pathExists(root))
-                        localStore->addPermRoot(localStore->parseStorePath(drvPath), root);
-                }
-
-                nlohmann::json out;
-                for (auto & [outputName, optOutputPath] : outputs) {
-                    if (optOutputPath) {
-                        out[outputName] = state.store->printStorePath(*optOutputPath);
-                    } else {
-                        // See the `queryOutputs` call above; we should
-                        // not encounter missing output paths otherwise.
-                        assert(experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
-                        out[outputName] = nullptr;
-                    }
-                }
-                job["outputs"] = std::move(out);
-                reply["job"] = std::move(job);
-            }
-
-            else if (v->type() == nAttrs) {
-                auto attrs = nlohmann::json::array();
-                StringSet ss;
-                for (auto & i : v->attrs()->lexicographicOrder(state.symbols)) {
-                    std::string name(state.symbols[i->name]);
-                    if (name.find(' ') != std::string::npos) {
-                        printError("skipping job with illegal name '%s'", name);
-                        continue;
-                    }
-                    attrs.push_back(name);
-                }
-                reply["attrs"] = std::move(attrs);
-            }
-
-            else if (v->type() == nNull)
-                ;
-
-            else state.error("attribute '%s' is %s, which is not supported", attrPath, showType(*v)).debugThrow();
-
-        } catch (EvalError & e) {
-            auto msg = e.msg();
-            // Transmits the error we got from the previous evaluation
-            // in the JSON output.
-            reply["error"] = filterANSIEscapes(msg, true);
-            // Don't forget to print it into the STDERR log, this is
-            // what's shown in the Hydra UI.
-            printError(msg);
-        }
-
-        writeLine(to.get(), reply.dump());
-
-        /* If our RSS exceeds the maximum, exit.  The master will
-           start a new process. */
-        struct rusage r;
-        getrusage(RUSAGE_SELF, &r);
-        if ((size_t) r.ru_maxrss > maxMemorySize * 1024) break;
-    }
-
-    writeLine(to.get(), "restart");
-}
-
-int main(int argc, char * * argv)
-{
-    /* Prevent undeclared dependencies in the evaluation via
-       $NIX_PATH. */
-    unsetenv("NIX_PATH");
-
-    return handleExceptions(argv[0], [&]() {
-
-        auto config = std::make_unique<HydraConfig>();
-
-        auto nrWorkers = config->getIntOption("evaluator_workers", 1);
-        maxMemorySize = config->getIntOption("evaluator_max_memory_size", 4096);
-
-        initNix();
-        initGC();
-
-        myArgs.parseCmdline(argvToStrings(argc, argv));
-
-        auto pureEval = config->getBoolOption("evaluator_pure_eval", myArgs.flake);
-
-        /* FIXME: The build hook in conjunction with import-from-derivation is causing "unexpected EOF" during eval */
-        settings.builders = "";
-
-        /* Prevent access to paths outside of the Nix search path and
-           to the environment. */
-        evalSettings.restrictEval = true;
-
-        /* When building a flake, use pure evaluation (no access to
-           'getEnv', 'currentSystem' etc. */
-        evalSettings.pureEval = pureEval;
-
-        if (myArgs.dryRun) settings.readOnlyMode = true;
-
-        if (myArgs.releaseExpr == "") throw UsageError("no expression specified");
-
-        if (gcRootsDir == "") printMsg(lvlError, "warning: `--gc-roots-dir' not specified");
-
-        struct State
-        {
-            std::set<std::string> todo{""};
-            std::set<std::string> active;
-            nlohmann::json jobs;
-            std::exception_ptr exc;
-        };
-
-        std::condition_variable wakeup;
-
-        Sync<State> state_;
-
-        /* Start a handler thread per worker process. */
-        auto handler = [&]()
-        {
-            pid_t pid = -1;
-            try {
-                AutoCloseFD from, to;
-
-                while (true) {
-
-                    /* Start a new worker process if necessary. */
-                    if (pid == -1) {
-                        Pipe toPipe, fromPipe;
-                        toPipe.create();
-                        fromPipe.create();
-                        pid = startProcess(
-                            [&,
-                             to{std::make_shared<AutoCloseFD>(std::move(fromPipe.writeSide))},
-                             from{std::make_shared<AutoCloseFD>(std::move(toPipe.readSide))}
-                            ]()
-                            {
-                                try {
-                                    auto evalStore = myArgs.evalStoreUrl
-                                        ? openStore(*myArgs.evalStoreUrl)
-                                        : openStore();
-                                    EvalState state(myArgs.lookupPath,
-                                        evalStore, fetchSettings, evalSettings);
-                                    Bindings & autoArgs = *myArgs.getAutoArgs(state);
-                                    worker(state, autoArgs, *to, *from);
-                                } catch (Error & e) {
-                                    nlohmann::json err;
-                                    auto msg = e.msg();
-                                    err["error"] = filterANSIEscapes(msg, true);
-                                    printError(msg);
-                                    writeLine(to->get(), err.dump());
-                                    // Don't forget to print it into the STDERR log, this is
-                                    // what's shown in the Hydra UI.
-                                    writeLine(to->get(), "restart");
-                                }
-                            },
-                            ProcessOptions { .allowVfork = false });
-                        from = std::move(fromPipe.readSide);
-                        to = std::move(toPipe.writeSide);
-                        debug("created worker process %d", pid);
-                    }
-
-                    /* Check whether the existing worker process is still there. */
-                    auto s = readLine(from.get());
-                    if (s == "restart") {
-                        pid = -1;
-                        continue;
-                    } else if (s != "next") {
-                        auto json = nlohmann::json::parse(s);
-                        throw Error("worker error: %s", (std::string) json["error"]);
-                    }
-
-                    /* Wait for a job name to become available. */
-                    std::string attrPath;
-
-                    while (true) {
-                        checkInterrupt();
-                        auto state(state_.lock());
-                        if ((state->todo.empty() && state->active.empty()) || state->exc) {
-                            writeLine(to.get(), "exit");
-                            return;
-                        }
-                        if (!state->todo.empty()) {
-                            attrPath = *state->todo.begin();
-                            state->todo.erase(state->todo.begin());
-                            state->active.insert(attrPath);
-                            break;
-                        } else
-                            state.wait(wakeup);
-                    }
-
-                    /* Tell the worker to evaluate it. */
-                    writeLine(to.get(), "do " + attrPath);
-
-                    /* Wait for the response. */
-                    auto response = nlohmann::json::parse(readLine(from.get()));
-
-                    /* Handle the response. */
-                    StringSet newAttrs;
-
-                    if (response.find("job") != response.end()) {
-                        auto state(state_.lock());
-                        state->jobs[attrPath] = response["job"];
-                    }
-
-                    if (response.find("attrs") != response.end()) {
-                        for (auto & i : response["attrs"]) {
-                            std::string path = i;
-                            if (path.find(".") != std::string::npos){
-                                path = "\"" + path + "\"";
-                            }
-                            auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) path;
-                            newAttrs.insert(s);
-                        }
-                    }
-
-                    if (response.find("error") != response.end()) {
-                        auto state(state_.lock());
-                        state->jobs[attrPath]["error"] = response["error"];
-                    }
-
-                    /* Add newly discovered job names to the queue. */
-                    {
-                        auto state(state_.lock());
-                        state->active.erase(attrPath);
-                        for (auto & s : newAttrs)
-                            state->todo.insert(s);
-                        wakeup.notify_all();
-                    }
-                }
-            } catch (...) {
-                check_pid_status_nonblocking(pid);
-                auto state(state_.lock());
-                state->exc = std::current_exception();
-                wakeup.notify_all();
-            }
-        };
-
-        std::vector<std::thread> threads;
-        for (size_t i = 0; i < nrWorkers; i++)
-            threads.emplace_back(std::thread(handler));
-
-        for (auto & thread : threads)
-            thread.join();
-
-        auto state(state_.lock());
-
-        if (state->exc)
-            std::rethrow_exception(state->exc);
-
-        /* For aggregate jobs that have named consistuents
-           (i.e. constituents that are a job name rather than a
-           derivation), look up the referenced job and add it to the
-           dependencies of the aggregate derivation. */
-        auto store = openStore();
-
-        for (auto i = state->jobs.begin(); i != state->jobs.end(); ++i) {
-            auto jobName = i.key();
-            auto & job = i.value();
-
-            auto named = job.find("namedConstituents");
-            if (named == job.end()) continue;
-
-            std::unordered_map<std::string, std::string> brokenJobs;
-            auto getNonBrokenJobOrRecordError = [&brokenJobs, &jobName, &state](
-                const std::string & childJobName) -> std::optional<nlohmann::json> {
-                auto childJob = state->jobs.find(childJobName);
-                if (childJob == state->jobs.end()) {
-                    printError("aggregate job '%s' references non-existent job '%s'", jobName, childJobName);
-                    brokenJobs[childJobName] = "does not exist";
-                    return std::nullopt;
-                }
-                if (childJob->find("error") != childJob->end()) {
-                    std::string error = (*childJob)["error"];
-                    printError("aggregate job '%s' references broken job '%s': %s", jobName, childJobName, error);
-                    brokenJobs[childJobName] = error;
-                    return std::nullopt;
-                }
-                return *childJob;
-            };
-
-            if (myArgs.dryRun) {
-                for (std::string jobName2 : *named) {
-                    auto job2 = getNonBrokenJobOrRecordError(jobName2);
-                    if (!job2) {
-                        continue;
-                    }
-                    std::string drvPath2 = (*job2)["drvPath"];
-                    job["constituents"].push_back(drvPath2);
-                }
-            } else {
-                auto drvPath = store->parseStorePath((std::string) job["drvPath"]);
-                auto drv = store->readDerivation(drvPath);
-
-                for (std::string jobName2 : *named) {
-                    auto job2 = getNonBrokenJobOrRecordError(jobName2);
-                    if (!job2) {
-                        continue;
-                    }
-                    auto drvPath2 = store->parseStorePath((std::string) (*job2)["drvPath"]);
-                    auto drv2 = store->readDerivation(drvPath2);
-                    job["constituents"].push_back(store->printStorePath(drvPath2));
-                    drv.inputDrvs.map[drvPath2].value = {drv2.outputs.begin()->first};
-                }
-
-                if (brokenJobs.empty()) {
-                    std::string drvName(drvPath.name());
-                    assert(hasSuffix(drvName, drvExtension));
-                    drvName.resize(drvName.size() - drvExtension.size());
-
-                    auto hashModulo = hashDerivationModulo(*store, drv, true);
-                    if (hashModulo.kind != DrvHash::Kind::Regular) continue;
-                    auto h = hashModulo.hashes.find("out");
-                    if (h == hashModulo.hashes.end()) continue;
-                    auto outPath = store->makeOutputPath("out", h->second, drvName);
-                    drv.env["out"] = store->printStorePath(outPath);
-                    drv.outputs.insert_or_assign("out", DerivationOutput::InputAddressed { .path = outPath });
-                    auto newDrvPath = store->printStorePath(writeDerivation(*store, drv));
-
-                    debug("rewrote aggregate derivation %s -> %s", store->printStorePath(drvPath), newDrvPath);
-
-                    job["drvPath"] = newDrvPath;
-                    job["outputs"]["out"] = store->printStorePath(outPath);
-                }
-            }
-
-            job.erase("namedConstituents");
-
-            /* Register the derivation as a GC root.  !!! This
-               registers roots for jobs that we may have already
-               done. */
-            auto localStore = store.dynamic_pointer_cast<LocalFSStore>();
-            if (gcRootsDir != "" && localStore) {
-                auto drvPath = job["drvPath"].get<std::string>();
-                Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
-                if (!pathExists(root))
-                    localStore->addPermRoot(localStore->parseStorePath(drvPath), root);
-            }
-
-            if (!brokenJobs.empty()) {
-                std::stringstream ss;
-                for (const auto& [jobName, error] : brokenJobs) {
-                    ss << jobName << ": " << error << "\n";
-                }
-                job["error"] = ss.str();
-            }
-        }
-
-        std::cout << state->jobs.dump(2) << "\n";
-    });
-}
diff --git a/src/hydra-eval-jobs/meson.build b/src/hydra-eval-jobs/meson.build
deleted file mode 100644
index 916212e16..000000000
--- a/src/hydra-eval-jobs/meson.build
+++ /dev/null
@@ -1,8 +0,0 @@
-hydra_eval_jobs = executable('hydra-eval-jobs',
-  'hydra-eval-jobs.cc',
-  dependencies: [
-    libhydra_dep,
-    nix_dep,
-  ],
-  install: true,
-)
diff --git a/src/meson.build b/src/meson.build
index 8c7562ed2..52b821bc1 100644
--- a/src/meson.build
+++ b/src/meson.build
@@ -1,6 +1,5 @@
 # Native code
 subdir('libhydra')
-subdir('hydra-eval-jobs')
 subdir('hydra-evaluator')
 subdir('hydra-queue-runner')
diff --git a/src/script/hydra-eval-jobset b/src/script/hydra-eval-jobset
index 72a386f57..6dc1514b0 100755
--- a/src/script/hydra-eval-jobset
+++ b/src/script/hydra-eval-jobset
@@ -17,6 +17,7 @@ use Hydra::Helper::Nix;
 use Hydra::Model::DB;
 use Hydra::Plugin;
 use Hydra::Schema;
+use IPC::Run;
 use JSON::MaybeXS;
 use Net::Statsd;
 use Nix::Store;
@@ -357,22 +358,24 @@ sub evalJobs {
     my @cmd;
 
     if (defined $flakeRef) {
-        @cmd = ("hydra-eval-jobs",
-                "--flake", $flakeRef,
-                "--gc-roots-dir", getGCRootsDir,
-                "--max-jobs", 1);
+        @cmd = ("nix-eval-jobs", "--flake", $flakeRef . '#hydraJobs');
     } else {
         my $nixExprInput = $inputInfo->{$nixExprInputName}->[0]
             or die "cannot find the input containing the job expression\n";
 
-        @cmd = ("hydra-eval-jobs",
+        @cmd = ("nix-eval-jobs",
                 "<" . $nixExprInputName . "/" . $nixExprPath . ">",
-                "--gc-roots-dir", getGCRootsDir,
-                "--max-jobs", 1,
                 inputsToArgs($inputInfo));
     }
 
-    push @cmd, "--no-allow-import-from-derivation" if $config->{allow_import_from_derivation} // "true" ne "true";
+    push @cmd, ("--gc-roots-dir", getGCRootsDir);
+    push @cmd, ("--max-jobs", 1);
+    push @cmd, "--meta";
+    push @cmd, "--constituents";
+    push @cmd, "--force-recurse";
+    push @cmd, ("--option", "allow-import-from-derivation", "false") if $config->{allow_import_from_derivation} // "true" ne "true";
+    push @cmd, ("--workers", $config->{evaluator_workers} // 1);
+    push @cmd, ("--max-memory-size", $config->{evaluator_max_memory_size} // 4096);
 
     if (defined $ENV{'HYDRA_DEBUG'}) {
         sub escape {
@@ -384,14 +387,33 @@ sub evalJobs {
         print STDERR "evaluator: @escaped\n";
     }
 
-    (my $res, my $jobsJSON, my $stderr) = captureStdoutStderr(21600, @cmd);
-    die "hydra-eval-jobs returned " . ($res & 127 ? "signal $res" : "exit code " . ($res >> 8))
-        . ":\n" . ($stderr ? decode("utf-8", $stderr) : "(no output)\n")
-        if $res;
+    my $evalProc = IPC::Run::start \@cmd,
+        '>', IPC::Run::new_chunker, \my $out,
+        '2>', \my $err;
+
+    return sub {
+        while (1) {
+            $evalProc->pump;
+            if (!defined $out && !defined $err) {
+                $evalProc->finish;
+                if ($?) {
+                    die "nix-eval-jobs returned " . ($? & 127 ? "signal $?" : "exit code " . ($? >> 8)) . "\n";
+                }
+                return;
+            }
 
-    print STDERR "$stderr";
+            if (defined $err) {
+                print STDERR "$err";
+                undef $err;
+            }
 
-    return decode_json($jobsJSON);
+            if (defined $out && $out ne '') {
+                my $job = decode_json($out);
+                undef $out;
+                return $job;
+            }
+        }
+    };
 }
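
A note on the pattern above: hydra-eval-jobs produced one large JSON document that was slurped and decoded in a single step, whereas nix-eval-jobs streams one JSON object per line, and IPC::Run's new_chunker splits the child's stdout on newlines so each pump delivers at most one record. Condensed out of the evalJobs above into a standalone sketch (it assumes nix-eval-jobs is on PATH and that ./release.nix is a valid job expression; real error handling is trimmed):

    use strict;
    use warnings;
    use IPC::Run;
    use JSON::MaybeXS;

    my @cmd = ("nix-eval-jobs", "--meta", "--force-recurse", "./release.nix");

    # new_chunker splits stdout on "\n", so $out holds one record at a time.
    my $proc = IPC::Run::start \@cmd,
        '>', IPC::Run::new_chunker, \my $out,
        '2>', \my $err;

    # Pull-style iterator: pump until one job record is available, forward
    # stderr as it arrives, and return undef once the child has exited.
    my $next_job = sub {
        while (1) {
            $proc->pump;
            if (!defined $out && !defined $err) {
                $proc->finish;
                return;
            }
            if (defined $err) { print STDERR $err; undef $err; }
            if (defined $out && $out ne '') {
                my $job = decode_json($out);
                undef $out;
                return $job;
            }
        }
    };

    while (defined(my $job = $next_job->())) {
        printf "%s -> %s\n", $job->{attr}, $job->{drvPath} // "(no drvPath)";
    }

Returning a closure keeps memory use flat: each job is decoded, handled, and dropped before the next line is pulled, instead of the whole evaluation result living in one Perl hash.
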
"\n"; + } + return; + } - print STDERR "$stderr"; + if (defined $err) { + print STDERR "$err"; + undef $err; + } - return decode_json($jobsJSON); + if (defined $out && $out ne '') { + my $job = decode_json($out); + undef $out; + return $job; + } + } + }; } @@ -420,7 +442,7 @@ sub checkBuild { my $firstOutputName = $outputNames[0]; my $firstOutputPath = $buildInfo->{outputs}->{$firstOutputName}; - my $jobName = $buildInfo->{jobName} or die; + my $jobName = $buildInfo->{attr} or die; my $drvPath = $buildInfo->{drvPath} or die; my $build; @@ -474,9 +496,30 @@ sub checkBuild { my $time = time(); - sub null { - my ($s) = @_; - return $s eq "" ? undef : $s; + sub getMeta { + my ($s, $def) = @_; + return ($s || "") eq "" ? $def : $s; + } + + sub getMetaStrings { + my ($v, $k, $acc) = @_; + my $t = ref $v; + + if ($t eq 'HASH') { + push @$acc, $v->{$k} if exists $v->{$k}; + } elsif ($t eq 'ARRAY') { + getMetaStrings($_, $k, $acc) foreach @$v; + } elsif (defined $v) { + push @$acc, $v; + } + } + + sub getMetaConcatStrings { + my ($v, $k) = @_; + + my @strings; + getMetaStrings($v, $k, \@strings); + return join(", ", @strings) || undef; } # Add the build to the database. @@ -484,19 +527,19 @@ sub checkBuild { { timestamp => $time , jobset_id => $jobset->id , job => $jobName - , description => null($buildInfo->{description}) - , license => null($buildInfo->{license}) - , homepage => null($buildInfo->{homepage}) - , maintainers => null($buildInfo->{maintainers}) - , maxsilent => $buildInfo->{maxSilent} - , timeout => $buildInfo->{timeout} - , nixname => $buildInfo->{nixName} + , description => getMeta($buildInfo->{meta}->{description}, undef) + , license => getMetaConcatStrings($buildInfo->{meta}->{license}, "shortName") + , homepage => getMeta($buildInfo->{meta}->{homepage}, undef) + , maintainers => getMetaConcatStrings($buildInfo->{meta}->{maintainers}, "email") + , maxsilent => getMeta($buildInfo->{meta}->{maxSilent}, 7200) + , timeout => getMeta($buildInfo->{meta}->{timeout}, 36000) + , nixname => $buildInfo->{name} , drvpath => $drvPath , system => $buildInfo->{system} - , priority => $buildInfo->{schedulingPriority} + , priority => getMeta($buildInfo->{meta}->{schedulingPriority}, 100) , finished => 0 , iscurrent => 1 - , ischannel => $buildInfo->{isChannel} + , ischannel => getMeta($buildInfo->{meta}->{isChannel}, 0) }); $build->buildoutputs->create({ name => $_, path => $buildInfo->{outputs}->{$_} }) @@ -665,7 +708,7 @@ sub checkJobsetWrapped { return; } - # Hash the arguments to hydra-eval-jobs and check the + # Hash the arguments to nix-eval-jobs and check the # JobsetInputHashes to see if the previous evaluation had the same # inputs. If so, bail out. my @args = ($jobset->nixexprinput // "", $jobset->nixexprpath // "", inputsToArgs($inputInfo)); @@ -687,19 +730,12 @@ sub checkJobsetWrapped { # Evaluate the job expression. my $evalStart = clock_gettime(CLOCK_MONOTONIC); - my $jobs = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef); - my $evalStop = clock_gettime(CLOCK_MONOTONIC); - - if ($jobsetsJobset) { - my @keys = keys %$jobs; - die "The .jobsets jobset must only have a single job named 'jobsets'" - unless (scalar @keys) == 1 && $keys[0] eq "jobsets"; - } - Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000)); + my $evalStop; + my $jobsIter = evalJobs($project->name . ":" . 
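
For orientation, here is roughly what one decoded nix-eval-jobs record looks like by the time checkBuild sees it as $buildInfo. This is an illustrative sketch only: the field set is inferred from the keys this script reads (attr, name, system, drvPath, outputs, meta), the store paths are invented, and real output can carry additional keys:

    use strict;
    use warnings;
    use JSON::MaybeXS;

    # Hypothetical line of nix-eval-jobs output (shape inferred, paths made up).
    my $line = <<~'JSON';
        {"attr":"full-of-meta","name":"full-of-meta","system":"x86_64-linux",
         "drvPath":"/nix/store/...-full-of-meta.drv",
         "outputs":{"out":"/nix/store/...-full-of-meta"},
         "meta":{"description":"This is the description of the job.","schedulingPriority":100}}
        JSON

    my $buildInfo = decode_json($line);

    # The same accesses checkBuild performs on each record:
    print $buildInfo->{attr}, "\n";                        # -> job column
    print $buildInfo->{name}, "\n";                        # -> nixname column
    print $buildInfo->{meta}->{schedulingPriority}, "\n";  # -> priority column
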
@@ -665,7 +708,7 @@
         return;
     }
 
-    # Hash the arguments to hydra-eval-jobs and check the
+    # Hash the arguments to nix-eval-jobs and check the
     # JobsetInputHashes to see if the previous evaluation had the same
     # inputs. If so, bail out.
     my @args = ($jobset->nixexprinput // "", $jobset->nixexprpath // "", inputsToArgs($inputInfo));
@@ -687,19 +730,12 @@
 
     # Evaluate the job expression.
     my $evalStart = clock_gettime(CLOCK_MONOTONIC);
-    my $jobs = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef);
-    my $evalStop = clock_gettime(CLOCK_MONOTONIC);
-
-    if ($jobsetsJobset) {
-        my @keys = keys %$jobs;
-        die "The .jobsets jobset must only have a single job named 'jobsets'"
-            unless (scalar @keys) == 1 && $keys[0] eq "jobsets";
-    }
-    Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
+    my $evalStop;
+    my $jobsIter = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef);
 
     if ($dryRun) {
-        foreach my $name (keys %{$jobs}) {
-            my $job = $jobs->{$name};
+        while (defined(my $job = $jobsIter->())) {
+            my $name = $job->{attr};
             if (defined $job->{drvPath}) {
                 print STDERR "good job $name: $job->{drvPath}\n";
             } else {
@@ -709,36 +745,20 @@
         return;
     }
 
-    die "Jobset contains a job with an empty name. Make sure the jobset evaluates to an attrset of jobs.\n"
-        if defined $jobs->{""};
-
-    $jobs->{$_}->{jobName} = $_ for keys %{$jobs};
-
-    my $jobOutPathMap = {};
-    my $jobsetChanged = 0;
-    my $dbStart = clock_gettime(CLOCK_MONOTONIC);
-
-    # Store the error messages for jobs that failed to evaluate.
     my $evaluationErrorTime = time;
     my $evaluationErrorMsg = "";
-    foreach my $job (values %{$jobs}) {
-        next unless defined $job->{error};
-        $evaluationErrorMsg .=
-            ($job->{jobName} ne "" ? "in job ‘$job->{jobName}’" : "at top-level") .
-            ":\n" . $job->{error} . "\n\n";
-    }
-    setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);
-
     my $evaluationErrorRecord = $db->resultset('EvaluationErrors')->create(
         { errormsg => $evaluationErrorMsg
         , errortime => $evaluationErrorTime
         }
     );
 
+    my $jobOutPathMap = {};
+    my $jobsetChanged = 0;
     my %buildMap;
-    $db->txn_do(sub {
+
+    $db->txn_do(sub {
         my $prevEval = getPrevJobsetEval($db, $jobset, 1);
 
         # Clear the "current" flag on all builds.  Since we're in a
@@ -751,7 +771,7 @@
             , evaluationerror => $evaluationErrorRecord
             , timestamp => time
             , checkouttime => abs(int($checkoutStop - $checkoutStart))
-            , evaltime => abs(int($evalStop - $evalStart))
+            , evaltime => 0
             , hasnewbuilds => 0
             , nrbuilds => 0
             , flake => $flakeRef
@@ -759,11 +779,24 @@
             , nixexprpath => $jobset->nixexprpath
             });
 
-        # Schedule each successfully evaluated job.
-        foreach my $job (permute(values %{$jobs})) {
-            next if defined $job->{error};
-            #print STDERR "considering job " . $project->name, ":", $jobset->name, ":", $job->{jobName} . "\n";
-            checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins);
+        my @jobsWithConstituents;
+
+        while (defined(my $job = $jobsIter->())) {
+            if ($jobsetsJobset) {
+                die "The .jobsets jobset must only have a single job named 'jobsets'"
+                    unless $job->{attr} eq "jobsets";
+            }
+
+            $evaluationErrorMsg .=
+                ($job->{attr} ne "" ? "in job ‘$job->{attr}’" : "at top-level") .
+                ":\n" . $job->{error} . "\n\n" if defined $job->{error};
+
+            checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins)
+                unless defined $job->{error};
+
+            if (defined $job->{constituents}) {
+                push @jobsWithConstituents, $job;
+            }
         }
 
         # Have any builds been added or removed since last time?
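
A subtlety in the hunk below: several attrs can evaluate to the same derivation, while aggregates name their constituents by drvPath, so the script keeps one canonical build record per drvPath — preferring the shortest attr name and breaking ties lexicographically. The selection rule, extracted into a standalone sketch with made-up attr names:

    use strict;
    use warnings;

    my %drvPathToId;
    for my $x ({ attr => "tests.foo.x86_64-linux", drvPath => "d1" },
               { attr => "foo", drvPath => "d1" },
               { attr => "bar", drvPath => "d1" }) {
        my $y = $drvPathToId{$x->{drvPath}};
        if (defined $y) {
            # Keep the shortest, then lexicographically smallest, attr.
            next if length $x->{attr} > length $y->{attr};
            next if length $x->{attr} == length $y->{attr} && $x->{attr} ge $y->{attr};
        }
        $drvPathToId{$x->{drvPath}} = $x;
    }
    print $drvPathToId{d1}{attr}, "\n";   # prints "bar"

So "bar" beats "foo" and the longer "tests.foo.x86_64-linux", matching the two next-if guards in the hunk that follows.
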
@@ -795,27 +828,26 @@
             my $x = $buildMap{$id};
             my $y = $drvPathToId{$x->{drvPath}};
             if (defined $y) {
-                next if length $x->{jobName} > length $y->{jobName};
-                next if length $x->{jobName} == length $y->{jobName} && $x->{jobName} ge $y->{jobName};
+                next if length $x->{attr} > length $y->{attr};
+                next if length $x->{attr} == length $y->{attr} && $x->{attr} ge $y->{attr};
             }
             $drvPathToId{$x->{drvPath}} = $x;
         }
 
-        foreach my $job (values %{$jobs}) {
-            next unless $job->{constituents};
-
+        foreach my $job (values @jobsWithConstituents) {
+            next unless defined $job->{constituents};
             if (defined $job->{error}) {
-                die "aggregate job ‘$job->{jobName}’ failed with the error: $job->{error}\n";
+                die "aggregate job ‘$job->{attr}’ failed with the error: $job->{error}\n";
             }
 
             my $x = $drvPathToId{$job->{drvPath}} or
-                die "aggregate job ‘$job->{jobName}’ has no corresponding build record.\n";
+                die "aggregate job ‘$job->{attr}’ has no corresponding build record.\n";
 
             foreach my $drvPath (@{$job->{constituents}}) {
                 my $constituent = $drvPathToId{$drvPath};
                 if (defined $constituent) {
                     $db->resultset('AggregateConstituents')->update_or_create({aggregate => $x->{id}, constituent => $constituent->{id}});
                 } else {
-                    warn "aggregate job ‘$job->{jobName}’ has a constituent ‘$drvPath’ that doesn't correspond to a Hydra build\n";
+                    warn "aggregate job ‘$job->{attr}’ has a constituent ‘$drvPath’ that doesn't correspond to a Hydra build\n";
                 }
             }
         }
@@ -857,11 +889,15 @@
         $jobset->update({ enabled => 0 }) if $jobset->enabled == 2;
 
         $jobset->update({ lastcheckedtime => time, forceeval => undef });
-    });
 
-    my $dbStop = clock_gettime(CLOCK_MONOTONIC);
+        $evaluationErrorRecord->update({ errormsg => $evaluationErrorMsg });
+        setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);
 
-    Net::Statsd::timing("hydra.evaluator.db_time", int(($dbStop - $dbStart) * 1000));
+        $evalStop = clock_gettime(CLOCK_MONOTONIC);
+        $ev->update({ evaltime => abs(int($evalStop - $evalStart)) });
+    });
+
+    Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
     Net::Statsd::increment("hydra.evaluator.evals");
     Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged;
 }
diff --git a/t/evaluator/evaluate-constituents-broken.t b/t/evaluator/evaluate-constituents-broken.t
index ed25d192d..0e5960bf6 100644
--- a/t/evaluator/evaluate-constituents-broken.t
+++ b/t/evaluator/evaluate-constituents-broken.t
@@ -18,14 +18,14 @@ isnt($res, 0, "hydra-eval-jobset exits non-zero");
 ok(utf8::decode($stderr), "Stderr output is UTF8-clean");
 like(
     $stderr,
-    qr/aggregate job ‘mixed_aggregate’ failed with the error: constituentA: does not exist/,
+    qr/aggregate job ‘mixed_aggregate’ failed with the error: "constituentA": does not exist/,
     "The stderr record includes a relevant error message"
 );
 
-$jobset->discard_changes;  # refresh from DB
+$jobset->discard_changes({ '+columns' => {'errormsg' => 'errormsg'} });  # refresh from DB
 like(
     $jobset->errormsg,
-    qr/aggregate job ‘mixed_aggregate’ failed with the error: constituentA: does not exist/,
+    qr/aggregate job ‘mixed_aggregate’ failed with the error: "constituentA": does not exist/,
     "The jobset records a relevant error message"
 );
 
diff --git a/t/evaluator/evaluate-meta.t b/t/evaluator/evaluate-meta.t
new file mode 100644
index 000000000..9f546a7f1
--- /dev/null
+++ b/t/evaluator/evaluate-meta.t
@@ -0,0 +1,22 @@
+use feature 'unicode_strings';
+use strict;
+use warnings;
+use Setup;
+use Test2::V0;
+
+my $ctx = test_context();
+
+my $builds = $ctx->makeAndEvaluateJobset(
+    expression => "meta.nix",
+    build => 1
+);
+
+my $build = $builds->{"full-of-meta"};
+
+is($build->finished, 1, "Build should be finished.");
+is($build->description, "This is the description of the job.", "Wrong description extracted from the build.");
+is($build->license, "MIT, BSD", "Wrong licenses extracted from the build.");
+is($build->homepage, "https://example.com/", "Wrong homepage extracted from the build.");
+is($build->maintainers, 'alice@example.com, bob@not.found', "Wrong maintainers extracted from the build.");
+
+done_testing;
diff --git a/t/jobs/meta.nix b/t/jobs/meta.nix
new file mode 100644
index 000000000..9204e3843
--- /dev/null
+++ b/t/jobs/meta.nix
@@ -0,0 +1,17 @@
+with import ./config.nix;
+{
+  full-of-meta =
+    mkDerivation {
+      name = "full-of-meta";
+      builder = ./empty-dir-builder.sh;
+
+      meta = {
+        description = "This is the description of the job.";
+        license = [ { shortName = "MIT"; } "BSD" ];
+        homepage = "https://example.com/";
+        maintainers = [ "alice@example.com" { email = "bob@not.found"; } ];
+
+        outPath = "${placeholder "out"}";
+      };
+    };
+}
diff --git a/t/meson.build b/t/meson.build
index 11044a03e..c3c58458d 100644
--- a/t/meson.build
+++ b/t/meson.build
@@ -27,7 +27,6 @@ testenv.prepend('PERL5LIB',
   separator: ':'
 )
 testenv.prepend('PATH',
-  fs.parent(hydra_eval_jobs.full_path()),
   fs.parent(hydra_evaluator.full_path()),
   fs.parent(hydra_queue_runner.full_path()),
   meson.project_source_root() / 'src/script',
diff --git a/t/queue-runner/constituents.t b/t/queue-runner/constituents.t
index c63336420..e1b8d733f 100644
--- a/t/queue-runner/constituents.t
+++ b/t/queue-runner/constituents.t
@@ -22,11 +22,11 @@ is(nrQueuedBuildsForJobset($jobset), 0, "Evaluating jobs/broken-constituent.nix
 
 like(
     $jobset->errormsg,
-    qr/^does-not-exist: does not exist$/m,
+    qr/^"does-not-exist": does not exist$/m,
     "Evaluating jobs/broken-constituent.nix should log an error for does-not-exist");
 like(
     $jobset->errormsg,
-    qr/^does-not-evaluate: error: assertion 'false' failed$/m,
+    qr/^"does-not-evaluate": "error: assertion 'false' failed/m,
     "Evaluating jobs/broken-constituent.nix should log an error for does-not-evaluate");
 
 done_testing;
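
The meta handling introduced above is easy to check in isolation: getMetaStrings walks a meta value that may be a plain string, an attrset carrying the interesting key, or an array mixing both — exactly the shapes t/jobs/meta.nix exercises — and getMetaConcatStrings joins whatever was collected. The helpers below are copied from hydra-eval-jobset; the inputs mirror the meta.nix fixture:

    use strict;
    use warnings;

    sub getMetaStrings {
        my ($v, $k, $acc) = @_;
        my $t = ref $v;

        if ($t eq 'HASH') {
            push @$acc, $v->{$k} if exists $v->{$k};
        } elsif ($t eq 'ARRAY') {
            getMetaStrings($_, $k, $acc) foreach @$v;
        } elsif (defined $v) {
            push @$acc, $v;
        }
    }

    sub getMetaConcatStrings {
        my ($v, $k) = @_;
        my @strings;
        getMetaStrings($v, $k, \@strings);
        return join(", ", @strings) || undef;
    }

    # license = [ { shortName = "MIT"; } "BSD" ];
    print getMetaConcatStrings([ { shortName => "MIT" }, "BSD" ], "shortName"), "\n";
    # -> MIT, BSD

    # maintainers = [ "alice@example.com" { email = "bob@not.found"; } ];
    print getMetaConcatStrings([ 'alice@example.com', { email => 'bob@not.found' } ], "email"), "\n";
    # -> alice@example.com, bob@not.found

That is exactly what evaluate-meta.t asserts for the license and maintainers columns.
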