Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added non-blocking root communicator #1478

Open
wants to merge 19 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/axom/lumberjack/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ set(lumberjack_headers
Message.hpp
MPIUtility.hpp
RootCommunicator.hpp
NonCollectiveRootCommunicator.hpp
TextEqualityCombiner.hpp
TextTagCombiner.hpp
)
Expand All @@ -27,6 +28,7 @@ set(lumberjack_sources
Message.cpp
MPIUtility.cpp
RootCommunicator.cpp
NonCollectiveRootCommunicator.cpp
)


Expand Down
39 changes: 36 additions & 3 deletions src/axom/lumberjack/MPIUtility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/

#include "axom/lumberjack/MPIUtility.hpp"

#include <cstring>

namespace axom
Expand Down Expand Up @@ -49,16 +48,50 @@ const char* mpiBlockingReceiveMessages(MPI_Comm comm)
return charArray;
}

const char* mpiNonBlockingReceiveMessages(MPI_Comm comm, int tag)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to add tag to mpiBlockingReceiveMessages?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mpiBlockingReceiveMessages is not being used by the non-collective root communicator that's added in this MR, so it just uses the default LJ_TAG defined in this cpp file. That said, if we do want to go for consistency with the other function signatures in this file, I can add the tag argument to that function as well. It could potentially be useful in the future. Any thoughts/preferences?

{
const int mpiTag = (tag == 0) ? LJ_TAG : tag;
char* charArray = nullptr;
int messageSize = -1;
MPI_Status mpiStatus;

// Get size and source of MPI message
int mpiFlag = 0;
MPI_Iprobe(MPI_ANY_SOURCE, tag, comm, &mpiFlag, &mpiStatus);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MPI_Iprobe is nonblocking here, so is there a chance the mpiFlag is not set to true when it is expected to be? Would it be better to have this be a blocking MPI_Probe? Basing this comment off this stackoverflow post: https://stackoverflow.com/questions/43823458/mpi-iprobe-vs-mpi-probe

Additionally, if using MPI_Iprobe, should mpiFlag default be set to false, so it can be set to true only by a successful function call?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the mpiFlag will be set in either context to either true or false, but to your point, it is safer to initialize this as false.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stackoverflow example illustrates an interesting but slightly different approach than what I'm intending to do. They are calling MPI_Iprobe in a while loop that does not exit until it returns a flag that is non-zero. In my case, I am checking to see if any messages need to be received only once, and if there are no messages, the function exits by returning nullptr. This intent in the stackoverflow example is to continuously monitor the status, whereas I'm only intending to periodically monitor the status whenever the code path enters into this function. Both could be relevant to the problem I'm trying to solve with this communicator, where the root rank needs to receive information from other ranks that they are aborting. I had a preference toward the latter option (periodically monitoring the status whenever the root rank reaches a point where it enters this code path) because it seemed to me like the more efficient option, even if it comes at a cost of sometimes not receiving the status before the program aborts. But I'm not really sure which option is best for this scenario. I'd be curious to hear your thoughts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a preference toward the latter option (periodically monitoring the status whenever the root rank reaches a point where it enters this code path) because it seemed to me like the more efficient option, even if it comes at a cost of sometimes not receiving the status before the program aborts.

I agree, I would expect the latter option to have less overhead, doing a single poll with MPI_Iprobe instead of spinning on MPI_Iprobe until status is updated in the former case. Nevertheless, I might not be considering something, so am also curious if others have ideas.


if(mpiFlag == 1)
{
MPI_Get_count(&mpiStatus, MPI_CHAR, &messageSize);

// Setup where to receive the char array
charArray = new char[messageSize + 1];
charArray[messageSize] = '\0';

// Receive packed Message
MPI_Recv(charArray,
messageSize,
MPI_CHAR,
mpiStatus.MPI_SOURCE,
mpiTag,
comm,
&mpiStatus);
}

return charArray;
}

