diff --git a/dinky-core/src/main/java/org/dinky/job/JobManager.java b/dinky-core/src/main/java/org/dinky/job/JobManager.java index f9a43eed82..3ec80afbb3 100644 --- a/dinky-core/src/main/java/org/dinky/job/JobManager.java +++ b/dinky-core/src/main/java/org/dinky/job/JobManager.java @@ -285,6 +285,7 @@ public JobResult executeSql(String statement) throws Exception { } catch (Exception e) { String error = StrFormatter.format( "Exception in executing FlinkSQL:\n{}\n{}", SqlUtil.addLineNumber(currentSql), e.getMessage()); + e.printStackTrace(); job.setEndTime(LocalDateTime.now()); job.setStatus(Job.JobStatus.FAILED); job.setError(error); diff --git a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java index f53167a8ed..350406a988 100644 --- a/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java +++ b/dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java @@ -42,6 +42,7 @@ import org.apache.flink.table.api.TableResult; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -60,136 +61,130 @@ public static JobTransBuilder build(JobManager jobManager) { @Override public void run() throws Exception { - if (!jobParam.getTrans().isEmpty()) { - // Use statement set or gateway only submit inserts. - if (useStatementSet && useGateway) { - List inserts = new ArrayList<>(); - for (StatementParam item : jobParam.getTrans()) { - inserts.add(item.getValue()); - } - - // Use statement set need to merge all insert sql into a sql. - jobManager.setCurrentSql(String.join(sqlSeparator, inserts)); - GatewayResult gatewayResult = submitByGateway(inserts); - // Use statement set only has one jid. - job.setResult(InsertResult.success(gatewayResult.getId())); - job.setJobId(gatewayResult.getId()); - job.setJids(gatewayResult.getJids()); - job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL())); - if (gatewayResult.isSuccess()) { - job.setStatus(Job.JobStatus.SUCCESS); - } else { - job.setStatus(Job.JobStatus.FAILED); - job.setError(gatewayResult.getError()); - } - } else if (useStatementSet && !useGateway) { - List inserts = new ArrayList<>(); - for (StatementParam item : jobParam.getTrans()) { - if (item.getType().equals(SqlType.INSERT)) { - inserts.add(item.getValue()); - } else if (item.getType().equals(SqlType.CTAS)) { - executor.getCustomTableEnvironment() - .getParser() - .parse(item.getValue()) - .forEach(x -> { - executor.getCustomTableEnvironment().executeCTAS(x); - }); - } - } - if (!inserts.isEmpty()) { - jobManager.setCurrentSql(String.join(sqlSeparator, inserts)); - // Remote mode can get the table result. - TableResult tableResult = executor.executeStatementSet(inserts); - if (tableResult.getJobClient().isPresent()) { - job.setJobId(tableResult.getJobClient().get().getJobID().toHexString()); - job.setJids(new ArrayList() { - - { - add(job.getJobId()); - } - }); - } - if (config.isUseResult()) { - // Build insert result. - IResult result = ResultBuilder.build( - SqlType.INSERT, - job.getId().toString(), - config.getMaxRowNum(), - config.isUseChangeLog(), - config.isUseAutoCancel(), - executor.getTimeZone()) - .getResult(tableResult); - job.setResult(result); - } - } - } else if (!useStatementSet && useGateway) { - List inserts = new ArrayList<>(); - for (StatementParam item : jobParam.getTrans()) { - inserts.add(item.getValue()); - // Only can submit the first of insert sql, when not use statement set. - break; - } - jobManager.setCurrentSql(String.join(sqlSeparator, inserts)); - GatewayResult gatewayResult = submitByGateway(inserts); - job.setResult(InsertResult.success(gatewayResult.getId())); - job.setJobId(gatewayResult.getId()); - job.setJids(gatewayResult.getJids()); - job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL())); - if (gatewayResult.isSuccess()) { - job.setStatus(Job.JobStatus.SUCCESS); - } else { - job.setStatus(Job.JobStatus.FAILED); - job.setError(gatewayResult.getError()); - } - } else { - for (StatementParam item : jobParam.getTrans()) { - jobManager.setCurrentSql(item.getValue()); - FlinkInterceptorResult flinkInterceptorResult = FlinkInterceptor.build(executor, item.getValue()); - if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) { - if (config.isUseResult()) { - IResult result = ResultBuilder.build( - item.getType(), - job.getId().toString(), - config.getMaxRowNum(), - config.isUseChangeLog(), - config.isUseAutoCancel(), - executor.getTimeZone()) - .getResult(flinkInterceptorResult.getTableResult()); - job.setResult(result); - } - } else { - if (!flinkInterceptorResult.isNoExecute()) { - TableResult tableResult = executor.executeSql(item.getValue()); - if (tableResult.getJobClient().isPresent()) { - job.setJobId(tableResult - .getJobClient() - .get() - .getJobID() - .toHexString()); - job.setJids(new ArrayList() { - - { - add(job.getJobId()); - } - }); - } - if (config.isUseResult()) { - IResult result = ResultBuilder.build( - item.getType(), - job.getId().toString(), - config.getMaxRowNum(), - config.isUseChangeLog(), - config.isUseAutoCancel(), - executor.getTimeZone()) - .getResult(tableResult); - job.setResult(result); - } - } - } - // Only can submit the first of insert sql, when not use statement set. - break; - } + if (jobParam.getTrans().isEmpty()) { + return; + } + + if (useStatementSet) { + handleStatementSet(); + return; + } + + handleNonStatementSet(); + } + + private void handleStatementSet() throws Exception { + List inserts = collectInserts(); + + if (useGateway) { + processWithGateway(inserts); + return; + } + processWithoutGateway(inserts); + } + + private void handleNonStatementSet() throws Exception { + if (useGateway) { + processSingleInsertWithGateway(); + return; + } + processFirstStatement(); + } + + private List collectInserts() { + List inserts = new ArrayList<>(); + List statementParams = useStatementSet + ? jobParam.getTrans() + : Collections.singletonList(jobParam.getTrans().get(0)); + for (StatementParam item : statementParams) { + if (!useGateway && !item.getType().equals(SqlType.INSERT)) { + handleNonInsertType(item); + continue; } + inserts.add(item.getValue()); + } + return inserts; + } + + private void handleNonInsertType(StatementParam item) { + if (item.getType().equals(SqlType.CTAS)) { + executor.getCustomTableEnvironment() + .getParser() + .parse(item.getValue()) + .forEach(executor.getCustomTableEnvironment()::executeCTAS); + } + } + + private void processWithGateway(List inserts) throws Exception { + jobManager.setCurrentSql(String.join(sqlSeparator, inserts)); + GatewayResult gatewayResult = submitByGateway(inserts); + setJobResultFromGatewayResult(gatewayResult); + } + + private void processWithoutGateway(List inserts) throws Exception { + if (!inserts.isEmpty()) { + jobManager.setCurrentSql(String.join(sqlSeparator, inserts)); + TableResult tableResult = executor.executeStatementSet(inserts); + updateJobWithTableResult(tableResult); + } + } + + private void processSingleInsertWithGateway() throws Exception { + List singleInsert = collectInserts(); + processWithGateway(singleInsert); + } + + private void processFirstStatement() throws Exception { + if (jobParam.getTrans().isEmpty()) { + return; + } + // Only process the first statement when not using statement set + StatementParam item = jobParam.getTrans().get(0); + jobManager.setCurrentSql(item.getValue()); + processSingleStatement(item); + } + + private void processSingleStatement(StatementParam item) throws Exception { + FlinkInterceptorResult flinkInterceptorResult = FlinkInterceptor.build(executor, item.getValue()); + if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) { + updateJobWithTableResult(flinkInterceptorResult.getTableResult(), item.getType()); + } else if (!flinkInterceptorResult.isNoExecute()) { + TableResult tableResult = executor.executeSql(item.getValue()); + updateJobWithTableResult(tableResult, item.getType()); + } + } + + private void setJobResultFromGatewayResult(GatewayResult gatewayResult) { + job.setResult(InsertResult.success(gatewayResult.getId())); + job.setJobId(gatewayResult.getId()); + job.setJids(gatewayResult.getJids()); + job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL())); + job.setStatus(gatewayResult.isSuccess() ? Job.JobStatus.SUCCESS : Job.JobStatus.FAILED); + if (!gatewayResult.isSuccess()) { + job.setError(gatewayResult.getError()); + } + } + + private void updateJobWithTableResult(TableResult tableResult) { + updateJobWithTableResult(tableResult, SqlType.INSERT); + } + + private void updateJobWithTableResult(TableResult tableResult, SqlType sqlType) { + if (tableResult.getJobClient().isPresent()) { + job.setJobId(tableResult.getJobClient().get().getJobID().toHexString()); + job.setJids(Collections.singletonList(job.getJobId())); + } + + if (config.isUseResult()) { + IResult result = ResultBuilder.build( + sqlType, + job.getId().toString(), + config.getMaxRowNum(), + config.isUseChangeLog(), + config.isUseAutoCancel(), + executor.getTimeZone()) + .getResult(tableResult); + job.setResult(result); } }