Skip to content

Commit

Permalink
Bug--
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Dec 9, 2023
1 parent f14ad94 commit 0f063cc
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,12 @@ namespace wrench {

auto answer_commport = S4U_Daemon::getRunningActorRecvCommPort();

std::cerr << "PUTTING MESSAGE TO THE CS\n";
// send a "run a standard job" message to the daemon's commport
this->commport->putMessage(
new ComputeServiceSubmitCompoundJobRequestMessage(
answer_commport, job, service_specific_args,
this->getMessagePayloadValue(
ComputeServiceMessagePayload::SUBMIT_COMPOUND_JOB_REQUEST_MESSAGE_PAYLOAD)));
std::cerr << "PUT THE MESSAGE TO THE CS\n";

// Get the answer
auto msg = answer_commport->getMessage<ComputeServiceSubmitCompoundJobAnswerMessage>(this->network_timeout,
Expand Down
24 changes: 4 additions & 20 deletions src/wrench/simgrid_S4U_util/S4U_CommPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,32 +111,27 @@ namespace wrench {
SimulationMessage *msg_mb;
SimulationMessage *msg_mq;

WRENCH_INFO("IN GET MESSAGE FOR COMMPORT %p", this);
simgrid::s4u::ActivitySet pending_receives;
auto mb_comm = this->s4u_mb->get_async<SimulationMessage>(&msg_mb);
pending_receives.push(mb_comm);
auto mq_comm = this->s4u_mq->get_async<SimulationMessage>(&msg_mq);
pending_receives.push(mq_comm);

WRENCH_INFO("IN GET MESSAGE: %p(%s) %p(%s)",
mb_comm.get(), mb_comm->get_mailbox()->get_cname(),
mq_comm.get(), mq_comm->get_queue()->get_cname());
// WRENCH_INFO("IN GET MESSAGE: %p(%s) %p(%s)",
// mb_comm.get(), mb_comm->get_mailbox()->get_cname(),
// mq_comm.get(), mq_comm->get_queue()->get_cname());

simgrid::s4u::ActivityPtr finished_recv;
try {
// Wait for one activity to complete
WRENCH_INFO("CALLING WAIT_ANY_FOR");
finished_recv = pending_receives.wait_any_for(timeout);
WRENCH_INFO("CALLED WAIT_ANY_FOR");
} catch (simgrid::TimeoutException &e) {
WRENCH_INFO("IN THIS TRY CATCH: TIMEOUT (timeoutvalue=%lf)", timeout);
mq_comm->cancel();
pending_receives.erase(mq_comm);
mb_comm->cancel();
pending_receives.erase(mq_comm);
pending_receives.erase(mb_comm);
throw ExecutionException(std::make_shared<NetworkError>(NetworkError::RECEIVING, NetworkError::TIMEOUT, this->name, ""));
} catch (simgrid::Exception &e) {
WRENCH_INFO("IN THAT TRY CATCH: FAILURE");
auto failed_recv = pending_receives.get_failed_activity();
if (failed_recv == mb_comm) {
mq_comm->cancel();
Expand All @@ -151,26 +146,18 @@ namespace wrench {
}
}

WRENCH_DEBUG("Got the message\n");

SimulationMessage *msg = nullptr;

WRENCH_INFO("XXX IN GET MESSAGE BEFORE CANCEL");
if (finished_recv == mb_comm) {
msg = msg_mb;
WRENCH_INFO("IT WAS THE MB_COM");
WRENCH_INFO("CANCELING %p", mq_comm.get());
mq_comm->cancel();
pending_receives.erase(mq_comm);
pending_receives.erase(mb_comm);
WRENCH_INFO("WAITING ON %p", mb_comm.get());
// mb_comm->wait();
} else if (finished_recv == mq_comm) {
msg = msg_mq;
mb_comm->cancel();
pending_receives.erase(mb_comm);
pending_receives.erase(mq_comm);
// mq_comm->wait();
}

#ifdef MESSAGE_MANAGER
Expand Down Expand Up @@ -265,7 +252,6 @@ namespace wrench {
MessageManager::manageMessage(this, msg);
#endif
comm_ptr = this->s4u_mb->put_async(msg, (uint64_t) msg->payload);
std::cerr << "IN iPUTMESSAGE: " << comm_ptr.get() << "\n";
} catch (simgrid::NetworkFailureException &e) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::SENDING, NetworkError::FAILURE, this->s4u_mb->get_name(), msg->getName()));
Expand Down Expand Up @@ -301,15 +287,13 @@ namespace wrench {

try {
auto comm_ptr = this->s4u_mb->get_async<void>((void **) (&(pending_communication->simulation_message)));
std::cerr << "in IGETMESSAGE " << comm_ptr.get() << "\n";
pending_communication->comm_ptr = comm_ptr;
} catch (simgrid::NetworkFailureException &e) {
throw ExecutionException(std::make_shared<NetworkError>(
NetworkError::RECEIVING, NetworkError::FAILURE, this->s4u_mb->get_name(), ""));
}
simgrid::s4u::MessPtr mess_ptr = this->s4u_mq->get_async<void>((void **) (&(pending_communication->simulation_message)));
pending_communication->mess_ptr = mess_ptr;
std::cerr << "in IGETMESSAGE MQ " << mess_ptr.get() << "\n";
return pending_communication;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1999,7 +1999,6 @@ class ExecutionWithSuspendedServiceTestWMS : public wrench::ExecutionController

// Suspend the service
wrench::Simulation::sleep(1);
std::cerr << "SUSPENDING THE SERVICE\n";
test->compute_service->suspend();

// Create a job
Expand All @@ -2026,11 +2025,9 @@ class ExecutionWithSuspendedServiceTestWMS : public wrench::ExecutionController
}
}

// Sleep for 1 sec
// Sleep for 10 sec
wrench::Simulation::sleep(10);

std::cerr << "RESUMING THE SERVICE\n";

// Resume the service
test->compute_service->resume();

Expand All @@ -2052,10 +2049,10 @@ TEST_F(BareMetalComputeServiceOneTaskTest, ExecutionWithSuspendedService) {
void BareMetalComputeServiceOneTaskTest::do_ExecutionWithSuspendedService_test() {
// Create and initialize a simulation
auto simulation = wrench::Simulation::createSimulation();
int argc = 2;
int argc = 1;
auto **argv = (char **) calloc(argc, sizeof(char *));
argv[0] = strdup("one_task_test");
argv[1] = strdup("--wrench-full-log");
// argv[1] = strdup("--wrench-full-log");

simulation->init(&argc, argv);

Expand Down

0 comments on commit 0f063cc

Please sign in to comment.