diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index da464f2215d8..c42195f9d6e8 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -112,7 +112,7 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
           .transform(
               operatorName(DELETE_FILES_OPERATOR_NAME),
               TypeInformation.of(Void.class),
-              new DeleteFilesProcessor(name(), tableLoader(), deleteBatchSize))
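+              // Load the table eagerly here so the operator receives a Table instead of a loader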
+              new DeleteFilesProcessor(name(), tableLoader().loadTable(), deleteBatchSize))
           .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
           .slotSharingGroup(slotSharingGroup())
           .setParallelism(parallelism());
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
index ac4ba7642b5d..9c20067e9c59 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
@@ -218,8 +218,6 @@ DataStream<TaskResult> append(
       slotSharingGroup = mainSlotSharingGroup;
     }
 
-    tableLoader.open();
-
     return append(sourceStream);
   }
 }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index 25d4754783b9..fab7325d168b 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.maintenance.api;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.UUID;
@@ -57,9 +58,7 @@ public class TableMaintenance {
   static final String FILTER_OPERATOR_NAME_PREFIX = "Filter ";
   static final String LOCK_REMOVER_OPERATOR_NAME = "Lock remover";
 
-  private TableMaintenance() {
-    // Do not instantiate directly
-  }
+  private TableMaintenance() {}
 
   /**
    * Use when the change stream is already provided, like in the {@link
@@ -214,7 +213,7 @@ public Builder add(MaintenanceTaskBuilder<?> task) {
     }
 
     /** Builds the task graph for the maintenance tasks. */
-    public void append() {
+    public void append() throws IOException {
       Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task");
       Preconditions.checkNotNull(uidSuffix, "Uid suffix should no be null");
 
@@ -225,73 +224,67 @@ public void append() {
         evaluators.add(taskBuilders.get(i).evaluator());
       }
 
-      DataStream<Trigger> triggers =
-          DataStreamUtils.reinterpretAsKeyedStream(changeStream(), unused -> true)
-              .process(
-                  new TriggerManager(
-                      tableLoader,
-                      lockFactory,
-                      taskNames,
-                      evaluators,
-                      rateLimit.toMillis(),
-                      lockCheckDelay.toMillis()))
-              .name(TRIGGER_MANAGER_OPERATOR_NAME)
-              .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
-              .slotSharingGroup(slotSharingGroup)
-              .forceNonParallel()
-              .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
-              .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
-              .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
-              .slotSharingGroup(slotSharingGroup)
-              .forceNonParallel();
-
-      // Add the specific tasks
-      DataStream<TaskResult> unioned = null;
-      for (int i = 0; i < taskBuilders.size(); ++i) {
-        int finalIndex = i;
-        DataStream<Trigger> filtered =
-            triggers
-                .filter(t -> t.taskId() != null && t.taskId() == finalIndex)
-                .name(FILTER_OPERATOR_NAME_PREFIX + i)
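+      // Clone the loader for graph building; try-with-resources closes it when append() returns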
+      try (TableLoader loader = tableLoader.clone()) {
+        DataStream<Trigger> triggers =
+            DataStreamUtils.reinterpretAsKeyedStream(changeStream(loader), unused -> true)
+                .process(
+                    new TriggerManager(
+                        loader,
+                        lockFactory,
+                        taskNames,
+                        evaluators,
+                        rateLimit.toMillis(),
+                        lockCheckDelay.toMillis()))
+                .name(TRIGGER_MANAGER_OPERATOR_NAME)
+                .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+                .slotSharingGroup(slotSharingGroup)
                 .forceNonParallel()
-                .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix)
-                .slotSharingGroup(slotSharingGroup);
-        MaintenanceTaskBuilder<?> builder = taskBuilders.get(i);
-        DataStream<TaskResult> result =
-            builder.append(
-                filtered,
-                i,
-                taskNames.get(i),
-                tableLoader,
-                uidSuffix,
-                slotSharingGroup,
-                parallelism);
-        if (unioned == null) {
-          unioned = result;
-        } else {
-          unioned = unioned.union(result);
+                .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
+                .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
+                .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
+                .slotSharingGroup(slotSharingGroup)
+                .forceNonParallel();
+
+        // Add the specific tasks
+        DataStream<TaskResult> unioned = null;
+        for (int i = 0; i < taskBuilders.size(); ++i) {
+          int finalIndex = i;
+          DataStream<Trigger> filtered =
+              triggers
+                  .filter(t -> t.taskId() != null && t.taskId() == finalIndex)
+                  .name(FILTER_OPERATOR_NAME_PREFIX + i)
+                  .forceNonParallel()
+                  .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix)
+                  .slotSharingGroup(slotSharingGroup);
+          MaintenanceTaskBuilder<?> builder = taskBuilders.get(i);
+          DataStream<TaskResult> result =
+              builder.append(
+                  filtered, i, taskNames.get(i), loader, uidSuffix, slotSharingGroup, parallelism);
+          if (unioned == null) {
+            unioned = result;
+          } else {
+            unioned = unioned.union(result);
+          }
         }
-      }
 
-      // Add the LockRemover to the end
-      unioned
-          .transform(
-              LOCK_REMOVER_OPERATOR_NAME,
-              TypeInformation.of(Void.class),
-              new LockRemover(lockFactory, taskNames))
-          .forceNonParallel()
-          .uid("lock-remover-" + uidSuffix)
-          .slotSharingGroup(slotSharingGroup);
+        // Add the LockRemover to the end
+        unioned
+            .transform(
+                LOCK_REMOVER_OPERATOR_NAME,
+                TypeInformation.of(Void.class),
+                new LockRemover(lockFactory, taskNames))
+            .forceNonParallel()
+            .uid("lock-remover-" + uidSuffix)
+            .slotSharingGroup(slotSharingGroup);
+      }
     }
 
-    private DataStream<TableChange> changeStream() {
+    private DataStream<TableChange> changeStream(TableLoader loader) {
       if (inputStream == null) {
         // Create a monitor source to provide the TableChange stream
         MonitorSource source =
             new MonitorSource(
-                tableLoader,
-                RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()),
-                maxReadBack);
+                loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack);
         return env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME)
             .uid(SOURCE_OPERATOR_NAME + uidSuffix)
             .slotSharingGroup(slotSharingGroup)
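
Note: Builder#append() now declares throws IOException because the cloned loader is
closed via try-with-resources. Callers must handle or propagate it. A minimal caller
sketch, not part of this patch; env, loader, lockFactory, the uid suffix, and the
table path are placeholders:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");
    try {
      TableMaintenance.forTable(env, loader, lockFactory) // any TriggerLockFactory implementation
          .uidSuffix("maintenance-1")
          .add(ExpireSnapshots.builder())
          .append(); // may now throw IOException
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
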
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java
index 8cf290708481..c3ef059e9c46 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java
@@ -26,7 +26,6 @@
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.SupportsBulkOperations;
@@ -50,12 +49,10 @@ public class DeleteFilesProcessor extends AbstractStreamOperator<Void>
   private transient Counter failedCounter;
   private transient Counter succeededCounter;
 
-  public DeleteFilesProcessor(String name, TableLoader tableLoader, int batchSize) {
+  public DeleteFilesProcessor(String name, Table table, int batchSize) {
     Preconditions.checkNotNull(name, "Name should no be null");
-    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(table, "Table should not be null");
 
-    tableLoader.open();
-    Table table = tableLoader.loadTable();
     FileIO fileIO = table.io();
     Preconditions.checkArgument(
         fileIO instanceof SupportsBulkOperations,
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index 58581e2a7e6c..a09d0244e95d 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -121,6 +121,7 @@ public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> o
   public void close() throws Exception {
     super.close();
 
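+    // Release the table loader before shutting down the planner pool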
+    tableLoader.close();
     if (plannerPoolSize != null) {
       plannerPool.shutdown();
     }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
index 31273f9d7d0d..bc41bc9f7e06 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
@@ -28,7 +28,6 @@
 import java.util.Set;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -47,6 +46,7 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase {
   void before() {
     MetricsReporterFactoryForTests.reset();
     this.table = createTable();
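+    // Maintenance tasks no longer open the loader themselves, so open it for the test up front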
+    tableLoader().open();
   }
 
   @Test
@@ -95,8 +95,6 @@ void testFailure() throws Exception {
     insert(table, 1, "a");
     insert(table, 2, "b");
 
-    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
-
     ExpireSnapshots.builder()
         .append(
             infra.triggerStream(),
@@ -130,7 +128,8 @@ void testFailure() throws Exception {
       closeJobClient(jobClient);
     }
 
-    // Check the metrics
+    // Check the metrics. There are no expired snapshots or data files because ExpireSnapshots has
+    // no max age or number of snapshots set, so no files are removed.
     MetricsReporterFactoryForTests.assertCounters(
         new ImmutableMap.Builder<String, Long>()
             .put(
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
index fee0b65a3754..f4c1f8380e89 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
@@ -224,7 +224,7 @@ void testMetrics() throws Exception {
   }
 
   @Test
-  void testUidAndSlotSharingGroup() {
+  void testUidAndSlotSharingGroup() throws IOException {
     TableMaintenance.forChangeStream(
             new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
             tableLoader(),
@@ -243,7 +243,7 @@ void testUidAndSlotSharingGroup() {
   }
 
   @Test
-  void testUidAndSlotSharingGroupUnset() {
+  void testUidAndSlotSharingGroupUnset() throws IOException {
     TableMaintenance.forChangeStream(
             new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
             tableLoader(),
@@ -256,7 +256,7 @@ void testUidAndSlotSharingGroupUnset() {
   }
 
   @Test
-  void testUidAndSlotSharingGroupInherit() {
+  void testUidAndSlotSharingGroupInherit() throws IOException {
     TableMaintenance.forChangeStream(
             new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
             tableLoader(),
@@ -271,7 +271,7 @@ void testUidAndSlotSharingGroupInherit() {
   }
 
   @Test
-  void testUidAndSlotSharingGroupOverWrite() {
+  void testUidAndSlotSharingGroupOverWrite() throws IOException {
     String anotherUid = "Another-UID";
     String anotherSlotSharingGroup = "Another-SlotSharingGroup";
     TableMaintenance.forChangeStream(
@@ -312,7 +312,7 @@ void testUidAndSlotSharingGroupOverWrite() {
   }
 
   @Test
-  void testUidAndSlotSharingGroupForMonitorSource() {
+  void testUidAndSlotSharingGroupForMonitorSource() throws IOException {
     TableMaintenance.forTable(env, tableLoader(), LOCK_FACTORY)
         .uidSuffix(UID_SUFFIX)
         .slotSharingGroup(SLOT_SHARING_GROUP)
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java
index 3f0cccf08718..1160f6bff601 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java
@@ -87,9 +87,11 @@ void testInvalidURIScheme() throws Exception {
   }
 
   private void deleteFile(TableLoader tableLoader, String fileName) throws Exception {
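+    // DeleteFilesProcessor now takes a Table, so the loader must be opened before loadTable()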
+    tableLoader.open();
     try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
         new OneInputStreamOperatorTestHarness<>(
-            new DeleteFilesProcessor(DUMMY_NAME, tableLoader, 10), StringSerializer.INSTANCE)) {
+            new DeleteFilesProcessor(DUMMY_NAME, tableLoader.loadTable(), 10),
+            StringSerializer.INSTANCE)) {
       testHarness.open();
       testHarness.processElement(fileName, System.currentTimeMillis());
       testHarness.processWatermark(EVENT_TIME);