Forty-nine days of burning the midnight oil: Flink notes, I choose you

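1. Preface

Rough technical architecture diagram

Project screenshots

Implemented features

Technical problems solved

Performance tuning

What I learned

Environment parameters

Prerequisites

Table of contents

Evolution of computing frameworks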

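2. Getting started with Flink

2.1 What is Flink

In short, Flink is a distributed processing engine and framework for stateful computations over both streaming (unbounded) and batch (bounded) data.

It is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.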

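2.2 Processing bounded vs. unbounded data

2.3 Deploy anywhere

Clusters can be built with YARN, Mesos, Kubernetes (k8s), and similar resource managers.

2.4 Run at any scale

Flink can process trillions of events per day, maintain terabytes of state, and run on thousands of cores.

2.5 In-memory performance

State is kept in memory (or, when it grows beyond the available memory, in access-efficient on-disk data structures), so state access is fast.

2.6 Layered APIs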

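3. Hands-on Flink project development

3.1 Building a multi-module project with Gradle

Create the parent project

Create the submodules

The project skeleton then looks roughly like this:

3.2 A first demo: small but complete

DemoSourceFunction: the data source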

package com.juege.source;

import com.juege.pojo.Student;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

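public class DemoSourceFunction extends RichSourceFunction<Student> {

    // set to false by cancel() so that run() can stop early
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        // ages 0, 10, ..., 90 so that some students pass the age > 50 filter
        for (int i = 0; i < 10 && running; i++) {
            Student student = new Student(i * 10, "libai" + i);
            ctx.collect(student);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}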

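The operator in the middle that processes the data. Here it filters out students aged 50 or younger and keeps only those older than 50: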

package com.juege.operater;

import com.juege.pojo.Student;
import org.apache.flink.api.common.functions.RichFilterFunction;

public class DemoFilter extends RichFilterFunction<Student> {
    @Override
    public boolean filter(Student student) {
        return student.age > 50;
    }
}

DemoSinkFunction: in real jobs the data is usually written to a database or sent downstream. This simple demo just prints it.

package com.juege.sink;

import com.juege.pojo.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class DemoSinkFunction extends RichSinkFunction<Student> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void invoke(Student student, Context context) throws Exception {
        // normally the data would be written to a database or sent downstream here
        System.out.println("I am a student: " + student);
    }
}

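Running the main method

As you can see, you build the execution environment (env), add the source, the operators and the sink, and then execute the job.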

package com.juege;

import com.juege.operater.DemoFilter;
import com.juege.pojo.Student;
import com.juege.sink.DemoSinkFunction;
import com.juege.source.DemoSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleSourceAndSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        simpleSourceAndSink(env);
    }

    private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
        // source -> filter -> sink
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new DemoSourceFunction());
        studentDataStreamSource
                .filter(new DemoFilter())
                .addSink(new DemoSinkFunction());

        // nothing runs until execute() is called
        env.execute("my first flink demo");
    }
}

Console output: students aged 50 or younger are filtered out, and the information of the remaining students is printed.

3.3 Relating Flink's programming model to similar frameworks
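The programming model is close to Java 8 Streams. Both have a source, a chain of transformations, and a final step that consumes the results. As a rough analogy only, the sketch below mirrors the 3.2 demo in plain Java with no Flink classes. The ages 0, 10, ..., 90 and the stand-in Student class are assumptions made for illustration.

The big difference is scope. A Java stream runs once, inside a single JVM, over finite data. Flink distributes the operators across TaskManagers and can keep running over unbounded input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java analogy of the Flink demo: source -> filter -> sink.
// No Flink classes are involved; this only mirrors the shape of the pipeline.
class StreamAnalogySketch {

    // Minimal stand-in for the demo's Student POJO.
    static class Student {
        final int age;
        final String name;

        Student(int age, String name) {
            this.age = age;
            this.name = name;
        }

        @Override
        public String toString() {
            return "name:" + name + " age:" + age;
        }
    }

    // "Source": ten students with assumed ages 0, 10, ..., 90.
    static List<Student> source() {
        return IntStream.range(0, 10)
                .mapToObj(i -> new Student(i * 10, "libai" + i))
                .collect(Collectors.toList());
    }

    // "Operator" keeps age > 50; "sink" collects what DemoSinkFunction would print.
    static List<String> run() {
        List<String> sinkOutput = new ArrayList<>();
        source().stream()
                .filter(s -> s.age > 50)
                .forEach(s -> sinkOutput.add(s.toString()));
        return sinkOutput;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```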

3.4 Parallelism

Without further ado, look at the following code:

package com.juege;

import com.juege.operater.DemoFilter;
import com.juege.pojo.Student;
import com.juege.sink.DemoSinkFunction;
import com.juege.source.DemoSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleSourceAndSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // uncomment to change the default parallelism of all operators to 6
//        env.setParallelism(6);

        simpleSourceAndSink(env);
    }

    private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new DemoSourceFunction());


        SingleOutputStreamOperator<Student> streamOperator = studentDataStreamSource
                .filter(new DemoFilter());

        DataStreamSink<Student> studentDataStreamSink = streamOperator.addSink(new DemoSinkFunction());

        System.out.println(studentDataStreamSink.getTransformation().getParallelism()); // sink parallelism
        System.out.println(Runtime.getRuntime().availableProcessors());                 // available CPU cores
        System.out.println(studentDataStreamSource.getParallelism());                   // source parallelism
        System.out.println(streamOperator.getParallelism());                            // filter parallelism

        env.execute("my first flink demo");
    }
}

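The console output is as follows:

What does this tell us?

Default parallelism

When running locally, the default parallelism equals the number of available CPU cores. The source here extends the non-parallel RichSourceFunction, so its parallelism is fixed at 1.

Custom parallelism

Suppose we want to change the default parallelism to 6. We set it directly on env with env.setParallelism(6).

Now the filter and the sink both have parallelism 6; only the source does not.

So what if we want the source to have parallelism 6 as well? Changing its parent class to RichParallelSourceFunction is enough. However, the console output then contains six times as much student data. Each of the 6 source subtasks runs the same run() loop and emits all 10 students, which is why a source defaults to parallelism 1.

A source should only run with parallelism greater than 1 when the work can be divided among its subtasks, for example when reading a Kafka topic with 6 partitions. Such a source must implement ParallelSourceFunction or RichParallelSourceFunction.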
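This plain-Java model (no Flink classes) shows why a parallel source should divide its work rather than repeat it. Each of N subtasks keeps only the record ids where id % N equals its own index. The id-modulo rule is just one assumed way to split the work.

In a real RichParallelSourceFunction, the index and N could come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a parallel source splitting work across subtasks
// instead of every subtask emitting the full data set.
class ParallelSourceSplitSketch {

    // Ids one subtask would emit if it keeps only id % parallelism == subtaskIndex.
    static List<Integer> idsForSubtask(int subtaskIndex, int parallelism, int total) {
        List<Integer> ids = new ArrayList<>();
        for (int id = 0; id < total; id++) {
            if (id % parallelism == subtaskIndex) {
                ids.add(id);
            }
        }
        return ids;
    }

    // Total emitted when every subtask runs the same full loop (what the demo does).
    static int naiveTotal(int parallelism, int total) {
        return parallelism * total;
    }

    // Total emitted across all subtasks with the splitting rule above.
    static int splitTotal(int parallelism, int total) {
        int sum = 0;
        for (int i = 0; i < parallelism; i++) {
            sum += idsForSubtask(i, parallelism, total).size();
        }
        return sum;
    }

    public static void main(String[] args) {
        int parallelism = 6;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " -> " + idsForSubtask(i, parallelism, 10));
        }
        System.out.println("naive: " + naiveTotal(parallelism, 10) + ", split: " + splitTotal(parallelism, 10));
    }
}
```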

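3.5 Choosing a reasonable parallelism

Suppose there are 3 TaskManagers, each with 3 task slots. Each TaskManager can then accept 3 tasks, for 9 task slots in total.

If we set parallelism.default=1, programs run with parallelism 1 by default. Only 1 of the 9 slots is used and 8 sit idle. Setting an appropriate parallelism is therefore what lets a job use the cluster efficiently.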
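The slot arithmetic above can be written down directly. The sketch assumes Flink's default slot sharing, under which a job needs as many slots as the highest parallelism of any of its operators. The helper names are made up for illustration.

```java
// Slot arithmetic for the 3 TaskManagers x 3 slots example, assuming Flink's
// default slot sharing: a job needs as many slots as its highest operator parallelism.
class SlotSketch {

    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // With slot sharing, one slot can hold one subtask of each operator,
    // so the slots required equal the maximum operator parallelism.
    static int slotsNeeded(int... operatorParallelisms) {
        int max = 0;
        for (int p : operatorParallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    // Idle slots; this sketch simply rejects a job that needs more slots than exist.
    static int idleSlots(int total, int needed) {
        if (needed > total) {
            throw new IllegalArgumentException("need " + needed + " slots, only " + total + " available");
        }
        return total - needed;
    }

    public static void main(String[] args) {
        int total = totalSlots(3, 3);
        // source, filter and sink all at parallelism 1
        System.out.println("idle with parallelism 1: " + idleSlots(total, slotsNeeded(1, 1, 1)));
        // all operators at parallelism 9
        System.out.println("idle with parallelism 9: " + idleSlots(total, slotsNeeded(9, 9, 9)));
    }
}
```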

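4. Flink architecture

The required components are:

Client: submits the job.

JobManager: hands out the work and does the global scheduling.

TaskManager: the foreman that does the actual work. Each TaskManager usually runs in its own JVM.

Each TaskManager is divided into task slots. A slot is a fixed share of the TaskManager's resources, and the tasks assigned to it run as threads inside the TaskManager's JVM.

The additional components are the yellow ones in the diagram, such as resource management and metrics storage.

The official documentation explains them as follows: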

| Component | Purpose | Implementations |
| --- | --- | --- |
| Flink Client | Compiles batch or streaming applications into a dataflow graph, which it then submits to the JobManager. | |
| JobManager | JobManager is the name of the central work coordination component of Flink. It has implementations for different resource providers, which differ on high-availability, resource allocation behavior and supported job submission modes. JobManager modes for job submissions: (1) Application Mode: runs the cluster exclusively for one application. The job's main method (or client) gets executed on the JobManager. Calling `execute`/`executeAsync` multiple times in an application is supported. (2) Per-Job Mode: runs the cluster exclusively for one job. The job's main method (or client) runs only prior to the cluster creation. (3) Session Mode: one JobManager instance manages multiple jobs sharing the same cluster of TaskManagers. | |
| TaskManager | TaskManagers are the services actually performing the work of a Flink job. | |
| External Components (all optional) | | |
| High Availability Service Provider | Flink's JobManager can be run in high availability mode which allows Flink to recover from JobManager faults. In order to failover faster, multiple standby JobManagers can be started to act as backups. | |
| File Storage and Persistency | For checkpointing (recovery mechanism for streaming jobs) Flink relies on external file storage systems. | See FileSystems page. |
| Resource Provider | Flink can be deployed through different Resource Provider Frameworks, such as Kubernetes or YARN. | See JobManager implementations above. |
| Metrics Storage | Flink components report internal metrics and Flink jobs can report additional, job specific metrics as well. | See Metrics Reporter page. |
| Application-level data sources and sinks | While application-level data sources and sinks are not technically part of the deployment of Flink cluster components, they should be considered when planning a new Flink production deployment. Colocating frequently used data with Flink can have significant performance benefits. | For example: Apache Kafka, Amazon S3, Elasticsearch, Apache Cassandra. See Connectors page. |

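5. Single-node deployment and accessing the web UI

Steps

First, download Flink from the official website (Downloads | Apache Flink).

Then run start-cluster.sh in the bin directory. Make sure the JDK environment variables (such as JAVA_HOME) are configured beforehand.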

[root@localhost bin]# java -version
java version "12.0.1" 2019-04-16
Java(TM) SE Runtime Environment (build 12.0.1+12)
Java HotSpot(TM) 64-Bit Server VM (build 12.0.1+12, mixed mode, sharing)
[root@localhost opt]# cd flink-1.10.1-bin-scala_2.12/
[root@localhost flink-1.10.1-bin-scala_2.12]# ls
flink-1.10.1
[root@localhost flink-1.10.1-bin-scala_2.12]# cd flink-1.10.1/
[root@localhost flink-1.10.1]# ls
bin  conf  examples  flink-test-1.0-SNAPSHOT.jar  lib  LICENSE  licenses  log  NOTICE  opt  plugins  README.txt
[root@localhost flink-1.10.1]# cd bin/
[root@localhost bin]# ls
bash-java-utils.jar  flink             historyserver.sh     kubernetes-session.sh   mesos-taskmanager.sh       sql-client.sh      start-scala-shell.sh       stop-zookeeper-quorum.sh  zookeeper.sh
config.sh            flink-console.sh  jobmanager.sh        mesos-appmaster-job.sh  pyflink-gateway-server.sh  standalone-job.sh  start-zookeeper-quorum.sh  taskmanager.sh
find-flink-home.sh   flink-daemon.sh   kubernetes-entry.sh  mesos-appmaster.sh      pyflink-shell.sh           start-cluster.sh   stop-cluster.sh            yarn-session.sh
[root@localhost bin]# ./start-cluster.sh

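Open the web UI. The default port is 8081, which can be changed in conf/flink-conf.yaml (the rest.port option).

http://192.168.56.10:8081/

The preview below shows both the JobManager and the TaskManager discussed earlier.

You can then submit a job. Build it into a JAR, then specify the entry main class and any program arguments when you submit it.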

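6. Connectors

Another convenient thing about Flink is that it ships ready-made connectors for popular databases and message queues. Without them, we would have to write much more code by hand. Because Flink already wraps these connections, they are much easier to use.

Taking Kafka as an example, add the following dependency and use the code below to switch the data source to Kafka:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.0</version>
</dependency>

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

// brokers is your Kafka bootstrap address list, e.g. "host1:9092,host2:9092"
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())  // start from the earliest offset
    .setValueOnlyDeserializer(new SimpleStringSchema()) // read each record value as a String
    .build();

// fromSource replaces env.addSource(...); noWatermarks() means no event-time processing
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");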

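7. Operators

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/

Commonly used operators include map, flatMap, filter and keyBy. They should look familiar to anyone who knows Java 8 streams or has used other big-data tools such as Hadoop, so I won't go into much detail. Examples: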

map

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

 flatmap

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

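map

map is a one-to-one mapping: each element is transformed into exactly one other element.

flatmap

flatMap flattens elements: each element can become zero, one, or many elements.

filter

filter keeps only the elements for which the predicate returns true and drops the rest.

keyBy

Logically partitions the stream by the specified key, based on the hash of the key. All records with the same key go to the same parallel subtask. The result is a KeyedStream.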
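Here is a simplified plain-Java model of keyBy routing, in which every record with the same key lands on the same subtask. Flink's real assignment is more involved: keys are hashed into key groups bounded by the max parallelism, and the key groups are then assigned to subtasks. The hashCode-modulo rule below is therefore an assumption that only preserves the "same key, same subtask" property.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified keyBy routing: records with the same key always go to the same subtask.
// Flink's real assignment goes through key groups; this collapses it to hashCode mod parallelism.
class KeyByHashSketch {

    static int subtaskFor(Object key, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups keys by the subtask they would be routed to.
    static Map<Integer, List<String>> route(List<String> keys, int parallelism) {
        Map<Integer, List<String>> bySubtask = new LinkedHashMap<>();
        for (String key : keys) {
            bySubtask.computeIfAbsent(subtaskFor(key, parallelism), k -> new ArrayList<>()).add(key);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        System.out.println(route(Arrays.asList("android", "ios", "android", "ios", "android"), 3));
    }
}
```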

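reduce

reduce is a rolling merge on a KeyedStream. Each new element is combined with the last reduced value for the same key, and the updated value is emitted. It turns a KeyedStream back into a DataStream.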
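The rolling behavior of reduce is easy to miss: it emits an updated result for every input element, not one final value. The plain-Java model below keeps the last reduced value per key in a map, which plays the role of Flink's keyed state. The (os, count) events are made-up sample data.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of reduce on a keyed stream: each element is combined with the
// last reduced value of the same key, and the new value is emitted immediately.
class RollingReduceSketch {

    static List<String> rollingReduce(List<Map.Entry<String, Integer>> events,
                                      BinaryOperator<Integer> reducer) {
        Map<String, Integer> lastValueByKey = new HashMap<>(); // stands in for keyed state
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> event : events) {
            Integer previous = lastValueByKey.get(event.getKey());
            Integer reduced = previous == null ? event.getValue() : reducer.apply(previous, event.getValue());
            lastValueByKey.put(event.getKey(), reduced);
            emitted.add(event.getKey() + "=" + reduced); // one output per input element
        }
        return emitted;
    }

    private static Map.Entry<String, Integer> event(String key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Made-up sample: one "count 1" event per user action, keyed by operating system.
    static List<Map.Entry<String, Integer>> sampleEvents() {
        return Arrays.asList(event("android", 1), event("ios", 1), event("android", 1));
    }

    public static void main(String[] args) {
        System.out.println(rollingReduce(sampleEvents(), Integer::sum));
    }
}
```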

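fold

Given an initial value, merges the elements into it one by one. It turns a KeyedStream into a DataStream. fold was deprecated in favor of aggregate and is no longer available in recent Flink versions.

union

union merges several streams into one so that the merged stream can be processed uniformly. It concatenates the streams side by side.

All streams being merged must have the same element type.

join

Joins two streams on a specified key; only elements that match are emitted. In the DataStream API, this join is evaluated within a window.

coGroup

Also associates two streams by key, but elements that find no match are kept as well.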
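The difference between join and coGroup can be sketched with plain Java lists, ignoring windows:

- join emits only matched pairs.
- coGroup hands over both sides' groups for every key, including keys present on only one side.

The sample order and login records are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java contrast of join vs coGroup on two keyed inputs, ignoring windows.
class JoinVsCoGroupSketch {

    // Groups (key, value) pairs by key, keeping arrival order.
    static Map<String, List<String>> groupByKey(List<String[]> rows) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        return groups;
    }

    // join: one output per matching (left, right) pair; unmatched keys produce nothing.
    static List<String> join(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> rightGroups = groupByKey(right);
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            for (String r : rightGroups.getOrDefault(l[0], Collections.<String>emptyList())) {
                out.add(l[0] + ":" + l[1] + "+" + r);
            }
        }
        return out;
    }

    // coGroup: one result per key with both sides' groups, even if one side is empty.
    static List<String> coGroup(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> l = groupByKey(left);
        Map<String, List<String>> r = groupByKey(right);
        Set<String> keys = new TreeSet<>(l.keySet());
        keys.addAll(r.keySet());
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            out.add(key + ":" + l.getOrDefault(key, Collections.<String>emptyList())
                    + "|" + r.getOrDefault(key, Collections.<String>emptyList()));
        }
        return out;
    }

    // Invented sample data: orders and logins keyed by user id.
    static List<String[]> orders() {
        return Arrays.asList(new String[]{"u1", "order1"}, new String[]{"u2", "order2"});
    }

    static List<String[]> logins() {
        return Arrays.asList(new String[]{"u1", "loginA"}, new String[]{"u3", "loginB"});
    }

    public static void main(String[] args) {
        System.out.println(join(orders(), logins()));
        System.out.println(coGroup(orders(), logins()));
    }
}
```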

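connect

Reference: https://www.jianshu.com/p/5b0574d466f8

Connects two streams "vertically". The connect operation on a DataStream creates a ConnectedStreams, or a BroadcastConnectedStream when connecting a broadcast stream. It has two type parameters, so the elements of the two DataStreams do not need to be of the same type.

split

Reference: https://cloud.tencent.com/developer/article/1382892

split divides one stream into several. It is no longer available in recent Flink versions, where side outputs replace it.

After union, the merged streams can be treated as a single stream.

After connect, the result is still essentially two streams. There are therefore matching classes and operator methods, such as CoMapFunction and CoFlatMapFunction, that process each of the two streams separately.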
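This is a plain-Java model of union versus connect. CoMapLike is a hypothetical interface that only mirrors the shape of Flink's CoMapFunction: map1 for the first input, map2 for the second. It is not the Flink API.

There is also a difference in ordering. Flink interleaves the two inputs as elements arrive, whereas this sketch processes them one after the other.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of union vs connect (not the Flink API).
class UnionVsConnectSketch {

    // union: every input has the same element type, and the result is simply one stream.
    @SafeVarargs
    static <T> List<T> union(List<T>... streams) {
        List<T> merged = new ArrayList<>();
        for (List<T> stream : streams) {
            merged.addAll(stream);
        }
        return merged;
    }

    // Hypothetical stand-in mirroring the shape of Flink's CoMapFunction:
    // one method per input, both returning the same output type.
    interface CoMapLike<IN1, IN2, OUT> {
        OUT map1(IN1 value);

        OUT map2(IN2 value);
    }

    // connect: two inputs of different types stay separate and are handled by map1/map2.
    // Flink interleaves the inputs as elements arrive; this sketch handles them in sequence.
    static <IN1, IN2, OUT> List<OUT> connectAndMap(List<IN1> first, List<IN2> second,
                                                   CoMapLike<IN1, IN2, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        for (IN1 value : first) {
            out.add(fn.map1(value));
        }
        for (IN2 value : second) {
            out.add(fn.map2(value));
        }
        return out;
    }

    // Example: an Integer stream connected with a String stream, both mapped to String.
    static List<String> demoConnect() {
        return connectAndMap(Arrays.asList(1, 2), Arrays.asList("a"),
                new CoMapLike<Integer, String, String>() {
                    @Override
                    public String map1(Integer value) {
                        return "int:" + value;
                    }

                    @Override
                    public String map2(String value) {
                        return "str:" + value;
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(union(Arrays.asList(1, 2), Arrays.asList(3)));
        System.out.println(demoConnect());
    }
}
```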

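8. Lifecycle methods

Lifecycle methods such as open and close only exist in functions whose parent class name starts with Rich, for example RichFilterFunction. These Rich functions also expose getRuntimeContext().

For example, the filter below overrides open, close and getRuntimeContext: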

package com.juege.operater;

import com.juege.pojo.Student;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;

public class DemoFilter extends RichFilterFunction<Student> {

    @Override
    public boolean filter(Student student) {
        System.out.println("I am subtask number " + getRuntimeContext().getIndexOfThisSubtask());
        return student.age > 50;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        return super.getRuntimeContext();
    }
}

The following example shows what these methods do, when they run, and how many times.

First, the classes used:

pojo

package com.juege.pojo;

import lombok.Data;

@Data
public class Student {

    public int age;

    public String name;

    public Student(int age, String name) {
        this.age = age;
        this.name = name;
    }


    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "name:" + name + " age:" + age;
    }

}

source

package com.juege.source;

import com.juege.pojo.Student;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class DemoSourceFunction extends RichParallelSourceFunction<Student> {

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task" + "execute open method in DemoSinkFunction class !");
        super.open(parameters);
    }

    @Override
    public void run(SourceContext<Student> ctx) {

        for (int i = 1; i < 11; i++) {
            Student student = new Student(i, "libai" + i + "");

            System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task" + "collect data: " + student);

            ctx.collect(student);
        }


    }

    @Override
    public void cancel() {

    }


    @Override
    public RuntimeContext getRuntimeContext() {
        return super.getRuntimeContext();
    }
}

filter

package com.juege.operater;

import com.juege.pojo.Student;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;

public class DemoFilter extends RichFilterFunction<Student> {

    @Override
    public boolean filter(Student student) {
        System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task"
                + "filter data: " + student + " result:" + (student.age > 5));

        return student.age > 5;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task"
                + "execute open method in DemoFilter class !");
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        return super.getRuntimeContext();
    }
}

sink 

package com.juege.sink;

import com.juege.pojo.Student;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class DemoSinkFunction extends RichSinkFunction<Student> {

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task"
                + "execute open method in DemoSinkFunction class !");
        super.open(parameters);
    }

    @Override
    public void invoke(Student student, Context context) {
        // normally the data would be written to a database or sent downstream here
        System.out.println("number " + (getRuntimeContext().getIndexOfThisSubtask() + 1) + "task"
                + "sink data: " + student);
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        return super.getRuntimeContext();
    }
}

Main class

package com.juege;

import com.juege.operater.DemoFilter;
import com.juege.pojo.Student;
import com.juege.sink.DemoSinkFunction;
import com.juege.source.DemoSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleSourceAndSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        simpleSourceAndSink(env);


    }

    private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new DemoSourceFunction());


        SingleOutputStreamOperator<Student> streamOperator = studentDataStreamSource
                .filter(new DemoFilter());

        DataStreamSink<Student> studentDataStreamSink = streamOperator.addSink(new DemoSinkFunction());

        System.out.println(Runtime.getRuntime().availableProcessors());
        System.out.println(studentDataStreamSource.getParallelism());
        System.out.println(streamOperator.getParallelism());
        System.out.println(studentDataStreamSink.getTransformation().getParallelism());


        env.execute("my first flink demo");
    }
}

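Output: the first line is the number of available CPU cores (12 here). It is followed by the parallelism of the source, the filter and the sink, which is 3 for all of them. Because the source now extends the parallel base class RichParallelSourceFunction, env.setParallelism(3) applies to the source as well.

Further down, open runs in every subtask before that subtask processes any data, as many times as the parallelism (three times per operator). open is typically where you do initialization such as loading configuration or obtaining connections. The first three open lines are printed by DemoSourceFunction; its message was copied from the sink and still reads "DemoSinkFunction".

After that, the data is produced three times, once by each source subtask. Every record goes through each operator's method, and records that pass the filter (age > 5) reach the sink.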

> Task :flink-basic:SimpleSourceAndSinkDemo.main()
12
3
3
3
number 3taskexecute open method in DemoSinkFunction class !
number 1taskexecute open method in DemoSinkFunction class !
number 2taskexecute open method in DemoSinkFunction class !
number 3taskexecute open method in DemoFilter class !
number 2taskexecute open method in DemoFilter class !
number 1taskexecute open method in DemoFilter class !
number 3taskexecute open method in DemoSinkFunction class !
number 2taskexecute open method in DemoSinkFunction class !
number 1taskexecute open method in DemoSinkFunction class !
number 3taskcollect data: name:libai1 age:1
number 2taskcollect data: name:libai1 age:1
number 1taskcollect data: name:libai1 age:1
number 2taskfilter data: name:libai1 age:1 result:false
number 1taskfilter data: name:libai1 age:1 result:false
number 3taskfilter data: name:libai1 age:1 result:false
number 1taskcollect data: name:libai2 age:2
number 2taskcollect data: name:libai2 age:2
number 3taskcollect data: name:libai2 age:2
number 2taskfilter data: name:libai2 age:2 result:false
number 1taskfilter data: name:libai2 age:2 result:false
number 3taskfilter data: name:libai2 age:2 result:false
number 2taskcollect data: name:libai3 age:3
number 3taskcollect data: name:libai3 age:3
number 2taskfilter data: name:libai3 age:3 result:false
number 1taskcollect data: name:libai3 age:3
number 2taskcollect data: name:libai4 age:4
number 1taskfilter data: name:libai3 age:3 result:false
number 2taskfilter data: name:libai4 age:4 result:false
number 3taskfilter data: name:libai3 age:3 result:false
number 2taskcollect data: name:libai5 age:5
number 1taskcollect data: name:libai4 age:4
number 2taskfilter data: name:libai5 age:5 result:false
number 1taskfilter data: name:libai4 age:4 result:false
number 3taskcollect data: name:libai4 age:4
number 1taskcollect data: name:libai5 age:5
number 3taskfilter data: name:libai4 age:4 result:false
number 2taskcollect data: name:libai6 age:6
number 1taskfilter data: name:libai5 age:5 result:false
number 2taskfilter data: name:libai6 age:6 result:true
number 3taskcollect data: name:libai5 age:5
number 1taskcollect data: name:libai6 age:6
number 3taskfilter data: name:libai5 age:5 result:false
number 1taskfilter data: name:libai6 age:6 result:true
number 3taskcollect data: name:libai6 age:6
number 3taskfilter data: name:libai6 age:6 result:true
number 2tasksink data: name:libai6 age:6
number 3tasksink data: name:libai6 age:6
number 1tasksink data: name:libai6 age:6
number 2taskcollect data: name:libai7 age:7
number 1taskcollect data: name:libai7 age:7
number 3taskcollect data: name:libai7 age:7
number 1taskfilter data: name:libai7 age:7 result:true
number 2taskfilter data: name:libai7 age:7 result:true
number 1tasksink data: name:libai7 age:7
number 3taskfilter data: name:libai7 age:7 result:true
number 1taskcollect data: name:libai8 age:8
number 2tasksink data: name:libai7 age:7
number 1taskfilter data: name:libai8 age:8 result:true
number 3tasksink data: name:libai7 age:7
number 1tasksink data: name:libai8 age:8
number 2taskcollect data: name:libai8 age:8
number 1taskcollect data: name:libai9 age:9
number 3taskcollect data: name:libai8 age:8
number 1taskfilter data: name:libai9 age:9 result:true
number 3taskfilter data: name:libai8 age:8 result:true
number 1tasksink data: name:libai9 age:9
number 2taskfilter data: name:libai8 age:8 result:true
number 1taskcollect data: name:libai10 age:10
number 3tasksink data: name:libai8 age:8
number 2tasksink data: name:libai8 age:8
number 3taskcollect data: name:libai9 age:9
number 1taskfilter data: name:libai10 age:10 result:true
number 2taskcollect data: name:libai9 age:9
number 1tasksink data: name:libai10 age:10
number 3taskfilter data: name:libai9 age:9 result:true
number 2taskfilter data: name:libai9 age:9 result:true
number 3tasksink data: name:libai9 age:9
number 2tasksink data: name:libai9 age:9
number 3taskcollect data: name:libai10 age:10
number 2taskcollect data: name:libai10 age:10
number 3taskfilter data: name:libai10 age:10 result:true
number 2taskfilter data: name:libai10 age:10 result:true
number 3tasksink data: name:libai10 age:10
number 2tasksink data: name:libai10 age:10
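The call pattern in the log can be reduced to a small model:

- There is one function instance per parallel subtask.
- open runs once, before that subtask sees any record.
- The per-record method runs once per element.
- close runs once at the end.

This is a sequential plain-Java imitation using a hypothetical CountingFilter class. In Flink the subtasks run concurrently, which is why the log lines above interleave.

```java
import java.util.ArrayList;
import java.util.List;

// Sequential plain-Java imitation of Rich function lifecycle calls (not Flink's runtime):
// one instance per parallel subtask, open once, the per-record method per element, close once.
class LifecycleSketch {

    // Hypothetical filter that records every lifecycle call in a shared log.
    static class CountingFilter {
        private final int subtaskIndex;
        private final List<String> log;

        CountingFilter(int subtaskIndex, List<String> log) {
            this.subtaskIndex = subtaskIndex;
            this.log = log;
        }

        void open() { // e.g. load config or open a connection
            log.add("open " + subtaskIndex);
        }

        boolean filter(int age) {
            log.add("filter " + subtaskIndex + " " + age);
            return age > 5;
        }

        void close() { // e.g. release the connection
            log.add("close " + subtaskIndex);
        }
    }

    // Each subtask receives ages 1..10, as each source subtask in the demo emits all ten students.
    static List<String> run(int parallelism) {
        List<String> log = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            CountingFilter filter = new CountingFilter(subtask, log);
            filter.open();
            for (int age = 1; age <= 10; age++) {
                filter.filter(age);
            }
            filter.close();
        }
        return log;
    }

    static long count(List<String> log, String prefix) {
        return log.stream().filter(line -> line.startsWith(prefix)).count();
    }

    public static void main(String[] args) {
        List<String> log = run(3);
        System.out.println("open: " + count(log, "open ") + ", filter: " + count(log, "filter ")
                + ", close: " + count(log, "close "));
    }
}
```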

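9. Custom partitioner

Kafka lets you define a rule that decides which partition a record is sent to. Flink likewise lets you decide which downstream partition (parallel subtask) each record goes to.

Partitioner code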

package com.juege.partitioner;

import org.apache.flink.api.common.functions.Partitioner;

public class DemoPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer age, int numPartitions) {
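        // floorMod keeps the index in [0, numPartitions) for any parallelism and any sign of age;
        // with parallelism 3 this matches age % 3 for non-negative ages
        int partition = Math.floorMod(age, numPartitions);

        return partition;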
    }
}
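Applying the partitioner's rule by hand shows where each surviving record goes. The plain-Java sketch below uses the same age-modulo idea, written with Math.floorMod (an assumption that also covers negative keys). It routes the ages 6 to 10, which pass the demo's filter, to 3 downstream subtasks.

The returned index selects the downstream subtask. In this model, age 6 goes to subtask 0, which the sink prints as "number 1task". All three copies of a given age end up on the same sink subtask.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java check of the custom partitioner rule: which downstream
// subtask each age is routed to (indices are 0-based).
class PartitionRoutingSketch {

    // Same age-modulo idea as DemoPartitioner; floorMod keeps negative ages in range too.
    static int partition(int age, int numPartitions) {
        return Math.floorMod(age, numPartitions);
    }

    // Maps every age in [fromAge, toAge] to its target subtask index.
    static Map<Integer, Integer> routeAges(int fromAge, int toAge, int numPartitions) {
        Map<Integer, Integer> ageToSubtask = new LinkedHashMap<>();
        for (int age = fromAge; age <= toAge; age++) {
            ageToSubtask.put(age, partition(age, numPartitions));
        }
        return ageToSubtask;
    }

    public static void main(String[] args) {
        // ages 6..10 are the ones that pass the demo's filter (age > 5)
        System.out.println(routeAges(6, 10, 3));
    }
}
```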

Then add this line to the previous code so that each student is routed by age before reaching the sink:

 DataStream<Student> partitionedDataStream = streamOperator.partitionCustom(new DemoPartitioner(), (KeySelector<Student, Integer>) value -> value.age);

package com.juege;

import com.juege.operater.DemoFilter;
import com.juege.partitioner.DemoPartitioner;
import com.juege.pojo.Student;
import com.juege.sink.DemoSinkFunction;
import com.juege.source.DemoSourceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleSourceAndSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        simpleSourceAndSink(env);


    }

    private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new DemoSourceFunction());

        SingleOutputStreamOperator<Student> streamOperator = studentDataStreamSource
                .filter(new DemoFilter());

        DataStream<Student> partitionedDataStream = streamOperator.partitionCustom(new DemoPartitioner(), (KeySelector<Student, Integer>) value -> value.age);

        DataStreamSink<Student> studentDataStreamSink = partitionedDataStream.addSink(new DemoSinkFunction());

        System.out.println(Runtime.getRuntime().availableProcessors());
        System.out.println(studentDataStreamSource.getParallelism());
        System.out.println(streamOperator.getParallelism());
        System.out.println(studentDataStreamSink.getTransformation().getParallelism());


        env.execute("my first flink demo");
    }
}

10. Hands-on: counting new and returning users of an app per operating system

pojo

package com.juege.pojo;

import lombok.Data;

@Data
//@Builder
public class User {

    public String device;

    public String deviceType;

    public String os;

    public String event;

    public Integer uid;

    public String net;

    public String channel;

    public int ifNewUser;//1 means new

    public String ip;

    public long time;

    public String version;

    @Override
    public String toString() {
        return "User{" +
                "device='" + device + '\'' +
                ", deviceType='" + deviceType + '\'' +
                ", os='" + os + '\'' +
                ", event='" + event + '\'' +
                ", uid=" + uid +
                ", net='" + net + '\'' +
                ", channel='" + channel + '\'' +
                ", ifNewUser=" + ifNewUser +
                ", ip='" + ip + '\'' +
                ", time=" + time +
                ", version='" + version + '\'' +
                '}';
    }
}
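As a hedged illustration, independent of the author's job code, here is the core aggregation in plain Java: count events per (os, new or returning) pair. It is not the author's Flink job. It skips JSON parsing and does not deduplicate by uid. In Flink, the same idea would plausibly be a keyBy on (os, ifNewUser) followed by a sum or reduce.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java version of the section 10 aggregation (not the author's Flink job):
// count events per (os, new/returning). No JSON parsing and no deduplication by uid.
class OsUserCountSketch {

    // Only the two User fields this aggregation needs.
    static class Event {
        final String os;
        final int ifNewUser; // 1 means new, as in the User POJO

        Event(String os, int ifNewUser) {
            this.os = os;
            this.ifNewUser = ifNewUser;
        }
    }

    static Map<String, Integer> countByOsAndType(List<Event> events) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted for stable output
        for (Event e : events) {
            String key = e.os + "," + (e.ifNewUser == 1 ? "new" : "old");
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("android", 1), new Event("android", 1),
                new Event("ios", 1), new Event("ios", 0));
        System.out.println(countByOsAndType(events));
    }
}
```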

Main code

package com.juege.apps;

import com.alibaba.fastjson.JSON;
import com.juege.pojo.User;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.List;

public class CountNewUserofOSs {

    public static final String datasource = "[\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.21\",\n" +
            "    \"net\": \"wifi1\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668447,\n" +
            "    \"uid\": 2,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.22\",\n" +
            "    \"net\": \"wifi2\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 2,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.23\",\n" +
            "    \"net\": \"wifi3\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 3,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"huawei mall\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.24\",\n" +
            "    \"net\": \"wifi4\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 3,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.25\",\n" +
            "    \"net\": \"wifi5\",\n" +
            "    \"os\": \"ios\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 5,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.26\",\n" +
            "    \"net\": \"wifi6\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 3,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.27\",\n" +
            "    \"net\": \"wifi7\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 3,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"huawei mall\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.28\",\n" +
            "    \"net\": \"wifi8\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 1,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.29\",\n" +
            "    \"net\": \"wifi9\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 8,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.210\",\n" +
            "    \"net\": \"wifi10\",\n" +
            "    \"os\": \"ios\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 3,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.11\",\n" +
            "    \"net\": \"wifi11\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 5,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"huawei mall\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.12\",\n" +
            "    \"net\": \"wifi12\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 6,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.13\",\n" +
            "    \"net\": \"wifi13\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 9,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.14\",\n" +
            "    \"net\": \"wifi14\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 7,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.15\",\n" +
            "    \"net\": \"wifi15\",\n" +
            "    \"os\": \"ios\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 9,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"huawei mall\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.16\",\n" +
            "    \"net\": \"wifi16\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 5,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.17\",\n" +
            "    \"net\": \"wifi17\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 7,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.18\",\n" +
            "    \"net\": \"wifi18\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 0,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.19\",\n" +
            "    \"net\": \"wifi19\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 0,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"huawei mall\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.20\",\n" +
            "    \"net\": \"wifi20\",\n" +
            "    \"os\": \"ios\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 3,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.21\",\n" +
            "    \"net\": \"wifi21\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 0,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.22\",\n" +
            "    \"net\": \"wifi22\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 4,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.23\",\n" +
            "    \"net\": \"wifi23\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 5,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"huawei mall\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 1,\n" +
            "    \"ip\": \"35.62.33.24\",\n" +
            "    \"net\": \"wifi24\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 1,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.25\",\n" +
            "    \"net\": \"wifi25\",\n" +
            "    \"os\": \"ios\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 6,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.26\",\n" +
            "    \"net\": \"wifi26\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 8,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.27\",\n" +
            "    \"net\": \"wifi27\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 2,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"huawei mall\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.28\",\n" +
            "    \"net\": \"wifi28\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 6,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.29\",\n" +
            "    \"net\": \"wifi29\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 0,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.30\",\n" +
            "    \"net\": \"wifi30\",\n" +
            "    \"os\": \"ios\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 5,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.31\",\n" +
            "    \"net\": \"wifi31\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 2,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"huawei mall\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.32\",\n" +
            "    \"net\": \"wifi32\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 7,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.33\",\n" +
            "    \"net\": \"wifi33\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 5,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.34\",\n" +
            "    \"net\": \"wifi34\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 9,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.35\",\n" +
            "    \"net\": \"wifi35\",\n" +
            "    \"os\": \"ios\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 7,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"huawei mall\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.36\",\n" +
            "    \"net\": \"wifi36\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 8,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.37\",\n" +
            "    \"net\": \"wifi37\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 8,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.38\",\n" +
            "    \"net\": \"wifi38\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 4,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"browser\",\n" +
            "    \"device\": \"huawei\",\n" +
            "    \"deviceType\": \"mobile\",\n" +
            "    \"event\": \"order\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.39\",\n" +
            "    \"net\": \"wifi39\",\n" +
            "    \"os\": \"android\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 4,\n" +
            "    \"version\": \"1\"\n" +
            "  },\n" +
            "  {\n" +
            "    \"channel\": \"huawei mall\",\n" +
            "    \"device\": \"xiaomi\",\n" +
            "    \"deviceType\": \"computer\",\n" +
            "    \"event\": \"startup\",\n" +
            "    \"ifNewUser\": 0,\n" +
            "    \"ip\": \"35.62.33.40\",\n" +
            "    \"net\": \"wifi40\",\n" +
            "    \"os\": \"ios\",\n" +
            "    \"time\": 1682467668468,\n" +
            "    \"uid\": 6,\n" +
            "    \"version\": \"1\"\n" +
            "  }\n" +
            "]\n" +
            "\n";


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(3);

        simpleSourceAndSink(env);


    }

    private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {


        env.addSource(new RichSourceFunction<User>() {
                    @Override
                    public void run(SourceContext<User> ctx) throws Exception {
                        List<User> users = JSON.parseArray(datasource, User.class);
                        users.forEach(user -> ctx.collect(user));
                    }

                    @Override
                    public void cancel() {

                    }

                })
                .map(new RichMapFunction<User, Tuple3<String, Integer, Integer>>() {

                    @Override
                    public Tuple3<String, Integer, Integer> map(User value) {
                        return Tuple3.of(value.os,value.ifNewUser,1);
                    }
                })
                .keyBy(new KeySelector<Tuple3<String, Integer, Integer>, Tuple2<String,Integer>>() {
                    @Override
                    public Tuple2<String, Integer> getKey(Tuple3<String, Integer, Integer> value) throws Exception {
                        return Tuple2.of(value.f0,value.f1);
                    }
                })
                .sum(2)
                .print();

        env.execute("flink demo from two and a half years of solo practice");
    }
}

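11. Time and Windows

Why do we need this? Although Flink is a real-time stream processing framework, it supports batch-style processing just as well.

For example, I may want to compute statistics over each 5-second batch of data, or operate on every batch of 10 records.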

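11.1 Time

Here "time" refers to the time attached to the data, and there are three kinds:

Event time: when the data was produced

Ingestion time: when the data entered Flink

Processing time: when the data is processed inside Flink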

11.2 Windows

 

 

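Use a tumbling window to total the order amounts in fixed batches (the code below uses 8-second windows). Each input line is space-separated, and the second field is the order amount.

Start a listener on port 9527: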

[root@localhost ~]# nc -lk 9527
aa 45
cc 23
asd 12
dfg 678
a 2
5 34
package com.juege.timeandwindows;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TublingWindowDemoApp {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(3);

        simpleSourceAndSink(env);


    }

    private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {

        env.socketTextStream("192.168.56.10",9527)
                .map(new MapFunction<String, Tuple2<String,Integer>>() {

                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] strings = value.split(" ");
                        return Tuple2.of(strings[0],Integer.valueOf(strings[1]));
                    }
                })
//                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//
//                    @Override
//                    public String getKey(Tuple2<String, Integer> value) throws Exception {
//                        return value.f0;
//                    }
//                })
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(8)))
                .sum(1)
                .print();

        env.execute("flink demo from two and a half years of solo practice");

    }
}

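The console output is as follows. windowAll is not keyed, so each 8-second window emits one total over all of its records, and f0 is simply carried over from the window's first record: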

4> (aa,80)
5> (dfg,714)
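To double-check this output, here is a plain-Java sketch (no Flink dependency) of what sum(1) does to the records of one window: it keeps the first record's f0 and adds up f1. It assumes, as the output suggests, that the first three input lines fell into one 8-second processing-time window and the last three into the next; the class and method names are illustrative only, not Flink APIs.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of windowAll(...).sum(1) applied to the lines of one window.
// Hypothetical helper, not a Flink API.
class WindowAllSumSketch {

    // Mirrors the demo's map(): "aa 45" -> key "aa", amount 45.
    // The result keeps the first record's key and sums all amounts.
    static String sumWindow(List<String> windowLines) {
        String firstKey = null;
        int total = 0;
        for (String line : windowLines) {
            String[] fields = line.split(" ");
            if (firstKey == null) {
                firstKey = fields[0];
            }
            total += Integer.parseInt(fields[1]);
        }
        return "(" + firstKey + "," + total + ")";
    }

    public static void main(String[] args) {
        // Assumed split of the netcat input into two 8-second windows
        System.out.println(sumWindow(Arrays.asList("aa 45", "cc 23", "asd 12")));
        System.out.println(sumWindow(Arrays.asList("dfg 678", "a 2", "5 34")));
    }
}
```

This reproduces both lines of the console output: 45 + 23 + 12 = 80 under key aa, and 678 + 2 + 34 = 714 under key dfg.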

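11.3 Tumbling Windows vs. Sliding Windows

The demo above used a tumbling window, so how does a sliding window differ? The tumbling window above advances every 8 seconds, and a given record never appears in two windows. A sliding window has both a size and a slide interval: if the slide equals the window size, it behaves exactly like a tumbling window; if the slide is smaller than the window size, consecutive windows overlap, so records near the end of one window also fall into the next one.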
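The difference shows up in window assignment. Below is a plain-Java sketch (no Flink dependency) of which windows a single timestamp falls into, assuming windows aligned to 0 with no offset. The sliding loop follows the same idea as Flink's sliding-window assigner, but the class itself is illustrative. In a Flink job the corresponding assigners would be TumblingProcessingTimeWindows.of(Time.seconds(8)) and SlidingProcessingTimeWindows.of(Time.seconds(8), Time.seconds(4)).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of tumbling vs sliding window assignment (illustrative, not Flink code).
class WindowAssignmentSketch {

    // Start of the step-aligned bucket containing `timestamp` (offset 0).
    static long alignedStart(long timestamp, long step) {
        return timestamp - Math.floorMod(timestamp, step);
    }

    // Tumbling: each timestamp belongs to exactly one window [start, start + size).
    static List<String> tumbling(long timestamp, long size) {
        long start = alignedStart(timestamp, size);
        List<String> windows = new ArrayList<>();
        windows.add("[" + start + ", " + (start + size) + ")");
        return windows;
    }

    // Sliding: a timestamp belongs to every window [start, start + size)
    // whose start is a multiple of `slide` and which still covers the timestamp.
    static List<String> sliding(long timestamp, long size, long slide) {
        List<String> windows = new ArrayList<>();
        for (long start = alignedStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add("[" + start + ", " + (start + size) + ")");
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 9_000L; // a record at t = 9 s
        System.out.println(tumbling(ts, 8_000L));        // one window
        System.out.println(sliding(ts, 8_000L, 4_000L)); // two overlapping windows
        System.out.println(sliding(ts, 8_000L, 8_000L)); // slide == size: same as tumbling
    }
}
```

With an 8 s window and a 4 s slide, a record at t = 9 s lands in [4000, 12000) and [8000, 16000); with the slide equal to the size it lands only in [8000, 16000), exactly as in the tumbling case.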

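11.4 Watermarks: Introduction and a Simple Demo with Event Time

A watermark is used together with event time: it decides when a window is closed and its result computed. Say each window is 5 seconds long. Without any allowance for out-of-order data, the first window [0, 5000) closes and is computed as soon as a record with an event time of 4999 ms or later arrives.

If we add a watermark delay of, say, 2 seconds, the watermark trails the largest event time seen by 2 seconds, so the first window closes only when a record with an event time of at least 6999 ms arrives. Even then, only the records whose timestamps fall inside that 5-second window are included in its result.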
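The arithmetic can be checked with a small plain-Java sketch (no Flink dependency; names are illustrative). It assumes the same rule as the BoundedOutOfOrdernessTimestampExtractor used in the demo, watermark = largest event time seen - bound, and that a window [start, end) fires once the watermark reaches end - 1. For simplicity it updates the watermark after every record, whereas Flink emits periodic watermarks on a timer (every 200 ms by default). Note that the newer WatermarkStrategy.forBoundedOutOfOrderness subtracts one extra millisecond, so there the thresholds would be 5000 and 7000.

```java
// Plain-Java sketch of bounded-out-of-orderness watermarks (no Flink dependency).
// Illustrative only; the class and method names are not Flink APIs.
class WatermarkSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that "max - bound" cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Observe one record; the watermark trails the largest event time seen by the bound.
    long onEvent(long eventTime) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    // A window [start, end) fires once the watermark reaches its last millisecond, end - 1.
    static boolean windowFires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    // Event time of the first record that makes window [0, windowEnd) fire, or -1 if none does.
    static long firstTriggeringEvent(long[] eventTimes, long maxOutOfOrderness, long windowEnd) {
        WatermarkSketch generator = new WatermarkSketch(maxOutOfOrderness);
        for (long eventTime : eventTimes) {
            if (windowFires(generator.onEvent(eventTime), windowEnd)) {
                return eventTime;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1000, 4998, 4999, 6000, 6998, 6999, 8000};
        System.out.println(firstTriggeringEvent(eventTimes, 0L, 5000L));    // no delay
        System.out.println(firstTriggeringEvent(eventTimes, 2000L, 5000L)); // 2 s delay
    }
}
```

With no delay the first window fires on the record at 4999 ms; with a 2-second bound it waits until the record at 6999 ms.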

The code below does a keyBy + sum over 5-second windows:

package com.juege.timeandwindows;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkDemoApp {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);

        simpleSourceAndSink(env);


    }

    private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {

        env.socketTextStream("192.168.56.10", 9527)
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String element) {
                        return Long.valueOf(element.split(" ")[0]);
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] splitStr = value.split(" ");
                        return Tuple2.of(splitStr[1], Integer.valueOf(splitStr[2]));
                    }
                })
                .keyBy(t -> t.f0)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();


        env.execute("flink demo from two and a half years of solo practice");

    }
}

Input on 192.168.56.10:

[root@localhost ~]# nc -lk 9527
1000 2 15
2300 3 2
3400 2 3
4999 3 1
5200 a 1
5290 a 2
8900 a 4
9999 a 1
12000 b 2 
13000 b 4
14999 b 1
15200 a 1
16000 b 1
17000 a 1
18000 b 2
19999 a 3

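The console output is as follows. Although sum ran, the keyBy appeared to have no effect. The reason is that windowAll() always builds a non-keyed window (an AllWindowedStream, which runs with parallelism 1) and ignores the preceding keyBy, so each 5-second window sums every record regardless of f0. To get one sum per key, call window(TumblingEventTimeWindows.of(Time.seconds(5))) on the KeyedStream instead of windowAll(...).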
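A plain-Java sketch (no Flink dependency) makes the difference visible with the input above. It assumes every record is on time, which holds here because the timestamps only increase and the out-of-orderness bound is 0, so window contents depend only on timestamps. sumPerWindow models windowAll(...).sum(1) and sumPerWindowAndKey models keyBy(t -> t.f0).window(...).sum(1); both names are illustrative.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch: non-keyed (windowAll) vs keyed 5 s tumbling event-time window sums.
class KeyedVsAllWindowSketch {
    static final long WINDOW_SIZE = 5000L;

    // The netcat input above: "<event time> <key> <value>"
    static final String[] INPUT = {
        "1000 2 15", "2300 3 2", "3400 2 3", "4999 3 1",
        "5200 a 1", "5290 a 2", "8900 a 4", "9999 a 1",
        "12000 b 2", "13000 b 4", "14999 b 1",
        "15200 a 1", "16000 b 1", "17000 a 1", "18000 b 2", "19999 a 3"
    };

    // Start of the 5 s window containing the event time (windows aligned to 0).
    static long windowStart(long eventTime) {
        return eventTime - Math.floorMod(eventTime, WINDOW_SIZE);
    }

    // windowAll(...).sum(1): one total per window, keys ignored.
    static Map<Long, Integer> sumPerWindow(String[] lines) {
        Map<Long, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.trim().split(" ");
            totals.merge(windowStart(Long.parseLong(f[0])), Integer.parseInt(f[2]), Integer::sum);
        }
        return totals;
    }

    // keyBy(t -> t.f0).window(...).sum(1): one total per (window, key).
    static Map<Long, Map<String, Integer>> sumPerWindowAndKey(String[] lines) {
        Map<Long, Map<String, Integer>> totals = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.trim().split(" ");
            totals.computeIfAbsent(windowStart(Long.parseLong(f[0])), w -> new TreeMap<>())
                  .merge(f[1], Integer.parseInt(f[2]), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(sumPerWindow(INPUT));
        System.out.println(sumPerWindowAndKey(INPUT));
    }
}
```

For the last window [15000, 20000), windowAll yields a single total of 8, while the keyed version yields 5 for a and 3 for b.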

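11.5 sideOutput

A side output is an extra output stream. As the figure shows, some records that belong to an earlier window only enter the stream during a later window (because of network delays, for example), after that window has already fired, so they would never be processed. This is where side output comes in: sideOutputLateData routes such late records to a separate stream.

The code example below makes this clear.

Code example: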

package com.juege.timeandwindows;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideoutputDemoApp {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);

        simpleSourceAndSink(env);


    }

    private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {

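        // Anonymous subclass ({}) so Flink can capture the side output's generic type;
        // spelling out the type argument also keeps this compiling on Java 8
        final OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("side-output") {
        };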

        SingleOutputStreamOperator<String> streamOperator = env.socketTextStream("192.168.56.10", 9527)
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String element) {
                        return Long.valueOf(element.split(" ")[0]);
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] splitStr = value.split(" ");
                        return Tuple2.of(splitStr[1], Integer.valueOf(splitStr[2]));
                    }
                })
                .keyBy(t -> t.f0)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(outputTag)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                            @Override
                            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {

                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        },
                        new ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>() {
                            @Override
                            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {
                                elements.forEach(t -> out.collect("key:" + t.f0 + "value:" + t.f1));

                            }
                        });

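        streamOperator.print();

        // Print the side-output (late) data to stderr, which the IDE shows in red
        streamOperator.getSideOutput(outputTag).printToErr();

        env.execute("flink demo from two and a half years of solo practice");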

    }
}

Input on 192.168.56.10:

[root@localhost ~]# nc -lk 9527
1000 aa 1
2000 aa 2
4000 aa 3
4999 aa 4
3000 aa 2
4560 aa 1

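Console output: the side-output records are printed in red (via printToErr). The two records that belonged to the first 5-second window arrived late, after that window had already closed, so they were collected into the side output. In real projects we usually give window or side-output data proper handling rather than simply printing it as here.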
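The rule behind this output can be sketched in plain Java (no Flink dependency; names are illustrative). Assumptions: allowed lateness is 0 (the demo does not set allowedLateness), windows are aligned to 0, and the watermark advances after every record, whereas Flink emits it periodically; with input typed by hand the effect is the same. A record goes to the late-data side output when the window it belongs to has already fired, i.e. the watermark has reached that window's last millisecond.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: which records a 5 s tumbling event-time window treats as late
// (allowed lateness 0). Illustrative only, not Flink code.
class LateDataSketch {
    static final long WINDOW_SIZE = 5000L;

    // Late if the watermark has already reached the window's last millisecond,
    // i.e. the window [start, start + size) containing eventTime has already fired.
    static boolean isLate(long eventTime, long watermark) {
        long windowEnd = eventTime - Math.floorMod(eventTime, WINDOW_SIZE) + WINDOW_SIZE;
        return watermark >= windowEnd - 1;
    }

    // Returns the event times that would be routed to the side output.
    static List<Long> lateRecords(long[] eventTimes, long maxOutOfOrderness) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE + maxOutOfOrderness;
        for (long eventTime : eventTimes) {
            long watermark = maxSeen - maxOutOfOrderness; // watermark before this record
            if (isLate(eventTime, watermark)) {
                late.add(eventTime);
            }
            maxSeen = Math.max(maxSeen, eventTime);
        }
        return late;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1000, 2000, 4000, 4999, 3000, 4560}; // the input above
        System.out.println(lateRecords(eventTimes, 0L));    // bound 0, as in the demo
        System.out.println(lateRecords(eventTimes, 2000L)); // with a 2 s bound
    }
}
```

With the demo's bound of 0, the records at 3000 and 4560 are late; with a 2-second bound neither would be, because the first window would still be open when they arrived.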

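12. State

In Flink, state holds the intermediate results, metadata, and similar data that operators keep during a computation.

12.1 ValueState Example: Averaging Small Batches

The source emits Tuple2 records. They are grouped by f0, and every three values in a group are averaged and sent to the sink (here simply printed).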

package com.juege.state;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class ValueStateDemoApp {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        simpleSourceAndSink(env);
    }

    private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {

        ArrayList<Tuple2<Long, Long>> data = new ArrayList<>();
        data.add(Tuple2.of(1L,1L));
        data.add(Tuple2.of(1L,3L));
        data.add(Tuple2.of(2L,1L));
        data.add(Tuple2.of(2L,6L));
        data.add(Tuple2.of(2L,7L));
        data.add(Tuple2.of(1L,33L));


        env.fromCollection(data)
                .keyBy(t -> t.f0)
                .flatMap(new MyFlatMapFunction())
                .print();

        env.execute("flink demo from two and a half years of solo practice");


    }
}
package com.juege.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.io.IOException;

public class MyFlatMapFunction extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>> {


    private transient ValueState<Tuple2<Long,Long>> valueState;

    @Override
    public void open(Configuration parameters) throws IOException {

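        // Typed descriptor instead of the raw type: the state holds Tuple2<count, sum>
        ValueStateDescriptor<Tuple2<Long, Long>> valueStateDescriptor =
                new ValueStateDescriptor<>("AVG", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));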
        valueState = getRuntimeContext().getState(valueStateDescriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
        //f0 = count  f1 = sum
        Tuple2<Long, Long> sumState = valueState.value();
        if(sumState == null){
            valueState.update(Tuple2.of(0L,0L));
        }
        sumState = valueState.value();
        sumState.f0 += 1;
        sumState.f1 += value.f1;
        valueState.update(sumState);

        if(sumState.f0 >= 3){
            out.collect(Tuple2.of(value.f0,sumState.f1/sumState.f0.doubleValue()));
            valueState.clear();
        }
    }
}

The output is as follows:
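To see what the job should print, here is a plain-Java sketch (no Flink dependency; names are illustrative) of the same logic. A HashMap stands in for Flink's per-key ValueState, and records are processed in the order of the data list. With this data, key 2 reaches three values first with an average of (1 + 6 + 7) / 3 = 14/3, and key 1 follows with (1 + 3 + 33) / 3 = 37/3. In the real job the two results may be printed in either order, since the keys can be handled by different parallel subtasks.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of MyFlatMapFunction's logic; a HashMap stands in for keyed ValueState.
class CountAverageSketch {

    // input: {key, value} pairs in arrival order; output: (key, average) for every 3 values per key
    static List<Map.Entry<Long, Double>> run(long[][] input) {
        Map<Long, long[]> state = new HashMap<>(); // key -> {count, sum}, like Tuple2<Long, Long>
        List<Map.Entry<Long, Double>> out = new ArrayList<>();
        for (long[] record : input) {
            long[] acc = state.computeIfAbsent(record[0], k -> new long[2]);
            acc[0] += 1;         // count
            acc[1] += record[1]; // sum
            if (acc[0] >= 3) {
                out.add(new AbstractMap.SimpleEntry<Long, Double>(record[0], acc[1] / (double) acc[0]));
                state.remove(record[0]); // like valueState.clear()
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] data = {{1, 1}, {1, 3}, {2, 1}, {2, 6}, {2, 7}, {1, 33}}; // same data as the demo
        run(data).forEach(e -> System.out.println("(" + e.getKey() + "," + e.getValue() + ")"));
    }
}
```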

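12.2 Rewriting the Code Above with MapState

It's simple: ValueState holds just one value (per key), whereas MapState additionally stores a key for each value.

So to adapt the example above, we only need to add a throwaway key; here we use a random UUID.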

package com.juege.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.UUID;

public class MyMapStateFlatMapFunction extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>> {

    // map key: a throwaway UUID; map value: one input value (f1)
    private transient MapState<String,Long> mapState;

    @Override
    public void open(Configuration parameters) {
        // The value type must match MapState<String, Long>: Long, not Tuple2<String, Long>
        MapStateDescriptor<String, Long> mapStateDescriptor =
                new MapStateDescriptor<>("AVG", Types.STRING, Types.LONG);
        mapState = getRuntimeContext().getMapState(mapStateDescriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {

        mapState.put(UUID.randomUUID().toString(), value.f1);

        // Sum and count the values collected so far for the current key
        long sum = 0;
        int count = 0;
        for (Long v : mapState.values()) {
            sum += v;
            count++;
        }

        if (count >= 3) {
            out.collect(Tuple2.of(value.f0, sum / (double) count));
            mapState.clear();
        }
    }
}

 

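12.3 Checkpoint

A checkpoint is Flink's fault-tolerance mechanism for state. For example, if your stream job stops with an exception halfway through consuming data, after recovery it can continue from the last checkpoint instead of starting over. This is essential for any stream processing framework.

Below are a few example scenarios.

12.3.1 Keeping the Job Running After an Exception

The code below listens for data on the socket 192.168.56.10, port 9527: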

package com.juege.checkpoint;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointDemoApp {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);

        simpleSourceAndSink(env);


    }

    private static void simpleSourceAndSink(StreamExecutionEnvironment env) throws Exception {

        env.socketTextStream("192.168.56.10", 9527)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] splitStr = value.split(" ");
                        return Tuple2.of(splitStr[1], Integer.valueOf(splitStr[2]));
                    }
                })
                .keyBy(t -> t.f0)
                .sum(1)
                .print();


        env.execute("flink demo from two and a half years of solo practice");

    }
}

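Since I had not started the listener on that server, the job fails with an error.

Once checkpointing is enabled, the job keeps retrying instead of terminating: if no restart strategy is configured, Flink then falls back to a fixed-delay restart strategy with an effectively unlimited number of attempts.

For example, typing pk (which has no second or third field) throws an exception, but the job does not terminate, and input typed afterwards is still added to the earlier totals.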
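To make the idea concrete, below is a toy, single-process Java model (no Flink dependency; all names hypothetical). It keeps the keyed running sums in a map, snapshots them after every record, and on an exception restores the last snapshot and carries on, which is why later input is still added to the earlier totals. Real Flink checkpoints are taken periodically and asynchronously via checkpoint barriers, and records received after the last completed checkpoint are only replayed if the source supports it; a socket source does not.

```java
import java.util.HashMap;
import java.util.Map;

// Toy, single-process model of "restore state from the last checkpoint and keep going".
// Not how Flink is implemented; it only illustrates why the running sums survive a failure.
class CheckpointRestoreSketch {
    private Map<String, Integer> state = new HashMap<>();          // keyBy(f0).sum(1) state
    private Map<String, Integer> lastCheckpoint = new HashMap<>(); // last completed snapshot

    // Same parsing as the demo's map(): "<x> <key> <value>"; a line like "pk" throws.
    void process(String line) {
        String[] fields = line.split(" ");
        state.merge(fields[1], Integer.valueOf(fields[2]), Integer::sum);
    }

    void checkpoint() { lastCheckpoint = new HashMap<>(state); }

    void restore() { state = new HashMap<>(lastCheckpoint); }

    int sumFor(String key) { return state.getOrDefault(key, 0); }

    // Checkpoints after every successful record (a simplification) and, on failure,
    // "restarts" by restoring the last checkpoint instead of exiting.
    static CheckpointRestoreSketch run(String... lines) {
        CheckpointRestoreSketch job = new CheckpointRestoreSketch();
        for (String line : lines) {
            try {
                job.process(line);
                job.checkpoint();
            } catch (RuntimeException e) {
                System.out.println("failed on \"" + line + "\" (" + e + "), restoring last checkpoint");
                job.restore();
            }
        }
        return job;
    }

    public static void main(String[] args) {
        CheckpointRestoreSketch job = run("1 a 1", "2 a 2", "pk", "3 a 3");
        System.out.println("sum for a = " + job.sumFor("a"));
    }
}
```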

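12.3.2 Checkpointing with a Restart Strategy

Restart at most three times, waiting five seconds between attempts (with a checkpoint every 5000 ms):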

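import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time; // not the windowing Time class

env.enableCheckpointing(5000); // take a checkpoint every 5 s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));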
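How fixedDelayRestart(3, Time.seconds(5)) behaves can be modelled in a few lines of plain Java (no Flink dependency; the class name is hypothetical and this is not Flink's implementation): each failure consumes one restart attempt after a 5-second pause, and once the three attempts are used up, the next failure ends the job.

```java
// Plain-Java model of a fixed-delay restart strategy: at most maxAttempts restarts,
// with a fixed pause before each one. Illustrative only, not Flink's implementation.
class FixedDelayRestartSketch {
    private final int maxAttempts;
    private final long delayMillis;
    private int restartsSoFar = 0;

    FixedDelayRestartSketch(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    // Called on every job failure; returns true if the job will be restarted.
    boolean onFailure() {
        if (restartsSoFar >= maxAttempts) {
            return false; // attempts exhausted: the job fails for good
        }
        restartsSoFar++;
        return true;
    }

    // Total time spent waiting between restarts so far.
    long totalDelayMillis() {
        return restartsSoFar * delayMillis;
    }

    public static void main(String[] args) {
        FixedDelayRestartSketch strategy = new FixedDelayRestartSketch(3, 5000L);
        for (int failure = 1; failure <= 4; failure++) {
            System.out.println("failure " + failure + " -> restart? " + strategy.onFailure());
        }
        System.out.println("waited " + strategy.totalDelayMillis() + " ms in total");
    }
}
```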

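12.4 State Backends

In plain terms, the state backend decides where state data is stored. By default it is kept in memory (the first option in the figure); it is lost on restart, so this option is not used in production.

The other options are the file system (FsStateBackend) and RocksDB (RocksDBStateBackend).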

Author: 我才是真的封不觉
