StreamTableEnvironment is mainly used for stream processing. This article shows how to implement GroupBy Window Aggregation and Over Window Aggregation through the corresponding SQL API, and also how to join two streaming tables. The code uses sockets as the data source. Without further ado, the code follows:
1. Entity classes and the ScalarFunction the code depends on:
import lombok.Data;
import java.sql.Timestamp;
@Data
public class StudentInfo{
private String name;
private String sex;
private String course;
private Float score;
private Long timestamp; // "timestamp" is an SQL keyword, so it cannot be referenced directly in SQL
private Timestamp sysDate;
}
import lombok.Data;
@Data
public class Teacher {
private String teacherName;
private String studentName;
}
import lombok.Data;
import java.sql.Timestamp;
@Data
public class StudentAndTeacher {
private String name;
private String sex;
private String course;
private Float score;
private Timestamp sysDate;
private String teacherName;
}
import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;
public class UTC2Local extends ScalarFunction {
public Timestamp eval(Timestamp s) {
long timestamp = s.getTime() + 28800000; // Flink uses UTC by default; our time zone is UTC+8, so add eight hours to the timestamp
return new Timestamp(timestamp);
}
}
2. GroupBy Window Aggregation and Over Window Aggregation via SQL:
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import java.sql.Timestamp;
public class FlinkTableSQLStreamingExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//source: read data from a socket connection
DataStreamSource<String> textStudent = env.socketTextStream("127.0.0.1", 9999, "\n");
//Parse the input stream into StudentInfo objects for later processing
SingleOutputStreamOperator<StudentInfo> dataStreamStudent = textStudent.flatMap(new FlatMapFunction<String, StudentInfo>() {
@Override
public void flatMap(String s, Collector<StudentInfo> collector){
String[] infos = s.split(",");
if(StringUtils.isNotBlank(s) && infos.length==5){
StudentInfo studentInfo = new StudentInfo();
studentInfo.setName(infos[0]);
studentInfo.setSex(infos[1]);
studentInfo.setCourse(infos[2]);
studentInfo.setScore(Float.parseFloat(infos[3]));
studentInfo.setTimestamp(Long.parseLong(infos[4]));
studentInfo.setSysDate(new Timestamp(studentInfo.getTimestamp())); // derive sysDate from the epoch-millis timestamp
collector.collect(studentInfo);
}
}
});
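//Assign event-time timestamps from the timestamp field and emit periodic watermarks that lag the current system time by maxTimeLag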
DataStream<StudentInfo> dataStream = dataStreamStudent.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<StudentInfo>() {
private final long maxTimeLag = 5000; // 5 seconds
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
@Override
public long extractTimestamp(StudentInfo studentInfo, long l) {
return studentInfo.getTimestamp();
}
});
//Register the watermarked stream as a table named studentInfo, declaring sysDate as the rowtime attribute
tEnv.registerDataStream("studentInfo",dataStream,"name,sex,course,score,timestamp,sysDate.rowtime");
tEnv.registerFunction("utc2local",new UTC2Local());
//GroupBy Window Aggregation: group by name and sum the scores within a 2-minute tumbling window
Table groupWindAggrTable = tEnv.sqlQuery("SELECT " +
"utc2local(TUMBLE_START(sysDate, INTERVAL '2' MINUTE)) as wStart, " +
"name, SUM(score) as score " +
"FROM studentInfo " +
"GROUP BY TUMBLE(sysDate, INTERVAL '2' MINUTE), name");
DataStream<Row> groupWindAggrTableResult = tEnv.toAppendStream(groupWindAggrTable, Row.class);
groupWindAggrTableResult.print();
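//Over Window Aggregation: for each row, count the courses of the same name over the current row and the two preceding rows, ordered by sysDate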
Table overWindowAggr = tEnv.sqlQuery(
"select name,count(course) over(" +
" partition by name order by sysDate" +
" rows between 2 preceding and current row" +
")" +
"from studentInfo");
DataStream<Row> overWindowAggrResult = tEnv.toAppendStream(overWindowAggr, Row.class);
overWindowAggrResult.print();
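//The same over-window aggregation written with a named WINDOW clause, reusing window w for both COUNT(course) and SUM(score)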
Table overWindowAggr2 = tEnv.sqlQuery(
"select name,COUNT(course) OVER w ,SUM(score) OVER w " +
"from studentInfo " +
"window w as ( " +
" partition by name " +
" order by sysDate " +
" rows between 2 preceding and current row " +
") ");
DataStream<Tuple2<Boolean, Row>> overWindowAggr2Result = tEnv.toRetractStream(overWindowAggr2, Row.class);
overWindowAggr2Result.print();
env.execute("studentScoreAnalyse");
}
}
Input data:
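For example, records sent to port 9999 (e.g. via nc -lk 9999) must follow the format the flatMap above expects, name,sex,course,score,epoch-millis timestamp; hypothetical sample lines:
zhangsan,male,math,90.5,1562314380000
zhangsan,male,english,85.0,1562314390000
lisi,female,math,78.0,1562314400000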
Output:
3. Table join, and using a GroupBy Window after the join:
Rowtime attributes must not appear in the input rows of a regular join. As a workaround, you can cast an input table's time attribute to a plain TIMESTAMP before the join, or declare the time attribute again after the join.
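A minimal sketch of the cast workaround, assuming studentInfo was registered with sysDate.rowtime as in section 2 (studentInfoCasted is a hypothetical table name):
//Hypothetical sketch: casting a rowtime attribute to TIMESTAMP removes the time-attribute property,
//so the casted table can participate in a regular join
Table studentNoRowtime = tEnv.sqlQuery(
    "SELECT name, sex, course, score, CAST(sysDate AS TIMESTAMP) AS sysDate FROM studentInfo");
tEnv.registerTable("studentInfoCasted", studentNoRowtime);
The full example below takes the other route instead: it registers both streams without a rowtime attribute, performs the join, converts the result back to a DataStream, assigns timestamps and watermarks, and only then registers a table that declares sysDate.rowtime: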
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import java.sql.Timestamp;
public class FlinkTableSQLStreamingJoinExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//source: read data from socket connections (students on port 9999, teachers on port 9998)
DataStreamSource<String> textStudent = env.socketTextStream("127.0.0.1", 9999, "\n");
DataStreamSource<String> textTeacher = env.socketTextStream("127.0.0.1", 9998, "\n");
//Parse the student input stream into StudentInfo objects for later processing
SingleOutputStreamOperator<StudentInfo> dataStreamStudent = textStudent.flatMap(new FlatMapFunction<String, StudentInfo>() {
@Override
public void flatMap(String s, Collector<StudentInfo> collector){
String[] infos = s.split(",");
if(StringUtils.isNotBlank(s) && infos.length==5){
StudentInfo studentInfo = new StudentInfo();
studentInfo.setName(infos[0]);
studentInfo.setSex(infos[1]);
studentInfo.setCourse(infos[2]);
studentInfo.setScore(Float.parseFloat(infos[3]));
studentInfo.setTimestamp(Long.parseLong(infos[4]));
studentInfo.setSysDate(new Timestamp(studentInfo.getTimestamp())); // derive sysDate from the epoch-millis timestamp
collector.collect(studentInfo);
}
}
});
//Register the student stream as a table named studentInfo (no rowtime attribute here, because a rowtime field may not feed a regular join)
tEnv.registerDataStream("studentInfo",dataStreamStudent,"name,sex,course,score,timestamp,sysDate");
//Parse the teacher input stream into Teacher objects
SingleOutputStreamOperator<Teacher> dataStreamTeacher = textTeacher.flatMap(new FlatMapFunction<String, Teacher>() {
@Override
public void flatMap(String s, Collector<Teacher> collector){
String[] infos = s.split(",");
if(StringUtils.isNotBlank(s) && infos.length==2){
Teacher teacher = new Teacher();
teacher.setTeacherName(infos[0]);
teacher.setStudentName(infos[1]);
collector.collect(teacher);
}
}
});
tEnv.registerTable("teacher",tEnv.fromDataStream(dataStreamTeacher));
Table joinTable = tEnv.sqlQuery("SELECT name,sex,course,score,teacherName,sysDate from ( " +
" select * " +
" FROM studentInfo LEFT JOIN teacher " +
" ON studentInfo.name = teacher.studentName)");
DataStream<Tuple2<Boolean, StudentAndTeacher>> joinTableResult = tEnv.toRetractStream(joinTable,StudentAndTeacher.class);
joinTableResult.print();
//Convert the stream from Tuple2<Boolean, StudentAndTeacher> to StudentAndTeacher so that a rowtime attribute can be declared for the time-window operation below;
//if Tuple2<Boolean, StudentAndTeacher> were registered as a table directly, its fields would default to f0 and f1
SingleOutputStreamOperator<StudentAndTeacher> dataStreamFormat = joinTableResult.flatMap(new FlatMapFunction<Tuple2<Boolean, StudentAndTeacher>, StudentAndTeacher>() {
@Override
public void flatMap(Tuple2<Boolean, StudentAndTeacher> booleanStudentAndTeacherTuple2, Collector<StudentAndTeacher> collector) throws Exception {
collector.collect(booleanStudentAndTeacherTuple2.f1);
}
});
DataStream<StudentAndTeacher> dataStream = dataStreamFormat.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<StudentAndTeacher>() {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public long extractTimestamp(StudentAndTeacher studentAndTeacher, long l) {
return studentAndTeacher.getSysDate().getTime();
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
});
//Register the joined, watermarked stream as a table named studentAndTeacherInfo, declaring sysDate as the rowtime attribute
tEnv.registerDataStream("studentAndTeacherInfo",dataStream,"name,sex,course,score,teacherName,sysDate.rowtime");
tEnv.registerFunction("utc2local",new UTC2Local());
//GroupBy Window Aggregation: group by name and sum the scores within a 2-minute tumbling window
Table groupWindAggrTable = tEnv.sqlQuery("SELECT " +
"utc2local(TUMBLE_START(sysDate, INTERVAL '2' MINUTE)) as wStart, " +
"name, SUM(score) as score " +
"FROM studentAndTeacherInfo " +
"GROUP BY TUMBLE(sysDate, INTERVAL '2' MINUTE), name");
DataStream<Row> groupWindAggrTableResult = tEnv.toAppendStream(groupWindAggrTable, Row.class);
groupWindAggrTableResult.print();
env.execute("studentScoreAnalyse");
}
}
Input data:
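For example, student records on port 9999 follow name,sex,course,score,timestamp (hypothetically: zhangsan,male,math,90.5,1562314380000), while teacher records on port 9998 follow teacherName,studentName (hypothetically: teacherWang,zhangsan).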
Output: