This section walks through hands-on use of the Table API on a StreamTableEnvironment. In the example code, the input stream is simulated with a socket text source.
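Both examples convert each socket line into a StudentInfo POJO. That class is not part of the listings below; the following is a minimal sketch, with field names and types assumed from the setters and parsing calls (Float.parseFloat / Long.parseLong) used in the examples:

public class StudentInfo {
    // Sketch of the POJO the examples rely on; field types are assumptions
    // derived from the parsing code, not taken from the original post.
    private String name;
    private String sex;
    private String course;
    private float score;
    private long timestamp;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }
    public String getCourse() { return course; }
    public void setCourse(String course) { this.course = course; }
    public float getScore() { return score; }
    public void setScore(float score) { this.score = score; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}

Flink treats this as a POJO (public class, no-argument constructor, public getters and setters), so the Table API can address its fields by name.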
Example 1:
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
public class FlinkTableApiStreamingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Source: read lines from a socket connection
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999, "\n");
        // Parse the input stream into StudentInfo objects for easier downstream processing
        SingleOutputStreamOperator<StudentInfo> dataStreamStudent = text.flatMap(new FlatMapFunction<String, StudentInfo>() {
            @Override
            public void flatMap(String s, Collector<StudentInfo> collector) {
                String[] infos = s.split(",");
                if (StringUtils.isNotBlank(s) && infos.length == 5) {
                    StudentInfo studentInfo = new StudentInfo();
                    studentInfo.setName(infos[0]);
                    studentInfo.setSex(infos[1]);
                    studentInfo.setCourse(infos[2]);
                    studentInfo.setScore(Float.parseFloat(infos[3]));
                    studentInfo.setTimestamp(Long.parseLong(infos[4]));
                    collector.collect(studentInfo);
                }
            }
        });
        // Register the dataStreamStudent stream as a table named studentInfo
        tEnv.registerDataStream("studentInfo", dataStreamStudent, "name,sex,course,score,timestamp");
        // GroupBy aggregation: group by name and count the courses
        Table counts = tEnv.scan("studentInfo")
                .groupBy("name")
                .select("name, course.count as cnt");
        DataStream<Tuple2<Boolean, Row>> resultCountsAggr = tEnv.toRetractStream(counts, Row.class);
        resultCountsAggr.print();
        // GroupBy aggregation with distinct: group by name and sum the distinct scores
        Table groupByDistinctResult = tEnv.scan("studentInfo")
                .groupBy("name")
                .select("name, score.sum.distinct as d");
        DataStream<Tuple2<Boolean, Row>> resultDistinctAggr = tEnv.toRetractStream(groupByDistinctResult, Row.class);
        resultDistinctAggr.print();
        env.execute("studentScoreAnalyse");
    }
}
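For comparison, the first aggregation can also be expressed in SQL against the same registered table. This snippet is not part of the original example, but tEnv.sqlQuery is the standard SQL entry point on a StreamTableEnvironment:

// SQL sketch equivalent to the "counts" Table API query above
Table countsSql = tEnv.sqlQuery(
        "SELECT name, COUNT(course) AS cnt FROM studentInfo GROUP BY name");
tEnv.toRetractStream(countsSql, Row.class).print();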
Input data: comma-separated lines typed into the socket (started beforehand, e.g. with nc -lk 9999).
The returned results:
1. The first result stream is the retract stream of the per-name course count (resultCountsAggr).
2. The second result stream is the retract stream of the per-name distinct score sum (resultDistinctAggr).
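As an illustration (the values are invented, but they follow the name,sex,course,score,timestamp layout the flatMap expects), the socket input could look like:

zhangsan,male,math,90.0,1571636400000
zhangsan,male,english,85.5,1571636460000
lisi,female,math,78.0,1571636520000

Each printed element is a Tuple2<Boolean, Row>: true marks a newly emitted row and false retracts a previously emitted one, so whenever a name's aggregate changes you see a (false, old row) immediately followed by a (true, new row).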
Example 2:
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
public class FlinkTableApiStreamingWatermarkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Source: read lines from a socket connection
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999, "\n");
        // Parse the input stream into StudentInfo objects for easier downstream processing
        SingleOutputStreamOperator<StudentInfo> dataStreamStudent = text.flatMap(new FlatMapFunction<String, StudentInfo>() {
            @Override
            public void flatMap(String s, Collector<StudentInfo> collector) {
                String[] infos = s.split(",");
                if (StringUtils.isNotBlank(s) && infos.length == 5) {
                    StudentInfo studentInfo = new StudentInfo();
                    studentInfo.setName(infos[0]);
                    studentInfo.setSex(infos[1]);
                    studentInfo.setCourse(infos[2]);
                    studentInfo.setScore(Float.parseFloat(infos[3]));
                    studentInfo.setTimestamp(Long.parseLong(infos[4]));
                    collector.collect(studentInfo);
                }
            }
        });
        // The queries below use time windows, so an event-time attribute (timestamps plus
        // watermarks) must be defined; otherwise the job fails.
        DataStream<StudentInfo> dataStream = dataStreamStudent.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<StudentInfo>() {
            private final long maxTimeLag = 5000; // allow events to lag 5 seconds behind the wall clock
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(System.currentTimeMillis() - maxTimeLag);
            }
            @Override
            public long extractTimestamp(StudentInfo studentInfo, long previousElementTimestamp) {
                return studentInfo.getTimestamp();
            }
        });
        // Convert the watermarked stream into a Table; timestamp.rowtime declares the
        // timestamp field as the event-time attribute
        Table tableEvent = tEnv.fromDataStream(dataStream, "name,sex,course,score,timestamp.rowtime");
        // GroupBy window: 1-minute tumbling window per name and course
        Table resultGroupByWindow = tableEvent
                .filter("name.isNotNull && course.isNotNull")
                // .select("name.lowerCase() as name, course, utc2local(timestamp) as timestamp")
                .window(Tumble.over("1.minutes").on("timestamp").as("hourlyWindow"))
                .groupBy("hourlyWindow, name, course")
                .select("name, hourlyWindow.end, hourlyWindow.start, hourlyWindow.rowtime as hour, course, course.count as courseCount");
        DataStream<Row> result2 = tEnv.toAppendStream(resultGroupByWindow, Row.class);
        result2.print();
        // Over window: sliding aggregates over the preceding minute, partitioned by name
        Table resultOverWindow = tableEvent
                .window(Over
                        .partitionBy("name")
                        .orderBy("timestamp")
                        .preceding("1.minutes")
                        .following("CURRENT_RANGE")
                        .as("w"))
                .select("name, score.avg over w, score.max over w, score.min over w");
        DataStream<Row> resultOver = tEnv.toAppendStream(resultOverWindow, Row.class);
        resultOver.print();
        // Distinct aggregation on a time-windowed group by (not supported on BatchTableEnvironment)
        Table groupByWindowDistinctResult = tableEvent
                .window(Tumble.over("1.minutes").on("timestamp").as("w"))
                .groupBy("name, w")
                .select("name, score.sum.distinct as d");
        DataStream<Row> resultDistinct = tEnv.toAppendStream(groupByWindowDistinctResult, Row.class);
        resultDistinct.print();
        // Distinct aggregation on an over window
        Table resultOverWindowDistinct = tableEvent
                .window(Over
                        .partitionBy("name")
                        .orderBy("timestamp")
                        .preceding("1.minutes")
                        .as("w"))
                .select("name, score.sum.distinct over w, score.max over w, score.min over w");
        DataStream<Row> resultOverDistinct = tEnv.toAppendStream(resultOverWindowDistinct, Row.class);
        resultOverDistinct.print();
        env.execute("studentScoreAnalyse");
    }
}
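AssignerWithPeriodicWatermarks is deprecated in newer Flink releases. As a sketch (assuming Flink 1.11 or later, where org.apache.flink.api.common.eventtime.WatermarkStrategy and java.time.Duration are available), the timestamp assignment above could instead be written as:

// Sketch of a WatermarkStrategy-based replacement (Flink 1.11+).
// Unlike the wall-clock-based generator above, this watermark follows the event
// timestamps and tolerates events arriving up to 5 seconds out of order.
DataStream<StudentInfo> dataStream = dataStreamStudent.assignTimestampsAndWatermarks(
        WatermarkStrategy.<StudentInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((studentInfo, recordTimestamp) -> studentInfo.getTimestamp()));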
Input data: as in Example 1, comma-separated name,sex,course,score,timestamp lines are typed into the socket.
Output data: each of the printed append streams emits its rows once the corresponding windows close.
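For a quick test (again with invented values in the name,sex,course,score,timestamp format), keep the timestamps close to the current time in epoch milliseconds, since the watermark above is derived from the wall clock and records whose window has already been passed by the watermark are dropped as late:

zhangsan,male,math,90.0,1717400000000
zhangsan,male,english,85.5,1717400030000

With event-time tumbling windows, a window's result is printed only after the watermark passes the window end, i.e. roughly the 5-second lag after the next minute boundary.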