Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files — browse the repository at this point in the history
  • Loading branch information
luchunliang authored Mar 4, 2024
2 parents 6f793cd + ebfe371 commit b67e445
Show file tree
Hide file tree
Showing 83 changed files with 1,017 additions and 537 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public String getTimeOffset() {
}

public String getTimeZone() {
return get(TaskConstants.TASK_FILE_TIME_ZONE);
return get(TaskConstants.TASK_TIME_ZONE);
}

public TaskStateEnum getState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class CommonConstants {

public static final String PROXY_SENDER_MAX_TIMEOUT = "proxy.sender.maxTimeout";
// max timeout in seconds.
public static final int DEFAULT_PROXY_SENDER_MAX_TIMEOUT = 20;
public static final int DEFAULT_PROXY_SENDER_MAX_TIMEOUT = 60;

public static final String PROXY_SENDER_MAX_RETRY = "proxy.sender.maxRetry";
public static final int DEFAULT_PROXY_SENDER_MAX_RETRY = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public class FetcherConstants {

public static final String AGENT_FETCHER_INTERVAL = "agent.fetcher.interval";
public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 10;
public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 60;

public static final String AGENT_HEARTBEAT_INTERVAL = "agent.heartbeat.interval";
public static final int DEFAULT_AGENT_HEARTBEAT_INTERVAL = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class TaskConstants extends CommonConstants {
public static final String JOB_UUID = "job.uuid";
public static final String TASK_GROUP_ID = "task.groupId";
public static final String TASK_STREAM_ID = "task.streamId";
public static final String RESTORE_FROM_DB = "task.restoreFromDB";

public static final String TASK_SOURCE = "task.source";
public static final String JOB_SOURCE_TYPE = "job.sourceType";
Expand Down Expand Up @@ -64,7 +65,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_DIR_FILTER_PATTERN = "task.fileTask.dir.pattern"; // deprecated
public static final String FILE_DIR_FILTER_PATTERNS = "task.fileTask.dir.patterns";
public static final String TASK_FILE_TIME_OFFSET = "task.fileTask.timeOffset";
public static final String TASK_FILE_TIME_ZONE = "task.fileTask.timeZone";
public static final String TASK_TIME_ZONE = "task.timeZone";
public static final String TASK_FILE_MAX_WAIT = "task.fileTask.file.max.wait";
public static final String TASK_CYCLE_UNIT = "task.cycleUnit";
public static final String FILE_TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
Expand Down Expand Up @@ -107,14 +108,14 @@ public class TaskConstants extends CommonConstants {
public static final String JOB_DATABASE_PORT = "job.binlogJob.port";

// Kafka job
public static final String JOB_KAFKA_TOPIC = "job.kafkaJob.topic";
public static final String JOB_KAFKA_BOOTSTRAP_SERVERS = "job.kafkaJob.bootstrap.servers";
public static final String JOB_KAFKA_GROUP_ID = "job.kafkaJob.group.id";
public static final String TASK_KAFKA_TOPIC = "task.kafkaJob.topic";
public static final String TASK_KAFKA_BOOTSTRAP_SERVERS = "task.kafkaJob.bootstrap.servers";
public static final String TASK_KAFKA_GROUP_ID = "task.kafkaJob.group.id";
public static final String JOB_KAFKA_RECORD_SPEED_LIMIT = "job.kafkaJob.recordSpeed.limit";
public static final String JOB_KAFKA_BYTE_SPEED_LIMIT = "job.kafkaJob.byteSpeed.limit";
public static final String JOB_KAFKA_OFFSET = "job.kafkaJob.partition.offset";
public static final String TASK_KAFKA_OFFSET = "task.kafkaJob.partition.offset";
public static final String JOB_KAFKA_READ_TIMEOUT = "job.kafkaJob.read.timeout";
public static final String JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET = "job.kafkaJob.autoOffsetReset";
public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET = "task.kafkaJob.autoOffsetReset";

public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts";
public static final String JOB_MONGO_USER = "job.mongoJob.user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ public static class FileTaskConfig {
// '1d' means one day after, '-1d' means one day before
// Null means from current timestamp
private String timeOffset;
// Asia/Shanghai
private String timeZone;
// For example: a=b&c=b&e=f
private String additionalAttr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class KafkaJob {
private RecordSpeed recordSpeed;
private ByteSpeed byteSpeed;
private String autoOffsetReset;
private String partitionOffsets;

@Data
public static class Group {
Expand Down Expand Up @@ -70,5 +71,6 @@ public static class KafkaJobTaskConfig {
private String recordSpeedLimit;
private String byteSpeedLimit;
private String autoOffsetReset;
private String partitionOffsets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
public class TaskProfileDto {

public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.file.LogFileTask";
public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
Expand Down Expand Up @@ -147,9 +148,6 @@ private static FileTask getFileJob(DataConfig dataConfig) {
if (taskConfig.getTimeOffset() != null) {
fileTask.setTimeOffset(taskConfig.getTimeOffset());
}
if (taskConfig.getTimeZone() != null) {
fileTask.setTimeZone(taskConfig.getTimeZone());
}

if (taskConfig.getAdditionalAttr() != null) {
fileTask.setAddictiveString(taskConfig.getAdditionalAttr());
Expand Down Expand Up @@ -193,7 +191,7 @@ private static KafkaJob getKafkaJob(DataConfig dataConfigs) {
bootstrap.setServers(kafkaJobTaskConfig.getBootstrapServers());
kafkaJob.setBootstrap(bootstrap);
KafkaJob.Partition partition = new KafkaJob.Partition();
partition.setOffset(dataConfigs.getSnapshot());
partition.setOffset(kafkaJobTaskConfig.getPartitionOffsets());
kafkaJob.setPartition(partition);
KafkaJob.Group group = new KafkaJob.Group();
group.setId(kafkaJobTaskConfig.getGroupId());
Expand Down Expand Up @@ -413,6 +411,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
task.setState(dataConfig.getState());
task.setPredefinedFields(dataConfig.getPredefinedFields());
task.setCycleUnit(CycleUnitType.REAL_TIME);
task.setTimeZone(dataConfig.getTimeZone());

// set sink type
if (dataConfig.getDataReportType() == NORMAL_SEND_TO_DATAPROXY.ordinal()) {
Expand Down Expand Up @@ -451,6 +450,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
profileDto.setTask(task);
break;
case KAFKA:
task.setTaskClass(DEFAULT_KAFKA_TASK);
KafkaJob kafkaJob = getKafkaJob(dataConfig);
task.setKafkaJob(kafkaJob);
task.setSource(KAFKA_SOURCE);
Expand Down Expand Up @@ -523,6 +523,7 @@ public static class Task {
private String predefinedFields;
private Integer state;
private String cycleUnit;
private String timeZone;

private FileTask fileTask;
private BinlogJob binlogJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;

/**
* handle the instance created by task, including add, delete, update etc.
* the instance info is store in both db and memory.
Expand Down Expand Up @@ -339,6 +341,7 @@ private void restoreFromDb() {
if (state == InstanceStateEnum.DEFAULT) {
LOGGER.info("instance restoreFromDb addToMem state {} taskId {} instanceId {}", state, taskId,
profile.getInstanceId());
profile.setBoolean(RESTORE_FROM_DB, true);
addToMemory(profile);
} else {
LOGGER.info("instance restoreFromDb ignore state {} taskId {} instanceId {}", state, taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;

/**
Expand Down Expand Up @@ -421,6 +422,7 @@ private void restoreFromDb() {
taskProfileList.forEach((profile) -> {
if (profile.getState() == TaskStateEnum.RUNNING) {
LOGGER.info("restore from db taskId {}", profile.getTaskId());
profile.setBoolean(RESTORE_FROM_DB, true);
addToMemory(profile);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long
dataConfig.setDataReportType(1);
dataConfig.setTaskType(3);
dataConfig.setTaskId(taskId);
dataConfig.setTimeZone(timeZone);
dataConfig.setState(state.ordinal());
FileTaskConfig fileTaskConfig = new FileTaskConfig();
fileTaskConfig.setPattern(pattern);
fileTaskConfig.setTimeOffset("0h");
fileTaskConfig.setTimeZone(timeZone);
fileTaskConfig.setMaxFileCount(100);
fileTaskConfig.setCycleUnit("h");
fileTaskConfig.setRetry(retry);
Expand Down
2 changes: 1 addition & 1 deletion inlong-agent/agent-docker/agent-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ local_ip=$(ifconfig $ETH_NETWORK | grep "inet" | grep -v "inet6" | awk '{print $
sed -i "s/agent.local.ip=.*$/agent.local.ip=$local_ip/g" "${file_path}/conf/agent.properties"
sed -i "s/agent.fetcher.interval=.*$/agent.fetcher.interval=$AGENT_FETCH_INTERVAL/g" "${file_path}/conf/agent.properties"
sed -i "s/agent.heartbeat.interval=.*$/agent.heartbeat.interval=$AGENT_HEARTBEAT_INTERVAL/g" "${file_path}/conf/agent.properties"
sed -i "s/agent.manager.addr=.*$/agent.manager.addr=http://$MANAGER_OPENAPI_IP:$MANAGER_OPENAPI_PORT/g" "${file_path}/conf/agent.properties"
sed -i "s/agent.manager.addr=.*$/agent.manager.addr=http:\/\/$MANAGER_OPENAPI_IP:$MANAGER_OPENAPI_PORT/g" "${file_path}/conf/agent.properties"
sed -i "s/audit.enable=.*$/audit.enable=$AUDIT_ENABLE/g" "${file_path}/conf/agent.properties"
sed -i "s/audit.proxys=.*$/audit.proxys=$AUDIT_PROXY_URL/g" "${file_path}/conf/agent.properties"
sed -i "s/agent.cluster.tag=.*$/agent.cluster.tag=$CLUSTER_TAG/g" "${file_path}/conf/agent.properties"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_FETCHER_INTERVAL;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_RETURN_PARAM_DATA;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_TASK_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FETCHER_INTERVAL;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH;
import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
Expand Down Expand Up @@ -172,45 +175,19 @@ public TaskRequest getFetchRequest(List<CommandEntity> unackedCommands) {
private Runnable taskConfigFetchThread() {
return () -> {
Thread.currentThread().setName("ManagerFetcher");
int normalTaskId = 100;
int testState = 0;
int retryTaskId = 800;
long count = 1;
while (isRunnable()) {
try {
/*
* int configSleepTime = conf.getInt(AGENT_FETCHER_INTERVAL, DEFAULT_AGENT_FETCHER_INTERVAL);
* TimeUnit.SECONDS.sleep(AgentUtils.getRandomBySeed(configSleepTime));
*/
// fetch task config from manager
TaskResult taskresult;
String testDir = conf.get("test.dir", "");
LOGGER.info("test123 test.dir {}", testDir);
if (testDir == "") {
taskresult = getStaticConfig();
} else {
if (count % 10 == 0) {
normalTaskId++;
retryTaskId++;
}
if (testState == 1) {
testState = 2;
} else {
testState = 1;
}
taskresult = getTestConfig(testDir, normalTaskId, retryTaskId, testState);
int configSleepTime = conf.getInt(AGENT_FETCHER_INTERVAL, DEFAULT_AGENT_FETCHER_INTERVAL);
TaskResult taskResult = getStaticConfig();
if (taskResult != null) {
List<TaskProfile> taskProfiles = new ArrayList<>();
taskResult.getDataConfigs().forEach((config) -> {
TaskProfile profile = TaskProfile.convertToTaskProfile(config);
taskProfiles.add(profile);
});
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
}
if (taskresult == null) {
continue;
}
List<TaskProfile> taskProfiles = new ArrayList<>();
taskresult.getDataConfigs().forEach((config) -> {
TaskProfile profile = TaskProfile.convertToTaskProfile(config);
taskProfiles.add(profile);
});
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
count++;
AgentUtils.silenceSleepInSeconds(60);
TimeUnit.SECONDS.sleep(AgentUtils.getRandomBySeed(configSleepTime));
} catch (Throwable ex) {
LOGGER.warn("exception caught", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
Expand Down
Loading

0 comments on commit b67e445

Please sign in to comment.