Skip to content

Commit

Permalink
TaskMetadata as header (#238)
Browse files Browse the repository at this point in the history
* TaskMetadata as header

* fix metadata drop

* fix test

* add test

* add integration test

* address comments

* update license header
  • Loading branch information
ocadaruma authored Aug 13, 2024
1 parent 08614dd commit 5ef1bea
Show file tree
Hide file tree
Showing 36 changed files with 497 additions and 276 deletions.
7 changes: 0 additions & 7 deletions .idea/copyright/LINE_OSS.xml

This file was deleted.

7 changes: 7 additions & 0 deletions .idea/copyright/LY_OSS.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions .idea/copyright/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ public String get(String key) {
.subPartitionRuntime(subPartitionRuntime)
.processorsBuilder(
ProcessorsBuilder.consuming(config.topic(),
(TaskExtractor<Task>) bytes -> {
(TaskExtractor<Task>) record -> {
Task task = config.taskDeserializer()
.deserialize(config.topic(), bytes);
.deserialize(config.topic(), record.value());
return new DecatonTask<>(
TaskMetadata.builder().build(), task, bytes);
TaskMetadata.builder().build(), task, record.value());
})
.thenProcess(
(ctx, task) -> recording.process(task)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;

import com.linecorp.decaton.client.internal.DecatonClientImpl;
import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer;
import com.linecorp.decaton.common.Serializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;

import lombok.AccessLevel;
import lombok.Setter;
Expand All @@ -54,10 +52,10 @@ public class DecatonClientBuilder<T> {

public static class DefaultKafkaProducerSupplier implements KafkaProducerSupplier {
@Override
public Producer<byte[], DecatonTaskRequest> getProducer(Properties config) {
public Producer<byte[], byte[]> getProducer(Properties config) {
return new KafkaProducer<>(config,
new ByteArraySerializer(),
new ProtocolBuffersKafkaSerializer<>());
new ByteArraySerializer());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.kafka.clients.producer.Producer;

import com.linecorp.decaton.client.internal.DecatonClientImpl;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;

/**
* An interface to specify a custom instantiation function for {@link Producer}.
Expand All @@ -39,5 +38,5 @@ public interface KafkaProducerSupplier {
* @return an Kafka producer instance which implements {@link Producer}. The returned instance will be
* closed along with {@link DecatonClient#close} being called.
*/
Producer<byte[], DecatonTaskRequest> getProducer(Properties config);
Producer<byte[], byte[]> getProducer(Properties config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.google.protobuf.ByteString;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.client.KafkaProducerSupplier;
import com.linecorp.decaton.client.PutTaskResult;
import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer;
import com.linecorp.decaton.common.Serializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;

public class DecatonClientImpl<T> implements DecatonClient<T> {
Expand All @@ -52,7 +51,7 @@ public class DecatonClientImpl<T> implements DecatonClient<T> {
this.serializer = serializer;
this.applicationId = applicationId;
this.instanceId = instanceId;
producer = new DecatonTaskProducer(topic, producerConfig, producerSupplier);
producer = new DecatonTaskProducer(producerConfig, producerSupplier);
this.timestampSupplier = timestampSupplier;
}

Expand Down Expand Up @@ -109,13 +108,11 @@ private CompletableFuture<PutTaskResult> put(String key, T task, TaskMetadataPro
byte[] serializedKey = keySerializer.serialize(topic, key);
byte[] serializedTask = serializer.serialize(task);

DecatonTaskRequest request =
DecatonTaskRequest.newBuilder()
.setMetadata(taskMetadataProto)
.setSerializedTask(ByteString.copyFrom(serializedTask))
.build();
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
topic, partition, serializedKey, serializedTask);
TaskMetadataUtil.writeAsHeader(taskMetadataProto, record.headers());

return producer.sendRequest(serializedKey, request, partition);
return producer.sendRequest(record);
}

private TaskMetadataProto convertToTaskMetadataProto(TaskMetadata overrideTaskMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@
import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.client.KafkaProducerSupplier;
import com.linecorp.decaton.client.PutTaskResult;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;

/**
* A raw interface to put a built {@link DecatonTaskRequest} directly.
* A raw interface to put decaton tasks.
* This interface isn't expected to be used by applications unless it's really necessary.
* Use {@link DecatonClient} to put task into a Decaton topic instead.
*/
Expand All @@ -44,8 +43,7 @@ public class DecatonTaskProducer implements AutoCloseable {
presetProducerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
}

private final Producer<byte[], DecatonTaskRequest> producer;
private final String topic;
private final Producer<byte[], byte[]> producer;

private static Properties completeProducerConfig(Properties producerConfig) {
final Properties result = new Properties();
Expand All @@ -54,20 +52,13 @@ private static Properties completeProducerConfig(Properties producerConfig) {
return result;
}

public DecatonTaskProducer(String topic, Properties producerConfig,
public DecatonTaskProducer(Properties producerConfig,
KafkaProducerSupplier producerSupplier) {
Properties completeProducerConfig = completeProducerConfig(producerConfig);
producer = producerSupplier.getProducer(completeProducerConfig);
this.topic = topic;
}

public CompletableFuture<PutTaskResult> sendRequest(byte[] key, DecatonTaskRequest request,
Integer partition) {
ProducerRecord<byte[], DecatonTaskRequest> record = new ProducerRecord<>(topic, partition, key, request);
return sendRequest(record);
}

private CompletableFuture<PutTaskResult> sendRequest(ProducerRecord<byte[], DecatonTaskRequest> record) {
public CompletableFuture<PutTaskResult> sendRequest(ProducerRecord<byte[], byte[]> record) {
CompletableFuture<PutTaskResult> result = new CompletableFuture<>();
producer.send(record, (metadata, exception) -> {
if (exception == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2024 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.client.internal;

import java.io.UncheckedIOException;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import com.google.protobuf.InvalidProtocolBufferException;

import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;

public class TaskMetadataUtil {
private static final String METADATA_HEADER_KEY = "dt_meta";

/**
* Write metadata to {@link Headers}
* @param metadata task metadata to be written
* @param headers record header to write to
*/
public static void writeAsHeader(TaskMetadataProto metadata, Headers headers) {
headers.remove(METADATA_HEADER_KEY);
headers.add(METADATA_HEADER_KEY, metadata.toByteArray());
}

/**
* Read metadata from given {@link Headers}
* @param headers record header to read from
* @return parsed {@link TaskMetadataProto} or null if header is absent
* @throws IllegalStateException if metadata bytes is invalid
*/
public static TaskMetadataProto readFromHeader(Headers headers) {
Header header = headers.lastHeader(METADATA_HEADER_KEY);
if (header == null) {
return null;
}
try {
return TaskMetadataProto.parseFrom(header.value());
} catch (InvalidProtocolBufferException e) {
throw new UncheckedIOException(e);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import com.linecorp.decaton.client.internal.TaskMetadataUtil;
import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer;
import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;
import com.linecorp.decaton.protocol.Sample.HelloTask;

@ExtendWith(MockitoExtension.class)
public class DecatonClientBuilderTest {
@Mock
private Producer<byte[], DecatonTaskRequest> producer;
private Producer<byte[], byte[]> producer;

@Captor
private ArgumentCaptor<ProducerRecord<byte[], DecatonTaskRequest>> recordCaptor;
private ArgumentCaptor<ProducerRecord<byte[], byte[]>> recordCaptor;

private ProducerRecord<byte[], DecatonTaskRequest> doProduce(DecatonClient<HelloTask> dclient) {
private ProducerRecord<byte[], byte[]> doProduce(DecatonClient<HelloTask> dclient) {
dclient.put(null, HelloTask.getDefaultInstance());
verify(producer, times(1)).send(recordCaptor.capture(), any(Callback.class));
return recordCaptor.getValue();
Expand All @@ -66,10 +66,10 @@ public void testBuild() {
.producerSupplier(config -> producer)
.build();

ProducerRecord<byte[], DecatonTaskRequest> record = doProduce(dclient);
ProducerRecord<byte[], byte[]> record = doProduce(dclient);
assertEquals(topic, record.topic());

TaskMetadataProto metadata = record.value().getMetadata();
TaskMetadataProto metadata = TaskMetadataUtil.readFromHeader(record.headers());
assertEquals(applicationId, metadata.getSourceApplicationId());
assertEquals(instanceId, metadata.getSourceInstanceId());
}
Expand Down
Loading

0 comments on commit 5ef1bea

Please sign in to comment.