From 11dbbbeba22e3b3b498c5073a8c6d93f057e99e1 Mon Sep 17 00:00:00 2001 From: Donald Tolley Date: Wed, 11 Oct 2023 07:06:58 -0400 Subject: [PATCH] GH-29238 [C++][Dataset][Parquet] Support parquet modular encryption in the new Dataset API (#34616) ### Rationale for this change The purpose of this pull request is to support modular encryption in the new Dataset API. See the [supporting document](https://docs.google.com/document/d/13EysCNC6-Nu9wnJ8YpdzmD-aMLn4i2KXUJTNqIihy7A/edit#). ### What changes are included in this PR? I made improvements to the C++ and Python code to enable the Dataset API to have per-file settings for each file saved. Previously, the Dataset API applied the same encryption properties to all saved files, but now I've updated the code to allow for greater flexibility. In the Python code, I've added support for the changes by updating the ParquetFileFormat class to accept ParquetEncryptionConfig and ParquetDecryptionConfig structures. With these changes, you can pass the format object to the write_dataset function, giving you the ability to set unique encryption properties for each file in your Dataset. ### Are these changes tested? Yes, unit tests are included. I have also included a Python sample project. ### Are there any user-facing changes? Yes, as stated above the ParquetFileFormat class has optional parameters for ParquetEncryptionConfig and ParquetDecryptionConfig through setters and getters. 
The Dataset now has the option using this to set different file encryption properties per file * Closes: #29238 Lead-authored-by: Don Co-authored-by: Donald Tolley Co-authored-by: Joris Van den Bossche Co-authored-by: anjakefala Co-authored-by: Sutou Kouhei Co-authored-by: Weston Pace Co-authored-by: Gang Wu Co-authored-by: scoder Co-authored-by: Will Jones Signed-off-by: Joris Van den Bossche --- cpp/src/arrow/dataset/CMakeLists.txt | 7 + cpp/src/arrow/dataset/file_parquet.cc | 75 +++++- cpp/src/arrow/dataset/file_parquet.h | 8 + .../dataset/file_parquet_encryption_test.cc | 216 ++++++++++++++++++ cpp/src/arrow/dataset/file_parquet_test.cc | 27 +++ .../arrow/dataset/parquet_encryption_config.h | 75 ++++++ cpp/src/arrow/util/config.h.cmake | 1 + cpp/src/parquet/properties.h | 46 +++- python/CMakeLists.txt | 6 + .../dataset/write_dataset_encrypted.py | 93 ++++++++ python/pyarrow/_dataset_parquet.pxd | 42 ++++ python/pyarrow/_dataset_parquet.pyx | 63 ++++- .../pyarrow/_dataset_parquet_encryption.pyx | 170 ++++++++++++++ python/pyarrow/_parquet_encryption.pxd | 119 ++-------- python/pyarrow/_parquet_encryption.pyx | 42 ++-- python/pyarrow/dataset.py | 9 + .../includes/libarrow_dataset_parquet.pxd | 16 ++ .../includes/libparquet_encryption.pxd | 130 +++++++++++ .../pyarrow/tests/test_dataset_encryption.py | 148 ++++++++++++ 19 files changed, 1152 insertions(+), 141 deletions(-) create mode 100644 cpp/src/arrow/dataset/file_parquet_encryption_test.cc create mode 100644 cpp/src/arrow/dataset/parquet_encryption_config.h create mode 100644 python/examples/dataset/write_dataset_encrypted.py create mode 100644 python/pyarrow/_dataset_parquet.pxd create mode 100644 python/pyarrow/_dataset_parquet_encryption.pyx create mode 100644 python/pyarrow/includes/libparquet_encryption.pxd create mode 100644 python/pyarrow/tests/test_dataset_encryption.py diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt index 81cc97781316a..eb8fb54803aa9 100644 
--- a/cpp/src/arrow/dataset/CMakeLists.txt +++ b/cpp/src/arrow/dataset/CMakeLists.txt @@ -175,6 +175,13 @@ endif() if(ARROW_PARQUET) add_arrow_dataset_test(file_parquet_test) + if(PARQUET_REQUIRE_ENCRYPTION AND ARROW_DATASET) + add_arrow_dataset_test(file_parquet_encryption_test + SOURCES + file_parquet_encryption_test.cc + ${PROJECT_SOURCE_DIR}/src/parquet/encryption/test_in_memory_kms.cc + ) + endif() endif() if(ARROW_BUILD_BENCHMARKS) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 9c187f12f67cf..d486f194f38a3 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -26,6 +26,7 @@ #include "arrow/compute/exec.h" #include "arrow/dataset/dataset_internal.h" +#include "arrow/dataset/parquet_encryption_config.h" #include "arrow/dataset/scanner.h" #include "arrow/filesystem/path_util.h" #include "arrow/table.h" @@ -38,6 +39,9 @@ #include "parquet/arrow/reader.h" #include "parquet/arrow/schema.h" #include "parquet/arrow/writer.h" +#include "parquet/encryption/crypto_factory.h" +#include "parquet/encryption/encryption.h" +#include "parquet/encryption/kms_client.h" #include "parquet/file_reader.h" #include "parquet/properties.h" #include "parquet/statistics.h" @@ -58,6 +62,7 @@ namespace { parquet::ReaderProperties MakeReaderProperties( const ParquetFileFormat& format, ParquetFragmentScanOptions* parquet_scan_options, + const std::string& path = "", std::shared_ptr filesystem = nullptr, MemoryPool* pool = default_memory_pool()) { // Can't mutate pool after construction parquet::ReaderProperties properties(pool); @@ -67,8 +72,28 @@ parquet::ReaderProperties MakeReaderProperties( properties.disable_buffered_stream(); } properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size()); + +#ifdef PARQUET_REQUIRE_ENCRYPTION + auto parquet_decrypt_config = parquet_scan_options->parquet_decryption_config; + + if (parquet_decrypt_config != nullptr) { + auto 
file_decryption_prop = + parquet_decrypt_config->crypto_factory->GetFileDecryptionProperties( + *parquet_decrypt_config->kms_connection_config, + *parquet_decrypt_config->decryption_config, path, filesystem); + + parquet_scan_options->reader_properties->file_decryption_properties( + std::move(file_decryption_prop)); + } +#else + if (parquet_scan_options->parquet_decryption_config != nullptr) { + parquet::ParquetException::NYI("Encryption is not supported in this build."); + } +#endif + properties.file_decryption_properties( parquet_scan_options->reader_properties->file_decryption_properties()); + properties.set_thrift_string_size_limit( parquet_scan_options->reader_properties->thrift_string_size_limit()); properties.set_thrift_container_size_limit( @@ -438,7 +463,7 @@ Result> ParquetFileFormat::GetReader GetFragmentScanOptions(kParquetTypeName, options.get(), default_fragment_scan_options)); auto properties = - MakeReaderProperties(*this, parquet_scan_options.get(), options->pool); + MakeReaderProperties(*this, parquet_scan_options.get(), "", nullptr, options->pool); ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); // `parquet::ParquetFileReader::Open` will not wrap the exception as status, // so using `open_parquet_file` to wrap it. 
@@ -477,9 +502,13 @@ Future> ParquetFileFormat::GetReader auto parquet_scan_options, GetFragmentScanOptions(kParquetTypeName, options.get(), default_fragment_scan_options)); - auto properties = - MakeReaderProperties(*this, parquet_scan_options.get(), options->pool); - + auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), source.path(), + source.filesystem(), options->pool); + ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); + // TODO(ARROW-12259): workaround since we have Future<(move-only type)> + auto reader_fut = parquet::ParquetFileReader::OpenAsync( + std::move(input), std::move(properties), metadata); + auto path = source.path(); auto self = checked_pointer_cast(shared_from_this()); return source.OpenAsync().Then( @@ -666,10 +695,40 @@ Result> ParquetFileFormat::MakeWriter( auto parquet_options = checked_pointer_cast(options); std::unique_ptr parquet_writer; - ARROW_ASSIGN_OR_RAISE(parquet_writer, parquet::arrow::FileWriter::Open( - *schema, default_memory_pool(), destination, - parquet_options->writer_properties, - parquet_options->arrow_writer_properties)); + +#ifdef PARQUET_REQUIRE_ENCRYPTION + auto parquet_encrypt_config = parquet_options->parquet_encryption_config; + + if (parquet_encrypt_config != nullptr) { + auto file_encryption_prop = + parquet_encrypt_config->crypto_factory->GetFileEncryptionProperties( + *parquet_encrypt_config->kms_connection_config, + *parquet_encrypt_config->encryption_config, destination_locator.path, + destination_locator.filesystem); + + auto writer_properties = + parquet::WriterProperties::Builder(*parquet_options->writer_properties) + .encryption(std::move(file_encryption_prop)) + ->build(); + + ARROW_ASSIGN_OR_RAISE( + parquet_writer, parquet::arrow::FileWriter::Open( + *schema, writer_properties->memory_pool(), destination, + writer_properties, parquet_options->arrow_writer_properties)); + } +#else + if (parquet_options->parquet_encryption_config != nullptr) { + return 
Status::NotImplemented("Encryption is not supported in this build."); + } +#endif + + if (parquet_writer == nullptr) { + ARROW_ASSIGN_OR_RAISE(parquet_writer, + parquet::arrow::FileWriter::Open( + *schema, parquet_options->writer_properties->memory_pool(), + destination, parquet_options->writer_properties, + parquet_options->arrow_writer_properties)); + } return std::shared_ptr( new ParquetFileWriter(std::move(destination), std::move(parquet_writer), diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index f33190bd93347..5132a805bb4d6 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -57,6 +57,9 @@ struct SchemaManifest; namespace arrow { namespace dataset { +struct ParquetDecryptionConfig; +struct ParquetEncryptionConfig; + /// \addtogroup dataset-file-formats /// /// @{ @@ -226,6 +229,8 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions { /// ScanOptions. Additionally, dictionary columns come from /// ParquetFileFormat::ReaderOptions::dict_columns. std::shared_ptr arrow_reader_properties; + /// A configuration structure that provides decryption properties for a dataset + std::shared_ptr parquet_decryption_config = NULLPTR; }; class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions { @@ -236,6 +241,9 @@ class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions { /// \brief Parquet Arrow writer properties. 
std::shared_ptr arrow_writer_properties; + // A configuration structure that provides encryption properties for a dataset + std::shared_ptr parquet_encryption_config = NULLPTR; + protected: explicit ParquetFileWriteOptions(std::shared_ptr format) : FileWriteOptions(std::move(format)) {} diff --git a/cpp/src/arrow/dataset/file_parquet_encryption_test.cc b/cpp/src/arrow/dataset/file_parquet_encryption_test.cc new file mode 100644 index 0000000000000..572f15e100dcb --- /dev/null +++ b/cpp/src/arrow/dataset/file_parquet_encryption_test.cc @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include "gtest/gtest.h" + +#include +#include +#include +#include "arrow/array.h" +#include "arrow/dataset/parquet_encryption_config.h" +#include "arrow/dataset/partition.h" +#include "arrow/filesystem/mockfs.h" +#include "arrow/io/api.h" +#include "arrow/status.h" +#include "arrow/table.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" +#include "parquet/arrow/reader.h" +#include "parquet/encryption/crypto_factory.h" +#include "parquet/encryption/encryption.h" +#include "parquet/encryption/kms_client.h" +#include "parquet/encryption/test_in_memory_kms.h" + +constexpr std::string_view kFooterKeyMasterKey = "0123456789012345"; +constexpr std::string_view kFooterKeyMasterKeyId = "footer_key"; +constexpr std::string_view kFooterKeyName = "footer_key"; +constexpr std::string_view kColumnMasterKey = "1234567890123450"; +constexpr std::string_view kColumnMasterKeyId = "col_key"; +constexpr std::string_view kColumnKeyMapping = "col_key: a"; +constexpr std::string_view kBaseDir = ""; + +using arrow::internal::checked_pointer_cast; + +namespace arrow { +namespace dataset { + +class DatasetEncryptionTest : public ::testing::Test { + public: + // This function creates a mock file system using the current time point, creates a + // directory with the given base directory path, and writes a dataset to it using + // provided Parquet file write options. The dataset is partitioned using a Hive + // partitioning scheme. The function also checks if the written files exist in the file + // system. + static void SetUpTestSuite() { + // Creates a mock file system using the current time point. + EXPECT_OK_AND_ASSIGN(file_system_, fs::internal::MockFileSystem::Make( + std::chrono::system_clock::now(), {})); + ASSERT_OK(file_system_->CreateDir(std::string(kBaseDir))); + + // Prepare table data. 
+ auto table_schema = schema({field("a", int64()), field("c", int64()), + field("e", int64()), field("part", utf8())}); + table_ = TableFromJSON(table_schema, {R"([ + [ 0, 9, 1, "a" ], + [ 1, 8, 2, "a" ], + [ 2, 7, 1, "c" ], + [ 3, 6, 2, "c" ], + [ 4, 5, 1, "e" ], + [ 5, 4, 2, "e" ], + [ 6, 3, 1, "g" ], + [ 7, 2, 2, "g" ], + [ 8, 1, 1, "i" ], + [ 9, 0, 2, "i" ] + ])"}); + + // Use a Hive-style partitioning scheme. + partitioning_ = std::make_shared(schema({field("part", utf8())})); + + // Prepare encryption properties. + std::unordered_map key_map; + key_map.emplace(kColumnMasterKeyId, kColumnMasterKey); + key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey); + + crypto_factory_ = std::make_shared(); + auto kms_client_factory = + std::make_shared( + /*wrap_locally=*/true, key_map); + crypto_factory_->RegisterKmsClientFactory(std::move(kms_client_factory)); + kms_connection_config_ = std::make_shared(); + + // Set write options with encryption configuration. + auto encryption_config = + std::make_shared( + std::string(kFooterKeyName)); + encryption_config->column_keys = kColumnKeyMapping; + auto parquet_encryption_config = std::make_shared(); + // Directly assign shared_ptr objects to ParquetEncryptionConfig members + parquet_encryption_config->crypto_factory = crypto_factory_; + parquet_encryption_config->kms_connection_config = kms_connection_config_; + parquet_encryption_config->encryption_config = std::move(encryption_config); + + auto file_format = std::make_shared(); + auto parquet_file_write_options = + checked_pointer_cast(file_format->DefaultWriteOptions()); + parquet_file_write_options->parquet_encryption_config = + std::move(parquet_encryption_config); + + // Write dataset. 
+ auto dataset = std::make_shared(table_); + EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan()); + EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + + FileSystemDatasetWriteOptions write_options; + write_options.file_write_options = parquet_file_write_options; + write_options.filesystem = file_system_; + write_options.base_dir = kBaseDir; + write_options.partitioning = partitioning_; + write_options.basename_template = "part{i}.parquet"; + ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner))); + } + + protected: + inline static std::shared_ptr file_system_; + inline static std::shared_ptr table_; + inline static std::shared_ptr partitioning_; + inline static std::shared_ptr crypto_factory_; + inline static std::shared_ptr + kms_connection_config_; +}; + +// This test demonstrates the process of writing a partitioned Parquet file with the same +// encryption properties applied to each file within the dataset. The encryption +// properties are determined based on the selected columns. After writing the dataset, the +// test reads the data back and verifies that it can be successfully decrypted and +// scanned. +TEST_F(DatasetEncryptionTest, WriteReadDatasetWithEncryption) { + // Create decryption properties. + auto decryption_config = + std::make_shared(); + auto parquet_decryption_config = std::make_shared(); + parquet_decryption_config->crypto_factory = crypto_factory_; + parquet_decryption_config->kms_connection_config = kms_connection_config_; + parquet_decryption_config->decryption_config = std::move(decryption_config); + + // Set scan options. 
+ auto parquet_scan_options = std::make_shared(); + parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config); + + auto file_format = std::make_shared(); + file_format->default_fragment_scan_options = std::move(parquet_scan_options); + + // Get FileInfo objects for all files under the base directory + fs::FileSelector selector; + selector.base_dir = kBaseDir; + selector.recursive = true; + + FileSystemFactoryOptions factory_options; + factory_options.partitioning = partitioning_; + factory_options.partition_base_dir = kBaseDir; + ASSERT_OK_AND_ASSIGN(auto dataset_factory, + FileSystemDatasetFactory::Make(file_system_, selector, file_format, + factory_options)); + + // Read dataset into table + ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish()); + ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan()); + ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable()); + + // Verify the data was read correctly + ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks()); + // Validate the table + ASSERT_OK(combined_table->ValidateFull()); + AssertTablesEqual(*combined_table, *table_); +} + +// Read a single parquet file with and without decryption properties. +TEST_F(DatasetEncryptionTest, ReadSingleFile) { + // Open the Parquet file. + ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile("part=a/part0.parquet")); + + // Try to read metadata without providing decryption properties + // when the footer is encrypted. 
+ ASSERT_THROW(parquet::ReadMetaData(input), parquet::ParquetException); + + // Create the ReaderProperties object using the FileDecryptionProperties object + auto decryption_config = + std::make_shared(); + auto file_decryption_properties = crypto_factory_->GetFileDecryptionProperties( + *kms_connection_config_, *decryption_config); + auto reader_properties = parquet::default_reader_properties(); + reader_properties.file_decryption_properties(file_decryption_properties); + + // Read entire file as a single Arrow table + parquet::arrow::FileReaderBuilder reader_builder; + ASSERT_OK(reader_builder.Open(input, reader_properties)); + ASSERT_OK_AND_ASSIGN(auto arrow_reader, reader_builder.Build()); + std::shared_ptr
table; + ASSERT_OK(arrow_reader->ReadTable(&table)); + + // Check the contents of the table + ASSERT_EQ(table->num_rows(), 2); + ASSERT_EQ(table->num_columns(), 3); + ASSERT_EQ(checked_pointer_cast(table->column(0)->chunk(0))->GetView(0), 0); + ASSERT_EQ(checked_pointer_cast(table->column(1)->chunk(0))->GetView(0), 9); + ASSERT_EQ(checked_pointer_cast(table->column(2)->chunk(0))->GetView(0), 1); +} + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index dc9e085df3c4c..c22cf33eb35f7 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -24,6 +24,7 @@ #include "arrow/compute/api_scalar.h" #include "arrow/dataset/dataset_internal.h" +#include "arrow/dataset/parquet_encryption_config.h" #include "arrow/dataset/test_util_internal.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" @@ -424,6 +425,32 @@ TEST_F(TestParquetFileSystemDataset, WriteWithEmptyPartitioningSchema) { TestWriteWithEmptyPartitioningSchema(); } +TEST_F(TestParquetFileSystemDataset, WriteWithEncryptionConfigNotSupported) { +#ifndef PARQUET_REQUIRE_ENCRYPTION + // Create a dummy ParquetEncryptionConfig + std::shared_ptr encryption_config = + std::make_shared(); + + auto options = + checked_pointer_cast(format_->DefaultWriteOptions()); + + // Set the encryption config in the options + options->parquet_encryption_config = encryption_config; + + // Setup mock filesystem and test data + auto mock_fs = std::make_shared(fs::kNoTime); + std::shared_ptr test_schema = schema({field("x", int32())}); + std::shared_ptr batch = RecordBatchFromJSON(test_schema, "[[0]]"); + ASSERT_OK_AND_ASSIGN(std::shared_ptr out_stream, + mock_fs->OpenOutputStream("/foo.parquet")); + // Try to create a writer with the encryption config + auto result = + format_->MakeWriter(out_stream, test_schema, options, {mock_fs, "/foo.parquet"}); + // Expect an error if encryption 
is not supported in the build + EXPECT_TRUE(result.status().IsNotImplemented()); +#endif +} + class TestParquetFileFormatScan : public FileFormatScanMixin { public: std::shared_ptr SingleBatch(std::shared_ptr fragment) { diff --git a/cpp/src/arrow/dataset/parquet_encryption_config.h b/cpp/src/arrow/dataset/parquet_encryption_config.h new file mode 100644 index 0000000000000..96200b8a3118b --- /dev/null +++ b/cpp/src/arrow/dataset/parquet_encryption_config.h @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/dataset/type_fwd.h" + +namespace parquet::encryption { +class CryptoFactory; +struct KmsConnectionConfig; +struct EncryptionConfiguration; +struct DecryptionConfiguration; +} // namespace parquet::encryption + +namespace arrow { +namespace dataset { + +/// \brief Core configuration class encapsulating parameters for high-level encryption +/// within Parquet framework. +/// +/// ParquetEncryptionConfig serves as a bridge, passing encryption-related +/// parameters to appropriate components within the Parquet library. 
It holds references +/// to objects defining encryption strategy, Key Management Service (KMS) configuration, +/// and specific encryption configurations for Parquet data. +struct ARROW_DS_EXPORT ParquetEncryptionConfig { + /// Shared pointer to CryptoFactory object, responsible for creating cryptographic + /// components like encryptors and decryptors. + std::shared_ptr crypto_factory; + + /// Shared pointer to KmsConnectionConfig object, holding configuration parameters for + /// connecting to a Key Management Service (KMS). + std::shared_ptr kms_connection_config; + + /// Shared pointer to EncryptionConfiguration object, defining specific encryption + /// settings for Parquet data, like keys for different columns. + std::shared_ptr encryption_config; +}; + +/// \brief Core configuration class encapsulating parameters for high-level decryption +/// within Parquet framework. +/// +/// ParquetDecryptionConfig is designed to pass decryption-related parameters to +/// appropriate decryption components within Parquet library. It holds references to +/// objects defining decryption strategy, Key Management Service (KMS) configuration, +/// and specific decryption configurations for reading encrypted Parquet data. +struct ARROW_DS_EXPORT ParquetDecryptionConfig { + /// Shared pointer to CryptoFactory object, pivotal in creating cryptographic + /// components for decryption process. + std::shared_ptr crypto_factory; + + /// Shared pointer to KmsConnectionConfig object, containing parameters for connecting + /// to a Key Management Service (KMS) during decryption. + std::shared_ptr kms_connection_config; + + /// Shared pointer to DecryptionConfiguration object, specifying decryption settings + /// for reading encrypted Parquet data. 
+ std::shared_ptr decryption_config; +}; + +} // namespace dataset +} // namespace arrow diff --git a/cpp/src/arrow/util/config.h.cmake b/cpp/src/arrow/util/config.h.cmake index f7125cfd8a235..6c8c31ffb856f 100644 --- a/cpp/src/arrow/util/config.h.cmake +++ b/cpp/src/arrow/util/config.h.cmake @@ -58,3 +58,4 @@ #cmakedefine ARROW_WITH_MUSL #cmakedefine ARROW_WITH_OPENTELEMETRY #cmakedefine ARROW_WITH_UCX +#cmakedefine PARQUET_REQUIRE_ENCRYPTION diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index c15ada0a8060f..6d3d5aa4f44b4 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -232,6 +232,21 @@ class PARQUET_EXPORT WriterProperties { created_by_(DEFAULT_CREATED_BY), store_decimal_as_integer_(false), page_checksum_enabled_(false) {} + + explicit Builder(const WriterProperties& properties) + : pool_(properties.memory_pool()), + dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()), + write_batch_size_(properties.write_batch_size()), + max_row_group_length_(properties.max_row_group_length()), + pagesize_(properties.data_pagesize()), + version_(properties.version()), + data_page_version_(properties.data_page_version()), + created_by_(properties.created_by()), + store_decimal_as_integer_(properties.store_decimal_as_integer()), + page_checksum_enabled_(properties.page_checksum_enabled()), + sorting_columns_(properties.sorting_columns()), + default_column_properties_(properties.default_column_properties()) {} + virtual ~Builder() {} /// Specify the memory pool for the writer. Default default_memory_pool. @@ -240,36 +255,42 @@ class PARQUET_EXPORT WriterProperties { return this; } - /// Enable dictionary encoding in general for all columns. Default enabled. + /// Enable dictionary encoding in general for all columns. Default + /// enabled. Builder* enable_dictionary() { default_column_properties_.set_dictionary_enabled(true); return this; } - /// Disable dictionary encoding in general for all columns. 
Default enabled. + /// Disable dictionary encoding in general for all columns. Default + /// enabled. Builder* disable_dictionary() { default_column_properties_.set_dictionary_enabled(false); return this; } - /// Enable dictionary encoding for column specified by `path`. Default enabled. + /// Enable dictionary encoding for column specified by `path`. Default + /// enabled. Builder* enable_dictionary(const std::string& path) { dictionary_enabled_[path] = true; return this; } - /// Enable dictionary encoding for column specified by `path`. Default enabled. + /// Enable dictionary encoding for column specified by `path`. Default + /// enabled. Builder* enable_dictionary(const std::shared_ptr& path) { return this->enable_dictionary(path->ToDotString()); } - /// Disable dictionary encoding for column specified by `path`. Default enabled. + /// Disable dictionary encoding for column specified by `path`. Default + /// enabled. Builder* disable_dictionary(const std::string& path) { dictionary_enabled_[path] = false; return this; } - /// Disable dictionary encoding for column specified by `path`. Default enabled. + /// Disable dictionary encoding for column specified by `path`. Default + /// enabled. Builder* disable_dictionary(const std::shared_ptr& path) { return this->disable_dictionary(path->ToDotString()); } @@ -280,8 +301,8 @@ class PARQUET_EXPORT WriterProperties { return this; } - /// Specify the write batch size while writing batches of Arrow values into Parquet. - /// Default 1024. + /// Specify the write batch size while writing batches of Arrow values + /// into Parquet. Default 1024. Builder* write_batch_size(int64_t write_batch_size) { write_batch_size_ = write_batch_size; return this; @@ -560,8 +581,8 @@ class PARQUET_EXPORT WriterProperties { return this; } - /// Disable decimal logical type with 1 <= precision <= 18 to be stored as - /// integer physical type. 
+ /// Disable decimal logical type with 1 <= precision <= 18 to be stored + /// as integer physical type. /// /// Default disabled. Builder* disable_store_decimal_as_integer() { @@ -775,6 +796,11 @@ class PARQUET_EXPORT WriterProperties { } } + // \brief Return the default column properties + const ColumnProperties& default_column_properties() const { + return default_column_properties_; + } + private: explicit WriterProperties( MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size, diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 29f8d2da72f3a..2a430055a5a86 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -340,9 +340,12 @@ if(PYARROW_BUILD_PARQUET_ENCRYPTION) else() list(APPEND PYARROW_CPP_LINK_LIBS Parquet::parquet_static) endif() + message(STATUS "Parquet Encryption Enabled") else() message(FATAL_ERROR "You must build Arrow C++ with PARQUET_REQUIRE_ENCRYPTION=ON") endif() +else() + message(STATUS "Parquet Encryption is NOT Enabled") endif() if(PYARROW_BUILD_HDFS) @@ -627,6 +630,9 @@ if(PYARROW_BUILD_PARQUET) endif() if(PYARROW_BUILD_DATASET) list(APPEND CYTHON_EXTENSIONS _dataset_parquet) + if(PYARROW_BUILD_PARQUET_ENCRYPTION) + list(APPEND CYTHON_EXTENSIONS _dataset_parquet_encryption) + endif() endif() endif() diff --git a/python/examples/dataset/write_dataset_encrypted.py b/python/examples/dataset/write_dataset_encrypted.py new file mode 100644 index 0000000000000..910559939e65e --- /dev/null +++ b/python/examples/dataset/write_dataset_encrypted.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import shutil +import os +from datetime import timedelta + +import pyarrow as pa +import pyarrow.dataset as ds +import pyarrow.parquet.encryption as pe +from pyarrow.tests.parquet.encryption import InMemoryKmsClient + +""" A sample to demonstrate parquet dataset encryption and decryption""" + +# create a list of dictionaries that will represent our dataset +table = pa.table({'year': [2020, 2022, 2021, 2022, 2019, 2021], + 'n_legs': [2, 2, 4, 4, 5, 100], + 'animal': ["Flamingo", "Parrot", "Dog", "Horse", + "Brittle stars", "Centipede"]}) + +# create a PyArrow dataset from the table +dataset = ds.dataset(table) + +FOOTER_KEY = b"0123456789112345" +FOOTER_KEY_NAME = "footer_key" +COL_KEY = b"1234567890123450" +COL_KEY_NAME = "col_key" + +encryption_config = pe.EncryptionConfiguration( + footer_key=FOOTER_KEY_NAME, + plaintext_footer=False, + # Use COL_KEY_NAME to encrypt `n_legs` and `animal` columns. 
+ column_keys={ + COL_KEY_NAME: ["n_legs", "animal"], + }, + encryption_algorithm="AES_GCM_V1", + # requires timedelta or an assertion is raised + cache_lifetime=timedelta(minutes=5.0), + data_key_length_bits=256) + +kms_connection_config = pe.KmsConnectionConfig( + custom_kms_conf={ + FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"), + COL_KEY_NAME: COL_KEY.decode("UTF-8"), + } +) + +decryption_config = pe.DecryptionConfiguration(cache_lifetime=300) + + +def kms_factory(kms_connection_configuration): + return InMemoryKmsClient(kms_connection_configuration) + + +crypto_factory = pe.CryptoFactory(kms_factory) +parquet_encryption_cfg = ds.ParquetEncryptionConfig( + crypto_factory, kms_connection_config, encryption_config) +parquet_decryption_cfg = ds.ParquetDecryptionConfig(crypto_factory, + kms_connection_config, + decryption_config) + +# set encryption config for parquet fragment scan options +pq_scan_opts = ds.ParquetFragmentScanOptions() +pq_scan_opts.parquet_decryption_config = parquet_decryption_cfg +pformat = pa.dataset.ParquetFileFormat(default_fragment_scan_options=pq_scan_opts) + +if os.path.exists('sample_dataset'): + shutil.rmtree('sample_dataset') + +write_options = pformat.make_write_options( + encryption_config=parquet_encryption_cfg) + +ds.write_dataset(data=dataset, base_dir="sample_dataset", + partitioning=['year'], format=pformat, file_options=write_options) +# read the dataset back +dataset = ds.dataset('sample_dataset', format=pformat) + +# print the dataset +print(dataset.to_table()) diff --git a/python/pyarrow/_dataset_parquet.pxd b/python/pyarrow/_dataset_parquet.pxd new file mode 100644 index 0000000000000..d5bc172d324d5 --- /dev/null +++ b/python/pyarrow/_dataset_parquet.pxd @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: language_level = 3 + +"""Dataset support for Parquet file format.""" + +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.includes.libarrow_dataset_parquet cimport * + +from pyarrow._dataset cimport FragmentScanOptions, FileWriteOptions + + +cdef class ParquetFragmentScanOptions(FragmentScanOptions): + cdef: + CParquetFragmentScanOptions* parquet_options + object _parquet_decryption_config + + cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp) + cdef CReaderProperties* reader_properties(self) + cdef ArrowReaderProperties* arrow_reader_properties(self) + + +cdef class ParquetFileWriteOptions(FileWriteOptions): + + cdef: + CParquetFileWriteOptions* parquet_options + object _properties diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx index 9d85142564011..31aa058706a87 100644 --- a/python/pyarrow/_dataset_parquet.pyx +++ b/python/pyarrow/_dataset_parquet.pyx @@ -17,7 +17,7 @@ # cython: language_level = 3 -"""Dataset support for Parquest file format.""" +"""Dataset support for Parquet file format.""" from cython cimport binding from cython.operator cimport dereference as deref @@ -47,16 +47,23 @@ from pyarrow._dataset cimport ( WrittenFile ) - from pyarrow._parquet cimport ( _create_writer_properties, _create_arrow_writer_properties, FileMetaData, ) -cdef Expression _true = Expression._scalar(True) +try: + from 
pyarrow._dataset_parquet_encryption import ( + set_encryption_config, set_decryption_config + ) + parquet_encryption_enabled = True +except ImportError: + parquet_encryption_enabled = False +cdef Expression _true = Expression._scalar(True) + ctypedef CParquetFileWriter* _CParquetFileWriterPtr @@ -78,7 +85,8 @@ cdef class ParquetFileFormat(FileFormat): CParquetFileFormat* parquet_format def __init__(self, read_options=None, - default_fragment_scan_options=None, **kwargs): + default_fragment_scan_options=None, + **kwargs): cdef: shared_ptr[CParquetFileFormat] wrapped CParquetFileFormatReaderOptions* options @@ -130,6 +138,7 @@ cdef class ParquetFileFormat(FileFormat): 'ParquetFragmentScanOptions') wrapped = make_shared[CParquetFileFormat]() + options = &(wrapped.get().reader_options) if read_options.dictionary_columns is not None: for column in read_options.dictionary_columns: @@ -550,10 +559,6 @@ cdef class ParquetReadOptions(_Weakrefable): cdef class ParquetFileWriteOptions(FileWriteOptions): - cdef: - CParquetFileWriteOptions* parquet_options - object _properties - def update(self, **kwargs): """ Parameters @@ -574,6 +579,8 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): self._properties[name] = value if name in arrow_fields: setters.add(self._set_arrow_properties) + elif name == "encryption_config" and value is not None: + setters.add(self._set_encryption_config) else: setters.add(self._set_properties) @@ -618,6 +625,14 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): ) ) + def _set_encryption_config(self): + if not parquet_encryption_enabled: + raise NotImplementedError( + "Encryption is not enabled in your installation of pyarrow, but an " + "encryption_config was provided." 
+ ) + set_encryption_config(self, self._properties["encryption_config"]) + cdef void init(self, const shared_ptr[CFileWriteOptions]& sp): FileWriteOptions.init(self, sp) self.parquet_options = sp.get() @@ -639,7 +654,9 @@ cdef class ParquetFileWriteOptions(FileWriteOptions): write_batch_size=None, dictionary_pagesize_limit=None, write_page_index=False, + encryption_config=None, ) + self._set_properties() self._set_arrow_properties() @@ -681,11 +698,11 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): If not None, override the maximum total size of containers allocated when decoding Thrift structures. The default limit should be sufficient for most Parquet files. + decryption_config : pyarrow.dataset.ParquetDecryptionConfig, default None + If not None, use the provided ParquetDecryptionConfig to decrypt the + Parquet file. """ - cdef: - CParquetFragmentScanOptions* parquet_options - # Avoid mistakingly creating attributes __slots__ = () @@ -693,7 +710,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): buffer_size=8192, bint pre_buffer=True, thrift_string_size_limit=None, - thrift_container_size_limit=None): + thrift_container_size_limit=None, + decryption_config=None): self.init(shared_ptr[CFragmentScanOptions]( new CParquetFragmentScanOptions())) self.use_buffered_stream = use_buffered_stream @@ -703,6 +721,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): self.thrift_string_size_limit = thrift_string_size_limit if thrift_container_size_limit is not None: self.thrift_container_size_limit = thrift_container_size_limit + if decryption_config is not None: + self.parquet_decryption_config = decryption_config cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) @@ -763,6 +783,25 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions): raise ValueError("size must be larger than zero") self.reader_properties().set_thrift_container_size_limit(size) + @property + def 
parquet_decryption_config(self): + if not parquet_encryption_enabled: + raise NotImplementedError( + "Unable to access encryption features. " + "Encryption is not enabled in your installation of pyarrow." + ) + return self._parquet_decryption_config + + @parquet_decryption_config.setter + def parquet_decryption_config(self, config): + if not parquet_encryption_enabled: + raise NotImplementedError( + "Encryption is not enabled in your installation of pyarrow, but a " + "decryption_config was provided." + ) + set_decryption_config(self, config) + self._parquet_decryption_config = config + def equals(self, ParquetFragmentScanOptions other): """ Parameters diff --git a/python/pyarrow/_dataset_parquet_encryption.pyx b/python/pyarrow/_dataset_parquet_encryption.pyx new file mode 100644 index 0000000000000..11a7174eb3c9d --- /dev/null +++ b/python/pyarrow/_dataset_parquet_encryption.pyx @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# cython: language_level = 3 + +"""Dataset support for Parquet encryption.""" + +from pyarrow.includes.libarrow_dataset_parquet cimport * +from pyarrow._parquet_encryption cimport * +from pyarrow._dataset_parquet cimport ParquetFragmentScanOptions, ParquetFileWriteOptions + + +cdef class ParquetEncryptionConfig(_Weakrefable): + """ + Core configuration class encapsulating parameters for high-level encryption + within the Parquet framework. + + The ParquetEncryptionConfig class serves as a bridge for passing encryption-related + parameters to the appropriate components within the Parquet library. It maintains references + to objects that define the encryption strategy, Key Management Service (KMS) configuration, + and specific encryption configurations for Parquet data. + + Parameters + ---------- + crypto_factory : pyarrow.parquet.encryption.CryptoFactory + Shared pointer to a `CryptoFactory` object. The `CryptoFactory` is responsible for + creating cryptographic components, such as encryptors and decryptors. + kms_connection_config : pyarrow.parquet.encryption.KmsConnectionConfig + Shared pointer to a `KmsConnectionConfig` object. This object holds the configuration + parameters necessary for connecting to a Key Management Service (KMS). + encryption_config : pyarrow.parquet.encryption.EncryptionConfiguration + Shared pointer to an `EncryptionConfiguration` object. This object defines specific + encryption settings for Parquet data, including the keys assigned to different columns. + + Raises + ------ + ValueError + Raised if `encryption_config` is None. 
+ """ + cdef: + shared_ptr[CParquetEncryptionConfig] c_config + + # Avoid mistakenly creating attributes + __slots__ = () + + def __cinit__(self, CryptoFactory crypto_factory, KmsConnectionConfig kms_connection_config, + EncryptionConfiguration encryption_config): + + cdef shared_ptr[CEncryptionConfiguration] c_encryption_config + + if crypto_factory is None: + raise ValueError("crypto_factory cannot be None") + + if kms_connection_config is None: + raise ValueError("kms_connection_config cannot be None") + + if encryption_config is None: + raise ValueError("encryption_config cannot be None") + + self.c_config.reset(new CParquetEncryptionConfig()) + + c_encryption_config = pyarrow_unwrap_encryptionconfig( + encryption_config) + + self.c_config.get().crypto_factory = pyarrow_unwrap_cryptofactory(crypto_factory) + self.c_config.get().kms_connection_config = pyarrow_unwrap_kmsconnectionconfig( + kms_connection_config) + self.c_config.get().encryption_config = c_encryption_config + + @staticmethod + cdef wrap(shared_ptr[CParquetEncryptionConfig] c_config): + cdef ParquetEncryptionConfig python_config = ParquetEncryptionConfig.__new__(ParquetEncryptionConfig) + python_config.c_config = c_config + return python_config + + cdef shared_ptr[CParquetEncryptionConfig] unwrap(self): + return self.c_config + + +cdef class ParquetDecryptionConfig(_Weakrefable): + """ + Core configuration class encapsulating parameters for high-level decryption + within the Parquet framework. + + ParquetDecryptionConfig is designed to pass decryption-related parameters to + the appropriate decryption components within the Parquet library. It holds references to + objects that define the decryption strategy, Key Management Service (KMS) configuration, + and specific decryption configurations for reading encrypted Parquet data. 
+ + Parameters + ---------- + crypto_factory : pyarrow.parquet.encryption.CryptoFactory + Shared pointer to a `CryptoFactory` object, pivotal in creating cryptographic + components for the decryption process. + kms_connection_config : pyarrow.parquet.encryption.KmsConnectionConfig + Shared pointer to a `KmsConnectionConfig` object, containing parameters necessary + for connecting to a Key Management Service (KMS) during decryption. + decryption_config : pyarrow.parquet.encryption.DecryptionConfiguration + Shared pointer to a `DecryptionConfiguration` object, specifying decryption settings + for reading encrypted Parquet data. + + Raises + ------ + ValueError + Raised if `decryption_config` is None. + """ + + cdef: + shared_ptr[CParquetDecryptionConfig] c_config + + # Avoid mistakingly creating attributes + __slots__ = () + + def __cinit__(self, CryptoFactory crypto_factory, KmsConnectionConfig kms_connection_config, + DecryptionConfiguration decryption_config): + + cdef shared_ptr[CDecryptionConfiguration] c_decryption_config + + if decryption_config is None: + raise ValueError( + "decryption_config cannot be None") + + self.c_config.reset(new CParquetDecryptionConfig()) + + c_decryption_config = pyarrow_unwrap_decryptionconfig( + decryption_config) + + self.c_config.get().crypto_factory = pyarrow_unwrap_cryptofactory(crypto_factory) + self.c_config.get().kms_connection_config = pyarrow_unwrap_kmsconnectionconfig( + kms_connection_config) + self.c_config.get().decryption_config = c_decryption_config + + @staticmethod + cdef wrap(shared_ptr[CParquetDecryptionConfig] c_config): + cdef ParquetDecryptionConfig python_config = ParquetDecryptionConfig.__new__(ParquetDecryptionConfig) + python_config.c_config = c_config + return python_config + + cdef shared_ptr[CParquetDecryptionConfig] unwrap(self): + return self.c_config + + +def set_encryption_config( + ParquetFileWriteOptions opts not None, + ParquetEncryptionConfig config not None +): + cdef 
shared_ptr[CParquetEncryptionConfig] c_config = config.unwrap() + opts.parquet_options.parquet_encryption_config = c_config + + +def set_decryption_config( + ParquetFragmentScanOptions opts not None, + ParquetDecryptionConfig config not None +): + cdef shared_ptr[CParquetDecryptionConfig] c_config = config.unwrap() + opts.parquet_options.parquet_decryption_config = c_config diff --git a/python/pyarrow/_parquet_encryption.pxd b/python/pyarrow/_parquet_encryption.pxd index a05312be533f2..d52669501a404 100644 --- a/python/pyarrow/_parquet_encryption.pxd +++ b/python/pyarrow/_parquet_encryption.pxd @@ -19,6 +19,7 @@ # cython: language_level = 3 from pyarrow.includes.common cimport * +from pyarrow.includes.libparquet_encryption cimport * from pyarrow._parquet cimport (ParquetCipher, CFileEncryptionProperties, CFileDecryptionProperties, @@ -26,108 +27,30 @@ from pyarrow._parquet cimport (ParquetCipher, FileDecryptionProperties, ParquetCipher_AES_GCM_V1, ParquetCipher_AES_GCM_CTR_V1) +from pyarrow.lib cimport _Weakrefable +cdef class CryptoFactory(_Weakrefable): + cdef shared_ptr[CPyCryptoFactory] factory + cdef init(self, callable_client_factory) + cdef inline shared_ptr[CPyCryptoFactory] unwrap(self) -cdef extern from "parquet/encryption/kms_client.h" \ - namespace "parquet::encryption" nogil: - cdef cppclass CKmsClient" parquet::encryption::KmsClient": - c_string WrapKey(const c_string& key_bytes, - const c_string& master_key_identifier) except + - c_string UnwrapKey(const c_string& wrapped_key, - const c_string& master_key_identifier) except + +cdef class EncryptionConfiguration(_Weakrefable): + cdef shared_ptr[CEncryptionConfiguration] configuration + cdef inline shared_ptr[CEncryptionConfiguration] unwrap(self) nogil - cdef cppclass CKeyAccessToken" parquet::encryption::KeyAccessToken": - CKeyAccessToken(const c_string value) - void Refresh(const c_string& new_value) - const c_string& value() const +cdef class DecryptionConfiguration(_Weakrefable): + cdef 
shared_ptr[CDecryptionConfiguration] configuration + cdef inline shared_ptr[CDecryptionConfiguration] unwrap(self) nogil - cdef cppclass CKmsConnectionConfig \ - " parquet::encryption::KmsConnectionConfig": - CKmsConnectionConfig() - c_string kms_instance_id - c_string kms_instance_url - shared_ptr[CKeyAccessToken] refreshable_key_access_token - unordered_map[c_string, c_string] custom_kms_conf +cdef class KmsConnectionConfig(_Weakrefable): + cdef shared_ptr[CKmsConnectionConfig] configuration + cdef inline shared_ptr[CKmsConnectionConfig] unwrap(self) nogil -# Callbacks for implementing Python kms clients -# Use typedef to emulate syntax for std::function -ctypedef void CallbackWrapKey( - object, const c_string&, const c_string&, c_string*) -ctypedef void CallbackUnwrapKey( - object, const c_string&, const c_string&, c_string*) + @staticmethod + cdef wrap(const CKmsConnectionConfig& config) -cdef extern from "parquet/encryption/kms_client_factory.h" \ - namespace "parquet::encryption" nogil: - cdef cppclass CKmsClientFactory" parquet::encryption::KmsClientFactory": - shared_ptr[CKmsClient] CreateKmsClient( - const CKmsConnectionConfig& kms_connection_config) except + -# Callbacks for implementing Python kms client factories -# Use typedef to emulate syntax for std::function -ctypedef void CallbackCreateKmsClient( - object, - const CKmsConnectionConfig&, shared_ptr[CKmsClient]*) - -cdef extern from "parquet/encryption/crypto_factory.h" \ - namespace "parquet::encryption" nogil: - cdef cppclass CEncryptionConfiguration\ - " parquet::encryption::EncryptionConfiguration": - CEncryptionConfiguration(const c_string& footer_key) except + - c_string footer_key - c_string column_keys - ParquetCipher encryption_algorithm - c_bool plaintext_footer - c_bool double_wrapping - double cache_lifetime_seconds - c_bool internal_key_material - int32_t data_key_length_bits - - cdef cppclass CDecryptionConfiguration\ - " parquet::encryption::DecryptionConfiguration": - 
CDecryptionConfiguration() except + - double cache_lifetime_seconds - - cdef cppclass CCryptoFactory" parquet::encryption::CryptoFactory": - void RegisterKmsClientFactory( - shared_ptr[CKmsClientFactory] kms_client_factory) except + - shared_ptr[CFileEncryptionProperties] GetFileEncryptionProperties( - const CKmsConnectionConfig& kms_connection_config, - const CEncryptionConfiguration& encryption_config) except +* - shared_ptr[CFileDecryptionProperties] GetFileDecryptionProperties( - const CKmsConnectionConfig& kms_connection_config, - const CDecryptionConfiguration& decryption_config) except +* - void RemoveCacheEntriesForToken(const c_string& access_token) except + - void RemoveCacheEntriesForAllTokens() except + - -cdef extern from "arrow/python/parquet_encryption.h" \ - namespace "arrow::py::parquet::encryption" nogil: - cdef cppclass CPyKmsClientVtable \ - " arrow::py::parquet::encryption::PyKmsClientVtable": - CPyKmsClientVtable() - function[CallbackWrapKey] wrap_key - function[CallbackUnwrapKey] unwrap_key - - cdef cppclass CPyKmsClient\ - " arrow::py::parquet::encryption::PyKmsClient"(CKmsClient): - CPyKmsClient(object handler, CPyKmsClientVtable vtable) - - cdef cppclass CPyKmsClientFactoryVtable\ - " arrow::py::parquet::encryption::PyKmsClientFactoryVtable": - CPyKmsClientFactoryVtable() - function[CallbackCreateKmsClient] create_kms_client - - cdef cppclass CPyKmsClientFactory\ - " arrow::py::parquet::encryption::PyKmsClientFactory"( - CKmsClientFactory): - CPyKmsClientFactory(object handler, CPyKmsClientFactoryVtable vtable) - - cdef cppclass CPyCryptoFactory\ - " arrow::py::parquet::encryption::PyCryptoFactory"(CCryptoFactory): - CResult[shared_ptr[CFileEncryptionProperties]] \ - SafeGetFileEncryptionProperties( - const CKmsConnectionConfig& kms_connection_config, - const CEncryptionConfiguration& encryption_config) - CResult[shared_ptr[CFileDecryptionProperties]] \ - SafeGetFileDecryptionProperties( - const CKmsConnectionConfig& kms_connection_config, 
- const CDecryptionConfiguration& decryption_config) +cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object crypto_factory) except * +cdef shared_ptr[CKmsConnectionConfig] pyarrow_unwrap_kmsconnectionconfig(object kmsconnectionconfig) except * +cdef shared_ptr[CEncryptionConfiguration] pyarrow_unwrap_encryptionconfig(object encryptionconfig) except * +cdef shared_ptr[CDecryptionConfiguration] pyarrow_unwrap_decryptionconfig(object decryptionconfig) except * diff --git a/python/pyarrow/_parquet_encryption.pyx b/python/pyarrow/_parquet_encryption.pyx index db29a022118dd..d0a9a6612328c 100644 --- a/python/pyarrow/_parquet_encryption.pyx +++ b/python/pyarrow/_parquet_encryption.pyx @@ -21,10 +21,10 @@ from datetime import timedelta from cython.operator cimport dereference as deref +from libcpp.memory cimport shared_ptr from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * from pyarrow.lib cimport _Weakrefable - from pyarrow.lib import tobytes, frombytes @@ -48,9 +48,6 @@ cdef cipher_to_name(ParquetCipher cipher): cdef class EncryptionConfiguration(_Weakrefable): """Configuration of the encryption, such as which columns to encrypt""" - cdef: - shared_ptr[CEncryptionConfiguration] configuration - # Avoid mistakingly creating attributes __slots__ = () @@ -180,9 +177,6 @@ cdef class EncryptionConfiguration(_Weakrefable): cdef class DecryptionConfiguration(_Weakrefable): """Configuration of the decryption, such as cache timeout.""" - cdef: - shared_ptr[CDecryptionConfiguration] configuration - # Avoid mistakingly creating attributes __slots__ = () @@ -206,9 +200,6 @@ cdef class DecryptionConfiguration(_Weakrefable): cdef class KmsConnectionConfig(_Weakrefable): """Configuration of the connection to the Key Management Service (KMS)""" - cdef: - shared_ptr[CKmsConnectionConfig] configuration - # Avoid mistakingly creating attributes __slots__ = () @@ -358,9 +349,6 @@ cdef void _cb_create_kms_client( cdef class 
CryptoFactory(_Weakrefable): """ A factory that produces the low-level FileEncryptionProperties and FileDecryptionProperties objects, from the high-level parameters.""" - cdef: - unique_ptr[CPyCryptoFactory] factory - # Avoid mistakingly creating attributes __slots__ = () @@ -466,3 +454,31 @@ cdef class CryptoFactory(_Weakrefable): def remove_cache_entries_for_all_tokens(self): self.factory.get().RemoveCacheEntriesForAllTokens() + + cdef inline shared_ptr[CPyCryptoFactory] unwrap(self): + return self.factory + + +cdef shared_ptr[CCryptoFactory] pyarrow_unwrap_cryptofactory(object crypto_factory) except *: + if isinstance(crypto_factory, CryptoFactory): + pycf = ( crypto_factory).unwrap() + return static_pointer_cast[CCryptoFactory, CPyCryptoFactory](pycf) + raise TypeError("Expected CryptoFactory, got %s" % type(crypto_factory)) + + +cdef shared_ptr[CKmsConnectionConfig] pyarrow_unwrap_kmsconnectionconfig(object kmsconnectionconfig) except *: + if isinstance(kmsconnectionconfig, KmsConnectionConfig): + return ( kmsconnectionconfig).unwrap() + raise TypeError("Expected KmsConnectionConfig, got %s" % type(kmsconnectionconfig)) + + +cdef shared_ptr[CEncryptionConfiguration] pyarrow_unwrap_encryptionconfig(object encryptionconfig) except *: + if isinstance(encryptionconfig, EncryptionConfiguration): + return ( encryptionconfig).unwrap() + raise TypeError("Expected EncryptionConfiguration, got %s" % type(encryptionconfig)) + + +cdef shared_ptr[CDecryptionConfiguration] pyarrow_unwrap_decryptionconfig(object decryptionconfig) except *: + if isinstance(decryptionconfig, DecryptionConfiguration): + return ( decryptionconfig).unwrap() + raise TypeError("Expected DecryptionConfiguration, got %s" % type(decryptionconfig)) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 435137c7f2993..adf21814a2c99 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -97,6 +97,15 @@ pass +try: + from pyarrow._dataset_parquet_encryption import ( # 
noqa + ParquetDecryptionConfig, + ParquetEncryptionConfig, + ) +except ImportError: + pass + + def __getattr__(name): if name == "OrcFileFormat" and not _orc_available: raise ImportError(_orc_msg) diff --git a/python/pyarrow/includes/libarrow_dataset_parquet.pxd b/python/pyarrow/includes/libarrow_dataset_parquet.pxd index cc753d66a94f7..e5389b3135faf 100644 --- a/python/pyarrow/includes/libarrow_dataset_parquet.pxd +++ b/python/pyarrow/includes/libarrow_dataset_parquet.pxd @@ -18,9 +18,23 @@ # distutils: language = c++ from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.includes.libparquet_encryption cimport * + from pyarrow._parquet cimport * +cdef extern from "arrow/dataset/parquet_encryption_config.h" namespace "arrow::dataset" nogil: + cdef cppclass CParquetEncryptionConfig "arrow::dataset::ParquetEncryptionConfig": + shared_ptr[CCryptoFactory] crypto_factory + shared_ptr[CKmsConnectionConfig] kms_connection_config + shared_ptr[CEncryptionConfiguration] encryption_config + + cdef cppclass CParquetDecryptionConfig "arrow::dataset::ParquetDecryptionConfig": + shared_ptr[CCryptoFactory] crypto_factory + shared_ptr[CKmsConnectionConfig] kms_connection_config + shared_ptr[CDecryptionConfiguration] decryption_config + + cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CParquetFileWriter \ @@ -31,6 +45,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::ParquetFileWriteOptions"(CFileWriteOptions): shared_ptr[WriterProperties] writer_properties shared_ptr[ArrowWriterProperties] arrow_writer_properties + shared_ptr[CParquetEncryptionConfig] parquet_encryption_config cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"( CFileFragment): @@ -62,6 +77,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::ParquetFragmentScanOptions"(CFragmentScanOptions): shared_ptr[CReaderProperties] reader_properties 
shared_ptr[ArrowReaderProperties] arrow_reader_properties + shared_ptr[CParquetDecryptionConfig] parquet_decryption_config cdef cppclass CParquetFactoryOptions \ "arrow::dataset::ParquetFactoryOptions": diff --git a/python/pyarrow/includes/libparquet_encryption.pxd b/python/pyarrow/includes/libparquet_encryption.pxd new file mode 100644 index 0000000000000..2b40414ce5383 --- /dev/null +++ b/python/pyarrow/includes/libparquet_encryption.pxd @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# distutils: language = c++ + +from pyarrow.includes.common cimport * +from pyarrow._parquet cimport (ParquetCipher, + CFileEncryptionProperties, + CFileDecryptionProperties, + ParquetCipher_AES_GCM_V1, + ParquetCipher_AES_GCM_CTR_V1) + + +cdef extern from "parquet/encryption/kms_client.h" \ + namespace "parquet::encryption" nogil: + cdef cppclass CKmsClient" parquet::encryption::KmsClient": + c_string WrapKey(const c_string& key_bytes, + const c_string& master_key_identifier) except + + c_string UnwrapKey(const c_string& wrapped_key, + const c_string& master_key_identifier) except + + + cdef cppclass CKeyAccessToken" parquet::encryption::KeyAccessToken": + CKeyAccessToken(const c_string value) + void Refresh(const c_string& new_value) + const c_string& value() const + + cdef cppclass CKmsConnectionConfig \ + " parquet::encryption::KmsConnectionConfig": + CKmsConnectionConfig() + c_string kms_instance_id + c_string kms_instance_url + shared_ptr[CKeyAccessToken] refreshable_key_access_token + unordered_map[c_string, c_string] custom_kms_conf + +# Callbacks for implementing Python kms clients +# Use typedef to emulate syntax for std::function +ctypedef void CallbackWrapKey( + object, const c_string&, const c_string&, c_string*) +ctypedef void CallbackUnwrapKey( + object, const c_string&, const c_string&, c_string*) + +cdef extern from "parquet/encryption/kms_client_factory.h" \ + namespace "parquet::encryption" nogil: + cdef cppclass CKmsClientFactory" parquet::encryption::KmsClientFactory": + shared_ptr[CKmsClient] CreateKmsClient( + const CKmsConnectionConfig& kms_connection_config) except + + +# Callbacks for implementing Python kms client factories +# Use typedef to emulate syntax for std::function +ctypedef void CallbackCreateKmsClient( + object, + const CKmsConnectionConfig&, shared_ptr[CKmsClient]*) + +cdef extern from "parquet/encryption/crypto_factory.h" \ + namespace "parquet::encryption" nogil: + cdef cppclass CEncryptionConfiguration\ + " 
parquet::encryption::EncryptionConfiguration": + CEncryptionConfiguration(const c_string& footer_key) except + + c_string footer_key + c_string column_keys + ParquetCipher encryption_algorithm + c_bool plaintext_footer + c_bool double_wrapping + double cache_lifetime_seconds + c_bool internal_key_material + int32_t data_key_length_bits + + cdef cppclass CDecryptionConfiguration\ + " parquet::encryption::DecryptionConfiguration": + CDecryptionConfiguration() except + + double cache_lifetime_seconds + + cdef cppclass CCryptoFactory" parquet::encryption::CryptoFactory": + void RegisterKmsClientFactory( + shared_ptr[CKmsClientFactory] kms_client_factory) except + + shared_ptr[CFileEncryptionProperties] GetFileEncryptionProperties( + const CKmsConnectionConfig& kms_connection_config, + const CEncryptionConfiguration& encryption_config) except +* + shared_ptr[CFileDecryptionProperties] GetFileDecryptionProperties( + const CKmsConnectionConfig& kms_connection_config, + const CDecryptionConfiguration& decryption_config) except +* + void RemoveCacheEntriesForToken(const c_string& access_token) except + + void RemoveCacheEntriesForAllTokens() except + + +cdef extern from "arrow/python/parquet_encryption.h" \ + namespace "arrow::py::parquet::encryption" nogil: + cdef cppclass CPyKmsClientVtable \ + " arrow::py::parquet::encryption::PyKmsClientVtable": + CPyKmsClientVtable() + function[CallbackWrapKey] wrap_key + function[CallbackUnwrapKey] unwrap_key + + cdef cppclass CPyKmsClient\ + " arrow::py::parquet::encryption::PyKmsClient"(CKmsClient): + CPyKmsClient(object handler, CPyKmsClientVtable vtable) + + cdef cppclass CPyKmsClientFactoryVtable\ + " arrow::py::parquet::encryption::PyKmsClientFactoryVtable": + CPyKmsClientFactoryVtable() + function[CallbackCreateKmsClient] create_kms_client + + cdef cppclass CPyKmsClientFactory\ + " arrow::py::parquet::encryption::PyKmsClientFactory"( + CKmsClientFactory): + CPyKmsClientFactory(object handler, CPyKmsClientFactoryVtable vtable) + 
+ cdef cppclass CPyCryptoFactory\ + " arrow::py::parquet::encryption::PyCryptoFactory"(CCryptoFactory): + CResult[shared_ptr[CFileEncryptionProperties]] \ + SafeGetFileEncryptionProperties( + const CKmsConnectionConfig& kms_connection_config, + const CEncryptionConfiguration& encryption_config) + CResult[shared_ptr[CFileDecryptionProperties]] \ + SafeGetFileDecryptionProperties( + const CKmsConnectionConfig& kms_connection_config, + const CDecryptionConfiguration& decryption_config) diff --git a/python/pyarrow/tests/test_dataset_encryption.py b/python/pyarrow/tests/test_dataset_encryption.py new file mode 100644 index 0000000000000..cf9c07a06b813 --- /dev/null +++ b/python/pyarrow/tests/test_dataset_encryption.py @@ -0,0 +1,148 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from datetime import timedelta +import pyarrow.fs as fs +import pyarrow as pa +import pytest + +encryption_unavailable = False + +try: + import pyarrow.dataset as ds +except ImportError: + ds = None + +try: + from pyarrow.tests.parquet.encryption import InMemoryKmsClient + import pyarrow.parquet.encryption as pe +except ImportError: + encryption_unavailable = True + +FOOTER_KEY = b"0123456789112345" +FOOTER_KEY_NAME = "footer_key" +COL_KEY = b"1234567890123450" +COL_KEY_NAME = "col_key" + + +def create_sample_table(): + return pa.table( + { + "year": [2020, 2022, 2021, 2022, 2019, 2021], + "n_legs": [2, 2, 4, 4, 5, 100], + "animal": [ + "Flamingo", + "Parrot", + "Dog", + "Horse", + "Brittle stars", + "Centipede", + ], + } + ) + + +def create_encryption_config(): + return pe.EncryptionConfiguration( + footer_key=FOOTER_KEY_NAME, + plaintext_footer=False, + column_keys={COL_KEY_NAME: ["n_legs", "animal"]}, + encryption_algorithm="AES_GCM_V1", + # requires timedelta or an assertion is raised + cache_lifetime=timedelta(minutes=5.0), + data_key_length_bits=256, + ) + + +def create_decryption_config(): + return pe.DecryptionConfiguration(cache_lifetime=300) + + +def create_kms_connection_config(): + return pe.KmsConnectionConfig( + custom_kms_conf={ + FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"), + COL_KEY_NAME: COL_KEY.decode("UTF-8"), + } + ) + + +def kms_factory(kms_connection_configuration): + return InMemoryKmsClient(kms_connection_configuration) + + +@pytest.mark.skipif( + encryption_unavailable, reason="Parquet Encryption is not currently enabled" +) +def test_dataset_encryption_decryption(): + table = create_sample_table() + + encryption_config = create_encryption_config() + decryption_config = create_decryption_config() + kms_connection_config = create_kms_connection_config() + + crypto_factory = pe.CryptoFactory(kms_factory) + parquet_encryption_cfg = ds.ParquetEncryptionConfig( + crypto_factory, kms_connection_config, encryption_config + ) + 
parquet_decryption_cfg = ds.ParquetDecryptionConfig( + crypto_factory, kms_connection_config, decryption_config + ) + + # create write_options with dataset encryption config + pformat = pa.dataset.ParquetFileFormat() + write_options = pformat.make_write_options(encryption_config=parquet_encryption_cfg) + + mockfs = fs._MockFileSystem() + mockfs.create_dir("/") + + ds.write_dataset( + data=table, + base_dir="sample_dataset", + format=pformat, + file_options=write_options, + filesystem=mockfs, + ) + + # read without decryption config -> should error if dataset was properly encrypted + pformat = pa.dataset.ParquetFileFormat() + with pytest.raises(IOError, match=r"no decryption"): + ds.dataset("sample_dataset", format=pformat, filesystem=mockfs) + + # set decryption config for parquet fragment scan options + pq_scan_opts = ds.ParquetFragmentScanOptions( + decryption_config=parquet_decryption_cfg + ) + pformat = pa.dataset.ParquetFileFormat(default_fragment_scan_options=pq_scan_opts) + dataset = ds.dataset("sample_dataset", format=pformat, filesystem=mockfs) + + assert table.equals(dataset.to_table()) + + +@pytest.mark.skipif( + not encryption_unavailable, reason="Parquet Encryption is currently enabled" +) +def test_write_dataset_parquet_without_encryption(): + """Test write_dataset with ParquetFileFormat and test if an exception is thrown + if you try to set encryption_config using make_write_options""" + + # Set the encryption configuration using ParquetFileFormat + # and make_write_options + pformat = pa.dataset.ParquetFileFormat() + + with pytest.raises(NotImplementedError): + _ = pformat.make_write_options(encryption_config="some value")