From ad003fef03b2b294871c0f5a9510a11831c8db28 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Thu, 5 Sep 2024 23:07:37 -0700 Subject: [PATCH] Refactored and fixed state management of the contribution request The new code also fixes a lock up that existed in the previous version of the code. --- src/replica/ingest/IngestRequest.cc | 203 ++++++++++++++++------------ src/replica/ingest/IngestRequest.h | 51 ++++++- 2 files changed, 165 insertions(+), 89 deletions(-) diff --git a/src/replica/ingest/IngestRequest.cc b/src/replica/ingest/IngestRequest.cc index 725960b2e..fd0b86577 100644 --- a/src/replica/ingest/IngestRequest.cc +++ b/src/replica/ingest/IngestRequest.cc @@ -243,25 +243,26 @@ IngestRequest::IngestRequest(shared_ptr const& serviceProvider, unsigned int maxNumWarnings, unsigned int maxRetries) : IngestFileSvc(serviceProvider, workerName) { // Initialize the descriptor - _contrib.transactionId = transactionId; - _contrib.table = table; - _contrib.chunk = chunk; - _contrib.isOverlap = isOverlap; - _contrib.worker = workerName; - _contrib.url = url; - _contrib.charsetName = charsetName; - _contrib.async = async; - _contrib.dialectInput = dialectInput; - _contrib.httpMethod = httpMethod; - _contrib.httpData = httpData; - _contrib.httpHeaders = httpHeaders; + TransactionContribInfo contrib; + contrib.transactionId = transactionId; + contrib.table = table; + contrib.chunk = chunk; + contrib.isOverlap = isOverlap; + contrib.worker = workerName; + contrib.url = url; + contrib.charsetName = charsetName; + contrib.async = async; + contrib.dialectInput = dialectInput; + contrib.httpMethod = httpMethod; + contrib.httpData = httpData; + contrib.httpHeaders = httpHeaders; if (maxNumWarnings == 0) { - _contrib.maxNumWarnings = + contrib.maxNumWarnings = serviceProvider->config()->get("worker", "loader-max-warnings"); } else { - _contrib.maxNumWarnings = maxNumWarnings; + contrib.maxNumWarnings = maxNumWarnings; } - _contrib.maxRetries = std::min( + contrib.maxRetries = std::min( maxRetries, serviceProvider->config()->get("worker", "ingest-max-retries")); // Prescreen parameters of the request to ensure the request has a valid @@ -271,37 +272,38 @@ IngestRequest::IngestRequest(shared_ptr const& serviceProvider, bool const failed = true; auto const config = serviceProvider->config(); auto const databaseServices = serviceProvider->databaseServices(); - auto const trans = databaseServices->transaction(_contrib.transactionId); + auto const trans = databaseServices->transaction(contrib.transactionId); - _contrib.database = trans.database; + contrib.database = trans.database; - DatabaseInfo const database = config->databaseInfo(_contrib.database); - if (!database.tableExists(_contrib.table)) { - throw invalid_argument(context + "no such table '" + _contrib.table + "' in database '" + - _contrib.database + "'."); + DatabaseInfo const database = config->databaseInfo(contrib.database); + if (!database.tableExists(contrib.table)) { + throw invalid_argument(context + "no such table '" + contrib.table + "' in database '" + + contrib.database + "'."); } // Any failures detected hereafter will result in registering the contribution // as failed for further analysis by the ingest workflows. try { - IngestRequest::_validateState(trans, database, _contrib); - _resource.reset(new http::Url(_contrib.url)); + IngestRequest::_validateState(trans, database, contrib); + _resource.reset(new http::Url(contrib.url)); switch (_resource->scheme()) { case http::Url::FILE: case http::Url::HTTP: case http::Url::HTTPS: break; default: - throw invalid_argument(context + "unsupported url '" + _contrib.url + "'"); + throw invalid_argument(context + "unsupported url '" + contrib.url + "'"); } _dialect = csv::Dialect(dialectInput); } catch (exception const& ex) { - _contrib.error = context + ex.what(); - _contrib.retryAllowed = false; - _contrib = databaseServices->createdTransactionContrib(_contrib, failed); + auto contrib = transactionContribInfo(); + contrib.error = context + ex.what(); + contrib.retryAllowed = false; + _updateTransactionContribInfo(databaseServices->createdTransactionContrib(contrib, failed)); throw; } - _contrib = databaseServices->createdTransactionContrib(_contrib); + _updateTransactionContribInfo(databaseServices->createdTransactionContrib(contrib)); } IngestRequest::IngestRequest(shared_ptr const& serviceProvider, string const& workerName, @@ -318,10 +320,16 @@ IngestRequest::IngestRequest(TransactionContribInfo const& contrib) TransactionContribInfo IngestRequest::transactionContribInfo() const { string const context = ::context_ + string(__func__) + " "; - replica::Lock lock(_mtx, context); + replica::Lock lock(_contribMtx, context); return _contrib; } +void IngestRequest::_updateTransactionContribInfo(TransactionContribInfo const& contrib) { + string const context = ::context_ + string(__func__) + " "; + replica::Lock lock(_contribMtx, context); + _contrib = contrib; +} + void IngestRequest::process() { // No actual processing for the test requests made for unit testing. if (serviceProvider() == nullptr) return; @@ -355,10 +363,11 @@ void IngestRequest::_processStart() { bool const failed = true; auto const databaseServices = serviceProvider()->databaseServices(); if (_cancelled) { - _contrib.error = "cancelled before beginning processing the request."; - _contrib.retryAllowed = true; - _contrib = databaseServices->startedTransactionContrib(_contrib, failed, - TransactionContribInfo::Status::CANCELLED); + auto contrib = transactionContribInfo(); + contrib.error = "cancelled before beginning processing the request."; + contrib.retryAllowed = true; + _updateTransactionContribInfo(databaseServices->startedTransactionContrib( + contrib, failed, TransactionContribInfo::Status::CANCELLED)); throw IngestRequestInterrupted(context + "request " + to_string(_contrib.id) + _contrib.error); } @@ -380,9 +389,10 @@ void IngestRequest::_processStart() { try { IngestRequest::_validateState(trans, database, _contrib); } catch (exception const& ex) { - _contrib.error = context + ex.what(); - _contrib.retryAllowed = false; - _contrib = databaseServices->startedTransactionContrib(_contrib, failed); + auto contrib = transactionContribInfo(); + contrib.error = context + ex.what(); + contrib.retryAllowed = false; + _updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib, failed)); throw; } @@ -395,24 +405,27 @@ void IngestRequest::_openTmpFileAndStart(replica::Lock const& lock) { bool const failed = true; auto const databaseServices = serviceProvider()->databaseServices(); try { - _contrib.tmpFile = openFile(_contrib.transactionId, _contrib.table, _dialect, _contrib.charsetName, - _contrib.chunk, _contrib.isOverlap); - _contrib = databaseServices->startedTransactionContrib(_contrib); + auto contrib = transactionContribInfo(); + contrib.tmpFile = openFile(contrib.transactionId, contrib.table, _dialect, contrib.charsetName, + contrib.chunk, contrib.isOverlap); + _updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib)); } catch (http::Error const& ex) { + auto contrib = transactionContribInfo(); json const errorExt = ex.errorExt(); if (!errorExt.empty()) { - _contrib.httpError = errorExt["http_error"]; - _contrib.systemError = errorExt["system_error"]; + contrib.httpError = errorExt["http_error"]; + contrib.systemError = errorExt["system_error"]; } - _contrib.error = ex.what(); - _contrib.retryAllowed = true; - _contrib = databaseServices->startedTransactionContrib(_contrib, failed); + contrib.error = ex.what(); + contrib.retryAllowed = true; + _updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib, failed)); throw; } catch (exception const& ex) { - _contrib.systemError = errno; - _contrib.error = ex.what(); - _contrib.retryAllowed = true; - _contrib = databaseServices->startedTransactionContrib(_contrib, failed); + auto contrib = transactionContribInfo(); + contrib.systemError = errno; + contrib.error = ex.what(); + contrib.retryAllowed = true; + _updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib, failed)); throw; } } @@ -429,10 +442,11 @@ void IngestRequest::_processReadData() { while (true) { // Start reading and preprocessing the input file. if (_cancelled) { - _contrib.error = "cancelled before reading the input file."; - _contrib.retryAllowed = true; - _contrib = databaseServices->readTransactionContrib(_contrib, failed, - TransactionContribInfo::Status::CANCELLED); + auto contrib = transactionContribInfo(); + contrib.error = "cancelled before reading the input file."; + contrib.retryAllowed = true; + _updateTransactionContribInfo(databaseServices->readTransactionContrib( + contrib, failed, TransactionContribInfo::Status::CANCELLED)); closeFile(); throw IngestRequestInterrupted(context + "request " + to_string(_contrib.id) + _contrib.error); } @@ -448,23 +462,25 @@ void IngestRequest::_processReadData() { default: throw invalid_argument(context + "unsupported url '" + _contrib.url + "'"); } - _contrib = databaseServices->readTransactionContrib(_contrib); + _updateTransactionContribInfo(databaseServices->readTransactionContrib(_contrib)); return; } catch (http::Error const& ex) { + auto contrib = transactionContribInfo(); json const errorExt = ex.errorExt(); if (!errorExt.empty()) { - _contrib.httpError = errorExt["http_error"]; - _contrib.systemError = errorExt["system_error"]; + contrib.httpError = errorExt["http_error"]; + contrib.systemError = errorExt["system_error"]; } - _contrib.error = ex.what(); - _contrib.retryAllowed = true; - _contrib = databaseServices->readTransactionContrib(_contrib, failed); + contrib.error = ex.what(); + contrib.retryAllowed = true; + _updateTransactionContribInfo(databaseServices->readTransactionContrib(contrib, failed)); if (!_closeTmpFileAndRetry(lock)) throw; } catch (exception const& ex) { - _contrib.systemError = errno; - _contrib.error = ex.what(); - _contrib.retryAllowed = true; - _contrib = databaseServices->readTransactionContrib(_contrib, failed); + auto contrib = transactionContribInfo(); + contrib.systemError = errno; + contrib.error = ex.what(); + contrib.retryAllowed = true; + _updateTransactionContribInfo(databaseServices->readTransactionContrib(contrib, failed)); if (!_closeTmpFileAndRetry(lock)) throw; } } @@ -480,8 +496,10 @@ bool IngestRequest::_closeTmpFileAndRetry(replica::Lock const& lock) { // into the retry. The corresponding fields of the contribution objects // will get reset to the initial values (which are the same as in the default // constructed retry object). + auto contrib = transactionContribInfo(); TransactionContribInfo::FailedRetry const failedRetry = - _contrib.resetForRetry(_contrib.status, _contrib.async); + contrib.resetForRetry(contrib.status, contrib.async); + _updateTransactionContribInfo(contrib); // This method will open the new temporary file save the updated state of // the contribution to prepare the current context for the next attempt @@ -489,9 +507,11 @@ bool IngestRequest::_closeTmpFileAndRetry(replica::Lock const& lock) { _openTmpFileAndStart(lock); // The retry object has to be saved separately. - _contrib.failedRetries.push_back(failedRetry); - _contrib.numFailedRetries = _contrib.failedRetries.size(); - _contrib = serviceProvider()->databaseServices()->saveLastTransactionContribRetry(_contrib); + contrib = transactionContribInfo(); + contrib.failedRetries.push_back(failedRetry); + contrib.numFailedRetries = contrib.failedRetries.size(); + _updateTransactionContribInfo( + serviceProvider()->databaseServices()->saveLastTransactionContribRetry(contrib)); return true; } @@ -506,23 +526,26 @@ void IngestRequest::_processLoadData() { // Load the preprocessed input file into MySQL and update the persistent // state of the contribution request. if (_cancelled) { - _contrib.error = "cancelled before loading data into MySQL"; - _contrib.retryAllowed = true; - _contrib = databaseServices->loadedTransactionContrib(_contrib, failed, - TransactionContribInfo::Status::CANCELLED); + auto contrib = transactionContribInfo(); + contrib.error = "cancelled before loading data into MySQL"; + contrib.retryAllowed = true; + _updateTransactionContribInfo(databaseServices->loadedTransactionContrib( + contrib, failed, TransactionContribInfo::Status::CANCELLED)); closeFile(); throw IngestRequestInterrupted(context + "request " + to_string(_contrib.id) + _contrib.error); } try { loadDataIntoTable(_contrib.maxNumWarnings); - _contrib.numWarnings = numWarnings(); - _contrib.warnings = warnings(); - _contrib.numRowsLoaded = numRowsLoaded(); - _contrib = databaseServices->loadedTransactionContrib(_contrib); + auto contrib = transactionContribInfo(); + contrib.numWarnings = numWarnings(); + contrib.warnings = warnings(); + contrib.numRowsLoaded = numRowsLoaded(); + _updateTransactionContribInfo(databaseServices->loadedTransactionContrib(contrib)); } catch (exception const& ex) { - _contrib.systemError = errno; - _contrib.error = ex.what(); - _contrib = databaseServices->loadedTransactionContrib(_contrib, failed); + auto contrib = transactionContribInfo(); + contrib.systemError = errno; + contrib.error = ex.what(); + _updateTransactionContribInfo(databaseServices->loadedTransactionContrib(contrib, failed)); closeFile(); throw; } @@ -532,8 +555,8 @@ void IngestRequest::_processLoadData() { void IngestRequest::_readLocalFile(replica::Lock const& lock) { string const context = ::context_ + string(__func__) + " "; - _contrib.numBytes = 0; - _contrib.numRows = 0; + uint64_t numBytes = 0; + uint64_t numRows = 0; unique_ptr const record(new char[defaultRecordSizeBytes]); ifstream infile(_resource->filePath(), ios::binary); @@ -552,22 +575,27 @@ void IngestRequest::_readLocalFile(replica::Lock const& lock) { "', errno: " + to_string(errno)); } size_t const num = infile.gcount(); - _contrib.numBytes += num; + numBytes += num; // Flush the last record if the end of the file. parser->parse(record.get(), num, eof, [&](char const* buf, size_t size) { writeRowIntoFile(buf, size); - _contrib.numRows++; + numRows++; }); } while (!eof); + + auto contrib = transactionContribInfo(); + contrib.numBytes = numBytes; + contrib.numRows = numRows; + _updateTransactionContribInfo(contrib); } void IngestRequest::_readRemoteFile(replica::Lock const& lock) { - _contrib.numBytes = 0; - _contrib.numRows = 0; + uint64_t numBytes = 0; + uint64_t numRows = 0; auto const reportRow = [&](char const* buf, size_t size) { writeRowIntoFile(buf, size); - _contrib.numRows++; + numRows++; }; // The configuration may be updated later if certificate bundles were loaded @@ -603,11 +631,16 @@ void IngestRequest::_readRemoteFile(replica::Lock const& lock) { clientConfig); reader.read([&](char const* record, size_t size) { parser->parse(record, size, !flush, reportRow); - _contrib.numBytes += size; + numBytes += size; }); // Flush the last non-terminated line stored in the parser (if any). string const emptyRecord; parser->parse(emptyRecord.data(), emptyRecord.size(), flush, reportRow); + + auto contrib = transactionContribInfo(); + contrib.numBytes = numBytes; + contrib.numRows = numRows; + _updateTransactionContribInfo(contrib); } http::ClientConfig IngestRequest::_clientConfig(replica::Lock const& lock) const { diff --git a/src/replica/ingest/IngestRequest.h b/src/replica/ingest/IngestRequest.h index b2c42473c..cf752c40b 100644 --- a/src/replica/ingest/IngestRequest.h +++ b/src/replica/ingest/IngestRequest.h @@ -211,6 +211,11 @@ class IngestRequest : public std::enable_shared_from_this, public /// @see method IngestRequest::test() IngestRequest(TransactionContribInfo const& contrib); + /// Update the contribution object. The method is thread-safe. It's required + /// to be called from any method but the constructors when the contribution + /// state needs to be updated. + void _updateTransactionContribInfo(TransactionContribInfo const& contrib); + // Three processing stages of the request void _processStart(); @@ -238,10 +243,6 @@ class IngestRequest : public std::enable_shared_from_this, public /// Mutex guarding internal state. mutable replica::Mutex _mtx; - /// The descriptor is build by the c-tor after validating the input - /// parameters of the request. - TransactionContribInfo _contrib; - // These variables are set by the constructors after completing parameter validation. std::unique_ptr _resource; csv::Dialect _dialect; @@ -255,6 +256,48 @@ class IngestRequest : public std::enable_shared_from_this, public /// Set by calling the public method cancel(). Setting the flag will interrupt /// request processing (if the one is still going on). std::atomic _cancelled{false}; + + /// Mutex guarding transitions of the transaction contribution object _contrib. + mutable replica::Mutex _contribMtx; + + /** + * The descriptor is initialized either from a value passed into the corresponding + * constructor or it's built from scratch by another constructor after validating + * input parameters of a request. + * + * In order to understand how the descriptor gets evolved during the lifecycle of + * a request object one has to keep in mind that the descriptor is used in 5 different + * contexts: + * - It represents the current state of an ingest request (current class), and + * it changes during subsequent request processing after it starts. + * - It's used in communications with the Replication system's database API + * when the persistent state of the ingest request needs to be updated. + * - It's used for coordinating and managing the request processing by the ingest + * requests manager (class IngestRequestMgr). + * - It's used for resuming the request processing after a restart of + * the Replication worker. + * - It's used by the worker's REST API for providing the status of the request + * to the clients. + * + * Altogether these requirements led to the following "copy-on-write" state management + * strategy for the descriptor in the implementation of the current class: + * - The descriptor is guarded by the mutex _contribMtx. + * - Values of non-changing attributes of the descriptor initialized by the c-tor can + * be read by any method of the current class w/o any synchronization. + * - Any changes to other attributes of the descriptior made by methods of the current + * class should be done in the transactional mode by the following sequence + * of operations: + * 1. Obtaining a copy of the descriptor by calling method transactionContribInfo() + * (that is protected by the mutex). + * 2. Modifying the copy of the descriptor. + * 3. Updating the descriptor by calling method _updateTransactionContribInfo() + * (that is protected by the mutex). + * + * This sequence ensures that clients of the request object will always get the consistent + * state of the transaction contribution descriptor, and the descriptor retrieval won't + * be blocked by any stage of the request processing. + */ + TransactionContribInfo _contrib; }; } // namespace lsst::qserv::replica