diff --git a/docs/getting-started.adoc b/docs/getting-started.adoc index 9db9029a..637e3105 100644 --- a/docs/getting-started.adoc +++ b/docs/getting-started.adoc @@ -351,8 +351,6 @@ By leveraging Decaton's deferred completion and async-client of your middleware Now you know the basics and ready to start implementing Decaton apps! -If you're attempting to consume existing topic which contains records in schema other than Decaton's task protocol, or maybe you want to use task schema that can be understandable even for non-decaton consumers. In case visit link:./consuming-any-data.adoc[Consuming Arbitrary Topic] to see how. - For those thinking to run Decaton on production, link:./monitoring.adoc[Monitoring] might helps to always ensure your Decaton processors doing good. If you're using link:https://spring.io/[Spring] for running your applications, you might wanna take a look at link:./spring-integration.adoc[Spring Integration]. diff --git a/docs/index.adoc b/docs/index.adoc index 213fec50..4be1abc6 100644 --- a/docs/index.adoc +++ b/docs/index.adoc @@ -8,7 +8,7 @@ Decaton Documents - link:./runtime.adoc[Subpartition Runtime] - link:./spring-integration.adoc[Spring Integration] - link:./tracing.adoc[Tracing] -- link:./consuming-any-data.adoc[Use Decaton for consuming topics of non-Decaton tasks] +- link:./task-extractor.adoc[Implement custom task extractor] - link:./dynamic-property-configuration.adoc[Dynamic property configuration for the processor] - link:./monitoring.adoc[Monitoring Decaton] - Features diff --git a/docs/consuming-any-topic.adoc b/docs/task-extractor.adoc similarity index 68% rename from docs/consuming-any-topic.adoc rename to docs/task-extractor.adoc index fba0ec2b..88f4dd07 100644 --- a/docs/consuming-any-topic.adoc +++ b/docs/task-extractor.adoc @@ -1,16 +1,33 @@ -Consuming Arbitrary Topic -========================= +TaskExtractor +============= :base_version: 9.0.0 :modules: common,protocol,processor -This document guides you how to consume and process topics containing records not produced by DecatonClient using Decaton processors. +[NOTE] +==== +From Decaton 9.0.0, you can just use link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java[ProcessorsBuilder#consuming(String topic, Deserializer deserializer)] to consume arbitrary topics, +without worrying if the topic is produced by DecatonClient or not. + +You may need to read through this guide *ONLY* in specific situations described just below. +==== + +Decaton provides two ways to consume topics in link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java[ProcessorsBuilder]: + +* `ProcessorsBuilder#consuming(String topic, Deserializer deserializer)` +* `ProcessorsBuilder#consuming(String topic, TaskExtractor deserializer)` -By default, Decaton assumes messages are produced `DecatonClient`, where task metadata are stored as link:../protocol/src/main/proto/decaton.proto[TaskMetadataProto] in record headers. -But Decaton has the capability to consume arbitrary topics other than topics produced by `DecatonClient`. +As you may have learned through link:./getting-started.adoc[Getting Started], former is the most common and convenient way to consume topics, where you can just pass a value deserializer. -This means you can use Decaton as a drop-in replacement for a vanilla KafkaConsumer to leverage powerful features like deferred completion, delayed processing and so on. +However, sometimes you may need to apply custom logic to extract a task from raw consumed messages: -Through this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization, but it's trivial to consume arbitrary formats other than JSON. +* You need to extract custom task metadata on consumption. (e.g. Set `scheduledTimeMillis` for delayed processing) +* You need to access additional information (e.g. record headers) for deserialization + +This is where latter way with `TaskExtractor` comes in. + +This guide will show you how to implement `TaskExtractor` and use it. + +Through this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization. == TaskExtractor @@ -29,6 +46,7 @@ public class JSONUserEventExtractor implements TaskExtractor { TaskMetadata metadata = TaskMetadata.builder() // Filling timestampMillis is not mandatory, but it would be useful // when you monitor delivery latency between event production time and event processing time. + // Also, this will be used for scheduling tasks when scheduledTimeMillis is set. .timestampMillis(event.getEventTimestampMillis()) // This field is not mandatory too, but you can track which application produced the task by filling this. .sourceApplicationId("event-tracker") @@ -66,7 +84,7 @@ public class UserEventProcessor implements DecatonProcessor { } ---- -As you can see, once you implement TaskExtractor, the implementation of the DecatonProcessor can be done as when you consume a regular Decaton topic. +As you can see, there's no difference the implementation of the DecatonProcessor from the case where you use `Deserializer`. Lastly, you need to instantiate link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java[ProcessorSubscription] as follows. @@ -84,11 +102,9 @@ ProcessorsBuilder.consuming( ... ---- -You have to pass TaskExtractor which you implemented above instead of link:../common/src/main/java/com/linecorp/decaton/common/Deserializer.java[Deseiralizer]. - == Run Example -Now we are ready to process a JSON topic. +Now we are ready to process a JSON topic with custom task extraction logic. Before trying out, let's download and extract the kafka binary from https://kafka.apache.org/downloads to use `kafka-console-producer.sh`. diff --git a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java index 06bd2dca..84bab2c1 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java @@ -258,16 +258,18 @@ public void testRetryQueueingFromCompletionTimeoutCallback() throws Exception { @Test @Timeout(60) public void testRetryQueueingMigrateToHeader() throws Exception { - DynamicProperty metadataAsHeader = - new DynamicProperty<>(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER); - metadataAsHeader.set(false); + DynamicProperty retryTaskInLegacyFormat = + new DynamicProperty<>(ProcessorProperties.CONFIG_RETRY_TASK_IN_LEGACY_FORMAT); + retryTaskInLegacyFormat.set(true); AtomicInteger processCount = new AtomicInteger(0); CountDownLatch migrationLatch = new CountDownLatch(1); ProcessorTestSuite .builder(rule) .numTasks(100) - .propertySupplier(StaticPropertySupplier.of(metadataAsHeader)) + .propertySupplier(StaticPropertySupplier.of( + retryTaskInLegacyFormat, + Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, true))) .produceTasksWithHeaderMetadata(false) .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> { if (ctx.metadata().retryCount() == 0) { @@ -276,7 +278,7 @@ public void testRetryQueueingMigrateToHeader() throws Exception { if (cnt < 50) { ctx.retry(); } else if (cnt == 50) { - metadataAsHeader.set(true); + retryTaskInLegacyFormat.set(true); migrationLatch.countDown(); ctx.retry(); } else { diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java index 590386b3..83a5a7e3 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java @@ -29,6 +29,11 @@ @Builder @Accessors(fluent = true) public class ConsumedRecord { + /** + * The timestamp of the record + */ + long recordTimestampMillis; + /** * Headers of the record */ diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java index b71d047c..ca65fa32 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java @@ -24,6 +24,7 @@ import org.slf4j.MDC; +import com.linecorp.decaton.common.Deserializer; import com.linecorp.decaton.processor.Completion; import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.ProcessingContext; @@ -226,17 +227,31 @@ public class ProcessorProperties extends AbstractDecatonProperties { Long.MAX_VALUE, v -> v instanceof Long && (Long) v >= 0); /** - * Controls whether to produce retry tasks with task metadata as headers, instead of as deprecated - * {@link DecatonTaskRequest} format. + * Controls whether to produce retry tasks in deprecated {@link DecatonTaskRequest} format. *

