Skip to content

Commit

Permalink
Make Decaton can consume any topic with deserializer (#241)
Browse files Browse the repository at this point in the history
* Make Decaton can consume any topic with deserializer

* fix

* get rid of workaround

* nits

* refactor
  • Loading branch information
ocadaruma authored Aug 16, 2024
1 parent e0f6dc9 commit d5dae11
Show file tree
Hide file tree
Showing 21 changed files with 190 additions and 73 deletions.
2 changes: 0 additions & 2 deletions docs/getting-started.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,6 @@ By leveraging Decaton's deferred completion and async-client of your middleware

Now you know the basics and ready to start implementing Decaton apps!

If you're attempting to consume existing topic which contains records in schema other than Decaton's task protocol, or maybe you want to use task schema that can be understandable even for non-decaton consumers. In case visit link:./consuming-any-data.adoc[Consuming Arbitrary Topic] to see how.

For those thinking to run Decaton on production, link:./monitoring.adoc[Monitoring] might helps to always ensure your Decaton processors doing good.

If you're using link:https://spring.io/[Spring] for running your applications, you might wanna take a look at link:./spring-integration.adoc[Spring Integration].
Expand Down
2 changes: 1 addition & 1 deletion docs/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Decaton Documents
- link:./runtime.adoc[Subpartition Runtime]
- link:./spring-integration.adoc[Spring Integration]
- link:./tracing.adoc[Tracing]
- link:./consuming-any-data.adoc[Use Decaton for consuming topics of non-Decaton tasks]
- link:./task-extractor.adoc[Implement custom task extractor]
- link:./dynamic-property-configuration.adoc[Dynamic property configuration for the processor]
- link:./monitoring.adoc[Monitoring Decaton]
- Features
Expand Down
38 changes: 27 additions & 11 deletions docs/consuming-any-topic.adoc → docs/task-extractor.adoc
Original file line number Diff line number Diff line change
@@ -1,16 +1,33 @@
Consuming Arbitrary Topic
=========================
TaskExtractor
=============
:base_version: 9.0.0
:modules: common,protocol,processor

This document guides you how to consume and process topics containing records not produced by DecatonClient using Decaton processors.
[NOTE]
====
From Decaton 9.0.0, you can just use link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java[ProcessorsBuilder#consuming(String topic, Deserializer<T> deserializer)] to consume arbitrary topics,
without worrying if the topic is produced by DecatonClient or not.
You may need to read through this guide *ONLY* in specific situations described just below.
====

Decaton provides two ways to consume topics in link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java[ProcessorsBuilder]:

* `ProcessorsBuilder#consuming(String topic, Deserializer<T> deserializer)`
* `ProcessorsBuilder#consuming(String topic, TaskExtractor<T> deserializer)`
By default, Decaton assumes messages are produced `DecatonClient`, where task metadata are stored as link:../protocol/src/main/proto/decaton.proto[TaskMetadataProto] in record headers.
But Decaton has the capability to consume arbitrary topics other than topics produced by `DecatonClient`.
As you may have learned through link:./getting-started.adoc[Getting Started], former is the most common and convenient way to consume topics, where you can just pass a value deserializer.

This means you can use Decaton as a drop-in replacement for a vanilla KafkaConsumer to leverage powerful features like deferred completion, delayed processing and so on.
However, sometimes you may need to apply custom logic to extract a task from raw consumed messages:

Through this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization, but it's trivial to consume arbitrary formats other than JSON.
* You need to extract custom task metadata on consumption. (e.g. Set `scheduledTimeMillis` for delayed processing)
* You need to access additional information (e.g. record headers) for deserialization
This is where latter way with `TaskExtractor` comes in.

This guide will show you how to implement `TaskExtractor` and use it.

Through this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization.

== TaskExtractor

Expand All @@ -29,6 +46,7 @@ public class JSONUserEventExtractor implements TaskExtractor<UserEvent> {
TaskMetadata metadata = TaskMetadata.builder()
// Filling timestampMillis is not mandatory, but it would be useful
// when you monitor delivery latency between event production time and event processing time.
// Also, this will be used for scheduling tasks when scheduledTimeMillis is set.
.timestampMillis(event.getEventTimestampMillis())
// This field is not mandatory too, but you can track which application produced the task by filling this.
.sourceApplicationId("event-tracker")
Expand Down Expand Up @@ -66,7 +84,7 @@ public class UserEventProcessor implements DecatonProcessor<UserEvent> {
}
----

As you can see, once you implement TaskExtractor, the implementation of the DecatonProcessor can be done as when you consume a regular Decaton topic.
As you can see, there's no difference the implementation of the DecatonProcessor from the case where you use `Deserializer`.

Lastly, you need to instantiate link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java[ProcessorSubscription] as follows.

Expand All @@ -84,11 +102,9 @@ ProcessorsBuilder.consuming(
...
----

You have to pass TaskExtractor which you implemented above instead of link:../common/src/main/java/com/linecorp/decaton/common/Deserializer.java[Deseiralizer].

== Run Example

Now we are ready to process a JSON topic.
Now we are ready to process a JSON topic with custom task extraction logic.

Before trying out, let's download and extract the kafka binary from https://kafka.apache.org/downloads to use `kafka-console-producer.sh`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,18 @@ public void testRetryQueueingFromCompletionTimeoutCallback() throws Exception {
@Test
@Timeout(60)
public void testRetryQueueingMigrateToHeader() throws Exception {
DynamicProperty<Boolean> metadataAsHeader =
new DynamicProperty<>(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER);
metadataAsHeader.set(false);
DynamicProperty<Boolean> retryTaskInLegacyFormat =
new DynamicProperty<>(ProcessorProperties.CONFIG_RETRY_TASK_IN_LEGACY_FORMAT);
retryTaskInLegacyFormat.set(true);

AtomicInteger processCount = new AtomicInteger(0);
CountDownLatch migrationLatch = new CountDownLatch(1);
ProcessorTestSuite
.builder(rule)
.numTasks(100)
.propertySupplier(StaticPropertySupplier.of(metadataAsHeader))
.propertySupplier(StaticPropertySupplier.of(
retryTaskInLegacyFormat,
Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, true)))
.produceTasksWithHeaderMetadata(false)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
if (ctx.metadata().retryCount() == 0) {
Expand All @@ -276,7 +278,7 @@ public void testRetryQueueingMigrateToHeader() throws Exception {
if (cnt < 50) {
ctx.retry();
} else if (cnt == 50) {
metadataAsHeader.set(true);
retryTaskInLegacyFormat.set(true);
migrationLatch.countDown();
ctx.retry();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
@Builder
@Accessors(fluent = true)
public class ConsumedRecord {
/**
* The timestamp of the record
*/
long recordTimestampMillis;

/**
* Headers of the record
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.slf4j.MDC;

import com.linecorp.decaton.common.Deserializer;
import com.linecorp.decaton.processor.Completion;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.ProcessingContext;
Expand Down Expand Up @@ -226,17 +227,31 @@ public class ProcessorProperties extends AbstractDecatonProperties {
Long.MAX_VALUE, v -> v instanceof Long && (Long) v >= 0);

/**
* Controls whether to produce retry tasks with task metadata as headers, instead of as deprecated
* {@link DecatonTaskRequest} format.
* Controls whether to produce retry tasks in deprecated {@link DecatonTaskRequest} format.
* <p>
* <b>CAUTION!!! YOU MAY NEED TO SET THIS TO FALSE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER</b>
* <b>CAUTION!!! YOU MAY NEED TO SET THIS TO TRUE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER</b>
* <p>
* Please read <a href="https://github.com/line/decaton/releases/tag/v9.0.0">Decaton 9.0.0 Release Note</a> carefully.
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Boolean> CONFIG_TASK_METADATA_AS_HEADER =
PropertyDefinition.define("decaton.task.metadata.as.header", Boolean.class, true,
public static final PropertyDefinition<Boolean> CONFIG_RETRY_TASK_IN_LEGACY_FORMAT =
PropertyDefinition.define("decaton.retry.task.in.legacy.format", Boolean.class, false,
v -> v instanceof Boolean);

/**
* Controls whether to parse records as {@link DecatonTaskRequest} format when task metadata header is missing
* when {@link Deserializer} is used, instead of parsing task directly with the deserializer and
* fill reasonably-default task metadata.
* <p>
* <b>CAUTION!!! YOU MAY NEED TO SET THIS TO TRUE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER</b>
* <p>
* Please read <a href="https://github.com/line/decaton/releases/tag/v9.0.0">Decaton 9.0.0 Release Note</a> carefully.
* <p>
* Reloadable: yes
*/
public static final PropertyDefinition<Boolean> CONFIG_LEGACY_PARSE_FALLBACK_ENABLED =
PropertyDefinition.define("decaton.legacy.parse.fallback.enabled", Boolean.class, false,
v -> v instanceof Boolean);

public static final List<PropertyDefinition<?>> PROPERTY_DEFINITIONS =
Expand All @@ -253,7 +268,8 @@ public class ProcessorProperties extends AbstractDecatonProperties {
CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS,
CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS,
CONFIG_PER_KEY_QUOTA_PROCESSING_RATE,
CONFIG_TASK_METADATA_AS_HEADER));
CONFIG_RETRY_TASK_IN_LEGACY_FORMAT,
CONFIG_LEGACY_PARSE_FALLBACK_ENABLED));

/**
* Find and return a {@link PropertyDefinition} from its name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

import com.linecorp.decaton.common.Deserializer;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.runtime.internal.DecatonProcessorSupplierImpl;
import com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor;
import com.linecorp.decaton.processor.runtime.internal.Processors;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;

/**
Expand All @@ -39,29 +39,34 @@
public class ProcessorsBuilder<T> {
@Getter
private final String topic;
private final TaskExtractor<T> taskExtractor;
private final TaskExtractor<T> retryTaskExtractor;
private final Deserializer<T> userSuppliedDeserializer;
private final TaskExtractor<T> userSuppliedTaskExtractor;

private final List<DecatonProcessorSupplier<T>> suppliers;

public ProcessorsBuilder(String topic, TaskExtractor<T> taskExtractor, TaskExtractor<T> retryTaskExtractor) {
ProcessorsBuilder(String topic, Deserializer<T> userSuppliedDeserializer, TaskExtractor<T> userSuppliedTaskExtractor) {
this.topic = topic;
this.taskExtractor = taskExtractor;
this.retryTaskExtractor = retryTaskExtractor;
this.userSuppliedDeserializer = userSuppliedDeserializer;
this.userSuppliedTaskExtractor = userSuppliedTaskExtractor;
suppliers = new ArrayList<>();
}

/**
* Create new {@link ProcessorsBuilder} that consumes message from topic expecting tasks of type
* which can be parsed by valueParser.
* which can be parsed by deserializer.
* <p>
* From Decaton 9.0.0, you can use this overload to consume tasks from arbitrary topics not only
* topics that are produced by DecatonClient.
* <p>
* If you want to extract custom {@link TaskMetadata} (e.g. for delayed processing), you can use
* {@link #consuming(String, TaskExtractor)} instead.
* @param topic the name of topic to consume.
* @param deserializer the deserializer to instantiate task of type {@link T} from serialized bytes.
* @param <T> the type of instantiated tasks.
* @return an instance of {@link ProcessorsBuilder}.
*/
public static <T> ProcessorsBuilder<T> consuming(String topic, Deserializer<T> deserializer) {
DefaultTaskExtractor<T> taskExtractor = new DefaultTaskExtractor<>(deserializer);
return new ProcessorsBuilder<>(topic, taskExtractor, taskExtractor);
return new ProcessorsBuilder<>(topic, deserializer, null);
}

/**
Expand All @@ -73,7 +78,7 @@ public static <T> ProcessorsBuilder<T> consuming(String topic, Deserializer<T> d
* @return an instance of {@link ProcessorsBuilder}.
*/
public static <T> ProcessorsBuilder<T> consuming(String topic, TaskExtractor<T> taskExtractor) {
return new ProcessorsBuilder<>(topic, taskExtractor, new RetryTaskExtractor<>(taskExtractor));
return new ProcessorsBuilder<>(topic, null, taskExtractor);
}

/**
Expand Down Expand Up @@ -116,15 +121,36 @@ public ProcessorsBuilder<T> thenProcess(DecatonProcessor<T> processor) {
return thenProcess(new DecatonProcessorSupplierImpl<>(() -> processor, ProcessorScope.PROVIDED));
}

Processors<T> build(DecatonProcessorSupplier<byte[]> retryProcessorSupplier) {
Processors<T> build(DecatonProcessorSupplier<byte[]> retryProcessorSupplier, ProcessorProperties properties) {
Property<Boolean> legacyFallbackEnabledProperty = properties.get(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED);

final TaskExtractor<T> taskExtractor;
final TaskExtractor<T> retryTaskExtractor;

// consuming(String, Deserializer) is used
if (userSuppliedDeserializer != null) {
DefaultTaskExtractor<T> extractor = new DefaultTaskExtractor<>(userSuppliedDeserializer, legacyFallbackEnabledProperty);
taskExtractor = extractor;
retryTaskExtractor = extractor;
} else {
// consuming(String, TaskExtractor) is used
taskExtractor = userSuppliedTaskExtractor;
retryTaskExtractor = new RetryTaskExtractor<>(legacyFallbackEnabledProperty, userSuppliedTaskExtractor);
}

return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor);
}

@RequiredArgsConstructor
private static class RetryTaskExtractor<T> implements TaskExtractor<T> {
private final DefaultTaskExtractor<byte[]> outerExtractor = new DefaultTaskExtractor<>(bytes -> bytes);
private final DefaultTaskExtractor<byte[]> outerExtractor;
private final TaskExtractor<T> innerExtractor;

RetryTaskExtractor(Property<Boolean> legacyFallbackEnabledProperty,
TaskExtractor<T> innerExtractor) {
this.innerExtractor = innerExtractor;
this.outerExtractor = new DefaultTaskExtractor<>(bytes -> bytes, legacyFallbackEnabledProperty);
}

@Override
public DecatonTask<T> extract(ConsumedRecord record) {
// Retry tasks might be stored in retry-topic in DecatonTaskRequest format depending on
Expand All @@ -134,6 +160,7 @@ public DecatonTask<T> extract(ConsumedRecord record) {
DecatonTask<byte[]> outerTask = outerExtractor.extract(record);
ConsumedRecord inner = ConsumedRecord
.builder()
.recordTimestampMillis(record.recordTimestampMillis())
.headers(record.headers())
.key(record.key())
.value(outerTask.taskDataBytes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public ProcessorSubscription build() {
return new ProcessorSubscription(scope,
consumerSupplier.get(),
quotaApplier(scope),
processorsBuilder.build(maybeRetryProcessorSupplier(scope)),
processorsBuilder.build(maybeRetryProcessorSupplier(scope), props),
props,
stateListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ public class DecatonTaskRetryQueueingProcessor implements DecatonProcessor<byte[
private final Duration backoff;
private final RetryMetrics metrics;
private final String retryTopic;
private final Property<Boolean> metadataAsHeaderProperty;
private final Property<Boolean> retryTaskInLegacyFormatProperty;

public DecatonTaskRetryQueueingProcessor(SubscriptionScope scope, DecatonTaskProducer producer) {
RetryConfig retryConfig = scope.retryConfig().get(); // This won't be instantiated unless it present
this.producer = producer;
backoff = retryConfig.backoff();
retryTopic = scope.retryTopic().get(); // This won't be instantiated unless it present
metadataAsHeaderProperty = scope.props().get(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER);
retryTaskInLegacyFormatProperty = scope.props().get(ProcessorProperties.CONFIG_RETRY_TASK_IN_LEGACY_FORMAT);

metrics = Metrics.withTags("subscription", scope.subscriptionId()).new RetryMetrics();
}
Expand All @@ -70,15 +70,7 @@ public void process(ProcessingContext<byte[]> context, byte[] serializedTask)
.build();

final ProducerRecord<byte[], byte[]> record;
if (metadataAsHeaderProperty.value()) {
record = new ProducerRecord<>(
retryTopic,
null,
context.key(),
serializedTask,
context.headers());
TaskMetadataUtil.writeAsHeader(taskMetadata, record.headers());
} else {
if (retryTaskInLegacyFormatProperty.value()) {
DecatonTaskRequest request =
DecatonTaskRequest.newBuilder()
.setMetadata(taskMetadata)
Expand All @@ -90,6 +82,14 @@ record = new ProducerRecord<>(
context.key(),
request.toByteArray(),
context.headers());
} else {
record = new ProducerRecord<>(
retryTopic,
null,
context.key(),
serializedTask,
context.headers());
TaskMetadataUtil.writeAsHeader(taskMetadata, record.headers());
}
metrics.retryTaskRetries.record(nextRetryCount);

Expand Down
Loading

0 comments on commit d5dae11

Please sign in to comment.