Skip to content

Commit

Permalink
fix MessagePayload decode error when the data size over 65556
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 10, 2024
1 parent 449a68c commit a02bfdd
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 41 deletions.
25 changes: 18 additions & 7 deletions cpp/wedpr-helper/ppc-utilities/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,31 @@
namespace ppc
{
template <typename T>
inline uint64_t decodeNetworkBuffer(
T& _result, bcos::byte const* buffer, unsigned int bufferLen, uint64_t const offset)
inline uint64_t decodeNetworkBuffer(T& _result, bcos::byte const* buffer, unsigned int bufferLen,
uint64_t const offset, bool largeBuffer = false)
{
uint64_t curOffset = offset;
CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset, bufferLen);
// Notice: operator* is higher priority than operator+, the () is essential
auto dataLen =
boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)(buffer + curOffset)));
curOffset += 2;
uint32_t dataLen = 0;
if (largeBuffer)
{
CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset + 4, bufferLen);
dataLen = boost::asio::detail::socket_ops::network_to_host_long(
*((uint32_t*)(buffer + curOffset)));
curOffset += 4;
}
else
{
CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset + 2, bufferLen);
dataLen = boost::asio::detail::socket_ops::network_to_host_short(
*((uint16_t*)(buffer + curOffset)));
curOffset += 2;
}
if (dataLen == 0)
{
return curOffset;
}
CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset, bufferLen);
CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset + dataLen, bufferLen);
_result.assign((bcos::byte*)buffer + curOffset, (bcos::byte*)buffer + curOffset + dataLen);
curOffset += dataLen;
return curOffset;
Expand Down
6 changes: 3 additions & 3 deletions cpp/wedpr-protocol/protocol/src/PPCMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
* @author: shawnhe
* @date 2022-10-19
*/

#include "PPCMessage.h"
#include "Common.h"
#include <json/json.h>
#include <boost/asio/detail/socket_ops.hpp>

Expand Down Expand Up @@ -98,7 +98,7 @@ int64_t PPCMessage::decode(uint32_t _length, bcos::byte* _data)
p += dataLength;
}

if (p)
if (p < _data + _length)
{
m_header.insert(m_header.begin(), p, _data + _length);
}
Expand Down Expand Up @@ -149,13 +149,13 @@ PPCMessageFace::Ptr PPCMessageFactory::decodePPCMessage(Message::Ptr msg)
// Note: this field is been setted when onReceiveMessage
if (frontMsg)
{
ppcMsg->decode(bcos::ref(frontMsg->data()));
ppcMsg->setSeq(frontMsg->seq());
ppcMsg->setUuid(frontMsg->traceID());
if (frontMsg->isRespPacket())
{
ppcMsg->setResponse();
}
ppcMsg->decode(bcos::ref(frontMsg->data()));
}
if (msg->header() && msg->header()->optionalField())
{
Expand Down
20 changes: 11 additions & 9 deletions cpp/wedpr-protocol/protocol/src/v1/MessageHeaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,19 @@ int64_t MessageOptionalHeaderImpl::decode(bcos::bytesConstRef data, uint64_t con
auto pointer = data.data() + offset;
m_componentType = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer));
bcos::bytes componentType;
offset = decodeNetworkBuffer(componentType, data.data(), data.size(), (pointer - data.data()));
offset = decodeNetworkBuffer(
componentType, data.data(), data.size(), (pointer - data.data()), false);
m_componentType = std::string(componentType.begin(), componentType.end());
// srcNode
offset = decodeNetworkBuffer(m_srcNode, data.data(), data.size(), offset);
offset = decodeNetworkBuffer(m_srcNode, data.data(), data.size(), offset, false);
// source inst
offset = decodeNetworkBuffer(m_srcInst, data.data(), data.size(), offset);
offset = decodeNetworkBuffer(m_srcInst, data.data(), data.size(), offset, false);
// dstNode
offset = decodeNetworkBuffer(m_dstNode, data.data(), data.size(), offset);
offset = decodeNetworkBuffer(m_dstNode, data.data(), data.size(), offset, false);
// dstInst
offset = decodeNetworkBuffer(m_dstInst, data.data(), data.size(), offset);
offset = decodeNetworkBuffer(m_dstInst, data.data(), data.size(), offset, false);
// topic
offset = decodeNetworkBuffer(m_topic, data.data(), data.size(), offset);
offset = decodeNetworkBuffer(m_topic, data.data(), data.size(), offset, false);
return offset;
}

