Skip to content

Commit

Permalink
Merge pull request #777 from lsst/tickets/DM-38276
Browse files Browse the repository at this point in the history
Merge branch Tickets/dm 38276
  • Loading branch information
hellebore74 authored Jun 8, 2023
2 parents b1ac9ca + bf1e74f commit 6c9b570
Show file tree
Hide file tree
Showing 11 changed files with 860 additions and 50 deletions.
26 changes: 26 additions & 0 deletions admin/tools/docker/base/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ RUN dnf install -y 'dnf-command(config-manager)' \
&& dnf clean all \
&& rm -rf /var/cache/yum

ENV ARROW_VERSION 11.0.0

RUN dnf update -y \
&& dnf install -y \
https://apache.jfrog.io/artifactory/arrow/almalinux/$(cut -d: -f5 /etc/system-release-cpe | cut -d. -f1)/apache-arrow-release-latest.rpm \
&& dnf config-manager --set-enabled epel \
&& dnf config-manager --set-enabled powertools \
&& dnf install -y \
arrow-devel-$ARROW_VERSION \
parquet-devel-$ARROW_VERSION \
&& dnf clean all \
&& rm -rf /var/cache/yum

RUN curl -s "https://cmake.org/files/v3.17/cmake-3.17.2-Linux-x86_64.tar.gz" \
| tar --strip-components=1 -xz -C /usr/local

Expand Down Expand Up @@ -196,6 +209,19 @@ RUN dnf install -y 'dnf-command(config-manager)' \
&& dnf clean all \
&& rm -rf /var/cache/yum

ENV ARROW_VERSION 11.0.0

# FIXME: Keep only binaries, and not devel libraries
RUN dnf update -y \
&& dnf install -y \
https://apache.jfrog.io/artifactory/arrow/almalinux/$(cut -d: -f5 /etc/system-release-cpe | cut -d. -f1)/apache-arrow-release-latest.rpm \
&& dnf config-manager --set-enabled epel \
&& dnf config-manager --set-enabled powertools \
&& dnf install -y arrow11-libs-$ARROW_VERSION \
&& dnf install -y parquet11-libs-$ARROW_VERSION \
&& dnf clean all \
&& rm -rf /var/cache/yum

RUN useradd --create-home --uid 1000 --shell /bin/bash qserv
WORKDIR /home/qserv

Expand Down
39 changes: 35 additions & 4 deletions doc/dev/quick-start-devel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,39 @@
Quick start guide for developers
################################

Using Qserv with your own custom code or arbitrary versions can be done by
connecting your local git repository with an eups software stack containing Qserv
dependencies.

Using Qserv with your own custom code or arbitrary versions can be done using Qserv build containers.
Full documentation is available here:
https://confluence.lsstcorp.org/display/DM/Qserv-Lite+Builds+in+Containers

***********************************
Bootstrap a development environment
***********************************

.. code:: sh
# Clone qserv and its submodules
git clone --recursive https://github.com/lsst/qserv
cd qserv
# Build Qserv base images
qserv build-images
# Build Qserv user image
qserv build-user-build-image
# Open a shell in a development container
# NOTE: Add '--user=qserv' option to command below if user id equals 1000
# NOTE: Use --user-build-image in order to avoid rebuilding a Qserv user image after each commit
qserv run-build --user-build-image docker.io/qserv/lite-build-fjammes:2023.2.1-rc2-10-g6624d8b28
# Build host code inside Qserv container
make
*****************************
Build an image for production
*****************************

.. code:: sh
# NOTE: Remove build/ directory to restart the build from scratch.
# This can fix build errors.
rm -rf build/
# NOTE: Add '--user=qserv' option to command below if user id equals 1000
qserv build -j8
6 changes: 6 additions & 0 deletions src/partition/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ target_sources(partition PRIVATE
HtmIndex.cc
InputLines.cc
ObjectIndex.cc
ParquetInterface.cc
)

target_link_libraries(partition PUBLIC
boost_program_options
boost_thread
Threads::Threads
arrow
parquet
log
)

install(TARGETS partition)
Expand All @@ -31,6 +35,8 @@ FUNCTION(partition_apps)
partition
boost_filesystem
sphgeom
arrow
parquet
)
install(TARGETS ${APP})
ENDFOREACH()
Expand Down
31 changes: 30 additions & 1 deletion src/partition/CmdLineUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <map>
#include <set>
#include <vector>
#include "boost/algorithm/string/predicate.hpp"

