Skip to content

Commit

Permalink
RXI-1136 UT improvement:
Browse files Browse the repository at this point in the history
Added more logs
Added mutex for console output
  • Loading branch information
oleks-rip committed Jun 18, 2024
1 parent 1035a84 commit c1de51b
Showing 1 changed file with 126 additions and 33 deletions.
159 changes: 126 additions & 33 deletions src/test/main_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ namespace xbwd {
namespace tests {
namespace all {

#if defined(_DEBUG) && defined(TESTS_DEBUG)
#define TESTS_DEBUG

// #if defined(_DEBUG) && defined(TESTS_DEBUG)
#if defined(TESTS_DEBUG)
#define REL(...)
#define DBG(...) __VA_ARGS__
#define DBG_ARGS(...) __VA_OPT__(, ) __VA_ARGS__
Expand Down Expand Up @@ -90,10 +93,13 @@ extern const char accTxIss1[];
extern const char accTxIss2[];
extern const char submIss[];

std::mutex gMCons;

static void
fail(boost::beast::error_code ec, char const* what)
{
auto s = fmt::format("{}: {}", what, ec.message());
std::lock_guard cl(gMCons);
std::cerr << "Error, throw: " << s << std::endl;
throw std::runtime_error(s);
}
Expand All @@ -109,8 +115,11 @@ wait_for(
return true;

auto const b = gCv.wait_for(l, to, stop);
DBG(std::cout << msg << ", wait finished: " << (b ? "condition" : "timeout")
<< std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << msg << ", wait finished: " << (b ? "condition" : "timeout")
<< std::endl;
})
return b;
}

Expand Down Expand Up @@ -152,6 +161,11 @@ class engineLoc
throw std::runtime_error("No 'method' parameter");
auto const method = msg[ripple::jss::method].asString();

DBG({
std::lock_guard cl(gMCons);
std::cout << "loc::process(), method:" << method << std::endl;
})

std::string s;
unsigned const id = msg["id"].asUInt();

Expand Down Expand Up @@ -182,7 +196,10 @@ class engineLoc

if (!clientInit_)
{
DBG(std::cout << side() << " clientInit" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << side() << " clientInit" << std::endl;
})
std::unique_lock l(gMcv);
clientInit_ = true;
gCv.notify_all();
Expand Down Expand Up @@ -251,6 +268,11 @@ class engineIss
throw std::runtime_error("No 'method' parameter");
auto const method = msg[ripple::jss::method].asString();

DBG({
std::lock_guard cl(gMCons);
std::cout << "iss::process(), method:" << method << std::endl;
})

std::string s;
unsigned const id = msg["id"].asUInt();

Expand Down Expand Up @@ -288,7 +310,10 @@ class engineIss

if (!clientInit_)
{
DBG(std::cout << side() << " clientInit" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << side() << " clientInit" << std::endl;
})
std::unique_lock l(gMcv);
clientInit_ = true;
gCv.notify_all();
Expand Down Expand Up @@ -390,7 +415,11 @@ class session : public std::enable_shared_from_this<session<engine>>
void
onAccept(boost::beast::error_code ec)
{
DBG(std::cout << "session::onAccept(), ec:" << ec << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::onAccept(), ec:" << ec
<< ", side: " << e_.side() << std::endl;
})
if (ec == websocket::error::closed)
return;
if (ec)
Expand All @@ -401,7 +430,11 @@ class session : public std::enable_shared_from_this<session<engine>>
void
doRead()
{
DBG(std::cout << "session::doRead()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::doRead()"
<< ", side: " << e_.side() << std::endl;
})
ws_.async_read(
buffer_,
boost::beast::bind_front_handler(
Expand All @@ -413,8 +446,12 @@ class session : public std::enable_shared_from_this<session<engine>>
void
onRead(boost::beast::error_code ec, std::size_t bytes_transferred)
{
DBG(std::cout << "session::onRead(), ec:" << ec
<< " bytes: " << bytes_transferred << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::onRead(), ec:" << ec
<< " bytes: " << bytes_transferred
<< ", side: " << e_.side() << std::endl;
})
boost::ignore_unused(bytes_transferred);
if (ec == websocket::error::closed)
return;
Expand Down Expand Up @@ -461,8 +498,12 @@ class session : public std::enable_shared_from_this<session<engine>>
void
onWrite(boost::beast::error_code ec, std::size_t bytes_transferred)
{
DBG(std::cout << "session::onWrite(), ec:" << ec
<< " bytes: " << bytes_transferred << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::onWrite(), ec:" << ec
<< " bytes: " << bytes_transferred
<< ", side: " << e_.side() << std::endl;
})
boost::ignore_unused(bytes_transferred);
if (ec == websocket::error::closed)
return;
Expand All @@ -475,11 +516,18 @@ class session : public std::enable_shared_from_this<session<engine>>
void
shutdown()
{
DBG(std::cout << "session::shutdown()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::shutdown()"
<< ", side: " << e_.side() << std::endl;
})
ios_.post([this] {
ws_.async_close({}, [this](boost::beast::error_code const& ec) {
DBG(std::cout << "session::onAsync_close(), ec:" << ec
<< std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "session::onAsync_close(), ec:" << ec
<< ", side: " << e_.side() << std::endl;
})
std::unique_lock l(gMcv);
this->finished_ = true;
gCv.notify_all();
Expand All @@ -502,7 +550,10 @@ class session : public std::enable_shared_from_this<session<engine>>
void
sendNewLedger()
{
DBG(std::cout << e_.side() << " session::sendNewLedger()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << e_.side() << " session::sendNewLedger()" << std::endl;
})
auto const jv = e_.getNewLedger();
auto const s = to_string(jv);

Expand All @@ -518,8 +569,11 @@ class session : public std::enable_shared_from_this<session<engine>>
void
onWriteNewLedger(boost::beast::error_code ec, std::size_t bytes_transferred)
{
DBG(std::cout << e_.side() << " session::onWriteNewLedger(), ec:" << ec
<< " bytes: " << bytes_transferred << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << e_.side() << " session::onWriteNewLedger(), ec:" << ec
<< " bytes: " << bytes_transferred << std::endl;
})
boost::ignore_unused(bytes_transferred);
if (ec == websocket::error::closed)
return;
Expand All @@ -538,6 +592,12 @@ class session : public std::enable_shared_from_this<session<engine>>
{
return e_.blobOk();
}

std::string
side() const
{
return e_.side();
}
};

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -567,7 +627,11 @@ class listener : public std::enable_shared_from_this<listener<engine>>
if (ec)
fail(ec, "bind");

DBG(std::cout << "listener::listen()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "listener::listen()"
<< ", side: " << session_->side() << std::endl;
})
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec)
fail(ec, "listen");
Expand All @@ -576,17 +640,29 @@ class listener : public std::enable_shared_from_this<listener<engine>>
void
run()
{
DBG(std::cout << "listener::run()" << std::endl);
DBG({
std::lock_guard cl(gMCons);
std::cout << "listener::run()"
<< ", side: " << session_->side() << std::endl;
})
doAccept();
}

void
shutdown()
{
DBG(std::cout << "listener::shutdown()" << std::endl);
DBG({
std::lock_guard cl(gMCons);
std::cout << "listener::shutdown()"
<< ", side: " << session_->side() << std::endl;
})
boost::system::error_code ec;
acceptor_.cancel(ec);
DBG(if (ec) std::cout << "cancel error: " << ec << std::endl);
DBG(if (ec) {
std::lock_guard cl(gMCons);
std::cout << "cancel error: " << ec
<< ", side: " << session_->side() << std::endl;
})
if (session_)
session_->shutdown();
}
Expand Down Expand Up @@ -631,7 +707,11 @@ class listener : public std::enable_shared_from_this<listener<engine>>
void
onAccept(boost::beast::error_code ec, tcp::socket socket)
{
DBG(std::cout << "listener::onAccept(), ec:" << ec << std::endl);
DBG({
std::lock_guard cl(gMCons);
std::cout << "listener::onAccept(), ec:" << ec
<< ", side: " << session_->side() << std::endl;
})
if (ec == boost::system::errc::operation_canceled)
return;
if (ec)
Expand Down Expand Up @@ -664,19 +744,28 @@ struct Connection
void
startIOThreads()
{
DBG(std::cout << "Connection::startIOThreads()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "Connection::startIOThreads()" << std::endl;
})
ioThreads_.reserve(NUM_THREADS);
for (auto i = 0; i < NUM_THREADS; ++i)
ioThreads_.emplace_back([this, i] {
ios_.run();
DBG(std::cout << "ioThread #" << i << " finished" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "ioThread #" << i << " finished" << std::endl;
})
});
}

void
waitIOThreads()
{
DBG(std::cout << "Connection::waitIOThreads()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "Connection::waitIOThreads()" << std::endl;
})
for (auto& t : ioThreads_)
if (t.joinable())
t.join();
Expand All @@ -686,7 +775,10 @@ struct Connection
void
startServers()
{
DBG(std::cout << "Connection::startServers()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "Connection::startServers()" << std::endl;
})
auto const address = boost::asio::ip::make_address(LHOST);
serverLoc_ = std::make_shared<listener<engineLoc>>(
ios_, tcp::endpoint{address, PORT_LOC});
Expand All @@ -699,7 +791,10 @@ struct Connection
void
shutdownServers()
{
DBG(std::cout << "Connection::shutdownServers()" << std::endl;)
DBG({
std::lock_guard cl(gMCons);
std::cout << "Connection::shutdownServers()" << std::endl;
})
serverLoc_->shutdown();
serverIss_->shutdown();
wait_for(1s, [this]() {
Expand Down Expand Up @@ -788,17 +883,15 @@ class Main_test : public beast::unit_test::suite
bool locRepl = false, issRepl = false;
for (std::string l; (!locRepl || !issRepl) && std::getline(logf, l);)
{
if (l.find(
"initSyncDone start replay {\"chainType\": \"locking\",") !=
std::string::npos)
if (l.find("initSyncDone start replay {\"chainType\": "
"\"locking\",") != std::string::npos)
{
locRepl = true;
continue;
}

if (l.find(
"initSyncDone start replay {\"chainType\": \"issuing\",") !=
std::string::npos)
if (l.find("initSyncDone start replay {\"chainType\": "
"\"issuing\",") != std::string::npos)
{
issRepl = true;
continue;
Expand Down

0 comments on commit c1de51b

Please sign in to comment.