Skip to content

Commit

Permalink
Merge branch 'apache:master' into INLONG-11349
Browse files Browse the repository at this point in the history
  • Loading branch information
qy-liuhuo authored Dec 11, 2024
2 parents 22fa6f7 + d3be4d4 commit 866fe40
Show file tree
Hide file tree
Showing 241 changed files with 16,640 additions and 4,217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.utils.file.FileUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;

Expand All @@ -40,12 +41,24 @@
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;

/**
 * Instance profile which contains details describing the properties of one instance.
 */
public class InstanceProfile extends AbstractConfiguration implements Comparable<InstanceProfile> {

public static final String DEFAULT_FILE_INSTANCE = "org.apache.inlong.agent.plugin.instance.FileInstance";
public static final String DEFAULT_COS_INSTANCE = "org.apache.inlong.agent.plugin.instance.COSInstance";
public static final String DEFAULT_KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance";
public static final String DEFAULT_MONGODB_INSTANCE = "org.apache.inlong.agent.plugin.instance.MongoDBInstance";
public static final String DEFAULT_MQTT_INSTANCE = "org.apache.inlong.agent.plugin.instance.MqttInstance";
public static final String DEFAULT_ORACLE_INSTANCE = "org.apache.inlong.agent.plugin.instance.OracleInstance";
public static final String DEFAULT_POSTGRES_INSTANCE = "org.apache.inlong.agent.plugin.instance.PostgreSQLInstance";
public static final String DEFAULT_PULSAR_INSTANCE = "org.apache.inlong.agent.plugin.instance.PulsarInstance";
public static final String DEFAULT_REDIS_INSTANCE = "org.apache.inlong.agent.plugin.instance.RedisInstance";
public static final String DEFAULT_SQLSERVER_INSTANCE = "org.apache.inlong.agent.plugin.instance.SQLServerInstance";

private static final Logger LOGGER = LoggerFactory.getLogger(InstanceProfile.class);
private static final Gson GSON = new Gson();

Expand All @@ -64,12 +77,40 @@ public String toJsonStr() {
return GSON.toJson(getConfigStorage());
}

public void setInstanceClass(String className) {
set(TaskConstants.INSTANCE_CLASS, className);
public String getInstanceClass() {
TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE, TaskTypeEnum.FILE.getType()));
return getInstanceClassByTaskType(taskType);
}

