diff --git a/examples/spring/README.md b/examples/spring/README.md new file mode 100644 index 000000000..c4527f7c4 --- /dev/null +++ b/examples/spring/README.md @@ -0,0 +1,6 @@ +## Examples of Cloud Events with Spring + +### Introduction +These modules provides various examples on how you can interact with Cloud Events and various Spring frameworks. + +Please refer to individual modules for more details on specific sample. diff --git a/examples/spring/cloudevents-spring-function-stream-kafka/README.md b/examples/spring/cloudevents-spring-function-stream-kafka/README.md new file mode 100644 index 000000000..ac39e3aec --- /dev/null +++ b/examples/spring/cloudevents-spring-function-stream-kafka/README.md @@ -0,0 +1,78 @@ +## Examples of Cloud Events with Spring Cloud Function and Kafka + +### Introduction +The current example uses [Spring Cloud Function](https://spring.io/projects/spring-cloud-function) framework as its core as well as +the support provided by [Cloud Events Java SDK](https://github.com/cloudevents/sdk-java). +As many things in Spring, Spring Cloud Function allows users to concentrate only on functional aspects of +their requirement while taking care-off the non-functional ones. +For more information on Spring Cloud Function please visit our [project page](https://spring.io/projects/spring-cloud-function). + +It also uses [Spring Cloud Stream](https://spring.io/projects/spring-cloud-stream) microservices framework which also comes with +many messaging middleware binders and as you may have guessed we'll be using Kafka binder to connect your +functionality (i.e., your function) to Kafka topics. + +The example consists of a `CloudeventDemoApplication` class that is a typical Spring Boot Application with a single +function bean which provides implementation of some imaginary functional requirements. + +```java +@Bean +public Function pojoToPojo() { + return event -> { + System.out.println("RECEIVED Spring Release Event: " + event); + return event.setReleaseDateAsString("01-10-2006").setVersion("2.0"); + }; +} +``` +As you can see from its definition it expects and instance of POJO (i.e., `SpringReleaseEvent`) which it also returns +after few updates. + +Once you start the application you can send Kafka messages to it. +Please refer to [Kafka Quick Start](https://kafka.apache.org/quickstart) on how to navigate Kafka message broker, +although we assume that if you are here you already know how. + +Assuming you have Apache Kafka broker running we simplified the process of demo/testing of how to _produce_ and _consume_ Cloud Events +with Apache Kafka. To do this we included `CloudeventDemoApplicationTests` which uses [Spring Kafka](https://spring.io/projects/spring-kafka) +framework to assist with sending Spring Messages as binary-mode and structured-mode Cloud Events. + +Here is one example of sending binary-mode message: + +```java +public void testAsBinary() throws Exception { + try( ConfigurableApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class)) { + KafkaTemplate kafka = context.getBean(KafkaTemplate.class); + + String binaryEvent = "{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; + + Message message = MessageBuilder.withPayload(binaryEvent.getBytes(StandardCharsets.UTF_8)) + .setHeader(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + CloudEventAttributeUtils.ID, UUID.randomUUID().toString()) + .setHeader(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + CloudEventAttributeUtils.SOURCE, "https://spring.io/") + .setHeader(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + CloudEventAttributeUtils.SPECVERSION, "1.0") + .setHeader(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + CloudEventAttributeUtils.TYPE, "org.springframework") + .setHeader(KafkaHeaders.TOPIC, "pojoToPojo-in-0") + .build(); + + ListenableFuture> future = kafka.send(message); + + assertThat(future.get(1000, TimeUnit.MILLISECONDS).getRecordMetadata()).isNotNull(); + } +} + ``` + +You really don't need to do anything else as Spring Cloud Stream and Spring Cloud Function will take care of all the boilerplate functionality +that deals with connectivity to Kafka, creation of topics and more. You should refer to individual project documentation to learn more details +on how it is done, but here is a quick description: + +_By including `spring-cloud-stream-binder-kafka` as your dependency you've enabled Spring Boot auto-configuration which as typical to Spring Boot +relies on certain defaults (i.e., host, port etc.). It is also recognizes that you have a function and such function will be bound as message +listener to topics which will also be auto-created for you by the framework (yes, you can manage and configure and override all these defaults; +see individual project documentation for more details). The two topics that will be auto-created for you are `pojoToPojo-in-0` and `pojoToPojo-out-0`. +As you can see their names derived from function name and the `in/out` part signifies input and output which effectively corresponds to input and +output of your function, so your function will effectively listen on `pojoToPojo-in-0` topic and it's output will be sent to `pojoToPojo-out-0`._ + +Once you run the test you can see the log message form your function + +```text +RECEIVED Spring Release Event: releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0 +``` + +You can also subscribe to `pojoToPojo-out-0` to see the result message. \ No newline at end of file diff --git a/examples/spring/cloudevents-spring-function-stream-kafka/pom.xml b/examples/spring/cloudevents-spring-function-stream-kafka/pom.xml new file mode 100644 index 000000000..e4f9ad3ed --- /dev/null +++ b/examples/spring/cloudevents-spring-function-stream-kafka/pom.xml @@ -0,0 +1,155 @@ + + + 4.0.0 + cloudevents-spring-function-stream-kafka + 2.0.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-parent + 2.4.0-RC1 + + + + + 1.8 + 3.1.0-SNAPSHOT + 1.0.21.RELEASE + + + + + io.cloudevents + cloudevents-spring + ${project.version} + + + + org.springframework.cloud + spring-cloud-stream-binder-kafka + 3.1.0-SNAPSHOT + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + + + + org.springframework.cloud + spring-cloud-function-dependencies + ${spring-cloud-function.version} + pom + import + + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.springframework.boot.experimental + spring-boot-thin-layout + ${wrapper.version} + + + + + maven-surefire-plugin + + + **/*Tests.java + **/*Test.java + + + **/Abstract*.java + + + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/release + + false + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/libs-release-local + + false + + + + + + diff --git a/examples/spring/cloudevents-spring-function-stream-kafka/src/main/java/io/cloudevents/spring/kafka/CloudeventDemoApplication.java b/examples/spring/cloudevents-spring-function-stream-kafka/src/main/java/io/cloudevents/spring/kafka/CloudeventDemoApplication.java new file mode 100644 index 000000000..1aeb0b316 --- /dev/null +++ b/examples/spring/cloudevents-spring-function-stream-kafka/src/main/java/io/cloudevents/spring/kafka/CloudeventDemoApplication.java @@ -0,0 +1,68 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.kafka; + +import java.net.URI; +import java.util.function.Function; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +import io.cloudevents.spring.core.CloudEventAttributeUtils; +import io.cloudevents.spring.core.CloudEventAttributesProvider; + +/** + * Sample application that demonstrates how user functions can be triggered + * by cloud event. + * Given that this particular sample based on spring-cloud-function-web + * support the function itself is a valid REST endpoint where function name + * signifies URL path (e.g., http://localhost:8080/pojoToPojo). + * + * Simply start the application and post cloud event to individual + * function - (see README for instructions) + * + * You can also run CloudeventDemoApplicationTests. + * + * @author Oleg Zhurakousky + * + */ +@SpringBootApplication +public class CloudeventDemoApplication { + + public static void main(String[] args) throws Exception { + SpringApplication.run(CloudeventDemoApplication.class, args); + } + + /* + * This strategy will be called internally by Spring to set Cloud Event output attributes + */ +// @Bean + public CloudEventAttributesProvider cloudEventAttributesProvider() { + return attributes -> CloudEventAttributeUtils.toMutable(attributes) + .setSource(URI.create("https://interface21.com/")) + .setType("com.interface21"); + } + + @Bean + public Function pojoToPojo() { + return event -> { + System.out.println("RECEIVED Spring Release Event: " + event); + return event.setReleaseDateAsString("01-10-2006").setVersion("2.0"); + }; + } +} diff --git a/examples/spring/cloudevents-spring-function-stream-kafka/src/main/java/io/cloudevents/spring/kafka/SpringReleaseEvent.java b/examples/spring/cloudevents-spring-function-stream-kafka/src/main/java/io/cloudevents/spring/kafka/SpringReleaseEvent.java new file mode 100644 index 000000000..a45998589 --- /dev/null +++ b/examples/spring/cloudevents-spring-function-stream-kafka/src/main/java/io/cloudevents/spring/kafka/SpringReleaseEvent.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.kafka; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +import com.fasterxml.jackson.annotation.JsonFormat; + + +/** + * An example POJO that represents cloud event data + * + * @author Oleg Zhurakousky + * + */ +public class SpringReleaseEvent { + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy", timezone = "Europe/Paris") + private Date releaseDate; + + private String releaseName; + + private String version; + public Date getReleaseDate() { + return releaseDate; + } + + public void setReleaseDate(Date releaseDate) { + this.releaseDate = releaseDate; + } + + public SpringReleaseEvent setReleaseDateAsString(String releaseDate) { + try { + this.releaseDate = new SimpleDateFormat("dd-MM-yyyy").parse(releaseDate); + } + catch (ParseException e) { + throw new IllegalArgumentException(e); + } + return this; + } + + public String getReleaseName() { + return releaseName; + } + + public SpringReleaseEvent setReleaseName(String releaseName) { + this.releaseName = releaseName; + return this; + } + + public String getVersion() { + return version; + } + + public SpringReleaseEvent setVersion(String version) { + this.version = version; + return this; + } + + @Override + public String toString() { + return "releaseDate:" + new SimpleDateFormat("dd-MM-yyyy").format(releaseDate) + "; releaseName:" + releaseName + "; version:" + version; + } +} \ No newline at end of file diff --git a/examples/spring/cloudevents-spring-function-stream-kafka/src/main/resources/application.properties b/examples/spring/cloudevents-spring-function-stream-kafka/src/main/resources/application.properties new file mode 100644 index 000000000..e69de29bb diff --git a/examples/spring/cloudevents-spring-function-stream-kafka/src/test/java/io/cloudevents/spring/kafka/CloudeventDemoApplicationTests.java b/examples/spring/cloudevents-spring-function-stream-kafka/src/test/java/io/cloudevents/spring/kafka/CloudeventDemoApplicationTests.java new file mode 100644 index 000000000..0101e5a24 --- /dev/null +++ b/examples/spring/cloudevents-spring-function-stream-kafka/src/test/java/io/cloudevents/spring/kafka/CloudeventDemoApplicationTests.java @@ -0,0 +1,101 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.kafka; + +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import io.cloudevents.spring.core.CloudEventAttributeUtils; +import io.cloudevents.spring.core.MutableCloudEventAttributes; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.SpringApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.concurrent.ListenableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Oleg Zhurakousky + * + */ +public class CloudeventDemoApplicationTests { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testAsBinary() throws Exception { + try (ConfigurableApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class)) { + KafkaTemplate kafka = context.getBean(KafkaTemplate.class); + + String binaryEvent = "{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; + + Message message = MessageBuilder.withPayload(binaryEvent.getBytes(StandardCharsets.UTF_8)) + .setHeader(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + MutableCloudEventAttributes.ID, + UUID.randomUUID().toString()) + .setHeader(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + MutableCloudEventAttributes.SOURCE, + "https://spring.io/") + .setHeader(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + MutableCloudEventAttributes.SPECVERSION, + "1.0") + .setHeader(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + MutableCloudEventAttributes.TYPE, + "org.springframework") + .setHeader(KafkaHeaders.TOPIC, "pojoToPojo-in-0").build(); + + ListenableFuture> future = kafka.send(message); + + assertThat(future.get(1000, TimeUnit.MILLISECONDS).getRecordMetadata()).isNotNull(); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testAsStructured() throws Exception { + try (ConfigurableApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class)) { + KafkaTemplate kafka = context.getBean(KafkaTemplate.class); + + String structuredEvent = "{\n" + + " \"specversion\" : \"1.0\",\n" + + " \"type\" : \"org.springframework\",\n" + + " \"source\" : \"https://spring.io/\",\n" + + " \"id\" : \"A234-1234-1234\",\n" + + " \"datacontenttype\" : \"application/json\",\n" + + " \"data\" : {\n" + + " \"version\" : \"1.0\",\n" + + " \"releaseName\" : \"Spring Framework\",\n" + + " \"releaseDate\" : \"24-03-2004\"\n" + + " }\n" + + "}"; + + System.out.println(structuredEvent); + Message message = MessageBuilder.withPayload(structuredEvent.getBytes(StandardCharsets.UTF_8)) + .setHeader(MessageHeaders.CONTENT_TYPE, + CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") + .setHeader(KafkaHeaders.TOPIC, "pojoToPojo-in-0").build(); + + ListenableFuture> future = kafka.send(message); + + assertThat(future.get(1000, TimeUnit.MILLISECONDS).getRecordMetadata()).isNotNull(); + } + } + +} diff --git a/examples/spring/cloudevents-spring-function-web/README.md b/examples/spring/cloudevents-spring-function-web/README.md new file mode 100644 index 000000000..de6ff862b --- /dev/null +++ b/examples/spring/cloudevents-spring-function-web/README.md @@ -0,0 +1,77 @@ +## Examples of Cloud Events with Spring Cloud Function Web + +### Introduction +The current example uses [Spring Cloud Function](https://spring.io/projects/spring-cloud-function) framework as its core as well as +the support provided by [Cloud Events Java SDK](https://github.com/cloudevents/sdk-java). +As many things in Spring, Spring Cloud Function allows users to concentrate only on functional aspects of +their requirement while taking care-off the non-functional ones. +For more information on Spring Cloud Function please visit our [project page](https://spring.io/projects/spring-cloud-function). + +The example consists of a `CloudeventDemoApplication` class that is a typical Spring Boot Application with a single +function bean which provides implementation of some imaginary functional requirements. + +```java +@Bean +public Function pojoToPojo() { + return event -> event.setReleaseDateAsString("01-10-2006").setVersion("2.0"); +} +``` +As you can see from its definition it expects and instance of POJO (i.e., `SpringReleaseEvent`) which it also returns +after few updates. + +Once you start the application you can post HTTP Request as Cloud Event in _binary-mode_ using the following `curl` command: + +```text +curl -w'\n' localhost:8080/pojoToPojo \ + -H "ce-specversion: 1.0" \ + -H "ce-type: com.example.springevent" \ + -H "ce-source: spring.io/spring-event" \ + -H "Content-Type: application/json" \ + -H "ce-id: 0001" \ + -d '{"releaseDate":"24-03-2004", "releaseName":"Spring Framework", "version":"1.0"}' -i + ``` + +...and receive the following response + +```text +{"releaseDate":"01-10-2006","releaseName":"Spring Framework","version":"2.0"} +``` + +You can also inspect response headers and notice that some response headers corresponding to Cloud Event attributes are +different then the request ones. + +``` +. . . +ce-source: http://spring.io/application-application +ce-specversion: 1.0 +ce-type: io.spring.cloudevent.SpringReleaseEvent +ce-id: cf1745f2-3c5a-4095-82f1-29ad5b1ec4f3 +. . . +``` +That is because framework will automatically generate default values for Cloud Event output attributes. + +In the event you want to have control over setting these attributes you can simply define `CloudEventAttributesProvider` +bean where you can set any attribute you want. There is one already provided for you in the example. It's +commented out, but feel free to un-comment and see the difference in the results. + +You can also interact with the same functionality by posting Cloud Event in structured mode using the following `curl` command: + +```text +curl -w'\n' localhost:8080/pojoToPojo \ + -H "Content-Type: application/cloudevents+json" \ + -d '{ + "specversion" : "1.0", + "type" : "org.springframework", + "source" : "https://spring.io/", + "id" : "A234-1234-1234", + "datacontenttype" : "application/json", + "data" : { + "version" : "1.0", + "releaseName" : "Spring Framework", + "releaseDate" : "24-03-2004" + } +}' +``` +... and observe the same results. + +There is also a test case which contains two test which uses Spring's RestTemplate to post Cloud Event in binary-mode and structured-mode. \ No newline at end of file diff --git a/examples/spring/cloudevents-spring-function-web/pom.xml b/examples/spring/cloudevents-spring-function-web/pom.xml new file mode 100644 index 000000000..4c1991ddd --- /dev/null +++ b/examples/spring/cloudevents-spring-function-web/pom.xml @@ -0,0 +1,159 @@ + + + 4.0.0 + cloudevents-spring-function-web + 2.0.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-parent + 2.4.0-RC1 + + + + + 1.8 + 3.1.0-SNAPSHOT + 1.0.21.RELEASE + + + + + io.cloudevents + cloudevents-spring + ${project.version} + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.cloud + spring-cloud-function-web + + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + + + + org.springframework.cloud + spring-cloud-function-dependencies + ${spring-cloud-function.version} + pom + import + + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.springframework.boot.experimental + spring-boot-thin-layout + ${wrapper.version} + + + + + maven-surefire-plugin + + + **/*Tests.java + **/*Test.java + + + **/Abstract*.java + + + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/release + + false + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/libs-release-local + + false + + + + + + diff --git a/examples/spring/cloudevents-spring-function-web/src/main/java/io/cloudevents/spring/functions/CloudeventDemoApplication.java b/examples/spring/cloudevents-spring-function-web/src/main/java/io/cloudevents/spring/functions/CloudeventDemoApplication.java new file mode 100644 index 000000000..e3a5bf58b --- /dev/null +++ b/examples/spring/cloudevents-spring-function-web/src/main/java/io/cloudevents/spring/functions/CloudeventDemoApplication.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.functions; + +import java.net.URI; +import java.util.function.Function; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +import io.cloudevents.spring.core.CloudEventAttributeUtils; +import io.cloudevents.spring.core.CloudEventAttributesProvider; + +/** + * Sample application that demonstrates how user functions can be triggered + * by cloud event. + * Given that this particular sample based on spring-cloud-function-web + * support the function itself is a valid REST endpoint where function name + * signifies URL path (e.g., http://localhost:8080/pojoToPojo). + * + * Simply start the application and post cloud event to individual + * function - (see README for instructions) + * + * You can also run CloudeventDemoApplicationTests. + * + * @author Oleg Zhurakousky + * + */ +@SpringBootApplication +public class CloudeventDemoApplication { + + public static void main(String[] args) throws Exception { + SpringApplication.run(CloudeventDemoApplication.class, args); + } + + /* + * This strategy will be called internally by Spring to set Cloud Event output attributes + */ + @Bean + public CloudEventAttributesProvider cloudEventAttributesProvider() { + return attributes -> CloudEventAttributeUtils.toMutable(attributes) + .setSource(URI.create("https://interface21.com/")) + .setType("com.interface21"); + } + + @Bean + public Function pojoToPojo() { + return event -> event.setReleaseDateAsString("01-10-2006").setVersion("2.0"); + } +} diff --git a/examples/spring/cloudevents-spring-function-web/src/main/java/io/cloudevents/spring/functions/SpringReleaseEvent.java b/examples/spring/cloudevents-spring-function-web/src/main/java/io/cloudevents/spring/functions/SpringReleaseEvent.java new file mode 100644 index 000000000..28a53b1fe --- /dev/null +++ b/examples/spring/cloudevents-spring-function-web/src/main/java/io/cloudevents/spring/functions/SpringReleaseEvent.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.functions; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +import com.fasterxml.jackson.annotation.JsonFormat; + +/** + * An example POJO that represents cloud event data + * + * @author Oleg Zhurakousky + * + */ +public class SpringReleaseEvent { + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy", timezone = "Europe/Paris") + private Date releaseDate; + + private String releaseName; + + private String version; + public Date getReleaseDate() { + return releaseDate; + } + + public void setReleaseDate(Date releaseDate) { + this.releaseDate = releaseDate; + } + + public SpringReleaseEvent setReleaseDateAsString(String releaseDate) { + try { + this.releaseDate = new SimpleDateFormat("dd-MM-yyyy").parse(releaseDate); + } + catch (ParseException e) { + throw new IllegalArgumentException(e); + } + return this; + } + + public String getReleaseName() { + return releaseName; + } + + public SpringReleaseEvent setReleaseName(String releaseName) { + this.releaseName = releaseName; + return this; + } + + public String getVersion() { + return version; + } + + public SpringReleaseEvent setVersion(String version) { + this.version = version; + return this; + } + + @Override + public String toString() { + return "releaseDate:" + new SimpleDateFormat("dd-MM-yyyy").format(releaseDate) + "; releaseName:" + releaseName + "; version:" + version; + } +} diff --git a/examples/spring/cloudevents-spring-function-web/src/main/resources/application.properties b/examples/spring/cloudevents-spring-function-web/src/main/resources/application.properties new file mode 100644 index 000000000..e69de29bb diff --git a/examples/spring/cloudevents-spring-function-web/src/test/java/io/cloudevents/spring/functions/CloudeventDemoApplicationTests.java b/examples/spring/cloudevents-spring-function-web/src/test/java/io/cloudevents/spring/functions/CloudeventDemoApplicationTests.java new file mode 100644 index 000000000..8b0d9d191 --- /dev/null +++ b/examples/spring/cloudevents-spring-function-web/src/test/java/io/cloudevents/spring/functions/CloudeventDemoApplicationTests.java @@ -0,0 +1,159 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.functions; + +import java.net.URI; +import java.text.SimpleDateFormat; +import java.util.UUID; + +import io.cloudevents.spring.core.CloudEventAttributeUtils; +import io.cloudevents.spring.core.MutableCloudEventAttributes; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.cloud.function.json.JsonMapper; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.util.SocketUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Oleg Zhurakousky + * + */ +public class CloudeventDemoApplicationTests { + + private TestRestTemplate testRestTemplate = new TestRestTemplate(); + + @BeforeEach + public void init() throws Exception { + System.setProperty("server.port", String.valueOf(SocketUtils.findAvailableTcpPort())); + } + + /* + * This test demonstrates consumption of Cloud Event via HTTP POST - binary-mode + * message. According to specification - + * https://github.com/cloudevents/spec/blob/v1.0/spec.md - A "binary-mode message" is + * one where the event data is stored in the message body, and event attributes are + * stored as part of message meta-data. + * + * The above means that it fits perfectly with Spring Message model and as such there + * is absolutely nothing that needs to be done at the framework or user level to + * consume it. It just works! + */ + @Test + public void testAsBinary() throws Exception { + try (ConfigurableApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class)) { + HttpHeaders headers = this.buildHeaders(MediaType.APPLICATION_JSON); + // will work with either content type + // HttpHeaders headers = + // this.buildHeaders(MediaType.valueOf(CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS_VALUE + // + "+json")); + + String payload = "{\"releaseDate\":\"24-03-2004\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}"; + + RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, + URI.create("http://localhost:" + System.getProperty("server.port") + "/pojoToPojo")); + ResponseEntity response = testRestTemplate.exchange(re, String.class); + + SpringReleaseEvent springEvent = context.getBean(JsonMapper.class).fromJson(response.getBody(), + SpringReleaseEvent.class); + assertThat(springEvent.getVersion()).isEqualTo("2.0"); + assertThat(springEvent.getReleaseDate()).isEqualTo(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006")); + + /* + * Uncomment this and comment the next two assertion if + * CloudEventAttributesProvider is enabled in CloudeventDemoApplication + */ +// assertThat(response.getHeaders().getFirst("ce-" + MutableCloudEventAttributes.SOURCE)) +// .isEqualTo("https://interface21.com/"); +// assertThat(response.getHeaders().getFirst("ce-" + MutableCloudEventAttributes.TYPE)) +// .isEqualTo("com.interface21"); + + assertThat(response.getHeaders().getFirst("ce-" + MutableCloudEventAttributes.SOURCE)) + .isEqualTo("http://spring.io/application-application"); + assertThat(response.getHeaders() + .getFirst("ce-" + MutableCloudEventAttributes.TYPE)) .isEqualTo(SpringReleaseEvent.class.getName()); + } + } + + + @Test + public void testAsStrtuctured() throws Exception { + String payload = "{\n" + + " \"specversion\" : \"1.0\",\n" + + " \"type\" : \"org.springframework\",\n" + + " \"source\" : \"https://spring.io/\",\n" + + " \"id\" : \"A234-1234-1234\",\n" + + " \"datacontenttype\" : \"application/json\",\n" + + " \"data\" : {\n" + + " \"version\" : \"1.0\",\n" + + " \"releaseName\" : \"Spring Framework\",\n" + + " \"releaseDate\" : \"24-03-2004\"\n" + + " }\n" + + "}"; + + try (ConfigurableApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class)) { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.valueOf(CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json")); + + RequestEntity re = new RequestEntity<>(payload, headers, HttpMethod.POST, + URI.create("http://localhost:" + System.getProperty("server.port") + "/pojoToPojo")); + ResponseEntity response = testRestTemplate.exchange(re, String.class); + + SpringReleaseEvent springEvent = context.getBean(JsonMapper.class).fromJson(response.getBody(), + SpringReleaseEvent.class); + assertThat(springEvent.getVersion()).isEqualTo("2.0"); + assertThat(springEvent.getReleaseDate()).isEqualTo(new SimpleDateFormat("dd-MM-yyyy").parse("01-10-2006")); + + /* + * Uncomment this and comment the next two assertion if + * CloudEventAttributesProvider is enabled in CloudeventDemoApplication + */ +// assertThat(response.getHeaders().getFirst("ce-" + MutableCloudEventAttributes.SOURCE)) +// .isEqualTo("https://interface21.com/"); +// assertThat(response.getHeaders().getFirst("ce-" + MutableCloudEventAttributes.TYPE)) +// .isEqualTo("com.interface21"); + + assertThat(response.getHeaders().getFirst("ce-" + MutableCloudEventAttributes.SOURCE)) + .isEqualTo("http://spring.io/application-application"); + assertThat(response.getHeaders().getFirst("ce-" + MutableCloudEventAttributes.TYPE)) + .isEqualTo(SpringReleaseEvent.class.getName()); + } + } + + private HttpHeaders buildHeaders(MediaType contentType) { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(contentType); + headers.set(CloudEventAttributeUtils.HTTP_ATTR_PREFIX + MutableCloudEventAttributes.ID, + UUID.randomUUID().toString()); + headers.set(CloudEventAttributeUtils.HTTP_ATTR_PREFIX + MutableCloudEventAttributes.SOURCE, + "https://spring.io/"); + headers.set(CloudEventAttributeUtils.HTTP_ATTR_PREFIX + MutableCloudEventAttributes.SPECVERSION, "1.0"); + headers.set(CloudEventAttributeUtils.HTTP_ATTR_PREFIX + MutableCloudEventAttributes.TYPE, + "org.springframework"); + return headers; + } + +} diff --git a/examples/spring/cloudevents-spring-webmvc/README.md b/examples/spring/cloudevents-spring-webmvc/README.md new file mode 100644 index 000000000..d31ee353b --- /dev/null +++ b/examples/spring/cloudevents-spring-webmvc/README.md @@ -0,0 +1,76 @@ +## Examples of Cloud Events with Spring MVC + +### Introduction +The current example uses [Spring Web MVC](https://docs.spring.io/spring-framework/docs/current/reference/html/web.html) and the support provided by [Cloud Events Java SDK](https://github.com/cloudevents/sdk-java). + +The example consists of a `CloudeventDemoApplication` which mapps two HTTP endpoints to accept Cloud Events. One endpoint is for binary-mode and one for structured-mode Cloud Event. + +```java +@PostMapping("/") +public ResponseEntity binary(@RequestBody Person person, @RequestHeader HttpHeaders headers) { + . . . . +} + +@PostMapping(path = "/", consumes = "application/cloudevents+json") +public ResponseEntity structured(@RequestBody Map body, + @RequestHeader HttpHeaders headers) { + . . . . +} +``` + +Once you start the application you can post HTTP Request as Cloud Event in _binary-mode_ using the following `curl` command: + +```text +curl -w'\n' localhost:8080/ \ + -H "ce-specversion: 1.0" \ + -H "ce-type: com.example.person" \ + -H "ce-source: https://spring.io/" \ + -H "Content-Type: application/json" \ + -H "ce-id: 0001" \ + -d '{"name":"Julien"}' -i + ``` + +...and receive the following response + +```text +ce-specversion: 1.0 +ce-id: 5c8e994f-b11e-4ec1-8ae5-c989f7e4838b +ce-source: https://spring.io/ce-webmvc/binary +ce-type: io.cloudevents.spring.webmvc.CloudeventDemoApplication$Person +Content-Type: application/json +Transfer-Encoding: chunked +Date: Fri, 20 Nov 2020 13:56:26 GMT + +{"name":"Julien"} +``` + +You can also interact with the same functionality by posting Cloud Event in structured mode using the following `curl` command: + +```text +curl -w'\n' localhost:8080/ \ + -H "Content-Type: application/cloudevents+json" \ + -d '{ + "specversion" : "1.0", + "type" : "com.example.person", + "source" : "https://spring.io/", + "id" : "A234-1234-1234", + "datacontenttype" : "application/json", + "data" : {"name":"Julien"} +}' -i +``` +... and observe the results: + +```text +ce-datacontenttype: application/json +ce-specversion: 1.0 +ce-id: 3db55b31-6344-4d9b-b39d-04377a137d19 +ce-source: https://spring.io/ce-webmvc/structured +ce-type: io.cloudevents.spring.webmvc.CloudeventDemoApplication$Person +Content-Type: application/json +Transfer-Encoding: chunked +Date: Fri, 20 Nov 2020 13:56:54 GMT + +{"name":"Julien"} +``` + +There is also a test case which contains two test which uses Spring's RestTemplate to post Cloud Event in binary-mode and structured-mode. \ No newline at end of file diff --git a/examples/spring/cloudevents-spring-webmvc/pom.xml b/examples/spring/cloudevents-spring-webmvc/pom.xml new file mode 100644 index 000000000..01b03cc52 --- /dev/null +++ b/examples/spring/cloudevents-spring-webmvc/pom.xml @@ -0,0 +1,156 @@ + + + 4.0.0 + cloudevents-spring-webmvc + 2.0.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-parent + 2.4.0-RC1 + + + + + 1.8 + 3.1.0-SNAPSHOT + 1.0.21.RELEASE + + + + + io.cloudevents + cloudevents-spring + ${project.version} + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework + spring-webmvc + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + + + + org.springframework.cloud + spring-cloud-function-dependencies + ${spring-cloud-function.version} + pom + import + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.springframework.boot.experimental + spring-boot-thin-layout + ${wrapper.version} + + + + + maven-surefire-plugin + + + **/*Tests.java + **/*Test.java + + + **/Abstract*.java + + + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/release + + false + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/libs-release-local + + false + + + + + + diff --git a/examples/spring/cloudevents-spring-webmvc/src/main/java/io/cloudevents/spring/webmvc/CloudeventDemoApplication.java b/examples/spring/cloudevents-spring-webmvc/src/main/java/io/cloudevents/spring/webmvc/CloudeventDemoApplication.java new file mode 100644 index 000000000..39f4e650b --- /dev/null +++ b/examples/spring/cloudevents-spring-webmvc/src/main/java/io/cloudevents/spring/webmvc/CloudeventDemoApplication.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.webmvc; + +import java.net.URI; +import java.util.Map; +import java.util.UUID; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.http.HttpHeaders; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RestController; + +import io.cloudevents.spring.core.CloudEventAttributeUtils; +import io.cloudevents.spring.core.MutableCloudEventAttributes; +import io.cloudevents.spring.http.CloudEventHttpUtils; +/** + * Sample application that demonstrates classing Spring MVC RestController + * + * Simply start the application and post cloud events (see README for instructions) + * + * You can also run CloudeventDemoApplicationTests. + * + * @author Dave Syer + * + */ +@SpringBootApplication +@RestController +public class CloudeventDemoApplication { + + public static void main(String[] args) throws Exception { + SpringApplication.run(CloudeventDemoApplication.class, args); + } + + @PostMapping("/") + public ResponseEntity binary(@RequestBody Person person, @RequestHeader HttpHeaders headers) { + MutableCloudEventAttributes attributes = CloudEventHttpUtils.fromHttp(headers) + .setId(UUID.randomUUID().toString()) + .setSource(URI.create("https://spring.io/ce-webmvc/binary")) + .setType(person.getClass().getName()); + HttpHeaders outgoing = CloudEventHttpUtils.toHttp(attributes); + return ResponseEntity.ok().headers(outgoing).body(person); + } + + @PostMapping(path = "/", consumes = "application/cloudevents+json") + public ResponseEntity structured(@RequestBody Map body, + @RequestHeader HttpHeaders headers) { + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(body) + .setId(UUID.randomUUID().toString()) + .setSource(URI.create("https://spring.io/ce-webmvc/structured")) + .setType(Person.class.getName()); + HttpHeaders outgoing = CloudEventHttpUtils.toHttp(attributes); + return ResponseEntity.ok().headers(outgoing).body(body.get(CloudEventAttributeUtils.DATA)); + } + + public static class Person { + private String name; + + public Person() { + } + + public Person(String name) { + this.name = name; + } + + public String getName() { + return this.name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return "Person [name=" + this.name + "]"; + } + } + +} diff --git a/examples/spring/cloudevents-spring-webmvc/src/main/resources/application.properties b/examples/spring/cloudevents-spring-webmvc/src/main/resources/application.properties new file mode 100644 index 000000000..e69de29bb diff --git a/examples/spring/cloudevents-spring-webmvc/src/test/java/io/cloudevents/spring/webmvc/CloudeventDemoApplicationTests.java b/examples/spring/cloudevents-spring-webmvc/src/test/java/io/cloudevents/spring/webmvc/CloudeventDemoApplicationTests.java new file mode 100644 index 000000000..a197dcdcf --- /dev/null +++ b/examples/spring/cloudevents-spring-webmvc/src/test/java/io/cloudevents/spring/webmvc/CloudeventDemoApplicationTests.java @@ -0,0 +1,99 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.webmvc; + +import java.net.URI; +import io.cloudevents.spring.webmvc.CloudeventDemoApplication.Person; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + * + */ +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) +public class CloudeventDemoApplicationTests { + + @Autowired + private TestRestTemplate rest; + + @LocalServerPort + private int port; + + @Test + void testAsBinary() { + ResponseEntity response = rest.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/")) // + .header("ce-id", "12345") // + .header("ce-specversion", "1.0") // + .header("ce-type", "io.spring.event") // + .header("ce-source", "https://spring.io/events") // + .contentType(MediaType.APPLICATION_JSON) // + .body("{\"name\":\"Dave\"}"), String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(response.getBody()).isEqualTo("{\"name\":\"Dave\"}"); + + HttpHeaders headers = response.getHeaders(); + + assertThat(headers).containsKey("ce-id"); + assertThat(headers).containsKey("ce-source"); + assertThat(headers).containsKey("ce-type"); + + // assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345"); + assertThat(headers.getFirst("ce-type")).isEqualTo(Person.class.getName()); + assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/ce-webmvc/binary"); + + } + + @Test + void testAsStructured() { + ResponseEntity response = rest.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/")) // + .contentType(new MediaType("application", "cloudevents+json")) // + .body("{" // + + "\"id\":\"12345\"," // + + "\"specversion\":\"1.0\"," // + + "\"type\":\"io.spring.event\"," // + + "\"source\":\"https://spring.io/events\"," // + + "\"data\":{\"name\":\"Dave\"}}"), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(response.getBody()).isEqualTo("{\"name\":\"Dave\"}"); + + HttpHeaders headers = response.getHeaders(); + + assertThat(headers).containsKey("ce-id"); + assertThat(headers).containsKey("ce-source"); + assertThat(headers).containsKey("ce-type"); + + assertThat(headers.getFirst("ce-type")).isEqualTo(Person.class.getName()); + assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/ce-webmvc/structured"); + + } +} diff --git a/examples/spring/pom.xml b/examples/spring/pom.xml new file mode 100644 index 000000000..c2e9f1832 --- /dev/null +++ b/examples/spring/pom.xml @@ -0,0 +1,25 @@ + + + + cloudevents-examples + io.cloudevents + 2.0.0-SNAPSHOT + + 4.0.0 + + cloudevents-spring-examples-parent + cloudevents-spring-examples-parent + pom + + + + + + cloudevents-spring-function-web + cloudevents-spring-function-stream-kafka + cloudevents-spring-webmvc + + + diff --git a/pom.xml b/pom.xml index 70a38ba36..7eb318648 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ http/vertx http/restful-ws kafka + spring diff --git a/spring/README.md b/spring/README.md new file mode 100644 index 000000000..f72df1773 --- /dev/null +++ b/spring/README.md @@ -0,0 +1,12 @@ +## Spring Support + +### Introduction + +This module provides classes and interfaces that can be used by [Spring frameworks](https://spring.io/) and integrations to assist with Cloud Event processing. + +Given that Spring defines [Message](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/messaging/Message.html) abstraction, +which perfectly maps to the structure defined by Cloud Events specification, one may say Cloud Events are already supported by any Spring framework that +relies on `Message`. So this modules provides several utilities and strategies to simplify working with Cloud Events in the context of Spring +frameworks and integrations (see individual component's javadocs for more details). + +Please see individual samples in `examples/spring` directory of this SDK for more details. diff --git a/spring/pom.xml b/spring/pom.xml new file mode 100644 index 000000000..ce41d40c5 --- /dev/null +++ b/spring/pom.xml @@ -0,0 +1,131 @@ + + + + 4.0.0 + + + io.cloudevents + cloudevents-parent + 2.0.0-SNAPSHOT + + + cloudevents-spring + CloudEvents - support for Spring + jar + + + io.cloudevents.spring + + + + + + org.springframework.boot + spring-boot-dependencies + 2.4.0 + pom + import + + + + + + + io.cloudevents + cloudevents-api + ${project.version} + + + org.springframework + spring-webmvc + true + + + org.springframework + spring-messaging + true + + + org.springframework.cloud + spring-cloud-function-core + 3.1.0-SNAPSHOT + true + compile + + + + + io.cloudevents + cloudevents-core + ${project.version} + test + + + io.cloudevents + cloudevents-core + tests + test-jar + ${project.version} + test + + + org.springframework.boot + spring-boot-starter-web + test + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/release + + false + + + + + + diff --git a/spring/src/main/java/io/cloudevents/spring/core/CloudEventAttributeUtils.java b/spring/src/main/java/io/cloudevents/spring/core/CloudEventAttributeUtils.java new file mode 100644 index 000000000..239ff18b7 --- /dev/null +++ b/spring/src/main/java/io/cloudevents/spring/core/CloudEventAttributeUtils.java @@ -0,0 +1,146 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.core; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import io.cloudevents.CloudEventAttributes; +import io.cloudevents.SpecVersion; + +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +/** + * Miscellaneous utility methods to assist with Cloud Event attributes. Primarily intended + * for the internal use within Spring-based frameworks or integrations. + * + * @author Oleg Zhurakousky + * @author Dave Syer + * @since 2.0 + */ +public final class CloudEventAttributeUtils { + + private CloudEventAttributeUtils() { + } + + /** + * String value of 'application/cloudevents' mime type. + */ + public static String APPLICATION_CLOUDEVENTS_VALUE = "application/cloudevents"; + + /** + * {@link MimeType} instance representing 'application/cloudevents' mime type. + */ + public static MimeType APPLICATION_CLOUDEVENTS = MimeTypeUtils.parseMimeType(APPLICATION_CLOUDEVENTS_VALUE); + + /** + * Prefix for attributes. + */ + public static String DEFAULT_ATTR_PREFIX = "ce_"; + + /** + * AMQP attributes prefix. + */ + public static String AMQP_ATTR_PREFIX = "cloudEvents:"; + + /** + * Prefix for attributes. + */ + public static String HTTP_ATTR_PREFIX = "ce-"; + + /** + * Value for 'data' attribute. + */ + public static String DATA = "data"; + + /** + * Make a mutable copy of the input (or just return the input if it is already + * mutable). + * @param attributes input CloudEventAttributes + * @return a mutable instance with the same attributes + */ + public static MutableCloudEventAttributes toMutable(CloudEventAttributes attributes) { + if (attributes instanceof MutableCloudEventAttributes) { + return (MutableCloudEventAttributes) attributes; + } + HashMap headers = new HashMap<>(); + for (String name : attributes.getAttributeNames()) { + headers.put(name, attributes.getAttribute(name)); + } + return CloudEventAttributeUtils.toAttributes(headers); + } + + /** + * Will wrap the provided map of headers as {@link MutableCloudEventAttributes}. + * @param headers map representing headers + * @return instance of {@link MutableCloudEventAttributes} + */ + public static MutableCloudEventAttributes toAttributes(Map headers) { + Map attributes = extractAttributes(headers); + return new MutableCloudEventAttributes(attributes); + } + + private static String determinePrefixToUse(Map messageHeaders) { + Set keys = messageHeaders.keySet(); + if (keys.contains(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + MutableCloudEventAttributes.ID)) { + return CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX; + } + else if (keys.contains(CloudEventAttributeUtils.HTTP_ATTR_PREFIX + MutableCloudEventAttributes.ID)) { + return CloudEventAttributeUtils.HTTP_ATTR_PREFIX; + } + else if (keys.contains(CloudEventAttributeUtils.AMQP_ATTR_PREFIX + MutableCloudEventAttributes.ID)) { + return CloudEventAttributeUtils.AMQP_ATTR_PREFIX; + } + else if (keys.contains("user-agent")) { + return CloudEventAttributeUtils.HTTP_ATTR_PREFIX; + } + return ""; + } + + private static Map extractAttributes(Map headers) { + String prefix = determinePrefixToUse(headers); + SpecVersion specVersion = extractSpecVersion(headers, prefix); + Map result = new HashMap<>(); + for (String name : specVersion.getAllAttributes()) { + if (headers.containsKey(prefix + name)) { + result.put(name, headers.get(prefix + name)); + } + } + result.put(MutableCloudEventAttributes.SPECVERSION, specVersion); + if (headers.containsKey(prefix + CloudEventAttributeUtils.DATA)) { + result.put(CloudEventAttributeUtils.DATA, headers.get(prefix + CloudEventAttributeUtils.DATA)); + } + return result; + } + + private static SpecVersion extractSpecVersion(Map headers, String prefix) { + String key = prefix + MutableCloudEventAttributes.SPECVERSION; + if (headers.containsKey(key)) { + Object object = headers.get(key); + if (object instanceof SpecVersion) { + return (SpecVersion) object; + } + if (object != null) { + return SpecVersion.parse(object.toString()); + } + } + return SpecVersion.V1; + } + +} diff --git a/spring/src/main/java/io/cloudevents/spring/core/CloudEventAttributesProvider.java b/spring/src/main/java/io/cloudevents/spring/core/CloudEventAttributesProvider.java new file mode 100644 index 000000000..53d709881 --- /dev/null +++ b/spring/src/main/java/io/cloudevents/spring/core/CloudEventAttributesProvider.java @@ -0,0 +1,56 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.core; + +import io.cloudevents.CloudEventAttributes; + +/** + * Strategy that should be implemented by the user to help with outgoing Cloud Event + * attributes. + *
+ *
+ * NOTE: The provided `attributes` may or may not be initialized with default values, + * so it is the responsibility of the user to ensure that all required Cloud Events + * attributes are set. That said, various Spring frameworks which utilize this interface + * will ensure that the 'provided' attributes are initialized with default values, leaving + * to responsible to only set the attributes you need. + *
+ * Once implemented, simply configure it as a bean and the framework will invoke it before + * the outbound Cloud Event Message is finalized. + * + *
+ * @Bean
+ * public CloudEventAttributesProvider cloudEventAttributesProvider() {
+ * 	return attributes ->
+ *		CloudEventAttributeUtils.get(attributes).setSource("https://interface21.com/").setType("com.interface21");
+ * }
+ * 
+ * + * @author Oleg Zhurakousky + * @author Dave Syer + * @since 2.0 + */ +@FunctionalInterface +public interface CloudEventAttributesProvider { + + /** + * @param attributes instance of {@link CloudEventAttributes} + * @return instance of modified {@link CloudEventAttributes} + */ + CloudEventAttributes getOutputAttributes(CloudEventAttributes attributes); + +} diff --git a/spring/src/main/java/io/cloudevents/spring/core/MutableCloudEventAttributes.java b/spring/src/main/java/io/cloudevents/spring/core/MutableCloudEventAttributes.java new file mode 100644 index 000000000..2d0107be4 --- /dev/null +++ b/spring/src/main/java/io/cloudevents/spring/core/MutableCloudEventAttributes.java @@ -0,0 +1,246 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.core; + +import java.io.Serializable; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.Map; + +import io.cloudevents.CloudEventAttributes; +import io.cloudevents.SpecVersion; + +import org.springframework.util.StringUtils; + +/** + * Utility class to assist with accessing and setting Cloud Events attributes from headers + * in messages and HTTP exchanges. + * + * It is primarily used within various Spring frameworks. + * + * @author Oleg Zhurakousky + * @author Dave Syer + * @since 2.0 + */ +public class MutableCloudEventAttributes implements CloudEventAttributes, Serializable { + + private static final long serialVersionUID = 5393610770855366497L; + + private Map map = new HashMap<>(); + + boolean isV03 = false; + + /** + * Value for 'id' attribute. + */ + public static String ID = "id"; + + /** + * Value for 'source' attribute. + */ + public static String SOURCE = "source"; + + /** + * Value for 'specversion' attribute. + */ + public static String SPECVERSION = "specversion"; + + /** + * Value for 'type' attribute. + */ + public static String TYPE = "type"; + + /** + * Value for 'datacontenttype' attribute. + */ + public static String DATACONTENTTYPE = "datacontenttype"; + + /** + * Value for 'dataschema' attribute. + */ + public static String DATASCHEMA = "dataschema"; + + /** + * V03 name for 'dataschema' attribute. + */ + public static final String SCHEMAURL = "schemaurl"; + + /** + * Value for 'subject' attribute. + */ + public static String SUBJECT = "subject"; + + /** + * Value for 'time' attribute. + */ + public static String TIME = "time"; + + MutableCloudEventAttributes(Map headers) { + map.putAll(headers); + safe(headers, MutableCloudEventAttributes.SOURCE); + safe(headers, MutableCloudEventAttributes.DATASCHEMA); + this.isV03 = this.getSpecVersion().equals(SpecVersion.V03); + } + + private void safe(Map headers, String key) { + Object value = headers.get(key); + if (value != null) { + map.put(key, value.toString()); + } + } + + @Override + public SpecVersion getSpecVersion() { + SpecVersion specVersion = (SpecVersion) this.getAttribute(MutableCloudEventAttributes.SPECVERSION); + return specVersion == null ? SpecVersion.V1 : specVersion; + } + + public MutableCloudEventAttributes setId(String id) { + this.setAttribute(MutableCloudEventAttributes.ID, id); + return this; + } + + @Override + public String getId() { + Object id = this.getAttribute(MutableCloudEventAttributes.ID); + return id == null ? null : id.toString(); + } + + public MutableCloudEventAttributes setType(String type) { + this.setAttribute(MutableCloudEventAttributes.TYPE, type); + return this; + } + + @Override + public String getType() { + return (String) this.getAttribute(MutableCloudEventAttributes.TYPE); + } + + public MutableCloudEventAttributes setSource(URI source) { + this.setAttribute(MutableCloudEventAttributes.SOURCE, source.toString()); + return this; + } + + @Override + public URI getSource() { + Object value = this.getAttribute(MutableCloudEventAttributes.SOURCE); + return value == null ? null : URI.create((String) value); + } + + public MutableCloudEventAttributes setDataContentType(String datacontenttype) { + this.setAttribute(MutableCloudEventAttributes.DATACONTENTTYPE, datacontenttype); + return this; + } + + @Override + public String getDataContentType() { + return (String) this.getAttribute(MutableCloudEventAttributes.DATACONTENTTYPE); + } + + public MutableCloudEventAttributes setDataSchema(URI dataschema) { + this.setAttribute(MutableCloudEventAttributes.DATASCHEMA, dataschema.toString()); + return this; + } + + @Override + public URI getDataSchema() { + Object value = this.getAttribute(MutableCloudEventAttributes.DATASCHEMA); + if (value == null && this.getSpecVersion() == SpecVersion.V03) { + value = this.getAttribute(MutableCloudEventAttributes.SCHEMAURL); + } + return value == null ? null : URI.create((String) value); + } + + public MutableCloudEventAttributes setSubject(String subject) { + this.setAttribute(MutableCloudEventAttributes.SUBJECT, subject); + return this; + } + + @Override + public String getSubject() { + return (String) this.getAttribute(MutableCloudEventAttributes.SUBJECT); + } + + public MutableCloudEventAttributes setTime(String time) { + this.setAttribute(MutableCloudEventAttributes.TIME, time); + return this; + } + + @Override + public OffsetDateTime getTime() { + String time = (String) this.getAttribute(MutableCloudEventAttributes.TIME); + return OffsetDateTime.parse(time); + } + + /** + * Will delegate to the underlying {@link Map} returning the value for the requested + * attribute or null. + */ + @Override + public Object getAttribute(String attributeName) { + if (isV03 && MutableCloudEventAttributes.SCHEMAURL.equals(attributeName) + && map.containsKey(MutableCloudEventAttributes.DATASCHEMA)) { + return map.get(MutableCloudEventAttributes.DATASCHEMA); + } + return map.get(attributeName); + } + + /** + * Determines if this instance of {@link CloudEventAttributes} represents valid Cloud + * Event. This implies that it contains all 4 required attributes (id, source, type + * and specversion) + * @return true if this instance represents a valid Cloud Event + */ + public boolean isValidCloudEvent() { + return StringUtils.hasText(this.getId()) && this.getSource() != null + && StringUtils.hasText(this.getSource().toString()) && this.getSpecVersion() != null + && StringUtils.hasText(this.getType()); + } + + /** + * Will convert these attributes to {@link Map} of headers where each attribute will + * be prefixed with the value of 'prefixToUse'. + * @param prefixToUse prefix to be used on attributes + * @return map of headers. + */ + public Map toMap(String prefixToUse) { + Map result = new HashMap<>(); + if (!StringUtils.hasText(prefixToUse)) { + prefixToUse = ""; + } + for (String key : this.getAttributeNames()) { + Object value = this.getAttribute(key); + if (value != null) { + result.put(prefixToUse + key, value); + } + } + result.put(prefixToUse + "specversion", this.getSpecVersion().toString()); + return result; + } + + public MutableCloudEventAttributes setAttribute(String attrName, Object attrValue) { + map.put(attrName, attrValue); + return this; + } + + @Override + public String toString() { + return map.toString(); + } + +} diff --git a/spring/src/main/java/io/cloudevents/spring/core/package-info.java b/spring/src/main/java/io/cloudevents/spring/core/package-info.java new file mode 100644 index 000000000..30101e2d9 --- /dev/null +++ b/spring/src/main/java/io/cloudevents/spring/core/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides classes related to working with Cloud Events within the context of Spring. + */ +package io.cloudevents.spring.core; \ No newline at end of file diff --git a/spring/src/main/java/io/cloudevents/spring/http/CloudEventHttpUtils.java b/spring/src/main/java/io/cloudevents/spring/http/CloudEventHttpUtils.java new file mode 100644 index 000000000..9bb67765a --- /dev/null +++ b/spring/src/main/java/io/cloudevents/spring/http/CloudEventHttpUtils.java @@ -0,0 +1,56 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.http; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import io.cloudevents.CloudEventAttributes; +import io.cloudevents.spring.core.CloudEventAttributeUtils; +import io.cloudevents.spring.core.MutableCloudEventAttributes; + +import org.springframework.http.HttpHeaders; + +/** + * Miscellaneous utility methods to assist with Cloud Events in the context of Spring Web frameworks. + * Primarily intended for the internal use within Spring-based frameworks or integrations. + * + * @author Dave Syer + * @since 2.0 + */ +public class CloudEventHttpUtils { + + public static HttpHeaders toHttp(CloudEventAttributes attributes) { + HttpHeaders headers = new HttpHeaders(); + for (String key : attributes.getAttributeNames()) { + String target = CloudEventAttributeUtils.HTTP_ATTR_PREFIX + key; + if (attributes.getAttribute(key) != null) { + // TODO: need to convert timestamps? + headers.set(target, attributes.getAttribute(key).toString()); + } + } + return headers; + } + + public static MutableCloudEventAttributes fromHttp(HttpHeaders headers) { + Map map = new HashMap<>(); + map.putAll(headers.toSingleValueMap()); + return CloudEventAttributeUtils.toAttributes(map).setId(UUID.randomUUID().toString()); + } + +} diff --git a/spring/src/main/java/io/cloudevents/spring/http/package-info.java b/spring/src/main/java/io/cloudevents/spring/http/package-info.java new file mode 100644 index 000000000..42baa9547 --- /dev/null +++ b/spring/src/main/java/io/cloudevents/spring/http/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides classes related to working with Cloud Events within the context of Spring MVC. + */ +package io.cloudevents.spring.http; \ No newline at end of file diff --git a/spring/src/main/java/io/cloudevents/spring/messaging/CloudEventMessageUtils.java b/spring/src/main/java/io/cloudevents/spring/messaging/CloudEventMessageUtils.java new file mode 100644 index 000000000..b5b7d0f09 --- /dev/null +++ b/spring/src/main/java/io/cloudevents/spring/messaging/CloudEventMessageUtils.java @@ -0,0 +1,126 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.messaging; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.cloudevents.CloudEventAttributes; +import io.cloudevents.spring.core.CloudEventAttributeUtils; +import io.cloudevents.spring.core.CloudEventAttributesProvider; +import io.cloudevents.spring.core.MutableCloudEventAttributes; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.ContentTypeResolver; +import org.springframework.messaging.converter.DefaultContentTypeResolver; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.StringUtils; + +/** + * Miscellaneous utility methods to assist with representing Cloud Event as Spring + * {@link Message}
+ * Primarily intended for the internal use within Spring-based frameworks and + * integrations; + * + * @author Oleg Zhurakousky + * @author Dave Syer + * @since 2.0 + */ +public final class CloudEventMessageUtils { + + private static final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver(); + + private CloudEventMessageUtils() { + + } + + @SuppressWarnings("unchecked") + public static Message toBinary(Message inputMessage, MessageConverter messageConverter) { + Map headers = inputMessage.getHeaders(); + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(headers); + + // first check the obvious and see if content-type is `cloudevents` + if (!attributes.isValidCloudEvent() && headers.containsKey(MessageHeaders.CONTENT_TYPE)) { + MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders()); + if (contentType.getType().equals(CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS.getType()) && contentType + .getSubtype().startsWith(CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS.getSubtype())) { + + String dataContentType = StringUtils.hasText(attributes.getDataContentType()) + ? attributes.getDataContentType() : MimeTypeUtils.APPLICATION_JSON_VALUE; + + String suffix = contentType.getSubtypeSuffix(); + MimeType cloudEventDeserializationContentType = MimeTypeUtils + .parseMimeType(contentType.getType() + "/" + suffix); + Message cloudEventMessage = MessageBuilder.fromMessage(inputMessage) + .setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType) + .setHeader(MutableCloudEventAttributes.DATACONTENTTYPE, dataContentType).build(); + Map structuredCloudEvent = (Map) messageConverter + .fromMessage(cloudEventMessage, Map.class); + Message binaryCeMessage = buildBinaryMessageFromStructuredMap(structuredCloudEvent, + inputMessage.getHeaders()); + return binaryCeMessage; + } + } + else if (StringUtils.hasText(attributes.getDataContentType())) { + return MessageBuilder.fromMessage(inputMessage) + .setHeader(MessageHeaders.CONTENT_TYPE, attributes.getDataContentType()).build(); + } + return inputMessage; + } + + /** + * Utility method to assist with creating output attributes.
+ * Typically user by {@link Consumer}. Unlike {@link Function} where framework(s) + * internally do that once the function is executed and output is produced, Consumer + * does not produce any output, so from the framework perspective it is the end of the + * line. However, such Consumer may want to send new Cloud Event (e.g., via HTTP or + * some messaging template) and thus still requires generation of output attributes. + * @param message instance of input {@link Message}. + * @param provider instance of CloudEventAttributesProvider. + * @return an instance of {@link CloudEventAttributes} as + * {@link MutableCloudEventAttributes} + */ + public static MutableCloudEventAttributes getOutputAttributes(Message message, + CloudEventAttributesProvider provider) { + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(message.getHeaders()) + .setId(message.getHeaders().getId().toString()) + .setType(message.getPayload().getClass().getName().getClass().getName()); + return CloudEventAttributeUtils.toMutable(provider.getOutputAttributes(attributes)); + } + + private static Message buildBinaryMessageFromStructuredMap(Map structuredCloudEvent, + MessageHeaders originalHeaders) { + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(structuredCloudEvent); + Object payload = attributes.getAttribute(CloudEventAttributeUtils.DATA); + if (payload == null) { + payload = Collections.emptyMap(); + } + return MessageBuilder.withPayload(payload) + .copyHeaders(attributes.toMap(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX)) + .copyHeaders(originalHeaders) + .setHeader(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + MutableCloudEventAttributes.ID, + attributes.getId()) + .build(); + } + +} diff --git a/spring/src/main/java/io/cloudevents/spring/messaging/package-info.java b/spring/src/main/java/io/cloudevents/spring/messaging/package-info.java new file mode 100644 index 000000000..484497d6d --- /dev/null +++ b/spring/src/main/java/io/cloudevents/spring/messaging/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides classes related to working with Cloud Events within the context of Spring Messaging. + */ +package io.cloudevents.spring.messaging; \ No newline at end of file diff --git a/spring/src/test/java/io/cloudevents/spring/core/CloudEventAttributeUtilsTests.java b/spring/src/test/java/io/cloudevents/spring/core/CloudEventAttributeUtilsTests.java new file mode 100644 index 000000000..96c8350ea --- /dev/null +++ b/spring/src/test/java/io/cloudevents/spring/core/CloudEventAttributeUtilsTests.java @@ -0,0 +1,92 @@ +package io.cloudevents.spring.core; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import io.cloudevents.SpecVersion; +import io.cloudevents.core.v1.CloudEventBuilder; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CloudEventAttributeUtilsTests { + + @Test + public void testWithEmpty() { + Map headers = new HashMap<>(); + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(headers); + assertThat(attributes.getSpecVersion()).isEqualTo(SpecVersion.V1); + assertThat(attributes.getId()).isNull(); + assertThat(attributes.getSource()).isNull(); + assertThat(attributes.getType()).isNull(); + } + + @Test + public void testWithPrefix() { + Map headers = new HashMap<>(); + headers.put("ce-scpecversion", "1.0"); + headers.put("ce-id", "A234-1234-1234"); + headers.put("ce-source", "https://spring.io/"); + headers.put("ce-type", "org.springframework"); + headers.put("ce-datacontenttype", "application/json"); + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(headers); + assertThat(attributes.getSpecVersion()).isEqualTo(SpecVersion.V1); + assertThat(attributes.getId()).isEqualTo("A234-1234-1234"); + assertThat(attributes.getSource()).isEqualTo(URI.create("https://spring.io/")); + assertThat(attributes.getType()).isEqualTo("org.springframework"); + assertThat(attributes.getDataContentType()).isEqualTo("application/json"); + } + + @Test + public void testWithNoPrefix() { + Map headers = new HashMap<>(); + headers.put("id", "A234-1234-1234"); + headers.put("source", "https://spring.io/"); + headers.put("type", "org.springframework"); + headers.put("datacontenttype", "application/json"); + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(headers); + assertThat(attributes.getId()).isEqualTo("A234-1234-1234"); + assertThat(attributes.getSpecVersion()).isEqualTo(SpecVersion.V1); + assertThat(attributes.getSource()).isEqualTo(URI.create("https://spring.io/")); + assertThat(attributes.getType()).isEqualTo("org.springframework"); + assertThat(attributes.getDataContentType()).isEqualTo("application/json"); + } + + @Test + public void testWithNative() { + MutableCloudEventAttributes attributes = CloudEventAttributeUtils + .toMutable(new CloudEventBuilder().withId("A234-1234-1234") + .withSource(URI.create("https://spring.io/")).withType("org.springframework").build()); + assertThat(attributes.getId()).isEqualTo("A234-1234-1234"); + assertThat(attributes.getSpecVersion()).isEqualTo(SpecVersion.V1); + assertThat(attributes.getSource()).isEqualTo(URI.create("https://spring.io/")); + assertThat(attributes.getType()).isEqualTo("org.springframework"); + } + + @Test + public void testToHeadersNoPrefix() { + MutableCloudEventAttributes attributes = CloudEventAttributeUtils + .toMutable(new CloudEventBuilder().withId("A234-1234-1234") + .withSource(URI.create("https://spring.io/")).withType("org.springframework").build()); + Map headers = attributes.toMap(null); + assertThat(headers.get("id")).isEqualTo("A234-1234-1234"); + assertThat(headers.get("specversion")).isEqualTo("1.0"); + assertThat(headers.get("source")).isEqualTo("https://spring.io/"); + assertThat(headers.get("type")).isEqualTo("org.springframework"); + } + + @Test + public void testToHeaders() { + MutableCloudEventAttributes attributes = CloudEventAttributeUtils + .toMutable(new CloudEventBuilder().withId("A234-1234-1234") + .withSource(URI.create("https://spring.io/")).withType("org.springframework").build()); + Map headers = attributes.toMap("ce-"); + assertThat(headers.get("ce-id")).isEqualTo("A234-1234-1234"); + assertThat(headers).doesNotContainKey("id"); + assertThat(headers.get("ce-specversion")).isEqualTo("1.0"); + assertThat(headers.get("ce-source")).isEqualTo("https://spring.io/"); + assertThat(headers.get("ce-type")).isEqualTo("org.springframework"); + } + +} diff --git a/spring/src/test/java/io/cloudevents/spring/core/MutableCloudEventAttributesTests.java b/spring/src/test/java/io/cloudevents/spring/core/MutableCloudEventAttributesTests.java new file mode 100644 index 000000000..e73e0807b --- /dev/null +++ b/spring/src/test/java/io/cloudevents/spring/core/MutableCloudEventAttributesTests.java @@ -0,0 +1,89 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cloudevents.spring.core; + +import java.net.URI; +import java.util.Collections; +import java.util.Map; + +import io.cloudevents.SpecVersion; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + * + */ +public class MutableCloudEventAttributesTests { + + @Test + void testEmpty() throws Exception { + MutableCloudEventAttributes attributes = new MutableCloudEventAttributes(Collections.emptyMap()); + assertThat(attributes.getSpecVersion()).isEqualTo(SpecVersion.V1); + assertThat(attributes.getId()).isNull(); + } + + @Test + void testSetAttribute() throws Exception { + MutableCloudEventAttributes attributes = new MutableCloudEventAttributes(Collections.emptyMap()); + attributes.setAttribute(MutableCloudEventAttributes.ID, "A1234-1234"); + assertThat(attributes.getSpecVersion()).isEqualTo(SpecVersion.V1); + assertThat(attributes.getId()).isEqualTo("A1234-1234"); + } + + @Test + void testV03() throws Exception { + MutableCloudEventAttributes attributes = new MutableCloudEventAttributes( + Collections.singletonMap(MutableCloudEventAttributes.SPECVERSION, SpecVersion.V03)); + attributes.setAttribute(MutableCloudEventAttributes.ID, "A1234-1234"); + attributes.setAttribute(MutableCloudEventAttributes.SCHEMAURL, "https://schema.spring.io/ce-0.3"); + assertThat(attributes.getSpecVersion()).isEqualTo(SpecVersion.V03); + assertThat(attributes.getId()).isEqualTo("A1234-1234"); + assertThat(attributes.getDataSchema().toString()).isEqualTo("https://schema.spring.io/ce-0.3"); + } + + @Test + void testV03MapWithExplicitSchema() throws Exception { + MutableCloudEventAttributes attributes = new MutableCloudEventAttributes( + Collections.singletonMap(MutableCloudEventAttributes.SPECVERSION, SpecVersion.V03)); + attributes.setId("A1234-1234"); + attributes.setSource(URI.create("https://spring.io/")); + attributes.setType("org.springframework"); + attributes.setDataSchema(URI.create("https://schema.spring.io/ce-0.3")); + Map headers = attributes.toMap("ce-"); + assertThat(headers.get("ce-specversion")).isEqualTo("0.3"); + assertThat(headers.get("ce-source")).isEqualTo("https://spring.io/"); + assertThat(headers.get("ce-type")).isEqualTo("org.springframework"); + assertThat(headers.get("ce-schemaurl")).isEqualTo("https://schema.spring.io/ce-0.3"); + } + + @Test + void testV03MapWithAttributeSchema() throws Exception { + MutableCloudEventAttributes attributes = new MutableCloudEventAttributes( + Collections.singletonMap(MutableCloudEventAttributes.SPECVERSION, SpecVersion.V03)); + attributes.setId("A1234-1234"); + attributes.setSource(URI.create("https://spring.io/")); + attributes.setType("org.springframework"); + attributes.setAttribute(MutableCloudEventAttributes.SCHEMAURL, "https://schema.spring.io/ce-0.3"); + Map headers = attributes.toMap("ce-"); + assertThat(headers.get("ce-specversion")).isEqualTo("0.3"); + assertThat(headers.get("ce-source")).isEqualTo("https://spring.io/"); + assertThat(headers.get("ce-type")).isEqualTo("org.springframework"); + assertThat(headers.get("ce-schemaurl")).isEqualTo("https://schema.spring.io/ce-0.3"); + } + +} diff --git a/spring/src/test/java/io/cloudevents/spring/http/RestControllerTests.java b/spring/src/test/java/io/cloudevents/spring/http/RestControllerTests.java new file mode 100644 index 000000000..8b073417b --- /dev/null +++ b/spring/src/test/java/io/cloudevents/spring/http/RestControllerTests.java @@ -0,0 +1,163 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.http; + +import java.net.URI; +import java.util.Map; +import java.util.UUID; + +import io.cloudevents.spring.core.CloudEventAttributeUtils; +import io.cloudevents.spring.core.MutableCloudEventAttributes; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RestController; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + * + */ +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) +class RestControllerTests { + + @Autowired + private TestRestTemplate rest; + + @LocalServerPort + private int port; + + @Test + void echoWithCorrectHeaders() { + + ResponseEntity response = rest.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/")) // + .header("ce-id", "12345") // + .header("ce-specversion", "1.0") // + .header("ce-type", "io.spring.event") // + .header("ce-source", "https://spring.io/events") // + .contentType(MediaType.APPLICATION_JSON) // + .body("{\"value\":\"Dave\"}"), String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}"); + + HttpHeaders headers = response.getHeaders(); + + assertThat(headers).containsKey("ce-id"); + assertThat(headers).containsKey("ce-source"); + assertThat(headers).containsKey("ce-type"); + + // assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345"); + assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo"); + assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos"); + + } + + @Test + void structured() { + + ResponseEntity response = rest.exchange(RequestEntity.post(URI.create("http://localhost:" + port + "/")) // + .contentType(new MediaType("application", "cloudevents+json")) // + .body("{" // + + "\"id\":\"12345\"," // + + "\"specversion\":\"1.0\"," // + + "\"type\":\"io.spring.event\"," // + + "\"source\":\"https://spring.io/events\"," // + + "\"data\":{\"value\":\"Dave\"}}"), + String.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(response.getBody()).isEqualTo("{\"value\":\"Dave\"}"); + + HttpHeaders headers = response.getHeaders(); + + assertThat(headers).containsKey("ce-id"); + assertThat(headers).containsKey("ce-source"); + assertThat(headers).containsKey("ce-type"); + + // assertThat(headers.getFirst("ce-id")).isNotEqualTo("12345"); + assertThat(headers.getFirst("ce-type")).isEqualTo("io.spring.event.Foo"); + assertThat(headers.getFirst("ce-source")).isEqualTo("https://spring.io/foos"); + + } + + @SpringBootApplication + @RestController + static class TestApplication { + + @PostMapping("/") + public ResponseEntity echo(@RequestBody Foo foo, @RequestHeader HttpHeaders headers) { + MutableCloudEventAttributes attributes = CloudEventHttpUtils.fromHttp(headers) + .setId(UUID.randomUUID().toString()).setSource(URI.create("https://spring.io/foos")) + .setType("io.spring.event.Foo"); + HttpHeaders outgoing = CloudEventHttpUtils.toHttp(attributes); + return ResponseEntity.ok().headers(outgoing).body(foo); + } + + @PostMapping(path = "/", consumes = "application/cloudevents+json") + public ResponseEntity structured(@RequestBody Map body, + @RequestHeader HttpHeaders headers) { + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(body) + .setId(UUID.randomUUID().toString()).setSource(URI.create("https://spring.io/foos")) + .setType("io.spring.event.Foo"); + HttpHeaders outgoing = CloudEventHttpUtils.toHttp(attributes); + return ResponseEntity.ok().headers(outgoing).body(body.get(CloudEventAttributeUtils.DATA)); + } + + } + +} + +class Foo { + + private String value; + + public Foo() { + } + + public Foo(String value) { + this.value = value; + } + + public String getValue() { + return this.value; + } + + public void setValue(String value) { + this.value = value; + } + + @Override + public String toString() { + return "Foo [value=" + this.value + "]"; + } + +} diff --git a/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventMessageUtilsTests.java b/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventMessageUtilsTests.java new file mode 100644 index 000000000..61afc8976 --- /dev/null +++ b/spring/src/test/java/io/cloudevents/spring/messaging/CloudEventMessageUtilsTests.java @@ -0,0 +1,204 @@ +/* + * Copyright 2020-Present The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudevents.spring.messaging; + +import java.net.URI; +import java.util.Map; + +import io.cloudevents.core.v1.CloudEventBuilder; +import io.cloudevents.spring.core.CloudEventAttributeUtils; +import io.cloudevents.spring.core.MutableCloudEventAttributes; +import org.junit.jupiter.api.Test; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * + * @author Oleg Zhurakousky + * + */ +public class CloudEventMessageUtilsTests { + + String payloadWithHttpPrefix = "{\n" + + " \"ce-specversion\" : \"1.0\",\n" + + " \"ce-type\" : \"org.springframework\",\n" + + " \"ce-source\" : \"https://spring.io/\",\n" + + " \"ce-id\" : \"A234-1234-1234\",\n" + + " \"ce-datacontenttype\" : \"application/json\",\n" + + " \"ce-data\" : {\n" + + " \"version\" : \"1.0\",\n" + + " \"releaseName\" : \"Spring Framework\",\n" + + " \"releaseDate\" : \"24-03-2004\"\n" + + " }\n" + + "}"; + + String payloadNoPrefix = "{\n" + + " \"specversion\" : \"1.0\",\n" + + " \"type\" : \"org.springframework\",\n" + + " \"source\" : \"https://spring.io/\",\n" + + " \"id\" : \"A234-1234-1234\",\n" + + " \"datacontenttype\" : \"application/json\",\n" + + " \"data\" : {\n" + + " \"version\" : \"1.0\",\n" + + " \"releaseName\" : \"Spring Framework\",\n" + + " \"releaseDate\" : \"24-03-2004\"\n" + + " }\n" + + "}"; + + String payloadNoDataContentType = "{\n" + + " \"ce_specversion\" : \"1.0\",\n" + + " \"ce_type\" : \"org.springframework\",\n" + + " \"ce_source\" : \"https://spring.io/\",\n" + + " \"ce_id\" : \"A234-1234-1234\",\n" + + " \"ce_datacontenttype\" : \"application/json\",\n" + + " \"data\" : {\n" + + " \"version\" : \"1.0\",\n" + + " \"releaseName\" : \"Spring Framework\",\n" + + " \"releaseDate\" : \"24-03-2004\"\n" + + " }\n" + + "}"; + + @Test + public void testGenerateAttributes() { + Message message = MessageBuilder.withPayload("Hello") + .copyHeaders(CloudEventAttributeUtils + .toMutable(new CloudEventBuilder().withId("A234-1234-1234") + .withSource(URI.create("https://spring.io/")).withType("org.springframework").build()) + .toMap("ce_")) + .build(); + MutableCloudEventAttributes attributes = CloudEventMessageUtils.getOutputAttributes(message, attrs -> attrs); + assertThat(attributes.getId()).isNotEqualTo("A234-1234-1234"); + assertThat(attributes.getSource()).isEqualTo(URI.create("https://spring.io/")); + assertThat(attributes.getType()).isEqualTo(String.class.getName()); + } + + @SuppressWarnings("unchecked") + @Test + public void testStructuredToBinaryWithPrefix() { + Message structuredMessage = MessageBuilder.withPayload(payloadWithHttpPrefix) + .setHeader(MessageHeaders.CONTENT_TYPE, + CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") + .setHeader("foo", "bar").build(); + + MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); + Message> binaryMessage = (Message>) CloudEventMessageUtils + .toBinary(structuredMessage, converter); + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(binaryMessage.getHeaders()); + assertThat(attributes.getId()).isEqualTo("A234-1234-1234"); + assertThat(attributes.getSource()).isEqualTo(URI.create("https://spring.io/")); + assertThat(attributes.getType()).isEqualTo("org.springframework"); + assertThat(attributes.getDataContentType()).isEqualTo("application/json"); + assertThat(binaryMessage.getHeaders().get("foo")).isEqualTo("bar"); + } + + @SuppressWarnings("unchecked") + @Test + public void testStructuredToBinaryWithPrefixAndUserAgent() { + Message structuredMessage = MessageBuilder.withPayload(payloadWithHttpPrefix) + .setHeader(MessageHeaders.CONTENT_TYPE, + CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json") + .setHeader("user-agent", "oleg").build(); + + MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); + Message> binaryMessage = (Message>) CloudEventMessageUtils + .toBinary(structuredMessage, converter); + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(binaryMessage.getHeaders()); + assertThat(attributes.getId()).isEqualTo("A234-1234-1234"); + assertThat(attributes.getSource()).isEqualTo(URI.create("https://spring.io/")); + assertThat(attributes.getType()).isEqualTo("org.springframework"); + assertThat(attributes.getDataContentType()).isEqualTo("application/json"); + } + + @SuppressWarnings("unchecked") + @Test + public void testStructuredToBinaryNoPrefix() { + Message structuredMessage = MessageBuilder.withPayload(payloadNoPrefix).setHeader( + MessageHeaders.CONTENT_TYPE, CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json").build(); + + MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); + Message> binaryMessage = (Message>) CloudEventMessageUtils + .toBinary(structuredMessage, converter); + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(binaryMessage.getHeaders()); + assertThat(attributes.getId()).isEqualTo("A234-1234-1234"); + assertThat(attributes.getSource()).isEqualTo(URI.create("https://spring.io/")); + assertThat(attributes.getType()).isEqualTo("org.springframework"); + assertThat(attributes.getDataContentType()).isEqualTo("application/json"); + } + + @SuppressWarnings("unchecked") + @Test + public void testStructuredToBinaryNoDataContentType() { + Message structuredMessage = MessageBuilder.withPayload(payloadNoPrefix).setHeader( + MessageHeaders.CONTENT_TYPE, CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json").build(); + + MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); + Message> binaryMessage = (Message>) CloudEventMessageUtils + .toBinary(structuredMessage, converter); + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(binaryMessage.getHeaders()); + assertThat(attributes.getId()).isEqualTo("A234-1234-1234"); + assertThat(attributes.getSource()).isEqualTo(URI.create("https://spring.io/")); + assertThat(attributes.getType()).isEqualTo("org.springframework"); + assertThat(attributes.getDataContentType()).isEqualTo("application/json"); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testStructuredToBinaryBackToMessageHeaders() { + MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); + Message structuredMessage = MessageBuilder.withPayload(payloadWithHttpPrefix).setHeader( + MessageHeaders.CONTENT_TYPE, CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json").build(); + + Message> binaryMessage = (Message>) CloudEventMessageUtils + .toBinary(structuredMessage, converter); + assertThat(binaryMessage.getHeaders().containsKey("ce-data")).isFalse(); + MutableCloudEventAttributes attributes = CloudEventAttributeUtils.toAttributes(binaryMessage.getHeaders()); + + Map headers = attributes.toMap(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX); + assertThat(headers.get(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + MutableCloudEventAttributes.ID)) + .isEqualTo("A234-1234-1234"); + assertThat(headers.get(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + MutableCloudEventAttributes.SOURCE)) + .isEqualTo("https://spring.io/"); + assertThat(headers.get(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + MutableCloudEventAttributes.TYPE)) + .isEqualTo("org.springframework"); + assertThat( + headers.get(CloudEventAttributeUtils.DEFAULT_ATTR_PREFIX + MutableCloudEventAttributes.DATACONTENTTYPE)) + .isEqualTo("application/json"); + + structuredMessage = MessageBuilder.withPayload(payloadNoPrefix).setHeader(MessageHeaders.CONTENT_TYPE, + CloudEventAttributeUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json").build(); + + binaryMessage = (Message>) CloudEventMessageUtils.toBinary(structuredMessage, converter); + assertThat(binaryMessage.getHeaders().containsKey("data")).isFalse(); + attributes = CloudEventAttributeUtils.toAttributes(binaryMessage.getHeaders()); + + headers = attributes.toMap(CloudEventAttributeUtils.HTTP_ATTR_PREFIX); + assertThat(headers.get(CloudEventAttributeUtils.HTTP_ATTR_PREFIX + MutableCloudEventAttributes.ID)) + .isEqualTo("A234-1234-1234"); + assertThat(headers.get(CloudEventAttributeUtils.HTTP_ATTR_PREFIX + MutableCloudEventAttributes.SOURCE)) + .isEqualTo("https://spring.io/"); + assertThat(headers.get(CloudEventAttributeUtils.HTTP_ATTR_PREFIX + MutableCloudEventAttributes.TYPE)) + .isEqualTo("org.springframework"); + assertThat(headers.get(CloudEventAttributeUtils.HTTP_ATTR_PREFIX + MutableCloudEventAttributes.DATACONTENTTYPE)) + .isEqualTo("application/json"); + } + +}