void mpiNonBlockingSendMessages(MPI_Comm comm,
int destinationRank,
const char* packedMessagesToBeSent)
const char* packedMessagesToBeSent,
int tag)
{
const int mpiTag = (tag == 0) ? LJ_TAG : tag;
MPI_Request mpiRequest;
MPI_Isend(const_cast<char*>(packedMessagesToBeSent),
strlen(packedMessagesToBeSent),
MPI_CHAR,
destinationRank,
LJ_TAG,
mpiTag,
comm,
&mpiRequest);
MPI_Request_free(&mpiRequest);
Expand Down
17 changes: 16 additions & 1 deletion src/axom/lumberjack/MPIUtility.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ namespace lumberjack
*/
const char* mpiBlockingReceiveMessages(MPI_Comm comm);

/*!
*****************************************************************************
* \brief Receives any Message sent to this rank, if there are any messages
* that are sent. Returns null if no messages are sent.
Copy link
Contributor

@gunney1 gunney1 Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we return from a non-blocking probe, there could be sent messages that haven't arrived yet. I suggest replacing "are sent" with "have arrived." It's pedantic but can avoid confusion when the unexpected happens.

*
* \param [in] comm The MPI Communicator.
* \param [in] tag The MPI tag to use for communication. When set to zero,
* MPI communication uses default LJ_Tag.
*****************************************************************************
*/
const char* mpiNonBlockingReceiveMessages(MPI_Comm comm, int tag = 0);

/*!
*****************************************************************************
* \brief Sends all Message sent to the given rank.
Expand All @@ -40,11 +52,14 @@ const char* mpiBlockingReceiveMessages(MPI_Comm comm);
* \param [in] destinationRank Where the Message classes is being sent.
* \param [in,out] packedMessagesToBeSent All of the Message classes to be sent
* packed together.
* \param [in] tag The MPI tag to use for communication. When set to zero,
* MPI communication uses default LJ_Tag.
gberg617 marked this conversation as resolved.
Show resolved Hide resolved
*****************************************************************************
*/
void mpiNonBlockingSendMessages(MPI_Comm comm,
int destinationRank,
const char* packedMessagesToBeSent);
const char* packedMessagesToBeSent,
int tag = 0);
} // end namespace lumberjack
} // end namespace axom

Expand Down
99 changes: 99 additions & 0 deletions src/axom/lumberjack/NonCollectiveRootCommunicator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) 2017-2024, Lawrence Livermore National Security, LLC and
// other Axom Project Developers. See the top-level LICENSE file for details.
//
// SPDX-License-Identifier: (BSD-3-Clause)

/*!
******************************************************************************
*
* \file NonCollectiveRootCommunicator.cpp
*
* \brief Implementation of the NonCollectiveRootCommunicator class.
*
******************************************************************************
*/

#include "axom/lumberjack/NonCollectiveRootCommunicator.hpp"
#include "axom/lumberjack/MPIUtility.hpp"

namespace axom
{
namespace lumberjack
{
void NonCollectiveRootCommunicator::initialize(MPI_Comm comm, int ranksLimit)
{
static int mpiTag = 32767;
m_mpiComm = comm;
MPI_Comm_rank(m_mpiComm, &m_mpiCommRank);
MPI_Comm_size(m_mpiComm, &m_mpiCommSize);
m_ranksLimit = ranksLimit;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably, this needs to be a positive number greater than or equal to 1. Should we check it w/ a SLIC_ASSERT?

m_mpiTag = mpiTag;
++mpiTag;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is mpiTag being incremented here? This could use a comment.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each communicator that sends/receives messages non-collectively needs its own mpiTag in order to not interfere with other communicators. I added this comment above.

Copy link
Member

@white238 white238 Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also this could overflow the integer size and should be checked. Resetting back to the original should be safe.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would hope we don't have users who regularly initialize over 2 billion communicators 😆 (although I've seen crazier things). Anyways, I added the numeric_limits check in, and I reset the MPI tag to the initial value if it hits that limit.

}

void NonCollectiveRootCommunicator::finalize() { }

int NonCollectiveRootCommunicator::rank() { return m_mpiCommRank; }

void NonCollectiveRootCommunicator::ranksLimit(int value)
{
m_ranksLimit = value;
}

int NonCollectiveRootCommunicator::ranksLimit() { return m_ranksLimit; }

int NonCollectiveRootCommunicator::numPushesToFlush() { return 1; }

void NonCollectiveRootCommunicator::push(
const char* packedMessagesToBeSent,
std::vector<const char*>& receivedPackedMessages)
{
if(m_mpiCommRank == 0)
{
const char* currPackedMessages = nullptr;
bool receive_messages = true;
while(receive_messages)
{
currPackedMessages = mpiNonBlockingReceiveMessages(m_mpiComm, m_mpiTag);

if(isPackedMessagesEmpty(currPackedMessages))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks odd to me that currPackedMessages is used before checking if it's null.

{
if(currPackedMessages == nullptr)
{
receive_messages = false;
}
else
{
delete[] currPackedMessages;
}
}
else
{
receivedPackedMessages.push_back(currPackedMessages);
}

currPackedMessages = nullptr;
}
}
else
{
if(isPackedMessagesEmpty(packedMessagesToBeSent) == false)
{
mpiNonBlockingSendMessages(m_mpiComm, 0, packedMessagesToBeSent, m_mpiTag);
}
}
}

bool NonCollectiveRootCommunicator::isOutputNode()
{
if(m_mpiCommRank == 0)
{
return true;
}
return false;
}

int NonCollectiveRootCommunicator::mpiTag() const { return m_mpiTag; }

} // end namespace lumberjack
} // end namespace axom
143 changes: 143 additions & 0 deletions src/axom/lumberjack/NonCollectiveRootCommunicator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (c) 2017-2024, Lawrence Livermore National Security, LLC and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After merging in develop, please run our update_copyright_script to update the copyright on the new files to 2025.
See: https://github.com/LLNL/axom/blob/develop/scripts/update_copyright_date.sh

