Skip to content

Commit

Permalink
Update docs and examples for 9.0.0 (#239)
Browse files Browse the repository at this point in the history
* Update docs and examples for 9.0.0

* update
  • Loading branch information
ocadaruma authored Aug 14, 2024
1 parent 5ef1bea commit e0f6dc9
Show file tree
Hide file tree
Showing 15 changed files with 29 additions and 28 deletions.
16 changes: 8 additions & 8 deletions docs/consuming-any-data.adoc → docs/consuming-any-topic.adoc
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
Consuming Arbitrary Topic
=========================
:base_version: 8.0.0
:base_version: 9.0.0
:modules: common,protocol,processor

This document guides you how to consume and process topics containing records not consists of Decaton's protocol (not produced by DecatonClient) using Decaton processors.
This document guides you how to consume and process topics containing records not produced by DecatonClient using Decaton processors.

By default, Decaton assumes messages are serialized as `DecatonTaskRequest` defined in link:../protocol/src/main/proto/decaton.proto[decaton.proto], and `DecatonProcessor` extracts the task from its `serialized_task` field.
But Decaton has the capability to consume arbitrary topics other than topics consisting records of Decaton protocol.
By default, Decaton assumes messages are produced `DecatonClient`, where task metadata are stored as link:../protocol/src/main/proto/decaton.proto[TaskMetadataProto] in record headers.
But Decaton has the capability to consume arbitrary topics other than topics produced by `DecatonClient`.

This means you can use Decaton as a drop-in replacement for a vanilla KafkaConsumer to leverage powerful features like deferred completion, delayed processing and so on.

Through this guide, we assume the topic is JSON-serialized and use link:https://github.com/FasterXML/jackson-databind[jackson-databind] for deserialization, but it's trivial to consume arbitrary formats other than JSON.

== TaskExtractor

First, you need to start by implementing your own link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java[TaskExtractor] to deserialize a task from raw message bytes.
First, you need to start by implementing your own link:../processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java[TaskExtractor] to extract a task from raw consumed messages.

[source,java]
.JSONUserEventExtractor.java
Expand All @@ -23,9 +23,9 @@ public class JSONUserEventExtractor implements TaskExtractor<UserEvent> {
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public DecatonTask<UserEvent> extract(byte[] bytes) {
public DecatonTask<UserEvent> extract(ConsumedRecord record) {
try {
UserEvent event = MAPPER.readValue(bytes, UserEvent.class);
UserEvent event = MAPPER.readValue(record.value(), UserEvent.class);
TaskMetadata metadata = TaskMetadata.builder()
// Filling timestampMillis is not mandatory, but it would be useful
// when you monitor delivery latency between event production time and event processing time.
Expand All @@ -35,7 +35,7 @@ public class JSONUserEventExtractor implements TaskExtractor<UserEvent> {
// You can set other TaskMetadata fields as you needed
.build();
return new DecatonTask<>(metadata, event, bytes);
return new DecatonTask<>(metadata, event, record.value());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
2 changes: 1 addition & 1 deletion docs/dynamic-property-configuration.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Dynamic Property Configuration
=============================
:base_version: 8.0.0
:base_version: 9.0.0
:modules: centraldogma,processor

== Property Supplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.linecorp.decaton.processor.TaskMetadata;

import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import example.models.UserEvent;
Expand All @@ -31,9 +32,9 @@ public class JSONUserEventExtractor implements TaskExtractor<UserEvent> {
private static final ObjectMapper MAPPER = new ObjectMapper();

@Override
public DecatonTask<UserEvent> extract(byte[] bytes) {
public DecatonTask<UserEvent> extract(ConsumedRecord record) {
try {
UserEvent event = MAPPER.readValue(bytes, UserEvent.class);
UserEvent event = MAPPER.readValue(record.value(), UserEvent.class);
TaskMetadata metadata = TaskMetadata.builder()
// Filling timestampMillis is not mandatory, but it would be useful
// when you monitor delivery latency between event production time and event processing time.
Expand All @@ -43,7 +44,7 @@ public DecatonTask<UserEvent> extract(byte[] bytes) {
// You can set other TaskMetadata fields as you needed
.build();

return new DecatonTask<>(metadata, event, bytes);
return new DecatonTask<>(metadata, event, record.value());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
6 changes: 3 additions & 3 deletions docs/example/src/main/java/example/TaskBatchingMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ public static void main(String[] args) throws Exception {
consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor");

TaskExtractor<HelloTask> extractor = bytes -> {
TaskExtractor<HelloTask> extractor = record -> {
TaskMetadata metadata = TaskMetadata.builder().build();
HelloTask data;
try {
data = new ObjectMapper().readValue(bytes, HelloTask.class);
data = new ObjectMapper().readValue(record.value(), HelloTask.class);
} catch (IOException e) {
throw new RuntimeException(e);
}

return new DecatonTask<>(metadata, data, bytes);
return new DecatonTask<>(metadata, data, record.value());
};
long lingerMillis = 1000;
int capacity = 100;
Expand Down
6 changes: 3 additions & 3 deletions docs/example/src/main/java/example/TaskCompactionMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ public static void main(String[] args) throws Exception {
System.getProperty("bootstrap.servers"));
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor");

TaskExtractor<LocationEvent> extractor = bytes -> {
TaskExtractor<LocationEvent> extractor = record -> {
TaskMetadata metadata = TaskMetadata.builder().build();
LocationEvent data;
try {
data = new ObjectMapper().readValue(bytes, LocationEvent.class);
data = new ObjectMapper().readValue(record.value(), LocationEvent.class);
} catch (IOException e) {
throw new RuntimeException(e);
}

return new DecatonTask<>(metadata, data, bytes);
return new DecatonTask<>(metadata, data, record.value());
};

ProcessorSubscription subscription =
Expand Down
2 changes: 1 addition & 1 deletion docs/getting-started.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Getting Started Decaton
=======================
:base_version: 8.0.0
:base_version: 9.0.0
:modules: common,client,processor,protobuf

Let's start from the most basic usage of Decaton client/processor.
Expand Down
2 changes: 1 addition & 1 deletion docs/key-blocking.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
= Key Blocking
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

== Introduction
Expand Down
2 changes: 1 addition & 1 deletion docs/monitoring.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Monitoring Decaton
==================
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

This document guides you how to monitor your Decaton processor applications.
Expand Down
2 changes: 1 addition & 1 deletion docs/rate-limiting.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
= Rate Limiting
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

== Introduction
Expand Down
2 changes: 1 addition & 1 deletion docs/retry-queueing.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
= Retry Queuing
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

== Introduction
Expand Down
2 changes: 1 addition & 1 deletion docs/runtime.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
= Subpartition Runtime
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

This document guides you what Subpartition Runtime is and how to use it.
Expand Down
2 changes: 1 addition & 1 deletion docs/task-batching.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
= Task Batching
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

== Introduction
Expand Down
2 changes: 1 addition & 1 deletion docs/task-compaction.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
= Task Compaction
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

== Introduction
Expand Down
2 changes: 1 addition & 1 deletion docs/tracing.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
= Tracing
:base_version: 8.0.0
:base_version: 9.0.0
:modules: brave,processor

Decaton can integrate with distributed tracing frameworks so that you can associate the processing of a message
Expand Down
2 changes: 1 addition & 1 deletion docs/why-decaton.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Why Decaton
===========
:base_version: 8.0.0
:base_version: 9.0.0
:modules: processor

This document explains why we have decided to create a new consumer framework.
Expand Down

0 comments on commit e0f6dc9

Please sign in to comment.