3131import org .apache .iceberg .TableProperties ;
3232import org .apache .iceberg .UpdateRequirement ;
3333import org .apache .iceberg .UpdateRequirements ;
34+ import org .apache .iceberg .encryption .EncryptedKey ;
35+ import org .apache .iceberg .encryption .EncryptingFileIO ;
3436import org .apache .iceberg .encryption .EncryptionManager ;
37+ import org .apache .iceberg .encryption .EncryptionUtil ;
38+ import org .apache .iceberg .encryption .KeyManagementClient ;
39+ import org .apache .iceberg .encryption .PlaintextEncryptionManager ;
40+ import org .apache .iceberg .encryption .StandardEncryptionManager ;
3541import org .apache .iceberg .io .FileIO ;
3642import org .apache .iceberg .io .LocationProvider ;
3743import org .apache .iceberg .relocated .com .google .common .base .Preconditions ;
3844import org .apache .iceberg .relocated .com .google .common .collect .ImmutableList ;
3945import org .apache .iceberg .relocated .com .google .common .collect .Lists ;
46+ import org .apache .iceberg .relocated .com .google .common .collect .Maps ;
4047import org .apache .iceberg .rest .requests .UpdateTableRequest ;
4148import org .apache .iceberg .rest .responses .ErrorResponse ;
4249import org .apache .iceberg .rest .responses .LoadTableResponse ;
@@ -55,27 +62,45 @@ enum UpdateType {
5562 private final String path ;
5663 private final Supplier <Map <String , String >> headers ;
5764 private final FileIO io ;
65+ private final KeyManagementClient kmsClient ;
5866 private final List <MetadataUpdate > createChanges ;
5967 private final TableMetadata replaceBase ;
6068 private final Set <Endpoint > endpoints ;
6169 private UpdateType updateType ;
6270 private TableMetadata current ;
6371
72+ private EncryptionManager encryptionManager ;
73+ private EncryptingFileIO encryptingFileIO ;
74+ private String encryptionKeyId ;
75+ private int encryptionDekLength ;
76+ private List <EncryptedKey > encryptedKeysFromMetadata ;
77+
6478 RESTTableOperations (
6579 RESTClient client ,
6680 String path ,
6781 Supplier <Map <String , String >> headers ,
6882 FileIO io ,
83+ KeyManagementClient kmsClient ,
6984 TableMetadata current ,
7085 Set <Endpoint > endpoints ) {
71- this (client , path , headers , io , UpdateType .SIMPLE , Lists .newArrayList (), current , endpoints );
86+ this (
87+ client ,
88+ path ,
89+ headers ,
90+ io ,
91+ kmsClient ,
92+ UpdateType .SIMPLE ,
93+ Lists .newArrayList (),
94+ current ,
95+ endpoints );
7296 }
7397
7498 RESTTableOperations (
7599 RESTClient client ,
76100 String path ,
77101 Supplier <Map <String , String >> headers ,
78102 FileIO io ,
103+ KeyManagementClient kmsClient ,
79104 UpdateType updateType ,
80105 List <MetadataUpdate > createChanges ,
81106 TableMetadata current ,
@@ -84,6 +109,7 @@ enum UpdateType {
84109 this .path = path ;
85110 this .headers = headers ;
86111 this .io = io ;
112+ this .kmsClient = kmsClient ;
87113 this .updateType = updateType ;
88114 this .createChanges = createChanges ;
89115 this .replaceBase = current ;
@@ -93,6 +119,10 @@ enum UpdateType {
93119 this .current = current ;
94120 }
95121 this .endpoints = endpoints ;
122+
123+ // N.B. We don't use this.current because for tables-to-be-created, because it would be null,
124+ // and ee still want encrypted properties in this case for its TableOperations.
125+ encryptionPropsFromMetadata (current );
96126 }
97127
98128 @ Override
@@ -113,6 +143,17 @@ public void commit(TableMetadata base, TableMetadata metadata) {
113143 Consumer <ErrorResponse > errorHandler ;
114144 List <UpdateRequirement > requirements ;
115145 List <MetadataUpdate > updates ;
146+
147+ if (encryption () instanceof StandardEncryptionManager ) {
148+ TableMetadata .Builder builder = TableMetadata .buildFrom (metadata );
149+ for (Map .Entry <String , EncryptedKey > entry :
150+ EncryptionUtil .encryptionKeys (encryption ()).entrySet ()) {
151+ builder .addEncryptionKey (entry .getValue ());
152+ }
153+ metadata = builder .build ();
154+ // TODO(smaheshwar): Think about requirements.
155+ }
156+
116157 switch (updateType ) {
117158 case CREATE :
118159 Preconditions .checkState (
@@ -166,7 +207,67 @@ public void commit(TableMetadata base, TableMetadata metadata) {
166207
167208 @ Override
168209 public FileIO io () {
169- return io ;
210+ if (encryptionKeyId == null ) {
211+ return io ;
212+ }
213+
214+ if (encryptingFileIO == null ) {
215+ encryptingFileIO = EncryptingFileIO .combine (io , encryption ());
216+ }
217+
218+ return encryptingFileIO ;
219+ }
220+
221+ @ Override
222+ public EncryptionManager encryption () {
223+ if (encryptionManager != null ) {
224+ return encryptionManager ;
225+ }
226+
227+ if (encryptionKeyId != null ) {
228+ if (kmsClient == null ) {
229+ throw new RuntimeException (
230+ "Cant create encryption manager, because key management client is not set" );
231+ }
232+
233+ Map <String , String > tableProperties = Maps .newHashMap ();
234+ tableProperties .put (TableProperties .ENCRYPTION_TABLE_KEY , encryptionKeyId );
235+ tableProperties .put (
236+ TableProperties .ENCRYPTION_DEK_LENGTH , String .valueOf (encryptionDekLength ));
237+ encryptionManager =
238+ EncryptionUtil .createEncryptionManager (
239+ encryptedKeysFromMetadata , tableProperties , kmsClient );
240+ } else {
241+ return PlaintextEncryptionManager .instance ();
242+ }
243+
244+ return encryptionManager ;
245+ }
246+
247+ private void encryptionPropsFromMetadata (TableMetadata metadata ) {
248+ // TODO(smaheshwar): Check generally for changed encryption-related properties!
249+ if (metadata == null || metadata .properties () == null ) {
250+ return ;
251+ }
252+
253+ encryptedKeysFromMetadata = metadata .encryptionKeys ();
254+
255+ Map <String , String > tableProperties = metadata .properties ();
256+ if (encryptionKeyId == null ) {
257+ encryptionKeyId = tableProperties .get (TableProperties .ENCRYPTION_TABLE_KEY );
258+ }
259+
260+ if (encryptionKeyId != null && encryptionDekLength <= 0 ) {
261+ String dekLength = tableProperties .get (TableProperties .ENCRYPTION_DEK_LENGTH );
262+ encryptionDekLength =
263+ (dekLength == null )
264+ ? TableProperties .ENCRYPTION_DEK_LENGTH_DEFAULT
265+ : Integer .parseInt (dekLength );
266+ }
267+
268+ // Force re-creation of encryptingFileIO and encryptionManager
269+ encryptingFileIO = null ;
270+ encryptionManager = null ;
170271 }
171272
172273 private TableMetadata updateCurrentMetadata (LoadTableResponse response ) {
@@ -175,6 +276,7 @@ private TableMetadata updateCurrentMetadata(LoadTableResponse response) {
175276 // safely ignored. there is no requirement to update config on refresh or commit.
176277 if (current == null
177278 || !Objects .equals (current .metadataFileLocation (), response .metadataLocation ())) {
279+ encryptionPropsFromMetadata (response .tableMetadata ());
178280 this .current = response .tableMetadata ();
179281 }
180282
0 commit comments