Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
Missing files
  • Loading branch information
henricasanova committed Nov 18, 2023
1 parent 842bc4a commit 8c12037
Show file tree
Hide file tree
Showing 7 changed files with 998 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace wrench {
std::string data_to_send;
std::string reply_received;
std::shared_ptr<BatchComputeService> batch_service;
S4U_Commport *batch_service_commport;
S4U_CommPort *batch_service_commport;

void sendExecuteMessageToBatchComputeService(S4U_CommPort *answer_commport, std::string execute_job_reply_data);
void sendQueryAnswerMessageToBatchComputeService(double estimated_waiting_time);
Expand Down
197 changes: 197 additions & 0 deletions include/wrench/simgrid_S4U_util/S4U_CommPort.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/**
* Copyright (c) 2017. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
*/

#ifndef WRENCH_S4U_MAILBOX_H
#define WRENCH_S4U_MAILBOX_H


#include <string>
#include <map>
#include <set>
#include <typeinfo>
#include <boost/core/demangle.hpp>
#include <simgrid/s4u.hpp>
#include <wrench/simulation/SimulationMessage.h>
namespace wrench {

/***********************/
/** \cond INTERNAL */
/***********************/

//class SimulationMessage;
class S4U_PendingCommunication;

/**
* @brief Wrappers around S4U's communication methods
*/
class S4U_CommPort {

public:

/**
* @brief Constructor
*/
S4U_CommPort() {
this->s4u_mb = simgrid::s4u::Mailbox::by_name("tmp" + std::to_string(S4U_CommPort::generateUniqueSequenceNumber()));
this->name = this->s4u_mb->get_name();
}

/**
* @brief Synchronously receive a message from a commport_name
*
* @param error_prefix: any string you wish to prefix the error message with
* @return the message, in a unique_ptr of the type specified. Otherwise throws a runtime_error
*
* @throw std::shared_ptr<NetworkError>
*/
template<class TMessageType>
std::unique_ptr<TMessageType> getMessage(const std::string &error_prefix = "") {
auto id = ++messageCounter;
#ifndef NDEBUG
char const *name = typeid(TMessageType).name();
std::string tn = boost::core::demangle(name);
this->templateWaitingLog(tn, id);
#endif


auto message = this->getMessage(false);

if (auto msg = dynamic_cast<TMessageType *>(message.get())) {
#ifndef NDEBUG
this->templateWaitingLogUpdate(tn, id);
#endif
message.release();
return std::unique_ptr<TMessageType>(msg);
} else {
char const *name = typeid(TMessageType).name();
std::string tn = boost::core::demangle(name);
throw std::runtime_error(error_prefix + " Unexpected [" + message->getName() + "] message while waiting for " + tn.c_str() + ". Request ID: " + std::to_string(id));
}
}
/**
* @brief Synchronously receive a message from a commport_name
*
* @param error_prefix: any string you wish to prefix the error message with
* @param timeout: a timeout value in seconds (<0 means never timeout)
*
* @return the message, in a unique_ptr of the type specified. Otherwise throws a runtime_error
*
* @throw std::shared_ptr<NetworkError>
*/
template<class TMessageType>
std::unique_ptr<TMessageType> getMessage(double timeout, const std::string &error_prefix = "") {
auto id = ++messageCounter;
#ifndef NDEBUG
char const *name = typeid(TMessageType).name();
std::string tn = boost::core::demangle(name);
this->templateWaitingLog(tn, id);
#endif


auto message = this->getMessage(timeout, false);

if (auto msg = dynamic_cast<TMessageType *>(message.get())) {
message.release();
#ifndef NDEBUG
this->templateWaitingLogUpdate(tn, id);
#endif
return std::unique_ptr<TMessageType>(msg);
} else {
char const *name = typeid(TMessageType).name();
std::string tn = boost::core::demangle(name);
throw std::runtime_error(error_prefix + " Unexpected [" + message->getName() + "] message while waiting for " + tn.c_str() + ". Request ID: " + std::to_string(id));
}
}
/**
* @brief Synchronously receive a message from a commport_name
*
* @return the message, or nullptr (in which case it's likely a brutal termination)
*
* @throw std::shared_ptr<NetworkError>
*/
std::unique_ptr<SimulationMessage> getMessage() {
return getMessage(true);
}
/**
* @brief Synchronously receive a message from a commport_name, with a timeout
*
* @param timeout: a timeout value in seconds (<0 means never timeout)
* @return the message, or nullptr (in which case it's likely a brutal termination)
*
* @throw std::shared_ptr<NetworkError>
*/
std::unique_ptr<SimulationMessage> getMessage(double timeout) {
return this->getMessage(timeout, true);
}
void putMessage(SimulationMessage *m);
void dputMessage(SimulationMessage *msg);
std::shared_ptr<S4U_PendingCommunication> iputMessage(SimulationMessage *msg);
std::shared_ptr<S4U_PendingCommunication> igetMessage();

static unsigned long generateUniqueSequenceNumber();

static S4U_CommPort *getTemporaryCommPort();
static void retireTemporaryCommPort(S4U_CommPort *commport);

static void createCommPortPool(unsigned long num_commports);

/**
* @brief The commport_name pool size
*/
static unsigned long commport_pool_size;

/**
* @brief The default control message size
*/
static double default_control_message_size;

/**
* @brief The "not a commport_name" commport_name, to avoid getting answers back when asked
* to prove an "answer commport_name"
*/
static S4U_CommPort *NULL_MAILBOX;

const std::string get_name() const {
return this->name;
}

const char *get_cname() const {
return this->name.c_str();
}

private:
friend class S4U_Daemon;
friend class S4U_PendingCommunication;

simgrid::s4u::Mailbox *s4u_mb;

std::unique_ptr<SimulationMessage> getMessage(bool log);
std::unique_ptr<SimulationMessage> getMessage(double timeout, bool log);

void templateWaitingLog(const std::string& type, unsigned long long id);
void templateWaitingLogUpdate(const std::string& type, unsigned long long id);

static std::vector<std::unique_ptr<S4U_CommPort>> all_commports;
static std::deque<S4U_CommPort *> free_commports;
static std::set<S4U_CommPort *> used_commports;
static std::deque<S4U_CommPort *> commports_to_drain;
static unsigned long long messageCounter;

std::string name;
};

/***********************/
/** \endcond */
/***********************/

}// namespace wrench


#endif//WRENCH_S4U_MAILBOX_H
4 changes: 2 additions & 2 deletions src/wrench/services/compute/batch/BatschedNetworkListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ namespace wrench {
*/
void BatschedNetworkListener::sendExecuteMessageToBatchComputeService(S4U_CommPort *answer_commport,
std::string execute_job_reply_data) {
S4U_CommPort::putMessage(this->batch_service_commport,
this->batch_service_commport->S4U_CommPort::putMessage(
new BatchExecuteJobFromBatSchedMessage(answer_commport, execute_job_reply_data, 0));
}

Expand All @@ -113,7 +113,7 @@ namespace wrench {
* @param estimated_waiting_time: BatchComputeService queue wait time estimate
*/
void BatschedNetworkListener::sendQueryAnswerMessageToBatchComputeService(double estimated_waiting_time) {
S4U_CommPort::putMessage(this->batch_service_commport,
this->batch_service_commport->S4U_CommPort::putMessage(
new BatchQueryAnswerMessage(estimated_waiting_time, 0));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ namespace wrench {
for (auto job: set_of_jobs) {
// Get the answer
std::unique_ptr<SimulationMessage> message = nullptr;
auto msg = S4U_CommPort::getMessage<BatchQueryAnswerMessage>(batchsched_query_commport,
auto msg = batchsched_query_commport->getMessage<BatchQueryAnswerMessage>(
"[This error likely means that the scheduling algorithm that Batsched was configured to use (\"" +
this->cs->getPropertyValueAsString(BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM) +
"\") does not support queue waiting time predictions!]. Received an");
Expand Down Expand Up @@ -347,7 +347,7 @@ namespace wrench {
std::string data = batch_submission_data.dump();
std::shared_ptr<BatschedNetworkListener> network_listener =
std::shared_ptr<BatschedNetworkListener>(
new BatschedNetworkListener(this->cs->hostname, this->cs->getSharedPtr<BatchComputeService>(), this->cs->commport_name,
new BatschedNetworkListener(this->cs->hostname, this->cs->getSharedPtr<BatchComputeService>(), this->cs->commport,
std::to_string(this->batsched_port),
data));
network_listener->setSimulation(this->cs->getSimulation());
Expand Down Expand Up @@ -416,7 +416,7 @@ namespace wrench {
std::shared_ptr<BatschedNetworkListener> network_listener =
std::shared_ptr<BatschedNetworkListener>(
new BatschedNetworkListener(this->cs->hostname, this->cs->getSharedPtr<BatchComputeService>(),
this->cs->commport_name,
this->cs->commport,
std::to_string(this->batsched_port),
data));
network_listener->setSimulation(this->cs->getSimulation());
Expand Down Expand Up @@ -524,7 +524,7 @@ namespace wrench {
try {
std::shared_ptr<BatschedNetworkListener> network_listener =
std::shared_ptr<BatschedNetworkListener>(
new BatschedNetworkListener(this->cs->hostname, this->cs->getSharedPtr<BatchComputeService>(), this->cs->commport_name,
new BatschedNetworkListener(this->cs->hostname, this->cs->getSharedPtr<BatchComputeService>(), this->cs->commport,
std::to_string(this->batsched_port),
data));
network_listener->setSimulation(this->cs->getSimulation());
Expand Down
Loading

0 comments on commit 8c12037

Please sign in to comment.