4.1 Kafka Streams 流处理框架
Kafka Streams 是 Apache Kafka 提供的轻量级流处理框架,可在无额外集群的情况下实现实时数据处理。与 Flink、Spark Streaming 等框架相比,它的优势在于与 Kafka 无缝集成、低延迟、高吞吐量。
核心概念
- 流(Stream):无限的、有序的、不可变的键值对序列,是 Kafka Streams 的核心抽象。
- 表(Table):基于键的变更日志,反映了键值对的当前状态,可理解为动态更新的数据库表。
- 流表转换:Kafka Streams 支持将流转换为表(
toTable()
)或将表转换为流(toStream()
),实现事件流与状态的相互转换。 - 状态存储(State Store):用于存储流处理过程中的中间结果,如窗口聚合的中间值、join 操作的临时表。
- 窗口(Window):对流数据按时间或数量进行分组,支持滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
编程模型
Kafka Streams 提供两种 API:
- DSL(Domain-Specific Language):高层 API,提供
map()
、filter()
、groupByKey()
、aggregate()
等操作,简单易用。 - Processor API:底层 API,允许自定义流处理逻辑,实现更复杂的处理模式。
以下是使用 DSL 实现单词计数的示例:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
public class WordCountExample {
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("input-topic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("counts-store"));
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
// 优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
容错与 Exactly-Once 语义
Kafka Streams 通过以下机制实现容错:
- Changelog Topic:每个状态存储对应一个 Changelog Topic,用于记录状态变更,当实例失败重启时可恢复状态。
- 事务性处理:通过 Kafka 的事务 API,将处理结果和 Offset 提交作为原子操作,保证 Exactly-Once 语义。
配置事务性流处理:
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
4.2 与其他系统集成
日志采集:Flume/Kafka Connect 对接 Kafka
1. Flume 集成 Kafka:
配置 Flume Source(如exec
、spoolDir
)收集日志
配置 Kafka Sink 将日志发送到 Kafka:
# flume.conf示例
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafka-sink.kafka.topic = logs-topic
agent.sinks.kafka-sink.kafka.flumeBatchSize = 20
agent.sinks.kafka-sink.kafka.producer.acks = 1
2. Kafka Connect 集成:
使用 FileStream Source Connector 从文件读取数据:
{
"name": "file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": "1",
"file": "/var/log/application.log",
"topic": "logs-topic"
}
}
使用 JDBC Sink Connector 将 Kafka 数据写入关系型数据库:
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "username",
"connection.password": "password",
"topics": "orders-topic",
"auto.create": "true"
}
}
数据湖 / 数仓集成
Kafka 可作为数据湖的实时数据摄入层,与 Hadoop、Iceberg、Delta Lake 等集成:
1. Kafka → HDFS:使用 Kafka Connect 的 HDFS Sink Connector 将数据写入 HDFS:
{
"name": "hdfs-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "data-lake-topic",
"hdfs.url": "hdfs://namenode:8020",
"flush.size": "3000",
"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
"schema.compatibility": "BACKWARD"
}
}
2. Kafka → Iceberg:通过自定义 Sink 将 Kafka 数据写入 Iceberg 表:
// 伪代码示例
IcebergSink icebergSink = new IcebergSink("jdbc:hive2://localhost:10000/default", "iceberg_table");
consumer.subscribe(Collections.singletonList("iceberg-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
icebergSink.write(record.key(), record.value());
}
icebergSink.flush();
}
微服务架构集成
使用 Spring Kafka 实现消息驱动的微服务:
1. 生产者服务:
@Service
public class OrderProducer {
private final KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
public OrderProducer(KafkaTemplate<String, Order> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendOrder(Order order) {
kafkaTemplate.send("orders-topic", order.getId(), order);
}
}
2. 消费者服务:
@Service
public class OrderProcessor {
@KafkaListener(topics = "orders-topic", groupId = "order-group")
public void processOrder(@Payload Order order) {
// 处理订单逻辑
System.out.println("Processing order: " + order);
// 更新订单状态并发送确认消息
}
}
4.3 多集群与数据复制
跨数据中心同步:MirrorMaker 2.0
MirrorMaker 2.0 是 Kafka 官方提供的多集群数据复制工具,支持单向 / 双向复制、增量配置、自动故障转移:
1. 配置文件示例:
# mm2.properties
clusters = source, target
source.bootstrap.servers = source-cluster:9092
target.bootstrap.servers = target-cluster:9092
# 复制策略
source->target.enabled = true
target->source.enabled = false # 单向复制
# 连接器配置
tasks.max = 1
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
replication.factor = 3
offset-syncs.topic.replication.factor = 3
sync.topic.configs = true
2. 启动命令:
bin/connect-mirror-maker.sh config/mm2.properties
多活架构设计
在异地多活架构中,Kafka 集群需满足跨区域数据同步与故障自动切换:
- 主动 - 主动模式:两个数据中心的 Kafka 集群同时对外提供服务,通过双向 MirrorMaker 同步数据。需解决冲突检测与解决问题。
- 主动 - 被动模式:主数据中心提供服务,备数据中心通过单向 MirrorMaker 同步数据。故障时手动或自动切换至备中心。
以下是主动 - 主动模式的配置示例:
# 集群A到集群B的复制配置
clusters = cluster-a, cluster-b
cluster-a.bootstrap.servers = kafka-a-1:9092,kafka-a-2:9092
cluster-b.bootstrap.servers = kafka-b-1:9092,kafka-b-2:9092
# 双向复制
cluster-a->cluster-b.enabled = true
cluster-b->cluster-a.enabled = true
# 主题过滤(仅复制需要的主题)
topics = .*
topics.exclude = __.*
设计多活架构时需考虑:
- 网络延迟:跨区域复制可能引入较高延迟,需优化 MirrorMaker 配置。
- 冲突解决:双向复制时可能出现数据冲突,可通过时间戳、业务规则或 UUID 等方式解决。
- 故障恢复:设计自动化故障转移机制,确保切换后数据一致性。