Skip to content

Commit 4a8511d

Browse files
phipagKarthik Puttaswamy
andauthored
fix(kafka): Handle message indices in proto data also for Glue Schema Registry (#1907)
* Bug fix to handle message indices in proto data * Removing links and changing log level from info to debug * Upgrade lambda.events.version tp 3.16.0 * Update Glue schema id to 36. Add some tests to increase coverage. Use Kafka ByteUtils also in unit tests. * Update lambda java events to 3.16.0 everywhere. * Add sample generation code for all Protobuf cases. --------- Co-authored-by: Karthik Puttaswamy <[email protected]>
1 parent 8f70e83 commit 4a8511d

File tree

23 files changed

+433
-137
lines changed

23 files changed

+433
-137
lines changed

examples/powertools-examples-core-utilities/cdk/app/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<dependency>
4242
<groupId>com.amazonaws</groupId>
4343
<artifactId>aws-lambda-java-events</artifactId>
44-
<version>3.15.0</version>
44+
<version>3.16.0</version>
4545
</dependency>
4646
<dependency>
4747
<groupId>org.apache.logging.log4j</groupId>

examples/powertools-examples-core-utilities/gradle/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ dependencies {
2626
implementation 'com.amazonaws:aws-lambda-java-core:1.2.2'
2727
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.13.2'
2828
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.2.2'
29-
implementation 'com.amazonaws:aws-lambda-java-events:3.11.0'
29+
implementation 'com.amazonaws:aws-lambda-java-events:3.16.0'
3030
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.2'
3131
implementation 'org.aspectj:aspectjrt:1.9.20.1'
3232
aspect 'software.amazon.lambda:powertools-tracing:2.1.0'

examples/powertools-examples-core-utilities/kotlin/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ dependencies {
1212
implementation("com.amazonaws:aws-lambda-java-core:1.2.3")
1313
implementation("com.fasterxml.jackson.core:jackson-annotations:2.15.1")
1414
implementation("com.fasterxml.jackson.core:jackson-databind:2.15.3")
15-
implementation("com.amazonaws:aws-lambda-java-events:3.11.3")
15+
implementation("com.amazonaws:aws-lambda-java-events:3.16.0")
1616
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2")
1717
implementation("org.aspectj:aspectjrt:1.9.20.1")
1818
aspect("software.amazon.lambda:powertools-tracing:2.1.0")
@@ -23,4 +23,4 @@ dependencies {
2323

2424
kotlin {
2525
jvmToolchain(11)
26-
}
26+
}

examples/powertools-examples-core-utilities/sam-graalvm/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
<dependency>
4040
<groupId>com.amazonaws</groupId>
4141
<artifactId>aws-lambda-java-events</artifactId>
42-
<version>3.11.3</version>
42+
<version>3.16.0</version>
4343
</dependency>
4444
<dependency>
4545
<groupId>org.aspectj</groupId>

examples/powertools-examples-core-utilities/sam/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
<dependency>
3939
<groupId>com.amazonaws</groupId>
4040
<artifactId>aws-lambda-java-events</artifactId>
41-
<version>3.15.0</version>
41+
<version>3.16.0</version>
4242
</dependency>
4343
<dependency>
4444
<groupId>org.aspectj</groupId>

examples/powertools-examples-core-utilities/serverless/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
<dependency>
3939
<groupId>com.amazonaws</groupId>
4040
<artifactId>aws-lambda-java-events</artifactId>
41-
<version>3.15.0</version>
41+
<version>3.16.0</version>
4242
</dependency>
4343
<dependency>
4444
<groupId>org.aspectj</groupId>

examples/powertools-examples-core-utilities/terraform/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
<dependency>
3939
<groupId>com.amazonaws</groupId>
4040
<artifactId>aws-lambda-java-events</artifactId>
41-
<version>3.15.0</version>
41+
<version>3.16.0</version>
4242
</dependency>
4343
<dependency>
4444
<groupId>org.aspectj</groupId>

examples/powertools-examples-idempotency/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
<dependency>
5353
<groupId>com.amazonaws</groupId>
5454
<artifactId>aws-lambda-java-events</artifactId>
55-
<version>3.15.0</version>
55+
<version>3.16.0</version>
5656
</dependency>
5757
<dependency>
5858
<groupId>org.aspectj</groupId>

examples/powertools-examples-kafka/events/kafka-protobuf-event.json

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@
3030
{
3131
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
3232
}
33-
]
33+
],
34+
"valueSchemaMetadata": {
35+
"schemaId": "123",
36+
"dataFormat": "PROTOBUF"
37+
}
3438
},
3539
{
3640
"topic": "mytopic",
@@ -39,12 +43,34 @@
3943
"timestamp": 1545084650989,
4044
"timestampType": "CREATE_TIME",
4145
"key": null,
42-
"value": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=",
46+
"value": "BAIACOkHEgZMYXB0b3AZUrgehes/j0A=",
4347
"headers": [
4448
{
4549
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
4650
}
47-
]
51+
],
52+
"valueSchemaMetadata": {
53+
"schemaId": "456",
54+
"dataFormat": "PROTOBUF"
55+
}
56+
},
57+
{
58+
"topic": "mytopic",
59+
"partition": 0,
60+
"offset": 18,
61+
"timestamp": 1545084650990,
62+
"timestampType": "CREATE_TIME",
63+
"key": "NDI=",
64+
"value": "AQjpBxIGTGFwdG9wGVK4HoXrP49A",
65+
"headers": [
66+
{
67+
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
68+
}
69+
],
70+
"valueSchemaMetadata": {
71+
"schemaId": "12345678-1234-1234-1234-123456789012",
72+
"dataFormat": "PROTOBUF"
73+
}
4874
}
4975
]
5076
}

