Skip to content

Commit

Permalink
add kafka producer settings
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed Oct 26, 2023
1 parent 014da40 commit 8f964a8
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ preprocessorConfig:
serverConfig:
serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
serverAddress: ${KALDB_PREPROCESSOR_SERVER_ADDRESS:-localhost}
requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-5000}
requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-30000}
upstreamTopics: [${KAKFA_UPSTREAM_TOPICS:-test-topic-in}]
downstreamTopic: ${KAKFA_DOWNSTREAM_TOPIC:-test-topic}
preprocessorInstanceCount: ${PREPROCESSOR_INSTANCE_COUNT:-1}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,15 @@
import static com.slack.kaldb.preprocessor.PreprocessorService.filterValidDatasetMetadata;
import static com.slack.kaldb.preprocessor.PreprocessorService.sortDatasetsOnThroughput;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.util.concurrent.AbstractService;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.annotation.Blocking;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.elasticsearchApi.BulkIngestResponse;
Expand All @@ -39,6 +46,7 @@
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.Strings;
import org.slf4j.Logger;
Expand Down Expand Up @@ -69,6 +77,14 @@ public class OpenSearchBulkIngestApi extends AbstractService {

private final ReentrantLock lockTransactionalProducer = new ReentrantLock();

// Shared mapper used to hand-serialize BulkIngestResponse bodies (instead of
// HttpResponse.ofJson) so the JSON output is deterministic for test asserts.
// NOTE(review): configured once and reused; Jackson ObjectMapper is safe for
// concurrent reads after configuration — confirm no later reconfiguration.
private static final ObjectMapper objectMapper =
JsonMapper.builder()
// sort alphabetically for easier test asserts
.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
// don't serialize null values or empty maps
.serializationInclusion(JsonInclude.Include.NON_EMPTY)
.build();

@Override
protected void doStart() {
try {
Expand Down Expand Up @@ -186,11 +202,22 @@ public HttpResponse addDocument(String bulkRequest) {
}
}
BulkIngestResponse response = produceDocuments(docs);
return HttpResponse.ofJson(response);
LOG.info("Return response ");
return HttpResponse.of(
HttpStatus.OK, MediaType.JSON_UTF_8, objectMapper.writeValueAsString(response));
/// return HttpResponse.ofJson(response);
} catch (Exception e) {
LOG.error("Request failed ", e);
BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
return HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response);
try {
BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
return HttpResponse.of(
HttpStatus.INTERNAL_SERVER_ERROR,
MediaType.JSON_UTF_8,
objectMapper.writeValueAsString(response));
} catch (JsonProcessingException ex) {
return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR, MediaType.JSON_UTF_8, "{}");
}
// return HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response);
}
}

Expand Down Expand Up @@ -219,14 +246,23 @@ public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDo
lockTransactionalProducer.lock();
try {
kafkaProducer.beginTransaction();
LOG.info("calling beginTransaction()");
for (Trace.Span doc : indexDoc.getValue()) {

ProducerRecord<String, byte[]> producerRecord =
new ProducerRecord<>(
preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray());
kafkaProducer.send(producerRecord);
}
LOG.info("begin commitTransaction()");
kafkaProducer.commitTransaction();
LOG.info("end commitTransaction()");
} catch (TimeoutException te) {
LOG.error("Commit transaction timeout", te);
// commitTransaction() blocks for up to "max.block.ms" and then times out.
// After such a timeout we must NOT call abortTransaction(): the producer
// rejects it with "Cannot attempt operation `abortTransaction` because the
// previous call to `commitTransaction` timed out and must be retried".
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and
// exit.
Expand Down Expand Up @@ -268,6 +304,8 @@ private KafkaProducer<String, byte[]> createKafkaTransactionProducer(String tran
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("transactional.id", transactionId);
props.put("max.block.ms", "25000");
props.put("compression.type", "snappy");
props.put("linger.ms", 250);
return new KafkaProducer<>(props);
}
Expand Down

0 comments on commit 8f964a8

Please sign in to comment.