Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-46430: Garbage collection of the completed table contribution requests #870

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/replica/ingest/IngestHttpSvcMod.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ json IngestHttpSvcMod::_asyncCancelRequest() const {
checkApiVersion(__func__, 13);

auto const id = stoul(params().at("id"));
auto const contrib = _ingestRequestMgr->cancel(id);
return json::object({{"contrib", contrib.toJson()}});
_ingestRequestMgr->cancel(id);
return json::object({{"contrib", _ingestRequestMgr->find(id).toJson()}});
}

json IngestHttpSvcMod::_asyncTransRequests() const {
Expand Down Expand Up @@ -158,7 +158,8 @@ json IngestHttpSvcMod::_asyncTransCancelRequests() const {
json contribsJson = json::array();
for (auto& contrib : contribs) {
try {
contribsJson.push_back(_ingestRequestMgr->cancel(contrib.id).toJson());
_ingestRequestMgr->cancel(contrib.id);
contribsJson.push_back(_ingestRequestMgr->find(contrib.id).toJson());
} catch (IngestRequestNotFound const& ex) {
// Ignore the false-positive error condition for the inactive requests that don't
// have in-memory representation. These requests only exist in the persistent state
Expand Down
26 changes: 5 additions & 21 deletions src/replica/ingest/IngestRequestMgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,6 @@ size_t IngestRequestMgr::inProgressQueueSize(string const& databaseName) const {
return itr->second;
}

size_t IngestRequestMgr::outputQueueSize() const {
unique_lock<mutex> lock(_mtx);
return _output.size();
}

IngestRequestMgr::IngestRequestMgr(shared_ptr<ServiceProvider> const& serviceProvider,
string const& workerName)
: _serviceProvider(serviceProvider),
Expand All @@ -241,10 +236,6 @@ TransactionContribInfo IngestRequestMgr::find(unsigned int id) {
if (inProgressItr != _inProgress.cend()) {
return inProgressItr->second->transactionContribInfo();
}
auto const outputItr = _output.find(id);
if (outputItr != _output.cend()) {
return outputItr->second->transactionContribInfo();
}
try {
// This extra test is needed to allow unit testing the class w/o
// making side effects.
Expand Down Expand Up @@ -288,7 +279,7 @@ void IngestRequestMgr::submit(shared_ptr<IngestRequest> const& request) {
}
}

TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
bool IngestRequestMgr::cancel(unsigned int id) {
unique_lock<mutex> lock(_mtx);
// Scan input queues of all active databases.
for (auto&& databaseItr : _input) {
Expand All @@ -304,7 +295,6 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
shared_ptr<IngestRequest> const request = *itr;
request->cancel();
queue.erase(itr);
_output[id] = request;
// Clear the queue and the dictionary if this was the very last element
// in a scope of the database. Otherwise, refresh the concurrency limit
// for the database in case if it was updated by the ingest workflow.
Expand All @@ -318,7 +308,7 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
_cv.notify_all();
}
}
return request->transactionContribInfo();
return true;
}
}
auto const inProgressItr = _inProgress.find(id);
Expand All @@ -329,15 +319,10 @@ TransactionContribInfo IngestRequestMgr::cancel(unsigned int id) {
// may be involved into the blocking operations such as reading/writing
// from disk, network, or interacting with MySQL at this time.
inProgressItr->second->cancel();
return inProgressItr->second->transactionContribInfo();
}
auto const outputItr = _output.find(id);
if (outputItr != _output.cend()) {
// No cancellation needed for contributions that have already been processed.
// A client will receive the actual completion status of the request.
return outputItr->second->transactionContribInfo();
return true;
}
throw IngestRequestNotFound(context_ + string(__func__) + " request " + to_string(id) + " was not found");
// No such request found, or the request was already completed.
return false;
}

shared_ptr<IngestRequest> IngestRequestMgr::next() {
Expand Down Expand Up @@ -381,7 +366,6 @@ void IngestRequestMgr::completed(unsigned int id) {
" was not found");
}
shared_ptr<IngestRequest> const request = inProgressItr->second;
_output[id] = request;
_inProgress.erase(id);
string const databaseName = request->transactionContribInfo().database;
--(_concurrency[databaseName]);
Expand Down
21 changes: 9 additions & 12 deletions src/replica/ingest/IngestRequestMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,27 +147,26 @@ class IngestRequestMgr : public std::enable_shared_from_this<IngestRequestMgr> {
void submit(std::shared_ptr<IngestRequest> const& request);

/**
* Cancel a request by its unique identifier.
* Cancel a request by its unique identifier if the request is still queued or processed.
*
* - The request has to exist and be found in any of three collections.
* - Requests found in the output queue can't be cancelled.
* - The request has to exist and be found in any of those collections.
* - Completed or previously cancelled requests can't be cancelled.
* - Requests in the final stage of the processing while the data are already being
* ingested into the corresponding MySQL table can't be cancelled as well.
* - Upon successful completion of the operation the status of the request
* will be set to TransactionContribInfo::Status::CANCELLED. Requests that
* had been found completed by the time when the cancellation request was
* made the current status of the request will be retained. Cancellation operations
* for requests in the latter states are also considered as successful.
* It's up to a caller of the method to inspect the returned object descriptor to
* see the actual status of the request.
* It's up to a caller of the method to use the request lookup method find() and
* inspect the returned object descriptor to see the actual status of the request.
* - The method may also throw exception should any problem happened while
* the method using other services (the Replication system's database, etc.).
*
* @param id The unique identifier of a request to be cancelled.
* @return The updated descriptor of the request.
* @throw IngestRequestNotFound If the request is unknown to the manager.
* @return 'true' if the request was found and successfully cancelled (or marked for cancellation).
*/
TransactionContribInfo cancel(unsigned int id);
bool cancel(unsigned int id);

/**
* Retrieves the next request from the input queue or block the calling
Expand Down Expand Up @@ -228,9 +227,6 @@ class IngestRequestMgr : public std::enable_shared_from_this<IngestRequestMgr> {
*/
size_t inProgressQueueSize(std::string const& databaseName = std::string()) const;

/// @return The number of the completed/failed/cancelled requests in the output queue.
size_t outputQueueSize() const;

private:
/// @see method IngestRequestMgr::create()
IngestRequestMgr(std::shared_ptr<ServiceProvider> const& serviceProvider, std::string const& workerName);
Expand Down Expand Up @@ -275,8 +271,9 @@ class IngestRequestMgr : public std::enable_shared_from_this<IngestRequestMgr> {
/// elements are added to the back of the queues.
std::map<std::string, std::list<std::shared_ptr<IngestRequest>>> _input;

/// Requests that are being processed by the threads are indexed by their unique
/// identifiers.
std::map<unsigned int, std::shared_ptr<IngestRequest>> _inProgress;
std::map<unsigned int, std::shared_ptr<IngestRequest>> _output;

/// The maximum number of concurrent requests to be processed for a database.
/// A value of 0 means there is no limit.
Expand Down
Loading
Loading