diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 6f105e192156c..7833b179bc08b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -53,6 +53,9 @@ public enum PackagedProgramUtils {
     ;
 
+    private static final String SQL_DRIVER_CLASS_NAME =
+            "org.apache.flink.table.runtime.application.SqlDriver";
+
     private static final String PYTHON_GATEWAY_CLASS_NAME =
             "org.apache.flink.client.python.PythonGatewayServer";
 
@@ -193,43 +196,21 @@ public static boolean isPython(String[] programArguments) {
     }
 
     public static URL getPythonJar() {
-        String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR);
-        final List<Path> pythonJarPath = new ArrayList<>();
-        try {
-            Files.walkFileTree(
-                    FileSystems.getDefault().getPath(flinkOptPath),
-                    new SimpleFileVisitor<Path>() {
-                        @Override
-                        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
-                                throws IOException {
-                            FileVisitResult result = super.visitFile(file, attrs);
-                            if (file.getFileName().toString().startsWith("flink-python")) {
-                                pythonJarPath.add(file);
-                            }
-                            return result;
-                        }
-                    });
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    "Exception encountered during finding the flink-python jar. This should not happen.",
-                    e);
-        }
-
-        if (pythonJarPath.size() != 1) {
-            throw new RuntimeException("Found " + pythonJarPath.size() + " flink-python jar.");
-        }
-
-        try {
-            return pythonJarPath.get(0).toUri().toURL();
-        } catch (MalformedURLException e) {
-            throw new RuntimeException("URL is invalid. This should not happen.", e);
-        }
+        return getOptJar("flink-python");
     }
 
     public static String getPythonDriverClassName() {
         return PYTHON_DRIVER_CLASS_NAME;
     }
 
+    public static boolean isSqlApplication(String entryPointClassName) {
+        return (entryPointClassName != null) && (entryPointClassName.equals(SQL_DRIVER_CLASS_NAME));
+    }
+
+    public static URL getSqlGatewayJar() {
+        return getOptJar("flink-sql-gateway");
+    }
+
     public static URI resolveURI(String path) throws URISyntaxException {
         final URI uri = new URI(path);
         if (uri.getScheme() != null) {
@@ -260,4 +241,39 @@ private static ProgramInvocationException generateException(
                         stderr.length() == 0 ? "(none)" : stderr),
                 cause);
     }
+
+    private static URL getOptJar(String jarName) {
+        String flinkOptPath = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR);
+        final List<Path> optJarPath = new ArrayList<>();
+        try {
+            Files.walkFileTree(
+                    FileSystems.getDefault().getPath(flinkOptPath),
+                    new SimpleFileVisitor<Path>() {
+                        @Override
+                        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+                                throws IOException {
+                            FileVisitResult result = super.visitFile(file, attrs);
+                            if (file.getFileName().toString().startsWith(jarName)) {
+                                optJarPath.add(file);
+                            }
+                            return result;
+                        }
+                    });
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Exception encountered during finding the " + jarName + " jar. This should not happen.",
+                    e);
+        }
+
+        if (optJarPath.size() != 1) {
+            throw new RuntimeException(
+                    String.format("Found %d %s jars.", optJarPath.size(), jarName));
+        }
+
+        try {
+            return optJarPath.get(0).toUri().toURL();
+        } catch (MalformedURLException e) {
+            throw new RuntimeException("URL is invalid. This should not happen.", e);
+        }
+    }
 }
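For orientation, here is a small self-contained sketch (not part of the patch) of the lookup that the new `getOptJar` helper performs: walk the directory named by the `FLINK_OPT_DIR` environment variable and expect exactly one jar whose file name starts with the given prefix. The stream-based traversal is a simplification of the `FileVisitor` used above; `FLINK_OPT_DIR` as the variable name is an assumption about what `ConfigConstants.ENV_FLINK_OPT_DIR` resolves to.

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OptJarLookupSketch {

    // Mirrors the patched getOptJar logic with java.nio streams instead of a FileVisitor.
    static URL findOptJar(Path optDir, String prefix) throws IOException {
        try (Stream<Path> files = Files.walk(optDir)) {
            List<Path> matches =
                    files.filter(f -> f.getFileName().toString().startsWith(prefix))
                            .collect(Collectors.toList());
            if (matches.size() != 1) {
                throw new IllegalStateException(
                        String.format(
                                "Found %d %s jars, expected exactly one.", matches.size(), prefix));
            }
            return matches.get(0).toUri().toURL();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed to point at the opt/ folder of an unpacked Flink distribution.
        Path optDir = Paths.get(System.getenv("FLINK_OPT_DIR"));
        System.out.println(findOptJar(optDir, "flink-sql-gateway"));
    }
}
```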
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
index 35a8ff605d17f..c8477deb04142 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
@@ -28,6 +28,7 @@
 
 import org.junit.Test;
 
+import java.net.URI;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -69,11 +70,13 @@ protected List<String> formatRawResult(List<String> rawResult) {
     }
 
     @Override
-    protected void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
+    protected void executeSqlStatements(
+            ClusterController clusterController, List<String> sqlLines, List<URI> dependencies)
             throws Exception {
         clusterController.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
                         .addJar(SQL_TOOL_BOX_JAR)
+                        .addJars(dependencies.toArray(new URI[0]))
                         .build(),
                 Duration.ofMinutes(2L));
     }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
index 4aa7601c03ce3..8aae8efcc7124 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/HdfsITCaseBase.java
@@ -36,6 +36,7 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.List;
@@ -91,10 +92,12 @@ protected void destroyHDFS() {
     }
 
     @Override
-    protected void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
+    protected void executeSqlStatements(
+            ClusterController clusterController, List<String> sqlLines, List<URI> dependencies)
             throws Exception {
         clusterController.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                        .addJars(dependencies.toArray(new URI[0]))
                         .setEnvProcessor(
                                 map -> map.put("HADOOP_CLASSPATH", getHadoopClassPathContent()))
                         .build(),
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
index b6433e2c76a44..2ce332ddb63e9 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
@@ -28,6 +28,7 @@
 
 import org.junit.Test;
 
+import java.net.URI;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -76,11 +77,13 @@ protected List<String> formatRawResult(List<String> rawResult) {
     }
 
     @Override
-    protected void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
+    protected void executeSqlStatements(
+            ClusterController clusterController, List<String> sqlLines, List<URI> dependencies)
             throws Exception {
         clusterController.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
                         .addJar(SQL_TOOL_BOX_JAR)
+                        .addJars(dependencies.toArray(new URI[0]))
                         .build(),
                 Duration.ofMinutes(2L));
     }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
index 2f448f6ec7467..41d1f55a1b23e 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/SqlITCaseBase.java
@@ -49,6 +49,7 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -125,23 +126,25 @@ public void runAndCheckSQL(
         runAndCheckSQL(
                 sqlPath,
                 Collections.singletonMap(result, resultItems),
-                Collections.singletonMap(result, formatter));
+                Collections.singletonMap(result, formatter),
+                Collections.emptyList());
     }
 
     public void runAndCheckSQL(String sqlPath, Map<String, List<String>> resultItems)
             throws Exception {
-        runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap());
+        runAndCheckSQL(sqlPath, resultItems, Collections.emptyMap(), Collections.emptyList());
     }
 
     public void runAndCheckSQL(
             String sqlPath,
             Map<String, List<String>> resultItems,
-            Map<String, Function<List<String>, List<String>>> formatters)
+            Map<String, Function<List<String>, List<String>>> formatters,
+            List<URI> dependencies)
             throws Exception {
         try (ClusterController clusterController = flink.startCluster(1)) {
             List<String> sqlLines = initializeSqlLines(sqlPath);
 
-            executeSqlStatements(clusterController, sqlLines);
+            executeSqlStatements(clusterController, sqlLines, dependencies);
 
             // Wait until all the results flushed to the json file.
             LOG.info("Verify the result.");
@@ -163,7 +166,8 @@ protected Map<String, String> generateReplaceVars() {
     }
 
     protected abstract void executeSqlStatements(
-            ClusterController clusterController, List<String> sqlLines) throws Exception;
+            ClusterController clusterController, List<String> sqlLines, List<URI> dependencies)
+            throws Exception;
 
     private List<String> initializeSqlLines(String sqlPath) throws IOException {
         URL url = SqlITCaseBase.class.getClassLoader().getResource(sqlPath);
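To make the new test hook concrete, here is a hypothetical test method built on `SqlITCaseBase` that passes a remote jar through the fourth argument (the resource name, expected rows, and HDFS URI are all placeholders; an empty list preserves the old behavior, as the overloads above show):

```java
// Sketch only: the dependencies list is handed through executeSqlStatements to the
// SQLJobSubmissionBuilder, which forwards each URI as an --jar argument.
public void testUdfFromRemoteJar() throws Exception {
    runAndCheckSQL(
            "my_udf_e2e.sql",                                            // placeholder resource
            Collections.singletonMap("result", Arrays.asList("+I[1]")),  // expected rows
            Collections.emptyMap(),                                      // no formatters
            Collections.singletonList(URI.create("hdfs://namenode:8020/jars/udf.jar")));
}
```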
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
index f1cceeaf2cf55..48dfbb580cf17 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
@@ -30,6 +30,7 @@
 import org.junit.rules.TestName;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -90,6 +91,25 @@ public void testUdfInRemoteJar() throws Exception {
                                 raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA));
     }
 
+    @Test
+    public void testCreateFunctionFromRemoteJarViaSqlClient() throws Exception {
+        runAndCheckSQL(
+                "sql_client_remote_jar_e2e.sql",
+                Collections.singletonMap(result, Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]")),
+                Collections.singletonMap(
+                        result,
+                        raw ->
+                                convertToMaterializedResult(
+                                        raw, USER_ORDER_SCHEMA, USER_ORDER_DESERIALIZATION_SCHEMA)),
+                Collections.singletonList(
+                        URI.create(
+                                String.format(
+                                        "hdfs://%s:%s/%s",
+                                        hdfsCluster.getURI().getHost(),
+                                        hdfsCluster.getNameNodePort(),
+                                        hdPath))));
+    }
+
     @Test
     public void testScalarUdfWhenCheckpointEnable() throws Exception {
         runAndCheckSQL(
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/sql_client_remote_jar_e2e.sql b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/sql_client_remote_jar_e2e.sql
new file mode 100644
index 0000000000000..4c5735e2fb057
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/sql_client_remote_jar_e2e.sql
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE JsonTable (
+    user_name STRING,
+    order_cnt BIGINT
+) WITH (
+    'connector' = 'filesystem',
+    'path' = '$RESULT',
+    'sink.rolling-policy.rollover-interval' = '2s',
+    'sink.rolling-policy.check-interval' = '2s',
+    'format' = 'debezium-json'
+);
+
+CREATE FUNCTION count_agg AS 'org.apache.flink.table.toolbox.CountAggFunction' LANGUAGE JAVA;
+
+SET execution.runtime-mode = $MODE;
+SET table.exec.mini-batch.enabled = true;
+SET table.exec.mini-batch.size = 5;
+SET table.exec.mini-batch.allow-latency = 2s;
+
+INSERT INTO JsonTable
+SELECT user_name, count_agg(order_id)
+FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name)
+GROUP BY user_name;
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 2ecf977db64f3..f0b43f85f5504 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -26,14 +26,12 @@
 import org.apache.flink.table.client.gateway.DefaultContextUtils;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.SingleSessionManager;
-import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.gateway.SqlGateway;
 import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;
 import org.apache.flink.table.gateway.service.context.DefaultContext;
 import org.apache.flink.util.NetUtils;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.jline.terminal.Terminal;
 import org.slf4j.Logger;
@@ -42,16 +40,14 @@
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.function.Supplier;
 
 import static org.apache.flink.table.client.cli.CliClient.DEFAULT_TERMINAL_FACTORY;
+import static org.apache.flink.table.client.cli.CliUtils.isApplicationMode;
 import static org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getSqlGatewayOptionPrefix;
 
 /**
@@ -140,7 +136,11 @@ private void openCli(Executor executor) {
 
         try (CliClient cli = new CliClient(terminalFactory, executor, historyFilePath)) {
             if (options.getInitFile() != null) {
-                boolean success = cli.executeInitialization(readFromURL(options.getInitFile()));
+                if (isApplicationMode(executor.getSessionConfig())) {
+                    throw new SqlClientException(
+                            "SQL Client doesn't support running init files when deploying the script to a cluster.");
+                }
+                boolean success = cli.executeInitialization(options.getInitFile());
                 if (!success) {
                     System.out.println(
                             String.format(
@@ -158,7 +158,7 @@ private void openCli(Executor executor) {
             if (!hasSqlFile) {
                 cli.executeInInteractiveMode();
             } else {
-                cli.executeInNonInteractiveMode(readExecutionContent());
+                cli.executeInNonInteractiveMode(options.getSqlFile());
             }
         }
     }
@@ -320,17 +320,4 @@ public void run() {
             System.out.println("done.");
         }
     }
-
-    private String readExecutionContent() {
-        return readFromURL(options.getSqlFile());
-    }
-
-    private String readFromURL(URL file) {
-        try {
-            return IOUtils.toString(file, StandardCharsets.UTF_8);
-        } catch (IOException e) {
-            throw new SqlExecutionException(
-                    String.format("Fail to read content from the %s.", file.getPath()), e);
-        }
-    }
 }
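A distilled view of the gating above (sketch, with hypothetical variable names): in application mode the whole `-f` script is shipped to the cluster, so there is no live session in which an `-i` init file could be interpreted first, hence the hard error.

```java
// -i (init file) and an "…application" execution target are mutually exclusive;
// -f either deploys the script (application mode) or executes it locally.
if (initFile != null && isApplicationMode(executor.getSessionConfig())) {
    throw new SqlClientException(
            "SQL Client doesn't support running init files when deploying the script to a cluster.");
}
```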
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 0acc4d5afd5a2..c88d64909be35 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.client.cli;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.cli.parser.SqlClientSyntaxHighlighter;
 import org.apache.flink.table.client.cli.parser.SqlCommandParserImpl;
@@ -26,7 +28,10 @@
 import org.apache.flink.table.client.config.SqlClientOptions;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.util.FileUtils;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.jline.reader.EndOfFileException;
 import org.jline.reader.LineReader;
 import org.jline.reader.LineReaderBuilder;
@@ -47,10 +52,17 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.function.Supplier;
 
+import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_DEPLOY_SCRIPT;
+import static org.apache.flink.table.client.cli.CliStrings.messageInfo;
+import static org.apache.flink.table.client.cli.CliUtils.isApplicationMode;
+
 /** SQL CLI client. */
 public class CliClient implements AutoCloseable {
 
@@ -125,21 +137,37 @@ void executeInInteractiveMode(LineReader lineReader) {
     }
 
     /** Opens the non-interactive CLI shell. */
-    public void executeInNonInteractiveMode(String content) {
+    public void executeInNonInteractiveMode(URI uri) {
         try {
             terminal = terminalFactory.get();
-            executeFile(content, terminal.output(), ExecutionMode.NON_INTERACTIVE_EXECUTION);
+            if (isApplicationMode(executor.getSessionConfig())) {
+                String scheme = StringUtils.lowerCase(uri.getScheme());
+                String clusterId;
+                // local files
+                if (scheme == null || scheme.equals("file")) {
+                    clusterId = executor.deployScript(readFile(uri), null);
+                } else {
+                    clusterId = executor.deployScript(null, uri);
+                }
+                terminal.writer().println(messageInfo(MESSAGE_DEPLOY_SCRIPT).toAnsi());
+                terminal.writer().println(String.format("Cluster ID: %s\n", clusterId));
+                terminal.flush();
+            } else {
+                executeFile(
+                        readFile(uri), terminal.output(), ExecutionMode.NON_INTERACTIVE_EXECUTION);
+            }
         } finally {
             closeTerminal();
         }
     }
 
     /** Initialize the Cli Client with the content. */
-    public boolean executeInitialization(String content) {
+    public boolean executeInitialization(URI file) {
         try {
             OutputStream outputStream = new ByteArrayOutputStream(256);
             terminal = TerminalUtils.createDumbTerminal(outputStream);
-            boolean success = executeFile(content, outputStream, ExecutionMode.INITIALIZATION);
+            boolean success =
+                    executeFile(readFile(file), outputStream, ExecutionMode.INITIALIZATION);
             LOG.info(outputStream.toString());
             return success;
         } finally {
@@ -326,4 +354,39 @@ private LineReader createLineReader(Terminal terminal, ExecutionMode mode) {
         }
         return lineReader;
     }
+
+    public static String readFile(URI uri) {
+        try {
+            if (uri.getScheme() != null
+                    && (uri.getScheme().equals("http") || uri.getScheme().equals("https"))) {
+                return readFromHttp(uri);
+            } else {
+                return readFileUtf8(uri);
+            }
+        } catch (IOException e) {
+            throw new SqlClientException("Failed to read file " + uri, e);
+        }
+    }
+
+    private static String readFromHttp(URI uri) throws IOException {
+        HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
+
+        conn.setRequestMethod("GET");
+
+        try (InputStream inputStream = conn.getInputStream();
+                ByteArrayOutputStream targetFile = new ByteArrayOutputStream()) {
+            IOUtils.copy(inputStream, targetFile);
+            return targetFile.toString(StandardCharsets.UTF_8);
+        }
+    }
+
+    private static String readFileUtf8(URI uri) throws IOException {
+        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(uri.toString());
+        FileSystem fs = path.getFileSystem();
+        try (FSDataInputStream inputStream = fs.open(path)) {
+            return new String(
+                    FileUtils.read(inputStream, (int) fs.getFileStatus(path).getLen()),
+                    StandardCharsets.UTF_8);
+        }
+    }
 }
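A quick demonstration of the scheme dispatch implemented by `readFile` (all URIs below are placeholders): a missing scheme or `file` goes through Flink's `FileSystem` abstraction, `http`/`https` through `HttpURLConnection`, and any other scheme such as `hdfs` is resolved by whichever Flink `FileSystem` implementation is registered for it on the classpath.

```java
import org.apache.flink.table.client.cli.CliClient;

import java.net.URI;

public class ReadFileDemo {
    public static void main(String[] args) {
        // Local file: handled by readFileUtf8 via the local FileSystem.
        System.out.println(CliClient.readFile(URI.create("file:///tmp/init.sql")));
        // HTTP(S): fetched with HttpURLConnection by readFromHttp.
        System.out.println(CliClient.readFile(URI.create("https://example.com/script.sql")));
        // Remote file system: also readFileUtf8, provided an hdfs:// FileSystem is available.
        System.out.println(CliClient.readFile(URI.create("hdfs://namenode:8020/scripts/job.sql")));
    }
}
```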
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
index 3d1aa802abdf9..1177c9cbabaa0 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
@@ -22,6 +22,7 @@
 
 import javax.annotation.Nullable;
 
+import java.net.URI;
 import java.net.URL;
 import java.util.List;
 import java.util.Optional;
@@ -35,16 +36,16 @@ public class CliOptions {
 
     private final boolean isPrintHelp;
     private final String sessionId;
-    private final URL initFile;
-    private final URL sqlFile;
+    private final URI initFile;
+    private final URI sqlFile;
     private final String historyFilePath;
     private final Properties sessionConfig;
 
     private CliOptions(
             boolean isPrintHelp,
             String sessionId,
-            URL initFile,
-            URL sqlFile,
+            URI initFile,
+            URI sqlFile,
             String historyFilePath,
             Properties sessionConfig) {
         this.isPrintHelp = isPrintHelp;
@@ -63,11 +64,11 @@ public String getSessionId() {
         return sessionId;
     }
 
-    public @Nullable URL getInitFile() {
+    public @Nullable URI getInitFile() {
         return initFile;
     }
 
-    public @Nullable URL getSqlFile() {
+    public @Nullable URI getSqlFile() {
         return sqlFile;
     }
 
@@ -82,19 +83,19 @@ public Properties getSessionConfig() {
 
     /** Command option lines to configure SQL Client in the embedded mode. */
     public static class EmbeddedCliOptions extends CliOptions {
 
-        private final List<URL> jars;
-        private final List<URL> libraryDirs;
+        private final List<URI> jars;
+        private final List<URI> libraryDirs;
 
         private final Configuration pythonConfiguration;
 
         public EmbeddedCliOptions(
                 boolean isPrintHelp,
                 String sessionId,
-                URL initFile,
-                URL sqlFile,
+                URI initFile,
+                URI sqlFile,
                 String historyFilePath,
-                List<URL> jars,
-                List<URL> libraryDirs,
+                List<URI> jars,
+                List<URI> libraryDirs,
                 Configuration pythonConfiguration,
                 Properties sessionConfig) {
             super(isPrintHelp, sessionId, initFile, sqlFile, historyFilePath, sessionConfig);
@@ -103,11 +104,11 @@ public EmbeddedCliOptions(
             this.pythonConfiguration = pythonConfiguration;
         }
 
-        public List<URL> getJars() {
+        public List<URI> getJars() {
             return jars;
         }
 
-        public List<URL> getLibraryDirs() {
+        public List<URI> getLibraryDirs() {
             return libraryDirs;
         }
 
@@ -124,8 +125,8 @@ public static class GatewayCliOptions extends CliOptions {
         GatewayCliOptions(
                 boolean isPrintHelp,
                 String sessionId,
-                URL initFile,
-                URL sqlFile,
+                URI initFile,
+                URI sqlFile,
                 String historyFilePath,
                 @Nullable URL gatewayAddress,
                 Properties sessionConfig) {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index debd88df71899..59fe727e19589 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.client.cli;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.util.NetUtils;
 
@@ -32,11 +31,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
+import javax.annotation.Nullable;
+
 import java.io.PrintWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.MalformedURLException;
+import java.net.URI;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
@@ -249,11 +250,11 @@ public static CliOptions.EmbeddedCliOptions parseEmbeddedModeClient(String[] arg
             return new CliOptions.EmbeddedCliOptions(
                     line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
                     checkSessionId(line),
-                    checkUrl(line, CliOptionsParser.OPTION_INIT_FILE),
-                    checkUrl(line, CliOptionsParser.OPTION_FILE),
+                    parseURI(line, CliOptionsParser.OPTION_INIT_FILE),
+                    parseURI(line, CliOptionsParser.OPTION_FILE),
                     line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
-                    checkUrls(line, CliOptionsParser.OPTION_JAR),
-                    checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
+                    parseURIs(line, CliOptionsParser.OPTION_JAR),
+                    parseURIs(line, CliOptionsParser.OPTION_LIBRARY),
                     getPythonConfiguration(line),
                     line.getOptionProperties(OPTION_SESSION_CONFIG.getOpt()));
         } catch (ParseException e) {
@@ -268,8 +269,8 @@ public static CliOptions parseGatewayModeClient(String[] args) {
             return new CliOptions.GatewayCliOptions(
                     line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
                     checkSessionId(line),
-                    checkUrl(line, CliOptionsParser.OPTION_INIT_FILE),
-                    checkUrl(line, CliOptionsParser.OPTION_FILE),
+                    parseURI(line, CliOptionsParser.OPTION_INIT_FILE),
+                    parseURI(line, CliOptionsParser.OPTION_FILE),
                     line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
                     line.hasOption(CliOptionsParser.OPTION_ENDPOINT_ADDRESS.getOpt())
                             ? parseGatewayAddress(
@@ -308,32 +309,30 @@ private static URL parseGatewayAddress(String cliOptionAddress) {
 
     // --------------------------------------------------------------------------------------------
 
-    private static URL checkUrl(CommandLine line, Option option) {
-        final List<URL> urls = checkUrls(line, option);
-        if (urls != null && !urls.isEmpty()) {
-            return urls.get(0);
+    private static @Nullable URI parseURI(CommandLine line, Option option) {
+        List<URI> uris = parseURIs(line, option);
+        if (uris == null || uris.isEmpty()) {
+            return null;
+        } else {
+            return uris.get(0);
         }
-        return null;
     }
 
-    private static List<URL> checkUrls(CommandLine line, Option option) {
+    private static @Nullable List<URI> parseURIs(CommandLine line, Option option) {
         if (line.hasOption(option.getOpt())) {
-            final String[] urls = line.getOptionValues(option.getOpt());
-            return Arrays.stream(urls)
+            final String[] uris = line.getOptionValues(option.getOpt());
+            return Arrays.stream(uris)
                     .distinct()
                     .map(
-                            (url) -> {
-                                checkFilePath(url);
+                            uri -> {
                                 try {
-                                    return Path.fromLocalFile(new File(url).getAbsoluteFile())
-                                            .toUri()
-                                            .toURL();
+                                    return URI.create(uri);
                                 } catch (Exception e) {
                                     throw new SqlClientException(
-                                            "Invalid path for option '"
+                                            "Invalid URI for option '"
                                                     + option.getLongOpt()
                                                     + "': "
-                                                    + url,
+                                                    + uri,
                                             e);
                                 }
                             })
@@ -342,14 +341,6 @@ private static List<URL> checkUrls(CommandLine line, Option option) {
         return null;
     }
 
-    public static void checkFilePath(String filePath) {
-        Path path = new Path(filePath);
-        String scheme = path.toUri().getScheme();
-        if (scheme != null && !scheme.equals("file")) {
-            throw new SqlClientException("SQL Client only supports to load files in local.");
-        }
-    }
-
     private static String checkSessionId(CommandLine line) {
         final String sessionId = line.getOptionValue(CliOptionsParser.OPTION_SESSION.getOpt());
         if (sessionId == null) {
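The switch from `checkUrls` to `parseURIs` loosens what `-f`/`-i`/`-j`/`-l` accept; a sketch of the difference (the paths shown are illustrative):

```java
// Previously every argument was forced into a local file URL and non-"file"
// schemes were rejected by checkFilePath; now the raw string is kept as a URI
// and only resolved when it is actually read or registered.
URI local = URI.create("target/udf.jar");                 // scheme == null: local path
URI file = URI.create("file:///opt/jars/udf.jar");        // explicit local file
URI remote = URI.create("hdfs://namenode:8020/udf.jar");  // accepted now, fetched later
```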
100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliUtils.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.client.cli;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.types.DataType;
 
 import org.jline.utils.AttributedString;
@@ -29,7 +29,6 @@
 
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -121,11 +120,8 @@ public static boolean createFile(final Path filePath) {
         }
     }
 
-    /** Get time zone from the given session config. */
-    public static ZoneId getSessionTimeZone(ReadableConfig sessionConfig) {
-        final String zone = sessionConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
-        return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
-                ? ZoneId.systemDefault()
-                : ZoneId.of(zone);
+    public static boolean isApplicationMode(ReadableConfig config) {
+        final String executionTarget = config.getOptional(DeploymentOptions.TARGET).orElse("");
+        return executionTarget.trim().endsWith("application");
    }
 }
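A quick check of the new predicate (sketch, assuming the usual imports): any `execution.target` whose name ends in `application` counts as application mode, which covers both the YARN and the Kubernetes application targets.

```java
Configuration conf = new Configuration();

conf.set(DeploymentOptions.TARGET, "yarn-application");
assert CliUtils.isApplicationMode(conf);   // ends with "application"

conf.set(DeploymentOptions.TARGET, "kubernetes-application");
assert CliUtils.isApplicationMode(conf);   // ends with "application"

conf.set(DeploymentOptions.TARGET, "remote");
assert !CliUtils.isApplicationMode(conf);  // session cluster, not application mode
```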
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
index 5f716d99ecbfa..5ad4d0bdceb7e 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DefaultContextUtils.java
@@ -29,6 +29,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -40,13 +41,13 @@ public class DefaultContextUtils {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultContextUtils.class);
 
     public static DefaultContext buildDefaultContext(CliOptions.EmbeddedCliOptions options) {
-        final List<URL> jars;
+        final List<URI> jars;
         if (options.getJars() != null) {
             jars = options.getJars();
         } else {
             jars = Collections.emptyList();
         }
-        final List<URL> libDirs;
+        final List<URI> libDirs;
         if (options.getLibraryDirs() != null) {
             libDirs = options.getLibraryDirs();
         } else {
@@ -66,18 +67,19 @@ public static DefaultContext buildDefaultContext(CliOptions.GatewayCliOptions op
 
     // --------------------------------------------------------------------------------------------
 
-    private static List<URL> discoverDependencies(List<URL> jars, List<URL> libraries) {
-        final List<URL> dependencies = new ArrayList<>();
+    private static List<URI> discoverDependencies(List<URI> jars, List<URI> libraries) {
+        final List<URI> dependencies = new ArrayList<>();
         try {
             // find jar files
-            for (URL url : jars) {
-                JarUtils.checkJarFile(url);
-                dependencies.add(url);
+            for (URI uri : jars) {
+                // delay the file check until the ResourceManager is created;
+                // the ResourceManager supports downloading files from external systems
+                dependencies.add(uri);
             }
 
             // find jar files in library directories
-            for (URL libUrl : libraries) {
-                final File dir = new File(libUrl.toURI());
+            for (URI libURI : libraries) {
+                final File dir = new File(libURI);
                 if (!dir.isDirectory()) {
                     throw new SqlClientException("Directory expected: " + dir);
                 } else if (!dir.canRead()) {
@@ -92,7 +94,7 @@ private static List<URL> discoverDependencies(List<URL> jars, List<URL> librarie
                 if (f.isFile() && f.getAbsolutePath().toLowerCase().endsWith(".jar")) {
                     final URL url = f.toURI().toURL();
                     JarUtils.checkJarFile(url);
-                    dependencies.add(url);
+                    dependencies.add(f.toURI());
                 }
             }
         }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 3be128e07080f..de6a479f084c2 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -22,8 +22,11 @@
 import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.gateway.service.context.DefaultContext;
 
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
@@ -87,6 +90,15 @@ static Executor create(DefaultContext defaultContext, URL address, String sessio
      */
     List<String> completeStatement(String statement, int position);
 
+    /**
+     * Deploys the script in application mode.
+     *
+     * @param script script content to run in application mode
+     * @param uri URI pointing to the script
+     * @return the cluster ID
+     */
+    String deployScript(@Nullable String script, @Nullable URI uri);
+
     /** Close the {@link Executor} and process all exceptions. */
     void close();
 }
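Hypothetical call sites for the new `Executor#deployScript` (the statement and URI are placeholders): as the `CliClient` change above shows, exactly one of the two arguments is expected to be non-null — local files are read client-side and passed as content, everything else is passed by URI for the cluster to fetch.

```java
// Local script: content is read on the client and shipped inline.
String idFromContent = executor.deployScript("INSERT INTO sink SELECT * FROM src;", null);

// Remote script: only the URI travels; the cluster resolves it.
String idFromUri = executor.deployScript(null, URI.create("hdfs://namenode:8020/scripts/job.sql"));

System.out.println("Cluster ID: " + idFromUri);
```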
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index 88dd22f432b98..21ba32454d46c 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -43,6 +43,7 @@
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.header.application.DeployScriptHeaders;
 import org.apache.flink.table.gateway.rest.header.operation.CancelOperationHeaders;
 import org.apache.flink.table.gateway.rest.header.operation.CloseOperationHeaders;
 import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
@@ -54,6 +55,7 @@
 import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
 import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
 import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders;
+import org.apache.flink.table.gateway.rest.message.application.DeployScriptRequestBody;
 import org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters;
 import org.apache.flink.table.gateway.rest.message.operation.OperationStatusResponseBody;
 import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
@@ -84,6 +86,7 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
@@ -331,6 +334,19 @@ public List<String> completeStatement(String statement, int position) {
                 .getCandidates();
     }
 
+    @Override
+    public String deployScript(@Nullable String script, @Nullable URI uri) {
+        return getResponse(
+                        sendRequest(
+                                DeployScriptHeaders.getInstance(),
+                                new SessionMessageParameters(sessionHandle),
+                                new DeployScriptRequestBody(
+                                        script,
+                                        uri == null ? null : uri.toString(),
+                                        Collections.emptyMap())))
+                .getClusterID();
+    }
+
     @Override
     public void close() {
         if (!registry.isClosed()) {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
index 9c7e7dee0bb0c..99dd4e72ed2c4 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.client.gateway;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.resource.ClientResourceManager;
@@ -36,13 +38,24 @@
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.session.Session;
 import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.MutableURLClassLoader;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.client.cli.ArtifactFetchOptions.ARTIFACT_LIST;
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.table.client.cli.CliUtils.isApplicationMode;
 
 /**
  * A {@link SessionManager} only has one session at most. It uses the less resources and also
@@ -53,6 +66,9 @@
  */
 public class SingleSessionManager implements SessionManager {
 
+    private static final ConfigOption<List<String>> YARN_SHIP_FILES =
+            key("yarn.ship-files").stringType().asList().noDefaultValue();
+
     private final DefaultContext defaultContext;
 
     private final ExecutorService operationExecutorService;
@@ -96,6 +112,7 @@ public Session openSession(SessionEnvironment environment) throws SqlGatewayExce
                         sessionHandle,
                         environment,
                         operationExecutorService));
+        session.open();
         return session;
     }
 
@@ -136,23 +153,56 @@ public static EmbeddedSessionContext create(
             ExecutorService operationExecutorService) {
         Configuration configuration =
                 initializeConfiguration(defaultContext, environment, sessionId);
+        List<URI> dependencies;
+        // rewrite the dependencies
+        if (isApplicationMode(configuration)) {
+            dependencies = Collections.emptyList();
+            if (!defaultContext.getDependencies().isEmpty()) {
+                String target = configuration.getOptional(DeploymentOptions.TARGET).orElse("");
+                if (target.equals("yarn-application")) {
+                    configuration.set(
+                            YARN_SHIP_FILES,
+                            defaultContext.getDependencies().stream()
+                                    .map(URI::toString)
+                                    .collect(Collectors.toList()));
+                } else if (target.equals("kubernetes-application")) {
+                    configuration.set(
+                            ARTIFACT_LIST,
+                            defaultContext.getDependencies().stream()
+                                    .map(URI::toString)
+                                    .collect(Collectors.toList()));
+                } else {
+                    throw new SqlGatewayException("Unknown deployment target: " + target);
+                }
+            }
+        } else {
+            dependencies = defaultContext.getDependencies();
+        }
         final MutableURLClassLoader userClassLoader =
                 new ClientWrapperClassLoader(
                         ClientClassloaderUtil.buildUserClassLoader(
-                                defaultContext.getDependencies(),
+                                Collections.emptyList(),
                                 SessionContext.class.getClassLoader(),
                                 new Configuration(configuration)),
                         configuration);
         ClientResourceManager resourceManager =
                 new ClientResourceManager(configuration, userClassLoader);
-        return new EmbeddedSessionContext(
-                defaultContext,
-                sessionId,
-                environment.getSessionEndpointVersion(),
-                configuration,
-                userClassLoader,
-                initializeSessionState(environment, configuration, resourceManager),
-                new OperationManager(operationExecutorService));
+        try {
+            resourceManager.registerJarResources(
+                    dependencies.stream()
+                            .map(uri -> new ResourceUri(ResourceType.JAR, uri.toString()))
+                            .collect(Collectors.toList()));
+            return new EmbeddedSessionContext(
+                    defaultContext,
+                    sessionId,
+                    environment.getSessionEndpointVersion(),
+                    configuration,
+                    userClassLoader,
+                    initializeSessionState(environment, configuration, resourceManager),
+                    new OperationManager(operationExecutorService));
+        } catch (IOException e) {
+            throw new SqlGatewayException("Failed to open the session.", e);
+        }
     }
 
     @Override
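The effect of the dependency rewrite above, sketched with illustrative values: in application mode client-side jars are never loaded into the local session classloader, but are forwarded to the cluster through deployment-specific shipping options (the assumption here is that `ArtifactFetchOptions.ARTIFACT_LIST` maps to the `user.artifacts.artifact-list` key fetched by the Kubernetes entrypoint, while YARN ships files via `yarn.ship-files`).

```java
Configuration configuration = new Configuration();
configuration.set(DeploymentOptions.TARGET, "kubernetes-application");
List<String> jars = Collections.singletonList("hdfs://namenode:8020/jars/udf.jar");

// kubernetes-application: the entrypoint downloads the artifacts before starting the job.
configuration.set(ArtifactFetchOptions.ARTIFACT_LIST, jars);

// yarn-application would instead ship the files with the YARN application:
// configuration.set(YARN_SHIP_FILES, jars);  // key("yarn.ship-files"), defined above
```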
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
index 428c76ea10a17..d8f270a8dbef6 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.client;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.gateway.rest.DeployScriptITCase;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.util.FileUtils;
@@ -27,10 +28,12 @@
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.net.URL;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
@@ -39,7 +42,6 @@
 import static org.apache.flink.configuration.DeploymentOptions.TARGET;
 import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link SqlClient}. */
 class SqlClientTest extends SqlClientTestBase {
@@ -250,14 +252,6 @@ void testDisplayMultiLineSqlInInteractiveMode() throws Exception {
                         + "> END;");
     }
 
-    @Test
-    void testExecuteSqlWithHDFSFile() {
-        String[] args = new String[] {"-f", "hdfs://path/to/file/test.sql"};
-        assertThatThrownBy(() -> runSqlClient(args))
-                .isInstanceOf(SqlClientException.class)
-                .hasMessage("SQL Client only supports to load files in local.");
-    }
-
     @Test
     public void testPrintEmbeddedModeHelp() throws Exception {
         runTestCliHelp(new String[] {"embedded", "--help"}, "cli/embedded-mode-help.out");
@@ -273,6 +267,17 @@ public void testPrintAllModeHelp() throws Exception {
         runTestCliHelp(new String[] {"--help"}, "cli/all-mode-help.out");
     }
 
+    @Test
+    public void testDeployScript(@TempDir Path home) throws Exception {
+        DeployScriptITCase.TestApplicationClusterClientFactory.id = "test-application";
+
+        Path script = home.resolve("script.sql");
+        assertThat(script.toFile().createNewFile()).isTrue();
+        String[] args = {"-f", script.toString(), "-Dexecution.target=test-application"};
+        assertThat(runSqlClient(args))
+                .contains("[INFO] Deploy script in application mode:")
+                .contains("Cluster ID: test");
+    }
+
     private void runTestCliHelp(String[] args, String expected) throws Exception {
         String actual =
                 new String(
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index f2920c21d180f..938d7d24d29a5 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -41,6 +42,7 @@
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.util.CloseableIterator;
 
+import org.apache.commons.io.FileUtils;
 import org.jline.reader.Candidate;
 import org.jline.reader.LineReader;
 import org.jline.reader.LineReaderBuilder;
@@ -49,6 +51,9 @@
 import org.jline.terminal.Terminal;
 import org.jline.terminal.impl.DumbTerminal;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -56,6 +61,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -80,6 +87,8 @@ class CliClientTest {
     private static final String SQL_WITHOUT_COMPLETER = "SELECT pos FROM source_table;";
     private static final String SQL_WITH_COMPLETER = "SELECT POSITION FROM source_table;";
 
+    private static @TempDir Path home;
+
     @Test
     void testUpdateSubmission() throws Exception {
         verifyUpdateSubmission(INSERT_INTO_STATEMENT, false, false);
@@ -257,14 +266,18 @@ void testCancelExecutionInNonInteractiveMode() throws Exception {
         Path historyFilePath = historyTempFile();
         OutputStream outputStream = new ByteArrayOutputStream(256);
-
+        File script =
+                home.resolve(String.format("script_%s.sql", System.currentTimeMillis())).toFile();
+        assertThat(script.createNewFile()).isTrue();
         try (CliClient client =
                 new CliClient(
                         () -> TerminalUtils.createDumbTerminal(outputStream),
                         mockExecutor,
                         historyFilePath,
                         null)) {
-            Thread thread = new Thread(() -> client.executeInNonInteractiveMode(content));
+            FileUtils.writeStringToFile(script, content, StandardCharsets.UTF_8);
+            Thread thread = new Thread(() -> client.executeInNonInteractiveMode(script.toURI()));
+
             thread.start();
 
             while (!mockExecutor.isAwait) {
@@ -316,6 +329,29 @@ public void go() {
         }
     }
 
+    @Test
+    void testDeployScript() throws Exception {
+        final MockExecutor mockExecutor = new MockExecutor(new SqlParserHelper(), true);
+        Path historyFilePath = historyTempFile();
+
+        File script =
+                home.resolve(String.format("script_%s.sql", System.currentTimeMillis())).toFile();
+        mockExecutor.configuration.set(DeploymentOptions.TARGET, "kubernetes-application");
+        assertThat(script.createNewFile()).isTrue();
+        try (OutputStream outputStream = new ByteArrayOutputStream(256);
+                CliClient client =
+                        new CliClient(
+                                () -> TerminalUtils.createDumbTerminal(outputStream),
+                                mockExecutor,
+                                historyFilePath,
+                                null)) {
+            client.executeInNonInteractiveMode(script.toURI());
+            assertThat(outputStream.toString())
+                    .contains("[INFO] Deploy script in application mode: ")
+                    .contains("Cluster ID: test-application-cluster");
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private void verifyUpdateSubmission(
@@ -368,13 +404,18 @@ private Path historyTempFile() throws IOException {
 
     private String executeSqlFromContent(MockExecutor executor, String content) throws IOException {
         OutputStream outputStream = new ByteArrayOutputStream(256);
+        File script = home.resolve("script.sql").toFile();
+        assertThat(script.createNewFile()).isTrue();
         try (CliClient client =
                 new CliClient(
                         () -> TerminalUtils.createDumbTerminal(outputStream),
                         executor,
                         historyTempFile(),
                         null)) {
-            client.executeInNonInteractiveMode(content);
+            FileUtils.writeStringToFile(script, content, StandardCharsets.UTF_8);
+            client.executeInNonInteractiveMode(script.toURI());
+        } finally {
+            script.delete();
         }
         return outputStream.toString();
     }
@@ -465,6 +506,11 @@ public List<String> completeStatement(String statement, int position) {
             return Arrays.asList(helper.getSqlParser().getCompletionHints(statement, position));
         }
 
+        @Override
+        public String deployScript(@Nullable String script, @Nullable URI uri) {
+            return "test-application-cluster";
+        }
+
         @Override
         public void close() {
             // do nothing
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
index a2bd7836bada3..ea304bc97bd76 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
@@ -166,7 +166,7 @@ class ExecutorImplITCase {
     private static RestClusterClient<?> clusterClient;
 
     // a generated UDF jar used for testing classloading of dependencies
-    private static URL udfDependency;
+    private static URI udfDependency;
 
     private final ThreadFactory threadFactory =
             new ExecutorThreadFactory("Executor Test Pool", IgnoreExceptionHandler.INSTANCE);
@@ -181,7 +181,7 @@ static void setup(@InjectClusterClient RestClusterClient<?> injectedClusterClien
                         "test-classloader-udf.jar",
                         GENERATED_LOWER_UDF_CLASS,
                         String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
-        udfDependency = udfJar.toURI().toURL();
+        udfDependency = udfJar.toURI();
     }
 
     private static Configuration getConfig() {
@@ -662,7 +662,7 @@ private Executor createRestServiceExecutor() {
     }
 
     private Executor createRestServiceExecutor(
-            List<URL> dependencies, Configuration configuration) {
+            List<URI> dependencies, Configuration configuration) {
         return createExecutor(
                 dependencies,
                 configuration,
@@ -681,7 +681,7 @@ private Executor createTestServiceExecutor() {
     }
 
     private Executor createExecutor(
-            List<URL> dependencies, Configuration configuration, InetSocketAddress address) {
+            List<URI> dependencies, Configuration configuration, InetSocketAddress address) {
         configuration.addAll(clusterClient.getFlinkConfiguration());
         DefaultContext defaultContext = new DefaultContext(configuration, dependencies);
         // frequently trigger heartbeat
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
index 78eccc7337df7..1b91dce16e5ef 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
@@ -37,7 +37,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URL;
+import java.net.URI;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -48,13 +49,13 @@ public class DefaultContext {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultContext.class);
 
     private final Configuration flinkConfig;
-    private final List<URL> dependencies;
+    private final List<URI> dependencies;
 
-    public DefaultContext(Map<String, String> flinkConfig, List<URL> dependencies) {
+    public DefaultContext(Map<String, String> flinkConfig, List<URI> dependencies) {
         this(Configuration.fromMap(flinkConfig), dependencies);
     }
 
-    public DefaultContext(Configuration flinkConfig, List<URL> dependencies) {
+    public DefaultContext(Configuration flinkConfig, List<URI> dependencies) {
         this.flinkConfig = flinkConfig;
         this.dependencies = dependencies;
     }
@@ -63,7 +64,7 @@ public Configuration getFlinkConfig() {
         return flinkConfig;
     }
 
-    public List<URL> getDependencies() {
+    public List<URI> getDependencies() {
         return dependencies;
     }
 
@@ -80,8 +81,7 @@ private static Options collectCommandLineOptions(List<CustomCommandLine> command
     private static Configuration createExecutionConfig(
             CommandLine commandLine,
             Options commandLineOptions,
-            List<CustomCommandLine> availableCommandLines,
-            List<URL> dependencies)
+            List<CustomCommandLine> availableCommandLines)
             throws FlinkException {
         LOG.debug("Available commandline options: {}", commandLineOptions);
         List<Option> options =
@@ -105,7 +105,8 @@ private static Configuration createExecutionConfig(
         try {
             final ProgramOptions programOptions = ProgramOptions.create(commandLine);
             final ExecutionConfigAccessor executionConfigAccessor =
-                    ExecutionConfigAccessor.fromProgramOptions(programOptions, dependencies);
+                    ExecutionConfigAccessor.fromProgramOptions(
+                            programOptions, Collections.emptyList());
             executionConfigAccessor.applyToConfiguration(executionConfig);
         } catch (CliArgsException e) {
             throw new SqlGatewayException("Invalid deployment run options.", e);
@@ -136,7 +137,7 @@ private static CustomCommandLine findActiveCommandLine(
      * @param discoverExecutionConfig flag whether to load the execution configuration
      */
     public static DefaultContext load(
-            Configuration dynamicConfig, List<URL> dependencies, boolean discoverExecutionConfig) {
+            Configuration dynamicConfig, List<URI> dependencies, boolean discoverExecutionConfig) {
         // 1. find the configuration directory
         String flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
 
@@ -161,10 +162,7 @@ public static DefaultContext load(
                     CliFrontendParser.parse(commandLineOptions, new String[] {}, true);
             configuration.addAll(
                     createExecutionConfig(
-                            deploymentCommandLine,
-                            commandLineOptions,
-                            commandLines,
-                            dependencies));
+                            deploymentCommandLine, commandLineOptions, commandLines));
         } catch (Exception e) {
             throw new SqlGatewayException(
                     "Could not load available CLI with Environment Deployment entry.", e);
diff --git a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java
index 8ff4af291a81b..5d198b9a6e62b 100644
--- a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java
+++ b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java
@@ -30,6 +30,9 @@
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 
+import javax.annotation.Nullable;
+
+import java.net.URI;
 import java.nio.file.Path;
 import java.sql.ResultSet;
 import java.sql.SQLFeatureNotSupportedException;
@@ -251,6 +254,11 @@ public List<String> completeStatement(String statement, int position) {
             throw new UnsupportedOperationException();
         }
 
+        @Override
+        public String deployScript(@Nullable String script, @Nullable URI uri) {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public void close() {}
     }
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
index 0d0700d999404..19dc6f3aae235 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SQLJobSubmission.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.util;
 
+import java.net.URI;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
@@ -78,11 +79,23 @@ public SQLJobSubmissionBuilder setClientMode(SQLJobClientMode clientMode) {
             return this;
         }
 
+        public SQLJobSubmissionBuilder addJar(URI jarFile) {
+            this.jars.add(jarFile.toString());
+            return this;
+        }
+
         public SQLJobSubmissionBuilder addJar(Path jarFile) {
             this.jars.add(jarFile.toAbsolutePath().toString());
             return this;
         }
 
+        public SQLJobSubmissionBuilder addJars(URI... jarFiles) {
+            for (URI jarFile : jarFiles) {
+                addJar(jarFile);
+            }
+            return this;
+        }
+
         public SQLJobSubmissionBuilder addJars(Path... jarFiles) {
             for (Path jarFile : jarFiles) {
                 addJar(jarFile);
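A hypothetical usage of the extended builder, mixing local and remote jars (the path and URI below are placeholders):

```java
// Remote jars are kept as raw URI strings; local Paths are made absolute, as before.
SQLJobSubmission submission =
        new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
                .addJar(Paths.get("target/SqlToolbox.jar"))                // local Path
                .addJars(URI.create("hdfs://namenode:8020/jars/udf.jar"))  // remote URI
                .build();
clusterController.submitSQLJob(submission, Duration.ofMinutes(2L));
```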
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/SqlYARNApplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/SqlYARNApplicationITCase.java
new file mode 100644
index 0000000000000..282e297c35fc5
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/SqlYARNApplicationITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests deploying a script onto the mini YARN cluster. */
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/SqlYARNApplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/SqlYARNApplicationITCase.java
new file mode 100644
index 0000000000000..282e297c35fc5
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/SqlYARNApplicationITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RpcOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests that deploy a SQL script to a mini YARN cluster in application mode. */
+public class SqlYARNApplicationITCase extends YarnTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqlYARNApplicationITCase.class);
+
+    private static final Duration yarnAppTerminateTimeout = Duration.ofSeconds(30);
+    private static final int sleepIntervalInMS = 100;
+    private static @TempDir Path workDir;
+    private static File script;
+
+    @BeforeAll
+    static void setup() throws Exception {
+        YARN_CONFIGURATION.set(
+                YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-sql-yarn-test-application");
+        startYARNWithConfig(YARN_CONFIGURATION, true);
+        script = workDir.resolve("script.sql").toFile();
+        assertThat(script.createNewFile()).isTrue();
+        FileUtils.writeFileUtf8(
+                script,
+                "CREATE TEMPORARY TABLE sink(\n"
+                        + "  a INT\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'blackhole'\n"
+                        + ");\n"
+                        + "INSERT INTO sink VALUES (1), (2), (3);");
+    }
+
+    @Test
+    void testDeployScriptViaSqlClient() throws Exception {
+        runTest(this::runSqlClient);
+    }
+
+    private void runSqlClient() throws Exception {
+        Path path = flinkLibFolder.getParentFile().toPath().resolve("bin").resolve("sql-client.sh");
+        if (!path.toFile().exists()) {
+            throw new RuntimeException("Could not find sql-client.sh at " + path + ".");
+        }
+
+        List<String> parameters = new ArrayList<>();
+        // command line parameters: sql-client.sh -Dkey=value -f <script-file>
+        parameters.add(path.toString());
+        parameters.add(
+                getSqlClientParameter(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), "768MB"));
+        parameters.add(getSqlClientParameter(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), "1g"));
+        parameters.add(getSqlClientParameter(RpcOptions.ASK_TIMEOUT_DURATION.key(), "30s"));
+        parameters.add(
+                getSqlClientParameter(
+                        DeploymentOptions.TARGET.key(),
+                        YarnDeploymentTarget.APPLICATION.getName()));
+        parameters.add(
+                getSqlClientParameter(
+                        CLASSPATH_INCLUDE_USER_JAR.key(),
+                        YarnConfigOptions.UserJarInclusion.LAST.name()));
+        parameters.add("-f");
+        parameters.add(script.getAbsolutePath());
+
+        ProcessBuilder builder = new ProcessBuilder(parameters);
+        // prepare environment
+        builder.environment().put("HADOOP_CLASSPATH", getYarnClasspath());
+        builder.environment().putAll(env);
+        Process process = builder.start();
+
+        // start to deploy the script
+        StringBuilder output = new StringBuilder();
+        consumeOutput(process.getErrorStream(), line -> output.append(line).append("\n"));
+        consumeOutput(process.getInputStream(), line -> output.append(line).append("\n"));
+
+        process.waitFor(120, TimeUnit.SECONDS);
+
+        // validate results
+        assertThat(output).contains("Deploy script in application mode:");
+        assertThat(output).contains("Cluster ID:");
+
+        Pattern pattern = Pattern.compile("Cluster ID: (application_\\w+_\\w+)");
+        Matcher matcher = pattern.matcher(output.toString());
+        assertThat(matcher.find()).isTrue();
+        ApplicationId applicationId = ApplicationId.fromString(matcher.group(1));
+
+        try (final YarnClusterDescriptor yarnClusterDescriptor =
+                createYarnClusterDescriptor(
+                        Configuration.fromMap(
+                                Collections.singletonMap(
+                                        DeploymentOptions.TARGET.key(),
+                                        YarnDeploymentTarget.APPLICATION.getName())))) {
+            waitApplicationFinishedElseKillIt(
+                    applicationId,
+                    yarnAppTerminateTimeout,
+                    yarnClusterDescriptor,
+                    sleepIntervalInMS);
+        }
+    }
+
+    private String getSqlClientParameter(String key, String value) {
+        return String.format("-D%s=%s", key, value);
+    }
+
+    private static void consumeOutput(
+            final InputStream stream, final Consumer<String> streamConsumer) {
+        new Thread(
+                        () -> {
+                            try (BufferedReader bufferedReader =
+                                    new BufferedReader(
+                                            new InputStreamReader(
+                                                    stream, StandardCharsets.UTF_8))) {
+                                String line;
+                                while ((line = bufferedReader.readLine()) != null) {
+                                    streamConsumer.accept(line);
+                                }
+                            } catch (IOException e) {
+                                LOG.error("Failure while processing process stdout/stderr.", e);
+                            }
+                        })
+                .start();
+    }
+}
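The test's handshake with the deployed cluster hinges on one regex over the client output. A self-contained restatement of that extraction on a sample line, where the application id value is made up:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ClusterIdExtractionExample {
        // Same pattern the test uses to recover the YARN application id.
        private static final Pattern CLUSTER_ID =
                Pattern.compile("Cluster ID: (application_\\w+_\\w+)");

        public static void main(String[] args) {
            String output =
                    "Deploy script in application mode:\n"
                            + "Cluster ID: application_1700000000000_0001\n";
            Matcher matcher = CLUSTER_ID.matcher(output);
            if (matcher.find()) {
                System.out.println(matcher.group(1)); // application_1700000000000_0001
            }
        }
    }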
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index e8fccb54997d3..45b5023abcd91 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -207,6 +207,7 @@ public abstract class YarnTestBase {
 
     protected static File yarnSiteXML = null;
     protected static File hdfsSiteXML = null;
+    protected static Map<String, String> env;
 
     private YarnClient yarnClient = null;
 
@@ -244,7 +245,7 @@ public abstract class YarnTestBase {
      *
      * @return a classpath suitable for running all YARN-launched JVMs
      */
-    private static String getYarnClasspath() {
+    protected static String getYarnClasspath() {
        final String start = "../flink-yarn-tests";
        try {
            File classPathFile =
@@ -819,7 +820,7 @@ private static void start(
            yarnCluster.start();
        }
 
-        Map<String, String> map = new HashMap<>(System.getenv());
+        env = new HashMap<>(System.getenv());
 
        File flinkConfDirPath =
                TestUtils.findFile(
@@ -852,7 +853,7 @@ private static void start(
 
        assertThat(configDir).isNotNull();
 
-        map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
+        env.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
 
        File targetTestClassesFolder = new File("target/test-classes");
        writeYarnSiteConfigXML(conf, targetTestClassesFolder);
@@ -862,12 +863,12 @@ private static void start(
            setMiniDFSCluster(targetTestClassesFolder);
        }
 
-        map.put(
+        env.put(
                "IN_TESTS",
                "yes we are in tests"); // see YarnClusterDescriptor() for more infos
-        map.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath());
-        map.put("MAX_LOG_FILE_NUMBER", "10");
-        CommonTestUtils.setEnv(map);
+        env.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath());
+        env.put("MAX_LOG_FILE_NUMBER", "10");
+        CommonTestUtils.setEnv(env);
 
        assertThat(yarnCluster.getServiceState()).isEqualTo(Service.STATE.STARTED);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index ee6207087bd06..c4c7e52ab7c77 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -28,12 +28,14 @@
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
 import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -78,6 +80,12 @@ private YarnClusterDescriptor getClusterDescriptor(Configuration configuration)
         final YarnConfiguration yarnConfiguration =
                 Utils.getYarnAndHadoopConfiguration(configuration);
 
+        if (System.getenv().get("IN_TESTS") != null) {
+            File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
+            Path yarnSitePath = new Path(f.getAbsolutePath());
+            yarnConfiguration.addResource(yarnSitePath);
+        }
+
         yarnClient.init(yarnConfiguration);
         yarnClient.start();
diff --git 
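The IN_TESTS branch above exists so that, inside these tests, the client factory picks up the MiniYARNCluster's yarn-site.xml that YarnTestBase writes to YARN_CONF_DIR. A minimal sketch of the overlay mechanism it relies on, with an assumed file location:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class YarnSiteOverlayExample {
        public static void main(String[] args) {
            // Resources added later win: addResource layers the test cluster's
            // settings (e.g. the ResourceManager address) over the defaults,
            // which is what the factory does when IN_TESTS is set.
            Configuration conf = new Configuration();
            conf.addResource(new Path("/tmp/test-classes/yarn-site.xml")); // illustrative path
            System.out.println(conf.get("yarn.resourcemanager.address"));
        }
    }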
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 8d2e88e253adc..42a6e3e833722 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -988,14 +988,23 @@ private ApplicationReport startAppMaster(
         }
 
         // only for application mode
-        // Python jar file only needs to be shipped and should not be added to classpath.
-        if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)
-                && PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
-            fileUploader.registerMultipleLocalResources(
-                    Collections.singletonList(
-                            new Path(PackagedProgramUtils.getPythonJar().toURI())),
-                    ConfigConstants.DEFAULT_FLINK_OPT_DIR,
-                    LocalResourceType.FILE);
+        if (YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
+            // The Python jar and the SQL Gateway jar only need to be shipped; they
+            // should not be added to the classpath.
+            if (PackagedProgramUtils.isPython(configuration.get(APPLICATION_MAIN_CLASS))) {
+                fileUploader.registerMultipleLocalResources(
+                        Collections.singletonList(
+                                new Path(PackagedProgramUtils.getPythonJar().toURI())),
+                        ConfigConstants.DEFAULT_FLINK_OPT_DIR,
+                        LocalResourceType.FILE);
+            } else if (PackagedProgramUtils.isSqlApplication(
+                    configuration.get(APPLICATION_MAIN_CLASS))) {
+                fileUploader.registerMultipleLocalResources(
+                        Collections.singletonList(
+                                new Path(PackagedProgramUtils.getSqlGatewayJar().toURI())),
+                        ConfigConstants.DEFAULT_FLINK_OPT_DIR,
+                        LocalResourceType.FILE);
+            }
         }
 
         // Upload and register user jars
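Taken together with getOptJar, the entry-point check above decides which opt/ jar gets shipped (but kept off the classpath) in application mode. A condensed, illustrative restatement of that dispatch; the Python driver class name is an assumption inferred from what PackagedProgramUtils.isPython checks, not copied from the patch:

    public class OptJarSelectionExample {
        private static final String SQL_DRIVER_CLASS_NAME =
                "org.apache.flink.table.runtime.application.SqlDriver";
        // Assumed to match the class name PackagedProgramUtils.isPython tests for.
        private static final String PYTHON_DRIVER_CLASS_NAME =
                "org.apache.flink.client.python.PythonDriver";

        // Returns the opt/ jar prefix to ship for the given application main
        // class, or null when no extra jar is needed (plain JAR jobs).
        static String optJarFor(String mainClass) {
            if (PYTHON_DRIVER_CLASS_NAME.equals(mainClass)) {
                return "flink-python";
            } else if (SQL_DRIVER_CLASS_NAME.equals(mainClass)) {
                return "flink-sql-gateway";
            }
            return null;
        }
    }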