Skip to content

Commit

Permalink
Support timestamp units in arrow bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Mar 13, 2024
1 parent 85f3973 commit 15e787e
Show file tree
Hide file tree
Showing 12 changed files with 523 additions and 128 deletions.
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,8 @@ bool HiveConfig::s3UseProxyFromEnv() const {
return config_->get<bool>(kS3UseProxyFromEnv, false);
}

uint8_t HiveConfig::arrowBridgeTimestampUnit(const Config* session) const {
return session->get<uint8_t>(kArrowBridgeTimestampUnit, 9 /* nano */);
}

} // namespace facebook::velox::connector::hive
8 changes: 8 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ class HiveConfig {
static constexpr const char* kS3UseProxyFromEnv =
"hive.s3.use-proxy-from-env";

// Timestamp unit used during Velox-Arrow conversion.
static constexpr const char* kArrowBridgeTimestampUnit =
"arrow_bridge_timestamp_unit";

InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const Config* session) const;

Expand Down Expand Up @@ -247,6 +251,10 @@ class HiveConfig {

bool s3UseProxyFromEnv() const;

/// Returns the timestamp unit used when exporting to Arrow.
/// 0: second, 3: milli, 6: micro, 9: nano.
uint8_t arrowBridgeTimestampUnit(const Config* session) const;

HiveConfig(std::shared_ptr<const Config> config) {
VELOX_CHECK_NOT_NULL(
config, "Config is null for HiveConfig initialization");
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
hiveConfig_->orcWriterMaxStripeSize(connectorSessionProperties));
options.maxDictionaryMemory = std::optional(
hiveConfig_->orcWriterMaxDictionaryMemory(connectorSessionProperties));
options.arrowBridgeTimestampUnit =
hiveConfig_->arrowBridgeTimestampUnit(connectorSessionProperties);
options.serdeParameters = std::map<std::string, std::string>(
insertTableHandle_->serdeParameters().begin(),
insertTableHandle_->serdeParameters().end());
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ struct WriterOptions {
std::optional<uint64_t> maxStripeSize{std::nullopt};
std::optional<uint64_t> maxDictionaryMemory{std::nullopt};
std::map<std::string, std::string> serdeParameters;
std::optional<uint8_t> arrowBridgeTimestampUnit;
};

} // namespace facebook::velox::dwio::common
97 changes: 97 additions & 0 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,38 @@
* limitations under the License.
*/

#include <arrow/type.h>
#include <folly/init/Init.h>

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/core/QueryCtx.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/exec/tests/utils/Cursor.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

using namespace facebook::velox;
using namespace facebook::velox::common;
using namespace facebook::velox::dwio::common;
using namespace facebook::velox::exec::test;
using namespace facebook::velox::parquet;

class ParquetWriterTest : public ParquetTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
testutil::TestValue::enable();
auto hiveConnector =
connector::getConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(
kHiveConnectorId, std::make_shared<core::MemConfig>());
connector::registerConnector(hiveConnector);
}

std::unique_ptr<RowReader> createRowReaderWithSchema(
const std::unique_ptr<Reader> reader,
const RowTypePtr& rowType) {
Expand All @@ -43,6 +65,8 @@ class ParquetWriterTest : public ParquetTestBase {
std::make_shared<InMemoryReadFile>(data), opts.getMemoryPool()),
opts);
};

inline static const std::string kHiveConnectorId = "test-hive";
};

std::vector<CompressionKind> params = {
Expand Down Expand Up @@ -110,3 +134,76 @@ TEST_F(ParquetWriterTest, compression) {
auto rowReader = createRowReaderWithSchema(std::move(reader), schema);
assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_);
};

DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
std::function<void(const ::arrow::Schema*)>(
([&](const ::arrow::Schema* arrowSchema) {
const auto tsType =
std::dynamic_pointer_cast<::arrow::TimestampType>(
arrowSchema->field(0)->type());
ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO);
})));

const auto data = makeRowVector({makeFlatVector<Timestamp>(
10'000, [](auto row) { return Timestamp(row, row); })});
parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool_.get();
writerOptions.arrowBridgeTimestampUnit =
static_cast<uint8_t>(TimestampUnit::kMicro);

// Create an in-memory writer.
auto sink = std::make_unique<MemorySink>(
200 * 1024 * 1024,
dwio::common::FileSink::Options{.pool = leafPool_.get()});
auto writer = std::make_unique<parquet::Writer>(
std::move(sink), writerOptions, rootPool_, ROW({"c0"}, {TIMESTAMP()}));
writer->write(data);
writer->close();
};

DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromHiveConfig) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
std::function<void(const ::arrow::Schema*)>(
([&](const ::arrow::Schema* arrowSchema) {
const auto tsType =
std::dynamic_pointer_cast<::arrow::TimestampType>(
arrowSchema->field(0)->type());
ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO);
})));

