diff --git a/src/test/main_test.cpp b/src/test/main_test.cpp index 6a550e9..9205128 100644 --- a/src/test/main_test.cpp +++ b/src/test/main_test.cpp @@ -90,10 +90,13 @@ extern const char accTxIss1[]; extern const char accTxIss2[]; extern const char submIss[]; +std::mutex gMCons; + static void fail(boost::beast::error_code ec, char const* what) { auto s = fmt::format("{}: {}", what, ec.message()); + std::lock_guard cl(gMCons); std::cerr << "Error, throw: " << s << std::endl; throw std::runtime_error(s); } @@ -109,8 +112,11 @@ wait_for( return true; auto const b = gCv.wait_for(l, to, stop); - DBG(std::cout << msg << ", wait finished: " << (b ? "condition" : "timeout") - << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << msg << ", wait finished: " << (b ? "condition" : "timeout") + << std::endl; + }) return b; } @@ -152,6 +158,12 @@ class engineLoc throw std::runtime_error("No 'method' parameter"); auto const method = msg[ripple::jss::method].asString(); + DBG({ + std::lock_guard cl(gMCons); + std::cout << "process(), method:" << method << ", side: " << side() + << std::endl; + }) + std::string s; unsigned const id = msg["id"].asUInt(); @@ -182,7 +194,10 @@ class engineLoc if (!clientInit_) { - DBG(std::cout << side() << " clientInit" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << side() << " clientInit" << std::endl; + }) std::unique_lock l(gMcv); clientInit_ = true; gCv.notify_all(); @@ -251,6 +266,12 @@ class engineIss throw std::runtime_error("No 'method' parameter"); auto const method = msg[ripple::jss::method].asString(); + DBG({ + std::lock_guard cl(gMCons); + std::cout << "process(), method:" << method << ", side: " << side() + << std::endl; + }) + std::string s; unsigned const id = msg["id"].asUInt(); @@ -288,7 +309,10 @@ class engineIss if (!clientInit_) { - DBG(std::cout << side() << " clientInit" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << side() << " clientInit" << std::endl; + }) std::unique_lock l(gMcv); clientInit_ = true; gCv.notify_all(); @@ -371,7 +395,11 @@ class session : public std::enable_shared_from_this> void run() { - DBG(std::cout << "session::run()" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "session::run()" + << ", side: " << e_.side() << std::endl; + }) ws_.set_option(websocket::stream_base::timeout::suggested( boost::beast::role_type::server)); ws_.set_option(websocket::stream_base::decorator( @@ -390,7 +418,11 @@ class session : public std::enable_shared_from_this> void onAccept(boost::beast::error_code ec) { - DBG(std::cout << "session::onAccept(), ec:" << ec << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "session::onAccept(), ec:" << ec + << ", side: " << e_.side() << std::endl; + }) if (ec == websocket::error::closed) return; if (ec) @@ -401,7 +433,11 @@ class session : public std::enable_shared_from_this> void doRead() { - DBG(std::cout << "session::doRead()" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "session::doRead()" + << ", side: " << e_.side() << std::endl; + }) ws_.async_read( buffer_, boost::beast::bind_front_handler( @@ -413,8 +449,12 @@ class session : public std::enable_shared_from_this> void onRead(boost::beast::error_code ec, std::size_t bytes_transferred) { - DBG(std::cout << "session::onRead(), ec:" << ec - << " bytes: " << bytes_transferred << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "session::onRead(), ec:" << ec + << " bytes: " << bytes_transferred + << ", side: " << e_.side() << std::endl; + }) boost::ignore_unused(bytes_transferred); if (ec == websocket::error::closed) return; @@ -461,8 +501,12 @@ class session : public std::enable_shared_from_this> void onWrite(boost::beast::error_code ec, std::size_t bytes_transferred) { - DBG(std::cout << "session::onWrite(), ec:" << ec - << " bytes: " << bytes_transferred << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "session::onWrite(), ec:" << ec + << " bytes: " << bytes_transferred + << ", side: " << e_.side() << std::endl; + }) boost::ignore_unused(bytes_transferred); if (ec == websocket::error::closed) return; @@ -475,11 +519,18 @@ class session : public std::enable_shared_from_this> void shutdown() { - DBG(std::cout << "session::shutdown()" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "session::shutdown()" + << ", side: " << e_.side() << std::endl; + }) ios_.post([this] { ws_.async_close({}, [this](boost::beast::error_code const& ec) { - DBG(std::cout << "session::onAsync_close(), ec:" << ec - << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "session::onAsync_close(), ec:" << ec + << ", side: " << e_.side() << std::endl; + }) std::unique_lock l(gMcv); this->finished_ = true; gCv.notify_all(); @@ -502,7 +553,10 @@ class session : public std::enable_shared_from_this> void sendNewLedger() { - DBG(std::cout << e_.side() << " session::sendNewLedger()" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << e_.side() << " session::sendNewLedger()" << std::endl; + }) auto const jv = e_.getNewLedger(); auto const s = to_string(jv); @@ -518,8 +572,12 @@ class session : public std::enable_shared_from_this> void onWriteNewLedger(boost::beast::error_code ec, std::size_t bytes_transferred) { - DBG(std::cout << e_.side() << " session::onWriteNewLedger(), ec:" << ec - << " bytes: " << bytes_transferred << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << " session::onWriteNewLedger(), ec:" << ec + << ", bytes: " << bytes_transferred + << ", side: " << side() << std::endl; + }) boost::ignore_unused(bytes_transferred); if (ec == websocket::error::closed) return; @@ -538,6 +596,12 @@ class session : public std::enable_shared_from_this> { return e_.blobOk(); } + + std::string + side() const + { + return e_.side(); + } }; //------------------------------------------------------------------------------ @@ -567,7 +631,11 @@ class listener : public std::enable_shared_from_this> if (ec) fail(ec, "bind"); - DBG(std::cout << "listener::listen()" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "listener::listen()" + << ", side: " << session_->side() << std::endl; + }) acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); if (ec) fail(ec, "listen"); @@ -576,17 +644,29 @@ class listener : public std::enable_shared_from_this> void run() { - DBG(std::cout << "listener::run()" << std::endl); + DBG({ + std::lock_guard cl(gMCons); + std::cout << "listener::run()" + << ", side: " << session_->side() << std::endl; + }) doAccept(); } void shutdown() { - DBG(std::cout << "listener::shutdown()" << std::endl); + DBG({ + std::lock_guard cl(gMCons); + std::cout << "listener::shutdown()" + << ", side: " << session_->side() << std::endl; + }) boost::system::error_code ec; acceptor_.cancel(ec); - DBG(if (ec) std::cout << "cancel error: " << ec << std::endl); + DBG(if (ec) { + std::lock_guard cl(gMCons); + std::cout << "cancel error: " << ec + << ", side: " << session_->side() << std::endl; + }) if (session_) session_->shutdown(); } @@ -619,7 +699,11 @@ class listener : public std::enable_shared_from_this> void doAccept() { - DBG(std::cout << "listener::doAccept()" << std::endl); + DBG({ + std::lock_guard cl(gMCons); + std::cout << "listener::doAccept()" + << ", side: " << session_->side() << std::endl; + }) acceptor_.async_accept( boost::asio::make_strand(ios_), boost::beast::bind_front_handler( @@ -631,7 +715,11 @@ class listener : public std::enable_shared_from_this> void onAccept(boost::beast::error_code ec, tcp::socket socket) { - DBG(std::cout << "listener::onAccept(), ec:" << ec << std::endl); + DBG({ + std::lock_guard cl(gMCons); + std::cout << "listener::onAccept(), ec:" << ec + << ", side: " << session_->side() << std::endl; + }) if (ec == boost::system::errc::operation_canceled) return; if (ec) @@ -664,19 +752,28 @@ struct Connection void startIOThreads() { - DBG(std::cout << "Connection::startIOThreads()" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "Connection::startIOThreads()" << std::endl; + }) ioThreads_.reserve(NUM_THREADS); for (auto i = 0; i < NUM_THREADS; ++i) ioThreads_.emplace_back([this, i] { ios_.run(); - DBG(std::cout << "ioThread #" << i << " finished" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "ioThread #" << i << " finished" << std::endl; + }) }); } void waitIOThreads() { - DBG(std::cout << "Connection::waitIOThreads()" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "Connection::waitIOThreads()" << std::endl; + }) for (auto& t : ioThreads_) if (t.joinable()) t.join(); @@ -686,7 +783,10 @@ struct Connection void startServers() { - DBG(std::cout << "Connection::startServers()" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "Connection::startServers()" << std::endl; + }) auto const address = boost::asio::ip::make_address(LHOST); serverLoc_ = std::make_shared>( ios_, tcp::endpoint{address, PORT_LOC}); @@ -699,7 +799,10 @@ struct Connection void shutdownServers() { - DBG(std::cout << "Connection::shutdownServers()" << std::endl;) + DBG({ + std::lock_guard cl(gMCons); + std::cout << "Connection::shutdownServers()" << std::endl; + }) serverLoc_->shutdown(); serverIss_->shutdown(); wait_for(1s, [this]() { @@ -788,17 +891,15 @@ class Main_test : public beast::unit_test::suite bool locRepl = false, issRepl = false; for (std::string l; (!locRepl || !issRepl) && std::getline(logf, l);) { - if (l.find( - "initSyncDone start replay {\"chainType\": \"locking\",") != - std::string::npos) + if (l.find("initSyncDone start replay {\"chainType\": " + "\"locking\",") != std::string::npos) { locRepl = true; continue; } - if (l.find( - "initSyncDone start replay {\"chainType\": \"issuing\",") != - std::string::npos) + if (l.find("initSyncDone start replay {\"chainType\": " + "\"issuing\",") != std::string::npos) { issRepl = true; continue; diff --git a/src/xbwd/federator/Federator.cpp b/src/xbwd/federator/Federator.cpp index 25d9411..266c5c1 100644 --- a/src/xbwd/federator/Federator.cpp +++ b/src/xbwd/federator/Federator.cpp @@ -347,7 +347,7 @@ Federator::readDBAttests(ChainType ct) } txnsInProcessing_[ct].insert( - fmt::format("create: {:x}", createCount)); + {fmt::format("create: {:x}", createCount), transID}); if (!autoSubmit_[ct]) continue; @@ -458,7 +458,8 @@ Federator::readDBAttests(ChainType ct) continue; } - txnsInProcessing_[ct].insert(fmt::format("claim: {:x}", claimID)); + txnsInProcessing_[ct].insert( + {fmt::format("claim: {:x}", claimID), transID}); if (!autoSubmit_[ct]) continue; @@ -781,17 +782,15 @@ Federator::onEvent(event::XChainCommitDetected const& e) } auto const txnIdHex = ripple::strHex(e.txnHash_.begin(), e.txnHash_.end()); - auto const res = - txnsInProcessing_[ct].insert(fmt::format("claim: {:x}", e.claimID_)); + auto const res = txnsInProcessing_[ct].insert( + {fmt::format("claim: {:x}", e.claimID_), txnIdHex}); if (!res.second) { // Already have this transaction // TODO: Sanity check the claim id and deliveredAmt match // TODO: Stop historical transaction collection JLOGV( - j_.error(), - "XChainCommit already present", - jv("event", e.toJson())); + j_.warn(), "XChainCommit already present", jv("event", e.toJson())); return; // Don't store it again } @@ -991,7 +990,7 @@ Federator::onEvent(event::XChainAccountCreateCommitDetected const& e) auto const txnIdHex = ripple::strHex(e.txnHash_.begin(), e.txnHash_.end()); auto const res = txnsInProcessing_[ct].insert( - fmt::format("create: {:x}", e.createCount_)); + {fmt::format("create: {:x}", e.createCount_), txnIdHex}); if (!res.second) { // Already have this transaction @@ -1331,7 +1330,7 @@ Federator::onEvent(event::XChainAttestsResult const& e) if ((e.type_ == xbwd::XChainTxnType::xChainAddClaimAttestation) && e.claimID_) { - cnt = txnsInProcessing_[oct].erase( + cnt = txnsInProcessing_[oct].get<1>().erase( fmt::format("claim: {:x}", *e.claimID_)); } else if ( @@ -1339,7 +1338,7 @@ Federator::onEvent(event::XChainAttestsResult const& e) xbwd::XChainTxnType::xChainAddAccountCreateAttestation) && e.createCount_) { - cnt = txnsInProcessing_[oct].erase( + cnt = txnsInProcessing_[oct].get<1>().erase( fmt::format("create: {:x}", *e.createCount_)); } else diff --git a/src/xbwd/federator/Federator.h b/src/xbwd/federator/Federator.h index 58e8066..bdff7ed 100644 --- a/src/xbwd/federator/Federator.h +++ b/src/xbwd/federator/Federator.h @@ -36,6 +36,10 @@ #include #include +#include +#include +#include +#include #include #include @@ -289,6 +293,24 @@ struct AttestedHistoryTx fromEvent(FederatorEvent const& e); }; +struct TransactionCache +{ + std::string id_; // ClaimID or CreateID + std::string tx_; // tx hash of XChainCommit / XChainAccountCreateCommit + // transaction +}; + +typedef boost::multi_index::multi_index_container< + TransactionCache, + boost::multi_index::indexed_by< + boost::multi_index::hashed_unique< + boost::multi_index:: + member>, + boost::multi_index::hashed_non_unique< + boost::multi_index:: + member>>> + TransactionCacheContainer; + class Federator { enum LoopTypes { @@ -336,8 +358,10 @@ class Federator ChainArray> GUARDED_BY(txnsMutex_) errored_; // Cache of the events added to processing. It is added so as not to read - // the DB. No need for mutex as event processing is synchronized. - ChainArray> txnsInProcessing_; + // the DB. No need for mutex as event processing is synchronized. Insertion + // check transaction id for uniqueness. Deletion remove all the entry with + // the same claimID + ChainArray txnsInProcessing_; // "Window" size for sending attestations // 0 - no "window"