examples/powertools-examples-kafka/tools/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
<maven.compiler.target>11</maven.compiler.target>
1414
<avro.version>1.12.0</avro.version>
1515
<protobuf.version>4.31.0</protobuf.version>
16+
<kafka-clients.version>4.0.0</kafka-clients.version>
1617
</properties>
1718

1819
<dependencies>
@@ -26,6 +27,11 @@
2627
<artifactId>protobuf-java</artifactId>
2728
<version>${protobuf.version}</version>
2829
</dependency>
30+
<dependency>
31+
<groupId>org.apache.kafka</groupId>
32+
<artifactId>kafka-clients</artifactId>
33+
<version>${kafka-clients.version}</version>
34+
</dependency>
2935
<dependency>
3036
<groupId>com.fasterxml.jackson.core</groupId>
3137
<artifactId>jackson-databind</artifactId>

examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java

Lines changed: 84 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
import java.io.ByteArrayOutputStream;
44
import java.io.IOException;
5+
import java.nio.ByteBuffer;
56
import java.util.Base64;
67

8+
import org.apache.kafka.common.utils.ByteUtils;
79
import org.demo.kafka.protobuf.ProtobufProduct;
810

911
import com.google.protobuf.CodedOutputStream;
@@ -19,77 +21,102 @@ private GenerateProtobufSamples() {
1921
}
2022

2123
public static void main(String[] args) throws IOException {
22-
// Create a single product that will be used for all three scenarios
24+
// Create a single product that will be used for all four scenarios
2325
ProtobufProduct product = ProtobufProduct.newBuilder()
2426
.setId(1001)
2527
.setName("Laptop")
2628
.setPrice(999.99)
2729
.build();
2830

29-
// Create three different serializations of the same product
31+
// Create four different serializations of the same product
3032
String standardProduct = serializeAndEncode(product);
31-
String productWithSimpleIndex = serializeWithSimpleMessageIndex(product);
32-
String productWithComplexIndex = serializeWithComplexMessageIndex(product);
33+
String productWithConfluentSimpleIndex = serializeWithConfluentSimpleMessageIndex(product);
34+
String productWithConfluentComplexIndex = serializeWithConfluentComplexMessageIndex(product);
35+
String productWithGlueMagicByte = serializeWithGlueMagicByte(product);
3336

3437
// Serialize and encode an integer key (same for all records)
3538
String encodedKey = serializeAndEncodeInteger(42);
3639

3740
// Print the results
38-
System.out.println("Base64 encoded Protobuf products with different message index scenarios:");
39-
System.out.println("\n1. Standard Protobuf (no message index):");
41+
System.out.println("Base64 encoded Protobuf products with different scenarios:");
42+
System.out.println("\n1. Plain Protobuf (no schema registry):");
4043
System.out.println("value: \"" + standardProduct + "\"");
4144

42-
System.out.println("\n2. Simple Message Index (single 0):");
43-
System.out.println("value: \"" + productWithSimpleIndex + "\"");
45+
System.out.println("\n2. Confluent with Simple Message Index (optimized single 0):");
46+
System.out.println("value: \"" + productWithConfluentSimpleIndex + "\"");
4447

45-
System.out.println("\n3. Complex Message Index (array [1,0]):");
46-
System.out.println("value: \"" + productWithComplexIndex + "\"");
48+
System.out.println("\n3. Confluent with Complex Message Index (array [1,0]):");
49+
System.out.println("value: \"" + productWithConfluentComplexIndex + "\"");
50+
51+
System.out.println("\n4. Glue with Magic Byte:");
52+
System.out.println("value: \"" + productWithGlueMagicByte + "\"");
4753

4854
// Print the merged event structure
4955
System.out.println("\n" + "=".repeat(80));
50-
System.out.println("MERGED EVENT WITH ALL THREE SCENARIOS");
56+
System.out.println("MERGED EVENT WITH ALL FOUR SCENARIOS");
5157
System.out.println("=".repeat(80));
52-
printSampleEvent(encodedKey, standardProduct, productWithSimpleIndex, productWithComplexIndex);
58+
printSampleEvent(encodedKey, standardProduct, productWithConfluentSimpleIndex, productWithConfluentComplexIndex,
59+
productWithGlueMagicByte);
5360
}
5461

