Skip to content

Commit

Permalink
feat(dwio): Delta update support in selective readers (facebookincuba…
Browse files Browse the repository at this point in the history
…tor#11501)

Summary: Pull Request resolved: facebookincubator#11501

Reviewed By: oerling

Differential Revision: D65757116
  • Loading branch information
Yuhta authored and facebook-github-bot committed Nov 12, 2024
1 parent 7437093 commit 9965fdd
Show file tree
Hide file tree
Showing 16 changed files with 527 additions and 296 deletions.
176 changes: 140 additions & 36 deletions velox/dwio/common/ColumnLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,54 @@

namespace facebook::velox::dwio::common {

namespace {

RowSet read(
SelectiveStructColumnReaderBase* structReader,
SelectiveColumnReader* fieldReader,
uint64_t version,
RowSet rows,
raw_vector<vector_size_t>& selectedRows,
ValueHook* hook) {
VELOX_CHECK_EQ(
version,
structReader->numReads(),
"Loading LazyVector after the enclosing reader has moved");
const auto offset = structReader->lazyVectorReadOffset();
const auto* incomingNulls = structReader->nulls();
const auto outputRows = structReader->outputRows();
RowSet effectiveRows;

if (rows.size() == outputRows.size()) {
// All the rows planned at creation are accessed.
effectiveRows = outputRows;
} else {
// rows is a set of indices into outputRows. There has been a
// selection between creation and loading.
selectedRows.resize(rows.size());
VELOX_DCHECK(!selectedRows.empty());
for (auto i = 0; i < rows.size(); ++i) {
selectedRows[i] = outputRows[rows[i]];
}
effectiveRows = selectedRows;
}

structReader->advanceFieldReader(fieldReader, offset);
fieldReader->scanSpec()->setValueHook(hook);
fieldReader->read(offset, effectiveRows, incomingNulls);
if (fieldReader->fileType().type()->isRow()) {
// 'fieldReader_' may itself produce LazyVectors. For this it must have its
// result row numbers set.
static_cast<SelectiveStructColumnReaderBase*>(fieldReader)
->setLoadableRows(effectiveRows);
}
return effectiveRows;
}

// Wraps '*result' in a dictionary to make the contiguous values
// appear at the indices i 'rows'. Used when loading a LazyVector for
// a sparse set of rows in conditional exprs.
namespace {
static void scatter(RowSet rows, vector_size_t resultSize, VectorPtr* result) {
void scatter(RowSet rows, vector_size_t resultSize, VectorPtr* result) {
VELOX_CHECK_GE(resultSize, rows.back() + 1);

// Initialize the indices to 0 to make the dictionary safely
Expand All @@ -40,6 +83,62 @@ static void scatter(RowSet rows, vector_size_t resultSize, VectorPtr* result) {
result->get()->disableMemo();
*result = BaseVector::wrapInDictionary(nullptr, indices, resultSize, *result);
}

template <typename T>
void addToHookImpl(
const DecodedVector& decoded,
const RowSet& rows,
ValueHook& hook) {
if (decoded.isIdentityMapping()) {
auto* values = decoded.data<T>();
hook.addValues(rows.data(), values, rows.size());
return;
}
for (auto i : rows) {
if (!decoded.isNullAt(i)) {
hook.addValueTyped(i, decoded.valueAt<T>(i));
} else if (hook.acceptsNulls()) {
hook.addNull(i);
}
}
}

void addToHook(
const DecodedVector& decoded,
const RowSet& rows,
ValueHook& hook) {
switch (decoded.base()->typeKind()) {
case TypeKind::BOOLEAN:
addToHookImpl<bool>(decoded, rows, hook);
break;
case TypeKind::TINYINT:
addToHookImpl<int8_t>(decoded, rows, hook);
break;
case TypeKind::SMALLINT:
addToHookImpl<int16_t>(decoded, rows, hook);
break;
case TypeKind::INTEGER:
addToHookImpl<int32_t>(decoded, rows, hook);
break;
case TypeKind::BIGINT:
addToHookImpl<int64_t>(decoded, rows, hook);
break;
case TypeKind::REAL:
addToHookImpl<float>(decoded, rows, hook);
break;
case TypeKind::DOUBLE:
addToHookImpl<double>(decoded, rows, hook);
break;
case TypeKind::VARCHAR:
case TypeKind::VARBINARY:
addToHookImpl<StringView>(decoded, rows, hook);
break;
default:
VELOX_FAIL(
"Unsupported type kind for hook: {}", decoded.base()->typeKind());
}
}

} // namespace

void ColumnLoader::loadInternal(
Expand All @@ -48,48 +147,19 @@ void ColumnLoader::loadInternal(
vector_size_t resultSize,
VectorPtr* result) {
process::TraceContext trace("ColumnLoader::loadInternal");
VELOX_CHECK_EQ(
version_,
structReader_->numReads(),
"Loading LazyVector after the enclosing reader has moved");
const auto offset = structReader_->lazyVectorReadOffset();
const auto* incomingNulls = structReader_->nulls();
const auto outputRows = structReader_->outputRows();
raw_vector<vector_size_t> selectedRows;
RowSet effectiveRows;
ExceptionContextSetter exceptionContext(
{[](VeloxException::Type /*exceptionType*/, auto* reader) {
return static_cast<SelectiveStructColumnReaderBase*>(reader)
->debugString();
},
structReader_});

if (rows.size() == outputRows.size()) {
// All the rows planned at creation are accessed.
effectiveRows = outputRows;
} else {
// rows is a set of indices into outputRows. There has been a
// selection between creation and loading.
selectedRows.resize(rows.size());
assert(!selectedRows.empty());
for (auto i = 0; i < rows.size(); ++i) {
selectedRows[i] = outputRows[rows[i]];
}
effectiveRows = RowSet(selectedRows);
}

structReader_->advanceFieldReader(fieldReader_, offset);
fieldReader_->scanSpec()->setValueHook(hook);
fieldReader_->read(offset, effectiveRows, incomingNulls);
if (fieldReader_->fileType().type()->kind() == TypeKind::ROW) {
// 'fieldReader_' may itself produce LazyVectors. For this it must have its
// result row numbers set.
static_cast<SelectiveStructColumnReaderBase*>(fieldReader_)
->setLoadableRows(effectiveRows);
}
raw_vector<vector_size_t> selectedRows;
auto effectiveRows =
read(structReader_, fieldReader_, version_, rows, selectedRows, hook);
if (!hook) {
fieldReader_->getValues(effectiveRows, result);
if (((rows.back() + 1) < resultSize) || rows.size() != outputRows.size()) {
if (((rows.back() + 1) < resultSize) ||
rows.size() != structReader_->outputRows().size()) {
// We read sparsely. The values that were read should appear
// at the indices in the result vector that were given by
// 'rows'.
Expand All @@ -98,4 +168,38 @@ void ColumnLoader::loadInternal(
}
}

void DeltaUpdateColumnLoader::loadInternal(
RowSet rows,
ValueHook* hook,
vector_size_t resultSize,
VectorPtr* result) {
process::TraceContext trace("DeltaUpdateColumnLoader::loadInternal");
ExceptionContextSetter exceptionContext(
{[](VeloxException::Type /*exceptionType*/, auto* reader) {
return static_cast<SelectiveStructColumnReaderBase*>(reader)
->debugString();
},
structReader_});
auto* scanSpec = fieldReader_->scanSpec();
VELOX_CHECK(scanSpec->readFromFile());
// Filters on delta updated columns need to be disabled and applied after this
// method return.
VELOX_CHECK(!scanSpec->hasFilter());
scanSpec->setValueHook(nullptr);
raw_vector<vector_size_t> selectedRows;
RowSet effectiveRows;
effectiveRows =
read(structReader_, fieldReader_, version_, rows, selectedRows, nullptr);
fieldReader_->getValues(effectiveRows, result);
scanSpec->deltaUpdate()->update(effectiveRows, *result);
if (hook) {
DecodedVector decoded(**result);
addToHook(decoded, effectiveRows, *hook);
} else if (
rows.back() + 1 < resultSize ||
rows.size() != structReader_->outputRows().size()) {
scatter(rows, resultSize, result);
}
}

} // namespace facebook::velox::dwio::common
27 changes: 24 additions & 3 deletions velox/dwio/common/ColumnLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

namespace facebook::velox::dwio::common {

class ColumnLoader : public velox::VectorLoader {
class ColumnLoader : public VectorLoader {
public:
ColumnLoader(
SelectiveStructColumnReaderBase* structReader,
Expand All @@ -30,14 +30,13 @@ class ColumnLoader : public velox::VectorLoader {
fieldReader_(fieldReader),
version_(version) {}

protected:
private:
void loadInternal(
RowSet rows,
ValueHook* hook,
vector_size_t resultSize,
VectorPtr* result) override;

private:
SelectiveStructColumnReaderBase* const structReader_;
SelectiveColumnReader* const fieldReader_;
// This is checked against the version of 'structReader' on load. If
Expand All @@ -46,4 +45,26 @@ class ColumnLoader : public velox::VectorLoader {
const uint64_t version_;
};

class DeltaUpdateColumnLoader : public VectorLoader {
public:
DeltaUpdateColumnLoader(
SelectiveStructColumnReaderBase* structReader,
SelectiveColumnReader* fieldReader,
uint64_t version)
: structReader_(structReader),
fieldReader_(fieldReader),
version_(version) {}

private:
void loadInternal(
RowSet rows,
ValueHook* hook,
vector_size_t resultSize,
VectorPtr* result) override;

SelectiveStructColumnReaderBase* const structReader_;
SelectiveColumnReader* const fieldReader_;
const uint64_t version_;
};

} // namespace facebook::velox::dwio::common
4 changes: 2 additions & 2 deletions velox/dwio/common/FileSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ void WriteFileSink::write(std::vector<DataBuffer<char>>& buffers) {
}

void WriteFileSink::doClose() {
LOG(INFO) << "closing file: " << name()
<< ", total size: " << succinctBytes(size_);
VLOG(1) << "closing file: " << name()
<< ", total size: " << succinctBytes(size_);
if (writeFile_ != nullptr) {
writeFile_->close();
}
Expand Down
13 changes: 13 additions & 0 deletions velox/dwio/common/Mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
#pragma once

#include "velox/common/base/RandomUtil.h"
#include "velox/vector/LazyVector.h"

#include <cstdint>

namespace facebook::velox::dwio::common {

/// Top row level mutations.
struct Mutation {
/// Bit masks for row numbers to be deleted.
const uint64_t* deletedRows = nullptr;
Expand All @@ -33,4 +35,15 @@ inline bool hasDeletion(const Mutation* mutation) {
return mutation && (mutation->deletedRows || mutation->randomSkip);
}

class DeltaColumnUpdater {
public:
virtual ~DeltaColumnUpdater() = default;

/// Update the values in `result' to reflect the delta updates on `baseRows'.
/// `baseRows' are the rows starting from the beginning of the current scan
/// (so that the delta readers can use this to choose which lines to read as
/// well), not based on the positions in `result'.
virtual void update(const RowSet& baseRows, VectorPtr& result) = 0;
};

} // namespace facebook::velox::dwio::common
7 changes: 4 additions & 3 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,10 @@ class ReaderOptions : public io::ReaderOptions {
return *this;
}

/// Sets the schema of the file (a Type tree). For "dwrf" format, a default
/// schema is derived from the file. For "rc" format, there is no default
/// schema.
/// Sets the current table schema of the file (a Type tree). This could be
/// different from the actual schema in file if schema evolution happened.
/// For "dwrf" format, a default schema is derived from the file. For "rc"
/// format, there is no default schema.
ReaderOptions& setFileSchema(const RowTypePtr& schema) {
fileSchema_ = schema;
return *this;
Expand Down
Loading

0 comments on commit 9965fdd

Please sign in to comment.