Skip to content

Commit

Permalink
Merge branch 'tickets/DM-46430'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Sep 24, 2024
2 parents 3876a9f + fa7c944 commit 8f2739c
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 248 deletions.
7 changes: 4 additions & 3 deletions src/replica/ingest/IngestHttpSvcMod.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ json IngestHttpSvcMod::_asyncCancelRequest() const {
checkApiVersion(__func__, 13);

auto const id = stoul(params().at("id"));
auto const contrib = _ingestRequestMgr->cancel(id);
return json::object({{"contrib", contrib.toJson()}});
_ingestRequestMgr->cancel(id);
return json::object({{"contrib", _ingestRequestMgr->find(id).toJson()}});
}

json IngestHttpSvcMod::_asyncTransRequests() const {
Expand Down Expand Up @@ -158,7 +158,8 @@ json IngestHttpSvcMod::_asyncTransCancelRequests() const {
json contribsJson = json::array();
for (auto& contrib : contribs) {
try {
contribsJson.push_back(_ingestRequestMgr->cancel(contrib.id).toJson());
_ingestRequestMgr->cancel(contrib.id);
contribsJson.push_back(_ingestRequestMgr->find(contrib.id).toJson());
} catch (IngestRequestNotFound const& ex) {
// Ignore the false-positive error condition for the inactive requests that don't
// have in-memory representation. These requests only exist in the persistent state
Expand Down
26 changes: 5 additions & 21 deletions src/replica/ingest/IngestRequestMgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,6 @@ size_t IngestRequestMgr::inProgressQueueSize(string const& databaseName) const {
return itr->second;
}

size_t IngestRequestMgr::outputQueueSize() const {
unique_lock<mutex> lock(_mtx);
return _output.size();
}

IngestRequestMgr::IngestRequestMgr(shared_ptr<ServiceProvider> const& serviceProvider,
string const& workerName)
: _serviceProvider(serviceProvider),
Expand All @@ -241,10 +236,6 @@ TransactionContribInfo IngestRequestMgr::find(unsigned int id) {
if (inProgressItr != _inProgress.cend()) {
return inProgressItr->second->transactionContribInfo();
}
auto const outputItr = _output.find(id);
if (outputItr != _output.cend()) {
return outputItr->second->transactionContribInfo();
}
try {
// This extra test is needed to allow unit testing the class w/o
// making side effects.
Expand Down Expand Up @@ -288,7 +279,7 @@ void IngestRequestMgr::submit(shared_ptr<IngestRequest> const& request) {
}
}

TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
bool IngestRequestMgr::cancel(unsigned int id) {
unique_lock<mutex> lock(_mtx);
// Scan input queues of all active databases.
for (auto&& databaseItr : _input) {
Expand All @@ -304,7 +295,6 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
shared_ptr<IngestRequest> const request = *itr;
request->cancel();
queue.erase(itr);
_output[id] = request;
// Clear the queue and the dictionary if this was the very last element
// in a scope of the database. Otherwise, refresh the concurrency limit
// for the database in case if it was updated by the ingest workflow.
Expand All @@ -318,7 +308,7 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
_cv.notify_all();
}
}
return request->transactionContribInfo();
return true;
}
}
auto const inProgressItr = _inProgress.find(id);
Expand All @@ -329,15 +319,10 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
// may be involved into the blocking operations such as reading/writing
// from disk, network, or interacting with MySQL at this time.
inProgressItr->second->cancel();
return inProgressItr->second->transactionContribInfo();
}
auto const outputItr = _output.find(id);
if (outputItr != _output.cend()) {
// No cancellation needed for contributions that have already been processed.
// A client will receive the actual completion status of the request.
return outputItr->second->transactionContribInfo();
return true;
}
throw IngestRequestNotFound(context_ + string(__func__) + " request " + to_string(id) + " was not found");
// No such request found, or the request was already completed.
return false;
}

shared_ptr<IngestRequest> IngestRequestMgr::next() {
Expand Down Expand Up @@ -381,7 +366,6 @@ void IngestRequestMgr::completed(unsigned int id) {
" was not found");
}
shared_ptr<IngestRequest> const request = inProgressItr->second;
_output[id] = request;
_inProgress.erase(id);
string const databaseName = request->transactionContribInfo().database;
--(_concurrency[databaseName]);
Expand Down
21 changes: 9 additions & 12 deletions src/replica/ingest/IngestRequestMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,27 +147,26 @@ class IngestRequestMgr : public std::enable_shared_from_this<IngestRequestMgr> {
void submit(std::shared_ptr<IngestRequest> const& request);

/**
* Cancel a request by its unique identifier.
* Cancel a request by its unique identifier if the request is still queued or processed.
*
* - The request has to exist and be found in any of three collections.
* - Requests found in the output queue can't be cancelled.
* - The request has to exist and be found in any of those collections.
* - Completed or previously cancelled requests can't be cancelled.
* - Requests in the final stage of the processing while the data are already being
* ingested into the corresponding MySQL table can't be cancelled as well.
* - Upon successful completion of the operation the status of the request
* will be set to TransactionContribInfo::Status::CANCELLED. Requests that
* had been found completed by the time when the cancellation request was
* made the current status of the request will be retained. Cancellation operations
* for requests in the latter states are also considered as successful.
* It's up to a caller of the method to inspect the returned object descriptor to
* see the actual status of the request.
* It's up to a caller of the method to use the request lookup method find() and
* inspect the returned object descriptor to see the actual status of the request.
* - The method may also throw exception should any problem happened while
* the method using other services (the Replication system's database, etc.).
*
* @param id The unique identifier of a request to be cancelled.
* @return The updated descriptor of the request.
* @throw IngestRequestNotFound If the request is unknown to the manager.
* @return 'true' if the request was found and successfully cancelled (or marked for cancellation).
*/
TransactionContribInfo cancel(unsigned int id);
bool cancel(unsigned int id);

/**
* Retrieves the next request from the input queue or block the calling
Expand Down Expand Up @@ -228,9 +227,6 @@ class IngestRequestMgr : public std::enable_shared_from_this<IngestRequestMgr> {
*/
size_t inProgressQueueSize(std::string const& databaseName = std::string()) const;

/// @return The number of the completed/failed/cancelled requests in the output queue.
size_t outputQueueSize() const;

private:
/// @see method IngestRequestMgr::create()
IngestRequestMgr(std::shared_ptr<ServiceProvider> const& serviceProvider, std::string const& workerName);
Expand Down Expand Up @@ -275,8 +271,9 @@ class IngestRequestMgr : public std::enable_shared_from_this<IngestRequestMgr> {
/// elements are added to the back of the queues.
std::map<std::string, std::list<std::shared_ptr<IngestRequest>>> _input;

/// Requests that are being processed by the threads are indexed by their unique
/// identifiers.
std::map<unsigned int, std::shared_ptr<IngestRequest>> _inProgress;
std::map<unsigned int, std::shared_ptr<IngestRequest>> _output;

/// The maximum number of concurrent requests to be processed for a database.
/// A value of 0 means there is no limit.
Expand Down
Loading

0 comments on commit 8f2739c

Please sign in to comment.