From 24fdf1cf210830d885d06666a4149c468310a2f9 Mon Sep 17 00:00:00 2001 From: John Gates Date: Fri, 2 Feb 2018 13:09:39 -0800 Subject: [PATCH 001/103] Modified FileServer and added lader to build. --- core/modules/SConscript | 5 + core/modules/loader/FileServer.cc | 383 +++++++ core/modules/loader/FileServer.h | 181 +++ core/modules/proto/SConscript | 4 + core/modules/proto/loader.proto | 49 + core/modules/replica/CmdParser.cc | 253 ++++ core/modules/replica/CmdParser.h | 276 +++++ core/modules/replica/TODO.txt | 4 + core/modules/replica/replica_calculate_cs.cc | 67 ++ .../replica/replica_controller_admin.cc | 202 ++++ .../replica/replica_controller_chunks.cc | 174 +++ .../modules/replica/replica_controller_cmd.cc | 383 +++++++ .../replica/replica_controller_purge.cc | 211 ++++ .../replica/replica_controller_replicate.cc | 235 ++++ .../replica/replica_controller_test.cc | 343 ++++++ core/modules/replica/replica_copy_file.cc | 112 ++ core/modules/replica/replica_embedded_test.cc | 133 +++ core/modules/replica/replica_file_read.cc | 121 ++ core/modules/replica/replica_file_server.cc | 83 ++ core/modules/replica/replica_fill_file.cc | 89 ++ core/modules/replica/replica_job_chunks.cc | 259 +++++ .../replica/replica_job_delete_worker.cc | 209 ++++ core/modules/replica/replica_job_fixup.cc | 229 ++++ core/modules/replica/replica_job_move.cc | 215 ++++ core/modules/replica/replica_job_purge.cc | 157 +++ core/modules/replica/replica_job_rebalance.cc | 249 ++++ core/modules/replica/replica_job_replicate.cc | 156 +++ core/modules/replica/replica_job_verify.cc | 174 +++ core/modules/replica/replica_jobctrl_test.cc | 152 +++ core/modules/replica/replica_json_test.cc | 12 + core/modules/replica/replica_master.cc | 267 +++++ .../modules/replica/replica_messenger_test.cc | 153 +++ core/modules/replica/replica_mysql_test.cc | 282 +++++ core/modules/replica/replica_test_sql.cc | 140 +++ core/modules/replica/replica_worker.cc | 102 ++ core/modules/replica/replication.sql | 564 +++++++++ .../replica/replication_config_dev.sql | 79 ++ .../replica/replication_config_nebula.sql | 84 ++ core/modules/replica_core/BlockPost.cc | 78 ++ core/modules/replica_core/BlockPost.h | 110 ++ core/modules/replica_core/ChunkLocker.cc | 199 ++++ core/modules/replica_core/ChunkLocker.h | 222 ++++ core/modules/replica_core/Common.cc | 206 ++++ core/modules/replica_core/Common.h | 179 +++ core/modules/replica_core/Configuration.cc | 317 +++++ core/modules/replica_core/Configuration.h | 452 ++++++++ .../modules/replica_core/ConfigurationFile.cc | 246 ++++ core/modules/replica_core/ConfigurationFile.h | 123 ++ .../replica_core/ConfigurationMySQL.cc | 277 +++++ .../modules/replica_core/ConfigurationMySQL.h | 120 ++ core/modules/replica_core/Controller.cc | 879 ++++++++++++++ core/modules/replica_core/Controller.h | 689 +++++++++++ core/modules/replica_core/DatabaseMySQL.cc | 538 +++++++++ core/modules/replica_core/DatabaseMySQL.h | 918 +++++++++++++++ core/modules/replica_core/DatabaseServices.cc | 139 +++ core/modules/replica_core/DatabaseServices.h | 247 ++++ .../replica_core/DatabaseServicesMySQL.cc | 1016 +++++++++++++++++ .../replica_core/DatabaseServicesMySQL.h | 209 ++++ core/modules/replica_core/DeleteRequest.cc | 718 ++++++++++++ core/modules/replica_core/DeleteRequest.h | 370 ++++++ core/modules/replica_core/DeleteWorkerJob.cc | 540 +++++++++ core/modules/replica_core/DeleteWorkerJob.h | 301 +++++ core/modules/replica_core/ErrorReporting.h | 75 ++ core/modules/replica_core/FileClient.cc | 336 ++++++ core/modules/replica_core/FileClient.h | 214 ++++ core/modules/replica_core/FileServer.cc | 131 +++ core/modules/replica_core/FileServer.h | 140 +++ .../replica_core/FileServerConnection.cc | 383 +++++++ .../replica_core/FileServerConnection.h | 218 ++++ core/modules/replica_core/FileUtils.cc | 355 ++++++ core/modules/replica_core/FileUtils.h | 338 ++++++ core/modules/replica_core/FindAllJob.cc | 357 ++++++ core/modules/replica_core/FindAllJob.h | 273 +++++ core/modules/replica_core/FindAllRequest.cc | 698 +++++++++++ core/modules/replica_core/FindAllRequest.h | 359 ++++++ core/modules/replica_core/FindRequest.cc | 726 ++++++++++++ core/modules/replica_core/FindRequest.h | 380 ++++++ core/modules/replica_core/FixUpJob.cc | 451 ++++++++ core/modules/replica_core/FixUpJob.h | 284 +++++ core/modules/replica_core/Job.cc | 177 +++ core/modules/replica_core/Job.h | 312 +++++ core/modules/replica_core/JobController.cc | 537 +++++++++ core/modules/replica_core/JobController.h | 432 +++++++ core/modules/replica_core/Messenger.cc | 96 ++ core/modules/replica_core/Messenger.h | 176 +++ .../replica_core/MessengerConnector.cc | 633 ++++++++++ .../modules/replica_core/MessengerConnector.h | 473 ++++++++ core/modules/replica_core/MoveReplicaJob.cc | 469 ++++++++ core/modules/replica_core/MoveReplicaJob.h | 272 +++++ core/modules/replica_core/Performance.cc | 194 ++++ core/modules/replica_core/Performance.h | 196 ++++ core/modules/replica_core/ProtocolBuffer.cc | 108 ++ core/modules/replica_core/ProtocolBuffer.h | 197 ++++ core/modules/replica_core/PurgeJob.cc | 530 +++++++++ core/modules/replica_core/PurgeJob.h | 292 +++++ core/modules/replica_core/RebalanceJob.cc | 674 +++++++++++ core/modules/replica_core/RebalanceJob.h | 356 ++++++ core/modules/replica_core/ReplicaInfo.cc | 253 ++++ core/modules/replica_core/ReplicaInfo.h | 229 ++++ core/modules/replica_core/ReplicateJob.cc | 591 ++++++++++ core/modules/replica_core/ReplicateJob.h | 292 +++++ .../replica_core/ReplicationRequest.cc | 734 ++++++++++++ .../modules/replica_core/ReplicationRequest.h | 377 ++++++ core/modules/replica_core/Request.cc | 263 +++++ core/modules/replica_core/Request.h | 400 +++++++ .../modules/replica_core/RequestConnection.cc | 265 +++++ core/modules/replica_core/RequestConnection.h | 220 ++++ core/modules/replica_core/RequestMessenger.cc | 78 ++ core/modules/replica_core/RequestMessenger.h | 116 ++ core/modules/replica_core/RequestTracker.cc | 194 ++++ core/modules/replica_core/RequestTracker.h | 326 ++++++ core/modules/replica_core/RequestTypesFwd.h | 234 ++++ .../replica_core/ServiceManagementRequest.cc | 460 ++++++++ .../replica_core/ServiceManagementRequest.h | 559 +++++++++ core/modules/replica_core/ServiceProvider.cc | 72 ++ core/modules/replica_core/ServiceProvider.h | 119 ++ core/modules/replica_core/StatusRequest.cc | 552 +++++++++ core/modules/replica_core/StatusRequest.h | 726 ++++++++++++ core/modules/replica_core/StopRequest.cc | 554 +++++++++ core/modules/replica_core/StopRequest.h | 726 ++++++++++++ .../replica_core/SuccessRateGenerator.cc | 47 + .../replica_core/SuccessRateGenerator.h | 96 ++ core/modules/replica_core/VerifyJob.cc | 467 ++++++++ core/modules/replica_core/VerifyJob.h | 300 +++++ .../replica_core/WorkerDeleteRequest.cc | 213 ++++ .../replica_core/WorkerDeleteRequest.h | 190 +++ .../replica_core/WorkerFindAllRequest.cc | 281 +++++ .../replica_core/WorkerFindAllRequest.h | 191 ++++ .../modules/replica_core/WorkerFindRequest.cc | 410 +++++++ core/modules/replica_core/WorkerFindRequest.h | 210 ++++ core/modules/replica_core/WorkerProcessor.cc | 903 +++++++++++++++ core/modules/replica_core/WorkerProcessor.h | 504 ++++++++ .../replica_core/WorkerProcessorThread.cc | 154 +++ .../replica_core/WorkerProcessorThread.h | 149 +++ .../replica_core/WorkerReplicationRequest.cc | 970 ++++++++++++++++ .../replica_core/WorkerReplicationRequest.h | 359 ++++++ core/modules/replica_core/WorkerRequest.cc | 236 ++++ core/modules/replica_core/WorkerRequest.h | 281 +++++ .../replica_core/WorkerRequestFactory.cc | 445 ++++++++ .../replica_core/WorkerRequestFactory.h | 301 +++++ core/modules/replica_core/WorkerServer.cc | 134 +++ core/modules/replica_core/WorkerServer.h | 151 +++ .../replica_core/WorkerServerConnection.cc | 493 ++++++++ .../replica_core/WorkerServerConnection.h | 212 ++++ 144 files changed, 43763 insertions(+) create mode 100644 core/modules/loader/FileServer.cc create mode 100644 core/modules/loader/FileServer.h create mode 100644 core/modules/proto/loader.proto create mode 100644 core/modules/replica/CmdParser.cc create mode 100644 core/modules/replica/CmdParser.h create mode 100644 core/modules/replica/TODO.txt create mode 100644 core/modules/replica/replica_calculate_cs.cc create mode 100644 core/modules/replica/replica_controller_admin.cc create mode 100644 core/modules/replica/replica_controller_chunks.cc create mode 100644 core/modules/replica/replica_controller_cmd.cc create mode 100644 core/modules/replica/replica_controller_purge.cc create mode 100644 core/modules/replica/replica_controller_replicate.cc create mode 100644 core/modules/replica/replica_controller_test.cc create mode 100644 core/modules/replica/replica_copy_file.cc create mode 100644 core/modules/replica/replica_embedded_test.cc create mode 100644 core/modules/replica/replica_file_read.cc create mode 100644 core/modules/replica/replica_file_server.cc create mode 100644 core/modules/replica/replica_fill_file.cc create mode 100644 core/modules/replica/replica_job_chunks.cc create mode 100644 core/modules/replica/replica_job_delete_worker.cc create mode 100644 core/modules/replica/replica_job_fixup.cc create mode 100644 core/modules/replica/replica_job_move.cc create mode 100644 core/modules/replica/replica_job_purge.cc create mode 100644 core/modules/replica/replica_job_rebalance.cc create mode 100644 core/modules/replica/replica_job_replicate.cc create mode 100644 core/modules/replica/replica_job_verify.cc create mode 100644 core/modules/replica/replica_jobctrl_test.cc create mode 100644 core/modules/replica/replica_json_test.cc create mode 100644 core/modules/replica/replica_master.cc create mode 100644 core/modules/replica/replica_messenger_test.cc create mode 100644 core/modules/replica/replica_mysql_test.cc create mode 100644 core/modules/replica/replica_test_sql.cc create mode 100644 core/modules/replica/replica_worker.cc create mode 100644 core/modules/replica/replication.sql create mode 100644 core/modules/replica/replication_config_dev.sql create mode 100644 core/modules/replica/replication_config_nebula.sql create mode 100644 core/modules/replica_core/BlockPost.cc create mode 100644 core/modules/replica_core/BlockPost.h create mode 100644 core/modules/replica_core/ChunkLocker.cc create mode 100644 core/modules/replica_core/ChunkLocker.h create mode 100644 core/modules/replica_core/Common.cc create mode 100644 core/modules/replica_core/Common.h create mode 100644 core/modules/replica_core/Configuration.cc create mode 100644 core/modules/replica_core/Configuration.h create mode 100644 core/modules/replica_core/ConfigurationFile.cc create mode 100644 core/modules/replica_core/ConfigurationFile.h create mode 100644 core/modules/replica_core/ConfigurationMySQL.cc create mode 100644 core/modules/replica_core/ConfigurationMySQL.h create mode 100644 core/modules/replica_core/Controller.cc create mode 100644 core/modules/replica_core/Controller.h create mode 100644 core/modules/replica_core/DatabaseMySQL.cc create mode 100644 core/modules/replica_core/DatabaseMySQL.h create mode 100644 core/modules/replica_core/DatabaseServices.cc create mode 100644 core/modules/replica_core/DatabaseServices.h create mode 100644 core/modules/replica_core/DatabaseServicesMySQL.cc create mode 100644 core/modules/replica_core/DatabaseServicesMySQL.h create mode 100644 core/modules/replica_core/DeleteRequest.cc create mode 100644 core/modules/replica_core/DeleteRequest.h create mode 100644 core/modules/replica_core/DeleteWorkerJob.cc create mode 100644 core/modules/replica_core/DeleteWorkerJob.h create mode 100644 core/modules/replica_core/ErrorReporting.h create mode 100644 core/modules/replica_core/FileClient.cc create mode 100644 core/modules/replica_core/FileClient.h create mode 100644 core/modules/replica_core/FileServer.cc create mode 100644 core/modules/replica_core/FileServer.h create mode 100644 core/modules/replica_core/FileServerConnection.cc create mode 100644 core/modules/replica_core/FileServerConnection.h create mode 100644 core/modules/replica_core/FileUtils.cc create mode 100644 core/modules/replica_core/FileUtils.h create mode 100644 core/modules/replica_core/FindAllJob.cc create mode 100644 core/modules/replica_core/FindAllJob.h create mode 100644 core/modules/replica_core/FindAllRequest.cc create mode 100644 core/modules/replica_core/FindAllRequest.h create mode 100644 core/modules/replica_core/FindRequest.cc create mode 100644 core/modules/replica_core/FindRequest.h create mode 100644 core/modules/replica_core/FixUpJob.cc create mode 100644 core/modules/replica_core/FixUpJob.h create mode 100644 core/modules/replica_core/Job.cc create mode 100644 core/modules/replica_core/Job.h create mode 100644 core/modules/replica_core/JobController.cc create mode 100644 core/modules/replica_core/JobController.h create mode 100644 core/modules/replica_core/Messenger.cc create mode 100644 core/modules/replica_core/Messenger.h create mode 100644 core/modules/replica_core/MessengerConnector.cc create mode 100644 core/modules/replica_core/MessengerConnector.h create mode 100644 core/modules/replica_core/MoveReplicaJob.cc create mode 100644 core/modules/replica_core/MoveReplicaJob.h create mode 100644 core/modules/replica_core/Performance.cc create mode 100644 core/modules/replica_core/Performance.h create mode 100644 core/modules/replica_core/ProtocolBuffer.cc create mode 100644 core/modules/replica_core/ProtocolBuffer.h create mode 100644 core/modules/replica_core/PurgeJob.cc create mode 100644 core/modules/replica_core/PurgeJob.h create mode 100644 core/modules/replica_core/RebalanceJob.cc create mode 100644 core/modules/replica_core/RebalanceJob.h create mode 100644 core/modules/replica_core/ReplicaInfo.cc create mode 100644 core/modules/replica_core/ReplicaInfo.h create mode 100644 core/modules/replica_core/ReplicateJob.cc create mode 100644 core/modules/replica_core/ReplicateJob.h create mode 100644 core/modules/replica_core/ReplicationRequest.cc create mode 100644 core/modules/replica_core/ReplicationRequest.h create mode 100644 core/modules/replica_core/Request.cc create mode 100644 core/modules/replica_core/Request.h create mode 100644 core/modules/replica_core/RequestConnection.cc create mode 100644 core/modules/replica_core/RequestConnection.h create mode 100644 core/modules/replica_core/RequestMessenger.cc create mode 100644 core/modules/replica_core/RequestMessenger.h create mode 100644 core/modules/replica_core/RequestTracker.cc create mode 100644 core/modules/replica_core/RequestTracker.h create mode 100644 core/modules/replica_core/RequestTypesFwd.h create mode 100644 core/modules/replica_core/ServiceManagementRequest.cc create mode 100644 core/modules/replica_core/ServiceManagementRequest.h create mode 100644 core/modules/replica_core/ServiceProvider.cc create mode 100644 core/modules/replica_core/ServiceProvider.h create mode 100644 core/modules/replica_core/StatusRequest.cc create mode 100644 core/modules/replica_core/StatusRequest.h create mode 100644 core/modules/replica_core/StopRequest.cc create mode 100644 core/modules/replica_core/StopRequest.h create mode 100644 core/modules/replica_core/SuccessRateGenerator.cc create mode 100644 core/modules/replica_core/SuccessRateGenerator.h create mode 100644 core/modules/replica_core/VerifyJob.cc create mode 100644 core/modules/replica_core/VerifyJob.h create mode 100644 core/modules/replica_core/WorkerDeleteRequest.cc create mode 100644 core/modules/replica_core/WorkerDeleteRequest.h create mode 100644 core/modules/replica_core/WorkerFindAllRequest.cc create mode 100644 core/modules/replica_core/WorkerFindAllRequest.h create mode 100644 core/modules/replica_core/WorkerFindRequest.cc create mode 100644 core/modules/replica_core/WorkerFindRequest.h create mode 100644 core/modules/replica_core/WorkerProcessor.cc create mode 100644 core/modules/replica_core/WorkerProcessor.h create mode 100644 core/modules/replica_core/WorkerProcessorThread.cc create mode 100644 core/modules/replica_core/WorkerProcessorThread.h create mode 100644 core/modules/replica_core/WorkerReplicationRequest.cc create mode 100644 core/modules/replica_core/WorkerReplicationRequest.h create mode 100644 core/modules/replica_core/WorkerRequest.cc create mode 100644 core/modules/replica_core/WorkerRequest.h create mode 100644 core/modules/replica_core/WorkerRequestFactory.cc create mode 100644 core/modules/replica_core/WorkerRequestFactory.h create mode 100644 core/modules/replica_core/WorkerServer.cc create mode 100644 core/modules/replica_core/WorkerServer.h create mode 100644 core/modules/replica_core/WorkerServerConnection.cc create mode 100644 core/modules/replica_core/WorkerServerConnection.h diff --git a/core/modules/SConscript b/core/modules/SConscript index 0959632683..c891d52a15 100644 --- a/core/modules/SConscript +++ b/core/modules/SConscript @@ -284,6 +284,11 @@ shlibs["qmetaLib"] = dict(mods="qmeta:python", libs="qserv_qmeta log", SHLIBPREFIX='', instDir="$python_prefix/lsst/qserv/qmeta") + +# library of tools for building binary applications of the loader subsystem +shlibs["loader"] = dict(mods="""loader""", + libs="""util protobuf boost_filesystem boost_system + log log4cxx""") # get list of all modules all_modules = sorted(str(d) for d in Glob('*', source=True) if os.path.isdir(d.srcnode().abspath)) diff --git a/core/modules/loader/FileServer.cc b/core/modules/loader/FileServer.cc new file mode 100644 index 0000000000..83b592ca35 --- /dev/null +++ b/core/modules/loader/FileServer.cc @@ -0,0 +1,383 @@ +/* + * LSST Data Management System + * Copyright 2017 LSST Corporation. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "loader/FileServer.h" + +// System headers +#include +#include +#include // &&& delete? +#include +#include +#include +#include + +// Qserv headers + +#include "lsst/log/Log.h" + +namespace fs = boost::filesystem; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.loader.FileServer"); + +/// The limit of 16 MB fo rthe maximum record size for file I/O and +/// network operations. +const size_t maxFileBufSizeBytes = 16 * 1024 * 1024; // &&& there must be a better way to store this. + +} /// namespace + +namespace lsst { +namespace qserv { +namespace loader { + + +FileServer::Ptr FileServer::create(FileServerConfig::Ptr const& fileServerConfig) { + return FileServer::Ptr (new FileServer(fileServerConfig)); +} + + +FileServer::FileServer (FileServerConfig::Ptr const& fileServerConfig) + : _fileServerConfig(fileServerConfig), + _acceptor(_io_service, + boost::asio::ip::tcp::endpoint ( + boost::asio::ip::tcp::v4(), + _fileServerConfig->getPort())) { + // Allow socket recycling ports after catastrophic failures. + _acceptor.set_option(boost::asio::socket_base::reuse_address(true)); +} + + +void FileServer::run() { + // Start with this so there is something for the pool to do. + beginAccept(); + + // Launch all threads in the pool + std::vector> threads(_serviceProvider.config()->workerNumFsProcessingThreads()); + for (std::size_t i = 0; i < threads.size(); ++i) { + std::shared_ptr ptr( + new std::thread(boost::bind(&boost::asio::io_service::run, &_io_service))); // &&& Why is boost::bind needed? + threads[i] = ptr; + } + + // Wait for all threads in the pool to exit. + for (std::size_t i = 0; i < threads.size(); ++i) + threads[i]->join(); +} + + +void FileServer::beginAccept () { + auto connection = FileServerConnection::create(shared_from_this()); + + _acceptor.async_accept ( + connection->socket(), + boost::bind ( + &FileServer::handleAccept, + shared_from_this(), + connection, + boost::asio::placeholders::error + ) + ); +} + +void FileServer::handleAccept (FileServerConnection::Ptr const& connection, boost::system::error_code const& err) { + if (!err) { + connection->beginProtocol(); + } else { + LOGS(_log, LOG_LVL_DEBUG, context() << "handleAccept err:" << err); + } + beginAccept(); +} + + +//////////////////////// &&& delete this + +typedef std::shared_ptr ProtocolBufferPtr; + +/// The context for diagnostic & debug printouts +const std::string context = "FILE-SERVER-CONNECTION "; // &&& replace with identifier for file request. + +bool FileServerConnection::_isErrorCode (boost::system::error_code ec, std::string const& scope) { + + if (ec) { + if (ec == boost::asio::error::eof) + LOGS(_log, LOG_LVL_DEBUG, context << scope << " ** closed **"); + else + LOGS(_log, LOG_LVL_ERROR, context << scope << " ** failed: " << ec << " **"); + return true; + } + return false; +} + +bool FileServerConnection::_readIntoBuffer (size_t bytes) { + _bufferPtr->resize(bytes); // make sure the buffer has enough space to accomodate + // the data of the message. + + boost::system::error_code ec; + boost::asio::read (_socket, boost::asio::buffer (_bufferPtr->data(), bytes), + boost::asio::transfer_at_least(bytes), ec); + return !_isErrorCode(ec, "readIntoBuffer"); +} + + +bool FileServerConnection::_readMessage(size_t bytes, proto::ReplicationFileRequest &message) { + if (!_readIntoBuffer (bytes)) return false; + + _bufferPtr->parse(message, bytes); + return true; +} + +/* &&& +FileServerConnection::Ptr +FileServerConnection::create(ServiceProvider &serviceProvider, + const std::string &workerName, + boost::asio::io_service &io_service) { + return FileServerConnection::pointer ( + new FileServerConnection ( + serviceProvider, + workerName, + io_service)); +} +*/ + +FileServerConnection::Ptr FileServerConnection::create(FileServer::Ptr const& fileServer) { + return FileServerConnection::Ptr (new FileServerConnection(fileServer, fileServer->getConfig())); +} + +/// &&& it looks like injecting a bufferPtr object would be a better way to go... ?? +FileServerConnection::FileServerConnection(FileServer::Ptr const& fileServer, + FileServerConfig::Ptr const& fileServerConfig) + : _fileServer(fileServer), + _bufferPtr (std::make_shared(fileServerConfig()->requestBufferSizeBytes())), + _socket(fileServer->getIOService()), + _fileBufSize(serviceProvider.config()->workerFsBufferSizeBytes()) { + + if (!_fileBufSize || _fileBufSize > maxFileBufSizeBytes) + throw std::invalid_argument("FileServerConnection: the buffer size must be in a range of: 0-" + + std::to_string(maxFileBufSizeBytes) + " bytes. Check the configuration."); + + _fileVect.resize(_fileBufSize); + // _fileBuf = new uint8_t[_fileBufSize]; &&& + _fileBuf = &(_fileVect[0]); + + if (!_fileBuf) + throw std::runtime_error("FileServerConnection: failed to allocate the buffer, size: " + + std::to_string(maxFileBufSizeBytes) + " bytes."); +} + + + +FileServerConnection::~FileServerConnection () { + // delete [] _fileBuf; &&& delete, destroyed with vector now. +} + +void FileServerConnection::beginProtocol () { + receiveRequest(); +} + +void FileServerConnection::receiveRequest () { + + LOGS(_log, LOG_LVL_DEBUG, context << "receiveRequest"); + + // Start with receiving the fixed length frame carrying + // the size (in bytes) the length of the subsequent message. + // + // The message itself will be read from the handler using + // the synchronous read method. This is based on an assumption + // that a client sends the whole message (its frame and + // the message itsef) at once. + + const size_t bytes = sizeof(uint32_t); + + _bufferPtr->resize(bytes); + + boost::asio::async_read ( + _socket, + boost::asio::buffer ( + _bufferPtr->data(), + bytes + ), + boost::asio::transfer_at_least(bytes), + boost::bind ( + &FileServerConnection::requestReceived, + shared_from_this(), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred + ) + ); +} + + +void FileServerConnection::requestReceived (boost::system::error_code const& ec, size_t bytes_transferred) { + LOGS(_log, LOG_LVL_DEBUG, context << "requestReceived"); + + if ( ::isErrorCode (ec, "requestReceived")) return; + + // Now read the body of the request + proto::ReplicationFileRequest request; + if (!_readMessage(_socket, _bufferPtr, _bufferPtr->parseLength(), request)) return; + + LOGS(_log, LOG_LVL_INFO, context << "requestReceived database: " << request.database() + << ", file: " << request.file()); + + // Find a file requested by a client + + bool available = false; + uint64_t size = 0; + std::time_t mtime = 0; + do { + boost::system::error_code ec; + const fs::path file = fs::path(_workerInfo.dataDir) / request.database() / request.file(); + const fs::file_status stat = fs::status(file, ec); + + if (stat.type() == fs::status_error) { + LOGS(_log, LOG_LVL_ERROR, context << "requestReceived failed to check the status of file: " << file); + break; + } + if (!fs::exists(stat)) { + LOGS(_log, LOG_LVL_ERROR, context << "requestReceived file does not exist: " << file); + break; + } + size = fs::file_size(file, ec); + if (ec) { + LOGS(_log, LOG_LVL_ERROR, context << "requestReceived failed to get the file size of: " << file); + break; + } + mtime = fs::last_write_time(file, ec); + if (ec) { + LOGS(_log, LOG_LVL_ERROR, context << "requestReceived failed to get file mtime of: " << file); + break; + } + + // If requested open the file and leave its descriptor open + + _fileName = file.string(); + if (request.send_content()) { + _filePtr = std::fopen (file.string().c_str(), "rb"); + if (!_filePtr) { + LOGS(_log, LOG_LVL_ERROR, context << "requestReceived file open error: " << std::strerror(errno) << ", file: " << file); + break; + } + } + available = true; + + } while (false); // &&& why bother with do..while ? + + // Serialize the response into the buffer and send it back to a caller + + proto::ReplicationFileResponse response; + response.set_available (available); + response.set_size (size); + response.set_mtime (mtime); + + _bufferPtr->resize(); + _bufferPtr->serialize(response); + + sendResponse (); +} + +void FileServerConnection::sendResponse () { + + LOGS(_log, LOG_LVL_DEBUG, context << "sendResponse"); + + boost::asio::async_write ( + _socket, + boost::asio::buffer ( + _bufferPtr->data(), + _bufferPtr->size() + ), + boost::bind ( + &FileServerConnection::responseSent, + shared_from_this(), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred + ) + ); +} + +void FileServerConnection::responseSent (const boost::system::error_code &ec, size_t bytes_transferred) { + LOGS(_log, LOG_LVL_DEBUG, context << "responseSent"); + + if (_isErrorCode (ec, "sent") || !_filePtr) { + // &&& Need some internal indication that transfer has failed? + return; + } + + // The file is open. Begin streaming its content. + sendData(); +} + +void FileServerConnection::sendData () { + + LOGS(_log, LOG_LVL_DEBUG, context << "sendData file: " << _fileName); + + // Read next record if possible (a failure or EOF) + const size_t bytes = std::fread(_fileBuf, sizeof(uint8_t), _fileBufSize, _filePtr); + if (!bytes) { + if (std::ferror(_filePtr)) { + LOGS(_log, LOG_LVL_ERROR, + context << "sendData file read error: " << std::strerror(errno) << ", file: " << _fileName); + } else if (std::feof(_filePtr)) { + LOGS(_log, LOG_LVL_INFO, context << "sendData file: " << _fileName); + // &&& indicate success ? + } else { + ; // This file was empty, or the previous read was aligned exactly on the end of the file. + } + + std::fclose(_filePtr); + return; + } + + // Send the record + boost::asio::async_write ( + _socket, + boost::asio::buffer ( + _fileBuf, + bytes + ), + boost::bind ( + &FileServerConnection::dataSent, + shared_from_this(), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred + ) + ); +} + +void FileServerConnection::dataSent (const boost::system::error_code &ec, + size_t bytes_transferred) { + + LOGS(_log, LOG_LVL_DEBUG, context << "dataSent"); + + if (::isErrorCode (ec, "dataSent")) return; + + sendData(); +} + + +}}} // namespace lsst::qserv::loader + + + + diff --git a/core/modules/loader/FileServer.h b/core/modules/loader/FileServer.h new file mode 100644 index 0000000000..e9d70cf55a --- /dev/null +++ b/core/modules/loader/FileServer.h @@ -0,0 +1,181 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * Copyright 2017 LSST Corporation. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_LOADER_FILESERVER_H +#define LSST_QSERV_LOADER_FILESERVER_H + +// System headers +#include +#include + +// Qserv headers +#include "replica_core/FileServerConnection.h" + + +namespace lsst { +namespace qserv { +namespace loader { + +class FileServerConnection; +class FileServerConfig; + + +/// This class is meant to simply provide the requested file. +/// Based on FileServer in Igor Gaponenko's replication system. +class FileServer : public std::enable_shared_from_this { +public: + typedef std::shared_ptr Ptr; + + // Factory function to ensure proper creation for enable_shared_from_this. + static Ptr create(FileServerConfig::Ptr const& fileServerConfig); + + FileServer () = delete; + FileServer (FileServer const&) = delete; + FileServer & operator= (FileServer const&) = delete; + + /// Where the work happens, run in a separate thread. + void run (); + + FileServerConfig::Ptr getConfig() { return _fileServerConfig; } + boost::asio::io_service& getIoService() { return _io_service; } + +private: + explicit FileServer (FileServerConfig::Ptr const& fileServerConfig); + + /// Following function accept and handle the file requests. + void beginAccept (); // &&& add _ + void handleAccept (std::shared_ptr const& connection, + boost::system::error_code const& ec); + + /// Return the context string + std::string context () const { return "FILE-SERVER "; } // &&& is this needed? + + FileServerConfig::Ptr _fileServerConfig; + + boost::asio::io_service _io_service; // &&& change to _ioService + boost::asio::ip::tcp::acceptor _acceptor; +}; + + + +/** + * Class FileServerConnection is used for handling file read requests from + * remote clients. One instance of the class serves one file from one client + * at a time. + * + * Objects of this class are inistantiated by FileServer. After that + * the server calls this class's method beginProtocol() which starts + * a series of asynchronous operations to communicate with remote client. + * When all details of an incoming request are obtained from the client + * the connection object begins actual processing of the request and + * communicates with a client as required by the file transfer protocol. + * All communications are asynchronous using Google protobuf. + * + * The lifespan of this object is exactly one request until it's fully + * satisfied or any failure during request execution (when reading a file, + * or communicating with a client) occures. When this happens the object + * stops doing anything. + * + * This is essentially an RPC protocol which runs in a loop this sequence of steps + * starting with a call to beginProtocol(). + * - ASYNC: read a frame header of a request + * - SYNC: read the request header (a specification of a file, etc.) + * - ASYNC: write a frame header of a reply to the request + * followed by a status (to tell a client if the specified file + * is available or not, and if so then what would be its size, etc.) + * - ASYNC: if the request is accepted then begin streaming the content of + * a file in a series of records until it's done. + * + * NOTES: A reason why the read phase is split into three steps is + * that a client is expected to send all components of the request + * (frame header and request header) at once. This means + * the whole incomming message will be already available on the server's + * host memory when an asyncronous handler for the frame header will fire. + * However, due to a variable length of the request we should know its length + * before attempting to read the rest of the incomming message as this (the later) + * will require two things: 1) to ensure we have enough buffer space + * allocated, and 2) to tell the asynchrnous reader function + * how many bytes exactly are we going to read. + * + * The chain ends when a client disconnects or when an error condition + * is met. + * + * Based on FileServerConnection in Igor Gaponenko's replication system. + */ +class FileServerConnection : public std::enable_shared_from_this { +public: + typedef std::shared_ptr Ptr; + + /// Factory to ensure correct construction for enable_shared_from_this. + static Ptr create(FileServer::Ptr const& fileServer); + + FileServerConnection() = delete; + FileServerConnection(FileServerConnection const&) = delete; + FileServerConnection& operator=(FileServerConnection const&) = delete; + + virtual ~FileServerConnection(); + + /// Return a network socket associated with the connection. + boost::asio::ip::tcp::socket& socket() { return _socket; } + + ///Begin communicating asynchroniously with a client. + void beginProtocol(); + +private: + FileServerConnection (FileServer::Ptr const& fileServer, + std::shared_ptr const& fileServerConfig); + + ///Begin reading the frame header of a new request + void receiveRequest (); // &&& add _ + + /// Parse the request and begin file transfer. + void requestReceived (boost::system::error_code const& ec, size_t bytes_transferred); // &&& add _ + + /// Begin sending a result back to a client + void sendResponse (); // &&& add _ + + /// Send the next record for the file + void sendData (); // &&& add _ + + /// The callback on finishing (either successfully or not) of aynchronious writes. + void responseSent (boost::system::error_code const& ec, size_t bytes_transferred); // &&& add _ + + /// The callback on finishing (either successfully or not) of aynchronious writes. + void dataSent (boost::system::error_code const& ec, size_t bytes_transferred); + + bool _isErrorCode (boost::system::error_code ec, std::string const& scope); + + std::weak_ptr _fileServer; + boost::asio::ip::tcp::socket _socket; + + std::shared_ptr _bufferPtr; ///< Buffer serialization. + std::string _fileName; ///< The name of the file being transferred. + std::FILE* _filePtr; ///< The file. + size_t _fileBufSize{0}; ///< The file record buffer size (bytes) + std::vector _fileVect; ///< container for the fileBuf + uint8_t *_fileBuf; ///< Pointer to the start of _fileVect's internal array. +}; + + +}}} // namespace lsst::qserv::loader + +#endif // LSST_QSERV_LOADER_FILESERVER_H diff --git a/core/modules/proto/SConscript b/core/modules/proto/SConscript index c32c4c7821..a738b309b6 100644 --- a/core/modules/proto/SConscript +++ b/core/modules/proto/SConscript @@ -7,5 +7,9 @@ env.Protoc(File("worker.proto"), PROTOC_PATH='.', PROTOC_CCOUT='.', PROTOC_PYOUT='.',) +env.Protoc(File("loader.proto"), + PROTOC_PATH='.', + PROTOC_CCOUT='.', + PROTOC_PYOUT='.',) standardModule(env, test_libs='log4cxx') diff --git a/core/modules/proto/loader.proto b/core/modules/proto/loader.proto new file mode 100644 index 0000000000..34b8038f08 --- /dev/null +++ b/core/modules/proto/loader.proto @@ -0,0 +1,49 @@ +/* + * LSST Data Management System + * Copyright 2011-2018 LSST Corporation. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +/// replication.proto +/// This defines the wire-messages sent between replication master and workers. + +package lsst.qserv.proto; + + +message LoaderFileRequest { + /// The name of a file (including its extention and excluding any path) + required string file = 2; +} + +message LoaderFileResponse { + + /// The flag indicating if the earlier requested file is available, + /// and it can be read by the server. + required bool available = 1; + + /// The file size (bytes) + required uint64 size = 2; + + /// The file content modification time in seconds (since UNIX Epoch) + required uint32 mtime = 3; + +} + + + + \ No newline at end of file diff --git a/core/modules/replica/CmdParser.cc b/core/modules/replica/CmdParser.cc new file mode 100644 index 0000000000..d331365da0 --- /dev/null +++ b/core/modules/replica/CmdParser.cc @@ -0,0 +1,253 @@ +/* + * LSST Data Management System + * Copyright 2017 LSST Corporation. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header + +#include "replica/CmdParser.h" + +// System headers + +#include +#include +#include + +// Qserv headers + +namespace lsst { +namespace qserv { +namespace replica { + +bool +CmdParser::found_in (std::string const& val, + std::vector const& col) { + return col.end() != std::find(col.begin(), col.end(), val); +} + +CmdParser::CmdParser (int argc, + const char* const* argv, + const char* usage) + : _usage(usage) { + + _usage = _usage + + "\nSpecial options:\n" + " --help - print the help page\n"; + + for (int i=0; i < argc; ++i) _argv.push_back(argv[i]); + parse(); +} + +CmdParser::~CmdParser () { +} + +bool +CmdParser::flag (std::string const& name) const { + if (name == "help") { + std::cerr + << _usage << std::endl; + return true; + } + return _flag.count(name); +} + +std::string +CmdParser::parameterRestrictedBy (unsigned int pos, + std::vector const& allowedValues) const { + + const std::string str = parameter(pos); + if (found_in(str, allowedValues)) return str; + + std::cerr + << "CmdParser::parameterRestrictedBy(" << std::to_string(pos) + << "): parameter value is not permitted: " << str << "\n" + << _usage << std::endl; + + throw std::invalid_argument ( + "CmdParser::parameterRestrictedBy(" + std::to_string(pos) + + "): parameter value is not permitted: " + str); +} + +bool +CmdParser::optionImpl (std::string const& name, + bool const& defaultValue) const { + + std::string const str = optionImpl(name, std::string()); + if (str.empty()) return defaultValue; + + if (str == "true") return true; + else if (str == "false") return false; + + throw std::invalid_argument ( + "CmdParser::optionImpl: failed to parse a value of option: " + name); +} + +int +CmdParser::optionImpl (std::string const& name, + int const& defaultValue) const { + + std::string const str = optionImpl(name, std::string()); + if (str.empty()) return defaultValue; + + try { + return std::stoi(str); + } catch (std::exception const&) { + throw std::invalid_argument ( + "CmdParser::optionImpl: failed to parse a value of option: " + name); + } +} + +unsigned int +CmdParser::optionImpl (std::string const& name, + unsigned int const& defaultValue) const { + + std::string const str = optionImpl(name, std::string()); + if (str.empty()) return defaultValue; + + try { + return std::stoul(str); + } catch (std::exception const&) { + throw std::invalid_argument ( + "CmdParser::optionImpl: failed to parse a value of option: " + name); + } +} + +std::string +CmdParser::optionImpl (std::string const& name, + std::string const& defaultValue) const { + return _option.count(name) ? _option.at(name) : defaultValue; +} + +void +CmdParser::parameterImpl (unsigned int pos, + bool& val) const { + std::string str; + parameterImpl(pos, str); + + if (str == "true") { val = true; return; } + else if (str == "false") { val = false; return; } + + throw std::invalid_argument ( + "CmdParser::parameterImpl(" + std::to_string(pos) + + "): failed to parse a value of argument: " + str); +} + +void +CmdParser::parameterImpl (unsigned int pos, + int& val) const { + std::string str; + parameterImpl(pos, str); + + try { + val = std::stoi(str); + return; + } catch (std::exception const&) { + throw std::invalid_argument ( + "CmdParser::parameterImpl(" + std::to_string(pos) + + "): failed to parse a value of argument: " + str); + } +} + +void +CmdParser::parameterImpl (unsigned int pos, + unsigned int& val) const { + std::string str; + parameterImpl(pos, str); + + try { + val = std::stoul(str); + return; + } catch (std::exception const&) { + throw std::invalid_argument ( + "CmdParser::parameterImpl(" + std::to_string(pos) + + "): failed to parse a value of argument: " + str); + } +} +void +CmdParser::parameterImpl (unsigned int pos, + std::string& val) const { + if (pos >= _parameter.size()) { + std::cerr + << "CmdParser::parameterImpl(" << pos << "): too few positional arguments\n" + << _usage << std::endl; + throw std::out_of_range ( + "CmdParser::parameterImpl(" + std::to_string(pos) + "): too few positional arguments"); + } + val = _parameter[pos]; + return; +} + +void +CmdParser::dump (std::ostream& os) const { + + os << "CmdParser::dump()\n"; + + os << " PARAMETERS:\n"; + for (auto const& p: _parameter) + os << " " << p << "\n"; + + os << " OPTIONS:\n"; + for (auto const& p: _option) + os << " " << p.first << "=" << p.second << "\n"; + + os << " FLAGS:\n"; + for (auto const& p: _flag) + os << " " << p << "\n"; +} + +void +CmdParser::parse () { + for (auto const& arg: _argv) { + if (arg.substr(0,2) == "--") { + std::string const nameEqualValue = arg.substr(2); + if (nameEqualValue.empty()) { + std::cerr + << "CmdParser::parse: illegal command line argument: " << arg << "\n" + << _usage << std::endl; + throw std::invalid_argument( + "CmdParser::parse: illegal command line argument: " + arg); + } + std::string::size_type const equalPos = nameEqualValue.find("="); + if (equalPos == std::string::npos) { + if (nameEqualValue == "help") { + std::cerr + << _usage << std::endl; + throw std::invalid_argument ( + "CmdParser::parse: help mode intercepted"); + } + _flag.insert(nameEqualValue); + } else { + std::string const option = nameEqualValue.substr(0, equalPos); + std::string const value = nameEqualValue.substr(equalPos+1); + if (value.empty()) { + std::cerr + << "CmdParser::parse: no value provided for option: " << option << "\n" + << _usage << std::endl; + throw std::invalid_argument ( + "CmdParser::parse: no value provided for option: " + option); + } + _option[option] = value; + } + } else { + _parameter.push_back(arg); + } + } +} +}}} // namespace lsst::qserv::replica diff --git a/core/modules/replica/CmdParser.h b/core/modules/replica/CmdParser.h new file mode 100644 index 0000000000..d73693bd12 --- /dev/null +++ b/core/modules/replica/CmdParser.h @@ -0,0 +1,276 @@ +/* + * LSST Data Management System + * Copyright 2017 LSST Corporation. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_CMD_PARSER_H +#define LSST_QSERV_REPLICA_CMD_PARSER_H + +/// CmdParser.h declares: +/// +/// class CmdParser +/// (see individual class documentation for more information) + +// System headers + +#include +#include +#include +#include +#include + +// Qserv headers + +// This header declarations + +namespace lsst { +namespace qserv { +namespace replica { + +/** + * The command line parser class for the command-line applications. + * It helps with parsing and interpreting command line arguments into + * positional parameters, flags and options: + * + * + * -- + * --