From 8c12037d1c073a4ea6df56941dddcf4cd14b73ac Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Sat, 18 Nov 2023 08:22:18 -1000 Subject: [PATCH] Fixes Missing files --- .../compute/batch/BatschedNetworkListener.h | 2 +- .../wrench/simgrid_S4U_util/S4U_CommPort.h | 197 ++++++++ .../compute/batch/BatschedNetworkListener.cpp | 4 +- .../batsched/BatschedBatchScheduler.cpp | 8 +- src/wrench/simgrid_S4U_util/S4U_CommPort.cpp | 359 +++++++++++++++ src/wrench/simgrid_S4U_util/S4U_Daemon.cpp | 3 - test/simulation/S4U_CommPortTest.cpp | 435 ++++++++++++++++++ 7 files changed, 998 insertions(+), 10 deletions(-) create mode 100755 include/wrench/simgrid_S4U_util/S4U_CommPort.h create mode 100755 src/wrench/simgrid_S4U_util/S4U_CommPort.cpp create mode 100755 test/simulation/S4U_CommPortTest.cpp diff --git a/include/wrench/services/compute/batch/BatschedNetworkListener.h b/include/wrench/services/compute/batch/BatschedNetworkListener.h index 8aace1d5ea..16210794bd 100755 --- a/include/wrench/services/compute/batch/BatschedNetworkListener.h +++ b/include/wrench/services/compute/batch/BatschedNetworkListener.h @@ -48,7 +48,7 @@ namespace wrench { std::string data_to_send; std::string reply_received; std::shared_ptr batch_service; - S4U_Commport *batch_service_commport; + S4U_CommPort *batch_service_commport; void sendExecuteMessageToBatchComputeService(S4U_CommPort *answer_commport, std::string execute_job_reply_data); void sendQueryAnswerMessageToBatchComputeService(double estimated_waiting_time); diff --git a/include/wrench/simgrid_S4U_util/S4U_CommPort.h b/include/wrench/simgrid_S4U_util/S4U_CommPort.h new file mode 100755 index 0000000000..c962727b52 --- /dev/null +++ b/include/wrench/simgrid_S4U_util/S4U_CommPort.h @@ -0,0 +1,197 @@ +/** + * Copyright (c) 2017. The WRENCH Team. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + */ + +#ifndef WRENCH_S4U_MAILBOX_H +#define WRENCH_S4U_MAILBOX_H + + +#include +#include +#include +#include +#include +#include +#include +namespace wrench { + + /***********************/ + /** \cond INTERNAL */ + /***********************/ + + //class SimulationMessage; + class S4U_PendingCommunication; + + /** + * @brief Wrappers around S4U's communication methods + */ + class S4U_CommPort { + + public: + + /** + * @brief Constructor + */ + S4U_CommPort() { + this->s4u_mb = simgrid::s4u::Mailbox::by_name("tmp" + std::to_string(S4U_CommPort::generateUniqueSequenceNumber())); + this->name = this->s4u_mb->get_name(); + } + + /** + * @brief Synchronously receive a message from a commport_name + * + * @param error_prefix: any string you wish to prefix the error message with + * @return the message, in a unique_ptr of the type specified. Otherwise throws a runtime_error + * + * @throw std::shared_ptr + */ + template + std::unique_ptr getMessage(const std::string &error_prefix = "") { + auto id = ++messageCounter; +#ifndef NDEBUG + char const *name = typeid(TMessageType).name(); + std::string tn = boost::core::demangle(name); + this->templateWaitingLog(tn, id); +#endif + + + auto message = this->getMessage(false); + + if (auto msg = dynamic_cast(message.get())) { +#ifndef NDEBUG + this->templateWaitingLogUpdate(tn, id); +#endif + message.release(); + return std::unique_ptr(msg); + } else { + char const *name = typeid(TMessageType).name(); + std::string tn = boost::core::demangle(name); + throw std::runtime_error(error_prefix + " Unexpected [" + message->getName() + "] message while waiting for " + tn.c_str() + ". Request ID: " + std::to_string(id)); + } + } + /** + * @brief Synchronously receive a message from a commport_name + * + * @param error_prefix: any string you wish to prefix the error message with + * @param timeout: a timeout value in seconds (<0 means never timeout) + * + * @return the message, in a unique_ptr of the type specified. Otherwise throws a runtime_error + * + * @throw std::shared_ptr + */ + template + std::unique_ptr getMessage(double timeout, const std::string &error_prefix = "") { + auto id = ++messageCounter; +#ifndef NDEBUG + char const *name = typeid(TMessageType).name(); + std::string tn = boost::core::demangle(name); + this->templateWaitingLog(tn, id); +#endif + + + auto message = this->getMessage(timeout, false); + + if (auto msg = dynamic_cast(message.get())) { + message.release(); +#ifndef NDEBUG + this->templateWaitingLogUpdate(tn, id); +#endif + return std::unique_ptr(msg); + } else { + char const *name = typeid(TMessageType).name(); + std::string tn = boost::core::demangle(name); + throw std::runtime_error(error_prefix + " Unexpected [" + message->getName() + "] message while waiting for " + tn.c_str() + ". Request ID: " + std::to_string(id)); + } + } + /** + * @brief Synchronously receive a message from a commport_name + * + * @return the message, or nullptr (in which case it's likely a brutal termination) + * + * @throw std::shared_ptr + */ + std::unique_ptr getMessage() { + return getMessage(true); + } + /** + * @brief Synchronously receive a message from a commport_name, with a timeout + * + * @param timeout: a timeout value in seconds (<0 means never timeout) + * @return the message, or nullptr (in which case it's likely a brutal termination) + * + * @throw std::shared_ptr + */ + std::unique_ptr getMessage(double timeout) { + return this->getMessage(timeout, true); + } + void putMessage(SimulationMessage *m); + void dputMessage(SimulationMessage *msg); + std::shared_ptr iputMessage(SimulationMessage *msg); + std::shared_ptr igetMessage(); + + static unsigned long generateUniqueSequenceNumber(); + + static S4U_CommPort *getTemporaryCommPort(); + static void retireTemporaryCommPort(S4U_CommPort *commport); + + static void createCommPortPool(unsigned long num_commports); + + /** + * @brief The commport_name pool size + */ + static unsigned long commport_pool_size; + + /** + * @brief The default control message size + */ + static double default_control_message_size; + + /** + * @brief The "not a commport_name" commport_name, to avoid getting answers back when asked + * to prove an "answer commport_name" + */ + static S4U_CommPort *NULL_MAILBOX; + + const std::string get_name() const { + return this->name; + } + + const char *get_cname() const { + return this->name.c_str(); + } + + private: + friend class S4U_Daemon; + friend class S4U_PendingCommunication; + + simgrid::s4u::Mailbox *s4u_mb; + + std::unique_ptr getMessage(bool log); + std::unique_ptr getMessage(double timeout, bool log); + + void templateWaitingLog(const std::string& type, unsigned long long id); + void templateWaitingLogUpdate(const std::string& type, unsigned long long id); + + static std::vector> all_commports; + static std::deque free_commports; + static std::set used_commports; + static std::deque commports_to_drain; + static unsigned long long messageCounter; + + std::string name; + }; + + /***********************/ + /** \endcond */ + /***********************/ + +}// namespace wrench + + +#endif//WRENCH_S4U_MAILBOX_H diff --git a/src/wrench/services/compute/batch/BatschedNetworkListener.cpp b/src/wrench/services/compute/batch/BatschedNetworkListener.cpp index ddef97cb5e..aa54258ef8 100644 --- a/src/wrench/services/compute/batch/BatschedNetworkListener.cpp +++ b/src/wrench/services/compute/batch/BatschedNetworkListener.cpp @@ -104,7 +104,7 @@ namespace wrench { */ void BatschedNetworkListener::sendExecuteMessageToBatchComputeService(S4U_CommPort *answer_commport, std::string execute_job_reply_data) { - S4U_CommPort::putMessage(this->batch_service_commport, + this->batch_service_commport->S4U_CommPort::putMessage( new BatchExecuteJobFromBatSchedMessage(answer_commport, execute_job_reply_data, 0)); } @@ -113,7 +113,7 @@ namespace wrench { * @param estimated_waiting_time: BatchComputeService queue wait time estimate */ void BatschedNetworkListener::sendQueryAnswerMessageToBatchComputeService(double estimated_waiting_time) { - S4U_CommPort::putMessage(this->batch_service_commport, + this->batch_service_commport->S4U_CommPort::putMessage( new BatchQueryAnswerMessage(estimated_waiting_time, 0)); } diff --git a/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp b/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp index b7e7e495fb..ed1a38815a 100755 --- a/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp +++ b/src/wrench/services/compute/batch/batch_schedulers/batsched/BatschedBatchScheduler.cpp @@ -256,7 +256,7 @@ namespace wrench { for (auto job: set_of_jobs) { // Get the answer std::unique_ptr message = nullptr; - auto msg = S4U_CommPort::getMessage(batchsched_query_commport, + auto msg = batchsched_query_commport->getMessage( "[This error likely means that the scheduling algorithm that Batsched was configured to use (\"" + this->cs->getPropertyValueAsString(BatchComputeServiceProperty::BATCH_SCHEDULING_ALGORITHM) + "\") does not support queue waiting time predictions!]. Received an"); @@ -347,7 +347,7 @@ namespace wrench { std::string data = batch_submission_data.dump(); std::shared_ptr network_listener = std::shared_ptr( - new BatschedNetworkListener(this->cs->hostname, this->cs->getSharedPtr(), this->cs->commport_name, + new BatschedNetworkListener(this->cs->hostname, this->cs->getSharedPtr(), this->cs->commport, std::to_string(this->batsched_port), data)); network_listener->setSimulation(this->cs->getSimulation()); @@ -416,7 +416,7 @@ namespace wrench { std::shared_ptr network_listener = std::shared_ptr( new BatschedNetworkListener(this->cs->hostname, this->cs->getSharedPtr(), - this->cs->commport_name, + this->cs->commport, std::to_string(this->batsched_port), data)); network_listener->setSimulation(this->cs->getSimulation()); @@ -524,7 +524,7 @@ namespace wrench { try { std::shared_ptr network_listener = std::shared_ptr( - new BatschedNetworkListener(this->cs->hostname, this->cs->getSharedPtr(), this->cs->commport_name, + new BatschedNetworkListener(this->cs->hostname, this->cs->getSharedPtr(), this->cs->commport, std::to_string(this->batsched_port), data)); network_listener->setSimulation(this->cs->getSimulation()); diff --git a/src/wrench/simgrid_S4U_util/S4U_CommPort.cpp b/src/wrench/simgrid_S4U_util/S4U_CommPort.cpp new file mode 100755 index 0000000000..debee3e46a --- /dev/null +++ b/src/wrench/simgrid_S4U_util/S4U_CommPort.cpp @@ -0,0 +1,359 @@ +/** + * Copyright (c) 2017-2021. The WRENCH Team. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + */ + +#include +#include +#include +#include +#include + +#ifdef MESSAGE_MANAGER +#include +#endif + +#include + +#include +#include +#include +#include +#include "wrench/exceptions/ExecutionException.h" + +WRENCH_LOG_CATEGORY(wrench_core_commport, "CommPort"); + +namespace wrench { + + S4U_CommPort *S4U_CommPort::NULL_MAILBOX; + + std::vector> S4U_CommPort::all_commports; + std::deque S4U_CommPort::free_commports; + std::set S4U_CommPort::used_commports; + std::deque S4U_CommPort::commports_to_drain; + unsigned long S4U_CommPort::commport_pool_size; + double S4U_CommPort::default_control_message_size; + + + + class WorkflowTask; + /** + * @brief Helper method that avoids calling WRENCH_DEBUG from a .h file and do the logging for the templated getMessage() method. + * It also has the added bonus of checking for inheritance + * + * @param commport: the commport_name so we can get its name + * @param type: a pointer to the message so we have its type + * @param id: an integer id + * + */ + void S4U_CommPort::templateWaitingLog(const std::string& type, unsigned long long id) { + + WRENCH_DEBUG("Waiting for message of type <%s> from commport_name '%s'. Request ID: %llu", type.c_str(), this->s4u_mb->get_cname(), id); + } + + /** + * @brief Helper method that avoids calling WRENCH_DEBUG from a .h file and do the logging for the templated getMessage() method. + * It also has the added bonus of checking for inheritance. + * + * @param commport: the commport_name so we can get its name + * @param type: a pointer to the message so we have its type + * @param id: an integer id + * + */ + void S4U_CommPort::templateWaitingLogUpdate(const std::string& type, unsigned long long id) { + + WRENCH_DEBUG("Received a message of type <%s> from commport_name '%s'. Request ID: %llu", type.c_str(), this->s4u_mb->get_cname(), id); + } + + /** + * @brief Synchronously receive a message from a commport_name + * + * @param commport: the commport_name + * @param log: should the log message be printed + * @return the message, or nullptr (in which case it's likely a brutal termination) + * + * @throw std::shared_ptr + * + */ + std::unique_ptr S4U_CommPort::getMessage(bool log) { + if (this == S4U_CommPort::NULL_MAILBOX) { + throw std::invalid_argument("S4U_CommPort::getMessage(): Cannot be called with NULL_MAILBOX"); + } + + if (log) WRENCH_DEBUG("Getting a message from commport_name '%s'", this->s4u_mb->get_cname()); + SimulationMessage *msg; + try { + // msg = static_cast(commport_name->get()); + msg = this->s4u_mb->get(); + } catch (simgrid::NetworkFailureException &e) { + throw ExecutionException(std::make_shared( + NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_cname())); + } + +#ifdef MESSAGE_MANAGER + MessageManager::removeReceivedMessage(this, msg); +#endif + + WRENCH_DEBUG("Received a '%s' message from commport_name %s", msg->getName().c_str(), this->s4u_mb->get_cname()); + return std::unique_ptr(msg); + } + + /** + * @brief Synchronously receive a message from a commport_name, with a timeout + * + * @param commport: the commport_name + * @param timeout: a timeout value in seconds (<0 means never timeout) + * @param log: should the log message be printed + * @return the message, or nullptr (in which case it's likely a brutal termination) + * + * @throw std::shared_ptr + */ + std::unique_ptr S4U_CommPort::getMessage(double timeout, bool log) { + if (this == S4U_CommPort::NULL_MAILBOX) { + throw std::invalid_argument("S4U_CommPort::getMessage(): Cannot be called with NULL_MAILBOX"); + } + + if (timeout < 0) { + return this->getMessage(); + } + + if (log) WRENCH_DEBUG("Getting a message from commport_name '%s' with timeout %lf sec", this->s4u_mb->get_cname(), timeout); + wrench::SimulationMessage *msg; + + try { + // data = commport_name->get(timeout); + msg = this->s4u_mb->get(timeout); + } catch (simgrid::NetworkFailureException &e) { + throw ExecutionException(std::make_shared( + NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name())); + } catch (simgrid::TimeoutException &e) { + throw ExecutionException(std::make_shared( + NetworkError::RECEIVING, NetworkError::TIMEOUT, this->s4u_mb->get_name())); + } + + // auto msg = static_cast(data); + + +#ifdef MESSAGE_MANAGER + MessageManager::removeReceivedMessage(this, msg); +#endif + + WRENCH_DEBUG("Received a '%s' message from commport_name '%s'", msg->getName().c_str(), this->s4u_mb->get_cname()); + + return std::unique_ptr(msg); + } + + /** + * @brief Synchronously send a message to a commport_name + * + * @param commport: the commport_name + * @param msg: the SimulationMessage + * + * @throw std::shared_ptr + */ + void S4U_CommPort::putMessage(SimulationMessage *msg) { + + if (this == S4U_CommPort::NULL_MAILBOX) { + return; + } + + WRENCH_DEBUG("Putting a %s message (%.2lf bytes) to commport_name '%s'", + msg->getName().c_str(), msg->payload, + this->s4u_mb->get_cname()); + try { +#ifdef MESSAGE_MANAGER + MessageManager::manageMessage(this, msg); +#endif + this->s4u_mb->put(msg, (uint64_t) msg->payload); + } catch (simgrid::NetworkFailureException &e) { + throw ExecutionException(std::make_shared( + NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name())); + } catch (simgrid::TimeoutException &e) { + // Can happen if the other side is doing a timeout.... I think + throw ExecutionException(std::make_shared( + NetworkError::SENDING, NetworkError::TIMEOUT, this->s4u_mb->get_name())); + } + } + + /** + * @brief Asynchronously send a message to a commport_name in a "fire and forget" fashion + * + * @param commport: the commport_name + * @param msg: the SimulationMessage + * + */ + void S4U_CommPort::dputMessage(SimulationMessage *msg) { + + if (this == S4U_CommPort::NULL_MAILBOX) { + return; + } + + WRENCH_DEBUG("Dputting a %s message (%.2lf bytes) to commport_name '%s'", + msg->getName().c_str(), msg->payload, + this->s4u_mb->get_cname()); + +#ifdef MESSAGE_MANAGER + MessageManager::manageMessage(this, msg); +#endif + //if (msg->payload) + this->s4u_mb->put_init(msg, (uint64_t) msg->payload)->detach(); + //else + // commport_name->put(msg, 0); + } + + /** + * @brief Asynchronously send a message to a commport_name + * + * @param commport: the commport_name + * @param msg: the SimulationMessage + * + * @return a pending communication handle + * + * @throw std::shared_ptr + */ + std::shared_ptr + S4U_CommPort::iputMessage(SimulationMessage *msg) { + + if (this == S4U_CommPort::NULL_MAILBOX) { + return nullptr; + } + + WRENCH_DEBUG("Iputting a %s message (%.2lf bytes) to commport_name '%s'", + msg->getName().c_str(), msg->payload, + this->s4u_mb->get_cname()); + + simgrid::s4u::CommPtr comm_ptr = nullptr; + + try { +#ifdef MESSAGE_MANAGER + MessageManager::manageMessage(this, msg); +#endif + comm_ptr = this->s4u_mb->put_async(msg, (uint64_t) msg->payload); + } catch (simgrid::NetworkFailureException &e) { + throw ExecutionException(std::make_shared( + NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name())); + } + + auto pending_communication = std::make_shared( + this, S4U_PendingCommunication::OperationType::SENDING); + pending_communication->comm_ptr = comm_ptr; + return pending_communication; + } + + + /** + * @brief Asynchronously receive a message from a commport_name + * + * @param commport: the commport_name + * + * @return a pending communication handle + * + * @throw std::shared_ptr + */ + std::shared_ptr S4U_CommPort::igetMessage() { + + if (this == S4U_CommPort::NULL_MAILBOX) { + throw std::invalid_argument("S4U_CommPort::igetMessage(): Cannot be called with NULL_MAILBOX"); + } + + simgrid::s4u::CommPtr comm_ptr = nullptr; + + WRENCH_DEBUG("Igetting a message from commport_name '%s'", this->s4u_mb->get_cname()); + + std::shared_ptr pending_communication = std::make_shared( + this, S4U_PendingCommunication::OperationType::RECEIVING); + + try { + comm_ptr = this->s4u_mb->get_async((void **) (&(pending_communication->simulation_message))); + } catch (simgrid::NetworkFailureException &e) { + throw ExecutionException(std::make_shared( + NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name())); + } + pending_communication->comm_ptr = comm_ptr; + return pending_communication; + } + + + /** + * @brief Generate a unique sequence number + * + * @return a unique sequence number + */ + unsigned long S4U_CommPort::generateUniqueSequenceNumber() { + static unsigned long sequence_number = 0; + return sequence_number++; + } + + + /** + * @brief Get a temporary commport_name + * + * @return a temporary commport_name + */ + S4U_CommPort *S4U_CommPort::getTemporaryCommPort() { + if (S4U_CommPort::free_commports.empty()) { + throw std::runtime_error("S4U_CommPort::getTemporaryCommPort(): Out of commportes! " + "(Increase the commport_name pool size with the --wrench-commport_name-pool-size command-line argument (default is 5000))"); + } + + // std::cerr << "FREE MAILBOX: " << S4U_CommPort::free_commports.size() << "\n"; + + auto commport = *(S4U_CommPort::free_commports.end() - 1); + S4U_CommPort::free_commports.pop_back(); + // std::cerr << simgrid::s4u::this_actor::get_pid() << " GOT TEMPORARY MAILBOX " << commport_name->get_name() << "\n"; + + if (not commport->s4u_mb->empty()) { + // std::cerr << "############### WASTING MAILBOX " << commport_name->get_name() << "\n"; + S4U_CommPort::commports_to_drain.push_front(commport); + return S4U_CommPort::getTemporaryCommPort();// Recursive call! + + // // Drain one commport_name + // if (not S4U_CommPort::commports_to_drain.empty()) { + // auto to_drain = *(S4U_CommPort::commports_to_drain.end() - 1); + // std::cerr << "############ UNWASTING MAILBOX " << to_drain->get_name() << "\n"; + // S4U_CommPort::commports_to_drain.pop_back(); + // while (not to_drain->empty()) { + // to_drain->get(); + // } + // } + } + + S4U_CommPort::used_commports.insert(commport); + + return commport; + } + + + /** + * @brief Retire a temporary commport_name + * @param commport: the commport_name to retire + */ + void S4U_CommPort::retireTemporaryCommPort(S4U_CommPort *commport) { + // std::cerr << simgrid::s4u::this_actor::get_pid() << " TRYING TO RETIRE MAILBOX " << commport_name->get_name() << "\n"; + if (S4U_CommPort::used_commports.find(commport) == S4U_CommPort::used_commports.end()) { + return; + } + S4U_CommPort::used_commports.erase(commport); + S4U_CommPort::free_commports.push_front(commport); + // std::cerr << simgrid::s4u::this_actor::get_pid() << " RETIRED MAILBOX " << commport_name->get_name() << "\n"; + } + + /** + * @brief Create the pool of commports to use + * @param num_commports: numb commports in pool + */ + void S4U_CommPort::createCommPortPool(unsigned long num_commports) { + S4U_CommPort::all_commports.reserve(num_commports); + for (unsigned long i = 0; i < num_commports; i++) { + std::unique_ptr mb = std::make_unique(); + S4U_CommPort::free_commports.push_back(mb.get()); + S4U_CommPort::all_commports.push_back(std::move(mb)); + } + } + + unsigned long long S4U_CommPort::messageCounter = 0; +}// namespace wrench diff --git a/src/wrench/simgrid_S4U_util/S4U_Daemon.cpp b/src/wrench/simgrid_S4U_util/S4U_Daemon.cpp index 6e1ab3f54e..0cd8fd1d1c 100755 --- a/src/wrench/simgrid_S4U_util/S4U_Daemon.cpp +++ b/src/wrench/simgrid_S4U_util/S4U_Daemon.cpp @@ -77,9 +77,6 @@ namespace wrench { unsigned long seq = S4U_CommPort::generateUniqueSequenceNumber(); this->process_name = process_name_prefix + "_" + std::to_string(seq); -// this->commport_name = S4U_CommPort::generateUniqueMailbox("mb"); -// this->recv_commport = S4U_CommPort::generateUniqueMailbox("rmb"); - this->commport = S4U_CommPort::getTemporaryCommPort(); this->recv_commport = S4U_CommPort::getTemporaryCommPort(); diff --git a/test/simulation/S4U_CommPortTest.cpp b/test/simulation/S4U_CommPortTest.cpp new file mode 100755 index 0000000000..1d125547ec --- /dev/null +++ b/test/simulation/S4U_CommPortTest.cpp @@ -0,0 +1,435 @@ +/** + * Copyright (c) 2017. The WRENCH Team. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + */ + +#include +#include +#include "../include/UniqueTmpPathPrefix.h" +#include "../include/TestWithFork.h" + +WRENCH_LOG_CATEGORY(s4u_commport_test, "Log category for S4U_CommPort test"); + + +class S4U_CommPortTest : public ::testing::Test { + +public: + std::shared_ptr wms1, wms2; + + void do_AsynchronousCommunication_test(); + void do_NetworkTimeout_test(); + void do_NullCommPort_test(); + +protected: + S4U_CommPortTest() { + + + // Create a two-host platform file + std::string xml = "" + "" + " " + " " + " " + " " + " " + " " + " " + ""; + FILE *platform_file = fopen(platform_file_path.c_str(), "w"); + fprintf(platform_file, "%s", xml.c_str()); + fclose(platform_file); + } + + std::string platform_file_path = UNIQUE_TMP_PATH_PREFIX + "platform.xml"; +}; + +/**********************************************************************/ +/** ASYNCHRONOUS COMMUNICATION TEST **/ +/**********************************************************************/ + +class AsynchronousCommunicationTestWMS : public wrench::ExecutionController { + +public: + AsynchronousCommunicationTestWMS(S4U_CommPortTest *test, + const std::string &hostname) : wrench::ExecutionController(hostname, "test") { + this->test = test; + } + + +private: + S4U_CommPortTest *test; + std::string mode; + + int main() override { + + unsigned long index; + if (this == this->test->wms1.get()) { + /** SENDER **/ + + // Empty set of pending comms + std::vector empty_pending_comms; + try { + wrench::S4U_PendingCommunication::waitForSomethingToHappen(empty_pending_comms, -1); + throw std::runtime_error("Was expecting a std::invalid_argument exception"); + } catch (std::invalid_argument &) { + } + + // One send + auto pending_send = this->test->wms2->commport->iputMessage(new wrench::SimulationMessage(100)); + pending_send->wait(); + + // Another send + auto another_pending_send = this->test->wms2->commport->iputMessage(new wrench::SimulationMessage(100)); + another_pending_send->wait(200); + + + // Two sends, no timeout + std::vector> sends; + sends.push_back(this->test->wms2->commport->iputMessage(new wrench::SimulationMessage(100))); + sends.push_back(this->test->wms2->commport->iputMessage(new wrench::SimulationMessage(100))); + index = wrench::S4U_PendingCommunication::waitForSomethingToHappen(sends, -1); + sends.at(index)->wait(); + index = wrench::S4U_PendingCommunication::waitForSomethingToHappen(sends, -1); + sends.at(index)->wait(); + + // Two sends, timeout + std::vector> sends_timeout; + sends_timeout.push_back(this->test->wms2->commport->iputMessage(new wrench::SimulationMessage(100))); + sends_timeout.push_back(this->test->wms2->commport->iputMessage(new wrench::SimulationMessage(100))); + double now = wrench::Simulation::getCurrentSimulatedDate(); + index = wrench::S4U_PendingCommunication::waitForSomethingToHappen(sends_timeout, 10); + if (index != ULONG_MAX) { + throw std::runtime_error("Was expecting a timeout on the waitForSomethingToHappen"); + } + if (fabs(10 - (wrench::Simulation::getCurrentSimulatedDate() - now)) > 0.5) { + throw std::runtime_error("Seems like we didn't wait for the timeout!"); + } + index = wrench::S4U_PendingCommunication::waitForSomethingToHappen(sends_timeout, 1000); + sends_timeout.at(index)->wait(); + index = wrench::S4U_PendingCommunication::waitForSomethingToHappen(sends_timeout, 1000); + sends_timeout.at(index)->wait(); + + // One send, network failure + pending_send = this->test->wms2->commport->iputMessage(new wrench::SimulationMessage(100)); + wrench::Simulation::sleep(10); + wrench::Simulation::turnOffLink("1"); + try { + pending_send->wait(); + throw std::runtime_error("Should have gotten a NetworkError"); + } catch (wrench::ExecutionException &e) { + auto cause = std::dynamic_pointer_cast(e.getCause()); + cause->toString(); + cause->getCommPortName(); + } + wrench::Simulation::sleep(10); + wrench::Simulation::turnOnLink("1"); + wrench::Simulation::sleep(10); + + // WRENCH_INFO("TWO ASYNCHRONOUS SENDS / NETWORK FAILURES"); + + // Two asynchronous sends, network failure + std::vector> sends_failure; + sends_failure.push_back(this->test->wms2->commport->iputMessage(new wrench::SimulationMessage(100))); + sends_failure.push_back(this->test->wms2->commport->iputMessage(new wrench::SimulationMessage(100))); + wrench::Simulation::sleep(10); + simgrid::s4u::Link::by_name("1")->turn_off(); + // WRENCH_INFO("SIZE= %ld", sends_failure.size()); + index = wrench::S4U_PendingCommunication::waitForSomethingToHappen(sends_failure, 10000); + // WRENCH_INFO("index = %ld", index); + try { + sends_failure.at(index)->wait(); + throw std::runtime_error("Should have gotten a NetworkError"); + } catch (wrench::ExecutionException &e) { + auto cause = std::dynamic_pointer_cast(e.getCause()); + cause->toString(); + cause->getCommPortName(); + } + index = wrench::S4U_PendingCommunication::waitForSomethingToHappen(sends_failure, 10000); + try { + sends_failure.at(index)->wait(); + throw std::runtime_error("Should have gotten a NetworkError"); + } catch (wrench::ExecutionException &e) { + auto cause = std::dynamic_pointer_cast(e.getCause()); + cause->toString(); + cause->getCommPortName(); + } + + + // One synchronous sends, network failure + try { + this->test->wms2->commport->putMessage(new wrench::SimulationMessage(100)); + throw std::runtime_error("Should have gotten a NetworkError"); + } catch (wrench::ExecutionException &e) { + auto cause = std::dynamic_pointer_cast(e.getCause()); + cause->toString(); + cause->getCommPortName(); + } + + + } else { + /** RECEIVER **/ + + // One recv + auto pending_recv = this->test->wms2->commport->igetMessage(); + pending_recv->wait(); + + // Another recv + auto another_pending_recv = this->test->wms2->commport->igetMessage(); +// another_pending_recv->wait(0.01 - wrench::Simulation::getCurrentSimulatedDate()); + another_pending_recv->wait(200); + + // Two recv, no timeout + std::vector> recvs; + recvs.push_back(this->test->wms2->commport->igetMessage()); + recvs.push_back(this->test->wms2->commport->igetMessage()); + index = wrench::S4U_PendingCommunication::waitForSomethingToHappen(recvs, -1); + recvs.at(index)->wait(); + index = wrench::S4U_PendingCommunication::waitForSomethingToHappen(recvs, -1); + recvs.at(index)->wait(); + + // Two recvs (sends are timing out) + this->test->wms2->commport->getMessage(); + this->test->wms2->commport->getMessage(); + + // One recv (which fails) + try { + this->test->wms2->commport->getMessage(); + throw std::runtime_error("Should have gotten a NetworkError"); + } catch (wrench::ExecutionException &e) { + auto cause = std::dynamic_pointer_cast(e.getCause()); + cause->toString(); + cause->getCommPortName(); + } + + // WRENCH_INFO("TWO ASYNCHRONOUS RECV / NETWORK FAILURES"); + + // Two synchronous recv, network failure + try { + this->test->wms2->commport->getMessage(); + throw std::runtime_error("Should have gotten a NetworkError"); + } catch (wrench::ExecutionException &e) { + } + try { + this->test->wms2->commport->getMessage(); + throw std::runtime_error("Should have gotten a NetworkError"); + } catch (wrench::ExecutionException &e) { + auto cause = std::dynamic_pointer_cast(e.getCause()); + cause->toString(); + cause->getCommPortName(); + } + + // One asynchronous recv, network failure + pending_recv = this->test->wms2->commport->igetMessage(); + wrench::Simulation::sleep(10); + try { + pending_recv->wait(); + throw std::runtime_error("Should have gotten a NetworkError"); + } catch (wrench::ExecutionException &e) { + auto cause = std::dynamic_pointer_cast(e.getCause()); + cause->toString(); + cause->getCommPortName(); + } + + } + + return 0; + } +}; + +TEST_F(S4U_CommPortTest, AsynchronousCommunication) { + DO_TEST_WITH_FORK(do_AsynchronousCommunication_test); +} + +void S4U_CommPortTest::do_AsynchronousCommunication_test() { + + + // Create and initialize a simulation + auto simulation = wrench::Simulation::createSimulation(); + + int argc = 2; + auto argv = (char **) calloc(argc, sizeof(char *)); + argv[0] = strdup("unit_test"); + argv[1] = strdup("--wrench-link-shutdown-simulation"); +// argv[2] = strdup("--wrench-log-full"); + + simulation->init(&argc, argv); + + // Setting up the platform + simulation->instantiatePlatform(platform_file_path); + + // Create the WMSs + auto workflow = wrench::Workflow::createWorkflow(); + this->wms1 = simulation->add(new AsynchronousCommunicationTestWMS(this, "Host1")); + this->wms2 = simulation->add(new AsynchronousCommunicationTestWMS(this, "Host2")); + + simulation->launch(); + + for (int i = 0; i < argc; i++) + free(argv[i]); + free(argv); +} + + +/**********************************************************************/ +/** NETWORK TIMEOUT TEST **/ +/**********************************************************************/ + +class NetworkTimeoutTestWMS : public wrench::ExecutionController { + +public: + NetworkTimeoutTestWMS(S4U_CommPortTest *test, + const std::string &hostname) : wrench::ExecutionController(hostname, "test") { + this->test = test; + } + + +private: + S4U_CommPortTest *test; + std::string mode; + + int main() override { + + try { + this->commport->getMessage(10); + throw std::runtime_error("Should have gotten an exception"); + } catch (wrench::ExecutionException &e) { + auto real_error = std::dynamic_pointer_cast(e.getCause()); + if (not real_error) { + throw std::runtime_error("Unexpected failure cause: " + e.getCause()->toString()); + } + if (!real_error->isTimeout()) { + throw std::runtime_error("Network error failure cause should be a time out"); + } + real_error->toString();// coverage + } + + try { + auto pending = this->commport->igetMessage(); + pending->wait(10); + throw std::runtime_error("Should have gotten an exception"); + } catch (wrench::ExecutionException &e) { + auto real_error = std::dynamic_pointer_cast(e.getCause()); + if (not real_error) { + throw std::runtime_error("Unexpected failure cause: " + e.getCause()->toString()); + } + if (!real_error->isTimeout()) { + throw std::runtime_error("Network error failure cause should be a time out"); + } + real_error->toString();// coverage + } + + { + auto pending = this->commport->igetMessage(); + std::vector pending_comms = {pending.get()}; + auto index = wrench::S4U_PendingCommunication::waitForSomethingToHappen(pending_comms, 10); + if (index != ULONG_MAX) { + throw std::runtime_error("Should have gotten ULONG_MAX"); + } + } + + return 0; + } +}; + +TEST_F(S4U_CommPortTest, NetworkTimeout) { + DO_TEST_WITH_FORK(do_NetworkTimeout_test); +} + +void S4U_CommPortTest::do_NetworkTimeout_test() { + + + // Create and initialize a simulation + auto simulation = wrench::Simulation::createSimulation(); + + int argc = 1; + auto argv = (char **) calloc(argc, sizeof(char *)); + argv[0] = strdup("unit_test"); + // argv[1] = strdup("--wrench-link-shutdown-simulation"); + // argv[2] = strdup("--wrench-log-full"); + + simulation->init(&argc, argv); + + // Setting up the platform + simulation->instantiatePlatform(platform_file_path); + + // Create the WMSs + auto workflow = wrench::Workflow::createWorkflow(); + this->wms1 = simulation->add(new NetworkTimeoutTestWMS(this, "Host1")); + + simulation->launch(); + + for (int i = 0; i < argc; i++) + free(argv[i]); + free(argv); +} + + +/**********************************************************************/ +/** NULL MAILBOX TEST **/ +/**********************************************************************/ + +class NullCommPortTestWMS : public wrench::ExecutionController { + +public: + NullCommPortTestWMS(S4U_CommPortTest *test, + const std::string &hostname) : wrench::ExecutionController(hostname, "test") { + this->test = test; + } + + +private: + S4U_CommPortTest *test; + std::string mode; + + int main() override { + + // Coverage + wrench::S4U_CommPort::NULL_MAILBOX->putMessage(nullptr); + wrench::S4U_CommPort::NULL_MAILBOX->iputMessage(nullptr); + try { + wrench::S4U_CommPort::NULL_MAILBOX->getMessage(); + throw std::runtime_error("Shouldn't be able to get message from NULL_MAILBOX"); + } catch (std::invalid_argument &ignore) {} + try { + wrench::S4U_CommPort::NULL_MAILBOX->igetMessage(); + throw std::runtime_error("Shouldn't be able to get message from NULL_MAILBOX"); + } catch (std::invalid_argument &ignore) {} + + return 0; + } +}; + +TEST_F(S4U_CommPortTest, NullCommPort) { + DO_TEST_WITH_FORK(do_NullCommPort_test); +} + +void S4U_CommPortTest::do_NullCommPort_test() { + + + // Create and initialize a simulation + auto simulation = wrench::Simulation::createSimulation(); + + int argc = 1; + auto argv = (char **) calloc(argc, sizeof(char *)); + argv[0] = strdup("unit_test"); + // argv[1] = strdup("--wrench-link-shutdown-simulation"); + // argv[2] = strdup("--wrench-log-full"); + + simulation->init(&argc, argv); + + // Setting up the platform + simulation->instantiatePlatform(platform_file_path); + + // Create the WMSs + this->wms1 = simulation->add(new NullCommPortTestWMS(this, "Host1")); + + simulation->launch(); + + for (int i = 0; i < argc; i++) + free(argv[i]); + free(argv); +} \ No newline at end of file