Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-46017: A possible lockup in the Qserv worker ingest service #865

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 119 additions & 87 deletions src/replica/ingest/IngestRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,25 +243,26 @@ IngestRequest::IngestRequest(shared_ptr<ServiceProvider> const& serviceProvider,
unsigned int maxNumWarnings, unsigned int maxRetries)
: IngestFileSvc(serviceProvider, workerName) {
// Initialize the descriptor
_contrib.transactionId = transactionId;
_contrib.table = table;
_contrib.chunk = chunk;
_contrib.isOverlap = isOverlap;
_contrib.worker = workerName;
_contrib.url = url;
_contrib.charsetName = charsetName;
_contrib.async = async;
_contrib.dialectInput = dialectInput;
_contrib.httpMethod = httpMethod;
_contrib.httpData = httpData;
_contrib.httpHeaders = httpHeaders;
TransactionContribInfo contrib;
contrib.transactionId = transactionId;
contrib.table = table;
contrib.chunk = chunk;
contrib.isOverlap = isOverlap;
contrib.worker = workerName;
contrib.url = url;
contrib.charsetName = charsetName;
contrib.async = async;
contrib.dialectInput = dialectInput;
contrib.httpMethod = httpMethod;
contrib.httpData = httpData;
contrib.httpHeaders = httpHeaders;
if (maxNumWarnings == 0) {
_contrib.maxNumWarnings =
contrib.maxNumWarnings =
serviceProvider->config()->get<unsigned int>("worker", "loader-max-warnings");
} else {
_contrib.maxNumWarnings = maxNumWarnings;
contrib.maxNumWarnings = maxNumWarnings;
}
_contrib.maxRetries = std::min(
contrib.maxRetries = std::min(
maxRetries, serviceProvider->config()->get<unsigned int>("worker", "ingest-max-retries"));

// Prescreen parameters of the request to ensure the request has a valid
Expand All @@ -271,37 +272,38 @@ IngestRequest::IngestRequest(shared_ptr<ServiceProvider> const& serviceProvider,
bool const failed = true;
auto const config = serviceProvider->config();
auto const databaseServices = serviceProvider->databaseServices();
auto const trans = databaseServices->transaction(_contrib.transactionId);
auto const trans = databaseServices->transaction(contrib.transactionId);

_contrib.database = trans.database;
contrib.database = trans.database;

DatabaseInfo const database = config->databaseInfo(_contrib.database);
if (!database.tableExists(_contrib.table)) {
throw invalid_argument(context + "no such table '" + _contrib.table + "' in database '" +
_contrib.database + "'.");
DatabaseInfo const database = config->databaseInfo(contrib.database);
if (!database.tableExists(contrib.table)) {
throw invalid_argument(context + "no such table '" + contrib.table + "' in database '" +
contrib.database + "'.");
}

// Any failures detected hereafter will result in registering the contribution
// as failed for further analysis by the ingest workflows.
try {
IngestRequest::_validateState(trans, database, _contrib);
_resource.reset(new http::Url(_contrib.url));
IngestRequest::_validateState(trans, database, contrib);
_resource.reset(new http::Url(contrib.url));
switch (_resource->scheme()) {
case http::Url::FILE:
case http::Url::HTTP:
case http::Url::HTTPS:
break;
default:
throw invalid_argument(context + "unsupported url '" + _contrib.url + "'");
throw invalid_argument(context + "unsupported url '" + contrib.url + "'");
}
_dialect = csv::Dialect(dialectInput);
} catch (exception const& ex) {
_contrib.error = context + ex.what();
_contrib.retryAllowed = false;
_contrib = databaseServices->createdTransactionContrib(_contrib, failed);
auto contrib = transactionContribInfo();
contrib.error = context + ex.what();
contrib.retryAllowed = false;
_updateTransactionContribInfo(databaseServices->createdTransactionContrib(contrib, failed));
throw;
}
_contrib = databaseServices->createdTransactionContrib(_contrib);
_updateTransactionContribInfo(databaseServices->createdTransactionContrib(contrib));
}

IngestRequest::IngestRequest(shared_ptr<ServiceProvider> const& serviceProvider, string const& workerName,
Expand All @@ -318,10 +320,16 @@ IngestRequest::IngestRequest(TransactionContribInfo const& contrib)

TransactionContribInfo IngestRequest::transactionContribInfo() const {
string const context = ::context_ + string(__func__) + " ";
replica::Lock lock(_mtx, context);
replica::Lock lock(_contribMtx, context);
return _contrib;
}

void IngestRequest::_updateTransactionContribInfo(TransactionContribInfo const& contrib) {
string const context = ::context_ + string(__func__) + " ";
replica::Lock lock(_contribMtx, context);
_contrib = contrib;
}

void IngestRequest::process() {
// No actual processing for the test requests made for unit testing.
if (serviceProvider() == nullptr) return;
Expand All @@ -347,19 +355,19 @@ void IngestRequest::_processStart() {
string const context = ::context_ + string(__func__) + " ";
replica::Lock const lock(_mtx, context);

if (_processing) {
if (_processing.exchange(true)) {
throw logic_error(context + "the contribution request " + to_string(_contrib.id) +
" is already being processed or has been processed.");
}
_processing = true;

bool const failed = true;
auto const databaseServices = serviceProvider()->databaseServices();
if (_cancelled) {
_contrib.error = "cancelled before beginning processing the request.";
_contrib.retryAllowed = true;
_contrib = databaseServices->startedTransactionContrib(_contrib, failed,
TransactionContribInfo::Status::CANCELLED);
auto contrib = transactionContribInfo();
contrib.error = "cancelled before beginning processing the request.";
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->startedTransactionContrib(
contrib, failed, TransactionContribInfo::Status::CANCELLED));
throw IngestRequestInterrupted(context + "request " + to_string(_contrib.id) + _contrib.error);
}

Expand All @@ -381,9 +389,10 @@ void IngestRequest::_processStart() {
try {
IngestRequest::_validateState(trans, database, _contrib);
} catch (exception const& ex) {
_contrib.error = context + ex.what();
_contrib.retryAllowed = false;
_contrib = databaseServices->startedTransactionContrib(_contrib, failed);
auto contrib = transactionContribInfo();
contrib.error = context + ex.what();
contrib.retryAllowed = false;
_updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib, failed));
throw;
}

Expand All @@ -396,24 +405,27 @@ void IngestRequest::_openTmpFileAndStart(replica::Lock const& lock) {
bool const failed = true;
auto const databaseServices = serviceProvider()->databaseServices();
try {
_contrib.tmpFile = openFile(_contrib.transactionId, _contrib.table, _dialect, _contrib.charsetName,
_contrib.chunk, _contrib.isOverlap);
_contrib = databaseServices->startedTransactionContrib(_contrib);
auto contrib = transactionContribInfo();
contrib.tmpFile = openFile(contrib.transactionId, contrib.table, _dialect, contrib.charsetName,
contrib.chunk, contrib.isOverlap);
_updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib));
} catch (http::Error const& ex) {
auto contrib = transactionContribInfo();
json const errorExt = ex.errorExt();
if (!errorExt.empty()) {
_contrib.httpError = errorExt["http_error"];
_contrib.systemError = errorExt["system_error"];
contrib.httpError = errorExt["http_error"];
contrib.systemError = errorExt["system_error"];
}
_contrib.error = ex.what();
_contrib.retryAllowed = true;
_contrib = databaseServices->startedTransactionContrib(_contrib, failed);
contrib.error = ex.what();
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib, failed));
throw;
} catch (exception const& ex) {
_contrib.systemError = errno;
_contrib.error = ex.what();
_contrib.retryAllowed = true;
_contrib = databaseServices->startedTransactionContrib(_contrib, failed);
auto contrib = transactionContribInfo();
contrib.systemError = errno;
contrib.error = ex.what();
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->startedTransactionContrib(contrib, failed));
throw;
}
}
Expand All @@ -430,10 +442,11 @@ void IngestRequest::_processReadData() {
while (true) {
// Start reading and preprocessing the input file.
if (_cancelled) {
_contrib.error = "cancelled before reading the input file.";
_contrib.retryAllowed = true;
_contrib = databaseServices->readTransactionContrib(_contrib, failed,
TransactionContribInfo::Status::CANCELLED);
auto contrib = transactionContribInfo();
contrib.error = "cancelled before reading the input file.";
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->readTransactionContrib(
contrib, failed, TransactionContribInfo::Status::CANCELLED));
closeFile();
throw IngestRequestInterrupted(context + "request " + to_string(_contrib.id) + _contrib.error);
}
Expand All @@ -449,23 +462,25 @@ void IngestRequest::_processReadData() {
default:
throw invalid_argument(context + "unsupported url '" + _contrib.url + "'");
}
_contrib = databaseServices->readTransactionContrib(_contrib);
_updateTransactionContribInfo(databaseServices->readTransactionContrib(_contrib));
return;
} catch (http::Error const& ex) {
auto contrib = transactionContribInfo();
json const errorExt = ex.errorExt();
if (!errorExt.empty()) {
_contrib.httpError = errorExt["http_error"];
_contrib.systemError = errorExt["system_error"];
contrib.httpError = errorExt["http_error"];
contrib.systemError = errorExt["system_error"];
}
_contrib.error = ex.what();
_contrib.retryAllowed = true;
_contrib = databaseServices->readTransactionContrib(_contrib, failed);
contrib.error = ex.what();
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->readTransactionContrib(contrib, failed));
if (!_closeTmpFileAndRetry(lock)) throw;
} catch (exception const& ex) {
_contrib.systemError = errno;
_contrib.error = ex.what();
_contrib.retryAllowed = true;
_contrib = databaseServices->readTransactionContrib(_contrib, failed);
auto contrib = transactionContribInfo();
contrib.systemError = errno;
contrib.error = ex.what();
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->readTransactionContrib(contrib, failed));
if (!_closeTmpFileAndRetry(lock)) throw;
}
}
Expand All @@ -481,18 +496,22 @@ bool IngestRequest::_closeTmpFileAndRetry(replica::Lock const& lock) {
// into the retry. The corresponding fields of the contribution objects
// will get reset to the initial values (which are the same as in the default
// constructed retry object).
auto contrib = transactionContribInfo();
TransactionContribInfo::FailedRetry const failedRetry =
_contrib.resetForRetry(_contrib.status, _contrib.async);
contrib.resetForRetry(contrib.status, contrib.async);
_updateTransactionContribInfo(contrib);

// This method will open the new temporary file save the updated state of
// the contribution to prepare the current context for the next attempt
// to read the input data.
_openTmpFileAndStart(lock);

// The retry object has to be saved separately.
_contrib.failedRetries.push_back(failedRetry);
_contrib.numFailedRetries = _contrib.failedRetries.size();
_contrib = serviceProvider()->databaseServices()->saveLastTransactionContribRetry(_contrib);
contrib = transactionContribInfo();
contrib.failedRetries.push_back(failedRetry);
contrib.numFailedRetries = contrib.failedRetries.size();
_updateTransactionContribInfo(
serviceProvider()->databaseServices()->saveLastTransactionContribRetry(contrib));

return true;
}
Expand All @@ -507,23 +526,26 @@ void IngestRequest::_processLoadData() {
// Load the preprocessed input file into MySQL and update the persistent
// state of the contribution request.
if (_cancelled) {
_contrib.error = "cancelled before loading data into MySQL";
_contrib.retryAllowed = true;
_contrib = databaseServices->loadedTransactionContrib(_contrib, failed,
TransactionContribInfo::Status::CANCELLED);
auto contrib = transactionContribInfo();
contrib.error = "cancelled before loading data into MySQL";
contrib.retryAllowed = true;
_updateTransactionContribInfo(databaseServices->loadedTransactionContrib(
contrib, failed, TransactionContribInfo::Status::CANCELLED));
closeFile();
throw IngestRequestInterrupted(context + "request " + to_string(_contrib.id) + _contrib.error);
}
try {
loadDataIntoTable(_contrib.maxNumWarnings);
_contrib.numWarnings = numWarnings();
_contrib.warnings = warnings();
_contrib.numRowsLoaded = numRowsLoaded();
_contrib = databaseServices->loadedTransactionContrib(_contrib);
auto contrib = transactionContribInfo();
contrib.numWarnings = numWarnings();
contrib.warnings = warnings();
contrib.numRowsLoaded = numRowsLoaded();
_updateTransactionContribInfo(databaseServices->loadedTransactionContrib(contrib));
} catch (exception const& ex) {
_contrib.systemError = errno;
_contrib.error = ex.what();
_contrib = databaseServices->loadedTransactionContrib(_contrib, failed);
auto contrib = transactionContribInfo();
contrib.systemError = errno;
contrib.error = ex.what();
_updateTransactionContribInfo(databaseServices->loadedTransactionContrib(contrib, failed));
closeFile();
throw;
}
Expand All @@ -533,8 +555,8 @@ void IngestRequest::_processLoadData() {
void IngestRequest::_readLocalFile(replica::Lock const& lock) {
string const context = ::context_ + string(__func__) + " ";

_contrib.numBytes = 0;
_contrib.numRows = 0;
uint64_t numBytes = 0;
uint64_t numRows = 0;

unique_ptr<char[]> const record(new char[defaultRecordSizeBytes]);
ifstream infile(_resource->filePath(), ios::binary);
Expand All @@ -553,22 +575,27 @@ void IngestRequest::_readLocalFile(replica::Lock const& lock) {
"', errno: " + to_string(errno));
}
size_t const num = infile.gcount();
_contrib.numBytes += num;
numBytes += num;
// Flush the last record if the end of the file.
parser->parse(record.get(), num, eof, [&](char const* buf, size_t size) {
writeRowIntoFile(buf, size);
_contrib.numRows++;
numRows++;
});
} while (!eof);

auto contrib = transactionContribInfo();
contrib.numBytes = numBytes;
contrib.numRows = numRows;
_updateTransactionContribInfo(contrib);
}

void IngestRequest::_readRemoteFile(replica::Lock const& lock) {
_contrib.numBytes = 0;
_contrib.numRows = 0;
uint64_t numBytes = 0;
uint64_t numRows = 0;

auto const reportRow = [&](char const* buf, size_t size) {
writeRowIntoFile(buf, size);
_contrib.numRows++;
numRows++;
};

// The configuration may be updated later if certificate bundles were loaded
Expand Down Expand Up @@ -604,11 +631,16 @@ void IngestRequest::_readRemoteFile(replica::Lock const& lock) {
clientConfig);
reader.read([&](char const* record, size_t size) {
parser->parse(record, size, !flush, reportRow);
_contrib.numBytes += size;
numBytes += size;
});
// Flush the last non-terminated line stored in the parser (if any).
string const emptyRecord;
parser->parse(emptyRecord.data(), emptyRecord.size(), flush, reportRow);

auto contrib = transactionContribInfo();
contrib.numBytes = numBytes;
contrib.numRows = numRows;
_updateTransactionContribInfo(contrib);
}

http::ClientConfig IngestRequest::_clientConfig(replica::Lock const& lock) const {
Expand Down
Loading
Loading