diff --git a/spring/pom.xml b/spring/pom.xml
index 76e3bc4e9..035c344cd 100644
--- a/spring/pom.xml
+++ b/spring/pom.xml
@@ -63,6 +63,11 @@
spring-messaging
true
+
+ org.springframework.amqp
+ spring-rabbit
+ true
+
io.cloudevents
cloudevents-core
diff --git a/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventMessageConverter.java b/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventMessageConverter.java
new file mode 100644
index 000000000..7b846cb71
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventMessageConverter.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2020-Present The CloudEvents Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.amqp;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.CloudEventContext;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.CloudEventUtils;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
+import io.cloudevents.core.message.impl.MessageUtils;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageProperties;
+import org.springframework.amqp.support.converter.MessageConverter;
+
+/**
+ * A {@link MessageConverter} that can translate to and from a {@link Message} and a {@link CloudEvent}.
+ * The {@link CloudEventContext} is canonicalized, with key names given a {@code ce-} prefix in the
+ * {@link MessageProperties}.
+ *
+ * @author Lars Michele
+ * @see io.cloudevents.spring.messaging.CloudEventMessageConverter used as stencil for the implementation
+ */
+public class CloudEventMessageConverter implements MessageConverter {
+
+ @Override
+ public CloudEvent fromMessage(Message message) {
+ return createMessageReader(message).toEvent();
+ }
+
+ @Override
+ public Message toMessage(Object object, MessageProperties messageProperties) {
+ if (object instanceof CloudEvent) {
+ CloudEvent event = (CloudEvent) object;
+ return CloudEventUtils.toReader(event).read(new MessageBuilderMessageWriter(messageProperties));
+ }
+ return null;
+ }
+
+ private MessageReader createMessageReader(Message message) {
+ return MessageUtils.parseStructuredOrBinaryMessage(
+ () -> contentType(message.getMessageProperties()),
+ format -> structuredMessageReader(message, format),
+ () -> version(message.getMessageProperties()),
+ version -> binaryMessageReader(message, version)
+ );
+ }
+
+ private String version(MessageProperties properties) {
+ Object header = properties.getHeader(CloudEventsHeaders.SPEC_VERSION);
+ return header == null ? null : header.toString();
+ }
+
+ private MessageReader binaryMessageReader(Message message, SpecVersion version) {
+ return new MessageBinaryMessageReader(version, message.getMessageProperties(), message.getBody());
+ }
+
+ private MessageReader structuredMessageReader(Message message, EventFormat format) {
+ return new GenericStructuredMessageReader(format, message.getBody());
+ }
+
+ private String contentType(MessageProperties properties) {
+ String contentType = properties.getContentType();
+ if (contentType == null) {
+ Object header = properties.getHeader(CloudEventsHeaders.CONTENT_TYPE);
+ return header == null ? null : header.toString();
+ }
+ return contentType;
+ }
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventsHeaders.java b/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventsHeaders.java
new file mode 100644
index 000000000..132910216
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/amqp/CloudEventsHeaders.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-Present The CloudEvents Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.cloudevents.spring.amqp;
+
+public class CloudEventsHeaders {
+
+ /**
+ * CloudEvent attributes MUST be prefixed with either "cloudEvents_" or "cloudEvents:" for use in the application-properties section.
+ * @see
+ * AMQP Protocol Binding for CloudEvents
+ * */
+ public static final String CE_PREFIX = "cloudEvents_";
+
+ public static final String SPEC_VERSION = CE_PREFIX + "specversion";
+
+ public static final String CONTENT_TYPE = CE_PREFIX + "datacontenttype";
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/amqp/MessageBinaryMessageReader.java b/spring/src/main/java/io/cloudevents/spring/amqp/MessageBinaryMessageReader.java
new file mode 100644
index 000000000..c4a11c27c
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/amqp/MessageBinaryMessageReader.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2020-Present The CloudEvents Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.amqp;
+
+import static io.cloudevents.spring.amqp.CloudEventsHeaders.*;
+import static org.springframework.amqp.support.AmqpHeaders.CONTENT_TYPE;
+
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.data.BytesCloudEventData;
+import io.cloudevents.core.impl.StringUtils;
+import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
+import org.springframework.amqp.core.MessageProperties;
+
+/**
+ * Utility for converting {@link MessageProperties} (message headers) to `CloudEvent` contexts.
+ *
+ * @author Lars Michele
+ * @see io.cloudevents.spring.messaging.MessageBinaryMessageReader used as stencil for the implementation
+ */
+class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl {
+
+ private final Map headers;
+
+ public MessageBinaryMessageReader(SpecVersion version, MessageProperties properties, byte[] payload) {
+ super(version, payload == null ? null : BytesCloudEventData.wrap(payload));
+ this.headers = properties.getHeaders();
+ }
+
+ @Override
+ protected boolean isContentTypeHeader(String key) {
+ return CONTENT_TYPE.equalsIgnoreCase(key);
+ }
+
+ @Override
+ protected boolean isCloudEventsHeader(String key) {
+ return key != null && key.length() > CE_PREFIX.length() && StringUtils.startsWithIgnoreCase(key, CE_PREFIX);
+ }
+
+ @Override
+ protected String toCloudEventsKey(String key) {
+ return key.substring(CE_PREFIX.length()).toLowerCase();
+ }
+
+ @Override
+ protected void forEachHeader(BiConsumer fn) {
+ headers.forEach(fn);
+ }
+
+ @Override
+ protected String toCloudEventsValue(Object value) {
+ return value.toString();
+ }
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/amqp/MessageBuilderMessageWriter.java b/spring/src/main/java/io/cloudevents/spring/amqp/MessageBuilderMessageWriter.java
new file mode 100644
index 000000000..f51046133
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/amqp/MessageBuilderMessageWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2020-Present The CloudEvents Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.amqp;
+
+import static io.cloudevents.spring.amqp.CloudEventsHeaders.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import io.cloudevents.CloudEventData;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.format.EventFormat;
+import io.cloudevents.core.message.MessageWriter;
+import io.cloudevents.rw.CloudEventContextWriter;
+import io.cloudevents.rw.CloudEventRWException;
+import io.cloudevents.rw.CloudEventWriter;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageBuilder;
+import org.springframework.amqp.core.MessageProperties;
+
+/**
+ * Internal utility class for copying CloudEvent
context to {@link MessageProperties} (message
+ * headers).
+ *
+ * @author Lars Michele
+ * @see io.cloudevents.spring.messaging.MessageBuilderMessageWriter used as stencil for the implementation
+ */
+class MessageBuilderMessageWriter implements CloudEventWriter, MessageWriter {
+
+ private final Map headers = new HashMap<>();
+
+ public MessageBuilderMessageWriter(MessageProperties properties) {
+ this.headers.putAll(properties.getHeaders());
+ }
+
+ @Override
+ public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
+ headers.put(CONTENT_TYPE, format.serializedContentType());
+ return MessageBuilder.withBody(value).copyHeaders(headers).build();
+ }
+
+ @Override
+ public Message end(CloudEventData value) throws CloudEventRWException {
+ return MessageBuilder.withBody(value == null ? new byte[0] : value.toBytes()).copyHeaders(headers).build();
+ }
+
+ @Override
+ public Message end() {
+ return MessageBuilder.withBody(new byte[0]).copyHeaders(headers).build();
+ }
+
+ @Override
+ public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
+ headers.put(CE_PREFIX + name, value);
+ return this;
+ }
+
+ @Override
+ public MessageBuilderMessageWriter create(SpecVersion version) {
+ headers.put(SPEC_VERSION, version.toString());
+ return this;
+ }
+}
diff --git a/spring/src/main/java/io/cloudevents/spring/amqp/package-info.java b/spring/src/main/java/io/cloudevents/spring/amqp/package-info.java
new file mode 100644
index 000000000..83706d3e8
--- /dev/null
+++ b/spring/src/main/java/io/cloudevents/spring/amqp/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Provides classes related to working with Cloud Events within the context of Spring Amqp.
+ */
+package io.cloudevents.spring.amqp;
diff --git a/spring/src/test/java/io/cloudevents/spring/amqp/CloudEventMessageConverterTests.java b/spring/src/test/java/io/cloudevents/spring/amqp/CloudEventMessageConverterTests.java
new file mode 100644
index 000000000..f70690513
--- /dev/null
+++ b/spring/src/test/java/io/cloudevents/spring/amqp/CloudEventMessageConverterTests.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.cloudevents.spring.amqp;
+
+import static org.assertj.core.api.Assertions.*;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.rw.CloudEventRWException;
+import org.junit.jupiter.api.Test;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageBuilder;
+import org.springframework.amqp.core.MessageProperties;
+
+/**
+ * @author Lars Michele
+ * @see io.cloudevents.spring.messaging.CloudEventMessageConverterTests used as stencil for the implementation
+ */
+class CloudEventMessageConverterTests {
+
+ private static final String JSON = "{\"specversion\":\"1.0\"," //
+ + "\"id\":\"12345\"," //
+ + "\"source\":\"https://spring.io/events\"," //
+ + "\"type\":\"io.spring.event\"," //
+ + "\"datacontenttype\":\"application/json\"," //
+ + "\"data\":{\"value\":\"Dave\"}" //
+ + "}";
+
+ private final CloudEventMessageConverter converter = new CloudEventMessageConverter();
+
+ @Test
+ void noSpecVersion() {
+ Message message = MessageBuilder.withBody(new byte[0]).build();
+ assertThatExceptionOfType(CloudEventRWException.class).isThrownBy(() -> {
+ assertThat(converter.fromMessage(message)).isNull();
+ });
+ }
+
+ @Test
+ void notValidCloudEvent() {
+ Message message = MessageBuilder.withBody(new byte[0]).setHeader("cloudEvents_specversion", "1.0").build();
+ assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> {
+ assertThat(converter.fromMessage(message)).isNull();
+ });
+ }
+
+ @Test
+ void validCloudEvent() {
+ Message message = MessageBuilder.withBody(new byte[0]).setHeader("cloudEvents_specversion", "1.0")
+ .setHeader("cloudEvents_id", "12345").setHeader("cloudEvents_source", "https://spring.io/events")
+ .setHeader("cloudEvents_type", "io.spring.event").build();
+ CloudEvent event = converter.fromMessage(message);
+ assertThat(event).isNotNull();
+ assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
+ assertThat(event.getId()).isEqualTo("12345");
+ assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
+ assertThat(event.getType()).isEqualTo("io.spring.event");
+ }
+
+ @Test
+ void structuredCloudEvent() {
+ byte[] payload = JSON.getBytes(StandardCharsets.UTF_8);
+ Message message = MessageBuilder.withBody(payload)
+ .setContentType("application/cloudevents+json").build();
+ CloudEvent event = converter.fromMessage(message);
+ assertThat(event).isNotNull();
+ assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1);
+ assertThat(event.getId()).isEqualTo("12345");
+ assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events"));
+ assertThat(event.getType()).isEqualTo("io.spring.event");
+ }
+
+ @Test
+ void fromCloudEvent() {
+ CloudEvent attributes = CloudEventBuilder.v1().withId("A234-1234-1234")
+ .withSource(URI.create("https://spring.io/")).withType("org.springframework")
+ .withData("hello".getBytes(StandardCharsets.UTF_8)).build();
+ Message message = converter.toMessage(attributes, new MessageProperties());
+ Map headers = message.getMessageProperties().getHeaders();
+ assertThat(headers.get("cloudEvents_id")).isEqualTo("A234-1234-1234");
+ assertThat(headers.get("cloudEvents_specversion")).isEqualTo("1.0");
+ assertThat(headers.get("cloudEvents_source")).isEqualTo("https://spring.io/");
+ assertThat(headers.get("cloudEvents_type")).isEqualTo("org.springframework");
+ assertThat("hello".getBytes(StandardCharsets.UTF_8)).isEqualTo(message.getBody());
+ }
+
+ @Test
+ void fromNonCloudEvent() {
+ assertThat(converter.toMessage(new byte[0], new MessageProperties())).isNull();
+ }
+}