
Add Kafka producer configuration #23


Open: wants to merge 8 commits into main
28 changes: 27 additions & 1 deletion pom.xml
@@ -11,6 +11,12 @@
<properties>
<log4j2.version>2.24.1</log4j2.version>
<project.scm.id>github</project.scm.id>
<kafka.version>3.3.2</kafka.version>
<airlift.version>214</airlift.version>
<aws-msk-iam-auth.version>1.1.5</aws-msk-iam-auth.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.release>11</maven.compiler.release>
<jackson.version>2.18.0</jackson.version>
</properties>
<dependencies>
<dependency>
@@ -48,6 +54,26 @@
<artifactId>commons-codec</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
<version>${airlift.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>${aws-msk-iam-auth.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>

<distributionManagement>
@@ -95,4 +121,4 @@
</plugins>
</build>

</project>
29 changes: 29 additions & 0 deletions src/main/assembly/plugin.conf
@@ -56,3 +56,32 @@ enable_compute_all_query_digest=false

# Filter conditions when importing audit information
filter=


### kafka configuration

# Set to true if you want to enable the Kafka producer
kafka_enableMSK=false

# Name of the current StarRocks instance/cluster
kafka_instanceName=

# Name of the Kafka topic that data will be written to.
kafka_topic=

# Comma-separated list of bootstrap servers for the Kafka cluster.
kafka_bootstrapServer=

# Properties for the Kafka producer
kafka_conf_keySerializer=org.apache.kafka.common.serialization.StringSerializer
kafka_conf_valueSerializer=org.apache.kafka.common.serialization.StringSerializer
kafka_conf_maxRequestSize=52428800
kafka_conf_bufferMemory=36700160
kafka_conf_maxBlockMs=180000
kafka_conf_batchSize=102400
kafka_conf_compressionType=snappy
kafka_conf_lingerMs=20
kafka_conf_securityProtocol=SASL_SSL
kafka_conf_saslMechanism=AWS_MSK_IAM
kafka_conf_saslJaasConfig=software.amazon.msk.auth.iam.IAMLoginModule required;
kafka_conf_saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMClientCallbackHandler
160 changes: 160 additions & 0 deletions src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java
@@ -52,6 +52,16 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.starrocks.plugin.audit.model.QueryEventData;
import java.util.concurrent.ExecutionException;

/*
* This plugin will load audit log to specified starrocks table at specified interval
*/
@@ -70,6 +80,8 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
private volatile boolean isClosed = false;
private volatile boolean isInit = false;

public static boolean kafkaEnable = false;

/**
* Whether the new field candidateMvs is included; in older versions that lack this field, the value is empty
*/
@@ -329,6 +341,23 @@ public static class AuditLoaderConf {
public static final String PROP_SECRET_KEY = "secret_key";
public String secretKey = "";

public static String PROP_KAFKA_ENABLE = "kafka_enableMSK";
Review comment: Is normal Kafka or Confluent not supported?

Author reply: Normal Kafka is supported.
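
As an illustration of that point (not part of this change), the same plugin.conf keys could be filled in for a plain, unauthenticated Kafka cluster; the broker addresses, topic name, and the choice of PLAINTEXT below are assumptions:

kafka_enableMSK=true
kafka_instanceName=my-starrocks-cluster
kafka_topic=starrocks-audit
kafka_bootstrapServer=broker1:9092,broker2:9092
kafka_conf_securityProtocol=PLAINTEXT
# With PLAINTEXT, the SASL-related keys (kafka_conf_saslMechanism, kafka_conf_saslJaasConfig,
# kafka_conf_saslClientCallbackHandlerClass) would not apply to the connection.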

public static String PROP_KAFKA_INSTANCE_NAME = "kafka_instanceName";
public static String PROP_KAFKA_TOPIC = "kafka_topic";
public static String PROP_KAFKA_BOOTSTRAP_SERVERS = "kafka_bootstrapServer";
public static String PROP_KAFKA_CONF_KEYSERIALIZER = "kafka_conf_keySerializer";
public static String PROP_KAFKA_CONF_VALUESERIALIZER = "kafka_conf_valueSerializer";
public static String PROP_KAFKA_CONF_MAXREQUESTSIZE = "kafka_conf_maxRequestSize";
public static String PROP_KAFKA_CONF_BUFFERMEMORY = "kafka_conf_bufferMemory";
public static String PROP_KAFKA_CONF_MAXBLOCKMS = "kafka_conf_maxBlockMs";
public static String PROP_KAFKA_CONF_BATCHSIZE = "kafka_conf_batchSize";
public static String PROP_KAFKA_CONF_COMPRESSIONTYPE = "kafka_conf_compressionType";
public static String PROP_KAFKA_CONF_LINGERMS = "kafka_conf_lingerMs";
public static String PROP_KAFKA_CONF_SECURITYPROTOCOL = "kafka_conf_securityProtocol";
public static String PROP_KAFKA_CONF_SASLMECHANISM = "kafka_conf_saslMechanism";
public static String PROP_KAFKA_CONF_SASLJAASCONFIG = "kafka_conf_saslJaasConfig";
public static String PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS = "kafka_conf_saslClientCallbackHandlerClass";

public void init(Map<String, String> properties) throws PluginException {
try {
if (properties.containsKey(PROP_MAX_BATCH_SIZE)) {
@@ -373,6 +402,54 @@ public void init(Map<String, String> properties) throws PluginException {
if (properties.containsKey(STREAM_LOAD_FILTER)) {
streamLoadFilter = properties.get(STREAM_LOAD_FILTER);
}
if (properties.containsKey(PROP_KAFKA_ENABLE)) {
kafkaEnable = Boolean.parseBoolean(properties.get(PROP_KAFKA_ENABLE));
}
if (properties.containsKey(PROP_KAFKA_INSTANCE_NAME)) {
PROP_KAFKA_INSTANCE_NAME = properties.get(PROP_KAFKA_INSTANCE_NAME);
}
if (properties.containsKey(PROP_KAFKA_TOPIC)) {
PROP_KAFKA_TOPIC = properties.get(PROP_KAFKA_TOPIC);
}
if (properties.containsKey(PROP_KAFKA_BOOTSTRAP_SERVERS)) {
PROP_KAFKA_BOOTSTRAP_SERVERS = properties.get(PROP_KAFKA_BOOTSTRAP_SERVERS);
}
if (properties.containsKey(PROP_KAFKA_CONF_KEYSERIALIZER)) {
PROP_KAFKA_CONF_KEYSERIALIZER = properties.get(PROP_KAFKA_CONF_KEYSERIALIZER);
}
if (properties.containsKey(PROP_KAFKA_CONF_VALUESERIALIZER)) {
PROP_KAFKA_CONF_VALUESERIALIZER = properties.get(PROP_KAFKA_CONF_VALUESERIALIZER);
}
if (properties.containsKey(PROP_KAFKA_CONF_MAXREQUESTSIZE)) {
PROP_KAFKA_CONF_MAXREQUESTSIZE = properties.get(PROP_KAFKA_CONF_MAXREQUESTSIZE);
}
if (properties.containsKey(PROP_KAFKA_CONF_BUFFERMEMORY)) {
PROP_KAFKA_CONF_BUFFERMEMORY = properties.get(PROP_KAFKA_CONF_BUFFERMEMORY);
}
if (properties.containsKey(PROP_KAFKA_CONF_MAXBLOCKMS)) {
PROP_KAFKA_CONF_MAXBLOCKMS = properties.get(PROP_KAFKA_CONF_MAXBLOCKMS);
}
if (properties.containsKey(PROP_KAFKA_CONF_BATCHSIZE)) {
PROP_KAFKA_CONF_BATCHSIZE = properties.get(PROP_KAFKA_CONF_BATCHSIZE);
}
if (properties.containsKey(PROP_KAFKA_CONF_COMPRESSIONTYPE)) {
PROP_KAFKA_CONF_COMPRESSIONTYPE = properties.get(PROP_KAFKA_CONF_COMPRESSIONTYPE);
}
if (properties.containsKey(PROP_KAFKA_CONF_LINGERMS)) {
PROP_KAFKA_CONF_LINGERMS = properties.get(PROP_KAFKA_CONF_LINGERMS);
}
if (properties.containsKey(PROP_KAFKA_CONF_SECURITYPROTOCOL)) {
PROP_KAFKA_CONF_SECURITYPROTOCOL = properties.get(PROP_KAFKA_CONF_SECURITYPROTOCOL);
}
if (properties.containsKey(PROP_KAFKA_CONF_SASLMECHANISM)) {
PROP_KAFKA_CONF_SASLMECHANISM = properties.get(PROP_KAFKA_CONF_SASLMECHANISM);
}
if (properties.containsKey(PROP_KAFKA_CONF_SASLJAASCONFIG)) {
PROP_KAFKA_CONF_SASLJAASCONFIG = properties.get(PROP_KAFKA_CONF_SASLJAASCONFIG);
}
if (properties.containsKey(PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS)) {
PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS = properties.get(PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS);
}
} catch (Exception e) {
throw new PluginException(e.getMessage());
}
@@ -392,6 +469,9 @@ public void run() {
AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS);
if (event != null) {
assembleAudit(event);
if (kafkaEnable) {
sendToKafka(event);
Review comment: Will it send a message to Kafka when each event arrives? Will it cost the FE too many resources to process events?

Author reply: Yes, each audit event is sent to the Kafka topic as it arrives. It does not generate extra cost because we only hand the message to the Kafka producer; no additional processing is performed on it.
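
For additional context on the overhead question: producer.send() itself is asynchronous, buffering records according to batch.size and linger.ms, and the audit thread only waits on the broker when it blocks on the returned Future (as sendToKafka further down does via res.get()). A minimal sketch of the non-blocking callback form of the same producer API, for illustration only (not the code in this PR; the class and parameter names are made up):

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative sketch: hand the record to the producer and let a callback report the result,
// instead of blocking the audit thread on Future.get().
final class KafkaAuditSendSketch {
    static void sendAsync(Producer<String, String> producer, String topic, String key, String json) {
        producer.send(new ProducerRecord<>(topic, key, json), (metadata, exception) -> {
            if (exception != null) {
                // Delivery failed after the producer's own retries/timeouts.
                System.err.println("Audit event " + key + " was not written to Kafka: " + exception.getMessage());
            } else {
                System.out.println("Audit event " + key + " written to partition "
                        + metadata.partition() + " at offset " + metadata.offset());
            }
        });
    }
}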

}
}
loadIfNecessary(loader);
} catch (InterruptedException ie) {
@@ -410,4 +490,84 @@ public static synchronized String longToTimeString(long timeStamp) {
return DATETIME_FORMAT.format(new Date(timeStamp));
}

public void sendToKafka(AuditEvent event){

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", AuditLoaderConf.PROP_KAFKA_BOOTSTRAP_SERVERS);
properties.setProperty("key.serializer", AuditLoaderConf.PROP_KAFKA_CONF_KEYSERIALIZER);
properties.setProperty("value.serializer", AuditLoaderConf.PROP_KAFKA_CONF_VALUESERIALIZER);
properties.setProperty("max.request.size", AuditLoaderConf.PROP_KAFKA_CONF_MAXREQUESTSIZE);
properties.setProperty("buffer.memory", AuditLoaderConf.PROP_KAFKA_CONF_BUFFERMEMORY);
properties.setProperty("max.block.ms", AuditLoaderConf.PROP_KAFKA_CONF_MAXBLOCKMS);
properties.setProperty("batch.size", AuditLoaderConf.PROP_KAFKA_CONF_BATCHSIZE);
properties.setProperty("compression.type", AuditLoaderConf.PROP_KAFKA_CONF_COMPRESSIONTYPE);
properties.setProperty("linger.ms", AuditLoaderConf.PROP_KAFKA_CONF_LINGERMS);
properties.setProperty("security.protocol",AuditLoaderConf.PROP_KAFKA_CONF_SECURITYPROTOCOL);
properties.setProperty("sasl.mechanism",AuditLoaderConf.PROP_KAFKA_CONF_SASLMECHANISM);
properties.setProperty("sasl.jaas.config",AuditLoaderConf.PROP_KAFKA_CONF_SASLJAASCONFIG);
properties.setProperty("sasl.client.callback.handler.class",AuditLoaderConf.PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS);

String queryType = getQueryType(event);
String eventAuditId = getQueryId(queryType,event);

QueryEventData eventAuditEG = new QueryEventData();
eventAuditEG.setId(eventAuditId);
eventAuditEG.setInstanceName(AuditLoaderConf.PROP_KAFKA_INSTANCE_NAME);
eventAuditEG.setTimestamp(longToTimeString(event.timestamp));
eventAuditEG.setQueryType(queryType);
eventAuditEG.setClientIp(event.clientIp);
eventAuditEG.setUser(event.user);
eventAuditEG.setAuthorizedUser(event.authorizedUser);
eventAuditEG.setResourceGroup(event.resourceGroup);
eventAuditEG.setCatalog(event.catalog);
eventAuditEG.setDb(event.db);
eventAuditEG.setState(event.state);
eventAuditEG.setErrorCode(event.errorCode);
eventAuditEG.setQueryTime(event.queryTime);
eventAuditEG.setScanBytes(event.scanBytes);
eventAuditEG.setScanRows(event.scanRows);
eventAuditEG.setReturnRows(event.returnRows);
eventAuditEG.setCpuCostNs(event.cpuCostNs);
eventAuditEG.setMemCostBytes(event.memCostBytes);
eventAuditEG.setStmtId(event.stmtId);
eventAuditEG.setIsQuery(event.isQuery ? true : false);
eventAuditEG.setFeIp(event.feIp);
eventAuditEG.setStmt(truncateByBytes(event.stmt));
if (conf.enableComputeAllQueryDigest && (event.digest == null || StringUtils.isBlank(event.digest))) {
event.digest = computeStatementDigest(event.stmt);
LOG.debug("compute stmt digest, queryId: {} digest: {}", event.queryId, event.digest);
}
eventAuditEG.setDigest(event.digest);
eventAuditEG.setPlanCpuCosts(event.planCpuCosts);
eventAuditEG.setPlanMemCosts(event.planMemCosts);
eventAuditEG.setCandidateMvs(event.candidateMvs);
eventAuditEG.setHitMVs(event.hitMVs);

ObjectMapper mapperEventAuditEG = new ObjectMapper();
mapperEventAuditEG.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);

try {
Producer<String, String> producer = new KafkaProducer<>(properties);
Future<RecordMetadata> res = producer.send(
new ProducerRecord<>(
AuditLoaderConf.PROP_KAFKA_TOPIC,
eventAuditId,
mapperEventAuditEG.writeValueAsString(eventAuditEG)));
try {
RecordMetadata metadata = res.get();
if (metadata.hasOffset()){
LOG.info("Query created event with id: " + eventAuditId + " in partition: "+ String.valueOf(metadata.partition()) + " with offset: " + metadata.offset());
} else {
LOG.error("Query created event with id: " + eventAuditId + " doesn't have offset. It wasn't sent to the topic. ");
}
} catch (InterruptedException | ExecutionException e) {
LOG.error(String.format("Query id: "+ eventAuditId + " Not written to kafka topic - Error of interrupted execution on sendToKafka method: %s", e.getMessage()));
}
producer.close();
} catch (Exception e) {
LOG.error(String.format("Error on sending to kafka: %s", e.getMessage()));
}

}

}
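
The QueryEventData model imported above (com.starrocks.plugin.audit.model.QueryEventData) is not included in this excerpt of the diff. A hypothetical sketch of what such a Jackson-serializable POJO might look like, with field names inferred from the setters called in sendToKafka and field types assumed:

package com.starrocks.plugin.audit.model;

// Hypothetical sketch only; the actual class added by this PR is not shown in this excerpt.
public class QueryEventData {
    private String id;
    private String instanceName;
    private String timestamp;   // formatted string, see longToTimeString(...)
    private String queryType;
    private String clientIp;
    private String user;
    private String stmt;
    private long queryTime;     // numeric fields are assumed to mirror AuditEvent's types
    private boolean isQuery;
    // ... remaining fields (db, state, errorCode, scanRows, cpuCostNs, digest, candidateMvs, hitMVs, ...)
    //     would follow the same pattern.

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getInstanceName() { return instanceName; }
    public void setInstanceName(String instanceName) { this.instanceName = instanceName; }
    public boolean getIsQuery() { return isQuery; }
    public void setIsQuery(boolean isQuery) { this.isQuery = isQuery; }
    // ... one getter/setter pair per field, so ObjectMapper.writeValueAsString(...) can serialize it.
}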