Flink 停机恢复(savepoint)入门示例

本文介绍如何使用Apache Flink实现状态化的单词计数,包括从socket接收数据、按单词分割并计数,以及利用Savepoint进行任务恢复,确保计算从上次停止处继续。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

目标:使用flink从socket接收字符串,根据空格分割成单词,统计单词出现的次数。使用savepoint取消任务,恢复,使计算从上一次停止的地方继续。

主程序:

package com.maxiu.state;

import com.maxiu.function.StatefulWordCountFunction;
import com.maxiu.model.WordWithCount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @author xianghu.wang
 * @time 2019/8/19
 * @description
 */
public class StatefulWordCount {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置检查点时间间隔为1s,取消任务时不删除检查点
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 2.读取数据:连接到socket获取文本数据
        DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");

        // 3.业务逻辑编码
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1));
                        }
                    }
                })
                .keyBy(WordWithCount::getWord)
                .timeWindow(Time.seconds(5), Time.seconds(5))
                .process(new StatefulWordCountFunction());


        // 4.输出结果:使用一个线程打印结果
        windowCounts.print().setParallelism(1);

        // 5.开始执行任务
        env.execute("StatefulWordCount");
    }
}

bean类:

package com.ccclubs.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;

/**
 * @author xianghu.wang
 * @time 2019/8/19
 * @description
 */
@Data
@AllArgsConstructor
public class WordWithCount {
    public String word;
    public Integer count;
}

开干:

1、打包: 包名 flink-demo-stateful.jar

2、启动 netcat

3、启动 job

4、随便发几个单词,计算

5、取消job,保存savepoint

该步骤需要指定 savepoint地址,以及jobid。

flink cancel -s hdfs://zhumei00:9000/flink/savepoints 2084e7baadd60a8f762feec6b2826771

6、从savepoint恢复job

flink run -s hdfs://zhumei00:9000/flink/savepoints/savepoint-2084e7-591930867f28/_metadata flink-demo-stateful.jar

 

7、查看ui界面

8、继续发单词

9、查看ui

搞定

从checkpoint恢复方法相同只是命令不同而已。

参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值