Skip to content

Commit

Permalink
apacheGH-29238 [C++][Dataset][Parquet] Support parquet modular encryp…
Browse files Browse the repository at this point in the history
…tion in the new Dataset API (apache#34616)

### Rationale for this change

The purpose of this pull request is to support modular encryption in the new Dataset API. See the [supporting document](https://docs.google.com/document/d/13EysCNC6-Nu9wnJ8YpdzmD-aMLn4i2KXUJTNqIihy7A/edit#) for details.

### What changes are included in this PR?

I made improvements to the C++ and Python code to enable the Dataset API to have per-file settings for each file saved. Previously, the Dataset API applied the same encryption properties to all saved files, but now I've updated the code to allow for greater flexibility. In the Python code, I've added support for the changes by updating the ParquetFormat class to accept DatasetEncryptionConfiguration and DatasetDecryptionConfiguration structures. With these changes, you can pass the format object to the write_dataset function, giving you the ability to set unique encryption properties for each file in your Dataset.

### Are these changes tested?

Yes, unit tests are included. I have also included a Python sample project.

### Are there any user-facing changes?

Yes. As stated above, the ParquetFormat class has optional parameters for DatasetEncryptionConfiguration and DatasetDecryptionConfiguration, exposed through setters and getters. The Dataset API can now use these to set different file encryption properties per file.

* Closes: apache#29238

Lead-authored-by: Don <[email protected]>
Co-authored-by: Donald Tolley <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Co-authored-by: anjakefala <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Co-authored-by: scoder <[email protected]>
Co-authored-by: Will Jones <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
  • Loading branch information
8 people authored and Jeremy Aguilon committed Oct 23, 2023
1 parent ba7b240 commit 11dbbbe
Show file tree
Hide file tree
Showing 19 changed files with 1,152 additions and 141 deletions.
7 changes: 7 additions & 0 deletions cpp/src/arrow/dataset/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ endif()

if(ARROW_PARQUET)
add_arrow_dataset_test(file_parquet_test)
# The dataset encryption test is only added when PARQUET_REQUIRE_ENCRYPTION is set.
# Besides its own source it also compiles test_in_memory_kms.cc from the Parquet
# encryption sources.
if(PARQUET_REQUIRE_ENCRYPTION AND ARROW_DATASET)
add_arrow_dataset_test(file_parquet_encryption_test
SOURCES
file_parquet_encryption_test.cc
${PROJECT_SOURCE_DIR}/src/parquet/encryption/test_in_memory_kms.cc
)
endif()
endif()

if(ARROW_BUILD_BENCHMARKS)
Expand Down
75 changes: 67 additions & 8 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "arrow/compute/exec.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/parquet_encryption_config.h"
#include "arrow/dataset/scanner.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/table.h"
Expand All @@ -38,6 +39,9 @@
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/file_reader.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
Expand All @@ -58,6 +62,7 @@ namespace {

parquet::ReaderProperties MakeReaderProperties(
const ParquetFileFormat& format, ParquetFragmentScanOptions* parquet_scan_options,
const std::string& path = "", std::shared_ptr<fs::FileSystem> filesystem = nullptr,
MemoryPool* pool = default_memory_pool()) {
// Can't mutate pool after construction
parquet::ReaderProperties properties(pool);
Expand All @@ -67,8 +72,28 @@ parquet::ReaderProperties MakeReaderProperties(
properties.disable_buffered_stream();
}
properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());

#ifdef PARQUET_REQUIRE_ENCRYPTION
auto parquet_decrypt_config = parquet_scan_options->parquet_decryption_config;

if (parquet_decrypt_config != nullptr) {
auto file_decryption_prop =
parquet_decrypt_config->crypto_factory->GetFileDecryptionProperties(
*parquet_decrypt_config->kms_connection_config,
*parquet_decrypt_config->decryption_config, path, filesystem);

parquet_scan_options->reader_properties->file_decryption_properties(
std::move(file_decryption_prop));
}
#else
if (parquet_scan_options->parquet_decryption_config != nullptr) {
parquet::ParquetException::NYI("Encryption is not supported in this build.");
}
#endif

properties.file_decryption_properties(
parquet_scan_options->reader_properties->file_decryption_properties());

properties.set_thrift_string_size_limit(
parquet_scan_options->reader_properties->thrift_string_size_limit());
properties.set_thrift_container_size_limit(
Expand Down Expand Up @@ -438,7 +463,7 @@ Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
MakeReaderProperties(*this, parquet_scan_options.get(), "", nullptr, options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
// `parquet::ParquetFileReader::Open` will not wrap the exception as status,
// so using `open_parquet_file` to wrap it.
Expand Down Expand Up @@ -477,9 +502,13 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);

auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), source.path(),
source.filesystem(), options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
auto reader_fut = parquet::ParquetFileReader::OpenAsync(
std::move(input), std::move(properties), metadata);
auto path = source.path();
auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());

return source.OpenAsync().Then(
Expand Down Expand Up @@ -666,10 +695,40 @@ Result<std::shared_ptr<FileWriter>> ParquetFileFormat::MakeWriter(
auto parquet_options = checked_pointer_cast<ParquetFileWriteOptions>(options);

std::unique_ptr<parquet::arrow::FileWriter> parquet_writer;
ARROW_ASSIGN_OR_RAISE(parquet_writer, parquet::arrow::FileWriter::Open(
*schema, default_memory_pool(), destination,
parquet_options->writer_properties,
parquet_options->arrow_writer_properties));

#ifdef PARQUET_REQUIRE_ENCRYPTION
auto parquet_encrypt_config = parquet_options->parquet_encryption_config;

if (parquet_encrypt_config != nullptr) {
auto file_encryption_prop =
parquet_encrypt_config->crypto_factory->GetFileEncryptionProperties(
*parquet_encrypt_config->kms_connection_config,
*parquet_encrypt_config->encryption_config, destination_locator.path,
destination_locator.filesystem);

auto writer_properties =
parquet::WriterProperties::Builder(*parquet_options->writer_properties)
.encryption(std::move(file_encryption_prop))
->build();

ARROW_ASSIGN_OR_RAISE(
parquet_writer, parquet::arrow::FileWriter::Open(
*schema, writer_properties->memory_pool(), destination,
writer_properties, parquet_options->arrow_writer_properties));
}
#else
if (parquet_options->parquet_encryption_config != nullptr) {
return Status::NotImplemented("Encryption is not supported in this build.");
}
#endif

if (parquet_writer == nullptr) {
ARROW_ASSIGN_OR_RAISE(parquet_writer,
parquet::arrow::FileWriter::Open(
*schema, parquet_options->writer_properties->memory_pool(),
destination, parquet_options->writer_properties,
parquet_options->arrow_writer_properties));
}

return std::shared_ptr<FileWriter>(
new ParquetFileWriter(std::move(destination), std::move(parquet_writer),
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ struct SchemaManifest;
namespace arrow {
namespace dataset {

struct ParquetDecryptionConfig;
struct ParquetEncryptionConfig;

/// \addtogroup dataset-file-formats
///
/// @{
Expand Down Expand Up @@ -226,6 +229,8 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
/// ScanOptions. Additionally, dictionary columns come from
/// ParquetFileFormat::ReaderOptions::dict_columns.
std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
/// A configuration structure that provides decryption properties for a dataset
std::shared_ptr<ParquetDecryptionConfig> parquet_decryption_config = NULLPTR;
};

class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
Expand All @@ -236,6 +241,9 @@ class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
/// \brief Parquet Arrow writer properties.
std::shared_ptr<parquet::ArrowWriterProperties> arrow_writer_properties;

/// A configuration structure that provides encryption properties for a dataset
std::shared_ptr<ParquetEncryptionConfig> parquet_encryption_config = NULLPTR;

protected:
explicit ParquetFileWriteOptions(std::shared_ptr<FileFormat> format)
: FileWriteOptions(std::move(format)) {}
Expand Down
216 changes: 216 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <string_view>

#include "gtest/gtest.h"

#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>
#include "arrow/array.h"
#include "arrow/dataset/parquet_encryption_config.h"
#include "arrow/dataset/partition.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/io/api.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/type.h"
#include "parquet/arrow/reader.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/encryption/test_in_memory_kms.h"

// Key material and identifiers shared by the dataset encryption tests.
//
// The two master keys are 16-character strings; the fixture registers each of them
// with the in-memory KMS client factory under the corresponding *MasterKeyId.
constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
// Footer key name handed to EncryptionConfiguration; same value as
// kFooterKeyMasterKeyId.
constexpr std::string_view kFooterKeyName = "footer_key";
constexpr std::string_view kColumnMasterKey = "1234567890123450";
constexpr std::string_view kColumnMasterKeyId = "col_key";
// Assigned to EncryptionConfiguration::column_keys. Presumably in the form
// "<master key id>: <column name>", i.e. column "a" uses col_key -- TODO confirm
// against the Parquet encryption documentation.
constexpr std::string_view kColumnKeyMapping = "col_key: a";
// Base directory of the written dataset inside the mock filesystem. It is empty, and
// the tests address files with paths such as "part=a/part0.parquet".
constexpr std::string_view kBaseDir = "";

using arrow::internal::checked_pointer_cast;

namespace arrow {
namespace dataset {

// Fixture for the dataset encryption tests. SetUpTestSuite() writes an encrypted,
// Hive-partitioned Parquet dataset into a mock filesystem and stores everything the
// tests need to read it back in the static members below.
class DatasetEncryptionTest : public ::testing::Test {
 public:
  // Creates a mock filesystem, builds a ten-row table, configures Parquet encryption
  // through ParquetFileWriteOptions::parquet_encryption_config, and writes the table
  // as a dataset partitioned on the "part" column.
  //
  // Every fallible step uses an ASSERT_* macro, matching the ASSERT_OK calls already
  // used here, so that a failed step ends the setup instead of continuing with a
  // result that is immediately dereferenced.
  static void SetUpTestSuite() {
    // Creates a mock file system using the current time point.
    ASSERT_OK_AND_ASSIGN(file_system_, fs::internal::MockFileSystem::Make(
                                           std::chrono::system_clock::now(), {}));
    ASSERT_OK(file_system_->CreateDir(std::string(kBaseDir)));

    // Prepare table data. The raw string is kept flush-left so the literal is
    // unchanged.
    auto table_schema = schema({field("a", int64()), field("c", int64()),
                                field("e", int64()), field("part", utf8())});
    table_ = TableFromJSON(table_schema, {R"([
[ 0, 9, 1, "a" ],
[ 1, 8, 2, "a" ],
[ 2, 7, 1, "c" ],
[ 3, 6, 2, "c" ],
[ 4, 5, 1, "e" ],
[ 5, 4, 2, "e" ],
[ 6, 3, 1, "g" ],
[ 7, 2, 2, "g" ],
[ 8, 1, 1, "i" ],
[ 9, 0, 2, "i" ]
])"});

    // Use a Hive-style partitioning scheme on the "part" column.
    partitioning_ = std::make_shared<HivePartitioning>(schema({field("part", utf8())}));

    // Register the master keys with an in-memory KMS client factory.
    std::unordered_map<std::string, std::string> key_map;
    key_map.emplace(kColumnMasterKeyId, kColumnMasterKey);
    key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey);

    crypto_factory_ = std::make_shared<parquet::encryption::CryptoFactory>();
    auto kms_client_factory =
        std::make_shared<parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
            /*wrap_locally=*/true, key_map);
    crypto_factory_->RegisterKmsClientFactory(std::move(kms_client_factory));
    kms_connection_config_ = std::make_shared<parquet::encryption::KmsConnectionConfig>();

    // Set write options with encryption configuration.
    auto encryption_config =
        std::make_shared<parquet::encryption::EncryptionConfiguration>(
            std::string(kFooterKeyName));
    encryption_config->column_keys = kColumnKeyMapping;
    auto parquet_encryption_config = std::make_shared<ParquetEncryptionConfig>();
    // The crypto factory and KMS connection config are shared with the tests, which
    // reuse them for decryption.
    parquet_encryption_config->crypto_factory = crypto_factory_;
    parquet_encryption_config->kms_connection_config = kms_connection_config_;
    parquet_encryption_config->encryption_config = std::move(encryption_config);

    auto file_format = std::make_shared<ParquetFileFormat>();
    auto parquet_file_write_options =
        checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());
    parquet_file_write_options->parquet_encryption_config =
        std::move(parquet_encryption_config);

    // Write dataset.
    auto dataset = std::make_shared<InMemoryDataset>(table_);
    ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
    ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

    FileSystemDatasetWriteOptions write_options;
    write_options.file_write_options = parquet_file_write_options;
    write_options.filesystem = file_system_;
    write_options.base_dir = kBaseDir;
    write_options.partitioning = partitioning_;
    write_options.basename_template = "part{i}.parquet";
    ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
  }

 protected:
  // Mock filesystem holding the written dataset.
  inline static std::shared_ptr<fs::FileSystem> file_system_;
  // Source table that was written; tests compare what they read against it.
  inline static std::shared_ptr<Table> table_;
  // Partitioning used for writing and reused for dataset discovery.
  inline static std::shared_ptr<HivePartitioning> partitioning_;
  // Crypto factory and KMS connection config used for encryption and decryption.
  inline static std::shared_ptr<parquet::encryption::CryptoFactory> crypto_factory_;
  inline static std::shared_ptr<parquet::encryption::KmsConnectionConfig>
      kms_connection_config_;
};

// Reads back the encrypted, partitioned dataset written by the fixture's
// SetUpTestSuite(). Decryption is configured through the Parquet format's default
// fragment scan options; the test then scans the whole dataset and expects the
// resulting table to equal the table that was written.
TEST_F(DatasetEncryptionTest, WriteReadDatasetWithEncryption) {
  // Decryption reuses the crypto factory and KMS connection config of the fixture.
  auto decryption_setup = std::make_shared<ParquetDecryptionConfig>();
  decryption_setup->crypto_factory = crypto_factory_;
  decryption_setup->kms_connection_config = kms_connection_config_;
  decryption_setup->decryption_config =
      std::make_shared<parquet::encryption::DecryptionConfiguration>();

  // Attach the decryption settings to the format via its default scan options.
  auto scan_opts = std::make_shared<ParquetFragmentScanOptions>();
  scan_opts->parquet_decryption_config = std::move(decryption_setup);
  auto format = std::make_shared<ParquetFileFormat>();
  format->default_fragment_scan_options = std::move(scan_opts);

  // Select all files under the base directory, recursively.
  fs::FileSelector all_files;
  all_files.recursive = true;
  all_files.base_dir = kBaseDir;

  FileSystemFactoryOptions discovery_options;
  discovery_options.partition_base_dir = kBaseDir;
  discovery_options.partitioning = partitioning_;
  ASSERT_OK_AND_ASSIGN(
      auto factory,
      FileSystemDatasetFactory::Make(file_system_, all_files, format, discovery_options));

  // Scan the discovered dataset into a single table.
  ASSERT_OK_AND_ASSIGN(auto encrypted_dataset, factory->Finish());
  ASSERT_OK_AND_ASSIGN(auto builder, encrypted_dataset->NewScan());
  ASSERT_OK_AND_ASSIGN(auto scan, builder->Finish());
  ASSERT_OK_AND_ASSIGN(auto decrypted, scan->ToTable());

  // Combine chunks, validate, and compare with the table that was written.
  ASSERT_OK_AND_ASSIGN(auto single_chunk, decrypted->CombineChunks());
  ASSERT_OK(single_chunk->ValidateFull());
  AssertTablesEqual(*single_chunk, *table_);
}

// Opens one file of the written dataset directly with the Parquet reader: first with
// no decryption properties, which must throw, then with decryption properties obtained
// from the fixture's crypto factory, which must yield the expected rows.
TEST_F(DatasetEncryptionTest, ReadSingleFile) {
  ASSERT_OK_AND_ASSIGN(auto file, file_system_->OpenInputFile("part=a/part0.parquet"));

  // The footer is encrypted, so reading the metadata without decryption properties
  // is expected to throw.
  ASSERT_THROW(parquet::ReadMetaData(file), parquet::ParquetException);

  // Build reader properties that carry the file decryption properties.
  auto decryption_settings =
      std::make_shared<parquet::encryption::DecryptionConfiguration>();
  auto decryption_props = crypto_factory_->GetFileDecryptionProperties(
      *kms_connection_config_, *decryption_settings);
  auto props = parquet::default_reader_properties();
  props.file_decryption_properties(decryption_props);

  // Read the entire file as one Arrow table.
  parquet::arrow::FileReaderBuilder builder;
  ASSERT_OK(builder.Open(file, props));
  ASSERT_OK_AND_ASSIGN(auto reader, builder.Build());
  std::shared_ptr<Table> contents;
  ASSERT_OK(reader->ReadTable(&contents));

  // Check the table shape, then the first value of each of the three columns.
  ASSERT_EQ(contents->num_rows(), 2);
  ASSERT_EQ(contents->num_columns(), 3);
  const int first_row[] = {0, 9, 1};
  for (int col = 0; col < 3; ++col) {
    auto values = checked_pointer_cast<Int64Array>(contents->column(col)->chunk(0));
    ASSERT_EQ(values->GetView(0), first_row[col]);
  }
}

} // namespace dataset
} // namespace arrow
Loading

0 comments on commit 11dbbbe

Please sign in to comment.