diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java new file mode 100644 index 00000000000..0216eb96c67 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.plugin.file.Task; + +/** + * Generate job by crontab expression. + */ +public class CronTask extends Task { + + @Override + public void init(Object srcManager, TaskProfile profile, Db basicDb) { + + } + + @Override + public void run() { + + } + + @Override + public void destroy() { + + } + + @Override + public TaskProfile getProfile() { + return null; + } + + @Override + public String getTaskId() { + return null; + } + + @Override + public void addCallbacks() { + + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileCollectTask.java new file mode 100644 index 00000000000..b8837fc0df9 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/FormatDateLogFileCollectTask.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask; + +/** + * Directory trigger with format date. + */ +public class FormatDateLogFileCollectTask extends LogFileCollectTask { + +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PathPattern.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PathPattern.java new file mode 100644 index 00000000000..137b0a98de7 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PathPattern.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +import org.apache.inlong.agent.plugin.filter.DateFormatRegex; +import org.apache.inlong.agent.utils.PathUtils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Path pattern for file filter. + * It’s identified by watchDir, which matches {@link PathPattern#whiteList}. + */ +public class PathPattern { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PathPattern.class); + + private final String rootDir; + private final Set subDirs; + // regex for those files should be matched + private final Set whiteList; + + public PathPattern(String rootDir, Set whiteList) { + this(rootDir, whiteList, null); + } + + public PathPattern(String rootDir, Set whiteList, String offset) { + this.rootDir = rootDir; + this.subDirs = new HashSet<>(); + if (offset != null && StringUtils.isNotBlank(offset)) { + this.whiteList = whiteList.stream() + .map(whiteRegex -> DateFormatRegex.ofRegex(whiteRegex).withOffset(offset)) + .collect(Collectors.toSet()); + updateDateFormatRegex(); + } else { + this.whiteList = whiteList.stream() + .map(whiteRegex -> DateFormatRegex.ofRegex(whiteRegex)) + .collect(Collectors.toSet()); + } + } + + public static Set buildPathPattern(Set whiteList, + String offset) { + Set commonWatchDir = PathUtils.findCommonRootPath(whiteList); + return commonWatchDir.stream().map(rootDir -> { + Set commonWatchDirWhiteList = + whiteList.stream() + .filter(whiteRegex -> whiteRegex.startsWith(rootDir)) + .collect(Collectors.toSet()); + return new PathPattern(rootDir, commonWatchDirWhiteList, offset); + }).collect(Collectors.toSet()); + } + + /** + * cleanup local cache, subDirs is only used to filter duplicated directories + * in one term watch key check. + */ + public void cleanup() { + subDirs.clear(); + } + + /** + * Research all children files with {@link PathPattern#rootDir} matched whiteList and filtered by blackList. + * + * @param maxNum + * @return + */ + public Collection walkSuitableFiles(int maxNum) { + Collection suitableFiles = new ArrayList<>(); + walkSuitableFiles(suitableFiles, new File(rootDir), maxNum); + return suitableFiles; + } + + private void walkSuitableFiles(Collection suitableFiles, File file, int maxNum) { + if (suitableFiles.size() > maxNum) { + LOGGER.warn("Suitable files exceed max num {}, just return.", maxNum); + return; + } + + if (suitable(file.getAbsolutePath())) { + if (file.isFile()) { + suitableFiles.add(file); + } else if (file.isDirectory()) { + Stream.of(file.listFiles()).forEach(subFile -> walkSuitableFiles(suitableFiles, subFile, maxNum)); + } + } + } + + /** + * Check whether path is suitable for match whiteList and filtered by blackList + * + * @param path pathString + * @return true if suit else false. + */ + public boolean suitable(String path) { + // remove common root path + String briefSubDir = StringUtils.substringAfter(path, rootDir); + // if already watched, then stop deep find + if (subDirs.contains(briefSubDir)) { + LOGGER.info("already watched {}", path); + return false; + } + + subDirs.add(briefSubDir); + File file = new File(path); + return whiteList.stream() + .filter(whiteRegex -> whiteRegex.match(file)) + .findAny() + .isPresent(); + } + + /** + * when a new file is found, update regex since time may change. + */ + public void updateDateFormatRegex() { + whiteList.forEach(DateFormatRegex::setRegexWithCurrentTime); + } + + /** + * when job is retry job, the time for searching file should be specified. + */ + public void updateDateFormatRegex(String time) { + whiteList.forEach(whiteRegex -> whiteRegex.setRegexWithTime(time)); + } + + @Override + public String toString() { + return rootDir; + } + + @Override + public int hashCode() { + return HashCodeBuilder.reflectionHashCode(rootDir, false); + } + + @Override + public boolean equals(Object object) { + if (object instanceof PathPattern) { + PathPattern entity = (PathPattern) object; + return entity.rootDir.equals(this.rootDir); + } else { + return false; + } + } + + public String getRootDir() { + return rootDir; + } + + public String getSuitTime() { + // todo: Adapt to datetime in the case of multiple regex + return whiteList.stream().findAny().get().getFormattedTime(); + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java new file mode 100644 index 00000000000..eff927736dd --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task.filecollect; + +public class AgentErrMsg { + + public static final String CONFIG_SUCCESS = "SUCCESS"; + + // 数据源配置异常 */ + public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-TDAgent|10001|ERROR" + + "|ERROR_DATA_SOURCE_CONFIG|"; + + // 监控文件夹不存在 */ + public static final String DIRECTORY_NOT_FOUND_ERROR = "ERROR-0-TDAgent|11001|WARN" + + "|WARN_DIRECTORY_NOT_EXIST|"; + + // 监控文件夹时出错 */ + public static final String WATCH_DIR_ERROR = "ERROR-0-TDAgent|11002|ERROR" + + "|ERROR_WATCH_DIR_ERROR|"; + + // 要读取的文件异常(不存在,rotate) + public static final String FILE_ERROR = "ERROR-0-TDAgent|10002|ERROR|ERROR_SOURCE_FILE|"; + + // 读取文件异常 + public static final String FILE_OP_ERROR = "ERROR-1-TDAgent|30002|ERROR|ERROR_OPERATE_FILE|"; + + // 磁盘满 + public static final String DISK_FULL = "ERROR-1-TDAgent|30001|FATAL|FATAL_DISK_FULL|"; + + // 内存溢出 + public static final String OOM_ERROR = "ERROR-1-TDAgent|30001|FATAL|FATAL_OOM_ERROR|"; + + // watcher异常 + public static final String WATCHER_INVALID = "ERROR-1-TDAgent|40001|WARN|WARN_INVALID_WATCHER|"; + + // 连不上tdmanager + public static final String CONNECT_TDM_ERROR = "ERROR-1-TDAgent|30002|ERROR" + + "|ERROR_CANNOT_CONNECT_TO_TDM|"; + + // 发送数据到tdbus失败 + public static final String SEND_TO_BUS_ERROR = "ERROR-1-TDAgent|30003|ERROR|ERROR_SEND_TO_BUS|"; + + // 操作bdb异常 + public static final String BDB_ERROR = "ERROR-1-TDAgent|30003|ERROR|BDB_OPERATION_ERROR|"; + + // 内部缓存满 + public static final String MSG_BUFFER_FULL = "ERROR-1-TDAgent|40002|WARN|WARN_MSG_BUFFER_FULL|"; + + // 监控到的事件不合法(任务已删除) + public static final String FOUND_EVENT_INVALID = "ERROR-1-TDAgent|30003|ERROR" + + "|FOUND_EVENT_INVALID|"; +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java new file mode 100644 index 00000000000..46896d6cdc1 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task.filecollect; + +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; +import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator; +import org.apache.inlong.agent.plugin.utils.file.Files; +import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/* + * This class is mainly used for scanning log file that we want to read. We use this class at + * tdagent recover process, the do and redo tasks and the current log file access when we deploy a + * new data source. + */ +public class FileScanner { + + public static class BasicFileInfo { + + public String fileName; + public String dataTime; + + public BasicFileInfo(String fileName, String dataTime) { + this.fileName = fileName; + this.dataTime = dataTime; + } + + } + + private static final Logger logger = LoggerFactory.getLogger(FileScanner.class); + + public static List scanTaskBetweenTimes(TaskProfile conf, String originPattern, long failTime, + long recoverTime, boolean isRetry) { + String cycleUnit = conf.getCycleUnit(); + if (!isRetry) { + failTime -= NewDateUtils.caclOffset(conf.getTimeOffset()); + recoverTime -= NewDateUtils.caclOffset(conf.getTimeOffset()); + } + + String startTime = NewDateUtils.millSecConvertToTimeStr(failTime, cycleUnit); + String endTime = NewDateUtils.millSecConvertToTimeStr(recoverTime, cycleUnit); + logger.info("task {} this scan time is between {} and {}.", + new Object[]{conf.getTaskId(), startTime, endTime}); + + return scanTaskBetweenTimes(conf, originPattern, startTime, endTime); + } + + /* Scan log files and create tasks between two times. */ + public static List scanTaskBetweenTimes(TaskProfile conf, String originPattern, String startTime, + String endTime) { + String cycleUnit = conf.getCycleUnit(); + int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM); + List dateRegion = NewDateUtils.getDateRegion(startTime, endTime, cycleUnit); + List infos = new ArrayList(); + for (Long time : dateRegion) { + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(time); + String filename = NewDateUtils.replaceDateExpression(calendar, originPattern); + ArrayList allPaths = FilePathUtil.cutDirectory(filename); + String firstDir = allPaths.get(0); + String secondDir = allPaths.get(0) + File.separator + allPaths.get(1); + ArrayList fileList = getUpdatedOrNewFiles(firstDir, secondDir, filename, 3, + maxFileNum); + for (String file : fileList) { + // TODO the time is not YYYYMMDDHH + String dataTime = NewDateUtils.millSecConvertToTimeStr(time, cycleUnit); + BasicFileInfo info = new BasicFileInfo(file, dataTime); + logger.info("scan new task fileName {} ,dataTime {}", file, + NewDateUtils.millSecConvertToTimeStr(time, cycleUnit)); + infos.add(info); + } + } + return infos; + } + + public static ArrayList scanFile(TaskProfile conf, String originPattern, long dataTime) { + int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM); + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(dataTime); + + String filename = NewDateUtils.replaceDateExpression(calendar, originPattern); + ArrayList allPaths = FilePathUtil.cutDirectory(filename); + String firstDir = allPaths.get(0); + String secondDir = allPaths.get(0) + File.separator + allPaths.get(1); + return getUpdatedOrNewFiles(firstDir, secondDir, filename, 3, maxFileNum); + } + + private static ArrayList getUpdatedOrNewFiles(String firstDir, String secondDir, + String fileName, long depth, int maxFileNum) { + + // logger.info("getUpdatedOrNewFiles: firstdir: {}, seconddir: {} filename: {}", + // new Object[]{firstDir, secondDir, fileName}); + + ArrayList ret = new ArrayList(); + ArrayList readyFiles = new ArrayList(); + + if (!new File(firstDir).isDirectory()) { + return ret; + } + + for (File pathname : Files.find(firstDir).yieldFilesAndDirectories() + .recursive().withDepth((int) depth).withDirNameRegex(secondDir) + .withFileNameRegex(fileName)) { + if (readyFiles.size() >= maxFileNum) { + break; + } + readyFiles.add(pathname); + } + // sort by last-modified time (older -> newer) + Collections.sort(readyFiles, new FileTimeComparator()); + for (File f : readyFiles) { + // System.out.println(f.getAbsolutePath()); + ret.add(f.getAbsolutePath()); + } + return ret; + } + + @SuppressWarnings("unused") + private static ArrayList getUpdatedOrNewFiles(String logFileName, + int maxFileNum) { + ArrayList ret = new ArrayList(); + ArrayList directories = FilePathUtil + .getDirectoryLayers(logFileName); + String parentDir = directories.get(0) + File.separator + + directories.get(1); + + Pattern pattern = Pattern.compile(directories.get(2), + Pattern.CASE_INSENSITIVE); + for (File file : new File(parentDir).listFiles()) { + Matcher matcher = pattern.matcher(file.getName()); + if (matcher.matches() && ret.size() < maxFileNum) { + ret.add(file.getAbsolutePath()); + } + } + return ret; + } + + public static void main(String[] args) { + + ArrayList fileList = FileScanner.getUpdatedOrNewFiles( + "f:\\\\abc", "f:\\\\abc\\\\", "f:\\\\abc\\\\1.txt", 3, 100); + // fileList = FileScanner.getUpdatedOrNewFiles("F:\\abc\\1.txt", 100); + for (String fileName : fileList) { + System.out.println(fileName); + } + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java new file mode 100644 index 00000000000..bde4a333616 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task.filecollect; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.instance.ActionType; +import org.apache.inlong.agent.core.instance.InstanceAction; +import org.apache.inlong.agent.core.instance.InstanceManager; +import org.apache.inlong.agent.core.task.TaskAction; +import org.apache.inlong.agent.core.task.file.TaskManager; +import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.plugin.file.Task; +import org.apache.inlong.agent.plugin.task.filecollect.FileScanner.BasicFileInfo; +import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; +import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; +import org.apache.inlong.agent.plugin.utils.file.PathDateExpression; +import org.apache.inlong.agent.state.State; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.file.FileUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchEvent.Kind; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Watch directory, if new valid files are created, create jobs correspondingly. + */ +public class LogFileCollectTask extends Task { + + public static final String DEFAULT_FILE_INSTANCE = "org.apache.inlong.agent.plugin.instance.FileInstance"; + private static final Logger LOGGER = LoggerFactory.getLogger(LogFileCollectTask.class); + private TaskProfile taskProfile; + private Db basicDb; + private TaskManager taskManager; + private InstanceManager instanceManager; + private final Map watchers = new ConcurrentHashMap<>(); + private final Set watchFailedDirs = new HashSet<>(); + private final Map> eventMap = + new ConcurrentHashMap<>(); + public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000; + public static final int CORE_THREAD_SLEEP_TIME = 1000; + private boolean retry; + private long startTime; + private long endTime; + private boolean initOK = false; + private Set originPatterns; + private long lastScanTime = 0; + public final long SCAN_INTERVAL = 1 * 60 * 1000; + private volatile boolean runAtLeastOneTime = false; + private volatile boolean running = false; + + @Override + public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException { + if (!isProfileValid(taskProfile)) { + LOGGER.error("task profile invalid {}", taskProfile); + return; + } + taskManager = (TaskManager) srcManager; + commonInit(taskProfile, basicDb); + if (retry) { + retryInit(); + } else { + watchInit(); + } + initOK = true; + } + + private void commonInit(TaskProfile taskProfile, Db basicDb) { + this.taskProfile = taskProfile; + this.basicDb = basicDb; + retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false); + originPatterns = Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(",")) + .collect(Collectors.toSet()); + instanceManager = new InstanceManager(taskProfile.getTaskId(), basicDb); + try { + instanceManager.start(); + } catch (Exception e) { + LOGGER.error("start instance manager error {}", e.getMessage()); + } + } + + private boolean isProfileValid(TaskProfile profile) { + if (!profile.allRequiredKeyExist()) { + LOGGER.error("task profile needs all required key"); + return false; + } + boolean ret = + profile.hasKey(TaskConstants.FILE_DIR_FILTER_PATTERNS) + && profile.hasKey(TaskConstants.TASK_FILE_TIME_OFFSET) + && profile.hasKey(TaskConstants.FILE_MAX_NUM); + if (!ret) { + LOGGER.error("task profile needs file keys"); + return false; + } + if (profile.getBoolean(TaskConstants.TASK_RETRY, false)) { + long startTime = profile.getLong(TaskConstants.TASK_START_TIME, 0); + long endTime = profile.getLong(TaskConstants.TASK_END_TIME, 0); + if (startTime == 0 || endTime == 0) { + LOGGER.error("retry task time error start {} end {}", startTime, endTime); + return false; + } + } + return true; + } + + private void retryInit() { + startTime = taskProfile.getLong(TaskConstants.TASK_START_TIME, 0); + endTime = taskProfile.getLong(TaskConstants.TASK_END_TIME, 0); + } + + private void watchInit() { + originPatterns.forEach((pathPattern) -> { + addPathPattern(pathPattern); + }); + } + + public void addPathPattern(String originPattern) { + ArrayList directories = FilePathUtil.getDirectoryLayers(originPattern); + String basicStaticPath = directories.get(0); + LOGGER.info("dataName {} watchPath {}", new Object[]{originPattern, basicStaticPath}); + /* Remember the failed watcher creations. */ + if (!new File(basicStaticPath).exists()) { + LOGGER.warn(AgentErrMsg.DIRECTORY_NOT_FOUND_ERROR + basicStaticPath); + watchFailedDirs.add(originPattern); + return; + } + try { + /* + * When we construct the watch object, we should do some work with the data name, replace yyyy to 4 digits + * regression, mm to 2 digits regression, also because of difference between java regular expression and + * linux regular expression, we have to replace * to ., and replace . with \\. . + */ + WatchService watchService = FileSystems.getDefault().newWatchService(); + WatchEntity entity = new WatchEntity(watchService, originPattern, taskProfile.getCycleUnit(), + taskProfile.getTimeOffset()); + entity.registerRecursively(); + watchers.put(originPattern, entity); + watchFailedDirs.remove(originPattern); + } catch (IOException e) { + if (e.toString().contains("Too many open files") || e.toString().contains("打开的文件过多")) { + LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString()); + } else { + LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString(), e); + } + } + } + + @Override + public void destroy() { + doChangeState(State.SUCCEEDED); + if (instanceManager != null) { + instanceManager.stop(); + } + releaseWatchers(watchers); + } + + private void releaseWatchers(Map watchers) { + while (running) { + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + } + watchers.forEach((taskId, watcher) -> { + try { + watcher.getWatchService().close(); + } catch (IOException e) { + LOGGER.error("close watch service failed taskId {}", e, taskId); + } + }); + } + + @Override + public TaskProfile getProfile() { + return taskProfile; + } + + @Override + public String getTaskId() { + return taskProfile.getTaskId(); + } + + @Override + public void addCallbacks() { + + } + + @Override + public void run() { + Thread.currentThread().setName("directory-task-core-" + getTaskId()); + running = true; + while (!isFinished()) { + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + if (!initOK) { + continue; + } + if (retry) { + runForRetry(); + } else { + runForNormal(); + } + } + running = false; + } + + private void runForRetry() { + if (!runAtLeastOneTime) { + scanExistingFile(); + dealWithEvenMap(); + runAtLeastOneTime = true; + } + if (instanceManager.allInstanceFinished()) { + LOGGER.info("retry task finished, send action to task manager, taskId {}", getTaskId()); + TaskAction action = new TaskAction(org.apache.inlong.agent.core.task.ActionType.FINISH, taskProfile); + taskManager.submitAction(action); + doChangeState(State.SUCCEEDED); + } + } + + private void runForNormal() { + if (AgentUtils.getCurrentTime() - lastScanTime > SCAN_INTERVAL) { + scanExistingFile(); + lastScanTime = AgentUtils.getCurrentTime(); + } + runForWatching(); + dealWithEvenMap(); + } + + private void scanExistingFile() { + originPatterns.forEach((originPattern) -> { + List fileInfos = scanExistingFileByPattern(originPattern); + LOGGER.debug("scan {} get file count {}", originPattern, fileInfos.size()); + fileInfos.forEach((fileInfo) -> { + addToEvenMap(fileInfo.fileName, fileInfo.dataTime); + }); + }); + } + + private List scanExistingFileByPattern(String originPattern) { + long startScanTime = startTime; + long endScanTime = endTime; + if (!retry) { + long currentTime = System.currentTimeMillis(); + // only scan two cycle, like two hours or two days + long offset = NewDateUtils.caclOffset("-2" + taskProfile.getCycleUnit()); + startScanTime = currentTime - offset; + endScanTime = currentTime; + } + return FileScanner.scanTaskBetweenTimes(taskProfile, originPattern, startScanTime, endScanTime, retry); + } + + private void runForWatching() { + /* Deal with those failed watcher creation tasks. */ + Set tmpWatchFailedDirs = new HashSet<>(watchFailedDirs); + for (String originPattern : tmpWatchFailedDirs) { + addPathPattern(originPattern); + } + /* + * Visit the watchers to see if it gets any new file creation, if it exists and fits the file name pattern, add + * it to the task list. + */ + for (Map.Entry entry : watchers.entrySet()) { + dealWithWatchEntity(entry.getKey()); + } + } + + private void dealWithEvenMap() { + removeTimeoutEven(eventMap, retry); + for (Map.Entry> entry : eventMap.entrySet()) { + Map sameDataTimeEvents = entry.getValue(); + // 根据event的数据时间、业务的周期、偏移量计算出该event是否需要在当前时间处理 + String dataTime = entry.getKey(); + String shouldStartTime = + NewDateUtils.getShouldStartTime(dataTime, taskProfile.getCycleUnit(), taskProfile.getTimeOffset()); + String currentTime = getCurrentTime(); + LOGGER.info("taskId {}, dataTime {}, currentTime {}, shouldStartTime {}", + new Object[]{getTaskId(), dataTime, currentTime, shouldStartTime}); + if (currentTime.compareTo(shouldStartTime) >= 0) { + /* These codes will sort the FileCreationEvents by create time. */ + Set sortedEvents = new TreeSet<>(sameDataTimeEvents.values()); + /* Check the file end with event creation time in asc order. */ + for (InstanceProfile sortEvent : sortedEvents) { + String fileName = sortEvent.getInstanceId(); + InstanceProfile profile = sameDataTimeEvents.get(fileName); + InstanceAction action = new InstanceAction(ActionType.ADD, profile); + while (!instanceManager.submitAction(action)) { + LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId()); + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + } + sameDataTimeEvents.remove(fileName); + } + } + } + } + + private void removeTimeoutEven(Map> eventMap, boolean isRetry) { + if (isRetry) { + return; + } + for (Map.Entry> entry : eventMap.entrySet()) { + // 如果event的数据时间在当前时间前(后)2天之内,则有效 + String dataTime = entry.getKey(); + if (!NewDateUtils.isValidCreationTime(dataTime, DAY_TIMEOUT_INTERVAL)) { + /* Remove it from memory map. */ + eventMap.remove(dataTime); + LOGGER.warn("remove too old event from event map. dataTime {}", dataTime); + } + } + } + + private String getCurrentTime() { + SimpleDateFormat dateFormat = new SimpleDateFormat(NewDateUtils.DEFAULT_FORMAT); + TimeZone timeZone = TimeZone.getTimeZone(NewDateUtils.DEFAULT_TIME_ZONE); + dateFormat.setTimeZone(timeZone); + return dateFormat.format(new Date(System.currentTimeMillis())); + } + + public synchronized void dealWithWatchEntity(String originPattern) { + WatchEntity entity = watchers.get(originPattern); + if (entity == null) { + LOGGER.error("Can't find the watch entity for originPattern: " + originPattern); + return; + } + try { + /* Get all creation events until all events are consumed. */ + for (int i = 0; i < entity.getTotalPathSize(); i++) { + // maybe the watchService is closed ,but we catch this exception! + final WatchKey key = entity.getWatchService().poll(); + if (key == null) { + return; + } + dealWithWatchKey(entity, key); + } + } catch (Exception e) { + LOGGER.error("deal with creation event error: ", e); + } + } + + private void dealWithWatchKey(WatchEntity entity, WatchKey key) throws IOException { + Path contextPath = entity.getPath(key); + LOGGER.info("Find creation events in path: " + contextPath.toAbsolutePath()); + for (WatchEvent watchEvent : key.pollEvents()) { + Path child = resolvePathFromEvent(watchEvent, contextPath); + if (child == null) { + continue; + } + if (Files.isDirectory(child)) { + LOGGER.warn("The find creation event is triggered by a directory: " + child + .getFileName()); + entity.registerRecursively(child); + continue; + } + handleFilePath(child, entity); + } + resetWatchKey(entity, key, contextPath); + } + + private Path resolvePathFromEvent(WatchEvent watchEvent, Path contextPath) { + final Kind kind = watchEvent.kind(); + /* + * Can't simply continue when it detects that an event maybe ignored. + */ + if (kind == StandardWatchEventKinds.OVERFLOW) { + LOGGER.error("An event is unclear and lost"); + /* + * TODO: should we do a full scan to avoid lost events? + */ + return null; + } + final WatchEvent watchEventPath = (WatchEvent) watchEvent; + final Path eventPath = watchEventPath.context(); + /* + * Must resolve, otherwise we can't get the file attributes. + */ + return contextPath.resolve(eventPath); + } + + private void handleFilePath(Path filePath, WatchEntity entity) { + String newFileName = filePath.toFile().getAbsolutePath(); + LOGGER.info("[New File] {} {}", newFileName, entity.getOriginPattern()); + Matcher matcher = entity.getPattern().matcher(newFileName); + if (matcher.matches() || matcher.lookingAt()) { + LOGGER.info("[Matched File] {} {}", newFileName, entity.getOriginPattern()); + String dataTime = getDataTimeFromFileName(newFileName, entity.getOriginPattern(), + entity.getDateExpression()); + if (!checkFileNameForTime(newFileName, entity)) { + LOGGER.error(AgentErrMsg.FILE_ERROR + "File Timeout {} {}", newFileName, dataTime); + return; + } + addToEvenMap(newFileName, dataTime); + } + } + + private void addToEvenMap(String fileName, String dataTime) { + Long lastModifyTime = FileUtils.getFileLastModifyTime(fileName); + if (!instanceManager.shouldAddAgain(fileName, lastModifyTime)) { + LOGGER.info("file {} has record in db", fileName); + return; + } + Map sameDataTimeEvents = eventMap.computeIfAbsent(dataTime, + mapKey -> new ConcurrentHashMap<>()); + boolean containsInMemory = sameDataTimeEvents.containsKey(fileName); + if (containsInMemory) { + LOGGER.error("should not happen! may be {} has been deleted and add again", fileName); + return; + } + InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE, + fileName, dataTime); + sameDataTimeEvents.put(fileName, instanceProfile); + } + + private boolean checkFileNameForTime(String newFileName, WatchEntity entity) { + /* Get the data time for this file. */ + PathDateExpression dateExpression = entity.getDateExpression(); + if (dateExpression.getLongestDatePattern().length() != 0) { + String dataTime = getDataTimeFromFileName(newFileName, entity.getOriginPattern(), dateExpression); + LOGGER.info("file {} ,fileTime {}", newFileName, dataTime); + if (!NewDateUtils.isValidCreationTime(dataTime, entity.getCycleUnit(), + entity.getTimeOffset())) { + return false; + } + } + return true; + } + + private String getDataTimeFromFileName(String fileName, String originPattern, PathDateExpression dateExpression) { + /* + * TODO: what if this file doesn't have any date pattern regex. + * + * For this case, we can simple think that the next file creation means the last task of this conf should finish + * reading and start reading this new file. + */ + // 从文件名称中提取数据时间 + String fileTime = NewDateUtils.getDateTime(fileName, originPattern, dateExpression); + + /** + * 将文件时间中任意非数字字符替换掉 + * 如2015-09-16_00替换成2015091600 + */ + return fileTime.replaceAll("\\D", ""); + } + + private void resetWatchKey(WatchEntity entity, WatchKey key, Path contextPath) { + key.reset(); + /* + * Register a new watch service on the path if the old watcher is invalid. + */ + if (!key.isValid()) { + LOGGER.warn(AgentErrMsg.WATCHER_INVALID + "Invalid Watcher {}", + contextPath.getFileName()); + try { + WatchService oldService = entity.getWatchService(); + oldService.close(); + WatchService watchService = FileSystems.getDefault().newWatchService(); + entity.clearKeys(); + entity.clearPathToKeys(); + entity.setWatchService(watchService); + entity.registerRecursively(); + } catch (IOException e) { + LOGGER.error("Restart a new watcher runs into error: ", e); + } + } + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/TaskType.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/TaskType.java new file mode 100644 index 00000000000..9bda67b85ae --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/TaskType.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task.filecollect; + +public enum TaskType { + + READER(0), + TAILER(1), + UPLOADER(2), + STATE(3), + OTHER(4), + DB(5), + GAIAReader(6); + + private int type; + + TaskType(int type) { + this.type = type; + } + + public int getType() { + return type; + } +} \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java new file mode 100644 index 00000000000..ffd74c0100a --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task.filecollect; + +import org.apache.inlong.agent.plugin.utils.file.DateUtils; +import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; +import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; +import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition; +import org.apache.inlong.agent.plugin.utils.file.PathDateExpression; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class WatchEntity { + + private static final Logger logger = LoggerFactory.getLogger(WatchEntity.class); + private WatchService watchService; + private final String basicStaticPath; + private final String originPattern; + private final String regexPattern; + private final Pattern pattern; + private final PathDateExpression dateExpression; + private final String originPatternWithoutFileName; + private final Pattern patternWithoutFileName; + private final boolean containRegexPattern; + private final Map keys = new ConcurrentHashMap(); + private final Map pathToKeys = new ConcurrentHashMap(); + private final String dirSeparator = System.getProperty("file.separator"); + private String cycleUnit; + private String timeOffset; + + public WatchEntity(WatchService watchService, + String originPattern, + String cycleUnit, + String timeOffset) { + this.watchService = watchService; + this.originPattern = originPattern; + ArrayList directoryLayers = FilePathUtil.getDirectoryLayers(originPattern); + this.basicStaticPath = directoryLayers.get(0); + this.regexPattern = NewDateUtils.replaceDateExpressionWithRegex(originPattern); + pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); + ArrayList directories = FilePathUtil.cutDirectory(originPattern); + this.originPatternWithoutFileName = directories.get(0); + this.patternWithoutFileName = Pattern + .compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName), + Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); + /* + * Get the longest data regex from the data name, it's used if we want to get out the data time from the file + * name. + */ + this.dateExpression = DateUtils.extractLongestTimeRegexWithPrefixOrSuffix(originPattern); + this.containRegexPattern = isPathContainRegexPattern(); + this.cycleUnit = cycleUnit; + this.timeOffset = timeOffset; + logger.info("add a new watchEntity {}", this); + } + + @Override + public String toString() { + return "WatchEntity [parentPathName=" + basicStaticPath + + ", readFilePattern=" + regexPattern + + ", dateExpression=" + dateExpression + ", totalDirPattern=" + + originPatternWithoutFileName + ", containRegexPattern=" + + containRegexPattern + ", totalDirRegexPattern=" + + patternWithoutFileName + ", keys=" + keys + ", pathToKeys=" + pathToKeys + + ", watchService=" + watchService + "]"; + } + + private boolean isPathContainRegexPattern() { + if (originPatternWithoutFileName.contains("YYYY") || originPatternWithoutFileName.contains("MM") + || originPatternWithoutFileName.contains("DD") || originPatternWithoutFileName.contains("hh")) { + return true; + } + + return false; + } + + public boolean isContainRegexPattern() { + return containRegexPattern; + } + + private int calcPathDepth(String rootDir, String dirName) { + // rootDir + return 0; + } + + private void register(Path dir) throws IOException { + + if (dir == null) { + return; + } + + String dirName = dir.toAbsolutePath().toString(); + logger.info(dirName); + Matcher matcher = patternWithoutFileName.matcher(dirName); + String rootDir = Paths.get(basicStaticPath).toAbsolutePath().toString(); + Paths.get(basicStaticPath).toAbsolutePath().getNameCount(); + + // must use suffeix match + // consider /data/YYYYMMDD/abc/YYYYMMDDhh.*.txt this case + if (!pathToKeys.containsKey(dirName) && (matcher.matches() || rootDir.equals(dirName))) { + WatchKey key = dir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); + keys.put(key, dir); + pathToKeys.put(dirName, key); + + logger.info("Register a new directory: " + dir.toAbsolutePath().toString()); + } + } + + public void registerRecursively() throws IOException { + // register root dir + Path rootPath = Paths.get(basicStaticPath); + String rootDirName = rootPath.toAbsolutePath().toString(); + if (!pathToKeys.containsKey(rootDirName)) { + WatchKey key = rootPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); + keys.put(key, rootPath); + pathToKeys.put(rootDirName, key); + logger.info("Register a new directory: " + rootDirName); + } + registerRecursively(rootPath.toFile(), rootPath.toAbsolutePath().toString().length() + 1); + } + + public void registerRecursively(Path dir) throws IOException { + Path rootPath = dir; + String rootDirName = rootPath.toAbsolutePath().toString(); + int beginIndex = rootDirName.lastIndexOf(dirSeparator) + 1; + if (beginIndex == 0) { + return; + } + int index = originPatternWithoutFileName.indexOf(dirSeparator, beginIndex + 1); + Pattern pattern = getPattern(index); + logger.info("beginIndex {} ,index {} ,dirPattern {}", + new Object[]{beginIndex, index, pattern.pattern()}); + if (!pathToKeys.containsKey(rootDirName) && match(pattern, rootDirName)) { + WatchKey key = rootPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); + keys.put(key, rootPath); + pathToKeys.put(rootDirName, key); + logger.info("Register a new directory: " + rootDirName); + } else { + return; + } + + logger.info("rootPath len {}", rootPath.toAbsolutePath().toString().length()); + + registerRecursively(rootPath.toFile(), rootPath.toAbsolutePath().toString().length() + 1); + } + + public void registerRecursively(File dir, int beginIndex) throws IOException { + File[] files = dir.listFiles(); + if (files == null) { + return; + } + int index = originPatternWithoutFileName.indexOf(dirSeparator, beginIndex); + Pattern pattern = getPattern(index); + logger.info("beginIndex {} ,index {} ,dirPattern {}", + new Object[]{beginIndex, index, pattern.pattern()}); + for (int i = 0; i < files.length; i++) { + if (files[i].isDirectory()) { + String dirName = files[i].toString(); + Path dirPath = Paths.get(dirName); + if (!pathToKeys.containsKey(dirName) && match(pattern, dirName)) { + try { + WatchKey key = dirPath + .register(watchService, StandardWatchEventKinds.ENTRY_CREATE); + keys.put(key, dirPath); + pathToKeys.put(dirName, key); + logger.info("Register a new directory: " + dirName); + } catch (IOException e) { + /** + * 捕获异常,不能注册的子目录就忽略。 + */ + logger.error("Register directory {} error, skip it. ", dirName, e); + continue; + } + registerRecursively(files[i].getAbsoluteFile(), + files[i].getAbsolutePath().length() + 1); + } + } + } + } + + private Pattern getPattern(int index) { + String dirPattern = ""; + if (index == -1) { + dirPattern = originPatternWithoutFileName; + } else { + dirPattern = originPatternWithoutFileName.substring(0, index); + } + Pattern pattern = Pattern.compile(NewDateUtils.replaceDateExpressionWithRegex(dirPattern), + Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); + return pattern; + } + + private boolean match(Pattern pattern, String dirName) { + Matcher matcher = pattern.matcher(dirName); + return matcher.matches() || matcher.lookingAt(); + } + + public Path getPath(WatchKey key) { + return keys.get(key); + } + + public int getTotalPathSize() { + return keys.size(); + } + + public String getWatchPath() { + return basicStaticPath; + } + + public WatchService getWatchService() { + return watchService; + } + + public void setWatchService(WatchService watchService) { + this.watchService = watchService; + } + + public String getRegexPattern() { + return regexPattern; + } + + public PathDateExpression getDateExpression() { + return dateExpression; + } + + public String getLongestDatePattern() { + return dateExpression.getLongestDatePattern(); + } + + public NonRegexPatternPosition getPatternPosition() { + return dateExpression.getPatternPosition(); + } + + /* + * Remove the watched path which is 3 cycle units earlier than current task data time, this is because JDK7 starts a + * thread for each watch path, which should consume lots of memory. + */ + public void removeUselessWatchDirectories(String curDataTime) + throws Exception { + + logger.info("removeUselessWatchDirectories {}", curDataTime); + + /* Calculate the data time which is 3 cycle units earlier than current task data time. */ + long curDataTimeMillis = NewDateUtils.timeStrConvertTomillSec(curDataTime, cycleUnit); + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(curDataTimeMillis); + if ("D".equalsIgnoreCase(cycleUnit)) { + calendar.add(Calendar.DAY_OF_YEAR, -3); + } else if ("h".equalsIgnoreCase(cycleUnit)) { + calendar.add(Calendar.HOUR_OF_DAY, -3); + } else if ("10m".equalsIgnoreCase(cycleUnit)) { + calendar.add(Calendar.MINUTE, -30); + } + + /* Calculate the 3 cycle units earlier date. */ + String year = String.valueOf(calendar.get(Calendar.YEAR)); + String month = String.valueOf(calendar.get(Calendar.MONTH) + 1); + if (month.length() < 2) { + month = "0" + month; + } + String day = String.valueOf(calendar.get(Calendar.DAY_OF_MONTH)); + if (day.length() < 2) { + day = "0" + day; + } + String hour = String.valueOf(calendar.get(Calendar.HOUR_OF_DAY)); + if (hour.length() < 2) { + hour = "0" + hour; + } + String minute = String.valueOf(calendar.get(Calendar.MINUTE)); + if (minute.length() < 2) { + minute = "0" + minute; + } + + /* Replace it with the date and get a specified watch path. */ + String copyDirPattern = new String(originPatternWithoutFileName); + copyDirPattern = copyDirPattern.replace("YYYY", year); + copyDirPattern = copyDirPattern.replace("MM", month); + copyDirPattern = copyDirPattern.replace("DD", day); + copyDirPattern = copyDirPattern.replace("hh", hour); + copyDirPattern = copyDirPattern.replace("mm", minute); + + Set keys = pathToKeys.keySet(); + Set tmpKeys = new HashSet<>(); + tmpKeys.addAll(keys); + String rootDir = Paths.get(basicStaticPath).toAbsolutePath().toString(); + for (String path : tmpKeys) { + /* + * Remove the watch path whose path is smaller than the 3 cycle units earlier. + */ + logger.info("[Path]{} {}", path, copyDirPattern); + if (path.compareTo(copyDirPattern) < 0 && !copyDirPattern.contains(path)) { + WatchKey key = pathToKeys.get(path); + key.cancel(); + + pathToKeys.remove(path); + + logger.info("Watch path: {} is too old for data time: {}, we should remove", path, + curDataTime); + } + } + } + + public void clearPathToKeys() { + pathToKeys.clear(); + } + + public void clearKeys() { + keys.clear(); + } + + public String getCycleUnit() { + return cycleUnit; + } + + public String getTimeOffset() { + return timeOffset; + } + + public String getOriginPattern() { + return originPattern; + } + + public Pattern getPattern() { + return pattern; + } +} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java index 610dbc75744..7850678acd5 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java @@ -18,13 +18,18 @@ package org.apache.inlong.agent.plugin; import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; +import org.apache.inlong.common.enums.TaskStateEnum; +import org.apache.inlong.common.pojo.agent.DataConfig; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.nio.file.Path; import java.nio.file.Paths; @@ -34,7 +39,8 @@ public class AgentBaseTestsHelper { private static final Logger LOGGER = LoggerFactory.getLogger(AgentBaseTestsHelper.class); - + private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss"); + private static final Gson GSON = gsonBuilder.create(); private final String className; private Path testRootDir; private Path parentPath; @@ -45,8 +51,8 @@ public AgentBaseTestsHelper(String className) { public AgentBaseTestsHelper setupAgentHome() { parentPath = Paths.get("./").toAbsolutePath(); - testRootDir = Paths.get(parentPath + File.separator + "logs", - AgentBaseTestsHelper.class.getSimpleName(), className); + testRootDir = Paths + .get("/tmp", AgentBaseTestsHelper.class.getSimpleName(), className); teardownAgentHome(); boolean result = testRootDir.toFile().mkdirs(); LOGGER.info("try to create {}, result is {}", testRootDir, result); @@ -54,14 +60,14 @@ public AgentBaseTestsHelper setupAgentHome() { return this; } - public Path getTestRootDir() { - return testRootDir.toAbsolutePath(); - } - public Path getParentPath() { return parentPath; } + public Path getTestRootDir() { + return testRootDir; + } + public void teardownAgentHome() { if (testRootDir != null) { try { @@ -71,4 +77,32 @@ public void teardownAgentHome() { } } } + + public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime, + TaskStateEnum state) { + DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state); + TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); + return profile; + } + + private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, + TaskStateEnum state) { + DataConfig dataConfig = new DataConfig(); + dataConfig.setInlongGroupId("testGroupId"); // 老字段 groupId + dataConfig.setInlongStreamId("testStreamId"); // 老字段 streamId + dataConfig.setDataReportType(1); // 老字段 reportType + dataConfig.setTaskType(3); // 老字段 任务类型,3 代表文件采集 + dataConfig.setTaskId(taskId); // 老字段 任务 id + dataConfig.setState(state.ordinal()); // 新增! 任务状态 1 正常 2 暂停 + FileTaskConfig fileTaskConfig = new FileTaskConfig(); + fileTaskConfig.setPattern(pattern);// 正则 + fileTaskConfig.setTimeOffset("0d"); // 老字段 时间偏移 "-1d" 采一天前的 "-2h" 采 2 小时前的 + fileTaskConfig.setMaxFileCount(100); // 最大文件数 + fileTaskConfig.setCycleUnit("D"); // 新增! 任务周期 "D" 天 "h" 小时 + fileTaskConfig.setRetry(retry); // 新增! 是否补录,如果是补录任务则为 true + fileTaskConfig.setStartTime(startTime); + fileTaskConfig.setEndTime(endTime); + dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); + return dataConfig; + } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockInstanceManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockInstanceManager.java new file mode 100644 index 00000000000..a989f3bfaf7 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/MockInstanceManager.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +public class MockInstanceManager { + + public void stop() { + } +} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java new file mode 100644 index 00000000000..760f3b6b470 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.task.file.TaskManager; +import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask; +import org.apache.inlong.common.enums.TaskStateEnum; + +import com.google.gson.Gson; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(LogFileCollectTask.class) +@PowerMockIgnore({"javax.management.*"}) +public class TestLogfileCollectTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestLogfileCollectTask.class); + private static final ClassLoader LOADER = TestLogfileCollectTask.class.getClassLoader(); + private static LogFileCollectTask task; + private static AgentBaseTestsHelper helper; + private static final Gson GSON = new Gson(); + private static TaskManager manager; + private static MockInstanceManager instanceManager = new MockInstanceManager(); + private static String resourceName; + private static String fileName; + private static String dataTime; + private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, + 1L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new AgentThreadFactory("TestLogfileCollectTask")); + + @BeforeClass + public static void setup() { + helper = new AgentBaseTestsHelper(TestLogfileCollectTask.class.getName()).setupAgentHome(); + Db basicDb = TaskManager.initDb("/localdb"); + resourceName = LOADER.getResource("test/20230928.log_1").getPath(); + File f = new File(resourceName); + String pattern = f.getParent() + "/YYYYMMDD.log_[0-9]+"; + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L, 0L, TaskStateEnum.RUNNING); + try { + String startStr = "2023-09-20 00:00:00"; + String endStr = "2023-09-30 00:00:00"; + Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startStr); + long start = parse.getTime(); + parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endStr); + long end = parse.getTime(); + taskProfile.setLong(TaskConstants.TASK_START_TIME, start); + taskProfile.setLong(TaskConstants.TASK_END_TIME, end); + manager = new TaskManager(); + task = PowerMockito.spy(new LogFileCollectTask()); + PowerMockito.doAnswer(invocation -> { + fileName = invocation.getArgument(0); + dataTime = invocation.getArgument(1); + return null; + }).when(task, "addToEvenMap", Mockito.anyString(), Mockito.anyString()); + task.init(manager, taskProfile, basicDb); + EXECUTOR_SERVICE.submit(task); + } catch (Exception e) { + LOGGER.error("source init error {}", e); + Assert.assertTrue("source init error", false); + } + } + + @AfterClass + public static void teardown() throws Exception { + task.destroy(); + helper.teardownAgentHome(); + } + + @Test + public void testTaskManager() throws Exception { + await().atMost(2, TimeUnit.SECONDS).until(() -> fileName != null && dataTime != null); + Assert.assertTrue(fileName.compareTo(resourceName) == 0); + Assert.assertTrue(dataTime.compareTo("20230928") == 0); + PowerMockito.verifyPrivate(task, Mockito.times(1)) + .invoke("addToEvenMap", Mockito.anyString(), Mockito.anyString()); + } +} \ No newline at end of file