Skip to content

Commit

Permalink
Fixed Protobuf data corruption for CloudEvent serialized/deserialized…
Browse files Browse the repository at this point in the history
… several times (#524)

Fixed issue where mutiple serialize/de-serialize operations
would result in corrupted data if the data was a protobuf
message object.

- Introduced equality checks for ProtoDataWrapper.
- Refactored and cleaned up data-wrappers.

Fixes #523

Signed-off-by: Jem Day <[email protected]>
  • Loading branch information
JemDay authored Feb 27, 2023
1 parent d64aff7 commit 3614a4f
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package io.cloudevents.protobuf;

import com.google.protobuf.Any;
import com.google.protobuf.Message;

import java.util.Arrays;

class ProtoDataWrapper implements ProtoCloudEventData {

private final Message protoMessage;
Expand All @@ -35,4 +38,49 @@ public Message getMessage() {
public byte[] toBytes() {
return protoMessage.toByteArray();
}

@Override
public boolean equals(Object obj) {

if (this == obj) {
return (true);
}

if (!(obj instanceof ProtoDataWrapper)) {
return (false);
}

// Now compare the actual data
ProtoDataWrapper rhs = (ProtoDataWrapper) obj;

if (this.getMessage() == rhs.getMessage()){
return true;
}

// This is split out for readability.
// Compare the content in terms onf an 'Any'.
// - Verify the types match
// - Verify the values match.

final Any lhsAny = getAsAny(this.getMessage());
final Any rhsAny = getAsAny(rhs.getMessage());

final boolean typesMatch = (ProtoSupport.extractMessageType(lhsAny).equals(ProtoSupport.extractMessageType(rhsAny)));

if (typesMatch) {
return lhsAny.getValue().equals(rhsAny.getValue());
} else {
return false;
}
}

private Any getAsAny(Message m) {

if (m instanceof Any) {
return (Any) m;
}

return Any.pack(m);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public <W extends CloudEventWriter<R>, R> R read(
data = BytesCloudEventData.wrap(raw);
break;
case PROTO_DATA:
data = new ProtoAccessor(this.protoCe);
data = new ProtoDataWrapper(this.protoCe.getProtoData());
break;
case DATA_NOT_SET:
break;
Expand All @@ -130,22 +130,4 @@ private OffsetDateTime covertProtoTimestamp(com.google.protobuf.Timestamp timest
return instant.atOffset(ZoneOffset.UTC);
}

private static class ProtoAccessor implements ProtoCloudEventData {

private final Message message;

ProtoAccessor(CloudEvent proto){
this.message = proto.getProtoData();
}

@Override
public Message getMessage() {
return message;
}

@Override
public byte[] toBytes() {
return message.toByteArray();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
*/
package io.cloudevents.protobuf;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.*;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.CloudEventUtils;
Expand Down Expand Up @@ -248,8 +245,14 @@ public CloudEvent end(CloudEventData data) throws CloudEventRWException {
// If it's a proto message we can handle that directly.
if (data instanceof ProtoCloudEventData) {
final ProtoCloudEventData protoData = (ProtoCloudEventData) data;
if (protoData.getMessage() != null) {
protoBuilder.setProtoData(Any.pack(protoData.getMessage()));
final Message m = protoData.getMessage();
if (m != null) {
// If it's already an 'Any' don't re-pack it.
if (m instanceof Any) {
protoBuilder.setProtoData((Any) m);
}else {
protoBuilder.setProtoData(Any.pack(m));
}
}
} else {
if (Objects.equals(dataContentType, PROTO_DATA_CONTENT_TYPE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package io.cloudevents.protobuf;

import com.google.protobuf.Any;

/**
* General support functions.
*/
Expand Down Expand Up @@ -44,4 +46,16 @@ static boolean isTextContent(String contentType) {
|| contentType.endsWith("+xml")
;
}

/**
* Extract the Protobuf message type from an 'Any'
* @param anyMessage
* @return
*/
static String extractMessageType(final Any anyMessage) {
final String typeUrl = anyMessage.getTypeUrl();
final String[] parts = typeUrl.split("/");

return parts[parts.length -1];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.protobuf;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static org.assertj.core.api.Assertions.assertThat;

class ProtoDataWrapperTest {

// == Closing Quotes for 2023/02/23
private final Message quote1 = io.cloudevents.test.v1.proto.Test.Quote.newBuilder()
.setPrice(io.cloudevents.test.v1.proto.Test.Decimal.newBuilder().setScale(2).setUnscaled(7519).build())
.setHigh(io.cloudevents.test.v1.proto.Test.Decimal.newBuilder().setScale(2).setUnscaled(7628).build())
.setSymbol("PYPL")
.build();

private final Message quote2 = io.cloudevents.test.v1.proto.Test.Quote.newBuilder()
.setPrice(io.cloudevents.test.v1.proto.Test.Decimal.newBuilder().setScale(2).setUnscaled(13097).build())
.setHigh(io.cloudevents.test.v1.proto.Test.Decimal.newBuilder().setScale(2).setUnscaled(13170).build())
.setSymbol("IBM")
.build();

@Test
public void testBasic() {

ProtoDataWrapper pdw = new ProtoDataWrapper(quote1);

assertThat(pdw).isNotNull();
assertThat(pdw.getMessage()).isNotNull();
assertThat(pdw.toBytes()).withFailMessage("toBytes was NULL").isNotNull();
assertThat(pdw.toBytes()).withFailMessage("toBytes[] returned empty array").hasSizeGreaterThan(0);

// This is current behavior and will probably change in the next version.
assertThat(pdw.getMessage()).isInstanceOf(io.cloudevents.test.v1.proto.Test.Quote.class);
}

@Test
public void testEquality() {

ProtoDataWrapper pdw1 = new ProtoDataWrapper(quote1);
ProtoDataWrapper pdw2 = new ProtoDataWrapper(quote1);

ProtoDataWrapper pdw3 = new ProtoDataWrapper(quote2);

assertThat(pdw1).withFailMessage("Self Equality Failed - 1").isEqualTo(pdw1);
assertThat(pdw2).withFailMessage("Self Equality Failed - 2").isEqualTo(pdw2);
assertThat(pdw1).withFailMessage("Self Equality Failed - 3").isEqualTo(pdw2);
assertThat(pdw2).withFailMessage("Self Equality Failed - 4").isEqualTo(pdw1);

assertThat(pdw1).withFailMessage("Non-Equality Failed - 1").isNotEqualTo(null);
assertThat(pdw1).withFailMessage("Non-Equality Failed - 2").isNotEqualTo(pdw3);
assertThat(pdw3).withFailMessage("Non-Equality Failed - 3").isNotEqualTo(pdw2);

}

/**
* Verify the generated bytes[] is correct
*/
@Test
public void testBytes() {

// Our expected 'Any'
final Any expAny = Any.pack(quote1);

// Our expected 'data'
final byte[] expData = expAny.toByteArray();

// Build the wrapper
final ProtoDataWrapper pdw = new ProtoDataWrapper(quote1);

// Get the actual data
final byte[] actData = pdw.toBytes();

// Verify
Arrays.equals(expData, actData);

}

}
Loading

0 comments on commit 3614a4f

Please sign in to comment.