From df2c945b3d701d8b474e9d05408a1af752a3ba93 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 8 May 2024 16:08:59 +0000 Subject: [PATCH] threads: handle zero-sized requests and preload a la mpi --- src/batch-scheduler/BinPackScheduler.cpp | 7 ++ src/batch-scheduler/SchedulingDecision.cpp | 2 +- src/planner/Planner.cpp | 98 ++++++++++++++-------- src/planner/PlannerClient.cpp | 13 +-- src/planner/PlannerEndpointHandler.cpp | 5 ++ src/proto/faabric.proto | 4 + src/scheduler/Scheduler.cpp | 4 + 7 files changed, 94 insertions(+), 39 deletions(-) diff --git a/src/batch-scheduler/BinPackScheduler.cpp b/src/batch-scheduler/BinPackScheduler.cpp index 452435ab7..ef5733964 100644 --- a/src/batch-scheduler/BinPackScheduler.cpp +++ b/src/batch-scheduler/BinPackScheduler.cpp @@ -307,6 +307,13 @@ std::shared_ptr BinPackScheduler::makeSchedulingDecision( auto decisionType = getDecisionType(inFlightReqs, req); auto sortedHosts = getSortedHosts(hostMap, inFlightReqs, req, decisionType); + // For an OpenMP request with the single host hint, we only consider + // scheduling in one VM + bool isOmp = req->messages_size() > 0 && req->messages(0).isomp(); + if (req->singlehosthint() && isOmp) { + sortedHosts.erase(sortedHosts.begin() + 1, sortedHosts.end()); + } + // Assign slots from the list (i.e. bin-pack) auto it = sortedHosts.begin(); int numLeftToSchedule = req->messages_size(); diff --git a/src/batch-scheduler/SchedulingDecision.cpp b/src/batch-scheduler/SchedulingDecision.cpp index 31bd97351..9c953c605 100644 --- a/src/batch-scheduler/SchedulingDecision.cpp +++ b/src/batch-scheduler/SchedulingDecision.cpp @@ -15,7 +15,7 @@ bool SchedulingDecision::isSingleHost() const std::string thisHost = conf.endpointHost; std::set hostSet(hosts.begin(), hosts.end()); - return hostSet.size() == 1; + return hostSet.size() <= 1; } void SchedulingDecision::addMessage(const std::string& host, diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp index c5b106cec..878bf23df 100644 --- a/src/planner/Planner.cpp +++ b/src/planner/Planner.cpp @@ -19,7 +19,7 @@ // Special group ID magic to indicate MPI decisions that we have preemptively // scheduled -#define MPI_PRELOADED_DECISION_GROUPID -99 +#define FIXED_SIZE_PRELOADED_DECISION_GROUPID -99 namespace faabric::planner { @@ -781,7 +781,8 @@ Planner::callBatch(std::shared_ptr req) decisionType == faabric::batch_scheduler::DecisionType::SCALE_CHANGE; bool isDistChange = decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE; - bool existsPreloadedDec = state.preloadedSchedulingDecisions.contains(appId); + bool existsPreloadedDec = + state.preloadedSchedulingDecisions.contains(appId); // For a SCALE_CHANGE decision (i.e. fork) with the elastic flag set, we // want to scale up to as many available cores as possible in the app's @@ -793,26 +794,49 @@ Planner::callBatch(std::shared_ptr req) int numAvail = availableSlots(state.hostMap.at(mainHost)); int numRequested = req->messages_size(); - int lastMsgIdx = req->messages(numRequested - 1).groupidx(); - for (int itr = 0; itr < (numAvail - numRequested); itr++) { + int lastMsgIdx = + numRequested == 0 ? 0 : req->messages(numRequested - 1).groupidx(); + for (int itr = 0; itr < (numAvail - numRequested); itr++) { // Differentiate between the position in the message array (itr) // and the new group index. Usually, in a fork, they would be // offset by one int msgIdx = lastMsgIdx + itr + 1; - SPDLOG_DEBUG("Adding elastically scaled up msg idx {} (app: {})", msgIdx, appId); + SPDLOG_DEBUG("Adding elastically scaled up msg idx {} (app: {})", + msgIdx, + appId); // To add a new message, copy from the last, and update the indexes - *req->add_messages() = req->messages(numRequested - 1); - req->mutable_messages(numRequested + itr)->set_appidx(msgIdx); - req->mutable_messages(numRequested + itr)->set_groupidx(msgIdx); + if (numRequested == 0) { + // This is a special case where we scale up from zero + // parallelism (i.e. 1 OpenMP thread) that requires special + // care + auto* newMsg = req->add_messages(); + *newMsg = state.inFlightReqs.at(appId).first->messages(0); + newMsg->set_mainhost(mainHost); + newMsg->set_appidx(msgIdx); + newMsg->set_groupidx(msgIdx); + + // For requests that elastically scale from 1 (i.e. zero- + // parallelism) we make use of the group id field to pass the + // actual function pointer as a hack + newMsg->set_funcptr(req->groupid()); + } else { + *req->add_messages() = req->messages(numRequested - 1); + req->mutable_messages(numRequested + itr)->set_appidx(msgIdx); + req->mutable_messages(numRequested + itr)->set_groupidx(msgIdx); + } // Also update the message id to make sure we can wait-for and // clean-up the resources we use - req->mutable_messages(numRequested + itr)->set_id(faabric::util::generateGid()); + req->mutable_messages(numRequested + itr) + ->set_id(faabric::util::generateGid()); } if (numAvail > numRequested) { - SPDLOG_INFO("Elastically scaled-up app {} ({} -> {})", appId, numRequested, numAvail); + SPDLOG_INFO("Elastically scaled-up app {} ({} -> {})", + appId, + numRequested, + numAvail); } else { SPDLOG_INFO("Decided NOT to elastically scaled-up app {}", appId); } @@ -831,13 +855,14 @@ Planner::callBatch(std::shared_ptr req) } } - // For a NEW decision of an MPI application, we know that it will be - // followed-up by a SCALE_CHANGE one, and that the mpi_world_size parameter - // must be set. Thus, we can schedule slots for all the MPI ranks, and - // consume them later as a preloaded scheduling decision + // For a NEW decision of an MPI/OpenMP application, we know that it will be + // followed-up by a SCALE_CHANGE one, and that the size parameter + // must be set. Thus, we can schedule slots for all the MPI ranks/OMP + // threads, and consume them later as a preloaded scheduling decision bool isNew = decisionType == faabric::batch_scheduler::DecisionType::NEW; - bool isMpi = req->messages(0).ismpi(); - std::shared_ptr mpiReq = nullptr; + bool isMpi = req->messages_size() > 0 && req->messages(0).ismpi(); + bool isOmp = req->messages_size() > 0 && req->messages(0).isomp(); + std::shared_ptr knownSizeReq = nullptr; // Check if there exists a pre-loaded scheduling decision for this app // (e.g. if we want to force a migration). Note that we don't want to check @@ -849,25 +874,29 @@ Planner::callBatch(std::shared_ptr req) // In general, after a scale change decision (that has been preloaded) // it is safe to remove it if (isScaleChange) { + SPDLOG_DEBUG("Removing pre-loaded scheduling decision for app {}", + appId); state.preloadedSchedulingDecisions.erase(appId); } - } else if (isNew && isMpi) { - mpiReq = std::make_shared(); - *mpiReq = *req; + } else if (isNew && (isMpi || isOmp)) { + knownSizeReq = std::make_shared(); + *knownSizeReq = *req; // Deep-copy as many messages we can from the original BER, and mock // the rest - for (int i = req->messages_size(); i < req->messages(0).mpiworldsize(); - i++) { - auto* newMpiMsg = mpiReq->add_messages(); + size_t reqSize = isMpi ? req->messages(0).mpiworldsize() + : req->messages(0).ompnumthreads(); + assert(reqSize > 0); + for (int i = req->messages_size(); i < reqSize; i++) { + auto* newMpiMsg = knownSizeReq->add_messages(); newMpiMsg->set_appid(req->appid()); newMpiMsg->set_groupidx(i); } - assert(mpiReq->messages_size() == req->messages(0).mpiworldsize()); + assert(knownSizeReq->messages_size() == reqSize); decision = batchScheduler->makeSchedulingDecision( - hostMapCopy, state.inFlightReqs, mpiReq); + hostMapCopy, state.inFlightReqs, knownSizeReq); } else { decision = batchScheduler->makeSchedulingDecision( hostMapCopy, state.inFlightReqs, req); @@ -953,7 +982,7 @@ Planner::callBatch(std::shared_ptr req) } // Skip claiming slots and ports if we have preemptively allocated them - bool skipClaim = decision->groupId == MPI_PRELOADED_DECISION_GROUPID; + bool skipClaim = decision->groupId == FIXED_SIZE_PRELOADED_DECISION_GROUPID; // A scheduling decision will create a new PTP mapping and, as a // consequence, a new group ID @@ -990,17 +1019,19 @@ Planner::callBatch(std::shared_ptr req) // For a NEW MPI decision that was not preloaded we have // preemptively scheduled all MPI messages but now we just need to // return the first one, and preload the rest - if (isMpi && mpiReq != nullptr) { - auto mpiDecision = std::make_shared< + if ((isMpi || isOmp) && knownSizeReq != nullptr) { + auto knownSizeDecision = std::make_shared< faabric::batch_scheduler::SchedulingDecision>(req->appid(), req->groupid()); - *mpiDecision = *decision; - mpiDecision->groupId = MPI_PRELOADED_DECISION_GROUPID; - state.preloadedSchedulingDecisions[appId] = mpiDecision; + *knownSizeDecision = *decision; + knownSizeDecision->groupId = + FIXED_SIZE_PRELOADED_DECISION_GROUPID; + state.preloadedSchedulingDecisions[appId] = knownSizeDecision; // Remove all messages that we do not have to dispatch now - for (int i = 1; i < mpiDecision->messageIds.size(); i++) { - decision->removeMessage(mpiDecision->messageIds.at(i)); + for (int i = 1; i < knownSizeDecision->messageIds.size(); i++) { + decision->removeMessage( + knownSizeDecision->messageIds.at(i)); } } @@ -1192,7 +1223,8 @@ void Planner::dispatchSchedulingDecision( // Propagate the single host hint hostRequests[thisHost]->set_singlehosthint(req->singlehosthint()); // Propagate the elastic scaling hint - hostRequests[thisHost]->set_elasticscalehint(req->elasticscalehint()); + hostRequests[thisHost]->set_elasticscalehint( + req->elasticscalehint()); } *hostRequests[thisHost]->add_messages() = msg; diff --git a/src/planner/PlannerClient.cpp b/src/planner/PlannerClient.cpp index 0dc3a7e97..237e278d7 100644 --- a/src/planner/PlannerClient.cpp +++ b/src/planner/PlannerClient.cpp @@ -307,13 +307,16 @@ faabric::batch_scheduler::SchedulingDecision PlannerClient::callFunctions( // to other hosts. Given that we don't support nested threading, if we // have a THREADS request here it means that we are being called from the // main thread (which holds the main snapshot) - const std::string funcStr = - faabric::util::funcToString(req->messages(0), false); + std::string snapshotKey; auto& reg = faabric::snapshot::getSnapshotRegistry(); - std::string snapshotKey; - const auto firstMsg = req->messages(0); - if (isThreads) { + // Note that with threads we may have 0-sized BERs + if (isThreads && req->messages_size() > 0) { + const std::string funcStr = + faabric::util::funcToString(req->messages(0), false); + + const auto firstMsg = req->messages(0); + if (!firstMsg.snapshotkey().empty()) { SPDLOG_ERROR("{} should not provide snapshot key for {} threads", funcStr, diff --git a/src/planner/PlannerEndpointHandler.cpp b/src/planner/PlannerEndpointHandler.cpp index 2ef7f65ea..b76385d63 100644 --- a/src/planner/PlannerEndpointHandler.cpp +++ b/src/planner/PlannerEndpointHandler.cpp @@ -187,6 +187,11 @@ void PlannerEndpointHandler::onRequest( inFlightPair.first->messages(0).mpiworldsize()); } + if (inFlightPair.first->messages(0).isomp()) { + inFlightAppResp->set_size( + inFlightPair.first->messages(0).ompnumthreads()); + } + for (const auto& hostIp : decision->hosts) { inFlightAppResp->add_hostips(hostIp); } diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto index 56abf85dd..a875305a2 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -145,6 +145,10 @@ message Message { repeated int32 chainedMsgIds = 36; map intExecGraphDetails = 37; map execGraphDetails = 38; + + // OpenMP + bool isOmp = 39; + int32 ompNumThreads = 40; } // --------------------------------------------- diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 5c16c6666..224710711 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -249,6 +249,10 @@ long Scheduler::getFunctionExecutorCount(const faabric::Message& msg) void Scheduler::executeBatch(std::shared_ptr req) { + if (req->messages_size() == 0) { + return; + } + faabric::util::FullLock lock(mx); bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;