diff --git a/src/replica/services/DatabaseServices.h b/src/replica/services/DatabaseServices.h index f75635a69..7737f7337 100644 --- a/src/replica/services/DatabaseServices.h +++ b/src/replica/services/DatabaseServices.h @@ -940,14 +940,17 @@ class DatabaseServices : public std::enable_shared_from_this { /// @param table (optional) the base name of a table (all tables if not provided) /// @param workerName (optional) the name of a worker (all workers if not provided) /// @param typeSelector (optional) type of the contributions - /// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE - /// @param includeRetries if 'true' then include info on the failed retries to pull the input data + /// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA + /// INFILE + /// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input + /// data + /// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0). virtual std::vector transactionContribs( TransactionId transactionId = 0, std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) = 0; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) = 0; /// @return contributions into a super-transaction for the given selectors /// @param transactionId (optional) a unique identifier of the transaction (all transactions if not @@ -957,29 +960,35 @@ class DatabaseServices : public std::enable_shared_from_this { /// @param table (optional) the base name of a table (all tables if not provided) /// @param workerName (optional) the name of a worker (all workers if not provided) /// @param typeSelector (optional) type of the contributions - /// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE - /// @param includeRetries if 'true' then include info on the failed retries to pull the input data + /// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA + /// INFILE + /// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input + /// data + /// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0). virtual std::vector transactionContribs( TransactionId transactionId = 0, TransactionContribInfo::Status status = TransactionContribInfo::Status::IN_PROGRESS, std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) = 0; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) = 0; /// @return contributions into super-transactions for the given selectors /// @param database the name of a database /// @param table (optional) the base name of a table (all tables if not provided) /// @param workerName (optional) the name of a worker (all workers if not provided) /// @param typeSelector (optional) type of the contributions - /// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE - /// @param includeRetries if 'true' then include info on the failed retries to pull the input data + /// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA + /// INFILE + /// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input + /// data + /// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0). virtual std::vector transactionContribs( std::string const& database, std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) = 0; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) = 0; /** * Insert the initial record on the contribution. diff --git a/src/replica/services/DatabaseServicesMySQL.cc b/src/replica/services/DatabaseServicesMySQL.cc index dcdcba8a1..7a46a9246 100644 --- a/src/replica/services/DatabaseServicesMySQL.cc +++ b/src/replica/services/DatabaseServicesMySQL.cc @@ -1625,7 +1625,8 @@ string DatabaseServicesMySQL::_typeSelectorPredicate( vector DatabaseServicesMySQL::transactionContribs( TransactionId transactionId, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries, + size_t maxEntries) { string const context = _context(__func__) + "transactionId=" + to_string(transactionId) + " table=" + table + " worker=" + workerName + " " + " typeSelector=" + TransactionContribInfo::typeSelector2str(typeSelector) + " "; @@ -1636,13 +1637,13 @@ vector DatabaseServicesMySQL::transactionContribs( table.empty() ? "" : _g.eq("table", table), workerName.empty() ? "" : _g.eq("worker", workerName), _typeSelectorPredicate(typeSelector)); - return _transactionContribs(lock, predicate, includeWarnings, includeRetries); + return _transactionContribs(lock, predicate, includeWarnings, includeRetries, maxEntries); } vector DatabaseServicesMySQL::transactionContribs( TransactionId transactionId, TransactionContribInfo::Status status, string const& table, string const& workerName, TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, - bool includeRetries) { + bool includeRetries, size_t maxEntries) { string const context = _context(__func__) + "transactionId=" + to_string(transactionId) + " status=" + TransactionContribInfo::status2str(status) + " table=" + table + " worker=" + workerName + " " + @@ -1654,12 +1655,13 @@ vector DatabaseServicesMySQL::transactionContribs( table.empty() ? "" : _g.eq("table", table), workerName.empty() ? "" : _g.eq("worker", workerName), _typeSelectorPredicate(typeSelector)); - return _transactionContribs(lock, predicate, includeWarnings, includeRetries); + return _transactionContribs(lock, predicate, includeWarnings, includeRetries, maxEntries); } vector DatabaseServicesMySQL::transactionContribs( string const& database, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries, + size_t maxEntries) { string const context = _context(__func__) + "database=" + database + " table=" + table + " worker=" + workerName + " " + " typeSelector=" + TransactionContribInfo::typeSelector2str(typeSelector) + " "; @@ -1671,7 +1673,7 @@ vector DatabaseServicesMySQL::transactionContribs( string const predicate = _g.packConds( _g.eq("database", database), table.empty() ? "" : _g.eq("table", table), workerName.empty() ? "" : _g.eq("worker", workerName), _typeSelectorPredicate(typeSelector)); - return _transactionContribs(lock, predicate, includeWarnings, includeRetries); + return _transactionContribs(lock, predicate, includeWarnings, includeRetries, maxEntries); } TransactionContribInfo DatabaseServicesMySQL::createdTransactionContrib( @@ -1840,13 +1842,16 @@ TransactionContribInfo DatabaseServicesMySQL::saveLastTransactionContribRetry( vector DatabaseServicesMySQL::_transactionContribs(replica::Lock const& lock, string const& predicate, bool includeWarnings, - bool includeRetries) { - string const context = _context(__func__) + "predicate=" + predicate + " "; + bool includeRetries, + size_t maxEntries) { + string const context = + _context(__func__) + "predicate=" + predicate + " maxEntries=" + to_string(maxEntries) + " "; LOGS(_log, LOG_LVL_DEBUG, context); vector collection; try { _conn->executeInOwnTransaction([&](decltype(_conn) conn) { - collection = _transactionContribsImpl(lock, predicate, includeWarnings, includeRetries); + collection = + _transactionContribsImpl(lock, predicate, includeWarnings, includeRetries, maxEntries); }); } catch (exception const& ex) { LOGS(_log, LOG_LVL_ERROR, context << "failed, exception: " << ex.what()); @@ -1871,12 +1876,14 @@ TransactionContribInfo DatabaseServicesMySQL::_transactionContribImpl(replica::L vector DatabaseServicesMySQL::_transactionContribsImpl(replica::Lock const& lock, string const& predicate, bool includeWarnings, - bool includeRetries) { + bool includeRetries, + size_t maxEntries) { string const context = _context(__func__) + "predicate=" + predicate + " "; LOGS(_log, LOG_LVL_DEBUG, context); vector collection; - string const query = _g.select(Sql::STAR) + _g.from("transaction_contrib") + _g.where(predicate); + string const query = _g.select(Sql::STAR) + _g.from("transaction_contrib") + _g.where(predicate) + + _g.limit(maxEntries); _conn->execute(query); if (_conn->hasResult()) { database::mysql::Row row; diff --git a/src/replica/services/DatabaseServicesMySQL.h b/src/replica/services/DatabaseServicesMySQL.h index aaed825a7..c97538192 100644 --- a/src/replica/services/DatabaseServicesMySQL.h +++ b/src/replica/services/DatabaseServicesMySQL.h @@ -171,7 +171,7 @@ class DatabaseServicesMySQL : public DatabaseServices { std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; std::vector transactionContribs( TransactionId transactionId = 0, @@ -179,14 +179,14 @@ class DatabaseServicesMySQL : public DatabaseServices { std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; std::vector transactionContribs( std::string const& database, std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; TransactionContribInfo createdTransactionContrib( TransactionContribInfo const& info, bool failed = false, @@ -351,7 +351,8 @@ class DatabaseServicesMySQL : public DatabaseServices { std::vector _transactionContribs(replica::Lock const& lock, std::string const& predicate, bool includeWarnings = false, - bool includeRetries = false); + bool includeRetries = false, + size_t maxEntries = 0); TransactionContribInfo _transactionContribImpl(replica::Lock const& lock, std::string const& predicate, bool includeWarnings = false, bool includeRetries = false); @@ -359,7 +360,8 @@ class DatabaseServicesMySQL : public DatabaseServices { std::vector _transactionContribsImpl(replica::Lock const& lock, std::string const& predicate, bool includeWarnings = false, - bool includeRetries = false); + bool includeRetries = false, + size_t maxEntries = 0); DatabaseIngestParam _ingestParamImpl(replica::Lock const& lock, std::string const& predicate); diff --git a/src/replica/services/DatabaseServicesPool.cc b/src/replica/services/DatabaseServicesPool.cc index 66ed28123..26a7e824e 100644 --- a/src/replica/services/DatabaseServicesPool.cc +++ b/src/replica/services/DatabaseServicesPool.cc @@ -295,27 +295,29 @@ TransactionContribInfo DatabaseServicesPool::transactionContrib(unsigned int id, vector DatabaseServicesPool::transactionContribs( TransactionId transactionId, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries, + size_t maxEntries) { ServiceAllocator service(shared_from_base()); return service()->transactionContribs(transactionId, table, workerName, typeSelector, includeWarnings, - includeRetries); + includeRetries, maxEntries); } vector DatabaseServicesPool::transactionContribs( TransactionId transactionId, TransactionContribInfo::Status status, string const& table, string const& workerName, TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, - bool includeRetries) { + bool includeRetries, size_t maxEntries) { ServiceAllocator service(shared_from_base()); return service()->transactionContribs(transactionId, status, table, workerName, typeSelector, - includeWarnings, includeRetries); + includeWarnings, includeRetries, maxEntries); } vector DatabaseServicesPool::transactionContribs( string const& database, string const& table, string const& workerName, - TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) { + TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries, + size_t maxEntries) { ServiceAllocator service(shared_from_base()); return service()->transactionContribs(database, table, workerName, typeSelector, includeWarnings, - includeRetries); + includeRetries, maxEntries); } TransactionContribInfo DatabaseServicesPool::createdTransactionContrib( diff --git a/src/replica/services/DatabaseServicesPool.h b/src/replica/services/DatabaseServicesPool.h index 93087052f..b6138297b 100644 --- a/src/replica/services/DatabaseServicesPool.h +++ b/src/replica/services/DatabaseServicesPool.h @@ -172,7 +172,7 @@ class DatabaseServicesPool : public DatabaseServices { std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; std::vector transactionContribs( TransactionId transactionId = 0, @@ -180,14 +180,14 @@ class DatabaseServicesPool : public DatabaseServices { std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; std::vector transactionContribs( std::string const& database, std::string const& table = std::string(), std::string const& workerName = std::string(), TransactionContribInfo::TypeSelector typeSelector = TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC, - bool includeWarnings = false, bool includeRetries = false) final; + bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final; TransactionContribInfo createdTransactionContrib( TransactionContribInfo const& info, bool failed = false,