From 01b55f663cd79955b89981133d9adbec803a6e53 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Sat, 7 Sep 2024 18:15:33 -0700 Subject: [PATCH] Relaxed filters when puling transaction contributions (1) The new filter allows to set an optional limiting the total number of contributions pulled from the database. The feature is used by the Web Dashboard to prevent overloading the UI and to avoid triggering heavy database queries. --- src/replica/services/DatabaseServices.h | 27 +++++++++++------ src/replica/services/DatabaseServicesMySQL.cc | 29 ++++++++++++------- src/replica/services/DatabaseServicesMySQL.h | 12 ++++---- src/replica/services/DatabaseServicesPool.cc | 14 +++++---- src/replica/services/DatabaseServicesPool.h | 6 ++-- 5 files changed, 54 insertions(+), 34 deletions(-) diff --git a/src/replica/services/DatabaseServices.h b/src/replica/services/DatabaseServices.h index f75635a69..7737f7337 100644 --- a/src/replica/services/DatabaseServices.h +++ b/src/replica/services/DatabaseServices.h @@ -940,14 +940,17 @@ class DatabaseServices : public std::enable_shared_from_this { /// @param table (optional) the base name of a table (all tables if not provided) /// @param workerName (optional) the name of a worker (all workers if not provided) /// @param typeSelector (optional) type of the contributions - /// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE - /// @param includeRetries if 'true' then include info on the failed retries to pull the input data + /// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA + /// INFILE + /// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input + /// data + /// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0). virtual std::vector transactionContribs( TransactionId transactionId = 0, std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) = 0; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) = 0; /// @return contributions into a super-transaction for the given selectors /// @param transactionId (optional) a unique identifier of the transaction (all transactions if not @@ -957,29 +960,35 @@ class DatabaseServices : public std::enable_shared_from_this { /// @param table (optional) the base name of a table (all tables if not provided) /// @param workerName (optional) the name of a worker (all workers if not provided) /// @param typeSelector (optional) type of the contributions - /// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE - /// @param includeRetries if 'true' then include info on the failed retries to pull the input data + /// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA + /// INFILE + /// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input + /// data + /// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0). virtual std::vector transactionContribs( TransactionId transactionId = 0, TransactionContribInfo::Status status = TransactionContribInfo::Status::IN_PROGRESS, std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) = 0; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) = 0; /// @return contributions into super-transactions for the given selectors /// @param database the name of a database /// @param table (optional) the base name of a table (all tables if not provided) /// @param workerName (optional) the name of a worker (all workers if not provided) /// @param typeSelector (optional) type of the contributions - /// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE - /// @param includeRetries if 'true' then include info on the failed retries to pull the input data + /// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA + /// INFILE + /// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input + /// data + /// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0). virtual std::vector transactionContribs( std::string const& database, std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) = 0; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) = 0; /** * Insert the initial record on the contribution. diff --git a/src/replica/services/DatabaseServicesMySQL.cc b/src/replica/services/DatabaseServicesMySQL.cc index dcdcba8a1..7a46a9246 100644 --- a/src/replica/services/DatabaseServicesMySQL.cc +++ b/src/replica/services/DatabaseServicesMySQL.cc @@ -1625,7 +1625,8 @@ string DatabaseServicesMySQL::_typeSelectorPredicate( vector DatabaseServicesMySQL::transactionContribs( TransactionId transactionId, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries, + size_t maxEntries) { string const context = _context(__func__) + "transactionId=" + to_string(transactionId) + " table=" + table + " worker=" + workerName + " " + " typeSelector=" + TransactionContribInfo::typeSelector2str(typeSelector) + " "; @@ -1636,13 +1637,13 @@ vector DatabaseServicesMySQL::transactionContribs( table.empty() ? "" : _g.eq("table", table), workerName.empty() ? "" : _g.eq("worker", workerName), _typeSelectorPredicate(typeSelector)); - return _transactionContribs(lock, predicate, includeWarnings, includeRetries); + return _transactionContribs(lock, predicate, includeWarnings, includeRetries, maxEntries); } vector DatabaseServicesMySQL::transactionContribs( TransactionId transactionId, TransactionContribInfo::Status status, string const& table, string const& workerName, TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, - bool includeRetries) { + bool includeRetries, size_t maxEntries) { string const context = _context(__func__) + "transactionId=" + to_string(transactionId) + " status=" + TransactionContribInfo::status2str(status) + " table=" + table + " worker=" + workerName + " " + @@ -1654,12 +1655,13 @@ vector DatabaseServicesMySQL::transactionContribs( table.empty() ? "" : _g.eq("table", table), workerName.empty() ? "" : _g.eq("worker", workerName), _typeSelectorPredicate(typeSelector)); - return _transactionContribs(lock, predicate, includeWarnings, includeRetries); + return _transactionContribs(lock, predicate, includeWarnings, includeRetries, maxEntries); } vector DatabaseServicesMySQL::transactionContribs( string const& database, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries, + size_t maxEntries) { string const context = _context(__func__) + "database=" + database + " table=" + table + " worker=" + workerName + " " + " typeSelector=" + TransactionContribInfo::typeSelector2str(typeSelector) + " "; @@ -1671,7 +1673,7 @@ vector DatabaseServicesMySQL::transactionContribs( string const predicate = _g.packConds( _g.eq("database", database), table.empty() ? "" : _g.eq("table", table), workerName.empty() ? "" : _g.eq("worker", workerName), _typeSelectorPredicate(typeSelector)); - return _transactionContribs(lock, predicate, includeWarnings, includeRetries); + return _transactionContribs(lock, predicate, includeWarnings, includeRetries, maxEntries); } TransactionContribInfo DatabaseServicesMySQL::createdTransactionContrib( @@ -1840,13 +1842,16 @@ TransactionContribInfo DatabaseServicesMySQL::saveLastTransactionContribRetry( vector DatabaseServicesMySQL::_transactionContribs(replica::Lock const& lock, string const& predicate, bool includeWarnings, - bool includeRetries) { - string const context = _context(__func__) + "predicate=" + predicate + " "; + bool includeRetries, + size_t maxEntries) { + string const context = + _context(__func__) + "predicate=" + predicate + " maxEntries=" + to_string(maxEntries) + " "; LOGS(_log, LOG_LVL_DEBUG, context); vector collection; try { _conn->executeInOwnTransaction([&](decltype(_conn) conn) { - collection = _transactionContribsImpl(lock, predicate, includeWarnings, includeRetries); + collection = + _transactionContribsImpl(lock, predicate, includeWarnings, includeRetries, maxEntries); }); } catch (exception const& ex) { LOGS(_log, LOG_LVL_ERROR, context << "failed, exception: " << ex.what()); @@ -1871,12 +1876,14 @@ TransactionContribInfo DatabaseServicesMySQL::_transactionContribImpl(replica::L vector DatabaseServicesMySQL::_transactionContribsImpl(replica::Lock const& lock, string const& predicate, bool includeWarnings, - bool includeRetries) { + bool includeRetries, + size_t maxEntries) { string const context = _context(__func__) + "predicate=" + predicate + " "; LOGS(_log, LOG_LVL_DEBUG, context); vector collection; - string const query = _g.select(Sql::STAR) + _g.from("transaction_contrib") + _g.where(predicate); + string const query = _g.select(Sql::STAR) + _g.from("transaction_contrib") + _g.where(predicate) + + _g.limit(maxEntries); _conn->execute(query); if (_conn->hasResult()) { database::mysql::Row row; diff --git a/src/replica/services/DatabaseServicesMySQL.h b/src/replica/services/DatabaseServicesMySQL.h index aaed825a7..c97538192 100644 --- a/src/replica/services/DatabaseServicesMySQL.h +++ b/src/replica/services/DatabaseServicesMySQL.h @@ -171,7 +171,7 @@ class DatabaseServicesMySQL : public DatabaseServices { std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; std::vector transactionContribs( TransactionId transactionId = 0, @@ -179,14 +179,14 @@ class DatabaseServicesMySQL : public DatabaseServices { std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; std::vector transactionContribs( std::string const& database, std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; TransactionContribInfo createdTransactionContrib( TransactionContribInfo const& info, bool failed = false, @@ -351,7 +351,8 @@ class DatabaseServicesMySQL : public DatabaseServices { std::vector _transactionContribs(replica::Lock const& lock, std::string const& predicate, bool includeWarnings = false, - bool includeRetries = false); + bool includeRetries = false, + size_t maxEntries = 0); TransactionContribInfo _transactionContribImpl(replica::Lock const& lock, std::string const& predicate, bool includeWarnings = false, bool includeRetries = false); @@ -359,7 +360,8 @@ class DatabaseServicesMySQL : public DatabaseServices { std::vector _transactionContribsImpl(replica::Lock const& lock, std::string const& predicate, bool includeWarnings = false, - bool includeRetries = false); + bool includeRetries = false, + size_t maxEntries = 0); DatabaseIngestParam _ingestParamImpl(replica::Lock const& lock, std::string const& predicate); diff --git a/src/replica/services/DatabaseServicesPool.cc b/src/replica/services/DatabaseServicesPool.cc index 66ed28123..26a7e824e 100644 --- a/src/replica/services/DatabaseServicesPool.cc +++ b/src/replica/services/DatabaseServicesPool.cc @@ -295,27 +295,29 @@ TransactionContribInfo DatabaseServicesPool::transactionContrib(unsigned int id, vector DatabaseServicesPool::transactionContribs( TransactionId transactionId, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries, + size_t maxEntries) { ServiceAllocator service(shared_from_base()); return service()->transactionContribs(transactionId, table, workerName, typeSelector, includeWarnings, - includeRetries); + includeRetries, maxEntries); } vector DatabaseServicesPool::transactionContribs( TransactionId transactionId, TransactionContribInfo::Status status, string const& table, string const& workerName, TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, - bool includeRetries) { + bool includeRetries, size_t maxEntries) { ServiceAllocator service(shared_from_base()); return service()->transactionContribs(transactionId, status, table, workerName, typeSelector, - includeWarnings, includeRetries); + includeWarnings, includeRetries, maxEntries); } vector DatabaseServicesPool::transactionContribs( string const& database, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries, + size_t maxEntries) { ServiceAllocator service(shared_from_base()); return service()->transactionContribs(database, table, workerName, typeSelector, includeWarnings, - includeRetries); + includeRetries, maxEntries); } TransactionContribInfo DatabaseServicesPool::createdTransactionContrib( diff --git a/src/replica/services/DatabaseServicesPool.h b/src/replica/services/DatabaseServicesPool.h index 93087052f..b6138297b 100644 --- a/src/replica/services/DatabaseServicesPool.h +++ b/src/replica/services/DatabaseServicesPool.h @@ -172,7 +172,7 @@ class DatabaseServicesPool : public DatabaseServices { std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; std::vector transactionContribs( TransactionId transactionId = 0, @@ -180,14 +180,14 @@ class DatabaseServicesPool : public DatabaseServices { std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; std::vector transactionContribs( std::string const& database, std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; TransactionContribInfo createdTransactionContrib( TransactionContribInfo const& info, bool failed = false,