5562
private static String serializeAndEncode(ProtobufProduct product) {
5663
return Base64.getEncoder().encodeToString(product.toByteArray());
5764
}
5865

5966
/**
60-
* Serializes a protobuf product with a simple Confluent message index (single 0).
67+
* Serializes a protobuf product with a simple Confluent message index (optimized single 0).
6168
* Format: [0][protobuf_data]
6269
*
6370
* @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format}
6471
*/
65-
private static String serializeWithSimpleMessageIndex(ProtobufProduct product) throws IOException {
72+
private static String serializeWithConfluentSimpleMessageIndex(ProtobufProduct product) throws IOException {
6673
ByteArrayOutputStream baos = new ByteArrayOutputStream();
67-
CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
6874

69-
// Write simple message index (single 0)
70-
codedOutput.writeUInt32NoTag(0);
75+
// Write optimized simple message index for Confluent (single 0 byte for [0])
76+
baos.write(0);
7177

7278
// Write the protobuf data
73-
product.writeTo(codedOutput);
79+
baos.write(product.toByteArray());
7480

75-
codedOutput.flush();
7681
return Base64.getEncoder().encodeToString(baos.toByteArray());
7782
}
7883

7984
/**
8085
* Serializes a protobuf product with a complex Confluent message index (array [1,0]).
81-
* Format: [2][1][0][protobuf_data] where 2 is the array length
86+
* Format: [2][1][0][protobuf_data] where 2 is the array length using varint encoding
8287
*
8388
* @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format}
8489
*/
85-
private static String serializeWithComplexMessageIndex(ProtobufProduct product) throws IOException {
90+
private static String serializeWithConfluentComplexMessageIndex(ProtobufProduct product) throws IOException {
91+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
92+
93+
// Write complex message index array [1,0] using ByteUtils
94+
ByteBuffer buffer = ByteBuffer.allocate(1024);
95+
ByteUtils.writeVarint(2, buffer); // Array length
96+
ByteUtils.writeVarint(1, buffer); // First index value
97+
ByteUtils.writeVarint(0, buffer); // Second index value
98+
99+
buffer.flip();
100+
byte[] indexData = new byte[buffer.remaining()];
101+
buffer.get(indexData);
102+
baos.write(indexData);
103+
104+
// Write the protobuf data
105+
baos.write(product.toByteArray());
106+
107+
return Base64.getEncoder().encodeToString(baos.toByteArray());
108+
}
109+
110+
/**
111+
* Serializes a protobuf product with Glue magic byte.
112+
* Format: [1][protobuf_data] where 1 is the magic byte
113+
*/
114+
private static String serializeWithGlueMagicByte(ProtobufProduct product) throws IOException {
86115
ByteArrayOutputStream baos = new ByteArrayOutputStream();
87116
CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
88117

89-
// Write complex message index array [1,0]
90-
codedOutput.writeUInt32NoTag(2); // Array length
91-
codedOutput.writeUInt32NoTag(1); // First index value
92-
codedOutput.writeUInt32NoTag(0); // Second index value
118+
// Write Glue magic byte (single UInt32)
119+
codedOutput.writeUInt32NoTag(1);
93120

94121
// Write the protobuf data
95122
product.writeTo(codedOutput);
@@ -103,8 +130,8 @@ private static String serializeAndEncodeInteger(Integer value) {
103130
return Base64.getEncoder().encodeToString(value.toString().getBytes());
104131
}
105132

106-
private static void printSampleEvent(String key, String standardProduct, String simpleIndexProduct,
107-
String complexIndexProduct) {
133+
private static void printSampleEvent(String key, String standardProduct, String confluentSimpleProduct,
134+
String confluentComplexProduct, String glueProduct) {
108135
System.out.println("{\n" +
109136
" \"eventSource\": \"aws:kafka\",\n" +
110137
" \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
@@ -134,12 +161,16 @@ private static void printSampleEvent(String key, String standardProduct, String
134161
" \"timestamp\": 1545084650988,\n" +
135162
" \"timestampType\": \"CREATE_TIME\",\n" +
136163
" \"key\": \"" + key + "\",\n" +
137-
" \"value\": \"" + simpleIndexProduct + "\",\n" +
164+
" \"value\": \"" + confluentSimpleProduct + "\",\n" +
138165
" \"headers\": [\n" +
139166
" {\n" +
140167
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
141168
" }\n" +
142-
" ]\n" +
169+
" ],\n" +
170+
" \"valueSchemaMetadata\": {\n" +
171+
" \"schemaId\": \"123\",\n" +
172+
" \"dataFormat\": \"PROTOBUF\"\n" +
173+
" }\n" +
143174
" },\n" +
144175
" {\n" +
145176
" \"topic\": \"mytopic\",\n" +
@@ -148,12 +179,34 @@ private static void printSampleEvent(String key, String standardProduct, String
148179
" \"timestamp\": 1545084650989,\n" +
149180
" \"timestampType\": \"CREATE_TIME\",\n" +
150181
" \"key\": null,\n" +
151-
" \"value\": \"" + complexIndexProduct + "\",\n" +
182+
" \"value\": \"" + confluentComplexProduct + "\",\n" +
152183
" \"headers\": [\n" +
153184
" {\n" +
154185
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
155186
" }\n" +
156-
" ]\n" +
187+
" ],\n" +
188+
" \"valueSchemaMetadata\": {\n" +
189+
" \"schemaId\": \"456\",\n" +
190+
" \"dataFormat\": \"PROTOBUF\"\n" +
191+
" }\n" +
192+
" },\n" +
193+
" {\n" +
194+
" \"topic\": \"mytopic\",\n" +
195+
" \"partition\": 0,\n" +
196+
" \"offset\": 18,\n" +
197+
" \"timestamp\": 1545084650990,\n" +
198+
" \"timestampType\": \"CREATE_TIME\",\n" +
199+
" \"key\": \"" + key + "\",\n" +
200+
" \"value\": \"" + glueProduct + "\",\n" +
201+
" \"headers\": [\n" +
202+
" {\n" +
203+
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
204+
" }\n" +
205+
" ],\n" +
206+
" \"valueSchemaMetadata\": {\n" +
207+
" \"schemaId\": \"12345678-1234-1234-1234-123456789012\",\n" +
208+
" \"dataFormat\": \"PROTOBUF\"\n" +
209+
" }\n" +
157210
" }\n" +
158211
" ]\n" +
159212
" }\n" +

examples/powertools-examples-parameters/sam-graalvm/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
<dependency>
4040
<groupId>com.amazonaws</groupId>
4141
<artifactId>aws-lambda-java-events</artifactId>
42-
<version>3.15.0</version>
42+
<version>3.16.0</version>
4343
</dependency>
4444
<dependency>
4545
<groupId>org.aspectj</groupId>

examples/powertools-examples-parameters/sam/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
<dependency>
3939
<groupId>com.amazonaws</groupId>
4040
<artifactId>aws-lambda-java-events</artifactId>
41-
<version>3.15.0</version>
41+
<version>3.16.0</version>
4242
</dependency>
4343
<dependency>
4444
<groupId>org.aspectj</groupId>

examples/powertools-examples-serialization/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
<dependency>
3232
<groupId>com.amazonaws</groupId>
3333
<artifactId>aws-lambda-java-events</artifactId>
34-
<version>3.15.0</version>
34+
<version>3.16.0</version>
3535
</dependency>
3636
</dependencies>
3737

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
<payloadoffloading-common.version>2.2.0</payloadoffloading-common.version>
9090
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
9191
<lambda.core.version>1.2.3</lambda.core.version>
92-
<lambda.events.version>3.15.0</lambda.events.version>
92+
<lambda.events.version>3.16.0</lambda.events.version>
9393
<lambda.serial.version>1.1.5</lambda.serial.version>
9494
<maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
9595
<aspectj.version>1.9.7</aspectj.version>

0 commit comments

Comments
 (0)