From 4d5e340f74421b222fd436fc38436359835197a9 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Fri, 27 Oct 2023 15:27:20 +0800 Subject: [PATCH] Revert "[INLONG-9138][Agent] Add task manager" This reverts commit 726f8e3bb8f7dab3020ead862e77bb7409b2b349. --- .../apache/inlong/agent/plugin/file/Task.java | 52 -- .../agent/core/task/file/MemoryManager.java | 117 ----- .../agent/core/task/file/TaskManager.java | 449 ------------------ .../agent/core/AgentBaseTestsHelper.java | 37 +- .../inlong/agent/core/task/MockTask.java | 69 --- .../agent/core/task/TestTaskManager.java | 97 ---- .../agent/plugin/utils/file/NewDateUtils.java | 228 ++++++++- 7 files changed, 222 insertions(+), 827 deletions(-) delete mode 100755 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java delete mode 100644 inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java delete mode 100644 inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java delete mode 100644 inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java delete mode 100755 inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java deleted file mode 100755 index ce580a0bb7e..00000000000 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.plugin.file; - -import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.db.Db; -import org.apache.inlong.agent.state.AbstractStateWrapper; - -import java.io.IOException; - -/** - * Task interface, which generates instance in condition. - */ -public abstract class Task extends AbstractStateWrapper { - - /** - * init task by profile - * - * @throws IOException - */ - public abstract void init(Object srcManager, TaskProfile profile, Db basicDb) throws IOException; - - /** - * destroy task. - */ - public abstract void destroy(); - - /** - * get task profile - */ - public abstract TaskProfile getProfile(); - - /** - * get task id - */ - public abstract String getTaskId(); -} diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java deleted file mode 100644 index fca9d37c729..00000000000 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.core.task.file; - -import org.apache.inlong.agent.conf.AgentConfiguration; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Semaphore; - -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT; - -/** - * used to limit global memory to avoid oom - */ -public class MemoryManager { - - private static final Logger LOGGER = LoggerFactory.getLogger(MemoryManager.class); - private static volatile MemoryManager memoryManager = null; - private final AgentConfiguration conf; - private ConcurrentHashMap semaphoreMap = new ConcurrentHashMap<>(); - - private MemoryManager() { - this.conf = AgentConfiguration.getAgentConf(); - Semaphore semaphore = null; - semaphore = new Semaphore( - conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT)); - semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore); - - semaphore = new Semaphore( - conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT)); - semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore); - - semaphore = new Semaphore( - conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, DEFAULT_AGENT_GLOBAL_WRITER_PERMIT)); - semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore); - } - - /** - * manager singleton - */ - public static MemoryManager getInstance() { - if (memoryManager == null) { - synchronized (MemoryManager.class) { - if (memoryManager == null) { - memoryManager = new MemoryManager(); - } - } - } - return memoryManager; - } - - public boolean tryAcquire(String semaphoreName, int permit) { - Semaphore semaphore = semaphoreMap.get(semaphoreName); - if (semaphore == null) { - LOGGER.error("tryAcquire {} not exist"); - return false; - } - return semaphore.tryAcquire(permit); - } - - public void release(String semaphoreName, int permit) { - Semaphore semaphore = semaphoreMap.get(semaphoreName); - if (semaphore == null) { - LOGGER.error("release {} not exist"); - return; - } - semaphore.release(permit); - } - - public int getLeft(String semaphoreName) { - Semaphore semaphore = semaphoreMap.get(semaphoreName); - if (semaphore == null) { - LOGGER.error("getLeft {} not exist"); - return -1; - } - return semaphore.availablePermits(); - } - - public void printDetail(String semaphoreName) { - Semaphore semaphore = semaphoreMap.get(semaphoreName); - if (semaphore == null) { - LOGGER.error("printDetail {} not exist"); - return; - } - LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(), semaphore.getQueueLength(), - semaphoreName); - } - - public void printAll() { - printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT); - printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT); - printDetail(AGENT_GLOBAL_WRITER_PERMIT); - } -} diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java deleted file mode 100644 index f75cd100977..00000000000 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java +++ /dev/null @@ -1,449 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.core.task.file; - -import org.apache.inlong.agent.common.AbstractDaemon; -import org.apache.inlong.agent.common.AgentThreadFactory; -import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.constant.AgentConstants; -import org.apache.inlong.agent.core.task.TaskAction; -import org.apache.inlong.agent.db.Db; -import org.apache.inlong.agent.db.RocksDbImp; -import org.apache.inlong.agent.db.TaskProfileDb; -import org.apache.inlong.agent.plugin.file.Task; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.ThreadUtils; -import org.apache.inlong.common.enums.TaskStateEnum; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE; - -/** - * handle the task config from manager, including add, delete, update etc. - * the task config is store in both db and memory. - */ -public class TaskManager extends AbstractDaemon { - - private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class); - public static final int CONFIG_QUEUE_CAPACITY = 1; - public static final int CORE_THREAD_SLEEP_TIME = 1000; - private static final int ACTION_QUEUE_CAPACITY = 100000; - // task basic db - private final Db taskBasicDb; - // instance basic db - private final Db instanceBasicDb; - // task in db - private final TaskProfileDb taskDb; - // task in memory - private final ConcurrentHashMap taskMap; - // task config from manager. - private final BlockingQueue> configQueue; - // task thread pool; - private final ThreadPoolExecutor runningPool; - // tasks which are not accepted by running pool. - private final BlockingQueue pendingTasks; - private final int taskMaxLimit; - private final AgentConfiguration agentConf; - // instance profile queue. - private final BlockingQueue actionQueue; - - /** - * Init task manager. - */ - public TaskManager() { - this.agentConf = AgentConfiguration.getAgentConf(); - this.taskBasicDb = initDb( - agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_TASK)); - this.instanceBasicDb = initDb( - agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE)); - taskDb = new TaskProfileDb(taskBasicDb); - this.runningPool = new ThreadPoolExecutor( - 0, Integer.MAX_VALUE, - 60L, TimeUnit.SECONDS, - new SynchronousQueue<>(), - new AgentThreadFactory("task-manager-running-pool")); - taskMap = new ConcurrentHashMap<>(); - taskMaxLimit = agentConf.getInt(JOB_NUMBER_LIMIT, DEFAULT_JOB_NUMBER_LIMIT); - pendingTasks = new LinkedBlockingQueue<>(taskMaxLimit); - configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY); - actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY); - } - - /** - * init db by class name - * - * @return db - */ - public static Db initDb(String childPath) { - try { - return new RocksDbImp(childPath); - } catch (Exception ex) { - throw new UnsupportedClassVersionError(ex.getMessage()); - } - } - - public void submitTaskProfiles(List taskProfiles) { - if (taskProfiles == null) { - return; - } - while (configQueue.size() != 0) { - configQueue.poll(); - } - configQueue.add(taskProfiles); - } - - public boolean submitAction(TaskAction action) { - if (action == null) { - return false; - } - return actionQueue.offer(action); - } - - /** - * thread for core thread. - * - * @return runnable profile. - */ - private Runnable coreThread() { - return () -> { - Thread.currentThread().setName("task-manager-core"); - while (isRunnable()) { - try { - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); - dealWithConfigQueue(configQueue); - dealWithActionQueue(actionQueue); - } catch (Throwable ex) { - LOGGER.error("exception caught", ex); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); - } - } - }; - } - - private void dealWithConfigQueue(BlockingQueue> queue) { - List dataConfigs = queue.poll(); - if (dataConfigs == null) { - return; - } - keepPaceWithManager(dataConfigs); - keepPaceWithDb(); - } - - private void dealWithActionQueue(BlockingQueue queue) { - while (isRunnable()) { - try { - TaskAction action = queue.poll(); - if (action == null) { - break; - } - TaskProfile profile = action.getProfile(); - switch (action.getActionType()) { - case FINISH: - LOGGER.info("test123 deal finish action, taskId {}", profile.getTaskId()); - finishTask(profile); - break; - default: - LOGGER.error("invalid action type for action queue: taskId {} type {}", profile.getTaskId(), - action.getActionType()); - } - } catch (Throwable ex) { - LOGGER.error("dealWithActionQueue", ex); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); - } - } - } - - /** - * keep pace with data config from manager, task state should be only RUNNING or FROZEN. - * NEW and STOP only used in manager - */ - private void keepPaceWithManager(List taskProfiles) { - LOGGER.info("deal with List {}", taskProfiles); - Map tasksFromManager = new ConcurrentHashMap<>(); - taskProfiles.forEach((profile) -> { - TaskStateEnum state = profile.getState(); - if (state == TaskStateEnum.RUNNING || state == TaskStateEnum.FROZEN) { - tasksFromManager.put(profile.getTaskId(), profile); - } else { - LOGGER.error("task {} invalid task state {}", profile, state); - } - }); - traverseManagerTasksToDb(tasksFromManager); - traverseDbTasksToManager(tasksFromManager); - } - - /** - * keep pace with task in db - */ - private void keepPaceWithDb() { - traverseDbTasksToMemory(); - traverseMemoryTasksToDb(); - } - - /** - * keep pace with task in db - */ - private void traverseManagerTasksToDb(Map tasksFromManager) { - tasksFromManager.values().forEach((profileFromManager) -> { - TaskProfile taskFromDb = taskDb.getTask(profileFromManager.getTaskId()); - if (taskFromDb == null) { - LOGGER.info("traverseManagerTasksToDb task {} not found in db retry {} state {}, add it", - profileFromManager.getTaskId(), - profileFromManager.isRetry(), profileFromManager.getState()); - addTask(profileFromManager); - } else { - TaskStateEnum managerState = profileFromManager.getState(); - TaskStateEnum dbState = taskFromDb.getState(); - if (managerState == dbState) { - return; - } - if (dbState == TaskStateEnum.FINISH) { - LOGGER.info("traverseManagerTasksToDb task {} dbState {} retry {}, do nothing", - taskFromDb.getTaskId(), dbState, - taskFromDb.isRetry()); - return; - } - if (managerState == TaskStateEnum.RUNNING) { - LOGGER.info("traverseManagerTasksToDb task {} dbState {} retry {}, active it", - taskFromDb.getTaskId(), dbState, taskFromDb.isRetry()); - activeTask(profileFromManager); - } else { - LOGGER.info("traverseManagerTasksToDb task {} dbState {} retry {}, freeze it", - taskFromDb.getTaskId(), dbState, taskFromDb.isRetry()); - freezeTask(profileFromManager); - } - } - }); - } - - /** - * traverse tasks in db, if not found in tasks from manager then delete it - */ - private void traverseDbTasksToManager(Map tasksFromManager) { - taskDb.getTasks().forEach((profileFromDb) -> { - if (!tasksFromManager.containsKey(profileFromDb.getTaskId())) { - LOGGER.info("traverseDbTasksToManager try to delete task {}", profileFromDb.getTaskId()); - deleteTask(profileFromDb); - } - }); - } - - /** - * manager task state is RUNNING and taskMap not found then add - * manager task state is FROZE and taskMap found thrn delete - */ - private void traverseDbTasksToMemory() { - taskDb.getTasks().forEach((profileFromDb) -> { - TaskStateEnum dbState = profileFromDb.getState(); - Task task = taskMap.get(profileFromDb.getTaskId()); - if (dbState == TaskStateEnum.RUNNING) { - if (task == null) { - LOGGER.info("traverseDbTasksToMemory add task to mem taskId {}", profileFromDb.getTaskId()); - addToMemory(profileFromDb); - } - } else if (dbState == TaskStateEnum.FROZEN) { - if (task != null) { - LOGGER.info("traverseDbTasksToMemory delete task from mem taskId {}", - profileFromDb.getTaskId()); - deleteFromMemory(profileFromDb.getTaskId()); - } - } else { - if (dbState != TaskStateEnum.FINISH) { - LOGGER.error("task {} invalid state {}", profileFromDb.getTaskId(), dbState); - } - } - }); - } - - /** - * task in taskMap but not in taskDb then delete - * task in taskMap but task state from db is FROZEN then delete - */ - private void traverseMemoryTasksToDb() { - taskMap.values().forEach((task) -> { - TaskProfile profileFromDb = taskDb.getTask(task.getTaskId()); - if (profileFromDb == null) { - deleteFromMemory(task.getTaskId()); - return; - } - TaskStateEnum stateFromDb = profileFromDb.getState(); - if (stateFromDb != TaskStateEnum.RUNNING) { - deleteFromMemory(task.getTaskId()); - } - }); - } - - /** - * add task profile to db - * if task state is RUNNING then add task to memory - */ - private void addTask(TaskProfile taskProfile) { - if (taskMap.size() >= taskMaxLimit) { - LOGGER.error("taskMap size {} over limit {}", taskMap.size(), taskMaxLimit); - return; - } - addToDb(taskProfile); - TaskStateEnum state = TaskStateEnum.getTaskState(taskProfile.getInt(TASK_STATE)); - if (state == TaskStateEnum.RUNNING) { - addToMemory(taskProfile); - } else { - LOGGER.info("taskId {} state {} no need to add to memory", taskProfile.getTaskId(), - taskProfile.getState()); - } - } - - private void deleteTask(TaskProfile taskProfile) { - deleteFromDb(taskProfile); - deleteFromMemory(taskProfile.getTaskId()); - } - - private void freezeTask(TaskProfile taskProfile) { - updateToDb(taskProfile); - deleteFromMemory(taskProfile.getTaskId()); - } - - private void finishTask(TaskProfile taskProfile) { - taskProfile.setState(TaskStateEnum.FINISH); - updateToDb(taskProfile); - deleteFromMemory(taskProfile.getTaskId()); - } - - private void activeTask(TaskProfile taskProfile) { - updateToDb(taskProfile); - addToMemory(taskProfile); - } - - private void restoreFromDb() { - List taskProfileList = taskDb.getTasks(); - taskProfileList.forEach((profile) -> { - if (profile.getState() == TaskStateEnum.RUNNING) { - LOGGER.info("restoreFromDb taskId {}", profile.getTaskId()); - addToMemory(profile); - } - }); - } - - private void stopAllTasks() { - taskMap.values().forEach((task) -> { - task.destroy(); - }); - taskMap.clear(); - } - - /** - * add task to db, it was expected that there is no record refer the task id. - * cause the task id will change if the task content changes, replace the record - * if it is found, the memory record will be updated by the db. - */ - private void addToDb(TaskProfile taskProfile) { - if (taskDb.getTask(taskProfile.getTaskId()) != null) { - LOGGER.error("task {} should not exist", taskProfile); - } - taskDb.storeTask(taskProfile); - } - - private void deleteFromDb(TaskProfile taskProfile) { - if (taskDb.getTask(taskProfile.getTaskId()) == null) { - LOGGER.error("try to delete task {} but not found in db", taskProfile); - return; - } - taskDb.deleteTask(taskProfile.getTaskId()); - } - - private void updateToDb(TaskProfile taskProfile) { - if (taskDb.getTask(taskProfile.getTaskId()) == null) { - LOGGER.error("task {} not found, agent may have been reinstalled", taskProfile); - } - taskDb.storeTask(taskProfile); - } - - /** - * add task to memory, if there is a record refer to the task id exist we need to destroy it first. - */ - private void addToMemory(TaskProfile taskProfile) { - Task oldTask = taskMap.get(taskProfile.getTaskId()); - if (oldTask != null) { - oldTask.destroy(); - taskMap.remove(taskProfile.getTaskId()); - LOGGER.error("old task {} should not exist, try stop it first", - taskProfile); - } - try { - Class taskClass = Class.forName(taskProfile.getTaskClass()); - Task task = (Task) taskClass.newInstance(); - task.init(this, taskProfile, instanceBasicDb); - taskMap.put(taskProfile.getTaskId(), task); - runningPool.submit(task); - LOGGER.info( - "add task {} into memory, taskMap size {}, runningPool task total {}, runningPool task active {}", - task.getTaskId(), taskMap.size(), runningPool.getTaskCount(), - runningPool.getActiveCount()); - } catch (Throwable t) { - LOGGER.error("add task error {}", t.getMessage()); - } - } - - private void deleteFromMemory(String taskId) { - Task oldTask = taskMap.get(taskId); - if (oldTask == null) { - LOGGER.error("old task {} not found", taskId); - return; - } - oldTask.destroy(); - taskMap.remove(oldTask.getTaskId()); - LOGGER.info( - "delete task {} from memory, taskMap size {}, runningPool task total {}, runningPool task active {}", - oldTask.getTaskId(), taskMap.size(), runningPool.getTaskCount(), - runningPool.getActiveCount()); - } - - public Task getTask(String taskId) { - return taskMap.get(taskId); - } - - public TaskProfile getTaskProfile(String taskId) { - return taskDb.getTask(taskId); - } - - @Override - public void start() throws Exception { - restoreFromDb(); - submitWorker(coreThread()); - } - - @Override - public void stop() throws Exception { - stopAllTasks(); - waitForTerminate(); - runningPool.shutdown(); - } -} diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java index 1628a4f61fa..ea771e96f46 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java @@ -18,14 +18,8 @@ package org.apache.inlong.agent.core; import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.AgentConstants; -import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; -import org.apache.inlong.common.enums.TaskStateEnum; -import org.apache.inlong.common.pojo.agent.DataConfig; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,8 +33,7 @@ public class AgentBaseTestsHelper { private static final Logger LOGGER = LoggerFactory.getLogger(AgentBaseTestsHelper.class); - private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss"); - private static final Gson GSON = gsonBuilder.create(); + private final String className; private Path testRootDir; @@ -71,32 +64,4 @@ public void teardownAgentHome() { } } } - - public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - TaskStateEnum state) { - DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state); - TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); - return profile; - } - - private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - TaskStateEnum state) { - DataConfig dataConfig = new DataConfig(); - dataConfig.setInlongGroupId("testGroupId"); // 老字段 groupId - dataConfig.setInlongStreamId("testStreamId"); // 老字段 streamId - dataConfig.setDataReportType(1); // 老字段 reportType - dataConfig.setTaskType(3); // 老字段 任务类型,3 代表文件采集 - dataConfig.setTaskId(taskId); // 老字段 任务 id - dataConfig.setState(state.ordinal()); // 新增! 任务状态 1 正常 2 暂停 - FileTaskConfig fileTaskConfig = new FileTaskConfig(); - fileTaskConfig.setPattern(pattern);// 正则 - fileTaskConfig.setTimeOffset("0d"); // 老字段 时间偏移 "-1d" 采一天前的 "-2h" 采 2 小时前的 - fileTaskConfig.setMaxFileCount(100); // 最大文件数 - fileTaskConfig.setCycleUnit("D"); // 新增! 任务周期 "D" 天 "h" 小时 - fileTaskConfig.setRetry(retry); // 新增! 是否补录,如果是补录任务则为 true - fileTaskConfig.setStartTime(startTime); - fileTaskConfig.setEndTime(endTime); - dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); - return dataConfig; - } } diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java deleted file mode 100644 index 23c5ad5cc5b..00000000000 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.core.task; - -import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.core.task.file.TaskManager; -import org.apache.inlong.agent.db.Db; -import org.apache.inlong.agent.plugin.file.Task; - -import java.io.IOException; - -public class MockTask extends Task { - - public static final int INIT_TIME = 100; - public static final int RUN_TIME = 101; - public static final int DESTROY_TIME = 102; - private TaskProfile profile; - private long index = INIT_TIME; - public long initTime = 0; - public long destroyTime = 0; - public long runtime = 0; - private TaskManager manager; - - @Override - public void init(Object srcManager, TaskProfile profile, Db basicDb) throws IOException { - manager = (TaskManager) srcManager; - this.profile = profile; - } - - @Override - public void destroy() { - destroyTime = index++; - } - - @Override - public TaskProfile getProfile() { - return profile; - } - - @Override - public String getTaskId() { - return profile.getTaskId(); - } - - @Override - public void addCallbacks() { - - } - - @Override - public void run() { - runtime = index++; - } -} \ No newline at end of file diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java deleted file mode 100755 index bf9047c40a3..00000000000 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.core.task; - -import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.core.AgentBaseTestsHelper; -import org.apache.inlong.agent.core.task.file.TaskManager; -import org.apache.inlong.common.enums.TaskStateEnum; - -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.awaitility.Awaitility.await; - -public class TestTaskManager { - - private static final Logger LOGGER = LoggerFactory.getLogger(TestTaskManager.class); - private static TaskManager manager; - private static AgentBaseTestsHelper helper; - - @BeforeClass - public static void setup() { - helper = new AgentBaseTestsHelper(TestTaskManager.class.getName()).setupAgentHome(); - try { - manager = new TaskManager(); - manager.start(); - } catch (Exception e) { - Assert.assertTrue("manager start error", false); - } - } - - @AfterClass - public static void teardown() throws Exception { - manager.stop(); - helper.teardownAgentHome(); - } - - @Test - public void testTaskManager() { - String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); - String taskId1 = taskProfile1.getTaskId(); - taskProfile1.setTaskClass(MockTask.class.getCanonicalName()); - List taskProfiles1 = new ArrayList<>(); - taskProfiles1.add(taskProfile1); - // test add - manager.submitTaskProfiles(taskProfiles1); - await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId1) != null); - LOGGER.info("state {}", manager.getTaskProfile(taskId1).getState()); - Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == TaskStateEnum.RUNNING); - - // test froze - taskProfile1.setState(TaskStateEnum.FROZEN); - manager.submitTaskProfiles(taskProfiles1); - await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId1) == null); - Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == TaskStateEnum.FROZEN); - taskProfile1.setState(TaskStateEnum.RUNNING); - manager.submitTaskProfiles(taskProfiles1); - await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId1) != null); - Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == TaskStateEnum.RUNNING); - - // test delete - TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); - taskProfile2.setTaskClass(MockTask.class.getCanonicalName()); - List taskProfiles2 = new ArrayList<>(); - taskProfiles2.add(taskProfile2); - manager.submitTaskProfiles(taskProfiles2); - await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId1) == null); - Assert.assertTrue(manager.getTaskProfile(taskId1) == null); - String taskId2 = taskProfile2.getTaskId(); - await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId2) != null); - Assert.assertTrue(manager.getTaskProfile(taskId2).getState() == TaskStateEnum.RUNNING); - } -} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java index f890ba6f133..9a9f8ece968 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java @@ -231,12 +231,10 @@ public static boolean isValidCreationTime(String dataTime, String cycleUnit, } /** - * Calculate offset time based on offset - * The current offset will only be offset forward, or it can be offset backward to be compatible with the previous - * calculation method (subtraction). When it is offset backward, it returns negative; When offset forward, return to - * positive + * 根据偏移量计算偏移时间 + * 当前偏移只会向前偏移,也可向后偏移为兼容之前的计算方式(相减),当为向后偏移时,返回负;当向前偏移,返回正 * - * @param timeOffset offset,such as -1d,-4h,-10m; + * @param timeOffset 偏移量,如-1d,-4h,-10m等; * @return */ public static long caclOffset(String timeOffset) { @@ -412,8 +410,7 @@ public static String getDateTime(String fileName, PathDateExpression dateExpress Matcher mat = Pattern.compile(longestDatePattern).matcher(fileName); boolean find = mat.find(); - // TODO : more than one part match the time regex in file name ("/data/joox_logs/2000701106/201602170040.log" - // YYYYMMDDhh) + // TODO : 存在文件名中有多个部分匹配到时间表达式的情况("/data/joox_logs/2000701106/201602170040.log" YYYYMMDDhh) if (!find) { logger.error("Can't find the pattern {} for file name {}", longestDatePattern, fileName); @@ -720,4 +717,221 @@ public static List getDateRegion(String start, String end, return ret; } + + public static void main(String[] args) throws Exception { + + // String aa = "/data/taox/YYYYMMDDt_log/[0-9]+_YYYYMMDD_hh00.log"; + /* + * String aa = "/data/taox/YYYYt_logMMDD/[0-9]+_YYYYMMDD_hh00.log"; String bb = + * replaceDateExpressionWithRegex(aa); System.out.println("---------: " + bb); + * + * String cc = replaceDateExpression(Calendar.getInstance(), aa); System.out.println("---------: " + cc); + * + * String dd = replaceDateExpression1(Calendar.getInstance(), aa); System.out.println("---------: " + dd); + */ + + // String text = "/data1/qq_BaseInfo/YYYY-MM/YYYY-MM-DD/gamedr.gamedb[0-9]+.minigame + // .db/YYYY-MM-DD-[0-9]+.txt"; + // System.out.println(replaceDateExpressionWithRegex(text)); + // + // int timeInterval = 1000; + // String timeOffset = "10H"; + // + // String offsetUnit = timeOffset.substring(timeOffset.length() - 1); + // int startIndex = timeOffset.charAt(0) == '-' ? 1 : 0; + // int offsetTime = Integer.parseInt(timeOffset.substring(startIndex, timeOffset.length() + // - 1)); + // if("d".equalsIgnoreCase(offsetUnit)){ + // timeInterval += offsetTime * 24 * 3600 * 1000; + // }else if("h".equalsIgnoreCase(offsetUnit)){ + // timeInterval += offsetTime * 3600 * 1000; + // }else if("m".equalsIgnoreCase(offsetUnit)){ + // timeInterval += offsetTime * 60 * 1000; + // } + // + // System.out.println(timeInterval); + // + // SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd HH:mm:ss"); + // + // Calendar calendar = NewDateUtils.getCurDate("D", "-10D"); + // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", month: " + // + (calendar.get(Calendar.MONTH) + 1) + ", day: " + // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: " + // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: " + // + calendar.get(Calendar.MINUTE) + ", second: " + // + calendar.get(Calendar.SECOND)); + // System.out.println(dateFormat.format(calendar.getTimeInMillis())); + // + // calendar = getCurDate("H", "-2H"); + // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", month: " + // + (calendar.get(Calendar.MONTH) + 1) + ", day: " + // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: " + // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: " + // + calendar.get(Calendar.MINUTE) + ", second: " + // + calendar.get(Calendar.SECOND)); + // System.out.println(dateFormat.format(calendar.getTimeInMillis())); + // + // calendar = getCurDate("H", "-2D"); + // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", month: " + // + (calendar.get(Calendar.MONTH) + 1) + ", day: " + // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: " + // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: " + // + calendar.get(Calendar.MINUTE) + ", second: " + // + calendar.get(Calendar.SECOND)); + // System.out.println(dateFormat.format(calendar.getTimeInMillis())); + // + // calendar = getCurDate("5m", "-20m"); + // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", month: " + // + (calendar.get(Calendar.MONTH) + 1) + ", day: " + // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: " + // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: " + // + calendar.get(Calendar.MINUTE) + ", second: " + // + calendar.get(Calendar.SECOND)); + // System.out.println(dateFormat.format(calendar.getTimeInMillis())); + // + // String directory = "/data/home/user00/xyshome/logsvr/log/YYYYMMDD/[0-9]+_YYYYMMDD_hh00 + // .log"; + // calendar = getCurDate("H", "-3H"); + // System.out.println(replaceDateExpression(calendar, directory)); + // + // System.out.println(NewDateUtils.timeStrConvertTomillSec("201404031105", + // "m")); + // System.out.println(NewDateUtils.timeStrConvertTomillSec("2014040223", + // "H")); + // System.out.println(NewDateUtils + // .timeStrConvertTomillSec("20140402", "D")); + // + // System.out.println(NewDateUtils.millSecConvertToTimeStr( + // System.currentTimeMillis(), "Y")); + // System.out.println(NewDateUtils.millSecConvertToTimeStr( + // System.currentTimeMillis(), "M")); + // System.out.println(NewDateUtils.millSecConvertToTimeStr( + // System.currentTimeMillis(), "D")); + // System.out.println(NewDateUtils.millSecConvertToTimeStr( + // System.currentTimeMillis(), "H")); + // System.out.println(NewDateUtils.millSecConvertToTimeStr( + // System.currentTimeMillis(), "10m")); + // System.out.println(NewDateUtils.millSecConvertToTimeStr( + // System.currentTimeMillis(), "15m")); + // System.out.println(NewDateUtils.millSecConvertToTimeStr( + // System.currentTimeMillis(), "30m")); + // System.out.println(NewDateUtils.millSecConvertToTimeStr( + // NewDateUtils.timeStrConvertTomillSec("201404121900", "10m"), + // "10m")); + // + // NewDateUtils.getDateRegion("20120810", "20120813", "D"); + // NewDateUtils.getDateRegion("2012081005", "2012081300", "H"); + // NewDateUtils.getDateRegion("201404111649", "201404111600", "10m"); + // String dataTime = "20160122"; + // System.out.println(NewDateUtils.getShouldStartTime(dataTime, "D", "-2h")); + + // String dataPath = "/data/herococo/YYYYMMDD_*/YYYYMMDDhhmm.log"; + // dataPath = NewDateUtils.replaceDateExpressionWithRegex(dataPath); + // System.out.println("dataPath: " + dataPath); + // + // Pattern pattern = Pattern.compile(dataPath, Pattern.CASE_INSENSITIVE + // | Pattern.DOTALL | Pattern.MULTILINE); + // // Pattern pattern = Pattern.compile("/data/herococo/\\d+/\\d+.log", + // // Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); + // Matcher m = pattern + // .matcher("/data/herococo/20140406_a/20140406152730.log"); + // System.out.println(m.matches()); + // + // dataPath = "/data/home/user00/xyshome/logsvr/log/YYYYMMDD/[0-9]+_YYYYMMDD_hh00.log"; + // dataPath = NewDateUtils.replaceDateExpressionWithRegex(dataPath); + // pattern = Pattern.compile(dataPath, Pattern.CASE_INSENSITIVE + // | Pattern.DOTALL | Pattern.MULTILINE); + // m = pattern + // .matcher("/data/home/user00/xyshome/logsvr/log/20140406/8_20140406_1600.log"); + // System.out.println(dataPath); + // System.out.println(m.matches()); + // + // dataPath = "/data/work/data2/abc/YYYYMMDDhh.*.txt"; + // dataPath = NewDateUtils.replaceDateExpressionWithRegex(dataPath); + // pattern = Pattern.compile(dataPath, Pattern.CASE_INSENSITIVE + // | Pattern.DOTALL | Pattern.MULTILINE); + // m = pattern.matcher("/data/work/data2/abc/201404102242.txt"); + // System.out.println(dataPath); + // System.out.println(m.matches()); + // + // List retTimeList = NewDateUtils.getDateRegion("20140411", + // "20140411", "D"); + // for (Long time : retTimeList) { + // System.out.println(NewDateUtils.millSecConvertToTimeStr(time, "D")); + // } + // + // pattern = Pattern + // .compile( + // "/data/home/tlog/logplat/log/tlogd_1/[0-9]+_\\d{4}\\d{2}\\d{2}_\\d{2}00 + // .log", + // Pattern.CASE_INSENSITIVE | Pattern.DOTALL + // | Pattern.MULTILINE); + // m = pattern + // .matcher("/data/home/tlog/logplat/log/tlogd_1/65535_20140506_1600.log.1"); + // System.out.println(m.matches()); + // System.out.println(m.lookingAt()); + // + // String unit = "h"; + // if (StringUtils.endsWithIgnoreCase("h", "H")) { + // System.out.println("yes"); + // } + // + // System.out.println(NewDateUtils.getDateTime("20160106", "D", "-4h")); + + // PathDateExpression dateExpression = DateUtils + // .extractLongestTimeRegexWithPrefixOrSuffix + // ("/data/log/qqtalk/[0-9]+_[0-9]+_id20522_[0-9]+_YYYYMMDD_hh.log"); + // System.out.println(dateExpression.getLongestDatePattern()); + // String fileTime = getDateTime("/data/log/qqtalk/3900626911_11217_id20522_17_20160420 + // .log", dateExpression); + // System.out.println(fileTime); + + // String dataTime = "20180411"; + // + // String shouldStart = getShouldStartTime(dataTime, "D", "4h"); + // System.out.println(shouldStart); + // + // String fileName = "rc_trade_water[0-9]*.YYYY-MM-DD-hh.[0-9]+"; + // + // String newFileName = "rc_trade_water.2016-11-20-12.9"; + // + // /** + // * 打印出文件名中 最长的时间表达式 + // */ + // PathDateExpression dateExpression = DateUtils.extractLongestTimeRegexWithPrefixOrSuffix + // (fileName); + // System.out.println(dateExpression.getLongestDatePattern()); + // + // /** + // * 检查正则表达式是否能匹配到文件 + // */ + // Pattern pattern = Pattern.compile(replaceDateExpressionWithRegex(fileName), + // Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); + // + // Matcher matcher = pattern.matcher(newFileName); + // + // if (matcher.matches() || matcher.lookingAt()) { + // System.out.println("Matched File"); + // } + // + // /** + // * 打印文件名的时间 + // */ + // String fileTime = getDateTime(newFileName, dateExpression); + // System.out.println(fileTime); + // + // + // String fileName1 = "/data/joox_logs/2000701106/201602170040.log"; + // String filePathRegx = "/data/joox_logs/2000701106/{YYYYMMDDhh}40.log"; + // String fullRegx = replaceDateExpressionWithRegex(filePathRegx, "dateTimeGN"); + // System.out.println(fullRegx); + // Pattern fullPattern = Pattern.compile(fullRegx); + // Matcher fullMatcher = fullPattern.matcher(fileName1); + // while (fullMatcher.find()) { + // System.out.println(fullMatcher.group("dateTimeGN")); + // } + + System.out.println( + timeStrConvertTomillSec("2018111209", "h", TimeZone.getTimeZone("GMT+8:00"))); + } }