From 0abbc5d006b3a0f0f368d1b8ac42a763d37b7f99 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Thu, 26 Oct 2023 11:39:32 +0800 Subject: [PATCH] [INLONG-9112][Agent] Add task and instance profile (#9113) --- .../inlong/agent/conf/InstanceProfile.java | 169 ++++++ .../apache/inlong/agent/conf/TaskProfile.java | 119 ++++ .../apache/inlong/agent/pojo/FileTask.java | 153 +++++ .../inlong/agent/pojo/TaskProfileDto.java | 549 ++++++++++++++++++ 4 files changed, 990 insertions(+) create mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java create mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java create mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java create mode 100644 inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java new file mode 100644 index 00000000000..024b7674eb6 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.conf; + +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.utils.file.FileUtils; +import org.apache.inlong.common.enums.InstanceStateEnum; +import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo; +import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo; + +import com.google.common.collect.ComparisonChain; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE; +import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_ClUSTERS; +import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_TOPIC; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY; + +/** + * job profile which contains details describing properties of one job. + */ +public class InstanceProfile extends AbstractConfiguration implements Comparable { + + private static final Logger LOGGER = LoggerFactory.getLogger(InstanceProfile.class); + private static final Gson GSON = new Gson(); + + /** + * parse json string to configuration instance. + * + * @return job configuration + */ + public static InstanceProfile parseJsonStr(String jsonStr) { + InstanceProfile conf = new InstanceProfile(); + conf.loadJsonStrResource(jsonStr); + return conf; + } + + public String toJsonStr() { + return GSON.toJson(getConfigStorage()); + } + + public void setInstanceClass(String className) { + set(TaskConstants.INSTANCE_CLASS, className); + } + + public String getInstanceClass() { + return get(TaskConstants.INSTANCE_CLASS); + } + + public String getTaskId() { + return get(TaskConstants.TASK_ID); + } + + public String getInstanceId() { + return get(TaskConstants.INSTANCE_ID); + } + + public String getSourceClass() { + return get(TaskConstants.TASK_SOURCE); + } + + public String getSinkClass() { + return get(TaskConstants.TASK_SINK); + } + + public InstanceStateEnum getState() { + int value = getInt(INSTANCE_STATE, InstanceStateEnum.DEFAULT.ordinal()); + return InstanceStateEnum.getTaskState(value); + } + + public void setState(InstanceStateEnum state) { + setInt(INSTANCE_STATE, state.ordinal()); + } + + @Override + public boolean allRequiredKeyExist() { + return true; + } + + /** + * get MQClusterInfo list from config + */ + public List getMqClusters() { + List result = null; + String mqClusterStr = get(JOB_MQ_ClUSTERS); + if (StringUtils.isNotBlank(mqClusterStr)) { + result = GSON.fromJson(mqClusterStr, new TypeToken>() { + }.getType()); + } + return result; + } + + /** + * get mqTopic from config + */ + public DataProxyTopicInfo getMqTopic() { + DataProxyTopicInfo result = null; + String topicStr = get(JOB_MQ_TOPIC); + if (StringUtils.isNotBlank(topicStr)) { + result = GSON.fromJson(topicStr, DataProxyTopicInfo.class); + } + return result; + } + + public void setCreateTime(Long time) { + setLong(TaskConstants.INSTANCE_CREATE_TIME, time); + } + + public Long getCreateTime() { + return getLong(TaskConstants.INSTANCE_CREATE_TIME, 0); + } + + public void setModifyTime(Long time) { + setLong(TaskConstants.INSTANCE_MODIFY_TIME, time); + } + + public Long getModifyTime() { + return getLong(TaskConstants.INSTANCE_MODIFY_TIME, 0); + } + + public void setInstanceId(String instanceId) { + set(TaskConstants.INSTANCE_ID, instanceId); + } + + public void setDataTime(String dataTime) { + set(TaskConstants.JOB_DATA_TIME, dataTime); + } + + public String getDataTime() { + return get(TaskConstants.JOB_DATA_TIME); + } + + @Override + public int compareTo(InstanceProfile object) { + int ret = ComparisonChain.start() + .compare(getDataTime(), object.getDataTime()) + .compare(FileUtils.getFileCreationTime(getInstanceId()), + FileUtils.getFileCreationTime(object.getInstanceId())) + .compare(FileUtils.getFileLastModifyTime(getInstanceId()), + FileUtils.getFileLastModifyTime(object.getInstanceId())) + .result(); + return ret; + } + + public boolean isRetry() { + return getBoolean(TASK_RETRY, false); + } +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java new file mode 100644 index 00000000000..1040afa88a2 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.conf; + +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.pojo.TaskProfileDto; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.enums.InstanceStateEnum; +import org.apache.inlong.common.enums.TaskStateEnum; +import org.apache.inlong.common.pojo.agent.DataConfig; + +import com.google.gson.Gson; + +import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE; + +/** + * job profile which contains details describing properties of one job. + */ +public class TaskProfile extends AbstractConfiguration { + + private static final Gson GSON = new Gson(); + + /** + * Get a TaskProfile from a DataConfig + */ + public static TaskProfile convertToTaskProfile(DataConfig dataConfig) { + if (dataConfig == null) { + return null; + } + return TaskProfileDto.convertToTaskProfile(dataConfig); + } + + public String getTaskId() { + return get(TaskConstants.TASK_ID); + } + + public String getCycleUnit() { + return get(TaskConstants.TASK_CYCLE_UNIT); + } + + public String getTimeOffset() { + return get(TaskConstants.TASK_FILE_TIME_OFFSET); + } + + public TaskStateEnum getState() { + return TaskStateEnum.getTaskState(getInt(TASK_STATE)); + } + + public void setState(TaskStateEnum state) { + setInt(TASK_STATE, state.ordinal()); + } + + public boolean isRetry() { + return getBoolean(TASK_RETRY, false); + } + + public String getTaskClass() { + return get(TaskConstants.TASK_CLASS); + } + + public void setTaskClass(String className) { + set(TaskConstants.TASK_CLASS, className); + } + + /** + * parse json string to configuration instance. + * + * @return job configuration + */ + public static TaskProfile parseJsonStr(String jsonStr) { + TaskProfile conf = new TaskProfile(); + conf.loadJsonStrResource(jsonStr); + return conf; + } + + /** + * check whether required keys exists. + * + * @return return true if all required keys exists else false. + */ + @Override + public boolean allRequiredKeyExist() { + return hasKey(TaskConstants.TASK_ID) && hasKey(TaskConstants.TASK_SOURCE) + && hasKey(TaskConstants.TASK_SINK) && hasKey(TaskConstants.TASK_CHANNEL) + && hasKey(TaskConstants.TASK_GROUP_ID) && hasKey(TaskConstants.TASK_STREAM_ID) + && hasKey(TaskConstants.TASK_CYCLE_UNIT); + } + + public String toJsonStr() { + return GSON.toJson(getConfigStorage()); + } + + public InstanceProfile createInstanceProfile(String instanceClass, String fileName, String dataTime) { + InstanceProfile instanceProfile = InstanceProfile.parseJsonStr(toJsonStr()); + instanceProfile.setInstanceClass(instanceClass); + instanceProfile.setInstanceId(fileName); + instanceProfile.setDataTime(dataTime); + instanceProfile.setCreateTime(AgentUtils.getCurrentTime()); + instanceProfile.setModifyTime(AgentUtils.getCurrentTime()); + instanceProfile.setState(InstanceStateEnum.DEFAULT); + return instanceProfile; + } +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java new file mode 100644 index 00000000000..7942e74befb --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.pojo; + +import lombok.Data; + +import java.util.List; +import java.util.Map; + +@Data +public class FileTask { + + private Dir dir; + private Thread thread; + private Integer id; + private String cycleUnit; + private Boolean retry; + private Long startTime; + private Long endTime; + private String timeOffset; + private String addictiveString; + private String collectType; + private Line line; + private Integer maxFileCount; + + // INCREMENT + // FULL + private String contentCollectType; + + private String envList; + + // JSON string, the content format is List> + private String metaFields; + + private String dataSeparator; + + // JSON string, the content format is Map + private String filterMetaByLabels; + + // JSON string, the content format is Map + private String properties; + + // Monitor interval for file + private Long monitorInterval; + + // Monitor switch, 1 true and 0 false + private Integer monitorStatus; + + // Monitor expire time and the time in milliseconds + private Long monitorExpire; + + @Data + public static class Dir { + + private String patterns; + + private String blackList; + } + + @Data + public static class Running { + + private String core; + } + + @Data + public static class Thread { + + private Running running; + } + + @Data + public static class Line { + + private String endPattern; + } + + @Data + public static class FileTaskConfig { + + private String cycleUnit; + + private Boolean retry; + + private Long startTime; + + private Long endTime; + + private String pattern; + + private String blackList; + // '1m' means one minute after, '-1m' means one minute before + // '1h' means one hour after, '-1h' means one hour before + // '1d' means one day after, '-1d' means one day before + // Null means from current timestamp + private String timeOffset; + // For example: a=b&c=b&e=f + private String additionalAttr; + + private String collectType; + + private String lineEndPattern; + + // Type of file content, for example: FULL, INCREMENT + private String contentCollectType; + + // File needs to collect environment information, for example: kubernetes + private String envList; + // Metadata of data, for example: + // [{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and so on + private List> metaFields; + // Type of data result for column separator + // CSV format, set this parameter to a custom separator: , | : + // Json format, set this parameter to json + private String dataContentStyle; + + // Column separator of data source + private String dataSeparator; + + // Metadata filters by label, special parameters for K8S + private Map filterMetaByLabels; + + // Properties for file + private Map properties; + + // Monitor interval for file + private Long monitorInterval; + + // Monitor switch, 1 true and 0 false + private Integer monitorStatus; + + // Monitor expire time and the time in milliseconds + private Long monitorExpire; + + private Integer maxFileCount; + } + +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java new file mode 100644 index 00000000000..964ac1cf029 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.pojo; + +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; +import org.apache.inlong.agent.pojo.FileTask.Line; +import org.apache.inlong.common.constant.MQType; +import org.apache.inlong.common.enums.TaskTypeEnum; +import org.apache.inlong.common.pojo.agent.DataConfig; + +import com.google.gson.Gson; +import lombok.Data; + +import static java.util.Objects.requireNonNull; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT; +import static org.apache.inlong.agent.constant.TaskConstants.SYNC_SEND_OPEN; +import static org.apache.inlong.common.enums.DataReportTypeEnum.NORMAL_SEND_TO_DATAPROXY; + +@Data +public class TaskProfileDto { + + public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask"; + public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel"; + public static final String MANAGER_JOB = "MANAGER_JOB"; + public static final String DEFAULT_DATAPROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink"; + public static final String FILE_DATAPROXY_SINK = + "org.apache.inlong.agent.plugin.sinks.filecollect.ProxySink"; + public static final String PULSAR_SINK = "org.apache.inlong.agent.plugin.sinks.PulsarSink"; + public static final String KAFKA_SINK = "org.apache.inlong.agent.plugin.sinks.KafkaSink"; + + /** + * file source + */ + public static final String DEFAULT_SOURCE = "org.apache.inlong.agent.plugin.sources.LogFileSource"; + /** + * binlog source + */ + public static final String BINLOG_SOURCE = "org.apache.inlong.agent.plugin.sources.BinlogSource"; + /** + * kafka source + */ + public static final String KAFKA_SOURCE = "org.apache.inlong.agent.plugin.sources.KafkaSource"; + /** + * PostgreSQL source + */ + public static final String POSTGRESQL_SOURCE = "org.apache.inlong.agent.plugin.sources.PostgreSQLSource"; + /** + * mongo source + */ + public static final String MONGO_SOURCE = "org.apache.inlong.agent.plugin.sources.MongoDBSource"; + /** + * oracle source + */ + public static final String ORACLE_SOURCE = "org.apache.inlong.agent.plugin.sources.OracleSource"; + /** + * redis source + */ + public static final String REDIS_SOURCE = "org.apache.inlong.agent.plugin.sources.RedisSource"; + /** + * mqtt source + */ + public static final String MQTT_SOURCE = "org.apache.inlong.agent.plugin.sources.MqttSource"; + /** + * sqlserver source + */ + public static final String SQLSERVER_SOURCE = "org.apache.inlong.agent.plugin.sources.SQLServerSource"; + + private static final Gson GSON = new Gson(); + + private Task task; + private Proxy proxy; + + private static BinlogJob getBinlogJob(DataConfig dataConfigs) { + BinlogJob.BinlogJobTaskConfig binlogJobTaskConfig = GSON.fromJson(dataConfigs.getExtParams(), + BinlogJob.BinlogJobTaskConfig.class); + + BinlogJob binlogJob = new BinlogJob(); + binlogJob.setHostname(binlogJobTaskConfig.getHostname()); + binlogJob.setPassword(binlogJobTaskConfig.getPassword()); + binlogJob.setUser(binlogJobTaskConfig.getUser()); + binlogJob.setTableWhiteList(binlogJobTaskConfig.getTableWhiteList()); + binlogJob.setDatabaseWhiteList(binlogJobTaskConfig.getDatabaseWhiteList()); + binlogJob.setSchema(binlogJobTaskConfig.getIncludeSchema()); + binlogJob.setPort(binlogJobTaskConfig.getPort()); + binlogJob.setOffsets(dataConfigs.getSnapshot()); + binlogJob.setDdl(binlogJobTaskConfig.getMonitoredDdl()); + binlogJob.setServerTimezone(binlogJobTaskConfig.getServerTimezone()); + + BinlogJob.Offset offset = new BinlogJob.Offset(); + offset.setIntervalMs(binlogJobTaskConfig.getIntervalMs()); + offset.setFilename(binlogJobTaskConfig.getOffsetFilename()); + offset.setSpecificOffsetFile(binlogJobTaskConfig.getSpecificOffsetFile()); + offset.setSpecificOffsetPos(binlogJobTaskConfig.getSpecificOffsetPos()); + + binlogJob.setOffset(offset); + + BinlogJob.Snapshot snapshot = new BinlogJob.Snapshot(); + snapshot.setMode(binlogJobTaskConfig.getSnapshotMode()); + + binlogJob.setSnapshot(snapshot); + + BinlogJob.History history = new BinlogJob.History(); + history.setFilename(binlogJobTaskConfig.getHistoryFilename()); + + binlogJob.setHistory(history); + + return binlogJob; + } + + private static FileTask getFileJob(DataConfig dataConfig) { + FileTask fileTask = new FileTask(); + fileTask.setId(dataConfig.getTaskId()); + + FileTaskConfig taskConfig = GSON.fromJson(dataConfig.getExtParams(), + FileTaskConfig.class); + + FileTask.Dir dir = new FileTask.Dir(); + dir.setPatterns(taskConfig.getPattern()); + dir.setBlackList(taskConfig.getBlackList()); + fileTask.setDir(dir); + fileTask.setCollectType(taskConfig.getCollectType()); + fileTask.setContentCollectType(taskConfig.getContentCollectType()); + fileTask.setDataSeparator(taskConfig.getDataSeparator()); + fileTask.setMaxFileCount(taskConfig.getMaxFileCount()); + fileTask.setRetry(taskConfig.getRetry()); + fileTask.setCycleUnit(taskConfig.getCycleUnit()); + fileTask.setStartTime(taskConfig.getStartTime()); + fileTask.setEndTime(taskConfig.getEndTime()); + fileTask.setProperties(GSON.toJson(taskConfig.getProperties())); + if (taskConfig.getTimeOffset() != null) { + fileTask.setTimeOffset(taskConfig.getTimeOffset()); + } + + if (taskConfig.getAdditionalAttr() != null) { + fileTask.setAddictiveString(taskConfig.getAdditionalAttr()); + } + + if (null != taskConfig.getLineEndPattern()) { + FileTask.Line line = new Line(); + line.setEndPattern(taskConfig.getLineEndPattern()); + fileTask.setLine(line); + } + + if (null != taskConfig.getEnvList()) { + fileTask.setEnvList(taskConfig.getEnvList()); + } + + if (null != taskConfig.getMetaFields()) { + fileTask.setMetaFields(GSON.toJson(taskConfig.getMetaFields())); + } + + if (null != taskConfig.getFilterMetaByLabels()) { + fileTask.setFilterMetaByLabels(GSON.toJson(taskConfig.getFilterMetaByLabels())); + } + + if (null != taskConfig.getMonitorInterval()) { + fileTask.setMonitorInterval(taskConfig.getMonitorInterval()); + } + + if (null != taskConfig.getMonitorStatus()) { + fileTask.setMonitorStatus(taskConfig.getMonitorStatus()); + } + return fileTask; + } + + private static KafkaJob getKafkaJob(DataConfig dataConfigs) { + + KafkaJob.KafkaJobTaskConfig kafkaJobTaskConfig = GSON.fromJson(dataConfigs.getExtParams(), + KafkaJob.KafkaJobTaskConfig.class); + KafkaJob kafkaJob = new KafkaJob(); + + KafkaJob.Bootstrap bootstrap = new KafkaJob.Bootstrap(); + bootstrap.setServers(kafkaJobTaskConfig.getBootstrapServers()); + kafkaJob.setBootstrap(bootstrap); + KafkaJob.Partition partition = new KafkaJob.Partition(); + partition.setOffset(dataConfigs.getSnapshot()); + kafkaJob.setPartition(partition); + KafkaJob.Group group = new KafkaJob.Group(); + group.setId(kafkaJobTaskConfig.getGroupId()); + kafkaJob.setGroup(group); + KafkaJob.RecordSpeed recordSpeed = new KafkaJob.RecordSpeed(); + recordSpeed.setLimit(kafkaJobTaskConfig.getRecordSpeedLimit()); + kafkaJob.setRecordSpeed(recordSpeed); + KafkaJob.ByteSpeed byteSpeed = new KafkaJob.ByteSpeed(); + byteSpeed.setLimit(kafkaJobTaskConfig.getByteSpeedLimit()); + kafkaJob.setByteSpeed(byteSpeed); + kafkaJob.setAutoOffsetReset(kafkaJobTaskConfig.getAutoOffsetReset()); + + kafkaJob.setTopic(kafkaJobTaskConfig.getTopic()); + + return kafkaJob; + } + + private static PostgreSQLJob getPostgresJob(DataConfig dataConfigs) { + PostgreSQLJob.PostgreSQLJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), + PostgreSQLJob.PostgreSQLJobConfig.class); + PostgreSQLJob postgreSQLJob = new PostgreSQLJob(); + + postgreSQLJob.setUser(config.getUsername()); + postgreSQLJob.setPassword(config.getPassword()); + postgreSQLJob.setHostname(config.getHostname()); + postgreSQLJob.setPort(config.getPort()); + postgreSQLJob.setDbname(config.getDatabase()); + postgreSQLJob.setServername(config.getSchema()); + postgreSQLJob.setPluginname(config.getDecodingPluginName()); + postgreSQLJob.setTableNameList(config.getTableNameList()); + postgreSQLJob.setServerTimeZone(config.getServerTimeZone()); + postgreSQLJob.setScanStartupMode(config.getScanStartupMode()); + postgreSQLJob.setPrimaryKey(config.getPrimaryKey()); + + return postgreSQLJob; + } + + private static RedisJob getRedisJob(DataConfig dataConfig) { + RedisJob.RedisJobConfig config = GSON.fromJson(dataConfig.getExtParams(), RedisJob.RedisJobConfig.class); + RedisJob redisJob = new RedisJob(); + + redisJob.setAuthUser(config.getUsername()); + redisJob.setAuthPassword(config.getPassword()); + redisJob.setHostname(config.getHostname()); + redisJob.setPort(config.getPort()); + redisJob.setSsl(config.getSsl()); + redisJob.setReadTimeout(config.getTimeout()); + redisJob.setQueueSize(config.getQueueSize()); + redisJob.setReplId(config.getReplId()); + + return redisJob; + } + + private static MongoJob getMongoJob(DataConfig dataConfigs) { + + MongoJob.MongoJobTaskConfig config = GSON.fromJson(dataConfigs.getExtParams(), + MongoJob.MongoJobTaskConfig.class); + MongoJob mongoJob = new MongoJob(); + + mongoJob.setHosts(config.getHosts()); + mongoJob.setUser(config.getUsername()); + mongoJob.setPassword(config.getPassword()); + mongoJob.setDatabaseIncludeList(config.getDatabaseIncludeList()); + mongoJob.setDatabaseExcludeList(config.getDatabaseExcludeList()); + mongoJob.setCollectionIncludeList(config.getCollectionIncludeList()); + mongoJob.setCollectionExcludeList(config.getCollectionExcludeList()); + mongoJob.setFieldExcludeList(config.getFieldExcludeList()); + mongoJob.setConnectTimeoutInMs(config.getConnectTimeoutInMs()); + mongoJob.setQueueSize(config.getQueueSize()); + mongoJob.setCursorMaxAwaitTimeInMs(config.getCursorMaxAwaitTimeInMs()); + mongoJob.setSocketTimeoutInMs(config.getSocketTimeoutInMs()); + mongoJob.setSelectionTimeoutInMs(config.getSelectionTimeoutInMs()); + mongoJob.setFieldRenames(config.getFieldRenames()); + mongoJob.setMembersAutoDiscover(config.getMembersAutoDiscover()); + mongoJob.setConnectMaxAttempts(config.getConnectMaxAttempts()); + mongoJob.setConnectBackoffMaxDelayInMs(config.getConnectBackoffMaxDelayInMs()); + mongoJob.setConnectBackoffInitialDelayInMs(config.getConnectBackoffInitialDelayInMs()); + mongoJob.setInitialSyncMaxThreads(config.getInitialSyncMaxThreads()); + mongoJob.setSslInvalidHostnameAllowed(config.getSslInvalidHostnameAllowed()); + mongoJob.setSslEnabled(config.getSslEnabled()); + mongoJob.setPollIntervalInMs(config.getPollIntervalInMs()); + + MongoJob.Offset offset = new MongoJob.Offset(); + offset.setFilename(config.getOffsetFilename()); + offset.setSpecificOffsetFile(config.getSpecificOffsetFile()); + offset.setSpecificOffsetPos(config.getSpecificOffsetPos()); + mongoJob.setOffset(offset); + + MongoJob.Snapshot snapshot = new MongoJob.Snapshot(); + snapshot.setMode(config.getSnapshotMode()); + mongoJob.setSnapshot(snapshot); + + MongoJob.History history = new MongoJob.History(); + history.setFilename(config.getHistoryFilename()); + mongoJob.setHistory(history); + + return mongoJob; + } + + private static OracleJob getOracleJob(DataConfig dataConfigs) { + OracleJob.OracleJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), + OracleJob.OracleJobConfig.class); + OracleJob oracleJob = new OracleJob(); + oracleJob.setUser(config.getUser()); + oracleJob.setHostname(config.getHostname()); + oracleJob.setPassword(config.getPassword()); + oracleJob.setPort(config.getPort()); + oracleJob.setServerName(config.getServerName()); + oracleJob.setDbname(config.getDbname()); + + OracleJob.Offset offset = new OracleJob.Offset(); + offset.setFilename(config.getOffsetFilename()); + offset.setSpecificOffsetFile(config.getSpecificOffsetFile()); + offset.setSpecificOffsetPos(config.getSpecificOffsetPos()); + oracleJob.setOffset(offset); + + OracleJob.Snapshot snapshot = new OracleJob.Snapshot(); + snapshot.setMode(config.getSnapshotMode()); + oracleJob.setSnapshot(snapshot); + + OracleJob.History history = new OracleJob.History(); + history.setFilename(config.getHistoryFilename()); + oracleJob.setHistory(history); + + return oracleJob; + } + + private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) { + SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), + SqlServerJob.SqlserverJobConfig.class); + SqlServerJob sqlServerJob = new SqlServerJob(); + sqlServerJob.setUser(config.getUsername()); + sqlServerJob.setHostname(config.getHostname()); + sqlServerJob.setPassword(config.getPassword()); + sqlServerJob.setPort(config.getPort()); + sqlServerJob.setServerName(config.getSchemaName()); + sqlServerJob.setDbname(config.getDatabase()); + + SqlServerJob.Offset offset = new SqlServerJob.Offset(); + offset.setFilename(config.getOffsetFilename()); + offset.setSpecificOffsetFile(config.getSpecificOffsetFile()); + offset.setSpecificOffsetPos(config.getSpecificOffsetPos()); + sqlServerJob.setOffset(offset); + + SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot(); + snapshot.setMode(config.getSnapshotMode()); + sqlServerJob.setSnapshot(snapshot); + + SqlServerJob.History history = new SqlServerJob.History(); + history.setFilename(config.getHistoryFilename()); + sqlServerJob.setHistory(history); + + return sqlServerJob; + } + + public static MqttJob getMqttJob(DataConfig dataConfigs) { + MqttJob.MqttJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), + MqttJob.MqttJobConfig.class); + MqttJob mqttJob = new MqttJob(); + + mqttJob.setServerURI(config.getServerURI()); + mqttJob.setUserName(config.getUsername()); + mqttJob.setPassword(config.getPassword()); + mqttJob.setTopic(config.getTopic()); + mqttJob.setConnectionTimeOut(config.getConnectionTimeOut()); + mqttJob.setKeepAliveInterval(config.getKeepAliveInterval()); + mqttJob.setQos(config.getQos()); + mqttJob.setCleanSession(config.getCleanSession()); + mqttJob.setClientIdPrefix(config.getClientId()); + mqttJob.setQueueSize(config.getQueueSize()); + mqttJob.setAutomaticReconnect(config.getAutomaticReconnect()); + mqttJob.setMqttVersion(config.getMqttVersion()); + + return mqttJob; + } + + private static Proxy getProxy(DataConfig dataConfigs) { + Proxy proxy = new Proxy(); + Manager manager = new Manager(); + AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); + manager.setHost(agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST)); + manager.setPort(agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT)); + proxy.setInlongGroupId(dataConfigs.getInlongGroupId()); + proxy.setInlongStreamId(dataConfigs.getInlongStreamId()); + proxy.setManager(manager); + if (null != dataConfigs.getSyncSend()) { + proxy.setSync(dataConfigs.getSyncSend() == SYNC_SEND_OPEN); + } + if (null != dataConfigs.getSyncPartitionKey()) { + proxy.setPartitionKey(dataConfigs.getSyncPartitionKey()); + } + return proxy; + } + + /** + * convert DataConfig to TaskProfile + */ + public static TaskProfile convertToTaskProfile(DataConfig dataConfig) { + if (!dataConfig.isValid()) { + throw new IllegalArgumentException("input dataConfig" + dataConfig + "is invalid please check"); + } + + TaskProfileDto profileDto = new TaskProfileDto(); + Proxy proxy = getProxy(dataConfig); + profileDto.setProxy(proxy); + Task task = new Task(); + + // common attribute + task.setId(String.valueOf(dataConfig.getTaskId())); + task.setGroupId(dataConfig.getInlongGroupId()); + task.setStreamId(dataConfig.getInlongStreamId()); + task.setChannel(DEFAULT_CHANNEL); + task.setIp(dataConfig.getIp()); + task.setOp(dataConfig.getOp()); + task.setDeliveryTime(dataConfig.getDeliveryTime()); + task.setUuid(dataConfig.getUuid()); + task.setVersion(dataConfig.getVersion()); + task.setState(dataConfig.getState()); + + // set sink type + if (dataConfig.getDataReportType() == NORMAL_SEND_TO_DATAPROXY.ordinal()) { + task.setSink(FILE_DATAPROXY_SINK); + task.setProxySend(false); + } else if (dataConfig.getDataReportType() == 1) { + task.setSink(FILE_DATAPROXY_SINK); + task.setProxySend(true); + } else { + String mqType = dataConfig.getMqClusters().get(0).getMqType(); + task.setMqClusters(GSON.toJson(dataConfig.getMqClusters())); + task.setTopicInfo(GSON.toJson(dataConfig.getTopicInfo())); + if (mqType.equals(MQType.PULSAR)) { + task.setSink(PULSAR_SINK); + } else if (mqType.equals(MQType.KAFKA)) { + task.setSink(KAFKA_SINK); + } else { + throw new IllegalArgumentException("input dataConfig" + dataConfig + "is invalid please check"); + } + } + TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfig.getTaskType()); + switch (requireNonNull(taskType)) { + case SQL: + case BINLOG: + BinlogJob binlogJob = getBinlogJob(dataConfig); + task.setBinlogJob(binlogJob); + task.setSource(BINLOG_SOURCE); + profileDto.setTask(task); + break; + case FILE: + task.setTaskClass(DEFAULT_FILE_TASK); + FileTask fileTask = getFileJob(dataConfig); + task.setFileTask(fileTask); + task.setSource(DEFAULT_SOURCE); + profileDto.setTask(task); + break; + case KAFKA: + KafkaJob kafkaJob = getKafkaJob(dataConfig); + task.setKafkaJob(kafkaJob); + task.setSource(KAFKA_SOURCE); + profileDto.setTask(task); + break; + case POSTGRES: + PostgreSQLJob postgreSQLJob = getPostgresJob(dataConfig); + task.setPostgreSQLJob(postgreSQLJob); + task.setSource(POSTGRESQL_SOURCE); + profileDto.setTask(task); + break; + case ORACLE: + OracleJob oracleJob = getOracleJob(dataConfig); + task.setOracleJob(oracleJob); + task.setSource(ORACLE_SOURCE); + profileDto.setTask(task); + break; + case SQLSERVER: + SqlServerJob sqlserverJob = getSqlServerJob(dataConfig); + task.setSqlserverJob(sqlserverJob); + task.setSource(SQLSERVER_SOURCE); + profileDto.setTask(task); + break; + case MONGODB: + MongoJob mongoJob = getMongoJob(dataConfig); + task.setMongoJob(mongoJob); + task.setSource(MONGO_SOURCE); + profileDto.setTask(task); + break; + case REDIS: + RedisJob redisJob = getRedisJob(dataConfig); + task.setRedisJob(redisJob); + task.setSource(REDIS_SOURCE); + profileDto.setTask(task); + break; + case MQTT: + MqttJob mqttJob = getMqttJob(dataConfig); + task.setMqttJob(mqttJob); + task.setSource(MQTT_SOURCE); + profileDto.setTask(task); + break; + case MOCK: + profileDto.setTask(task); + break; + default: + } + return TaskProfile.parseJsonStr(GSON.toJson(profileDto)); + } + + @Data + public static class Task { + + private String id; + private String groupId; + private String streamId; + private String ip; + private String source; + private String sink; + private String channel; + private String name; + private String op; + private String retryTime; + private String deliveryTime; + private String uuid; + private Integer version; + private boolean proxySend; + private String mqClusters; + private String topicInfo; + private String taskClass; + private Integer state; + + private FileTask fileTask; + private BinlogJob binlogJob; + private KafkaJob kafkaJob; + private PostgreSQLJob postgreSQLJob; + private OracleJob oracleJob; + private MongoJob mongoJob; + private RedisJob redisJob; + private MqttJob mqttJob; + private SqlServerJob sqlserverJob; + } + + @Data + public static class Manager { + + private String port; + private String host; + } + + @Data + public static class Proxy { + + private String inlongGroupId; + private String inlongStreamId; + private Manager manager; + private Boolean sync; + private String partitionKey; + } + +} \ No newline at end of file