Skip to content

Commit

Permalink
[BugFix]Fix yarn parallel submit (#4042)
Browse files Browse the repository at this point in the history
Co-authored-by: zackyoungh <[email protected]>
  • Loading branch information
zackyoungh and zackyoungh authored Dec 13, 2024
1 parent 28d6c8b commit 0d980ad
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 14 deletions.
11 changes: 11 additions & 0 deletions dinky-admin/src/main/java/org/dinky/init/EnvInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

package org.dinky.init;

import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;

import javax.annotation.PreDestroy;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -61,4 +67,9 @@ public void run(ApplicationArguments args) throws Exception {
ipAddress,
port);
}

@PreDestroy
private void destroy() throws IOException {
FileSystem.closeAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false)
}

amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the
// filesystem, causing an exception to the concurrent commit
// fileUploader.close();

// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = new HashMap<>();
Expand Down Expand Up @@ -1494,7 +1496,9 @@ public void run() {
throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
}

fs.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes
// the filesystem, causing an exception to the concurrent commit
// fs.close();
} catch (IOException e) {
LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false)
}

amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the
// filesystem, causing an exception to the concurrent commit
// fileUploader.close();

// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = new HashMap<>();
Expand Down Expand Up @@ -1545,7 +1547,9 @@ public void run() {
throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
}

fs.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes
// the filesystem, causing an exception to the concurrent commit
// fs.close();
} catch (IOException e) {
LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false)
}

amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the
// filesystem, causing an exception to the concurrent commit
// fileUploader.close();

// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = generateApplicationMasterEnv(
Expand Down Expand Up @@ -1550,7 +1552,9 @@ public void run() {
throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
}

fs.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes
// the filesystem, causing an exception to the concurrent commit
// fs.close();
} catch (IOException e) {
LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false)
}

amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the
// filesystem, causing an exception to the concurrent commit
// fileUploader.close();

// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = generateApplicationMasterEnv(
Expand Down Expand Up @@ -1566,7 +1568,9 @@ public void run() {
throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
}

fs.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes
// the filesystem, causing an exception to the concurrent commit
// fs.close();
} catch (IOException e) {
LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,10 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false)
}

amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();

// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the
// filesystem, causing an exception to the concurrent commit
// fileUploader.close();

Utils.setAclsFor(amContainer, flinkConfiguration);

Expand Down Expand Up @@ -1576,7 +1579,9 @@ public void run() {
throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
}

fs.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes
// the filesystem, causing an exception to the concurrent commit
// fs.close();
} catch (IOException e) {
LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false)
}

amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the
// filesystem, causing an exception to the concurrent commit
// fileUploader.close();

Utils.setAclsFor(amContainer, flinkConfiguration);

Expand Down Expand Up @@ -1613,7 +1615,9 @@ public void run() {
throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
}

fs.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes
// the filesystem, causing an exception to the concurrent commit
// fs.close();
} catch (IOException e) {
LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,9 @@ localizedKeytabPath, new Path(keytab), "", LocalResourceType.FILE, false, false)
}

amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes the
// filesystem, causing an exception to the concurrent commit
// fileUploader.close();

Utils.setAclsFor(amContainer, flinkConfiguration);

Expand Down Expand Up @@ -1612,7 +1614,11 @@ public void run() {
throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
}

fs.close();
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes
// the filesystem, causing an exception to the concurrent commit
// overwrite zh:这里必需需要剔除close,因为它会关闭filesystem,导致并发提交出现异常 en: Close must be culled here, as it closes
// the filesystem, causing an exception to the concurrent commit
// fs.close();
} catch (IOException e) {
LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
}
Expand Down

0 comments on commit 0d980ad

Please sign in to comment.