From 1a7abb4b288d2bc6639bffea37fca96ca936f03c Mon Sep 17 00:00:00 2001
From: Igor Gaponenko
Date: Mon, 16 Sep 2024 18:46:37 -0700
Subject: [PATCH] Push ingest mode for contributions sent in the multipart/form-data body

---
Reviewer notes (this section is ignored when the patch is applied):
 * The copy under review had lost all line breaks, indentation and every
   "<...>" span (include targets, template arguments, the license URL).
   They are reconstructed here and must be verified against the original
   commit. The blob ids on the "index" lines of the two new files are stale.
 * _failed() is now given the caller's context rather than the error text.
 * The zero-length array in onEndOfFile() is replaced with an empty literal.
 * The failed-load path now keeps the descriptor returned by
   loadedTransactionContrib(), like every other call site.
 * Comment typos and stale documentation in the new header are corrected.

 src/http/FileUploadModule.cc               |   1 +
 src/http/FileUploadModule.h                |   2 +-
 src/replica/ingest/CMakeLists.txt          |   1 +
 src/replica/ingest/IngestFileHttpSvcMod.cc | 250 +++++++++++++++++++++
 src/replica/ingest/IngestFileHttpSvcMod.h  | 118 ++++++++++
 src/replica/ingest/IngestHttpSvc.cc        |   5 +
 6 files changed, 376 insertions(+), 1 deletion(-)
 create mode 100644 src/replica/ingest/IngestFileHttpSvcMod.cc
 create mode 100644 src/replica/ingest/IngestFileHttpSvcMod.h

