Skip to content

Commit

Permalink
refactor: JobTransBuilder (#2591)
Browse files Browse the repository at this point in the history
* refactor: JobTransBuilder

* feat: add print stacktrace

* Spotless Apply

---------

Co-authored-by: leechor <[email protected]>
  • Loading branch information
leechor and leechor authored Dec 4, 2023
1 parent 2372527 commit a41aac2
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 129 deletions.
1 change: 1 addition & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ public JobResult executeSql(String statement) throws Exception {
} catch (Exception e) {
String error = StrFormatter.format(
"Exception in executing FlinkSQL:\n{}\n{}", SqlUtil.addLineNumber(currentSql), e.getMessage());
e.printStackTrace();
job.setEndTime(LocalDateTime.now());
job.setStatus(Job.JobStatus.FAILED);
job.setError(error);
Expand Down
253 changes: 124 additions & 129 deletions dinky-core/src/main/java/org/dinky/job/builder/JobTransBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.table.api.TableResult;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
Expand All @@ -60,136 +61,130 @@ public static JobTransBuilder build(JobManager jobManager) {

@Override
public void run() throws Exception {
if (!jobParam.getTrans().isEmpty()) {
// Use statement set or gateway only submit inserts.
if (useStatementSet && useGateway) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
inserts.add(item.getValue());
}

// Use statement set need to merge all insert sql into a sql.
jobManager.setCurrentSql(String.join(sqlSeparator, inserts));
GatewayResult gatewayResult = submitByGateway(inserts);
// Use statement set only has one jid.
job.setResult(InsertResult.success(gatewayResult.getId()));
job.setJobId(gatewayResult.getId());
job.setJids(gatewayResult.getJids());
job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL()));
if (gatewayResult.isSuccess()) {
job.setStatus(Job.JobStatus.SUCCESS);
} else {
job.setStatus(Job.JobStatus.FAILED);
job.setError(gatewayResult.getError());
}
} else if (useStatementSet && !useGateway) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
if (item.getType().equals(SqlType.INSERT)) {
inserts.add(item.getValue());
} else if (item.getType().equals(SqlType.CTAS)) {
executor.getCustomTableEnvironment()
.getParser()
.parse(item.getValue())
.forEach(x -> {
executor.getCustomTableEnvironment().executeCTAS(x);
});
}
}
if (!inserts.isEmpty()) {
jobManager.setCurrentSql(String.join(sqlSeparator, inserts));
// Remote mode can get the table result.
TableResult tableResult = executor.executeStatementSet(inserts);
if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
job.setJids(new ArrayList<String>() {

{
add(job.getJobId());
}
});
}
if (config.isUseResult()) {
// Build insert result.
IResult result = ResultBuilder.build(
SqlType.INSERT,
job.getId().toString(),
config.getMaxRowNum(),
config.isUseChangeLog(),
config.isUseAutoCancel(),
executor.getTimeZone())
.getResult(tableResult);
job.setResult(result);
}
}
} else if (!useStatementSet && useGateway) {
List<String> inserts = new ArrayList<>();
for (StatementParam item : jobParam.getTrans()) {
inserts.add(item.getValue());
// Only can submit the first of insert sql, when not use statement set.
break;
}
jobManager.setCurrentSql(String.join(sqlSeparator, inserts));
GatewayResult gatewayResult = submitByGateway(inserts);
job.setResult(InsertResult.success(gatewayResult.getId()));
job.setJobId(gatewayResult.getId());
job.setJids(gatewayResult.getJids());
job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL()));
if (gatewayResult.isSuccess()) {
job.setStatus(Job.JobStatus.SUCCESS);
} else {
job.setStatus(Job.JobStatus.FAILED);
job.setError(gatewayResult.getError());
}
} else {
for (StatementParam item : jobParam.getTrans()) {
jobManager.setCurrentSql(item.getValue());
FlinkInterceptorResult flinkInterceptorResult = FlinkInterceptor.build(executor, item.getValue());
if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) {
if (config.isUseResult()) {
IResult result = ResultBuilder.build(
item.getType(),
job.getId().toString(),
config.getMaxRowNum(),
config.isUseChangeLog(),
config.isUseAutoCancel(),
executor.getTimeZone())
.getResult(flinkInterceptorResult.getTableResult());
job.setResult(result);
}
} else {
if (!flinkInterceptorResult.isNoExecute()) {
TableResult tableResult = executor.executeSql(item.getValue());
if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult
.getJobClient()
.get()
.getJobID()
.toHexString());
job.setJids(new ArrayList<String>() {

{
add(job.getJobId());
}
});
}
if (config.isUseResult()) {
IResult result = ResultBuilder.build(
item.getType(),
job.getId().toString(),
config.getMaxRowNum(),
config.isUseChangeLog(),
config.isUseAutoCancel(),
executor.getTimeZone())
.getResult(tableResult);
job.setResult(result);
}
}
}
// Only can submit the first of insert sql, when not use statement set.
break;
}
if (jobParam.getTrans().isEmpty()) {
return;
}

