From f287adff0d7e605fad853f6572dc9f1e7dc064c1 Mon Sep 17 00:00:00 2001 From: Bilwa ST Date: Wed, 6 Nov 2024 13:29:26 +0530 Subject: [PATCH] [Bug] [Seatunnel-web] Error when conditional column is not used in SELECT clause --- .../service/impl/JobInstanceServiceImpl.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index 8d0b79579..684f529dc 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -103,6 +103,10 @@ public class JobInstanceServiceImpl extends SeatunnelBaseServiceImpl private static final String DAG_PARSING_MODE = "dag-parsing.mode"; + private static final String WHERE_CONDITION = "where_condition"; + + private static final String QUERY = "query"; + @Resource private ConnectorDataSourceMapperConfig dataSourceMapperConfig; @Resource private IDatasourceService datasourceService; @@ -233,6 +237,7 @@ public String generateJobConfig( businessMode, config, optionRule); + mergeConfig = appendWhereClauseToQuery(mergeConfig); sourceMap .get(task.getConnectorType()) .add(filterEmptyValue(mergeConfig)); @@ -330,6 +335,19 @@ public String generateJobConfig( return JobUtils.replaceJobConfigPlaceholders(jobConfig, executeParam); } + private Config appendWhereClauseToQuery(Config mergeConfig) { + String where_condition = mergeConfig.getString(WHERE_CONDITION); + if (where_condition != null && !where_condition.isEmpty()) { + String query = mergeConfig.getString(QUERY); + String queryWithWhereClause = query + " " + where_condition; + mergeConfig = + mergeConfig.withValue( + QUERY, ConfigValueFactory.fromAnyRef(queryWithWhereClause)); + mergeConfig = mergeConfig.withoutPath(WHERE_CONDITION); + } + return mergeConfig; + } + @Override public JobExecutorRes getExecuteResource(@NonNull Long jobEngineId) { funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_INSTANCE, 0);