From 6480fa336a0ef1fc72ebe707fe0fd72c31cc2272 Mon Sep 17 00:00:00 2001 From: Fabrice Jammes Date: Wed, 8 Mar 2023 13:27:06 +0100 Subject: [PATCH 1/5] Install arrow --- admin/tools/docker/base/Dockerfile | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/admin/tools/docker/base/Dockerfile b/admin/tools/docker/base/Dockerfile index c974ccd09..eb9ff58ac 100644 --- a/admin/tools/docker/base/Dockerfile +++ b/admin/tools/docker/base/Dockerfile @@ -56,6 +56,19 @@ RUN dnf install -y 'dnf-command(config-manager)' \ && dnf clean all \ && rm -rf /var/cache/yum +ENV ARROW_VERSION 11.0.0 + +RUN dnf update -y \ + && dnf install -y \ + https://apache.jfrog.io/artifactory/arrow/almalinux/$(cut -d: -f5 /etc/system-release-cpe | cut -d. -f1)/apache-arrow-release-latest.rpm \ + && dnf config-manager --set-enabled epel \ + && dnf config-manager --set-enabled powertools \ + && dnf install -y \ + arrow-devel-$ARROW_VERSION \ + parquet-devel-$ARROW_VERSION \ + && dnf clean all \ + && rm -rf /var/cache/yum + RUN curl -s "https://cmake.org/files/v3.17/cmake-3.17.2-Linux-x86_64.tar.gz" \ | tar --strip-components=1 -xz -C /usr/local @@ -196,6 +209,19 @@ RUN dnf install -y 'dnf-command(config-manager)' \ && dnf clean all \ && rm -rf /var/cache/yum +ENV ARROW_VERSION 11.0.0 + +# FIXME: Keep only binaries, and not devel libraries +RUN dnf update -y \ + && dnf install -y \ + https://apache.jfrog.io/artifactory/arrow/almalinux/$(cut -d: -f5 /etc/system-release-cpe | cut -d. 
-f1)/apache-arrow-release-latest.rpm \ + && dnf config-manager --set-enabled epel \ + && dnf config-manager --set-enabled powertools \ + && dnf install -y arrow11-libs-$ARROW_VERSION \ + && dnf install -y parquet11-libs-$ARROW_VERSION \ + && dnf clean all \ + && rm -rf /var/cache/yum + RUN useradd --create-home --uid 1000 --shell /bin/bash qserv WORKDIR /home/qserv From 8cad85bdeeb855e7f38d377743659f11aa03f3f0 Mon Sep 17 00:00:00 2001 From: Fabrice Jammes Date: Fri, 10 Mar 2023 16:36:16 +0100 Subject: [PATCH 2/5] Document how to set up a development container --- doc/dev/quick-start-devel.rst | 39 +++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/doc/dev/quick-start-devel.rst b/doc/dev/quick-start-devel.rst index 588ac1470..4b0cd7b34 100644 --- a/doc/dev/quick-start-devel.rst +++ b/doc/dev/quick-start-devel.rst @@ -4,8 +4,39 @@ Quick start guide for developers ################################ -Using Qserv with your own custom code or arbitrary versions can be done by -connecting your local git repository with an eups software stack containing Qserv -dependencies. - +Using Qserv with your own custom code or arbitrary versions can be done using Qserv build containers. +Full documentation is available here: https://confluence.lsstcorp.org/display/DM/Qserv-Lite+Builds+in+Containers + +*********************************** +Bootstrap a development environment +*********************************** + +.. 
code:: sh + + # Clone qserv and its submodules + git clone --recursive https://github.com/lsst/qserv + cd qserv + # Build Qserv base images + qserv build-images + # Build Qserv user image + qserv build-user-build-image + # Open a shell in a development container + # NOTE: Add '--user=qserv' option to command below if user id equals 1000 + # NOTE: Use --user-build-image in order to avoid rebuilding a Qserv user image after each commit + qserv run-build --user-build-image docker.io/qserv/lite-build-fjammes:2023.2.1-rc2-10-g6624d8b28 + # Build host code inside Qserv container + make + + +***************************** +Build an image for production +***************************** + +.. code:: sh + + # NOTE: Remove build/ directory to restart the build from scratch. + # This can fix build errors. + rm -rf build/ + # NOTE: Add '--user=qserv' option to command below if user id equals 1000 + qserv build -j8 \ No newline at end of file From 78b548a6d861eef95588f3dcfdc812423f98eebf Mon Sep 17 00:00:00 2001 From: elles Date: Thu, 16 Mar 2023 12:31:48 +0100 Subject: [PATCH 3/5] Add parquet file interface to partitioner --- Dockerfile | 5 + res.txt | 0 src/partition/CMakeLists.txt | 5 + src/partition/CmdLineUtils.cc | 16 +- src/partition/FileUtils.cc | 65 +++++- src/partition/FileUtils.h | 37 +++- src/partition/InputLines.cc | 64 +++++- src/partition/InputLines.h | 8 +- src/partition/ParquetInterface.cc | 353 ++++++++++++++++++++++++++++++ src/partition/ParquetInterface.h | 51 +++++ 10 files changed, 588 insertions(+), 16 deletions(-) create mode 100644 Dockerfile create mode 100644 res.txt create mode 100644 src/partition/ParquetInterface.cc create mode 100644 src/partition/ParquetInterface.h diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..739e07024 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ + + +FROM qserv/lite-qserv:2023.2.1-rc2-13-g7fc142a05 + +CMD tail -f /dev/null diff --git a/res.txt b/res.txt new file mode 100644 index 000000000..e69de29bb 
diff --git a/src/partition/CMakeLists.txt b/src/partition/CMakeLists.txt index 2a3a8c940..fc97a18bd 100644 --- a/src/partition/CMakeLists.txt +++ b/src/partition/CMakeLists.txt @@ -12,12 +12,15 @@ target_sources(partition PRIVATE HtmIndex.cc InputLines.cc ObjectIndex.cc + ParquetInterface.cc ) target_link_libraries(partition PUBLIC boost_program_options boost_thread Threads::Threads + arrow + parquet ) install(TARGETS partition) @@ -31,6 +34,8 @@ FUNCTION(partition_apps) partition boost_filesystem sphgeom + arrow + parquet ) install(TARGETS ${APP}) ENDFOREACH() diff --git a/src/partition/CmdLineUtils.cc b/src/partition/CmdLineUtils.cc index a8ff2cb1a..69d826a25 100644 --- a/src/partition/CmdLineUtils.cc +++ b/src/partition/CmdLineUtils.cc @@ -28,6 +28,7 @@ #include #include #include +#include "boost/algorithm/string/predicate.hpp" #include "partition/ConfigStore.h" #include "partition/Constants.h" @@ -140,6 +141,7 @@ InputLines const makeInputLines(ConfigStore const& config) { "using --in.path."); } std::vector paths; + bool bIsParquetFile = false; for (auto&& s : config.get>("in.path")) { fs::path p(s); fs::file_status stat = fs::status(p); @@ -152,13 +154,25 @@ InputLines const makeInputLines(ConfigStore const& config) { } } } + bIsParquetFile = (boost::algorithm::ends_with(s.c_str(), ".parquet") || + boost::algorithm::ends_with(s.c_str(), ".parq")); } if (paths.empty()) { throw std::runtime_error( "No non-empty input files found among the " "files and directories specified via --in.path."); } - return InputLines(paths, blockSize * MiB, false); + + // Arrow : collect parameter name list to be read from parquet file + std::vector names; + if (config.has("out.csv.field")) names = config.get>("in.csv.field"); + // Direct parquet file reading is not possible using MT - March 2023 + if (bIsParquetFile && config.has("mr.num-workers") && config.get("mr.num-workers") > 1) + throw std::runtime_error( + "Parquet file partition cannot be done in MT - mr.num-workers 
parameter must be set to 1 in " + "parition.json file "); + + return InputLines(paths, blockSize * MiB, false, names); } void defineOutputOptions(po::options_description& opts) { diff --git a/src/partition/FileUtils.cc b/src/partition/FileUtils.cc index e69f1c498..c53f6fd1d 100644 --- a/src/partition/FileUtils.cc +++ b/src/partition/FileUtils.cc @@ -21,6 +21,7 @@ */ #include "partition/FileUtils.h" +#include "partition/ParquetInterface.h" #include #include @@ -64,7 +65,8 @@ InputFile::~InputFile() { } } -void InputFile::read(void *buf, off_t off, size_t sz) const { +void InputFile::read(void *buf, off_t off, size_t sz, int *bufferSize, + std::vector params) const { char msg[1024]; uint8_t *cur = static_cast(buf); while (sz > 0) { @@ -82,6 +84,67 @@ void InputFile::read(void *buf, off_t off, size_t sz) const { } } +InputFileArrow::InputFileArrow(fs::path const &path, off_t blockSize) : InputFile(path), _fd(-1), _sz(-1) { + char msg[1024]; + struct ::stat st; + + m_batchReader = std::make_unique(ParquetFile(path.string().c_str())); + arrow::Status status = m_batchReader->SetupBatchReader(blockSize); + if (!status.ok()) throw std::runtime_error("Could not setup Arrow recordbatchreader"); + + int fd = ::open(path.string().c_str(), O_RDONLY); + + if (fd == -1) { + ::strerror_r(errno, msg, sizeof(msg)); + throw std::runtime_error("open() failed [" + path.string() + "]: " + msg); + } + if (::fstat(fd, &st) != 0) { + ::strerror_r(errno, msg, sizeof(msg)); + ::close(fd); + throw std::runtime_error("fstat() failed [" + path.string() + "]: " + msg); + } + _fd = fd; + _sz = st.st_size; +} + +InputFileArrow::~InputFileArrow() { + char msg[1024]; + if (_fd != -1 && ::close(_fd) != 0) { + ::snprintf(msg, sizeof(msg), "close() failed [%s]", _path.string().c_str()); + ::perror(msg); + ::exit(EXIT_FAILURE); + } +} + +int InputFileArrow::getBatchNumber() const { return m_batchReader->GetTotalBatchNumber(); } + +void InputFileArrow::read(void *buf, off_t off, size_t sz, int 
*bufferSize, + std::vector params) const { + char msg[1024]; + uint8_t *cur = static_cast(buf); + + int buffSize; + arrow::Status status = m_batchReader->ReadNextBatch_Table2CSV(cur, buffSize, params); + // return the buffersize because it varies from batch to batch + *bufferSize = buffSize; + + if (status.ok()) { + ssize_t n = buffSize; + if (n == 0) { + throw std::runtime_error("pread() received EOF [" + _path.string() + "]"); + } else if (n < 0 && errno != EINTR) { + ::strerror_r(errno, msg, sizeof(msg)); + throw std::runtime_error("pread() failed [" + _path.string() + "]: " + msg); + } + /*else if (n > 0) { + sz -= static_cast(n); + off += n; + cur += n; + } + */ + } +} + OutputFile::OutputFile(fs::path const &path, bool truncate) : _path(path), _fd(-1) { char msg[1024]; int flags = O_CREAT | O_WRONLY; diff --git a/src/partition/FileUtils.h b/src/partition/FileUtils.h index d6af09627..0598506ba 100644 --- a/src/partition/FileUtils.h +++ b/src/partition/FileUtils.h @@ -34,6 +34,8 @@ namespace lsst::partition { +class ParquetFile; + /// An input file. Safe for use from multiple threads. class InputFile { public: @@ -46,8 +48,12 @@ class InputFile { /// Return the path of the input file. boost::filesystem::path const &path() const { return _path; } + // Needed in derived class InputFileArrow + virtual int getBatchNumber() const { return -1; } + /// Read a range of bytes into `buf`. - void read(void *buf, off_t off, size_t sz) const; + virtual void read(void *buf, off_t off, size_t sz, int *bufferSize = 0, + std::vector params = {}) const; private: // Disable copy construction and assignment. @@ -59,6 +65,35 @@ class InputFile { off_t _sz; }; +class InputFileArrow : public InputFile { +public: + explicit InputFileArrow(boost::filesystem::path const &path, off_t blockSize); + ~InputFileArrow(); + + /// Return the size of the input file. + off_t size() const { return _sz; } + + /// Return the path of the input file. 
+ boost::filesystem::path const &path() const { return _path; } + + int getBatchNumber() const; + + /// Read a range of bytes into `buf`. + void read(void *buf, off_t off, size_t sz, int *bufferSize, std::vector params) const; + +private: + // Disable copy construction and assignment. + InputFileArrow(InputFileArrow const &); + InputFileArrow &operator=(InputFileArrow const &); + + boost::filesystem::path _path; + std::vector _paramNames; + int _fd; + off_t _sz; + + std::unique_ptr m_batchReader; +}; + /// An output file that can only be appended to, and which should only be /// used by a single thread at a time. class OutputFile { diff --git a/src/partition/InputLines.cc b/src/partition/InputLines.cc index 23e6f69d4..5a9d1daf2 100644 --- a/src/partition/InputLines.cc +++ b/src/partition/InputLines.cc @@ -28,6 +28,7 @@ #include "boost/make_shared.hpp" #include "boost/static_assert.hpp" #include "boost/thread.hpp" +#include "boost/algorithm/string/predicate.hpp" #include "partition/Constants.h" #include "partition/FileUtils.h" @@ -92,15 +93,23 @@ struct Block { Block() : file(), offset(0), size(0), head(), tail() {} - CharPtrPair const read(char *buf, bool skipFirstLine); + CharPtrPair const read(char *buf, bool skipFirstLine, std::vector paramNames = {}); }; // Read a file block and handle the lines crossing its boundaries. -CharPtrPair const Block::read(char *buf, bool skipFirstLine) { +CharPtrPair const Block::read(char *buf, bool skipFirstLine, std::vector paramNames) { // Read into buf, leaving space for a line on either side of the block. 
char *readBeg = buf + MAX_LINE_SIZE; char *readEnd = readBeg + size; - file->read(readBeg, offset, size); + + // Arrow/Parquet : retrieve the real size of the arrow CSV block + int bufferSize = 0; + file->read(readBeg, offset, size, &bufferSize, paramNames); + if (bufferSize > 0) { + size = bufferSize; + readEnd = readBeg + size; + } + // The responsibility for returning a line which crosses the beginning // or end of this block lies with the last thread to encounter the // line. @@ -164,14 +173,36 @@ CharPtrPair const Block::read(char *buf, bool skipFirstLine) { std::vector const split(fs::path const &path, off_t blockSize) { std::vector blocks; Block b; - b.file = boost::make_shared(path); + b.offset = 0; + off_t fileSize = 0; + off_t numBlocks = 0; + + if (boost::algorithm::ends_with(path.c_str(), ".parquet") || + boost::algorithm::ends_with(path.c_str(), ".parq")) { + b.file = boost::make_shared(path, blockSize); + + b.size = blockSize; + fileSize = b.file->getBatchNumber(); + numBlocks = b.file->getBatchNumber(); + + blocks.reserve(numBlocks); + for (off_t i = 0; i < numBlocks; ++i) { + b.offset = 1; + blocks.push_back(b); + } + return blocks; + } + + b.file = boost::make_shared(path); + b.size = blockSize; - off_t const fileSize = b.file->size(); - off_t numBlocks = fileSize / blockSize; + fileSize = b.file->size(); + numBlocks = fileSize / blockSize; if (fileSize % blockSize != 0) { ++numBlocks; } + blocks.reserve(numBlocks); for (off_t i = 0; i < numBlocks; ++i, b.offset += blockSize) { b.size = static_cast(std::min(fileSize - b.offset, blockSize)); @@ -190,7 +221,8 @@ std::vector const split(fs::path const &path, off_t blockSize) { class InputLines::Impl { public: - Impl(std::vector const &paths, size_t blockSize, bool skipFirstLine); + Impl(std::vector const &paths, size_t blockSize, bool skipFirstLine, + std::vector paramNames); ~Impl() {} size_t getBlockSize() const { return _blockSize; } @@ -200,6 +232,8 @@ class InputLines::Impl { return _blockCount 
== 0; } + std::vector getParameterNames() const { return _paramNames; } + CharPtrPair const read(char *buf); private: @@ -210,6 +244,7 @@ class InputLines::Impl { size_t const _blockSize; bool const _skipFirstLine; + const std::vector _paramNames; char _pad0[CACHE_LINE_SIZE]; @@ -221,9 +256,11 @@ class InputLines::Impl { char _pad1[CACHE_LINE_SIZE]; }; -InputLines::Impl::Impl(std::vector const &paths, size_t blockSize, bool skipFirstLine) +InputLines::Impl::Impl(std::vector const &paths, size_t blockSize, bool skipFirstLine, + std::vector paramNames) : _blockSize(std::min(std::max(blockSize, 1 * MiB), 1 * GiB)), _skipFirstLine(skipFirstLine), + _paramNames(paramNames), _mutex(), _blockCount(paths.size()), _queue(), @@ -238,7 +275,7 @@ CharPtrPair const InputLines::Impl::read(char *buf) { _queue.pop_back(); --_blockCount; lock.unlock(); // allow block reads to proceed in parallel - return b.read(buf, _skipFirstLine); + return b.read(buf, _skipFirstLine, _paramNames); } else if (!_paths.empty()) { // The queue is empty - grab the next file and split it into blocks. fs::path path = _paths.back(); @@ -261,7 +298,7 @@ CharPtrPair const InputLines::Impl::read(char *buf) { _queue.insert(_queue.end(), v.rbegin(), v.rend() - 1); _blockCount += v.size() - 1; lock.unlock(); // allow block reads to proceed in parallel - return b.read(buf, _skipFirstLine); + return b.read(buf, _skipFirstLine, _paramNames); } else { // The queue is empty and all input paths have been processed, but // the block count is non-zero. This means one or more threads are @@ -279,8 +316,9 @@ CharPtrPair const InputLines::Impl::read(char *buf) { // Method delegation. 
-InputLines::InputLines(std::vector const &paths, size_t blockSize, bool skipFirstLine) - : _impl(boost::make_shared(paths, blockSize, skipFirstLine)) {} +InputLines::InputLines(std::vector const &paths, size_t blockSize, bool skipFirstLine, + std::vector paramNames) + : _impl(boost::make_shared(paths, blockSize, skipFirstLine, paramNames)) {} size_t InputLines::getBlockSize() const { return _impl ? _impl->getBlockSize() : 0; } @@ -288,6 +326,8 @@ size_t InputLines::getMinimumBufferCapacity() const { return _impl ? _impl->getM bool InputLines::empty() const { return _impl ? _impl->empty() : true; } +std::vector InputLines::getParameterNames() const { return _impl->getParameterNames(); } + CharPtrPair const InputLines::read(char *buf) { if (_impl) { return _impl->read(buf); diff --git a/src/partition/InputLines.h b/src/partition/InputLines.h index eae0d4b97..f7fa6bd3d 100644 --- a/src/partition/InputLines.h +++ b/src/partition/InputLines.h @@ -33,6 +33,8 @@ #include "boost/filesystem.hpp" #include "boost/shared_ptr.hpp" +#include "ParquetInterface.h" + namespace lsst::partition { /// The InputLines class reads lines from a list of input text files in an IO @@ -68,7 +70,8 @@ class InputLines { /// ignoring the first line in each file. The user is responsible for /// ensuring that the file list contains no empty or duplicate entries. /// Note that `blockSize` is clamped to lie between 1MiB and 1GiB. - InputLines(std::vector const &paths, size_t blockSize, bool skipFirstLine); + InputLines(std::vector const &paths, size_t blockSize, bool skipFirstLine, + std::vector paramNames = {}); ~InputLines() {} @@ -81,6 +84,9 @@ class InputLines { /// Has all the input been read? bool empty() const; + /// Return the parameter names as defined in in.csv + std::vector getParameterNames() const; + /// Read consecutive lines of text into `buf`, and return a pointer range /// `[i,end)` identifying the bytes in `buf` containing valid data. 
The /// pointers returned will both be NULL if and only if there is no more diff --git a/src/partition/ParquetInterface.cc b/src/partition/ParquetInterface.cc new file mode 100644 index 000000000..8f5e386f0 --- /dev/null +++ b/src/partition/ParquetInterface.cc @@ -0,0 +1,353 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// #include "arrow/util/key_value_metadata.h" + +#include +#include + +#include "ParquetInterface.h" + +namespace lsst::partition { + +std::map, int> typeBufSize{ + {arrow::int8(), 3}, {arrow::int16(), 5}, {arrow::int32(), 10}, {arrow::int64(), 20}, + {arrow::uint8(), 3}, {arrow::uint16(), 5}, {arrow::uint32(), 10}, {arrow::uint64(), 20}, + {arrow::boolean(), 1}, {arrow::float16(), 20}, {arrow::float32(), 20}, {arrow::float64(), 20}, + {arrow::float16(), 20}, {arrow::date32(), 20}, {arrow::date64(), 20}}; + +ParquetFile::ParquetFile(std::string fileName, int maxMemAllocated) + : m_path_to_file(fileName), + m_maxMemory(maxMemAllocated), + m_vmRSS_init(0), + m_batchNumber(0), + m_batchSize(0) {} + +// Memory used by the current process +int ParquetFile::DumpProcessMemory(std::string idValue, bool bVerbose) const { + int tSize = 0, resident = 0, share = 0; + std::ifstream buffer("/proc/self/statm"); + buffer >> tSize >> resident >> share; + buffer.close(); + + long page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; // in case x86-64 is configured to use 2MB pages + + double vmSize = (tSize * page_size_kb) / 1024.0; + double rss = (resident * page_size_kb) / 1024.0; + double shared_mem = (share * page_size_kb) / 1024.0; + + if (bVerbose) { + std::cout << "VmSize - " << vmSize << " MB "; + std::cout << "VmRSS - " << rss << " MB "; + std::cout << "Shared Memory - " << shared_mem << " MB "; + std::cout << "Private Memory - " << rss - shared_mem << "MB" << std::endl; + } + + if (!idValue.empty()) { + std::map res{{"VmSize", vmSize}, {"VmRSS", rss}, {"SharedMem", shared_mem}}; + if (res.find(idValue) != res.end()) return res[idValue]; + } + return 0; +} + +// Compute the memory size of a row in butes by adding its element size +// stringDefaultSize is the default size of a parameter identified as a string +int ParquetFile::GetRecordSize(std::shared_ptr schema, int stringDefaultSize) const { + int recordSize = 0; + + const arrow::FieldVector& vFields = schema->fields(); + for 
(const auto& field : vFields) { + int fieldSize = field->type()->byte_width(); + if (fieldSize < 0) fieldSize = stringDefaultSize; + recordSize += fieldSize; + } + std::cout << "Record size (Bytes) " << recordSize << std::endl; + return recordSize; +} + +// Compute the memory size of a row in butes by adding its element size +// stringDefaultSize is the default size of a parameter identified as a string +int ParquetFile::GetStringRecordSize(std::shared_ptr schema, int stringDefaultSize) const { + int recordSize = 0; + + typeBufSize.insert({arrow::utf8(), stringDefaultSize}); + typeBufSize.insert({arrow::large_utf8(), stringDefaultSize}); + + const arrow::FieldVector& vFields = schema->fields(); + for (const auto& field : vFields) { + int fieldSize = typeBufSize[field->type()]; + recordSize += fieldSize; + recordSize++; + } + std::cout << "Record size (approx. CSV string length) " << recordSize << std::endl; + return recordSize; +} + +// setup the reader that access te parquet file +arrow::Status ParquetFile::SetupBatchReader(int maxBufferSize) { + m_vmRSS_init = DumpProcessMemory("VmRSS", true); + + int fileRowNumber = GetTotalRowNumber(m_path_to_file); + + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + // Configure general Parquet reader settings + auto reader_properties = parquet::ReaderProperties(pool); + reader_properties.set_buffer_size(4096 * 4); + reader_properties.enable_buffered_stream(); + + // Configure Arrow-specific Parquet reader settings + auto arrow_reader_props = parquet::ArrowReaderProperties(); + m_batchSize = 5000; // batchSize is in fact the number of rows + arrow_reader_props.set_batch_size(m_batchSize); // default 64 * 1024 + + parquet::arrow::FileReaderBuilder reader_builder; + ARROW_RETURN_NOT_OK(reader_builder.OpenFile(m_path_to_file, /*memory_map=*/false, reader_properties)); + reader_builder.memory_pool(pool); + reader_builder.properties(arrow_reader_props); + + ARROW_ASSIGN_OR_RAISE(m_arrow_reader_gbl, 
reader_builder.Build()); + ARROW_RETURN_NOT_OK(m_arrow_reader_gbl->GetRecordBatchReader(&m_rb_reader_gbl)); + + // Compute the nimber of lines read by each batch in function of the maximum memory + // allocated to the process + std::shared_ptr<::arrow::Schema> schema; + arrow::Status st = m_arrow_reader_gbl->GetSchema(&schema); + + // std::cout<ToString()< batch size : " << batchSize_mem << std::endl; + + int64_t batchSize_buf = -1; + m_maxBufferSize = maxBufferSize; + if (maxBufferSize > 0) { + m_recordBufferSize = GetStringRecordSize(schema); + // batchSize_buf = int((maxBufferSize*1024*1024)/m_recordBufferSize); + batchSize_buf = int(maxBufferSize / m_recordBufferSize); + std::cout << "\nMax buffer size : " << maxBufferSize << " vs " << m_recordBufferSize + << " -> batch size : " << batchSize_buf << std::endl; + } + + m_batchSize = std::min(batchSize_mem, batchSize_buf); + m_arrow_reader_gbl->set_batch_size(m_batchSize); + m_totalBatchNumber = int(fileRowNumber / m_batchSize); + if (m_totalBatchNumber * m_batchSize < fileRowNumber) m_totalBatchNumber++; + + std::cout << "Number of rows : " << fileRowNumber << " batchSize " << m_batchSize << std::endl; + std::cout << "RecordBatchReader : batch number " << m_totalBatchNumber << std::endl; + return arrow::Status::OK(); +} + +int ParquetFile::GetTotalRowNumber(std::string fileName) const { + std::shared_ptr infile; + PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(fileName, arrow::default_memory_pool())); + + std::unique_ptr reader; + PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); + + std::shared_ptr metadata = reader->parquet_reader()->metadata(); + // std::cout<<"Metadata version : "<version()<created_by() <num_row_groups() <num_rows() <num_rows(); +} + +// Read an arrow batch, format the table acording to the config file and save it in csv format +arrow::Status ParquetFile::ReadNextBatch_Table2CSV(void* buf, int& buffSize, + std::vector params) { + 
std::shared_ptr table_loc; + + m_parameterNames = params; + arrow::Status batchStatus = ReadNextBatchTable_Formatted(table_loc); + + if (!batchStatus.ok()) return arrow::Status::ExecutionError("Error while reading and formating batch"); + + arrow::Status status = Table2CSVBuffer(table_loc, buffSize, buf); + + if (status.ok()) return arrow::Status::OK(); + + return arrow::Status::ExecutionError("Error while writing table to CSV buffer"); +} + +// Create char buffer containing the table in csv format +arrow::Status ParquetFile::Table2CSVBuffer(std::shared_ptr& table, int& buffSize, void* buf) { + ARROW_ASSIGN_OR_RAISE(auto outstream, arrow::io::BufferOutputStream::Create(1 << 10)); + + // Options : null string, no header, no quotes around strings + arrow::csv::WriteOptions writeOpt = arrow::csv::WriteOptions::Defaults(); + writeOpt.null_string = "\\N"; + writeOpt.include_header = false; + writeOpt.quoting_style = arrow::csv::QuotingStyle::None; + + ARROW_RETURN_NOT_OK(arrow::csv::WriteCSV(*table, writeOpt, outstream.get())); + ARROW_ASSIGN_OR_RAISE(auto buffer, outstream->Finish()); + + // auto buffer_ptr = buffer.get()->data(); + buffSize = buffer->size(); + std::cout << "ParquetFile::Table2CSVBuffer - buffer length : " << buffSize << " // " << m_maxBufferSize + << std::endl; + + memcpy(buf, (void*)buffer.get()->data(), buffer->size()); + + // auto myfile = std::fstream("buffer_"+std::to_string(m_batchNumber)+".csv", std::ios::out); + // myfile.write((char*)&buffer_ptr[0], buffSize); + // myfile.close(); + + return arrow::Status::OK(); +} + +// Read a batch from the file and format the table iaccording to the partition configuration file +// -> column reordering, true/false -> 1/0, remove null values, ... 
+arrow::Status ParquetFile::ReadNextBatchTable_Formatted(std::shared_ptr& outputTable) { + arrow::Result> maybe_batch = m_rb_reader_gbl->Next(); + + std::vector paramNotFound; + std::map> fieldConfig; + + if (maybe_batch != nullptr) { + // DumpProcessMemory("VmRSS", true); + + ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); + std::shared_ptr initTable; + ARROW_ASSIGN_OR_RAISE(initTable, arrow::Table::FromRecordBatches(batch->schema(), {batch})); + + // Increment the batch number + m_batchNumber++; + + const arrow::FieldVector fields = initTable->schema()->fields(); + for (auto fd : fields) { + fieldConfig[fd->name()] = fd; + } + + arrow::FieldVector formatedTable_fields; + std::vector> formatedTable_columns; + + // Loop over the column names as defined in the partition config file + for (std::string paramName : m_parameterNames) { + std::shared_ptr chunkedArray = initTable->GetColumnByName(paramName); + + // Column not found in the arrow table... + if (chunkedArray == NULLPTR) { + paramNotFound.push_back(paramName); + } else { + // Column type is boolean -> switch to 0/1 representation + if (fieldConfig[paramName]->type() == arrow::boolean()) { + auto newChunkedArray = ChunkArrayReformatBoolean(chunkedArray, true); + if (newChunkedArray == nullptr) { + return arrow::Status::ExecutionError("Error while formating boolean chunk array"); + } + formatedTable_columns.push_back(newChunkedArray); + + std::shared_ptr newField = + std::make_shared(fieldConfig[paramName]->name(), arrow::int8()); + formatedTable_fields.push_back(newField); + } + // Simply keep the chunk as it is defined in teh arrow table + else { + formatedTable_columns.push_back(chunkedArray); + formatedTable_fields.push_back(fieldConfig[paramName]); + } + } + } // end of loop over parameters + + // If a column is not found (i.e. 
a parameter defined in partition.json does not exist in parquet + // file), throw an error and stop + if (paramNotFound.size() > 0) { + for (auto name : paramNotFound) + std::cout << "ERROR : param name " << name << " not found in table columns" << std::endl; + return arrow::Status::ExecutionError("Configuration file : missing parameter in table"); + } + + // Create the arrow::schema of the new table + std::shared_ptr formatedSchema = std::make_shared( + arrow::Schema(formatedTable_fields, initTable->schema()->endianness())); + + // and finally create the arrow::Table that matches the partition config file + outputTable = arrow::Table::Make(formatedSchema, formatedTable_columns); + arrow::Status resTable = outputTable->ValidateFull(); + if (!resTable.ok()) { + std::cout << "ERROR : formated table full validation not OK" << std::endl; + return arrow::Status::ExecutionError("CSV output table not valid"); + } + + return arrow::Status::OK(); + } + + // The end of the parquet file has been reached + return arrow::Status::ExecutionError("End of RecorBatchReader iterator"); +} + +// Reformat a boolean chunk array : true/false boolean array => 1/0 int8 array +std::shared_ptr ParquetFile::ChunkArrayReformatBoolean( + std::shared_ptr& inputArray, bool bCheck) { + std::vector> newChunks; + std::shared_ptr array; + arrow::Int8Builder builder; + + const arrow::ArrayVector& chunks = inputArray->chunks(); + for (auto& elemChunk : chunks) { + std::shared_ptr chunkData = elemChunk->data(); + builder.Reset(); + + auto bool_array = std::static_pointer_cast(elemChunk); + for (int64_t i = 0; i < elemChunk->length(); ++i) { + bool bIsNull = bool_array->IsNull(i); + if (bIsNull) + arrow::Status status = builder.AppendNull(); + else + arrow::Status status = builder.Append(bool_array->Value(i)); + } + + if (!builder.Finish(&array).ok()) { + std::string errorMsg = "ERROR while finalizing " + inputArray->ToString() + " new chunked array"; + std::cout << errorMsg << std::endl; + return 
nullptr; + } + + if (bCheck) { + assert(array->length() == elemChunk->length()); + + auto new_array = std::static_pointer_cast(array); + for (int64_t i = 0; i < elemChunk->length(); ++i) { + assert(bool_array->IsNull(i) == array->IsNull(i)); + assert((bool_array->Value(i) == true && new_array->Value(i) != 0) || + (bool_array->Value(i) == false && new_array->Value(i) == 0)); + } + } + + newChunks.push_back(std::move(array)); + } + + auto newChunkedArray = std::make_shared(std::move(newChunks)); + + auto status = newChunkedArray->ValidateFull(); + if (!status.ok()) { + std::string errorMsg = "Invalid new chunkArraay : " + status.ToString(); + std::cout << errorMsg << std::endl; + return nullptr; + } + + return newChunkedArray; +} + +} // namespace lsst::partition diff --git a/src/partition/ParquetInterface.h b/src/partition/ParquetInterface.h new file mode 100644 index 000000000..c61ea8813 --- /dev/null +++ b/src/partition/ParquetInterface.h @@ -0,0 +1,51 @@ + +#include "arrow/api.h" +#include "arrow/io/api.h" +#include "arrow/result.h" +#include "arrow/type.h" +#include "arrow/chunked_array.h" +#include "arrow/util/type_fwd.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/writer.h" + +#include +#include +#include +#include + +// class PartitionConfig; + +namespace lsst::partition { + +class ParquetFile { +public: + ParquetFile(std::string fileName, int maxMemAllocated = 3000 /*MB*/); + arrow::Status SetupBatchReader(int maxBufferSize = -1); + arrow::Status ReadNextBatch_Table2CSV(void* buf, int& buffSize, std::vector params); + + int GetBatchSize() const { return m_batchSize; } + int GetTotalBatchNumber() const { return m_totalBatchNumber; } + +private: + int DumpProcessMemory(std::string idValue = "", bool bVerbose = false) const; + int GetRecordSize(std::shared_ptr schema, int defaultSize = 32) const; + int GetStringRecordSize(std::shared_ptr schema, int defaultSize = 32) const; + arrow::Status ReadNextBatchTable_Formatted(std::shared_ptr& table); + 
arrow::Status Table2CSVBuffer(std::shared_ptr& table, int& buffSize, void* buf); + int GetTotalRowNumber(std::string fileName) const; + std::shared_ptr ChunkArrayReformatBoolean( + std::shared_ptr& inputArray, bool bCheck = false); + + std::string m_path_to_file; + std::string m_part_config_file; + int m_maxMemory, m_recordSize, m_recordBufferSize; + int m_vmRSS_init; + int m_batchNumber, m_batchSize; + int m_totalBatchNumber; + int m_maxBufferSize; + + std::vector m_parameterNames; + std::unique_ptr m_arrow_reader_gbl; + std::unique_ptr<::arrow::RecordBatchReader> m_rb_reader_gbl; +}; +} // namespace lsst::partition From c15d41f603a72747133edc34fedd7b042bfeb423 Mon Sep 17 00:00:00 2001 From: elles Date: Fri, 2 Jun 2023 13:08:19 +0200 Subject: [PATCH 4/5] Add parquet interface : first review --- Dockerfile | 5 - res.txt | 0 src/partition/CMakeLists.txt | 1 + src/partition/CmdLineUtils.cc | 29 +++-- src/partition/FileUtils.cc | 126 ++++++++++--------- src/partition/FileUtils.h | 74 ++++++++---- src/partition/InputLines.cc | 39 +++--- src/partition/InputLines.h | 16 +-- src/partition/ParquetInterface.cc | 172 +++++++++++++------------- src/partition/ParquetInterface.h | 193 +++++++++++++++++++++++++----- 10 files changed, 418 insertions(+), 237 deletions(-) delete mode 100644 Dockerfile delete mode 100644 res.txt diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 739e07024..000000000 --- a/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ - - -FROM qserv/lite-qserv:2023.2.1-rc2-13-g7fc142a05 - -CMD tail -f /dev/null diff --git a/res.txt b/res.txt deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/partition/CMakeLists.txt b/src/partition/CMakeLists.txt index fc97a18bd..6bf315082 100644 --- a/src/partition/CMakeLists.txt +++ b/src/partition/CMakeLists.txt @@ -21,6 +21,7 @@ target_link_libraries(partition PUBLIC Threads::Threads arrow parquet + log ) install(TARGETS partition) diff --git a/src/partition/CmdLineUtils.cc 
b/src/partition/CmdLineUtils.cc index 69d826a25..078532c68 100644 --- a/src/partition/CmdLineUtils.cc +++ b/src/partition/CmdLineUtils.cc @@ -154,8 +154,9 @@ InputLines const makeInputLines(ConfigStore const& config) { } } } - bIsParquetFile = (boost::algorithm::ends_with(s.c_str(), ".parquet") || - boost::algorithm::ends_with(s.c_str(), ".parq")); + if (!bIsParquetFile) + bIsParquetFile = (boost::algorithm::ends_with(s.c_str(), ".parquet") || + boost::algorithm::ends_with(s.c_str(), ".parq")); } if (paths.empty()) { throw std::runtime_error( @@ -163,16 +164,30 @@ InputLines const makeInputLines(ConfigStore const& config) { "files and directories specified via --in.path."); } - // Arrow : collect parameter name list to be read from parquet file + // return InputLines(paths, blockSize * MiB, false, names); + if (!bIsParquetFile) return InputLines(paths, blockSize * MiB, false); + + // In case input files are parquet files, data from config file have to be transfered to the parquet + // reading class Arrow : collect parameter name list to be read from parquet file std::vector names; - if (config.has("out.csv.field")) names = config.get>("in.csv.field"); + std::string st_null = ""; + std::string st_delimiter = ""; + std::string st_escape = ""; + + if (config.has("in.csv.field")) names = config.get>("in.csv.field"); + if (config.has("in.csv.null")) st_null = config.get("in.csv.null"); + if (config.has("in.csv.delimiter")) st_delimiter = config.get("in.csv.delimiter"); + if (config.has("in.csv.escape")) st_escape = config.get("in.csv.escape"); + + ConfigParamArrow const configParamArrow{names, st_null, st_delimiter, st_escape}; + // Direct parquet file reading is not possible using MT - March 2023 - if (bIsParquetFile && config.has("mr.num-workers") && config.get("mr.num-workers") > 1) + if (config.has("mr.num-workers") && config.get("mr.num-workers") > 1) throw std::runtime_error( "Parquet file partition cannot be done in MT - mr.num-workers parameter must be set to 1 
in " - "parition.json file "); + "partition.json file "); - return InputLines(paths, blockSize * MiB, false, names); + return InputLines(paths, blockSize * MiB, false, configParamArrow); } void defineOutputOptions(po::options_description& opts) { diff --git a/src/partition/FileUtils.cc b/src/partition/FileUtils.cc index c53f6fd1d..dd5b4f606 100644 --- a/src/partition/FileUtils.cc +++ b/src/partition/FileUtils.cc @@ -35,47 +35,54 @@ #include +// LSST headers +#include "lsst/log/Log.h" + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.partitionner"); +} // namespace + namespace fs = boost::filesystem; namespace lsst::partition { InputFile::InputFile(fs::path const &path) : _path(path), _fd(-1), _sz(-1) { - char msg[1024]; struct ::stat st; int fd = ::open(path.string().c_str(), O_RDONLY); if (fd == -1) { - ::strerror_r(errno, msg, sizeof(msg)); - throw std::runtime_error("open() failed [" + path.string() + "]: " + msg); + ::strerror_r(errno, _msg, sizeof(_msg)); + throw std::runtime_error("InputFile::" + std::string(__func__) + " :open() failed [" + path.string() + + "]: " + _msg); } if (::fstat(fd, &st) != 0) { - ::strerror_r(errno, msg, sizeof(msg)); + ::strerror_r(errno, _msg, sizeof(_msg)); ::close(fd); - throw std::runtime_error("fstat() failed [" + path.string() + "]: " + msg); + throw std::runtime_error("InputFile::" + std::string(__func__) + " :fstat() failed [" + + path.string() + "]: " + _msg); } _fd = fd; _sz = st.st_size; } InputFile::~InputFile() { - char msg[1024]; if (_fd != -1 && ::close(_fd) != 0) { - ::snprintf(msg, sizeof(msg), "close() failed [%s]", _path.string().c_str()); - ::perror(msg); - ::exit(EXIT_FAILURE); + ::snprintf(_msg, sizeof(_msg), "InputFile::~InputFile close() failed [%s]", _path.string().c_str()); + ::perror(_msg); + LOGS(_log, LOG_LVL_WARN, _msg); } } -void InputFile::read(void *buf, off_t off, size_t sz, int *bufferSize, - std::vector params) const { - char msg[1024]; +void InputFile::read(void *buf, off_t off, size_t sz) 
const { uint8_t *cur = static_cast(buf); while (sz > 0) { ssize_t n = ::pread(_fd, cur, sz, off); if (n == 0) { - throw std::runtime_error("pread() received EOF [" + _path.string() + "]"); + throw std::runtime_error("InputFile::" + std::string(__func__) + ":received EOF [" + + _path.string() + "]"); } else if (n < 0 && errno != EINTR) { - ::strerror_r(errno, msg, sizeof(msg)); - throw std::runtime_error("pread() failed [" + _path.string() + "]: " + msg); + ::strerror_r(errno, _msg, sizeof(_msg)); + throw std::runtime_error("InputFile::" + std::string(__func__) + ":failed [" + _path.string() + + "]: " + _msg); } else if (n > 0) { sz -= static_cast(n); off += n; @@ -84,99 +91,101 @@ void InputFile::read(void *buf, off_t off, size_t sz, int *bufferSize, } } -InputFileArrow::InputFileArrow(fs::path const &path, off_t blockSize) : InputFile(path), _fd(-1), _sz(-1) { - char msg[1024]; +void InputFile::read(void *buf, off_t off, size_t sz, int & /*bufferSize*/, + ConfigParamArrow const & /*params*/) const { + read(buf, off, sz); +} + +InputFileArrow::InputFileArrow(fs::path const &path, off_t blockSize) + : InputFile(path), _path(path), _fd(-1), _sz(-1) { struct ::stat st; - m_batchReader = std::make_unique(ParquetFile(path.string().c_str())); - arrow::Status status = m_batchReader->SetupBatchReader(blockSize); - if (!status.ok()) throw std::runtime_error("Could not setup Arrow recordbatchreader"); + _batchReader = std::make_unique(path.string()); + arrow::Status status = _batchReader->setupBatchReader(blockSize); + if (!status.ok()) + throw std::runtime_error("InputArrowFile::" + std::string(__func__) + + ": Could not setup Arrow recordbatchreader"); int fd = ::open(path.string().c_str(), O_RDONLY); if (fd == -1) { - ::strerror_r(errno, msg, sizeof(msg)); - throw std::runtime_error("open() failed [" + path.string() + "]: " + msg); + ::strerror_r(errno, _msg, sizeof(_msg)); + throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": open() failed [" + + 
path.string() + "]: " + _msg); } if (::fstat(fd, &st) != 0) { - ::strerror_r(errno, msg, sizeof(msg)); + ::strerror_r(errno, _msg, sizeof(_msg)); ::close(fd); - throw std::runtime_error("fstat() failed [" + path.string() + "]: " + msg); + throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": fstat() failed [" + + path.string() + "]: " + _msg); } _fd = fd; _sz = st.st_size; } InputFileArrow::~InputFileArrow() { - char msg[1024]; if (_fd != -1 && ::close(_fd) != 0) { - ::snprintf(msg, sizeof(msg), "close() failed [%s]", _path.string().c_str()); - ::perror(msg); - ::exit(EXIT_FAILURE); + ::snprintf(_msg, sizeof(_msg), "InputFileArrow::~InputFileArrow : close() failed [%s]", + _path.string().c_str()); + ::perror(_msg); + LOGS(_log, LOG_LVL_WARN, _msg); } } -int InputFileArrow::getBatchNumber() const { return m_batchReader->GetTotalBatchNumber(); } +int InputFileArrow::getBatchNumber() const { return _batchReader->getTotalBatchNumber(); } -void InputFileArrow::read(void *buf, off_t off, size_t sz, int *bufferSize, - std::vector params) const { - char msg[1024]; +void InputFileArrow::read(void *buf, off_t off, size_t sz, int &csvBufferSize, + ConfigParamArrow const ¶ms) const { uint8_t *cur = static_cast(buf); - int buffSize; - arrow::Status status = m_batchReader->ReadNextBatch_Table2CSV(cur, buffSize, params); - // return the buffersize because it varies from batch to batch - *bufferSize = buffSize; + arrow::Status status = _batchReader->readNextBatch_Table2CSV(cur, csvBufferSize, params.paramNames, + params.str_null, params.str_delimiter); if (status.ok()) { - ssize_t n = buffSize; + ssize_t n = csvBufferSize; if (n == 0) { - throw std::runtime_error("pread() received EOF [" + _path.string() + "]"); + throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": received EOF [" + + _path.string() + "]"); } else if (n < 0 && errno != EINTR) { - ::strerror_r(errno, msg, sizeof(msg)); - throw std::runtime_error("pread() failed [" + 
_path.string() + "]: " + msg); - } - /*else if (n > 0) { - sz -= static_cast(n); - off += n; - cur += n; + ::strerror_r(errno, _msg, sizeof(_msg)); + throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": failed [" + + _path.string() + "]: " + _msg); } - */ } } OutputFile::OutputFile(fs::path const &path, bool truncate) : _path(path), _fd(-1) { - char msg[1024]; int flags = O_CREAT | O_WRONLY; if (truncate) { flags |= O_TRUNC; } int fd = ::open(path.string().c_str(), flags, S_IROTH | S_IRGRP | S_IRUSR | S_IWUSR); if (fd == -1) { - ::strerror_r(errno, msg, sizeof(msg)); - throw std::runtime_error("open() failed [" + path.string() + "]: " + msg); + ::strerror_r(errno, _msg, sizeof(_msg)); + throw std::runtime_error("OutputFile::" + std::string(__func__) + ": open() failed [" + + path.string() + "]: " + _msg); } if (!truncate) { if (::lseek(fd, 0, SEEK_END) < 0) { - ::strerror_r(errno, msg, sizeof(msg)); + ::strerror_r(errno, _msg, sizeof(_msg)); close(fd); - throw std::runtime_error("lseek() failed [" + path.string() + "]: " + msg); + throw std::runtime_error("OutputFile::" + std::string(__func__) + ": lseek() failed [" + + path.string() + "]: " + _msg); } } _fd = fd; } OutputFile::~OutputFile() { - char msg[1024]; if (_fd != -1 && close(_fd) != 0) { - ::snprintf(msg, sizeof(msg), "close() failed [%s]", _path.string().c_str()); - ::perror(msg); - ::exit(EXIT_FAILURE); + ::snprintf(_msg, sizeof(_msg), "OutputFile::~OutputFile : close() failed [%s]", + _path.string().c_str()); + ::perror(_msg); + LOGS(_log, LOG_LVL_WARN, _msg); } } void OutputFile::append(void const *buf, size_t sz) { - char msg[1024]; if (!buf || sz == 0) { return; } @@ -185,8 +194,9 @@ void OutputFile::append(void const *buf, size_t sz) { ssize_t n = ::write(_fd, b, sz); if (n < 0) { if (errno != EINTR) { - ::strerror_r(errno, msg, sizeof(msg)); - throw std::runtime_error("write() failed [" + _path.string() + "]: " + msg); + ::strerror_r(errno, _msg, sizeof(_msg)); + throw 
std::runtime_error("OutputFile::" + std::string(__func__) + ": write() failed [" + + _path.string() + "]: " + _msg); } } else { sz -= static_cast(n); diff --git a/src/partition/FileUtils.h b/src/partition/FileUtils.h index 0598506ba..2a27bd664 100644 --- a/src/partition/FileUtils.h +++ b/src/partition/FileUtils.h @@ -36,11 +36,37 @@ namespace lsst::partition { class ParquetFile; +struct ConfigParamArrow { + std::vector const paramNames; + std::string str_null; + std::string str_delimiter; + std::string str_escape; + + ConfigParamArrow() + : paramNames(std::vector()), str_null(""), str_delimiter(""), str_escape("") {} + ConfigParamArrow(std::vector const ¶mNames, std::string const &vnull, + std::string const &vdelimiter, std::string const &vescape) + : paramNames(paramNames), str_null(vnull), str_delimiter(vdelimiter), str_escape(vescape) {} + + ConfigParamArrow(const ConfigParamArrow &v) + : paramNames(v.paramNames), + str_null(v.str_null), + str_delimiter(v.str_delimiter), + str_escape(v.str_escape) {} + ConfigParamArrow &operator=(const ConfigParamArrow &) = delete; +}; + +typedef struct ConfigParamArrow ConfigParamArrow; + /// An input file. Safe for use from multiple threads. class InputFile { public: explicit InputFile(boost::filesystem::path const &path); - ~InputFile(); + virtual ~InputFile(); + + // Disable copy construction and assignment. + InputFile(InputFile const &) = delete; + InputFile &operator=(InputFile const &) = delete; /// Return the size of the input file. off_t size() const { return _sz; } @@ -52,46 +78,40 @@ class InputFile { virtual int getBatchNumber() const { return -1; } /// Read a range of bytes into `buf`. - virtual void read(void *buf, off_t off, size_t sz, int *bufferSize = 0, - std::vector params = {}) const; + void read(void *buf, off_t off, size_t sz) const; + virtual void read(void *buf, off_t off, size_t sz, int &bufferSize, ConfigParamArrow const ¶ms) const; private: - // Disable copy construction and assignment. 
- InputFile(InputFile const &); - InputFile &operator=(InputFile const &); + mutable char _msg[1024]; - boost::filesystem::path _path; + boost::filesystem::path const _path; int _fd; off_t _sz; }; class InputFileArrow : public InputFile { public: - explicit InputFileArrow(boost::filesystem::path const &path, off_t blockSize); - ~InputFileArrow(); - - /// Return the size of the input file. - off_t size() const { return _sz; } + InputFileArrow(boost::filesystem::path const &path, off_t blockSize); + virtual ~InputFileArrow(); - /// Return the path of the input file. - boost::filesystem::path const &path() const { return _path; } + // Disable copy construction and assignment. + InputFileArrow(InputFileArrow const &) = delete; + InputFileArrow &operator=(InputFileArrow const &) = delete; - int getBatchNumber() const; + virtual int getBatchNumber() const override; /// Read a range of bytes into `buf`. - void read(void *buf, off_t off, size_t sz, int *bufferSize, std::vector params) const; + virtual void read(void *buf, off_t off, size_t sz, int &bufferSize, + ConfigParamArrow const ¶ms) const override; private: - // Disable copy construction and assignment. - InputFileArrow(InputFileArrow const &); - InputFileArrow &operator=(InputFileArrow const &); + mutable char _msg[1024]; - boost::filesystem::path _path; - std::vector _paramNames; + boost::filesystem::path const _path; int _fd; off_t _sz; - std::unique_ptr m_batchReader; + std::unique_ptr _batchReader; }; /// An output file that can only be appended to, and which should only be @@ -104,6 +124,10 @@ class OutputFile { OutputFile(boost::filesystem::path const &path, bool truncate); ~OutputFile(); + // Disable copy construction and assignment. + OutputFile(OutputFile const &) = delete; + OutputFile &operator=(OutputFile const &) = delete; + /// Return the path of the output file. 
boost::filesystem::path const &path() const { return _path; } @@ -111,11 +135,9 @@ class OutputFile { void append(void const *buf, size_t size); private: - // Disable copy construction and assignment. - OutputFile(OutputFile const &); - OutputFile &operator=(OutputFile const &); + mutable char _msg[1024]; - boost::filesystem::path _path; + boost::filesystem::path const _path; int _fd; }; diff --git a/src/partition/InputLines.cc b/src/partition/InputLines.cc index 5a9d1daf2..41eae8e61 100644 --- a/src/partition/InputLines.cc +++ b/src/partition/InputLines.cc @@ -93,18 +93,18 @@ struct Block { Block() : file(), offset(0), size(0), head(), tail() {} - CharPtrPair const read(char *buf, bool skipFirstLine, std::vector paramNames = {}); + CharPtrPair const read(char *buf, bool skipFirstLine, ConfigParamArrow const ¶ms); }; // Read a file block and handle the lines crossing its boundaries. -CharPtrPair const Block::read(char *buf, bool skipFirstLine, std::vector paramNames) { +CharPtrPair const Block::read(char *buf, bool skipFirstLine, ConfigParamArrow const &configArrow) { // Read into buf, leaving space for a line on either side of the block. 
char *readBeg = buf + MAX_LINE_SIZE; char *readEnd = readBeg + size; // Arrow/Parquet : retrieve the real size of the arrow CSV block int bufferSize = 0; - file->read(readBeg, offset, size, &bufferSize, paramNames); + file->read(readBeg, offset, size, bufferSize, configArrow); if (bufferSize > 0) { size = bufferSize; readEnd = readBeg + size; @@ -221,8 +221,9 @@ std::vector const split(fs::path const &path, off_t blockSize) { class InputLines::Impl { public: + Impl(std::vector const &paths, size_t blockSize, bool skipFirstLine); Impl(std::vector const &paths, size_t blockSize, bool skipFirstLine, - std::vector paramNames); + ConfigParamArrow const &config); ~Impl() {} size_t getBlockSize() const { return _blockSize; } @@ -232,8 +233,6 @@ class InputLines::Impl { return _blockCount == 0; } - std::vector getParameterNames() const { return _paramNames; } - CharPtrPair const read(char *buf); private: @@ -244,7 +243,7 @@ class InputLines::Impl { size_t const _blockSize; bool const _skipFirstLine; - const std::vector _paramNames; + ConfigParamArrow const _configArrow; char _pad0[CACHE_LINE_SIZE]; @@ -256,11 +255,20 @@ class InputLines::Impl { char _pad1[CACHE_LINE_SIZE]; }; +InputLines::Impl::Impl(std::vector const &paths, size_t blockSize, bool skipFirstLine) + : _blockSize(std::min(std::max(blockSize, 1 * MiB), 1 * GiB)), + _skipFirstLine(skipFirstLine), + _configArrow(ConfigParamArrow()), + _mutex(), + _blockCount(paths.size()), + _queue(), + _paths(paths) {} + InputLines::Impl::Impl(std::vector const &paths, size_t blockSize, bool skipFirstLine, - std::vector paramNames) + ConfigParamArrow const &config) : _blockSize(std::min(std::max(blockSize, 1 * MiB), 1 * GiB)), _skipFirstLine(skipFirstLine), - _paramNames(paramNames), + _configArrow(config), _mutex(), _blockCount(paths.size()), _queue(), @@ -275,7 +283,7 @@ CharPtrPair const InputLines::Impl::read(char *buf) { _queue.pop_back(); --_blockCount; lock.unlock(); // allow block reads to proceed in parallel - return 
b.read(buf, _skipFirstLine, _paramNames); + return b.read(buf, _skipFirstLine, _configArrow); } else if (!_paths.empty()) { // The queue is empty - grab the next file and split it into blocks. fs::path path = _paths.back(); @@ -298,7 +306,7 @@ CharPtrPair const InputLines::Impl::read(char *buf) { _queue.insert(_queue.end(), v.rbegin(), v.rend() - 1); _blockCount += v.size() - 1; lock.unlock(); // allow block reads to proceed in parallel - return b.read(buf, _skipFirstLine, _paramNames); + return b.read(buf, _skipFirstLine, _configArrow); } else { // The queue is empty and all input paths have been processed, but // the block count is non-zero. This means one or more threads are @@ -316,9 +324,12 @@ CharPtrPair const InputLines::Impl::read(char *buf) { // Method delegation. +InputLines::InputLines(std::vector const &paths, size_t blockSize, bool skipFirstLine) + : _impl(boost::make_shared(paths, blockSize, skipFirstLine)) {} + InputLines::InputLines(std::vector const &paths, size_t blockSize, bool skipFirstLine, - std::vector paramNames) - : _impl(boost::make_shared(paths, blockSize, skipFirstLine, paramNames)) {} + ConfigParamArrow const &configArrow) + : _impl(boost::make_shared(paths, blockSize, skipFirstLine, configArrow)) {} size_t InputLines::getBlockSize() const { return _impl ? _impl->getBlockSize() : 0; } @@ -326,8 +337,6 @@ size_t InputLines::getMinimumBufferCapacity() const { return _impl ? _impl->getM bool InputLines::empty() const { return _impl ? 
_impl->empty() : true; } -std::vector InputLines::getParameterNames() const { return _impl->getParameterNames(); } - CharPtrPair const InputLines::read(char *buf) { if (_impl) { return _impl->read(buf); diff --git a/src/partition/InputLines.h b/src/partition/InputLines.h index f7fa6bd3d..f107c29ac 100644 --- a/src/partition/InputLines.h +++ b/src/partition/InputLines.h @@ -33,10 +33,13 @@ #include "boost/filesystem.hpp" #include "boost/shared_ptr.hpp" -#include "ParquetInterface.h" +// Qserv headers +#include "partition/ParquetInterface.h" namespace lsst::partition { +typedef struct ConfigParamArrow ConfigParamArrow; + /// The InputLines class reads lines from a list of input text files in an IO /// efficient and parallel way. Each file is split up into blocks, where all /// blocks except those at the end of a file have the same size. Files are read @@ -70,9 +73,9 @@ class InputLines { /// ignoring the first line in each file. The user is responsible for /// ensuring that the file list contains no empty or duplicate entries. /// Note that `blockSize` is clamped to lie between 1MiB and 1GiB. - InputLines(std::vector const &paths, size_t blockSize, bool skipFirstLine, - std::vector paramNames = {}); - + InputLines(std::vector const& paths, size_t blockSize, bool skipFirstLine); + InputLines(std::vector const& paths, size_t blockSize, bool skipFirstLine, + ConfigParamArrow const& config); ~InputLines() {} /// Return the IO read block size in bytes. @@ -84,15 +87,12 @@ class InputLines { /// Has all the input been read? bool empty() const; - /// Return the parameter names as defined in in.csv - std::vector getParameterNames() const; - /// Read consecutive lines of text into `buf`, and return a pointer range /// `[i,end)` identifying the bytes in `buf` containing valid data. The /// pointers returned will both be NULL if and only if there is no more /// input left to read. Note that `buf` must have a capacity of at least /// getMinimumBufferCapacity() bytes. 
- std::pair const read(char *buf); + std::pair const read(char* buf); private: class Impl; diff --git a/src/partition/ParquetInterface.cc b/src/partition/ParquetInterface.cc index 8f5e386f0..fd101bee8 100644 --- a/src/partition/ParquetInterface.cc +++ b/src/partition/ParquetInterface.cc @@ -15,12 +15,19 @@ // specific language governing permissions and limitations // under the License. -// #include "arrow/util/key_value_metadata.h" +// Class header +#include "partition/ParquetInterface.h" +// Third party headers #include #include -#include "ParquetInterface.h" +// LSST headers +#include "lsst/log/Log.h" + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.partitioner"); +} // namespace namespace lsst::partition { @@ -31,14 +38,15 @@ std::map, int> typeBufSize{ {arrow::float16(), 20}, {arrow::date32(), 20}, {arrow::date64(), 20}}; ParquetFile::ParquetFile(std::string fileName, int maxMemAllocated) - : m_path_to_file(fileName), - m_maxMemory(maxMemAllocated), - m_vmRSS_init(0), - m_batchNumber(0), - m_batchSize(0) {} - -// Memory used by the current process -int ParquetFile::DumpProcessMemory(std::string idValue, bool bVerbose) const { + : _path_to_file(fileName), + _maxMemory(maxMemAllocated), + _vmRSS_init(0), + _batchNumber(0), + _batchSize(0) { + LOGS(_log, LOG_LVL_DEBUG, "Partitioner parquet interface..."); +} + +int ParquetFile::_dumpProcessMemory(std::string idValue, bool bVerbose) const { int tSize = 0, resident = 0, share = 0; std::ifstream buffer("/proc/self/statm"); buffer >> tSize >> resident >> share; @@ -51,10 +59,10 @@ int ParquetFile::DumpProcessMemory(std::string idValue, bool bVerbose) const { double shared_mem = (share * page_size_kb) / 1024.0; if (bVerbose) { - std::cout << "VmSize - " << vmSize << " MB "; - std::cout << "VmRSS - " << rss << " MB "; - std::cout << "Shared Memory - " << shared_mem << " MB "; - std::cout << "Private Memory - " << rss - shared_mem << "MB" << std::endl; + LOGS(_log, LOG_LVL_DEBUG, "VmSize - " << vmSize << " MB "); + 
LOGS(_log, LOG_LVL_DEBUG, "VmRSS - " << rss << " MB "); + LOGS(_log, LOG_LVL_DEBUG, "Shared Memory - " << shared_mem << " MB "); + LOGS(_log, LOG_LVL_DEBUG, "Private Memory - " << rss - shared_mem << "MB"); } if (!idValue.empty()) { @@ -64,9 +72,7 @@ int ParquetFile::DumpProcessMemory(std::string idValue, bool bVerbose) const { return 0; } -// Compute the memory size of a row in butes by adding its element size -// stringDefaultSize is the default size of a parameter identified as a string -int ParquetFile::GetRecordSize(std::shared_ptr schema, int stringDefaultSize) const { +int ParquetFile::_getRecordSize(std::shared_ptr schema, int stringDefaultSize) const { int recordSize = 0; const arrow::FieldVector& vFields = schema->fields(); @@ -75,13 +81,11 @@ int ParquetFile::GetRecordSize(std::shared_ptr schema, int string if (fieldSize < 0) fieldSize = stringDefaultSize; recordSize += fieldSize; } - std::cout << "Record size (Bytes) " << recordSize << std::endl; + LOGS(_log, LOG_LVL_DEBUG, "Record size (Bytes) " << recordSize); return recordSize; } -// Compute the memory size of a row in butes by adding its element size -// stringDefaultSize is the default size of a parameter identified as a string -int ParquetFile::GetStringRecordSize(std::shared_ptr schema, int stringDefaultSize) const { +int ParquetFile::_getStringRecordSize(std::shared_ptr schema, int stringDefaultSize) const { int recordSize = 0; typeBufSize.insert({arrow::utf8(), stringDefaultSize}); @@ -93,15 +97,14 @@ int ParquetFile::GetStringRecordSize(std::shared_ptr schema, int recordSize += fieldSize; recordSize++; } - std::cout << "Record size (approx. CSV string length) " << recordSize << std::endl; + LOGS(_log, LOG_LVL_DEBUG, "Record size (approx. 
CSV string length) " << recordSize); return recordSize; } -// setup the reader that access te parquet file -arrow::Status ParquetFile::SetupBatchReader(int maxBufferSize) { - m_vmRSS_init = DumpProcessMemory("VmRSS", true); +arrow::Status ParquetFile::setupBatchReader(int maxBufferSize) { + _vmRSS_init = _dumpProcessMemory("VmRSS", true); - int fileRowNumber = GetTotalRowNumber(m_path_to_file); + int fileRowNumber = _getTotalRowNumber(_path_to_file); arrow::MemoryPool* pool = arrow::default_memory_pool(); @@ -112,51 +115,52 @@ arrow::Status ParquetFile::SetupBatchReader(int maxBufferSize) { // Configure Arrow-specific Parquet reader settings auto arrow_reader_props = parquet::ArrowReaderProperties(); - m_batchSize = 5000; // batchSize is in fact the number of rows - arrow_reader_props.set_batch_size(m_batchSize); // default 64 * 1024 + _batchSize = 5000; // batchSize is in fact the number of rows + arrow_reader_props.set_batch_size(_batchSize); // default 64 * 1024 parquet::arrow::FileReaderBuilder reader_builder; - ARROW_RETURN_NOT_OK(reader_builder.OpenFile(m_path_to_file, /*memory_map=*/false, reader_properties)); + ARROW_RETURN_NOT_OK(reader_builder.OpenFile(_path_to_file, /*memory_map=*/false, reader_properties)); reader_builder.memory_pool(pool); reader_builder.properties(arrow_reader_props); - ARROW_ASSIGN_OR_RAISE(m_arrow_reader_gbl, reader_builder.Build()); - ARROW_RETURN_NOT_OK(m_arrow_reader_gbl->GetRecordBatchReader(&m_rb_reader_gbl)); + ARROW_ASSIGN_OR_RAISE(_arrow_reader_gbl, reader_builder.Build()); + ARROW_RETURN_NOT_OK(_arrow_reader_gbl->GetRecordBatchReader(&_rb_reader_gbl)); // Compute the nimber of lines read by each batch in function of the maximum memory // allocated to the process std::shared_ptr<::arrow::Schema> schema; - arrow::Status st = m_arrow_reader_gbl->GetSchema(&schema); + arrow::Status st = _arrow_reader_gbl->GetSchema(&schema); - // std::cout<ToString()< batch size : " << batchSize_mem << std::endl; + _recordSize = 
_getRecordSize(schema); + double tmp = double(_maxMemory) * 1024 * 1024 * 0.85; + LOGS(_log, LOG_LVL_DEBUG, "Batch size mem " << tmp); + int64_t batchSize_mem = int64_t(tmp / _recordSize); // .85 is a "a la louche" factor + LOGS(_log, LOG_LVL_DEBUG, + "Max RAM (MB): " << _maxMemory << " // record size : " << _recordSize + << " -> batch size : " << batchSize_mem); int64_t batchSize_buf = -1; - m_maxBufferSize = maxBufferSize; + _maxBufferSize = maxBufferSize; if (maxBufferSize > 0) { - m_recordBufferSize = GetStringRecordSize(schema); - // batchSize_buf = int((maxBufferSize*1024*1024)/m_recordBufferSize); - batchSize_buf = int(maxBufferSize / m_recordBufferSize); - std::cout << "\nMax buffer size : " << maxBufferSize << " vs " << m_recordBufferSize - << " -> batch size : " << batchSize_buf << std::endl; + _recordBufferSize = _getStringRecordSize(schema); + // batchSize_buf = int((maxBufferSize*1024*1024)/_recordBufferSize); + batchSize_buf = int(maxBufferSize / _recordBufferSize); + LOGS(_log, LOG_LVL_DEBUG, + "\nMax buffer size : " << maxBufferSize << " vs " << _recordBufferSize + << " -> batch size : " << batchSize_buf); } - m_batchSize = std::min(batchSize_mem, batchSize_buf); - m_arrow_reader_gbl->set_batch_size(m_batchSize); - m_totalBatchNumber = int(fileRowNumber / m_batchSize); - if (m_totalBatchNumber * m_batchSize < fileRowNumber) m_totalBatchNumber++; + _batchSize = std::min(batchSize_mem, batchSize_buf); + _arrow_reader_gbl->set_batch_size(_batchSize); + _totalBatchNumber = int(fileRowNumber / _batchSize); + if (_totalBatchNumber * _batchSize < fileRowNumber) _totalBatchNumber++; - std::cout << "Number of rows : " << fileRowNumber << " batchSize " << m_batchSize << std::endl; - std::cout << "RecordBatchReader : batch number " << m_totalBatchNumber << std::endl; + LOGS(_log, LOG_LVL_DEBUG, "Number of rows : " << fileRowNumber << " batchSize " << _batchSize); + LOGS(_log, LOG_LVL_DEBUG, "RecordBatchReader : batch number " << _totalBatchNumber); return 
arrow::Status::OK(); } -int ParquetFile::GetTotalRowNumber(std::string fileName) const { +int ParquetFile::_getTotalRowNumber(std::string fileName) const { std::shared_ptr infile; PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(fileName, arrow::default_memory_pool())); @@ -164,38 +168,37 @@ int ParquetFile::GetTotalRowNumber(std::string fileName) const { PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); std::shared_ptr metadata = reader->parquet_reader()->metadata(); - // std::cout<<"Metadata version : "<version()<created_by() <num_row_groups() <num_rows() <num_rows(); } -// Read an arrow batch, format the table acording to the config file and save it in csv format -arrow::Status ParquetFile::ReadNextBatch_Table2CSV(void* buf, int& buffSize, - std::vector params) { +arrow::Status ParquetFile::readNextBatch_Table2CSV(void* buf, int& buffSize, + std::vector const& params, + std::string const& nullStr, std::string const& delimStr) { std::shared_ptr table_loc; - m_parameterNames = params; - arrow::Status batchStatus = ReadNextBatchTable_Formatted(table_loc); + _parameterNames = params; + // Get the next data batch, data are formatted + arrow::Status batchStatus = _readNextBatchTable_Formatted(table_loc); if (!batchStatus.ok()) return arrow::Status::ExecutionError("Error while reading and formating batch"); - arrow::Status status = Table2CSVBuffer(table_loc, buffSize, buf); + arrow::Status status = _table2CSVBuffer(table_loc, buffSize, buf, nullStr, delimStr); if (status.ok()) return arrow::Status::OK(); return arrow::Status::ExecutionError("Error while writing table to CSV buffer"); } -// Create char buffer containing the table in csv format -arrow::Status ParquetFile::Table2CSVBuffer(std::shared_ptr& table, int& buffSize, void* buf) { +arrow::Status ParquetFile::_table2CSVBuffer(std::shared_ptr const& table, int& buffSize, + void* buf, std::string const& nullStr, + std::string const& delimStr) { 
ARROW_ASSIGN_OR_RAISE(auto outstream, arrow::io::BufferOutputStream::Create(1 << 10)); // Options : null string, no header, no quotes around strings arrow::csv::WriteOptions writeOpt = arrow::csv::WriteOptions::Defaults(); - writeOpt.null_string = "\\N"; + writeOpt.null_string = nullStr; + writeOpt.delimiter = delimStr[0]; writeOpt.include_header = false; writeOpt.quoting_style = arrow::csv::QuotingStyle::None; @@ -204,35 +207,26 @@ arrow::Status ParquetFile::Table2CSVBuffer(std::shared_ptr& table, // auto buffer_ptr = buffer.get()->data(); buffSize = buffer->size(); - std::cout << "ParquetFile::Table2CSVBuffer - buffer length : " << buffSize << " // " << m_maxBufferSize - << std::endl; + LOGS(_log, LOG_LVL_DEBUG, + "ParquetFile::Table2CSVBuffer - buffer length : " << buffSize << " // " << _maxBufferSize); memcpy(buf, (void*)buffer.get()->data(), buffer->size()); - - // auto myfile = std::fstream("buffer_"+std::to_string(m_batchNumber)+".csv", std::ios::out); - // myfile.write((char*)&buffer_ptr[0], buffSize); - // myfile.close(); - return arrow::Status::OK(); } -// Read a batch from the file and format the table iaccording to the partition configuration file -// -> column reordering, true/false -> 1/0, remove null values, ... 
-arrow::Status ParquetFile::ReadNextBatchTable_Formatted(std::shared_ptr& outputTable) { - arrow::Result> maybe_batch = m_rb_reader_gbl->Next(); +arrow::Status ParquetFile::_readNextBatchTable_Formatted(std::shared_ptr& outputTable) { + auto const maybe_batch = _rb_reader_gbl->Next(); std::vector paramNotFound; std::map> fieldConfig; if (maybe_batch != nullptr) { - // DumpProcessMemory("VmRSS", true); - ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); std::shared_ptr initTable; ARROW_ASSIGN_OR_RAISE(initTable, arrow::Table::FromRecordBatches(batch->schema(), {batch})); // Increment the batch number - m_batchNumber++; + _batchNumber++; const arrow::FieldVector fields = initTable->schema()->fields(); for (auto fd : fields) { @@ -243,16 +237,16 @@ arrow::Status ParquetFile::ReadNextBatchTable_Formatted(std::shared_ptr> formatedTable_columns; // Loop over the column names as defined in the partition config file - for (std::string paramName : m_parameterNames) { + for (std::string paramName : _parameterNames) { std::shared_ptr chunkedArray = initTable->GetColumnByName(paramName); // Column not found in the arrow table... 
- if (chunkedArray == NULLPTR) { + if (chunkedArray == nullptr) { paramNotFound.push_back(paramName); } else { // Column type is boolean -> switch to 0/1 representation if (fieldConfig[paramName]->type() == arrow::boolean()) { - auto newChunkedArray = ChunkArrayReformatBoolean(chunkedArray, true); + auto newChunkedArray = _chunkArrayReformatBoolean(chunkedArray, true); if (newChunkedArray == nullptr) { return arrow::Status::ExecutionError("Error while formating boolean chunk array"); } @@ -274,7 +268,7 @@ arrow::Status ParquetFile::ReadNextBatchTable_Formatted(std::shared_ptr 0) { for (auto name : paramNotFound) - std::cout << "ERROR : param name " << name << " not found in table columns" << std::endl; + LOGS(_log, LOG_LVL_DEBUG, "ERROR : param name " << name << " not found in table columns"); return arrow::Status::ExecutionError("Configuration file : missing parameter in table"); } @@ -286,7 +280,7 @@ arrow::Status ParquetFile::ReadNextBatchTable_Formatted(std::shared_ptrValidateFull(); if (!resTable.ok()) { - std::cout << "ERROR : formated table full validation not OK" << std::endl; + LOGS(_log, LOG_LVL_DEBUG, "ERROR : formated table full validation not OK"); return arrow::Status::ExecutionError("CSV output table not valid"); } @@ -297,13 +291,13 @@ arrow::Status ParquetFile::ReadNextBatchTable_Formatted(std::shared_ptr 1/0 int8 array -std::shared_ptr ParquetFile::ChunkArrayReformatBoolean( +std::shared_ptr ParquetFile::_chunkArrayReformatBoolean( std::shared_ptr& inputArray, bool bCheck) { std::vector> newChunks; std::shared_ptr array; arrow::Int8Builder builder; + // Loop over the chunks defined in the chunkedArray const arrow::ArrayVector& chunks = inputArray->chunks(); for (auto& elemChunk : chunks) { std::shared_ptr chunkData = elemChunk->data(); @@ -320,7 +314,7 @@ std::shared_ptr ParquetFile::ChunkArrayReformatBoolean( if (!builder.Finish(&array).ok()) { std::string errorMsg = "ERROR while finalizing " + inputArray->ToString() + " new chunked array"; - 
std::cout << errorMsg << std::endl; + LOGS(_log, LOG_LVL_DEBUG, errorMsg); return nullptr; } @@ -338,12 +332,14 @@ std::shared_ptr ParquetFile::ChunkArrayReformatBoolean( newChunks.push_back(std::move(array)); } + // Create new chunkArray based on modified chunks auto newChunkedArray = std::make_shared(std::move(newChunks)); + // arrow validation of the new chunkedArray auto status = newChunkedArray->ValidateFull(); if (!status.ok()) { std::string errorMsg = "Invalid new chunkArraay : " + status.ToString(); - std::cout << errorMsg << std::endl; + LOGS(_log, LOG_LVL_DEBUG, errorMsg); return nullptr; } diff --git a/src/partition/ParquetInterface.h b/src/partition/ParquetInterface.h index c61ea8813..19724f619 100644 --- a/src/partition/ParquetInterface.h +++ b/src/partition/ParquetInterface.h @@ -1,51 +1,184 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ +/// \file +/// \brief read parquet file using libarrow library (RecordBatchReader interface) + +// System headers +#include +#include +#include + +// Third party headers #include "arrow/api.h" -#include "arrow/io/api.h" #include "arrow/result.h" #include "arrow/type.h" #include "arrow/chunked_array.h" +#include "arrow/io/api.h" #include "arrow/util/type_fwd.h" #include "parquet/arrow/reader.h" #include "parquet/arrow/writer.h" -#include -#include -#include -#include - -// class PartitionConfig; - namespace lsst::partition { +/** + * Class ParquetFile is used to read a parquet file using the parquet feature defined in libarrow. + * This method has been developed in order to read the parquet file batch by batch (a batch is a group + * of rows) and therefore to control the amount of RAM used by the process. + * Indeed, to this day (May 2023), the parquet files are created in one block without using the rowgroup + * option. + * + * The main idea is to read the file by group of rows, each arrow table built from these rows is then formatted + * - true/false boolean are replaced by 1/0 + * - arrow::csv::WriteCSV function is used to convert the formatted table to a CSV character buffer + * - null values are replaced by the null character defined in csv.in.null + * the CSV buffer is then returned to the partitioner. + * + * This method is not MT compliant. + */ class ParquetFile { public: + /** + * Parquet file constructor + * @param fileName - parquet file name + * @param maxMemAllocated - max RAM allocated to the process + */ ParquetFile(std::string fileName, int maxMemAllocated = 3000 /*MB*/); - arrow::Status SetupBatchReader(int maxBufferSize = -1); - arrow::Status ReadNextBatch_Table2CSV(void* buf, int& buffSize, std::vector params); - int GetBatchSize() const { return m_batchSize; } - int GetTotalBatchNumber() const { return m_totalBatchNumber; } + // Disable copy construction and assignment. 
+ ParquetFile(ParquetFile const&) = delete; + ParquetFile& operator=(ParquetFile const&) = delete; + + /** + * This method initializes the arrow batch reader. The number of data rows read by each batch is defined + * to match the constraints defined by the maximum RAM allocated to the reading process and the maximum + * buffer size as defined in the partitioner configuration file + * @param maxBufferSize - maximum buffer size as defined in the partitioner configuration file + * @returns The completion status, where arrow::Status::Success is for success, arrow::raise_error stops + * the process otherwise + * @throws arrow::raise_error if the arrow parquet interface or the batch reader cannot be set up + */ + arrow::Status setupBatchReader(int maxBufferSize = -1); + + /** + * This method reads an arrow batch, formats the table according to the partition configuration file and + * saves it in csv format + * @param buf - character buffer containing the content of the arrow table dumped in CSV format + * @param buffSize - CSV buffer size returned by the function + * @param params - names of the data columns to be retrieved as defined in the partitioner configuration + * file + * @param nullStr - string that replaces a null value in the csv output buffer + * @param delimStr - delimiter used between values in csv buffer + * @returns The completion status, where arrow::Status::Success is for success, arrow::raise_error stops + * the process otherwise + * @throws arrow::raise_error if batch could not be read or if the table formatting process goes wrong + */ + arrow::Status readNextBatch_Table2CSV(void* buf, int& buffSize, std::vector const& params, + std::string const& nullStr, std::string const& delimStr); + + int getBatchSize() const { return _batchSize; } + int getTotalBatchNumber() const { return _totalBatchNumber; } private: - int DumpProcessMemory(std::string idValue = "", bool bVerbose = false) const; - int GetRecordSize(std::shared_ptr schema, int defaultSize = 
32) const; - int GetStringRecordSize(std::shared_ptr schema, int defaultSize = 32) const; - arrow::Status ReadNextBatchTable_Formatted(std::shared_ptr& table); - arrow::Status Table2CSVBuffer(std::shared_ptr& table, int& buffSize, void* buf); - int GetTotalRowNumber(std::string fileName) const; - std::shared_ptr ChunkArrayReformatBoolean( + /** + * This method monitors the memory used by the current process + * @param idValue - memory type to monitor (VmSize, VmRSS or SharedMem) + * @param bVerbose - verbosity level + * @returns the memory (MB) used by the process + */ + int _dumpProcessMemory(std::string idValue = "", bool bVerbose = false) const; + + /** + * This method computes the size in bytes of a parquet data row + * @param schema - arrow table schema (list of the column names and types ) + * @param defaultSize - the default size assigned to a column with string type + * @returns the size in bytes of a parquet data row + */ + int _getRecordSize(std::shared_ptr schema, int defaultSize = 32) const; + + /** + * This method computes an approximation of the size of a CSV row corresponding to a parquet data row + * @param schema - arrow table schema (list of the column names and types ) + * @param defaultSize - the default size assigned to a column with string type + * @returns an approximation of a CSV string corresponding to a parquet data row + */ + int _getStringRecordSize(std::shared_ptr schema, int defaultSize = 32) const; + + /** + * This method reads the next arrow batch data and proceeds to some data formatting (column reordering as + * defined by partitioner, true/false -> 0/1). 
+ * @param outputTable - arrow table containing the data read by the arrow:batch + * @returns The completion status, where arrow::Status::Success is for success, arrow::raise_error stops + * the process otherwise + * @throws arrow::raise_error if batch could not be read, if the data table could not be read from the + * batch, if a data column needed by the partitioner is not found in the table or if the outputTable is not + * valid as defined + */ + arrow::Status _readNextBatchTable_Formatted(std::shared_ptr& table); + + /** + * This method creates a character buffer containing the input arrow::Table data. CSV conversion is done + * using the arrow::csv functionality. + * @param table - arrow table containing the data to be dumped in the CSV buffer + * @param buffSize - CSV buffer size returned by the function + * @param buf - character buffer containing the content of the arrow table dumped in CSV format + * @param nullStr - string that replaces a null value in the csv output buffer + * @param delimStr - delimiter used between values in csv buffer + * @returns The completion status, where arrow::Status::Success is for success, arrow::raise_error stops + * the process otherwise + * @throws arrow::raise_error if CSV conversion could not be done + */ + arrow::Status _table2CSVBuffer(std::shared_ptr const& table, int& buffSize, void* buf, + std::string const& nullStr, std::string const& delimStr); + + /** + * This method returns the number of data rows stored in the parquet file + * @param fileName - the parquet file name + * @returns the number of data rows + * @throws parquet::throw_error if file could not be opened or parquet reader could not be defined + */ + int _getTotalRowNumber(std::string fileName) const; + + /** + * This method reformats a boolean chunk array : a true/false boolean array becomes a 1/0 int8 array + * @param inputArray - chunkedArray to reformat + * @param bCheck - boolean to check or not if the formatting went right + * @returns the formatted 
chunkedArray + * @throws arrow::raise_error if batch could not be read, if the data table could not be read from the + * batch, if a data column needed by the partitioner is not found in the table or if the outptTable is not + * valid as defined + */ + std::shared_ptr _chunkArrayReformatBoolean( std::shared_ptr& inputArray, bool bCheck = false); - std::string m_path_to_file; - std::string m_part_config_file; - int m_maxMemory, m_recordSize, m_recordBufferSize; - int m_vmRSS_init; - int m_batchNumber, m_batchSize; - int m_totalBatchNumber; - int m_maxBufferSize; - - std::vector m_parameterNames; - std::unique_ptr m_arrow_reader_gbl; - std::unique_ptr<::arrow::RecordBatchReader> m_rb_reader_gbl; + std::string _path_to_file; + std::string _part_config_file; + int _maxMemory, _recordSize, _recordBufferSize; + int _vmRSS_init; + int _batchNumber, _batchSize; + int _totalBatchNumber; + int _maxBufferSize; + + std::vector _parameterNames; + std::unique_ptr _arrow_reader_gbl; + std::unique_ptr<::arrow::RecordBatchReader> _rb_reader_gbl; }; } // namespace lsst::partition From bf1e74f7885e754e7f47768180dda6208bdf11aa Mon Sep 17 00:00:00 2001 From: elles Date: Mon, 5 Jun 2023 19:00:58 +0200 Subject: [PATCH 5/5] fix clang format in QueryAnaHelper --- src/tests/QueryAnaHelper.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/QueryAnaHelper.cc b/src/tests/QueryAnaHelper.cc index a5369f6e4..036147f7d 100644 --- a/src/tests/QueryAnaHelper.cc +++ b/src/tests/QueryAnaHelper.cc @@ -32,7 +32,7 @@ #include "QueryAnaHelper.h" // System headers -//#include +// #include // Third-party headers