diff --git a/dinky-admin/src/main/java/org/dinky/init/EnvInit.java b/dinky-admin/src/main/java/org/dinky/init/EnvInit.java index 63bd5b1064..d3d875708d 100644 --- a/dinky-admin/src/main/java/org/dinky/init/EnvInit.java +++ b/dinky-admin/src/main/java/org/dinky/init/EnvInit.java @@ -19,6 +19,12 @@ package org.dinky.init; +import org.apache.hadoop.fs.FileSystem; + +import java.io.IOException; + +import javax.annotation.PreDestroy; + import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.ApplicationContext; @@ -61,4 +67,9 @@ public void run(ApplicationArguments args) throws Exception { ipAddress, port); } + + @PreDestroy + private void destroy() throws IOException { + FileSystem.closeAll(); + } } diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 5a1cfddcfa..b3ef234f9c 100644 --- a/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1004,7 +1004,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false) } amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); - fileUploader.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the + // filesystem, causing an exception to the concurrent commit + // fileUploader.close(); // Setup CLASSPATH and environment variables for ApplicationMaster final Map appMasterEnv = new HashMap<>(); @@ -1494,7 +1496,9 @@ public void run() { throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful"); } - fs.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes + // the filesystem, causing an exception to the concurrent commit + // fs.close(); } catch (IOException e) { LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e); } diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 0dfc464cb2..7d90b5adc6 100644 --- a/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1037,7 +1037,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false) } amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); - fileUploader.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the + // filesystem, causing an exception to the concurrent commit + // fileUploader.close(); // Setup CLASSPATH and environment variables for ApplicationMaster final Map appMasterEnv = new HashMap<>(); @@ -1545,7 +1547,9 @@ public void run() { throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful"); } - fs.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes + // the filesystem, causing an exception to the concurrent commit + // fs.close(); } catch (IOException e) { LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e); } diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 35aed0eaa3..0e44aa5a0f 100644 --- a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1054,7 +1054,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false) } amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); - fileUploader.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the + // filesystem, causing an exception to the concurrent commit + // fileUploader.close(); // Setup CLASSPATH and environment variables for ApplicationMaster final Map appMasterEnv = generateApplicationMasterEnv( @@ -1550,7 +1552,9 @@ public void run() { throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful"); } - fs.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes + // the filesystem, causing an exception to the concurrent commit + // fs.close(); } catch (IOException e) { LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e); } diff --git a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 3be15079c8..7f912c0541 100644 --- a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1047,7 +1047,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false) } amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); - fileUploader.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the + // filesystem, causing an exception to the concurrent commit + // fileUploader.close(); // Setup CLASSPATH and environment variables for ApplicationMaster final Map appMasterEnv = generateApplicationMasterEnv( @@ -1566,7 +1568,9 @@ public void run() { throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful"); } - fs.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes + // the filesystem, causing an exception to the concurrent commit + // fs.close(); } catch (IOException e) { LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e); } diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 2628bbc76b..3abeefacb3 100644 --- a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1055,7 +1055,10 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false) } amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); - fileUploader.close(); + + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the + // filesystem, causing an exception to the concurrent commit + // fileUploader.close(); Utils.setAclsFor(amContainer, flinkConfiguration); @@ -1576,7 +1579,9 @@ public void run() { throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful"); } - fs.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes + // the filesystem, causing an exception to the concurrent commit + // fs.close(); } catch (IOException e) { LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e); } diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 05d79728cd..ac6497fde8 100644 --- a/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1093,7 +1093,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false) } amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); - fileUploader.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the + // filesystem, causing an exception to the concurrent commit + // fileUploader.close(); Utils.setAclsFor(amContainer, flinkConfiguration); @@ -1613,7 +1615,9 @@ public void run() { throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful"); } - fs.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes + // the filesystem, causing an exception to the concurrent commit + // fs.close(); } catch (IOException e) { LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e); } diff --git a/dinky-client/dinky-client-1.20/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/dinky-client/dinky-client-1.20/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index f95bcd25bc..01ff1f40a2 100644 --- a/dinky-client/dinky-client-1.20/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/dinky-client/dinky-client-1.20/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1091,7 +1091,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false) } amContainer.setLocalResources(fileUploader.getRegisteredLocalResources()); - fileUploader.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the + // filesystem, causing an exception to the concurrent commit + // fileUploader.close(); Utils.setAclsFor(amContainer, flinkConfiguration); @@ -1612,7 +1614,11 @@ public void run() { throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful"); } - fs.close(); + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes + // the filesystem, causing an exception to the concurrent commit + // overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes + // the filesystem, causing an exception to the concurrent commit + // fs.close(); } catch (IOException e) { LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e); }