Commit 0020cde: add kafka producer settings
vthacker committed Oct 27, 2023 (1 parent: 014da40)
Showing 4 changed files with 24 additions and 8 deletions.

config/config.yaml (2 changes: 1 addition & 1 deletion)
@@ -128,7 +128,7 @@ preprocessorConfig:
   serverConfig:
     serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
     serverAddress: ${KALDB_PREPROCESSOR_SERVER_ADDRESS:-localhost}
-    requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-5000}
+    requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-30000}
   upstreamTopics: [${KAKFA_UPSTREAM_TOPICS:-test-topic-in}]
   downstreamTopic: ${KAKFA_DOWNSTREAM_TOPIC:-test-topic}
   preprocessorInstanceCount: ${PREPROCESSOR_INSTANCE_COUNT:-1}
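Context on the timeout bump: the preprocessor's default request timeout rises from 5s to 30s. Where requestTimeoutMs is consumed is not shown in this diff, but KalDB's services are Armeria servers (see the Builder hunk below), and a request timeout typically reaches Armeria roughly like this sketch; the port and handler are placeholders, and the wiring itself is an assumption:

    import com.linecorp.armeria.common.HttpResponse;
    import com.linecorp.armeria.server.Server;

    class RequestTimeoutSketch {
      public static void main(String[] args) {
        // Resolve the env var the way the YAML default does; the 30000 ms
        // fallback mirrors the new config default.
        long requestTimeoutMs = Long.parseLong(
            System.getenv().getOrDefault("KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS", "30000"));
        Server server = Server.builder()
            .http(8086) // the KALDB_PREPROCESSOR_SERVER_PORT default
            .requestTimeoutMillis(requestTimeoutMs)
            .service("/", (ctx, req) -> HttpResponse.of("ok")) // placeholder handler
            .build();
        // server.start().join(); // a real service would start and block here
      }
    }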
kaldb/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -20,7 +20,7 @@
   <http.core.version>4.4.16</http.core.version>
   <protobuf.version>3.23.2</protobuf.version>
   <grpc.version>1.57.2</grpc.version>
-  <micrometer.version>1.11.0</micrometer.version>
+  <micrometer.version>1.11.5</micrometer.version>
   <armeria.version>1.25.2</armeria.version>
   <kafka.version>3.5.0</kafka.version>
   <jackson.version>2.15.2</jackson.version>
@@ -62,16 +62,11 @@ public Builder(int port, String serviceName, PrometheusMeterRegistry prometheusMeterRegistry) {
       this.serviceName = serviceName;
      this.serverBuilder = Server.builder().http(port);

-      initializeLimits();
       initializeCompression();
       initializeLogging();
       initializeManagementEndpoints(prometheusMeterRegistry);
     }

-    private void initializeLimits() {
-      serverBuilder.maxNumConnections(MAX_CONNECTIONS);
-    }
-
     private void initializeCompression() {
       serverBuilder.decorator(EncodingService.builder().newDecorator());
     }
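The deleted initializeLimits() step capped concurrent connections through Armeria's ServerBuilder.maxNumConnections; after this commit the server falls back to Armeria's default. For reference, a minimal sketch of the removed behavior (the real value of MAX_CONNECTIONS is not visible in this diff, so the number below is illustrative):

    import com.linecorp.armeria.common.HttpResponse;
    import com.linecorp.armeria.server.Server;

    class ConnectionCapSketch {
      private static final int MAX_CONNECTIONS = 2000; // illustrative stand-in

      public static void main(String[] args) {
        Server server = Server.builder()
            .http(8086)
            .maxNumConnections(MAX_CONNECTIONS) // the call initializeLimits() used to make
            .service("/", (ctx, req) -> HttpResponse.of("ok"))
            .build();
      }
    }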
OpenSearchBulkIngestApi.java
@@ -26,6 +26,7 @@
 import com.slack.kaldb.util.RuntimeHalterImpl;
 import com.slack.service.murron.trace.Trace;
 import io.micrometer.core.instrument.Timer;
+import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
 import io.micrometer.prometheus.PrometheusMeterRegistry;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +40,7 @@
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.opensearch.action.index.IndexRequest;
 import org.opensearch.common.Strings;
 import org.slf4j.Logger;
@@ -66,6 +68,7 @@ public class OpenSearchBulkIngestApi extends AbstractService {
   private final Timer configReloadTimer;

   private final KafkaProducer kafkaProducer;
+  private final KafkaClientMetrics kafkaMetrics;

   private final ReentrantLock lockTransactionalProducer = new ReentrantLock();

@@ -88,6 +91,9 @@ protected void doStop() {
       LOG.info("Stopping OpenSearchBulkIngestApi service");
       datasetMetadataStore.removeListener(datasetListener);
       kafkaProducer.close();
+      if (kafkaMetrics != null) {
+        kafkaMetrics.close();
+      }
       LOG.info("OpenSearchBulkIngestApi service closed");
       notifyStopped();
     } catch (Throwable t) {
@@ -154,6 +160,8 @@ public OpenSearchBulkIngestApi(
     // consumer sets isolation.level as "read_committed"
     // see "zombie fencing" https://www.confluent.io/blog/transactions-apache-kafka/
     this.kafkaProducer = createKafkaTransactionProducer(UUID.randomUUID().toString());
+    kafkaMetrics = new KafkaClientMetrics(kafkaProducer);
+    kafkaMetrics.bindTo(meterRegistry);
     this.kafkaProducer.initTransactions();
   }
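The new KafkaClientMetrics binder (from micrometer-core, per the new import above) exposes the producer's internal metrics through the meter registry; because it registers meters, it must be closed alongside the producer, which is what the doStop() change above does. A standalone sketch of that lifecycle, using a SimpleMeterRegistry in place of KalDB's PrometheusMeterRegistry and placeholder connection settings:

    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
    import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    class ProducerMetricsSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        MeterRegistry registry = new SimpleMeterRegistry();
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        KafkaClientMetrics metrics = new KafkaClientMetrics(producer);
        metrics.bindTo(registry); // kafka.producer.* meters now appear in the registry

        // ... produce records ...

        producer.close();
        metrics.close(); // deregisters the meters; mirrors the new doStop() ordering
      }
    }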

@@ -220,13 +228,24 @@ public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDoc
     try {
       kafkaProducer.beginTransaction();
       for (Trace.Span doc : indexDoc.getValue()) {
-
         ProducerRecord<String, byte[]> producerRecord =
             new ProducerRecord<>(
                 preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray());
         kafkaProducer.send(producerRecord);
       }
       kafkaProducer.commitTransaction();
+    } catch (TimeoutException te) {
+      LOG.error("Commit transaction timeout", te);
+      new RuntimeHalterImpl()
+          .handleFatal(
+              new Throwable(
+                  "KafkaProducer needs to shut down as we don't have retry yet and we cannot call abortTxn on timeout",
+                  te));
+      // commitTransaction blocks for up to "max.block.ms" before timing out.
+      // In that case we cannot abort the transaction, because calling
+      // abortTransaction would throw: "Cannot attempt operation
+      // `abortTransaction` because the previous call to `commitTransaction`
+      // timed out and must be retried"
     } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
       // We can't recover from these exceptions, so our only option is to close the producer and
       // exit.
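Putting the produce path together: the method now distinguishes a commit timeout (halt, since abort is illegal after a timed-out commit) from fatal producer states (fenced, out-of-order sequence, authorization) and from other KafkaExceptions that can be aborted and retried. A condensed sketch of that pattern; halt() stands in for RuntimeHalterImpl, and the final KafkaException branch follows Kafka's documented transactional pattern rather than lines visible in this diff:

    import java.util.List;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.errors.TimeoutException;

    class TransactionalProduceSketch {
      static void produce(KafkaProducer<String, byte[]> producer, String topic, List<byte[]> docs) {
        try {
          producer.beginTransaction();
          for (byte[] doc : docs) {
            producer.send(new ProducerRecord<>(topic, doc));
          }
          producer.commitTransaction();
        } catch (TimeoutException te) {
          // commitTransaction gave up after max.block.ms (25s in this commit).
          // abortTransaction() would throw here, so halt rather than retry.
          halt(te);
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
          // Unrecoverable producer states: close and exit, per the diff's comment.
          halt(e);
        } catch (KafkaException e) {
          // Recoverable error inside the transaction: abort so the batch can be retried.
          producer.abortTransaction();
        }
      }

      // Stand-in for new RuntimeHalterImpl().handleFatal(...).
      static void halt(Throwable t) {
        throw new RuntimeException("fatal: shutting down", t);
      }
    }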
@@ -268,6 +287,8 @@ private KafkaProducer<String, byte[]> createKafkaTransactionProducer(String transactionId) {
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
     props.put("transactional.id", transactionId);
+    props.put("max.block.ms", "25000");
+    props.put("compression.type", "snappy");
     props.put("linger.ms", 250);
     return new KafkaProducer<>(props);
   }
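These three properties are the "kafka producer settings" of the commit title. The same factory in sketch form, using ProducerConfig constants instead of raw strings, with notes on what each setting does; the bootstrap address is a placeholder, and the remark about the timeout budget is an inference from this commit's two defaults (25s max.block.ms vs. the 30s preprocessor request timeout), not a documented invariant:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    class ProducerSettingsSketch {
      static KafkaProducer<String, byte[]> create(String transactionId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
        // Cap how long send()/commitTransaction() may block (Kafka's default is 60s).
        // 25s keeps a stuck commit inside the preprocessor's 30s request timeout.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "25000");
        // Compress record batches on the wire; snappy trades a little CPU for bandwidth.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // Let batches fill for up to 250 ms before sending, improving throughput.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 250);
        return new KafkaProducer<>(props);
      }
    }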