Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-29238 [C++][Dataset][Parquet] Support parquet modular encryption in the new Dataset API #34616

Merged
merged 45 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
3ff1617
parent ed87a5b7f5ee1081d5613532c28de8e687b8e397
tolleybot Mar 17, 2023
32cdd0e
Update python/pyarrow/_dataset_parquet.pyx
tolleybot Sep 28, 2023
abdb585
Update python/pyarrow/_dataset_parquet.pyx
tolleybot Sep 28, 2023
67ecb48
Update python/pyarrow/_dataset_parquet.pyx
tolleybot Sep 28, 2023
0c719c6
Removed the Setup(..) function from ParquetEncryptionConfig, and Parq…
tolleybot Oct 2, 2023
2f0aaa2
Merge branch 'dataset_encryption' of https://github.com/tolleybot/arr…
tolleybot Oct 2, 2023
8440f51
Removed nogil from shared_ptr[CPyCryptoFactory] unwrap(self)
tolleybot Oct 2, 2023
e13f384
removed arrrow/api.h, and arrow/dataset/api.h from file_parquet_encry…
tolleybot Oct 2, 2023
992c5a8
Append underscores in members of classes in src/arrow/dataset/file_pa…
tolleybot Oct 2, 2023
a2056a3
in file_parquet_encryption_test.cc I added some non-unique values in …
tolleybot Oct 2, 2023
a787728
updated parquet_encryption_config.h docstrings
tolleybot Oct 2, 2023
f3867cc
formatting for parquet_encrytion_config.h
tolleybot Oct 2, 2023
d6d3290
updated _dataset_parquet.pyx, ParquetDecryptionConfig, ParquetEncrypt…
tolleybot Oct 2, 2023
98ba975
Fixed issue in ReadSingleFile Test in src/arrow/dataset/file_parquet_…
tolleybot Oct 3, 2023
d5ba855
removed nogil from shared_ptr[CPyCryptoFactory] unwrap(self) in parqu…
tolleybot Oct 3, 2023
7d42aa3
Replace non-existent doxygen command
anjakefala Oct 3, 2023
48a20f7
Run linter
anjakefala Oct 3, 2023
23f0b40
Structure doxygen declarations so variables can be found
anjakefala Oct 3, 2023
4a1daf0
Removing unneeded comments from file_parquet_test.cc
tolleybot Oct 4, 2023
12d7660
Merge branch 'dataset_encryption' of https://github.com/tolleybot/arr…
tolleybot Oct 4, 2023
a70c0f9
changes to move common unwrap functions out of classes
tolleybot Oct 4, 2023
77a818b
moved unwrap functions out of ParquetEncryptionConfig and ParquetDecr…
tolleybot Oct 4, 2023
f7d39a2
Merge branch 'master' into dataset_encryption
tolleybot Oct 4, 2023
7d78847
1. Updated file_parquet.cc to change the type of exception due to the…
tolleybot Oct 5, 2023
6a69ae1
formatting for _dataset_parquet.pyx
tolleybot Oct 5, 2023
c3fe65a
Moving docstring in parquet_encryption_config.h to correct position.
tolleybot Oct 6, 2023
fa9054a
Move property strings above the struct variables
anjakefala Oct 6, 2023
d2f1584
Merge pull request #3 from anjakefala/dataset_encryption
tolleybot Oct 6, 2023
9e25c5e
Update cpp/src/arrow/dataset/parquet_encryption_config.h
tolleybot Oct 9, 2023
5758119
Renaming encryption_enabled to encryption_unavailable in test_dataset…
tolleybot Oct 9, 2023
36ff914
add set_encryption_config to dataset_parquet.pyx ParquetFileWriteOptions
tolleybot Oct 9, 2023
0fa69e9
adding _set_encryption_config to ParquetFileWriteOptions to make sure…
tolleybot Oct 9, 2023
b2bb9f8
attempt to refactor cython code with separate files for with/without …
jorisvandenbossche Oct 9, 2023
1462a4f
use dynamic python import
jorisvandenbossche Oct 10, 2023
6655b28
update for setting encryption config as well
jorisvandenbossche Oct 10, 2023
1c8aa1c
some cleanup
jorisvandenbossche Oct 10, 2023
1f8ee19
update test to ensure written dataset is encrypted
jorisvandenbossche Oct 10, 2023
d74de45
updated test_dataset_encryption.py to validate that a dataset has enc…
tolleybot Oct 10, 2023
46cc668
validating encryption is enabled for a dataset in test pyarrow/tests/…
tolleybot Oct 10, 2023
6777353
Update python/pyarrow/_dataset_parquet_encryption.pyx
jorisvandenbossche Oct 10, 2023
9df68d3
Merge pull request #4 from jorisvandenbossche/dataset-parquet-encrypt…
tolleybot Oct 10, 2023
d7a6b55
Update python/pyarrow/tests/test_dataset_encryption.py
tolleybot Oct 10, 2023
22033b7
Run linter
anjakefala Oct 10, 2023
b323457
Update python/pyarrow/tests/test_dataset_encryption.py
jorisvandenbossche Oct 10, 2023
ced2ed2
Merge branch 'main' into dataset_encryption
anjakefala Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cpp/src/arrow/dataset/CMakeLists.txt
tolleybot marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ endif()

