diff --git a/config/config.yaml b/config/config.yaml
index 3b259c76bf..7d8d71120d 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -128,7 +128,7 @@ preprocessorConfig:
serverConfig:
serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
serverAddress: ${KALDB_PREPROCESSOR_SERVER_ADDRESS:-localhost}
- requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-5000}
+ requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-30000}
upstreamTopics: [${KAKFA_UPSTREAM_TOPICS:-test-topic-in}]
downstreamTopic: ${KAKFA_DOWNSTREAM_TOPIC:-test-topic}
preprocessorInstanceCount: ${PREPROCESSOR_INSTANCE_COUNT:-1}
diff --git a/kaldb/pom.xml b/kaldb/pom.xml
index 798f81ba59..ad0dd990ed 100644
--- a/kaldb/pom.xml
+++ b/kaldb/pom.xml
@@ -20,7 +20,7 @@
4.4.16
3.23.2
1.57.2
- 1.11.0
+ 1.11.5
1.25.2
3.5.0
2.15.2
diff --git a/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java b/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java
index 598a34c1a8..3ee6128534 100644
--- a/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java
+++ b/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java
@@ -62,16 +62,11 @@ public Builder(int port, String serviceName, PrometheusMeterRegistry prometheusM
this.serviceName = serviceName;
this.serverBuilder = Server.builder().http(port);
- initializeLimits();
initializeCompression();
initializeLogging();
initializeManagementEndpoints(prometheusMeterRegistry);
}
- private void initializeLimits() {
- serverBuilder.maxNumConnections(MAX_CONNECTIONS);
- }
-
private void initializeCompression() {
serverBuilder.decorator(EncodingService.builder().newDecorator());
}
diff --git a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
index 7d81997f1c..f7a3548f20 100644
--- a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
+++ b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
@@ -26,6 +26,7 @@
import com.slack.kaldb.util.RuntimeHalterImpl;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Timer;
+import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.util.List;
import java.util.Map;
@@ -39,6 +40,7 @@
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.Strings;
import org.slf4j.Logger;
@@ -66,6 +68,7 @@ public class OpenSearchBulkIngestApi extends AbstractService {
private final Timer configReloadTimer;
private final KafkaProducer kafkaProducer;
+ private final KafkaClientMetrics kafkaMetrics;
private final ReentrantLock lockTransactionalProducer = new ReentrantLock();
@@ -88,6 +91,9 @@ protected void doStop() {
LOG.info("Stopping OpenSearchBulkIngestApi service");
datasetMetadataStore.removeListener(datasetListener);
kafkaProducer.close();
+ if (kafkaMetrics != null) {
+ kafkaMetrics.close();
+ }
LOG.info("OpenSearchBulkIngestApi service closed");
notifyStopped();
} catch (Throwable t) {
@@ -154,6 +160,8 @@ public OpenSearchBulkIngestApi(
// consumer sets isolation.level as "read_committed"
// see "zombie fencing" https://www.confluent.io/blog/transactions-apache-kafka/
this.kafkaProducer = createKafkaTransactionProducer(UUID.randomUUID().toString());
+ kafkaMetrics = new KafkaClientMetrics(kafkaProducer);
+ kafkaMetrics.bindTo(meterRegistry);
this.kafkaProducer.initTransactions();
}
@@ -220,13 +228,24 @@ public BulkIngestResponse produceDocuments(Map> indexDo
try {
kafkaProducer.beginTransaction();
for (Trace.Span doc : indexDoc.getValue()) {
-
ProducerRecord producerRecord =
new ProducerRecord<>(
preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray());
kafkaProducer.send(producerRecord);
}
kafkaProducer.commitTransaction();
+ } catch (TimeoutException te) {
+ LOG.error("Commit transaction timeout", te);
+ new RuntimeHalterImpl()
+ .handleFatal(
+ new Throwable(
+ "KafkaProducer needs to shutdown as we don't have retry yet and we cannot call abortTxn on timeout",
+ te));
+      // The commitTransaction call blocks for up to "max.block.ms" before
+      // timing out. After a timeout we cannot call abortTransaction, because
+      // the producer then throws:
+      // "Cannot attempt operation `abortTransaction` because the previous
+      // call to `commitTransaction` timed out and must be retried"
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and
// exit.
@@ -268,6 +287,8 @@ private KafkaProducer createKafkaTransactionProducer(String tran
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("transactional.id", transactionId);
+ props.put("max.block.ms", "25000");
+ props.put("compression.type", "snappy");
props.put("linger.ms", 250);
return new KafkaProducer<>(props);
}