Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Nov 20, 2023
1 parent b57a1fe commit d686bd7
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 31 deletions.
70 changes: 41 additions & 29 deletions src/wrench/simgrid_S4U_util/S4U_CommPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <wrench/simgrid_S4U_util/S4U_PendingCommunication.h>
#include <wrench/simulation/SimulationMessage.h>
#include "wrench/exceptions/ExecutionException.h"
#include "wrench/failure_causes/FatalFailure.h"

WRENCH_LOG_CATEGORY(wrench_core_commport, "CommPort");

Expand Down Expand Up @@ -78,28 +79,6 @@ namespace wrench {
std::unique_ptr<SimulationMessage> S4U_CommPort::getMessage(bool log) {

return this->getMessage(-1, log);
#if 0
if (this == S4U_CommPort::NULL_COMMPORT) {
throw std::invalid_argument("S4U_CommPort::getMessage(): Cannot be called with NULL_COMMPORT");
}

if (log) WRENCH_DEBUG("Getting a message from commport '%s'", this->s4u_mb->get_cname());
SimulationMessage *msg;
try {
// msg = static_cast<SimulationMessage *>(commport->get());
msg = this->s4u_mb->get<SimulationMessage>(-1);
} catch (simgrid::NetworkFailureException &e) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_cname()));
}

#ifdef MESSAGE_MANAGER
MessageManager::removeReceivedMessage(this, msg);
#endif

WRENCH_DEBUG("Received a '%s' message from commport %s", msg->getName().c_str(), this->s4u_mb->get_cname());
return std::unique_ptr<SimulationMessage>(msg);
#endif
}

/**
Expand All @@ -120,19 +99,52 @@ namespace wrench {
if (log) WRENCH_DEBUG("Getting a message from commport '%s' with timeout %lf sec", this->s4u_mb->get_cname(), timeout);
SimulationMessage *msg;

simgrid::s4u::ActivitySet pending_receives;
auto mb_comm = this->s4u_mb->get_async<SimulationMessage>(&msg);
pending_receives.push(mb_comm);

auto mq_comm = this->s4u_mq->get_async<SimulationMessage>(&msg);

simgrid::s4u::ActivityPtr finished_recv = nullptr;
try {
msg = this->s4u_mb->get<SimulationMessage>(timeout);
} catch (simgrid::NetworkFailureException &e) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name()));
// Wait for one activity to complete
finished_recv = pending_receives.wait_any_for(timeout);
// msg = this->s4u_mb->get<SimulationMessage>(timeout);
} catch (simgrid::TimeoutException &e) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::RECEIVING, NetworkError::TIMEOUT, this->s4u_mb->get_name()));
mb_comm->cancel();
// mq_comm->cancel();
throw ExecutionException(std::make_shared<NetworkError>(NetworkError::RECEIVING, NetworkError::TIMEOUT, this->name));
} catch (simgrid::Exception &e) {
auto failed_recv = pending_receives.get_failed_activity();
if (failed_recv == mb_comm) {
// mq_comm->cancel();
// mb_comm.reset();
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::RECEIVING, NetworkError::FAILURE, this->name));
} else {
mb_comm->cancel();
throw ExecutionException(std::make_shared<FatalFailure>("A communication on a MQ should never fail"));
}
}

if (finished_recv == mb_comm) {
mb_comm->wait();
} else {
std::cerr << "WTF\n";
}


// } catch (simgrid::NetworkFailureException &e) {
// throw ExecutionException(std::make_shared<NetworkError>(
// NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name()));
// } catch (simgrid::TimeoutException &e) {
// throw ExecutionException(std::make_shared<NetworkError>(
// NetworkError::RECEIVING, NetworkError::TIMEOUT, this->s4u_mb->get_name()));
// }


#ifdef MESSAGE_MANAGER
MessageManager::removeReceivedMessage(this, msg);
MessageManager::removeReceivedMessage(this, msg);
#endif

WRENCH_DEBUG("Received a '%s' message from commport '%s'", msg->getName().c_str(), this->s4u_mb->get_cname());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,10 @@ void BareMetalComputeServiceOneActionTest::do_Noop_test() {
// Create and initialize a simulation
auto simulation = wrench::Simulation::createSimulation();

int argc = 1;
int argc = 2;
auto argv = (char **) calloc(argc, sizeof(char *));
argv[0] = strdup("one_action_test");
// argv[1] = strdup("--wrench-full-log");
argv[1] = strdup("--wrench-full-log");

ASSERT_NO_THROW(simulation->init(&argc, argv));

Expand Down Expand Up @@ -356,6 +356,7 @@ void BareMetalComputeServiceOneActionTest::do_Noop_test() {

// Running a "do nothing" simulation
ASSERT_NO_THROW(simulation->launch());
std::cerr << "END OF SIMULATION\n";


for (int i = 0; i < argc; i++)
Expand Down

0 comments on commit d686bd7

Please sign in to comment.