Skip to content

Commit

Permalink
Fix deserialization exceptions caused by incorrect get methods in enu…
Browse files Browse the repository at this point in the history
…m fields. (DataLinkDC#4082)

[BugFix][FlinkJar]Fix flink jar (DataLinkDC#4079)
[Bug]Change k8s StringUtils import (DataLinkDC#4081)
[BugFix][Flink Jar]Fix FLINK JAR submission (DataLinkDC#4073)
[Optimization] Optimize the execution logic of the script. (DataLinkDC#4068)
  • Loading branch information
zackyoungh committed Jan 2, 2025
1 parent bff0703 commit dad7cc7
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.springframework.stereotype.Service;

Expand Down Expand Up @@ -122,8 +123,10 @@ public LineageResult getLineage(StudioLineageDTO studioCADTO) {
TaskDTO taskDTO = taskService.getTaskInfoById(studioCADTO.getTaskId());
taskDTO.setStatement(taskService.buildEnvSql(taskDTO) + studioCADTO.getStatement());
JobConfig jobConfig = taskDTO.getJobConfig();
jobConfig.setUdfRefer(studioCADTO.getConfigJson().getUdfReferMaps());
jobConfig.setConfigJson(studioCADTO.getConfigJson().getCustomConfigMaps());
Optional.ofNullable(studioCADTO.getConfigJson()).ifPresent(config -> {
jobConfig.setUdfRefer(config.getUdfReferMaps());
jobConfig.setConfigJson(config.getCustomConfigMaps());
});

return LineageBuilder.getColumnLineageByLogicalPlan(taskDTO.getStatement(), jobConfig);
}
Expand Down
5 changes: 3 additions & 2 deletions dinky-core/src/main/java/org/dinky/executor/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,10 @@ public JobStatementPlan parseStatementIntoJobStatementPlan(String[] statements)
jobStatementPlan.addJobStatement(statement, JobStatementType.SET, operationType);
} else if (operationType.equals(SqlType.EXECUTE_JAR)) {
JarSubmitParam jarSubmitParam = JarSubmitParam.build(statement);
jarSubmitParam.setUri("base64@" + Base64.encode(pretreatStatement(jarSubmitParam.getArgs())));
String args = jarSubmitParam.getArgs();
jarSubmitParam.setArgs("base64@" + Base64.encode(isUseSqlFragment() ? pretreatStatement(args) : args));
jobStatementPlan.addJobStatement(
jarSubmitParam.toString(), JobStatementType.EXECUTE_JAR, operationType);
jarSubmitParam.getStatement(), JobStatementType.EXECUTE_JAR, operationType);
} else if (operationType.equals(SqlType.EXECUTE)) {
jobStatementPlan.addJobStatement(statement, JobStatementType.PIPELINE, operationType);
} else if (operationType.equals(SqlType.PRINT)) {
Expand Down
17 changes: 11 additions & 6 deletions script/bin/auto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ function wait_start_process() {
for code in "${success_status_codes[@]}"; do
if [ "$health_status" == "$code" ]; then
echo -ne "\r[==================================================] 100%\n"
echo -e "${GREEN}Application started completed.${RESET}"
return 0
fi
done
Expand All @@ -149,11 +148,11 @@ function wait_start_process() {
local filled_length=$((progress * bar_length / 100))
local empty_length=$((bar_length - filled_length))
local bar=$(printf '>%.0s' $(seq 1 $filled_length))$(printf ' %.0s' $(seq 1 $empty_length))
echo -ne "\r[${bar}] ${progress}% (time consuming: ${formatted_time})"
local processing_inline="\r[${bar}] ${progress}% (time consuming: ${formatted_time})"
echo -ne "${processing_inline}"
sleep $delay
done
echo -ne "\r[==================================================] 100% (time consuming: ${formatted_time})\n"
echo -e "${RED}Application start failed. Please check the log for details.${RESET}"
echo -ne "${processing_inline}\n"
return 1
}

Expand Down Expand Up @@ -218,7 +217,10 @@ start() {
nohup java ${PARAMS_OPT} ${JVM_OPTS} ${OOM_OPT} ${GC_OPT} -Xverify:none -cp "${CLASS_PATH}" org.dinky.Dinky ${JAR_PARAMS_OPT} > ${DINKY_LOG_PATH}/dinky-start.log 2>&1 &
PID=$!
echo "${PID}" >"${PID_PATH}"/${PID_FILE}
wait_start_process
if ! wait_start_process; then
echo -e "${RED}Application start failed. Please check the log for details. you can execute tail -fn1000 ${DINKY_LOG_PATH}/dinky-start.log to watch the log ${RESET}"
exit 1
fi
echo -e "${GREEN}........................................Start Dinky Successfully........................................${RESET}"
echo -e "${GREEN}current log path : ${DINKY_LOG_PATH}/dinky-start.log , you can execute tail -fn1000 ${DINKY_LOG_PATH}/dinky-start.log to watch the log${RESET}"
else
Expand All @@ -243,7 +245,10 @@ startWithJmx() {
if [ -z "$pid" ]; then
nohup java ${PARAMS_OPT} ${JVM_OPTS} ${OOM_OPT} ${GC_OPT} -Xverify:none "${JMX}" -cp "${CLASS_PATH}" org.dinky.Dinky ${JAR_PARAMS_OPT} > ${DINKY_LOG_PATH}/dinky-start.log 2>&1 &
PID=$!
wait_start_process
if ! wait_start_process; then
echo -e "${RED}Application start failed. Please check the log for details. you can execute tail -fn1000 ${DINKY_LOG_PATH}/dinky-start.log to watch the log ${RESET}"
exit 1
fi
echo -e "$GREEN........................................Start Dinky with Jmx Successfully........................................$RESET"
updatePid

Expand Down
37 changes: 26 additions & 11 deletions script/bin/init_flink_dependences.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,38 @@ if [ -f "${FLINK_STORE_DIR}/flink-${CURRENT_FLINK_FULL_VERSION}-bin-scala_2.12.t
fi
fi

try_tsinghua_mirror() {
try_mirrors_download_file() {
local tsinghua_url="https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-${CURRENT_FLINK_FULL_VERSION}/flink-${CURRENT_FLINK_FULL_VERSION}-bin-scala_2.12.tgz"
local aliyun_url="https://mirrors.aliyun.com/apache/flink/flink-${CURRENT_FLINK_FULL_VERSION}/flink-${CURRENT_FLINK_FULL_VERSION}-bin-scala_2.12.tgz"
local apache_url="https://archive.apache.org/dist/flink/flink-${CURRENT_FLINK_FULL_VERSION}/flink-${CURRENT_FLINK_FULL_VERSION}-bin-scala_2.12.tgz"

echo -e "${GREEN}Start downloading the Flink-${FLINK_VERSION_SCAN} installation package... Store it in the ${FLINK_STORE_DIR} directory${RESET}"
if download_file "$tsinghua_url" "${FLINK_STORE_DIR}"; then
echo -e "${BLUE}The address of the currently downloaded Flink installation package is:${tsinghua_url}${RESET}"

exec_tsinghua_result=$(download_file "$tsinghua_url" "${FLINK_STORE_DIR}")
if [ ! "$exec_tsinghua_result" ]; then
echo -e "${BLUE}The tsinghua address of the currently downloaded Flink installation package is:${tsinghua_url}${RESET}"
return 0
else
echo -e "${YELLOW}File not found in Tsinghua University mirror, try downloading from Apache official source...${RESET}"
if download_file "$apache_url" "${FLINK_STORE_DIR}"; then
echo -e "${BLUE}The address of the currently downloaded Flink installation package is:${apache_url}${RESET}"
echo -e "${YELLOW}Failed to download from Tsinghua mirror, try downloading from Aliyun mirror...${RESET}"
exec_aliyun_result=$(download_file "$aliyun_url" "${FLINK_STORE_DIR}")
if [ "$exec_aliyun_result" ]; then
echo -e "${BLUE}The aliyun address of the currently downloaded Flink installation package is:${aliyun_url}${RESET}"
return 0
else
echo -e "${RED}Downloading from Apache official source also failed, please check the network or download manually。${RESET}"
return 1
echo -e "${YELLOW}Failed to download from Aliyun mirror too, try downloading from Apache official source...${RESET}"

exec_apache_result=$(download_file "$apache_url" "${FLINK_STORE_DIR}")
if [ "$exec_apache_result" ]; then
echo -e "${RED}Downloading from Apache official source also failed, please check the network or download manually。${RESET}"
return 1
else
echo -e "${BLUE}The apache address of the currently downloaded Flink installation package is:${apache_url}${RESET}"
return 0
fi
fi
fi
}

if ! try_tsinghua_mirror; then
if ! try_mirrors_download_file; then
exit 0
fi

Expand Down Expand Up @@ -100,11 +111,15 @@ echo -e "${GREEN}Process flink-state-processor-api ...${RESET}"
cp -r ${full_flink_dir_tmp}/opt/flink-state-processor-api*.jar ${EXTENDS_HOME}/flink${FLINK_VERSION_SCAN}/
echo -e "${GREEN}Processing completed。${RESET}"

echo -e "${GREEN}Process flink-s3-fs-presto ...${RESET}"
cp -r ${full_flink_dir_tmp}/opt/flink-s3-fs-presto*.jar ${EXTENDS_HOME}/flink${FLINK_VERSION_SCAN}/
echo -e "${GREEN}Processing completed。${RESET}"

echo -e "${GREEN} ================= List files in the ${EXTENDS_HOME}/flink${FLINK_VERSION_SCAN}/ directory ==============${RESET}"
ls -l ${EXTENDS_HOME}/flink${FLINK_VERSION_SCAN}/

echo -e "${YELLOW}Please check the above dependent files。${RESET}"

rm -rf ${FLINK_STORE_DIR}/flink-${CURRENT_FLINK_FULL_VERSION}*

echo -e "${GREEN}The basic dependency processing is completed, please perform subsequent operations according to the actual situation.${RESET}"
echo -e "${GREEN}The basic dependency processing is completed, please perform subsequent operations according to the actual situation.${RESET}"
8 changes: 5 additions & 3 deletions script/bin/init_jdbc_driver.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
DINKY_LIB_DIR=$1

echo -e "${GREEN}Start downloading the mysql driver package...${RESET}"
if [ -f "${DINKY_LIB_DIR}/mysql-connector-j-8.4.0.jar" ]; then
echo -e "${YELLOW}mysql The driver package already exists, no need to download it again. Skip this step。${RESET}"
# Run the command to check whether the file exists
exec_result=$(ll "${DINKY_LIB_DIR}"/mysql-connector-j-8.4.0.jar)
if [ "$exec_result" ]; then
echo -e "${YELLOW}The mysql driver package already exists, no need to download it again. Skip this step。${RESET}"
else
download_file https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.4.0/mysql-connector-j-8.4.0.jar "${DINKY_LIB_DIR}"
echo -e "${GREEN}Download is complete, please verify. The downloaded file storage address is: ${DINKY_LIB_DIR}/mysql-connector-j-8.4.0.jar${RESET}"
Expand All @@ -15,4 +17,4 @@ else
exit 1
fi
echo -e "${GREEN}After the verification is completed, subsequent installation and configuration operations can be performed as needed.。${RESET}"
fi
fi
32 changes: 15 additions & 17 deletions script/bin/init_tools_main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,17 @@ function get_home_path() {
[[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE"
done
DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
RETURN_HOME_PATH=$(dirname "$DIR")
}



if [ -z "${DINKY_HOME}" ]; then
echo -e "${RED}DINKY_HOME environment variable is not set. Attempting to determine the correct path...${RESET}"
get_home_path
APP_HOME="${RETURN_HOME_PATH}"
else
get_home_path
if [ "${DINKY_HOME}" != "${RETURN_HOME_PATH}" ]; then
echo -e "${YELLOW}DINKY_HOME is not equal to the current path, use new path to init: ${RETURN_HOME_PATH}${RESET}"
APP_HOME="${RETURN_HOME_PATH}"
local possible_path=$(dirname "$DIR")
if [ -d "$possible_path" ]; then # Verify that the path obtained is a directory, increasing robustness
RETURN_HOME_PATH="$possible_path"
else
echo -e "${GREEN}DINKY_HOME is already set to: ${DINKY_HOME}${RESET}"
echo -e "${RED}Calculated path $possible_path is not a valid directory. Please check the script location or deployment setup.${RESET}"
exit 1
fi
fi
}

get_home_path
APP_HOME="${RETURN_HOME_PATH}"

echo -e "${GREEN}Dinky root path: ${APP_HOME} ${RESET}"

Expand Down Expand Up @@ -131,7 +124,12 @@ function download_file() {
target_file_dir=$2
echo -e "${GREEN}Start downloading $source_url to $target_file_dir...${RESET}"
wget -P "${target_file_dir}" "${source_url}"
if [ $? -ne 0 ]; then
echo -e "${RED}Failed to download $source_url to $target_file_dir. Please check the network connection and try again.${RESET}"
return 1
fi
echo -e "${GREEN}Download completed. The downloaded file storage address is: $target_file_dir ${RESET}"
return 0
}

export -f download_file
Expand Down Expand Up @@ -382,4 +380,4 @@ echo
echo -e "${GREEN} ====================== Dinky initialization script execution completed ====================== ${RESET}"


set +x
set +x

0 comments on commit dad7cc7

Please sign in to comment.