Skip to content

Commit

Permalink
[Feature] [PyFlink] optimize resources and pyflink submit (#3120)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh authored Jan 31, 2024
1 parent 8fe8baf commit 61897f0
Show file tree
Hide file tree
Showing 28 changed files with 1,816 additions and 272 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.dinky.data.model.FlinkUdfManifest;
import org.dinky.function.constant.PathConstant;
import org.dinky.function.util.ZipWriter;
import org.dinky.service.resource.BaseResourceManager;
import org.dinky.resource.BaseResourceManager;

import java.io.File;
import java.io.InputStream;
Expand Down
5 changes: 5 additions & 0 deletions dinky-admin/src/main/java/org/dinky/data/model/Resources.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;

import cn.hutool.core.bean.BeanUtil;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -146,4 +147,8 @@ public class Resources extends Model<Resources> {
@TableField(fill = FieldFill.INSERT_UPDATE)
@ApiModelProperty(value = "Updater", required = true, dataType = "Integer", example = "updater")
private Integer updater;

public static Resources of(ResourcesVO resourcesVO) {
return BeanUtil.toBean(resourcesVO, Resources.class);
}
}
2 changes: 1 addition & 1 deletion dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.dinky.job.ClearJobHistoryTask;
import org.dinky.job.FlinkJobTask;
import org.dinky.job.SystemMetricsTask;
import org.dinky.resource.BaseResourceManager;
import org.dinky.scheduler.client.ProjectClient;
import org.dinky.scheduler.exception.SchedulerException;
import org.dinky.scheduler.model.Project;
Expand All @@ -44,7 +45,6 @@
import org.dinky.service.SysConfigService;
import org.dinky.service.TaskService;
import org.dinky.service.TenantService;
import org.dinky.service.resource.BaseResourceManager;
import org.dinky.url.RsURLStreamHandlerFactory;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.UDFUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
import org.dinky.data.model.Resources;
import org.dinky.data.result.Result;
import org.dinky.mapper.ResourcesMapper;
import org.dinky.service.resource.BaseResourceManager;
import org.dinky.resource.BaseResourceManager;
import org.dinky.service.resource.ResourcesService;
import org.dinky.utils.URLUtils;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -72,6 +73,7 @@ public boolean syncRemoteDirectoryStructure() {
List<Resources> resourcesList =
getBaseResourceManager().getFullDirectoryStructure(rootResource.getId()).stream()
.filter(x -> x.getPid() != -1)
.map(Resources::of)
.peek(x -> {
// Restore the existing information. If the remotmap is not available,
// it means that the configuration is out of sync and no processing will be done.
Expand Down Expand Up @@ -294,7 +296,19 @@ public void uploadFile(Integer pid, String desc, MultipartFile file) {
}
long size = file.getSize();
String fileName = file.getOriginalFilename();
upload(pid, desc, (fullName) -> getBaseResourceManager().putFile(fullName, file), fileName, pResource, size);
upload(
pid,
desc,
(fullName) -> {
try {
getBaseResourceManager().putFile(fullName, file.getInputStream());
} catch (IOException e) {
throw new RuntimeException(e);
}
},
fileName,
pResource,
size);
}

@Transactional(rollbackFor = Exception.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.dinky.url;

import org.dinky.data.exception.BusException;
import org.dinky.service.resource.BaseResourceManager;
import org.dinky.resource.BaseResourceManager;

import java.io.InputStream;
import java.net.URL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.dinky.app.db.DBUtil;
import org.dinky.app.model.StatementParam;
import org.dinky.app.model.SysConfig;
import org.dinky.app.resource.BaseResourceManager;
import org.dinky.app.url.RsURLStreamHandlerFactory;
import org.dinky.app.util.FlinkAppUtil;
import org.dinky.assertion.Asserts;
import org.dinky.classloader.DinkyClassLoader;
Expand All @@ -37,10 +35,12 @@
import org.dinky.executor.ExecutorFactory;
import org.dinky.interceptor.FlinkInterceptor;
import org.dinky.parser.SqlType;
import org.dinky.resource.BaseResourceManager;
import org.dinky.trans.Operations;
import org.dinky.trans.dml.ExecuteJarOperation;
import org.dinky.trans.parse.AddJarSqlParseStrategy;
import org.dinky.trans.parse.ExecuteJarParseStrategy;
import org.dinky.url.RsURLStreamHandlerFactory;
import org.dinky.utils.SqlUtil;
import org.dinky.utils.ZipUtils;

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 61897f0

Please sign in to comment.