Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Nov 21, 2023
1 parent 4d5f0c8 commit af76c4e
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 57 deletions.
4 changes: 3 additions & 1 deletion include/wrench/failure_causes/NetworkError.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace wrench {
/***********************/
/** \cond INTERNAL */
/***********************/
NetworkError(NetworkError::OperationType, NetworkError::ErrorType, const std::string &commport_name);
NetworkError(NetworkError::OperationType, NetworkError::ErrorType, const std::string &commport_name, const std::string &message_name);
/***********************/
/** \endcond */
/***********************/
Expand All @@ -55,11 +55,13 @@ namespace wrench {
bool whileSending();
bool isTimeout();
std::string getCommPortName();
std::string getMessageName();

private:
NetworkError::OperationType operation_type;
NetworkError::ErrorType error_type;
std::string commport_name;
std::string message_name;
};


Expand Down
10 changes: 1 addition & 9 deletions include/wrench/simgrid_S4U_util/S4U_CommPort.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,7 @@ namespace wrench {
return boost::core::demangle(type_name);
}

/**
* @brief Constructor
*/
S4U_CommPort() {
auto number = std::to_string(S4U_CommPort::generateUniqueSequenceNumber());
this->s4u_mb = simgrid::s4u::Mailbox::by_name("mb_" + number);
this->s4u_mq = simgrid::s4u::MessageQueue::by_name("mq_" + number);
this->name = "cp_" + number;
}
S4U_CommPort();

/**
* @brief Synchronously receive a message from a commport_name
Expand Down
11 changes: 7 additions & 4 deletions src/wrench/failure_causes/NetworkError.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ namespace wrench {
* NetworkError::OperationType::UNKNOWN
* @param error_type: the error type
* @param commport_name: the name of a commport (or "" if unknown)
* @param message_name: the name of the message (or "" if unknown)
*/
NetworkError::NetworkError(NetworkError::OperationType operation_type,
NetworkError::ErrorType error_type,
const std::string &commport_name) {
const std::string &commport_name,
const std::string &message_name) {
if (commport_name.empty()) {
throw std::invalid_argument("NetworkError::NetworkError(): invalid arguments");
}
this->operation_type = operation_type;
this->error_type = error_type;
this->commport_name = commport_name;
this->message_name = message_name;
}

/**
Expand Down Expand Up @@ -77,17 +80,17 @@ namespace wrench {
std::string NetworkError::toString() {
std::string operation;
if (this->whileSending()) {
operation = "sending to";
operation = "sending";
} else {
operation = "receiving from";
operation = "receiving";
}
std::string error;
if (this->isTimeout()) {
error = "timeout";
} else {
error = "link failure, or communication peer died";
}
return "Network error (" + error + ") while " + operation + " commport " + this->commport_name;
return "Network error (" + error + ") while " + operation + (this->message_name.empty() ? "" : " a message with name " + this->message_name) + " via commport " + this->commport_name;
}


Expand Down
6 changes: 6 additions & 0 deletions src/wrench/managers/job_manager/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,12 @@ namespace wrench {
if (it != this->jobs_to_dispatch.end()) {
this->cjob_to_sjob_map.erase(*it);
this->jobs_to_dispatch.erase(it);
job->compound_job->state = CompoundJob::State::DISCONTINUED;
job->state = StandardJob::State::TERMINATED;
for (auto const &t : job->getTasks()) {
t->setInternalState(WorkflowTask::InternalState::TASK_READY);
t->setState(WorkflowTask::State::READY);
}
this->releaseDaemonLock();
return;
}
Expand Down
5 changes: 2 additions & 3 deletions src/wrench/services/compute/ComputeService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace wrench {
// Send a termination message to the daemon's commport - SYNCHRONOUSLY
auto ack_commport = S4U_Daemon::getRunningActorRecvCommPort();
try {
this->commport->putMessage(new ServiceStopDaemonMessage(
this->commport->dputMessage(new ServiceStopDaemonMessage(
ack_commport,
send_failure_notifications,
termination_cause,
Expand All @@ -68,8 +68,7 @@ namespace wrench {
this->network_timeout,
"ComputeService::stop(): Received an");
} catch (...) {
this->shutting_down = false;
throw;
// If we don't get the ack we assum it's down anyway
}

// Set the service state to down
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ int wrench::ServiceTerminationDetector::main() {
WRENCH_INFO("Detected termination of service %s (notifying commport %s)",
this->service_to_monitor->getName().c_str(), this->commport_to_notify->get_cname());
this->commport_to_notify->putMessage(
new ServiceHasTerminatedMessage(this->service_to_monitor, return_value_from_main));
new ServiceHasTerminatedMessage(this->service_to_monitor, return_value_from_main));
}

this->service_to_monitor = nullptr;// released, so that it can be freed in case ref-count = 0
Expand Down
49 changes: 26 additions & 23 deletions src/wrench/simgrid_S4U_util/S4U_CommPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ namespace wrench {
unsigned long S4U_CommPort::commport_pool_size;
double S4U_CommPort::default_control_message_size;

/**
* @brief Constructor
*/
S4U_CommPort::S4U_CommPort() {
auto number = std::to_string(S4U_CommPort::generateUniqueSequenceNumber());
this->s4u_mb = simgrid::s4u::Mailbox::by_name("mb_" + number);
this->s4u_mq = simgrid::s4u::MessageQueue::by_name("mq_" + number);
this->name = "cp_" + number;
}


/**
* @brief Helper method that avoids calling WRENCH_DEBUG from a .h file and do the logging for the templated getMessage() method.
Expand Down Expand Up @@ -102,47 +112,41 @@ namespace wrench {
simgrid::s4u::ActivitySet pending_receives;
auto mb_comm = this->s4u_mb->get_async<SimulationMessage>(&msg);
pending_receives.push(mb_comm);

auto mq_comm = this->s4u_mq->get_async<SimulationMessage>(&msg);
pending_receives.push(mq_comm);

simgrid::s4u::ActivityPtr finished_recv = nullptr;
simgrid::s4u::ActivityPtr finished_recv;
try {
// Wait for one activity to complete
finished_recv = pending_receives.wait_any_for(timeout);
// msg = this->s4u_mb->get<SimulationMessage>(timeout);
} catch (simgrid::TimeoutException &e) {
mq_comm->cancel();
mb_comm->cancel();
// mq_comm->cancel();
throw ExecutionException(std::make_shared<NetworkError>(NetworkError::RECEIVING, NetworkError::TIMEOUT, this->name));
throw ExecutionException(std::make_shared<NetworkError>(NetworkError::RECEIVING, NetworkError::TIMEOUT, this->name, ""));
} catch (simgrid::Exception &e) {
auto failed_recv = pending_receives.get_failed_activity();
if (failed_recv == mb_comm) {
// mq_comm->cancel();
// mb_comm.reset();
mq_comm->cancel();
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::RECEIVING, NetworkError::FAILURE, this->name));
NetworkError::RECEIVING, NetworkError::FAILURE, this->name, ""));
} else {
mb_comm->cancel();
throw ExecutionException(std::make_shared<FatalFailure>("A communication on a MQ should never fail"));
}
}

WRENCH_DEBUG("Got the message\n");

if (finished_recv == mb_comm) {
mq_comm->cancel();
mb_comm->wait();
} else if (finished_recv == mq_comm) {
mb_comm->cancel();
mq_comm->wait();
} else {
std::cerr << "WTF\n";
}


// } catch (simgrid::NetworkFailureException &e) {
// throw ExecutionException(std::make_shared<NetworkError>(
// NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name()));
// } catch (simgrid::TimeoutException &e) {
// throw ExecutionException(std::make_shared<NetworkError>(
// NetworkError::RECEIVING, NetworkError::TIMEOUT, this->s4u_mb->get_name()));
// }


#ifdef MESSAGE_MANAGER
MessageManager::removeReceivedMessage(this, msg);
#endif
Expand Down Expand Up @@ -176,11 +180,11 @@ namespace wrench {
this->s4u_mb->put(msg, (uint64_t) msg->payload);
} catch (simgrid::NetworkFailureException &e) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name()));
NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name(), msg->getName()));
} catch (simgrid::TimeoutException &e) {
// Can happen if the other side is doing a timeout.... I think
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::SENDING, NetworkError::TIMEOUT, this->s4u_mb->get_name()));
NetworkError::SENDING, NetworkError::TIMEOUT, this->s4u_mb->get_name(), msg->getName()));
}
}

Expand Down Expand Up @@ -237,7 +241,7 @@ namespace wrench {
comm_ptr = this->s4u_mb->put_async(msg, (uint64_t) msg->payload);
} catch (simgrid::NetworkFailureException &e) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name()));
NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name(), msg->getName()));
}

auto pending_communication = std::make_shared<S4U_PendingCommunication>(
Expand Down Expand Up @@ -273,7 +277,7 @@ namespace wrench {
comm_ptr = this->s4u_mb->get_async<void>((void **) (&(pending_communication->simulation_message)));
} catch (simgrid::NetworkFailureException &e) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name()));
NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name(), ""));
}
pending_communication->comm_ptr = comm_ptr;
return pending_communication;
Expand Down Expand Up @@ -325,7 +329,6 @@ namespace wrench {
}

S4U_CommPort::used_commports.insert(commport);

return commport;
}

Expand Down
8 changes: 4 additions & 4 deletions src/wrench/simgrid_S4U_util/S4U_PendingCommunication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ namespace wrench {
} catch (simgrid::NetworkFailureException &e) {
if (this->operation_type == S4U_PendingCommunication::OperationType::SENDING) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::SENDING, NetworkError::FAILURE, this->commport->s4u_mb->get_name()));
NetworkError::OperationType::SENDING, NetworkError::FAILURE, this->commport->s4u_mb->get_name(), ""));
} else {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::RECEIVING, NetworkError::FAILURE, this->commport->s4u_mb->get_name()));
NetworkError::OperationType::RECEIVING, NetworkError::FAILURE, this->commport->s4u_mb->get_name(), ""));
}
} catch (simgrid::TimeoutException &e) {
if (this->operation_type == S4U_PendingCommunication::OperationType::SENDING) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::SENDING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name()));
NetworkError::OperationType::SENDING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name(), ""));
} else {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::OperationType::RECEIVING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name()));
NetworkError::OperationType::RECEIVING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name(), ""));
}
}
#ifdef MESSAGE_MANAGER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@ class FailureCauseConstructorTest : public ::testing::Test {
TEST_F(FailureCauseConstructorTest, NetworkError) {

wrench::NetworkError *cause;
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::TIMEOUT, "commport_name"));
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::TIMEOUT, "commport_name"));
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::FAILURE, "commport_name"));
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name"));
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name"));
ASSERT_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, ""), std::invalid_argument);
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::TIMEOUT, "commport_name", "message_name"));
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::TIMEOUT, "commport_name", "message_name"));
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::FAILURE, "commport_name", "message_name"));
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name", "message_name"));
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name", "message_name"));
ASSERT_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "", "message_name"), std::invalid_argument);


ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::TIMEOUT, "commport_name"));
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::TIMEOUT, "commport_name", "message_name"));
ASSERT_EQ(cause->isTimeout(), true);
ASSERT_EQ(cause->whileReceiving(), false);
ASSERT_EQ(cause->whileSending(), true);

ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name"));
ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name", "message_name"));
ASSERT_EQ(cause->isTimeout(), false);
ASSERT_EQ(cause->whileReceiving(), true);
ASSERT_EQ(cause->whileSending(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,7 @@ class BareMetalComputeServiceJobImmediateTerminationTestWMS : public wrench::Exe
wrench::WorkflowTask::stateToString(this->test->task2->getState()) + ")");
}


return 0;
}
};
Expand All @@ -1051,7 +1052,7 @@ void BareMetalComputeServiceTestStandardJobs::do_JobImmediateTermination_test()
int argc = 1;
auto **argv = (char **) calloc(argc, sizeof(char *));
argv[0] = strdup("unit_test");
// argv[1] = strdup("--wrench-full-log");
// argv[1] = strdup("--wrench-full-log");

ASSERT_NO_THROW(simulation->init(&argc, argv));

Expand Down Expand Up @@ -1090,8 +1091,8 @@ void BareMetalComputeServiceTestStandardJobs::do_JobImmediateTermination_test()
ASSERT_EQ(this->task1->getState(), wrench::WorkflowTask::READY);
ASSERT_EQ(this->task2->getState(), wrench::WorkflowTask::READY);

ASSERT_EQ(this->task1->getFailureCount(), 1);
ASSERT_EQ(this->task2->getFailureCount(), 1);
ASSERT_EQ(this->task1->getFailureCount(), 0);
ASSERT_EQ(this->task2->getFailureCount(), 0);


for (int i = 0; i < argc; i++)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,6 @@ class StopAllVMsTestWMS : public wrench::ExecutionController {
}

wrench::Simulation::sleep(10);

// stop all VMs
this->test->compute_service->stop();

Expand All @@ -816,6 +815,7 @@ void VirtualizedClusterServiceTest::do_StopAllVMsTest_test() {
int argc = 1;
auto argv = (char **) calloc(argc, sizeof(char *));
argv[0] = strdup("unit_test");
//argv[1] = strdup("--wrench-full-log");

ASSERT_NO_THROW(simulation->init(&argc, argv));

Expand Down

0 comments on commit af76c4e

Please sign in to comment.