add kafka producer settings
vthacker committed Oct 27, 2023
1 parent 014da40 commit c912f4c
Showing 3 changed files with 13 additions and 7 deletions.
config/config.yaml (1 addition, 1 deletion)
@@ -128,7 +128,7 @@ preprocessorConfig:
serverConfig:
serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
serverAddress: ${KALDB_PREPROCESSOR_SERVER_ADDRESS:-localhost}
-requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-5000}
+requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-30000}
upstreamTopics: [${KAKFA_UPSTREAM_TOPICS:-test-topic-in}]
downstreamTopic: ${KAKFA_DOWNSTREAM_TOPIC:-test-topic}
preprocessorInstanceCount: ${PREPROCESSOR_INSTANCE_COUNT:-1}
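The placeholders above use shell-style ${VAR:-default} syntax: take the environment variable when it is set, otherwise fall back to the inline default (here the default request timeout rises from 5 s to 30 s, presumably so it outlasts the 25 s max.block.ms the Kafka producer is given later in this commit). A minimal, hypothetical Java sketch of that resolution rule; EnvDefaults and resolve are illustrative names, not part of this repository:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EnvDefaults {
  // Matches ${NAME:-default}, e.g. ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-30000}
  private static final Pattern PLACEHOLDER =
      Pattern.compile("\\$\\{([A-Za-z0-9_]+):-([^}]*)\\}");

  // Returns the raw string with each placeholder replaced by the environment
  // variable's value when set, or by the inline default otherwise.
  static String resolve(String raw) {
    Matcher m = PLACEHOLDER.matcher(raw);
    StringBuffer out = new StringBuffer();
    while (m.find()) {
      String fromEnv = System.getenv(m.group(1));
      m.appendReplacement(out, Matcher.quoteReplacement(fromEnv != null ? fromEnv : m.group(2)));
    }
    m.appendTail(out);
    return out.toString();
  }

  public static void main(String[] args) {
    // Prints "requestTimeoutMs: 30000" unless KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS is set.
    System.out.println(resolve("requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-30000}"));
  }
}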
@@ -62,16 +62,11 @@ public Builder(int port, String serviceName, PrometheusMeterRegistry prometheusM
this.serviceName = serviceName;
this.serverBuilder = Server.builder().http(port);

-initializeLimits();
initializeCompression();
initializeLogging();
initializeManagementEndpoints(prometheusMeterRegistry);
}

-private void initializeLimits() {
-serverBuilder.maxNumConnections(MAX_CONNECTIONS);
-}

private void initializeCompression() {
serverBuilder.decorator(EncodingService.builder().newDecorator());
}
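The builder above (the file name is not shown in this view, but it is evidently the service's Armeria server wrapper) now sets up compression, logging, and management endpoints without capping connections, since initializeLimits() and its maxNumConnections(MAX_CONNECTIONS) call are removed. A minimal standalone sketch of the remaining pattern; the port and the /health handler are illustrative, not taken from this repository:

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.encoding.EncodingService;

public final class MinimalArmeriaServer {
  public static void main(String[] args) {
    // Same shape as the Builder above: an HTTP port plus a compression decorator,
    // with no explicit maxNumConnections() limit, so Armeria's default applies.
    ServerBuilder sb = Server.builder().http(8086); // illustrative port
    sb.decorator(EncodingService.builder().newDecorator());
    sb.service("/health", (ctx, req) -> HttpResponse.of("ok")); // illustrative endpoint
    Server server = sb.build();
    server.start().join();
  }
}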
@@ -39,6 +39,7 @@
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.Strings;
import org.slf4j.Logger;
@@ -220,13 +221,21 @@ public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDo
try {
kafkaProducer.beginTransaction();
for (Trace.Span doc : indexDoc.getValue()) {

ProducerRecord<String, byte[]> producerRecord =
new ProducerRecord<>(
preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray());
kafkaProducer.send(producerRecord);
}
LOG.info("begin commitTransaction()");
kafkaProducer.commitTransaction();
LOG.info("end commitTransaction()");
+} catch (TimeoutException te) {
+LOG.error("Commit transaction timeout", te);
+// commitTransaction() waits up to "max.block.ms" and then times out.
+// In that case we cannot call abortTransaction(), because that throws the following error:
+// "Cannot attempt operation `abortTransaction` because the previous
+// call to `commitTransaction` timed out and must be retried"
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and
// exit.
@@ -268,6 +277,8 @@ private KafkaProducer<String, byte[]> createKafkaTransactionProducer(String tran
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("transactional.id", transactionId);
props.put("max.block.ms", "25000");
props.put("compression.type", "snappy");
props.put("linger.ms", 250);
return new KafkaProducer<>(props);
}
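Taken together, the new producer settings bound how long send() and commitTransaction() may block (max.block.ms of 25 s, which the 30 s request timeout above now comfortably outlasts), batch records for up to 250 ms (linger.ms), and compress batches with snappy. A self-contained sketch of the same transactional produce-and-commit pattern, including the commit-timeout handling from the hunk above; the bootstrap servers, topic, transactional id, and the single commit retry are illustrative assumptions rather than this repository's actual behavior:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;

public final class TransactionalProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // illustrative, not from the diff
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("transactional.id", "example-txn"); // illustrative transactional id
    props.put("max.block.ms", "25000");            // cap blocking in send()/commitTransaction()
    props.put("compression.type", "snappy");       // compress batches on the producer
    props.put("linger.ms", 250);                   // wait up to 250 ms to fill a batch

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
    producer.initTransactions();
    try {
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("test-topic", "doc-id", new byte[] {1, 2, 3}));
      try {
        producer.commitTransaction();
      } catch (TimeoutException te) {
        // commitTransaction() gave up after max.block.ms. Kafka forbids abortTransaction()
        // here ("the previous call to commitTransaction timed out and must be retried"),
        // so retry the commit once; a real service would log and apply its own policy.
        producer.commitTransaction();
      }
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
      // Unrecoverable for this producer instance: close it and start over.
      producer.close();
    }
  }
}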
