编写kafka stream应用程序

本文档详细介绍了如何编写Kafka Stream应用程序,包括Pipe、Line Split和WordCount三个示例。首先,展示了如何创建一个简单的Pipe应用程序,实现数据从一个主题传输到另一个主题。接着,扩展到Line Split程序,对输入的文本行进行拆分并生成单词流。最后,通过WordCount应用程序,演示如何对单词进行计数并存储在状态存储中,同时将结果写入Kafka主题。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

关于kafka stream的介绍此处不再多做介绍,可以参考别的博客。直接看代码。

第一个stream应用程序Pipe

创建一个pipe类:

public class Pipe {
 
    public static void main(String[] args) throws Exception {
 
    }
}

编写Streams应用程序的第一步是创建一个java.util.Properties映射,以指定StreamsConfig中定义的不同Streams执行配置值。 需要设置几个重要的配置值:StreamsConfig.BOOTSTRAP_SERVERS_CONFIG(用于指定用于建立与Kafka集群的初始连接的主机/端口对的列表)和StreamsConfig.APPLICATION_ID_CONFIG(用于提供Streams的唯一标识符) 应用程序,以便与与同一个Kafka群集通信的其他应用程序区分开来:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 

此外,可以在同一映射中自定义其他配置,例如,记录键值对的默认序列化和反序列化库:

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

接下来,我们将定义Streams应用程序的计算逻辑。 在Kafka Streams中,此计算逻辑被定义为连接的处理器节点的拓扑。 我们可以使用拓扑构建器来构建这样的拓扑,

final StreamsBuilder builder = new StreamsBuilder();

然后使用以下拓扑生成器从名为streams-plaintext-input的Kafka topic创建源流:

KStream<String, String> source = builder.stream("streams-plaintext-input");

现在,我们得到了一个KStream,它从其源Kafka主题streams-plaintext-input连续生成记录。 记录被组织为String类型的键值对。 我们可以使用此流执行的最简单的操作是将其写入另一个Kafka主题,即名为streams-pipe-output

source.to("streams-pipe-output");

请注意,我们还可以将上述两行连接为一行,如下所示:

builder.stream("streams-plaintext-input").to("streams-pipe-output");

通过执行以下操作,我们可以检查从此builder创建的拓扑类型:

final Topology topology = builder.build();

并将其描述打印为标准输出为:

System.out.println(topology.describe());

此时,如果我们编译并运行该程序,它将输出以下信息:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)
      <-- KSTREAM-SOURCE-0000000000

从上面的打印结果当中,说明了构造的拓扑具有两个处理器节点,即源节点KSTREAM-SOURCE-0000000000和宿节点KSTREAM-SINK-0000000001。 KSTREAM-SOURCE-0000000000连续从Kafka主题流-明文输入中读取记录,并将它们通过管道传输到其下游节点KSTREAM-SINK-0000000001; KSTREAM-SINK-0000000001会将接收到的每个记录写入另一个Kafka topic streams-pipe-output(->和<-箭头指示此节点的下游和上游处理器节点,即“子级”和“父级”)。它还说明了这种简单的拓扑没有与之关联的全局状态存储。

请注意,当我们在代码中构建拓扑时,我们始终可以在任何给定的点上像上面一样描述拓扑,因此作为用户,可以交互地“尝试”拓扑中定义的计算逻辑,直到对它满意为止。假设我们已经完成了这种简单的拓扑结构,即以一种无限的流方式将数据从一个Kafka主题传送到另一个主题,现在我们可以使用上面刚刚构建的两个组件来构造Streams客户端:指定的java.util.Properties实例和Topology 对象。

final KafkaStreams streams = new KafkaStreams(topology, props);

通过调用其start()函数,我们可以触发此客户端的执行。 在此客户端上调用close()之前,执行不会停止。 例如,我们可以添加一个带有倒计时闩锁的关闭hook ,以捕获用户中断并在终止该程序时关闭客户端:

final CountDownLatch latch = new CountDownLatch(1);
 
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});
 
try {
    streams.start();
    latch.a
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值