- * CAUTION!!! YOU MAY NEED TO SET THIS TO FALSE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER + * CAUTION!!! YOU MAY NEED TO SET THIS TO TRUE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER *

* Please read Decaton 9.0.0 Release Note carefully. *

* Reloadable: yes */ - public static final PropertyDefinition CONFIG_TASK_METADATA_AS_HEADER = - PropertyDefinition.define("decaton.task.metadata.as.header", Boolean.class, true, + public static final PropertyDefinition CONFIG_RETRY_TASK_IN_LEGACY_FORMAT = + PropertyDefinition.define("decaton.retry.task.in.legacy.format", Boolean.class, false, + v -> v instanceof Boolean); + + /** + * Controls whether to parse records as {@link DecatonTaskRequest} format when task metadata header is missing + * when {@link Deserializer} is used, instead of parsing task directly with the deserializer and + * fill reasonably-default task metadata. + *

+ * CAUTION!!! YOU MAY NEED TO SET THIS TO TRUE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER + *

+ * Please read Decaton 9.0.0 Release Note carefully. + *

+ * Reloadable: yes + */ + public static final PropertyDefinition CONFIG_LEGACY_PARSE_FALLBACK_ENABLED = + PropertyDefinition.define("decaton.legacy.parse.fallback.enabled", Boolean.class, false, v -> v instanceof Boolean); public static final List> PROPERTY_DEFINITIONS = @@ -253,7 +268,8 @@ public class ProcessorProperties extends AbstractDecatonProperties { CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS, CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS, CONFIG_PER_KEY_QUOTA_PROCESSING_RATE, - CONFIG_TASK_METADATA_AS_HEADER)); + CONFIG_RETRY_TASK_IN_LEGACY_FORMAT, + CONFIG_LEGACY_PARSE_FALLBACK_ENABLED)); /** * Find and return a {@link PropertyDefinition} from its name. diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java index b519ac63..fd7af07c 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java @@ -22,12 +22,12 @@ import com.linecorp.decaton.common.Deserializer; import com.linecorp.decaton.processor.DecatonProcessor; +import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.processor.runtime.internal.DecatonProcessorSupplierImpl; import com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor; import com.linecorp.decaton.processor.runtime.internal.Processors; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.experimental.Accessors; /** @@ -39,29 +39,34 @@ public class ProcessorsBuilder { @Getter private final String topic; - private final TaskExtractor taskExtractor; - private final TaskExtractor retryTaskExtractor; + private final Deserializer userSuppliedDeserializer; + private final TaskExtractor userSuppliedTaskExtractor; private final List> suppliers; - public ProcessorsBuilder(String topic, TaskExtractor taskExtractor, TaskExtractor retryTaskExtractor) { + ProcessorsBuilder(String topic, Deserializer userSuppliedDeserializer, TaskExtractor userSuppliedTaskExtractor) { this.topic = topic; - this.taskExtractor = taskExtractor; - this.retryTaskExtractor = retryTaskExtractor; + this.userSuppliedDeserializer = userSuppliedDeserializer; + this.userSuppliedTaskExtractor = userSuppliedTaskExtractor; suppliers = new ArrayList<>(); } /** * Create new {@link ProcessorsBuilder} that consumes message from topic expecting tasks of type - * which can be parsed by valueParser. + * which can be parsed by deserializer. + *

+ * From Decaton 9.0.0, you can use this overload to consume tasks from arbitrary topics not only + * topics that are produced by DecatonClient. + *

+ * If you want to extract custom {@link TaskMetadata} (e.g. for delayed processing), you can use + * {@link #consuming(String, TaskExtractor)} instead. * @param topic the name of topic to consume. * @param deserializer the deserializer to instantiate task of type {@link T} from serialized bytes. * @param the type of instantiated tasks. * @return an instance of {@link ProcessorsBuilder}. */ public static ProcessorsBuilder consuming(String topic, Deserializer deserializer) { - DefaultTaskExtractor taskExtractor = new DefaultTaskExtractor<>(deserializer); - return new ProcessorsBuilder<>(topic, taskExtractor, taskExtractor); + return new ProcessorsBuilder<>(topic, deserializer, null); } /** @@ -73,7 +78,7 @@ public static ProcessorsBuilder consuming(String topic, Deserializer d * @return an instance of {@link ProcessorsBuilder}. */ public static ProcessorsBuilder consuming(String topic, TaskExtractor taskExtractor) { - return new ProcessorsBuilder<>(topic, taskExtractor, new RetryTaskExtractor<>(taskExtractor)); + return new ProcessorsBuilder<>(topic, null, taskExtractor); } /** @@ -116,15 +121,36 @@ public ProcessorsBuilder thenProcess(DecatonProcessor processor) { return thenProcess(new DecatonProcessorSupplierImpl<>(() -> processor, ProcessorScope.PROVIDED)); } - Processors build(DecatonProcessorSupplier retryProcessorSupplier) { + Processors build(DecatonProcessorSupplier retryProcessorSupplier, ProcessorProperties properties) { + Property legacyFallbackEnabledProperty = properties.get(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED); + + final TaskExtractor taskExtractor; + final TaskExtractor retryTaskExtractor; + + // consuming(String, Deserializer) is used + if (userSuppliedDeserializer != null) { + DefaultTaskExtractor extractor = new DefaultTaskExtractor<>(userSuppliedDeserializer, legacyFallbackEnabledProperty); + taskExtractor = extractor; + retryTaskExtractor = extractor; + } else { + // consuming(String, TaskExtractor) is used + taskExtractor = userSuppliedTaskExtractor; + retryTaskExtractor = new RetryTaskExtractor<>(legacyFallbackEnabledProperty, userSuppliedTaskExtractor); + } + return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor); } - @RequiredArgsConstructor private static class RetryTaskExtractor implements TaskExtractor { - private final DefaultTaskExtractor outerExtractor = new DefaultTaskExtractor<>(bytes -> bytes); + private final DefaultTaskExtractor outerExtractor; private final TaskExtractor innerExtractor; + RetryTaskExtractor(Property legacyFallbackEnabledProperty, + TaskExtractor innerExtractor) { + this.innerExtractor = innerExtractor; + this.outerExtractor = new DefaultTaskExtractor<>(bytes -> bytes, legacyFallbackEnabledProperty); + } + @Override public DecatonTask extract(ConsumedRecord record) { // Retry tasks might be stored in retry-topic in DecatonTaskRequest format depending on @@ -134,6 +160,7 @@ public DecatonTask extract(ConsumedRecord record) { DecatonTask outerTask = outerExtractor.extract(record); ConsumedRecord inner = ConsumedRecord .builder() + .recordTimestampMillis(record.recordTimestampMillis()) .headers(record.headers()) .key(record.key()) .value(outerTask.taskDataBytes()) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java index 7b68be0c..81af148b 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java @@ -263,7 +263,7 @@ public ProcessorSubscription build() { return new ProcessorSubscription(scope, consumerSupplier.get(), quotaApplier(scope), - processorsBuilder.build(maybeRetryProcessorSupplier(scope)), + processorsBuilder.build(maybeRetryProcessorSupplier(scope), props), props, stateListener); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java index 1535145a..a2a8211a 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java @@ -45,14 +45,14 @@ public class DecatonTaskRetryQueueingProcessor implements DecatonProcessor metadataAsHeaderProperty; + private final Property retryTaskInLegacyFormatProperty; public DecatonTaskRetryQueueingProcessor(SubscriptionScope scope, DecatonTaskProducer producer) { RetryConfig retryConfig = scope.retryConfig().get(); // This won't be instantiated unless it present this.producer = producer; backoff = retryConfig.backoff(); retryTopic = scope.retryTopic().get(); // This won't be instantiated unless it present - metadataAsHeaderProperty = scope.props().get(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER); + retryTaskInLegacyFormatProperty = scope.props().get(ProcessorProperties.CONFIG_RETRY_TASK_IN_LEGACY_FORMAT); metrics = Metrics.withTags("subscription", scope.subscriptionId()).new RetryMetrics(); } @@ -70,15 +70,7 @@ public void process(ProcessingContext context, byte[] serializedTask) .build(); final ProducerRecord record; - if (metadataAsHeaderProperty.value()) { - record = new ProducerRecord<>( - retryTopic, - null, - context.key(), - serializedTask, - context.headers()); - TaskMetadataUtil.writeAsHeader(taskMetadata, record.headers()); - } else { + if (retryTaskInLegacyFormatProperty.value()) { DecatonTaskRequest request = DecatonTaskRequest.newBuilder() .setMetadata(taskMetadata) @@ -90,6 +82,14 @@ record = new ProducerRecord<>( context.key(), request.toByteArray(), context.headers()); + } else { + record = new ProducerRecord<>( + retryTopic, + null, + context.key(), + serializedTask, + context.headers()); + TaskMetadataUtil.writeAsHeader(taskMetadata, record.headers()); } metrics.retryTaskRetries.record(nextRetryCount); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java index 458454d0..139034cd 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java @@ -22,6 +22,7 @@ import com.linecorp.decaton.client.internal.TaskMetadataUtil; import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; +import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.runtime.TaskExtractor; import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; @@ -32,6 +33,7 @@ @RequiredArgsConstructor public class DefaultTaskExtractor implements TaskExtractor { private final Deserializer taskDeserializer; + private final Property legacyFallbackEnabledProperty; @Override public DecatonTask extract(ConsumedRecord record) { @@ -43,17 +45,33 @@ public DecatonTask extract(ConsumedRecord record) { taskDeserializer.deserialize(taskDataBytes), taskDataBytes); } else { - try { - DecatonTaskRequest taskRequest = DecatonTaskRequest.parseFrom(record.value()); - TaskMetadata metadata = TaskMetadata.fromProto(taskRequest.getMetadata()); - byte[] taskDataBytes = taskRequest.getSerializedTask().toByteArray(); + // There are two cases where task metadata header is missing: + // 1. The task is produced by an old producer which wraps tasks in DecatonTaskRequest proto. + // 2. The task is produced by non-DecatonClient producer. + // + // From Decaton perspective, there is no way to distinguish between these two cases, + // so we need to rely on a configuration to determine how to deserialize the task. + if (legacyFallbackEnabledProperty.value()) { + try { + DecatonTaskRequest taskRequest = DecatonTaskRequest.parseFrom(record.value()); + TaskMetadata metadata = TaskMetadata.fromProto(taskRequest.getMetadata()); + byte[] taskDataBytes = taskRequest.getSerializedTask().toByteArray(); + return new DecatonTask<>( + metadata, + taskDeserializer.deserialize(taskDataBytes), + taskDataBytes); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException(e); + } + } else { + T task = taskDeserializer.deserialize(record.value()); return new DecatonTask<>( - metadata, - taskDeserializer.deserialize(taskDataBytes), - taskDataBytes); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException(e); + TaskMetadata.builder() + .timestampMillis(record.recordTimestampMillis()) + .build(), + task, + record.value()); } } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java index bd3f0f4a..3aff3419 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/PartitionContext.java @@ -174,7 +174,7 @@ public void addRecord(ConsumerRecord record, QuotaApplier quotaApplier) { if (!quotaApplier.apply(record, offsetState, maybeRecordQuotaUsage(record.key()))) { TaskRequest request = new TaskRequest( - scope.topicPartition(), record.offset(), offsetState, record.key(), + record.timestamp(), scope.topicPartition(), record.offset(), offsetState, record.key(), record.headers(), traceHandle, record.value(), maybeRecordQuotaUsage(record.key())); subPartitions.addTask(request); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java index 61d5265a..090c48d3 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java @@ -127,6 +127,7 @@ DecatonTask extract(TaskRequest request) { final DecatonTask extracted; extracted = taskExtractor.extract( ConsumedRecord.builder() + .recordTimestampMillis(request.recordTimestamp()) .headers(request.headers()) .key(request.key()) .value(request.rawRequestBytes()) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java index 3d4b6d14..2936e12f 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/TaskRequest.java @@ -32,6 +32,7 @@ @Accessors(fluent = true) @AllArgsConstructor public class TaskRequest { + private final long recordTimestamp; private final TopicPartition topicPartition; private final long recordOffset; private final OffsetState offsetState; diff --git a/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java index fdcd09ff..47e16dbd 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java @@ -100,7 +100,7 @@ private TaskInput put(DecatonProcessor processor, taskData, taskData.toByteArray()); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, name.getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, null, null); + 1723687072569L, new TopicPartition("topic", 1), 1, null, name.getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, null, null); ProcessingContext context = spy(new ProcessingContextImpl<>("subscription", request, task, diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java index 86bfd396..04076617 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java @@ -149,7 +149,7 @@ private static ProcessorSubscription subscription(Consumer consu scope, consumer, NoopQuotaApplier.INSTANCE, - builder.build(null), + builder.build(null, scope.props()), scope.props(), listener); } @@ -280,7 +280,7 @@ public synchronized ConsumerRecords poll(Duration timeout) { (ConsumedRecord record) -> new DecatonTask<>( TaskMetadata.builder().build(), "dummy", record.value())) .thenProcess(processor) - .build(null), + .build(null, scope.props()), scope.props(), newState -> { if (newState == State.RUNNING) { @@ -357,7 +357,7 @@ public synchronized void commitSync(Map offse ctx.deferCompletion().complete(); taskCompleted.countDown(); }) - .build(null), + .build(null, scope.props()), scope.props(), null); subscription.start(); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java index 5d45733f..507b557c 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java @@ -171,8 +171,8 @@ public void testLegacyRetryTaskFormat() throws Exception { SubPartitionRuntime.THREAD_POOL, Optional.of(RetryConfig.builder().backoff(RETRY_BACKOFF).build()), Optional.empty(), ProcessorProperties.builder() - .set(Property.ofStatic(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER, - false)) + .set(Property.ofStatic(ProcessorProperties.CONFIG_RETRY_TASK_IN_LEGACY_FORMAT, + true)) .build(), NoopTracingProvider.INSTANCE, ConsumerSupplier.DEFAULT_MAX_POLL_RECORDS, DefaultSubPartitioner::new); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java index 2208ec85..3e90c517 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java @@ -22,8 +22,11 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.jupiter.api.Test; +import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; +import com.linecorp.decaton.processor.runtime.ProcessorProperties; +import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer; import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; @@ -40,10 +43,12 @@ public class DefaultTaskExtractorTest { @Test public void testExtract() { DefaultTaskExtractor extractor = new DefaultTaskExtractor<>( - new ProtocolBuffersDeserializer<>(HelloTask.parser())); + new ProtocolBuffersDeserializer<>(HelloTask.parser()), + Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, true)); ConsumedRecord record = ConsumedRecord .builder() + .recordTimestampMillis(1561709151628L) .headers(new RecordHeaders()) .value(LEGACY_REQUEST.toByteArray()) .build(); @@ -55,4 +60,28 @@ public void testExtract() { assertArrayEquals(TASK.toByteArray(), extracted.taskDataBytes()); } + + @Test + public void testExtractBypassLegacyFormatWhenHeaderMissing() { + DefaultTaskExtractor extractor = new DefaultTaskExtractor<>( + new ProtocolBuffersDeserializer<>(HelloTask.parser()), + Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, false)); + + ConsumedRecord record = ConsumedRecord + .builder() + .recordTimestampMillis(1561709151628L) + .headers(new RecordHeaders()) + .value(TASK.toByteArray()) + .build(); + + DecatonTask extracted = extractor.extract(record); + + // check that reasonably default metadata is filled + assertEquals(TaskMetadata.builder() + .timestampMillis(1561709151628L) + .build(), extracted.metadata()); + assertEquals(TASK, extracted.taskData()); + + assertArrayEquals(TASK.toByteArray(), extracted.taskDataBytes()); + } } diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java index ce0bd277..8825cc10 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java @@ -96,7 +96,7 @@ public class ProcessPipelineTest { private static TaskRequest taskRequest() { return new TaskRequest( - new TopicPartition("topic", 1), 1, new OffsetState(1234), "TEST".getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, TASK.toByteArray(), null); + 1723687072569L, new TopicPartition("topic", 1), 1, new OffsetState(1234), "TEST".getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, TASK.toByteArray(), null); } @Mock diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java index b0960c6d..d0dc4bdb 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java @@ -69,6 +69,8 @@ @ExtendWith(MockitoExtension.class) public class ProcessingContextImplTest { + private static final long NOW = 1723687072569L; + private static class NamedProcessor implements DecatonProcessor { private final String name; private final DecatonProcessor impl; @@ -120,7 +122,7 @@ private static void terminateExecutor(ExecutorService executor) throws Interrupt private static ProcessingContextImpl context(RecordTraceHandle traceHandle, DecatonProcessor... processors) { TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), + NOW, new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, traceHandle, TASK.toByteArray(), null); DecatonTask task = new DecatonTask<>( TaskMetadata.builder().build(), TASK, TASK.toByteArray()); @@ -366,7 +368,7 @@ public void testRetry() throws InterruptedException { CountDownLatch retryLatch = new CountDownLatch(1); DecatonProcessor retryProcessor = spy(new AsyncCompleteProcessor(retryLatch)); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, TASK.toByteArray(), null); + NOW, new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, TASK.toByteArray(), null); DecatonTask task = new DecatonTask<>( TaskMetadata.builder().build(), TASK.toByteArray(), TASK.toByteArray()); @@ -397,7 +399,7 @@ public void testRetryAtCompletionTimeout() throws InterruptedException { CountDownLatch retryLatch = new CountDownLatch(1); DecatonProcessor retryProcessor = spy(new AsyncCompleteProcessor(retryLatch)); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, TASK.toByteArray(), null); + NOW, new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, TASK.toByteArray(), null); DecatonTask task = new DecatonTask<>( TaskMetadata.builder().build(), TASK.toByteArray(), TASK.toByteArray()); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java index ed832b05..5038d5e1 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java @@ -67,7 +67,7 @@ public void setUp() { unit = spy(new ProcessorUnit(scope, pipeline, Executors.newSingleThreadExecutor())); - taskRequest = new TaskRequest(topicPartition, 1, new OffsetState(1234), null, null, null, HelloTask.getDefaultInstance().toByteArray(), null); + taskRequest = new TaskRequest(1723687072569L, topicPartition, 1, new OffsetState(1234), null, null, null, HelloTask.getDefaultInstance().toByteArray(), null); } @Test diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java index 2c2de91f..f97adcc0 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java @@ -37,6 +37,7 @@ import com.linecorp.decaton.processor.runtime.DecatonProcessorSupplier; import com.linecorp.decaton.processor.runtime.DefaultSubPartitioner; import com.linecorp.decaton.processor.runtime.ProcessorProperties; +import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.runtime.SubPartitionRuntime; import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider; import com.linecorp.decaton.protocol.Sample.HelloTask; @@ -69,7 +70,8 @@ public void testCleanupPartiallyInitializedProcessors() { Processors processors = new Processors<>( suppliers, null, - new DefaultTaskExtractor<>(bytes -> HelloTask.getDefaultInstance()), + new DefaultTaskExtractor<>(bytes -> HelloTask.getDefaultInstance(), + Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED)), null); doThrow(new RuntimeException("exception")).when(suppliers.get(2)).getProcessor(any(), any(), anyInt());