Skip to content

Commit

Permalink
[INLONG-9112][Agent] Add task and instance profile (apache#9113)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang committed Oct 26, 2023
1 parent 6ca6e56 commit 0abbc5d
Show file tree
Hide file tree
Showing 4 changed files with 990 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.conf;

import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.utils.file.FileUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;

import com.google.common.collect.ComparisonChain;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_ClUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_TOPIC;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;

/**
* job profile which contains details describing properties of one job.
*/
public class InstanceProfile extends AbstractConfiguration implements Comparable<InstanceProfile> {

private static final Logger LOGGER = LoggerFactory.getLogger(InstanceProfile.class);
private static final Gson GSON = new Gson();

/**
* parse json string to configuration instance.
*
* @return job configuration
*/
public static InstanceProfile parseJsonStr(String jsonStr) {
InstanceProfile conf = new InstanceProfile();
conf.loadJsonStrResource(jsonStr);
return conf;
}

public String toJsonStr() {
return GSON.toJson(getConfigStorage());
}

public void setInstanceClass(String className) {
set(TaskConstants.INSTANCE_CLASS, className);
}

public String getInstanceClass() {
return get(TaskConstants.INSTANCE_CLASS);
}

public String getTaskId() {
return get(TaskConstants.TASK_ID);
}

public String getInstanceId() {
return get(TaskConstants.INSTANCE_ID);
}

public String getSourceClass() {
return get(TaskConstants.TASK_SOURCE);
}

public String getSinkClass() {
return get(TaskConstants.TASK_SINK);
}

public InstanceStateEnum getState() {
int value = getInt(INSTANCE_STATE, InstanceStateEnum.DEFAULT.ordinal());
return InstanceStateEnum.getTaskState(value);
}

public void setState(InstanceStateEnum state) {
setInt(INSTANCE_STATE, state.ordinal());
}

@Override
public boolean allRequiredKeyExist() {
return true;
}

/**
* get MQClusterInfo list from config
*/
public List<MQClusterInfo> getMqClusters() {
List<MQClusterInfo> result = null;
String mqClusterStr = get(JOB_MQ_ClUSTERS);
if (StringUtils.isNotBlank(mqClusterStr)) {
result = GSON.fromJson(mqClusterStr, new TypeToken<List<MQClusterInfo>>() {
}.getType());
}
return result;
}

/**
* get mqTopic from config
*/
public DataProxyTopicInfo getMqTopic() {
DataProxyTopicInfo result = null;
String topicStr = get(JOB_MQ_TOPIC);
if (StringUtils.isNotBlank(topicStr)) {
result = GSON.fromJson(topicStr, DataProxyTopicInfo.class);
}
return result;
}

public void setCreateTime(Long time) {
setLong(TaskConstants.INSTANCE_CREATE_TIME, time);
}

public Long getCreateTime() {
return getLong(TaskConstants.INSTANCE_CREATE_TIME, 0);
}

public void setModifyTime(Long time) {
setLong(TaskConstants.INSTANCE_MODIFY_TIME, time);
}

public Long getModifyTime() {
return getLong(TaskConstants.INSTANCE_MODIFY_TIME, 0);
}

public void setInstanceId(String instanceId) {
set(TaskConstants.INSTANCE_ID, instanceId);
}

public void setDataTime(String dataTime) {
set(TaskConstants.JOB_DATA_TIME, dataTime);
}

public String getDataTime() {
return get(TaskConstants.JOB_DATA_TIME);
}

@Override
public int compareTo(InstanceProfile object) {
int ret = ComparisonChain.start()
.compare(getDataTime(), object.getDataTime())
.compare(FileUtils.getFileCreationTime(getInstanceId()),
FileUtils.getFileCreationTime(object.getInstanceId()))
.compare(FileUtils.getFileLastModifyTime(getInstanceId()),
FileUtils.getFileLastModifyTime(object.getInstanceId()))
.result();
return ret;
}

public boolean isRetry() {
return getBoolean(TASK_RETRY, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.conf;

import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.pojo.TaskProfileDto;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;

import com.google.gson.Gson;

import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;

/**
* job profile which contains details describing properties of one job.
*/
public class TaskProfile extends AbstractConfiguration {

private static final Gson GSON = new Gson();

/**
* Get a TaskProfile from a DataConfig
*/
public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
if (dataConfig == null) {
return null;
}
return TaskProfileDto.convertToTaskProfile(dataConfig);
}

public String getTaskId() {
return get(TaskConstants.TASK_ID);
}

public String getCycleUnit() {
return get(TaskConstants.TASK_CYCLE_UNIT);
}

public String getTimeOffset() {
return get(TaskConstants.TASK_FILE_TIME_OFFSET);
}

public TaskStateEnum getState() {
return TaskStateEnum.getTaskState(getInt(TASK_STATE));
}

public void setState(TaskStateEnum state) {
setInt(TASK_STATE, state.ordinal());
}

public boolean isRetry() {
return getBoolean(TASK_RETRY, false);
}

public String getTaskClass() {
return get(TaskConstants.TASK_CLASS);
}

public void setTaskClass(String className) {
set(TaskConstants.TASK_CLASS, className);
}

/**
* parse json string to configuration instance.
*
* @return job configuration
*/
public static TaskProfile parseJsonStr(String jsonStr) {
TaskProfile conf = new TaskProfile();
conf.loadJsonStrResource(jsonStr);
return conf;
}

/**
* check whether required keys exists.
*
* @return return true if all required keys exists else false.
*/
@Override
public boolean allRequiredKeyExist() {
return hasKey(TaskConstants.TASK_ID) && hasKey(TaskConstants.TASK_SOURCE)
&& hasKey(TaskConstants.TASK_SINK) && hasKey(TaskConstants.TASK_CHANNEL)
&& hasKey(TaskConstants.TASK_GROUP_ID) && hasKey(TaskConstants.TASK_STREAM_ID)
&& hasKey(TaskConstants.TASK_CYCLE_UNIT);
}

public String toJsonStr() {
return GSON.toJson(getConfigStorage());
}

public InstanceProfile createInstanceProfile(String instanceClass, String fileName, String dataTime) {
InstanceProfile instanceProfile = InstanceProfile.parseJsonStr(toJsonStr());
instanceProfile.setInstanceClass(instanceClass);
instanceProfile.setInstanceId(fileName);
instanceProfile.setDataTime(dataTime);
instanceProfile.setCreateTime(AgentUtils.getCurrentTime());
instanceProfile.setModifyTime(AgentUtils.getCurrentTime());
instanceProfile.setState(InstanceStateEnum.DEFAULT);
return instanceProfile;
}
}
Loading

0 comments on commit 0abbc5d

Please sign in to comment.