Skip to content

Commit

Permalink
Support different timestamp units in arrow bridge (facebookincubator#…
Browse files Browse the repository at this point in the history
…7625)

Summary:
Arrow bridge supports different timestamp units: second, milli, micro, and
nano. This PR adds `TimestampUnit` to `ArrowOptions` so that `exportToArrow`
can produce any of these units. For `importFromArrow`, the unit found in the
Arrow schema is followed. By default, the conversion unit is nano; in Gluten,
micro is configured to align with Spark.
facebookincubator#4680 (comment)
Arrow Reference: https://github.com/apache/arrow/blob/main/cpp/src/arrow/c/bridge.cc#L402-L421.

Pull Request resolved: facebookincubator#7625

Reviewed By: mbasmanova

Differential Revision: D54852534

Pulled By: Yuhta

fbshipit-source-id: 0494102fedc73f7068424bc09e972e7deb297a6e
  • Loading branch information
rui-mo authored and Joe-Abraham committed Jun 7, 2024
1 parent 37d53ee commit efd0158
Show file tree
Hide file tree
Showing 13 changed files with 541 additions and 127 deletions.
11 changes: 11 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,15 @@ bool HiveConfig::s3UseProxyFromEnv() const {
return config_->get<bool>(kS3UseProxyFromEnv, false);
}

/// Resolves the timestamp unit used for Parquet writes. The session property
/// is looked up first, with the connector-config value (itself defaulting to
/// 9, nano) passed as its fallback. The resolved value must be one of
/// 0 (second), 3 (milli), 6 (micro) or 9 (nano); anything else fails the check.
uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const {
  // Connector-level setting; handed to the session lookup as its default.
  const auto fallbackUnit =
      config_->get<uint8_t>(kParquetWriteTimestampUnit, 9 /*nano*/);
  const auto unit =
      session->get<uint8_t>(kParquetWriteTimestampUnitSession, fallbackUnit);
  VELOX_CHECK(
      unit == 0 /*second*/ || unit == 3 /*milli*/ || unit == 6 /*micro*/ ||
          unit == 9,
      "Invalid timestamp unit.");
  return unit;
}

} // namespace facebook::velox::connector::hive
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ class HiveConfig {
static constexpr const char* kS3UseProxyFromEnv =
"hive.s3.use-proxy-from-env";

/// Timestamp unit for Parquet write through Arrow bridge.
/// kParquetWriteTimestampUnit is the connector config key and
/// kParquetWriteTimestampUnitSession is the session property key. Note that
/// the two spellings differ: 'timestamp-unit' vs 'timestamp_unit'.
static constexpr const char* kParquetWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";
static constexpr const char* kParquetWriteTimestampUnitSession =
"hive.parquet.writer.timestamp_unit";

InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const Config* session) const;

Expand Down Expand Up @@ -247,6 +253,10 @@ class HiveConfig {

bool s3UseProxyFromEnv() const;

/// Returns the timestamp unit used when writing timestamps into Parquet
/// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano.
uint8_t parquetWriteTimestampUnit(const Config* session) const;

HiveConfig(std::shared_ptr<const Config> config) {
VELOX_CHECK_NOT_NULL(
config, "Config is null for HiveConfig initialization");
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
hiveConfig_->orcWriterMaxStripeSize(connectorSessionProperties));
options.maxDictionaryMemory = std::optional(
hiveConfig_->orcWriterMaxDictionaryMemory(connectorSessionProperties));
options.parquetWriteTimestampUnit =
hiveConfig_->parquetWriteTimestampUnit(connectorSessionProperties);
options.serdeParameters = std::map<std::string, std::string>(
insertTableHandle_->serdeParameters().begin(),
insertTableHandle_->serdeParameters().end());
Expand Down
6 changes: 6 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,12 @@ Each query can override the config by setting corresponding query session proper
- string
- 16M
- Maximum dictionary memory that can be used in orc writer.
* - hive.parquet.writer.timestamp-unit
- hive.parquet.writer.timestamp_unit
- tinyint
- 9
- Timestamp unit used when writing timestamps into Parquet through Arrow bridge.
Valid values are 0 (second), 3 (millisecond), 6 (microsecond), 9 (nanosecond).

``Amazon S3 Configuration``
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ struct WriterOptions {
std::optional<uint64_t> maxStripeSize{std::nullopt};
std::optional<uint64_t> maxDictionaryMemory{std::nullopt};
std::map<std::string, std::string> serdeParameters;
std::optional<uint8_t> parquetWriteTimestampUnit;
};

} // namespace facebook::velox::dwio::common
100 changes: 100 additions & 0 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,38 @@
* limitations under the License.
*/

#include <arrow/type.h>
#include <folly/init/Init.h>

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/core/QueryCtx.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/exec/tests/utils/Cursor.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

