Skip to content

Commit

Permalink
update boostssl (#112)
Browse files Browse the repository at this point in the history
* update boostssl

* rename getDesc to detail

* optimize wserror output
  • Loading branch information
cyjseagull authored Sep 13, 2024
1 parent c57dbac commit 6b09c39
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 31 deletions.
9 changes: 8 additions & 1 deletion bcos-boostssl/interfaces/NodeInfoDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ namespace bcos
{
namespace boostssl
{

/**
* @brief client end endpoint. Node will connect to NodeIPEndpoint.
*/
Expand Down Expand Up @@ -65,6 +64,14 @@ struct NodeIPEndpoint
{
return boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(m_host), m_port);
}
std::string detail() const
{
if (m_host.empty())
{
return "";
}
return m_host + ":" + std::to_string(m_port);
}

// Get the port associated with the endpoint.
uint16_t port() const { return m_port; };
Expand Down
57 changes: 57 additions & 0 deletions bcos-boostssl/websocket/WsError.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
* @date 2021-10-02
*/
#pragma once
#include <memory>
#include <sstream>
#include <string>

namespace bcos
{
Expand All @@ -43,6 +46,60 @@ enum WsError
MessageEncodeError = -4013
};

inline std::ostream& operator<<(std::ostream& _out, WsError const& error)
{
switch (error)
{
case WsError::AcceptError:
_out << "AcceptError";
break;
case WsError::ReadError:
_out << "ReadError";
break;
case WsError::WriteError:
_out << "WriteError";
break;
case WsError::PingError:
_out << "PingError";
break;
case WsError::PongError:
_out << "PongError";
break;
case WsError::PacketError:
_out << "PacketError";
break;
case WsError::SessionDisconnect:
_out << "SessionDisconnect";
break;
case WsError::UserDisconnect:
_out << "UserDisconnect";
break;
case WsError::TimeOut:
_out << "TimeOut";
break;
case WsError::NoActiveCons:
_out << "NoActiveCons";
break;
case WsError::EndPointNotExist:
_out << "EndPointNotExist";
break;
case WsError::MessageOverflow:
_out << "MessageOverflow";
break;
case WsError::UndefinedException:
_out << "UndefinedException";
break;
case WsError::MessageEncodeError:
_out << "MessageEncodeError";
break;
default:
_out << "Unkown";
break;
}
return _out;
}


inline bool notRetryAgain(int _wsError)
{
return (_wsError == boostssl::ws::WsError::MessageOverflow);
Expand Down
2 changes: 0 additions & 2 deletions bcos-boostssl/websocket/WsMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ namespace boostssl
{
namespace ws
{


// the message format for ws protocol
class WsMessage : public boostssl::MessageFace
{
Expand Down
31 changes: 19 additions & 12 deletions bcos-boostssl/websocket/WsService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ WsService::asyncConnectToEndpoints(EndPointsPtr _peers)

for (auto& peer : *_peers)
{
std::string connectedEndPoint = peer.address() + ":" + std::to_string(peer.port());
std::string connectedEndPoint = peer.detail();

/*
WEBSOCKET_SERVICE(DEBUG) << LOG_BADGE("asyncConnect")
Expand All @@ -271,7 +271,7 @@ WsService::asyncConnectToEndpoints(EndPointsPtr _peers)

auto self = std::weak_ptr<WsService>(shared_from_this());
m_connector->connectToWsServer(host, port, m_config->disableSsl(),
[p, self, connectedEndPoint](boost::beast::error_code _ec,
[p, self, connectedEndPoint, peer](boost::beast::error_code _ec,
const std::string& _extErrorMsg,
std::shared_ptr<WsStreamDelegate> _wsStreamDelegate,
std::shared_ptr<std::string> _nodeId) {
Expand All @@ -290,7 +290,7 @@ WsService::asyncConnectToEndpoints(EndPointsPtr _peers)
}

auto session = service->newSession(_wsStreamDelegate, *_nodeId.get());
session->setEndPoint(connectedEndPoint);
session->setEndPoint(peer);
session->startAsClient();
});
}
Expand All @@ -307,7 +307,7 @@ void WsService::reconnect()
{
for (auto& peer : *m_reconnectedPeers)
{
std::string connectedEndPoint = peer.address() + ":" + std::to_string(peer.port());
std::string connectedEndPoint = peer.detail();
auto session = getSession(connectedEndPoint);
if (session)
{
Expand All @@ -321,9 +321,7 @@ void WsService::reconnect()
{
for (auto reconnectPeer : *connectPeers)
{
WEBSOCKET_SERVICE(INFO) << ("reconnect")
<< LOG_KV("peer", reconnectPeer.address() + ":" +
std::to_string(reconnectPeer.port()));
WEBSOCKET_SERVICE(INFO) << ("reconnect") << LOG_KV("peer", reconnectPeer.detail());
}
asyncConnectToEndpoints(connectPeers);
}
Expand Down Expand Up @@ -372,14 +370,13 @@ std::shared_ptr<WsSession> WsService::newSession(
{
_wsStreamDelegate->setMaxReadMsgSize(m_config->maxMsgSize());

std::string endPoint = _wsStreamDelegate->remoteEndpoint();
auto session = m_sessionFactory->createSession(m_moduleName);

session->setWsStreamDelegate(_wsStreamDelegate);
session->setIoc(m_ioservicePool->getIOService());
session->setThreadPool(threadPool());
session->setMessageFactory(messageFactory());
session->setEndPoint(endPoint);
session->setEndPoint(_wsStreamDelegate->remoteEndpointInfo());
session->setMaxWriteMsgSize(m_config->maxMsgSize());
session->setSendMsgTimeout(m_config->sendMsgTimeout());
session->setNodeId(_nodeId);
Expand Down Expand Up @@ -410,7 +407,7 @@ std::shared_ptr<WsSession> WsService::newSession(
});

WEBSOCKET_SERVICE(INFO) << LOG_BADGE("newSession") << LOG_DESC("start the session")
<< LOG_KV("endPoint", endPoint);
<< LOG_KV("endPoint", _wsStreamDelegate->remoteEndpoint());
return session;
}

Expand Down Expand Up @@ -459,6 +456,16 @@ std::shared_ptr<WsSession> WsService::getSession(const std::string& _endPoint)
return nullptr;
}

bool WsService::isConnected(NodeIPEndpoint const& _nodeIPEndpoint)
{
auto session = getSession(_nodeIPEndpoint.detail());
if (session == nullptr || !session->isConnected())
{
return false;
}
return true;
}

WsSessions WsService::sessions()
{
WsSessions sessions;
Expand Down Expand Up @@ -533,7 +540,7 @@ void WsService::onRecvMessage(
<< LOG_DESC("receive message from server")
<< LOG_KV("type", _msg->packetType()) << LOG_KV("seq", seq)
<< LOG_KV("endpoint", _session->endPoint())
<< LOG_KV("data size", _msg->payload()->size())
<< LOG_KV("dataSize", _msg->payload() ? _msg->payload()->size() : 0)
<< LOG_KV("use_count", _session.use_count());

auto typeHandler = getMsgHandler(_msg->packetType());
Expand Down Expand Up @@ -725,4 +732,4 @@ void WsService::broadcastMessage(
}

WEBSOCKET_SERVICE(DEBUG) << LOG_BADGE("broadcastMessage");
}
}
8 changes: 4 additions & 4 deletions bcos-boostssl/websocket/WsService.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class WsService : public std::enable_shared_from_this<WsService>
std::string genConnectError(const std::string& _error, const std::string& endpoint, bool end);
void syncConnectToEndpoints(EndPointsPtr _peers);

virtual bool isConnected(NodeIPEndpoint const& _nodeIPEndpoint);

public:
std::shared_ptr<WsSession> newSession(
std::shared_ptr<WsStreamDelegate> _wsStreamDelegate, std::string const& _nodeId);
Expand Down Expand Up @@ -195,7 +197,7 @@ class WsService : public std::enable_shared_from_this<WsService>
return m_reconnectedPeers;
}

private:
protected:
bool m_running{false};

int32_t m_waitConnectFinishTimeout = 30000;
Expand All @@ -208,8 +210,6 @@ class WsService : public std::enable_shared_from_this<WsService>
// listen host port
std::string m_listenHost = "";
uint16_t m_listenPort = 0;
// nodeID
std::string m_nodeID;
// Config
std::shared_ptr<WsConfig> m_config;

Expand All @@ -228,7 +228,7 @@ class WsService : public std::enable_shared_from_this<WsService>
// timer
timer::TimerFactory::Ptr m_timerFactory = nullptr;

private:
protected:
// mutex for m_sessions
mutable boost::shared_mutex x_mutex;
// all active sessions
Expand Down
7 changes: 4 additions & 3 deletions bcos-boostssl/websocket/WsSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ void WsSession::drop(uint32_t _reason)
{
WEBSOCKET_SESSION(INFO) << LOG_BADGE("drop")
<< LOG_DESC("the session has already been dropped")
<< LOG_KV("endpoint", m_endPoint) << LOG_KV("session", this);
<< LOG_KV("endpoint", m_endPoint.detail())
<< LOG_KV("session", this);
return;
}

m_isDrop = true;

WEBSOCKET_SESSION(INFO) << LOG_BADGE("drop") << LOG_KV("reason", _reason)
<< LOG_KV("endpoint", m_endPoint) << LOG_KV("session", this);
<< LOG_KV("endpoint", m_endPoint.detail()) << LOG_KV("session", this);

auto self = std::weak_ptr<WsSession>(shared_from_this());
// call callbacks
Expand All @@ -71,7 +72,7 @@ void WsSession::drop(uint32_t _reason)
<< LOG_KV("cb size", m_callbacks.size()) << LOG_KV("session", this);

Guard lockGuard(x_callback);

for (auto& cbEntry : m_callbacks)
{
auto callback = cbEntry.second;
Expand Down
9 changes: 6 additions & 3 deletions bcos-boostssl/websocket/WsSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* @date 2021-07-28
*/
#pragma once
#include "../interfaces/NodeInfoDef.h"
#include "bcos-boostssl/interfaces/MessageFace.h"
#include <bcos-boostssl/httpserver/Common.h>
#include <bcos-boostssl/websocket/Common.h>
Expand Down Expand Up @@ -82,8 +83,10 @@ class WsSession : public std::enable_shared_from_this<WsSession>
Options _options = Options(), RespCallBack _respCallback = RespCallBack());


std::string endPoint() const { return m_endPoint; }
void setEndPoint(const std::string& _endPoint) { m_endPoint = _endPoint; }
std::string endPoint() const { return m_endPoint.detail(); }
NodeIPEndpoint const& endPointInfo() const { return m_endPoint; }
void setEndPoint(const NodeIPEndpoint& _endPoint) { m_endPoint = _endPoint; }


void setConnectHandler(WsConnectHandler _connectHandler) { m_connectHandler = _connectHandler; }
WsConnectHandler connectHandler() { return m_connectHandler; }
Expand Down Expand Up @@ -192,7 +195,7 @@ class WsSession : public std::enable_shared_from_this<WsSession>
// buffer used to read message
boost::beast::flat_buffer m_buffer;

std::string m_endPoint;
NodeIPEndpoint m_endPoint;
std::string m_connectedEndPoint;
std::string m_nodeId;

Expand Down
17 changes: 11 additions & 6 deletions bcos-boostssl/websocket/WsStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
#pragma once

#include "../interfaces/NodeInfoDef.h"
#include <bcos-boostssl/httpserver/Common.h>
#include <bcos-boostssl/websocket/Common.h>
#include <bcos-boostssl/websocket/WsTools.h>
Expand Down Expand Up @@ -180,22 +181,21 @@ class WsStream
return std::string("");
}

virtual std::string remoteEndpoint()
virtual std::string remoteEndpoint() { return remoteEndpointInfo().detail(); }

virtual NodeIPEndpoint remoteEndpointInfo()
{
try
{
auto& s = tcpStream();
auto remoteEndpoint = s.socket().remote_endpoint();
auto endPoint =
remoteEndpoint.address().to_string() + ":" + std::to_string(remoteEndpoint.port());
return endPoint;
return NodeIPEndpoint(remoteEndpoint.address(), remoteEndpoint.port());
}
catch (const std::exception& e)
{
WEBSOCKET_STREAM(WARNING) << LOG_BADGE("remoteEndpoint") << LOG_KV("e", e.what());
}

return std::string("");
return NodeIPEndpoint();
}

private:
Expand Down Expand Up @@ -234,6 +234,11 @@ class WsStreamDelegate
return m_isSsl ? m_sslStream->remoteEndpoint() : m_rawStream->remoteEndpoint();
}

NodeIPEndpoint remoteEndpointInfo()
{
return m_isSsl ? m_sslStream->remoteEndpointInfo() : m_rawStream->remoteEndpointInfo();
}

void asyncWrite(const bcos::bytes& _buffer, bool _fin, WsStreamRWHandler _handler)
{
return m_isSsl ? m_sslStream->asyncWrite(_buffer, _fin, _handler) :
Expand Down

0 comments on commit 6b09c39

Please sign in to comment.