From 097f326eb96b7a4ff0b5d9377a49b4fae70dbc85 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Fri, 26 Apr 2024 05:41:26 +0000
Subject: [PATCH] planner: introduce SPOT VMs policy

---
 .../faabric/batch-scheduler/BatchScheduler.h  |   5 +
 .../faabric/batch-scheduler/SpotScheduler.h   |  33 ++
 include/faabric/planner/Planner.h             |   8 +
 include/faabric/planner/PlannerState.h        |  18 +
 include/faabric/util/func.h                   |   9 +
 src/batch-scheduler/BatchScheduler.cpp        |   3 +
 src/batch-scheduler/CMakeLists.txt            |   1 +
 src/batch-scheduler/CompactScheduler.cpp      |   8 +-
 src/batch-scheduler/SpotScheduler.cpp         | 329 +++++++++++++
 src/executor/Executor.cpp                     |  29 +-
 src/planner/Planner.cpp                       | 135 +++++-
 src/planner/PlannerEndpointHandler.cpp        |  28 ++
 src/planner/planner.proto                     |   5 +
 src/scheduler/Scheduler.cpp                   |   9 +
 tests/dist/mpi/mpi_native.cpp                 |  38 +-
 tests/dist/mpi/test_multiple_mpi_worlds.cpp   |  11 +
 .../batch-scheduler/test_spot_scheduler.cpp   | 448 ++++++++++++++++++
 tests/test/planner/test_planner_endpoint.cpp  |  62 +++
 tests/utils/fixtures.h                        |   6 +
 19 files changed, 1169 insertions(+), 16 deletions(-)
 create mode 100644 include/faabric/batch-scheduler/SpotScheduler.h
 create mode 100644 src/batch-scheduler/SpotScheduler.cpp
 create mode 100644 tests/test/batch-scheduler/test_spot_scheduler.cpp

diff --git a/include/faabric/batch-scheduler/BatchScheduler.h b/include/faabric/batch-scheduler/BatchScheduler.h
index fd9974e7c..be3ceef02 100644
--- a/include/faabric/batch-scheduler/BatchScheduler.h
+++ b/include/faabric/batch-scheduler/BatchScheduler.h
@@ -12,6 +12,11 @@
 #define NOT_ENOUGH_SLOTS_DECISION                                              \
     faabric::batch_scheduler::SchedulingDecision(NOT_ENOUGH_SLOTS,             \
                                                  NOT_ENOUGH_SLOTS)
+#define MUST_FREEZE -97
+#define MUST_FREEZE_DECISION                                                   \
+    faabric::batch_scheduler::SchedulingDecision(MUST_FREEZE, MUST_FREEZE)
+
+#define MUST_EVICT_IP "E.VI.CT.ME"
 
 namespace faabric::batch_scheduler {
 
diff --git a/include/faabric/batch-scheduler/SpotScheduler.h b/include/faabric/batch-scheduler/SpotScheduler.h
new file mode 100644
index 000000000..ff4db3e92
--- /dev/null
+++ b/include/faabric/batch-scheduler/SpotScheduler.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include
+#include
+#include
+
+namespace faabric::batch_scheduler {
+
+// This batch scheduler behaves in the same way as BinPack for NEW and
+// SCALE_CHANGE requests, but for DIST_CHANGE it considers whether any of the
+// hosts in the Host Map have been tainted with the eviction mark. If so, it
+// first tries to migrate their messages to other running hosts and, if not
+// enough slots are available, freezes the messages.
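+// Depending on the outcome, makeSchedulingDecision() returns either a
+// regular decision (a migration away from the evicted host), the
+// DO_NOT_MIGRATE_DECISION sentinel (the app runs no messages on the evicted
+// host), the MUST_FREEZE_DECISION sentinel (the displaced messages do not
+// fit anywhere else), or NOT_ENOUGH_SLOTS_DECISION for NEW or SCALE_CHANGE
+// requests that do not fit in the remaining hosts.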
+class SpotScheduler final : public BatchScheduler +{ + public: + std::shared_ptr makeSchedulingDecision( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req) override; + + private: + bool isFirstDecisionBetter( + std::shared_ptr decisionA, + std::shared_ptr decisionB) override; + + std::vector getSortedHosts( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req, + const DecisionType& decisionType) override; +}; +} diff --git a/include/faabric/planner/Planner.h b/include/faabric/planner/Planner.h index 0c28aeeee..ce977017e 100644 --- a/include/faabric/planner/Planner.h +++ b/include/faabric/planner/Planner.h @@ -33,6 +33,8 @@ class Planner void printConfig() const; + std::string getPolicy(); + void setPolicy(const std::string& newPolicy); // ---------- @@ -91,6 +93,12 @@ class Planner std::shared_ptr callBatch( std::shared_ptr req); + // ---------- + // API exclusive to SPOT policy mode + // ---------- + + void setNextEvictedVm(const std::string& vmIp); + private: // There's a singleton instance of the planner running, but it must allow // concurrent requests diff --git a/include/faabric/planner/PlannerState.h b/include/faabric/planner/PlannerState.h index 92f77006a..0b5e7d97b 100644 --- a/include/faabric/planner/PlannerState.h +++ b/include/faabric/planner/PlannerState.h @@ -12,6 +12,10 @@ namespace faabric::planner { */ struct PlannerState { + // Policy to operate the planner in. Mostly determins the batch scheduler + // behaviour, but also the planner's in some cases + std::string policy; + // Accounting of the hosts that are registered in the system and responsive // We deliberately use the host's IP as unique key, but assign a unique host // id for redundancy @@ -36,5 +40,19 @@ struct PlannerState // Helper coutner of the total number of migrations std::atomic numMigrations = 0; + + // ----- + // Data structures used only under the SPOT policy + // ----- + + // Map containing the BER that have been evicted due to a SPOT VM eviction. 
+ // All messages in the VM have been checkpointed, are in the snapshot + // registry in the planner, and are ready to be scheduled when capacity + // appears + std::map> evictedRequests; + + // This variable simulates the values we would get from a cloud provider's + // API indicating the (set of) VM to be evicted next + std::string nextEvictedHostIp; }; } diff --git a/include/faabric/util/func.h b/include/faabric/util/func.h index e8ec5b93e..35aa1ef1b 100644 --- a/include/faabric/util/func.h +++ b/include/faabric/util/func.h @@ -6,9 +6,18 @@ #include #define MIGRATED_FUNCTION_RETURN_VALUE -99 +#define FROZEN_FUNCTION_RETURN_VALUE -98 namespace faabric::util { +class FunctionFrozenException : public faabric::util::FaabricException +{ + public: + explicit FunctionFrozenException(std::string message) + : FaabricException(std::move(message)) + {} +}; + class FunctionMigratedException : public faabric::util::FaabricException { public: diff --git a/src/batch-scheduler/BatchScheduler.cpp b/src/batch-scheduler/BatchScheduler.cpp index 58dc27d35..416a595d4 100644 --- a/src/batch-scheduler/BatchScheduler.cpp +++ b/src/batch-scheduler/BatchScheduler.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -23,6 +24,8 @@ std::shared_ptr getBatchScheduler() batchScheduler = std::make_shared(); } else if (mode == "compact") { batchScheduler = std::make_shared(); + } else if (mode == "spot") { + batchScheduler = std::make_shared(); } else { SPDLOG_ERROR("Unrecognised batch scheduler mode: {}", mode); throw std::runtime_error("Unrecognised batch scheduler mode"); diff --git a/src/batch-scheduler/CMakeLists.txt b/src/batch-scheduler/CMakeLists.txt index 7a73ddcfa..79ebdd789 100644 --- a/src/batch-scheduler/CMakeLists.txt +++ b/src/batch-scheduler/CMakeLists.txt @@ -7,6 +7,7 @@ faabric_lib(batch_scheduler BatchScheduler.cpp BinPackScheduler.cpp CompactScheduler.cpp + SpotScheduler.cpp ) target_link_libraries(batch_scheduler PRIVATE diff --git a/src/batch-scheduler/CompactScheduler.cpp b/src/batch-scheduler/CompactScheduler.cpp index f37270dc4..b623afe18 100644 --- a/src/batch-scheduler/CompactScheduler.cpp +++ b/src/batch-scheduler/CompactScheduler.cpp @@ -98,7 +98,7 @@ bool CompactScheduler::isFirstDecisionBetter( throw std::runtime_error("Method not supported for COMPACT scheduler"); } -HostMap deepCopyHostMap(const HostMap& hostMap) +static HostMap deepCopyHostMap(const HostMap& hostMap) { HostMap newHostMap; @@ -173,9 +173,9 @@ bool CompactScheduler::isFirstDecisionBetter( // Filter-out from the host map all nodes that are executing requests from a // different user -void filterHosts(HostMap& hostMap, - const InFlightReqs& inFlightReqs, - std::shared_ptr req) +static void filterHosts(HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req) { // We temporarily use the request subtype field to attach a user id for our // multi-tenant simulations diff --git a/src/batch-scheduler/SpotScheduler.cpp b/src/batch-scheduler/SpotScheduler.cpp new file mode 100644 index 000000000..2b21ea41a --- /dev/null +++ b/src/batch-scheduler/SpotScheduler.cpp @@ -0,0 +1,329 @@ +#include +#include +#include +#include + +namespace faabric::batch_scheduler { + +static std::map getHostFreqCount( + std::shared_ptr decision) +{ + std::map hostFreqCount; + for (auto host : decision->hosts) { + hostFreqCount[host] += 1; + } + + return hostFreqCount; +} + +// Given a new decision that improves on an old decision (i.e. 
to migrate), we +// want to make sure that we minimise the number of migration requests we send. +// This is, we want to keep as many host-message scheduling in the old decision +// as possible, and also have the overall locality of the new decision (i.e. +// the host-message histogram) +// NOTE: keep in mind that the newDecision has the right host histogram, but +// the messages may be completely out-of-order +static std::shared_ptr minimiseNumOfMigrations( + std::shared_ptr newDecision, + std::shared_ptr oldDecision) +{ + auto decision = std::make_shared(oldDecision->appId, + oldDecision->groupId); + + // We want to maintain the new decision's host-message histogram + auto hostFreqCount = getHostFreqCount(newDecision); + + // Helper function to find the next host in the histogram with slots + auto nextHostWithSlots = [&hostFreqCount]() -> std::string { + for (auto [ip, slots] : hostFreqCount) { + if (slots > 0) { + return ip; + } + } + + // Unreachable (in this context) + throw std::runtime_error("No next host with slots found!"); + }; + + assert(newDecision->hosts.size() == oldDecision->hosts.size()); + + // First we try to allocate to each message the same host they used to have + for (int i = 0; i < oldDecision->hosts.size(); i++) { + auto oldHost = oldDecision->hosts.at(i); + + if (hostFreqCount.contains(oldHost) && hostFreqCount.at(oldHost) > 0) { + decision->addMessageInPosition(i, + oldHost, + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i), + oldDecision->mpiPorts.at(i)); + + hostFreqCount.at(oldHost) -= 1; + } + } + + // Second we allocate the rest + for (int i = 0; i < oldDecision->hosts.size(); i++) { + if (decision->nFunctions <= i || decision->hosts.at(i).empty()) { + + auto nextHost = nextHostWithSlots(); + decision->addMessageInPosition(i, + nextHost, + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i), + -1); + + hostFreqCount.at(nextHost) -= 1; + } + } + + // Assert that we have preserved the new decision's host-message histogram + // (use the pre-processor macro as we assert repeatedly in the loop, so we + // want to avoid having an empty loop in non-debug mode) +#ifndef NDEBUG + for (auto [host, freq] : hostFreqCount) { + assert(freq == 0); + } +#endif + + return decision; +} + +bool SpotScheduler::isFirstDecisionBetter( + std::shared_ptr decisionA, + std::shared_ptr decisionB) +{ + throw std::runtime_error("Method not supported for COMPACT scheduler"); +} + +// Filter-out from the host map the next VM that will be evicted +static std::string filterHosts(HostMap& hostMap) +{ + std::string ipToRemove; + + for (const auto& [hostIp, host] : hostMap) { + if (host->ip == MUST_EVICT_IP) { + ipToRemove = hostIp; + } + } + + hostMap.erase(ipToRemove); + + return ipToRemove; +} + +std::vector SpotScheduler::getSortedHosts( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req, + const DecisionType& decisionType) +{ + std::vector sortedHosts; + for (auto [ip, host] : hostMap) { + sortedHosts.push_back(host); + } + + std::shared_ptr oldDecision = nullptr; + std::map hostFreqCount; + if (decisionType != DecisionType::NEW) { + oldDecision = inFlightReqs.at(req->appid()).second; + hostFreqCount = getHostFreqCount(oldDecision); + } + + auto isFirstHostLarger = [&](const Host& hostA, const Host& hostB) -> bool { + // The SPOT scheduler sorts hosts by number of available slots + int nAvailableA = numSlotsAvailable(hostA); + int nAvailableB = numSlotsAvailable(hostB); + if 
(nAvailableA != nAvailableB) { + return nAvailableA > nAvailableB; + } + + // In case of a tie, it will pick larger hosts first + int nSlotsA = numSlots(hostA); + int nSlotsB = numSlots(hostB); + if (nSlotsA != nSlotsB) { + return nSlotsA > nSlotsB; + } + + // Lastly, in case of a tie, return the largest host alphabetically + return getIp(hostA) > getIp(hostB); + }; + + auto isFirstHostLargerWithFreq = [&](auto hostA, auto hostB) -> bool { + // When updating an existing scheduling decision (SCALE_CHANGE or + // DIST_CHANGE), the SPOT scheduler takes into consideration the + // existing host-message histogram (i.e. how many messages for this app + // does each host _already_ run). This behaviour is the same than the + // BIN_PACK and COMPACT policies + + int numInHostA = hostFreqCount.contains(getIp(hostA)) + ? hostFreqCount.at(getIp(hostA)) + : 0; + int numInHostB = hostFreqCount.contains(getIp(hostB)) + ? hostFreqCount.at(getIp(hostB)) + : 0; + + // If at least one of the hosts has messages for this request, return + // the host with the more messages for this request (note that it is + // possible that this host has no available slots at all, in this case + // we will just pack 0 messages here but we still want to sort it first + // nontheless) + if (numInHostA != numInHostB) { + return numInHostA > numInHostB; + } + + // In case of a tie, use the same criteria than NEW + return isFirstHostLarger(hostA, hostB); + }; + + switch (decisionType) { + case DecisionType::NEW: { + // For a NEW decision type, the SPOT scheduler just sorts the + // hosts in decreasing order of capacity, and bin-packs messages + // to hosts in this order. This has one caveat that it skips the + // next VM that we know will be evicted + std::sort( + sortedHosts.begin(), sortedHosts.end(), isFirstHostLarger); + + break; + } + case DecisionType::SCALE_CHANGE: { + // If we are changing the scale of a running app (i.e. via chaining + // or thread/process forking) we want to prioritise co-locating + // as much as possible. This means that we will sort first by the + // frequency of messages of the running app, and second with the + // same criteria than NEW + // IMPORTANT: a SCALE_CHANGE request with 4 messages means that we + // want to add 4 NEW messages to the running app (not that the new + // total count is 4) + std::sort(sortedHosts.begin(), + sortedHosts.end(), + isFirstHostLargerWithFreq); + + break; + } + case DecisionType::DIST_CHANGE: { + // A DIST_CHANGE with the SPOT scheduler means that, if the app + // is running any messages of the to-be-evicted VM, we must move + // them from there. Two things may happen: + // * We have slots to move them-to (equivalent to re-scheduling + // from scratch without the tainted VM) + // * We do not have slots to move them-to, in which case all + // messages need to freeze until there is capacity in the cluster + // again + + auto oldDecision = inFlightReqs.at(req->appid()).second; + auto hostFreqCount = getHostFreqCount(oldDecision); + + // First remove the slots the app occupies to have a fresh new + // shot at the scheduling + for (auto host : sortedHosts) { + if (hostFreqCount.contains(getIp(host))) { + freeSlots(host, hostFreqCount.at(getIp(host))); + } + } + + // Try to schedule again without the tainted VM. Note that this + // app may not be using the tainted VM _at all_ in which case we + // will just discard the suggested migration. 
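+            // The decision on whether to migrate, freeze, or do nothing is
+            // taken in makeSchedulingDecision(): it only emits a migration
+            // if the old decision places messages on the evicted host, and
+            // emits MUST_FREEZE if the displaced messages no longer fit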
+            std::sort(sortedHosts.begin(),
+                      sortedHosts.end(),
+                      isFirstHostLargerWithFreq);
+
+            break;
+        }
+        default: {
+            SPDLOG_ERROR("Unrecognised decision type: {}", decisionType);
+            throw std::runtime_error("Unrecognised decision type");
+        }
+    }
+
+    return sortedHosts;
+}
+
+// The BinPack scheduling algorithm is very simple. It first sorts hosts
+// (i.e. bins) in a specific order (depending on the scheduling type), and
+// then fills bins from beginning to end until it runs out of messages to
+// schedule. The SPOT scheduler behaves like BinPack for NEW and
+// SCALE_CHANGE requests, with two caveats:
+// - it avoids scheduling any messages on a host that is going to be evicted
+// - when migrating, it checks if the migration candidate has any messages
+//   running on the to-be-evicted VM. If so, it tries to migrate those
+//   messages away from it. If it cannot, it asks the app to FREEZE
+std::shared_ptr<SchedulingDecision> SpotScheduler::makeSchedulingDecision(
+  HostMap& hostMap,
+  const InFlightReqs& inFlightReqs,
+  std::shared_ptr<faabric::BatchExecuteRequest> req)
+{
+    auto decision = std::make_shared<SchedulingDecision>(req->appid(), 0);
+
+    // Filter the hosts removing the VM that will be evicted next
+    std::string evictedHostIp = filterHosts(hostMap);
+
+    // Get the sorted list of hosts
+    auto decisionType = getDecisionType(inFlightReqs, req);
+    auto sortedHosts = getSortedHosts(hostMap, inFlightReqs, req, decisionType);
+
+    // Assign slots from the list (i.e. bin-pack)
+    auto itr = sortedHosts.begin();
+    int numLeftToSchedule = req->messages_size();
+    int msgIdx = 0;
+    while (itr < sortedHosts.end()) {
+        // Calculate how many messages we can assign to this host (assign as
+        // many as possible)
+        int numOnThisHost =
+          std::min(numLeftToSchedule, numSlotsAvailable(*itr));
+        for (int i = 0; i < numOnThisHost; i++) {
+            decision->addMessage(getIp(*itr), req->messages(msgIdx));
+            msgIdx++;
+        }
+
+        // Update the number of messages left to schedule
+        numLeftToSchedule -= numOnThisHost;
+
+        // If there are no more messages to schedule, we are done
+        if (numLeftToSchedule == 0) {
+            break;
+        }
+
+        // Otherwise, we have exhausted this host, and need to check the
+        // next one
+        itr++;
+    }
+
+    bool isDistChange = decisionType == DecisionType::DIST_CHANGE;
+
+    // If we still have messages left to schedule, we have run out of slots
+    if (numLeftToSchedule > 0 && !isDistChange) {
+        return std::make_shared<SchedulingDecision>(NOT_ENOUGH_SLOTS_DECISION);
+    }
+
+    if (isDistChange) {
+        // If we ran out of slots whilst processing a migration request, it
+        // means that we have some messages running on the to-be-evicted VM
+        // and we can not migrate them elsewhere. In this case we must FREEZE
+        // all messages
+        if (numLeftToSchedule > 0) {
+            return std::make_shared<SchedulingDecision>(MUST_FREEZE_DECISION);
+        }
+
+        // Check if we are running any messages in the to-be-evicted VM. 
Only + // migrate if we are + auto oldDecision = inFlightReqs.at(req->appid()).second; + for (const auto& hostIp : oldDecision->hosts) { + if (hostIp == evictedHostIp) { + // If we are requesting a migration, make sure that we minimise + // the number of messages to actuall migrate + return minimiseNumOfMigrations(decision, oldDecision); + } + } + + return std::make_shared(DO_NOT_MIGRATE_DECISION); + } + + return decision; +} +} diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp index 32a91d776..c58f2093b 100644 --- a/src/executor/Executor.cpp +++ b/src/executor/Executor.cpp @@ -414,6 +414,32 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) } } } + } catch (const faabric::util::FunctionFrozenException& ex) { + SPDLOG_DEBUG( + "Task {} frozen, shutting down executor {}", msg.id(), id); + + returnValue = FROZEN_FUNCTION_RETURN_VALUE; + + // TODO: maybe we do not need this here as we know this VM will be + // destroyed soon, but we do it nontheless just in case + if (msg.ismpi()) { + auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry(); + if (mpiWorldRegistry.worldExists(msg.mpiworldid())) { + bool mustClear = + mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy(); + + if (mustClear) { + SPDLOG_DEBUG("{}:{}:{} clearing world {} from host {}", + msg.appid(), + msg.groupid(), + msg.groupidx(), + msg.mpiworldid(), + msg.executedhost()); + + mpiWorldRegistry.clearWorld(msg.mpiworldid()); + } + } + } } catch (const std::exception& ex) { returnValue = 1; @@ -482,8 +508,7 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) // main host we still have the zero-th thread executing) auto mainThreadSnapKey = faabric::util::getMainThreadSnapshotKey(msg); std::vector diffs; - // FIXME: thread 0 locally is not part of this batch, but is still - // in the same executor + bool isRemoteThread = task.req->messages(0).mainhost() != conf.endpointHost; if (isLastThreadInBatch && doDirtyTracking && isRemoteThread) { diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp index f2a4aba02..3a778ab80 100644 --- a/src/planner/Planner.cpp +++ b/src/planner/Planner.cpp @@ -132,11 +132,19 @@ void Planner::printConfig() const SPDLOG_INFO("HTTP_SERVER_THREADS {}", config.numthreadshttpserver()); } +std::string Planner::getPolicy() +{ + faabric::util::SharedLock lock(plannerMx); + + return state.policy; +} + void Planner::setPolicy(const std::string& newPolicy) { // Acquire lock to prevent any changes in state whilst we change the policy faabric::util::FullLock lock(plannerMx); + state.policy = newPolicy; faabric::batch_scheduler::resetBatchScheduler(newPolicy); } @@ -349,9 +357,36 @@ void Planner::setMessageResult(std::shared_ptr msg) msg->groupid(), msg->groupidx()); + // If we are setting the result for a frozen message, it is important + // that we store the message itself in the evicted BER as it contains + // information like the function pointer and snapshot key to eventually + // un-freeze from + bool isFrozenMsg = msg->returnvalue() == FROZEN_FUNCTION_RETURN_VALUE; + if (isFrozenMsg) { + // TODO: should this be an error? 
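+        // (callBatch() registers the app in evictedRequests when it decides
+        // to FREEZE, before the individual messages report their frozen
+        // return value, so the entry should always exist at this point)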
+ if (!state.evictedRequests.contains(msg->appid())) { + SPDLOG_ERROR("Message {} is frozen but app (id: {}) not in map!", + msg->id(), + msg->appid()); + throw std::runtime_error("Orphaned frozen message!"); + } + + auto ber = state.evictedRequests.at(msg->appid()); + for (int i = 0; i < ber->messages_size(); i++) { + if (ber->messages(i).id() == msg->id()) { + ber->mutable_messages(i)->set_funcptr(msg->funcptr()); + ber->mutable_messages(i)->set_inputdata(msg->inputdata()); + ber->mutable_messages(i)->set_snapshotkey(msg->snapshotkey()); + ber->mutable_messages(i)->set_returnvalue(msg->returnvalue()); + } + } + } + // Release the slot only once assert(state.hostMap.contains(msg->executedhost())); - if (!state.appResults[appId].contains(msgId)) { + if (!state.appResults[appId].contains(msgId) || + state.appResults.at(appId).at(msgId)->returnvalue() == + FROZEN_FUNCTION_RETURN_VALUE) { releaseHostSlots(state.hostMap.at(msg->executedhost())); } @@ -537,20 +572,64 @@ std::shared_ptr Planner::getBatchResults( { auto berStatus = faabric::util::batchExecStatusFactory(appId); + // When querying for the result of a batch we always check if it has been + // evicted, as it is one of the triggers to try and re-schedule it again + bool isFrozen = false; + std::shared_ptr frozenBer = nullptr; + // Acquire a read lock to copy all the results we have for this batch { faabric::util::SharedLock lock(plannerMx); - if (!state.appResults.contains(appId)) { - return nullptr; + if (state.evictedRequests.contains(appId)) { + isFrozen = true; + + // To prevent race conditions, before treating an app as frozen + // we require all messages to have reported as frozen + for (const auto& msg : + state.evictedRequests.at(appId)->messages()) { + if (msg.returnvalue() != FROZEN_FUNCTION_RETURN_VALUE) { + isFrozen = false; + } + } + + if (isFrozen) { + frozenBer = state.evictedRequests.at(appId); + + // If the app is frozen (i.e. 
all messages have frozen) it + // should not be in the in-flight map anymore + if (state.inFlightReqs.contains(appId)) { + SPDLOG_ERROR("Inconsistent planner state: app {} is both " + "frozen and in-flight!", + appId); + return nullptr; + } + } } - for (auto msgResultPair : state.appResults.at(appId)) { - *berStatus->add_messageresults() = *(msgResultPair.second); + if (!isFrozen) { + if (!state.appResults.contains(appId)) { + return nullptr; + } + + for (auto msgResultPair : state.appResults.at(appId)) { + *berStatus->add_messageresults() = *(msgResultPair.second); + } + + // Set the finished condition + berStatus->set_finished(!state.inFlightReqs.contains(appId)); } + } - // Set the finished condition - berStatus->set_finished(!state.inFlightReqs.contains(appId)); + if (isFrozen) { + // This should trigger a NEW decision + auto decision = callBatch(frozenBer); + + // This means that there are not enough free slots to schedule the + // decision, we must just return a keep-alive to the poller thread + if (*decision == NOT_ENOUGH_SLOTS_DECISION) { + berStatus->set_finished(false); + } } return berStatus; @@ -593,13 +672,18 @@ int Planner::getNumMigrations() } static faabric::batch_scheduler::HostMap convertToBatchSchedHostMap( - std::map> hostMapIn) + std::map> hostMapIn, + const std::string& nextEvictedHostIp) { faabric::batch_scheduler::HostMap hostMap; for (const auto& [ip, host] : hostMapIn) { hostMap[ip] = std::make_shared( host->ip(), host->slots(), host->usedslots()); + + if (ip == nextEvictedHostIp) { + hostMap.at(ip)->ip = MUST_EVICT_IP; + } } return hostMap; @@ -620,7 +704,8 @@ Planner::callBatch(std::shared_ptr req) // Make a copy of the host-map state to make sure the scheduling process // does not modify it - auto hostMapCopy = convertToBatchSchedHostMap(state.hostMap); + auto hostMapCopy = + convertToBatchSchedHostMap(state.hostMap, state.nextEvictedHostIp); bool isDistChange = decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE; @@ -662,6 +747,12 @@ Planner::callBatch(std::shared_ptr req) faabric::util::updateBatchExecAppId(mpiReq, appId); for (int i = 0; i < mpiReq->messages_size(); i++) { mpiReq->mutable_messages()->at(i).set_groupidx(i); + + // Propagate the snapshot key if it exists + if (i < req->messages_size()) { + mpiReq->mutable_messages()->at(i).set_snapshotkey( + req->messages(i).snapshotkey()); + } } decision = batchScheduler->makeSchedulingDecision( @@ -687,6 +778,17 @@ Planner::callBatch(std::shared_ptr req) return decision; } + if (*decision == MUST_FREEZE_DECISION) { + SPDLOG_INFO("Decided to FREEZE app: {}", appId); + + // Note that the app will be naturally removed from in-flight as the + // messages throw an exception and finish, so here we only need to + // add the request to the evicted requests + state.evictedRequests[appId] = state.inFlightReqs.at(appId).first; + + return decision; + } + // Skip claiming slots and ports if we have preemptively allocated them bool skipClaim = decision->groupId == MPI_PRELOADED_DECISION_GROUPID; @@ -967,6 +1069,8 @@ void Planner::dispatchSchedulingDecision( } } + // TODO: FIXME: we need to push the snapshots here + faabric::scheduler::getFunctionCallClient(hostIp)->executeFunctions( hostReq); } @@ -975,6 +1079,19 @@ void Planner::dispatchSchedulingDecision( req->messages_size()); } +void Planner::setNextEvictedVm(const std::string& vmIp) +{ + faabric::util::FullLock lock(plannerMx); + + if (state.policy != "spot") { + SPDLOG_ERROR("Error setting evicted VM (ip: {}) with policy {}", + vmIp, + state.policy); 
+        SPDLOG_ERROR("To set the next evicted VM, the policy must be: spot");
+        throw std::runtime_error("Error setting the next evicted VM!");
+    }
+
+    state.nextEvictedHostIp = vmIp;
+}
+
 Planner& getPlanner()
 {
     static Planner planner;
diff --git a/src/planner/PlannerEndpointHandler.cpp b/src/planner/PlannerEndpointHandler.cpp
index bb18e870e..e327921fb 100644
--- a/src/planner/PlannerEndpointHandler.cpp
+++ b/src/planner/PlannerEndpointHandler.cpp
@@ -328,6 +328,34 @@ void PlannerEndpointHandler::onRequest(
             return ctx.sendFunction(std::move(response));
         }
+        case faabric::planner::HttpMessage_Type_GET_POLICY: {
+            SPDLOG_DEBUG("Planner received GET_POLICY request");
+
+            // Prepare the response
+            response.result(beast::http::status::ok);
+            response.body() = faabric::planner::getPlanner().getPolicy();
+
+            return ctx.sendFunction(std::move(response));
+        }
+        case faabric::planner::HttpMessage_Type_SET_NEXT_EVICTED_VM: {
+            SPDLOG_DEBUG("Planner received SET_NEXT_EVICTED_VM request");
+
+            std::string vmIp = msg.payloadjson();
+            try {
+                faabric::planner::getPlanner().setNextEvictedVm(vmIp);
+            } catch (std::exception& e) {
+                response.result(beast::http::status::bad_request);
+                response.body() = std::string(
+                  "Next evicted VM must only be set in 'spot' policy");
+                return ctx.sendFunction(std::move(response));
+            }
+
+            // Prepare the response
+            response.result(beast::http::status::ok);
+            response.body() = std::string("Next evicted VM set");
+
+            return ctx.sendFunction(std::move(response));
+        }
         default: {
             SPDLOG_ERROR("Unrecognised message type {}", msg.type());
             response.result(beast::http::status::bad_request);
diff --git a/src/planner/planner.proto b/src/planner/planner.proto
index 6744eaa4b..b3b358bb5 100644
--- a/src/planner/planner.proto
+++ b/src/planner/planner.proto
@@ -46,6 +46,11 @@ message HttpMessage {
     EXECUTE_BATCH_STATUS = 11;
     PRELOAD_SCHEDULING_DECISION = 12;
     SET_POLICY = 13;
+    GET_POLICY = 14;
+    // This endpoint is only used with the SPOT planner policy. 
In a real + // deployment we would get this value from a cloud-provider-specific + // API + SET_NEXT_EVICTED_VM = 15; } Type type = 1 [json_name = "http_type"]; diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index ae7ac7f72..dcccff791 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -454,6 +454,8 @@ Scheduler::checkForMigrationOpportunities(faabric::Message& msg, // Update the group ID if we want to migrate if (decision == DO_NOT_MIGRATE_DECISION) { newGroupId = groupId; + } else if (decision == MUST_FREEZE_DECISION) { + newGroupId = MUST_FREEZE; } else { newGroupId = decision.groupId; } @@ -475,6 +477,13 @@ Scheduler::checkForMigrationOpportunities(faabric::Message& msg, newGroupId = overwriteNewGroupId; } + if (newGroupId == MUST_FREEZE) { + auto migration = std::make_shared(); + migration->set_appid(MUST_FREEZE); + + return migration; + } + bool appMustMigrate = newGroupId != groupId; if (!appMustMigrate) { return nullptr; diff --git a/tests/dist/mpi/mpi_native.cpp b/tests/dist/mpi/mpi_native.cpp index 95c0fc9de..7c83c4eef 100644 --- a/tests/dist/mpi/mpi_native.cpp +++ b/tests/dist/mpi/mpi_native.cpp @@ -1,5 +1,6 @@ #include "mpi_native.h" +#include #include #include #include @@ -15,6 +16,7 @@ #include #include #include +#include using namespace faabric::mpi; @@ -785,7 +787,41 @@ void mpiMigrationPoint(int entrypointFuncArg) // Detect if there is a pending migration for the current app auto migration = sch.checkForMigrationOpportunities(*call); - bool appMustMigrate = migration != nullptr; + bool appMustFreeze = + migration != nullptr && migration->appid() == MUST_FREEZE; + + // Short-cut for when all messages need to freeze. We only need to send + // a snapshot to the planner, and throw an exception + // FIXME: we also need to set the input data!! 
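+    // The freeze protocol below is: record the input data and a snapshot
+    // key on the message, snapshot the executor's memory, push the snapshot
+    // to the planner, and throw FunctionFrozenException, which the executor
+    // catches and turns into FROZEN_FUNCTION_RETURN_VALUE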
+ if (appMustFreeze) { + std::string argStr = std::to_string(entrypointFuncArg); + std::vector inputData(argStr.begin(), argStr.end()); + std::string snapKey = "migration_" + std::to_string(call->id()); + + call->set_inputdata(inputData.data(), inputData.size()); + call->set_snapshotkey(snapKey); + + auto* exec = faabric::executor::ExecutorContext::get()->getExecutor(); + auto snap = + std::make_shared(exec->getMemoryView()); + auto& reg = faabric::snapshot::getSnapshotRegistry(); + reg.registerSnapshot(snapKey, snap); + + auto plannerIp = faabric::util::getIPFromHostname( + faabric::util::getSystemConfig().plannerHost); + faabric::snapshot::getSnapshotClient(plannerIp)->pushSnapshot(snapKey, + snap); + + SPDLOG_INFO("{}:{}:{} Freezing message!", + call->appid(), + call->groupid(), + call->groupidx()); + + // Throw an exception to be caught by the executor and terminate + throw faabric::util::FunctionFrozenException("Freezing MPI rank"); + } + + bool appMustMigrate = migration != nullptr && !appMustFreeze; // Detect if this particular function needs to be migrated or not bool funcMustMigrate = false; diff --git a/tests/dist/mpi/test_multiple_mpi_worlds.cpp b/tests/dist/mpi/test_multiple_mpi_worlds.cpp index 74d9990ff..92a731ed9 100644 --- a/tests/dist/mpi/test_multiple_mpi_worlds.cpp +++ b/tests/dist/mpi/test_multiple_mpi_worlds.cpp @@ -243,4 +243,15 @@ TEST_CASE_METHOD(MpiDistTestsFixture, checkAllocationAndResult(req1, hostsAfter1); checkAllocationAndResult(req2, hostsAfter2); } + +TEST_CASE_METHOD( + MpiDistTestsFixture, + "Test migrating an MPI app as a consequence of an eviction (SPOT)", + "[mpi]") +{} + +TEST_CASE_METHOD(MpiDistTestsFixture, + "Test stopping and resuming an MPI application (SPOT)", + "[mpi]") +{} } diff --git a/tests/test/batch-scheduler/test_spot_scheduler.cpp b/tests/test/batch-scheduler/test_spot_scheduler.cpp new file mode 100644 index 000000000..98bc69189 --- /dev/null +++ b/tests/test/batch-scheduler/test_spot_scheduler.cpp @@ -0,0 +1,448 @@ +#include + +#include "fixtures.h" + +#include +#include + +using namespace faabric::batch_scheduler; + +namespace tests { + +class SpotSchedulerTestFixture : public BatchSchedulerFixture +{ + public: + SpotSchedulerTestFixture() + { + conf.batchSchedulerMode = "spot"; + batchScheduler = getBatchScheduler(); + } +}; + +// SPOT should behave the same as COMPACT and BIN-PACK for NEW requests +TEST_CASE_METHOD(SpotSchedulerTestFixture, + "Test scheduling of new requests with Spot", + "[batch-scheduler]") +{ + // To mock new requests (i.e. 
DecisionType::NEW), we always set the + // InFlightReqs map to an empty map + BatchSchedulerConfig config = { + .hostMap = {}, + .inFlightReqs = {}, + .expectedDecision = SchedulingDecision(appId, groupId), + }; + + SECTION("Compact scheduler gives up if not enough slots are available") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 1, 1 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = NOT_ENOUGH_SLOTS_DECISION; + } + + SECTION("Scheduling fits in one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo" }); + } + + SECTION("Scheduling is exactly one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo", "foo" }); + } + + // The bin-pack scheduler will pick hosts with larger empty slots first + SECTION("Scheduling spans two hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "foo", "bar", "bar" }); + } + + SECTION("Scheduling spans exactly two hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 7); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "foo", "bar", "bar", "bar" }); + } + + // In particular, it will prioritise hosts with overall less capacity if + // they have more free resources + SECTION("Scheduling spans two hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 3, 4 }, { 0, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo", "bar" }); + } + + // In case of a tie in free resources, the Compact scheduler will pick + // hosts with larger overall capacity first + SECTION("Scheduling spans two hosts with same free resources") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "bar", "bar", "bar" }); + } + + // If there's still a tie, the Compact scheduler will solve the tie by + // sorting the hosts alphabetically (from larger to smaller) + SECTION("Scheduling spans two hosts with same free resources and size") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 3, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "bar", "bar", "bar" }); + } + + SECTION("Scheduling spans an arbitrarily large number of hosts") + { + config.hostMap = buildHostMap({ "foo", "bar", "baz", "bip", "bup" }, + { 4, 6, 2, 3, 1 }, + { 0, 2, 2, 2, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 10); + config.expectedDecision = buildExpectedDecision(ber, + { "bar", + "bar", + "bar", + "bar", + "foo", + "foo", + "foo", + "foo", + "bip", + "bup" }); + } + + actualDecision = *batchScheduler->makeSchedulingDecision( + config.hostMap, config.inFlightReqs, ber); + compareSchedulingDecisions(actualDecision, config.expectedDecision); +} + +// SPOT should behave the same as 
COMPACT and BIN-PACK for SCALE_CHANGE requests +TEST_CASE_METHOD(SpotSchedulerTestFixture, + "Test scheduling of scale-change requests with SPOT", + "[batch-scheduler]") +{ + // To mock a scale-change request (i.e. DecisionType::SCALE_CHANGE), we + // need to have one in-flight request in the map with the same app id + // (and not of type MIGRATION) + BatchSchedulerConfig config = { + .hostMap = {}, + .inFlightReqs = {}, + .expectedDecision = SchedulingDecision(appId, groupId), + }; + + // The configs in this test must be read as follows: + // - the host map's used slots contains the current distribution for the app + // (i.e. the number of used slots matches the number in in-flight reqs) + // - the host map's slots contain the total slots + // - the ber contains the NEW messages we are going to add + // - the expected decision includes the expected scheduling decision for + // the new messages + + SECTION("Compact scheduler gives up if not enough slots are available") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 2, 1 }, { 1, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" }); + config.expectedDecision = NOT_ENOUGH_SLOTS_DECISION; + } + + // When scheduling a SCALE_CHANGE request, we always try to colocate as + // much as possible + SECTION("Scheduling fits in one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" }); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo" }); + } + + // We prefer hosts with less capacity if they are already running requests + // for the same app + SECTION("Scheduling fits in one host and prefers known hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 5, 4 }, { 0, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "bar" }); + } + + // Like with `NEW` requests, we can also spill to other hosts + SECTION("Scheduling spans more than one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "foo", "foo" }); + } + + // If two hosts are already executing the app, we pick the one that is + // running the largest number of messages + SECTION("Scheduler prefers hosts with more running messages") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 1); + config.inFlightReqs = + buildInFlightReqs(ber, 3, { "bar", "bar", "foo" }); + config.expectedDecision = buildExpectedDecision(ber, { "bar" }); + } + + // Again, when picking a new host to spill to, we priorities hosts that + // are already running requests for this app + SECTION("Scheduling always picks known hosts first") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 3, 2 }, + { 0, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 5); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "baz", "foo", "foo" }); + } + + // Sometimes the preferred hosts just don't have slots. 
They will be sorted + // first but the scheduler will skip them when bin-packing + SECTION("Scheduler ignores preferred but full hosts") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 2, 2 }, + { 0, 2, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = + buildInFlightReqs(ber, 3, { "bar", "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "baz", "foo", "foo" }); + } + + // In case of a tie of the number of runing messages, we revert to `NEW`- + // like tie breaking + SECTION("In case of a tie of preferred hosts, fall-back to known " + "tie-breaks (free slots)") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 3, 2 }, + { 0, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "baz" }); + } + + SECTION("In case of a tie of preferred hosts, fall-back to known " + "tie-breaks (size)") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 3, 2 }, + { 0, 2, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "baz", "foo" }); + } + + SECTION("In case of a tie of preferred hosts, fall-back to known " + "tie-breaks (alphabetical)") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 2, 2 }, + { 0, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "baz", "bar", "foo" }); + } + + actualDecision = *batchScheduler->makeSchedulingDecision( + config.hostMap, config.inFlightReqs, ber); + compareSchedulingDecisions(actualDecision, config.expectedDecision); +} + +// DIST_CHANGE requests in the SPOT scheduler only concern with tainted VMs +// (i.e. VMs that are going to be evicted): +// - If nothing is going to be evicted, there should be no migration happening +// - If we know one VM is going to be evicted: +// - We migrate out of it if there are slots +// - We FREEZE if there are no slots to migrate to +TEST_CASE_METHOD(SpotSchedulerTestFixture, + "Test scheduling of dist-change requests with SPOT", + "[batch-scheduler]") +{ + // To mock a dist-change request (i.e. 
DecisionType::DIST_CHANGE), we + // need to have one in-flight request in the map with the same app id, the + // same size (and of type MIGRATION) + BatchSchedulerConfig config = { + .hostMap = {}, + .inFlightReqs = {}, + .expectedDecision = SchedulingDecision(appId, groupId), + }; + + // Note: the way we let the COMPACT scheduler know the VMs that we will + // evict is by setting the IP field in the HostMap **value** (not key) + // to MUST_EVICT_IP + + SECTION("SPOT returns nothing if there are no tainted VMs") + { + config.hostMap = buildHostMap({ "foo" }, { 4 }, { 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 2); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "foo", "foo" }); + config.expectedDecision = DO_NOT_MIGRATE_DECISION; + } + + SECTION("SPOT returns nothing if there are no tainted VMs (multiple)") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 2 }, { 4, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 5); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 5, { "foo", "foo", "foo", "foo", "bar" }); + config.expectedDecision = DO_NOT_MIGRATE_DECISION; + } + + SECTION("SPOT ignores opportunities to free-up hosts") + { + config.hostMap = + buildHostMap({ "foo", "bar", "baz" }, { 4, 4, 4 }, { 2, 2, 4 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "baz", "baz", "baz", "baz" }); + config.expectedDecision = DO_NOT_MIGRATE_DECISION; + } + + SECTION("SPOT deallocates an app if the VM is tainted and not enough slots (single-host)") + { + config.hostMap = buildHostMap({ "foo" }, { 4 }, { 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 2); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "foo", "foo" }); + config.expectedDecision = MUST_FREEZE_DECISION; + // Must evict host foo + markHostAsEvicted(config.hostMap, "foo"); + } + + SECTION("SPOT deallocates an app if the VM is tainted and not enough slots (multiple-hosts)") + { + config.hostMap = buildHostMap( + { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 2, 4, 2, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "bar" }); + config.expectedDecision = MUST_FREEZE_DECISION; + // Must evict host foo + markHostAsEvicted(config.hostMap, "foo"); + } + + SECTION("SPOT migrated an app if enough slots") + { + config.hostMap = buildHostMap( + { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 2, 2, 2, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "baz", "baz", "lol", "lol" }); + + // Must evict host foo + markHostAsEvicted(config.hostMap, "baz"); + + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "lol", "lol" }); + } + + SECTION("SPOT ignores evicted hosts if not running messages in it") + { + config.hostMap = buildHostMap( + { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 2, 2, 2, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { 
"baz", "baz", "lol", "lol" }); + + // Must evict host foo + markHostAsEvicted(config.hostMap, "foo"); + + config.expectedDecision = DO_NOT_MIGRATE_DECISION; + } + + SECTION("Compact prefers hosts running more messages") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 3, 2, 1 }, + { 2, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "baz" }); + + markHostAsEvicted(config.hostMap, "bar"); + + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo", "baz" }); + } + + SECTION("Compact will minimise the number of messages to migrate") + { + config.hostMap = + buildHostMap({ "foo", "bar", "baz" }, { 5, 4, 2 }, { 3, 4, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 9); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = buildInFlightReqs( + ber, + 9, + { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "baz", "baz" }); + + markHostAsEvicted(config.hostMap, "baz"); + + config.expectedDecision = buildExpectedDecision( + ber, + { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "foo", "foo" }); + } + + actualDecision = *batchScheduler->makeSchedulingDecision( + config.hostMap, config.inFlightReqs, ber); + compareSchedulingDecisions(actualDecision, config.expectedDecision); +} +} diff --git a/tests/test/planner/test_planner_endpoint.cpp b/tests/test/planner/test_planner_endpoint.cpp index 62578f67d..72ba5022c 100644 --- a/tests/test/planner/test_planner_endpoint.cpp +++ b/tests/test/planner/test_planner_endpoint.cpp @@ -748,6 +748,13 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture, expectedResponseBody = "Policy set correctly"; } + SECTION("Valid request (spot)") + { + policy = "spot"; + expectedReturnCode = beast::http::status::ok; + expectedResponseBody = "Policy set correctly"; + } + SECTION("Invalid request") { policy = "foo-bar"; @@ -762,5 +769,60 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture, REQUIRE(boost::beast::http::int_to_status(result.first) == expectedReturnCode); REQUIRE(result.second == expectedResponseBody); + + if (expectedReturnCode == beast::http::status::ok) { + // Second, get the policy we just set + msg.set_type(HttpMessage_Type_GET_POLICY); + msg.clear_payloadjson(); + msgJsonStr = faabric::util::messageToJson(msg); + + std::pair result = doPost(msgJsonStr); + REQUIRE(boost::beast::http::int_to_status(result.first) == + expectedReturnCode); + REQUIRE(result.second == policy); + } +} + +TEST_CASE_METHOD(PlannerEndpointExecTestFixture, + "Test setting the next evicted VM", + "[planner]") +{ + // First, prepare an HTTP request to execute a batch + HttpMessage policyMsg; + policyMsg.set_type(HttpMessage_Type_SET_POLICY); + HttpMessage msg; + msg.set_type(HttpMessage_Type_SET_NEXT_EVICTED_VM); + msg.set_payloadjson("1.1.1.1"); + msgJsonStr = faabric::util::messageToJson(msg); + + std::string policy; + + SECTION("Valid request") + { + policy = "spot"; + expectedReturnCode = beast::http::status::ok; + expectedResponseBody = "Next evicted VM set"; + } + + SECTION("Invalid request") + { + policy = "compact"; + expectedReturnCode = beast::http::status::bad_request; + expectedResponseBody = + "Next evicted VM must only be set in 'spot' policy"; + } + + std::string policyMsgJsonStr = faabric::util::messageToJson(policyMsg); + + // First set the policy + std::pair result = doPost(policyMsgJsonStr); + 
REQUIRE(boost::beast::http::int_to_status(result.first) == + boost::beast::http::status::ok); + + // Second set the next evicted VM + result = doPost(msgJsonStr); + REQUIRE(boost::beast::http::int_to_status(result.first) == + expectedReturnCode); + REQUIRE(result.second == expectedResponseBody); } } diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index 86b4776f5..f40e2405f 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -646,6 +646,12 @@ class BatchSchedulerFixture : public ConfFixture return decision; } + static void markHostAsEvicted(faabric::batch_scheduler::HostMap& hostMap, + const std::string& hostIp) + { + hostMap.at(hostIp)->ip = MUST_EVICT_IP; + } + static void compareSchedulingDecisions( const faabric::batch_scheduler::SchedulingDecision& decisionA, const faabric::batch_scheduler::SchedulingDecision& decisionB)
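
For reference, operating the new SPOT policy end-to-end amounts to two HTTP calls against the planner: set the policy to "spot", then tell the planner which VM will be evicted next; the next migration check either migrates the affected app away or freezes it until capacity reappears. The sketch below shows the client side of that flow using the HttpMessage types added in planner.proto. It mirrors the test code above and is only illustrative: the postToPlanner() helper is an assumed stand-in for whatever HTTP transport the caller uses (the tests use the doPost() fixture helper), and the include paths assume the usual faabric JSON and generated protobuf headers.

    #include <faabric/planner/planner.pb.h>
    #include <faabric/util/json.h>

    #include <string>

    // Hypothetical transport helper: POST a JSON-serialised HttpMessage to
    // the planner's HTTP endpoint and return the response body
    std::string postToPlanner(const std::string& jsonBody);

    void evictVmUnderSpotPolicy(const std::string& vmIp)
    {
        // 1. Switch the planner to the SPOT policy (this also resets the
        //    batch scheduler to the "spot" implementation)
        faabric::planner::HttpMessage policyMsg;
        policyMsg.set_type(faabric::planner::HttpMessage_Type_SET_POLICY);
        policyMsg.set_payloadjson("spot");
        postToPlanner(faabric::util::messageToJson(policyMsg));

        // 2. Mark the VM that the (mock) cloud provider will evict next.
        //    This only succeeds once the policy is "spot"; otherwise the
        //    planner replies with a 400
        faabric::planner::HttpMessage evictMsg;
        evictMsg.set_type(
          faabric::planner::HttpMessage_Type_SET_NEXT_EVICTED_VM);
        evictMsg.set_payloadjson(vmIp);
        postToPlanner(faabric::util::messageToJson(evictMsg));
    }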