CpperoMQ is a C++11 binding for the 0MQ (libzmq) C library.
CpperoMQ offers several advantages over using the libzmq C library directly and some of the other currently available C++ 0MQ bindings:
- Highly composable multipart messaging
- Type safety
- Performance close to using libzmq directly (via a good optimizer)
To see zguide examples implemented with CpperoMQ as well as some additional examples, see the CpperoMQ-examples repository.
Just like 0MQ, the building blocks in CpperoMQ are contexts, sockets, and messages.
All CpperoMQ classes exist under the CpperoMQ
namespace, so the following saves a few keystrokes for the remainder of this section:
using namespace CpperoMQ;
A context encapsulates 0MQ's internal state. A CpperoMQ Context
can be initialized as follows:
Context context;
A Context
can be customized at construction and via mutators. The most important responsibility of the Context is to create Sockets
:
RequestSocket requestSocket(context.createRequestSocket());
Socket
objects are strongly typed and move-constructed from temporaries provided by the Context
:
auto dealer(context.createDealerSocket());
auto extPub(context.createExtendedPublishSocket());
auto extSub(context.createExtendedSubscribeSocket());
auto pub(context.createPublishSocket());
auto pull(context.createPullSocket());
auto push(context.createPushSocket());
auto reply(context.createReplySocket());
auto request(context.createRequestSocket());
auto router(context.createRouterSocket());
auto sub(context.createSubscribeSocket());
Depending on the socket type, a Socket
can send OutgoingMessage
objects and/or receive IncomingMessage
objects. An OutgoingMessage
is initialized with data at construction only. Instances can be sent but not received:
OutgoingMessage message("Hello");
pub.send(message);
// pub.receive(message); /* Compiler error! */
An IncomingMessage
is default-constructed only. Instances can be received but not sent:
IncomingMessage message;
sub.receive(message);
// sub.send(message); /* Compiler error! */
The primary goal of CpperoMQ is to provide users with extremely composable multipart messaging. The send
and receive
methods on a Socket
take any positive number of parameters, as long as each parameter implements the appropriate Sendable
or Receivable
interface. All parameters in a single call to send
or receive
are treated as the individual message parts (a.k.a. frames) of a multipart message.
Long story short, users send a multipart message like this:
// 'send' treats all of its parameters as a single, multipart message.
// No need to build up a list of message parts before-hand.
pub.send( OutgoingMessage("This")
, OutgoingMessage("is")
, OutgoingMessage("a")
, OutgoingMessage("multipart")
, OutgoingMessage("message") );
Users can then receive the above multipart message as its individual parts like this:
// 'receive' treats all of its parameters as the parts of a single,
// multipart message.
IncomingMessage inMsg1, inMsg2, inMsg3, inMsg4, inMsg5;
sub.receive(inMsg1, inMsg2, inMsg3, inMsg4, inMsg5);
In many cases, it is undesirable to have to know up-front how many message parts are expected when receiving on a socket. It is more convenient to send or receive complex objects directly on a socket. Enter the Sendable
and Receivable
interfaces:
class Employee : public Sendable, public Receivable
{
public:
// ...
private:
int mId;
int mAge;
std::string mName;
};
class Department : public Sendable, public Receivable
{
public:
// ...
private:
int mCode;
std::string mName;
};
Now the user can freely send and receive any combinations of CpperoMQ's OutgoingMessage
and IncomingMessage
as well as the above user-defined Employee
and Department
.
For a somewhat contrived example, imagine publishing a department update message where we broadcast that an employee now belongs to a different department:
// From the publishing executable...
pub.send( Department(/* ... */) // old department
, Department(/* ... */) // new department
, Employee(/* ... */) ); // employee
// From the subscribing executable...
Department oldDept;
Department newDept;
Employee employee;
sub.receive(oldDept, newDept, employee);
The above is sent as a single, multipart message that is guaranteed by 0MQ to be either received in full or not received at all by a connected receiving socket. Note that at any point in the list of parameters of send
we could have incorporated some OutgoingMessage
objects too. Likewise, for receive
we could have included some IncomingMessage
objects.
Take a look at the implementation of the Sendable
interface below for Department
:
virtual bool send(const CpperoMQ::Socket& socket, const bool moreToSend) const override
{
using namespace CpperoMQ;
OutgoingMessage idMsgPart(serializeNumber(mCode));
if (!idMsgPart.send(socket, true))
return false;
OutgoingMessage nameMsgPart(mName.c_str());
if (!nameMsgPart.send(socket, moreToSend))
return false;
return true;
}
Implementing this interface is what enables us to compose multipart messages containing Department
objects. Note that the caller and callee are flipped - instead of calling send
on the socket, we call send
on the OutgoingMessage
objects, passing the Socket
and a boolean as a parameter. The boolean indicates whether or not there are more parts to send for the multipart message.
When implementing the Sendable
interface, the requirement is to pass true for the boolean parameter for all internal send calls up to the final one. The final send must pass the moreToSend
parameter. Composing sendable, multipart messages containing complex objects in CpperoMQ requires that this pattern is correctly followed.
The implementation of the Receivable
interface is similar:
virtual bool receive(CpperoMQ::Socket& socket, bool& moreToReceive) override
{
using namespace CpperoMQ;
IncomingMessage idMsgPart;
if (!idMsgPart.receive(socket, moreToReceive) || !moreToReceive)
return false;
IncomingMessage nameMsgPart;
if (!nameMsgPart.receive(socket, moreToReceive))
return false;
mCode = deserializeNumber(idMsgPart);
mName.assign(nameMsgPart.charData(), nameMsgPart.size());
return true;
}
When implementing the Receivable
interface, the requirement is to pass the moreToReceive boolean reference parameter to all internal receive calls. After a receive is performed, this parameter's value indicates whether or not there exist more parts to receive for the multipart message. Its value must be checked after all internal receive calls up to the last one. Composing receivable, multipart messages containing complex objects in CpperoMQ requires that this pattern is correctly followed.
We can go a step further with the above example by encapsulating the individual objects involved in a Department Update message:
class DepartmentUpdate : public CpperoMQ::Sendable
, public CpperoMQ::Receivable
{
public:
// ...
virtual bool send(const CpperoMQ::Socket& socket, const bool moreToSend) const override
{
using namespace CpperoMQ;
if (!mOldDept.send(socket, true))
return false;
if (!mNewDept.send(socket, true))
return false;
if (!mEmployee.send(socket, moreToSend))
return false;
return true;
}
virtual bool receive(CpperoMQ::Socket& socket, bool& moreToReceive) override
{
using namespace CpperoMQ;
if (!mOldDept.receive(socket, moreToReceive) || !moreToReceive)
return false;
if (!mNewDept.receive(socket, moreToReceive) || !moreToReceive)
return false;
if (!mEmployee.receive(socket, moreToReceive))
return false;
return true;
}
private:
Department mOldDept;
Department mNewDept;
Employee mEmployee;
};
Now we can send DepartmentUpdate
objects directly on Sockets
.
To see zguide examples implemented with CpperoMQ as well as some additional examples, see the CpperoMQ-examples repository.
Disclaimer: Most of the above code did not check for errors. Real code should check the boolean result of each relevant library function. CpperoMQ can throw a CpperoMQ::Error exception, so that should be caught too.
CpperoMQ currently does not implement the following:
- CURVE security
- PAIR sockets
- Some uncommon socket options (workaround by passing socket handle directly to libzmq)
Feel free to contribute implementations for the above items. The author may also add these as needed. The workaround for 1 and 3 is to pass the socket handle to libzmq. 2 requires direct use of libzmq.
The below 0MQ C++ bindings are alternatives to CpperoMQ:
CpperoMQ requires the following:
- A compiler with C++11 support
- libzmq 4.1 or greater.
- libzmq's zmq.hpp header is available on the include path
- Binaries compiled with CpperoMQ must be linked with libzmq.
The specific C++11 features used are listed below. Note that the source code is more likely to change than this README, so the list may be outdated.
auto
Defaulted functions
Delegating constructors
Deleted functions
final
function
noexcept (if not MSVC)
nullptr
override
Rvalue references
static_assert
Trailing return types
Variadic templates
CpperoMQ has been tested on the following operating systems and compilers:
Operating System | Compiler |
---|---|
OS X 10.10 | Xcode 6.4 |
Windows 8 | VS2013 |
The above are the environments and compilers available to the author. Please update this table directly or notify the author upon confirming additional compiler support.
CpperoMQ is a C++11 header-only library, making installation a painless two-step process:
- Download CpperoMQ.
- From the project root's 'include' directory, copy the CpperoMQ directory into a project's (or the system) include path.
Contributions to this binding via pull requests or bug reports are always welcome! See the 0MQ contribution policy page for details.
The CpperoMQ library is licensed under the MIT license. The included LICENSE file and the comment header of each source file contain the full text of the MIT license.