if(ARROW_PARQUET)
add_arrow_dataset_test(file_parquet_test)
if(PARQUET_REQUIRE_ENCRYPTION AND ARROW_DATASET)
add_arrow_dataset_test(file_parquet_encryption_test
SOURCES
file_parquet_encryption_test.cc
${PROJECT_SOURCE_DIR}/src/parquet/encryption/test_in_memory_kms.cc
wgtmac marked this conversation as resolved.
Show resolved Hide resolved
)
endif()
endif()

if(ARROW_BUILD_BENCHMARKS)
Expand Down
75 changes: 67 additions & 8 deletions cpp/src/arrow/dataset/file_parquet.cc
wjones127 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

tolleybot marked this conversation as resolved.
Show resolved Hide resolved
#include "arrow/compute/exec.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/parquet_encryption_config.h"
#include "arrow/dataset/scanner.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/table.h"
Expand All @@ -38,6 +39,9 @@
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/file_reader.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
Expand All @@ -58,6 +62,7 @@ namespace {

parquet::ReaderProperties MakeReaderProperties(
const ParquetFileFormat& format, ParquetFragmentScanOptions* parquet_scan_options,
const std::string& path = "", std::shared_ptr<fs::FileSystem> filesystem = nullptr,
MemoryPool* pool = default_memory_pool()) {
// Can't mutate pool after construction
parquet::ReaderProperties properties(pool);
Expand All @@ -67,8 +72,28 @@ parquet::ReaderProperties MakeReaderProperties(
properties.disable_buffered_stream();
}
properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());

#ifdef PARQUET_REQUIRE_ENCRYPTION
tolleybot marked this conversation as resolved.
Show resolved Hide resolved
auto parquet_decrypt_config = parquet_scan_options->parquet_decryption_config;

if (parquet_decrypt_config != nullptr) {
auto file_decryption_prop =
parquet_decrypt_config->crypto_factory->GetFileDecryptionProperties(
*parquet_decrypt_config->kms_connection_config,
*parquet_decrypt_config->decryption_config, path, filesystem);

parquet_scan_options->reader_properties->file_decryption_properties(
std::move(file_decryption_prop));
}
#else
if (parquet_scan_options->parquet_decryption_config != nullptr) {
parquet::ParquetException::NYI("Encryption is not supported in this build.");
}
#endif
pitrou marked this conversation as resolved.
Show resolved Hide resolved

properties.file_decryption_properties(
parquet_scan_options->reader_properties->file_decryption_properties());

properties.set_thrift_string_size_limit(
parquet_scan_options->reader_properties->thrift_string_size_limit());
properties.set_thrift_container_size_limit(
Expand Down Expand Up @@ -438,7 +463,7 @@ Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
MakeReaderProperties(*this, parquet_scan_options.get(), "", nullptr, options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
// `parquet::ParquetFileReader::Open` will not wrap the exception as status,
// so using `open_parquet_file` to wrap it.
Expand Down Expand Up @@ -477,9 +502,13 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);

auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), source.path(),
source.filesystem(), options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
auto reader_fut = parquet::ParquetFileReader::OpenAsync(
std::move(input), std::move(properties), metadata);
auto path = source.path();
auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());

