Skip to content

Commit

Permalink
[INLONG-10594][Sort] Provide default kafka producer configuration (ap…
Browse files Browse the repository at this point in the history
…ache#10595)

* [INLONG-10594][Sort] Provide default kafka producer configuration
---------

Co-authored-by: vernedeng <[email protected]>
  • Loading branch information
vernedeng and vernedeng authored Jul 10, 2024
1 parent a8db0d2 commit 00035ee
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,52 +17,19 @@

package org.apache.inlong.sort.standalone.config.pojo;

import lombok.Data;

import java.util.HashMap;
import java.util.Map;

/**
*
* CacheClusterConfig
*/
@Data
public class CacheClusterConfig {

private String clusterName;
private Map<String, String> params = new HashMap<>();

/**
* get clusterName
*
* @return the clusterName
*/
public String getClusterName() {
return clusterName;
}

/**
* set clusterName
*
* @param clusterName the clusterName to set
*/
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}

/**
* get params
*
* @return the params
*/
public Map<String, String> getParams() {
return params;
}

/**
* set params
*
* @param params the params to set
*/
public void setParams(Map<String, String> params) {
this.params = params;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class KafkaFederationSinkContext extends SinkContext {
public static final String KEY_EVENT_HANDLER = "eventHandler";

private KafkaNodeConfig kafkaNodeConfig;
private CacheClusterConfig cacheClusterConfig;
private Map<String, KafkaIdConfig> idConfigMap = new ConcurrentHashMap<>();

public KafkaFederationSinkContext(String sinkName, Context context, Channel channel) {
Expand Down Expand Up @@ -82,6 +84,11 @@ public void reload() {
this.taskConfig = newTaskConfig;
this.sortTaskConfig = newSortTaskConfig;

CacheClusterConfig clusterConfig = new CacheClusterConfig();
clusterConfig.setClusterName(this.taskName);
clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
this.cacheClusterConfig = clusterConfig;

Map<String, KafkaIdConfig> fromTaskConfig = fromTaskConfig(taskConfig);
Map<String, KafkaIdConfig> fromSortTaskConfig = fromSortTaskConfig(sortTaskConfig);
SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, fromTaskConfig, fromSortTaskConfig);
Expand Down Expand Up @@ -121,6 +128,10 @@ public KafkaNodeConfig getNodeConfig() {
return kafkaNodeConfig;
}

public CacheClusterConfig getCacheClusterConfig() {
return cacheClusterConfig;
}

/**
* get Topic by uid
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
Expand All @@ -36,6 +36,7 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;

/** wrapper of kafka producer */
Expand All @@ -45,54 +46,113 @@ public class KafkaProducerCluster implements LifecycleAware {

private final String workerName;
protected final KafkaNodeConfig nodeConfig;
protected final CacheClusterConfig cacheClusterConfig;
private final KafkaFederationSinkContext sinkContext;
private final Context context;

private final String cacheClusterName;
private LifecycleState state;
private IEvent2KafkaRecordHandler handler;

private KafkaProducer<String, byte[]> producer;

public KafkaProducerCluster(
String workerName,
CacheClusterConfig cacheClusterConfig,
KafkaNodeConfig nodeConfig,
KafkaFederationSinkContext kafkaFederationSinkContext) {
this.workerName = Preconditions.checkNotNull(workerName);
this.nodeConfig = nodeConfig;
this.cacheClusterConfig = cacheClusterConfig;
this.sinkContext = Preconditions.checkNotNull(kafkaFederationSinkContext);
this.context = new Context(nodeConfig.getProperties() != null ? nodeConfig.getProperties() : Maps.newHashMap());
this.state = LifecycleState.IDLE;
this.cacheClusterName = nodeConfig.getNodeName();
this.handler = sinkContext.createEventHandler();
}

/** start and init kafka producer */
@Override
public void start() {
if (CommonPropertiesHolder.useUnifiedConfiguration()) {
startByNodeConfig();
} else {
startByCacheCluster();
}
}

private void startByCacheCluster() {
this.state = LifecycleState.START;
if (cacheClusterConfig == null) {
LOG.error("start kafka producer cluster failed, cacheClusterConfig config is null");
return;
}
try {
Properties props = new Properties();
props.putAll(context.getParameters());
props.put(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
context.getString(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName()));
props.put(
ProducerConfig.ACKS_CONFIG,
context.getString(ProducerConfig.ACKS_CONFIG, "all"));
props.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
nodeConfig.getBootstrapServers());
Properties props = defaultKafkaProperties();
props.putAll(cacheClusterConfig.getParams());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName());
props.put(ProducerConfig.ACKS_CONFIG,
cacheClusterConfig.getParams().getOrDefault(ProducerConfig.ACKS_CONFIG, "all"));

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
cacheClusterConfig.getParams().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

props.put(ProducerConfig.CLIENT_ID_CONFIG,
nodeConfig.getClientId() + "-" + workerName);
LOG.info("init kafka client info: " + props);
cacheClusterConfig.getParams().get(ProducerConfig.CLIENT_ID_CONFIG) + "-" + workerName);
LOG.info("init kafka client by cache cluster info: " + props);
producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
Preconditions.checkNotNull(producer);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}

private void startByNodeConfig() {
this.state = LifecycleState.START;
if (nodeConfig == null) {
LOG.error("start kafka producer cluster failed, node config is null");
return;
}
try {
Properties props = defaultKafkaProperties();
props.putAll(nodeConfig.getProperties() == null ? new HashMap<>() : nodeConfig.getProperties());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, nodeConfig.getAcks());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, nodeConfig.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, nodeConfig.getClientId() + "-" + workerName);
LOG.info("init kafka client by node config info: " + props);
producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
Preconditions.checkNotNull(producer);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}

public Properties defaultKafkaProperties() {
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "122880");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "44740000");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "86400000");
props.put(ProducerConfig.LINGER_MS_CONFIG, "500");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "8388608");
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
props.put(ProducerConfig.RETRIES_CONFIG, "100000");
props.put(ProducerConfig.SEND_BUFFER_CONFIG, "524288");
props.put("mute.partition.error.max.times", "20");
props.put("mute.partition.max.percentage", "20");
props.put("rpc.timeout.ms", "30000");
props.put("topic.expiry.ms", "86400000");
props.put("unmute.partition.interval.ms", "600000");
props.put("metadata.retry.backoff.ms", "500");
props.put("metadata.fetch.timeout.ms", "1000");
props.put("maxThreads", "2");
props.put("enable.replace.partition.for.can.retry", "true");
props.put("enable.replace.partition.for.not.leader", "true");
props.put("enable.topic.partition.circuit.breaker", "true");
return props;
}

