Skip to content

Commit

Permalink
Support read flat map as struct in table scan (facebookincubator#9899)
Browse files Browse the repository at this point in the history
Summary:

For a flat map column, both the table schema and the file type are always
MAP. If the result vector is requested as ROW instead, the read-as-struct
code path is triggered.

Reviewed By: pedroerp

Differential Revision: D57693809
  • Loading branch information
Yuhta authored and facebook-github-bot committed May 23, 2024
1 parent 179fefc commit dad9b00
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 25 deletions.
21 changes: 16 additions & 5 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,25 @@ void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr) {

namespace {

void filterOutNullMapKeys(const Type& rootType, common::ScanSpec& rootSpec) {
rootSpec.visit(rootType, [](const Type& type, common::ScanSpec& spec) {
// Post-processes the scan spec of a single top-level field.
//
// 1. Walks 'fieldSpec' alongside 'outputType' and, for every MAP-typed node
//    that is not a constant, adds an IsNotNull filter on its keys child.
// 2. If 'dataColumns' is provided and contains a column named after
//    'fieldSpec' whose type is MAP while 'outputType' is ROW, marks the spec
//    as flat-map-as-struct.
//
// @param dataColumns Table schema; may be null, in which case step 2 is
// skipped.
// @param outputType Type requested for this field in the result vector.
// @param fieldSpec Scan spec of the field; modified in place.
void processFieldSpec(
    const RowTypePtr& dataColumns,
    const TypePtr& outputType,
    common::ScanSpec& fieldSpec) {
  fieldSpec.visit(
      *outputType, [](const Type& nodeType, common::ScanSpec& nodeSpec) {
        // Only non-constant MAP nodes get the key filter.
        if (!nodeType.isMap() || nodeSpec.isConstant()) {
          return;
        }
        auto* keySpec =
            nodeSpec.childByName(common::ScanSpec::kMapKeysFieldName);
        VELOX_CHECK_NOT_NULL(keySpec);
        keySpec->addFilter(common::IsNotNull());
      });

  // Without a table schema there is nothing to compare the output type with.
  if (!dataColumns) {
    return;
  }
  const auto columnIndex =
      dataColumns->getChildIdxIfExists(fieldSpec.fieldName());
  if (!columnIndex.has_value()) {
    return;
  }
  // MAP in the table schema but ROW in the output: read flat map as struct.
  if (dataColumns->childAt(*columnIndex)->isMap() && outputType->isRow()) {
    fieldSpec.setFlatMapAsStruct(true);
  }
}

} // namespace
Expand Down Expand Up @@ -374,7 +385,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
auto it = outputSubfields.find(name);
if (it == outputSubfields.end()) {
auto* fieldSpec = spec->addFieldRecursively(name, *type, i);
filterOutNullMapKeys(*type, *fieldSpec);
processFieldSpec(dataColumns, type, *fieldSpec);
filterSubfields.erase(name);
continue;
}
Expand All @@ -390,7 +401,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
}
auto* fieldSpec = spec->addField(name, i);
addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec);
filterOutNullMapKeys(*type, *fieldSpec);
processFieldSpec(dataColumns, type, *fieldSpec);
subfieldSpecs.clear();
}

Expand All @@ -404,7 +415,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
auto& type = dataColumns->findChild(fieldName);
auto* fieldSpec = spec->getOrCreateChild(common::Subfield(fieldName));
addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec);
filterOutNullMapKeys(*type, *fieldSpec);
processFieldSpec(dataColumns, type, *fieldSpec);
subfieldSpecs.clear();
}
}
Expand Down
14 changes: 10 additions & 4 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,16 @@ std::vector<TypePtr> SplitReader::adaptColumns(
childSpec->setConstantValue(nullptr);
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
if (outputTypeIdx.has_value()) {
// We know the fieldName exists in the file, make the type at that
// position match what we expect in the output.
columnTypes[fileTypeIdx.value()] =
readerOutputType_->childAt(*outputTypeIdx);
auto& outputType = readerOutputType_->childAt(*outputTypeIdx);
auto& columnType = columnTypes[*fileTypeIdx];
if (childSpec->isFlatMapAsStruct()) {
// Flat map column read as struct. Leave the schema type as MAP.
VELOX_CHECK(outputType->isRow() && columnType->isMap());
} else {
// We know the fieldName exists in the file, make the type at that
// position match what we expect in the output.
columnType = outputType;
}
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions velox/dwio/common/ScanSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,14 @@ class ScanSpec {
template <typename F>
void visit(const Type& type, F&& f);

// Returns the value of the flat-map-as-struct flag stored on this node
// (false unless changed via setFlatMapAsStruct()).
bool isFlatMapAsStruct() const {
return isFlatMapAsStruct_;
}

// Sets the flat-map-as-struct flag on this node to 'value'.
void setFlatMapAsStruct(bool value) {
isFlatMapAsStruct_ = value;
}

private:
void reorder();

Expand Down Expand Up @@ -407,6 +415,10 @@ class ScanSpec {

// Used only for bulk reader to project flat map features.
std::vector<std::string> flatMapFeatureSelection_;

// This node represents a flat map column that needs to be read as a struct,
// i.e. in the table schema it is a MAP, but in the result vector it is a ROW.
bool isFlatMapAsStruct_ = false;
};

template <typename F>
Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/common/SelectiveRepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ void SelectiveMapColumnReader::read(

void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) {
VELOX_DCHECK_NOT_NULL(result);
VELOX_CHECK(
!result->get() || result->get()->type()->isMap(),
"Expect MAP result vector, got {}",
result->get()->type()->toString());
prepareResult(*result, requestedType_->type(), rows.size(), &memoryPool_);
auto* resultMap = result->get()->asUnchecked<MapVector>();
makeOffsetsAndSizes(rows, *resultMap);
Expand Down
10 changes: 3 additions & 7 deletions velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ class SelectiveFlatMapAsStructReader : public SelectiveStructColumnReaderBase {
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
DwrfParams& params,
common::ScanSpec& scanSpec,
const std::vector<std::string>& /*keys*/)
common::ScanSpec& scanSpec)
: SelectiveStructColumnReaderBase(
requestedType,
fileType,
Expand Down Expand Up @@ -246,12 +245,9 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> createReader(
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
DwrfParams& params,
common::ScanSpec& scanSpec) {
auto& mapColumnIdAsStruct =
params.stripeStreams().getRowReaderOptions().getMapColumnIdAsStruct();
auto it = mapColumnIdAsStruct.find(requestedType->id());
if (it != mapColumnIdAsStruct.end()) {
if (scanSpec.isFlatMapAsStruct()) {
return std::make_unique<SelectiveFlatMapAsStructReader<T>>(
requestedType, fileType, params, scanSpec, it->second);
requestedType, fileType, params, scanSpec);
} else {
return std::make_unique<SelectiveFlatMapReader<T>>(
requestedType, fileType, params, scanSpec);
Expand Down
12 changes: 4 additions & 8 deletions velox/dwio/dwrf/test/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ class E2EFilterTest : public E2EFilterTestBase {
dwio::common::RowReaderOptions& opts,
const std::shared_ptr<ScanSpec>& spec) override {
E2EFilterTestBase::setUpRowReaderOptions(opts, spec);
if (!flatmapNodeIdsAsStruct_.empty()) {
opts.setFlatmapNodeIdsAsStruct(flatmapNodeIdsAsStruct_);
for (auto& field : flatMapAsStructFields_) {
spec->childByName(field)->setFlatMapAsStruct(true);
}
}

Expand Down Expand Up @@ -127,6 +127,7 @@ class E2EFilterTest : public E2EFilterTestBase {
mapFlatColsStructKeys.back().push_back(name);
}
columnTypes[i] = MAP(VARCHAR(), columnTypes[i]->childAt(0));
flatMapAsStructFields_.push_back(rowType.nameOf(i));
}
writerSchema = ROW(
std::vector<std::string>(rowType.names()), std::move(columnTypes));
Expand All @@ -137,10 +138,6 @@ class E2EFilterTest : public E2EFilterTestBase {
}
auto& child = schemaWithId->childAt(i);
mapFlatCols.push_back(child->column());
if (!rowType.childAt(i)->isRow()) {
continue;
}
flatmapNodeIdsAsStruct_[child->id()] = mapFlatColsStructKeys[i];
}
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(dwrf::Config::MAP_FLAT_DISABLE_DICT_ENCODING, false);
Expand All @@ -158,8 +155,7 @@ class E2EFilterTest : public E2EFilterTestBase {
}

std::unique_ptr<dwrf::Writer> writer_;
std::unordered_map<uint32_t, std::vector<std::string>>
flatmapNodeIdsAsStruct_;
std::vector<std::string> flatMapAsStructFields_;
};

TEST_F(E2EFilterTest, integerDirect) {
Expand Down
25 changes: 25 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4142,6 +4142,31 @@ TEST_F(TableScanTest, partitionKeyNotMatchPartitionKeysHandle) {
assertQuery(op, split, "SELECT c0 FROM tmp");
}

// Writes a file whose column c0 is declared as a MAP in the write schema and
// configured as a flat map column, then scans it back requesting the ROW type
// of the original vector and checks that the scan output equals that vector.
TEST_F(TableScanTest, readFlatMapAsStruct) {
constexpr int kSize = 10;
// Names of the struct fields; also passed as the flat map struct keys below.
std::vector<std::string> keys = {"1", "2", "3"};
// Single-column row vector whose only child is a struct with one BIGINT
// child per key; the last two children are built with nullEvery(5) and
// nullEvery(7).
auto vector = makeRowVector({makeRowVector(
keys,
{
makeFlatVector<int64_t>(kSize, folly::identity),
makeFlatVector<int64_t>(kSize, folly::identity, nullEvery(5)),
makeFlatVector<int64_t>(kSize, folly::identity, nullEvery(7)),
})});
// Writer configuration: enable map flattening for column 0 and supply the
// struct keys for that column.
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set<const std::vector<uint32_t>>(dwrf::Config::MAP_FLAT_COLS, {0});
config->set<const std::vector<std::vector<std::string>>>(
dwrf::Config::MAP_FLAT_COLS_STRUCT_KEYS, {keys});
auto file = TempFilePath::create();
// The write schema declares c0 as a MAP even though the input data is a ROW.
auto writeSchema = ROW({"c0"}, {MAP(INTEGER(), BIGINT())});
writeToFile(file->getPath(), {vector}, config, writeSchema);
// The scan output type is the ROW type of the input vector, while the MAP
// write schema is passed as the last argument of tableScan().
auto readSchema = asRowType(vector->type());
auto plan =
PlanBuilder().tableScan(readSchema, {}, "", writeSchema).planNode();
auto split = makeHiveConnectorSplit(file->getPath());
// The scan result must match the struct-shaped vector that was written.
AssertQueryBuilder(plan).split(split).assertResults(vector);
}

// TODO: re-enable this test once we add back driver suspension support for
// table scan.
TEST_F(TableScanTest, DISABLED_memoryArbitrationWithSlowTableScan) {
Expand Down
10 changes: 9 additions & 1 deletion velox/exec/tests/utils/HiveConnectorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,17 @@ void HiveConnectorTestBase::writeToFile(
const std::string& filePath,
const std::vector<RowVectorPtr>& vectors,
std::shared_ptr<dwrf::Config> config) {
writeToFile(filePath, vectors, std::move(config), vectors[0]->type());
}

void HiveConnectorTestBase::writeToFile(
const std::string& filePath,
const std::vector<RowVectorPtr>& vectors,
std::shared_ptr<dwrf::Config> config,
const TypePtr& schema) {
velox::dwrf::WriterOptions options;
options.config = config;
options.schema = vectors[0]->type();
options.schema = schema;
auto localWriteFile = std::make_unique<LocalWriteFile>(filePath, true, false);
auto sink = std::make_unique<dwio::common::WriteFileSink>(
std::move(localWriteFile), filePath);
Expand Down
6 changes: 6 additions & 0 deletions velox/exec/tests/utils/HiveConnectorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ class HiveConnectorTestBase : public OperatorTestBase {
std::shared_ptr<dwrf::Config> config =
std::make_shared<facebook::velox::dwrf::Config>());

void writeToFile(
const std::string& filePath,
const std::vector<RowVectorPtr>& vectors,
std::shared_ptr<dwrf::Config> config,
const TypePtr& schema);

std::vector<RowVectorPtr> makeVectors(
const RowTypePtr& rowType,
int32_t numVectors,
Expand Down

0 comments on commit dad9b00

Please sign in to comment.