return source.OpenAsync().Then(
Expand Down Expand Up @@ -666,10 +695,40 @@ Result<std::shared_ptr<FileWriter>> ParquetFileFormat::MakeWriter(
auto parquet_options = checked_pointer_cast<ParquetFileWriteOptions>(options);

std::unique_ptr<parquet::arrow::FileWriter> parquet_writer;
ARROW_ASSIGN_OR_RAISE(parquet_writer, parquet::arrow::FileWriter::Open(
*schema, default_memory_pool(), destination,
parquet_options->writer_properties,
parquet_options->arrow_writer_properties));

#ifdef PARQUET_REQUIRE_ENCRYPTION
tolleybot marked this conversation as resolved.
Show resolved Hide resolved
auto parquet_encrypt_config = parquet_options->parquet_encryption_config;

if (parquet_encrypt_config != nullptr) {
auto file_encryption_prop =
parquet_encrypt_config->crypto_factory->GetFileEncryptionProperties(
*parquet_encrypt_config->kms_connection_config,
*parquet_encrypt_config->encryption_config, destination_locator.path,
destination_locator.filesystem);

auto writer_properties =
parquet::WriterProperties::Builder(*parquet_options->writer_properties)
.encryption(std::move(file_encryption_prop))
->build();

ARROW_ASSIGN_OR_RAISE(
parquet_writer, parquet::arrow::FileWriter::Open(
*schema, writer_properties->memory_pool(), destination,
writer_properties, parquet_options->arrow_writer_properties));
}
#else
if (parquet_options->parquet_encryption_config != nullptr) {
return Status::NotImplemented("Encryption is not supported in this build.");
}
#endif

if (parquet_writer == nullptr) {
ARROW_ASSIGN_OR_RAISE(parquet_writer,
parquet::arrow::FileWriter::Open(
*schema, parquet_options->writer_properties->memory_pool(),
destination, parquet_options->writer_properties,
parquet_options->arrow_writer_properties));
}

return std::shared_ptr<FileWriter>(
new ParquetFileWriter(std::move(destination), std::move(parquet_writer),
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ struct SchemaManifest;
namespace arrow {
namespace dataset {

struct ParquetDecryptionConfig;
struct ParquetEncryptionConfig;

/// \addtogroup dataset-file-formats
///
/// @{
Expand Down Expand Up @@ -226,6 +229,8 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
/// ScanOptions. Additionally, dictionary columns come from
/// ParquetFileFormat::ReaderOptions::dict_columns.
std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
/// A configuration structure that provides decryption properties for a dataset
std::shared_ptr<ParquetDecryptionConfig> parquet_decryption_config = NULLPTR;
};

class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
Expand All @@ -236,6 +241,9 @@ class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
/// \brief Parquet Arrow writer properties.
std::shared_ptr<parquet::ArrowWriterProperties> arrow_writer_properties;

// A configuration structure that provides encryption properties for a dataset
std::shared_ptr<ParquetEncryptionConfig> parquet_encryption_config = NULLPTR;

protected:
explicit ParquetFileWriteOptions(std::shared_ptr<FileFormat> format)
: FileWriteOptions(std::move(format)) {}
Expand Down
216 changes: 216 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <string_view>

#include "gtest/gtest.h"

#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>
#include "arrow/array.h"
#include "arrow/dataset/parquet_encryption_config.h"
#include "arrow/dataset/partition.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/io/api.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/type.h"
#include "parquet/arrow/reader.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/encryption/test_in_memory_kms.h"

constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
constexpr std::string_view kFooterKeyName = "footer_key";
constexpr std::string_view kColumnMasterKey = "1234567890123450";
constexpr std::string_view kColumnMasterKeyId = "col_key";
constexpr std::string_view kColumnKeyMapping = "col_key: a";
constexpr std::string_view kBaseDir = "";

using arrow::internal::checked_pointer_cast;

namespace arrow {
namespace dataset {

class DatasetEncryptionTest : public ::testing::Test {
public:
// This function creates a mock file system using the current time point, creates a
// directory with the given base directory path, and writes a dataset to it using
// provided Parquet file write options. The dataset is partitioned using a Hive
// partitioning scheme. The function also checks if the written files exist in the file
// system.
static void SetUpTestSuite() {
// Creates a mock file system using the current time point.
EXPECT_OK_AND_ASSIGN(file_system_, fs::internal::MockFileSystem::Make(
std::chrono::system_clock::now(), {}));
ASSERT_OK(file_system_->CreateDir(std::string(kBaseDir)));

// Prepare table data.
auto table_schema = schema({field("a", int64()), field("c", int64()),
field("e", int64()), field("part", utf8())});
table_ = TableFromJSON(table_schema, {R"([
[ 0, 9, 1, "a" ],
[ 1, 8, 2, "a" ],
[ 2, 7, 1, "c" ],
[ 3, 6, 2, "c" ],
[ 4, 5, 1, "e" ],
[ 5, 4, 2, "e" ],
[ 6, 3, 1, "g" ],
[ 7, 2, 2, "g" ],
[ 8, 1, 1, "i" ],
[ 9, 0, 2, "i" ]
])"});

// Use a Hive-style partitioning scheme.
partitioning_ = std::make_shared<HivePartitioning>(schema({field("part", utf8())}));

// Prepare encryption properties.
std::unordered_map<std::string, std::string> key_map;
key_map.emplace(kColumnMasterKeyId, kColumnMasterKey);
key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey);

crypto_factory_ = std::make_shared<parquet::encryption::CryptoFactory>();
auto kms_client_factory =
std::make_shared<parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
/*wrap_locally=*/true, key_map);
crypto_factory_->RegisterKmsClientFactory(std::move(kms_client_factory));
kms_connection_config_ = std::make_shared<parquet::encryption::KmsConnectionConfig>();

// Set write options with encryption configuration.
auto encryption_config =
std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string(kFooterKeyName));
encryption_config->column_keys = kColumnKeyMapping;
auto parquet_encryption_config = std::make_shared<ParquetEncryptionConfig>();
// Directly assign shared_ptr objects to ParquetEncryptionConfig members
parquet_encryption_config->crypto_factory = crypto_factory_;
parquet_encryption_config->kms_connection_config = kms_connection_config_;
parquet_encryption_config->encryption_config = std::move(encryption_config);

auto file_format = std::make_shared<ParquetFileFormat>();
auto parquet_file_write_options =
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());
parquet_file_write_options->parquet_encryption_config =
std::move(parquet_encryption_config);

// Write dataset.
auto dataset = std::make_shared<InMemoryDataset>(table_);
EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = parquet_file_write_options;
write_options.filesystem = file_system_;
write_options.base_dir = kBaseDir;
write_options.partitioning = partitioning_;
write_options.basename_template = "part{i}.parquet";
ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
}

protected:
inline static std::shared_ptr<fs::FileSystem> file_system_;
inline static std::shared_ptr<Table> table_;
inline static std::shared_ptr<HivePartitioning> partitioning_;
inline static std::shared_ptr<parquet::encryption::CryptoFactory> crypto_factory_;
inline static std::shared_ptr<parquet::encryption::KmsConnectionConfig>
kms_connection_config_;
};

// This test demonstrates the process of writing a partitioned Parquet file with the same
// encryption properties applied to each file within the dataset. The encryption
// properties are determined based on the selected columns. After writing the dataset, the
// test reads the data back and verifies that it can be successfully decrypted and
// scanned.
TEST_F(DatasetEncryptionTest, WriteReadDatasetWithEncryption) {
// Create decryption properties.
auto decryption_config =
std::make_shared<parquet::encryption::DecryptionConfiguration>();
auto parquet_decryption_config = std::make_shared<ParquetDecryptionConfig>();
parquet_decryption_config->crypto_factory = crypto_factory_;
parquet_decryption_config->kms_connection_config = kms_connection_config_;
parquet_decryption_config->decryption_config = std::move(decryption_config);

// Set scan options.
auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config);

auto file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

// Get FileInfo objects for all files under the base directory
fs::FileSelector selector;
selector.base_dir = kBaseDir;
selector.recursive = true;

FileSystemFactoryOptions factory_options;
factory_options.partitioning = partitioning_;
factory_options.partition_base_dir = kBaseDir;
ASSERT_OK_AND_ASSIGN(auto dataset_factory,
FileSystemDatasetFactory::Make(file_system_, selector, file_format,
factory_options));

// Read dataset into table
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

// Verify the data was read correctly
ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());
// Validate the table
ASSERT_OK(combined_table->ValidateFull());
AssertTablesEqual(*combined_table, *table_);
}

// Read a single parquet file with and without decryption properties.
TEST_F(DatasetEncryptionTest, ReadSingleFile) {
// Open the Parquet file.
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile("part=a/part0.parquet"));

// Try to read metadata without providing decryption properties
// when the footer is encrypted.
ASSERT_THROW(parquet::ReadMetaData(input), parquet::ParquetException);

// Create the ReaderProperties object using the FileDecryptionProperties object
auto decryption_config =
std::make_shared<parquet::encryption::DecryptionConfiguration>();
auto file_decryption_properties = crypto_factory_->GetFileDecryptionProperties(
*kms_connection_config_, *decryption_config);
auto reader_properties = parquet::default_reader_properties();
reader_properties.file_decryption_properties(file_decryption_properties);

// Read entire file as a single Arrow table
parquet::arrow::FileReaderBuilder reader_builder;
ASSERT_OK(reader_builder.Open(input, reader_properties));
ASSERT_OK_AND_ASSIGN(auto arrow_reader, reader_builder.Build());
std::shared_ptr<Table> table;
ASSERT_OK(arrow_reader->ReadTable(&table));

// Check the contents of the table
ASSERT_EQ(table->num_rows(), 2);
ASSERT_EQ(table->num_columns(), 3);
ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(0)->chunk(0))->GetView(0), 0);
ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(1)->chunk(0))->GetView(0), 9);
ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(2)->chunk(0))->GetView(0), 1);
}

} // namespace dataset
} // namespace arrow
Loading