Spring Cloud Stream is a module in the Spring Cloud family for building event-driven microservices; it simplifies the development of message-driven services. Below is an overview of building simple event-driven microservices with Spring Cloud Stream:
What is Spring Cloud Stream?
Spring Cloud Stream provides a lightweight message-driven framework for creating highly scalable event-driven microservices. By connecting applications to existing messaging middleware (such as Kafka or RabbitMQ), it lets developers focus on business logic rather than messaging infrastructure.
Key features
- Binder abstraction: Spring Cloud Stream introduces the concept of a "Binder", an adapter responsible for talking to the external messaging middleware. Common Binder implementations include the Kafka Binder and the Rabbit Binder.
- Channel abstraction: source (output) and sink (input) channels handle sending and receiving messages respectively. These channels can be bound to different Binders, giving seamless integration with different messaging middleware.
- Message partitioning: messages can be partitioned, making better use of resources and improving system throughput.
- Transaction support: with compatible binders, transactional message production is supported, helping keep message processing reliable and consistent.
- Dynamic configuration: connection settings for the messaging middleware can be configured through configuration files or environment variables.
How do you use Spring Cloud Stream?
- Add the dependency: add the Spring Cloud Stream dependencies to your Spring Boot project, for example spring-cloud-starter-stream-kafka.
- Define the channels: declare the source and sink channels in a configuration class and specify the messaging destinations they are bound to.
- Write a handler: create a method annotated with @StreamListener; it listens on the specified channel and processes the messages it receives.
- Publish messages: use an @Autowired channel object to publish messages to the target channel.
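As a rough sketch of the configuration step, binding a channel to a destination can be done in application.properties. The destination name and broker address below are placeholder assumptions, not values from any particular project:

```properties
# Bind the "output" channel to a messaging destination (names are placeholders)
spring.cloud.stream.bindings.output.destination=loan-events
# Point the Kafka binder at a local broker
spring.cloud.stream.kafka.binder.brokers=localhost:9092
```

With the Rabbit binder on the classpath instead, the same bindings.* properties apply unchanged; only the binder-specific properties differ.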
Example code
// Configuration class
@Configuration
public class StreamConfig {

    @Bean
    public Supplier<String> stringSupplier() {
        return () -> "Hello, World!";
    }

    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }

    // A java.util.function.Consumer acts as the sink (terminal) stage
    @Bean
    public Consumer<String> sink() {
        return System.out::println;
    }
}
// Main application class
@SpringBootApplication
public class StreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }
}
In this example we define a String supplier, a transformation function, and a printing consumer. When the application runs, it repeatedly emits the string "Hello, World!", converts it to uppercase, and prints the result. (In the functional model the stages are composed via configuration, e.g. spring.cloud.function.definition=stringSupplier|uppercase|sink.)
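Outside of Spring, the shape of that pipeline can be sketched with plain java.util.function types. The class and method names below are illustrative only, not part of Spring Cloud Stream; the runOnce() method stands in for what the framework does on each poll:

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class PipelineSketch {
    // The three stages, mirroring the beans in the example above
    static final Supplier<String> stringSupplier = () -> "Hello, World!";
    static final Function<String, String> uppercase = String::toUpperCase;
    static final Consumer<String> sink = System.out::println;

    // What the framework does for us: poll the supplier, transform, consume
    static String runOnce() {
        String transformed = uppercase.apply(stringSupplier.get());
        sink.accept(transformed);
        return transformed;
    }

    public static void main(String[] args) {
        runOnce(); // prints "HELLO, WORLD!"
    }
}
```

Because each stage is just a function, each can be unit tested in isolation without any messaging infrastructure.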
Spring Cloud Stream is a framework for building message-driven microservice applications, and its binder model has been implemented for many kinds of messaging middleware. The following systems are commonly used with Spring Cloud Stream, some through official binders and others through partner or community binders:
- Kafka: Apache Kafka is a high-throughput distributed publish-subscribe messaging system, widely used for real-time stream processing and event-driven applications.
- RabbitMQ: RabbitMQ is an open-source message broker that supports several messaging protocols, including AMQP, MQTT, and STOMP.
- ActiveMQ: Apache ActiveMQ is an open-source message broker that implements the JMS specification and supports multiple transport protocols.
- AWS SQS: Amazon Simple Queue Service (SQS) is a fully managed message queuing service, suited to workloads that need decoupling and buffering.
- Google Pub/Sub: Google Cloud Pub/Sub is a fully managed asynchronous messaging service for communication between independent applications.
- Azure Service Bus: Azure Service Bus is Microsoft's messaging solution, supporting several messaging patterns including queues and topics.
- Redis Streams: Redis Streams is an append-only log data type built into Redis (since version 5.0) that provides message-queue-style publishing and consumption, including consumer groups.
- IBM MQ: IBM MQ is a highly reliable enterprise messaging system that supports transactional messaging and multiple communication protocols.
- NATS: NATS is a high-performance, lightweight messaging system for asynchronous messaging in distributed systems.
- Netty: Netty is a Java-based network application framework; it is not messaging middleware in the traditional sense, but it can be used to build custom message transports.
Each of these systems has its own strengths and use cases; developers can choose the middleware that fits their requirements and integrate it with Spring Cloud Stream through the appropriate binder.
Simple Event Driven Microservices with Spring Cloud Stream
Engineering
Ben Wilcock
October 15, 2019
Event driven architecture is great. But without a framework, writing the scaffolding required to work with popular event messaging platforms can be messy. In this post we’ll take a look at how Spring Cloud Stream can be used to simplify your code.
The Problem
You just want to write logic for your event driven application, but the boilerplate messaging code can get in the way. Connecting your apps to messaging services is tricky, and if you’re an enterprise developer, you probably need to work with multiple messaging technologies (either on-premises or in the cloud).
The Solution
Let a flexible messaging abstraction take care of the complex messaging platform integration so you can concentrate on writing simple clean business logic. Spring Cloud Stream is a great candidate. It unifies lots of popular messaging platforms behind one easy to use API including RabbitMQ, Apache Kafka, Amazon Kinesis, Google PubSub, Solace PubSub+, Azure Event Hubs, and Apache RocketMQ. It even smoothes away any subtle differences in approach and features between these platforms (like partitioning or exchanges for example) leaving you free to create innovative event-driven solutions.
In the demo that follows, you’ll see exactly how Spring Cloud Stream’s clever abstractions help make event streaming code cleaner and easier to work with. You’ll also see how trivial it is to switch between two different messaging platforms (RabbitMQ or Kafka) using Spring Cloud Stream’s binding libraries.
Before you start
These event-driven microservices need the latest versions of these applications installed on your PC¹:
Java 8
Docker (where we’ll run RabbitMQ and Kafka locally)
Git (optional)
Bash (assumed, although alternatives could work)
Running The Demo
First, clone the code repository from GitHub. To do this (if you have Git installed) open a new terminal window and issue the following command. If you don’t have Git installed, download and extract this zip file.
git clone https://github.com/benwilcock/spring-cloud-stream-demo.git
Upon inspection of the code you’ll notice that this repository consists of two microservices.
The Loansource microservice (in the /loansource folder). This microservice acts as a source of event messages. These events are Loan applications similar to what you’d see in the world of banking and finance. Each loan has a “name”, an “amount”, and a “status” (which is set to PENDING at first).
The Loancheck microservice (in the /loancheck folder). This microservice acts as a Loan processor. It checks which loans are good ones to make and sorts them into APPROVED or DECLINED states.
To run the demo, follow the instructions below.
Step 1: Start the Messaging Servers
In a fresh terminal window, go to the root folder of the project and issue the following command.
You’ll need Docker to be installed and running on your system for this script to work properly, as it requires docker-compose.
./start-servers.sh
This script will start Kafka and RabbitMQ and stream the log output from both to the terminal window (unless you exit with Ctrl-C). The servers do not stop when you press Ctrl-C - they’ll keep running in the background. Once started these servers will all be available to applications running on your computer.
Step 2: Choose Between Kafka or RabbitMQ Mode
In steps 3 & 4 which follow, we must add the name of the messaging platform we’d like to use after the -P flag.
For Kafka, use: -Pkafka
For RabbitMQ, use: -Prabbit
If you omit the -P setting completely, then Kafka is used.
Note: This demo is not designed to “bridge” messages between Kafka and RabbitMQ, so be sure to choose the same profile name in each of the two applications when you compile and run them. If bridging messaging systems is your goal see the documentation here.
Step 3: Generate Some Loan Events
In a new terminal window, make the /loansource directory the current directory using cd, and then issue the following command, substituting <profile> with the mode you’d like to run (either kafka or rabbit, as discussed in step 2 above).
./mvnw clean package spring-boot:run -DskipTests=true -P<profile>
Once the loansource application has started, in the terminal window, you should see a message every second telling you that a new Loan event has been posted to the messaging platform in the PENDING state. Leave this microservice running and move onto the next step.
Step 4: Process The Loan Events
In another new terminal window, make the /loancheck directory your current directory, and then issue the following command, again substituting <profile> with the mode you’d like to run.
./mvnw clean package spring-boot:run -DskipTests=true -P<profile>
Once the loancheck application has started, in the terminal window, you should see a message every second telling you that a new PENDING Loan application has been read from the messaging platform and either APPROVED or DECLINED. Skip ahead to “How It Works” if you’d like to understand how these applications were built.
Step 5: Stop the Demo
Once you’re done with the microservices, in each of the terminal windows for the /loansource and the /loancheck microservices press Ctrl-C. The application will come to a halt and the event processing will stop.
If you’re switching modes between Kafka and Rabbit, simply go back to Step 2 and repeat the process.
If you’re completely done with the demo and would also like to stop the Kafka and RabbitMQ servers, in a terminal window in the root folder of the project run the ./stop-servers.sh script. This isn’t necessary if you’re just switching between modes.
How it Works
Maven profiles (in each project’s pom.xml) control which of the Spring Cloud Stream bindings are added as dependencies when you build. When you choose -Pkafka, the spring-cloud-stream-binder-kafka dependency is added to the project. When you choose -Prabbit, the spring-cloud-stream-binder-rabbit dependency is added.
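Sketched from that description, the relevant part of each pom.xml plausibly looks like the fragment below. The element values are illustrative; check the repository for the exact profile definitions and versions:

```xml
<profiles>
  <profile>
    <id>kafka</id>
    <!-- Used when no -P flag is given -->
    <activation><activeByDefault>true</activeByDefault></activation>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
      </dependency>
    </dependencies>
  </profile>
  <profile>
    <id>rabbit</id>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
      </dependency>
    </dependencies>
  </profile>
</profiles>
```

Because the business code never references a binder class directly, swapping the dependency is all that is needed to change platforms.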
Your choice of Maven profile also influences the spring.profiles.active property in the src/main/resources/application.properties file, which switches the banner you see at boot time.
The Loansource Microservice
For the Loansource microservice we’re using a new feature from Spring Cloud Stream v2.1 - Spring Cloud Function support. With this new feature, all that’s required to get the LoansourceApplication microservice to act as a source of Loan messages is to declare a @Bean method which generates and returns a Supplier. In this case it’s a Supplier of type Loan. The function method code looks something like this…
@Bean
public Supplier<Loan> supplyLoan() {
    return () -> {
        Loan loan = new Loan(UUID.randomUUID().toString(), "Ben", 10000L);
        LOG.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
        return loan;
    };
}
Supplier is a standard Java functional interface. Because there is only one @Bean method that returns this type, Spring Cloud Stream knows exactly what to do next. By default, it will trigger this function once every second and send the result to the default MessageChannel named “output”. What’s nice about this function method is that it contains only business logic, so you can test it using a regular unit test.
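To illustrate that point, here is a plain-Java stand-in that can be exercised without any messaging infrastructure. The Loan class below is a minimal sketch inferred from the getters used in the article, not the repository's actual class:

```java
import java.util.UUID;
import java.util.function.Supplier;

public class LoanSupplierSketch {
    // Minimal stand-in for the demo's Loan class (fields inferred from the article)
    static class Loan {
        private final String uuid;
        private final String name;
        private final long amount;
        private String status = "PENDING"; // every new loan starts PENDING
        Loan(String uuid, String name, long amount) {
            this.uuid = uuid; this.name = name; this.amount = amount;
        }
        String getUuid() { return uuid; }
        String getName() { return name; }
        long getAmount() { return amount; }
        String getStatus() { return status; }
    }

    // Same shape as the @Bean method above, minus Spring
    static Supplier<Loan> supplyLoan() {
        return () -> new Loan(UUID.randomUUID().toString(), "Ben", 10000L);
    }

    public static void main(String[] args) {
        Loan loan = supplyLoan().get();
        System.out.printf("%s %s for $%d for %s%n",
                loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
    }
}
```

A unit test only needs to call supplyLoan().get() and assert on the returned object's fields.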
We could use the spring.cloud.function.definition property in the application.properties file to explicitly declare which function bean we want bound to the binding destinations - but when you only have a single @Bean defined, this is not necessary.
If we want a different poller interval, we can set the spring.integration.poller.fixed-delay property in the application.properties file.
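For example, the two properties mentioned above might be set like this in application.properties (the five-second delay is just an illustrative value):

```properties
# Explicitly name the function bean to bind (optional when only one @Bean exists)
spring.cloud.function.definition=supplyLoan
# Trigger the supplier every 5000 ms instead of the default 1000 ms
spring.integration.poller.fixed-delay=5000
```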
The Loancheck Microservice
The loancheck microservice requires a little bit more code, but not much. Its job is to sort the Loan events into separate channels. In order to do this, it subscribes to the events coming from the source’s output topic and then sends them into either the approved or declined topics based on the value of the loan, similar to a fraud-checking facility.
Because we’re using 3 messaging channels (one inbound and two outbound), a simple LoanProcessor interface is used to clarify the inputs and the outputs. Currently, it looks something like this:
@Component
public interface LoanProcessor {

    String APPLICATIONS_IN = "output";  // Topic where the new loans appear
    String APPROVED_OUT = "approved";   // Topic where the approved loans are sent
    String DECLINED_OUT = "declined";   // Topic where the declined loans are sent

    @Input(APPLICATIONS_IN)
    SubscribableChannel sourceOfLoanApplications();

    @Output(APPROVED_OUT)
    MessageChannel approved();

    @Output(DECLINED_OUT)
    MessageChannel declined();
}
This LoanProcessor interface is first referenced in the @SpringBootApplication class (LoanCheckApplication.java) as a parameter of the @EnableBinding() annotation as you can see below.
@SpringBootApplication
@EnableBinding(LoanProcessor.class)
public class LoanCheckApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoanCheckApplication.class, args);
    }
}
In addition, a Spring @Component called the LoanChecker.java is constructed with this LoanProcessor at runtime. Furthermore, this component’s checkAndSortLoans(Loan) method is called automatically whenever a new Loan event arrives because it’s been annotated as a @StreamListener() for the LoanProcessor.APPLICATIONS_IN channel. You can see this annotation being used in the following code sample.
@StreamListener(LoanProcessor.APPLICATIONS_IN)
public void checkAndSortLoans(Loan loan) {
    if (loan.getAmount() > MAX_AMOUNT) {
        loan.setStatus(Statuses.DECLINED.name());
        processor.declined().send(message(loan));
    } else {
        loan.setStatus(Statuses.APPROVED.name());
        processor.approved().send(message(loan));
    }
}
This method then sorts the Loan objects using simple business logic. Depending on the outcome of the sort it sends them onwards to either the processor.approved() channel or the processor.declined() channel (after setting their Loan Status accordingly).
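Stripped of the channel plumbing, the sorting decision itself is a one-liner that is trivial to unit test. The threshold value below is assumed purely for illustration; the real MAX_AMOUNT lives in the demo repository:

```java
public class LoanSorterSketch {
    // Threshold assumed for illustration; the real value is in the demo repository
    static final long MAX_AMOUNT = 10000L;

    // The pure decision extracted from checkAndSortLoans(Loan)
    static String statusFor(long amount) {
        return amount > MAX_AMOUNT ? "DECLINED" : "APPROVED";
    }

    public static void main(String[] args) {
        System.out.println(statusFor(5000L));   // APPROVED
        System.out.println(statusFor(20000L));  // DECLINED
    }
}
```

Keeping the decision logic pure like this is what makes the microservice testable without Kafka or RabbitMQ running.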
Wrapping Up
As you can see, the separation of concerns that you get when using Spring Cloud Stream is very healthy indeed. There is absolutely zero Kafka- or RabbitMQ-specific code in either microservice. This allows us to focus on the business logic regardless of the messaging platform, and you can easily swap messaging platforms simply by changing the “binder” dependencies in the project’s pom.xml.
There’s More…
You can see the events flowing through the messaging platforms as follows:
For Kafka, the KafDrop tool on localhost:9000 may be used to observe the topics and the event messages. There is no login required.
For RabbitMQ, the Rabbit Management Console on localhost:15672 may be used to observe the exchanges and the event messages. To log in, the username is guest and the password is also guest. To observe the actual message contents, you may need to create a Queue manually and bind it to the desired topic using # as your routing key.
To keep up to date with the latest information on Spring Cloud Stream, visit the project’s dedicated page on the Spring website.
To create your own Spring project from scratch, use the project configurator at start.spring.io.
If you’d like to go deeper with Spring and pure Kafka check out these great blog posts:
Gary Russell: Spring for Apache Kafka Deep Dive: Error Handling, Message Conversion and Transaction Support
Soby Chacko: Spring for Apache Kafka Deep Dive: Apache Kafka and Spring Cloud Stream
—
Footnotes
The microservice code in this repository is written and packaged using Maven, Spring Boot, and Spring Cloud Stream. At runtime the code relies on Kafka, Zookeeper, RabbitMQ, and KafDrop (a Docker image by Obsidian Dynamics). Everything in this list has been provided for you - you don’t need to install them.