Expand Down Expand Up @@ -149,11 +150,12 @@ int64_t MessageHeaderImpl::decode(bcos::bytesConstRef data)
m_ext = boost::asio::detail::socket_ops::network_to_host_short(*((uint16_t*)pointer));
pointer += 2;
// the traceID
auto offset = decodeNetworkBuffer(m_traceID, data.data(), data.size(), (pointer - data.data()));
auto offset =
decodeNetworkBuffer(m_traceID, data.data(), data.size(), (pointer - data.data()), false);
// srcGwNode
offset = decodeNetworkBuffer(m_srcGwNode, data.data(), data.size(), offset);
offset = decodeNetworkBuffer(m_srcGwNode, data.data(), data.size(), offset, false);
// dstGwNode
offset = decodeNetworkBuffer(m_dstGwNode, data.data(), data.size(), offset);
offset = decodeNetworkBuffer(m_dstGwNode, data.data(), data.size(), offset, false);
// optionalField
if (hasOptionalField())
{
Expand Down
6 changes: 3 additions & 3 deletions cpp/wedpr-protocol/protocol/src/v1/MessagePayloadImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ int64_t MessagePayloadImpl::encode(bcos::bytes& buffer) const
buffer.insert(buffer.end(), (byte*)&traceIDLen, (byte*)&traceIDLen + 2);
buffer.insert(buffer.end(), m_traceID.begin(), m_traceID.end());
// data
uint16_t dataLen = boost::asio::detail::socket_ops::host_to_network_short(m_data.size());
buffer.insert(buffer.end(), (byte*)&dataLen, (byte*)&dataLen + 2);
uint32_t dataLen = boost::asio::detail::socket_ops::host_to_network_long(m_data.size());
buffer.insert(buffer.end(), (byte*)&dataLen, (byte*)&dataLen + 4);
buffer.insert(buffer.end(), m_data.begin(), m_data.end());
// update the length
m_length = buffer.size();
Expand Down Expand Up @@ -74,5 +74,5 @@ int64_t MessagePayloadImpl::decode(bcos::bytesConstRef buffer)
auto offset =
decodeNetworkBuffer(m_traceID, buffer.data(), buffer.size(), (pointer - buffer.data()));
// data
return decodeNetworkBuffer(m_data, buffer.data(), buffer.size(), offset);
return decodeNetworkBuffer(m_data, buffer.data(), buffer.size(), offset, true);
}
10 changes: 10 additions & 0 deletions cpp/wedpr-protocol/protocol/tests/MessageTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ BOOST_AUTO_TEST_CASE(testMessage)
componentType, srcNode, srcInst, dstNode, dstInst, payload);
checkEncodeDecode(msgBuilder, msg);

// with payload over 65535
for (uint32_t i = 0; i < 10000000; i++)
{
payload->emplace_back(i);
}
msg = fakeMsg(msgBuilder, version, traceID, srcGwNode, dstGwNode, packetType, ttl, ext, topic,
componentType, srcNode, srcInst, dstNode, dstInst, payload);
checkEncodeDecode(msgBuilder, msg);


// with header router
traceID = "1233";
srcGwNode = "srcGwNode";
Expand Down
31 changes: 31 additions & 0 deletions cpp/wedpr-transport/ppc-front/ppc-front/Front.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,35 @@ bcos::Error::Ptr Front::eraseTaskInfo(std::string const& _taskID)
{
FRONT_LOG(INFO) << LOG_DESC("eraseTaskInfo") << LOG_KV("front", m_front);
return m_front->unRegisterTopic(_taskID);
}

// register message handler for algorithm
void Front::registerMessageHandler(uint8_t _taskType, uint8_t _algorithmType,
std::function<void(front::PPCMessageFace::Ptr)> _handler)
{
uint16_t type = ((uint16_t)_taskType << 8) | _algorithmType;
auto self = weak_from_this();
m_front->registerMessageHandler(
std::to_string(type), [self, type, _handler](ppc::protocol::Message::Ptr msg) {
auto front = self.lock();
if (!front)
{
return;
}
try
{
if (msg == nullptr)
{
_handler(nullptr);
return;
}
_handler(front->m_messageFactory->decodePPCMessage(msg));
}
catch (std::exception const& e)
{
FRONT_LOG(WARNING) << LOG_DESC("Call handler for component failed")
<< LOG_KV("componentType", type)
<< LOG_KV("error", boost::diagnostic_information(e));
}
});
}
20 changes: 1 addition & 19 deletions cpp/wedpr-transport/ppc-front/ppc-front/Front.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,7 @@ class Front : public FrontInterface, public std::enable_shared_from_this<Front>

// register message handler for algorithm
void registerMessageHandler(uint8_t _taskType, uint8_t _algorithmType,
std::function<void(front::PPCMessageFace::Ptr)> _handler) override
{
uint16_t type = ((uint16_t)_taskType << 8) | _algorithmType;
auto self = weak_from_this();
m_front->registerMessageHandler(
std::to_string(type), [self, _handler](ppc::protocol::Message::Ptr msg) {
auto front = self.lock();
if (!front)
{
return;
}
if (msg == nullptr)
{
_handler(nullptr);
return;
}
_handler(front->m_messageFactory->decodePPCMessage(msg));
});
}
std::function<void(front::PPCMessageFace::Ptr)> _handler) override;

std::vector<std::string> agencies() const override
{
Expand Down

0 comments on commit a02bfdd

Please sign in to comment.