// other Axom Project Developers. See the top-level LICENSE file for details.
//
// SPDX-License-Identifier: (BSD-3-Clause)

/*!
*******************************************************************************
* \file NonCollectiveRootCommunicator.hpp
*
* \brief This file contains the class definition of the
* NonCollectiveRootCommunicator.
*******************************************************************************
*/

#ifndef NONCOLLECTIVEROOTCOMMUNICATOR_HPP
#define NONCOLLECTIVEROOTCOMMUNICATOR_HPP

#include "axom/lumberjack/Lumberjack.hpp"
#include "axom/lumberjack/Communicator.hpp"

namespace axom
{
namespace lumberjack
{
/*!
*******************************************************************************
* \class NonCollectiveRootCommunicator
*
* \brief Based off of RootCommunicator. This communicator pushes
messages from any rank to root non-collectively, if any messages are sent.
*******************************************************************************
*/
class NonCollectiveRootCommunicator : public axom::lumberjack::Communicator
{
public:
/*!
*****************************************************************************
* \brief Called to initialize the Communicator.
*
* This performs any setup work the Communicator needs before doing any work.
* It is required that this is called before using the Communicator.
*
* \param [in] comm The MPI Communicator
* \param [in] ranksLimit Limit on how many ranks are individually tracked per
* Message.
Comment on lines +44 to +45
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion:

Suggested change
* \param [in] ranksLimit Limit on how many ranks are individually tracked per
* Message.
* \param [in] ranksLimit Upper limit on number of ranks are tracked per Message.

*****************************************************************************
*/
void initialize(MPI_Comm comm, int ranksLimit);

/*!
*****************************************************************************
* \brief Called to finalize the Communicator.
*
* This performs any cleanup work the Communicator needs to do before going
* away.It is required that this is the last function called by the
* Communicator.
*****************************************************************************
*/
void finalize();

/*!
*****************************************************************************
* \brief Returns the MPI rank of this node
*****************************************************************************
*/
int rank();

/*!
*****************************************************************************
* \brief Sets the rank limit.
*
* This is the limit on how many ranks generated a given message are
* individually tracked per Message. After the limit has been reached, only
* the Message::rankCount is incremented.
*
* \param [in] value Limits how many ranks are tracked per Message.
*****************************************************************************
*/
void ranksLimit(int value);

/*!
*****************************************************************************
* \brief Returns the rank limit.
*
* This is the limit on how many ranks generated a given message are
* individually tracked per Message. After the limit has been reached, only
* the Message::rankCount is incremented.
*****************************************************************************
*/
int ranksLimit();

/*!
*****************************************************************************
* \brief Function used by the Lumberjack class to indicate how many
* individual pushes fully flush all currently held Message classes to the
* root node. The Communicator class's tree structure dictates this.
*****************************************************************************
*/
int numPushesToFlush();

/*!
*****************************************************************************
* \brief This pushes all messages to the root node.
*
* All messages are pushed to the root node. This is the same as
* RootCommunicator::pushMessagesFully for this Communicator.
*
* \param [in] packedMessagesToBeSent All of this rank's Message classes
* packed into a single buffer.
* \param [in,out] receivedPackedMessages Received packed message buffers from
* this nodes children.
*****************************************************************************
*/
void push(const char* packedMessagesToBeSent,
std::vector<const char*>& receivedPackedMessages);

/*!
*****************************************************************************
* \brief Function used by the Lumberjack to indicate whether this node should
* be outputting messages. Only the root node outputs messages.
*****************************************************************************
*/
bool isOutputNode();

/*!
*****************************************************************************
* \brief Returns the MPI tag used for this communicator.
*****************************************************************************
*/
int mpiTag() const;

private:
MPI_Comm m_mpiComm;
int m_mpiCommRank;
int m_mpiCommSize;
int m_ranksLimit;
int m_mpiTag;
};

} // end namespace lumberjack
} // end namespace axom

#endif
gberg617 marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 2 additions & 1 deletion src/axom/lumberjack/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ endforeach()
#------------------------------------------------------------------------------
set(lumberjack_mpi_tests
lumberjack_BinaryCommunicator.hpp
lumberjack_RootCommunicator.hpp )
lumberjack_RootCommunicator.hpp
lumberjack_NonCollectiveRootCommunicator.hpp )

axom_add_executable(NAME lumberjack_mpi_tests
SOURCES lumberjack_mpi_main.cpp
Expand Down
Loading