@@ -214,7 +214,7 @@ protected void refreshFromMetadataLocation(
     this.shouldRefresh = false;
   }

-  private String metadataFileLocation(TableMetadata metadata, String filename) {
+  protected String metadataFileLocation(TableMetadata metadata, String filename) {
    String metadataLocation = metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION);

    if (metadataLocation != null) {
@@ -75,7 +75,7 @@ public static KeyManagementClient createKmsClient(Map<String, String> catalogPro
     return kmsClient;
   }

-  static EncryptionManager createEncryptionManager(
+  public static EncryptionManager createEncryptionManager(
       List<EncryptedKey> keys, Map<String, String> tableProperties, KeyManagementClient kmsClient) {
     Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null");
     String tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY);
@@ -96,7 +96,7 @@ static EncryptionManager createEncryptionManager(
         "Invalid data key length: %s (must be 16, 24, or 32)",
         dataKeyLength);

-    return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient);
+    return new StandardEncryptionManager(keys, tableKeyId, dataKeyLength, kmsClient);
   }

   public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) {
@@ -128,6 +128,14 @@ public static ByteBuffer decryptManifestListKeyMetadata(
     return ByteBuffer.wrap(decryptedKeyMetadata);
   }

+  public static Map<String, EncryptedKey> encryptionKeys(EncryptionManager em) {
+    Preconditions.checkState(
+        em instanceof StandardEncryptionManager,
+        "Retrieving encryption keys requires a StandardEncryptionManager");
+    StandardEncryptionManager sem = (StandardEncryptionManager) em;
+    return sem.encryptionKeys();
+  }
+
   /**
    * Encrypts the key metadata for a manifest list.
    *
@@ -23,6 +23,7 @@
 import java.nio.ByteBuffer;
 import java.security.SecureRandom;
 import java.util.Base64;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.TableProperties;
@@ -46,9 +47,19 @@ private class TransientEncryptionState {
     private final Map<String, EncryptedKey> encryptionKeys;
     private final LoadingCache<String, ByteBuffer> unwrappedKeyCache;

-    private TransientEncryptionState(KeyManagementClient kmsClient) {
+    private TransientEncryptionState(KeyManagementClient kmsClient, List<EncryptedKey> keys) {
       this.kmsClient = kmsClient;
       this.encryptionKeys = Maps.newLinkedHashMap();
+
+      if (keys != null) {
+        for (EncryptedKey key : keys) {
+          encryptionKeys.put(
+              key.keyId(),
+              new BaseEncryptedKey(
+                  key.keyId(), key.encryptedKeyMetadata(), key.encryptedById(), key.properties()));
+        }
+      }
+
       this.unwrappedKeyCache =
           Caffeine.newBuilder()
               .expireAfterWrite(1, TimeUnit.HOURS)
@@ -64,20 +75,33 @@ private TransientEncryptionState(KeyManagementClient kmsClient) {
   private transient volatile SecureRandom lazyRNG = null;

   /**
+   * @deprecated will be removed in 2.0.
+   */
+  @Deprecated
+  public StandardEncryptionManager(
+      String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) {
+    this(List.of(), tableKeyId, dataKeyLength, kmsClient);
+  }
+
+  /**
+   * @param keys encryption keys from table metadata
    * @param tableKeyId table encryption key id
    * @param dataKeyLength length of data encryption key (16/24/32 bytes)
    * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption
    */
   public StandardEncryptionManager(
-      String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) {
+      List<EncryptedKey> keys,
+      String tableKeyId,
+      int dataKeyLength,
+      KeyManagementClient kmsClient) {
     Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null");
     Preconditions.checkArgument(
         dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32,
         "Invalid data key length: %s (must be 16, 24, or 32)",
         dataKeyLength);
     Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null");
     this.tableKeyId = tableKeyId;
-    this.transientState = new TransientEncryptionState(kmsClient);
+    this.transientState = new TransientEncryptionState(kmsClient, keys);
     this.dataKeyLength = dataKeyLength;
   }

@@ -134,6 +158,14 @@ public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) {
     return transientState.kmsClient.unwrapKey(wrappedSecretKey, tableKeyId);
   }

+  Map<String, EncryptedKey> encryptionKeys() {
+    if (transientState == null) {
+      throw new IllegalStateException("Cannot return the encryption keys after serialization");
+    }
+
+    return transientState.encryptionKeys;
+  }
+
   private String keyEncryptionKeyID() {
     if (transientState == null) {
       throw new IllegalStateException("Cannot return the current key after serialization");
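For context (editorial sketch, not part of the diff): one way the reworked API is meant to fit together, assuming a caller that already holds the keys persisted in table metadata and a KMS client. The class and method names below are hypothetical; only the EncryptionUtil calls come from this PR.

import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptedKey;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.encryption.KeyManagementClient;

public class EncryptionKeysRoundTrip {

  // Build the table encryption manager from keys stored in table metadata, using the
  // now-public createEncryptionManager overload that accepts the key list.
  static EncryptionManager managerFromMetadata(
      List<EncryptedKey> storedKeys, Map<String, String> tableProps, KeyManagementClient kms) {
    // tableProps is expected to carry the table key id and data key length properties,
    // which createEncryptionManager validates.
    return EncryptionUtil.createEncryptionManager(storedKeys, tableProps, kms);
  }

  // At commit time, read back the keys currently tracked by the manager (supported only
  // for StandardEncryptionManager) so they can be written into new table metadata.
  static Map<String, EncryptedKey> keysToPersist(EncryptionManager em) {
    return EncryptionUtil.encryptionKeys(em);
  }
}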
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.hive;

+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,6 +46,8 @@
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.encryption.EncryptionUtil;
+import org.apache.iceberg.encryption.KeyManagementClient;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
@@ -88,6 +91,7 @@ public class HiveCatalog extends BaseMetastoreViewCatalog
   private String name;
   private Configuration conf;
   private FileIO fileIO;
+  private KeyManagementClient keyManagementClient;
Review thread on the new keyManagementClient field:

[Member]
I'm wondering if some of this logic should live in the parent BaseMetastoreCatalog class; I assume it will be relevant for most of the other subclasses as well as for other implementers. I'm not sure though, and it can always be moved later.

[Contributor, author]
Agreed on the relevance, but there are a couple of issues:

  • Security. If table properties are fetched by a catalog from the storage backend (e.g. the metadata file), a "key removal attack" becomes possible: the key id is removed from the table properties by tampering with the metadata file, which makes writers produce unencrypted data files. It is therefore important that each catalog client gets the table key id directly from its catalog service, and not from storage. Also, one of the classes that extends BaseMetastoreCatalog is the HadoopCatalog, which is not safe for encryption because it has no catalog service independent of the storage backend.
  • Code re-use is thin. Most of the PR logic is in the table operations, but newTableOps is an abstract method in BaseMetastoreCatalog, which makes sense since many things (including table key retrieval) are catalog-specific.

Maybe we should start with the Hive catalog (plus the REST catalog in another PR)? Then, as more catalogs are added, we can check if and how common encryption code can safely move into a parent class or util.

[Contributor, author]
After the last update, the reusable code grew, but the security challenge remains. Also, the Hive and REST catalogs don't have a common parent. I'd still handle this later, as more catalogs are added.

[Contributor]
+1 to handling this later, after more catalogs are added.
   private ClientPool<IMetaStoreClient, TException> clients;
   private boolean listAllTables = false;
   private Map<String, String> catalogProperties;
@@ -122,6 +126,10 @@ public void initialize(String inputName, Map<String, String> properties) {
             ? new HadoopFileIO(conf)
             : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

+    if (catalogProperties.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) {
+      this.keyManagementClient = EncryptionUtil.createKmsClient(properties);
+    }
+
     this.clients = new CachedClientPool(conf, properties);
   }

@@ -686,7 +694,8 @@ private boolean isValidateNamespace(Namespace namespace) {
   public TableOperations newTableOps(TableIdentifier tableIdentifier) {
     String dbName = tableIdentifier.namespace().level(0);
     String tableName = tableIdentifier.name();
-    return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
+    return new HiveTableOperations(
+        conf, clients, fileIO, keyManagementClient, name, dbName, tableName);
   }

   @Override
@@ -815,6 +824,15 @@ protected Map<String, String> properties() {
     return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
   }

+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    if (keyManagementClient != null) {
+      keyManagementClient.close();
+    }
+  }
+
   @VisibleForTesting
   void setListAllTables(boolean listAllTables) {
     this.listAllTables = listAllTables;
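As a usage note (editorial sketch, not part of the diff): the KMS client is created in initialize() only when the catalog property is present, and close() now releases it. A rough wiring sketch, where the metastore URI and the KMS implementation class name are placeholders:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class HiveCatalogEncryptionSetup {

  public static HiveCatalog create() {
    Map<String, String> props =
        ImmutableMap.of(
            CatalogProperties.URI, "thrift://metastore:9083", // placeholder metastore URI
            CatalogProperties.ENCRYPTION_KMS_IMPL, "com.example.MyKmsClient"); // placeholder impl

    HiveCatalog catalog = new HiveCatalog();
    catalog.setConf(new Configuration());
    // initialize() creates the KMS client because ENCRYPTION_KMS_IMPL is set
    catalog.initialize("hive", props);
    // catalog.close() will also close the KMS client (see the close() override above)
    return catalog;
  }
}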