Skip to content

Commit

Permalink
[BugFix][Flink]Fix execute jar submit in yarn-application (#3820)
Browse files Browse the repository at this point in the history
Co-authored-by: zackyoungh <[email protected]>
  • Loading branch information
zackyoungh and zackyoungh authored Sep 20, 2024
1 parent 2387318 commit 1c657bd
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,7 @@ public TaskDTO prepareTask(TaskSubmitDto submitDto) {

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
public JobResult executeJob(TaskDTO task) throws Exception {
JobResult jobResult = BaseTask.getTask(task).execute();
log.info("execute job finished,status is {}", jobResult.getStatus());
return jobResult;
return executeJob(task, false);
}

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.service.task;

import org.dinky.config.Dialect;
import org.dinky.context.TaskContextHolder;
import org.dinky.data.annotations.SupportDialect;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.exception.NotSupportExplainExcepition;
Expand Down Expand Up @@ -64,6 +65,7 @@ public static BaseTask getTask(TaskDTO taskDTO) {
if (annotation != null) {
for (Dialect dialect : annotation.value()) {
if (dialect.isDialect(taskDTO.getDialect())) {
TaskContextHolder.setDialect(dialect);
return (BaseTask) ReflectUtil.newInstance(clazz, taskDTO);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

@Profile("!test")
public class RsURLStreamHandlerFactory implements URLStreamHandlerFactory {
private final List<String> notContains = Arrays.asList("jar", "file");
private final List<String> notContains = Arrays.asList("jar", "file", "http", "https");

@Override
public URLStreamHandler createURLStreamHandler(String protocol) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import static org.dinky.utils.RunTimeUtil.extractArgs;

import org.dinky.config.Dialect;
import org.dinky.context.TaskContextHolder;
import org.dinky.executor.CustomTableEnvironment;
import org.dinky.trans.AbstractOperation;
import org.dinky.trans.ExtendOperation;
Expand Down Expand Up @@ -61,7 +63,11 @@ public ExecuteJarOperation(String statement) {
public Optional<? extends TableResult> execute(CustomTableEnvironment tEnv) {
try {
StreamExecutionEnvironment streamExecutionEnvironment = tEnv.getStreamExecutionEnvironment();
FlinkStreamEnvironmentUtil.executeAsync(getStreamGraph(tEnv), streamExecutionEnvironment);
if (TaskContextHolder.getDialect().equals(Dialect.FLINK_JAR)) {
FlinkStreamEnvironmentUtil.executeAsync(getStreamGraph(tEnv), streamExecutionEnvironment);
} else {
throw new RuntimeException("Please perform Execute jar syntax in the FlinkJar task !");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,22 @@

import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
import java.util.Arrays;
import java.util.List;

import cn.hutool.core.util.StrUtil;

public class RsURLStreamHandlerFactory implements URLStreamHandlerFactory {
private static final String PREFIX = "sun.net.www.protocol";
private final List<String> notContains = Arrays.asList("jar", "file", "http", "https");

@Override
public URLStreamHandler createURLStreamHandler(String protocol) {
for (String tempProtocol : notContains) {
if (tempProtocol.equals(StrUtil.sub(protocol, 0, tempProtocol.length()))) {
return null;
}
}
if (ResourceFileSystem.URI_SCHEMA.getScheme().equals(protocol)) {
return new RsURLStreamHandler();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.context;

import org.dinky.config.Dialect;

import java.util.Optional;

public class TaskContextHolder {
private static final ThreadLocal<Dialect> DIALECT_THREAD_LOCAL = new InheritableThreadLocal<>();

public static Dialect getDialect() {
return Optional.ofNullable(DIALECT_THREAD_LOCAL.get())
.orElseThrow(() -> new RuntimeException("task dialect is null"));
}

public static void setDialect(Dialect dialect) {
DIALECT_THREAD_LOCAL.set(dialect);
}
}
4 changes: 2 additions & 2 deletions dinky-core/src/main/java/org/dinky/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ public void initPyUDF(String executable, String... udfPyFilePath) {
}

Configuration configuration = tableEnvironment.getConfig().getConfiguration();
configuration.setString(PythonOptions.PYTHON_FILES, String.join(",", udfPyFilePath));
configuration.setString(PythonOptions.PYTHON_CLIENT_EXECUTABLE, executable);
configuration.set(PythonOptions.PYTHON_FILES, String.join(",", udfPyFilePath));
configuration.set(PythonOptions.PYTHON_CLIENT_EXECUTABLE, executable);
}

private void addJar(String... jarPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
Expand Down Expand Up @@ -117,6 +118,7 @@ public void run() throws Exception {
}

private GatewayResult submitGateway() throws Exception {
configuration.set(PipelineOptions.JARS, getUris(job.getStatement()));
config.addGatewayConfig(configuration);
config.getGatewayConfig().setSql(job.getStatement());
return Gateway.build(config.getGatewayConfig()).submitJar(jobManager.getUdfPathContextHolder());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.URLUtil;

/**
Expand All @@ -58,8 +59,9 @@ public GatewayType getType() {
}

/**
* format url
* <p>if url is rs protocol, convert to file path</p>
* format url
* <p>if url is rs protocol, convert to file path</p>
*
* @param url url
* @return formatted url
*/
Expand All @@ -77,8 +79,11 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {
init();
}

List<String> beforePipelineJars = configuration.get(PipelineOptions.JARS);

AppConfig appConfig = config.getAppConfig();
configuration.set(PipelineOptions.JARS, Collections.singletonList(formatUrl(appConfig.getUserJarPath())));

configuration.setString(
"python.files",
udfPathContextHolder.getPyUdfFile().stream().map(File::getName).collect(Collectors.joining(",")));
Expand All @@ -90,13 +95,16 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {
createClusterSpecificationBuilder();
ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass());

YarnResult result = YarnResult.build(getType());
String webUrl;
try (YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptorWithJar(udfPathContextHolder)) {
ClusterDescriptorAdapterImpl clusterDescriptorAdapter =
new ClusterDescriptorAdapterImpl(yarnClusterDescriptor);
clusterDescriptorAdapter.addShipFiles(Arrays.asList(preparSqlFile()));
if (CollUtil.isNotEmpty(beforePipelineJars)) {
clusterDescriptorAdapter.addShipFiles(
beforePipelineJars.stream().map(URLUtils::toFile).collect(Collectors.toList()));
}
clusterDescriptorAdapter.addShipFiles(Collections.singletonList(preparSqlFile()));
addConfigParas(
CustomerConfigureOptions.EXEC_SQL_FILE, configuration.get(CustomerConfigureOptions.EXEC_SQL_FILE));

Expand Down

0 comments on commit 1c657bd

Please sign in to comment.