Skip to content

Commit

Permalink
Relaxed filters when puling transaction contributions (1)
Browse files Browse the repository at this point in the history
The new filter allows to set an optional limiting the total number
of contributions pulled from the database. The feature is used
by the Web Dashboard to prevent overloading the UI and to avoid
triggering heavy database queries.
  • Loading branch information
iagaponenko committed Sep 8, 2024
1 parent 1b02e60 commit 01b55f6
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 34 deletions.
27 changes: 18 additions & 9 deletions src/replica/services/DatabaseServices.h
Original file line number Diff line number Diff line change
Expand Up @@ -940,14 +940,17 @@ class DatabaseServices : public std::enable_shared_from_this<DatabaseServices> {
/// @param table (optional) the base name of a table (all tables if not provided)
/// @param workerName (optional) the name of a worker (all workers if not provided)
/// @param typeSelector (optional) type of the contributions
/// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE
/// @param includeRetries if 'true' then include info on the failed retries to pull the input data
/// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA
/// INFILE
/// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input
/// data
/// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0).
virtual std::vector<TransactionContribInfo> transactionContribs(
TransactionId transactionId = 0, std::string const& table = std::string(),
std::string const& workerName = std::string(),
TransactionContribInfo::TypeSelector typeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC,
bool includeWarnings = false, bool includeRetries = false) = 0;
bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) = 0;

/// @return contributions into a super-transaction for the given selectors
/// @param transactionId (optional) a unique identifier of the transaction (all transactions if not
Expand All @@ -957,29 +960,35 @@ class DatabaseServices : public std::enable_shared_from_this<DatabaseServices> {
/// @param table (optional) the base name of a table (all tables if not provided)
/// @param workerName (optional) the name of a worker (all workers if not provided)
/// @param typeSelector (optional) type of the contributions
/// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE
/// @param includeRetries if 'true' then include info on the failed retries to pull the input data
/// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA
/// INFILE
/// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input
/// data
/// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0).
virtual std::vector<TransactionContribInfo> transactionContribs(
TransactionId transactionId = 0,
TransactionContribInfo::Status status = TransactionContribInfo::Status::IN_PROGRESS,
std::string const& table = std::string(), std::string const& workerName = std::string(),
TransactionContribInfo::TypeSelector typeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC,
bool includeWarnings = false, bool includeRetries = false) = 0;
bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) = 0;

/// @return contributions into super-transactions for the given selectors
/// @param database the name of a database
/// @param table (optional) the base name of a table (all tables if not provided)
/// @param workerName (optional) the name of a worker (all workers if not provided)
/// @param typeSelector (optional) type of the contributions
/// @param includeWarnings if 'true' then include info on the MySQL warnings after LOAD DATA INFILE
/// @param includeRetries if 'true' then include info on the failed retries to pull the input data
/// @param includeWarnings (optional) if 'true' then include info on the MySQL warnings after LOAD DATA
/// INFILE
/// @param includeRetries (optional) if 'true' then include info on the failed retries to pull the input
/// data
/// @param maxEntries (optional) the maximum number of contributions to be reported (no limit if 0).
virtual std::vector<TransactionContribInfo> transactionContribs(
std::string const& database, std::string const& table = std::string(),
std::string const& workerName = std::string(),
TransactionContribInfo::TypeSelector typeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC,
bool includeWarnings = false, bool includeRetries = false) = 0;
bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) = 0;

