Skip to content

Commit

Permalink
fix gatewayNodeInfo encode concurrency bug
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Sep 6, 2024
1 parent 145eb51 commit 89d3ec5
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 120 deletions.
3 changes: 0 additions & 3 deletions cpp/wedpr-helper/ppc-utilities/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ inline uint64_t decodeNetworkBuffer(
return curOffset;
}
CHECK_OFFSET_WITH_THROW_EXCEPTION(curOffset, bufferLen);
//_result.insert(
// _result.end(), (bcos::byte*)buffer + curOffset, (bcos::byte*)buffer + curOffset +
// dataLen);
_result.assign((bcos::byte*)buffer + curOffset, (bcos::byte*)buffer + curOffset + dataLen);
curOffset += dataLen;
return curOffset;
Expand Down
2 changes: 2 additions & 0 deletions cpp/wedpr-transport/ppc-gateway/ppc-gateway/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
namespace ppc::gateway
{
#define GATEWAY_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY]"
#define SERVICE_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][SERVICE]"
#define SERVICE_ROUTER_LOG(LEVEL) BCOS_LOG(LEVEL) << "[GATEWAY][SERVICE][ROUTER]"

// HTTP HEADER DEFINE
#define HEAD_TASK_ID "x-ptp-session-id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Service::Ptr GatewayFactory::buildService() const
wsInitializer->setConfig(wsConfig);
auto p2pService = std::make_shared<Service>(m_contextConfig->nodeID(),
std::make_shared<RouterTableFactoryImpl>(), m_config->gatewayConfig().unreachableDistance,
"Service");
"Gateway-Service");
p2pService->setTimerFactory(std::make_shared<bcos::timer::TimerFactory>());
p2pService->setNodeEndpoints(m_gatewayConfig->nodeIPEndpointSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,35 @@ INodeInfo::Ptr GatewayNodeInfoImpl::nodeInfo(bcos::bytes const& nodeID) const
return nullptr;
}

void GatewayNodeInfoImpl::updateNodeList()
{
// Note: can't use clear_nodelist here, for clear_nodelist will destroy the allocated nodelist,
// and cause double release coredump
releaseWithoutDestory();
// re-encode nodeList
for (auto const& it : m_nodeList)
{
auto nodeInfo = std::dynamic_pointer_cast<NodeInfoImpl>(it.second);
m_rawGatewayInfo->mutable_nodelist()->UnsafeArenaAddAllocated(
nodeInfo->rawNodeInfo().get());
}
}

bool GatewayNodeInfoImpl::tryAddNodeInfo(INodeInfo::Ptr const& info)
{
auto nodeID = info->nodeID().toBytes();
auto existedNodeInfo = nodeInfo(nodeID);
// update the info
if (existedNodeInfo == nullptr || !existedNodeInfo->equal(info))
// the node info has not been updated
if (existedNodeInfo != nullptr && existedNodeInfo->equal(info))
{
return false;
}
{
bcos::WriteGuard l(x_nodeList);
m_nodeList[nodeID] = info;
return true;
updateNodeList();
}
return false;
return true;
}

