Skip to content

Commit

Permalink
[INLONG-11286][Audit] Optimize the statistics of daily Audit data (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi authored Oct 9, 2024
1 parent e97615d commit 778cbe6
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ public class ConfigConstants {
public static final int DEFAULT_SOURCE_DB_SINK_BATCH = 1000;
public static final String KEY_CONFIG_UPDATE_INTERVAL_SECONDS = "config.update.interval.seconds";
public static final int DEFAULT_CONFIG_UPDATE_INTERVAL_SECONDS = 60;

public static final String KEY_ENABLE_MANAGE_PARTITIONS = "enable.manage.partitions";
public static final boolean DEFAULT_ENABLE_MANAGE_PARTITIONS = true;
public static final String KEY_CHECK_PARTITION_INTERVAL_HOURS = "check.partition.interval.hours";
public static final int DEFAULT_CHECK_PARTITION_INTERVAL_HOURS = 6;

Expand Down Expand Up @@ -113,4 +110,7 @@ public class ConfigConstants {
public static final int MAX_INIT_COUNT = 2;
public static final int RANDOM_BOUND = 10;

public static final String KEY_ENABLE_STAT_AUDIT_DAY = "enable.stat.audit.day";
public static final boolean DEFAULT_ENABLE_STAT_AUDIT_DAY = true;

}
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,12 @@ public class SqlConstants {
public static final String KEY_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL = "audit.data.temp.delete.partition.sql";
public static final String DEFAULT_AUDIT_DATA_TEMP_DELETE_PARTITION_SQL =
"ALTER TABLE audit_data_temp DROP PARTITION %s";

public static final String KEY_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL = "audit.data.temp.check.partition.sql";
public static final String DEFAULT_AUDIT_DATA_TEMP_CHECK_PARTITION_SQL =
"SELECT COUNT(*) AS count FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_NAME = 'audit_data_temp' and PARTITION_NAME = ?";
public static final String KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL = "audit.data.check.partition.sql";
public static final String DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL =
"SELECT COUNT(*) AS count FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_NAME = '%s' and PARTITION_NAME = '%s'";
public static final String KEY_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL = "audit.data.day.add.partition.sql";
public static final String DEFAULT_TABLE_AUDIT_DATA_DAY_ADD_PARTITION_SQL =
"ALTER TABLE audit_data_day ADD PARTITION (PARTITION %s VALUES LESS THAN (TO_DAYS('%s')))";
public static final String TABLE_AUDIT_DATA_DAY = "audit_data_day";
public static final String TABLE_AUDIT_DATA_TEMP = "audit_data_temp";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.entities;

import org.apache.inlong.audit.config.Configuration;

import lombok.Data;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL;
import static org.apache.inlong.audit.config.SqlConstants.KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL;

@Data
public class PartitionEntity {

private final String tableName;
private final String addPartitionStatement;
private final String deletePartitionStatement;
private final DateTimeFormatter FORMATTER_YYMMDDHH = DateTimeFormatter.ofPattern("yyyyMMdd");
private final DateTimeFormatter FORMATTER_YY_MM_DD_HH = DateTimeFormatter.ofPattern("yyyy-MM-dd");

private String formatPartitionName(LocalDate date) {
return "p" + date.format(FORMATTER_YYMMDDHH);
}

public PartitionEntity(String tableName, String addPartitionStatement, String deletePartitionStatement) {
this.tableName = tableName;
this.addPartitionStatement = addPartitionStatement;
this.deletePartitionStatement = deletePartitionStatement;
}

public String getAddPartitionSql(long daysToAdd) {
String partitionValue = LocalDate.now().plusDays(daysToAdd + 1).format(FORMATTER_YY_MM_DD_HH);
return String.format(addPartitionStatement, getAddPartitionName(daysToAdd), partitionValue);
}

public String getDeletePartitionSql(long daysToDelete) {
return String.format(deletePartitionStatement, getDeletePartitionName(daysToDelete));
}

public String getCheckPartitionSql(long partitionDay, boolean isDelete) {
String partitionName = isDelete ? getDeletePartitionName(partitionDay) : getAddPartitionName(partitionDay);
return String.format(Configuration.getInstance().get(KEY_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL,
DEFAULT_TABLE_AUDIT_DATA_CHECK_PARTITION_SQL), tableName, partitionName);
}

public String getAddPartitionName(long daysToAdd) {
return formatPartitionName(LocalDate.now().plusDays(daysToAdd));
}

public String getDeletePartitionName(long daysToDelete) {
return formatPartitionName(LocalDate.now().minusDays(daysToDelete));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class SourceConfig {
private int statBackTimes;
private final String driverClassName;
private final String jdbcUrl;
private final String username;
private final String userName;
private final String password;
private boolean needJoin = false;

Expand All @@ -48,7 +48,7 @@ public SourceConfig(AuditCycle auditCycle,
this.statBackTimes = statBackTimes;
this.driverClassName = driverClassName;
this.jdbcUrl = jdbcUrl;
this.username = username;
this.userName = username;
this.password = password;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.inlong.audit.service.ApiService;
import org.apache.inlong.audit.service.ConfigService;
import org.apache.inlong.audit.service.EtlService;
import org.apache.inlong.audit.service.PartitionManager;
import org.apache.inlong.audit.utils.JdbcUtils;
import org.apache.inlong.common.util.NetworkUtils;

Expand All @@ -51,6 +52,8 @@ public static void main(String[] args) {
// Periodically obtain audit id and audit course configuration from DB
ConfigService.getInstance().start();

PartitionManager.getInstance().start();

// Etl service aggregate the data from the data source and store the aggregated data to the target storage
etlService.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,27 @@
import org.apache.inlong.audit.entities.JdbcConfig;
import org.apache.inlong.audit.entities.SinkConfig;
import org.apache.inlong.audit.entities.SourceConfig;
import org.apache.inlong.audit.entities.StatData;
import org.apache.inlong.audit.sink.AuditSink;
import org.apache.inlong.audit.sink.CacheSink;
import org.apache.inlong.audit.sink.JdbcSink;
import org.apache.inlong.audit.source.JdbcSource;
import org.apache.inlong.audit.utils.JdbcUtils;

import com.github.benmanes.caffeine.cache.Cache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;

import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_DATA_QUEUE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_ENABLE_STAT_AUDIT_DAY;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SELECTOR_SERVICE_ID;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATA_QUEUE_SIZE;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_ENABLE_STAT_AUDIT_DAY;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_SELECTOR_SERVICE_ID;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_DAILY_STAT_BACK_TIMES;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_SUMMARY_REALTIME_STAT_BACK_TIMES;
Expand All @@ -60,113 +65,70 @@
public class EtlService {

private static final Logger LOGGER = LoggerFactory.getLogger(EtlService.class);
private JdbcSource mysqlSourceOfTemp;
private JdbcSource mysqlSourceOfTenMinutesCache;
private JdbcSource mysqlSourceOfHalfHourCache;
private JdbcSource mysqlSourceOfHourCache;
private JdbcSink mysqlSinkOfDay;
private final List<JdbcSource> auditJdbcSources = new LinkedList<>();
private JdbcSink mysqlSinkOfTemp;
private CacheSink cacheSinkOfTenMinutesCache;
private CacheSink cacheSinkOfHalfHourCache;
private CacheSink cacheSinkOfHourCache;

// Statistics of original audit data
private final List<JdbcSource> originalSources = new LinkedList<>();
private final int queueSize;
private final int statBackTimes;
private final String serviceId;
private final Configuration configuration;

private final List<JdbcSource> dataFlowSources = new LinkedList<>();
private final List<AuditSink> dataFlowSinks = new LinkedList<>();

public EtlService() {
queueSize = Configuration.getInstance().get(KEY_DATA_QUEUE_SIZE,
configuration = Configuration.getInstance();
queueSize = configuration.get(KEY_DATA_QUEUE_SIZE,
DEFAULT_DATA_QUEUE_SIZE);
statBackTimes = Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES);
serviceId = Configuration.getInstance().get(KEY_SELECTOR_SERVICE_ID, DEFAULT_SELECTOR_SERVICE_ID);
serviceId = configuration.get(KEY_SELECTOR_SERVICE_ID, DEFAULT_SELECTOR_SERVICE_ID);
}

/**
* Start the etl service.
*/
public void start() {
mysqlToMysqlOfDay();
mysqlToTenMinutesCache();
mysqlToHalfHourCache();
mysqlToHourCache();
}

/**
* Aggregate data from mysql data source and store the aggregated data in the target mysql table.
* The audit data cycle is days,and stored in table of day.
*/
private void mysqlToMysqlOfDay() {
DataQueue dataQueue = new DataQueue(queueSize);

mysqlSourceOfTemp = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.DAY,
Configuration.getInstance().get(KEY_SUMMARY_DAILY_STAT_BACK_TIMES,
DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES)));
mysqlSourceOfTemp.start();

SinkConfig sinkConfig = buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_DAY_SQL,
DEFAULT_MYSQL_SINK_INSERT_DAY_SQL));
mysqlSinkOfDay = new JdbcSink(dataQueue, sinkConfig);
mysqlSinkOfDay.start();
}

/**
* Aggregate data from mysql data source and store in local cache for openapi.
*/
private void mysqlToTenMinutesCache() {
DataQueue dataQueue = new DataQueue(queueSize);
mysqlSourceOfTenMinutesCache =
new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.MINUTE_10, statBackTimes));
mysqlSourceOfTenMinutesCache.start();

cacheSinkOfTenMinutesCache = new CacheSink(dataQueue, TenMinutesCache.getInstance().getCache());
cacheSinkOfTenMinutesCache.start();
}
int statBackTimes = configuration.get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES);

/**
* Aggregate data from mysql data source and store in local cache for openapi.
*/
private void mysqlToHalfHourCache() {
DataQueue dataQueue = new DataQueue(queueSize);
mysqlSourceOfHalfHourCache =
new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.MINUTE_30, statBackTimes));
mysqlSourceOfHalfHourCache.start();
startDataFlow(AuditCycle.MINUTE_10, statBackTimes, TenMinutesCache.getInstance().getCache());
startDataFlow(AuditCycle.MINUTE_30, statBackTimes, HalfHourCache.getInstance().getCache());
startDataFlow(AuditCycle.HOUR, statBackTimes, HourCache.getInstance().getCache());

cacheSinkOfHalfHourCache = new CacheSink(dataQueue, HalfHourCache.getInstance().getCache());
cacheSinkOfHalfHourCache.start();
if (configuration.get(KEY_ENABLE_STAT_AUDIT_DAY, DEFAULT_ENABLE_STAT_AUDIT_DAY)) {
statBackTimes = configuration.get(KEY_SUMMARY_DAILY_STAT_BACK_TIMES, DEFAULT_SUMMARY_DAILY_STAT_BACK_TIMES);
startDataFlow(AuditCycle.DAY, statBackTimes, null);
}
}

