diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index 10bc93770c..2775902637 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -21,20 +21,22 @@ import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher; import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler; +import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KafkaConsumerRecordUtils; import io.cloudevents.CloudEvent; +import io.cloudevents.kafka.CloudEventDeserializer; import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.kafka.client.common.tracing.ConsumerTracer; import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Objects; import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; /** @@ -47,6 +49,8 @@ public class RecordDispatcherImpl implements RecordDispatcher { private static final Logger logger = LoggerFactory.getLogger(RecordDispatcherImpl.class); + private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer(); + private final Filter filter; private final Function, Future> subscriberSender; private final Function, Future> dlsSender; @@ -92,8 +96,6 @@ public RecordDispatcherImpl( */ @Override public Future dispatch(KafkaConsumerRecord record) { - Promise promise = Promise.promise(); - /* That's pretty much what happens here: @@ -116,9 +118,23 @@ public Future dispatch(KafkaConsumerRecord record) { +->end<--+ */ - onRecordReceived(record, promise); - - return promise.future(); + try { + Promise promise = Promise.promise(); + onRecordReceived(maybeDeserializeValueFromHeaders(record), promise); + return promise.future(); + } catch (final Exception ex) { + // This is a fatal exception that shouldn't happen in normal cases. + // + // It might happen if folks send bad records to a topic that is + // managed by our system. + // + // So discard record if we can't deal with the record, so that we can + // make progress in the partition. + logError("Exception occurred, discarding the record", record, ex); + recordDispatcherListener.recordReceived(record); + recordDispatcherListener.recordDiscarded(record); + return Future.failedFuture(ex); + } } private void onRecordReceived(final KafkaConsumerRecord record, Promise finalProm) { @@ -192,6 +208,23 @@ private void onDeadLetterSinkFailure(final KafkaConsumerRecord maybeDeserializeValueFromHeaders(KafkaConsumerRecord record) { + if (record.value() != null) { + return record; + } + // A valid CloudEvent in the CE binary protocol binding of Kafka + // might be composed by only Headers. + // + // KafkaConsumer doesn't call the deserializer if the value + // is null. + // + // That means that we get a record with a null value and some CE + // headers even though the record is a valid CloudEvent. + logDebug("Value is null", record); + final var value = cloudEventDeserializer.deserialize(record.record().topic(), record.record().headers(), null); + return new KafkaConsumerRecordImpl<>(KafkaConsumerRecordUtils.copyRecordAssigningValue(record.record(), value)); + } + private static Function, Future> composeSenderAndSinkHandler( CloudEventSender sender, ResponseHandler sinkHandler, String senderType) { return rec -> sender.send(rec.value()) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/KafkaConsumerRecordUtils.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/KafkaConsumerRecordUtils.java new file mode 100644 index 0000000000..e0adfc0925 --- /dev/null +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/KafkaConsumerRecordUtils.java @@ -0,0 +1,43 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; + +import io.cloudevents.CloudEvent; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public final class KafkaConsumerRecordUtils { + + private KafkaConsumerRecordUtils() { + } + + public static ConsumerRecord copyRecordAssigningValue(final ConsumerRecord record, + final CloudEvent value) { + return new ConsumerRecord<>( + record.topic(), + record.partition(), + record.offset(), + record.timestamp(), + record.timestampType(), + record.checksum(), + record.serializedKeySize(), + record.serializedValueSize(), + record.key(), + value, + record.headers(), + record.leaderEpoch() + ); + } +} diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java index 9e9ccc5351..474eba7d5b 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/DataPlaneTest.java @@ -57,6 +57,7 @@ import java.io.File; import java.io.IOException; import java.net.URI; +import java.util.List; import java.util.Properties; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -117,35 +118,40 @@ public static void setUp(final Vertx vertx, final VertxTestContext context) thro /* 1: event sent by the source to the Broker - 2: event sent by the service in the response + 2: event sent by the trigger 1 in the response + 3: event sent by the trigger 2 in the response 2 +----------------------+ | | | +-----+-----+ | 1 | | | +---------->+ Trigger 1 | - v | | | + v | 3 | | +------------+ +-------------+ +-------+----+----+ +-----------+ | | 1 | | 2 | | | HTTPClient +--------->+ Receiver | +--------+ Dispatcher | | | | | | | | +------------+ +------+------+ | +--------+---+----+ +-----------+ - | | ^ | | | + | | ^ | 3 | | | v | +---------->+ Trigger 2 | 1 | +--------+--------+ | 2 | | | | | 1 | +-----------+ +----->+ Kafka +--------+ | | 2 +-----------+ - +-----------------+ | | + +-----------------+ 3 | | | Trigger 3 | | | +-----------+ + + + + */ @Test @Timeout(timeUnit = TimeUnit.MINUTES, value = 1) - public void execute(final Vertx vertx, final VertxTestContext context) { + public void execute(final Vertx vertx, final VertxTestContext context) throws InterruptedException { - final var checkpoints = context.checkpoint(3); + final var checkpoints = context.checkpoint(4); // event sent by the source to the Broker (see 1 in diagram) final var expectedRequestEvent = CloudEventBuilder.v1() @@ -158,7 +164,7 @@ public void execute(final Vertx vertx, final VertxTestContext context) { .build(); // event sent in the response by the Callable service (see 2 in diagram) - final var expectedResponseEvent = CloudEventBuilder.v03() + final var expectedResponseEventService2 = CloudEventBuilder.v03() .withId(UUID.randomUUID().toString()) .withDataSchema(URI.create("/api/data-schema-ce-2")) .withSubject("subject-ce-2") @@ -167,6 +173,20 @@ public void execute(final Vertx vertx, final VertxTestContext context) { .withType(TYPE_CE_2) .build(); + // event sent in the response by the Callable service 2 (see 3 in diagram) + final var expectedResponseEventService1 = CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withDataSchema(URI.create("/api/data-schema-ce-3")) + .withSource(URI.create("/api/rossi")) + .withSubject("subject-ce-3") + .withType(TYPE_CE_1) + .build(); + + final var service1ExpectedEventsIterator = List.of( + expectedRequestEvent, + expectedResponseEventService1 + ).iterator(); + final var resource = DataPlaneContract.Resource.newBuilder() .addTopics(TOPIC) .setIngress(DataPlaneContract.Ingress.newBuilder().setPath(format("/%s/%s", BROKER_NAMESPACE, BROKER_NAME))) @@ -207,9 +227,13 @@ public void execute(final Vertx vertx, final VertxTestContext context) { new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS) .accept(DataPlaneContract.Contract.newBuilder().addResources(resource).build()); - await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertThat(vertx.deploymentIDs()) + await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(vertx.deploymentIDs()) .hasSize(resource.getEgressesCount() + NUM_RESOURCES + NUM_SYSTEM_VERTICLES)); + Thread.sleep(2000); // Give consumers time to start + // start service vertx.createHttpServer() .exceptionHandler(context::failNow) @@ -221,22 +245,29 @@ public void execute(final Vertx vertx, final VertxTestContext context) { // service 1 receives event sent by the HTTPClient if (request.path().equals(PATH_SERVICE_1)) { + final var expectedEvent = service1ExpectedEventsIterator.next(); context.verify(() -> { - assertThat(event).isEqualTo(expectedRequestEvent); + assertThat(event).isEqualTo(expectedEvent); checkpoints.flag(); // 2 }); - // write event to the response, the event will be handled by service 2 - VertxMessageFactory.createWriter(request.response()) - .writeBinary(expectedResponseEvent); + if (service1ExpectedEventsIterator.hasNext()) { + // write event to the response, the event will be handled by service 2 + VertxMessageFactory.createWriter(request.response()) + .writeBinary(expectedResponseEventService2); + } } // service 2 receives event in the response if (request.path().equals(PATH_SERVICE_2)) { context.verify(() -> { - assertThat(event).isEqualTo(expectedResponseEvent); + assertThat(event).isEqualTo(expectedResponseEventService2); checkpoints.flag(); // 3 }); + + // write event to the response, the event will be handled by service 2 + VertxMessageFactory.createWriter(request.response()) + .writeBinary(expectedResponseEventService1); } if (request.path().equals(PATH_SERVICE_3)) {