Goal: use Flink to read strings from a socket, split them into words on whitespace, and count how many times each word appears. Cancel the job with a savepoint, then restore from it so the computation continues from where it left off.
Main program:
package com.maxiu.state;

import com.maxiu.function.StatefulWordCountFunction;
import com.maxiu.model.WordWithCount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @author xianghu.wang
 * @time 2019/8/19
 * @description Stateful word count whose totals survive a savepoint/restore cycle.
 */
public class StatefulWordCount {

    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 1s, and keep externalized checkpoints when the job is cancelled
        env.enableCheckpointing(1000);
        env.getCheckpointConfig()
           .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 2. Read data: connect to the socket and read newline-delimited text
        DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");

        // 3. Business logic: split lines into words, key by word, count per 5s window
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1));
                        }
                    }
                })
                .keyBy(WordWithCount::getWord)
                // 5s window with 5s slide, i.e. effectively a tumbling window
                .timeWindow(Time.seconds(5), Time.seconds(5))
                .process(new StatefulWordCountFunction());

        // 4. Print results with a single thread
        windowCounts.print().setParallelism(1);

        // 5. Execute the job
        env.execute("StatefulWordCount");
    }
}
Bean class:
package com.maxiu.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author xianghu.wang
 * @time 2019/8/19
 * @description A word together with its count.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor // no-arg constructor lets Flink treat this type as a POJO instead of falling back to Kryo
public class WordWithCount {
    public String word;
    public Integer count;
}
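The StatefulWordCountFunction imported by the main program is not shown in the original post. Below is a minimal, hypothetical sketch of one plausible implementation: a ProcessWindowFunction that keeps a running per-word total in keyed ValueState, so the total carries across windows and across a savepoint/restore. The class and package names come from the import above; the state name "wordTotal" and the exact semantics are assumptions.

package com.maxiu.function;

import com.maxiu.model.WordWithCount;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Hypothetical sketch: accumulates a per-word running total in keyed state.
 * The state is scoped to the word (not to a single window), so it is included
 * in checkpoints/savepoints and restored when the job is resumed.
 */
public class StatefulWordCountFunction
        extends ProcessWindowFunction<WordWithCount, WordWithCount, String, TimeWindow> {

    private transient ValueState<Integer> totalState;

    @Override
    public void open(Configuration parameters) {
        // Keyed state holding the running total for the current word
        totalState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("wordTotal", Integer.class));
    }

    @Override
    public void process(String word,
                        Context context,
                        Iterable<WordWithCount> elements,
                        Collector<WordWithCount> out) throws Exception {
        Integer total = totalState.value();
        if (total == null) {
            total = 0;
        }
        // Add this window's occurrences to the running total
        for (WordWithCount element : elements) {
            total += element.getCount();
        }
        totalState.update(total);
        out.collect(new WordWithCount(word, total));
    }
}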
Walkthrough:
1. Package the job; the jar is named flink-demo-stateful.jar.
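Assuming a Maven project (the build tool isn't stated in the original), something like:
mvn clean package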
2. Start netcat.
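The job reads from localhost:9999, so for example:
nc -lk 9999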
3. Start the job.
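For example (the -c flag is only needed if the main class isn't set in the jar's manifest):
flink run -c com.maxiu.state.StatefulWordCount flink-demo-stateful.jar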
4. Send a few words and watch the counts.
5. Cancel the job and take a savepoint.
This step requires the savepoint target directory and the job ID:
flink cancel -s hdfs://zhumei00:9000/flink/savepoints 2084e7baadd60a8f762feec6b2826771
6. Restore the job from the savepoint:
flink run -s hdfs://zhumei00:9000/flink/savepoints/savepoint-2084e7-591930867f28/_metadata flink-demo-stateful.jar
7. Check the web UI.
8. Send a few more words.
9. Check the web UI again: the counts continue from where they stopped.
Done.
Recovering from a checkpoint works the same way; only the command differs.
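A hedged example (hypothetical path; assuming state.checkpoints.dir points to hdfs://zhumei00:9000/flink/checkpoints, the retained checkpoint's metadata lives under <checkpoint-dir>/<job-id>/chk-<n>/):
flink run -s hdfs://zhumei00:9000/flink/checkpoints/<job-id>/chk-<n>/_metadata flink-demo-stateful.jar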
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html