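1. Preface
Rough technical architecture diagram
Project screenshots
Implemented features
Technical problems solved
Performance tuning
What I learned
Environment parameters
Prerequisites
Table of contents
Evolution of computing frameworks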
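2. Introduction to Flink
2.1 What is Flink
In short, Flink is a stateful, distributed data processing engine that can process both streams and batches.
It can run in any common cluster environment, at any scale, and at in-memory speed.
2.2 Processing bounded versus unbounded data
2.3 Deploy anywhere
A cluster can be built on YARN, Mesos, Kubernetes, and similar resource managers.
2.4 Run at any scale
Flink can process trillions of events per day, maintain terabytes of state, and run on thousands of cores.
2.5 In-memory performance
Task state is kept in memory, so reading and writing state does not require a remote round trip.
2.6 Layered APIs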
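3. Flink project development in practice
3.1 Building a multi-module project with Gradle
Create the parent project
Create the submodules
The project skeleton then looks roughly like this
3.2 A first small but complete demo
DemoSourceFunction: the data source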
package com.juege.source;
import com.juege.pojo.Student;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
public class DemoSourceFunction extends RichSourceFunction<Student> {
@Override
public void run(SourceContext<Student> ctx) throws Exception {
for (int i = 0; i < 10; i++) {
Student student = new Student(i,"libai"+i+"");
ctx.collect(student);
}
}
@Override
public void cancel() {
}
}
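The intermediate operator that processes the data; here it filters out students aged 50 or younger and keeps those older than 50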
package com.juege.operater;
import com.juege.pojo.Student;
import org.apache.flink.api.common.functions.RichFilterFunction;
public class DemoFilter extends RichFilterFunction<Student> {
@Override
public boolean filter(Student student) {
return student.age > 50;
}
}
DemoSinkFunction: normally the data is written to a database or sent downstream; this simple demo only prints it
package com.juege.sink;
import com.juege.pojo.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class DemoSinkFunction extends RichSinkFunction<Student> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void invoke(Student student, Context context) throws Exception {
// Normally the data would be written to a database or sent downstream here
System.out.println("I am a student"+student.toString());
}
}
Running the main method
You build the env, add the source, the operators, and the sink, and then call execute
package com.juege;
import com.juege.operater.DemoFilter;
import com.juege.pojo.Student;
import com.juege.sink.DemoSinkFunction;
import com.juege.source.DemoSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SimpleSourceAndSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
simpleSourceAndSink(env);
}
private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<Student> studentDataStreamSource = env.addSource(new DemoSourceFunction());
studentDataStreamSource
.filter(new DemoFilter())
.addSink(new DemoSinkFunction());
env.execute("第一个个人练习2年半的flink demo");
}
}
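Console output: records of students aged 50 or younger are filtered out, and the students that remain are printed. Note that DemoSourceFunction above only produces ages 0 to 9, so with the condition age > 50 no record reaches the sink; lower the threshold, as the later examples do, to see students printed.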
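The effect of the filter condition can be checked outside Flink. The following is a minimal plain-Java sketch, not part of the project: the class `FilterCheck` and its method are hypothetical names, and the loop simply mimics the ages that `DemoSourceFunction` produces.

```java
import java.util.ArrayList;
import java.util.List;

class FilterCheck {
    // Mimics the demo source (ages 0 .. count-1) and keeps the ages
    // that would pass a filter of the form "age > threshold".
    static List<Integer> passing(int count, int threshold) {
        List<Integer> kept = new ArrayList<>();
        for (int age = 0; age < count; age++) {
            if (age > threshold) {
                kept.add(age);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(passing(10, 50)); // prints []
        System.out.println(passing(10, 5));  // prints [6, 7, 8, 9]
    }
}
```

With ten students and a threshold of 50 nothing passes, while a threshold of 5 lets four students through.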
3.3 Relating Flink's programming model to similar frameworks
3.4 Parallelism
Without further ado, look at the following code
package com.juege;
import com.juege.operater.DemoFilter;
import com.juege.pojo.Student;
import com.juege.sink.DemoSinkFunction;
import com.juege.source.DemoSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SimpleSourceAndSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(6);
simpleSourceAndSink(env);
}
private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<Student> studentDataStreamSource = env.addSource(new DemoSourceFunction());
SingleOutputStreamOperator<Student> streamOperator = studentDataStreamSource
.filter(new DemoFilter());
DataStreamSink<Student> studentDataStreamSink = streamOperator.addSink(new DemoSinkFunction());
System.out.println(studentDataStreamSink.getTransformation().getParallelism());
System.out.println(Runtime.getRuntime().availableProcessors());
System.out.println(studentDataStreamSource.getParallelism());
System.out.println(streamOperator.getParallelism());
env.execute("第一个个人练习2年半的flink demo");
}
}
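The console output is as follows
What does this tell us?
Default parallelism
When the job runs locally, the default parallelism equals the number of available cores, while the parallelism of this source is 1.
Custom parallelism
For example, to change the default parallelism to 6, set it directly on env (uncomment env.setParallelism(6) in the code above).
You can see that, apart from the source, the filter and the sink now have a parallelism of 6.
So what if we want the source to run with a parallelism of 6 as well? Changing its parent class to RichParallelSourceFunction is enough. The console output then shows six times as many Student records, because each of the six parallel subtasks runs its own copy of run(). This is why a plain SourceFunction is fixed at parallelism 1. If a source needs a parallelism greater than 1, for example to read a Kafka topic with 6 partitions, it must implement ParallelSourceFunction or extend RichParallelSourceFunction.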
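One common way to avoid the duplicated records is to let each parallel subtask emit only its own share of the data. The sketch below is plain Java rather than a Flink source: `ShardedSourceSketch` and `idsForSubtask` are hypothetical names. Inside a real RichParallelSourceFunction the two inputs would presumably come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.List;

class ShardedSourceSketch {
    // Returns the ids one subtask would emit when the range [0, total)
    // is split across `parallelism` subtasks by id modulo parallelism.
    static List<Integer> idsForSubtask(int subtaskIndex, int parallelism, int total) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            if (i % parallelism == subtaskIndex) {
                ids.add(i);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        int parallelism = 6;
        int emitted = 0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            List<Integer> ids = idsForSubtask(subtask, parallelism, 10);
            emitted += ids.size();
            System.out.println("subtask " + subtask + " emits " + ids);
        }
        // Every id is emitted by exactly one subtask.
        System.out.println("total emitted: " + emitted); // prints total emitted: 10
    }
}
```

With this split the six subtasks together emit each of the 10 students once instead of 60 records.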
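3.5 Choosing a reasonable parallelism
Suppose there are 3 TaskManagers and each one is assigned 3 TaskSlots, so each TaskManager can accept 3 tasks and there are 9 TaskSlots in total. If we set parallelism.default=1, the program runs with a default parallelism of 1, so only 1 of the 9 TaskSlots is used and 8 sit idle. Setting a suitable parallelism is therefore necessary to use the cluster efficiently.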
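The slot arithmetic above can be written down as a small helper. This is only a sketch with hypothetical names (`SlotMath`, `idleSlots`), and it assumes the default behaviour in which a job occupies as many slots as its parallelism.

```java
class SlotMath {
    // Total slots offered by the cluster.
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Slots left unused by a single job, assuming the job occupies
    // one slot per unit of parallelism (capped at the cluster size).
    static int idleSlots(int taskManagers, int slotsPerTaskManager, int parallelism) {
        int total = totalSlots(taskManagers, slotsPerTaskManager);
        return total - Math.min(parallelism, total);
    }

    public static void main(String[] args) {
        System.out.println(totalSlots(3, 3));   // prints 9
        System.out.println(idleSlots(3, 3, 1)); // prints 8
        System.out.println(idleSlots(3, 3, 9)); // prints 0
    }
}
```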
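4. Flink architecture
The required components are as follows
client: submits the job
jobmanager: hands out the work and does the global scheduling
taskmanager: the foreman that does the work; usually one taskmanager runs in one JVM
Each taskmanager is further divided into slots; normally one slot corresponds to one worker thread
The additional components are the yellow ones below, for example resource management and metrics storage
The official documentation explains them as follows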
Component | Purpose | Implementations |
---|---|---|
Flink Client | Compiles batch or streaming applications into a dataflow graph, which it then submits to the JobManager. | |
JobManager | JobManager is the name of the central work coordination component of Flink. It has implementations for different resource providers, which differ on high-availability, resource allocation behavior and supported job submission modes. JobManager modes for job submissions: | |
TaskManager | TaskManagers are the services actually performing the work of a Flink job. | |
External Components (all optional) | ||
High Availability Service Provider | Flink's JobManager can be run in high availability mode which allows Flink to recover from JobManager faults. In order to failover faster, multiple standby JobManagers can be started to act as backups. | |
File Storage and Persistency | For checkpointing (recovery mechanism for streaming jobs) Flink relies on external file storage systems | See FileSystems page. |
Resource Provider | Flink can be deployed through different Resource Provider Frameworks, such as Kubernetes or YARN. | See JobManager implementations above. |
Metrics Storage | Flink components report internal metrics and Flink jobs can report additional, job specific metrics as well. | See Metrics Reporter page. |
Application-level data sources and sinks | While application-level data sources and sinks are not technically part of the deployment of Flink cluster components, they should be considered when planning a new Flink production deployment. Colocating frequently used data with Flink can have significant performance benefits | For example: |
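5. Single-node deployment and accessing the UI
Steps
First download Flink from the official site (Downloads | Apache Flink)
Then run start-cluster.sh in the bin directory; remember to configure the JDK environment variables beforehand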
[root@localhost bin]# java -version
java version "12.0.1" 2019-04-16
Java(TM) SE Runtime Environment (build 12.0.1+12)
Java HotSpot(TM) 64-Bit Server VM (build 12.0.1+12, mixed mode, sharing)
[root@localhost opt]# cd flink-1.10.1-bin-scala_2.12/
[root@localhost flink-1.10.1-bin-scala_2.12]# ls
flink-1.10.1
[root@localhost flink-1.10.1-bin-scala_2.12]# cd flink-1.10.1/
[root@localhost flink-1.10.1]# ls
bin conf examples flink-test-1.0-SNAPSHOT.jar lib LICENSE licenses log NOTICE opt plugins README.txt
[root@localhost flink-1.10.1]# cd bin/
[root@localhost bin]# ls
bash-java-utils.jar flink historyserver.sh kubernetes-session.sh mesos-taskmanager.sh sql-client.sh start-scala-shell.sh stop-zookeeper-quorum.sh zookeeper.sh
config.sh flink-console.sh jobmanager.sh mesos-appmaster-job.sh pyflink-gateway-server.sh standalone-job.sh start-zookeeper-quorum.sh taskmanager.sh
find-flink-home.sh flink-daemon.sh kubernetes-entry.sh mesos-appmaster.sh pyflink-shell.sh start-cluster.sh stop-cluster.sh yarn-session.sh
[root@localhost bin]# ./start-cluster.sh
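Open the UI; the default port is 8081, and it can be changed in flink-conf.yaml under the conf directory
The preview is shown below; both the jobmanager and the taskmanager mentioned earlier are there
You can then submit a job: build it into a jar, specify the main class to start when submitting, and add any arguments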
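6. Connectors
Another convenient thing about Flink is that it ships ready-made connectors for popular databases and message queues. Without them
we would have to write much more code by hand; with them, these systems are easier to use.
Taking Kafka as an example, add the following dependency and use the following code to switch the data source to Kafka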
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
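7. Operators
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/
Commonly used operators include map, flatMap, filter, keyBy, and so on. Anyone who knows the JDK 8 Stream API or has experience with other big data products such as Hadoop should find them familiar, so I will not go into detail here. Examples follow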
map
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
flatmap
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
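map
map is a mapping: each element is transformed and mapped to exactly one other element.
flatmap
flatmap flattens elements: each input element can become zero, one, or more output elements.
filter
filter keeps only the elements that satisfy a condition.
keyBy
Logically partitions the stream by the specified key; the partition is chosen from the hash of the key.
reduce
reduce is a rolling aggregation that combines each new element with the previously reduced value; it turns a KeyedStream into a DataStream.
fold
Given an initial value, folds the elements into it one by one. It turns a KeyedStream into a DataStream. fold does not appear in the 1.17 operator overview linked above, so treat it as a legacy operator.
union
union merges multiple streams into one so that the merged stream can be processed uniformly. It is a horizontal concatenation of several streams.
The streams being merged must all have the same type.
join
Joins two streams on the specified key.
coGroup
Groups two streams by key; unlike join, elements that find no match are also kept.
connect
Reference: https://www.jianshu.com/p/5b0574d466f8
Connects two streams vertically. connect on a DataStream creates a ConnectedStreams or a BroadcastConnectedStream, which takes two type parameters, so the elements of the two streams do not have to be of the same type.
split
Reference: https://cloud.tencent.com/developer/article/1382892
After union, the merged streams can be treated as a single stream.
After connect, the result still behaves like two streams, so there are dedicated classes and operator methods for it, such as CoMapFunction and CoFlatMapFunction, which process the two streams separately.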
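For readers coming from the JDK 8 Stream API, the first few operators can be related to their java.util.stream counterparts. The following is only an analogy in plain Java with hypothetical names (`OperatorAnalogy` and its methods): a Java stream is bounded and produces one final result, whereas Flink's keyBy plus reduce emits an updated value for every incoming element.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class OperatorAnalogy {
    // map: exactly one output element per input element.
    static List<Integer> doubled(List<Integer> values) {
        return values.stream().map(v -> 2 * v).collect(Collectors.toList());
    }

    // flatMap: each line becomes zero or more words.
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    // filter: keep only the ages above the limit.
    static List<Integer> olderThan(List<Integer> ages, int limit) {
        return ages.stream().filter(age -> age > limit).collect(Collectors.toList());
    }

    // Rough analogue of keyBy followed by a count: group by key, then aggregate.
    static Map<String, Integer> countPerKey(List<String> keys) {
        return keys.stream()
                .collect(Collectors.groupingBy(k -> k, TreeMap::new, Collectors.summingInt(k -> 1)));
    }

    public static void main(String[] args) {
        System.out.println(doubled(Arrays.asList(1, 2, 3)));                   // prints [2, 4, 6]
        System.out.println(words(Arrays.asList("hello flink", "hi")));         // prints [hello, flink, hi]
        System.out.println(olderThan(Arrays.asList(3, 6, 9), 5));              // prints [6, 9]
        System.out.println(countPerKey(Arrays.asList("android", "ios", "android"))); // prints {android=2, ios=1}
    }
}
```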
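8. Lifecycle methods
Lifecycle methods are only available in functions that extend a class whose name contains Rich
As shown below, these include open, close, and getRuntimeContext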
package com.juege.operater;
import com.juege.pojo.Student;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
public class DemoFilter extends RichFilterFunction<Student> {
@Override
public boolean filter(Student student) {
System.out.println("I am the number"+getRuntimeContext().getIndexOfThisSubtask()+" task");
return student.age > 50;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public RuntimeContext getRuntimeContext() {
return super.getRuntimeContext();
}
}
The following example shows what these methods do, when they run, and how many times
First, the code of the classes involved
pojo
package com.juege.pojo;
import lombok.Data;
@Data
public class Student {
public int age;
public String name;
public Student(int age, String name) {
this.age = age;
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "name:" + name + " age:" + age;
}
}
source
package com.juege.source;
import com.juege.pojo.Student;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
public class DemoSourceFunction extends RichParallelSourceFunction<Student> {
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task" + "execute open method in DemoSinkFunction class !");
super.open(parameters);
}
@Override
public void run(SourceContext<Student> ctx) {
for (int i = 1; i < 11; i++) {
Student student = new Student(i, "libai" + i + "");
System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task" + "collect data: " + student);
ctx.collect(student);
}
}
@Override
public void cancel() {
}
@Override
public RuntimeContext getRuntimeContext() {
return super.getRuntimeContext();
}
}
filter
package com.juege.operater;
import com.juege.pojo.Student;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
public class DemoFilter extends RichFilterFunction<Student> {
@Override
public boolean filter(Student student) {
// Log the same condition that is returned below (age > 5)
System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task"
+ "filter data: " + student + " result:" + (student.age > 5));
return student.age > 5;
}
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task"
+ "execute open method in DemoFilter class !");
super.open(parameters);
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public RuntimeContext getRuntimeContext() {
return super.getRuntimeContext();
}
}
sink
package com.juege.sink;
import com.juege.pojo.Student;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class DemoSinkFunction extends RichSinkFunction<Student> {
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task"
+ "execute open method in DemoSinkFunction class !");
super.open(parameters);
}
@Override
public void invoke(Student student, Context context) {
// Normally the data would be written to a database or sent downstream here
System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task"
+ "sink data: " + student);
}
@Override
public RuntimeContext getRuntimeContext() {
return super.getRuntimeContext();
}
}
Launcher class
package com.juege;
import com.juege.operater.DemoFilter;
import com.juege.pojo.Student;
import com.juege.sink.DemoSinkFunction;
import com.juege.source.DemoSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SimpleSourceAndSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
simpleSourceAndSink(env);
}
private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<Student> studentDataStreamSource = env.addSource(new DemoSourceFunction());
SingleOutputStreamOperator<Student> streamOperator = studentDataStreamSource
.filter(new DemoFilter());
DataStreamSink<Student> studentDataStreamSink = streamOperator.addSink(new DemoSinkFunction());
System.out.println(Runtime.getRuntime().availableProcessors());
System.out.println(studentDataStreamSource.getParallelism());
System.out.println(streamOperator.getParallelism());
System.out.println(studentDataStreamSink.getTransformation().getParallelism());
env.execute("第一个个人练习2年半的flink demo");
}
}
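Output: once the source also extends a parallel parent class, every operator runs with a parallelism of 3, including the source.
Reading on, open runs before each subtask processes any data, and it runs once per subtask, so the number of calls equals the parallelism. open is typically used for initialization such as loading configuration or acquiring connections. Note that DemoSourceFunction.open prints the label DemoSinkFunction, so three of the six DemoSinkFunction open lines below actually come from the source. Further down, the source data is produced three times, once per source subtask, and every record passes through each operator in turn; records that pass the filter go on to the sink.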
> Task :flink-basic:SimpleSourceAndSinkDemo.main()
12
3
3
3
number 3taskexecute open method in DemoSinkFunction class !
number 1taskexecute open method in DemoSinkFunction class !
number 2taskexecute open method in DemoSinkFunction class !
number 3taskexecute open method in DemoFilter class !
number 2taskexecute open method in DemoFilter class !
number 1taskexecute open method in DemoFilter class !
number 3taskexecute open method in DemoSinkFunction class !
number 2taskexecute open method in DemoSinkFunction class !
number 1taskexecute open method in DemoSinkFunction class !
number 3taskcollect data: name:libai1 age:1
number 2taskcollect data: name:libai1 age:1
number 1taskcollect data: name:libai1 age:1
number 2taskfilter data: name:libai1 age:1 result:false
number 1taskfilter data: name:libai1 age:1 result:false
number 3taskfilter data: name:libai1 age:1 result:false
number 1taskcollect data: name:libai2 age:2
number 2taskcollect data: name:libai2 age:2
number 3taskcollect data: name:libai2 age:2
number 2taskfilter data: name:libai2 age:2 result:false
number 1taskfilter data: name:libai2 age:2 result:false
number 3taskfilter data: name:libai2 age:2 result:false
number 2taskcollect data: name:libai3 age:3
number 3taskcollect data: name:libai3 age:3
number 2taskfilter data: name:libai3 age:3 result:false
number 1taskcollect data: name:libai3 age:3
number 2taskcollect data: name:libai4 age:4
number 1taskfilter data: name:libai3 age:3 result:false
number 2taskfilter data: name:libai4 age:4 result:false
number 3taskfilter data: name:libai3 age:3 result:false
number 2taskcollect data: name:libai5 age:5
number 1taskcollect data: name:libai4 age:4
number 2taskfilter data: name:libai5 age:5 result:false
number 1taskfilter data: name:libai4 age:4 result:false
number 3taskcollect data: name:libai4 age:4
number 1taskcollect data: name:libai5 age:5
number 3taskfilter data: name:libai4 age:4 result:false
number 2taskcollect data: name:libai6 age:6
number 1taskfilter data: name:libai5 age:5 result:false
number 2taskfilter data: name:libai6 age:6 result:true
number 3taskcollect data: name:libai5 age:5
number 1taskcollect data: name:libai6 age:6
number 3taskfilter data: name:libai5 age:5 result:false
number 1taskfilter data: name:libai6 age:6 result:true
number 3taskcollect data: name:libai6 age:6
number 3taskfilter data: name:libai6 age:6 result:true
number 2tasksink data: name:libai6 age:6
number 3tasksink data: name:libai6 age:6
number 1tasksink data: name:libai6 age:6
number 2taskcollect data: name:libai7 age:7
number 1taskcollect data: name:libai7 age:7
number 3taskcollect data: name:libai7 age:7
number 1taskfilter data: name:libai7 age:7 result:true
number 2taskfilter data: name:libai7 age:7 result:true
number 1tasksink data: name:libai7 age:7
number 3taskfilter data: name:libai7 age:7 result:true
number 1taskcollect data: name:libai8 age:8
number 2tasksink data: name:libai7 age:7
number 1taskfilter data: name:libai8 age:8 result:true
number 3tasksink data: name:libai7 age:7
number 1tasksink data: name:libai8 age:8
number 2taskcollect data: name:libai8 age:8
number 1taskcollect data: name:libai9 age:9
number 3taskcollect data: name:libai8 age:8
number 1taskfilter data: name:libai9 age:9 result:true
number 3taskfilter data: name:libai8 age:8 result:true
number 1tasksink data: name:libai9 age:9
number 2taskfilter data: name:libai8 age:8 result:true
number 1taskcollect data: name:libai10 age:10
number 3tasksink data: name:libai8 age:8
number 2tasksink data: name:libai8 age:8
number 3taskcollect data: name:libai9 age:9
number 1taskfilter data: name:libai10 age:10 result:true
number 2taskcollect data: name:libai9 age:9
number 1tasksink data: name:libai10 age:10
number 3taskfilter data: name:libai9 age:9 result:true
number 2taskfilter data: name:libai9 age:9 result:true
number 3tasksink data: name:libai9 age:9
number 2tasksink data: name:libai9 age:9
number 3taskcollect data: name:libai10 age:10
number 2taskcollect data: name:libai10 age:10
number 3taskfilter data: name:libai10 age:10 result:true
number 2taskfilter data: name:libai10 age:10 result:true
number 3tasksink data: name:libai10 age:10
number 2tasksink data: name:libai10 age:10
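The call counts seen in the log (open once per subtask, the processing method once per record) can be mimicked without Flink. The sketch below is a plain-Java simulation with hypothetical names (`LifecycleSketch`, `CountingFunction`); it runs the instances one after another rather than in parallel threads, so it only illustrates the counts, not the interleaving.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {
    // Hypothetical stand-in for a Rich function: open once, invoke per record, close once.
    static class CountingFunction {
        final int subtaskIndex;
        int openCalls = 0;
        int processed = 0;
        int closeCalls = 0;

        CountingFunction(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }

        void open() { openCalls++; }               // e.g. acquire a connection
        void invoke(int record) { processed++; }   // handle one record
        void close() { closeCalls++; }             // e.g. release the connection
    }

    // Creates one function instance per subtask and feeds each the same records.
    static List<CountingFunction> run(int parallelism, int records) {
        List<CountingFunction> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            CountingFunction function = new CountingFunction(i);
            function.open();
            try {
                for (int record = 1; record <= records; record++) {
                    function.invoke(record);
                }
            } finally {
                function.close();
            }
            subtasks.add(function);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        for (CountingFunction f : run(3, 10)) {
            System.out.println("subtask " + (f.subtaskIndex + 1)
                    + " open=" + f.openCalls + " records=" + f.processed + " close=" + f.closeCalls);
        }
    }
}
```

Each of the three simulated subtasks reports one open call, ten records, and one close call.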
9. Custom partitioner
Just as Kafka lets you define a rule that decides which partition a record is sent to, Flink can do the same
Partitioner code
package com.juege.partitioner;
import org.apache.flink.api.common.functions.Partitioner;
public class DemoPartitioner implements Partitioner<Integer> {
@Override
public int partition(Integer age, int numPartitions) {
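// Use numPartitions instead of a hard-coded 3 so the result is always a valid partition index
int partition = Math.floorMod(age, numPartitions);
return partition;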
}
}
Only the following line was added to the previous code
DataStream<Student> partitionedDataStream = streamOperator.partitionCustom(new DemoPartitioner(), (KeySelector<Student, Integer>) value -> value.age);
package com.juege;
import com.juege.operater.DemoFilter;
import com.juege.partitioner.DemoPartitioner;
import com.juege.pojo.Student;
import com.juege.sink.DemoSinkFunction;
import com.juege.source.DemoSourceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SimpleSourceAndSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
simpleSourceAndSink(env);
}
private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
DataStreamSource<Student> studentDataStreamSource = env.addSource(new DemoSourceFunction());
SingleOutputStreamOperator<Student> streamOperator = studentDataStreamSource
.filter(new DemoFilter());
DataStream<Student> partitionedDataStream = streamOperator.partitionCustom(new DemoPartitioner(), (KeySelector<Student, Integer>) value -> value.age);
DataStreamSink<Student> studentDataStreamSink = partitionedDataStream.addSink(new DemoSinkFunction());
System.out.println(Runtime.getRuntime().availableProcessors());
System.out.println(studentDataStreamSource.getParallelism());
System.out.println(streamOperator.getParallelism());
System.out.println(studentDataStreamSink.getTransformation().getParallelism());
env.execute("第一个个人练习2年半的flink demo");
}
}
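To see where records would end up, the partitioning rule can be evaluated by hand. The sketch below is plain Java with hypothetical names (`PartitionSketch`, `assign`); it assumes the rule is the age modulo the number of partitions, which gives the same result as age % 3 when the parallelism is 3, and it uses the ages 6 to 10 that pass the age > 5 filter in the lifecycle example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionSketch {
    // Same rule as the partitioner: age modulo the number of partitions.
    static int partition(int age, int numPartitions) {
        return Math.floorMod(age, numPartitions);
    }

    // Groups the ages in [fromAge, toAge] by the partition they map to.
    static Map<Integer, List<Integer>> assign(int fromAge, int toAge, int numPartitions) {
        Map<Integer, List<Integer>> result = new TreeMap<>();
        for (int age = fromAge; age <= toAge; age++) {
            result.computeIfAbsent(partition(age, numPartitions), k -> new ArrayList<>()).add(age);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 10, 3)); // prints {0=[6, 9], 1=[7, 10], 2=[8]}
    }
}
```

So with three sink subtasks, ages 6 and 9 go to the first, 7 and 10 to the second, and 8 to the third.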
10. Hands-on: counting new and returning users per operating system for an app
pojo
package com.juege.pojo;
import lombok.Data;
@Data
//@Builder
public class User {
public String device;
public String deviceType;
public String os;
public String event;
public Integer uid;
public String net;
public String channel;
public int ifNewUser;//1 means new
public String ip;
public long time;
public String version;
@Override
public String toString() {
return "User{" +
"device='" + device + '\'' +
", deviceType='" + deviceType + '\'' +
", os='" + os + '\'' +
", event='" + event + '\'' +
", uid=" + uid +
", net='" + net + '\'' +
", channel='" + channel + '\'' +
", ifNewUser=" + ifNewUser +
", ip='" + ip + '\'' +
", time=" + time +
", version='" + version + '\'' +
'}';
}
}
Main code
package com.juege.apps;
import com.alibaba.fastjson.JSON;
import com.juege.pojo.User;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.util.List;
public class CountNewUserofOSs {
public static final String datasource = "[\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.21\",\n" +
" \"net\": \"wifi1\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668447,\n" +
" \"uid\": 2,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.22\",\n" +
" \"net\": \"wifi2\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 2,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.23\",\n" +
" \"net\": \"wifi3\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 3,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"huawei mall\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.24\",\n" +
" \"net\": \"wifi4\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 3,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.25\",\n" +
" \"net\": \"wifi5\",\n" +
" \"os\": \"ios\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 5,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.26\",\n" +
" \"net\": \"wifi6\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 3,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.27\",\n" +
" \"net\": \"wifi7\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 3,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"huawei mall\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.28\",\n" +
" \"net\": \"wifi8\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 1,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.29\",\n" +
" \"net\": \"wifi9\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 8,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.210\",\n" +
" \"net\": \"wifi10\",\n" +
" \"os\": \"ios\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 3,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.11\",\n" +
" \"net\": \"wifi11\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 5,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"huawei mall\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.12\",\n" +
" \"net\": \"wifi12\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 6,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.13\",\n" +
" \"net\": \"wifi13\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 9,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.14\",\n" +
" \"net\": \"wifi14\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 7,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.15\",\n" +
" \"net\": \"wifi15\",\n" +
" \"os\": \"ios\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 9,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"huawei mall\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.16\",\n" +
" \"net\": \"wifi16\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 5,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.17\",\n" +
" \"net\": \"wifi17\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 7,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.18\",\n" +
" \"net\": \"wifi18\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 0,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.19\",\n" +
" \"net\": \"wifi19\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 0,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"huawei mall\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.20\",\n" +
" \"net\": \"wifi20\",\n" +
" \"os\": \"ios\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 3,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.21\",\n" +
" \"net\": \"wifi21\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 0,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.22\",\n" +
" \"net\": \"wifi22\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 4,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.23\",\n" +
" \"net\": \"wifi23\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 5,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"huawei mall\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 1,\n" +
" \"ip\": \"35.62.33.24\",\n" +
" \"net\": \"wifi24\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 1,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.25\",\n" +
" \"net\": \"wifi25\",\n" +
" \"os\": \"ios\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 6,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.26\",\n" +
" \"net\": \"wifi26\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 8,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.27\",\n" +
" \"net\": \"wifi27\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 2,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"huawei mall\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.28\",\n" +
" \"net\": \"wifi28\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 6,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.29\",\n" +
" \"net\": \"wifi29\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 0,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.30\",\n" +
" \"net\": \"wifi30\",\n" +
" \"os\": \"ios\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 5,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.31\",\n" +
" \"net\": \"wifi31\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 2,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"huawei mall\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.32\",\n" +
" \"net\": \"wifi32\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 7,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.33\",\n" +
" \"net\": \"wifi33\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 5,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.34\",\n" +
" \"net\": \"wifi34\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 9,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.35\",\n" +
" \"net\": \"wifi35\",\n" +
" \"os\": \"ios\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 7,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"huawei mall\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.36\",\n" +
" \"net\": \"wifi36\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 8,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.37\",\n" +
" \"net\": \"wifi37\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 8,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.38\",\n" +
" \"net\": \"wifi38\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 4,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"browser\",\n" +
" \"device\": \"huawei\",\n" +
" \"deviceType\": \"mobile\",\n" +
" \"event\": \"order\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.39\",\n" +
" \"net\": \"wifi39\",\n" +
" \"os\": \"android\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 4,\n" +
" \"version\": \"1\"\n" +
" },\n" +
" {\n" +
" \"channel\": \"huawei mall\",\n" +
" \"device\": \"xiaomi\",\n" +
" \"deviceType\": \"computer\",\n" +
" \"event\": \"startup\",\n" +
" \"ifNewUser\": 0,\n" +
" \"ip\": \"35.62.33.40\",\n" +
" \"net\": \"wifi40\",\n" +
" \"os\": \"ios\",\n" +
" \"time\": 1682467668468,\n" +
" \"uid\": 6,\n" +
" \"version\": \"1\"\n" +
" }\n" +
"]\n" +
"\n";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(3);
simpleSourceAndSink(env);
}
private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
env.addSource(new RichSourceFunction<User>() {
@Override
public void run(SourceContext<User> ctx) throws Exception {
List<User> users = JSON.parseArray(datasource, User.class);
users.forEach(user -> ctx.collect(user));
}
@Override
public void cancel() {
}
})
.map(new RichMapFunction<User, Tuple3<String, Integer, Integer>>() {
@Override
public Tuple3<String, Integer, Integer> map(User value) {
return Tuple3.of(value.os,value.ifNewUser,1);
}
})
.keyBy(new KeySelector<Tuple3<String, Integer, Integer>, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> getKey(Tuple3<String, Integer, Integer> value) throws Exception {
return Tuple2.of(value.f0,value.f1);
}
})
.sum(2)
.print();
env.execute("个人练习2年半的flink demo");
}
}
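11. Time and Windows
Why do we need these? Flink is a real-time stream processing framework, but it supports batch-style processing just as well.
For example, I may want to analyze the data in batches of a few seconds each, or operate on every batch of 10 records.
11.1 Time
"Time" here refers to the time attached to the data. There are three kinds:
Event time: when the data was produced
Ingestion time: when the data entered Flink
Processing time: when the data is processed inside Flink
11.2 Windows
We use a tumbling window to total the transaction amount in 8-second batches (the window size set in the code below). Each input line is space-separated, and the second field is the order's transaction amount.
Open port 9527: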
[root@localhost ~]# nc -lk 9527
aa 45
cc 23
asd 12
dfg 678
a 2
5 34
package com.juege.timeandwindows;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TublingWindowDemoApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(3);
simpleSourceAndSink(env);
}
private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
env.socketTextStream("192.168.56.10",9527)
.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] strings = value.split(" ");
return Tuple2.of(strings[0],Integer.valueOf(strings[1]));
}
})
// .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//
// @Override
// public String getKey(Tuple2<String, Integer> value) throws Exception {
// return value.f0;
// }
// })
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(8)))
.sum(1)
.print();
env.execute("个人练习2年半的flink demo");
}
}
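The console output is shown below. Because windowAll is used without keyBy, each line is the total of all amounts in one window (45+23+12 = 80 and 678+2+34 = 714); the first field is simply carried over from a record in that window.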
4> (aa,80)
5> (dfg,714)
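11.3 Tumbling windows vs. sliding windows
The demo above uses a tumbling window. How does a sliding window differ? With the tumbling window above,
the window rolls over every 8 seconds and a record never appears in two windows. With a sliding window, if the slide
interval equals the window length, it behaves exactly like a tumbling window; if the slide interval is smaller than the window size, records near the end of one window may also fall into the next window.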
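To make the overlap concrete, here is a minimal plain-Java sketch (not Flink code) that computes which windows a timestamp falls into. The class and method names are hypothetical, and it assumes non-negative millisecond timestamps and a window offset of 0.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAssignSketch {

    // Start of the tumbling window [start, start + size) that contains ts.
    static long tumblingStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Starts of every sliding window [start, start + size) that contains ts, latest first.
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 9_000L;
        // Tumbling, 8 s: exactly one window.
        System.out.println("tumbling 8s: " + tumblingStart(ts, 8_000L));
        // Sliding, size 8 s, slide 4 s: the record belongs to two overlapping windows.
        System.out.println("sliding 8s/4s: " + slidingStarts(ts, 8_000L, 4_000L));
        // Slide equal to size: back to a single window, same as tumbling.
        System.out.println("sliding 8s/8s: " + slidingStarts(ts, 8_000L, 8_000L));
    }
}
```

With a slide of 4 seconds the record at 9000 ms lands in the windows starting at 8000 and 4000; with a slide equal to the size it lands only in the window starting at 8000.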
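11.4 Introduction to watermarks, with a simple event-time demo
A watermark is used together with event time; it decides when a window is closed and evaluated.
Suppose each window is 5 seconds long. Without any delay, the first window closes and is computed as soon as a record with an event time of 4999 ms or later arrives.
If the watermark is given a delay of, say, 2 seconds, the window closes only once a record with an event time of 6999 ms or later arrives. Even then, only the records that belong to that 5-second window are computed.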
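The arithmetic above can be checked with a small plain-Java sketch (not Flink code). It assumes a simplified model in which the watermark equals the largest event time seen so far minus the configured delay and is updated after every record; the class name, method name, and sample event times are hypothetical.

```java
final class WatermarkSketch {

    /**
     * Returns the event time of the first record whose arrival lets the window
     * [windowStart, windowStart + size) fire, or -1 if no record does.
     */
    static long firstTriggeringEvent(long[] eventTimes, long windowStart, long size, long maxDelay) {
        long windowMaxTs = windowStart + size - 1; // last timestamp inside the window
        long maxSeen = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - maxDelay;
            if (watermark >= windowMaxTs) {
                return ts; // the window fires once the watermark reaches its last timestamp
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long[] events = {1000, 4000, 4999, 5200, 6998, 6999, 8000};
        System.out.println(firstTriggeringEvent(events, 0, 5000, 0));    // prints 4999
        System.out.println(firstTriggeringEvent(events, 0, 5000, 2000)); // prints 6999
    }
}
```

In a real job the watermark is typically emitted periodically rather than after every record, so the exact moment a window fires can lag slightly behind this model.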
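The code below does a keyBy + sum every five seconds. Each input line has three space-separated fields: event timestamp in milliseconds, key, and value.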
package com.juege.timeandwindows;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WatermarkDemoApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
simpleSourceAndSink(env);
}
private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
env.socketTextStream("192.168.56.10", 9527)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
@Override
public long extractTimestamp(String element) {
return Long.valueOf(element.split(" ")[0]);
}
})
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] splitStr = value.split(" ");
return Tuple2.of(splitStr[1], Integer.valueOf(splitStr[2]));
}
})
.keyBy(t -> t.f0)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1)
.print();
env.execute("个人练习2年半的flink demo");
}
}
Input on 192.168.56.10:
[root@localhost ~]# nc -lk 9527
1000 2 15
2300 3 2
3400 2 3
4999 3 1
5200 a 1
5290 a 2
8900 a 4
9999 a 1
12000 b 2
13000 b 4
14999 b 1
15200 a 1
16000 b 1
17000 a 1
18000 b 2
19999 a 3
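In the console output, the sum is applied but, oddly, the keyBy has no effect. The most likely cause is that the code calls windowAll after keyBy: windowAll builds a non-keyed window that puts every record into the same window regardless of key, so the grouping is discarded. Calling window(...) instead of windowAll(...) on the keyed stream should produce one sum per key.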
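To show what the two variants should yield, here is a plain-Java sketch (not Flink code) that sums the records of the first window [0, 5000) from the nc input above, once over the whole window and once per key. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class KeyedVsAllWindowSketch {

    // Non-keyed window (windowAll style): a single sum over every record in the window.
    static int sumAll(int[] values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    // Keyed window (keyBy followed by window): one sum per key.
    static Map<String, Integer> sumPerKey(String[] keys, int[] values) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            sums.merge(keys[i], values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Key and value of the records with timestamps 1000, 2300, 3400 and 4999.
        String[] keys = {"2", "3", "2", "3"};
        int[] values = {15, 2, 3, 1};
        System.out.println("whole window: " + sumAll(values));          // prints whole window: 21
        System.out.println("per key: " + sumPerKey(keys, values));      // prints per key: {2=18, 3=3}
    }
}
```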
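11.5 sideOutput
A side output handles the situation shown in the figure below: data that should have been processed in an earlier window arrives, because of network delay or similar reasons, only during the next window or an even later one, and would otherwise never be processed. This is where sideOutput comes in.
The code example below should make this clear.
Code example: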
package com.juege.timeandwindows;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class SideoutputDemoApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
simpleSourceAndSink(env);
}
private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
final OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<>("side-output") {
};
SingleOutputStreamOperator<String> streamOperator = env.socketTextStream("192.168.56.10", 9527)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
@Override
public long extractTimestamp(String element) {
return Long.valueOf(element.split(" ")[0]);
}
})
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] splitStr = value.split(" ");
return Tuple2.of(splitStr[1], Integer.valueOf(splitStr[2]));
}
})
.keyBy(t -> t.f0)
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.sideOutputLateData(outputTag)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
},
new ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>() {
@Override
public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {
elements.forEach(t -> out.collect("key:" + t.f0 + "value:" + t.f1));
}
});
streamOperator.print();
// print the side-output (late) data to stderr
streamOperator.getSideOutput(outputTag).printToErr();
env.execute("个人练习2年半的flink demo");
}
}
Input on 192.168.56.10:
[root@localhost ~]# nc -lk 9527
1000 aa 1
2000 aa 2
4000 aa 3
4999 aa 4
3000 aa 2
4560 aa 1
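In the console output, the side-output data appears in red because it is printed with printToErr. The two records that belonged to the first 5-second window (timestamps 3000 and 4560) arrived late, after the window had already closed, so they were collected into the side output. In a real project we would usually process the window results and the side-output data properly instead of just printing them.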
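The late-data decision can be reproduced with a short plain-Java sketch (not Flink code). It assumes tumbling event-time windows, zero allowed lateness, and a watermark equal to the largest event time seen minus the delay, advanced after every record; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class LateDataSketch {

    // Returns the event times that arrive after their window has already fired.
    static List<Long> lateEvents(long[] eventTimes, long size, long maxDelay) {
        List<Long> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            long windowMaxTs = ts - (ts % size) + size - 1; // last timestamp of the record's window
            if (windowMaxTs <= watermark) {
                late.add(ts); // window already closed: this record would go to the side output
            }
            watermark = Math.max(watermark, ts - maxDelay);
        }
        return late;
    }

    public static void main(String[] args) {
        long[] input = {1000, 2000, 4000, 4999, 3000, 4560}; // timestamps from the nc input above
        System.out.println("late: " + lateEvents(input, 5000, 0)); // prints late: [3000, 4560]
    }
}
```

With a 2-second delay instead of 0, the same input produces no late records in this model, because the watermark only reaches 2999 after the record at 4999.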
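12. State
In Flink, state stores the intermediate results, metadata, and similar information that operators produce during computation.
12.1 ValueState: averaging small batches (code example)
The goal: the source emits Tuple2 records, which are grouped by f0; for every three values of a key we compute their average and send it to the sink.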
package com.juege.state;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class ValueStateDemoApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
simpleSourceAndSink(env);
}
private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
ArrayList<Tuple2<Long, Long>> data = new ArrayList<>();
data.add(Tuple2.of(1L,1L));
data.add(Tuple2.of(1L,3L));
data.add(Tuple2.of(2L,1L));
data.add(Tuple2.of(2L,6L));
data.add(Tuple2.of(2L,7L));
data.add(Tuple2.of(1L,33L));
env.fromCollection(data)
.keyBy(t -> t.f0)
.flatMap(new MyFlatMapFunction())
.print();
env.execute("个人练习2年半的flink demo");
}
}
package com.juege.state;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.io.IOException;
public class MyFlatMapFunction extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>> {
private transient ValueState<Tuple2<Long,Long>> valueState;
@Override
public void open(Configuration parameters) throws IOException {
ValueStateDescriptor<Tuple2<Long, Long>> valueStateDescriptor = new ValueStateDescriptor<>("AVG", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
valueState = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
// f0 = count, f1 = sum; the state is null the first time a key is seen
Tuple2<Long, Long> sumState = valueState.value();
if(sumState == null){
sumState = Tuple2.of(0L,0L);
}
sumState.f0 += 1;
sumState.f1 += value.f1;
valueState.update(sumState);
if(sumState.f0 >= 3){
out.collect(Tuple2.of(value.f0,sumState.f1/sumState.f0.doubleValue()));
valueState.clear();
}
}
}
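Running it prints one average per key as soon as three values for that key have arrived: with the sample data, key 2 yields (1+6+7)/3 and key 1 yields (1+3+33)/3.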
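The same count-and-sum logic can be tried without Flink. The following plain-Java sketch uses a HashMap in place of keyed ValueState; the class and method names are hypothetical, and records are processed strictly in order, whereas a parallel Flink job may interleave the keys differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class KeyedAverageSketch {

    // Emits "key=average" each time a key has accumulated three values, then resets that key.
    static List<String> averages(long[][] records) {
        Map<Long, long[]> state = new HashMap<>(); // key -> {count, sum}
        List<String> out = new ArrayList<>();
        for (long[] r : records) {
            long key = r[0];
            long[] acc = state.computeIfAbsent(key, k -> new long[2]);
            acc[0] += 1;
            acc[1] += r[1];
            if (acc[0] >= 3) {
                out.add(key + "=" + (acc[1] / (double) acc[0]));
                state.remove(key); // plays the role of valueState.clear()
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same sample data as ValueStateDemoApp: {key, value}.
        long[][] data = {{1, 1}, {1, 3}, {2, 1}, {2, 6}, {2, 7}, {1, 33}};
        averages(data).forEach(System.out::println);
    }
}
```

Key 2 reaches three values first, so its average (14/3) is emitted before the average for key 1 (37/3).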
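12.2 Rewriting the above with MapState
It is simple: ValueState stores a single value, while MapState stores key-value pairs.
To adapt the example above, we just add a throwaway key; here we use a UUID.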
package com.juege.state;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.UUID;
public class MyMapStateFlatMapFunction extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>> {
private transient MapState<String,Long> mapState;
@Override
public void open(Configuration parameters) throws IOException {
// the value type must match MapState<String, Long>: the map stores one Long per UUID key
MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("AVG", TypeInformation.of(new TypeHint<String>(){}), TypeInformation.of(new TypeHint<Long>(){}));
mapState = getRuntimeContext().getMapState(mapStateDescriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
mapState.put(UUID.randomUUID().toString(),value.f1);
ArrayList<Long> arrayList = Lists.newArrayList(mapState.values().iterator());
if(arrayList.size() >= 3){
long sum = 0; // long, so that adding Long values cannot silently narrow to int
for (int i = 0; i < arrayList.size(); i++) {
sum+=arrayList.get(i);
}
out.collect(Tuple2.of(value.f0,sum/Double.valueOf(arrayList.size())));
mapState.clear();
}
}
}
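12.3 Checkpoint
A checkpoint is a fault-tolerance mechanism for state. For example, if your job stops abnormally halfway through consuming a stream, it can resume from the last checkpoint after recovery. Any stream processing framework needs this.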
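The idea can be illustrated with a toy model in plain Java. This is not how Flink implements checkpoints; it only shows the principle of snapshotting state together with a read position and resuming from there. It assumes a source that can be re-read from an offset, and all names are hypothetical.

```java
final class CheckpointSketch {

    /**
     * Sums the input and snapshots (offset, running sum) every `interval` records.
     * If failAt >= 0, one failure is simulated just before the record at that index:
     * the state is restored from the last snapshot and reading resumes at its offset.
     */
    static long sumWithRecovery(int[] input, int interval, int failAt) {
        long sum = 0;
        int snapshotOffset = 0;
        long snapshotSum = 0;
        boolean failed = false;
        int i = 0;
        while (i < input.length) {
            if (i == failAt && !failed) {
                failed = true;
                i = snapshotOffset; // rewind the source to the snapshot position
                sum = snapshotSum;  // discard everything computed after the snapshot
                continue;
            }
            sum += input[i];
            i++;
            if (i % interval == 0) {
                snapshotOffset = i;
                snapshotSum = sum;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4, 5, 6, 7};
        System.out.println(sumWithRecovery(input, 3, -1)); // no failure: prints 28
        System.out.println(sumWithRecovery(input, 3, 5));  // failure before index 5: prints 28
    }
}
```

Because the sum and the offset are restored together, the records between the snapshot and the failure are replayed exactly once and the final result is unchanged.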
A few use cases are illustrated below.
12.3.1 Keeping the program running after an exception
The following code listens for data on socket 192.168.56.10, port 9527.
package com.juege.checkpoint;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CheckpointDemoApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
simpleSourceAndSink(env);
}
private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
env.socketTextStream("192.168.56.10", 9527)
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] splitStr = value.split(" ");
return Tuple2.of(splitStr[1], Integer.valueOf(splitStr[2]));
}
})
.keyBy(t -> t.f0)
.sum(1)
.print();
env.execute("个人练习2年半的flink demo");
}
}
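Because I had not started the listener on that server, the job fails with an error.
Once checkpointing is enabled (env.enableCheckpointing, as in 12.3.2), the job keeps retrying instead of terminating.
For example, below, entering pk throws an exception but the program does not terminate; after more input, the new values are still added to the earlier results.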
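12.3.2 Checkpointing with a restart strategy
Checkpoint every 5000 ms, and restart at most three times with a 5-second delay between attempts. Note that the Time passed to fixedDelayRestart is org.apache.flink.api.common.time.Time, not the windowing Time used in the earlier examples.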
env.enableCheckpointing(5000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
Time.seconds(5)));
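What "at most three restarts, five seconds apart" means can be sketched in plain Java. This is a simplified model of a fixed-delay retry loop, not Flink's implementation; the class name, method name, and the failing job are hypothetical, and the delay is shortened so the example finishes quickly.

```java
import java.util.concurrent.Callable;

final class FixedDelayRestartSketch {

    // Runs the job; on failure, restarts it up to maxRestarts times, waiting delayMillis before each restart.
    static <T> T runWithRestarts(Callable<T> job, int maxRestarts, long delayMillis) throws Exception {
        int restarts = 0;
        while (true) {
            try {
                return job.call();
            } catch (Exception e) {
                if (restarts >= maxRestarts) {
                    throw e; // restart budget exhausted: the job fails for good
                }
                restarts++;
                Thread.sleep(delayMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Hypothetical job that fails twice and then succeeds.
        String result = runWithRestarts(() -> {
            if (++calls[0] <= 2) {
                throw new IllegalStateException("simulated failure " + calls[0]);
            }
            return "ok after " + calls[0] + " attempts";
        }, 3, 10L);
        System.out.println(result); // prints ok after 3 attempts
    }
}
```

A job that fails on every attempt would run four times in total (the first run plus three restarts) before the exception is propagated.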
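12.4 State Backends
Put plainly, a state backend decides where state data is stored. The default is memory (the first option in the figure below); that state is lost on restart, so it is not used in production.
The other options are the filesystem (fs) backend and RocksDB.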