diff --git a/config/config.yaml b/config/config.yaml index 3b259c76bf..7d8d71120d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -128,7 +128,7 @@ preprocessorConfig: serverConfig: serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086} serverAddress: ${KALDB_PREPROCESSOR_SERVER_ADDRESS:-localhost} - requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-5000} + requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-30000} upstreamTopics: [${KAKFA_UPSTREAM_TOPICS:-test-topic-in}] downstreamTopic: ${KAKFA_DOWNSTREAM_TOPIC:-test-topic} preprocessorInstanceCount: ${PREPROCESSOR_INSTANCE_COUNT:-1} diff --git a/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java b/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java index 598a34c1a8..3ee6128534 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/ArmeriaService.java @@ -62,16 +62,11 @@ public Builder(int port, String serviceName, PrometheusMeterRegistry prometheusM this.serviceName = serviceName; this.serverBuilder = Server.builder().http(port); - initializeLimits(); initializeCompression(); initializeLogging(); initializeManagementEndpoints(prometheusMeterRegistry); } - private void initializeLimits() { - serverBuilder.maxNumConnections(MAX_CONNECTIONS); - } - private void initializeCompression() { serverBuilder.decorator(EncodingService.builder().newDecorator()); } diff --git a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java index 7d81997f1c..90a828ea18 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import 
org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TimeoutException; import org.opensearch.action.index.IndexRequest; import org.opensearch.common.Strings; import org.slf4j.Logger; @@ -220,13 +221,21 @@ public BulkIngestResponse produceDocuments(Map> indexDo try { kafkaProducer.beginTransaction(); for (Trace.Span doc : indexDoc.getValue()) { - ProducerRecord producerRecord = new ProducerRecord<>( preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray()); kafkaProducer.send(producerRecord); } + LOG.info("begin commitTransaction()"); kafkaProducer.commitTransaction(); + LOG.info("end commitTransaction()"); + } catch (TimeoutException te) { + LOG.error("Commit transaction timeout", te); + // commitTransaction waits until "max.block.ms", after which it will time out + // in that case we cannot call abortTransaction(), because that throws the following error: + // "Cannot attempt operation `abortTransaction` because the previous + // call to `commitTransaction` timed out and must be retried" + // } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and // exit. @@ -268,6 +277,8 @@ private KafkaProducer createKafkaTransactionProducer(String tran props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("transactional.id", transactionId); + props.put("max.block.ms", "25000"); + props.put("compression.type", "snappy"); props.put("linger.ms", 250); return new KafkaProducer<>(props); }