From 5497d057bc0582680d103ae85fe5ce5ca94e46dd Mon Sep 17 00:00:00 2001
From: Carlos
Date: Mon, 13 May 2024 15:54:29 +0100
Subject: [PATCH] threads: elastically scale-up (#418)

* threads: elastically scale-up
* gh: bump code version
* threads: handle zero-sized requests and preload a la mpi
* omp: fix race conditions with elastic scaling
---
 .env                                       |   4 +-
 .github/workflows/tests.yml                |  12 +-
 VERSION                                    |   2 +-
 src/batch-scheduler/BinPackScheduler.cpp   |   7 +
 src/batch-scheduler/SchedulingDecision.cpp |   2 +-
 src/planner/Planner.cpp                    | 243 ++++++++++++++++++---
 src/planner/PlannerClient.cpp              |  13 +-
 src/planner/PlannerEndpointHandler.cpp     |  16 ++
 src/proto/faabric.proto                    |   7 +
 src/scheduler/Scheduler.cpp                |   4 +
 10 files changed, 263 insertions(+), 47 deletions(-)

diff --git a/.env b/.env
index a379ae624..3c1d9e416 100644
--- a/.env
+++ b/.env
@@ -1,4 +1,4 @@
-FAABRIC_VERSION=0.19.0
-FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.19.0
+FAABRIC_VERSION=0.20.0
+FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.20.0
 COMPOSE_PROJECT_NAME=faabric-dev
 CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index cb924bb6b..c1d4d1bb6 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -20,7 +20,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.19.0
+      image: faasm.azurecr.io/faabric:0.20.0
     env:
       DEPLOYMENT_TYPE: gha-ci
     steps:
@@ -34,7 +34,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.19.0
+      image: faasm.azurecr.io/faabric:0.20.0
     steps:
       - name: "Check out code"
        uses: actions/checkout@v4
@@ -45,7 +45,7 @@ jobs:
    if: github.event.pull_request.draft == false
    runs-on: ubuntu-latest
    container:
-      image: faasm.azurecr.io/faabric:0.19.0
+      image: faasm.azurecr.io/faabric:0.20.0
    steps:
      - name: "Check out code"
        uses: actions/checkout@v4
@@ -65,7 +65,7 @@ jobs:
      REDIS_QUEUE_HOST: redis
      REDIS_STATE_HOST: redis
    container:
-      image: faasm.azurecr.io/faabric:0.19.0
+      image: faasm.azurecr.io/faabric:0.20.0
      options: --privileged
    services:
      redis:
@@ -104,7 +104,7 @@ jobs:
      REDIS_QUEUE_HOST: redis
      REDIS_STATE_HOST: redis
    container:
-      image: faasm.azurecr.io/faabric:0.19.0
+      image: faasm.azurecr.io/faabric:0.20.0
      options: --privileged
    services:
      redis:
@@ -156,7 +156,7 @@ jobs:
      REDIS_QUEUE_HOST: redis
      REDIS_STATE_HOST: redis
    container:
-      image: faasm.azurecr.io/faabric:0.19.0
+      image: faasm.azurecr.io/faabric:0.20.0
    services:
      redis:
        image: redis
diff --git a/VERSION b/VERSION
index 1cf0537c3..5a03fb737 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.19.0
+0.20.0
diff --git a/src/batch-scheduler/BinPackScheduler.cpp b/src/batch-scheduler/BinPackScheduler.cpp
index 452435ab7..ef5733964 100644
--- a/src/batch-scheduler/BinPackScheduler.cpp
+++ b/src/batch-scheduler/BinPackScheduler.cpp
@@ -307,6 +307,13 @@ std::shared_ptr<SchedulingDecision> BinPackScheduler::makeSchedulingDecision(
     auto decisionType = getDecisionType(inFlightReqs, req);
     auto sortedHosts = getSortedHosts(hostMap, inFlightReqs, req, decisionType);
 
+    // For an OpenMP request with the single host hint, we only consider
+    // scheduling in one VM
+    bool isOmp = req->messages_size() > 0 && req->messages(0).isomp();
+    if (req->singlehosthint() && isOmp) {
+        sortedHosts.erase(sortedHosts.begin() + 1, sortedHosts.end());
+    }
+
     // Assign slots from the list (i.e. bin-pack)
     auto it = sortedHosts.begin();
     int numLeftToSchedule = req->messages_size();
diff --git a/src/batch-scheduler/SchedulingDecision.cpp b/src/batch-scheduler/SchedulingDecision.cpp
index 31bd97351..9c953c605 100644
--- a/src/batch-scheduler/SchedulingDecision.cpp
+++ b/src/batch-scheduler/SchedulingDecision.cpp
@@ -15,7 +15,7 @@ bool SchedulingDecision::isSingleHost() const
     std::string thisHost = conf.endpointHost;
     std::set<std::string> hostSet(hosts.begin(), hosts.end());
 
-    return hostSet.size() == 1;
+    return hostSet.size() <= 1;
 }
 
 void SchedulingDecision::addMessage(const std::string& host,
diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp
index 1eeff755f..996dd3430 100644
--- a/src/planner/Planner.cpp
+++ b/src/planner/Planner.cpp
@@ -19,7 +19,7 @@
 
 // Special group ID magic to indicate MPI decisions that we have preemptively
 // scheduled
-#define MPI_PRELOADED_DECISION_GROUPID -99
+#define FIXED_SIZE_PRELOADED_DECISION_GROUPID -99
 
 namespace faabric::planner {
 
@@ -27,6 +27,55 @@ namespace faabric::planner {
 // ----------------------
 // Utility Functions
 // ----------------------
+// Helper method to calculate how many available slots in the current host we
+// can scale up to
+int availableOpenMpSlots(
+  int appId,
+  const std::string& mainHost,
+  const std::map<std::string, std::shared_ptr<Host>>& hostMap,
+  const faabric::batch_scheduler::InFlightReqs& inFlightReqs)
+{
+    // At most, we can scale up to the host size minus one (the caller thread)
+    int availableSlots =
+      hostMap.at(mainHost)->slots() - hostMap.at(mainHost)->usedslots();
+    assert(availableSlots <= hostMap.at(mainHost)->slots() - 1);
+
+    // However, we need to discard any in-flight apps that are also running
+    // in this host. This is to prevent a situation where a co-located app
+    // elastically scales beyond another app's minimum level of parallelism
+    for (const auto& [thisAppId, inFlightPair] : inFlightReqs) {
+        if (appId == thisAppId) {
+            continue;
+        }
+
+        // Check if the first message in the decision is scheduled to the
+        // same host we are
+        if (inFlightPair.second->hosts.at(0) == mainHost) {
+            // If so, check if the total OMP num threads is more than the
+            // current number of messages in flight, and if so subtract the
+            // difference from the available slots list
+            int requestedButNotOccupiedSlots =
+              inFlightPair.first->messages(0).ompnumthreads() -
+              inFlightPair.first->messages_size();
+
+            // This value could be smaller than zero if elastically scaled-up
+            if (requestedButNotOccupiedSlots > 0) {
+                availableSlots -= requestedButNotOccupiedSlots;
+
+                SPDLOG_DEBUG("Subtracting {} possible slots for app {}'s "
+                             "elastic scale from app {}!",
+                             requestedButNotOccupiedSlots,
+                             appId,
+                             thisAppId);
+            }
+        }
+    }
+
+    assert(availableSlots >= 0);
+
+    return availableSlots;
+}
+
 static void claimHostSlots(std::shared_ptr<Host> host, int slotsToClaim = 1)
 {
     host->set_usedslots(host->usedslots() + slotsToClaim);
@@ -434,9 +483,10 @@ void Planner::setMessageResult(std::shared_ptr<faabric::Message> msg)
     if (it == req->messages().end()) {
         // Ditto as before. We want to allow setting the message result
         // more than once without breaking
-        SPDLOG_DEBUG(
-          "Setting result for non-existant (or finished) message: {}",
-          appId);
+        SPDLOG_DEBUG("Setting result for non-existent (or finished) "
+                     "message: {} (app: {})",
+                     msg->id(),
+                     appId);
     } else {
         SPDLOG_DEBUG("Removing message {} from app {}", msg->id(), appId);
 
@@ -769,8 +819,74 @@ Planner::callBatch(std::shared_ptr<faabric::BatchExecuteRequest> req)
     // does not modify it
     auto hostMapCopy =
       convertToBatchSchedHostMap(state.hostMap, state.nextEvictedHostIps);
+
+    bool isScaleChange =
+      decisionType == faabric::batch_scheduler::DecisionType::SCALE_CHANGE;
     bool isDistChange =
       decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE;
+    bool existsPreloadedDec =
+      state.preloadedSchedulingDecisions.contains(appId);
+
+    // For a SCALE_CHANGE decision (i.e. fork) with the elastic flag set, we
+    // want to scale up to as many available cores as possible in the app's
+    // main host (bypass this logic if we have pre-loaded a decision)
+    if (isScaleChange && req->elasticscalehint() && !existsPreloadedDec) {
+        SPDLOG_INFO("App {} requested to elastically scale up", appId);
+        auto oldDec = state.inFlightReqs.at(appId).second;
+        auto mainHost = oldDec->hosts.at(0);
+
+        // If there are co-located OpenMP apps, we should never use
+        // their `ompNumThreads` slots
+        int numAvail = availableOpenMpSlots(
+          appId, mainHost, state.hostMap, state.inFlightReqs);
+        int numRequested = req->messages_size();
+        int lastMsgIdx =
+          numRequested == 0 ? 0 : req->messages(numRequested - 1).groupidx();
+        for (int itr = 0; itr < (numAvail - numRequested); itr++) {
+            // Differentiate between the position in the message array (itr)
+            // and the new group index. Usually, in a fork, they would be
+            // offset by one
+            int msgIdx = lastMsgIdx + itr + 1;
+            SPDLOG_DEBUG("Adding elastically scaled up msg idx {} (app: {})",
+                         msgIdx,
+                         appId);
+
+            // To add a new message, copy from the last, and update the indexes
+            if (numRequested == 0) {
+                // This is a special case where we scale up from zero
+                // parallelism (i.e. 1 OpenMP thread) that requires special
+                // care
+                auto* newMsg = req->add_messages();
+                *newMsg = state.inFlightReqs.at(appId).first->messages(0);
+                newMsg->set_mainhost(mainHost);
+                newMsg->set_appidx(msgIdx);
+                newMsg->set_groupidx(msgIdx);
+
+                // For requests that elastically scale from 1 (i.e. zero-
+                // parallelism) we make use of the group id field to pass the
+                // actual function pointer as a hack
+                newMsg->set_funcptr(req->groupid());
+            } else {
+                *req->add_messages() = req->messages(numRequested - 1);
+                req->mutable_messages(numRequested + itr)->set_appidx(msgIdx);
+                req->mutable_messages(numRequested + itr)->set_groupidx(msgIdx);
+            }
+
+            // Also update the message id to make sure we can wait for and
+            // clean up the resources we use
+            req->mutable_messages(numRequested + itr)
+              ->set_id(faabric::util::generateGid());
+        }
+
+        if (numAvail > numRequested) {
+            SPDLOG_INFO("Elastically scaled up app {} ({} -> {})",
+                        appId,
+                        numRequested,
+                        numAvail);
+        } else {
+            SPDLOG_INFO("Decided NOT to elastically scale up app {}", appId);
+        }
+    }
 
     // For a DIST_CHANGE decision (i.e. migration) we want to try to imrpove
     // on the old decision (we don't care the one we send), so we make sure
@@ -785,37 +901,79 @@ Planner::callBatch(std::shared_ptr<faabric::BatchExecuteRequest> req)
         }
     }
 
-    // For a NEW decision of an MPI application, we know that it will be
-    // followed-up by a SCALE_CHANGE one, and that the mpi_world_size parameter
-    // must be set. Thus, we can schedule slots for all the MPI ranks, and
-    // consume them later as a preloaded scheduling decision
+    // For a NEW decision of an MPI/OpenMP application, we know that it will be
+    // followed-up by a SCALE_CHANGE one, and that the size parameter
+    // must be set. Thus, we can schedule slots for all the MPI ranks/OMP
+    // threads, and consume them later as a preloaded scheduling decision
     bool isNew = decisionType == faabric::batch_scheduler::DecisionType::NEW;
-    bool isMpi = req->messages(0).ismpi();
-    std::shared_ptr<faabric::BatchExecuteRequest> mpiReq = nullptr;
+    bool isMpi = req->messages_size() > 0 && req->messages(0).ismpi();
+    bool isOmp = req->messages_size() > 0 && req->messages(0).isomp();
+    std::shared_ptr<faabric::BatchExecuteRequest> knownSizeReq = nullptr;
+
+    // For an OpenMP decision, we want to make sure that no in-flight
+    // tasks are currently in a join phase (from a repeated fork-join)
+    if (isOmp) {
+        for (const auto& [thisAppId, inFlightPair] : state.inFlightReqs) {
+            if (thisAppId == appId) {
+                continue;
+            }
+
+            int requestedButNotOccupiedSlots =
+              inFlightPair.first->messages(0).ompnumthreads() -
+              inFlightPair.first->messages_size();
+
+            // TODO: this only works for single-host OpenMP requests
+            if (requestedButNotOccupiedSlots > 0) {
+                auto mainHost = inFlightPair.second->hosts.at(0);
+
+                SPDLOG_DEBUG("Tried to schedule OpenMP app (appid: {})"
+                             " in host {} while another in-flight OpenMP app"
+                             " (appid: {}) had too few messages in flight "
+                             " ({} < {})",
+                             appId,
+                             mainHost,
+                             thisAppId,
+                             inFlightPair.first->messages_size(),
+                             inFlightPair.first->messages(0).ompnumthreads());
+                hostMapCopy.at(mainHost)->usedSlots +=
+                  requestedButNotOccupiedSlots;
+            }
+        }
+    }
 
     // Check if there exists a pre-loaded scheduling decision for this app
     // (e.g. if we want to force a migration). Note that we don't want to check
     // pre-loaded decisions for dist-change requests
     std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> decision =
       nullptr;
-    if (!isDistChange && state.preloadedSchedulingDecisions.contains(appId)) {
+    if (!isDistChange && existsPreloadedDec) {
         decision = getPreloadedSchedulingDecision(appId, req);
-    } else if (isNew && isMpi) {
-        mpiReq = std::make_shared<faabric::BatchExecuteRequest>();
-        *mpiReq = *req;
+
+        // In general, after a scale change decision (that has been preloaded)
+        // it is safe to remove it
+        if (isScaleChange) {
+            SPDLOG_DEBUG("Removing pre-loaded scheduling decision for app {}",
+                         appId);
+            state.preloadedSchedulingDecisions.erase(appId);
+        }
+    } else if (isNew && (isMpi || isOmp)) {
+        knownSizeReq = std::make_shared<faabric::BatchExecuteRequest>();
+        *knownSizeReq = *req;
 
         // Deep-copy as many messages we can from the original BER, and mock
         // the rest
-        for (int i = req->messages_size(); i < req->messages(0).mpiworldsize();
-             i++) {
-            auto* newMpiMsg = mpiReq->add_messages();
+        size_t reqSize = isMpi ? req->messages(0).mpiworldsize()
                                : req->messages(0).ompnumthreads();
+        assert(reqSize > 0);
+        for (int i = req->messages_size(); i < reqSize; i++) {
+            auto* newMpiMsg = knownSizeReq->add_messages();
             newMpiMsg->set_appid(req->appid());
             newMpiMsg->set_groupidx(i);
         }
-        assert(mpiReq->messages_size() == req->messages(0).mpiworldsize());
+        assert(knownSizeReq->messages_size() == reqSize);
 
         decision = batchScheduler->makeSchedulingDecision(
-          hostMapCopy, state.inFlightReqs, mpiReq);
+          hostMapCopy, state.inFlightReqs, knownSizeReq);
     } else {
         decision = batchScheduler->makeSchedulingDecision(
           hostMapCopy, state.inFlightReqs, req);
     }
@@ -853,6 +1011,26 @@ Planner::callBatch(std::shared_ptr<faabric::BatchExecuteRequest> req)
         return decision;
     }
 
+    bool isSingleHost = decision->isSingleHost();
+    if (!isSingleHost && req->singlehosthint()) {
+        // In an elastic OpenMP execution, it may happen that we try to
+        // schedule an app, but another one has been elastically scaled
+        if (isNew && isOmp && req->elasticscalehint()) {
+            // Let the caller handle that there are not enough slots
+            return std::make_shared<
+              faabric::batch_scheduler::SchedulingDecision>(
+              NOT_ENOUGH_SLOTS_DECISION);
+        }
+
+        // This is likely a fatal error and a sign that something has gone
+        // very wrong. We still do not crash the planner
+        SPDLOG_ERROR(
+          "User provided single-host hint in BER, but decision is not!");
+
+        return std::make_shared<faabric::batch_scheduler::SchedulingDecision>(
+          NOT_ENOUGH_SLOTS_DECISION);
+    }
+
     // If we have managed to schedule a frozen request, un-freeze it by
     // removing it from the evicted request map
     if (state.evictedRequests.contains(appId)) {
@@ -901,7 +1079,7 @@ Planner::callBatch(std::shared_ptr<faabric::BatchExecuteRequest> req)
     }
 
     // Skip claiming slots and ports if we have preemptively allocated them
-    bool skipClaim = decision->groupId == MPI_PRELOADED_DECISION_GROUPID;
+    bool skipClaim = decision->groupId == FIXED_SIZE_PRELOADED_DECISION_GROUPID;
 
     // A scheduling decision will create a new PTP mapping and, as a
     // consequence, a new group ID
@@ -935,20 +1113,22 @@
     decision->print();
 #endif
 
-    // For a NEW MPI decision that was not preloaded we have
+    // For a NEW MPI/OpenMP decision that was not preloaded we have
     // preemptively scheduled all MPI messages but now we just need to
     // return the first one, and preload the rest
-    if (isMpi && mpiReq != nullptr) {
-        auto mpiDecision = std::make_shared<
-          faabric::batch_scheduler::SchedulingDecision>(req->appid(),
-                                                        req->groupid());
-        *mpiDecision = *decision;
-        mpiDecision->groupId = MPI_PRELOADED_DECISION_GROUPID;
-        state.preloadedSchedulingDecisions[appId] = mpiDecision;
+    if ((isMpi || isOmp) && knownSizeReq != nullptr) {
+        auto knownSizeDecision = std::make_shared<
+          faabric::batch_scheduler::SchedulingDecision>(req->appid(),
+                                                        req->groupid());
+        *knownSizeDecision = *decision;
+        knownSizeDecision->groupId =
+          FIXED_SIZE_PRELOADED_DECISION_GROUPID;
+        state.preloadedSchedulingDecisions[appId] = knownSizeDecision;
 
         // Remove all messages that we do not have to dispatch now
-        for (int i = 1; i < mpiDecision->messageIds.size(); i++) {
-            decision->removeMessage(mpiDecision->messageIds.at(i));
+        for (int i = 1; i < knownSizeDecision->messageIds.size(); i++) {
+            decision->removeMessage(
+              knownSizeDecision->messageIds.at(i));
         }
     }
 
@@ -1139,16 +1319,15 @@ void Planner::dispatchSchedulingDecision(
             hostRequests[thisHost]->set_singlehost(isSingleHost);
             // Propagate the single host hint
            hostRequests[thisHost]->set_singlehosthint(req->singlehosthint());
+            // Propagate the elastic scaling hint
+            hostRequests[thisHost]->set_elasticscalehint(
+              req->elasticscalehint());
         }
 
         *hostRequests[thisHost]->add_messages() = msg;
     }
 
     bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
-    if (!isSingleHost && req->singlehosthint()) {
-        SPDLOG_ERROR(
-          "User provided single-host hint in BER, but decision is not!");
-    }
 
     for (const auto& [hostIp, hostReq] : hostRequests) {
         SPDLOG_DEBUG("Dispatching {} messages to host {} for execution",
diff --git a/src/planner/PlannerClient.cpp b/src/planner/PlannerClient.cpp
index 0dc3a7e97..237e278d7 100644
--- a/src/planner/PlannerClient.cpp
+++ b/src/planner/PlannerClient.cpp
@@ -307,13 +307,16 @@ faabric::batch_scheduler::SchedulingDecision PlannerClient::callFunctions(
     // to other hosts. Given that we don't support nested threading, if we
     // have a THREADS request here it means that we are being called from the
     // main thread (which holds the main snapshot)
-    const std::string funcStr =
-      faabric::util::funcToString(req->messages(0), false);
+    std::string snapshotKey;
     auto& reg = faabric::snapshot::getSnapshotRegistry();
 
-    std::string snapshotKey;
-    const auto firstMsg = req->messages(0);
-    if (isThreads) {
+    // Note that with threads we may have 0-sized BERs
+    if (isThreads && req->messages_size() > 0) {
+        const std::string funcStr =
+          faabric::util::funcToString(req->messages(0), false);
+
+        const auto firstMsg = req->messages(0);
+
         if (!firstMsg.snapshotkey().empty()) {
             SPDLOG_ERROR("{} should not provide snapshot key for {} threads",
                          funcStr,
diff --git a/src/planner/PlannerEndpointHandler.cpp b/src/planner/PlannerEndpointHandler.cpp
index 2ef7f65ea..4b67ecbfb 100644
--- a/src/planner/PlannerEndpointHandler.cpp
+++ b/src/planner/PlannerEndpointHandler.cpp
@@ -187,6 +187,22 @@ void PlannerEndpointHandler::onRequest(
                   inFlightPair.first->messages(0).mpiworldsize());
             }
 
+            if (inFlightPair.first->messages(0).isomp()) {
+                // What if we reported the scaled-up size here(?)
+                int numOmpThreads =
+                  inFlightPair.first->messages(0).ompnumthreads();
+
+                if (inFlightPair.first->elasticscalehint() &&
+                    numOmpThreads < inFlightPair.first->messages_size()) {
+
+                    inFlightAppResp->set_size(
+                      inFlightPair.first->messages_size());
+                } else {
+                    inFlightAppResp->set_size(
+                      inFlightPair.first->messages(0).ompnumthreads());
+                }
+            }
+
             for (const auto& hostIp : decision->hosts) {
                 inFlightAppResp->add_hostips(hostIp);
             }
diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto
index 490e1cca2..a875305a2 100644
--- a/src/proto/faabric.proto
+++ b/src/proto/faabric.proto
@@ -54,6 +54,9 @@ message BatchExecuteRequest {
     // Hint set by the user to hint that this execution should all be in a
     // single host
     bool singleHostHint = 11;
+
+    // Hint set by the user to make scale-up requests elastic
+    bool elasticScaleHint = 12;
 }
 
 message BatchExecuteRequestStatus {
@@ -142,6 +145,10 @@ message Message {
     repeated int32 chainedMsgIds = 36;
     map<string, int32> intExecGraphDetails = 37;
     map<string, string> execGraphDetails = 38;
+
+    // OpenMP
+    bool isOmp = 39;
+    int32 ompNumThreads = 40;
 }
 
 // ---------------------------------------------
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 5c16c6666..224710711 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -249,6 +249,10 @@ long Scheduler::getFunctionExecutorCount(const faabric::Message& msg)
 
 void Scheduler::executeBatch(std::shared_ptr<faabric::BatchExecuteRequest> req)
 {
+    if (req->messages_size() == 0) {
+        return;
+    }
+
     faabric::util::FullLock lock(mx);
 
     bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
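
Usage sketch (not part of the patch): the snippet below shows how a caller could opt an OpenMP fork into the new elastic scale-up path using the fields added in faabric.proto above (elasticScaleHint on BatchExecuteRequest, isOmp/ompNumThreads on Message). The user/function names and the thread count are made up, and it assumes faabric's existing batchExecFactory and getPlannerClient helpers with their usual signatures; treat it as an illustration rather than code from this change.

#include <faabric/planner/PlannerClient.h>
#include <faabric/util/batch.h>

// Build a THREADS request for a hypothetical OpenMP fork ("demo/omp_func",
// 4 threads) and opt it into the elastic scale-up path added by this patch
faabric::batch_scheduler::SchedulingDecision scheduleElasticOmpFork()
{
    auto req = faabric::util::batchExecFactory("demo", "omp_func", 4);
    req->set_type(faabric::BatchExecuteRequest::THREADS);

    // New BER hints: keep the app on a single host, but let the planner grow
    // it up to the free slots on that host
    req->set_singlehosthint(true);
    req->set_elasticscalehint(true);

    for (int i = 0; i < req->messages_size(); i++) {
        // New per-message OpenMP fields
        req->mutable_messages(i)->set_isomp(true);
        req->mutable_messages(i)->set_ompnumthreads(4);
    }

    // Hand the request to the planner as usual
    return faabric::planner::getPlannerClient().callFunctions(req);
}

With both hints set, a later SCALE_CHANGE request for this app goes through the new branch in Planner::callBatch, which may append extra messages up to availableOpenMpSlots() on the app's main host before dispatching.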