const auto data = makeRowVector({makeFlatVector<Timestamp>(
10'000, [](auto row) { return Timestamp(row, row); })});
const auto outputDirectory = TempDirectoryPath::create();
const auto plan =
PlanBuilder()
.values({data})
.tableWrite(outputDirectory->path, dwio::common::FileFormat::PARQUET)
.planNode();

CursorParameters params;
std::shared_ptr<folly::Executor> executor =
std::make_shared<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency());
std::shared_ptr<core::QueryCtx> queryCtx =
std::make_shared<core::QueryCtx>(executor.get());
std::unordered_map<std::string, std::string> session = {
{std::string(connector::hive::HiveConfig::kArrowBridgeTimestampUnit),
"6" /*kMicro*/}};
queryCtx->setConnectorSessionOverridesUnsafe(
kHiveConnectorId, std::move(session));
params.queryCtx = queryCtx;
params.planNode = plan;

auto addSplits = [&](exec::Task* task) {};
auto result = readCursor(params, addSplits);
ASSERT_TRUE(waitForTaskCompletion(result.first->task().get()));
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
return RUN_ALL_TESTS();
}
15 changes: 11 additions & 4 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
#include <arrow/c/bridge.h>
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include "velox/common/testutil/TestValue.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Writer.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -234,6 +234,8 @@ Writer::Writer(
} else {
flushPolicy_ = std::make_unique<DefaultFlushPolicy>();
}
options_.timestampUnit =
static_cast<TimestampUnit>(options.arrowBridgeTimestampUnit);
arrowContext_->properties =
getArrowParquetWriterOptions(options, flushPolicy_);
setMemoryReclaimers();
Expand Down Expand Up @@ -310,15 +312,16 @@ void Writer::write(const VectorPtr& data) {
data->type()->equivalent(*schema_),
"The file schema type should be equal with the input rowvector type.");

ArrowOptions options{.flattenDictionary = true, .flattenConstant = true};
ArrowArray array;
ArrowSchema schema;
exportToArrow(data, array, generalPool_.get(), options);
exportToArrow(data, schema, options);
exportToArrow(data, array, generalPool_.get(), options_);
exportToArrow(data, schema, options_);

// Convert the arrow schema to Schema and then update the column names based
// on schema_.
auto arrowSchema = ::arrow::ImportSchema(&schema).ValueOrDie();
common::testutil::TestValue::adjust(
"facebook::velox::parquet::Writer::write", arrowSchema.get());
std::vector<std::shared_ptr<::arrow::Field>> newFields;
auto childSize = schema_->size();
for (auto i = 0; i < childSize; i++) {
Expand Down Expand Up @@ -386,6 +389,10 @@ parquet::WriterOptions getParquetOptions(
if (options.compressionKind.has_value()) {
parquetOptions.compression = options.compressionKind.value();
}
if (options.arrowBridgeTimestampUnit.has_value()) {
parquetOptions.arrowBridgeTimestampUnit =
options.arrowBridgeTimestampUnit.value();
}
return parquetOptions;
}

Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -102,6 +103,7 @@ struct WriterOptions {
std::shared_ptr<CodecOptions> codecOptions;
std::unordered_map<std::string, common::CompressionKind>
columnCompressionsMap;
uint8_t arrowBridgeTimestampUnit = static_cast<uint8_t>(TimestampUnit::kNano);
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
Expand Down Expand Up @@ -158,6 +160,8 @@ class Writer : public dwio::common::Writer {
std::unique_ptr<DefaultFlushPolicy> flushPolicy_;

const RowTypePtr schema_;

ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true};
};

class ParquetWriterFactory : public dwio::common::WriterFactory {
Expand Down
11 changes: 8 additions & 3 deletions velox/exec/tests/ArrowStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase {

int getNext(struct ArrowArray* outArray) {
if (vectorIndex_ < vectors_.size()) {
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get());
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get(), options_);
vectorIndex_ += 1;
} else {
// End of stream. Mark the array released.
Expand All @@ -56,12 +56,13 @@ class ArrowStreamTest : public OperatorTestBase {
}

int getArrowSchema(ArrowSchema& out) {
exportToArrow(BaseVector::create(type_, 0, pool_.get()), out);
exportToArrow(BaseVector::create(type_, 0, pool_.get()), out, options_);
return failGetSchema_ ? (int)ErrorCode::kGetSchemaFailed
: (int)ErrorCode::kNoError;
}

private:
ArrowOptions options_;
const std::shared_ptr<memory::MemoryPool> pool_;
const std::vector<RowVectorPtr>& vectors_;
const TypePtr type_;
Expand Down Expand Up @@ -184,7 +185,11 @@ TEST_F(ArrowStreamTest, basic) {
return StringView::makeInline(
std::to_string(100000000000 + (uint64_t)(row % 100)));
},
nullEvery(7))}));
nullEvery(7)),
makeFlatVector<Timestamp>(
size,
[&](vector_size_t row) { return Timestamp(row, row * 1000); },
nullEvery(5))}));
}
createDuckDbTable(vectors);
auto type = asRowType(vectors[0]->type());
Expand Down
Loading

0 comments on commit 15e787e

Please sign in to comment.