Skip to content

Commit

Permalink
feat: implement CloudEventMessageConverter for spring-amqp
Browse files Browse the repository at this point in the history
  • Loading branch information
lyca committed Feb 17, 2024
1 parent 448f02d commit 099921e
Show file tree
Hide file tree
Showing 7 changed files with 378 additions and 0 deletions.
5 changes: 5 additions & 0 deletions spring/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>spring-messaging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2020-Present The CloudEvents Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.amqp;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventContext;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.CloudEventUtils;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;

/**
* A {@link MessageConverter} that can translate to and from a {@link Message} and a {@link CloudEvent}.
* The {@link CloudEventContext} is canonicalized, with key names given a {@code ce-} prefix in the
* {@link MessageProperties}.
*
* @author Lars Michele
* @see io.cloudevents.spring.messaging.CloudEventMessageConverter used as stencil for the implementation
*/
public class CloudEventMessageConverter implements MessageConverter {

@Override
public CloudEvent fromMessage(Message message) {
return createMessageReader(message).toEvent();
}

@Override
public Message toMessage(Object object, MessageProperties messageProperties) {
if (object instanceof CloudEvent) {
CloudEvent event = (CloudEvent) object;
return CloudEventUtils.toReader(event).read(new MessageBuilderMessageWriter(messageProperties));
}
return null;
}

private MessageReader createMessageReader(Message message) {
return MessageUtils.parseStructuredOrBinaryMessage(
() -> contentType(message.getMessageProperties()),
format -> structuredMessageReader(message, format),
() -> version(message.getMessageProperties()),
version -> binaryMessageReader(message, version)
);
}

private String version(MessageProperties properties) {
Object header = properties.getHeader(CloudEventsHeaders.SPEC_VERSION);
return header == null ? null : header.toString();
}

private MessageReader binaryMessageReader(Message message, SpecVersion version) {
return new MessageBinaryMessageReader(version, message.getMessageProperties(), message.getBody());
}

private MessageReader structuredMessageReader(Message message, EventFormat format) {
return new GenericStructuredMessageReader(format, message.getBody());
}

private String contentType(MessageProperties properties) {
String contentType = properties.getContentType();
if (contentType == null) {
Object header = properties.getHeader(CloudEventsHeaders.CONTENT_TYPE);
return header == null ? null : header.toString();
}
return contentType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2020-Present The CloudEvents Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cloudevents.spring.amqp;

public class CloudEventsHeaders {

/**
* CloudEvent attributes MUST be prefixed with either "cloudEvents_" or "cloudEvents:" for use in the application-properties section.
* @see <a href="https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/amqp-protocol-binding.md#3131-amqp-application-property-names">
* AMQP Protocol Binding for CloudEvents</a>
* */
public static final String CE_PREFIX = "cloudEvents_";

public static final String SPEC_VERSION = CE_PREFIX + "specversion";

public static final String CONTENT_TYPE = CE_PREFIX + "datacontenttype";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2020-Present The CloudEvents Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.amqp;

import static io.cloudevents.spring.amqp.CloudEventsHeaders.*;
import static org.springframework.amqp.support.AmqpHeaders.CONTENT_TYPE;

import java.util.Map;
import java.util.function.BiConsumer;

import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.impl.StringUtils;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import org.springframework.amqp.core.MessageProperties;

/**
* Utility for converting {@link MessageProperties} (message headers) to `CloudEvent` contexts.
*
* @author Lars Michele
* @see io.cloudevents.spring.messaging.MessageBinaryMessageReader used as stencil for the implementation
*/
class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {

private final Map<String, Object> headers;

public MessageBinaryMessageReader(SpecVersion version, MessageProperties properties, byte[] payload) {
super(version, payload == null ? null : BytesCloudEventData.wrap(payload));
this.headers = properties.getHeaders();
}

@Override
protected boolean isContentTypeHeader(String key) {
return CONTENT_TYPE.equalsIgnoreCase(key);
}

@Override
protected boolean isCloudEventsHeader(String key) {
return key != null && key.length() > CE_PREFIX.length() && StringUtils.startsWithIgnoreCase(key, CE_PREFIX);
}

@Override
protected String toCloudEventsKey(String key) {
return key.substring(CE_PREFIX.length()).toLowerCase();
}

@Override
protected void forEachHeader(BiConsumer<String, Object> fn) {
headers.forEach(fn);
}

@Override
protected String toCloudEventsValue(Object value) {
return value.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2020-Present The CloudEvents Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.amqp;

import static io.cloudevents.spring.amqp.CloudEventsHeaders.*;

import java.util.HashMap;
import java.util.Map;

import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;

/**
* Internal utility class for copying <code>CloudEvent</code> context to {@link MessageProperties} (message
* headers).
*
* @author Lars Michele
* @see io.cloudevents.spring.messaging.MessageBuilderMessageWriter used as stencil for the implementation
*/
class MessageBuilderMessageWriter implements CloudEventWriter<Message>, MessageWriter<MessageBuilderMessageWriter, Message> {

private final Map<String, Object> headers = new HashMap<>();

public MessageBuilderMessageWriter(MessageProperties properties) {
this.headers.putAll(properties.getHeaders());
}

@Override
public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
headers.put(CONTENT_TYPE, format.serializedContentType());
return MessageBuilder.withBody(value).copyHeaders(headers).build();
}

@Override
public Message end(CloudEventData value) throws CloudEventRWException {
return MessageBuilder.withBody(value == null ? new byte[0] : value.toBytes()).copyHeaders(headers).build();
}

@Override
public Message end() {
return MessageBuilder.withBody(new byte[0]).copyHeaders(headers).build();
}

@Override
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
headers.put(CE_PREFIX + name, value);
return this;
}

@Override
public MessageBuilderMessageWriter create(SpecVersion version) {
headers.put(SPEC_VERSION, version.toString());
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Provides classes related to working with Cloud Events within the context of Spring Amqp.
*/
package io.cloudevents.spring.amqp;
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.spring.amqp;

import static org.assertj.core.api.Assertions.*;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.rw.CloudEventRWException;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;

/**
* @author Lars Michele
* @see io.cloudevents.spring.messaging.CloudEventMessageConverterTests used as stencil for the implementation
*/
class CloudEventMessageConverterTests {

private static final String JSON = "{\"specversion\":\"1.0\"," //
+ "\"id\":\"12345\"," //
+ "\"source\":\"https://spring.io/events\"," //
+ "\"type\":\"io.spring.event\"," //
+ "\"datacontenttype\":\"application/json\"," //
+ "\"data\":{\"value\":\"Dave\"}" //
+ "}";

private final CloudEventMessageConverter converter = new CloudEventMessageConverter();

@Test
void noSpecVersion() {
Message message = MessageBuilder.withBody(new byte[0]).build();
assertThatExceptionOfType(CloudEventRWException.class).isThrownBy(() -> {
assertThat(converter.fromMessage(message)).isNull();
});
}

@Test
void notValidCloudEvent() {
Message message = MessageBuilder.withBody(new byte[0]).setHeader("cloudEvents_specversion", "1.0").build();
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> {
assertThat(converter.fromMessage(message)).isNull();
});
}

@Test
void validCloudEvent() {
Message message = MessageBuilder.withBody(new byte[0]).setHeader("cloudEvents_specversion", "1.0")
.setHeader("cloudEvents_id", "12345").setHeader("cloudEvents_source", "https://spring.io/events")
.setHeader("cloudEvents_type", "io.spring.event").build();
CloudEvent event = converter.fromMessage(message);
assertThat(event).isNotNull();
assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
assertThat(event.getId()).isEqualTo("12345");
assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
assertThat(event.getType()).isEqualTo("io.spring.event");
}

@Test
void structuredCloudEvent() {
byte[] payload = JSON.getBytes(StandardCharsets.UTF_8);
Message message = MessageBuilder.withBody(payload)
.setContentType("application/cloudevents+json").build();
CloudEvent event = converter.fromMessage(message);
assertThat(event).isNotNull();
assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
assertThat(event.getId()).isEqualTo("12345");
assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
assertThat(event.getType()).isEqualTo("io.spring.event");
}

@Test
void fromCloudEvent() {
CloudEvent attributes = CloudEventBuilder.v1().withId("A234-1234-1234")
.withSource(URI.create("https://spring.io/")).withType("org.springframework")
.withData("hello".getBytes(StandardCharsets.UTF_8)).build();
Message message = converter.toMessage(attributes, new MessageProperties());
Map<String, ?> headers = message.getMessageProperties().getHeaders();
assertThat(headers.get("cloudEvents_id")).isEqualTo("A234-1234-1234");
assertThat(headers.get("cloudEvents_specversion")).isEqualTo("1.0");
assertThat(headers.get("cloudEvents_source")).isEqualTo("https://spring.io/");
assertThat(headers.get("cloudEvents_type")).isEqualTo("org.springframework");
assertThat("hello".getBytes(StandardCharsets.UTF_8)).isEqualTo(message.getBody());
}

@Test
void fromNonCloudEvent() {
assertThat(converter.toMessage(new byte[0], new MessageProperties())).isNull();
}
}

0 comments on commit 099921e

Please sign in to comment.