Skip to content

Commit

Permalink
[INLONG-8652][Agent] delete the capacity of setting blacklist
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang committed Aug 8, 2023
1 parent 11a5b40 commit 1003edc
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class JobConstants extends CommonConstants {
public static final String JOB_FILE_TRIGGER = "job.fileJob.trigger";
public static final String JOB_DIR_FILTER_PATTERN = "job.fileJob.dir.pattern"; // deprecated
public static final String JOB_DIR_FILTER_PATTERNS = "job.fileJob.dir.patterns";
public static final String JOB_DIR_FILTER_BLACKLIST = "job.fileJob.dir.blackList";
public static final String JOB_FILE_TIME_OFFSET = "job.fileJob.timeOffset";
public static final String JOB_FILE_MAX_WAIT = "job.fileJob.file.max.wait";
public static final String JOB_CYCLE_UNIT = "job.fileJob.cycleUnit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,7 +51,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_BLACKLIST;
import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERNS;

/**
Expand Down Expand Up @@ -104,8 +102,8 @@ public TriggerProfile getTriggerProfile() {
/**
* register pathPattern into watchers, with offset
*/
public Set<String> register(Set<String> whiteList, String offset, Set<String> blackList) throws IOException {
this.pathPatterns = PathPattern.buildPathPattern(whiteList, offset, blackList);
public Set<String> register(Set<String> whiteList, String offset) throws IOException {
this.pathPatterns = PathPattern.buildPathPattern(whiteList, offset);
LOGGER.info("Watch root path is {}", pathPatterns);

resourceProviderThread.initTrigger(this);
Expand All @@ -121,12 +119,8 @@ public void init(TriggerProfile profile) throws IOException {
if (this.profile.hasKey(JOB_DIR_FILTER_PATTERNS)) {
Set<String> pathPatterns = Stream.of(
this.profile.get(JOB_DIR_FILTER_PATTERNS).split(",")).collect(Collectors.toSet());
Set<String> blackList = Stream.of(
this.profile.get(JOB_DIR_FILTER_BLACKLIST, "").split(","))
.filter(black -> !StringUtils.isBlank(black))
.collect(Collectors.toSet());
String timeOffset = this.profile.get(JobConstants.JOB_FILE_TIME_OFFSET, "");
register(pathPatterns, timeOffset, blackList);
register(pathPatterns, timeOffset);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,25 @@

/**
* Path pattern for file filter.
* It’s identified by watchDir, which matches {@link PathPattern#whiteList} and filters {@link PathPattern#blackList}.
* It’s identified by watchDir, which matches {@link org.apache.inlong.agent.plugin.trigger.PathPattern#whiteList} and filters {@link org.apache.inlong.agent.plugin.trigger.PathPattern#blackList}.
*/
public class PathPattern {

private static final Logger LOGGER = LoggerFactory.getLogger(PathPattern.class);
private static final Logger LOGGER =
LoggerFactory.getLogger(org.apache.inlong.agent.plugin.trigger.PathPattern.class);

private final String rootDir;
private final Set<String> subDirs;
// regex for those files should be matched
private final Set<DateFormatRegex> whiteList;
// regex for those files should be filtered
private final Set<String> blackList;

public PathPattern(String rootDir, Set<String> whiteList, Set<String> blackList) {
this(rootDir, whiteList, blackList, null);
public PathPattern(String rootDir, Set<String> whiteList) {
this(rootDir, whiteList, null);
}

public PathPattern(String rootDir, Set<String> whiteList, Set<String> blackList, String offset) {
public PathPattern(String rootDir, Set<String> whiteList, String offset) {
this.rootDir = rootDir;
this.subDirs = new HashSet<>();
this.blackList = blackList;
if (offset != null && StringUtils.isNotBlank(offset)) {
this.whiteList = whiteList.stream()
.map(whiteRegex -> DateFormatRegex.ofRegex(whiteRegex).withOffset(offset))
Expand All @@ -68,14 +66,15 @@ public PathPattern(String rootDir, Set<String> whiteList, Set<String> blackList,
}
}

public static Set<PathPattern> buildPathPattern(Set<String> whiteList, String offset, Set<String> blackList) {
public static Set<org.apache.inlong.agent.plugin.trigger.PathPattern> buildPathPattern(Set<String> whiteList,
String offset) {
Set<String> commonWatchDir = PathUtils.findCommonRootPath(whiteList);
return commonWatchDir.stream().map(rootDir -> {
Set<String> commonWatchDirWhiteList =
whiteList.stream()
.filter(whiteRegex -> whiteRegex.startsWith(rootDir))
.collect(Collectors.toSet());
return new PathPattern(rootDir, commonWatchDirWhiteList, blackList, offset);
return new org.apache.inlong.agent.plugin.trigger.PathPattern(rootDir, commonWatchDirWhiteList, offset);
}).collect(Collectors.toSet());
}

Expand All @@ -88,7 +87,7 @@ public void cleanup() {
}

/**
* Research all children files with {@link PathPattern#rootDir} matched whiteList and filtered by blackList.
* Research all children files with {@link org.apache.inlong.agent.plugin.trigger.PathPattern#rootDir} matched whiteList and filtered by blackList.
*
* @param maxNum
* @return
Expand Down Expand Up @@ -121,11 +120,6 @@ private void walkSuitableFiles(Collection<File> suitableFiles, File file, int ma
* @return true if suit else false.
*/
public boolean suitable(String path) {
// remove blacklist path
if (blackList.contains(path)) {
LOGGER.info("find blacklist path {}, ignore it.", path);
return false;
}
// remove common root path
String briefSubDir = StringUtils.substringAfter(path, rootDir);
// if already watched, then stop deep find
Expand Down Expand Up @@ -168,8 +162,9 @@ public int hashCode() {

@Override
public boolean equals(Object object) {
if (object instanceof PathPattern) {
PathPattern entity = (PathPattern) object;
if (object instanceof org.apache.inlong.agent.plugin.trigger.PathPattern) {
org.apache.inlong.agent.plugin.trigger.PathPattern entity =
(org.apache.inlong.agent.plugin.trigger.PathPattern) object;
return entity.rootDir.equals(this.rootDir);
} else {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,7 +53,6 @@
import static org.apache.inlong.agent.constant.CommonConstants.AGENT_OS_NAME;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM;
import static org.apache.inlong.agent.constant.CommonConstants.FILE_MAX_NUM;
import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_BLACKLIST;
import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERNS;
import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TIME_OFFSET;
import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
Expand Down Expand Up @@ -95,14 +93,10 @@ public static CompressionType convertType(String type) {
public static Collection<File> findSuitFiles(JobProfile jobConf) {
Set<String> dirPatterns = Stream.of(
jobConf.get(JOB_DIR_FILTER_PATTERNS).split(",")).collect(Collectors.toSet());
Set<String> blackList = Stream.of(
jobConf.get(JOB_DIR_FILTER_BLACKLIST, "").split(","))
.filter(black -> !StringUtils.isBlank(black))
.collect(Collectors.toSet());
LOGGER.info("start to find files with dir pattern {}", dirPatterns);

Set<PathPattern> pathPatterns =
PathPattern.buildPathPattern(dirPatterns, jobConf.get(JOB_FILE_TIME_OFFSET, null), blackList);
PathPattern.buildPathPattern(dirPatterns, jobConf.get(JOB_FILE_TIME_OFFSET, null));
updateRetryTime(jobConf, pathPatterns);
int maxFileNum = jobConf.getInt(FILE_MAX_NUM, DEFAULT_FILE_MAX_NUM);
LOGGER.info("dir pattern {}, max file num {}", dirPatterns, maxFileNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.inlong.agent.plugin.trigger.PathPattern;
import org.apache.inlong.agent.utils.AgentUtils;

import com.google.common.collect.Sets;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -80,7 +79,7 @@ public void testRegexAndTimeoffset() throws IOException {
File file = Paths.get(helper.getTestRootDir().toString(), pathTime.concat(".log")).toFile();
file.createNewFile();
PathPattern entity = new PathPattern(helper.getTestRootDir().toString(),
Collections.singleton(helper.getTestRootDir().toString() + "/yyyyMMdd.log"), Sets.newHashSet(), "-1d");
Collections.singleton(helper.getTestRootDir().toString() + "/yyyyMMdd.log"), "-1d");
boolean flag = entity.suitable(file.getPath());
Assert.assertTrue(flag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@

public class TestWatchDirTrigger {

@ClassRule
public static final TemporaryFolder WATCH_FOLDER = new TemporaryFolder();
private static final Logger LOGGER = LoggerFactory.getLogger(TestWatchDirTrigger.class);
private static DirectoryTrigger trigger;

@ClassRule
public static final TemporaryFolder WATCH_FOLDER = new TemporaryFolder();

@Before
public void setupEach() throws Exception {
trigger = new DirectoryTrigger();
Expand All @@ -68,16 +69,16 @@ public void teardownEach() {
trigger.getFetchedJob().clear();
}

public void registerPathPattern(Set<String> whiteList, Set<String> blackList, String offset) throws IOException {
trigger.register(whiteList, offset, blackList);
public void registerPathPattern(Set<String> whiteList, String offset) throws IOException {
trigger.register(whiteList, offset);
}

@Test
public void testWatchEntity() throws Exception {
PathPattern a1 = new PathPattern("1",
Collections.singleton(WATCH_FOLDER.getRoot().toString()), Sets.newHashSet());
Collections.singleton(WATCH_FOLDER.getRoot().toString()));
PathPattern a2 = new PathPattern("1",
Collections.singleton(WATCH_FOLDER.getRoot().toString()), Sets.newHashSet());
Collections.singleton(WATCH_FOLDER.getRoot().toString()));
HashMap<PathPattern, Integer> map = new HashMap<>();
map.put(a1, 10);
Integer result = map.remove(a2);
Expand All @@ -94,17 +95,17 @@ public void testBlackList() throws Exception {
registerPathPattern(
Sets.newHashSet(WATCH_FOLDER.getRoot().getAbsolutePath()
+ File.separator + "**" + File.separator + "*.log"),
Sets.newHashSet(WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "tmp"),
null);
File file1 = WATCH_FOLDER.newFile("1.log");
File tmp = WATCH_FOLDER.newFolder("tmp");
File file2 = new File(tmp.getAbsolutePath() + File.separator + "2.log");
file2.createNewFile();
await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() >= 0);
await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() == 1);
Collection<Map<String, String>> jobs = trigger.getFetchedJob();
Set<String> jobPaths = jobs.stream()
.map(job -> job.get(JobConstants.JOB_DIR_FILTER_PATTERNS))
.collect(Collectors.toSet());
Assert.assertTrue(jobPaths.contains(file1.getAbsolutePath()));
}

@Test
Expand All @@ -119,7 +120,7 @@ public void testCreateBeforeWatch() throws Exception {
registerPathPattern(
Sets.newHashSet(
WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "**" + File.separator + "*.log"),
Collections.emptySet(), null);
null);
await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() == 1);
}

Expand All @@ -132,11 +133,11 @@ public void testWatchDeepMatch() throws Exception {
registerPathPattern(
Sets.newHashSet(
WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "**" + File.separator + "*.log"),
Collections.emptySet(), null);
null);
File tmp = WATCH_FOLDER.newFolder("tmp", "deep");
File file4 = new File(tmp.getAbsolutePath() + File.separator + "1.log");
file4.createNewFile();
await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() >= 0);
await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() == 1);
}

@Test
Expand All @@ -149,7 +150,7 @@ public void testMultiPattern() throws Exception {
Sets.newHashSet(
WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "tmp" + File.separator + "*.log",
WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "**" + File.separator + "*.txt"),
Collections.emptySet(), null);
null);
final File file1 = WATCH_FOLDER.newFile("1.txt");
File file2 = WATCH_FOLDER.newFile("2.log");
File file3 = WATCH_FOLDER.newFile("3.tar.gz");
Expand All @@ -158,11 +159,14 @@ public void testMultiPattern() throws Exception {
file4.createNewFile();
File file5 = new File(tmp.getAbsolutePath() + File.separator + "5.log");
file5.createNewFile();
System.out.println("trigger.getFetchedJob().size() " + trigger.getFetchedJob().size());
await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() >= 0);

await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() == 3);
Collection<Map<String, String>> jobs = trigger.getFetchedJob();
Set<String> jobPaths = jobs.stream()
.map(job -> job.get(JobConstants.JOB_DIR_FILTER_PATTERNS))
.collect(Collectors.toSet());
Assert.assertTrue(jobPaths.contains(file1.getAbsolutePath()));
Assert.assertTrue(jobPaths.contains(file4.getAbsolutePath()));
Assert.assertTrue(jobPaths.contains(file5.getAbsolutePath()));
}
}

0 comments on commit 1003edc

Please sign in to comment.