diff --git a/src/replica/ingest/IngestRequestMgr.cc b/src/replica/ingest/IngestRequestMgr.cc index 87e528ed8..a1208b0c8 100644 --- a/src/replica/ingest/IngestRequestMgr.cc +++ b/src/replica/ingest/IngestRequestMgr.cc @@ -227,6 +227,7 @@ IngestRequestMgr::IngestRequestMgr(shared_ptr const& servicePro _resourceMgr(IngestResourceMgrP::create(serviceProvider)) {} TransactionContribInfo IngestRequestMgr::find(unsigned int id) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1"); unique_lock lock(_mtx); for (auto&& databaseItr : _input) { list> const& queue = databaseItr.second; @@ -234,21 +235,25 @@ TransactionContribInfo IngestRequestMgr::find(unsigned int id) { return request->transactionContribInfo().id == id; }); if (inputItr != queue.cend()) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 2"); return (*inputItr)->transactionContribInfo(); } } auto const inProgressItr = _inProgress.find(id); if (inProgressItr != _inProgress.cend()) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 3"); return inProgressItr->second->transactionContribInfo(); } auto const outputItr = _output.find(id); if (outputItr != _output.cend()) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 4"); return outputItr->second->transactionContribInfo(); } try { // This extra test is needed to allow unit testing the class w/o // making side effects. if (_serviceProvider != nullptr) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 5"); return _serviceProvider->databaseServices()->transactionContrib(id); } } catch (DatabaseServicesNotFound const& ex) { @@ -261,6 +266,7 @@ IngestRequestMgr::IngestRequestMgr(shared_ptr const& resource : _resourceMgr(resourceMgr == nullptr ? IngestResourceMgrT::create() : resourceMgr) {} void IngestRequestMgr::submit(shared_ptr const& request) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1"); if (request == nullptr) { throw invalid_argument(context_ + string(__func__) + " null pointer passed into the method"); } @@ -280,6 +286,7 @@ void IngestRequestMgr::submit(shared_ptr const& request) { // Concurrency has increased. Unblock all processing threads. lock.unlock(); _cv.notify_all(); + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 2"); } else { // Concurrency has not changed, or got lower. Unblock one processing thread for // the new request only. @@ -287,10 +294,12 @@ void IngestRequestMgr::submit(shared_ptr const& request) { //_cv.notify_one(); // TEMPORARY FIX: notify all threads to avoid a deadlock. _cv.notify_all(); + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 3"); } } TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1"); unique_lock lock(_mtx); // Scan input queues of all active databases. for (auto&& databaseItr : _input) { @@ -320,6 +329,7 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) { _cv.notify_all(); } } + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 2"); return request->transactionContribInfo(); } } @@ -331,31 +341,41 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) { // may be involved into the blocking operations such as reading/writing // from disk, network, or interacting with MySQL at this time. inProgressItr->second->cancel(); + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 3"); return inProgressItr->second->transactionContribInfo(); } auto const outputItr = _output.find(id); if (outputItr != _output.cend()) { // No cancellation needed for contributions that have already been processed. // A client will receive the actual completion status of the request. + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 4"); return outputItr->second->transactionContribInfo(); } throw IngestRequestNotFound(context_ + string(__func__) + " request " + to_string(id) + " was not found"); } shared_ptr IngestRequestMgr::next() { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1"); unique_lock lock(_mtx); shared_ptr request = _next(lock); if (request == nullptr) { _cv.wait(lock, [&]() { // The mutex is guaranteed to be re-locked here. request = _next(lock); + LOGS(_log, LOG_LVL_INFO, + context_ << __func__ << "(_asyncSubmitRequest) 2 request: " + << (request == nullptr ? "null" : "not null")); return request != nullptr; }); } + LOGS(_log, LOG_LVL_INFO, + context_ << __func__ + << "(_asyncSubmitRequest) 3 request: " << (request == nullptr ? "null" : "not null")); return request; } shared_ptr IngestRequestMgr::next(chrono::milliseconds const& ivalMsec) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1"); string const context = context_ + string(__func__) + " "; if (ivalMsec.count() == 0) throw invalid_argument(context + "the interval can not be 0."); unique_lock lock(_mtx); @@ -364,6 +384,9 @@ shared_ptr IngestRequestMgr::next(chrono::milliseconds const& iva bool const ivalExpired = !_cv.wait_for(lock, ivalMsec, [&]() { // The mutex is guaranteed to be re-locked here. request = _next(lock); + LOGS(_log, LOG_LVL_INFO, + context_ << __func__ << "(_asyncSubmitRequest) 2 request: " + << (request == nullptr ? "null" : "not null")); return request != nullptr; }); if (ivalExpired) { @@ -371,10 +394,14 @@ shared_ptr IngestRequestMgr::next(chrono::milliseconds const& iva to_string(ivalMsec.count()) + "ms"); } } + LOGS(_log, LOG_LVL_INFO, + context_ << __func__ + << "(_asyncSubmitRequest) 3 request: " << (request == nullptr ? "null" : "not null")); return request; } void IngestRequestMgr::completed(unsigned int id) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1"); string const context = context_ + string(__func__) + " "; unique_lock lock(_mtx); auto const inProgressItr = _inProgress.find(id); @@ -395,14 +422,17 @@ void IngestRequestMgr::completed(unsigned int id) { // Concurrency has increased. Unblock all processing threads. lock.unlock(); _cv.notify_all(); + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 2"); return; } } lock.unlock(); _cv.notify_one(); + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 3"); } shared_ptr IngestRequestMgr::_next(unique_lock const& lock) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1"); shared_ptr request; for (auto&& databaseItr : _input) { string const& databaseName = databaseItr.first; @@ -433,10 +463,14 @@ shared_ptr IngestRequestMgr::_next(unique_lock const& lock _maxConcurrency.erase(contrib.database); } } + LOGS(_log, LOG_LVL_INFO, + context_ << __func__ + << " 2(_asyncSubmitRequest) request: " << (request == nullptr ? "null" : "not null")); return request; } bool IngestRequestMgr::_updateMaxConcurrency(unique_lock const& lock, string const& database) { + LOGS(_log, LOG_LVL_INFO, context_ << __func__ << "(_asyncSubmitRequest) 1"); bool concurrencyHasIncreased = false; // The previous concurrency limit will be initialize with 0, if the database // wasn't registered in the dictionary. @@ -450,6 +484,9 @@ bool IngestRequestMgr::_updateMaxConcurrency(unique_lock const& lock ((maxConcurrencyRef != 0) && (newMaxConcurrency > maxConcurrencyRef)); maxConcurrencyRef = newMaxConcurrency; } + LOGS(_log, LOG_LVL_INFO, + context_ << __func__ << "(_asyncSubmitRequest) 2 concurrencyHasIncreased: " + << (concurrencyHasIncreased ? "1" : "0")); return concurrencyHasIncreased; }