if (useStatementSet) {
handleStatementSet();
return;
}

handleNonStatementSet();
}

private void handleStatementSet() throws Exception {
List<String> inserts = collectInserts();

if (useGateway) {
processWithGateway(inserts);
return;
}
processWithoutGateway(inserts);
}

private void handleNonStatementSet() throws Exception {
if (useGateway) {
processSingleInsertWithGateway();
return;
}
processFirstStatement();
}

private List<String> collectInserts() {
List<String> inserts = new ArrayList<>();
List<StatementParam> statementParams = useStatementSet
? jobParam.getTrans()
: Collections.singletonList(jobParam.getTrans().get(0));
for (StatementParam item : statementParams) {
if (!useGateway && !item.getType().equals(SqlType.INSERT)) {
handleNonInsertType(item);
continue;
}
inserts.add(item.getValue());
}
return inserts;
}

private void handleNonInsertType(StatementParam item) {
if (item.getType().equals(SqlType.CTAS)) {
executor.getCustomTableEnvironment()
.getParser()
.parse(item.getValue())
.forEach(executor.getCustomTableEnvironment()::executeCTAS);
}
}

private void processWithGateway(List<String> inserts) throws Exception {
jobManager.setCurrentSql(String.join(sqlSeparator, inserts));
GatewayResult gatewayResult = submitByGateway(inserts);
setJobResultFromGatewayResult(gatewayResult);
}

private void processWithoutGateway(List<String> inserts) throws Exception {
if (!inserts.isEmpty()) {
jobManager.setCurrentSql(String.join(sqlSeparator, inserts));
TableResult tableResult = executor.executeStatementSet(inserts);
updateJobWithTableResult(tableResult);
}
}

private void processSingleInsertWithGateway() throws Exception {
List<String> singleInsert = collectInserts();
processWithGateway(singleInsert);
}

private void processFirstStatement() throws Exception {
if (jobParam.getTrans().isEmpty()) {
return;
}
// Only process the first statement when not using statement set
StatementParam item = jobParam.getTrans().get(0);
jobManager.setCurrentSql(item.getValue());
processSingleStatement(item);
}

private void processSingleStatement(StatementParam item) throws Exception {
FlinkInterceptorResult flinkInterceptorResult = FlinkInterceptor.build(executor, item.getValue());
if (Asserts.isNotNull(flinkInterceptorResult.getTableResult())) {
updateJobWithTableResult(flinkInterceptorResult.getTableResult(), item.getType());
} else if (!flinkInterceptorResult.isNoExecute()) {
TableResult tableResult = executor.executeSql(item.getValue());
updateJobWithTableResult(tableResult, item.getType());
}
}

private void setJobResultFromGatewayResult(GatewayResult gatewayResult) {
job.setResult(InsertResult.success(gatewayResult.getId()));
job.setJobId(gatewayResult.getId());
job.setJids(gatewayResult.getJids());
job.setJobManagerAddress(URLUtils.formatAddress(gatewayResult.getWebURL()));
job.setStatus(gatewayResult.isSuccess() ? Job.JobStatus.SUCCESS : Job.JobStatus.FAILED);
if (!gatewayResult.isSuccess()) {
job.setError(gatewayResult.getError());
}
}

private void updateJobWithTableResult(TableResult tableResult) {
updateJobWithTableResult(tableResult, SqlType.INSERT);
}

private void updateJobWithTableResult(TableResult tableResult, SqlType sqlType) {
if (tableResult.getJobClient().isPresent()) {
job.setJobId(tableResult.getJobClient().get().getJobID().toHexString());
job.setJids(Collections.singletonList(job.getJobId()));
}

if (config.isUseResult()) {
IResult result = ResultBuilder.build(
sqlType,
job.getId().toString(),
config.getMaxRowNum(),
config.isUseChangeLog(),
config.isUseAutoCancel(),
executor.getTimeZone())
.getResult(tableResult);
job.setResult(result);
}
}

Expand Down

0 comments on commit a41aac2

Please sign in to comment.