From 1003edc9cc80173b28c28973e87dc1b43afd9973 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Tue, 8 Aug 2023 11:21:54 +0800 Subject: [PATCH] [INLONG-8652][Agent] delete the capacity of setting blacklist --- .../inlong/agent/constant/JobConstants.java | 1 - .../plugin/trigger/DirectoryTrigger.java | 12 ++----- .../agent/plugin/trigger/PathPattern.java | 31 ++++++++---------- .../agent/plugin/utils/PluginUtils.java | 8 +---- .../plugin/filter/TestDateFormatRegex.java | 3 +- .../plugin/trigger/TestWatchDirTrigger.java | 32 +++++++++++-------- 6 files changed, 36 insertions(+), 51 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java index a2fff7885cc..7a3347f6e8c 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java @@ -54,7 +54,6 @@ public class JobConstants extends CommonConstants { public static final String JOB_FILE_TRIGGER = "job.fileJob.trigger"; public static final String JOB_DIR_FILTER_PATTERN = "job.fileJob.dir.pattern"; // deprecated public static final String JOB_DIR_FILTER_PATTERNS = "job.fileJob.dir.patterns"; - public static final String JOB_DIR_FILTER_BLACKLIST = "job.fileJob.dir.blackList"; public static final String JOB_FILE_TIME_OFFSET = "job.fileJob.timeOffset"; public static final String JOB_FILE_MAX_WAIT = "job.fileJob.file.max.wait"; public static final String JOB_CYCLE_UNIT = "job.fileJob.cycleUnit"; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java index 38e688713a5..b7dcb453072 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java @@ -27,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +51,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_BLACKLIST; import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERNS; /** @@ -104,8 +102,8 @@ public TriggerProfile getTriggerProfile() { /** * register pathPattern into watchers, with offset */ - public Set register(Set whiteList, String offset, Set blackList) throws IOException { - this.pathPatterns = PathPattern.buildPathPattern(whiteList, offset, blackList); + public Set register(Set whiteList, String offset) throws IOException { + this.pathPatterns = PathPattern.buildPathPattern(whiteList, offset); LOGGER.info("Watch root path is {}", pathPatterns); resourceProviderThread.initTrigger(this); @@ -121,12 +119,8 @@ public void init(TriggerProfile profile) throws IOException { if (this.profile.hasKey(JOB_DIR_FILTER_PATTERNS)) { Set pathPatterns = Stream.of( this.profile.get(JOB_DIR_FILTER_PATTERNS).split(",")).collect(Collectors.toSet()); - Set blackList = Stream.of( - this.profile.get(JOB_DIR_FILTER_BLACKLIST, "").split(",")) - .filter(black -> !StringUtils.isBlank(black)) - .collect(Collectors.toSet()); String timeOffset = this.profile.get(JobConstants.JOB_FILE_TIME_OFFSET, ""); - register(pathPatterns, timeOffset, blackList); + register(pathPatterns, timeOffset); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java index 32b6b87bccb..7df93dcdd0e 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/PathPattern.java @@ -35,27 +35,25 @@ /** * Path pattern for file filter. - * It’s identified by watchDir, which matches {@link PathPattern#whiteList} and filters {@link PathPattern#blackList}. + * It’s identified by watchDir, which matches {@link org.apache.inlong.agent.plugin.trigger.PathPattern#whiteList} and filters {@link org.apache.inlong.agent.plugin.trigger.PathPattern#blackList}. */ public class PathPattern { - private static final Logger LOGGER = LoggerFactory.getLogger(PathPattern.class); + private static final Logger LOGGER = + LoggerFactory.getLogger(org.apache.inlong.agent.plugin.trigger.PathPattern.class); private final String rootDir; private final Set subDirs; // regex for those files should be matched private final Set whiteList; - // regex for those files should be filtered - private final Set blackList; - public PathPattern(String rootDir, Set whiteList, Set blackList) { - this(rootDir, whiteList, blackList, null); + public PathPattern(String rootDir, Set whiteList) { + this(rootDir, whiteList, null); } - public PathPattern(String rootDir, Set whiteList, Set blackList, String offset) { + public PathPattern(String rootDir, Set whiteList, String offset) { this.rootDir = rootDir; this.subDirs = new HashSet<>(); - this.blackList = blackList; if (offset != null && StringUtils.isNotBlank(offset)) { this.whiteList = whiteList.stream() .map(whiteRegex -> DateFormatRegex.ofRegex(whiteRegex).withOffset(offset)) @@ -68,14 +66,15 @@ public PathPattern(String rootDir, Set whiteList, Set blackList, } } - public static Set buildPathPattern(Set whiteList, String offset, Set blackList) { + public static Set buildPathPattern(Set whiteList, + String offset) { Set commonWatchDir = PathUtils.findCommonRootPath(whiteList); return commonWatchDir.stream().map(rootDir -> { Set commonWatchDirWhiteList = whiteList.stream() .filter(whiteRegex -> whiteRegex.startsWith(rootDir)) .collect(Collectors.toSet()); - return new PathPattern(rootDir, commonWatchDirWhiteList, blackList, offset); + return new org.apache.inlong.agent.plugin.trigger.PathPattern(rootDir, commonWatchDirWhiteList, offset); }).collect(Collectors.toSet()); } @@ -88,7 +87,7 @@ public void cleanup() { } /** - * Research all children files with {@link PathPattern#rootDir} matched whiteList and filtered by blackList. + * Research all children files with {@link org.apache.inlong.agent.plugin.trigger.PathPattern#rootDir} matched whiteList and filtered by blackList. * * @param maxNum * @return @@ -121,11 +120,6 @@ private void walkSuitableFiles(Collection suitableFiles, File file, int ma * @return true if suit else false. */ public boolean suitable(String path) { - // remove blacklist path - if (blackList.contains(path)) { - LOGGER.info("find blacklist path {}, ignore it.", path); - return false; - } // remove common root path String briefSubDir = StringUtils.substringAfter(path, rootDir); // if already watched, then stop deep find @@ -168,8 +162,9 @@ public int hashCode() { @Override public boolean equals(Object object) { - if (object instanceof PathPattern) { - PathPattern entity = (PathPattern) object; + if (object instanceof org.apache.inlong.agent.plugin.trigger.PathPattern) { + org.apache.inlong.agent.plugin.trigger.PathPattern entity = + (org.apache.inlong.agent.plugin.trigger.PathPattern) object; return entity.rootDir.equals(this.rootDir); } else { return false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java index 6d84f6d5204..44ad049cf2e 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java @@ -29,7 +29,6 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.CompressionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +53,6 @@ import static org.apache.inlong.agent.constant.CommonConstants.AGENT_OS_NAME; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM; import static org.apache.inlong.agent.constant.CommonConstants.FILE_MAX_NUM; -import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_BLACKLIST; import static org.apache.inlong.agent.constant.JobConstants.JOB_DIR_FILTER_PATTERNS; import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TIME_OFFSET; import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME; @@ -95,14 +93,10 @@ public static CompressionType convertType(String type) { public static Collection findSuitFiles(JobProfile jobConf) { Set dirPatterns = Stream.of( jobConf.get(JOB_DIR_FILTER_PATTERNS).split(",")).collect(Collectors.toSet()); - Set blackList = Stream.of( - jobConf.get(JOB_DIR_FILTER_BLACKLIST, "").split(",")) - .filter(black -> !StringUtils.isBlank(black)) - .collect(Collectors.toSet()); LOGGER.info("start to find files with dir pattern {}", dirPatterns); Set pathPatterns = - PathPattern.buildPathPattern(dirPatterns, jobConf.get(JOB_FILE_TIME_OFFSET, null), blackList); + PathPattern.buildPathPattern(dirPatterns, jobConf.get(JOB_FILE_TIME_OFFSET, null)); updateRetryTime(jobConf, pathPatterns); int maxFileNum = jobConf.getInt(FILE_MAX_NUM, DEFAULT_FILE_MAX_NUM); LOGGER.info("dir pattern {}, max file num {}", dirPatterns, maxFileNum); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java index 4b4064b5fd0..76fb3553452 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/filter/TestDateFormatRegex.java @@ -24,7 +24,6 @@ import org.apache.inlong.agent.plugin.trigger.PathPattern; import org.apache.inlong.agent.utils.AgentUtils; -import com.google.common.collect.Sets; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -80,7 +79,7 @@ public void testRegexAndTimeoffset() throws IOException { File file = Paths.get(helper.getTestRootDir().toString(), pathTime.concat(".log")).toFile(); file.createNewFile(); PathPattern entity = new PathPattern(helper.getTestRootDir().toString(), - Collections.singleton(helper.getTestRootDir().toString() + "/yyyyMMdd.log"), Sets.newHashSet(), "-1d"); + Collections.singleton(helper.getTestRootDir().toString() + "/yyyyMMdd.log"), "-1d"); boolean flag = entity.suitable(file.getPath()); Assert.assertTrue(flag); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestWatchDirTrigger.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestWatchDirTrigger.java index b98fd5301ec..dadffd7c0fe 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestWatchDirTrigger.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestWatchDirTrigger.java @@ -45,11 +45,12 @@ public class TestWatchDirTrigger { - @ClassRule - public static final TemporaryFolder WATCH_FOLDER = new TemporaryFolder(); private static final Logger LOGGER = LoggerFactory.getLogger(TestWatchDirTrigger.class); private static DirectoryTrigger trigger; + @ClassRule + public static final TemporaryFolder WATCH_FOLDER = new TemporaryFolder(); + @Before public void setupEach() throws Exception { trigger = new DirectoryTrigger(); @@ -68,16 +69,16 @@ public void teardownEach() { trigger.getFetchedJob().clear(); } - public void registerPathPattern(Set whiteList, Set blackList, String offset) throws IOException { - trigger.register(whiteList, offset, blackList); + public void registerPathPattern(Set whiteList, String offset) throws IOException { + trigger.register(whiteList, offset); } @Test public void testWatchEntity() throws Exception { PathPattern a1 = new PathPattern("1", - Collections.singleton(WATCH_FOLDER.getRoot().toString()), Sets.newHashSet()); + Collections.singleton(WATCH_FOLDER.getRoot().toString())); PathPattern a2 = new PathPattern("1", - Collections.singleton(WATCH_FOLDER.getRoot().toString()), Sets.newHashSet()); + Collections.singleton(WATCH_FOLDER.getRoot().toString())); HashMap map = new HashMap<>(); map.put(a1, 10); Integer result = map.remove(a2); @@ -94,17 +95,17 @@ public void testBlackList() throws Exception { registerPathPattern( Sets.newHashSet(WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "**" + File.separator + "*.log"), - Sets.newHashSet(WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "tmp"), null); File file1 = WATCH_FOLDER.newFile("1.log"); File tmp = WATCH_FOLDER.newFolder("tmp"); File file2 = new File(tmp.getAbsolutePath() + File.separator + "2.log"); file2.createNewFile(); - await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() >= 0); + await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() == 1); Collection> jobs = trigger.getFetchedJob(); Set jobPaths = jobs.stream() .map(job -> job.get(JobConstants.JOB_DIR_FILTER_PATTERNS)) .collect(Collectors.toSet()); + Assert.assertTrue(jobPaths.contains(file1.getAbsolutePath())); } @Test @@ -119,7 +120,7 @@ public void testCreateBeforeWatch() throws Exception { registerPathPattern( Sets.newHashSet( WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "**" + File.separator + "*.log"), - Collections.emptySet(), null); + null); await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() == 1); } @@ -132,11 +133,11 @@ public void testWatchDeepMatch() throws Exception { registerPathPattern( Sets.newHashSet( WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "**" + File.separator + "*.log"), - Collections.emptySet(), null); + null); File tmp = WATCH_FOLDER.newFolder("tmp", "deep"); File file4 = new File(tmp.getAbsolutePath() + File.separator + "1.log"); file4.createNewFile(); - await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() >= 0); + await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() == 1); } @Test @@ -149,7 +150,7 @@ public void testMultiPattern() throws Exception { Sets.newHashSet( WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "tmp" + File.separator + "*.log", WATCH_FOLDER.getRoot().getAbsolutePath() + File.separator + "**" + File.separator + "*.txt"), - Collections.emptySet(), null); + null); final File file1 = WATCH_FOLDER.newFile("1.txt"); File file2 = WATCH_FOLDER.newFile("2.log"); File file3 = WATCH_FOLDER.newFile("3.tar.gz"); @@ -158,11 +159,14 @@ public void testMultiPattern() throws Exception { file4.createNewFile(); File file5 = new File(tmp.getAbsolutePath() + File.separator + "5.log"); file5.createNewFile(); - System.out.println("trigger.getFetchedJob().size() " + trigger.getFetchedJob().size()); - await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() >= 0); + + await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getFetchedJob().size() == 3); Collection> jobs = trigger.getFetchedJob(); Set jobPaths = jobs.stream() .map(job -> job.get(JobConstants.JOB_DIR_FILTER_PATTERNS)) .collect(Collectors.toSet()); + Assert.assertTrue(jobPaths.contains(file1.getAbsolutePath())); + Assert.assertTrue(jobPaths.contains(file4.getAbsolutePath())); + Assert.assertTrue(jobPaths.contains(file5.getAbsolutePath())); } }