Skip to content

Commit

Permalink
Support timestamp units in arrow bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Nov 17, 2023
1 parent 3d8a881 commit 44b270a
Show file tree
Hide file tree
Showing 14 changed files with 391 additions and 106 deletions.
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* config) {
return config->get<uint64_t>(kSortWriterMaxOutputBytes, 10UL << 20);
}

// static.
// Looks up the Velox-Arrow bridge timestamp unit in 'config' under
// kArrowBridgeTimestampUnit, passing 3 (nano) as the default value.
uint8_t HiveConfig::arrowBridgeTimestampUnit(const Config* config) {
  // 3 stands for nanoseconds.
  constexpr uint8_t kDefaultUnit{3};
  return config->get<uint8_t>(kArrowBridgeTimestampUnit, kDefaultUnit);
}

uint64_t HiveConfig::getOrcWriterMaxStripeSize(
const Config* connectorQueryCtxConfig,
const Config* connectorPropertiesConfig) {
Expand Down
8 changes: 8 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ class HiveConfig {
static constexpr const char* kSortWriterMaxOutputBytes =
"sort_writer_max_output_bytes";

// Config key for the timestamp unit used during Velox-Arrow conversion.
// The value is read as uint8_t by arrowBridgeTimestampUnit():
// 0: second, 1: milli, 2: micro, 3: nano (the default when the key is unset).
// NOTE(review): this key is dot-separated while kSortWriterMaxOutputBytes uses
// underscores — confirm the differing spelling is intentional.
static constexpr const char* kArrowBridgeTimestampUnit =
"arrow.bridge.timestamp.unit";

static InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const Config* config);

Expand Down Expand Up @@ -176,6 +180,10 @@ class HiveConfig {

static uint64_t sortWriterMaxOutputBytes(const Config* config);

/// Returns the timestamp unit used in Velox-Arrow conversion.
/// 0: second, 1: milli, 2: micro, 3: nano.
static uint8_t arrowBridgeTimestampUnit(const Config* config);

static uint64_t getOrcWriterMaxStripeSize(
const Config* connectorQueryCtxConfig,
const Config* connectorPropertiesConfig);
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
options.maxDictionaryMemory =
std::optional(HiveConfig::getOrcWriterMaxDictionaryMemory(
connectorQueryCtx_->config(), connectorProperties_.get()));
options.arrowBridgeTimestampUnit =
HiveConfig::arrowBridgeTimestampUnit(connectorQueryCtx_->config());
ioStats_.emplace_back(std::make_shared<io::IoStatistics>());

// Prevents the memory allocation during the writer creation.
Expand Down
11 changes: 11 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ class QueryConfig {
static constexpr const char* kEnableExpressionEvaluationCache =
"enable_expression_evaluation_cache";

// Config key for the timestamp unit used during Velox-Arrow conversion.
// The value is read as uint8_t by arrowBridgeTimestampUnit():
// 0: second, 1: milli, 2: micro, 3: nano (the default when the key is unset).
static constexpr const char* kArrowBridgeTimestampUnit =
"arrow_bridge_timestamp_unit";

uint64_t queryMaxMemoryPerNode() const {
return toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE);
Expand Down Expand Up @@ -549,6 +553,13 @@ class QueryConfig {
return get<uint8_t>(kSpillStartPartitionBit, kDefaultStartBit);
}

/// Timestamp unit applied when converting between Velox and Arrow.
/// Encoding: 0 = second, 1 = milli, 2 = micro, 3 = nano. The value 3 (nano)
/// is passed as the default to the config lookup.
uint8_t arrowBridgeTimestampUnit() const {
  constexpr uint8_t kNanoUnit{3};
  return get<uint8_t>(kArrowBridgeTimestampUnit, kNanoUnit);
}

/// Returns the number of bits used to calculate the spilling partition
/// number for hash join. The number of spilling partitions will be power of
/// two.
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ struct WriterOptions {
std::optional<uint64_t> maxStripeSize{std::nullopt};
std::optional<uint64_t> maxDictionaryMemory{std::nullopt};
std::map<std::string, std::string> serdeParameters;
std::optional<uint8_t> arrowBridgeTimestampUnit;
};

} // namespace facebook::velox::dwio::common
12 changes: 8 additions & 4 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
* limitations under the License.
*/

#include "velox/vector/arrow/Bridge.h"

#include <arrow/c/bridge.h>
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
Expand Down Expand Up @@ -155,6 +153,8 @@ Writer::Writer(
} else {
flushPolicy_ = std::make_unique<DefaultFlushPolicy>();
}
options_.timestampUnit =
static_cast<TimestampUnit>(options.arrowBridgeTimestampUnit);
arrowContext_->properties =
getArrowParquetWriterOptions(options, flushPolicy_);
}
Expand Down Expand Up @@ -226,8 +226,8 @@ dwio::common::StripeProgress getStripeProgress(
void Writer::write(const VectorPtr& data) {
ArrowArray array;
ArrowSchema schema;
exportToArrow(data, array, generalPool_.get());
exportToArrow(data, schema);
exportToArrow(data, options_, array, generalPool_.get());
exportToArrow(data, options_, schema);
PARQUET_ASSIGN_OR_THROW(
auto recordBatch, ::arrow::ImportRecordBatch(&array, &schema));
if (!arrowContext_->schema) {
Expand Down Expand Up @@ -287,6 +287,10 @@ parquet::WriterOptions getParquetOptions(
if (options.compressionKind.has_value()) {
parquetOptions.compression = options.compressionKind.value();
}
if (options.arrowBridgeTimestampUnit.has_value()) {
parquetOptions.arrowBridgeTimestampUnit =
options.arrowBridgeTimestampUnit.value();
}
return parquetOptions;
}

Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "velox/dwio/common/WriterFactory.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -98,6 +99,7 @@ struct WriterOptions {
// policy with the configs in its ctor.
std::function<std::unique_ptr<DefaultFlushPolicy>()> flushPolicyFactory;
std::shared_ptr<CodecOptions> codecOptions;
uint8_t arrowBridgeTimestampUnit = static_cast<uint8_t>(TimestampUnit::kNano);
};

// Writes Velox vectors into a DataSink using Arrow Parquet writer.
Expand Down Expand Up @@ -146,6 +148,7 @@ class Writer : public dwio::common::Writer {
std::shared_ptr<ArrowContext> arrowContext_;

std::unique_ptr<DefaultFlushPolicy> flushPolicy_;
BridgeOptions options_;
};

class ParquetWriterFactory : public dwio::common::WriterFactory {
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/ArrowStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ ArrowStream::ArrowStream(
operatorId,
arrowStreamNode->id(),
"ArrowStream") {
options_.timestampUnit = static_cast<TimestampUnit>(
driverCtx->queryConfig().arrowBridgeTimestampUnit());
arrowStream_ = arrowStreamNode->arrowStream();
}

Expand Down Expand Up @@ -66,7 +68,7 @@ RowVectorPtr ArrowStream::getOutput() {

// Convert Arrow Array into RowVector and return.
return std::dynamic_pointer_cast<RowVector>(
importFromArrowAsOwner(arrowSchema, arrowArray, pool()));
importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool()));
}

bool ArrowStream::isFinished() {
Expand Down
1 change: 1 addition & 0 deletions velox/exec/ArrowStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ArrowStream : public SourceOperator {

bool finished_ = false;
std::shared_ptr<ArrowArrayStream> arrowStream_;
BridgeOptions options_;
};

} // namespace facebook::velox::exec
5 changes: 3 additions & 2 deletions velox/exec/tests/ArrowStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase {

int getNext(struct ArrowArray* outArray) {
if (vectorIndex_ < vectors_.size()) {
exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get());
exportToArrow(vectors_[vectorIndex_], options_, *outArray, pool_.get());
vectorIndex_ += 1;
} else {
// End of stream. Mark the array released.
Expand All @@ -56,12 +56,13 @@ class ArrowStreamTest : public OperatorTestBase {
}

// Exports the schema of an empty vector of 'type_' into 'out' using
// 'options_'. Returns kGetSchemaFailed (as int) when 'failGetSchema_' is set,
// otherwise kNoError. The export is performed in both cases.
int getArrowSchema(ArrowSchema& out) {
  exportToArrow(BaseVector::create(type_, 0, pool_.get()), options_, out);
  // A single named cast replaces the two C-style casts on each branch.
  return static_cast<int>(
      failGetSchema_ ? ErrorCode::kGetSchemaFailed : ErrorCode::kNoError);
}

private:
BridgeOptions options_;
const std::shared_ptr<memory::MemoryPool> pool_;
const std::vector<RowVectorPtr>& vectors_;
const TypePtr type_;
Expand Down
Loading

0 comments on commit 44b270a

Please sign in to comment.