#include "partition/ConfigStore.h"
#include "partition/Constants.h"
Expand Down Expand Up @@ -140,6 +141,7 @@ InputLines const makeInputLines(ConfigStore const& config) {
"using --in.path.");
}
std::vector<fs::path> paths;
bool bIsParquetFile = false;
for (auto&& s : config.get<std::vector<std::string>>("in.path")) {
fs::path p(s);
fs::file_status stat = fs::status(p);
Expand All @@ -152,13 +154,40 @@ InputLines const makeInputLines(ConfigStore const& config) {
}
}
}
if (!bIsParquetFile)
bIsParquetFile = (boost::algorithm::ends_with(s.c_str(), ".parquet") ||
boost::algorithm::ends_with(s.c_str(), ".parq"));
}
if (paths.empty()) {
throw std::runtime_error(
"No non-empty input files found among the "
"files and directories specified via --in.path.");
}
return InputLines(paths, blockSize * MiB, false);

// return InputLines(paths, blockSize * MiB, false, names);
if (!bIsParquetFile) return InputLines(paths, blockSize * MiB, false);

// In case input files are parquet files, data from config file have to be transfered to the parquet
// reading class Arrow : collect parameter name list to be read from parquet file
std::vector<std::string> names;
std::string st_null = "";
std::string st_delimiter = "";
std::string st_escape = "";

if (config.has("in.csv.field")) names = config.get<std::vector<std::string>>("in.csv.field");
if (config.has("in.csv.null")) st_null = config.get<std::string>("in.csv.null");
if (config.has("in.csv.delimiter")) st_delimiter = config.get<std::string>("in.csv.delimiter");
if (config.has("in.csv.escape")) st_escape = config.get<std::string>("in.csv.escape");

ConfigParamArrow const configParamArrow{names, st_null, st_delimiter, st_escape};

// Direct parquet file reading is not possible using MT - March 2023
if (config.has("mr.num-workers") && config.get<int>("mr.num-workers") > 1)
throw std::runtime_error(
"Parquet file partition cannot be done in MT - mr.num-workers parameter must be set to 1 in "
"partition.json file ");

return InputLines(paths, blockSize * MiB, false, configParamArrow);
}

void defineOutputOptions(po::options_description& opts) {
Expand Down
123 changes: 98 additions & 25 deletions src/partition/FileUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

#include "partition/FileUtils.h"
#include "partition/ParquetInterface.h"

#include <sys/types.h>
#include <sys/stat.h>
Expand All @@ -34,46 +35,54 @@

#include <stdexcept>

// LSST headers
#include "lsst/log/Log.h"

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.partitionner");
} // namespace

namespace fs = boost::filesystem;

namespace lsst::partition {

InputFile::InputFile(fs::path const &path) : _path(path), _fd(-1), _sz(-1) {
char msg[1024];
struct ::stat st;
int fd = ::open(path.string().c_str(), O_RDONLY);
if (fd == -1) {
::strerror_r(errno, msg, sizeof(msg));
throw std::runtime_error("open() failed [" + path.string() + "]: " + msg);
::strerror_r(errno, _msg, sizeof(_msg));
throw std::runtime_error("InputFile::" + std::string(__func__) + " :open() failed [" + path.string() +
"]: " + _msg);
}
if (::fstat(fd, &st) != 0) {
::strerror_r(errno, msg, sizeof(msg));
::strerror_r(errno, _msg, sizeof(_msg));
::close(fd);
throw std::runtime_error("fstat() failed [" + path.string() + "]: " + msg);
throw std::runtime_error("InputFile::" + std::string(__func__) + " :fstat() failed [" +
path.string() + "]: " + _msg);
}
_fd = fd;
_sz = st.st_size;
}

InputFile::~InputFile() {
char msg[1024];
if (_fd != -1 && ::close(_fd) != 0) {
::snprintf(msg, sizeof(msg), "close() failed [%s]", _path.string().c_str());
::perror(msg);
::exit(EXIT_FAILURE);
::snprintf(_msg, sizeof(_msg), "InputFile::~InputFile close() failed [%s]", _path.string().c_str());
::perror(_msg);
LOGS(_log, LOG_LVL_WARN, _msg);
}
}

void InputFile::read(void *buf, off_t off, size_t sz) const {
char msg[1024];
uint8_t *cur = static_cast<uint8_t *>(buf);
while (sz > 0) {
ssize_t n = ::pread(_fd, cur, sz, off);
if (n == 0) {
throw std::runtime_error("pread() received EOF [" + _path.string() + "]");
throw std::runtime_error("InputFile::" + std::string(__func__) + ":received EOF [" +
_path.string() + "]");
} else if (n < 0 && errno != EINTR) {
::strerror_r(errno, msg, sizeof(msg));
throw std::runtime_error("pread() failed [" + _path.string() + "]: " + msg);
::strerror_r(errno, _msg, sizeof(_msg));
throw std::runtime_error("InputFile::" + std::string(__func__) + ":failed [" + _path.string() +
"]: " + _msg);
} else if (n > 0) {
sz -= static_cast<size_t>(n);
off += n;
Expand All @@ -82,38 +91,101 @@ void InputFile::read(void *buf, off_t off, size_t sz) const {
}
}

void InputFile::read(void *buf, off_t off, size_t sz, int & /*bufferSize*/,
ConfigParamArrow const & /*params*/) const {
read(buf, off, sz);
}

InputFileArrow::InputFileArrow(fs::path const &path, off_t blockSize)
: InputFile(path), _path(path), _fd(-1), _sz(-1) {
struct ::stat st;

_batchReader = std::make_unique<lsst::partition::ParquetFile>(path.string());
arrow::Status status = _batchReader->setupBatchReader(blockSize);
if (!status.ok())
throw std::runtime_error("InputArrowFile::" + std::string(__func__) +
": Could not setup Arrow recordbatchreader");

int fd = ::open(path.string().c_str(), O_RDONLY);

if (fd == -1) {
::strerror_r(errno, _msg, sizeof(_msg));
throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": open() failed [" +
path.string() + "]: " + _msg);
}
if (::fstat(fd, &st) != 0) {
::strerror_r(errno, _msg, sizeof(_msg));
::close(fd);
throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": fstat() failed [" +
path.string() + "]: " + _msg);
}
_fd = fd;
_sz = st.st_size;
}

InputFileArrow::~InputFileArrow() {
if (_fd != -1 && ::close(_fd) != 0) {
::snprintf(_msg, sizeof(_msg), "InputFileArrow::~InputFileArrow : close() failed [%s]",
_path.string().c_str());
::perror(_msg);
LOGS(_log, LOG_LVL_WARN, _msg);
}
}

int InputFileArrow::getBatchNumber() const { return _batchReader->getTotalBatchNumber(); }

void InputFileArrow::read(void *buf, off_t off, size_t sz, int &csvBufferSize,
ConfigParamArrow const &params) const {
uint8_t *cur = static_cast<uint8_t *>(buf);

arrow::Status status = _batchReader->readNextBatch_Table2CSV(cur, csvBufferSize, params.paramNames,
params.str_null, params.str_delimiter);

if (status.ok()) {
ssize_t n = csvBufferSize;
if (n == 0) {
throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": received EOF [" +
_path.string() + "]");
} else if (n < 0 && errno != EINTR) {
::strerror_r(errno, _msg, sizeof(_msg));
throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": failed [" +
_path.string() + "]: " + _msg);
}
}
}

