3131import org .apache .iceberg .TableProperties ;
3232import org .apache .iceberg .UpdateRequirement ;
3333import org .apache .iceberg .UpdateRequirements ;
34+ import org .apache .iceberg .encryption .EncryptedKey ;
35+ import org .apache .iceberg .encryption .EncryptingFileIO ;
3436import org .apache .iceberg .encryption .EncryptionManager ;
37+ import org .apache .iceberg .encryption .EncryptionUtil ;
38+ import org .apache .iceberg .encryption .KeyManagementClient ;
39+ import org .apache .iceberg .encryption .PlaintextEncryptionManager ;
40+ import org .apache .iceberg .encryption .StandardEncryptionManager ;
3541import org .apache .iceberg .io .FileIO ;
3642import org .apache .iceberg .io .LocationProvider ;
3743import org .apache .iceberg .relocated .com .google .common .base .Preconditions ;
3844import org .apache .iceberg .relocated .com .google .common .collect .ImmutableList ;
3945import org .apache .iceberg .relocated .com .google .common .collect .Lists ;
46+ import org .apache .iceberg .relocated .com .google .common .collect .Maps ;
4047import org .apache .iceberg .rest .requests .UpdateTableRequest ;
4148import org .apache .iceberg .rest .responses .ErrorResponse ;
4249import org .apache .iceberg .rest .responses .LoadTableResponse ;
@@ -55,27 +62,45 @@ enum UpdateType {
5562 private final String path ;
5663 private final Supplier <Map <String , String >> headers ;
5764 private final FileIO io ;
65+ private final KeyManagementClient kmsClient ;
5866 private final List <MetadataUpdate > createChanges ;
5967 private final TableMetadata replaceBase ;
6068 private final Set <Endpoint > endpoints ;
6169 private UpdateType updateType ;
6270 private TableMetadata current ;
6371
72+ private EncryptionManager encryptionManager ;
73+ private EncryptingFileIO encryptingFileIO ;
74+ private String encryptionKeyId ;
75+ private int encryptionDekLength ;
76+ private List <EncryptedKey > encryptedKeysFromMetadata ;
77+
6478 RESTTableOperations (
6579 RESTClient client ,
6680 String path ,
6781 Supplier <Map <String , String >> headers ,
6882 FileIO io ,
83+ KeyManagementClient kmsClient ,
6984 TableMetadata current ,
7085 Set <Endpoint > endpoints ) {
71- this (client , path , headers , io , UpdateType .SIMPLE , Lists .newArrayList (), current , endpoints );
86+ this (
87+ client ,
88+ path ,
89+ headers ,
90+ io ,
91+ kmsClient ,
92+ UpdateType .SIMPLE ,
93+ Lists .newArrayList (),
94+ current ,
95+ endpoints );
7296 }
7397
7498 RESTTableOperations (
7599 RESTClient client ,
76100 String path ,
77101 Supplier <Map <String , String >> headers ,
78102 FileIO io ,
103+ KeyManagementClient kmsClient ,
79104 UpdateType updateType ,
80105 List <MetadataUpdate > createChanges ,
81106 TableMetadata current ,
@@ -84,6 +109,7 @@ enum UpdateType {
84109 this .path = path ;
85110 this .headers = headers ;
86111 this .io = io ;
112+ this .kmsClient = kmsClient ;
87113 this .updateType = updateType ;
88114 this .createChanges = createChanges ;
89115 this .replaceBase = current ;
@@ -93,6 +119,10 @@ enum UpdateType {
93119 this .current = current ;
94120 }
95121 this .endpoints = endpoints ;
122+
123+ // N.B. We don't use this.current because for tables-to-be-created, because it would be null,
124+ // and ee still want encrypted properties in this case for its TableOperations.
125+ encryptionPropsFromMetadata (current );
96126 }
97127
98128 @ Override
@@ -113,14 +143,26 @@ public void commit(TableMetadata base, TableMetadata metadata) {
113143 Consumer <ErrorResponse > errorHandler ;
114144 List <UpdateRequirement > requirements ;
115145 List <MetadataUpdate > updates ;
146+
147+ TableMetadata metadataToCommit = metadata ;
148+ if (encryption () instanceof StandardEncryptionManager ) {
149+ TableMetadata .Builder builder = TableMetadata .buildFrom (metadata );
150+ for (Map .Entry <String , EncryptedKey > entry :
151+ EncryptionUtil .encryptionKeys (encryption ()).entrySet ()) {
152+ builder .addEncryptionKey (entry .getValue ());
153+ }
154+ metadataToCommit = builder .build ();
155+ // TODO(smaheshwar): Think about requirements.
156+ }
157+
116158 switch (updateType ) {
117159 case CREATE :
118160 Preconditions .checkState (
119161 base == null , "Invalid base metadata for create transaction, expected null: %s" , base );
120162 updates =
121163 ImmutableList .<MetadataUpdate >builder ()
122164 .addAll (createChanges )
123- .addAll (metadata .changes ())
165+ .addAll (metadataToCommit .changes ())
124166 .build ();
125167 requirements = UpdateRequirements .forCreateTable (updates );
126168 errorHandler = ErrorHandlers .tableErrorHandler (); // throws NoSuchTableException
@@ -131,7 +173,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
131173 updates =
132174 ImmutableList .<MetadataUpdate >builder ()
133175 .addAll (createChanges )
134- .addAll (metadata .changes ())
176+ .addAll (metadataToCommit .changes ())
135177 .build ();
136178 // use the original replace base metadata because the transaction will refresh
137179 requirements = UpdateRequirements .forReplaceTable (replaceBase , updates );
@@ -140,7 +182,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
140182
141183 case SIMPLE :
142184 Preconditions .checkState (base != null , "Invalid base metadata: null" );
143- updates = metadata .changes ();
185+ updates = metadataToCommit .changes ();
144186 requirements = UpdateRequirements .forUpdateTable (base , updates );
145187 errorHandler = ErrorHandlers .tableCommitHandler ();
146188 break ;
@@ -166,7 +208,67 @@ public void commit(TableMetadata base, TableMetadata metadata) {
166208
167209 @ Override
168210 public FileIO io () {
169- return io ;
211+ if (encryptionKeyId == null ) {
212+ return io ;
213+ }
214+
215+ if (encryptingFileIO == null ) {
216+ encryptingFileIO = EncryptingFileIO .combine (io , encryption ());
217+ }
218+
219+ return encryptingFileIO ;
220+ }
221+
222+ @ Override
223+ public EncryptionManager encryption () {
224+ if (encryptionManager != null ) {
225+ return encryptionManager ;
226+ }
227+
228+ if (encryptionKeyId != null ) {
229+ if (kmsClient == null ) {
230+ throw new RuntimeException (
231+ "Cant create encryption manager, because key management client is not set" );
232+ }
233+
234+ Map <String , String > tableProperties = Maps .newHashMap ();
235+ tableProperties .put (TableProperties .ENCRYPTION_TABLE_KEY , encryptionKeyId );
236+ tableProperties .put (
237+ TableProperties .ENCRYPTION_DEK_LENGTH , String .valueOf (encryptionDekLength ));
238+ encryptionManager =
239+ EncryptionUtil .createEncryptionManager (
240+ encryptedKeysFromMetadata , tableProperties , kmsClient );
241+ } else {
242+ return PlaintextEncryptionManager .instance ();
243+ }
244+
245+ return encryptionManager ;
246+ }
247+
248+ private void encryptionPropsFromMetadata (TableMetadata metadata ) {
249+ // TODO(smaheshwar): Check generally for changed encryption-related properties!
250+ if (metadata == null || metadata .properties () == null ) {
251+ return ;
252+ }
253+
254+ encryptedKeysFromMetadata = metadata .encryptionKeys ();
255+
256+ Map <String , String > tableProperties = metadata .properties ();
257+ if (encryptionKeyId == null ) {
258+ encryptionKeyId = tableProperties .get (TableProperties .ENCRYPTION_TABLE_KEY );
259+ }
260+
261+ if (encryptionKeyId != null && encryptionDekLength <= 0 ) {
262+ String dekLength = tableProperties .get (TableProperties .ENCRYPTION_DEK_LENGTH );
263+ encryptionDekLength =
264+ (dekLength == null )
265+ ? TableProperties .ENCRYPTION_DEK_LENGTH_DEFAULT
266+ : Integer .parseInt (dekLength );
267+ }
268+
269+ // Force re-creation of encryptingFileIO and encryptionManager
270+ encryptingFileIO = null ;
271+ encryptionManager = null ;
170272 }
171273
172274 private TableMetadata updateCurrentMetadata (LoadTableResponse response ) {
@@ -175,6 +277,7 @@ private TableMetadata updateCurrentMetadata(LoadTableResponse response) {
175277 // safely ignored. there is no requirement to update config on refresh or commit.
176278 if (current == null
177279 || !Objects .equals (current .metadataFileLocation (), response .metadataLocation ())) {
280+ encryptionPropsFromMetadata (response .tableMetadata ());
178281 this .current = response .tableMetadata ();
179282 }
180283
0 commit comments