Skip to content

Commit

Permalink
TEMPORARY: instrumented ingest manager to log more debug info
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Sep 5, 2024
1 parent 098f5ee commit 7d18149
Showing 1 changed file with 37 additions and 0 deletions.
37 changes: 37 additions & 0 deletions src/replica/ingest/IngestRequestMgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,28 +227,33 @@ IngestRequestMgr::IngestRequestMgr(shared_ptr<ServiceProvider> const& servicePro
_resourceMgr(IngestResourceMgrP::create(serviceProvider)) {}

TransactionContribInfo IngestRequestMgr::find(unsigned int id) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1");
unique_lock<mutex> lock(_mtx);
for (auto&& databaseItr : _input) {
list<shared_ptr<IngestRequest>> const& queue = databaseItr.second;
auto const inputItr = find_if(queue.cbegin(), queue.cend(), [id](auto const& request) {
return request->transactionContribInfo().id == id;
});
if (inputItr != queue.cend()) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 2");
return (*inputItr)->transactionContribInfo();
}
}
auto const inProgressItr = _inProgress.find(id);
if (inProgressItr != _inProgress.cend()) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 3");
return inProgressItr->second->transactionContribInfo();
}
auto const outputItr = _output.find(id);
if (outputItr != _output.cend()) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 4");
return outputItr->second->transactionContribInfo();
}
try {
// This extra test is needed to allow unit testing the class w/o
// making side effects.
if (_serviceProvider != nullptr) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 5");
return _serviceProvider->databaseServices()->transactionContrib(id);
}
} catch (DatabaseServicesNotFound const& ex) {
Expand All @@ -261,6 +266,7 @@ IngestRequestMgr::IngestRequestMgr(shared_ptr<IngestResourceMgr> const& resource
: _resourceMgr(resourceMgr == nullptr ? IngestResourceMgrT::create() : resourceMgr) {}

void IngestRequestMgr::submit(shared_ptr<IngestRequest> const& request) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1");
if (request == nullptr) {
throw invalid_argument(context_ + string(__func__) + " null pointer passed into the method");
}
Expand All @@ -280,17 +286,20 @@ void IngestRequestMgr::submit(shared_ptr<IngestRequest> const& request) {
// Concurrency has increased. Unblock all processing threads.
lock.unlock();
_cv.notify_all();
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 2");
} else {
// Concurrency has not changed, or got lower. Unblock one processing thread for
// the new request only.
lock.unlock();
//_cv.notify_one();
// TEMPORARY FIX: notify all threads to avoid a deadlock.
_cv.notify_all();
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 3");
}
}

TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1");
unique_lock<mutex> lock(_mtx);
// Scan input queues of all active databases.
for (auto&& databaseItr : _input) {
Expand Down Expand Up @@ -320,6 +329,7 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
_cv.notify_all();
}
}
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 2");
return request->transactionContribInfo();
}
}
Expand All @@ -331,31 +341,41 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
// may be involved into the blocking operations such as reading/writing
// from disk, network, or interacting with MySQL at this time.
inProgressItr->second->cancel();
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 3");
return inProgressItr->second->transactionContribInfo();
}
auto const outputItr = _output.find(id);
if (outputItr != _output.cend()) {
// No cancellation needed for contributions that have already been processed.
// A client will receive the actual completion status of the request.
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 4");
return outputItr->second->transactionContribInfo();
}
throw IngestRequestNotFound(context_ + string(__func__) + " request " + to_string(id) + " was not found");
}

shared_ptr<IngestRequest> IngestRequestMgr::next() {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1");
unique_lock<mutex> lock(_mtx);
shared_ptr<IngestRequest> request = _next(lock);
if (request == nullptr) {
_cv.wait(lock, [&]() {
// The mutex is guaranteed to be re-locked here.
request = _next(lock);
LOGS(_log, LOG_LVL_INFO,
context_ << __func__ << "(_asyncSubmitRequest) 2 request: "
<< (request == nullptr ? "null" : "not null"));
return request != nullptr;
});
}
LOGS(_log, LOG_LVL_INFO,
context_ << __func__
<< "(_asyncSubmitRequest) 3 request: " << (request == nullptr ? "null" : "not null"));
return request;
}

shared_ptr<IngestRequest> IngestRequestMgr::next(chrono::milliseconds const& ivalMsec) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1");
string const context = context_ + string(__func__) + " ";
if (ivalMsec.count() == 0) throw invalid_argument(context + "the interval can not be 0.");
unique_lock<mutex> lock(_mtx);
Expand All @@ -364,17 +384,24 @@ shared_ptr<IngestRequest> IngestRequestMgr::next(chrono::milliseconds const& iva
bool const ivalExpired = !_cv.wait_for(lock, ivalMsec, [&]() {
// The mutex is guaranteed to be re-locked here.
request = _next(lock);
LOGS(_log, LOG_LVL_INFO,
context_ << __func__ << "(_asyncSubmitRequest) 2 request: "
<< (request == nullptr ? "null" : "not null"));
return request != nullptr;
});
if (ivalExpired) {
throw IngestRequestTimerExpired(context + "no request was found in the queue after waiting for " +
to_string(ivalMsec.count()) + "ms");
}
}
LOGS(_log, LOG_LVL_INFO,
context_ << __func__
<< "(_asyncSubmitRequest) 3 request: " << (request == nullptr ? "null" : "not null"));
return request;
}

void IngestRequestMgr::completed(unsigned int id) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1");
string const context = context_ + string(__func__) + " ";
unique_lock<mutex> lock(_mtx);
auto const inProgressItr = _inProgress.find(id);
Expand All @@ -395,14 +422,17 @@ void IngestRequestMgr::completed(unsigned int id) {
// Concurrency has increased. Unblock all processing threads.
lock.unlock();
_cv.notify_all();
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 2");
return;
}
}
lock.unlock();
_cv.notify_one();
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 3");
}

shared_ptr<IngestRequest> IngestRequestMgr::_next(unique_lock<mutex> const& lock) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1");
shared_ptr<IngestRequest> request;
for (auto&& databaseItr : _input) {
string const& databaseName = databaseItr.first;
Expand Down Expand Up @@ -433,10 +463,14 @@ shared_ptr<IngestRequest> IngestRequestMgr::_next(unique_lock<mutex> const& lock
_maxConcurrency.erase(contrib.database);
}
}
LOGS(_log, LOG_LVL_INFO,
context_ << __func__
<< " 2(_asyncSubmitRequest) request: " << (request == nullptr ? "null" : "not null"));
return request;
}

bool IngestRequestMgr::_updateMaxConcurrency(unique_lock<std::mutex> const& lock, string const& database) {
LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1");
bool concurrencyHasIncreased = false;
// The previous concurrency limit will be initialize with 0, if the database
// wasn't registered in the dictionary.
Expand All @@ -450,6 +484,9 @@ bool IngestRequestMgr::_updateMaxConcurrency(unique_lock<std::mutex> const& lock
((maxConcurrencyRef != 0) && (newMaxConcurrency > maxConcurrencyRef));
maxConcurrencyRef = newMaxConcurrency;
}
LOGS(_log, LOG_LVL_INFO,
context_ << __func__ << "(_asyncSubmitRequest) 2 concurrencyHasIncreased: "
<< (concurrencyHasIncreased ? "1" : "0"));
return concurrencyHasIncreased;
}

Expand Down

0 comments on commit 7d18149

Please sign in to comment.