void GatewayNodeInfoImpl::removeNodeInfo(bcos::bytes const& nodeID)
Expand All @@ -84,6 +101,7 @@ void GatewayNodeInfoImpl::removeNodeInfo(bcos::bytes const& nodeID)
}
bcos::UpgradeGuard ul(l);
m_nodeList.erase(it);
updateNodeList();
}
// remove the topic info
{
Expand Down Expand Up @@ -184,17 +202,6 @@ void GatewayNodeInfoImpl::unRegisterTopic(bcos::bytes const& nodeID, std::string

void GatewayNodeInfoImpl::encode(bcos::bytes& data) const
{
m_rawGatewayInfo->clear_nodelist();
{
bcos::ReadGuard l(x_nodeList);
// encode nodeList
for (auto const& it : m_nodeList)
{
auto nodeInfo = std::dynamic_pointer_cast<NodeInfoImpl>(it.second);
m_rawGatewayInfo->mutable_nodelist()->UnsafeArenaAddAllocated(
nodeInfo->rawNodeInfo().get());
}
}
encodePBObject(data, m_rawGatewayInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,7 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo

GatewayNodeInfoImpl(bcos::bytesConstRef data) : GatewayNodeInfoImpl() { decode(data); }

~GatewayNodeInfoImpl() override
{
// return back the ownership to nodeList
auto allocatedNodeListSize = m_rawGatewayInfo->nodelist_size();
for (int i = 0; i < allocatedNodeListSize; i++)
{
m_rawGatewayInfo->mutable_nodelist()->UnsafeArenaReleaseLast();
}
}
~GatewayNodeInfoImpl() override { releaseWithoutDestory(); }

// the gateway nodeID
std::string const& p2pNodeID() const override;
Expand Down Expand Up @@ -88,6 +80,19 @@ class GatewayNodeInfoImpl : public GatewayNodeInfo
return m_nodeList.size();
}

private:
void updateNodeList();

void releaseWithoutDestory()
{
// return back the ownership to nodeList to shared_ptr
auto allocatedNodeListSize = m_rawGatewayInfo->nodelist_size();
for (int i = 0; i < allocatedNodeListSize; i++)
{
m_rawGatewayInfo->mutable_nodelist()->UnsafeArenaReleaseLast();
}
}

private:
std::shared_ptr<ppc::proto::GatewayNodeInfo> m_rawGatewayInfo;
// NodeID => nodeInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ void GatewayRouterManager::onReceiveNodeSeqMessage(MessageFace::Ptr msg, WsSessi
return;
}
// status changed, request for the nodeStatus
GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveNodeSeqMessage") << LOG_KV("from", from)
GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveNodeSeqMessage")
<< LOG_KV("from", printP2PIDElegantly(from))
<< LOG_KV("statusSeq", statusSeq);
m_service->asyncSendMessageByP2PNodeID(
(uint16_t)GatewayPacketType::RequestNodeStatus, from, std::make_shared<bcos::bytes>());
Expand Down Expand Up @@ -138,11 +139,11 @@ void GatewayRouterManager::onReceiveRequestNodeStatusMsg(
if (!nodeStatusData)
{
GATEWAY_LOG(WARNING) << LOG_DESC("onReceiveRequestNodeStatusMsg: generate nodeInfo error")
<< LOG_KV("from", from);
<< LOG_KV("from", printP2PIDElegantly(from));
return;
}
GATEWAY_LOG(TRACE) << LOG_DESC("onReceiveRequestNodeStatusMsg: response the latest nodeStatus")
<< LOG_KV("from", from);
<< LOG_KV("from", printP2PIDElegantly(from));
m_service->asyncSendMessageByP2PNodeID(
(uint16_t)GatewayPacketType::ResponseNodeStatus, from, nodeStatusData);
}
Expand All @@ -157,7 +158,8 @@ void GatewayRouterManager::onRecvResponseNodeStatusMsg(MessageFace::Ptr msg, WsS
p2pMessage->header()->srcGwNode() :
session->nodeId();

GATEWAY_LOG(INFO) << LOG_DESC("onRecvResponseNodeStatusMsg") << LOG_KV("from", from)
GATEWAY_LOG(INFO) << LOG_DESC("onRecvResponseNodeStatusMsg")
<< LOG_KV("from", printP2PIDElegantly(from))
<< LOG_KV("statusSeq", nodeStatus->statusSeq())
<< LOG_KV("agency", nodeStatus->agency());
updatePeerNodeStatus(from, nodeStatus);
Expand All @@ -176,7 +178,8 @@ void GatewayRouterManager::updatePeerNodeStatus(
UpgradeGuard ul(l);
m_p2pID2Seq[p2pID] = statusSeq;
}
GATEWAY_LOG(INFO) << LOG_DESC("updatePeerNodeStatus") << LOG_KV("from", p2pID)
GATEWAY_LOG(INFO) << LOG_DESC("updatePeerNodeStatus")
<< LOG_KV("from", printP2PIDElegantly(p2pID))
<< LOG_KV("statusSeq", status->statusSeq())
<< LOG_KV("agency", status->agency());
m_peerRouter->updateGatewayInfo(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class LocalRouter
increaseSeq();
}

virtual void registerTopic(std::string_view nodeID, std::string const& topic);
virtual void registerTopic(bcos::bytesConstRef nodeID, std::string const& topic);
virtual void unRegisterTopic(bcos::bytesConstRef nodeID, std::string const& topic);

virtual std::vector<ppc::front::IFrontClient::Ptr> chooseReceiver(
Expand Down
Loading

0 comments on commit 89d3ec5

Please sign in to comment.