Skip to content

Commit

Permalink
Refactored and fixed state management of the contribution request
Browse files Browse the repository at this point in the history
The new code also fixes a lock up that existed in the previous version
of the code.
  • Loading branch information
iagaponenko committed Sep 6, 2024
1 parent 451844b commit ad003fe
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 89 deletions.
203 changes: 118 additions & 85 deletions src/replica/ingest/IngestRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,25 +243,26 @@ IngestRequest::IngestRequest(shared_ptr<ServiceProvider> const& serviceProvider,
unsigned int maxNumWarnings, unsigned int maxRetries)
: IngestFileSvc(serviceProvider, workerName) {
// Initialize the descriptor
_contrib.transactionId = transactionId;
_contrib.table = table;
_contrib.chunk = chunk;
_contrib.isOverlap = isOverlap;
_contrib.worker = workerName;
_contrib.url = url;
_contrib.charsetName = charsetName;
_contrib.async = async;
_contrib.dialectInput = dialectInput;
_contrib.httpMethod = httpMethod;
_contrib.httpData = httpData;
_contrib.httpHeaders = httpHeaders;
TransactionContribInfo contrib;
contrib.transactionId = transactionId;
contrib.table = table;
contrib.chunk = chunk;
contrib.isOverlap = isOverlap;
contrib.worker = workerName;
contrib.url = url;
contrib.charsetName = charsetName;
contrib.async = async;
contrib.dialectInput = dialectInput;
contrib.httpMethod = httpMethod;
contrib.httpData = httpData;
contrib.httpHeaders = httpHeaders;
if (maxNumWarnings == 0) {
_contrib.maxNumWarnings =
contrib.maxNumWarnings =
serviceProvider->config()->get<unsigned int>("worker", "loader-max-warnings");
} else {
_contrib.maxNumWarnings = maxNumWarnings;
contrib.maxNumWarnings = maxNumWarnings;
}
_contrib.maxRetries = std::min(
contrib.maxRetries = std::min(
maxRetries, serviceProvider->config()->get<unsigned int>("worker", "ingest-max-retries"));

// Prescreen parameters of the request to ensure the request has a valid
Expand All @@ -271,37 +272,38 @@ IngestRequest::IngestRequest(shared_ptr<ServiceProvider> const& serviceProvider,
bool const failed = true;
auto const config = serviceProvider->config();
auto const databaseServices = serviceProvider->databaseServices();
auto const trans = databaseServices->transaction(_contrib.transactionId);
auto const trans = databaseServices->transaction(contrib.transactionId);

_contrib.database = trans.database;
contrib.database = trans.database;

DatabaseInfo const database = config->databaseInfo(_contrib.database);
if (!database.tableExists(_contrib.table)) {
throw invalid_argument(context + "no such table '" + _contrib.table + "' in database '" +
_contrib.database + "'.");
DatabaseInfo const database = config->databaseInfo(contrib.database);
if (!database.tableExists(contrib.table)) {
throw invalid_argument(context + "no such table '" + contrib.table + "' in database '" +
contrib.database + "'.");
}

// Any failures detected hereafter will result in registering the contribution
// as failed for further analysis by the ingest workflows.
try {
IngestRequest::_validateState(trans, database, _contrib);
_resource.reset(new http::Url(_contrib.url));
IngestRequest::_validateState(trans, database, contrib);
_resource.reset(new http::Url(contrib.url));
switch (_resource->scheme()) {
case http::Url::FILE:
case http::Url::HTTP:
case http::Url::HTTPS:
break;
default:
throw invalid_argument(context + "unsupported url '" + _contrib.url + "'");
throw invalid_argument(context + "unsupported url '" + contrib.url + "'");
}
_dialect = csv::Dialect(dialectInput);
} catch (exception const& ex) {
_contrib.error = context + ex.what();
_contrib.retryAllowed = false;
_contrib = databaseServices->createdTransactionContrib(_contrib, failed);
auto contrib = transactionContribInfo();
contrib.error = context + ex.what();
contrib.retryAllowed = false;
_updateTransactionContribInfo(databaseServices->createdTransactionContrib(contrib, failed));
throw;
}
_contrib = databaseServices->createdTransactionContrib(_contrib);
_updateTransactionContribInfo(databaseServices->createdTransactionContrib(contrib));
}

IngestRequest::IngestRequest(shared_ptr<ServiceProvider> const& serviceProvider, string const& workerName,
Expand All @@ -318,10 +320,16 @@ IngestRequest::IngestRequest(TransactionContribInfo const& contrib)

TransactionContribInfo IngestRequest::transactionContribInfo() const {
string const context = ::context_ + string(__func__) + " ";
replica::Lock lock(_mtx, context);
replica::Lock lock(_contribMtx, context);
return _contrib;
}

void IngestRequest::_updateTransactionContribInfo(TransactionContribInfo const& contrib) {
string const context = ::context_ + string(__func__) + " ";
replica::Lock lock(_contribMtx, context);
_contrib = contrib;
}

void IngestRequest::process() {
// No actual processing for the test requests made for unit testing.
if (serviceProvider() == nullptr) return;
Expand Down Expand Up @@ -355,10 +363,11 @@ void IngestRequest::_processStart() {
bool const failed = true;
auto const databaseServices = serviceProvider()->databaseServices();
if (_cancelled) {
_contrib.error = "cancelled before beginning processing the request.";
_contrib.retryAllowed = true;
_contrib = databaseServices->startedTransactionContrib(_contrib, failed,
TransactionContribInfo::Status::CANCELLED);
auto contrib = transactionContribInfo();
contrib.error = "cancelled before beginning processing the request.";
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->startedTransactionContrib(
contrib, failed, TransactionContribInfo::Status::CANCELLED));
throw IngestRequestInterrupted(context + "request " + to_string(_contrib.id) + _contrib.error);
}

Expand All @@ -380,9 +389,10 @@ void IngestRequest::_processStart() {
try {
IngestRequest::_validateState(trans, database, _contrib);
} catch (exception const& ex) {
_contrib.error = context + ex.what();
_contrib.retryAllowed = false;
_contrib = databaseServices->startedTransactionContrib(_contrib, failed);
auto contrib = transactionContribInfo();
contrib.error = context + ex.what();
contrib.retryAllowed = false;
_updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib, failed));
throw;
}

Expand All @@ -395,24 +405,27 @@ void IngestRequest::_openTmpFileAndStart(replica::Lock const& lock) {
bool const failed = true;
auto const databaseServices = serviceProvider()->databaseServices();
try {
_contrib.tmpFile = openFile(_contrib.transactionId, _contrib.table, _dialect, _contrib.charsetName,
_contrib.chunk, _contrib.isOverlap);
_contrib = databaseServices->startedTransactionContrib(_contrib);
auto contrib = transactionContribInfo();
contrib.tmpFile = openFile(contrib.transactionId, contrib.table, _dialect, contrib.charsetName,
contrib.chunk, contrib.isOverlap);
_updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib));
} catch (http::Error const& ex) {
auto contrib = transactionContribInfo();
json const errorExt = ex.errorExt();
if (!errorExt.empty()) {
_contrib.httpError = errorExt["http_error"];
_contrib.systemError = errorExt["system_error"];
contrib.httpError = errorExt["http_error"];
contrib.systemError = errorExt["system_error"];
}
_contrib.error = ex.what();
_contrib.retryAllowed = true;
_contrib = databaseServices->startedTransactionContrib(_contrib, failed);
contrib.error = ex.what();
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib, failed));
throw;
} catch (exception const& ex) {
_contrib.systemError = errno;
_contrib.error = ex.what();
_contrib.retryAllowed = true;
_contrib = databaseServices->startedTransactionContrib(_contrib, failed);
auto contrib = transactionContribInfo();
contrib.systemError = errno;
contrib.error = ex.what();
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib, failed));
throw;
}
}
Expand All @@ -429,10 +442,11 @@ void IngestRequest::_processReadData() {
while (true) {
// Start reading and preprocessing the input file.
if (_cancelled) {
_contrib.error = "cancelled before reading the input file.";
_contrib.retryAllowed = true;
_contrib = databaseServices->readTransactionContrib(_contrib, failed,
TransactionContribInfo::Status::CANCELLED);
auto contrib = transactionContribInfo();
contrib.error = "cancelled before reading the input file.";
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->readTransactionContrib(
contrib, failed, TransactionContribInfo::Status::CANCELLED));
closeFile();
throw IngestRequestInterrupted(context + "request " + to_string(_contrib.id) + _contrib.error);
}
Expand All @@ -448,23 +462,25 @@ void IngestRequest::_processReadData() {
default:
throw invalid_argument(context + "unsupported url '" + _contrib.url + "'");
}
_contrib = databaseServices->readTransactionContrib(_contrib);
_updateTransactionContribInfo(databaseServices->readTransactionContrib(_contrib));
return;
} catch (http::Error const& ex) {
auto contrib = transactionContribInfo();
json const errorExt = ex.errorExt();
if (!errorExt.empty()) {
_contrib.httpError = errorExt["http_error"];
_contrib.systemError = errorExt["system_error"];
contrib.httpError = errorExt["http_error"];
contrib.systemError = errorExt["system_error"];
}
_contrib.error = ex.what();
_contrib.retryAllowed = true;
_contrib = databaseServices->readTransactionContrib(_contrib, failed);
contrib.error = ex.what();
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->readTransactionContrib(contrib, failed));
if (!_closeTmpFileAndRetry(lock)) throw;
} catch (exception const& ex) {
_contrib.systemError = errno;
_contrib.error = ex.what();
_contrib.retryAllowed = true;
_contrib = databaseServices->readTransactionContrib(_contrib, failed);
auto contrib = transactionContribInfo();
contrib.systemError = errno;
contrib.error = ex.what();
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->readTransactionContrib(contrib, failed));
if (!_closeTmpFileAndRetry(lock)) throw;
}
}
Expand All @@ -480,18 +496,22 @@ bool IngestRequest::_closeTmpFileAndRetry(replica::Lock const& lock) {
// into the retry. The corresponding fields of the contribution objects
// will get reset to the initial values (which are the same as in the default
// constructed retry object).
auto contrib = transactionContribInfo();
TransactionContribInfo::FailedRetry const failedRetry =
_contrib.resetForRetry(_contrib.status, _contrib.async);
contrib.resetForRetry(contrib.status, contrib.async);
_updateTransactionContribInfo(contrib);

// This method will open the new temporary file save the updated state of
// the contribution to prepare the current context for the next attempt
// to read the input data.
_openTmpFileAndStart(lock);

// The retry object has to be saved separately.
_contrib.failedRetries.push_back(failedRetry);
_contrib.numFailedRetries = _contrib.failedRetries.size();
_contrib = serviceProvider()->databaseServices()->saveLastTransactionContribRetry(_contrib);
contrib = transactionContribInfo();
contrib.failedRetries.push_back(failedRetry);
contrib.numFailedRetries = contrib.failedRetries.size();
_updateTransactionContribInfo(
serviceProvider()->databaseServices()->saveLastTransactionContribRetry(contrib));

return true;
}
Expand All @@ -506,23 +526,26 @@ void IngestRequest::_processLoadData() {
// Load the preprocessed input file into MySQL and update the persistent
// state of the contribution request.
if (_cancelled) {
_contrib.error = "cancelled before loading data into MySQL";
_contrib.retryAllowed = true;
_contrib = databaseServices->loadedTransactionContrib(_contrib, failed,
TransactionContribInfo::Status::CANCELLED);
auto contrib = transactionContribInfo();
contrib.error = "cancelled before loading data into MySQL";
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->loadedTransactionContrib(
contrib, failed, TransactionContribInfo::Status::CANCELLED));
closeFile();
throw IngestRequestInterrupted(context + "request " + to_string(_contrib.id) + _contrib.error);
}
try {
loadDataIntoTable(_contrib.maxNumWarnings);
_contrib.numWarnings = numWarnings();
_contrib.warnings = warnings();
_contrib.numRowsLoaded = numRowsLoaded();
_contrib = databaseServices->loadedTransactionContrib(_contrib);
auto contrib = transactionContribInfo();
contrib.numWarnings = numWarnings();
contrib.warnings = warnings();
contrib.numRowsLoaded = numRowsLoaded();
_updateTransactionContribInfo(databaseServices->loadedTransactionContrib(contrib));
} catch (exception const& ex) {
_contrib.systemError = errno;
_contrib.error = ex.what();
_contrib = databaseServices->loadedTransactionContrib(_contrib, failed);
auto contrib = transactionContribInfo();
contrib.systemError = errno;
contrib.error = ex.what();
_updateTransactionContribInfo(databaseServices->loadedTransactionContrib(contrib, failed));
closeFile();
throw;
}
Expand All @@ -532,8 +555,8 @@ void IngestRequest::_processLoadData() {
void IngestRequest::_readLocalFile(replica::Lock const& lock) {
string const context = ::context_ + string(__func__) + " ";

_contrib.numBytes = 0;
_contrib.numRows = 0;
uint64_t numBytes = 0;
uint64_t numRows = 0;

unique_ptr<char[]> const record(new char[defaultRecordSizeBytes]);
ifstream infile(_resource->filePath(), ios::binary);
Expand All @@ -552,22 +575,27 @@ void IngestRequest::_readLocalFile(replica::Lock const& lock) {
"', errno: " + to_string(errno));
}
size_t const num = infile.gcount();
_contrib.numBytes += num;
numBytes += num;
// Flush the last record if the end of the file.
parser->parse(record.get(), num, eof, [&](char const* buf, size_t size) {
writeRowIntoFile(buf, size);
_contrib.numRows++;
numRows++;
});
} while (!eof);