/**
* Aggregate data from mysql data source and store in local cache for openapi.
*/
private void mysqlToHourCache() {
private void startDataFlow(AuditCycle cycle, int backTimes, Cache<String, StatData> cache) {
DataQueue dataQueue = new DataQueue(queueSize);
mysqlSourceOfHourCache = new JdbcSource(dataQueue, buildMysqlSourceConfig(AuditCycle.HOUR, statBackTimes));
mysqlSourceOfHourCache.start();

cacheSinkOfHourCache = new CacheSink(dataQueue, HourCache.getInstance().getCache());
cacheSinkOfHourCache.start();
JdbcSource source = new JdbcSource(dataQueue, buildMysqlSourceConfig(cycle, backTimes));
source.start();
dataFlowSources.add(source);

AuditSink sink;
if (cache != null) {
sink = new CacheSink(dataQueue, cache);
} else {
SinkConfig sinkConfig = buildMysqlSinkConfig(configuration.get(KEY_MYSQL_SINK_INSERT_DAY_SQL,
DEFAULT_MYSQL_SINK_INSERT_DAY_SQL));
sink = new JdbcSink(dataQueue, sinkConfig);
}
sink.start();
dataFlowSinks.add(sink);
}

/**
* Aggregate data from clickhouse data source and store the aggregated data in the target mysql table.
* The default audit data cycle is 5 minutes,and stored in a temporary table.
* Support multiple audit source clusters.
*/
public void auditSourceToMysql() {
DataQueue dataQueue = new DataQueue(queueSize);
List<JdbcConfig> sourceList = ConfigService.getInstance().getAuditSourceByServiceId(serviceId);
for (JdbcConfig jdbcConfig : sourceList) {
JdbcSource jdbcSource = new JdbcSource(dataQueue, buildAuditJdbcSourceConfig(jdbcConfig));
jdbcSource.start();
auditJdbcSources.add(jdbcSource);
originalSources.add(jdbcSource);
LOGGER.info("Audit source to mysql jdbc config:{}", jdbcConfig);
}

SinkConfig sinkConfig = buildMysqlSinkConfig(Configuration.getInstance().get(KEY_MYSQL_SINK_INSERT_TEMP_SQL,
SinkConfig sinkConfig = buildMysqlSinkConfig(configuration.get(KEY_MYSQL_SINK_INSERT_TEMP_SQL,
DEFAULT_MYSQL_SINK_INSERT_TEMP_SQL));
mysqlSinkOfTemp = new JdbcSink(dataQueue, sinkConfig);
mysqlSinkOfTemp.start();
JdbcSink sink = new JdbcSink(dataQueue, sinkConfig);
sink.start();
dataFlowSinks.add(sink);
}

/**
Expand All @@ -193,7 +155,7 @@ private SinkConfig buildMysqlSinkConfig(String insertSql) {
private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int statBackTimes) {
JdbcConfig jdbcConfig = JdbcUtils.buildMysqlConfig();
return new SourceConfig(auditCycle,
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL,
configuration.get(KEY_MYSQL_SOURCE_QUERY_TEMP_SQL,
DEFAULT_MYSQL_SOURCE_QUERY_TEMP_SQL),
statBackTimes,
jdbcConfig.getDriverClass(),
Expand All @@ -209,9 +171,9 @@ private SourceConfig buildMysqlSourceConfig(AuditCycle auditCycle, int statBackT
*/
private SourceConfig buildAuditJdbcSourceConfig(JdbcConfig jdbcConfig) {
return new SourceConfig(AuditCycle.MINUTE_5,
Configuration.getInstance().get(KEY_SOURCE_STAT_SQL,
configuration.get(KEY_SOURCE_STAT_SQL,
DEFAULT_SOURCE_STAT_SQL),
Configuration.getInstance().get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
configuration.get(KEY_SUMMARY_REALTIME_STAT_BACK_TIMES,
DEFAULT_SUMMARY_REALTIME_STAT_BACK_TIMES),
jdbcConfig.getDriverClass(),
jdbcConfig.getJdbcUrl(),
Expand All @@ -224,21 +186,14 @@ private SourceConfig buildAuditJdbcSourceConfig(JdbcConfig jdbcConfig) {
* Stop the etl service,and destroy related resources.
*/
public void stop() {
mysqlSourceOfTemp.destroy();
mysqlSinkOfDay.destroy();

for (JdbcSource source : auditJdbcSources) {
for (JdbcSource source : originalSources) {
source.destroy();
}
if (null != mysqlSinkOfTemp)
mysqlSinkOfTemp.destroy();

mysqlSourceOfTenMinutesCache.destroy();
mysqlSourceOfHalfHourCache.destroy();
mysqlSourceOfHourCache.destroy();

cacheSinkOfTenMinutesCache.destroy();
cacheSinkOfHalfHourCache.destroy();
cacheSinkOfHourCache.destroy();
for (JdbcSource source : dataFlowSources) {
source.destroy();
}
for (AuditSink sink : dataFlowSinks) {
sink.destroy();
}
}
}
Loading

0 comments on commit 778cbe6

Please sign in to comment.