/**
* Insert the initial record on the contribution.
Expand Down
29 changes: 18 additions & 11 deletions src/replica/services/DatabaseServicesMySQL.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,8 @@ string DatabaseServicesMySQL::_typeSelectorPredicate(

vector<TransactionContribInfo> DatabaseServicesMySQL::transactionContribs(
TransactionId transactionId, string const& table, string const& workerName,
TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) {
TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries,
size_t maxEntries) {
string const context = _context(__func__) + "transactionId=" + to_string(transactionId) +
" table=" + table + " worker=" + workerName + " " +
" typeSelector=" + TransactionContribInfo::typeSelector2str(typeSelector) + " ";
Expand All @@ -1636,13 +1637,13 @@ vector<TransactionContribInfo> DatabaseServicesMySQL::transactionContribs(
table.empty() ? "" : _g.eq("table", table),
workerName.empty() ? "" : _g.eq("worker", workerName),
_typeSelectorPredicate(typeSelector));
return _transactionContribs(lock, predicate, includeWarnings, includeRetries);
return _transactionContribs(lock, predicate, includeWarnings, includeRetries, maxEntries);
}

vector<TransactionContribInfo> DatabaseServicesMySQL::transactionContribs(
TransactionId transactionId, TransactionContribInfo::Status status, string const& table,
string const& workerName, TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings,
bool includeRetries) {
bool includeRetries, size_t maxEntries) {
string const context = _context(__func__) + "transactionId=" + to_string(transactionId) +
" status=" + TransactionContribInfo::status2str(status) + " table=" + table +
" worker=" + workerName + " " +
Expand All @@ -1654,12 +1655,13 @@ vector<TransactionContribInfo> DatabaseServicesMySQL::transactionContribs(
table.empty() ? "" : _g.eq("table", table),
workerName.empty() ? "" : _g.eq("worker", workerName),
_typeSelectorPredicate(typeSelector));
return _transactionContribs(lock, predicate, includeWarnings, includeRetries);
return _transactionContribs(lock, predicate, includeWarnings, includeRetries, maxEntries);
}

vector<TransactionContribInfo> DatabaseServicesMySQL::transactionContribs(
string const& database, string const& table, string const& workerName,
TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) {
TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries,
size_t maxEntries) {
string const context = _context(__func__) + "database=" + database + " table=" + table +
" worker=" + workerName + " " +
" typeSelector=" + TransactionContribInfo::typeSelector2str(typeSelector) + " ";
Expand All @@ -1671,7 +1673,7 @@ vector<TransactionContribInfo> DatabaseServicesMySQL::transactionContribs(
string const predicate = _g.packConds(
_g.eq("database", database), table.empty() ? "" : _g.eq("table", table),
workerName.empty() ? "" : _g.eq("worker", workerName), _typeSelectorPredicate(typeSelector));
return _transactionContribs(lock, predicate, includeWarnings, includeRetries);
return _transactionContribs(lock, predicate, includeWarnings, includeRetries, maxEntries);
}

TransactionContribInfo DatabaseServicesMySQL::createdTransactionContrib(
Expand Down Expand Up @@ -1840,13 +1842,16 @@ TransactionContribInfo DatabaseServicesMySQL::saveLastTransactionContribRetry(
vector<TransactionContribInfo> DatabaseServicesMySQL::_transactionContribs(replica::Lock const& lock,
string const& predicate,
bool includeWarnings,
bool includeRetries) {
string const context = _context(__func__) + "predicate=" + predicate + " ";
bool includeRetries,
size_t maxEntries) {
string const context =
_context(__func__) + "predicate=" + predicate + " maxEntries=" + to_string(maxEntries) + " ";
LOGS(_log, LOG_LVL_DEBUG, context);
vector<TransactionContribInfo> collection;
try {
_conn->executeInOwnTransaction([&](decltype(_conn) conn) {
collection = _transactionContribsImpl(lock, predicate, includeWarnings, includeRetries);
collection =
_transactionContribsImpl(lock, predicate, includeWarnings, includeRetries, maxEntries);
});
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_ERROR, context << "failed, exception: " << ex.what());
Expand All @@ -1871,12 +1876,14 @@ TransactionContribInfo DatabaseServicesMySQL::_transactionContribImpl(replica::L
vector<TransactionContribInfo> DatabaseServicesMySQL::_transactionContribsImpl(replica::Lock const& lock,
string const& predicate,
bool includeWarnings,
bool includeRetries) {
bool includeRetries,
size_t maxEntries) {
string const context = _context(__func__) + "predicate=" + predicate + " ";
LOGS(_log, LOG_LVL_DEBUG, context);

vector<TransactionContribInfo> collection;
string const query = _g.select(Sql::STAR) + _g.from("transaction_contrib") + _g.where(predicate);
string const query = _g.select(Sql::STAR) + _g.from("transaction_contrib") + _g.where(predicate) +
_g.limit(maxEntries);
_conn->execute(query);
if (_conn->hasResult()) {
database::mysql::Row row;
Expand Down
12 changes: 7 additions & 5 deletions src/replica/services/DatabaseServicesMySQL.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,22 +171,22 @@ class DatabaseServicesMySQL : public DatabaseServices {
std::string const& workerName = std::string(),
TransactionContribInfo::TypeSelector typeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC,
bool includeWarnings = false, bool includeRetries = false) final;
bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final;

std::vector<TransactionContribInfo> transactionContribs(
TransactionId transactionId = 0,
TransactionContribInfo::Status status = TransactionContribInfo::Status::IN_PROGRESS,
std::string const& table = std::string(), std::string const& workerName = std::string(),
TransactionContribInfo::TypeSelector typeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC,
bool includeWarnings = false, bool includeRetries = false) final;
bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final;

std::vector<TransactionContribInfo> transactionContribs(
std::string const& database, std::string const& table = std::string(),
std::string const& workerName = std::string(),
TransactionContribInfo::TypeSelector typeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC,
bool includeWarnings = false, bool includeRetries = false) final;
bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final;

TransactionContribInfo createdTransactionContrib(
TransactionContribInfo const& info, bool failed = false,
Expand Down Expand Up @@ -351,15 +351,17 @@ class DatabaseServicesMySQL : public DatabaseServices {
std::vector<TransactionContribInfo> _transactionContribs(replica::Lock const& lock,
std::string const& predicate,
bool includeWarnings = false,
bool includeRetries = false);
bool includeRetries = false,
size_t maxEntries = 0);

TransactionContribInfo _transactionContribImpl(replica::Lock const& lock, std::string const& predicate,
bool includeWarnings = false, bool includeRetries = false);

std::vector<TransactionContribInfo> _transactionContribsImpl(replica::Lock const& lock,
std::string const& predicate,
bool includeWarnings = false,
bool includeRetries = false);
bool includeRetries = false,
size_t maxEntries = 0);

DatabaseIngestParam _ingestParamImpl(replica::Lock const& lock, std::string const& predicate);

Expand Down
14 changes: 8 additions & 6 deletions src/replica/services/DatabaseServicesPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,27 +295,29 @@ TransactionContribInfo DatabaseServicesPool::transactionContrib(unsigned int id,

vector<TransactionContribInfo> DatabaseServicesPool::transactionContribs(
TransactionId transactionId, string const& table, string const& workerName,
TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) {
TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries,
size_t maxEntries) {
ServiceAllocator service(shared_from_base<DatabaseServicesPool>());
return service()->transactionContribs(transactionId, table, workerName, typeSelector, includeWarnings,
includeRetries);
includeRetries, maxEntries);
}

vector<TransactionContribInfo> DatabaseServicesPool::transactionContribs(
TransactionId transactionId, TransactionContribInfo::Status status, string const& table,
string const& workerName, TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings,
bool includeRetries) {
bool includeRetries, size_t maxEntries) {
ServiceAllocator service(shared_from_base<DatabaseServicesPool>());
return service()->transactionContribs(transactionId, status, table, workerName, typeSelector,
includeWarnings, includeRetries);
includeWarnings, includeRetries, maxEntries);
}

vector<TransactionContribInfo> DatabaseServicesPool::transactionContribs(
string const& database, string const& table, string const& workerName,
TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries) {
TransactionContribInfo::TypeSelector typeSelector, bool includeWarnings, bool includeRetries,
size_t maxEntries) {
ServiceAllocator service(shared_from_base<DatabaseServicesPool>());
return service()->transactionContribs(database, table, workerName, typeSelector, includeWarnings,
includeRetries);
includeRetries, maxEntries);
}

TransactionContribInfo DatabaseServicesPool::createdTransactionContrib(
Expand Down
6 changes: 3 additions & 3 deletions src/replica/services/DatabaseServicesPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,22 +172,22 @@ class DatabaseServicesPool : public DatabaseServices {
std::string const& workerName = std::string(),
TransactionContribInfo::TypeSelector typeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC,
bool includeWarnings = false, bool includeRetries = false) final;
bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final;

std::vector<TransactionContribInfo> transactionContribs(
TransactionId transactionId = 0,
TransactionContribInfo::Status status = TransactionContribInfo::Status::IN_PROGRESS,
std::string const& table = std::string(), std::string const& workerName = std::string(),
TransactionContribInfo::TypeSelector typeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC,
bool includeWarnings = false, bool includeRetries = false) final;
bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final;

std::vector<TransactionContribInfo> transactionContribs(
std::string const& database, std::string const& table = std::string(),
std::string const& workerName = std::string(),
TransactionContribInfo::TypeSelector typeSelector =
TransactionContribInfo::TypeSelector::SYNC_OR_ASYNC,
bool includeWarnings = false, bool includeRetries = false) final;
bool includeWarnings = false, bool includeRetries = false, size_t maxEntries = 0) final;

TransactionContribInfo createdTransactionContrib(
TransactionContribInfo const& info, bool failed = false,
Expand Down

0 comments on commit 01b55f6

Please sign in to comment.