auto contrib = transactionContribInfo();
contrib.numBytes = numBytes;
contrib.numRows = numRows;
_updateTransactionContribInfo(contrib);
}

void IngestRequest::_readRemoteFile(replica::Lock const& lock) {
_contrib.numBytes = 0;
_contrib.numRows = 0;
uint64_t numBytes = 0;
uint64_t numRows = 0;

auto const reportRow = [&](char const* buf, size_t size) {
writeRowIntoFile(buf, size);
_contrib.numRows++;
numRows++;
};

// The configuration may be updated later if certificate bundles were loaded
Expand Down Expand Up @@ -603,11 +631,16 @@ void IngestRequest::_readRemoteFile(replica::Lock const& lock) {
clientConfig);
reader.read([&](char const* record, size_t size) {
parser->parse(record, size, !flush, reportRow);
_contrib.numBytes += size;
numBytes += size;
});
// Flush the last non-terminated line stored in the parser (if any).
string const emptyRecord;
parser->parse(emptyRecord.data(), emptyRecord.size(), flush, reportRow);

auto contrib = transactionContribInfo();
contrib.numBytes = numBytes;
contrib.numRows = numRows;
_updateTransactionContribInfo(contrib);
}

http::ClientConfig IngestRequest::_clientConfig(replica::Lock const& lock) const {
Expand Down
Loading

0 comments on commit ad003fe

Please sign in to comment.