diff --git a/fairmq/zeromq/Message.h b/fairmq/zeromq/Message.h index 0ee738092..e81e66afd 100644 --- a/fairmq/zeromq/Message.h +++ b/fairmq/zeromq/Message.h @@ -39,11 +39,8 @@ class Message final : public fair::mq::Message public: Message(FairMQTransportFactory* factory = nullptr) : fair::mq::Message(factory) - , fUsedSizeModified(false) - , fUsedSize(0) , fAlignment(0) , fMsg(tools::make_unique()) - , fViewMsg(nullptr) { if (zmq_msg_init(fMsg.get()) != 0) { LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); @@ -51,11 +48,8 @@ class Message final : public fair::mq::Message } Message(Alignment alignment, FairMQTransportFactory* factory = nullptr) : fair::mq::Message(factory) - , fUsedSizeModified(false) - , fUsedSize(0) , fAlignment(alignment.alignment) , fMsg(tools::make_unique()) - , fViewMsg(nullptr) { if (zmq_msg_init(fMsg.get()) != 0) { LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); @@ -64,11 +58,8 @@ class Message final : public fair::mq::Message Message(const size_t size, FairMQTransportFactory* factory = nullptr) : fair::mq::Message(factory) - , fUsedSizeModified(false) - , fUsedSize(size) , fAlignment(0) , fMsg(tools::make_unique()) - , fViewMsg(nullptr) { void* ptr = malloc(size); if (!ptr) { @@ -81,11 +72,8 @@ class Message final : public fair::mq::Message Message(const size_t size, Alignment alignment, FairMQTransportFactory* factory = nullptr) : fair::mq::Message(factory) - , fUsedSizeModified(false) - , fUsedSize(size) , fAlignment(alignment.alignment) , fMsg(tools::make_unique()) - , fViewMsg(nullptr) { void* ptr = nullptr; if (alignment.alignment != 0) { @@ -103,11 +91,8 @@ class Message final : public fair::mq::Message Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr) : fair::mq::Message(factory) - , fUsedSizeModified(false) - , fUsedSize(0) , fAlignment(0) , fMsg(tools::make_unique()) - , fViewMsg(nullptr) { if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) { LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno); @@ -116,11 +101,8 @@ class Message final : public fair::mq::Message Message(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr) : fair::mq::Message(factory) - , fUsedSizeModified(false) - , fUsedSize(0) , fAlignment(0) , fMsg(tools::make_unique()) - , fViewMsg(nullptr) { // FIXME: make this zero-copy: // simply taking over the provided buffer can casue premature delete, since region could be @@ -174,29 +156,14 @@ class Message final : public fair::mq::Message void* GetData() const override { - if (!fViewMsg) { - if (zmq_msg_size(fMsg.get()) > 0) { - return zmq_msg_data(fMsg.get()); - } else { - return nullptr; - } + if (zmq_msg_size(fMsg.get()) > 0) { + return zmq_msg_data(fMsg.get()); } else { - if (zmq_msg_size(fViewMsg.get()) > 0) { - return zmq_msg_data(fViewMsg.get()); - } else { - return nullptr; - } + return nullptr; } } - size_t GetSize() const override - { - if (fUsedSizeModified) { - return fUsedSize; - } else { - return zmq_msg_size(fMsg.get()); - } - } + size_t GetSize() const override { return zmq_msg_size(fMsg.get()); } // To emulate shrinking, a new message is created with the new size (ViewMsg), that points to // the original buffer with the new size. Once the "view message" is transfered, the original is @@ -205,30 +172,25 @@ class Message final : public fair::mq::Message // happens. bool SetUsedSize(const size_t size) override { - if (size <= zmq_msg_size(fMsg.get())) { - fUsedSize = size; - fUsedSizeModified = true; + if (size == GetSize()) { + // nothing to do return true; - } else { + } else if (size > GetSize()) { LOG(error) << "cannot set used size higher than original."; return false; - } - } - - void ApplyUsedSize() - { - // Apply only once (before actual send). - // The check is needed because a send could fail and can be reattempted by the user, in this - // case we do not want to modify buffer again. - if (fUsedSizeModified && !fViewMsg) { - fViewMsg = tools::make_unique(); - void* ptr = zmq_msg_data(fMsg.get()); - if (zmq_msg_init_data(fViewMsg.get(), ptr, fUsedSize, [](void* /* data */, void* obj) { + } else { + auto newMsg = tools::make_unique(); + void* data = GetData(); + if (zmq_msg_init_data(newMsg.get(), data, size, [](void* /* data */, void* obj) { zmq_msg_close(static_cast(obj)); delete static_cast(obj); - }, fMsg.release()) != 0) { - LOG(error) << "failed initializing view message, reason: " << zmq_strerror(errno); + }, fMsg.get()) != 0) { + LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno); + return false; } + fMsg.release(); + fMsg.swap(newMsg); + return true; } } @@ -250,49 +212,23 @@ class Message final : public fair::mq::Message LOG(error) << "failed copying message, reason: " << zmq_strerror(errno); return; } - - // if the target message has been resized, apply same to this message also - if (zMsg.fUsedSizeModified) { - fUsedSizeModified = true; - fUsedSize = zMsg.fUsedSize; - } } ~Message() override { CloseMessage(); } private: - bool fUsedSizeModified; - size_t fUsedSize; size_t fAlignment; std::unique_ptr fMsg; - std::unique_ptr fViewMsg; // view on a subset of fMsg (treating it as user buffer) - zmq_msg_t* GetMessage() const - { - if (!fViewMsg) { - return fMsg.get(); - } else { - return fViewMsg.get(); - } - } + zmq_msg_t* GetMessage() const { return fMsg.get(); } void CloseMessage() { - if (!fViewMsg) { - if (zmq_msg_close(fMsg.get()) != 0) { - LOG(error) << "failed closing message, reason: " << zmq_strerror(errno); - } - // reset the message object to allow reuse in Rebuild - fMsg.reset(nullptr); - } else { - if (zmq_msg_close(fViewMsg.get()) != 0) { - LOG(error) << "failed closing message, reason: " << zmq_strerror(errno); - } - // reset the message object to allow reuse in Rebuild - fViewMsg.reset(nullptr); + if (zmq_msg_close(fMsg.get()) != 0) { + LOG(error) << "failed closing message, reason: " << zmq_strerror(errno); } - fUsedSizeModified = false; - fUsedSize = 0; + // reset the message object to allow reuse in Rebuild + fMsg.reset(nullptr); } }; diff --git a/fairmq/zeromq/Socket.h b/fairmq/zeromq/Socket.h index 1447a70f8..96aed8549 100644 --- a/fairmq/zeromq/Socket.h +++ b/fairmq/zeromq/Socket.h @@ -140,8 +140,6 @@ class Socket final : public fair::mq::Socket } int elapsed = 0; - static_cast(msg.get())->ApplyUsedSize(); - int64_t actualBytes = zmq_msg_size(static_cast(msg.get())->GetMessage()); while (true) { @@ -211,8 +209,6 @@ class Socket final : public fair::mq::Socket bool repeat = false; for (unsigned int i = 0; i < vecSize; ++i) { - static_cast(msgVec[i].get())->ApplyUsedSize(); - int nbytes = zmq_msg_send(static_cast(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags); if (nbytes >= 0) { totalSize += nbytes; diff --git a/test/message/_message.cxx b/test/message/_message.cxx index ff2468731..a07305c0d 100644 --- a/test/message/_message.cxx +++ b/test/message/_message.cxx @@ -40,10 +40,20 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address) FairMQMessagePtr outMsg(push.NewMessage(1000)); ASSERT_EQ(outMsg->GetSize(), 1000); - outMsg->SetUsedSize(500); + memcpy(outMsg->GetData(), "ABC", 3); + ASSERT_EQ(outMsg->SetUsedSize(500), true); + ASSERT_EQ(outMsg->SetUsedSize(500), true); + ASSERT_EQ(outMsg->SetUsedSize(700), false); ASSERT_EQ(outMsg->GetSize(), 500); - outMsg->SetUsedSize(250); + // check if the data is still intact + ASSERT_EQ(static_cast(outMsg->GetData())[0], 'A'); + ASSERT_EQ(static_cast(outMsg->GetData())[1], 'B'); + ASSERT_EQ(static_cast(outMsg->GetData())[2], 'C'); + ASSERT_EQ(outMsg->SetUsedSize(250), true); ASSERT_EQ(outMsg->GetSize(), 250); + ASSERT_EQ(static_cast(outMsg->GetData())[0], 'A'); + ASSERT_EQ(static_cast(outMsg->GetData())[1], 'B'); + ASSERT_EQ(static_cast(outMsg->GetData())[2], 'C'); FairMQMessagePtr msgCopy(push.NewMessage()); msgCopy->Copy(*outMsg); ASSERT_EQ(msgCopy->GetSize(), 250); @@ -53,6 +63,9 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address) FairMQMessagePtr inMsg(pull.NewMessage()); ASSERT_EQ(pull.Receive(inMsg), 250); ASSERT_EQ(inMsg->GetSize(), 250); + ASSERT_EQ(static_cast(inMsg->GetData())[0], 'A'); + ASSERT_EQ(static_cast(inMsg->GetData())[1], 'B'); + ASSERT_EQ(static_cast(inMsg->GetData())[2], 'C'); } void RunMsgRebuild(const string& transport)