Skip to content

Commit

Permalink
zmq: simplify SetUsedSize implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed Dec 17, 2020
1 parent aae766d commit 22709ba
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 92 deletions.
108 changes: 22 additions & 86 deletions fairmq/zeromq/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,17 @@ class Message final : public fair::mq::Message
public:
Message(FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fUsedSizeModified(false)
, fUsedSize(0)
, fAlignment(0)
, fMsg(tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{
if (zmq_msg_init(fMsg.get()) != 0) {
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
}
}
Message(Alignment alignment, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fUsedSizeModified(false)
, fUsedSize(0)
, fAlignment(alignment.alignment)
, fMsg(tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{
if (zmq_msg_init(fMsg.get()) != 0) {
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
Expand All @@ -64,11 +58,8 @@ class Message final : public fair::mq::Message

Message(const size_t size, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fUsedSizeModified(false)
, fUsedSize(size)
, fAlignment(0)
, fMsg(tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{
void* ptr = malloc(size);
if (!ptr) {
Expand All @@ -81,11 +72,8 @@ class Message final : public fair::mq::Message

Message(const size_t size, Alignment alignment, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fUsedSizeModified(false)
, fUsedSize(size)
, fAlignment(alignment.alignment)
, fMsg(tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{
void* ptr = nullptr;
if (alignment.alignment != 0) {
Expand All @@ -103,11 +91,8 @@ class Message final : public fair::mq::Message

Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fUsedSizeModified(false)
, fUsedSize(0)
, fAlignment(0)
, fMsg(tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{
if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) {
LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
Expand All @@ -116,11 +101,8 @@ class Message final : public fair::mq::Message

Message(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr)
: fair::mq::Message(factory)
, fUsedSizeModified(false)
, fUsedSize(0)
, fAlignment(0)
, fMsg(tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{
// FIXME: make this zero-copy:
// simply taking over the provided buffer can casue premature delete, since region could be
Expand Down Expand Up @@ -174,29 +156,14 @@ class Message final : public fair::mq::Message

void* GetData() const override
{
if (!fViewMsg) {
if (zmq_msg_size(fMsg.get()) > 0) {
return zmq_msg_data(fMsg.get());
} else {
return nullptr;
}
if (zmq_msg_size(fMsg.get()) > 0) {
return zmq_msg_data(fMsg.get());
} else {
if (zmq_msg_size(fViewMsg.get()) > 0) {
return zmq_msg_data(fViewMsg.get());
} else {
return nullptr;
}
return nullptr;
}
}

size_t GetSize() const override
{
if (fUsedSizeModified) {
return fUsedSize;
} else {
return zmq_msg_size(fMsg.get());
}
}
size_t GetSize() const override { return zmq_msg_size(fMsg.get()); }

// To emulate shrinking, a new message is created with the new size (ViewMsg), that points to
// the original buffer with the new size. Once the "view message" is transfered, the original is
Expand All @@ -205,30 +172,25 @@ class Message final : public fair::mq::Message
// happens.
bool SetUsedSize(const size_t size) override
{
if (size <= zmq_msg_size(fMsg.get())) {
fUsedSize = size;
fUsedSizeModified = true;
if (size == GetSize()) {
// nothing to do
return true;
} else {
} else if (size > GetSize()) {
LOG(error) << "cannot set used size higher than original.";
return false;
}
}

void ApplyUsedSize()
{
// Apply only once (before actual send).
// The check is needed because a send could fail and can be reattempted by the user, in this
// case we do not want to modify buffer again.
if (fUsedSizeModified && !fViewMsg) {
fViewMsg = tools::make_unique<zmq_msg_t>();
void* ptr = zmq_msg_data(fMsg.get());
if (zmq_msg_init_data(fViewMsg.get(), ptr, fUsedSize, [](void* /* data */, void* obj) {
} else {
auto newMsg = tools::make_unique<zmq_msg_t>();
void* data = GetData();
if (zmq_msg_init_data(newMsg.get(), data, size, [](void* /* data */, void* obj) {
zmq_msg_close(static_cast<zmq_msg_t*>(obj));
delete static_cast<zmq_msg_t*>(obj);
}, fMsg.release()) != 0) {
LOG(error) << "failed initializing view message, reason: " << zmq_strerror(errno);
}, fMsg.get()) != 0) {
LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
return false;
}
fMsg.release();
fMsg.swap(newMsg);
return true;
}
}

Expand All @@ -250,49 +212,23 @@ class Message final : public fair::mq::Message
LOG(error) << "failed copying message, reason: " << zmq_strerror(errno);
return;
}

// if the target message has been resized, apply same to this message also
if (zMsg.fUsedSizeModified) {
fUsedSizeModified = true;
fUsedSize = zMsg.fUsedSize;
}
}

~Message() override { CloseMessage(); }

private:
bool fUsedSizeModified;
size_t fUsedSize;
size_t fAlignment;
std::unique_ptr<zmq_msg_t> fMsg;
std::unique_ptr<zmq_msg_t> fViewMsg; // view on a subset of fMsg (treating it as user buffer)

zmq_msg_t* GetMessage() const
{
if (!fViewMsg) {
return fMsg.get();
} else {
return fViewMsg.get();
}
}
zmq_msg_t* GetMessage() const { return fMsg.get(); }

void CloseMessage()
{
if (!fViewMsg) {
if (zmq_msg_close(fMsg.get()) != 0) {
LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
}
// reset the message object to allow reuse in Rebuild
fMsg.reset(nullptr);
} else {
if (zmq_msg_close(fViewMsg.get()) != 0) {
LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
}
// reset the message object to allow reuse in Rebuild
fViewMsg.reset(nullptr);
if (zmq_msg_close(fMsg.get()) != 0) {
LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
}
fUsedSizeModified = false;
fUsedSize = 0;
// reset the message object to allow reuse in Rebuild
fMsg.reset(nullptr);
}
};

Expand Down
4 changes: 0 additions & 4 deletions fairmq/zeromq/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ class Socket final : public fair::mq::Socket
}
int elapsed = 0;

static_cast<Message*>(msg.get())->ApplyUsedSize();

int64_t actualBytes = zmq_msg_size(static_cast<Message*>(msg.get())->GetMessage());

while (true) {
Expand Down Expand Up @@ -211,8 +209,6 @@ class Socket final : public fair::mq::Socket
bool repeat = false;

for (unsigned int i = 0; i < vecSize; ++i) {
static_cast<Message*>(msgVec[i].get())->ApplyUsedSize();

int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
if (nbytes >= 0) {
totalSize += nbytes;
Expand Down
17 changes: 15 additions & 2 deletions test/message/_message.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,20 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address)

FairMQMessagePtr outMsg(push.NewMessage(1000));
ASSERT_EQ(outMsg->GetSize(), 1000);
outMsg->SetUsedSize(500);
memcpy(outMsg->GetData(), "ABC", 3);
ASSERT_EQ(outMsg->SetUsedSize(500), true);
ASSERT_EQ(outMsg->SetUsedSize(500), true);
ASSERT_EQ(outMsg->SetUsedSize(700), false);
ASSERT_EQ(outMsg->GetSize(), 500);
outMsg->SetUsedSize(250);
// check if the data is still intact
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
ASSERT_EQ(outMsg->SetUsedSize(250), true);
ASSERT_EQ(outMsg->GetSize(), 250);
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
FairMQMessagePtr msgCopy(push.NewMessage());
msgCopy->Copy(*outMsg);
ASSERT_EQ(msgCopy->GetSize(), 250);
Expand All @@ -53,6 +63,9 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address)
FairMQMessagePtr inMsg(pull.NewMessage());
ASSERT_EQ(pull.Receive(inMsg), 250);
ASSERT_EQ(inMsg->GetSize(), 250);
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[1], 'B');
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[2], 'C');
}

void RunMsgRebuild(const string& transport)
Expand Down

0 comments on commit 22709ba

Please sign in to comment.