diff --git a/config/config.yaml b/config/config.yaml
index 3b259c76bf..7d8d71120d 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -128,7 +128,7 @@ preprocessorConfig:
   serverConfig:
     serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
     serverAddress: ${KALDB_PREPROCESSOR_SERVER_ADDRESS:-localhost}
-    requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-5000}
+    requestTimeoutMs: ${KALDB_PREPROCESSOR_REQUEST_TIMEOUT_MS:-30000}
   upstreamTopics: [${KAKFA_UPSTREAM_TOPICS:-test-topic-in}]
   downstreamTopic: ${KAKFA_DOWNSTREAM_TOPIC:-test-topic}
   preprocessorInstanceCount: ${PREPROCESSOR_INSTANCE_COUNT:-1}
diff --git a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
index 7d81997f1c..15c7de67fd 100644
--- a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
+++ b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java
@@ -11,8 +11,15 @@
 import static com.slack.kaldb.preprocessor.PreprocessorService.filterValidDatasetMetadata;
 import static com.slack.kaldb.preprocessor.PreprocessorService.sortDatasetsOnThroughput;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.google.common.util.concurrent.AbstractService;
 import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.common.HttpStatus;
+import com.linecorp.armeria.common.MediaType;
 import com.linecorp.armeria.server.annotation.Blocking;
 import com.linecorp.armeria.server.annotation.Post;
 import com.slack.kaldb.elasticsearchApi.BulkIngestResponse;
@@ -39,6 +46,7 @@
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.opensearch.action.index.IndexRequest;
 import org.opensearch.common.Strings;
 import org.slf4j.Logger;
@@ -69,6 +77,14 @@ public class OpenSearchBulkIngestApi extends AbstractService {
 
   private final ReentrantLock lockTransactionalProducer = new ReentrantLock();
 
+  private static final ObjectMapper objectMapper =
+      JsonMapper.builder()
+          // sort alphabetically for easier test asserts
+          .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
+          // don't serialize null values or empty maps
+          .serializationInclusion(JsonInclude.Include.NON_EMPTY)
+          .build();
+
   @Override
   protected void doStart() {
     try {
@@ -186,11 +202,22 @@ public HttpResponse addDocument(String bulkRequest) {
         }
       }
       BulkIngestResponse response = produceDocuments(docs);
-      return HttpResponse.ofJson(response);
+      LOG.info("Return response ");
+      return HttpResponse.of(
+          HttpStatus.OK, MediaType.JSON_UTF_8, objectMapper.writeValueAsString(response));
+      /// return HttpResponse.ofJson(response);
     } catch (Exception e) {
       LOG.error("Request failed ", e);
-      BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
-      return HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response);
+      try {
+        BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
+        return HttpResponse.of(
+            HttpStatus.INTERNAL_SERVER_ERROR,
+            MediaType.JSON_UTF_8,
+            objectMapper.writeValueAsString(response));
+      } catch (JsonProcessingException ex) {
+        return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR, MediaType.JSON_UTF_8, "{}");
+      }
+      // return HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response);
     }
   }
 
@@ -219,14 +246,23 @@ public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDo
       lockTransactionalProducer.lock();
       try {
         kafkaProducer.beginTransaction();
+        LOG.info("calling beginTransaction()");
         for (Trace.Span doc : indexDoc.getValue()) {
-
           ProducerRecord<String, byte[]> producerRecord =
               new ProducerRecord<>(
                   preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray());
           kafkaProducer.send(producerRecord);
         }
+        LOG.info("begin commitTransaction()");
         kafkaProducer.commitTransaction();
+        LOG.info("end commitTransaction()");
+      } catch (TimeoutException te) {
+        LOG.error("Commit transaction timeout", te);
+        // commitTransaction() waits up to "max.block.ms", after which it times out;
+        // in that case we cannot call abortTransaction(), because that throws the following error:
+        // "Cannot attempt operation `abortTransaction` because the previous
+        // call to `commitTransaction` timed out and must be retried"
+        //
       } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
         // We can't recover from these exceptions, so our only option is to close the producer and
         // exit.
@@ -268,6 +304,8 @@ private KafkaProducer<String, byte[]> createKafkaTransactionProducer(String tran
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
     props.put("transactional.id", transactionId);
+    props.put("max.block.ms", "25000");
+    props.put("compression.type", "snappy");
    props.put("linger.ms", 250);
     return new KafkaProducer<>(props);
   }
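
Note on the produce path: with "max.block.ms" set, KafkaProducer.commitTransaction() throws org.apache.kafka.common.errors.TimeoutException instead of blocking indefinitely, and the producer then rejects abortTransaction() until the commit is retried. The sketch below illustrates that contract in isolation; it is not the kaldb code, and the broker address, topic name, and transactional id are placeholder assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;

public class TransactionalCommitTimeoutSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // placeholder broker address for the sketch
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    // placeholder transactional id; must be stable and unique per producer
    props.put("transactional.id", "sketch-txn");
    // bound how long send()/commitTransaction() may block before throwing TimeoutException
    props.put("max.block.ms", "25000");

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
    producer.initTransactions();
    try {
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("test-topic", "doc-key", new byte[] {1, 2, 3}));
      producer.commitTransaction();
    } catch (TimeoutException te) {
      // The commit gave up after max.block.ms. The transaction is in an unknown state,
      // and the producer forbids abortTransaction() here ("the previous call to
      // commitTransaction timed out and must be retried"), so only log and report the
      // failure to the caller; do not abort.
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
      // Unrecoverable producer states: the only safe option is to close this producer.
      producer.close();
    }
  }
}

Choosing max.block.ms (25 s) below the preprocessor's requestTimeoutMs (raised to 30 s in this diff) presumably lets a stuck broker surface as an HTTP 500 with a JSON body rather than as an Armeria request timeout.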