@@ -84,7 +84,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
8484
8585 private EncryptionManager encryptionManager ;
8686 private EncryptingFileIO encryptingFileIO ;
87- private String encryptionKeyId ;
87+ private String tableKeyId ;
8888 private int encryptionDekLength ;
8989 private List <EncryptedKey > encryptedKeysFromMetadata ;
9090
@@ -119,7 +119,7 @@ protected String tableName() {
119119
120120 @ Override
121121 public FileIO io () {
122- if (encryptionKeyId == null ) {
122+ if (tableKeyId == null ) {
123123 return fileIO ;
124124 }
125125
@@ -136,19 +136,19 @@ public EncryptionManager encryption() {
136136 return encryptionManager ;
137137 }
138138
139- if (encryptionKeyId != null ) {
139+ if (tableKeyId != null ) {
140140 if (keyManagementClient == null ) {
141141 throw new RuntimeException (
142142 "Cant create encryption manager, because key management client is not set" );
143143 }
144144
145- Map <String , String > tableProperties = Maps .newHashMap ();
146- tableProperties .put (TableProperties .ENCRYPTION_TABLE_KEY , encryptionKeyId );
147- tableProperties .put (
145+ Map <String , String > encryptionProperties = Maps .newHashMap ();
146+ encryptionProperties .put (TableProperties .ENCRYPTION_TABLE_KEY , tableKeyId );
147+ encryptionProperties .put (
148148 TableProperties .ENCRYPTION_DEK_LENGTH , String .valueOf (encryptionDekLength ));
149149 encryptionManager =
150150 EncryptionUtil .createEncryptionManager (
151- encryptedKeysFromMetadata , tableProperties , keyManagementClient );
151+ encryptedKeysFromMetadata , encryptionProperties , keyManagementClient );
152152 } else {
153153 return PlaintextEncryptionManager .instance ();
154154 }
@@ -159,7 +159,7 @@ public EncryptionManager encryption() {
159159 @ Override
160160 protected void doRefresh () {
161161 String metadataLocation = null ;
162- String encryptionKeyIdFromHMS = null ;
162+ String tableKeyIdFromHMS = null ;
163163 String dekLengthFromHMS = null ;
164164 try {
165165 Table table = metaClients .run (client -> client .getTable (database , tableName ));
@@ -170,9 +170,12 @@ protected void doRefresh() {
170170 HiveOperationsBase .validateTableIsIceberg (table , fullName );
171171
172172 metadataLocation = table .getParameters ().get (METADATA_LOCATION_PROP );
173- encryptionKeyIdFromHMS = table .getParameters ().get (TableProperties .ENCRYPTION_TABLE_KEY );
173+ /* Table key ID must be retrieved from a catalog service, and not from untrusted storage
174+ (e.g. metadata json file) that can be tampered with. For example, an attacker can remove
175+ the table key parameter (along with existing snapshots) in the file, making the writers
176+ produce unencrypted files. Table key ID is taken directly from HMS catalog */
177+ tableKeyIdFromHMS = table .getParameters ().get (TableProperties .ENCRYPTION_TABLE_KEY );
174178 dekLengthFromHMS = table .getParameters ().get (TableProperties .ENCRYPTION_DEK_LENGTH );
175-
176179 } catch (NoSuchObjectException e ) {
177180 if (currentMetadataLocation () != null ) {
178181 throw new NoSuchTableException ("No such table: %s.%s" , database , tableName );
@@ -188,18 +191,16 @@ protected void doRefresh() {
188191 throw new RuntimeException ("Interrupted during refresh" , e );
189192 }
190193
191- if (encryptionKeyIdFromHMS != null ) {
192- encryptionKeyId = encryptionKeyIdFromHMS ; // todo gg
194+ refreshFromMetadataLocation (metadataLocation , metadataRefreshMaxRetries );
195+
196+ if (tableKeyIdFromHMS != null ) {
197+ tableKeyId = tableKeyIdFromHMS ;
193198 encryptionDekLength =
194199 (dekLengthFromHMS != null )
195200 ? Integer .parseInt (dekLengthFromHMS )
196- : TableProperties .ENCRYPTION_DEK_LENGTH_DEFAULT ; // todo gg
197- }
201+ : TableProperties .ENCRYPTION_DEK_LENGTH_DEFAULT ;
198202
199- refreshFromMetadataLocation (metadataLocation , metadataRefreshMaxRetries );
200-
201- if (encryptionKeyIdFromHMS != null ) {
202- checkEncryptionProperties (encryptionKeyIdFromHMS , dekLengthFromHMS );
203+ checkEncryptionProperties (tableKeyIdFromHMS , dekLengthFromHMS );
203204 encryptedKeysFromMetadata = current ().encryptionKeys ();
204205 }
205206 }
@@ -210,16 +211,20 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
210211 boolean newTable = base == null ;
211212 encryptionPropsFromMetadata (metadata .properties ());
212213
214+ String newMetadataLocation ;
213215 if (encryption () instanceof StandardEncryptionManager ) {
216+ // Add new encryption keys to the metadata
214217 TableMetadata .Builder builder = TableMetadata .buildFrom (metadata );
215218 for (Map .Entry <String , EncryptedKey > entry :
216219 EncryptionUtil .encryptionKeys (encryption ()).entrySet ()) {
217220 builder .addEncryptionKey (entry .getValue ());
218221 }
219- metadata = builder .build ();
222+
223+ newMetadataLocation = writeNewMetadataIfRequired (newTable , builder .build ());
224+ } else {
225+ newMetadataLocation = writeNewMetadataIfRequired (newTable , metadata );
220226 }
221227
222- String newMetadataLocation = writeNewMetadataIfRequired (newTable , metadata );
223228 boolean hiveEngineEnabled = hiveEngineEnabled (metadata , conf );
224229 boolean keepHiveStats = conf .getBoolean (ConfigProperties .KEEP_HIVE_STATS , false );
225230
@@ -272,10 +277,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
272277 // get Iceberg props that have been removed
273278 Set <String > removedProps = Collections .emptySet ();
274279 if (base != null ) {
275- TableMetadata finalMetadata = metadata ;
276280 removedProps =
277281 base .properties ().keySet ().stream ()
278- .filter (key -> !finalMetadata .properties ().containsKey (key ))
282+ .filter (key -> !metadata .properties ().containsKey (key ))
279283 .collect (Collectors .toSet ());
280284 }
281285
@@ -410,6 +414,54 @@ public ClientPool<IMetaStoreClient, TException> metaClients() {
410414 return metaClients ;
411415 }
412416
417+ @ Override
418+ public TableOperations temp (TableMetadata uncommittedMetadata ) {
419+ return new TableOperations () {
420+ @ Override
421+ public TableMetadata current () {
422+ return uncommittedMetadata ;
423+ }
424+
425+ @ Override
426+ public TableMetadata refresh () {
427+ throw new UnsupportedOperationException (
428+ "Cannot call refresh on temporary table operations" );
429+ }
430+
431+ @ Override
432+ public void commit (TableMetadata base , TableMetadata metadata ) {
433+ throw new UnsupportedOperationException ("Cannot call commit on temporary table operations" );
434+ }
435+
436+ @ Override
437+ public String metadataFileLocation (String fileName ) {
438+ return HiveTableOperations .this .metadataFileLocation (uncommittedMetadata , fileName );
439+ }
440+
441+ @ Override
442+ public LocationProvider locationProvider () {
443+ return LocationProviders .locationsFor (
444+ uncommittedMetadata .location (), uncommittedMetadata .properties ());
445+ }
446+
447+ @ Override
448+ public FileIO io () {
449+ HiveTableOperations .this .encryptionPropsFromMetadata (uncommittedMetadata .properties ());
450+ return HiveTableOperations .this .io ();
451+ }
452+
453+ @ Override
454+ public EncryptionManager encryption () {
455+ return HiveTableOperations .this .encryption ();
456+ }
457+
458+ @ Override
459+ public long newSnapshotId () {
460+ return HiveTableOperations .this .newSnapshotId ();
461+ }
462+ };
463+ }
464+
413465 /**
414466 * Returns if the hive engine related values should be enabled on the table, or not.
415467 *
@@ -465,11 +517,11 @@ private static boolean hiveLockEnabled(TableMetadata metadata, Configuration con
465517 }
466518
467519 private void encryptionPropsFromMetadata (Map <String , String > tableProperties ) {
468- if (encryptionKeyId == null ) {
469- encryptionKeyId = tableProperties .get (TableProperties .ENCRYPTION_TABLE_KEY );
520+ if (tableKeyId == null ) {
521+ tableKeyId = tableProperties .get (TableProperties .ENCRYPTION_TABLE_KEY );
470522 }
471523
472- if (encryptionKeyId != null && encryptionDekLength <= 0 ) {
524+ if (tableKeyId != null && encryptionDekLength <= 0 ) {
473525 String dekLength = tableProperties .get (TableProperties .ENCRYPTION_DEK_LENGTH );
474526 encryptionDekLength =
475527 (dekLength == null )
@@ -483,7 +535,7 @@ private void checkEncryptionProperties(String encryptionKeyIdFromHMS, String dek
483535
484536 String encryptionKeyIdFromMetadata =
485537 propertiesFromMetadata .get (TableProperties .ENCRYPTION_TABLE_KEY );
486- if (!encryptionKeyIdFromHMS .equals (encryptionKeyIdFromMetadata )) {
538+ if (!Objects .equals (encryptionKeyIdFromHMS , encryptionKeyIdFromMetadata )) {
487539 String errMsg =
488540 String .format (
489541 "Metadata file might have been modified. Encryption key id %s differs from HMS value %s" ,
0 commit comments