public String getInstanceClass() {
return get(TaskConstants.INSTANCE_CLASS);
/**
 * Resolves the instance implementation class name for the given task type.
 *
 * @param taskType the task type to resolve; may be null
 * @return the fully qualified instance class name, or null when the type is
 *         null or has no mapped instance implementation
 */
public static String getInstanceClassByTaskType(TaskTypeEnum taskType) {
    if (taskType == null) {
        return null;
    }
    if (taskType == TaskTypeEnum.FILE) {
        return DEFAULT_FILE_INSTANCE;
    } else if (taskType == TaskTypeEnum.KAFKA) {
        return DEFAULT_KAFKA_INSTANCE;
    } else if (taskType == TaskTypeEnum.PULSAR) {
        return DEFAULT_PULSAR_INSTANCE;
    } else if (taskType == TaskTypeEnum.POSTGRES) {
        return DEFAULT_POSTGRES_INSTANCE;
    } else if (taskType == TaskTypeEnum.ORACLE) {
        return DEFAULT_ORACLE_INSTANCE;
    } else if (taskType == TaskTypeEnum.SQLSERVER) {
        return DEFAULT_SQLSERVER_INSTANCE;
    } else if (taskType == TaskTypeEnum.MONGODB) {
        return DEFAULT_MONGODB_INSTANCE;
    } else if (taskType == TaskTypeEnum.REDIS) {
        return DEFAULT_REDIS_INSTANCE;
    } else if (taskType == TaskTypeEnum.MQTT) {
        return DEFAULT_MQTT_INSTANCE;
    } else if (taskType == TaskTypeEnum.COS) {
        return DEFAULT_COS_INSTANCE;
    }
    // Unmapped types (e.g. MOCK) fall through here, same as the switch default.
    LOGGER.error("invalid task type {}", taskType);
    return null;
}

public String getTaskId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;

import com.google.gson.Gson;
Expand All @@ -32,18 +33,32 @@
import java.text.ParseException;
import java.util.TimeZone;

import static java.util.Objects.requireNonNull;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;

/**
 * Task profile which contains details describing the properties of one task.
 */
public class TaskProfile extends AbstractConfiguration {

public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.logcollection.local.FileTask";
public static final String DEFAULT_COS_TASK = "org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask";
public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_ORACLE_TASK = "org.apache.inlong.agent.plugin.task.OracleTask";
public static final String DEFAULT_REDIS_TASK = "org.apache.inlong.agent.plugin.task.RedisTask";
public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask";
public static final String DEFAULT_SQLSERVER_TASK = "org.apache.inlong.agent.plugin.task.SQLServerTask";
public static final String DEFAULT_MOCK_TASK = "org.apache.inlong.agent.plugin.task.MockTask";

private static final Gson GSON = new Gson();
private static final Logger logger = LoggerFactory.getLogger(TaskProfile.class);

Expand All @@ -57,6 +72,37 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
return TaskProfileDto.convertToTaskProfile(dataConfig);
}

/**
 * Resolves the task implementation class name from the configured task type
 * ({@code task.taskType}, defaulting to FILE).
 *
 * @return the fully qualified task class name, or null when the configured
 *         type code is unrecognized or has no mapped task implementation
 */
public String getTaskClass() {
    TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE, TaskTypeEnum.FILE.getType()));
    // Null-check explicitly instead of requireNonNull: the previous NPE was
    // uninformative and made the "invalid task type" logging unreachable for
    // null; this matches InstanceProfile.getInstanceClassByTaskType.
    if (taskType == null) {
        logger.error("invalid task type {}", getInt(TASK_TYPE, TaskTypeEnum.FILE.getType()));
        return null;
    }
    switch (taskType) {
        case FILE:
            return DEFAULT_FILE_TASK;
        case KAFKA:
            return DEFAULT_KAFKA_TASK;
        case PULSAR:
            return DEFAULT_PULSAR_TASK;
        case POSTGRES:
            return DEFAULT_POSTGRESQL_TASK;
        case ORACLE:
            return DEFAULT_ORACLE_TASK;
        case SQLSERVER:
            return DEFAULT_SQLSERVER_TASK;
        case MONGODB:
            return DEFAULT_MONGODB_TASK;
        case REDIS:
            return DEFAULT_REDIS_TASK;
        case MQTT:
            return DEFAULT_MQTT_TASK;
        case COS:
            return DEFAULT_COS_TASK;
        case MOCK:
            return DEFAULT_MOCK_TASK;
        default:
            logger.error("invalid task type {}", taskType);
            return null;
    }
}

/**
 * Returns the task id stored under {@code TaskConstants.TASK_ID}.
 */
public String getTaskId() {
    final String taskId = get(TaskConstants.TASK_ID);
    return taskId;
}
Expand All @@ -65,10 +111,6 @@ public String getCycleUnit() {
return get(TaskConstants.TASK_CYCLE_UNIT);
}

public String getTimeOffset() {
return get(TaskConstants.TASK_FILE_TIME_OFFSET, "");
}

/**
 * Returns the configured time zone stored under {@code TaskConstants.TASK_TIME_ZONE}.
 */
public String getTimeZone() {
    final String timeZone = get(TaskConstants.TASK_TIME_ZONE);
    return timeZone;
}
Expand All @@ -85,14 +127,6 @@ public boolean isRetry() {
return getBoolean(TASK_RETRY, false);
}

public String getTaskClass() {
return get(TaskConstants.TASK_CLASS);
}

public void setTaskClass(String className) {
set(TaskConstants.TASK_CLASS, className);
}

/**
 * Returns the InLong group id, falling back to the default group id when unset.
 */
public String getInlongGroupId() {
    final String groupId = get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
    return groupId;
}
Expand Down Expand Up @@ -128,11 +162,9 @@ public String toJsonStr() {
return GSON.toJson(getConfigStorage());
}