diff --git a/src/http/FileUploadModule.cc b/src/http/FileUploadModule.cc
index eaa6c7164..2888ee93f 100644
--- a/src/http/FileUploadModule.cc
+++ b/src/http/FileUploadModule.cc
@@ -62,6 +62,7 @@ void FileUploadModule::execute(string const& subModuleName, http::AuthType const
             [&](httplib::MultipartFormData const& file) -> bool {
                 processEndOfEntry();
                 if (!file.filename.empty()) {
+                    enforceAuthorization(authType);
                     onStartOfFile(file.name, file.filename, file.content_type);
                 }
                 *currentFile = file;
diff --git a/src/http/FileUploadModule.h b/src/http/FileUploadModule.h
index 8d013935c..86b2d0021 100644
--- a/src/http/FileUploadModule.h
+++ b/src/http/FileUploadModule.h
@@ -129,7 +129,7 @@ class FileUploadModule : public BaseModule {
      * @param data The data of the file.
      * @param length The length of the data.
      */
-    virtual void onFileData(char const* data, size_t length) = 0;
+    virtual void onFileData(char const* data, std::size_t length) = 0;
 
     /**
      * Is called when the file parsing is finished.
diff --git a/src/replica/ingest/CMakeLists.txt b/src/replica/ingest/CMakeLists.txt
index 8e9b1bd15..90c64d887 100644
--- a/src/replica/ingest/CMakeLists.txt
+++ b/src/replica/ingest/CMakeLists.txt
@@ -3,6 +3,7 @@ add_dependencies(replica_ingest replica_proto)
 target_sources(replica_ingest PRIVATE
     IngestClient.cc
     IngestDataHttpSvcMod.cc
+    IngestFileHttpSvcMod.cc
     IngestFileSvc.cc
     IngestHttpSvc.cc
     IngestHttpSvcMod.cc
diff --git a/src/replica/ingest/IngestFileHttpSvcMod.cc b/src/replica/ingest/IngestFileHttpSvcMod.cc
new file mode 100644
index 000000000..c2219b2a9
--- /dev/null
+++ b/src/replica/ingest/IngestFileHttpSvcMod.cc
@@ -0,0 +1,250 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see <http://www.lsstcorp.org/LegalNotices/>.
+ */
+
+// Class header
+#include "replica/ingest/IngestFileHttpSvcMod.h"
+
+// Qserv headers
+#include "http/BinaryEncoding.h"
+#include "http/Exceptions.h"
+#include "http/Url.h"
+#include "replica/config/Configuration.h"
+#include "replica/services/DatabaseServices.h"
+#include "replica/services/ServiceProvider.h"
+#include "replica/util/Csv.h"
+
+// System headers
+#include <cerrno>
+#include <stdexcept>
+#include <string>
+
+// Third party headers
+#include "httplib.h"
+
+using namespace std;
+using json = nlohmann::json;
+
+namespace lsst::qserv::replica {
+
+void IngestFileHttpSvcMod::process(shared_ptr<ServiceProvider> const& serviceProvider,
+                                   string const& workerName, httplib::Request const& req,
+                                   httplib::Response& resp, httplib::ContentReader const& contentReader,
+                                   http::AuthType const authType) {
+    IngestFileHttpSvcMod module(serviceProvider, workerName, req, resp, contentReader);
+    string const subModuleName;
+    module.execute(subModuleName, authType);
+}
+
+IngestFileHttpSvcMod::IngestFileHttpSvcMod(shared_ptr<ServiceProvider> const& serviceProvider,
+                                           string const& workerName, httplib::Request const& req,
+                                           httplib::Response& resp,
+                                           httplib::ContentReader const& contentReader)
+        : http::FileUploadModule(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp,
+                                 contentReader),
+          IngestFileSvc(serviceProvider, workerName) {}
+
+string IngestFileHttpSvcMod::context() const { return "INGEST-FILE-HTTP-SVC "; }
+
+void IngestFileHttpSvcMod::onStartOfFile(string const& name, string const& fileName,
+                                         string const& contentType) {
+    debug(__func__);
+    checkApiVersion(__func__, 38);
+    enforceInstanceId(__func__, serviceProvider()->instanceId());
+
+    auto const context_ = context() + __func__;
+    auto const config = serviceProvider()->config();
+    auto const databaseServices = serviceProvider()->databaseServices();
+
+    if (isOpen()) {
+        throw http::Error(context_, "a file is already opened");
+    }
+    if (!_contrib.tmpFile.empty()) {
+        throw http::Error(context_, "the service only allows one file per request");
+    }
+
+    // Fill out parameters in the contribution descriptor. This information is needed
+    // for bookkeeping and monitoring purposes. The descriptor's state will be kept
+    // updated in the Replication/Ingest's database as the contribution processing
+    // will be happening.
+    _contrib.transactionId = body().required<TransactionId>("transaction_id");
+    _contrib.table = body().required<string>("table");
+    _contrib.chunk = body().required<unsigned int>("chunk");
+    _contrib.isOverlap = body().required<int>("overlap") != 0;
+    _contrib.worker = workerName();
+
+    // To indicate the file contents was streamed directly into the service
+    _contrib.url = "data-csv://" + req().remote_addr + "/" + fileName;
+    _contrib.charsetName =
+            body().optional<string>("charset_name", config->get<string>("worker", "ingest-charset-name"));
+
+    // Retries are allowed before an attempt to load data into MySQL. When such attempt
+    // is made the persistent state of the destination table is supposed to be changed.
+    _contrib.retryAllowed = true;
+
+    // This parameter sets a limit for the number of warnings (should there be any)
+    // reported by MySQL after contribution loading attempt. Warnings is an important
+    // mechanism for debugging problems with the ingested data.
+    _contrib.maxNumWarnings = body().optional<unsigned int>(
+            "max_num_warnings", config->get<unsigned int>("worker", "loader-max-warnings"));
+
+    debug(__func__, "transaction_id: " + to_string(_contrib.transactionId));
+    debug(__func__, "table: '" + _contrib.table + "'");
+    debug(__func__, "chunk: " + to_string(_contrib.chunk));
+    debug(__func__, "overlap: " + string(_contrib.isOverlap ? "1" : "0"));
+    debug(__func__, "charset_name: '" + _contrib.charsetName + "'");
+    debug(__func__, "max_num_warnings: " + to_string(_contrib.maxNumWarnings));
+
+    // Attempts to pass invalid transaction identifiers or tables are not recorded
+    // as transaction contributions in the persistent state of the Replication/Ingest
+    // system since it's impossible to determine a context of these operations.
+    // The following operations will throw exceptions should any problems with
+    // validating the context of the request be encountered.
+    TransactionInfo const trans = databaseServices->transaction(_contrib.transactionId);
+    _contrib.database = trans.database;
+
+    DatabaseInfo const database = config->databaseInfo(_contrib.database);
+    TableInfo const table = database.findTable(_contrib.table);
+
+    // Prescreen parameters of the request to ensure they're valid in the given
+    // context. Check the state of the transaction. Refuse to proceed with the request
+    // if any issues were detected.
+
+    bool const failed = true;
+
+    if (trans.state != TransactionInfo::State::STARTED) {
+        _contrib.error = context_ + " transactionId=" + to_string(_contrib.transactionId) + " is not active";
+        _contrib = databaseServices->createdTransactionContrib(_contrib, failed);
+        _failed(context_);  // _failed() expects the caller's context; it logs _contrib.error itself
+        throw http::Error(context_, _contrib.error);
+    }
+
+    csv::Dialect dialect;
+    try {
+        http::Url const resource(_contrib.url);
+        if (resource.scheme() != http::Url::FILE) {  // NOTE(review): url is "data-csv://..." - confirm
+            throw invalid_argument(context_ + " unsupported url '" + _contrib.url + "'");
+        }
+        dialect = csv::Dialect(_contrib.dialectInput);
+        _parser.reset(new csv::Parser(dialect));
+    } catch (exception const& ex) {
+        _contrib.error = ex.what();
+        _contrib = databaseServices->createdTransactionContrib(_contrib, failed);
+        _failed(context_);
+        throw;
+    }
+
+    // Register the contribution
+    _contrib = databaseServices->createdTransactionContrib(_contrib);
+
+    // This is where the actual processing of the request begins.
+    try {
+        _contrib.tmpFile = openFile(_contrib.transactionId, _contrib.table, dialect, _contrib.charsetName,
+                                    _contrib.chunk, _contrib.isOverlap);
+        _contrib = databaseServices->startedTransactionContrib(_contrib);
+    } catch (http::Error const& ex) {
+        json const errorExt = ex.errorExt();
+        if (!errorExt.empty()) {
+            _contrib.httpError = errorExt["http_error"];
+            _contrib.systemError = errorExt["system_error"];
+            _contrib.retryAllowed = errorExt["retry_allowed"].get<int>() != 0;
+        }
+        _contrib.error = ex.what();
+        _contrib = databaseServices->startedTransactionContrib(_contrib, failed);
+        _failed(context_);
+        throw;
+    } catch (exception const& ex) {
+        _contrib.systemError = errno;
+        _contrib.error = ex.what();
+        _contrib = databaseServices->startedTransactionContrib(_contrib, failed);
+        _failed(context_);
+        throw;
+    }
+}
+
+void IngestFileHttpSvcMod::onFileData(char const* data, size_t length) {
+    auto const context_ = context() + __func__;
+    if (!isOpen()) {
+        throw http::Error(context_, "no file was opened");
+    }
+    _parseAndWriteData(data, length, false);
+}
+
+void IngestFileHttpSvcMod::onEndOfFile() {
+    auto const context_ = context() + __func__;
+    if (!isOpen()) {
+        throw http::Error(context_, "no file was opened");
+    }
+
+    // Flush the parser to ensure the last row (if any) has been written
+    // into the output file.
+    char const* const data = "";  // empty input; a zero-length array is not valid ISO C++
+    size_t const length = 0;
+    _parseAndWriteData(data, length, true);
+
+    // Report that processing of the input data and preparing the contribution file is over.
+    auto const databaseServices = serviceProvider()->databaseServices();
+    _contrib = databaseServices->readTransactionContrib(_contrib);
+
+    // Finished reading and preprocessing the input file.
+    // Begin making irreversible changes to the destination table.
+    _contrib.retryAllowed = false;
+    try {
+        loadDataIntoTable(_contrib.maxNumWarnings);
+        _contrib.numWarnings = numWarnings();
+        _contrib.warnings = warnings();
+        _contrib.numRowsLoaded = numRowsLoaded();
+        _contrib = databaseServices->loadedTransactionContrib(_contrib);
+        closeFile();
+    } catch (exception const& ex) {
+        _contrib.error = "MySQL load failed, ex: " + string(ex.what());
+        _contrib.systemError = errno;
+        bool const failed = true;
+        _contrib = databaseServices->loadedTransactionContrib(_contrib, failed);
+        _failed(context_);
+        throw http::Error(context_, _contrib.error);
+    }
+}
+
+json IngestFileHttpSvcMod::onEndOfBody() {
+    auto const context_ = context() + __func__;
+    if (_contrib.tmpFile.empty()) {
+        throw http::Error(context_, "no file was sent in the request");
+    }
+    if (isOpen()) {
+        throw http::Error(context_, "the file is still open");
+    }
+    return json::object({{"contrib", _contrib.toJson()}});
+}
+
+void IngestFileHttpSvcMod::_parseAndWriteData(char const* data, size_t length, bool flush) {
+    _parser->parse(data, length, flush, [&](char const* buf, size_t size) {
+        writeRowIntoFile(buf, size);
+        _contrib.numRows++;
+    });
+    _contrib.numBytes += length;  // count unmodified input data
+}
+
+void IngestFileHttpSvcMod::_failed(string const& context_) {
+    error(context_, _contrib.error);
+    closeFile();
+}
+
+}  // namespace lsst::qserv::replica
diff --git a/src/replica/ingest/IngestFileHttpSvcMod.h b/src/replica/ingest/IngestFileHttpSvcMod.h
new file mode 100644
index 000000000..d66654a95
--- /dev/null
+++ b/src/replica/ingest/IngestFileHttpSvcMod.h
@@ -0,0 +1,118 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see <http://www.lsstcorp.org/LegalNotices/>.
+ */
+#ifndef LSST_QSERV_INGESTFILEHTTPSVCMOD_H
+#define LSST_QSERV_INGESTFILEHTTPSVCMOD_H
+
+// System headers
+#include <memory>
+#include <string>
+
+// Third party headers
+#include "nlohmann/json.hpp"
+
+// Qserv headers
+#include "http/FileUploadModule.h"
+#include "replica/ingest/IngestFileSvc.h"
+#include "replica/ingest/TransactionContrib.h"
+
+// Forward declarations
+namespace lsst::qserv::replica {
+class ServiceProvider;
+}  // namespace lsst::qserv::replica
+
+namespace lsst::qserv::replica::csv {
+class Parser;
+}  // namespace lsst::qserv::replica::csv
+
+// This header declarations
+namespace lsst::qserv::replica {
+
+/**
+ * Class IngestFileHttpSvcMod processes chunk/table contribution requests made over HTTP.
+ * The class is used by the HTTP server built into the worker Ingest service.
+ * The current class is meant to be used for ingesting payloads that are pushed directly
+ * into the service over the HTTP protocol in the "multipart/form-data" body of the request.
+ */
+class IngestFileHttpSvcMod : public http::FileUploadModule, public IngestFileSvc {
+public:
+    IngestFileHttpSvcMod() = delete;
+    IngestFileHttpSvcMod(IngestFileHttpSvcMod const&) = delete;
+    IngestFileHttpSvcMod& operator=(IngestFileHttpSvcMod const&) = delete;
+
+    virtual ~IngestFileHttpSvcMod() = default;
+
+    /**
+     * Process a request.
+     *
+     * @param serviceProvider The provider of services is needed to access
+     *   the configuration and the database services.
+     * @param workerName The name of a worker this service is acting upon (used to pull
+     *   worker-specific configuration options for the service).
+     * @param req The HTTP request.
+     * @param resp The HTTP response channel.
+     * @param contentReader The content reader to be used for the file upload.
+     * @param authType The authorization requirements for the module
+     * @note This method has no 'subModuleName' parameter: an empty name is passed to execute().
+     */
+    static void process(std::shared_ptr<ServiceProvider> const& serviceProvider,
+                        std::string const& workerName, httplib::Request const& req, httplib::Response& resp,
+                        httplib::ContentReader const& contentReader,
+                        http::AuthType const authType = http::AuthType::REQUIRED);
+
+protected:
+    virtual std::string context() const final;
+    virtual void onStartOfFile(std::string const& name, std::string const& fileName,
+                               std::string const& contentType) final;
+    virtual void onFileData(char const* data, std::size_t length) final;
+    virtual void onEndOfFile() final;
+    virtual nlohmann::json onEndOfBody() final;
+
+private:
+    /// @see method IngestFileHttpSvcMod::process()
+    IngestFileHttpSvcMod(std::shared_ptr<ServiceProvider> const& serviceProvider,
+                         std::string const& workerName, httplib::Request const& req, httplib::Response& resp,
+                         httplib::ContentReader const& contentReader);
+
+    /**
+     * Run the input through the CSV parser, write each row the parser reports into
+     * the open file, and update the row and byte counters of the contribution.
+     * @param data The input data.
+     * @param length The length of the data.
+     * @param flush The flush flag forwarded to the parser (true only at the end of the file).
+     */
+    void _parseAndWriteData(char const* data, std::size_t length, bool flush);
+
+    /**
+     * Close the temporary file if needed and post an error message.
+     * @param context_ The caller's context.
+     */
+    void _failed(std::string const& context_);
+
+    TransactionContribInfo _contrib;  ///< A state of the contribution processing
+
+    /// The parser of the input stream as configured for the CSV dialect reported
+    /// by a client.
+    std::unique_ptr<csv::Parser> _parser;
+};
+
+}  // namespace lsst::qserv::replica
+
+#endif  // LSST_QSERV_INGESTFILEHTTPSVCMOD_H
diff --git a/src/replica/ingest/IngestHttpSvc.cc b/src/replica/ingest/IngestHttpSvc.cc
index 24d816fd7..4583f0180 100644
--- a/src/replica/ingest/IngestHttpSvc.cc
+++ b/src/replica/ingest/IngestHttpSvc.cc
@@ -30,6 +30,7 @@
 #include "http/ChttpMetaModule.h"
 #include "replica/config/Configuration.h"
 #include "replica/ingest/IngestDataHttpSvcMod.h"
+#include "replica/ingest/IngestFileHttpSvcMod.h"
 #include "replica/ingest/IngestHttpSvcMod.h"
 #include "replica/ingest/IngestRequest.h"
 #include "replica/ingest/IngestRequestMgr.h"
@@ -81,6 +82,10 @@ void IngestHttpSvc::registerServices(unique_ptr<httplib::Server> const& server)
         IngestDataHttpSvcMod::process(self->serviceProvider(), self->_workerName, req, resp,
                                       "SYNC-PROCESS-DATA");
     });
+    server->Post("/ingest/csv", [self](httplib::Request const& req, httplib::Response& resp,
+                                       httplib::ContentReader const& contentReader) {
+        IngestFileHttpSvcMod::process(self->serviceProvider(), self->_workerName, req, resp, contentReader);
+    });
     server->Post("/ingest/file", [self](httplib::Request const& req, httplib::Response& resp) {
         IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp,
                                   "SYNC-PROCESS");