From af76c4e5cfe917727e16999e1984f6525a56dc87 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 20 Nov 2023 20:47:25 -1000 Subject: [PATCH] tweaks --- include/wrench/failure_causes/NetworkError.h | 4 +- .../wrench/simgrid_S4U_util/S4U_CommPort.h | 10 +--- src/wrench/failure_causes/NetworkError.cpp | 11 +++-- .../managers/job_manager/JobManager.cpp | 6 +++ .../services/compute/ComputeService.cpp | 5 +- .../ServiceTerminationDetector.cpp | 2 +- src/wrench/simgrid_S4U_util/S4U_CommPort.cpp | 49 ++++++++++--------- .../S4U_PendingCommunication.cpp | 8 +-- .../FailureCauseConstructorTest.cpp | 16 +++--- ...areMetalComputeServiceTestStandardJobs.cpp | 7 +-- .../VirtualizedClusterServiceTest.cpp | 2 +- 11 files changed, 63 insertions(+), 57 deletions(-) diff --git a/include/wrench/failure_causes/NetworkError.h b/include/wrench/failure_causes/NetworkError.h index e7df645a0d..ea063b240e 100755 --- a/include/wrench/failure_causes/NetworkError.h +++ b/include/wrench/failure_causes/NetworkError.h @@ -45,7 +45,7 @@ namespace wrench { /***********************/ /** \cond INTERNAL */ /***********************/ - NetworkError(NetworkError::OperationType, NetworkError::ErrorType, const std::string &commport_name); + NetworkError(NetworkError::OperationType, NetworkError::ErrorType, const std::string &commport_name, const std::string &message_name); /***********************/ /** \endcond */ /***********************/ @@ -55,11 +55,13 @@ namespace wrench { bool whileSending(); bool isTimeout(); std::string getCommPortName(); + std::string getMessageName(); private: NetworkError::OperationType operation_type; NetworkError::ErrorType error_type; std::string commport_name; + std::string message_name; }; diff --git a/include/wrench/simgrid_S4U_util/S4U_CommPort.h b/include/wrench/simgrid_S4U_util/S4U_CommPort.h index d39797336a..68ff2808df 100755 --- a/include/wrench/simgrid_S4U_util/S4U_CommPort.h +++ b/include/wrench/simgrid_S4U_util/S4U_CommPort.h @@ -41,15 +41,7 @@ namespace 
wrench { return boost::core::demangle(type_name); } - /** - * @brief Constructor - */ - S4U_CommPort() { - auto number = std::to_string(S4U_CommPort::generateUniqueSequenceNumber()); - this->s4u_mb = simgrid::s4u::Mailbox::by_name("mb_" + number); - this->s4u_mq = simgrid::s4u::MessageQueue::by_name("mq_" + number); - this->name = "cp_" + number; - } + S4U_CommPort(); /** * @brief Synchronously receive a message from a commport_name diff --git a/src/wrench/failure_causes/NetworkError.cpp b/src/wrench/failure_causes/NetworkError.cpp index 0ca7297c9a..3f912b2e11 100755 --- a/src/wrench/failure_causes/NetworkError.cpp +++ b/src/wrench/failure_causes/NetworkError.cpp @@ -26,16 +26,19 @@ namespace wrench { * NetworkError::OperationType::UNKNOWN * @param error_type: the error type * @param commport_name: the name of a commport (or "" if unknown) + * @param message_name: the name of the message (or "" if unknown) */ NetworkError::NetworkError(NetworkError::OperationType operation_type, NetworkError::ErrorType error_type, - const std::string &commport_name) { + const std::string &commport_name, + const std::string &message_name) { if (commport_name.empty()) { throw std::invalid_argument("NetworkError::NetworkError(): invalid arguments"); } this->operation_type = operation_type; this->error_type = error_type; this->commport_name = commport_name; + this->message_name = message_name; } /** @@ -77,9 +80,9 @@ namespace wrench { std::string NetworkError::toString() { std::string operation; if (this->whileSending()) { - operation = "sending to"; + operation = "sending"; } else { - operation = "receiving from"; + operation = "receiving"; } std::string error; if (this->isTimeout()) { @@ -87,7 +90,7 @@ namespace wrench { } else { error = "link failure, or communication peer died"; } - return "Network error (" + error + ") while " + operation + " commport " + this->commport_name; + return "Network error (" + error + ") while " + operation + (this->message_name.empty() ? 
"" : " a message with name " + this->message_name) + " via commport " + this->commport_name; } diff --git a/src/wrench/managers/job_manager/JobManager.cpp b/src/wrench/managers/job_manager/JobManager.cpp index 6e4bcb1521..327f7012c6 100755 --- a/src/wrench/managers/job_manager/JobManager.cpp +++ b/src/wrench/managers/job_manager/JobManager.cpp @@ -716,6 +716,12 @@ namespace wrench { if (it != this->jobs_to_dispatch.end()) { this->cjob_to_sjob_map.erase(*it); this->jobs_to_dispatch.erase(it); + job->compound_job->state = CompoundJob::State::DISCONTINUED; + job->state = StandardJob::State::TERMINATED; + for (auto const &t : job->getTasks()) { + t->setInternalState(WorkflowTask::InternalState::TASK_READY); + t->setState(WorkflowTask::State::READY); + } this->releaseDaemonLock(); return; } diff --git a/src/wrench/services/compute/ComputeService.cpp b/src/wrench/services/compute/ComputeService.cpp index 27a3f84b44..35c7e22be6 100755 --- a/src/wrench/services/compute/ComputeService.cpp +++ b/src/wrench/services/compute/ComputeService.cpp @@ -51,7 +51,7 @@ namespace wrench { // Send a termination message to the daemon's commport - SYNCHRONOUSLY auto ack_commport = S4U_Daemon::getRunningActorRecvCommPort(); try { - this->commport->putMessage(new ServiceStopDaemonMessage( + this->commport->dputMessage(new ServiceStopDaemonMessage( ack_commport, send_failure_notifications, termination_cause, @@ -68,8 +68,7 @@ namespace wrench { this->network_timeout, "ComputeService::stop(): Received an"); } catch (...) 
{ - this->shutting_down = false; - throw; + // If we don't get the ack we assume it's down anyway } // Set the service state to down diff --git a/src/wrench/services/helper_services/service_termination_detector/ServiceTerminationDetector.cpp b/src/wrench/services/helper_services/service_termination_detector/ServiceTerminationDetector.cpp index 049827a3a9..d467553cfa 100755 --- a/src/wrench/services/helper_services/service_termination_detector/ServiceTerminationDetector.cpp +++ b/src/wrench/services/helper_services/service_termination_detector/ServiceTerminationDetector.cpp @@ -57,7 +57,7 @@ int wrench::ServiceTerminationDetector::main() { WRENCH_INFO("Detected termination of service %s (notifying commport %s)", this->service_to_monitor->getName().c_str(), this->commport_to_notify->get_cname()); this->commport_to_notify->putMessage( - new ServiceHasTerminatedMessage(this->service_to_monitor, return_value_from_main)); + new ServiceHasTerminatedMessage(this->service_to_monitor, return_value_from_main)); } this->service_to_monitor = nullptr;// released, so that it can be freed in case ref-count = 0 diff --git a/src/wrench/simgrid_S4U_util/S4U_CommPort.cpp b/src/wrench/simgrid_S4U_util/S4U_CommPort.cpp index ff40e4f9a2..a8eacf7e84 100755 --- a/src/wrench/simgrid_S4U_util/S4U_CommPort.cpp +++ b/src/wrench/simgrid_S4U_util/S4U_CommPort.cpp @@ -39,6 +39,16 @@ namespace wrench { unsigned long S4U_CommPort::commport_pool_size; double S4U_CommPort::default_control_message_size; + /** + * @brief Constructor + */ + S4U_CommPort::S4U_CommPort() { + auto number = std::to_string(S4U_CommPort::generateUniqueSequenceNumber()); + this->s4u_mb = simgrid::s4u::Mailbox::by_name("mb_" + number); + this->s4u_mq = simgrid::s4u::MessageQueue::by_name("mq_" + number); + this->name = "cp_" + number; + } + /** * @brief Helper method that avoids calling WRENCH_DEBUG from a .h file and do the logging for the templated getMessage() method. 
@@ -102,47 +112,41 @@ namespace wrench { simgrid::s4u::ActivitySet pending_receives; auto mb_comm = this->s4u_mb->get_async(&msg); pending_receives.push(mb_comm); - auto mq_comm = this->s4u_mq->get_async(&msg); + pending_receives.push(mq_comm); - simgrid::s4u::ActivityPtr finished_recv = nullptr; + simgrid::s4u::ActivityPtr finished_recv; try { // Wait for one activity to complete finished_recv = pending_receives.wait_any_for(timeout); - // msg = this->s4u_mb->get(timeout); } catch (simgrid::TimeoutException &e) { + mq_comm->cancel(); mb_comm->cancel(); -// mq_comm->cancel(); - throw ExecutionException(std::make_shared(NetworkError::RECEIVING, NetworkError::TIMEOUT, this->name)); + throw ExecutionException(std::make_shared(NetworkError::RECEIVING, NetworkError::TIMEOUT, this->name, "")); } catch (simgrid::Exception &e) { auto failed_recv = pending_receives.get_failed_activity(); if (failed_recv == mb_comm) { -// mq_comm->cancel(); -// mb_comm.reset(); + mq_comm->cancel(); throw ExecutionException(std::make_shared( - NetworkError::RECEIVING, NetworkError::FAILURE, this->name)); + NetworkError::RECEIVING, NetworkError::FAILURE, this->name, "")); } else { mb_comm->cancel(); throw ExecutionException(std::make_shared("A communication on a MQ should never fail")); } } + WRENCH_DEBUG("Got the message\n"); + if (finished_recv == mb_comm) { + mq_comm->cancel(); mb_comm->wait(); + } else if (finished_recv == mq_comm) { + mb_comm->cancel(); + mq_comm->wait(); } else { std::cerr << "WTF\n"; } - -// } catch (simgrid::NetworkFailureException &e) { -// throw ExecutionException(std::make_shared( -// NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name())); -// } catch (simgrid::TimeoutException &e) { -// throw ExecutionException(std::make_shared( -// NetworkError::RECEIVING, NetworkError::TIMEOUT, this->s4u_mb->get_name())); -// } - - #ifdef MESSAGE_MANAGER MessageManager::removeReceivedMessage(this, msg); #endif @@ -176,11 +180,11 @@ namespace wrench { 
this->s4u_mb->put(msg, (uint64_t) msg->payload); } catch (simgrid::NetworkFailureException &e) { throw ExecutionException(std::make_shared( - NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name())); + NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name(), msg->getName())); } catch (simgrid::TimeoutException &e) { // Can happen if the other side is doing a timeout.... I think throw ExecutionException(std::make_shared( - NetworkError::SENDING, NetworkError::TIMEOUT, this->s4u_mb->get_name())); + NetworkError::SENDING, NetworkError::TIMEOUT, this->s4u_mb->get_name(), msg->getName())); } } @@ -237,7 +241,7 @@ namespace wrench { comm_ptr = this->s4u_mb->put_async(msg, (uint64_t) msg->payload); } catch (simgrid::NetworkFailureException &e) { throw ExecutionException(std::make_shared( - NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name())); + NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name(), msg->getName())); } auto pending_communication = std::make_shared( @@ -273,7 +277,7 @@ namespace wrench { comm_ptr = this->s4u_mb->get_async((void **) (&(pending_communication->simulation_message))); } catch (simgrid::NetworkFailureException &e) { throw ExecutionException(std::make_shared( - NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name())); + NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name(), "")); } pending_communication->comm_ptr = comm_ptr; return pending_communication; @@ -325,7 +329,6 @@ namespace wrench { } S4U_CommPort::used_commports.insert(commport); - return commport; } diff --git a/src/wrench/simgrid_S4U_util/S4U_PendingCommunication.cpp b/src/wrench/simgrid_S4U_util/S4U_PendingCommunication.cpp index 6a212ece4a..86bbf3f67c 100755 --- a/src/wrench/simgrid_S4U_util/S4U_PendingCommunication.cpp +++ b/src/wrench/simgrid_S4U_util/S4U_PendingCommunication.cpp @@ -55,18 +55,18 @@ namespace wrench { } catch (simgrid::NetworkFailureException &e) { if 
(this->operation_type == S4U_PendingCommunication::OperationType::SENDING) { throw ExecutionException(std::make_shared( - NetworkError::OperationType::SENDING, NetworkError::FAILURE, this->commport->s4u_mb->get_name())); + NetworkError::OperationType::SENDING, NetworkError::FAILURE, this->commport->s4u_mb->get_name(), "")); } else { throw ExecutionException(std::make_shared( - NetworkError::OperationType::RECEIVING, NetworkError::FAILURE, this->commport->s4u_mb->get_name())); + NetworkError::OperationType::RECEIVING, NetworkError::FAILURE, this->commport->s4u_mb->get_name(), "")); } } catch (simgrid::TimeoutException &e) { if (this->operation_type == S4U_PendingCommunication::OperationType::SENDING) { throw ExecutionException(std::make_shared( - NetworkError::OperationType::SENDING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name())); + NetworkError::OperationType::SENDING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name(), "")); } else { throw ExecutionException(std::make_shared( - NetworkError::OperationType::RECEIVING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name())); + NetworkError::OperationType::RECEIVING, NetworkError::TIMEOUT, this->commport->s4u_mb->get_name(), "")); } } #ifdef MESSAGE_MANAGER diff --git a/test/constructors/failure_cause_constructor/FailureCauseConstructorTest.cpp b/test/constructors/failure_cause_constructor/FailureCauseConstructorTest.cpp index 84dc49b0f9..2032005a1d 100755 --- a/test/constructors/failure_cause_constructor/FailureCauseConstructorTest.cpp +++ b/test/constructors/failure_cause_constructor/FailureCauseConstructorTest.cpp @@ -25,20 +25,20 @@ class FailureCauseConstructorTest : public ::testing::Test { TEST_F(FailureCauseConstructorTest, NetworkError) { wrench::NetworkError *cause; - ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::TIMEOUT, "commport_name")); - ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, 
wrench::NetworkError::TIMEOUT, "commport_name")); - ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::FAILURE, "commport_name")); - ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name")); - ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name")); - ASSERT_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, ""), std::invalid_argument); + ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::TIMEOUT, "commport_name", "message_name")); + ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::TIMEOUT, "commport_name", "message_name")); + ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::FAILURE, "commport_name", "message_name")); + ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name", "message_name")); + ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name", "message_name")); + ASSERT_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "", "message_name"), std::invalid_argument); - ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::TIMEOUT, "commport_name")); + ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::SENDING, wrench::NetworkError::TIMEOUT, "commport_name", "message_name")); ASSERT_EQ(cause->isTimeout(), true); ASSERT_EQ(cause->whileReceiving(), false); ASSERT_EQ(cause->whileSending(), true); - ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, 
wrench::NetworkError::FAILURE, "commport_name")); + ASSERT_NO_THROW(cause = new wrench::NetworkError(wrench::NetworkError::RECEIVING, wrench::NetworkError::FAILURE, "commport_name", "message_name")); ASSERT_EQ(cause->isTimeout(), false); ASSERT_EQ(cause->whileReceiving(), true); ASSERT_EQ(cause->whileSending(), false); diff --git a/test/services/compute_services/bare_metal_standard_jobs/BareMetalComputeServiceTestStandardJobs.cpp b/test/services/compute_services/bare_metal_standard_jobs/BareMetalComputeServiceTestStandardJobs.cpp index 319a8f7a69..62f58a2279 100644 --- a/test/services/compute_services/bare_metal_standard_jobs/BareMetalComputeServiceTestStandardJobs.cpp +++ b/test/services/compute_services/bare_metal_standard_jobs/BareMetalComputeServiceTestStandardJobs.cpp @@ -1036,6 +1036,7 @@ class BareMetalComputeServiceJobImmediateTerminationTestWMS : public wrench::Exe wrench::WorkflowTask::stateToString(this->test->task2->getState()) + ")"); } + return 0; } }; @@ -1051,7 +1052,7 @@ void BareMetalComputeServiceTestStandardJobs::do_JobImmediateTermination_test() int argc = 1; auto **argv = (char **) calloc(argc, sizeof(char *)); argv[0] = strdup("unit_test"); - // argv[1] = strdup("--wrench-full-log"); +// argv[1] = strdup("--wrench-full-log"); ASSERT_NO_THROW(simulation->init(&argc, argv)); @@ -1090,8 +1091,8 @@ void BareMetalComputeServiceTestStandardJobs::do_JobImmediateTermination_test() ASSERT_EQ(this->task1->getState(), wrench::WorkflowTask::READY); ASSERT_EQ(this->task2->getState(), wrench::WorkflowTask::READY); - ASSERT_EQ(this->task1->getFailureCount(), 1); - ASSERT_EQ(this->task2->getFailureCount(), 1); + ASSERT_EQ(this->task1->getFailureCount(), 0); + ASSERT_EQ(this->task2->getFailureCount(), 0); for (int i = 0; i < argc; i++) diff --git a/test/services/compute_services/virtualized_cluster/VirtualizedClusterServiceTest.cpp b/test/services/compute_services/virtualized_cluster/VirtualizedClusterServiceTest.cpp index 7c5a2e12d8..9313680805 100755 --- 
a/test/services/compute_services/virtualized_cluster/VirtualizedClusterServiceTest.cpp +++ b/test/services/compute_services/virtualized_cluster/VirtualizedClusterServiceTest.cpp @@ -798,7 +798,6 @@ class StopAllVMsTestWMS : public wrench::ExecutionController { } wrench::Simulation::sleep(10); - // stop all VMs this->test->compute_service->stop(); @@ -816,6 +815,7 @@ void VirtualizedClusterServiceTest::do_StopAllVMsTest_test() { int argc = 1; auto argv = (char **) calloc(argc, sizeof(char *)); argv[0] = strdup("unit_test"); + //argv[1] = strdup("--wrench-full-log"); ASSERT_NO_THROW(simulation->init(&argc, argv));