Support different timestamp units in arrow bridge #7625

Closed

11 changes: 11 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
@@ -206,4 +206,15 @@ bool HiveConfig::s3UseProxyFromEnv() const {
return config_->get<bool>(kS3UseProxyFromEnv, false);
}

uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const {
const auto unit = session->get<uint8_t>(
kParquetWriteTimestampUnitSession,
config_->get<uint8_t>(kParquetWriteTimestampUnit, 9 /*nano*/));
VELOX_CHECK(
unit == 0 /*second*/ || unit == 3 /*milli*/ || unit == 6 /*micro*/ ||
unit == 9,
"Invalid timestamp unit.");
return unit;
}

} // namespace facebook::velox::connector::hive
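
Note: a minimal sketch (not part of this diff) of the precedence the new getter implements: the session property overrides the connector config, which in turn defaults to nanoseconds. It assumes core::MemConfig can be built from a string map:

// Sketch only: connector config requests micro (6); the built-in default is nano (9).
auto config = std::make_shared<core::MemConfig>(
    std::unordered_map<std::string, std::string>{
        {"hive.parquet.writer.timestamp-unit", "6"}});
connector::hive::HiveConfig hiveConfig(config);

// No session override: falls back to the connector config value, 6 (micro).
core::MemConfig emptySession;
// hiveConfig.parquetWriteTimestampUnit(&emptySession) == 6

// Session property set: overrides the connector config, 3 (milli).
core::MemConfig session(std::unordered_map<std::string, std::string>{
    {"hive.parquet.writer.timestamp_unit", "3"}});
// hiveConfig.parquetWriteTimestampUnit(&session) == 3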
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.h
@@ -178,6 +178,12 @@ class HiveConfig {
static constexpr const char* kS3UseProxyFromEnv =
"hive.s3.use-proxy-from-env";

/// Timestamp unit for Parquet writes through the Arrow bridge.
static constexpr const char* kParquetWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";
static constexpr const char* kParquetWriteTimestampUnitSession =
"hive.parquet.writer.timestamp_unit";

InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const Config* session) const;

@@ -247,6 +253,10 @@

bool s3UseProxyFromEnv() const;

/// Returns the timestamp unit used when writing timestamps into Parquet
/// through the Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano.
uint8_t parquetWriteTimestampUnit(const Config* session) const;

HiveConfig(std::shared_ptr<const Config> config) {
VELOX_CHECK_NOT_NULL(
config, "Config is null for HiveConfig initialization");
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveDataSink.cpp
@@ -675,6 +675,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
hiveConfig_->orcWriterMaxStripeSize(connectorSessionProperties));
options.maxDictionaryMemory = std::optional(
hiveConfig_->orcWriterMaxDictionaryMemory(connectorSessionProperties));
options.parquetWriteTimestampUnit =
hiveConfig_->parquetWriteTimestampUnit(connectorSessionProperties);
options.serdeParameters = std::map<std::string, std::string>(
insertTableHandle_->serdeParameters().begin(),
insertTableHandle_->serdeParameters().end());
6 changes: 6 additions & 0 deletions velox/docs/configs.rst
@@ -488,6 +488,12 @@ Each query can override the config by setting corresponding query session properties.
- string
- 16M
- Maximum dictionary memory that can be used in orc writer.
* - hive.parquet.writer.timestamp-unit
- hive.parquet.writer.timestamp_unit
- tinyint
- 9
- Timestamp unit used when writing timestamps into Parquet through the Arrow bridge.
  Valid values are 0 (second), 3 (millisecond), 6 (microsecond), and 9 (nanosecond).
Contributor:

@rui-mo Would you clarify what the valid values are? It may not be obvious.

Collaborator (Author):
Updated the documentation. Thanks.

Valid values are 0 (second), 3 (millisecond), 6 (microsecond), 9 (nanosecond).
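
For example, a query can select microseconds through the session property. A minimal sketch mirroring the unitFromHiveConfig test in this PR (the "test-hive" connector id comes from that test, not a fixed name):

// Sketch only: per-session override of the Parquet writer timestamp unit.
std::unordered_map<std::string, std::string> session = {
    {"hive.parquet.writer.timestamp_unit", "6"}}; // 6 = microsecond
queryCtx->setConnectorSessionOverridesUnsafe("test-hive", std::move(session));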

``Amazon S3 Configuration``
^^^^^^^^^^^^^^^^^^^^^^^^^^^
1 change: 1 addition & 0 deletions velox/dwio/common/Options.h
@@ -593,6 +593,7 @@ struct WriterOptions {
std::optional<uint64_t> maxStripeSize{std::nullopt};
std::optional<uint64_t> maxDictionaryMemory{std::nullopt};
std::map<std::string, std::string> serdeParameters;
std::optional<uint8_t> parquetWriteTimestampUnit;
};

} // namespace facebook::velox::dwio::common
98 changes: 98 additions & 0 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
@@ -14,16 +14,38 @@
* limitations under the License.
*/

#include <arrow/type.h>
#include <folly/init/Init.h>

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/core/QueryCtx.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/exec/tests/utils/Cursor.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

using namespace facebook::velox;
using namespace facebook::velox::common;
using namespace facebook::velox::dwio::common;
using namespace facebook::velox::exec::test;
using namespace facebook::velox::parquet;

class ParquetWriterTest : public ParquetTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
testutil::TestValue::enable();
auto hiveConnector =
connector::getConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(
kHiveConnectorId, std::make_shared<core::MemConfig>());
connector::registerConnector(hiveConnector);
}

std::unique_ptr<RowReader> createRowReaderWithSchema(
const std::unique_ptr<Reader> reader,
const RowTypePtr& rowType) {
@@ -43,6 +65,8 @@ class ParquetWriterTest : public ParquetTestBase {
std::make_shared<InMemoryReadFile>(data), opts.getMemoryPool()),
opts);
};

inline static const std::string kHiveConnectorId = "test-hive";
};

std::vector<CompressionKind> params = {
@@ -110,3 +134,77 @@ TEST_F(ParquetWriterTest, compression) {
auto rowReader = createRowReaderWithSchema(std::move(reader), schema);
assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_);
};

DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
std::function<void(const ::arrow::Schema*)>(
([&](const ::arrow::Schema* arrowSchema) {
const auto tsType =
std::dynamic_pointer_cast<::arrow::TimestampType>(
arrowSchema->field(0)->type());
ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO);
})));

const auto data = makeRowVector({makeFlatVector<Timestamp>(
10'000, [](auto row) { return Timestamp(row, row); })});
parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool_.get();
writerOptions.parquetWriteTimestampUnit =
static_cast<uint8_t>(TimestampUnit::kMicro);

// Create an in-memory writer.
auto sink = std::make_unique<MemorySink>(
200 * 1024 * 1024,
dwio::common::FileSink::Options{.pool = leafPool_.get()});
auto writer = std::make_unique<parquet::Writer>(
std::move(sink), writerOptions, rootPool_, ROW({"c0"}, {TIMESTAMP()}));
writer->write(data);
writer->close();
};

DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromHiveConfig) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
std::function<void(const ::arrow::Schema*)>(
([&](const ::arrow::Schema* arrowSchema) {
const auto tsType =
std::dynamic_pointer_cast<::arrow::TimestampType>(
arrowSchema->field(0)->type());
ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO);
})));

const auto data = makeRowVector({makeFlatVector<Timestamp>(
10'000, [](auto row) { return Timestamp(row, row); })});
const auto outputDirectory = TempDirectoryPath::create();
const auto plan =
PlanBuilder()
.values({data})
.tableWrite(outputDirectory->path, dwio::common::FileFormat::PARQUET)
.planNode();

CursorParameters params;
std::shared_ptr<folly::Executor> executor =
std::make_shared<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency());
std::shared_ptr<core::QueryCtx> queryCtx =
std::make_shared<core::QueryCtx>(executor.get());
std::unordered_map<std::string, std::string> session = {
{std::string(
connector::hive::HiveConfig::kParquetWriteTimestampUnitSession),
"6" /*kMicro*/}};
queryCtx->setConnectorSessionOverridesUnsafe(
kHiveConnectorId, std::move(session));
params.queryCtx = queryCtx;
params.planNode = plan;

auto addSplits = [&](exec::Task* task) {};
auto result = readCursor(params, addSplits);
ASSERT_TRUE(waitForTaskCompletion(result.first->task().get()));
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
return RUN_ALL_TESTS();
}
15 changes: 11 additions & 4 deletions velox/dwio/parquet/writer/Writer.cpp
@@ -18,10 +18,10 @@
#include <arrow/c/bridge.h>
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include "velox/common/testutil/TestValue.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Writer.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::parquet {

@@ -234,6 +234,8 @@ Writer::Writer(
} else {
flushPolicy_ = std::make_unique<DefaultFlushPolicy>();
}
options_.timestampUnit =
static_cast<TimestampUnit>(options.parquetWriteTimestampUnit);
arrowContext_->properties =
getArrowParquetWriterOptions(options, flushPolicy_);
setMemoryReclaimers();
@@ -310,15 +312,16 @@ void Writer::write(const VectorPtr& data) {
data->type()->equivalent(*schema_),
"The file schema type should be equal with the input rowvector type.");

ArrowOptions options{.flattenDictionary = true, .flattenConstant = true};
ArrowArray array;
ArrowSchema schema;
exportToArrow(data, array, generalPool_.get(), options);
exportToArrow(data, schema, options);
exportToArrow(data, array, generalPool_.get(), options_);
exportToArrow(data, schema, options_);

// Convert the arrow schema to Schema and then update the column names based
// on schema_.
auto arrowSchema = ::arrow::ImportSchema(&schema).ValueOrDie();
common::testutil::TestValue::adjust(
"facebook::velox::parquet::Writer::write", arrowSchema.get());
std::vector<std::shared_ptr<::arrow::Field>> newFields;
auto childSize = schema_->size();
for (auto i = 0; i < childSize; i++) {
@@ -386,6 +389,10 @@ parquet::WriterOptions getParquetOptions(
if (options.compressionKind.has_value()) {
parquetOptions.compression = options.compressionKind.value();
}
if (options.parquetWriteTimestampUnit.has_value()) {
parquetOptions.parquetWriteTimestampUnit =
options.parquetWriteTimestampUnit.value();
}
return parquetOptions;
}

5 changes: 5 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
@@ -26,6 +26,7 @@
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::parquet {

@@ -102,6 +103,8 @@ struct WriterOptions {
std::shared_ptr<CodecOptions> codecOptions;
std::unordered_map<std::string, common::CompressionKind>
columnCompressionsMap;
uint8_t parquetWriteTimestampUnit =
static_cast<uint8_t>(TimestampUnit::kNano);
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
@@ -158,6 +161,8 @@ class Writer : public dwio::common::Writer {
std::unique_ptr<DefaultFlushPolicy> flushPolicy_;

const RowTypePtr schema_;

ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true};
};

class ParquetWriterFactory : public dwio::common::WriterFactory {
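
Note: a minimal sketch, condensed from the unitFromWriterOptions test above, of overriding the default nanosecond unit when constructing the Parquet writer directly. The sink, input data, and memory pools are assumed to be in scope:

// Sketch only: request microsecond timestamps instead of the kNano default.
parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool.get();
writerOptions.parquetWriteTimestampUnit =
    static_cast<uint8_t>(TimestampUnit::kMicro);
auto writer = std::make_unique<parquet::Writer>(
    std::move(sink), writerOptions, rootPool, ROW({"c0"}, {TIMESTAMP()}));
writer->write(data);
writer->close();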
11 changes: 8 additions & 3 deletions velox/exec/tests/ArrowStreamTest.cpp
@@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase {

int getNext(struct ArrowArray* outArray) {
if (vectorIndex_ < vectors_.size()) {
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get());
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get(), options_);
vectorIndex_ += 1;
} else {
// End of stream. Mark the array released.
@@ -56,12 +56,13 @@ }
}

int getArrowSchema(ArrowSchema& out) {
exportToArrow(BaseVector::create(type_, 0, pool_.get()), out);
exportToArrow(BaseVector::create(type_, 0, pool_.get()), out, options_);
return failGetSchema_ ? (int)ErrorCode::kGetSchemaFailed
: (int)ErrorCode::kNoError;
}

private:
ArrowOptions options_;
const std::shared_ptr<memory::MemoryPool> pool_;
const std::vector<RowVectorPtr>& vectors_;
const TypePtr type_;
@@ -184,7 +185,11 @@ TEST_F(ArrowStreamTest, basic) {
return StringView::makeInline(
std::to_string(100000000000 + (uint64_t)(row % 100)));
},
nullEvery(7))}));
nullEvery(7)),
makeFlatVector<Timestamp>(
size,
[&](vector_size_t row) { return Timestamp(row, row * 1000); },
nullEvery(5))}));
}
createDuckDbTable(vectors);
auto type = asRowType(vectors[0]->type());