Skip to content

Commit

Permalink
useing GsonPostProcessable to init params
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Dec 27, 2023
1 parent 0d05f80 commit d2e0cd3
Showing 1 changed file with 27 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
Expand All @@ -67,7 +68,6 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -84,7 +84,7 @@
@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> {
public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> implements GsonPostProcessable {

public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
Expand Down Expand Up @@ -156,6 +156,31 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> {
// max save task num, do we need to config it?
private static final int MAX_SAVE_TASK_NUM = 100;

@Override
public void gsonPostProcess() throws IOException {
if (null == plans) {
plans = new ArrayList<>();
}
if (null == idToTasks) {
idToTasks = new ConcurrentHashMap<>();
}
if (null == loadStatistic) {
loadStatistic = new LoadStatistic();
}
if (null == finishedTaskIds) {
finishedTaskIds = new HashSet<>();
}
if (null == errorTabletInfos) {
errorTabletInfos = new ArrayList<>();
}
if (null == commitInfos) {
commitInfos = new ArrayList<>();
}
if (null == historyTaskIdList) {
historyTaskIdList = new ConcurrentLinkedQueue<>();
}
}

/**
* load job type
*/
Expand Down Expand Up @@ -219,12 +244,6 @@ public InsertJob(ConnectContext ctx,

@Override
public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) {
if (null == plans) {
plans = new ArrayList<>();
}
if (null == idToTasks) {
idToTasks = new ConcurrentHashMap<>();
}
if (plans.isEmpty()) {
InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
idToTasks.put(task.getTaskId(), task);
Expand Down Expand Up @@ -466,21 +485,6 @@ public long getTimeout() {
return Config.broker_load_default_timeout_second;
}


public static InsertJob readFields(DataInput in) throws IOException {
//fix me : some field is not set and this method is not used
String jsonJob = Text.readString(in);
InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class);
job.setRunningTasks(new ArrayList<>());
if (null == job.plans) {
job.plans = new ArrayList<>();
}
if (null == job.idToTasks) {
job.idToTasks = new ConcurrentHashMap<>();
}
return job;
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
Expand Down

0 comments on commit d2e0cd3

Please sign in to comment.