Skip to content

Commit

Permalink
KAFKA-16666 Migrate TransactionLogMessageFormatter to tools module (a…
Browse files Browse the repository at this point in the history
…pache#16019)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
m1a2st authored Jul 24, 2024
1 parent 3e2de23 commit ee68f32
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 31 deletions.
51 changes: 26 additions & 25 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -917,10 +917,6 @@ project(':core') {
archivesName = "kafka_${versions.baseScala}"
}

configurations {
generator
}

dependencies {
// `core` is often used in users' tests, define the following dependencies as `api` for backwards compatibility
// even though the `core` module doesn't expose any public API
Expand Down Expand Up @@ -997,8 +993,6 @@ project(':core') {
testImplementation libs.caffeine

testRuntimeOnly libs.junitPlatformLanucher

generator project(':generator')
}

if (userEnableTestCoverage) {
Expand Down Expand Up @@ -1038,24 +1032,6 @@ project(':core') {
duplicatesStrategy 'exclude'
}

task processMessages(type:JavaExec) {
mainClass = "org.apache.kafka.message.MessageGenerator"
classpath = configurations.generator
args = [ "-p", "kafka.internals.generated",
"-o", "src/generated/java/kafka/internals/generated",
"-i", "src/main/resources/common/message",
"-m", "MessageDataGenerator"
]
inputs.dir("src/main/resources/common/message")
.withPropertyName("messages")
.withPathSensitivity(PathSensitivity.RELATIVE)
outputs.cacheIf { true }
outputs.dir("src/generated/java/kafka/internals/generated")
}

compileJava.dependsOn 'processMessages'
srcJar.dependsOn 'processMessages'

task genProtocolErrorDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'org.apache.kafka.common.protocol.Errors'
Expand Down Expand Up @@ -1456,10 +1432,16 @@ project(':transaction-coordinator') {
archivesName = "kafka-transaction-coordinator"
}

configurations {
generator
}

dependencies {
implementation libs.jacksonDatabind
implementation project(':clients')

generator project(':generator')
}

sourceSets {
main {
java {
Expand All @@ -1477,6 +1459,24 @@ project(':transaction-coordinator') {
configProperties = checkstyleConfigProperties("import-control-transaction-coordinator.xml")
}

task processMessages(type:JavaExec) {
mainClass = "org.apache.kafka.message.MessageGenerator"
classpath = configurations.generator
args = [ "-p", "org.apache.kafka.coordinator.transaction.generated",
"-o", "src/generated/java/org/apache/kafka/coordinator/transaction/generated",
"-i", "src/main/resources/common/message",
"-m", "MessageDataGenerator", "JsonConverterGenerator"
]
inputs.dir("src/main/resources/common/message")
.withPropertyName("messages")
.withPathSensitivity(PathSensitivity.RELATIVE)
outputs.cacheIf { true }
outputs.dir("src/generated/java/org/apache/kafka/coordinator/transaction/generated")
}

compileJava.dependsOn 'processMessages'
srcJar.dependsOn 'processMessages'

javadoc {
enabled = false
}
Expand Down Expand Up @@ -2116,6 +2116,7 @@ project(':tools') {
implementation project(':server-common')
implementation project(':connect:runtime')
implementation project(':tools:tools-api')
implementation project(':transaction-coordinator')
implementation libs.argparse4j
implementation libs.jacksonDatabind
implementation libs.jacksonDataformatCsv
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
<allow pkg="org.apache.kafka.clients.admin"/>
<allow pkg="org.apache.kafka.clients.consumer"/>
<allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="org.apache.kafka.coordinator.transaction"/>
<subpackage name="annotation">
<allow pkg="kafka.test"/>
</subpackage>
Expand Down
13 changes: 12 additions & 1 deletion checkstyle/import-control-transaction-coordinator.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,16 @@
<disallow pkg="kafka" />

<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common.config" />
<subpackage name="coordinator">
<subpackage name="transaction">
<allow pkg="org.apache.kafka.common.config" />
<subpackage name="generated">
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.coordinator.transaction.generated" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="com.fasterxml.jackson" />
</subpackage>
</subpackage>
</subpackage>
</import-control>
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@
<allow pkg="javax.rmi.ssl"/>
<allow pkg="kafka.utils" />
<allow pkg="scala.collection" />
<allow pkg="org.apache.kafka.coordinator.transaction" />

<subpackage name="consumer">
<allow pkg="org.apache.kafka.tools"/>
Expand Down
2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@
files="storage[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="group-coordinator[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="transaction-coordinator[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>

<suppress checks="ImportControl" files="FetchResponseData.java"/>
<suppress checks="ImportControl" files="RecordsSerdeTest.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package kafka.coordinator.transaction
import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import kafka.internals.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
import org.apache.kafka.common.record.{Record, RecordBatch}
import org.apache.kafka.common.{MessageFormatter, TopicPartition}
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}

import scala.collection.mutable
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -143,6 +143,7 @@ object TransactionLog {
}

// Formatter for use with tools to read transaction log messages
@Deprecated
class TransactionLogMessageFormatter extends MessageFormatter {
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
package kafka.coordinator.transaction


import kafka.internals.generated.TransactionLogKey
import kafka.internals.generated.TransactionLogValue
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
import org.apache.kafka.common.protocol.types.Field.TaggedFieldsSection
import org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, Struct, Type}
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.Test

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package kafka.coordinator.transaction

import kafka.internals.generated.TransactionLogKey

import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent.CountDownLatch
Expand All @@ -36,6 +34,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey
import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ private static String convertDeprecatedClass(String className) {
System.err.println("WARNING: kafka.tools.NoOpMessageFormatter is deprecated and will be removed in the next major release. " +
"Please use org.apache.kafka.tools.consumer.NoOpMessageFormatter instead");
return NoOpMessageFormatter.class.getName();
case "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter":
System.err.println("WARNING: kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter is deprecated and will be removed in the next major release. " +
"Please use org.apache.kafka.tools.consumer.TransactionLogMessageFormatter instead");
return className;
default:
return className;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;

import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;

import static java.nio.charset.StandardCharsets.UTF_8;

public class TransactionLogMessageFormatter implements MessageFormatter {

private static final String VERSION = "version";
private static final String DATA = "data";
private static final String KEY = "key";
private static final String VALUE = "value";
private static final String UNKNOWN = "unknown";

@Override
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
ObjectNode json = new ObjectNode(JsonNodeFactory.instance);

byte[] key = consumerRecord.key();
if (Objects.nonNull(key)) {
short keyVersion = ByteBuffer.wrap(key).getShort();
JsonNode dataNode = readToTransactionLogKey(ByteBuffer.wrap(key))
.map(logKey -> TransactionLogKeyJsonConverter.write(logKey, keyVersion))
.orElseGet(() -> new TextNode(UNKNOWN));
json.putObject(KEY)
.put(VERSION, keyVersion)
.set(DATA, dataNode);
} else {
json.set(KEY, NullNode.getInstance());
}

byte[] value = consumerRecord.value();
if (Objects.nonNull(value)) {
short valueVersion = ByteBuffer.wrap(value).getShort();
JsonNode dataNode = readToTransactionLogValue(ByteBuffer.wrap(value))
.map(logValue -> TransactionLogValueJsonConverter.write(logValue, valueVersion))
.orElseGet(() -> new TextNode(UNKNOWN));
json.putObject(VALUE)
.put(VERSION, valueVersion)
.set(DATA, dataNode);
} else {
json.set(VALUE, NullNode.getInstance());
}

try {
output.write(json.toString().getBytes(UTF_8));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private Optional<TransactionLogKey> readToTransactionLogKey(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new TransactionLogKey(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
}
}

private Optional<TransactionLogValue> readToTransactionLogValue(ByteBuffer byteBuffer) {
short version = byteBuffer.getShort();
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
&& version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
return Optional.of(new TransactionLogValue(new ByteBufferAccessor(byteBuffer), version));
} else {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -673,4 +673,26 @@ public void testParseDeprecatedFormatter() throws Exception {
};
assertInstanceOf(NoOpMessageFormatter.class, new ConsoleConsumerOptions(deprecatedNoOpMessageFormatter).formatter());
}

@SuppressWarnings("deprecation")
@Test
public void testNewAndDeprecateTransactionLogMessageFormatter() throws Exception {
String[] deprecatedTransactionLogMessageFormatter = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--formatter", "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter",
};
assertInstanceOf(kafka.coordinator.transaction.TransactionLog.TransactionLogMessageFormatter.class,
new ConsoleConsumerOptions(deprecatedTransactionLogMessageFormatter).formatter());

String[] transactionLogMessageFormatter = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--formatter", "org.apache.kafka.tools.consumer.TransactionLogMessageFormatter",
};
assertInstanceOf(TransactionLogMessageFormatter.class,
new ConsoleConsumerOptions(transactionLogMessageFormatter).formatter());
}
}
Loading

0 comments on commit ee68f32

Please sign in to comment.