diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp index d5bc8529cd8a..6eec3f02ab2f 100644 --- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp +++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp @@ -150,6 +150,7 @@ DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) { std::dynamic_pointer_cast<::arrow::TimestampType>( arrowSchema->field(0)->type()); ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO); + ASSERT_EQ(tsType->timezone(), "America/Los_Angeles"); }))); const auto data = makeRowVector({makeFlatVector( @@ -157,6 +158,7 @@ DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) { parquet::WriterOptions writerOptions; writerOptions.memoryPool = leafPool_.get(); writerOptions.parquetWriteTimestampUnit = TimestampUnit::kMicro; + writerOptions.parquetWriteTimestampTimeZone = "America/Los_Angeles"; // Create an in-memory writer. auto sink = std::make_unique( diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index d508b53356f8..01378279dbf0 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -19,6 +19,7 @@ #include #include #include "velox/common/testutil/TestValue.h" +#include "velox/core/QueryConfig.h" #include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Writer.h" #include "velox/exec/MemoryReclaimer.h" @@ -237,6 +238,8 @@ Writer::Writer( } options_.timestampUnit = options.parquetWriteTimestampUnit.value_or(TimestampUnit::kNano); + options_.timestampTimeZone = + options.parquetWriteTimestampTimeZone.value_or(""); arrowContext_->properties = getArrowParquetWriterOptions(options, flushPolicy_); setMemoryReclaimers(); @@ -421,6 +424,15 @@ std::optional getTimestampUnit( return std::nullopt; } +std::optional getTimestampTimeZone( + const Config& config, + const char* configKey) { + if (const auto timezone = config.get(configKey)) { + return std::optional(static_cast(timezone.value())); + } + return std::nullopt; +} + } // namespace void WriterOptions::processSessionConfigs(const Config& config) { @@ -428,6 +440,11 @@ void WriterOptions::processSessionConfigs(const Config& config) { parquetWriteTimestampUnit = getTimestampUnit(config, kParquetSessionWriteTimestampUnit); } + + if (!parquetWriteTimestampTimeZone) { + parquetWriteTimestampTimeZone = + getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone); + } } void WriterOptions::processHiveConnectorConfigs(const Config& config) { @@ -435,6 +452,11 @@ void WriterOptions::processHiveConnectorConfigs(const Config& config) { parquetWriteTimestampUnit = getTimestampUnit(config, kParquetHiveConnectorWriteTimestampUnit); } + + if (!parquetWriteTimestampTimeZone) { + parquetWriteTimestampTimeZone = + getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone); + } } std::unique_ptr ParquetWriterFactory::createWriter( diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 6901c2dc3be7..06629f48bfdd 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -109,7 +109,8 @@ struct WriterOptions : public dwio::common::WriterOptions { /// Timestamp unit for Parquet write through Arrow bridge. /// Default if not specified: TimestampUnit::kNano (9). std::optional parquetWriteTimestampUnit; - bool writeInt96AsTimestamp = false; + std::optional parquetWriteTimestampTimeZone; + bool writeInt96AsTimestamp = true; // Parsing session and hive configs. diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index 8d9fa46ca9cb..3b8f7d63a6d7 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -124,7 +124,8 @@ struct VeloxToArrowSchemaBridgeHolder { std::unique_ptr dictionary; - // Buffer required to generate a decimal format. + // Buffer required to generate a decimal format or timestamp with timezone + // format. std::string formatBuffer; void setChildAtIndex( @@ -212,6 +213,28 @@ static void releaseArrowSchema(ArrowSchema* arrowSchema) { arrowSchema->private_data = nullptr; } +const char* exportArrowFormatTimestampStr( + const ArrowOptions& options, + std::string& formatBuffer) { + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + formatBuffer = fmt::format("tss:{}", options.timestampTimeZone); + break; + case TimestampUnit::kMilli: + formatBuffer = fmt::format("tsm:{}", options.timestampTimeZone); + break; + case TimestampUnit::kMicro: + formatBuffer = fmt::format("tsu:{}", options.timestampTimeZone); + break; + case TimestampUnit::kNano: + formatBuffer = fmt::format("tsn:{}", options.timestampTimeZone); + break; + default: + VELOX_UNREACHABLE(); + } + return formatBuffer.c_str(); +} + // Returns the Arrow C data interface format type for a given Velox type. const char* exportArrowFormatStr( const TypePtr& type, @@ -255,18 +278,7 @@ const char* exportArrowFormatStr( case TypeKind::UNKNOWN: return "n"; // NullType case TypeKind::TIMESTAMP: - switch (options.timestampUnit) { - case TimestampUnit::kSecond: - return "tss:"; - case TimestampUnit::kMilli: - return "tsm:"; - case TimestampUnit::kMicro: - return "tsu:"; - case TimestampUnit::kNano: - return "tsn:"; - default: - VELOX_UNREACHABLE(); - } + return exportArrowFormatTimestampStr(options, formatBuffer); // Complex/nested types. case TypeKind::ARRAY: static_assert(sizeof(vector_size_t) == 4); diff --git a/velox/vector/arrow/Bridge.h b/velox/vector/arrow/Bridge.h index 7d93a809881f..9cbc03811eab 100644 --- a/velox/vector/arrow/Bridge.h +++ b/velox/vector/arrow/Bridge.h @@ -36,6 +36,7 @@ struct ArrowOptions { bool flattenDictionary{false}; bool flattenConstant{false}; TimestampUnit timestampUnit = TimestampUnit::kNano; + std::string timestampTimeZone = ""; }; namespace facebook::velox { diff --git a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp index 0837c96a09ae..b32242c15842 100644 --- a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp @@ -192,14 +192,15 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { testScalarType(VARCHAR(), "u"); testScalarType(VARBINARY(), "z"); + options_.timestampTimeZone = "America/Los_Angeles"; options_.timestampUnit = TimestampUnit::kSecond; - testScalarType(TIMESTAMP(), "tss:"); + testScalarType(TIMESTAMP(), "tss:America/Los_Angeles"); options_.timestampUnit = TimestampUnit::kMilli; - testScalarType(TIMESTAMP(), "tsm:"); + testScalarType(TIMESTAMP(), "tsm:America/Los_Angeles"); options_.timestampUnit = TimestampUnit::kMicro; - testScalarType(TIMESTAMP(), "tsu:"); + testScalarType(TIMESTAMP(), "tsu:America/Los_Angeles"); options_.timestampUnit = TimestampUnit::kNano; - testScalarType(TIMESTAMP(), "tsn:"); + testScalarType(TIMESTAMP(), "tsn:America/Los_Angeles"); testScalarType(DATE(), "tdD"); testScalarType(INTERVAL_YEAR_MONTH(), "tiM");