diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index b68917bbd63c..342da7c5b9f9 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -488,6 +488,7 @@ void HiveDataSink::write(size_t index, RowVectorPtr input) { auto dataInput = makeDataInput(dataChannels_, input); writers_[index]->write(dataInput); + writerInfo_[index]->inputSizeInBytes += dataInput->estimateFlatSize(); writerInfo_[index]->numWrittenRows += dataInput->size(); } @@ -661,9 +662,7 @@ std::vector HiveDataSink::close() { ("targetFileName", info->writerParameters.targetFileName()) ("fileSize", ioStats_.at(i)->rawBytesWritten()))) ("rowCount", info->numWrittenRows) - // TODO(gaoge): track and send the fields when inMemoryDataSizeInBytes - // and containsNumberedFileNames are needed at coordinator when file_renaming_enabled are turned on. - ("inMemoryDataSizeInBytes", 0) + ("inMemoryDataSizeInBytes", info->inputSizeInBytes) ("onDiskDataSizeInBytes", ioStats_.at(i)->rawBytesWritten()) ("containsNumberedFileNames", true)); // clang-format on diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 72269131f29f..9f09c7aac140 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -384,6 +384,7 @@ struct HiveWriterInfo { const std::shared_ptr sinkPool; const std::shared_ptr sortPool; int64_t numWrittenRows = 0; + int64_t inputSizeInBytes = 0; }; /// Identifies a hive writer. diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 4a0b0a4890af..6e4a171a3c2c 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -2631,6 +2631,7 @@ TEST_P(AllTableWriterTest, tableWriteOutputCheck) { std::filesystem::path path{writeFileFullPath}; const auto actualFileSize = fs::file_size(path); ASSERT_EQ(obj["onDiskDataSizeInBytes"].asInt(), actualFileSize); + ASSERT_GT(obj["inMemoryDataSizeInBytes"].asInt(), 0); ASSERT_EQ(writerInfoObj["fileSize"], actualFileSize); if (commitStrategy_ == CommitStrategy::kNoCommit) { ASSERT_EQ(writeFileName, targetFileName);