Skip to content

Commit

Permalink
[INLONG-11135][Agent] Support filtering capability when supplementing…
Browse files Browse the repository at this point in the history
… data
  • Loading branch information
justinwwhuang committed Sep 18, 2024
1 parent 9e443cb commit 433307c
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
public abstract class AbstractConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConfiguration.class);
private static final JsonParser JSON_PARSER = new JsonParser();

private final Map<String, JsonPrimitive> configStorage = new HashMap<>();

/**
Expand Down Expand Up @@ -81,7 +79,7 @@ private void loadResource(String fileName, boolean isJson) {
if (inputStream != null) {
reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
if (isJson) {
JsonElement tmpElement = JSON_PARSER.parse(reader).getAsJsonObject();
JsonElement tmpElement = JsonParser.parseReader(reader).getAsJsonObject();
updateConfig(new HashMap<>(10), 0, tmpElement);
} else {
Properties properties = new Properties();
Expand All @@ -103,7 +101,7 @@ private void loadResource(String fileName, boolean isJson) {
* @param jsonStr json string
*/
public void loadJsonStrResource(String jsonStr) {
JsonElement tmpElement = JSON_PARSER.parse(jsonStr);
JsonElement tmpElement = JsonParser.parseString(jsonStr);
updateConfig(new HashMap<>(10), 0, tmpElement);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_FILE_CONTENT_COLLECT_TYPE = "task.fileTask.contentCollectType";
public static final String SOURCE_DATA_CONTENT_STYLE = "task.fileTask.dataContentStyle";
public static final String SOURCE_DATA_SEPARATOR = "task.fileTask.dataSeparator";
public static final String SOURCE_FILTER_STREAMS = "task.fileTask.filterStreams";
public static final String TASK_RETRY = "task.fileTask.retry";
public static final String TASK_START_TIME = "task.fileTask.startTime";
public static final String TASK_END_TIME = "task.fileTask.endTime";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import lombok.Data;

import java.util.Map;
import java.util.List;

@Data
public class FileTask {
Expand All @@ -46,8 +46,8 @@ public class FileTask {

private String dataSeparator;

// JSON string, the content format is Map<String,Object>
private String properties;
// The streamIds to be filtered out
private String filterStreams;

// Monitor interval for file
private Long monitorInterval;
Expand Down Expand Up @@ -121,8 +121,8 @@ public static class FileTaskConfig {
// Column separator of data source
private String dataSeparator;

// Properties for file
private Map<String, Object> properties;
// The streamIds to be filtered out
private List<String> filterStreams;

// Monitor interval for file
private Long monitorInterval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ private static FileTask getFileTask(DataConfig dataConfig) {
fileTask.setCycleUnit(taskConfig.getCycleUnit());
fileTask.setStartTime(taskConfig.getStartTime());
fileTask.setEndTime(taskConfig.getEndTime());
if (taskConfig.getFilterStreams() != null) {
fileTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams()));
}
if (taskConfig.getTimeOffset() != null) {
fileTask.setTimeOffset(taskConfig.getTimeOffset());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,37 @@ private void initExtendHandler() {

/**
 * Reads the next message that passes the extended handler's filter.
 *
 * <p>Source data is pulled through {@code readFromQueue()} until it returns {@code null}.
 * Each item is converted with {@code createMessage}; items rejected by
 * {@code filterSourceData} are skipped without being audited. For the first message that
 * is kept, both read-success audits are recorded and the message is returned.
 *
 * @return the first message accepted by the filter, or {@code null} once
 *         {@code readFromQueue()} has nothing more to hand back
 */
@Override
public Message read() {
    SourceData sourceData = readFromQueue();
    while (sourceData != null) {
        Message msg = createMessage(sourceData);
        // NOTE(review): the tail of createMessage is not visible here, but its comment says
        // oversized messages are dropped. Guard against a null result so a dropped message is
        // skipped like a filtered one instead of failing on msg.getHeader() - confirm.
        if (msg != null && filterSourceData(msg)) {
            // Real-time tasks are audited at the current time, others at the sink data time.
            long auditTime = isRealTime ? AgentUtils.getCurrentTime() : profile.getSinkDataTime();
            Map<String, String> header = msg.getHeader();
            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
                    auditTime, 1, sourceData.getData().length, auditVersion);
            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId,
                    header.get(PROXY_KEY_STREAM_ID),
                    AgentUtils.getCurrentTime(), 1, sourceData.getData().length, auditVersion);
            return msg;
        }
        sourceData = readFromQueue();
    }
    return null;
}

// Decides whether a message is kept: with no extended handler every message is kept,
// otherwise the decision is delegated to the handler's filterMessage.
private boolean filterSourceData(Message msg) {
    return extendedHandler == null || extendedHandler.filterMessage(msg);
}

private SourceData readFromQueue() {
SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
Expand All @@ -321,7 +352,7 @@ public Message read() {
}
LOGGER.debug("Read from source queue {} {}", new String(sourceData.getData()), inlongGroupId);
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length);
return createMessage(sourceData);
return sourceData;
}

private Message createMessage(SourceData sourceData) {
Expand All @@ -333,16 +364,6 @@ private Message createMessage(SourceData sourceData) {
if (extendedHandler != null) {
extendedHandler.dealWithHeader(header, sourceData.getData());
}
long auditTime = 0;
if (isRealTime) {
auditTime = AgentUtils.getCurrentTime();
} else {
auditTime = profile.getSinkDataTime();
}
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
auditTime, 1, sourceData.getData().length, auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
AgentUtils.getCurrentTime(), 1, sourceData.getData().length, auditVersion);
Message finalMsg = new DefaultMessage(sourceData.getData(), header);
// If the message size is greater than the max pack size, it should be dropped.
if (finalMsg.getBody().length > maxPackSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@
package org.apache.inlong.agent.plugin.sources.file.extend;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.plugin.Message;

import java.util.Map;

// For some private, customized extension processing
public abstract class ExtendedHandler {

public ExtendedHandler(InstanceProfile profile) {
protected InstanceProfile profile;

public ExtendedHandler(InstanceProfile profile) {
this.profile = profile;
}

// Modify the header by the body
public void dealWithHeader(Map<String, String> header, byte[] body) {
abstract public void dealWithHeader(Map<String, String> header, byte[] body);

}
abstract public boolean filterMessage(Message msg);

public static class Constants {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

/**
* common environment setting up for test cases.
Expand Down Expand Up @@ -80,15 +81,19 @@ public void teardownAgentHome() {
}
}

public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
TaskStateEnum state, String cycleUnit, String timeZone) {
DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, cycleUnit, timeZone);
public TaskProfile getTaskProfile(int taskId, String pattern, String dataContentStyle, boolean retry,
Long startTime, Long endTime,
TaskStateEnum state, String cycleUnit, String timeZone, List<String> filterStreams) {
DataConfig dataConfig = getDataConfig(taskId, pattern, dataContentStyle, retry, startTime, endTime,
state, cycleUnit, timeZone,
filterStreams);
TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
return profile;
}

private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime,
TaskStateEnum state, String cycleUnit, String timeZone) {
private DataConfig getDataConfig(int taskId, String pattern, String dataContentStyle, boolean retry, Long startTime,
Long endTime,
TaskStateEnum state, String cycleUnit, String timeZone, List<String> filterStreams) {
DataConfig dataConfig = new DataConfig();
dataConfig.setInlongGroupId("testGroupId");
dataConfig.setInlongStreamId("testStreamId");
Expand All @@ -107,8 +112,9 @@ private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
// mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf
fileTaskConfig.setDataContentStyle("mix");
fileTaskConfig.setDataContentStyle(dataContentStyle);
fileTaskConfig.setDataSeparator("|");
fileTaskConfig.setFilterStreams(filterStreams);
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
return dataConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public static void setup() {
helper = new AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt";
Store basicInstanceStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, CycleUnitType.HOUR,
"GMT+6:00");
taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, CycleUnitType.HOUR,
"GMT+6:00", null);
Store taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
TaskStore taskStore = new TaskStore(taskBasicStore);
taskStore.storeTask(taskProfile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public static void setUp() throws Exception {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime());
kafkaSink = new MockSink();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public static void setUp() throws Exception {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime());
pulsarSink = new MockSink();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public static void setup() {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
profile = taskProfile.createInstanceProfile("", fileName,
taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
Expand Down Expand Up @@ -77,8 +78,9 @@ public static void setup() {
private LogFileSource getSource(int taskId, long offset) {
try {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", false, 0L, 0L,
TaskStateEnum.RUNNING, "D",
"GMT+8:00", Arrays.asList(""));
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
InstanceProfile instanceProfile = taskProfile.createInstanceProfile("",
fileName, taskProfile.getCycleUnit(), "20230928", AgentUtils.getCurrentTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
Expand Down Expand Up @@ -134,8 +136,8 @@ private SQLServerSource getSource() {
final String tableName = "test_source";
final String serverName = "server-01";

TaskProfile taskProfile = helper.getTaskProfile(1, "", false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00", null);
instanceProfile = taskProfile.createInstanceProfile("",
"", taskProfile.getCycleUnit(), "20240725", AgentUtils.getCurrentTime());
instanceProfile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ private void doTest(int taskId, List<String> resources, String pattern, String c
for (int i = 0; i < resources.size(); i++) {
resourceName.add(LOADER.getResource(resources.get(i)).getPath());
}
TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, true, 0L, 0L, TaskStateEnum.RUNNING, cycle,
"GMT+8:00");
TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", true, 0L, 0L, TaskStateEnum.RUNNING,
cycle,
"GMT+8:00", null);
LogFileTask dayTask = null;
final List<String> fileName = new ArrayList();
final List<String> dataTime = new ArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void testTaskManager() {
manager = new TaskManager();
TaskStore taskStore = manager.getTaskStore();
for (int i = 1; i <= 10; i++) {
TaskProfile taskProfile = helper.getTaskProfile(i, pattern, false, 0L, 0L, TaskStateEnum.RUNNING,
"D", "GMT+8:00");
TaskProfile taskProfile = helper.getTaskProfile(i, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
taskProfile.setTaskClass(MockTask.class.getCanonicalName());
taskStore.storeTask(taskProfile);
}
Expand All @@ -74,8 +74,8 @@ public void testTaskManager() {
Assert.assertTrue("manager start error", false);
}

TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, false, 0L, 0L, TaskStateEnum.RUNNING,
"D", "GMT+8:00");
TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
String taskId1 = taskProfile1.getTaskId();
taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles1 = new ArrayList<>();
Expand All @@ -99,8 +99,8 @@ public void testTaskManager() {
Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == TaskStateEnum.RUNNING);

// test delete
TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, false, 0L, 0L, TaskStateEnum.RUNNING,
"D", "GMT+8:00");
TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING,
"D", "GMT+8:00", null);
taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
List<TaskProfile> taskProfiles2 = new ArrayList<>();
taskProfiles2.add(taskProfile2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ok|hello line-end-symbol aa
no|world line-end-symbol
ok|agent line-end-symbol

0 comments on commit 433307c

Please sign in to comment.