The ZeroMQ project cppzmq provides C++ bindings for libzmq. With them we can exercise the power of ZeroMQ with idiomatic, modern C++. The bindings provide type safety, exception-based error reporting, and the RAII approach to resource management. Most cppzmq functionality will work with C++11 and even older and here we consider features requiring C++17 and we ignore deprecated interfaces.
In this document, I tour the main parts of the cppzmq package so I can learn them better. I write down what I learn along the way so maybe others can learn something too.
Brevity is attempted and so the general concepts of ZeroMQ are assumed to already be understood. If you, the curious reader, have not yet done so please do yourself a big favor and read the entire ZeroMQ Guide. Read it at least twice. I think gets even better each time I read it as I find something new and useful that I had previously missed.
This tour may not be needed by everyone. We may make effective use of cppzmq simply by reading its header files and maybe checking the unit tests. In some cases we may also need to refer to the libzmq API documentation which should be installed on your system as Unix man pages (man zmq
to start) and which are available online. This document attempts to distill what was learned in the process of just such a reading. I hope it may serve as a gentle narrative or tour that will help others quickly feel comfortable developing with cppzmq.
The cppzmq package core is a header-only library providing the C++ namespace zmq::
. Currently the library consists of two headers and here is a summary of what they provide:
- ~zmq.hpp~
- (single part) message, context, buffer, socket, monitor, poller
- ~zmq_addon.hpp~
- multipart and related functions and another form of poller
Most of the goodies are in zmq.hpp
and we go through them in the order they appear in that file. The goodies in zmq_addon.hpp
are just as good and I feel there’s no need for a separation. All my cppzmq projects benefit from both.
The package also provides substantial unit tests which can be mined for programming examples. The package README file gives instructions for formal installation but one very simple option is copying the zmq.hpp
and zmq_addon.hpp
files right into our own package.
The message_t
class from zmq.hpp
is a C++ facade over an opaque zmq_msg_t
from libzmq. It is conceptually equivalent to the {{{czman(zframe,3)}}} from CZMQ, the high level C bindings to libzmq. In ZeroMQ terms, message_t
is a single-part message in that it holds a contiguous block of memory. Later, (sections Socket and Multipart) the idea of multiple single-part messages and multipart messages are covered.
Here are some examples of constructing a message_t
:
// Default constructs an empty message of size zero.
zmq::message_t msg;
// Have message allocate some memory internally.
const size_t size = 1024;
zmq::message_t msg(size);
// Initialize internal memory with a copy of external data.
const char bytes[s] = {0};
zmq::message_t msg(bytes, size);
// Or, from string literal (see also str_buffer).
zmq::message_t msg("hello world!", 12);
// Initialize through iteration on a container.
std::vector<int> ivec = {1,2,3};
zmq::message_t msg(ivec.begin(), ivec.end());
// or more simply
zmq::message_t msg(ivec);
// Zero-copy provision with custom deallocation callback.
// Wraps zmq_msg_init_data(3), see man page for details.
zmq::message_t msg(byte, size, myfree, nullptr);
// Move/swap the data of one message into a new one
zmq::message_t msg2(msg);
// or
zmq::message_t msg2 = msg;
We can also “rebuild” an existing message. The methods to do this mirror the constructor prototypes:
// Empty the message
msg.rebuild();
// Allocate a new size
msg.rebuild(size);
// Allocate and set with a copy
msg.rebuild(bytes, size);
Not included here is a zero-copy version similar to the zero-copy constructor shown above.
The message is a block of bytes and we can get at those bytes in a few ways:
// Size of the data in bytes
size_t msg.size();
// Low level, type free access
const void* vptr = msg.data();
// As above but type cast
const int* iptr = msg.data<int>();
// With a copy into a non-const collection
std::vector ints(msg.data<int>(), msg.size()/sizeof(int));
// If the data makes sense as a string.
std::string str = msg.to_string();
// And we can get a zero copy view.
std::string_view strv = msg.to_string_view();
// An artistic string representation of the contents.
std::cout << msg.str() << std::endl;
Here we have a few details that are specific to certain socket types. We still haven’t gotten to sockets, but they are coming. You may feel comfortable to skip this section for now.
When we receive and send messages via a socket of type SERVER our application must manage a “routing ID” in order to associate the messages with a remote CLIENT socket. We do this by getting and setting this ID from/on the message as:
// After we receive a message, remember its routing ID:
uint32_t rid = msg.routing_id();
// Later, just before sending, we make sure to set the ID:
zmq::message_t msg2;
msg2.set_routing_id(rid);
In this example, two messages are used. If the received message is reused for the subsequent send, and we have not rebuilt it, the routing ID is retained and no explicit get/set is required.
The RADIO/DISH sockets have a concept similar to SERVER/CLIENT routing ID and PUB/SUB topics which is that of a named group to which messages are associated. This group name may be set on and retrieved from the message.
// 16 byte max including null '\0' terminator char
const char* grp = "hello world!";
msg.set_group(grp);
// Get the group name.
grp = msg.group();
More generally, messages may carry per-connection metadata “properties”. The keys and values for these are of type string. We’ll describe how these may be set through socket options later but for now here is an example of how properties may be retrieved.
// Get system property
const char* stype = msg.gets("Socket-Type");
// Get an application property
const char* color = msg.gets("X-Favorite-Color");
This gets()
method wraps {{{zman(zmq_msg_gets,3)}}} so see that man page for details.
In cppzmq, a buffer is like a message, but different. It allows us another way to transmit data without creating a message, or it gives us easier ways to create a message from our data.
There are two variants of buffers, mutable_buffer
and const_buffer
. As the name suggests, the data given to the first may be modified while the data given to the second may not be.
Either type of buffer may be constructed directly as in this example:
// Empty
zmq::mutable_buffer mbuf();
zmq::const_buffer cbuf();
// Fodder data
const size_t size = 1024;
void* ptr = malloc(size);
const void* cptr = ptr;
// With data
zmq::mutable_buffer mbuf(ptr, size);
zmq::const_buffer cbuf(cptr, size);
We are also given a variety of functions named buffer()
to construct buffers in useful ways. We give some examples next and will see later some examples of how to use buffers in the sections Socket and Multipart.
// Fodder data from previous example
// Basic construction
zmq::mutable_buffer mbuf = zmq::buffer(ptr, size);
zmq::const_buffer cbuf = zmq::buffer(cptr, size);
// C array.
int data[1024];
mbuf = zmq::buffer(data);
// C++ vector, std::array is similar
std::vector<int> data(size);
mbuf = zmq::buffer(data);
// C++ string and string literal
std::string str = "hello world";
mbuf = zmq::buffer(str);
cbuf = zmq::str_buffer("hello world");
Once constructed, buffers are rather simple but we can operate on them to narrow their view of the underlying data.
// Truncate tail half, same works with const_buffer
mbuf = zmq::buffer(mbuf, mbuf.size()/2);
// Truncate front half, etc cbuf.
mbuf += mbuf.size()/2;
Why do this narrowing? One very useful pattern is for an application to take some action based on the prefix or postfix of a message. This narrowing can be performed after this information is used and the remainder can be forwarded to an output socket. No copying needed.
Unlike a message, a buffer has only a basic pair of methods to get back the original data and its size, reflecting any narrowing that occurred.
void *vptr = mbuf.data();
size_t size = mbuf.size();
And, that’s about it.
The context_t
from zmq.hpp
embodies an opaque libzmq context such as created by {{{zman(zmq_ctx_new,3)}}}. A context is used by ZeroMQ to collect and manage a set of sockets which the application creates on/in the context. The context is thread safe (unlike some sockets) and may be shared between threads without concern for locking at the application level.
Almost always we create the default context:
zmq::context_t ctx;
A move constructor is also available. After construction, various context options may be set and queried and cppzmq provides ctx.set()
and ctx.get()
to do this. I have yet to use them but we may check {{{zman(zmq_ctx_set,3)}}} for anything interesting.
We can not copy a context but we can move. Typically, we will construct it, keep it alive as long as we need the sockets that we have created with it and if we need it in some other code context we may pass it by reference. Here is a contrived example.
void run_app(zmq::context_t& ctx);
void main()
{
zmq::context_t ctx;
run_app(ctx);
}
In most applications, we will let the context_t
destruct and that will tear down our sockets. We may also tear down even sooner:
// Cease any blocking operations in progress.
ctx.shutdown();
// Do a shutdown, if needed and destroy the context.
ctx.close();
The heart of ZeroMQ is the socket and for it cppzmq supplies us with the class socket_t
. There exists too much useful information on the intriguing variety of ZeroMQ sockets to repeat here. A concise and definitive source is {{{zman(zmq_socket,3)}}}. Really, read that carefully as it answers a large fraction of questions I have had and see asked by others.
To be useful, sockets must be created with a context (which we covered in section Context) and a socket type identifier. In libzmq the type is identified with an integer, usually as provided by a CPP macro like ZMQ_PUB
. In cppzmq (in C++11) we may still use these macros or bare integers or we may use an enum class socket_type
.
using namespace zmq;
context_t ctx;
socket_t pub(ctx, ZMQ_PUB);
socket_t sub(ctx, socket_type::sub);
A default socket may be constructed and later swap guts with another socket through a move assignment. A socket may also be move-constructed. But we can not copy a socket.
When a socket destructs or if its socket_t::close()
method is explicitly called, the underlying libzmq socket will be destroyed via {{{zman(zmq_close,3)}}}.
ZeroMQ sockets link up with each other via transport addresses (see eg {{{zman(zmq_tcp,7)}}}, {{{zman(zmq_ipc,7)}}}, {{{zman(zmq_inproc,7)}}}). One or more sockets may “bind” to an address and one or more may “connect”. At least one of each type of link must be made for messages to be exchanged. Other rules apply. For example, only certain socket types may intercommunicate and some socket types do not work with some transports. These rules are all general to libzmq (not just cppzmq). It’s up to us to write applications that follow these rules while cppzmq provides us very simple ways to form the links.
const std::string addr = "tcp://127.0.0.1:5678";
sock1.bind(addr);
sock2.connect(addr);
Note the lack of return values. A zmq::error_t
will be thrown if something goes wrong. The linkage may be kept for the entire life cycle of the sockets but some applications may want to explicitly undo these actions:
sock1.unbind(addr);
sock2.disconnect(addr);
We have several ways to send with cppzmq and several more which are deprecated. Here, let’s focus on the preferred methods. The choice is to pass a message_t
by lvalue reference or by rvalue reference or to pass a const_buffer
by value In all cases we must also provide something called “send flags” which we will cover in a little bit
zmq::message_t msg = ...;
// Pass by reference
auto res = sock.send(msg, zmq::send_flags::none);
// Pass by move
auto res = sock.send(std::move(msg), zmq::send_flags::none);
// Pass by buffer
auto res = sock.send(zmq::str_buffer("hello world"), zmq::send_flags::none);
The first two call {{{zman(zmq_msg_send,3)}}} and the last calls {{{zman(msg_send,3)}}} so see those man pages for any nitty gritty details you may want.
ZeroMQ accepts a few “hints” on how it is to treat a message when sending. In cppzmq we may provide these with the enum class
called zmq::send_flags
. It is rare that I use anything other than zmq::send_flags::none
, as shown in the example, but two more are available:
dontwait
- if the send may block instead return immediately with error EAGAIN
sndmore
- the message is to be sent together with a following message
Details of the applicability and meaning of these flags are found in {{{zman(zmq_msg_send,3)}}}.
That auto res
holding the return of a send()
in the example above is a zmq::send_result_t
and that is a std::optional
which may hold a size_t
giving the number of bytes sent or nothing if EAGAIN error occurred and that only happens with a dontwait
flag (see above).
Regardless of the send flag used we must save the send result in a variable because the send()
methods are compiled marked with the C++ attribute [[nodiscard]]
. We can be lazy and then do nothing with the res
variable, but we should be more rigorous and compile with -Wunused
(maybe via -Wall
) so we can be told when we fail to use the res
send result. Let’s be even better and compile with -Werror
to really force us to do something with it.
Regardless of what send flag we use, any other error that occurs will lead to a throw of the zmq::error_t
exception so we do not require any special compiler flags to be sure all other errors can not go silently unchecked.
At times our application may send messages faster than downstream applications can receive. Eventually, our socket’s internal buffer will reach its “high water mark” (HWM) and enter what is called the “mute” state. What happens next depends on the socket types used and is explained in nice detail in {{{zman(zmq_socket,3)}}}. We may elect to let that built-in behavior handle the issue or we may develop our application to be more proactive. For that, we can learn if the next message will tip our socket into “mute” step by polling it prior to output (pollout). A pollout is maybe not so commonly used compared to its opposite (pollin) which we will touch on in the next section and we will revisit polling in more detail in section Poller.
The opposite to sending to a socket is receiving from a socket. We may do this with recv()
. The misspelling of this verb has a long history in network programming and ZeroMQ and cppzmq cheerfully continues it. Like with send()
, we have one recv()
method for a message, one for a buffer, and for both we give recv flags.
// Fill a message passed by reference
auto res = sock.recv(msg, zmq::recv_flags::none);
// Fill a suitably pre-sized mutable_buffer
auto res = sock.recv(buf, zmq::recv_flags::none);
These two examples correspond to the low-level {{{zman(zmq_msg_recv,3)}}} and {{{zman(zmq_recv,3)}}} respectively. When using the buffer to receive the data, take care that it has sufficient capacity to hold the expected message. The care you must take pays back in avoiding a copy.
Like send()
, recv()
takes flags but with this method we may often want to specify a flag besides none
. There is however only one other choice:
dontwait
- if the recv may block instead return immediately with error EAGAIN
This flag can be useful if our application wants to do a “quick check” to see if a message is waiting, and to receive it. If no message is sitting in the socket’s input queue, the recv()
will return immediately. If we use none
then recv()
may, in principle, wait until the end of the universe before returning.
Also like send()
, our recv()
returns a std::optional
result which is empty if EAGAIN was returned by libzmq. Otherwise it returns the number of bytes received. And likewise, this return value also must be saved to a variable. The same comments about the importance to check this value as described in the Send results section apply here.
When we recv()
with message_t
, a non-empty receive result will hold the size in bytes of the message.
OTOH, when using a buffer, it is extra important to check the receive result. If non-empty it holds a zmq::recv_buffer_size
which is a little struct
holding two size attributes. The size
says how much data we received and the untruncated_size
tells us how much data ZeroMQ wanted to give us. If the two values differ we know our application failed to provide a large enough buffer. Oops.
Depending on the receive flags we can either assure an immediate return from recv()
or we may risk it never returning. That’s a tough dichotomy to live with. Thankfully we can assure that the call waits at most some time in between those extremes using receive polling. This is discussed more below in the section Poller.
Usually we develop an application that makes some ZeroMQ sockets, do our bind()/connect()
and some send()/recv()
and that’s all we need for a lot of fun. There’s not much detail to worry about. Sometimes we have a tough problem that needs something special and there’s very likely a way that past ZeroMQ community members have found a solution. We then just need to find and set the right socket options.
The rather long list of possible options (85 counted today) are given in {{{zman(zmq_setsockopt,3)}}}. There they have names like ZMQ_IMMEDIATE
or ZMQ_ROUTING_ID
which are CPP macros expanding to some integer. For each, cppzmq creates a type tag to use with friendly get()/set()
methods on socket_t
.
sock.set(zmq::sockopt::immediate, false);
sock.set(zmq::sockopt::routing_id, "me");
auto rid = sock.get(zmq::sockopt::routing_id);
We will still need to carefully read zmq_setsockopt
to discover and understand what options may help us, but then applying or querying them with a cppzmq socket is simple.
One special socket option lets us set socket properties. These can be retrieved from messages that pass through the socket as described above in Message metadata.
sock.set(zmq::sockopt::metadata, zmq::str_buffer("X-color:purple"));
auto res = sock.recv(msg, zmq::recv_flags::none);
std::string val = msg.gets("X-color");
assert(val == "purple");
Sockets in cppzmq cannot be copied (they can be moved), while it is usual that we want various parts of our application code to share access to the same socket. We can pass around an lvalue reference to our socket but that is not always possible or convenient.
To help with this, we are given a zmq::socket_ref
which refers to but does not own a socket. With a socket ref our code can do almost everything it would do with a full socket object and it can know if the underlying socket has been closed.
zmq::socket_t sock(ctx, zmq::socket_type::pub);
zmq::socket_ref sref = sock;
// pass ref by value (copy)
do_some_socket_stuff(sref);
// Check if our socket is still there
if (sref == nullptr) {
respond_to_closure();
}
// Do stuff with collections of socket refs
std::unordered_set<zmq::socket_ref> bag_of_socks;
bag_of_socks.insert(sref);
for (auto s : bag_of_socks) {
auto res = s.send(bcast, zmq::send_flags::none);
}
Our nice cppzmq socket is a facade over an opaque socket object from libzmq. Rarely do we care about that but there may be cases where we do. For example, if we want to directly call some libzmq function or maybe inter-operate with CZMQ code, we can do so by getting the underlying libzmq socket.
zmq::socket_t sock = ...;
void* libzmq_socket = sock.handle();
// use libzmq to bind. We would probably never do this as we'd prefer
// to use sock.bind(), but it shows the possibility.
int port = zmq_bind(libzmq_socket, "tcp://*:*");
// Use a CZMQ socket. We might do something this if we use nice CZMQ
// CLASS like Zyre!
zsock_t czmq_socket = zsock_new_pub("tcp://*:*");
zmq::socket_ref sock(zmq::from_handle, zsock_resolve(czmq_socket));
Most of the time, let’s ignore we have a “handle” on the libzmq socket and rest assured that if we ever need it, it will be there.
ZeroMQ gives us great power and all it asks is that we stay out of its way. To nag ZeroMQ to tell us every detail about its internal operation is also to slow it down and impede its job. We strive then to relax and use ZeroMQ sockets as they are intended, a sink or a source of messages to and from the cosmos.
However, and zen platitudes aside, sometimes we must get uptight and worry about what our trusty ZeroMQ friend is busy doing. For that, cppzmq provides us a socket monitor, spelled zmq::monitor_t
.
The way for our application to make use of a monitor_t
is to subclass it and implement some of the many monitor_t:on_event_*()
virtual methods. Each such method corresponds to one of the event types listed in {{{zman(zmq_socket_monitor,3)}}}. We may also limit the events that our monitor reacts to by giving it a list of events as a bitmap.
class connect_monitor_t : public zmq::monitor_t {
public:
void on_event_connected(const zmq_event_t& event,
const char* addr) override
{
std::cout << "got connection from " << addr << std::endl;
}
};
// elsewhere
zmq::socket_t sock = ...;
const int events = ZMQ_EVENT_CONNECTED;
// Monitor sock using the given transport for internal communication
connect_monitor_t mon(sock, "inproc://conmon", events);
// mon runs forever....
// Now, try it a different way:
connect_monitor_t mon2;
// here, default is ZMQ_EVENT_ALL
mon2.init(sock, "inproc://conmon2");
// init returns and we poll when we want
if (! mon2.check_event(100)) {
std::cout << "timeout" << std::endl;
}
In this example, the construction of mon
with arguments tells it to run forever. This might make the rest of the code in our application jealous so it would have been better were we to put it running in its own thread. For mon2
we default construct and then after an explicit init()
we may check for any activity with a timeout. In this case we ask for all events but since our connect_monitor_t
implements but one callback, all the other events result in silent no-op calls to the base class on_event_*()
methods.
In several places above we have alluded to something called polling. It was even used, if maybe not noticed, in the example in the Monitor section when we call check_event(100)
. This call takes at most 100 milliseconds to return. If an event is waiting or arrives within that timeout, it will return even sooner. Had we used a timeout of -1 the method will never return if no new monitored events were raised.
This act of waiting for an event with a finite or infinite timeout is generally termed polling. Most often, as with check_event()
we poll for the event that a message is ready to recv()
. This is called pollin (“poll in”) or in libzmq spelling ZMQ_POLLIN
. Less common, but important for robust applications is to poll for the ability to send. This is called pollout or ZMQ_POLLOUT
.
A “poller” then helps application code to respond to a successful poll or to know that instead the poll timed out. In cppzmq we have the zmq::poller_t
in zmq.hpp
and the active_poller_t
from zmq_addon.hpp
.
In the following example, we construct two poller_t
instances, one to poll on input and one to poll on output, then add some sockets and their associated events (pollin/pollout). We then use the pollers to wait up to a timeout. Depending on the return value, our application reacts.
// We could combine the pollers but here keep input and output polling
// separate.
zmq::poller_t<> in_poller, out_poller;
// Our application has two input sockets and one output.
in_poller.add(input_socket1, zmq::event_flags::pollout);
in_poller.add(input_socket2, zmq::event_flags::pollout);
out_poller.add(output_socket, zmq::event_flags::pollout);
const std::chrono::milliseconds timeout{100};
std::vector<zmq::poller_event<>> in_events(2);
std::vector<zmq::poller_event<>> out_events(1);
while (true) {
const auto nin = in_poller.wait_all(in_events, timeout);
if (!nin) {
std::cout << "input timeout, try again" << std::endl;
continue;
}
for (int ind=0; ind<nin; ++ind) {
zmq::message_t msg;
auto rres = in_events[ind].socket.recv(msg, zmq::recv_flags::none);
const auto nout = out_poller.wait_all(out_events, timeout);
if (!nout) {
std::cout << "output timeout, freakout" << std::endl;
abort();
}
auto sres = out_events[0].socket.send(msg, zmq::send_flags::none);
}
}
In the example above you notice the poller_t<>
declaration. That empty template argument list sure is curious. We may fill it in order to associate some user data to a socket event being polled. This user data is added along with the socket and event and is then made available in any event resulting from a poll. Here is an example where the user data is of type int
.
using mypoller_t = zmq::poller_t<int>;
mypoller_t poller;
int val = 42;
poller.add(sock, zmq::event_flags::pollin, &val);
std::vector<mypoller_t> events(1);
auto n = poller.wait_all(events, timeout);
if (n) {
assert(42 == *events[0].val);
}
Each socket/event registered with .add()
can have its own user data but they must all be of the same (pointer to) type.
So far this tour has taken us through zmq.hpp
and here, right in the middle of this section, we move to zmq_addon.hpp
. As noted above, this switch of files feels somewhat arbitrary to me. Parts of zmq_addon.hpp
are more “core” for my own use while parts of zmq.hpp
are not so “core” to me. So, let us not worry that we now switch our focus to a header that on the surface may sound somehow secondary. It’s in fact full of more goodies.
The active_poller_t
is created and filled with sockets in a similar manner as poller_t
. In fact it uses a poller_t
under the hood. However, its add()
method takes a third argument which is a std::function<void(event_flags)>
and that function will be called when that event is seen.
This allows our application the option of structuring its response to events differently. Instead of complexity held inside the loop, it can be placed in functions or lambdas.
void chirp(zmq::event_flags ef);
zmq::active_poller_t ap;
ap.add(sock1, zmq::event_flags::pollin, [](zmq::event_flags ef) {
std::cout << "sock1 got " << ef << std::endl;
});
ap.add(sock2, zmq::event_flags::pollin, chirp);
while (true) {
auto n = ap.wait(timeout);
if (n) {
std::cout << "got " << n << " sockets hit\n";
}
else {
std::cout << "timeout" << std::endl;
}
}
When I first started to learn ZeroMQ I learned lots of interesting things and also struggled with lots of confusion. One thing that confused me extra was multipart messages. Take for example the delightful puzzle from {{{zman(zmq_msg_send,3)}}} which describes
int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);
Multi-part messages: A 0MQ message is composed of 1 or more message parts. Each message part is an independent
zmq_msg_t
in its own right. 0MQ ensures atomic delivery of messages: peers shall receive either all message parts of a message or none at all. The total number of message parts is unlimited except by available memory.
So, we send a zmq_msg_t
which is a message, but really it’s a message part and we can send many of them but really we send only one. Okay, okay, so, it all kind of makes sense now, but at the start it was all rather confusing.
Thankfully, things are more clear and precise using cppzmq’s terms and objects. Previously in this tour, we described a message_t
as being of a single part because it held data in a single contiguous block of memory. We will stick to that but then describe how cppzmq allows us to use multiple calls to send()
or recv()
in order to transmit multiple, single-part messages altogether. Finally, we will describe the multipart_t
object of cppzmq which allows us to aggregate multiple, single-part message_t
in useful ways and then send()
or recv()
them with but a single call. Finally we describe how to send and receive multiple parts with out even any message_t
at all.
ZeroMQ has the concept of ”more”. As we send a single (single-part) message we also tell the socket, “you know what, next I’m gonna send you another message and I want you to wait until I do that and then you can transmit both of them together”. This is done with cppzmq like:
zmq::message_t msg1, msg2;
auto res = sock.send(msg1, zmq::send_flags::sndmore);
res = sock.send(msg2, zmq::send_flags::none);
That’s ”more” and of course you can have more ”more” messages than just the first one. The chain can continue and, as it does, ZeroMQ is collecting these messages behind the scenes, waiting for that final none
. When none
comes, ZeroMQ races to do its magic to assure every message in the chain is delivered to all linked sockets or none of them are.
On the other end of the transport, ZeroMQ receives the data from these multiple ”more” messages and presents it to the application through multiple calls to recv()
. We must write our application to expect this chain of separate-but-together messages and cppzmq provides the help we need.
zmq::message_t msg1, msg2;
auto res = sock.recv(msg1, zmq::recv_flags::none);
if (!msg1.more()) {
std::cout << "Oh dear, sheeps are going astray\n";
}
else {
auto res = sock.recv(msg2, zmq::recv_flags::none);
}
Enter multipart_t
. I hesitate to call this a “message” because it’s really multiple messages which are aggregated into a collection (called multipart_t
). Behind the scenes a multipart_t::send()
and ::recv()
are simply handling multiple single-part message_t
objects with ”more”, as described above. Later we will describe how this can be done without even forming a multipart_t
by using code which is to send/recv sort of like what buffers are to message_t
.
But first multipart_t
, which we should think about as if it were an STL collection (and in fact it uses a std::deque<message_t>
under the covers). We can construct them in many ways.
zmq::multipart_t empty;
zmq::multipart_t first_from_recv(sock);
std::vector<int> data(1024,0);
zmq::multipart_t first_from_data(data.data(), data.size()*sizeof(int));
zmq::multipart_t first_from_string(std::string("hello world"));
As the variable names are meant to imply, these latter constructors fill in an initial element of the multipart_t
container. We can then add more message_t
elements:
zmq::message_t msg;
zmq::multipart_t mp, mp2;
// Prefix and postfix concatenation.
mp.prepend(mp2);
mp.append(mp2);
// push a part in various forms to front, also "add" variants to back.
mp.pushmem(data, size);
mp.pushstr("hello again");
mp.pushtyp(myobject); // effectively a memcpy
mp.push(msg);
mp.push_back(msg); // same as push, but STL spelling
We can also remove a message (a part):
// from front
zmq::message_t msg = mp.pop();
std::string s = mp.popstr();
MyObject mo = mp.pop<MyObject>();
// from back
zmq::message_t msg = mp.remove();
// all
mp.clear();
We can query and iterate on the multipart_t
in many ways:
zmq::message_t& first = mp.front();
zmq::message_t& last = mp.back();
zmq::message_t* nth = mp.peek(42);
std::count << mp.size() << " parts:\n";
for (auto& msg : mp) {
std::cout << msg.size() << " bytes:\n" << std::endl;
std::cout << msg.str() << std::endl;
}
if (mp.empty()) {
std::cout << "no parts\n";
}
Finally, getting back to multiple single-part message transmission we may do:
// Clear and receive one or MORE message_t from a socket
mp.recv(sock);
// Send the one or MORE message_t parts to a socket
mp.send(sock);
This happily hides all the ”more” business from our delicate eyes.
Building up a multipart_t
to send and receive data that is already “multipart’ish” can require unwanted code and memory copies. If your application already has some collection of data, why should you go to this extra trouble? The answer is, you may not need to.
The templated recv_multipart()
and send_multipart()
free functions are provided. They may be used with a multipart_t
as a replacement for that classes send()
and recv()
. But they may work with other collections holding message_t
or buffers or other types. First some receives.
std::vector<message_t> msgs;
aut res = zmq::recv_multipart(sock, std::back_inserter(msgs));
zmq::multipart_t mp;
aut res = zmq::recv_multipart(sock, std::back_inserter(mp));
And some sends.
std::vector<zmq::message_t> msgs;
auto res = zmq::send_multipart(sock, msgs);
std::vector<const_buffer> cbufs;
auto res = zmq::send_multipart(sock, cbufs);
std::vector<mutable_buffer> mbufs;
auto res = zmq::send_multipart(sock, mbufs);
We just saw how cppzmq can transmit a sequence of multiple, single-part messages in the ZeroMQ ”more” way and how the multipart_t
can be used to bundle up the ”more“‘ing inside the application. CZMQ introduced another way to handle multiple, single-part messages and that is to encode them into a single, single-part message inside the application.
With the nice multipart_t
class methods and free functions for send/recv that were just described, why introduce a codec? The reason is that the new thread-safe sockets such as SERVER/CLIENT do not support the ”more” algorithm at all. They send one message_t
at a time Something must be done in if application protocol requires transmitting an sequence of messages in an atomic manner. The inspiration for this was generaldomo which generalizes the Majordomo protocol (v1) to use SERVER/CLIENT as well as ROUTER/DEALER.
So, we must do something to squish multiple messages into single one. A codec for this can be invented by any application but interoperability is increased if a common one is shared by various ZeroMQ bindings and so the one used in CZMQ is offered in the cppzmq.
Okay, enough blah blah about the codec.
There are two ways to use it. One may call methods on multipart_t
or one may use free functions in analogy to how multipart send()/recv()
was described above. First, the multipart_t
methods.
zmq::multipart_t mp;
zmq::message_t msg = mp.encode();
zmq::multipart_t mp2;
// in place, decode+append
mp2.decode_append(msg);
// return constructed
zmq::multipart_t mp3 = zmq::multipart_t::decode(msg);
Here’s the codec exercised with free functions. They have the added flexibility to operate on buffers.
std::vector<zmq::message_t> msgs, msgs2;
auto msg = zmq::encode(msgs);
zmq::decode(msg, std::back_inserter(msgs2));
std::vector<zmq::const_buffer> cbufs;
auto msg = zmq::encode(cbufs);
zmq::multipart_t mp;
zmq::decode(msg, std::back_inserter(mp));
That’s it. Go forth and make the next great ZeroMQ application in C++!