OutputFile::OutputFile(fs::path const &path, bool truncate) : _path(path), _fd(-1) {
char msg[1024];
int flags = O_CREAT | O_WRONLY;
if (truncate) {
flags |= O_TRUNC;
}
int fd = ::open(path.string().c_str(), flags, S_IROTH | S_IRGRP | S_IRUSR | S_IWUSR);
if (fd == -1) {
::strerror_r(errno, msg, sizeof(msg));
throw std::runtime_error("open() failed [" + path.string() + "]: " + msg);
::strerror_r(errno, _msg, sizeof(_msg));
throw std::runtime_error("OutputFile::" + std::string(__func__) + ": open() failed [" +
path.string() + "]: " + _msg);
}
if (!truncate) {
if (::lseek(fd, 0, SEEK_END) < 0) {
::strerror_r(errno, msg, sizeof(msg));
::strerror_r(errno, _msg, sizeof(_msg));
close(fd);
throw std::runtime_error("lseek() failed [" + path.string() + "]: " + msg);
throw std::runtime_error("OutputFile::" + std::string(__func__) + ": lseek() failed [" +
path.string() + "]: " + _msg);
}
}
_fd = fd;
}

OutputFile::~OutputFile() {
char msg[1024];
if (_fd != -1 && close(_fd) != 0) {
::snprintf(msg, sizeof(msg), "close() failed [%s]", _path.string().c_str());
::perror(msg);
::exit(EXIT_FAILURE);
::snprintf(_msg, sizeof(_msg), "OutputFile::~OutputFile : close() failed [%s]",
_path.string().c_str());
::perror(_msg);
LOGS(_log, LOG_LVL_WARN, _msg);
}
}

void OutputFile::append(void const *buf, size_t sz) {
char msg[1024];
if (!buf || sz == 0) {
return;
}
Expand All @@ -122,8 +194,9 @@ void OutputFile::append(void const *buf, size_t sz) {
ssize_t n = ::write(_fd, b, sz);
if (n < 0) {
if (errno != EINTR) {
::strerror_r(errno, msg, sizeof(msg));
throw std::runtime_error("write() failed [" + _path.string() + "]: " + msg);
::strerror_r(errno, _msg, sizeof(_msg));
throw std::runtime_error("OutputFile::" + std::string(__func__) + ": write() failed [" +
_path.string() + "]: " + _msg);
}
} else {
sz -= static_cast<size_t>(n);
Expand Down
Loading

0 comments on commit 6c9b570

Please sign in to comment.