public InstanceProfile createInstanceProfile(String instanceClass, String fileName, String cycleUnit,
String dataTime,
public InstanceProfile createInstanceProfile(String fileName, String cycleUnit, String dataTime,
long fileUpdateTime) {
InstanceProfile instanceProfile = InstanceProfile.parseJsonStr(toJsonStr());
instanceProfile.setInstanceClass(instanceClass);
instanceProfile.setInstanceId(fileName);
instanceProfile.setSourceDataTime(dataTime);
Long sinkDataTime = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@
*/
public class TaskConstants extends CommonConstants {

// job id
// public static final String JOB_ID = "job.id";
public static final String TASK_ID = "task.id";
public static final String INSTANCE_ID = "instance.id";
public static final String JOB_INSTANCE_ID = "job.instance.id";
public static final String INSTANCE_CREATE_TIME = "instance.createTime";
public static final String INSTANCE_MODIFY_TIME = "instance.modifyTime";
public static final String TASK_GROUP_ID = "task.groupId";
Expand All @@ -36,9 +33,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_SOURCE = "task.source";

public static final String TASK_CHANNEL = "task.channel";

public static final String TASK_CLASS = "task.taskClass";
public static final String INSTANCE_CLASS = "task.instance.class";
public static final String TASK_TYPE = "task.taskType";
public static final String TASK_FILE_TRIGGER = "task.fileTask.trigger";

// sink config
Expand All @@ -59,12 +54,12 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_CYCLE_UNIT = "task.cycleUnit";
public static final String FILE_TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
public static final String TASK_FILE_CONTENT_COLLECT_TYPE = "task.fileTask.contentCollectType";
public static final String SOURCE_DATA_CONTENT_STYLE = "task.fileTask.dataContentStyle";
public static final String SOURCE_DATA_SEPARATOR = "task.fileTask.dataSeparator";
public static final String SOURCE_FILTER_STREAMS = "task.fileTask.filterStreams";
public static final String TASK_RETRY = "task.fileTask.retry";
public static final String TASK_START_TIME = "task.fileTask.startTime";
public static final String TASK_END_TIME = "task.fileTask.endTime";
public static final String FILE_CONTENT_STYLE = "task.fileTask.dataContentStyle";
public static final String FILE_DATA_SEPARATOR = "task.fileTask.dataSeparator";
public static final String FILE_FILTER_STREAMS = "task.fileTask.filterStreams";
public static final String TASK_RETRY = "task.retry";
public static final String FILE_TASK_TIME_FROM = "task.fileTask.dataTimeFrom";
public static final String FILE_TASK_TIME_TO = "task.fileTask.dataTimeTo";
public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
public static final String PREDEFINE_FIELDS = "task.predefinedFields";
public static final String TASK_AUDIT_VERSION = "task.auditVersion";
Expand All @@ -75,6 +70,22 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_KAFKA_OFFSET = "task.kafkaTask.partition.offset";
public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET = "task.kafkaTask.autoOffsetReset";

// COS task
public static final String COS_TASK_CYCLE_UNIT = "task.cosTask.cycleUnit";
public static final String COS_CONTENT_STYLE = "task.cosTask.contentStyle";
public static final String COS_MAX_NUM = "task.cosTask.maxFileCount";
public static final String COS_TASK_PATTERN = "task.cosTask.pattern";
public static final String TASK_COS_TIME_OFFSET = "task.cosTask.timeOffset";
public static final String COS_TASK_RETRY = "task.cosTask.retry";
public static final String COS_TASK_TIME_FROM = "task.cosTask.dataTimeFrom";
public static final String COS_TASK_TIME_TO = "task.cosTask.dataTimeTo";
public static final String COS_TASK_BUCKET_NAME = "task.cosTask.bucketName";
public static final String COS_TASK_SECRET_ID = "task.cosTask.secretId";
public static final String COS_TASK_SECRET_KEY = "task.cosTask.secretKey";
public static final String COS_TASK_REGION = "task.cosTask.region";
public static final String COS_DATA_SEPARATOR = "task.cosTask.dataSeparator";
public static final String COS_FILTER_STREAMS = "task.cosTask.filterStreams";

/**
* delimiter to split offset for different task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.entity.AuditComponent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;

import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE;
Expand All @@ -38,6 +41,8 @@
*/
public class AuditUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(AuditUtils.class);
public static final int AGENT_ISOLATE_KEY = 1;
public static int AUDIT_ID_AGENT_READ_SUCCESS = 3;
public static int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
public static int AUDIT_ID_AGENT_READ_FAILED = 524291;
Expand Down Expand Up @@ -90,8 +95,17 @@ public static void add(int auditID, String inlongGroupId, String inlongStreamId,
if (!IS_AUDIT) {
return;
}
AuditOperator.getInstance()
.add(auditID, DEFAULT_AUDIT_TAG, inlongGroupId, inlongStreamId, logTime, count, size, version);
if (inlongGroupId == null || inlongStreamId == null) {
LOGGER.error("invalid args inlongGroupId: {}, inlongStreamId: {}", inlongGroupId, inlongStreamId);
return;
}
try {
AuditOperator.getInstance()
.add(auditID, DEFAULT_AUDIT_TAG, inlongGroupId, inlongStreamId, logTime, count, size, version);
} catch (Throwable e) {
LOGGER.error("call audit add inlongGroupId: {}, inlongStreamId: {}, auditID {}, error", inlongGroupId,
inlongStreamId, auditID, e);
}
}

public static void add(int auditID, String inlongGroupId, String inlongStreamId,
Expand All @@ -106,6 +120,6 @@ public static void send() {
if (!IS_AUDIT) {
return;
}
AuditOperator.getInstance().flush();
AuditOperator.getInstance().flush(AGENT_ISOLATE_KEY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public abstract class Instance extends AbstractStateWrapper {
*/
public abstract void destroy();

/**
 * Notify the instance that it should be destroyed.
 * NOTE(review): the contract distinguishing this from {@link #destroy()} is not
 * visible here — presumably a non-blocking destroy request; confirm with implementations.
 */
public abstract void notifyDestroy();

/**
* get instance profile
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.pojo;

import lombok.Data;

import java.util.List;

/**
 * POJO carrying the configuration of one COS (Cloud Object Storage) collection task.
 * The outer class holds the flattened task fields; the nested {@link COSTaskConfig}
 * mirrors most of them as received from the manager.
 */
@Data
public class COSTask {

    // Task id.
    private Integer id;
    // File-matching pattern for objects to collect.
    private String pattern;
    // Collection cycle unit (see COSTaskConfig#cycleUnit).
    private String cycleUnit;
    // Whether this is a retry task.
    private Boolean retry;
    // Data time range lower bound.
    private String dataTimeFrom;
    // Data time range upper bound.
    private String dataTimeTo;
    // Time offset relative to current timestamp, e.g. '-1h'; see COSTaskConfig#timeOffset.
    private String timeOffset;
    // Maximum number of files to collect.
    private Integer maxFileCount;
    // Collect type, for example: FULL, INCREMENT.
    private String collectType;
    // Content style: a custom separator for CSV, or "json"; see COSTaskConfig#contentStyle.
    private String contentStyle;
    // Column separator of the data source.
    private String dataSeparator;
    // StreamIds to be filtered out (serialized form of COSTaskConfig#filterStreams).
    private String filterStreams;
    // COS bucket name.
    private String bucketName;
    // NOTE(review): named secretId/secretKey here but credentialsId/credentialsKey
    // in COSTaskConfig — confirm the mapping between the two is intentional.
    private String secretId;
    private String secretKey;
    // COS region.
    private String region;

    /**
     * Task configuration as delivered by the manager; field comments below
     * document the expected value formats.
     */
    @Data
    public static class COSTaskConfig {

        private String pattern;
        private String cycleUnit;
        private Boolean retry;
        private String dataTimeFrom;
        private String dataTimeTo;
        // '1m' means one minute after, '-1m' means one minute before
        // '1h' means one hour after, '-1h' means one hour before
        // '1d' means one day after, '-1d' means one day before
        // Null means from current timestamp
        private String timeOffset;
        private Integer maxFileCount;
        // Collect type, for example: FULL, INCREMENT
        private String collectType;
        // Type of data result for column separator
        // CSV format, set this parameter to a custom separator: , | :
        // Json format, set this parameter to json
        private String contentStyle;
        // Column separator of data source
        private String dataSeparator;
        // The streamIds to be filtered out
        private List<String> filterStreams;
        private String bucketName;
        private String credentialsId;
        private String credentialsKey;
        private String region;
    }
}
Loading

0 comments on commit 866fe40

Please sign in to comment.