diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java index 5cfca599abb..9381a4e3c44 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java @@ -17,27 +17,6 @@ package org.apache.inlong.agent.core.job; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX; -import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID; -import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID; -import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; @@ -52,9 +31,32 @@ import org.apache.inlong.agent.utils.GsonUtil; import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.metric.MetricRegister; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION; +import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; +import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX; +import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID; +import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME; + /** * JobManager maintains lots of jobs, and communicate between server and task manager. */ diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java index 224f1d95674..397812624bd 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java @@ -17,21 +17,6 @@ package org.apache.inlong.agent.core.job; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_VERSION; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION; -import static org.apache.inlong.agent.constant.JobConstants.JOB_OFFSET_DELIMITER; - -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; @@ -50,9 +35,26 @@ import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.constant.Constants; import org.apache.inlong.common.db.CommandEntity; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_VERSION; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION; +import static org.apache.inlong.agent.constant.JobConstants.JOB_OFFSET_DELIMITER; + /** * JobWrapper is used in JobManager, it defines the life cycle of * running job and maintains the state of job. diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java index 3c883ad683e..786bcf62cdd 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java @@ -17,16 +17,6 @@ package org.apache.inlong.agent.core.trigger; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM; -import static org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; - -import com.google.common.base.Preconditions; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; @@ -38,9 +28,21 @@ import org.apache.inlong.agent.db.TriggerProfileDb; import org.apache.inlong.agent.plugin.Trigger; import org.apache.inlong.agent.utils.ThreadUtils; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM; +import static org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM; +import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; + /** * manager for triggers. */ diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index d4269740b47..1dd85bc15d4 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -17,6 +17,43 @@ package org.apache.inlong.agent.plugin.fetcher; +import org.apache.inlong.agent.common.AbstractDaemon; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.conf.ProfileFetcher; +import org.apache.inlong.agent.conf.TriggerProfile; +import org.apache.inlong.agent.core.AgentManager; +import org.apache.inlong.agent.db.CommandDb; +import org.apache.inlong.agent.plugin.Trigger; +import org.apache.inlong.agent.plugin.utils.PluginUtils; +import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest; +import org.apache.inlong.agent.pojo.DbCollectorTaskRequestDto; +import org.apache.inlong.agent.pojo.DbCollectorTaskResult; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.HttpManager; +import org.apache.inlong.agent.utils.ThreadUtils; +import org.apache.inlong.common.db.CommandEntity; +import org.apache.inlong.common.enums.ManagerOpEnum; +import org.apache.inlong.common.enums.PullJobTypeEnum; +import org.apache.inlong.common.pojo.agent.CmdConfig; +import org.apache.inlong.common.pojo.agent.TaskRequest; +import org.apache.inlong.common.pojo.agent.TaskResult; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + import static java.util.Objects.requireNonNull; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID; @@ -46,41 +83,6 @@ import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp; import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalUuid; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import java.io.File; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.inlong.agent.common.AbstractDaemon; -import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.conf.ProfileFetcher; -import org.apache.inlong.agent.conf.TriggerProfile; -import org.apache.inlong.agent.core.AgentManager; -import org.apache.inlong.agent.db.CommandDb; -import org.apache.inlong.agent.plugin.Trigger; -import org.apache.inlong.agent.plugin.utils.PluginUtils; -import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest; -import org.apache.inlong.agent.pojo.DbCollectorTaskRequestDto; -import org.apache.inlong.agent.pojo.DbCollectorTaskResult; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.HttpManager; -import org.apache.inlong.agent.utils.ThreadUtils; -import org.apache.inlong.common.db.CommandEntity; -import org.apache.inlong.common.enums.ManagerOpEnum; -import org.apache.inlong.common.enums.PullJobTypeEnum; -import org.apache.inlong.common.pojo.agent.CmdConfig; -import org.apache.inlong.common.pojo.agent.TaskRequest; -import org.apache.inlong.common.pojo.agent.TaskResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Fetch command from Inlong-Manager */ @@ -136,7 +138,7 @@ private boolean requiredKeys(AgentConfiguration conf) { private String buildBaseUrl() { return "http://" + conf.get(AGENT_MANAGER_VIP_HTTP_HOST) + ":" + conf.get(AGENT_MANAGER_VIP_HTTP_PORT) + conf.get( - AGENT_MANAGER_VIP_HTTP_PREFIX_PATH, DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH); + AGENT_MANAGER_VIP_HTTP_PREFIX_PATH, DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH); } /**