Skip to content

Commit

Permalink
boost-ssl not depends on tbb
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Oct 19, 2024
1 parent 323e5a0 commit 831e99b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 17 deletions.
4 changes: 4 additions & 0 deletions bcos-boostssl/bcos-boostssl/websocket/WsInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ void WsInitializer::initWsService(WsService::Ptr _wsService)

auto builder = std::make_shared<WsStreamDelegateBuilder>();

// set the threadPool
auto threadPool = std::make_shared<bcos::ThreadPool>("ws-pool", _config->threadPoolSize());
_wsService->setThreadPool(threadPool);

std::shared_ptr<boost::asio::ssl::context> srvCtx = nullptr;
std::shared_ptr<boost::asio::ssl::context> clientCtx = nullptr;
if (!_config->disableSsl())
Expand Down
6 changes: 2 additions & 4 deletions bcos-boostssl/bcos-boostssl/websocket/WsService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ WsService::WsService()
WsService::~WsService()
{
stop();
m_taskGroup.wait();
WEBSOCKET_SERVICE(INFO) << LOG_KV("[DELOBJ][WsService]", this);
}

Expand Down Expand Up @@ -141,8 +140,7 @@ void WsService::stop()
{
m_ioservicePool->stop();
}
m_taskGroup.cancel();
m_taskGroup.wait();
m_threadPool->stop();

if (m_statTimer)
{
Expand Down Expand Up @@ -390,7 +388,7 @@ std::shared_ptr<WsSession> WsService::newSession(
_wsStreamDelegate->setMaxReadMsgSize(m_config->maxMsgSize());

std::string endPoint = _wsStreamDelegate->remoteEndpoint();
auto session = m_sessionFactory->createSession(m_taskGroup);
auto session = m_sessionFactory->createSession(m_threadPool);

session->setWsStreamDelegate(std::move(_wsStreamDelegate));
session->setIoc(m_ioservicePool->getIOService());
Expand Down
6 changes: 4 additions & 2 deletions bcos-boostssl/bcos-boostssl/websocket/WsService.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <bcos-utilities/Common.h>
#include <bcos-utilities/IOServicePool.h>
#include <bcos-utilities/ThreadPool.h>
#include <oneapi/tbb/task_group.h>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
Expand Down Expand Up @@ -189,9 +188,12 @@ class WsService : public std::enable_shared_from_this<WsService>
return m_reconnectedPeers;
}

void setThreadPool(std::shared_ptr<bcos::ThreadPool> threadPool)
{
m_threadPool = std::move(threadPool);
}
private:
bool m_running{false};
tbb::task_group m_taskGroup;

int32_t m_waitConnectFinishTimeout = 30000;

Expand Down
12 changes: 6 additions & 6 deletions bcos-boostssl/bcos-boostssl/websocket/WsSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ using namespace bcos::boostssl;
using namespace bcos::boostssl::ws;
using namespace bcos::boostssl::http;

WsSession::WsSession(tbb::task_group& taskGroup)
: m_taskGroup(taskGroup)
WsSession::WsSession(std::shared_ptr<bcos::ThreadPool> threadPool)
: m_threadPool(std::move(threadPool))
{
WEBSOCKET_SESSION(INFO) << LOG_KV("[NEWOBJ][WSSESSION]", this);

Expand Down Expand Up @@ -86,7 +86,7 @@ void WsSession::drop(uint32_t _reason)
WEBSOCKET_SESSION(TRACE)
<< LOG_DESC("the session has been disconnected") << LOG_KV("seq", cbEntry.first);

m_taskGroup.run([callback = std::move(callback), error]() {
m_threadPool->enqueue([callback = std::move(callback), error]() {
callback->respCallBack(error, nullptr, nullptr);
});
}
Expand All @@ -103,7 +103,7 @@ void WsSession::drop(uint32_t _reason)
m_wsStreamDelegate->close();
}

m_taskGroup.run([self]() {
m_threadPool->enqueue([self]() {
auto session = self.lock();
if (session)
{
Expand Down Expand Up @@ -190,7 +190,7 @@ void WsSession::onReadPacket()
void WsSession::onMessage(bcos::boostssl::MessageFace::Ptr _message)
{
// task enqueue
m_taskGroup.run([self = weak_from_this(), _message = std::move(_message)]() {
m_threadPool->enqueue([self = weak_from_this(), _message = std::move(_message)]() {
auto session = self.lock();
if (!session)
{
Expand Down Expand Up @@ -478,7 +478,7 @@ void WsSession::onRespTimeout(const boost::system::error_code& _error, const std
WEBSOCKET_SESSION(WARNING) << LOG_BADGE("onRespTimeout") << LOG_KV("seq", _seq);

auto error = BCOS_ERROR_PTR(WsError::TimeOut, "waiting for message response timed out");
m_taskGroup.run([callback = std::move(callback), error = std::move(error)]() {
m_threadPool->enqueue([callback = std::move(callback), error = std::move(error)]() {
callback->respCallBack(error, nullptr, nullptr);
});
}
8 changes: 3 additions & 5 deletions bcos-boostssl/bcos-boostssl/websocket/WsSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include <bcos-utilities/Common.h>
#include <bcos-utilities/ThreadPool.h>
#include <bcos-utilities/Timer.h>
#include <oneapi/tbb/task_group.h>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
Expand All @@ -53,7 +52,7 @@ class WsSession : public std::enable_shared_from_this<WsSession>,
using Ptrs = std::vector<std::shared_ptr<WsSession>>;

public:
explicit WsSession(tbb::task_group& taskGroup);
explicit WsSession(std::shared_ptr<bcos::ThreadPool> threadPool);

virtual ~WsSession() noexcept
{
Expand Down Expand Up @@ -180,7 +179,6 @@ class WsSession : public std::enable_shared_from_this<WsSession>,
};

protected:
tbb::task_group& m_taskGroup;
// flag for message that need to check respond packet like p2p message
bool m_needCheckRspPacket = false;
//
Expand Down Expand Up @@ -231,9 +229,9 @@ class WsSessionFactory
virtual ~WsSessionFactory() = default;

public:
virtual WsSession::Ptr createSession(tbb::task_group& taskGroup)
virtual WsSession::Ptr createSession(std::shared_ptr<bcos::ThreadPool> threadPool)
{
auto session = std::make_shared<WsSession>(taskGroup);
auto session = std::make_shared<WsSession>(threadPool);
return session;
}
};
Expand Down

0 comments on commit 831e99b

Please sign in to comment.