Skip to content

Commit

Permalink
[INLONG-9143][Agent] Add log file collect task
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang committed Oct 27, 2023
1 parent 9c07537 commit 43e36f0
Show file tree
Hide file tree
Showing 11 changed files with 1,590 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.task;

import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.plugin.file.Task;

/**
* Generate job by crontab expression.
*/
public class CronTask extends Task {

@Override
public void init(Object srcManager, TaskProfile profile, Db basicDb) {

}

@Override
public void run() {

}

@Override
public void destroy() {

}

@Override
public TaskProfile getProfile() {
return null;
}

@Override
public String getTaskId() {
return null;
}

@Override
public void addCallbacks() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.task;

import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;

/**
* Directory trigger with format date.
*/
public class FormatDateLogFileCollectTask extends LogFileCollectTask {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.task;

import org.apache.inlong.agent.plugin.filter.DateFormatRegex;
import org.apache.inlong.agent.utils.PathUtils;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Path pattern for file filter.
* It’s identified by watchDir, which matches {@link PathPattern#whiteList}.
*/
public class PathPattern {

private static final Logger LOGGER =
LoggerFactory.getLogger(PathPattern.class);

private final String rootDir;
private final Set<String> subDirs;
// regex for those files should be matched
private final Set<DateFormatRegex> whiteList;

public PathPattern(String rootDir, Set<String> whiteList) {
this(rootDir, whiteList, null);
}

public PathPattern(String rootDir, Set<String> whiteList, String offset) {
this.rootDir = rootDir;
this.subDirs = new HashSet<>();
if (offset != null && StringUtils.isNotBlank(offset)) {
this.whiteList = whiteList.stream()
.map(whiteRegex -> DateFormatRegex.ofRegex(whiteRegex).withOffset(offset))
.collect(Collectors.toSet());
updateDateFormatRegex();
} else {
this.whiteList = whiteList.stream()
.map(whiteRegex -> DateFormatRegex.ofRegex(whiteRegex))
.collect(Collectors.toSet());
}
}

public static Set<PathPattern> buildPathPattern(Set<String> whiteList,
String offset) {
Set<String> commonWatchDir = PathUtils.findCommonRootPath(whiteList);
return commonWatchDir.stream().map(rootDir -> {
Set<String> commonWatchDirWhiteList =
whiteList.stream()
.filter(whiteRegex -> whiteRegex.startsWith(rootDir))
.collect(Collectors.toSet());
return new PathPattern(rootDir, commonWatchDirWhiteList, offset);
}).collect(Collectors.toSet());
}

/**
* cleanup local cache, subDirs is only used to filter duplicated directories
* in one term watch key check.
*/
public void cleanup() {
subDirs.clear();
}

/**
* Research all children files with {@link PathPattern#rootDir} matched whiteList and filtered by blackList.
*
* @param maxNum
* @return
*/
public Collection<File> walkSuitableFiles(int maxNum) {
Collection<File> suitableFiles = new ArrayList<>();
walkSuitableFiles(suitableFiles, new File(rootDir), maxNum);
return suitableFiles;
}

private void walkSuitableFiles(Collection<File> suitableFiles, File file, int maxNum) {
if (suitableFiles.size() > maxNum) {
LOGGER.warn("Suitable files exceed max num {}, just return.", maxNum);
return;
}

if (suitable(file.getAbsolutePath())) {
if (file.isFile()) {
suitableFiles.add(file);
} else if (file.isDirectory()) {
Stream.of(file.listFiles()).forEach(subFile -> walkSuitableFiles(suitableFiles, subFile, maxNum));
}
}
}

/**
* Check whether path is suitable for match whiteList and filtered by blackList
*
* @param path pathString
* @return true if suit else false.
*/
public boolean suitable(String path) {
// remove common root path
String briefSubDir = StringUtils.substringAfter(path, rootDir);
// if already watched, then stop deep find
if (subDirs.contains(briefSubDir)) {
LOGGER.info("already watched {}", path);
return false;
}

subDirs.add(briefSubDir);
File file = new File(path);
return whiteList.stream()
.filter(whiteRegex -> whiteRegex.match(file))
.findAny()
.isPresent();
}

/**
* when a new file is found, update regex since time may change.
*/
public void updateDateFormatRegex() {
whiteList.forEach(DateFormatRegex::setRegexWithCurrentTime);
}

/**
* when job is retry job, the time for searching file should be specified.
*/
public void updateDateFormatRegex(String time) {
whiteList.forEach(whiteRegex -> whiteRegex.setRegexWithTime(time));
}

@Override
public String toString() {
return rootDir;
}

@Override
public int hashCode() {
return HashCodeBuilder.reflectionHashCode(rootDir, false);
}

@Override
public boolean equals(Object object) {
if (object instanceof PathPattern) {
PathPattern entity = (PathPattern) object;
return entity.rootDir.equals(this.rootDir);
} else {
return false;
}
}

public String getRootDir() {
return rootDir;
}

public String getSuitTime() {
// todo: Adapt to datetime in the case of multiple regex
return whiteList.stream().findAny().get().getFormattedTime();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.task.filecollect;

public class AgentErrMsg {

public static final String CONFIG_SUCCESS = "SUCCESS";

// 数据源配置异常 */
public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-TDAgent|10001|ERROR"
+ "|ERROR_DATA_SOURCE_CONFIG|";

// 监控文件夹不存在 */
public static final String DIRECTORY_NOT_FOUND_ERROR = "ERROR-0-TDAgent|11001|WARN"
+ "|WARN_DIRECTORY_NOT_EXIST|";

// 监控文件夹时出错 */
public static final String WATCH_DIR_ERROR = "ERROR-0-TDAgent|11002|ERROR"
+ "|ERROR_WATCH_DIR_ERROR|";

// 要读取的文件异常(不存在,rotate)
public static final String FILE_ERROR = "ERROR-0-TDAgent|10002|ERROR|ERROR_SOURCE_FILE|";

// 读取文件异常
public static final String FILE_OP_ERROR = "ERROR-1-TDAgent|30002|ERROR|ERROR_OPERATE_FILE|";

// 磁盘满
public static final String DISK_FULL = "ERROR-1-TDAgent|30001|FATAL|FATAL_DISK_FULL|";

// 内存溢出
public static final String OOM_ERROR = "ERROR-1-TDAgent|30001|FATAL|FATAL_OOM_ERROR|";

// watcher异常
public static final String WATCHER_INVALID = "ERROR-1-TDAgent|40001|WARN|WARN_INVALID_WATCHER|";

// 连不上tdmanager
public static final String CONNECT_TDM_ERROR = "ERROR-1-TDAgent|30002|ERROR"
+ "|ERROR_CANNOT_CONNECT_TO_TDM|";

// 发送数据到tdbus失败
public static final String SEND_TO_BUS_ERROR = "ERROR-1-TDAgent|30003|ERROR|ERROR_SEND_TO_BUS|";

// 操作bdb异常
public static final String BDB_ERROR = "ERROR-1-TDAgent|30003|ERROR|BDB_OPERATION_ERROR|";

// 内部缓存满
public static final String MSG_BUFFER_FULL = "ERROR-1-TDAgent|40002|WARN|WARN_MSG_BUFFER_FULL|";

// 监控到的事件不合法(任务已删除)
public static final String FOUND_EVENT_INVALID = "ERROR-1-TDAgent|30003|ERROR"
+ "|FOUND_EVENT_INVALID|";
}
Loading

0 comments on commit 43e36f0

Please sign in to comment.