Support different timestamp units in arrow bridge (7625)
rui-mo authored and glutenperfbot committed Feb 27, 2024
1 parent 3c0d7f8 commit 64adf58
Showing 14 changed files with 395 additions and 131 deletions.
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
@@ -206,4 +206,9 @@ bool HiveConfig::s3UseProxyFromEnv() const {
return config_->get<bool>(kS3UseProxyFromEnv, false);
}

uint8_t HiveConfig::arrowBridgeTimestampUnit(const Config* session) const {
return session->get<uint8_t>(kArrowBridgeTimestampUnit, 9 /* nano */);
}

} // namespace facebook::velox::connector::hive

Yohahaha commented on Feb 28, 2024:
May I ask whether there are any plans to contribute to the upstream Velox project?

rui-mo (Author, Collaborator) replied on Feb 28, 2024:

@Yohahaha Yes, there is a PR for this change. Please check facebookincubator#7625.

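Not part of the commit: a minimal sketch of how the new getter is meant to be used, assuming core::MemConfig (velox/core/Context.h) as the Config implementation and that it is default-constructible; everything else mirrors the diff above.

#include "velox/connectors/hive/HiveConfig.h"
#include "velox/core/Context.h" // assumed location of core::MemConfig

using namespace facebook::velox;

// The connector-level config can be empty; the timestamp unit is resolved
// from the per-session config passed into the getter.
auto hiveConfig = std::make_shared<connector::hive::HiveConfig>(
    std::make_shared<core::MemConfig>());

// The session asks for microseconds; with the key unset, the getter falls
// back to 9 (nanoseconds).
core::MemConfig session({{"arrow_bridge_timestamp_unit", "6"}});
uint8_t unit = hiveConfig->arrowBridgeTimestampUnit(&session); // == 6
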
8 changes: 8 additions & 0 deletions velox/connectors/hive/HiveConfig.h
@@ -178,6 +178,10 @@ class HiveConfig {
static constexpr const char* kS3UseProxyFromEnv =
"hive.s3.use-proxy-from-env";

// Timestamp unit used during Velox-Arrow conversion.
static constexpr const char* kArrowBridgeTimestampUnit =
"arrow_bridge_timestamp_unit";

InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const Config* session) const;

@@ -247,6 +251,10 @@

bool s3UseProxyFromEnv() const;

/// Returns the timestamp unit used in Velox-Arrow conversion.
/// 0: second, 3: milli, 6: micro, 9: nano.
uint8_t arrowBridgeTimestampUnit(const Config* session) const;

HiveConfig(std::shared_ptr<const Config> config) {
VELOX_CHECK_NOT_NULL(
config, "Config is null for HiveConfig initialization");
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveDataSink.cpp
@@ -678,6 +678,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
options.serdeParameters = std::map<std::string, std::string>(
insertTableHandle_->serdeParameters().begin(),
insertTableHandle_->serdeParameters().end());
options.arrowBridgeTimestampUnit =
hiveConfig_->arrowBridgeTimestampUnit(connectorSessionProperties);
ioStats_.emplace_back(std::make_shared<io::IoStatistics>());

// Prevents the memory allocation during the writer creation.
11 changes: 11 additions & 0 deletions velox/core/QueryConfig.h
@@ -357,6 +357,10 @@ class QueryConfig {
static constexpr const char* kDriverCpuTimeSliceLimitMs =
"driver_cpu_time_slice_limit_ms";

// Timestamp unit used during Velox-Arrow conversion.
static constexpr const char* kArrowBridgeTimestampUnit =
"arrow_bridge_timestamp_unit";

uint64_t queryMaxMemoryPerNode() const {
return toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE);
@@ -582,6 +586,13 @@
return get<uint8_t>(kSpillStartPartitionBit, kDefaultStartBit);
}

/// Returns the timestamp unit used in Velox-Arrow conversion.
/// 0: second, 3: milli, 6: micro, 9: nano.
uint8_t arrowBridgeTimestampUnit() const {
constexpr uint8_t kDefaultUnit = 9;
return get<uint8_t>(kArrowBridgeTimestampUnit, kDefaultUnit);
}

/// Returns the number of bits used to calculate the spilling partition
/// number for hash join. The number of spilling partitions will be power of
/// two.
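Not part of the commit: the same knob at the query-config level, sketched in isolation. The constructor taking a plain property map is an assumption; in practice the value arrives through the session properties of a QueryCtx.

#include "velox/core/QueryConfig.h"

#include <string>
#include <unordered_map>

using facebook::velox::core::QueryConfig;

std::unordered_map<std::string, std::string> props{
    {QueryConfig::kArrowBridgeTimestampUnit, "0"}}; // 0 == seconds
QueryConfig config(std::move(props));
uint8_t unit = config.arrowBridgeTimestampUnit(); // 0; with the key unset, 9 (nano)
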
1 change: 1 addition & 0 deletions velox/dwio/common/Options.h
@@ -583,6 +583,7 @@ struct WriterOptions {
std::optional<uint64_t> maxStripeSize{std::nullopt};
std::optional<uint64_t> maxDictionaryMemory{std::nullopt};
std::map<std::string, std::string> serdeParameters;
std::optional<uint8_t> arrowBridgeTimestampUnit;
};

} // namespace facebook::velox::dwio::common
12 changes: 8 additions & 4 deletions velox/dwio/parquet/writer/Writer.cpp
@@ -21,7 +21,6 @@
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Writer.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::parquet {

@@ -234,6 +233,8 @@ Writer::Writer(
} else {
flushPolicy_ = std::make_unique<DefaultFlushPolicy>();
}
options_.timestampUnit =
static_cast<TimestampUnit>(options.arrowBridgeTimestampUnit);
arrowContext_->properties =
getArrowParquetWriterOptions(options, flushPolicy_);
setMemoryReclaimers();
@@ -310,11 +311,10 @@ void Writer::write(const VectorPtr& data) {
data->type()->equivalent(*schema_),
"The file schema type should be equal with the input rowvector type.");

ArrowOptions options{.flattenDictionary = true, .flattenConstant = true};
ArrowArray array;
ArrowSchema schema;
exportToArrow(data, array, generalPool_.get(), options);
exportToArrow(data, schema, options);
exportToArrow(data, array, generalPool_.get(), options_);
exportToArrow(data, schema, options_);

// Convert the arrow schema to Schema and then update the column names based
// on schema_.
@@ -386,6 +386,10 @@ parquet::WriterOptions getParquetOptions(
if (options.compressionKind.has_value()) {
parquetOptions.compression = options.compressionKind.value();
}
if (options.arrowBridgeTimestampUnit.has_value()) {
parquetOptions.arrowBridgeTimestampUnit =
options.arrowBridgeTimestampUnit.value();
}
return parquetOptions;
}

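Not part of the commit: the net effect on the Parquet writer, sketched. Only the two option fields are from this diff; the surrounding writer setup is omitted.

// Ask the Parquet writer to export timestamps as milliseconds.
facebook::velox::parquet::WriterOptions writerOptions;
writerOptions.arrowBridgeTimestampUnit = 3; // 0: second, 3: milli, 6: micro, 9: nano (default)

// Inside Writer::Writer (above) this becomes
//   options_.timestampUnit = static_cast<TimestampUnit>(3);
// and options_ is then passed to both exportToArrow() calls in Writer::write().

// The generic dwio path works the same way: HiveDataSink copies the session
// value into dwio::common::WriterOptions::arrowBridgeTimestampUnit, and
// getParquetOptions() (above) forwards it only when the optional is set.
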
4 changes: 4 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
@@ -26,6 +26,7 @@
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::parquet {

@@ -102,6 +103,7 @@ struct WriterOptions {
std::shared_ptr<CodecOptions> codecOptions;
std::unordered_map<std::string, common::CompressionKind>
columnCompressionsMap;
uint8_t arrowBridgeTimestampUnit = static_cast<uint8_t>(TimestampUnit::kNano);
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
@@ -158,6 +160,8 @@ class Writer : public dwio::common::Writer {
std::unique_ptr<DefaultFlushPolicy> flushPolicy_;

const RowTypePtr schema_;

ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true};
};

class ParquetWriterFactory : public dwio::common::WriterFactory {
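TimestampUnit and the new ArrowOptions field live in velox/vector/arrow/Bridge.h, one of the changed files not rendered on this page. Their assumed shape, inferred from the 0/3/6/9 encoding and the static_casts used above:

// Assumed sketch of the velox/vector/arrow/Bridge.h additions; enumerator
// values match the config encoding so static_cast<TimestampUnit>(unit) works.
enum class TimestampUnit : uint8_t {
  kSecond = 0,
  kMilli = 3,
  kMicro = 6,
  kNano = 9,
};

struct ArrowOptions {
  bool flattenDictionary{false};
  bool flattenConstant{false};
  // New in this commit (default assumed): unit used for Timestamp columns.
  TimestampUnit timestampUnit{TimestampUnit::kNano};
};
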
4 changes: 3 additions & 1 deletion velox/exec/ArrowStream.cpp
@@ -27,6 +27,8 @@ ArrowStream::ArrowStream(
operatorId,
arrowStreamNode->id(),
"ArrowStream") {
options_.timestampUnit = static_cast<TimestampUnit>(
driverCtx->queryConfig().arrowBridgeTimestampUnit());
arrowStream_ = arrowStreamNode->arrowStream();
}

@@ -66,7 +68,7 @@ RowVectorPtr ArrowStream::getOutput() {

// Convert Arrow Array into RowVector and return.
return std::dynamic_pointer_cast<RowVector>(
importFromArrowAsOwner(arrowSchema, arrowArray, pool()));
importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool()));
}

bool ArrowStream::isFinished() {
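Not part of the commit: the import direction, sketched with the importFromArrowAsOwner overload used above. The Arrow structs and the single timestamp column are illustrative.

using namespace facebook::velox;

// Query config {"arrow_bridge_timestamp_unit", "3"} ends up here:
ArrowOptions options;
options.timestampUnit = TimestampUnit::kMilli;

// arrowSchema/arrowArray come from the upstream ArrowArrayStream. With kMilli,
// int64 values in a millisecond ("tsm:") column are interpreted as
// milliseconds when converted into Velox Timestamps.
RowVectorPtr vector = std::dynamic_pointer_cast<RowVector>(
    importFromArrowAsOwner(arrowSchema, arrowArray, options, pool));
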
1 change: 1 addition & 0 deletions velox/exec/ArrowStream.h
@@ -45,6 +45,7 @@ class ArrowStream : public SourceOperator {

bool finished_ = false;
std::shared_ptr<ArrowArrayStream> arrowStream_;
ArrowOptions options_;
};

} // namespace facebook::velox::exec
5 changes: 3 additions & 2 deletions velox/exec/tests/ArrowStreamTest.cpp
@@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase {

int getNext(struct ArrowArray* outArray) {
if (vectorIndex_ < vectors_.size()) {
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get());
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get(), options_);
vectorIndex_ += 1;
} else {
// End of stream. Mark the array released.
@@ -56,12 +56,13 @@
}

int getArrowSchema(ArrowSchema& out) {
exportToArrow(BaseVector::create(type_, 0, pool_.get()), out);
exportToArrow(BaseVector::create(type_, 0, pool_.get()), out, options_);
return failGetSchema_ ? (int)ErrorCode::kGetSchemaFailed
: (int)ErrorCode::kNoError;
}

private:
ArrowOptions options_;
const std::shared_ptr<memory::MemoryPool> pool_;
const std::vector<RowVectorPtr>& vectors_;
const TypePtr type_;
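Not part of the commit: the export direction. Arrow's C data interface encodes timestamp units in the schema format string (tss:, tsm:, tsu:, tsn:); the vector below is an illustrative TIMESTAMP column.

using namespace facebook::velox;

ArrowOptions options;
options.timestampUnit = TimestampUnit::kMicro;

ArrowSchema schema;
exportToArrow(timestampVector, schema, options); // timestampVector: VectorPtr
// With kMicro the TIMESTAMP column reports the Arrow format "tsu:"; the
// default (kNano) yields "tsn:".
schema.release(&schema);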