diff --git a/src/replica/ingest/IngestHttpSvcMod.cc b/src/replica/ingest/IngestHttpSvcMod.cc index 7a44ed3ea..2782c442e 100644 --- a/src/replica/ingest/IngestHttpSvcMod.cc +++ b/src/replica/ingest/IngestHttpSvcMod.cc @@ -128,8 +128,8 @@ json IngestHttpSvcMod::_asyncCancelRequest() const { checkApiVersion(__func__, 13); auto const id = stoul(params().at("id")); - auto const contrib = _ingestRequestMgr->cancel(id); - return json::object({{"contrib", contrib.toJson()}}); + _ingestRequestMgr->cancel(id); + return json::object({{"contrib", _ingestRequestMgr->find(id).toJson()}}); } json IngestHttpSvcMod::_asyncTransRequests() const { @@ -158,7 +158,8 @@ json IngestHttpSvcMod::_asyncTransCancelRequests() const { json contribsJson = json::array(); for (auto& contrib : contribs) { try { - contribsJson.push_back(_ingestRequestMgr->cancel(contrib.id).toJson()); + _ingestRequestMgr->cancel(contrib.id); + contribsJson.push_back(_ingestRequestMgr->find(contrib.id).toJson()); } catch (IngestRequestNotFound const& ex) { // Ignore the false-positive error condition for the inactive requests that don't // have in-memory representation. These requests only exist in the persistent state diff --git a/src/replica/ingest/IngestRequestMgr.cc b/src/replica/ingest/IngestRequestMgr.cc index 8fa75ac71..798e2883b 100644 --- a/src/replica/ingest/IngestRequestMgr.cc +++ b/src/replica/ingest/IngestRequestMgr.cc @@ -215,11 +215,6 @@ size_t IngestRequestMgr::inProgressQueueSize(string const& databaseName) const { return itr->second; } -size_t IngestRequestMgr::outputQueueSize() const { - unique_lock lock(_mtx); - return _output.size(); -} - IngestRequestMgr::IngestRequestMgr(shared_ptr const& serviceProvider, string const& workerName) : _serviceProvider(serviceProvider), @@ -241,10 +236,6 @@ TransactionContribInfo IngestRequestMgr::find(unsigned int id) { if (inProgressItr != _inProgress.cend()) { return inProgressItr->second->transactionContribInfo(); } - auto const outputItr = _output.find(id); - if (outputItr != _output.cend()) { - return outputItr->second->transactionContribInfo(); - } try { // This extra test is needed to allow unit testing the class w/o // making side effects. @@ -288,7 +279,7 @@ void IngestRequestMgr::submit(shared_ptr const& request) { } } -TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) { +bool IngestRequestMgr::cancel(unsigned int id) { unique_lock lock(_mtx); // Scan input queues of all active databases. for (auto&& databaseItr : _input) { @@ -304,7 +295,6 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) { shared_ptr const request = *itr; request->cancel(); queue.erase(itr); - _output[id] = request; // Clear the queue and the dictionary if this was the very last element // in a scope of the database. Otherwise, refresh the concurrency limit // for the database in case if it was updated by the ingest workflow. @@ -318,7 +308,7 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) { _cv.notify_all(); } } - return request->transactionContribInfo(); + return true; } } auto const inProgressItr = _inProgress.find(id); @@ -329,15 +319,10 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) { // may be involved into the blocking operations such as reading/writing // from disk, network, or interacting with MySQL at this time. inProgressItr->second->cancel(); - return inProgressItr->second->transactionContribInfo(); - } - auto const outputItr = _output.find(id); - if (outputItr != _output.cend()) { - // No cancellation needed for contributions that have already been processed. - // A client will receive the actual completion status of the request. - return outputItr->second->transactionContribInfo(); + return true; } - throw IngestRequestNotFound(context_ + string(__func__) + " request " + to_string(id) + " was not found"); + // No such request found, or the request was already completed. + return false; } shared_ptr IngestRequestMgr::next() { @@ -381,7 +366,6 @@ void IngestRequestMgr::completed(unsigned int id) { " was not found"); } shared_ptr const request = inProgressItr->second; - _output[id] = request; _inProgress.erase(id); string const databaseName = request->transactionContribInfo().database; --(_concurrency[databaseName]); diff --git a/src/replica/ingest/IngestRequestMgr.h b/src/replica/ingest/IngestRequestMgr.h index 3be28dd0e..7f1b9e291 100644 --- a/src/replica/ingest/IngestRequestMgr.h +++ b/src/replica/ingest/IngestRequestMgr.h @@ -147,10 +147,10 @@ class IngestRequestMgr : public std::enable_shared_from_this { void submit(std::shared_ptr const& request); /** - * Cancel a request by its unique identifier. + * Cancel a request by its unique identifier if the request is still queued or processed. * - * - The request has to exist and be found in any of three collections. - * - Requests found in the output queue can't be cancelled. + * - The request has to exist and be found in any of those collections. + * - Completed or previously cancelled requests can't be cancelled. * - Requests in the final stage of the processing while the data are already being * ingested into the corresponding MySQL table can't be cancelled as well. * - Upon successful completion of the operation the status of the request @@ -158,16 +158,15 @@ class IngestRequestMgr : public std::enable_shared_from_this { * had been found completed by the time when the cancellation request was * made the current status of the request will be retained. Cancellation operations * for requests in the latter states are also considered as successful. - * It's up to a caller of the method to inspect the returned object descriptor to - * see the actual status of the request. + * It's up to a caller of the method to use the request lookup method find() and + * inspect the returned object descriptor to see the actual status of the request. * - The method may also throw exception should any problem happened while * the method using other services (the Replication system's database, etc.). * * @param id The unique identifier of a request to be cancelled. - * @return The updated descriptor of the request. - * @throw IngestRequestNotFound If the request is unknown to the manager. + * @return 'true' if the request was found and successfully cancelled (or marked for cancellation). */ - TransactionContribInfo cancel(unsigned int id); + bool cancel(unsigned int id); /** * Retrieves the next request from the input queue or block the calling @@ -228,9 +227,6 @@ class IngestRequestMgr : public std::enable_shared_from_this { */ size_t inProgressQueueSize(std::string const& databaseName = std::string()) const; - /// @return The number of the completed/failed/cancelled requests in the output queue. - size_t outputQueueSize() const; - private: /// @see method IngestRequestMgr::create() IngestRequestMgr(std::shared_ptr const& serviceProvider, std::string const& workerName); @@ -275,8 +271,9 @@ class IngestRequestMgr : public std::enable_shared_from_this { /// elements are added to the back of the queues. std::map>> _input; + /// Requests that are being processed by the threads are indexed by their unique + /// identifiers. std::map> _inProgress; - std::map> _output; /// The maximum number of concurrent requests to be processed for a database. /// A value of 0 means there is no limit. diff --git a/src/replica/tests/testIngestRequestMgr.cc b/src/replica/tests/testIngestRequestMgr.cc index 4834245a5..73c453a33 100644 --- a/src/replica/tests/testIngestRequestMgr.cc +++ b/src/replica/tests/testIngestRequestMgr.cc @@ -138,120 +138,102 @@ BOOST_AUTO_TEST_CASE(IngestRequestMgrSimpleTest) { }); // Instantiate the manager. - shared_ptr requestScheduler; - BOOST_REQUIRE_NO_THROW({ requestScheduler = IngestRequestMgr::test(); }); - BOOST_CHECK(requestScheduler != nullptr); + shared_ptr requestManager; + BOOST_REQUIRE_NO_THROW({ requestManager = IngestRequestMgr::test(); }); + BOOST_CHECK(requestManager != nullptr); // Test if the queues are empty - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 0U); auto const inContrib1 = ::makeContrib(1, "db1"); // Test if non-existing requests (provided by their unique identifiers) // are rejected by the manager. - BOOST_REQUIRE_THROW({ requestScheduler->find(inContrib1.id); }, IngestRequestNotFound); - BOOST_REQUIRE_THROW({ requestScheduler->cancel(inContrib1.id); }, IngestRequestNotFound); - BOOST_REQUIRE_THROW({ requestScheduler->completed(inContrib1.id); }, IngestRequestNotFound); + BOOST_REQUIRE_THROW({ requestManager->find(inContrib1.id); }, IngestRequestNotFound); + BOOST_REQUIRE_NO_THROW({ BOOST_CHECK(!requestManager->cancel(inContrib1.id)); }); + BOOST_REQUIRE_THROW({ requestManager->completed(inContrib1.id); }, IngestRequestNotFound); // Null objects can't be submitted. // The queues shall not be affected by this. - BOOST_REQUIRE_THROW({ requestScheduler->submit(nullptr); }, std::invalid_argument); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_REQUIRE_THROW({ requestManager->submit(nullptr); }, std::invalid_argument); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 0U); // Test submitting the first (and the only) request. shared_ptr inRequest1; BOOST_REQUIRE_NO_THROW({ inRequest1 = IngestRequest::test(inContrib1); }); BOOST_CHECK(inRequest1 != nullptr); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(inRequest1); }); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(inContrib1.database), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(inRequest1); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(inContrib1.database), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 0U); // The request shall be known to the manager. And request finder shall // not affect the queues. TransactionContribInfo outContrib1; - BOOST_REQUIRE_NO_THROW({ outContrib1 = requestScheduler->find(inContrib1.id); }); + BOOST_REQUIRE_NO_THROW({ outContrib1 = requestManager->find(inContrib1.id); }); BOOST_CHECK_EQUAL(outContrib1.id, inContrib1.id); BOOST_CHECK_EQUAL(outContrib1.createTime, inContrib1.createTime); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(inContrib1.database), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(inContrib1.database), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 0U); // Cancel the request while it's in the input queue. // The cancelled request shall move into the output queue. - BOOST_REQUIRE_NO_THROW({ outContrib1 = requestScheduler->cancel(inContrib1.id); }); - BOOST_CHECK_EQUAL(outContrib1.id, inContrib1.id); - BOOST_CHECK_EQUAL(outContrib1.createTime, inContrib1.createTime); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 1U); + BOOST_REQUIRE_NO_THROW({ BOOST_CHECK(requestManager->cancel(inContrib1.id)); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 0U); // Register the second request. auto const inContrib2 = ::makeContrib(2, "db1"); - BOOST_REQUIRE_THROW({ requestScheduler->find(inContrib2.id); }, IngestRequestNotFound); - BOOST_REQUIRE_THROW({ requestScheduler->cancel(inContrib2.id); }, IngestRequestNotFound); - BOOST_REQUIRE_THROW({ requestScheduler->completed(inContrib2.id); }, IngestRequestNotFound); + BOOST_REQUIRE_THROW({ requestManager->find(inContrib2.id); }, IngestRequestNotFound); + BOOST_REQUIRE_NO_THROW({ BOOST_CHECK(!requestManager->cancel(inContrib2.id)); }); + BOOST_REQUIRE_THROW({ requestManager->completed(inContrib2.id); }, IngestRequestNotFound); shared_ptr inRequest2; BOOST_REQUIRE_NO_THROW({ inRequest2 = IngestRequest::test(inContrib2); }); BOOST_CHECK(inRequest2 != nullptr); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(inRequest2); }); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(inContrib2.database), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 1U); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(inRequest2); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(inContrib2.database), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 0U); // Pull the request for processing. // The request shall move from the input queue into the in-progress one. shared_ptr outRequest2; timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest2 = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest2 = requestManager->next(); }); timer->cancel(); BOOST_CHECK(outRequest2 != nullptr); BOOST_CHECK_EQUAL(outRequest2->transactionContribInfo().id, inContrib2.id); BOOST_CHECK_EQUAL(outRequest2->transactionContribInfo().createTime, inContrib2.createTime); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(inContrib2.database), 1U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(inContrib2.database), 1U); // Make sure any further attepts to pull requests from the empty input queue // will time out. - BOOST_REQUIRE_THROW({ outRequest2 = requestScheduler->next(expirationIvalMs); }, - IngestRequestTimerExpired); + BOOST_REQUIRE_THROW({ outRequest2 = requestManager->next(expirationIvalMs); }, IngestRequestTimerExpired); // Cancel the request while it's in the in-progress queue. // The cancelled request will remain in the queue because of // the advisory cancellation. TransactionContribInfo outContrib2; - BOOST_REQUIRE_NO_THROW({ outContrib2 = requestScheduler->cancel(inContrib2.id); }); - BOOST_CHECK_EQUAL(outContrib2.id, inContrib2.id); - BOOST_CHECK_EQUAL(outContrib2.createTime, inContrib2.createTime); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(inContrib2.database), 1U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 1U); + BOOST_REQUIRE_NO_THROW({ BOOST_CHECK(requestManager->cancel(inContrib2.id)); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(inContrib2.database), 1U); // Notify the manager that the request processing has finished. // The request shall move from the in-progress queue into the output one. - BOOST_REQUIRE_NO_THROW({ requestScheduler->completed(inContrib2.id); }); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 2U); - - // Cancel the request while it's in the output queue. - // The cancelled request will remain in the queue. - BOOST_REQUIRE_NO_THROW({ outContrib2 = requestScheduler->cancel(inContrib2.id); }); - BOOST_CHECK_EQUAL(outContrib2.id, inContrib2.id); - BOOST_CHECK_EQUAL(outContrib2.createTime, inContrib2.createTime); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 2U); + BOOST_REQUIRE_NO_THROW({ requestManager->completed(inContrib2.id); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 0U); + + // Cancel the request that has already finished. + BOOST_REQUIRE_NO_THROW({ BOOST_CHECK(!requestManager->cancel(inContrib2.id)); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 0U); // Cleanup the BOOST ASIO service to prevent the nasty SIGABRT of the process. io_service.stop(); @@ -289,7 +271,7 @@ BOOST_AUTO_TEST_CASE(IngestRequestMgrComplexTest) { }); shared_ptr const resourceMgr = IngestResourceMgrT::create(); - shared_ptr const requestScheduler = IngestRequestMgr::test(resourceMgr); + shared_ptr const requestManager = IngestRequestMgr::test(resourceMgr); // The concurrency limit of 2 set below for the database should be set // before registering requests. Otherwise, the limit won't be recognised. @@ -301,149 +283,136 @@ BOOST_AUTO_TEST_CASE(IngestRequestMgrComplexTest) { auto const inContrib_1_of_db1 = ::makeContrib(1001, db1); auto const inContrib_2_of_db1 = ::makeContrib(1002, db1); auto const inContrib_3_of_db1 = ::makeContrib(1003, db1); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_1_of_db1)); }); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_2_of_db1)); }); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_3_of_db1)); }); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 3U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 3U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 0U); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_1_of_db1)); }); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_2_of_db1)); }); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_3_of_db1)); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 3U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 3U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 0U); // Schedule two requests. The scheduler should not block the current thread. shared_ptr outRequest; timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest = requestManager->next(); }); timer->cancel(); BOOST_CHECK_EQUAL(outRequest->transactionContribInfo().id, inContrib_1_of_db1.id); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 1U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 2U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 1U); timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest = requestManager->next(); }); timer->cancel(); BOOST_CHECK_EQUAL(outRequest->transactionContribInfo().id, inContrib_2_of_db1.id); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 2U); // This attempt should block since we're about to exceed the concurrency limit // specified for the database. - BOOST_REQUIRE_THROW({ outRequest = requestScheduler->next(expirationIvalMs); }, - IngestRequestTimerExpired); + BOOST_REQUIRE_THROW({ outRequest = requestManager->next(expirationIvalMs); }, IngestRequestTimerExpired); // Submit 3 more requests in a scope of a different database. char const* db2 = "db2"; auto const inContrib_1_of_db2 = ::makeContrib(2001, db2); auto const inContrib_2_of_db2 = ::makeContrib(2002, db2); auto const inContrib_3_of_db2 = ::makeContrib(2003, db2); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_1_of_db2)); }); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_2_of_db2)); }); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_3_of_db2)); }); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 4U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 0U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_1_of_db2)); }); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_2_of_db2)); }); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_3_of_db2)); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 4U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db2), 3U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 0U); // Schedule three requests. The scheduler should not block the current thread. // All scheduled requests should belong to the second database that has no // resource limit. timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest = requestManager->next(); }); timer->cancel(); BOOST_CHECK_EQUAL(outRequest->transactionContribInfo().id, inContrib_1_of_db2.id); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 3U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db2), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 3U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 1U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 3U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db2), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 3U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 1U); timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest = requestManager->next(); }); timer->cancel(); BOOST_CHECK_EQUAL(outRequest->transactionContribInfo().id, inContrib_2_of_db2.id); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db2), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 4U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 2U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 2U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db2), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 4U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 2U); timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest = requestManager->next(); }); timer->cancel(); BOOST_CHECK_EQUAL(outRequest->transactionContribInfo().id, inContrib_3_of_db2.id); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db2), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 5U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db2), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 5U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); // This attempt should block since we've run out of the resource unconstrained // requests of the second database, and we would still exceed the concurrency limit // specified for the first database. - BOOST_REQUIRE_THROW({ outRequest = requestScheduler->next(expirationIvalMs); }, - IngestRequestTimerExpired); + BOOST_REQUIRE_THROW({ outRequest = requestManager->next(expirationIvalMs); }, IngestRequestTimerExpired); // Now declare one in-progress request of tht first database // as completed. This should unblock us from scheduling the remaing // request of the database. - BOOST_REQUIRE_NO_THROW({ requestScheduler->completed(inContrib_1_of_db1.id); }); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db2), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 4U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 1U); + BOOST_REQUIRE_NO_THROW({ requestManager->completed(inContrib_1_of_db1.id); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db2), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 4U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest = requestManager->next(); }); timer->cancel(); BOOST_CHECK_EQUAL(outRequest->transactionContribInfo().id, inContrib_3_of_db1.id); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 5U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 5U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); // Submit more requests in a scope of the first database. auto const inContrib_4_of_db1 = ::makeContrib(1004, db1); auto const inContrib_5_of_db1 = ::makeContrib(1005, db1); auto const inContrib_6_of_db1 = ::makeContrib(1006, db1); auto const inContrib_7_of_db1 = ::makeContrib(1007, db1); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_4_of_db1)); }); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_5_of_db1)); }); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_6_of_db1)); }); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_7_of_db1)); }); - - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 4U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 4U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 5U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 1U); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_4_of_db1)); }); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_5_of_db1)); }); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_6_of_db1)); }); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_7_of_db1)); }); + + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 4U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 4U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 5U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); // Furher increase concurrency limit of the first database. This should *NOT* // unblock the scheduler because the change gets into affect when @@ -452,67 +421,59 @@ BOOST_AUTO_TEST_CASE(IngestRequestMgrComplexTest) { // the input queue, finishing request processing. resourceMgr->setAsyncProcLimit(db1, 3); - BOOST_REQUIRE_THROW({ outRequest = requestScheduler->next(expirationIvalMs); }, - IngestRequestTimerExpired); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 4U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 4U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 5U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 1U); + BOOST_REQUIRE_THROW({ outRequest = requestManager->next(expirationIvalMs); }, IngestRequestTimerExpired); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 4U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 4U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 5U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); // Test the above-mentioned condition by cancelling one of the input // requests of the first database. - BOOST_REQUIRE_NO_THROW({ requestScheduler->cancel(inContrib_4_of_db1.id); }); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 3U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 3U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 5U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 2U); + BOOST_REQUIRE_NO_THROW({ requestManager->cancel(inContrib_4_of_db1.id); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 3U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 3U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 5U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest = requestManager->next(); }); timer->cancel(); BOOST_CHECK_EQUAL(outRequest->transactionContribInfo().id, inContrib_5_of_db1.id); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 6U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 3U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 2U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 2U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 6U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 3U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); // Now we should be locked up since the number of in-progress requests // for the first database has reached the extended by 1 limit. - BOOST_REQUIRE_THROW({ outRequest = requestScheduler->next(expirationIvalMs); }, - IngestRequestTimerExpired); + BOOST_REQUIRE_THROW({ outRequest = requestManager->next(expirationIvalMs); }, IngestRequestTimerExpired); // Mark one of those requests as completed and make another try. // The scheduler should let us move by one request only and then get // locked again. - BOOST_REQUIRE_NO_THROW({ requestScheduler->completed(inContrib_2_of_db1.id); }); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 5U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 3U); + BOOST_REQUIRE_NO_THROW({ requestManager->completed(inContrib_2_of_db1.id); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 2U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 5U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest = requestManager->next(); }); timer->cancel(); BOOST_CHECK_EQUAL(outRequest->transactionContribInfo().id, inContrib_6_of_db1.id); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 6U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 3U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 3U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 6U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 3U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); - BOOST_REQUIRE_THROW({ outRequest = requestScheduler->next(expirationIvalMs); }, - IngestRequestTimerExpired); + BOOST_REQUIRE_THROW({ outRequest = requestManager->next(expirationIvalMs); }, IngestRequestTimerExpired); // Extend the limit byy 2 and add one more request for the for database. // This will allow the scheduler to schedule 2 requests because the limits @@ -521,40 +482,36 @@ BOOST_AUTO_TEST_CASE(IngestRequestMgrComplexTest) { resourceMgr->setAsyncProcLimit(db1, 5); auto const inContrib_8_of_db1 = ::makeContrib(1008, db1); - BOOST_REQUIRE_NO_THROW({ requestScheduler->submit(IngestRequest::test(inContrib_8_of_db1)); }); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 2U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 6U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 3U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 3U); + BOOST_REQUIRE_NO_THROW({ requestManager->submit(IngestRequest::test(inContrib_8_of_db1)); }); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 2U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 2U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 6U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 3U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest = requestManager->next(); }); timer->cancel(); BOOST_CHECK_EQUAL(outRequest->transactionContribInfo().id, inContrib_7_of_db1.id); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(db1), 1U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 7U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 4U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 3U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 1U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(db1), 1U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 7U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 4U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); timer->start(); - BOOST_REQUIRE_NO_THROW({ outRequest = requestScheduler->next(); }); + BOOST_REQUIRE_NO_THROW({ outRequest = requestManager->next(); }); timer->cancel(); BOOST_CHECK_EQUAL(outRequest->transactionContribInfo().id, inContrib_8_of_db1.id); - BOOST_CHECK_EQUAL(requestScheduler->inputQueueSize(), 0U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(), 8U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db1), 5U); - BOOST_CHECK_EQUAL(requestScheduler->inProgressQueueSize(db2), 3U); - BOOST_CHECK_EQUAL(requestScheduler->outputQueueSize(), 3U); + BOOST_CHECK_EQUAL(requestManager->inputQueueSize(), 0U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(), 8U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db1), 5U); + BOOST_CHECK_EQUAL(requestManager->inProgressQueueSize(db2), 3U); // No more input requests are available at this point. - BOOST_REQUIRE_THROW({ outRequest = requestScheduler->next(expirationIvalMs); }, - IngestRequestTimerExpired); + BOOST_REQUIRE_THROW({ outRequest = requestManager->next(expirationIvalMs); }, IngestRequestTimerExpired); // Cleanup the BOOST ASIO service to prevent the nasty SIGABRT of the process. io_service.stop();