using namespace facebook::velox;
using namespace facebook::velox::common;
using namespace facebook::velox::dwio::common;
using namespace facebook::velox::exec::test;
using namespace facebook::velox::parquet;

class ParquetWriterTest : public ParquetTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
testutil::TestValue::enable();
auto hiveConnector =
connector::getConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(
kHiveConnectorId, std::make_shared<core::MemConfig>());
connector::registerConnector(hiveConnector);
}

std::unique_ptr<RowReader> createRowReaderWithSchema(
const std::unique_ptr<Reader> reader,
const RowTypePtr& rowType) {
Expand All @@ -43,6 +65,8 @@ class ParquetWriterTest : public ParquetTestBase {
std::make_shared<InMemoryReadFile>(data), opts.getMemoryPool()),
opts);
};

inline static const std::string kHiveConnectorId = "test-hive";
};

std::vector<CompressionKind> params = {
Expand Down Expand Up @@ -110,3 +134,79 @@ TEST_F(ParquetWriterTest, compression) {
auto rowReader = createRowReaderWithSchema(std::move(reader), schema);
assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_);
};

// Verifies that the timestamp unit set on parquet::WriterOptions is the unit
// of the Arrow schema observed at the "Writer::write" TestValue injection
// point.
DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
  SCOPED_TESTVALUE_SET(
      "facebook::velox::parquet::Writer::write",
      std::function<void(const ::arrow::Schema*)>(
          ([&](const ::arrow::Schema* arrowSchema) {
            const auto tsType =
                std::dynamic_pointer_cast<::arrow::TimestampType>(
                    arrowSchema->field(0)->type());
            // Fail the test cleanly rather than dereferencing a null pointer
            // if the exported column is not an Arrow timestamp type.
            ASSERT_TRUE(tsType != nullptr);
            ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO);
          })));

  const auto data = makeRowVector({makeFlatVector<Timestamp>(
      10'000, [](auto row) { return Timestamp(row, row); })});
  parquet::WriterOptions writerOptions;
  writerOptions.memoryPool = leafPool_.get();
  // Request microsecond precision; the callback above asserts it took effect.
  writerOptions.parquetWriteTimestampUnit =
      static_cast<uint8_t>(TimestampUnit::kMicro);

  // Create an in-memory writer.
  auto sink = std::make_unique<MemorySink>(
      200 * 1024 * 1024,
      dwio::common::FileSink::Options{.pool = leafPool_.get()});
  auto writer = std::make_unique<parquet::Writer>(
      std::move(sink), writerOptions, rootPool_, ROW({"c0"}, {TIMESTAMP()}));
  writer->write(data);
  writer->close();
}

#ifdef VELOX_ENABLE_PARQUET
// Verifies that the timestamp unit supplied through the Hive connector session
// property is the unit of the Arrow schema observed at the "Writer::write"
// TestValue injection point when running a table-write plan.
DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromHiveConfig) {
  SCOPED_TESTVALUE_SET(
      "facebook::velox::parquet::Writer::write",
      std::function<void(const ::arrow::Schema*)>(
          ([&](const ::arrow::Schema* arrowSchema) {
            const auto tsType =
                std::dynamic_pointer_cast<::arrow::TimestampType>(
                    arrowSchema->field(0)->type());
            // Fail the test cleanly rather than dereferencing a null pointer
            // if the exported column is not an Arrow timestamp type.
            ASSERT_TRUE(tsType != nullptr);
            ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO);
          })));

  const auto data = makeRowVector({makeFlatVector<Timestamp>(
      10'000, [](auto row) { return Timestamp(row, row); })});
  const auto outputDirectory = TempDirectoryPath::create();
  const auto plan =
      PlanBuilder()
          .values({data})
          .tableWrite(outputDirectory->path, dwio::common::FileFormat::PARQUET)
          .planNode();

  CursorParameters params;
  std::shared_ptr<folly::Executor> executor =
      std::make_shared<folly::CPUThreadPoolExecutor>(
          std::thread::hardware_concurrency());
  std::shared_ptr<core::QueryCtx> queryCtx =
      std::make_shared<core::QueryCtx>(executor.get());
  // Override the session property for the test connector so the writer is
  // asked for microsecond timestamps.
  std::unordered_map<std::string, std::string> session = {
      {std::string(
           connector::hive::HiveConfig::kParquetWriteTimestampUnitSession),
       "6" /*kMicro*/}};
  queryCtx->setConnectorSessionOverridesUnsafe(
      kHiveConnectorId, std::move(session));
  params.queryCtx = queryCtx;
  params.planNode = plan;

  // The plan reads from in-memory values, so there are no splits to add.
  auto addSplits = [&](exec::Task* task) {};
  auto result = readCursor(params, addSplits);
  ASSERT_TRUE(waitForTaskCompletion(result.first->task().get()));
}
#endif

// Test binary entry point: initializes GoogleTest and folly, then runs every
// registered test and returns the aggregate result as the exit code.
int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  // NOTE(review): the third folly::Init argument is presumably `removeFlags`;
  // confirm against folly/init/Init.h.
  folly::Init init{&argc, &argv, false};
  const int exitCode = RUN_ALL_TESTS();
  return exitCode;
}
15 changes: 11 additions & 4 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
#include <arrow/c/bridge.h>
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include "velox/common/testutil/TestValue.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Writer.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -234,6 +234,8 @@ Writer::Writer(
} else {
flushPolicy_ = std::make_unique<DefaultFlushPolicy>();
}
options_.timestampUnit =
static_cast<TimestampUnit>(options.parquetWriteTimestampUnit);
arrowContext_->properties =
getArrowParquetWriterOptions(options, flushPolicy_);
setMemoryReclaimers();
Expand Down Expand Up @@ -310,15 +312,16 @@ void Writer::write(const VectorPtr& data) {
data->type()->equivalent(*schema_),
"The file schema type should be equal with the input rowvector type.");

ArrowOptions options{.flattenDictionary = true, .flattenConstant = true};
ArrowArray array;
ArrowSchema schema;
exportToArrow(data, array, generalPool_.get(), options);
exportToArrow(data, schema, options);
exportToArrow(data, array, generalPool_.get(), options_);
exportToArrow(data, schema, options_);

// Convert the arrow schema to Schema and then update the column names based
// on schema_.
auto arrowSchema = ::arrow::ImportSchema(&schema).ValueOrDie();
common::testutil::TestValue::adjust(
"facebook::velox::parquet::Writer::write", arrowSchema.get());
std::vector<std::shared_ptr<::arrow::Field>> newFields;
auto childSize = schema_->size();
for (auto i = 0; i < childSize; i++) {
Expand Down Expand Up @@ -386,6 +389,10 @@ parquet::WriterOptions getParquetOptions(
if (options.compressionKind.has_value()) {
parquetOptions.compression = options.compressionKind.value();
}
if (options.parquetWriteTimestampUnit.has_value()) {
parquetOptions.parquetWriteTimestampUnit =
options.parquetWriteTimestampUnit.value();
}
return parquetOptions;
}

Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -102,6 +103,8 @@ struct WriterOptions {
std::shared_ptr<CodecOptions> codecOptions;
std::unordered_map<std::string, common::CompressionKind>
columnCompressionsMap;
uint8_t parquetWriteTimestampUnit =
static_cast<uint8_t>(TimestampUnit::kNano);
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
Expand Down Expand Up @@ -158,6 +161,8 @@ class Writer : public dwio::common::Writer {
std::unique_ptr<DefaultFlushPolicy> flushPolicy_;

const RowTypePtr schema_;

ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true};
};

class ParquetWriterFactory : public dwio::common::WriterFactory {
Expand Down
11 changes: 8 additions & 3 deletions velox/exec/tests/ArrowStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase {

int getNext(struct ArrowArray* outArray) {
if (vectorIndex_ < vectors_.size()) {
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get());
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get(), options_);
vectorIndex_ += 1;
} else {
// End of stream. Mark the array released.
Expand All @@ -56,12 +56,13 @@ class ArrowStreamTest : public OperatorTestBase {
}

// Exports the schema of type_ into `out` using options_. Returns
// kGetSchemaFailed when failGetSchema_ is set and kNoError otherwise.
int getArrowSchema(ArrowSchema& out) {
  // Export exactly once, and with options_. Exporting into the same `out` a
  // second time would overwrite the first schema without invoking its release
  // callback, and an export without options_ would ignore the configured
  // ArrowOptions.
  const auto emptyVector = BaseVector::create(type_, 0, pool_.get());
  exportToArrow(emptyVector, out, options_);
  return failGetSchema_ ? static_cast<int>(ErrorCode::kGetSchemaFailed)
                        : static_cast<int>(ErrorCode::kNoError);
}

private:
ArrowOptions options_;
const std::shared_ptr<memory::MemoryPool> pool_;
const std::vector<RowVectorPtr>& vectors_;
const TypePtr type_;
Expand Down Expand Up @@ -184,7 +185,11 @@ TEST_F(ArrowStreamTest, basic) {
return StringView::makeInline(
std::to_string(100000000000 + (uint64_t)(row % 100)));
},
nullEvery(7))}));
nullEvery(7)),
makeFlatVector<Timestamp>(
size,
[&](vector_size_t row) { return Timestamp(row, row * 1000); },
nullEvery(5))}));
}
createDuckDbTable(vectors);
auto type = asRowType(vectors[0]->type());
Expand Down
Loading

0 comments on commit efd0158

Please sign in to comment.