/** stop and close kafka producer */
@Override
public void stop() {
Expand Down Expand Up @@ -159,12 +219,4 @@ public boolean send(ProfileEvent profileEvent, Transaction tx) throws IOExceptio
}
}

/**
* get cache cluster name
*
* @return cacheClusterName
*/
public String getCacheClusterName() {
return cacheClusterName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;

import com.google.common.base.Preconditions;
Expand All @@ -45,6 +47,7 @@ public class KafkaProducerFederation implements Runnable {
private KafkaNodeConfig nodeConfig;
private KafkaProducerCluster cluster;
private KafkaProducerCluster deleteCluster;
private CacheClusterConfig cacheClusterConfig;

public KafkaProducerFederation(String workerName, KafkaFederationSinkContext context) {
this.workerName = Preconditions.checkNotNull(workerName);
Expand Down Expand Up @@ -86,13 +89,39 @@ private void reload() {
LOG.error("failed to close delete cluster, ex={}", e.getMessage(), e);
}

if (CommonPropertiesHolder.useUnifiedConfiguration()) {
reloadByNodeConfig();
} else {
reloadByCacheClusterConfig();
}

}

private void reloadByCacheClusterConfig() {
try {
if (cacheClusterConfig != null && !cacheClusterConfig.equals(context.getCacheClusterConfig())) {
return;
}
this.cacheClusterConfig = context.getCacheClusterConfig();
KafkaProducerCluster updateCluster =
new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig, context);
updateCluster.start();
this.deleteCluster = cluster;
this.cluster = updateCluster;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}

}

private void reloadByNodeConfig() {
try {
if (nodeConfig != null && context.getNodeConfig().getVersion() <= nodeConfig.getVersion()) {
return;
}
this.nodeConfig = context.getNodeConfig();
KafkaProducerCluster updateCluster = new KafkaProducerCluster(workerName, nodeConfig, context);
KafkaProducerCluster updateCluster =
new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig, context);
updateCluster.start();
this.deleteCluster = cluster;
this.cluster = updateCluster;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
Expand All @@ -50,6 +51,7 @@ public class PulsarFederationSinkContext extends SinkContext {
public static final String KEY_EVENT_HANDLER = "eventHandler";
private Map<String, PulsarIdConfig> idConfigMap = new ConcurrentHashMap<>();
private PulsarNodeConfig pulsarNodeConfig;
private CacheClusterConfig cacheClusterConfig;

public PulsarFederationSinkContext(String sinkName, Context context, Channel channel) {
super(sinkName, context, channel);
Expand All @@ -73,6 +75,12 @@ public void reload() {
if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > pulsarNodeConfig.getVersion()) {
this.pulsarNodeConfig = requestNodeConfig;
}

CacheClusterConfig clusterConfig = new CacheClusterConfig();
clusterConfig.setClusterName(this.taskName);
clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
this.cacheClusterConfig = clusterConfig;

this.taskConfig = newTaskConfig;
this.sortTaskConfig = newSortTaskConfig;

Expand Down

0 comments on commit 00035ee

Please sign in to comment.