diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java new file mode 100755 index 00000000000..90bac4c94f0 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.state.AbstractStateWrapper; + +import java.io.IOException; + +/** + * Instance interface, which generated by task in condition. + */ +public abstract class Instance extends AbstractStateWrapper { + + /** + * init instance by instance profile + * + * @throws IOException + */ + public abstract void init(Object instanceManager, InstanceProfile profile); + + /** + * destroy instance. + */ + public abstract void destroy(); + + /** + * get instance profile + */ + public abstract InstanceProfile getProfile(); + + /** + * get task id + */ + public abstract String getTaskId(); + + /** + * get instance id + */ + public abstract String getInstanceId(); +} diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java new file mode 100644 index 00000000000..a6c8381de36 --- /dev/null +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.core.instance; + +import org.apache.inlong.agent.common.AbstractDaemon; +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.db.InstanceDb; +import org.apache.inlong.agent.plugin.Instance; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.ThreadUtils; +import org.apache.inlong.common.enums.InstanceStateEnum; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * handle the instance created by task, including add, delete, update etc. + * the instance info is store in both db and memory. + */ +public class InstanceManager extends AbstractDaemon { + + private static final Logger LOGGER = LoggerFactory.getLogger(InstanceManager.class); + private static final int ACTION_QUEUE_CAPACITY = 100000; + public static final int CORE_THREAD_SLEEP_TIME = 100; + // task in db + private final InstanceDb instanceDb; + // task in memory + private final ConcurrentHashMap instanceMap; + // instance profile queue. + private final BlockingQueue actionQueue; + // task thread pool; + private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, + 1L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new AgentThreadFactory("instance-manager")); + + private final int taskMaxLimit; + private final AgentConfiguration agentConf; + private final String taskId; + private volatile boolean runAtLeastOneTime = false; + private volatile boolean running = false; + + /** + * Init task manager. + */ + public InstanceManager(String taskId, Db basicDb) { + this.taskId = taskId; + instanceDb = new InstanceDb(basicDb); + this.agentConf = AgentConfiguration.getAgentConf(); + instanceMap = new ConcurrentHashMap<>(); + taskMaxLimit = agentConf.getInt(AgentConstants.JOB_NUMBER_LIMIT, AgentConstants.DEFAULT_JOB_NUMBER_LIMIT); + actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY); + } + + public String getTaskId() { + return taskId; + } + + public Instance getInstance(String instanceId) { + return instanceMap.get(instanceId); + } + + public InstanceProfile getInstanceProfile(String instanceId) { + return instanceDb.getInstance(taskId, instanceId); + } + + public boolean submitAction(InstanceAction action) { + if (action == null) { + return false; + } + return actionQueue.offer(action); + } + + /** + * thread for core thread. + * + * @return runnable profile. + */ + private Runnable coreThread() { + return () -> { + Thread.currentThread().setName("instance-manager-core-" + taskId); + running = true; + while (isRunnable()) { + try { + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + dealWithActionQueue(actionQueue); + keepPaceWithDb(); + } catch (Throwable ex) { + LOGGER.error("coreThread {}", ex.getMessage()); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + } + runAtLeastOneTime = true; + } + running = false; + }; + } + + private void keepPaceWithDb() { + traverseDbTasksToMemory(); + traverseMemoryTasksToDb(); + } + + private void traverseDbTasksToMemory() { + instanceDb.getInstances(taskId).forEach((profileFromDb) -> { + InstanceStateEnum dbState = profileFromDb.getState(); + Instance task = instanceMap.get(profileFromDb.getInstanceId()); + switch (dbState) { + case DEFAULT: { + if (task == null) { + LOGGER.info("traverseDbTasksToMemory add instance to mem taskId {} instanceId {}", + profileFromDb.getTaskId(), profileFromDb.getInstanceId()); + addToMemory(profileFromDb); + } + break; + } + case FINISHED: + DELETE: { + if (task != null) { + LOGGER.info("traverseDbTasksToMemory delete instance from mem taskId {} instanceId {}", + profileFromDb.getTaskId(), profileFromDb.getInstanceId()); + deleteFromMemory(profileFromDb.getInstanceId()); + } + break; + } + default: { + LOGGER.error("instance invalid state {} taskId {} instanceId {}", dbState, + profileFromDb.getTaskId(), + profileFromDb.getInstanceId()); + } + } + }); + } + + private void traverseMemoryTasksToDb() { + instanceMap.values().forEach((instance) -> { + InstanceProfile profileFromDb = instanceDb.getInstance(instance.getTaskId(), instance.getInstanceId()); + if (profileFromDb == null) { + deleteFromMemory(instance.getInstanceId()); + return; + } + InstanceStateEnum stateFromDb = profileFromDb.getState(); + if (stateFromDb != InstanceStateEnum.DEFAULT) { + deleteFromMemory(instance.getInstanceId()); + } + }); + } + + private void dealWithActionQueue(BlockingQueue queue) { + while (isRunnable()) { + try { + InstanceAction action = queue.poll(); + if (action == null) { + break; + } + switch (action.getActionType()) { + case ADD: + addInstance(action.getProfile()); + break; + case FINISH: + finishInstance(action.getProfile()); + break; + case DELETE: + deleteInstance(action.getProfile().getInstanceId()); + break; + default: + LOGGER.error("invalid action type for instance manager: taskId {} type {}", taskId, + action.getActionType()); + } + } catch (Throwable ex) { + LOGGER.error("dealWithActionQueue", ex); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + } + } + } + + @Override + public void start() { + restoreFromDb(); + submitWorker(coreThread()); + } + + @Override + public void stop() { + waitForTerminate(); + stopAllInstances(); + } + + public void waitForTerminate() { + super.waitForTerminate(); + while (running) { + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + } + } + + private void restoreFromDb() { + List profileList = instanceDb.getInstances(taskId); + profileList.forEach((profile) -> { + InstanceStateEnum state = profile.getState(); + if (state == InstanceStateEnum.DEFAULT) { + LOGGER.info("instance restoreFromDb addToMem state {} taskId {} instanceId {}", state, taskId, + profile.getInstanceId()); + addToMemory(profile); + } else { + LOGGER.info("instance restoreFromDb ignore state {} taskId {} instanceId {}", state, taskId, + profile.getInstanceId()); + } + }); + } + + private void addInstance(InstanceProfile profile) { + LOGGER.info("addInstance taskId {} instanceId {}", taskId, profile.getInstanceId()); + addToDb(profile); + addToMemory(profile); + } + + private void finishInstance(InstanceProfile profile) { + profile.setState(InstanceStateEnum.FINISHED); + profile.setModifyTime(AgentUtils.getCurrentTime()); + addToDb(profile); + deleteFromMemory(profile.getInstanceId()); + LOGGER.info("finished instance state {} taskId {} instanceId {}", profile.getState(), + profile.getTaskId(), profile.getInstanceId()); + } + + private void deleteInstance(String instanceId) { + deleteFromDb(instanceId); + deleteFromMemory(instanceId); + } + + private void deleteFromDb(String instanceId) { + instanceDb.deleteInstance(taskId, instanceId); + LOGGER.info("delete instance from db: taskId {} instanceId {} result {}", taskId, + instanceId, instanceDb.getInstance(taskId, instanceId)); + } + + private void deleteFromMemory(String instanceId) { + Instance instance = instanceMap.get(instanceId); + if (instance == null) { + LOGGER.error("try to delete instance from memory but not found: taskId {} instanceId {}", taskId, + instanceId); + return; + } + instance.destroy(); + instanceMap.remove(instanceId); + LOGGER.info("delete instance from memory: taskId {} instanceId {}", taskId, instance.getInstanceId()); + } + + private void addToDb(InstanceProfile profile) { + LOGGER.info("add instance to db instanceId {} ", profile.getInstanceId()); + instanceDb.storeInstance(profile); + } + + /** + * add instance to memory, if there is a record refer to the instance id exist we need to destroy it first. + */ + private void addToMemory(InstanceProfile instanceProfile) { + Instance oldInstance = instanceMap.get(instanceProfile.getInstanceId()); + if (oldInstance != null) { + oldInstance.destroy(); + instanceMap.remove(instanceProfile.getInstanceId()); + LOGGER.error("old instance {} should not exist, try stop it first", + instanceProfile); + } + LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr()); + try { + Class taskClass = Class.forName(instanceProfile.getInstanceClass()); + Instance instance = (Instance) taskClass.newInstance(); + instance.init(this, instanceProfile); + instanceMap.put(instanceProfile.getInstanceId(), instance); + EXECUTOR_SERVICE.submit(instance); + LOGGER.info( + "add instance to memory instanceId {} instanceMap size {}, runningPool instance total {}, runningPool instance active {}", + instance.getInstanceId(), instanceMap.size(), EXECUTOR_SERVICE.getTaskCount(), + EXECUTOR_SERVICE.getActiveCount()); + } catch (Throwable t) { + LOGGER.error("add instance error {}", t.getMessage()); + } + } + + private void stopAllInstances() { + instanceMap.values().forEach((instance) -> { + deleteInstance(instance.getInstanceId()); + }); + instanceMap.clear(); + } + + public boolean shouldAddAgain(String fileName, long lastModifyTime) { + InstanceProfile profileFromDb = instanceDb.getInstance(taskId, fileName); + if (profileFromDb == null) { + return true; + } else { + InstanceStateEnum state = profileFromDb.getState(); + if (state == InstanceStateEnum.FINISHED && lastModifyTime > profileFromDb.getModifyTime()) { + return true; + } + if (state == InstanceStateEnum.DELETE) { + return true; + } + return false; + } + } + + public boolean allInstanceFinished() { + if (!runAtLeastOneTime) { + return false; + } + if (!instanceMap.isEmpty()) { + return false; + } + if (!actionQueue.isEmpty()) { + return false; + } + List instances = instanceDb.getInstances(taskId); + for (int i = 0; i < instances.size(); i++) { + InstanceProfile profile = instances.get(i); + if (profile.getState() != InstanceStateEnum.FINISHED) { + return false; + } + } + return true; + } +} \ No newline at end of file diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java new file mode 100644 index 00000000000..ada8cefcdd9 --- /dev/null +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.core.instance; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.plugin.Instance; + +public class MockInstance extends Instance { + + public static final int INIT_TIME = 100; + public static final int RUN_TIME = 101; + public static final int DESTROY_TIME = 102; + private InstanceProfile profile; + private long index = INIT_TIME; + public long initTime = 0; + public long destroyTime = 0; + public long runtime = 0; + private InstanceManager instanceManager; + + @Override + public void init(Object instanceManager, InstanceProfile profile) { + this.instanceManager = (InstanceManager) instanceManager; + this.profile = profile; + initTime = index++; + } + + @Override + public void destroy() { + destroyTime = index++; + } + + @Override + public InstanceProfile getProfile() { + return profile; + } + + @Override + public String getTaskId() { + return profile.getTaskId(); + } + + @Override + public String getInstanceId() { + return profile.getInstanceId(); + } + + @Override + public void addCallbacks() { + + } + + @Override + public void run() { + runtime = index++; + } + + public void sendFinishAction() { + InstanceAction action = new InstanceAction(); + action.setActionType(ActionType.FINISH); + action.setProfile(profile); + instanceManager.submitAction(action); + } +} \ No newline at end of file diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java new file mode 100755 index 00000000000..62bfa85d8a8 --- /dev/null +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.core.instance; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.core.AgentBaseTestsHelper; +import org.apache.inlong.agent.core.task.file.TaskManager; +import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.enums.InstanceStateEnum; +import org.apache.inlong.common.enums.TaskStateEnum; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +public class TestInstanceManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestInstanceManager.class); + private static InstanceManager manager; + private static AgentBaseTestsHelper helper; + private static TaskProfile taskProfile; + + @BeforeClass + public static void setup() { + helper = new AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome(); + String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; + Db basicDb = TaskManager.initDb("/localdb"); + taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); + manager = new InstanceManager("1", basicDb); + manager.start(); + } + + @AfterClass + public static void teardown() { + manager.stop(); + helper.teardownAgentHome(); + } + + @Test + public void testInstanceManager() { + long timeBefore = AgentUtils.getCurrentTime(); + InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(), + helper.getTestRootDir() + "/20230927.log_1", "20230927"); + String instanceId = profile.getInstanceId(); + InstanceAction action = new InstanceAction(); + action.setActionType(ActionType.ADD); + action.setProfile(profile); + // test add action + manager.submitAction(action); + await().atMost(1, TimeUnit.SECONDS).until(() -> manager.getInstance(instanceId) != null); + Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == InstanceStateEnum.DEFAULT); + + // test finish action + MockInstance instance = (MockInstance) manager.getInstance(profile.getInstanceId()); + instance.sendFinishAction(); + await().atMost(1, TimeUnit.SECONDS).until(() -> manager.getInstance(instanceId) == null); + Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == InstanceStateEnum.FINISHED); + // test modify before finish + Assert.assertFalse(manager.shouldAddAgain(profile.getInstanceId(), timeBefore)); + // test modify after finish + Assert.assertTrue(manager.shouldAddAgain(profile.getInstanceId(), AgentUtils.getCurrentTime())); + + // test continue + action.setActionType(ActionType.ADD); + profile.setState(InstanceStateEnum.DEFAULT); + manager.submitAction(action); + await().atMost(1, TimeUnit.SECONDS).until(() -> manager.getInstance(instanceId) != null); + Assert.assertTrue(manager.getInstanceProfile(instanceId).getState() == InstanceStateEnum.DEFAULT); + + // test delete action + action.setActionType(ActionType.DELETE); + manager.submitAction(action); + await().atMost(1, TimeUnit.SECONDS).until(() -> manager.getInstanceProfile(instanceId) == null); + Assert.assertTrue(instance.initTime == MockInstance.INIT_TIME); + Assert.assertTrue(instance.runtime == MockInstance.RUN_TIME); + Assert.assertTrue(instance.destroyTime == MockInstance.DESTROY_TIME); + } +}